Skip to content

STAR-13 Run tests for UnifiedCompactionStrategy #22

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 15, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions compaction_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
since = pytest.mark.since
logger = logging.getLogger(__name__)

strategies = ['LeveledCompactionStrategy', 'SizeTieredCompactionStrategy', 'DateTieredCompactionStrategy']
strategies = ['LeveledCompactionStrategy', 'SizeTieredCompactionStrategy', 'DateTieredCompactionStrategy', 'UnifiedCompactionStrategy']


class TestCompaction(Tester):
Expand Down Expand Up @@ -298,7 +298,7 @@ def test_compaction_strategy_switching(self, strategy):
Ensure that switching strategies does not result in problems.
Insert data, switch strategies, then check against data loss.
"""
strategies = ['LeveledCompactionStrategy', 'SizeTieredCompactionStrategy', 'DateTieredCompactionStrategy']
strategies = ['LeveledCompactionStrategy', 'SizeTieredCompactionStrategy', 'DateTieredCompactionStrategy', 'UnifiedCompactionStrategy']

if strategy in strategies:
strategies.remove(strategy)
Expand All @@ -307,6 +307,7 @@ def test_compaction_strategy_switching(self, strategy):
[node1] = cluster.nodelist()

for strat in strategies:
logger.debug("Switching to {}".format(strat))
session = self.patient_cql_connection(node1)
create_ks(session, 'ks', 1)

Expand Down
42 changes: 29 additions & 13 deletions disk_balance_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ class TestDiskBalance(Tester):
@jira_ticket CASSANDRA-6696
"""

# Compaction-strategy option strings passed as `compaction_opts` to the
# disk-balance test helpers below. The LCS variant pins a 1 MB sstable size
# so many sstables/levels are produced quickly during the test.
STCS_COMPACTION_OPTS = "SizeTieredCompactionStrategy"
LCS_COMPACTION_OPTS = "LeveledCompactionStrategy,sstable_size_in_mb=1"
UCS_COMPACTION_OPTS = "UnifiedCompactionStrategy"

@pytest.fixture(scope='function', autouse=True)
def fixture_set_cluster_settings(self, fixture_dtest_setup):
cluster = fixture_dtest_setup.cluster
Expand Down Expand Up @@ -190,16 +194,23 @@ def test_disk_balance_after_boundary_change_stcs(self):
"""
@jira_ticket CASSANDRA-13948
"""
self._disk_balance_after_boundary_change_test(lcs=False)
self._disk_balance_after_boundary_change_test(self.STCS_COMPACTION_OPTS)

@since('3.10')
def test_disk_balance_after_boundary_change_lcs(self):
"""
@jira_ticket CASSANDRA-13948
"""
self._disk_balance_after_boundary_change_test(lcs=True)
self._disk_balance_after_boundary_change_test(self.LCS_COMPACTION_OPTS)

@since('4.0')
def test_disk_balance_after_boundary_change_ucs(self):
    """
    Disk balance after a token-boundary change, run with UnifiedCompactionStrategy.

    @jira_ticket CASSANDRA-13948
    """
    compaction_opts = self.UCS_COMPACTION_OPTS
    self._disk_balance_after_boundary_change_test(compaction_opts)

def _disk_balance_after_boundary_change_test(self, lcs):
def _disk_balance_after_boundary_change_test(self, compaction_opts):
"""
@jira_ticket CASSANDRA-13948

Expand Down Expand Up @@ -230,7 +241,6 @@ def _disk_balance_after_boundary_change_test(self, lcs):
keys_per_flush = 10000
keys_to_write = num_flushes * keys_per_flush

compaction_opts = "LeveledCompactionStrategy,sstable_size_in_mb=1" if lcs else "SizeTieredCompactionStrategy"
logger.debug("Writing {} keys in {} flushes (compaction_opts={})".format(keys_to_write, num_flushes, compaction_opts))
total_keys = num_flushes * keys_per_flush
current_keys = 0
Expand All @@ -254,29 +264,36 @@ def _disk_balance_after_boundary_change_test(self, lcs):
node2.start(wait_for_binary_proto=True, jvm_args=["-Dcassandra.migration_task_wait_in_seconds=10"], set_migration_task=False)
node2.flush()

self._assert_balanced_after_boundary_change(node1, total_keys, lcs)
self._assert_balanced_after_boundary_change(node1, total_keys, compaction_opts)

logger.debug("Decommissioning node1")
node1.decommission()
node1.stop()

self._assert_balanced_after_boundary_change(node2, total_keys, lcs)
self._assert_balanced_after_boundary_change(node2, total_keys, compaction_opts)

@since('3.10')
def test_disk_balance_after_joining_ring_stcs(self):
"""
@jira_ticket CASSANDRA-13948
"""
self._disk_balance_after_joining_ring_test(lcs=False)
self._disk_balance_after_joining_ring_test(self.STCS_COMPACTION_OPTS)

@since('3.10')
def test_disk_balance_after_joining_ring_lcs(self):
"""
@jira_ticket CASSANDRA-13948
"""
self._disk_balance_after_joining_ring_test(lcs=True)
self._disk_balance_after_joining_ring_test(self.LCS_COMPACTION_OPTS)

@since('4.0')
def test_disk_balance_after_joining_ring_ucs(self):
    """
    Disk balance after (re)joining the ring, run with UnifiedCompactionStrategy.

    @jira_ticket CASSANDRA-13948
    """
    compaction_opts = self.UCS_COMPACTION_OPTS
    self._disk_balance_after_joining_ring_test(compaction_opts)

def _disk_balance_after_joining_ring_test(self, lcs):
def _disk_balance_after_joining_ring_test(self, compaction_opts):
"""
@jira_ticket CASSANDRA-13948

