Skip to content

Commit ab2abb7

Browse files
Gerrrrjacek-lewandowski
authored andcommitted
STAR-13 Run tests for UnifiedCompactionStrategy (#22)
(cherry picked from commit 47b978d) (cherry picked from commit 4ecc81b) (cherry picked from commit a06131c) (cherry picked from commit 1064342)
1 parent 88c022c commit ab2abb7

File tree

6 files changed

+75
-31
lines changed

6 files changed

+75
-31
lines changed

compaction_test.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
since = pytest.mark.since
1616
logger = logging.getLogger(__name__)
1717

18-
strategies = ['LeveledCompactionStrategy', 'SizeTieredCompactionStrategy', 'DateTieredCompactionStrategy']
18+
strategies = ['LeveledCompactionStrategy', 'SizeTieredCompactionStrategy', 'DateTieredCompactionStrategy', 'UnifiedCompactionStrategy']
1919

2020

2121
class TestCompaction(Tester):
@@ -298,7 +298,7 @@ def test_compaction_strategy_switching(self, strategy):
298298
Ensure that switching strategies does not result in problems.
299299
Insert data, switch strategies, then check against data loss.
300300
"""
301-
strategies = ['LeveledCompactionStrategy', 'SizeTieredCompactionStrategy', 'DateTieredCompactionStrategy']
301+
strategies = ['LeveledCompactionStrategy', 'SizeTieredCompactionStrategy', 'DateTieredCompactionStrategy', 'UnifiedCompactionStrategy']
302302

303303
if strategy in strategies:
304304
strategies.remove(strategy)
@@ -307,6 +307,7 @@ def test_compaction_strategy_switching(self, strategy):
307307
[node1] = cluster.nodelist()
308308

309309
for strat in strategies:
310+
logger.debug("Switching to {}".format(strat))
310311
session = self.patient_cql_connection(node1)
311312
create_ks(session, 'ks', 1)
312313

disk_balance_test.py

Lines changed: 29 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@ class TestDiskBalance(Tester):
2424
@jira_ticket CASSANDRA-6696
2525
"""
2626

27+
STCS_COMPACTION_OPTS = "SizeTieredCompactionStrategy"
28+
LCS_COMPACTION_OPTS = "LeveledCompactionStrategy,sstable_size_in_mb=1"
29+
UCS_COMPACTION_OPTS = "UnifiedCompactionStrategy"
30+
2731
@pytest.fixture(scope='function', autouse=True)
2832
def fixture_set_cluster_settings(self, fixture_dtest_setup):
2933
cluster = fixture_dtest_setup.cluster
@@ -190,16 +194,23 @@ def test_disk_balance_after_boundary_change_stcs(self):
190194
"""
191195
@jira_ticket CASSANDRA-13948
192196
"""
193-
self._disk_balance_after_boundary_change_test(lcs=False)
197+
self._disk_balance_after_boundary_change_test(self.STCS_COMPACTION_OPTS)
194198

195199
@since('3.10')
196200
def test_disk_balance_after_boundary_change_lcs(self):
197201
"""
198202
@jira_ticket CASSANDRA-13948
199203
"""
200-
self._disk_balance_after_boundary_change_test(lcs=True)
204+
self._disk_balance_after_boundary_change_test(self.LCS_COMPACTION_OPTS)
205+
206+
@since('4.0')
207+
def test_disk_balance_after_boundary_change_ucs(self):
208+
"""
209+
@jira_ticket CASSANDRA-13948
210+
"""
211+
self._disk_balance_after_boundary_change_test(self.UCS_COMPACTION_OPTS)
201212

202-
def _disk_balance_after_boundary_change_test(self, lcs):
213+
def _disk_balance_after_boundary_change_test(self, compaction_opts):
203214
"""
204215
@jira_ticket CASSANDRA-13948
205216
@@ -230,7 +241,6 @@ def _disk_balance_after_boundary_change_test(self, lcs):
230241
keys_per_flush = 10000
231242
keys_to_write = num_flushes * keys_per_flush
232243

233-
compaction_opts = "LeveledCompactionStrategy,sstable_size_in_mb=1" if lcs else "SizeTieredCompactionStrategy"
234244
logger.debug("Writing {} keys in {} flushes (compaction_opts={})".format(keys_to_write, num_flushes, compaction_opts))
235245
total_keys = num_flushes * keys_per_flush
236246
current_keys = 0
@@ -254,29 +264,36 @@ def _disk_balance_after_boundary_change_test(self, lcs):
254264
node2.start(wait_for_binary_proto=True, jvm_args=["-Dcassandra.migration_task_wait_in_seconds=10"], set_migration_task=False)
255265
node2.flush()
256266

257-
self._assert_balanced_after_boundary_change(node1, total_keys, lcs)
267+
self._assert_balanced_after_boundary_change(node1, total_keys, compaction_opts)
258268

259269
logger.debug("Decommissioning node1")
260270
node1.decommission()
261271
node1.stop()
262272

263-
self._assert_balanced_after_boundary_change(node2, total_keys, lcs)
273+
self._assert_balanced_after_boundary_change(node2, total_keys, compaction_opts)
264274

265275
@since('3.10')
266276
def test_disk_balance_after_joining_ring_stcs(self):
267277
"""
268278
@jira_ticket CASSANDRA-13948
269279
"""
270-
self._disk_balance_after_joining_ring_test(lcs=False)
280+
self._disk_balance_after_joining_ring_test(self.STCS_COMPACTION_OPTS)
271281

272282
@since('3.10')
273283
def test_disk_balance_after_joining_ring_lcs(self):
274284
"""
275285
@jira_ticket CASSANDRA-13948
276286
"""
277-
self._disk_balance_after_joining_ring_test(lcs=True)
287+
self._disk_balance_after_joining_ring_test(self.LCS_COMPACTION_OPTS)
288+
289+
@since('4.0')
290+
def test_disk_balance_after_joining_ring_ucs(self):
291+
"""
292+
@jira_ticket CASSANDRA-13948
293+
"""
294+
self._disk_balance_after_joining_ring_test(self.UCS_COMPACTION_OPTS)
278295

279-
def _disk_balance_after_joining_ring_test(self, lcs):
296+
def _disk_balance_after_joining_ring_test(self, compaction_opts):
280297
"""
281298
@jira_ticket CASSANDRA-13948
282299
@@ -302,7 +319,6 @@ def _disk_balance_after_joining_ring_test(self, lcs):
302319
keys_per_flush = 10000
303320
keys_to_write = num_flushes * keys_per_flush
304321

305-
compaction_opts = "LeveledCompactionStrategy,sstable_size_in_mb=1" if lcs else "SizeTieredCompactionStrategy"
306322
logger.debug("Writing {} keys in {} flushes (compaction_opts={})".format(keys_to_write, num_flushes, compaction_opts))
307323
total_keys = num_flushes * keys_per_flush
308324
current_keys = 0
@@ -327,9 +343,9 @@ def _disk_balance_after_joining_ring_test(self, lcs):
327343
node1.nodetool("join")
328344
node1.nodetool("join") # Need to run join twice - one to join ring, another to leave write survey mode
329345

330-
self._assert_balanced_after_boundary_change(node1, total_keys, lcs)
346+
self._assert_balanced_after_boundary_change(node1, total_keys, compaction_opts)
331347

332-
def _assert_balanced_after_boundary_change(self, node, total_keys, lcs):
348+
def _assert_balanced_after_boundary_change(self, node, total_keys, compaction_opts):
333349
logger.debug("Cleanup {}".format(node.name))
334350
node.cleanup()
335351

@@ -351,7 +367,7 @@ def _assert_balanced_after_boundary_change(self, node, total_keys, lcs):
351367
logger.debug("Reading data back ({} keys)".format(total_keys))
352368
node.stress(['read', 'n={}'.format(total_keys), "no-warmup", "cl=ALL", "-pop", "seq=1...{}".format(total_keys), "-rate", "threads=1"])
353369

354-
if lcs:
370+
if compaction_opts == self.LCS_COMPACTION_OPTS:
355371
output = grep_sstables_in_each_level(node, "standard1")
356372
logger.debug("SSTables in each level: {}".format(output))
357373

repair_tests/repair_test.py

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1167,8 +1167,7 @@ def test_multiple_concurrent_repairs(self):
11671167
_, _, rc = node2.stress(['read', 'n=1M', 'no-warmup', '-rate', 'threads=30'], whitelist=True)
11681168
assert rc == 0
11691169

1170-
@since('4.0')
1171-
def test_wide_row_repair(self):
1170+
def _test_wide_row_repair(self, compaction_strategy):
11721171
"""
11731172
@jira_ticket CASSANDRA-13899
11741173
Make sure compressed vs uncompressed blocks are handled correctly when stream decompressing
@@ -1178,13 +1177,26 @@ def test_wide_row_repair(self):
11781177
cluster.populate(2).start()
11791178
node1, node2 = cluster.nodelist()
11801179
node2.stop(wait_other_notice=True)
1181-
profile_path = os.path.join(os.getcwd(), 'stress_profiles/repair_wide_rows.yaml')
1182-
logger.info(("yaml = " + profile_path))
1183-
node1.stress(['user', 'profile=' + profile_path, 'n=50', 'ops(insert=1)', 'no-warmup', '-rate', 'threads=8',
1184-
'-insert', 'visits=FIXED(100K)', 'revisit=FIXED(100K)'])
1180+
template_path = os.path.join(os.getcwd(), 'stress_profiles/repair_wide_rows.yaml.tmpl')
1181+
with open(template_path) as profile_template:
1182+
profile = profile_template.read().replace("{{ compaction_strategy }}", compaction_strategy)
1183+
with tempfile.NamedTemporaryFile(mode='w+') as stress_profile:
1184+
stress_profile.write(profile)
1185+
stress_profile.flush()
1186+
print("yaml = " + stress_profile.name)
1187+
node1.stress(['user', 'profile=' + stress_profile.name, 'n=50', 'ops(insert=1)', 'no-warmup', '-rate', 'threads=8',
1188+
'-insert', 'visits=FIXED(100K)', 'revisit=FIXED(100K)'])
11851189
node2.start(wait_for_binary_proto=True)
11861190
node2.repair()
11871191

1192+
@since('4.0')
1193+
def test_wide_row_repair_lcs(self):
1194+
self._test_wide_row_repair('LeveledCompactionStrategy')
1195+
1196+
@since('4.0')
1197+
def test_wide_row_repair_ucs(self):
1198+
self._test_wide_row_repair('UnifiedCompactionStrategy')
1199+
11881200
@since('2.1', max_version='4')
11891201
def test_dead_coordinator(self):
11901202
"""
@@ -1241,7 +1253,7 @@ def _repair_abort_test(self, options=[], nodes=1, rf=1, no_common_range=False):
12411253
cluster = self.cluster
12421254
logger.debug("Starting cluster..")
12431255
cluster.populate(nodes).start(wait_for_binary_proto=True)
1244-
1256+
12451257
node1 = self.cluster.nodelist()[0]
12461258
session = self.patient_cql_connection(node1)
12471259
create_ks(session, 'ks', rf=rf)

replace_address_test.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -584,7 +584,14 @@ def test_replace_with_insufficient_replicas(self):
584584
self.replacement_node.watch_log_for("Unable to find sufficient sources for streaming range")
585585
assert_not_running(self.replacement_node)
586586

587-
def test_multi_dc_replace_with_rf1(self):
587+
def test_multi_dc_replace_with_rf1_stcs(self):
588+
self._test_multi_dc_replace_with_rf1('SizeTieredCompactionStrategy')
589+
590+
@since("4.0")
591+
def test_multi_dc_replace_with_rf1_ucs(self):
592+
self._test_multi_dc_replace_with_rf1('UnifiedCompactionStrategy')
593+
594+
def _test_multi_dc_replace_with_rf1(self, compaction_strategy):
588595
"""
589596
Test that multi-dc replace works when rf=1 on each dc
590597
"""
@@ -594,7 +601,7 @@ def test_multi_dc_replace_with_rf1(self):
594601
# Create the keyspace and table
595602
keyspace: keyspace1
596603
keyspace_definition: |
597-
CREATE KEYSPACE keyspace1 WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1, 'dc2': 1};
604+
CREATE KEYSPACE keyspace1 WITH replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 1, 'dc2': 1}};
598605
table: users
599606
table_definition:
600607
CREATE TABLE users (
@@ -603,15 +610,16 @@ def test_multi_dc_replace_with_rf1(self):
603610
last_name text,
604611
email text,
605612
PRIMARY KEY(username)
606-
) WITH compaction = {'class':'SizeTieredCompactionStrategy'};
613+
) WITH compaction = {{'class':'{compaction_strategy}'}};
607614
insert:
608615
partitions: fixed(1)
609616
batchtype: UNLOGGED
610617
queries:
611618
read:
612619
cql: select * from users where username = ?
613620
fields: samerow
614-
"""
621+
""".format(compaction_strategy=compaction_strategy)
622+
615623
with tempfile.NamedTemporaryFile(mode='w+') as stress_config:
616624
stress_config.write(yaml_config)
617625
stress_config.flush()

