Skip to content

Commit dc009bc

Browse files
Gerrrrmichaelsembwever
authored andcommitted
STAR-13 Run tests for UnifiedCompactionStrategy (#22)
(cherry picked from commit 47b978d) (cherry picked from commit 4ecc81b) (cherry picked from commit a06131c) (cherry picked from commit 1064342) (cherry picked from commit ab2abb7) (cherry picked from commit aae8a6b) squashes in e74acd8: STAR-826 Add missing import (#40) (cherry picked from commit 7c8489f) (cherry picked from commit 9196eaa) (cherry picked from commit 28c71d1) (cherry picked from commit 2880475) (cherry picked from commit 2a984ff) (cherry picked from commit aad664b) squashes in 01b3ee5f: STAR-836 Fix TestCompaction_with_UnifiedCompactionStrategy.bloomfilter_size_test (#41) Co-authored-by: Branimir Lambov <[email protected]> (cherry picked from commit 7777fa9) (cherry picked from commit d06d4c6) (cherry picked from commit 12f5fff) (cherry picked from commit a29f899) (cherry picked from commit cfc58a3) (cherry picked from commit 2a2726a) rebase notes: will be upstreamed in CASSANDRA-20629
1 parent 7183c1d commit dc009bc

8 files changed

+162
-33
lines changed

compaction_test.py

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -118,25 +118,30 @@ def test_bloomfilter_size(self, strategy):
118118
else:
119119
if strategy == "DateTieredCompactionStrategy":
120120
strategy_string = "strategy=DateTieredCompactionStrategy,base_time_seconds=86400" # we want a single sstable, so make sure we don't have a tiny first window
121+
elif self.strategy == "UnifiedCompactionStrategy":
122+
strategy_string = "strategy=UnifiedCompactionStrategy,max_sstables_to_compact=4" # disable layout-preserving compaction which can leave more than one sstable
121123
else:
122124
strategy_string = "strategy={}".format(strategy)
123125
min_bf_size = 100000
124126
max_bf_size = 150000
125127
cluster = self.cluster
126128
cluster.populate(1).start()
127129
[node1] = cluster.nodelist()
130+
logger.debug("Compaction: " + strategy_string)
128131

129132
for x in range(0, 5):
130133
node1.stress(['write', 'n=100K', "no-warmup", "cl=ONE", "-rate",
131134
"threads=300", "-schema", "replication(factor=1)",
132135
"compaction({},enabled=false)".format(strategy_string)])
133136
node1.flush()
137+
logger.debug(node1.nodetool('tablestats keyspace1.standard1').stdout)
134138

135139
node1.nodetool('enableautocompaction')
136140
node1.wait_for_compactions()
137141

138142
table_name = 'standard1'
139-
output = node1.nodetool('tablestats').stdout
143+
output = node1.nodetool('tablestats keyspace1.standard1').stdout
144+
logger.debug(output)
140145
output = output[output.find(table_name):]
141146
output = output[output.find("Bloom filter space used"):]
142147
bfSize = int(output[output.find(":") + 1:output.find("\n")].strip())
@@ -157,7 +162,12 @@ def test_bloomfilter_size(self, strategy):
157162

158163
logger.debug("bloom filter size is: {}".format(bfSize))
159164
logger.debug("size factor = {}".format(size_factor))
160-
assert bfSize >= size_factor * min_bf_size
165+
# In the case where the number of sstables is greater than the number of directories, it's possible this to be
166+
# both with unique keys (where the bf size will remain close to the unadjusted limit) or with repetitions
167+
# of keys (where the bf size will be a multiple of the expected). Permit both by only using the size factor on
168+
# the maximum size. Note that the test is designed to end up with size_factor == 1 and most runs do so, thus
169+
# this is not a loosening of the test in the common case, only ensures that we don't end up with flakes.
170+
assert bfSize >= min_bf_size
161171
assert bfSize <= size_factor * max_bf_size
162172

163173
@pytest.mark.parametrize("strategy", strategies)
@@ -313,9 +323,9 @@ def test_compaction_strategy_switching(self, strategy):
313323
self.skip_if_not_supported(strategy)
314324

315325
if self.cluster.version() >= '5.0':
316-
strategies = ['LeveledCompactionStrategy', 'SizeTieredCompactionStrategy']
326+
strategies = ['LeveledCompactionStrategy', 'SizeTieredCompactionStrategy', 'UnifiedCompactionStrategy']
317327
else:
318-
strategies = ['LeveledCompactionStrategy', 'SizeTieredCompactionStrategy', 'DateTieredCompactionStrategy']
328+
strategies = ['LeveledCompactionStrategy', 'SizeTieredCompactionStrategy', 'DateTieredCompactionStrategy', 'UnifiedCompactionStrategy']
319329

320330
if strategy in strategies:
321331
strategies.remove(strategy)
@@ -324,6 +334,7 @@ def test_compaction_strategy_switching(self, strategy):
324334
[node1] = cluster.nodelist()
325335

326336
for strat in strategies:
337+
logger.debug("Switching to {}".format(strat))
327338
session = self.patient_cql_connection(node1)
328339
create_ks(session, 'ks', 1)
329340

disk_balance_test.py

Lines changed: 29 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@ class TestDiskBalance(Tester):
2424
@jira_ticket CASSANDRA-6696
2525
"""
2626

27+
STCS_COMPACTION_OPTS = "SizeTieredCompactionStrategy"
28+
LCS_COMPACTION_OPTS = "LeveledCompactionStrategy,sstable_size_in_mb=1"
29+
UCS_COMPACTION_OPTS = "UnifiedCompactionStrategy"
30+
2731
@pytest.fixture(scope='function', autouse=True)
2832
def fixture_set_cluster_settings(self, fixture_dtest_setup):
2933
cluster = fixture_dtest_setup.cluster
@@ -191,16 +195,23 @@ def test_disk_balance_after_boundary_change_stcs(self):
191195
"""
192196
@jira_ticket CASSANDRA-13948
193197
"""
194-
self._disk_balance_after_boundary_change_test(lcs=False)
198+
self._disk_balance_after_boundary_change_test(self.STCS_COMPACTION_OPTS)
195199

196200
@since('3.10')
197201
def test_disk_balance_after_boundary_change_lcs(self):
198202
"""
199203
@jira_ticket CASSANDRA-13948
200204
"""
201-
self._disk_balance_after_boundary_change_test(lcs=True)
205+
self._disk_balance_after_boundary_change_test(self.LCS_COMPACTION_OPTS)
206+
207+
@since('4.0')
208+
def test_disk_balance_after_boundary_change_ucs(self):
209+
"""
210+
@jira_ticket CASSANDRA-13948
211+
"""
212+
self._disk_balance_after_boundary_change_test(self.UCS_COMPACTION_OPTS)
202213

203-
def _disk_balance_after_boundary_change_test(self, lcs):
214+
def _disk_balance_after_boundary_change_test(self, compaction_opts):
204215
"""
205216
@jira_ticket CASSANDRA-13948
206217
@@ -231,7 +242,6 @@ def _disk_balance_after_boundary_change_test(self, lcs):
231242
keys_per_flush = 10000
232243
keys_to_write = num_flushes * keys_per_flush
233244

234-
compaction_opts = "LeveledCompactionStrategy,sstable_size_in_mb=1" if lcs else "SizeTieredCompactionStrategy"
235245
logger.debug("Writing {} keys in {} flushes (compaction_opts={})".format(keys_to_write, num_flushes, compaction_opts))
236246
total_keys = num_flushes * keys_per_flush
237247
current_keys = 0
@@ -255,29 +265,36 @@ def _disk_balance_after_boundary_change_test(self, lcs):
255265
node2.start(wait_for_binary_proto=True, jvm_args=["-Dcassandra.migration_task_wait_in_seconds=10"], set_migration_task=False)
256266
node2.flush()
257267

258-
self._assert_balanced_after_boundary_change(node1, total_keys, lcs)
268+
self._assert_balanced_after_boundary_change(node1, total_keys, compaction_opts)
259269

260270
logger.debug("Decommissioning node1")
261271
node1.decommission()
262272
node1.stop()
263273

264-
self._assert_balanced_after_boundary_change(node2, total_keys, lcs)
274+
self._assert_balanced_after_boundary_change(node2, total_keys, compaction_opts)
265275

266276
@since('3.10')
267277
def test_disk_balance_after_joining_ring_stcs(self):
268278
"""
269279
@jira_ticket CASSANDRA-13948
270280
"""
271-
self._disk_balance_after_joining_ring_test(lcs=False)
281+
self._disk_balance_after_joining_ring_test(self.STCS_COMPACTION_OPTS)
272282

273283
@since('3.10')
274284
def test_disk_balance_after_joining_ring_lcs(self):
275285
"""
276286
@jira_ticket CASSANDRA-13948
277287
"""
278-
self._disk_balance_after_joining_ring_test(lcs=True)
288+
self._disk_balance_after_joining_ring_test(self.LCS_COMPACTION_OPTS)
289+
290+
@since('4.0')
291+
def test_disk_balance_after_joining_ring_ucs(self):
292+
"""
293+
@jira_ticket CASSANDRA-13948
294+
"""
295+
self._disk_balance_after_joining_ring_test(self.UCS_COMPACTION_OPTS)
279296

280-
def _disk_balance_after_joining_ring_test(self, lcs):
297+
def _disk_balance_after_joining_ring_test(self, compaction_opts):
281298
"""
282299
@jira_ticket CASSANDRA-13948
283300
@@ -303,7 +320,6 @@ def _disk_balance_after_joining_ring_test(self, lcs):
303320
keys_per_flush = 10000
304321
keys_to_write = num_flushes * keys_per_flush
305322

306-
compaction_opts = "LeveledCompactionStrategy,sstable_size_in_mb=1" if lcs else "SizeTieredCompactionStrategy"
307323
logger.debug("Writing {} keys in {} flushes (compaction_opts={})".format(keys_to_write, num_flushes, compaction_opts))
308324
total_keys = num_flushes * keys_per_flush
309325
current_keys = 0
@@ -328,9 +344,9 @@ def _disk_balance_after_joining_ring_test(self, lcs):
328344
node1.nodetool("join")
329345
node1.nodetool("join") # Need to run join twice - one to join ring, another to leave write survey mode
330346

331-
self._assert_balanced_after_boundary_change(node1, total_keys, lcs)
347+
self._assert_balanced_after_boundary_change(node1, total_keys, compaction_opts)
332348

333-
def _assert_balanced_after_boundary_change(self, node, total_keys, lcs):
349+
def _assert_balanced_after_boundary_change(self, node, total_keys, compaction_opts):
334350
logger.debug("Cleanup {}".format(node.name))
335351
node.cleanup()
336352

@@ -352,7 +368,7 @@ def _assert_balanced_after_boundary_change(self, node, total_keys, lcs):
352368
logger.debug("Reading data back ({} keys)".format(total_keys))
353369
node.stress(['read', 'n={}'.format(total_keys), "no-warmup", "cl=ALL", "-pop", "seq=1...{}".format(total_keys), "-rate", "threads=1"])
354370

355-
if lcs:
371+
if compaction_opts == self.LCS_COMPACTION_OPTS:
356372
output = grep_sstables_in_each_level(node, "standard1")
357373
logger.debug("SSTables in each level: {}".format(output))
358374

repair_tests/repair_test.py

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import os
22
import os.path
33
import threading
4+
import tempfile
45
import time
56
import re
67
import pytest
@@ -1168,8 +1169,7 @@ def test_multiple_concurrent_repairs(self):
11681169
_, _, rc = node2.stress(['read', 'n=1M', 'no-warmup', '-rate', 'threads=30'], whitelist=True)
11691170
assert rc == 0
11701171

1171-
@since('4.0')
1172-
def test_wide_row_repair(self):
1172+
def _test_wide_row_repair(self, compaction_strategy):
11731173
"""
11741174
@jira_ticket CASSANDRA-13899
11751175
Make sure compressed vs uncompressed blocks are handled correctly when stream decompressing
@@ -1179,13 +1179,26 @@ def test_wide_row_repair(self):
11791179
cluster.populate(2).start()
11801180
node1, node2 = cluster.nodelist()
11811181
node2.stop(wait_other_notice=True)
1182-
profile_path = os.path.join(os.getcwd(), 'stress_profiles/repair_wide_rows.yaml')
1183-
logger.info(("yaml = " + profile_path))
1184-
node1.stress(['user', 'profile=' + profile_path, 'n=50', 'ops(insert=1)', 'no-warmup', '-rate', 'threads=8',
1185-
'-insert', 'visits=FIXED(100K)', 'revisit=FIXED(100K)'])
1182+
template_path = os.path.join(os.getcwd(), 'stress_profiles/repair_wide_rows.yaml.tmpl')
1183+
with open(template_path) as profile_template:
1184+
profile = profile_template.read().replace("{{ compaction_strategy }}", compaction_strategy)
1185+
with tempfile.NamedTemporaryFile(mode='w+') as stress_profile:
1186+
stress_profile.write(profile)
1187+
stress_profile.flush()
1188+
print("yaml = " + stress_profile.name)
1189+
node1.stress(['user', 'profile=' + stress_profile.name, 'n=50', 'ops(insert=1)', 'no-warmup', '-rate', 'threads=8',
1190+
'-insert', 'visits=FIXED(100K)', 'revisit=FIXED(100K)'])
11861191
node2.start(wait_for_binary_proto=True)
11871192
node2.repair()
11881193

1194+
@since('4.0')
1195+
def test_wide_row_repair_lcs(self):
1196+
self._test_wide_row_repair('LeveledCompactionStrategy')
1197+
1198+
@since('4.0')
1199+
def test_wide_row_repair_ucs(self):
1200+
self._test_wide_row_repair('UnifiedCompactionStrategy')
1201+
11891202
@since('2.1', max_version='4')
11901203
def test_dead_coordinator(self):
11911204
"""
@@ -1242,7 +1255,7 @@ def _repair_abort_test(self, options=[], nodes=1, rf=1, no_common_range=False):
12421255
cluster = self.cluster
12431256
logger.debug("Starting cluster..")
12441257
cluster.populate(nodes).start(wait_for_binary_proto=True)
1245-
1258+
12461259
node1 = self.cluster.nodelist()[0]
12471260
session = self.patient_cql_connection(node1)
12481261
create_ks(session, 'ks', rf=rf)

replace_address_test.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -639,7 +639,14 @@ def test_replace_with_insufficient_replicas(self):
639639
self.replacement_node.watch_log_for("Unable to find sufficient sources for streaming range")
640640
assert_not_running(self.replacement_node)
641641

642-
def test_multi_dc_replace_with_rf1(self):
642+
def test_multi_dc_replace_with_rf1_stcs(self):
643+
self._test_multi_dc_replace_with_rf1('SizeTieredCompactionStrategy')
644+
645+
@since("4.0")
646+
def test_multi_dc_replace_with_rf1_ucs(self):
647+
self._test_multi_dc_replace_with_rf1('UnifiedCompactionStrategy')
648+
649+
def _test_multi_dc_replace_with_rf1(self, compaction_strategy):
643650
"""
644651
Test that multi-dc replace works when rf=1 on each dc
645652
"""
@@ -649,7 +656,7 @@ def test_multi_dc_replace_with_rf1(self):
649656
# Create the keyspace and table
650657
keyspace: keyspace1
651658
keyspace_definition: |
652-
CREATE KEYSPACE keyspace1 WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': 1, 'dc2': 1};
659+
CREATE KEYSPACE keyspace1 WITH replication = {{'class': 'NetworkTopologyStrategy', 'dc1': 1, 'dc2': 1}};
653660
table: users
654661
table_definition:
655662
CREATE TABLE users (
@@ -658,15 +665,16 @@ def test_multi_dc_replace_with_rf1(self):
658665
last_name text,
659666
email text,
660667
PRIMARY KEY(username)
661-
) WITH compaction = {'class':'SizeTieredCompactionStrategy'};
668+
) WITH compaction = {{'class':'{compaction_strategy}'}};
662669
insert:
663670
partitions: fixed(1)
664671
batchtype: UNLOGGED
665672
queries:
666673
read:
667674
cql: select * from users where username = ?
668675
fields: samerow
669-
"""
676+
""".format(compaction_strategy=compaction_strategy)
677+
670678
with tempfile.NamedTemporaryFile(mode='w+') as stress_config:
671679
stress_config.write(yaml_config)
672680
stress_config.flush()

schema_test.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313

1414
class TestSchema(Tester):
1515

16-
def test_table_alteration(self):
16+
def _test_table_alteration(self, compaction_opts):
1717
"""
1818
Tests that table alters return as expected with many sstables at different schema points
1919
"""
@@ -24,7 +24,7 @@ def test_table_alteration(self):
2424
create_ks(session, 'ks', 1)
2525
session.execute("use ks;")
2626
session.execute("create table tbl_o_churn (id int primary key, c0 text, c1 text) "
27-
"WITH compaction = {'class': 'SizeTieredCompactionStrategy', 'min_threshold': 1024, 'max_threshold': 1024 };")
27+
"WITH compaction = " + compaction_opts + ";")
2828

2929
stmt1 = session.prepare("insert into tbl_o_churn (id, c0, c1) values (?, ?, ?)")
3030
rows_to_insert = 50
@@ -54,6 +54,13 @@ def test_table_alteration(self):
5454
assert row.c2 == 'ddd'
5555
assert not hasattr(row, 'c0')
5656

57+
def test_table_alteration_stcs(self):
58+
self._test_table_alteration("{'class': 'SizeTieredCompactionStrategy', 'min_threshold': 1024, 'max_threshold': 1024 }")
59+
60+
@since("4.0")
61+
def test_table_alteration_ucs(self):
62+
self._test_table_alteration("{'class': 'UnifiedCompactionStrategy'}")
63+
5764
@since("2.0", max_version="3.X") # Compact Storage
5865
def test_drop_column_compact(self):
5966
session = self.prepare()

stress_profiles/repair_wide_rows.yaml renamed to stress_profiles/repair_wide_rows.yaml.tmpl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,16 @@ table_definition: |
99
col1 text,
1010
val blob,
1111
PRIMARY KEY(key, col1)
12-
)
13-
WITH compaction = { 'class':'LeveledCompactionStrategy' }
12+
)
13+
WITH compaction = { 'class':'{{ compaction_strategy }}' }
1414
AND compression = {'chunk_length_in_kb': '1', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'};
1515

1616
#
1717
# Optional meta information on the generated columns in the above table
1818
# The min and max only apply to text and blob types
1919
# The distribution field represents the total unique population
2020
# distribution of that column across rows. Supported types are
21-
#
21+
#
2222
# EXP(min..max) An exponential distribution over the range [min..max]
2323
# EXTREME(min..max,shape) An extreme value (Weibull) distribution over the range [min..max]
2424
# GAUSSIAN(min..max,stdvrng) A gaussian/normal distribution, where mean=(min+max)/2, and stdev is (mean-min)/stdvrng
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
--- a/compaction_test.py
2+
+++ b/compaction_test.py
3+
@@ -134,18 +134,14 @@
4+
"threads=300", "-schema", "replication(factor=1)",
5+
"compaction({},enabled=false)".format(strategy_string)])
6+
node1.flush()
7+
- logger.debug(node1.nodetool('cfstats keyspace1.standard1').stdout)
8+
+ logger.debug(node1.nodetool('tablestats keyspace1.standard1').stdout)
9+
10+
node1.nodetool('enableautocompaction')
11+
node1.wait_for_compactions()
12+
13+
table_name = 'standard1'
14+
-<<<<<<<
15+
- output = node1.nodetool('cfstats keyspace1.standard1').stdout
16+
+ output = node1.nodetool('tablestats keyspace1.standard1').stdout
17+
logger.debug(output)
18+
-=======
19+
- output = node1.nodetool('tablestats').stdout
20+
->>>>>>>
21+
output = output[output.find(table_name):]
22+
output = output[output.find("Bloom filter space used"):]
23+
bfSize = int(output[output.find(":") + 1:output.find("\n")].strip())
24+
diff --git a/compaction_test.py b/compaction_test.py
25+
index fa924974..d79f1326 100644
26+
--- a/compaction_test.py
27+
+++ b/compaction_test.py
28+
@@ -134,7 +134,7 @@ class TestCompaction(Tester):
29+
"threads=300", "-schema", "replication(factor=1)",
30+
"compaction({},enabled=false)".format(strategy_string)])
31+
node1.flush()
32+
- logger.debug(node1.nodetool('cfstats keyspace1.standard1').stdout)
33+
+ logger.debug(node1.nodetool('tablestats keyspace1.standard1').stdout)
34+
35+
node1.nodetool('enableautocompaction')
36+
node1.wait_for_compactions()

0 commit comments

Comments
 (0)