Expand All @@ -302,7 +319,6 @@ def _disk_balance_after_joining_ring_test(self, lcs):
keys_per_flush = 10000
keys_to_write = num_flushes * keys_per_flush

compaction_opts = "LeveledCompactionStrategy,sstable_size_in_mb=1" if lcs else "SizeTieredCompactionStrategy"
logger.debug("Writing {} keys in {} flushes (compaction_opts={})".format(keys_to_write, num_flushes, compaction_opts))
total_keys = num_flushes * keys_per_flush
current_keys = 0
Expand All @@ -327,9 +343,9 @@ def _disk_balance_after_joining_ring_test(self, lcs):
node1.nodetool("join")
node1.nodetool("join") # Need to run join twice - one to join ring, another to leave write survey mode

self._assert_balanced_after_boundary_change(node1, total_keys, lcs)
self._assert_balanced_after_boundary_change(node1, total_keys, compaction_opts)

def _assert_balanced_after_boundary_change(self, node, total_keys, lcs):
def _assert_balanced_after_boundary_change(self, node, total_keys, compaction_opts):
logger.debug("Cleanup {}".format(node.name))
node.cleanup()

Expand All @@ -351,7 +367,7 @@ def _assert_balanced_after_boundary_change(self, node, total_keys, lcs):
logger.debug("Reading data back ({} keys)".format(total_keys))
node.stress(['read', 'n={}'.format(total_keys), "no-warmup", "cl=ALL", "-pop", "seq=1...{}".format(total_keys), "-rate", "threads=1"])

if lcs:
if compaction_opts == self.LCS_COMPACTION_OPTS:
output = grep_sstables_in_each_level(node, "standard1")
logger.debug("SSTables in each level: {}".format(output))

Expand Down
26 changes: 19 additions & 7 deletions repair_tests/repair_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1137,8 +1137,7 @@ def test_multiple_concurrent_repairs(self):
_, _, rc = node2.stress(['read', 'n=1M', 'no-warmup', '-rate', 'threads=30'], whitelist=True)
assert rc == 0

@since('4.0')
def test_wide_row_repair(self):
def _test_wide_row_repair(self, compaction_strategy):
"""
@jira_ticket CASSANDRA-13899
Make sure compressed vs uncompressed blocks are handled correctly when stream decompressing
Expand All @@ -1148,13 +1147,26 @@ def test_wide_row_repair(self):
cluster.populate(2).start()
node1, node2 = cluster.nodelist()
node2.stop(wait_other_notice=True)
profile_path = os.path.join(os.getcwd(), 'stress_profiles/repair_wide_rows.yaml')
logger.info(("yaml = " + profile_path))
node1.stress(['user', 'profile=' + profile_path, 'n=50', 'ops(insert=1)', 'no-warmup', '-rate', 'threads=8',
'-insert', 'visits=FIXED(100K)', 'revisit=FIXED(100K)'])
template_path = os.path.join(os.getcwd(), 'stress_profiles/repair_wide_rows.yaml.tmpl')
with open(template_path) as profile_template:
profile = profile_template.read().replace("{{ compaction_strategy }}", compaction_strategy)
with tempfile.NamedTemporaryFile(mode='w+') as stress_profile:
stress_profile.write(profile)
stress_profile.flush()
print("yaml = " + stress_profile.name)
node1.stress(['user', 'profile=' + stress_profile.name, 'n=50', 'ops(insert=1)', 'no-warmup', '-rate', 'threads=8',
'-insert', 'visits=FIXED(100K)', 'revisit=FIXED(100K)'])
node2.start(wait_for_binary_proto=True)
node2.repair()

@since('4.0')
def test_wide_row_repair_lcs(self):
    """Run the wide-row repair scenario (CASSANDRA-13899) with LeveledCompactionStrategy."""
    strategy = 'LeveledCompactionStrategy'
    self._test_wide_row_repair(strategy)

@since('4.0')
def test_wide_row_repair_ucs(self):
    """Run the wide-row repair scenario (CASSANDRA-13899) with UnifiedCompactionStrategy."""
    strategy = 'UnifiedCompactionStrategy'
    self._test_wide_row_repair(strategy)

@since('2.1', max_version='4')
def test_dead_coordinator(self):
"""
Expand Down Expand Up @@ -1211,7 +1223,7 @@ def _repair_abort_test(self, options=[], nodes=1, rf=1, no_common_range=False):
cluster = self.cluster
logger.debug("Starting cluster..")
cluster.populate(nodes).start(wait_for_binary_proto=True)