schema_test.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
class TestSchema(Tester):
1515

16-
def test_table_alteration(self):
16+
def _test_table_alteration(self, compaction_opts):
1717
"""
1818
Tests that table alters return as expected with many sstables at different schema points
1919
"""
@@ -24,7 +24,7 @@ def test_table_alteration(self):
2424
create_ks(session, 'ks', 1)
2525
session.execute("use ks;")
2626
session.execute("create table tbl_o_churn (id int primary key, c0 text, c1 text) "
27-
"WITH compaction = {'class': 'SizeTieredCompactionStrategy', 'min_threshold': 1024, 'max_threshold': 1024 };")
27+
"WITH compaction = " + compaction_opts + ";")
2828

2929
stmt1 = session.prepare("insert into tbl_o_churn (id, c0, c1) values (?, ?, ?)")
3030
rows_to_insert = 50
@@ -54,6 +54,13 @@ def test_table_alteration(self):
5454
assert row.c2 == 'ddd'
5555
assert not hasattr(row, 'c0')
5656

57+
def test_table_alteration_stcs(self):
58+
self._test_table_alteration("{'class': 'SizeTieredCompactionStrategy', 'min_threshold': 1024, 'max_threshold': 1024 }")
59+
60+
@since("4.0")
61+
def test_table_alteration_ucs(self):
62+
self._test_table_alteration("{'class': 'UnifiedCompactionStrategy'}")
63+
5764
@since("2.0", max_version="3.X") # Compact Storage
5865
def test_drop_column_compact(self):
5966
session = self.prepare()

stress_profiles/repair_wide_rows.yaml renamed to stress_profiles/repair_wide_rows.yaml.tmpl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,16 @@ table_definition: |
99
col1 text,
1010
val blob,
1111
PRIMARY KEY(key, col1)
12-
)
13-
WITH compaction = { 'class':'LeveledCompactionStrategy' }
12+
)
13+
WITH compaction = { 'class':'{{ compaction_strategy }}' }
1414
AND compression = {'chunk_length_in_kb': '1', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'};
1515

1616
#
1717
# Optional meta information on the generated columns in the above table
1818
# The min and max only apply to text and blob types
1919
# The distribution field represents the total unique population
2020
# distribution of that column across rows. Supported types are
21-
#
21+
#
2222
# EXP(min..max) An exponential distribution over the range [min..max]
2323
# EXTREME(min..max,shape) An extreme value (Weibull) distribution over the range [min..max]
2424
# GAUSSIAN(min..max,stdvrng) A gaussian/normal distribution, where mean=(min+max)/2, and stdev is (mean-min)/stdvrng

0 commit comments

Comments
 (0)