node1 = self.cluster.nodelist()[0]
session = self.patient_cql_connection(node1)
create_ks(session, 'ks', rf=rf)
Expand Down
19 changes: 15 additions & 4 deletions replace_address_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,17 @@ def test_replace_with_insufficient_replicas(self):

@flaky
@pytest.mark.vnodes
def test_multi_dc_replace_with_rf1(self):
def test_multi_dc_replace_with_rf1_stcs(self):
    """Multi-dc node replacement with rf=1 per dc, using SizeTieredCompactionStrategy."""
    strategy = 'SizeTieredCompactionStrategy'
    self._test_multi_dc_replace_with_rf1(strategy)

@flaky
@pytest.mark.vnodes
@since("4.0")
def test_multi_dc_replace_with_rf1_ucs(self):
    """Multi-dc node replacement with rf=1 per dc, using UnifiedCompactionStrategy."""
    strategy = 'UnifiedCompactionStrategy'
    self._test_multi_dc_replace_with_rf1(strategy)


def _test_multi_dc_replace_with_rf1(self, compaction_strategy):
"""
Test that multi-dc replace works when rf=1 on each dc
"""
Expand All @@ -559,7 +569,7 @@ def test_multi_dc_replace_with_rf1(self):
# Create the keyspace and table
keyspace: keyspace1
keyspace_definition: |
CREATE KEYSPACE keyspace1 WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1, 'dc2': 1};
CREATE KEYSPACE keyspace1 WITH replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 1, 'dc2': 1}};
table: users
table_definition:
CREATE TABLE users (
Expand All @@ -568,15 +578,16 @@ def test_multi_dc_replace_with_rf1(self):
last_name text,
email text,
PRIMARY KEY(username)
) WITH compaction = {'class':'SizeTieredCompactionStrategy'};
) WITH compaction = {{'class':'{compaction_strategy}'}};
insert:
partitions: fixed(1)
batchtype: UNLOGGED
queries:
read:
cql: select * from users where username = ?
fields: samerow
"""
""".format(compaction_strategy=compaction_strategy)

with tempfile.NamedTemporaryFile(mode='w+') as stress_config:
stress_config.write(yaml_config)
stress_config.flush()
Expand Down
11 changes: 9 additions & 2 deletions schema_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

class TestSchema(Tester):

def test_table_alteration(self):
def _test_table_alteration(self, compaction_opts):
"""
Tests that table alters return as expected with many sstables at different schema points
"""
Expand All @@ -24,7 +24,7 @@ def test_table_alteration(self):
create_ks(session, 'ks', 1)
session.execute("use ks;")
session.execute("create table tbl_o_churn (id int primary key, c0 text, c1 text) "
"WITH compaction = {'class': 'SizeTieredCompactionStrategy', 'min_threshold': 1024, 'max_threshold': 1024 };")
"WITH compaction = " + compaction_opts + ";")

stmt1 = session.prepare("insert into tbl_o_churn (id, c0, c1) values (?, ?, ?)")
rows_to_insert = 50
Expand Down Expand Up @@ -54,6 +54,13 @@ def test_table_alteration(self):
assert row.c2 == 'ddd'
assert not hasattr(row, 'c0')

def test_table_alteration_stcs(self):
    """Table-alteration churn test under SizeTieredCompactionStrategy.

    The high min/max thresholds keep the flushed sstables from compacting
    during the run, per the original option string (preserved verbatim).
    """
    opts = "{'class': 'SizeTieredCompactionStrategy', 'min_threshold': 1024, 'max_threshold': 1024 }"
    self._test_table_alteration(opts)

@since("4.0")
def test_table_alteration_ucs(self):
    """Table-alteration churn test under UnifiedCompactionStrategy (4.0+)."""
    opts = "{'class': 'UnifiedCompactionStrategy'}"
    self._test_table_alteration(opts)

@since("2.0", max_version="3.X") # Compact Storage
def test_drop_column_compact(self):
session = self.prepare()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,16 @@ table_definition: |
col1 text,
val blob,
PRIMARY KEY(key, col1)
)
WITH compaction = { 'class':'LeveledCompactionStrategy' }
)
WITH compaction = { 'class':'{{ compaction_strategy }}' }
AND compression = {'chunk_length_in_kb': '1', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'};

#
# Optional meta information on the generated columns in the above table
# The min and max only apply to text and blob types
# The distribution field represents the total unique population
# distribution of that column across rows. Supported types are
#
#
# EXP(min..max) An exponential distribution over the range [min..max]
# EXTREME(min..max,shape) An extreme value (Weibull) distribution over the range [min..max]
# GAUSSIAN(min..max,stdvrng) A gaussian/normal distribution, where mean=(min+max)/2, and stdev is (mean-min)/stdvrng
Expand Down