Commit ac2f0f6

djatnieks authored and Gerrrr committed

STAR-543: Port guardrail tests and changes (#19)

Co-authored-by: Aleksandr Sorokoumov <[email protected]>
(cherry picked from commit fd2b1c3)
(cherry picked from commit 66e6b72)
(cherry picked from commit 7060130)

1 parent 5a57ef8 · commit ac2f0f6

12 files changed: +217 −39 lines changed
byteman/guardrails/disk_usage_full.btm

Lines changed: 8 additions & 0 deletions

@@ -0,0 +1,8 @@
+RULE return FULL disk usage
+CLASS org.apache.cassandra.service.disk.usage.DiskUsageMonitor
+METHOD getState
+AT EXIT
+IF TRUE
+DO
+    return org.apache.cassandra.service.disk.usage.DiskUsageState.FULL;
+ENDRULE
byteman/guardrails/disk_usage_stuffed.btm

Lines changed: 8 additions & 0 deletions

@@ -0,0 +1,8 @@
+RULE return STUFFED disk usage
+CLASS org.apache.cassandra.service.disk.usage.DiskUsageMonitor
+METHOD getState
+AT EXIT
+IF TRUE
+DO
+    return org.apache.cassandra.service.disk.usage.DiskUsageState.STUFFED;
+ENDRULE
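
Both rules intercept DiskUsageMonitor.getState at exit and force a fixed disk usage state; guardrails_test.py (added below) submits them to a running node through ccm's byteman support. A minimal sketch of the injection, assuming node is a ccm node started with install_byteman=True:

    # Unload any previously submitted byteman rules, then force
    # DiskUsageMonitor.getState to report FULL (mirrors
    # disk_usage_injection in guardrails_test.py below).
    node.byteman_submit(['-u'])
    node.byteman_submit(["./byteman/guardrails/disk_usage_full.btm"])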

client_request_metrics_test.py

Lines changed: 2 additions & 1 deletion

@@ -42,14 +42,15 @@ def fixture_add_additional_log_patterns(self, fixture_dtest_setup):
         fixture_dtest_setup.ignore_log_patterns = (
             'Testing write failures',  # The error to simulate a write failure
             'ERROR WRITE_FAILURE',  # Logged in DEBUG mode for write failures
-            f"Scanned over {TOMBSTONE_FAILURE_THRESHOLD + 1} tombstones during query"  # Caused by the read failure tests
+            f"Scanned over {TOMBSTONE_FAILURE_THRESHOLD + 1} (tombstones|tombstone rows) during query"  # Caused by the read failure tests
         )

     def setup_once(self):
         cluster = self.cluster
         cluster.set_configuration_options({'read_request_timeout_in_ms': 3000,
                                            'write_request_timeout_in_ms': 3000,
                                            'phi_convict_threshold': 12,
+                                           'tombstone_warn_threshold': -1,
                                            'tombstone_failure_threshold': TOMBSTONE_FAILURE_THRESHOLD,
                                            'enable_materialized_views': 'true'})
         cluster.populate(2, debug=True)
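
The widened pattern accepts both the pre-guardrails wording ("tombstones") and the guardrails wording ("tombstone rows") of the abort message. A quick standalone check, with a stand-in threshold value (the real constant is defined at the top of this test module):

    import re

    TOMBSTONE_FAILURE_THRESHOLD = 500  # stand-in value for this sketch

    pattern = f"Scanned over {TOMBSTONE_FAILURE_THRESHOLD + 1} (tombstones|tombstone rows) during query"
    assert re.search(pattern, f"Scanned over {TOMBSTONE_FAILURE_THRESHOLD + 1} tombstones during query")
    assert re.search(pattern, f"Scanned over {TOMBSTONE_FAILURE_THRESHOLD + 1} tombstone rows during query")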

compaction_test.py

Lines changed: 8 additions & 2 deletions

@@ -339,7 +339,10 @@ def test_large_compaction_warning(self):
         Check that we log a warning when the partition size is bigger than compaction_large_partition_warning_threshold_mb
         """
         cluster = self.cluster
-        cluster.set_configuration_options({'compaction_large_partition_warning_threshold_mb': 1})
+        if self.supports_guardrails:
+            cluster.set_configuration_options({'guardrails': {'partition_size_warn_threshold_in_mb': 1}})
+        else:
+            cluster.set_configuration_options({'compaction_large_partition_warning_threshold_mb': 1})
         cluster.populate(1).start()
         [node] = cluster.nodelist()

@@ -361,7 +364,10 @@ def test_large_compaction_warning(self):
         node.nodetool('compact ks large')
         verb = 'Writing' if self.cluster.version() > '2.2' else 'Compacting'
         sizematcher = '\d+ bytes' if self.cluster.version() < LooseVersion('3.6') else '\d+\.\d{3}(K|M|G)iB'
-        node.watch_log_for('{} large partition ks/large:user \({}'.format(verb, sizematcher), from_mark=mark, timeout=180)
+        log_message = '{} large partition ks/large:user \({}'.format(verb, sizematcher)
+        if self.supports_guardrails:
+            log_message = "Detected partition 'user' in ks.large of size 2MB is greater than the maximum recommended size \(1MB\)"
+        node.watch_log_for(log_message, from_mark=mark, timeout=180)

         ret = list(session.execute("SELECT properties from ks.large where userid = 'user'"))
         assert_length_equal(ret, 1)

cqlsh_tests/test_cqlsh.py

Lines changed: 5 additions & 2 deletions

@@ -1870,8 +1870,11 @@ def test_client_warnings(self):
         """
         max_partitions_per_batch = 5
         self.cluster.populate(3)
-        self.cluster.set_configuration_options({
-            'unlogged_batch_across_partitions_warn_threshold': str(max_partitions_per_batch)})
+
+        config_opts = {'unlogged_batch_across_partitions_warn_threshold': str(max_partitions_per_batch)}
+        if self.supports_guardrails:
+            config_opts = {"guardrails": config_opts}
+        self.cluster.set_configuration_options(config_opts)

         self.cluster.start()
cqlsh_tests/test_cqlsh_copy.py

Lines changed: 21 additions & 5 deletions

@@ -2481,8 +2481,12 @@ def test_bulk_round_trip_blogposts(self):

         @jira_ticket CASSANDRA-9302
         """
+        config_opts = {'batch_size_warn_threshold_in_kb': '10'}
+        if self.supports_guardrails:  # batch size thresholds moved to guardrails in 4.0
+            config_opts = {'guardrails': config_opts}
+
         self._test_bulk_round_trip(nodes=3, partitioner="murmur3", num_operations=10000,
-                                   configuration_options={'batch_size_warn_threshold_in_kb': '10'},
+                                   configuration_options=config_opts,
                                    profile=os.path.join(os.path.dirname(os.path.realpath(__file__)), 'blogposts.yaml'),
                                    stress_table='stresscql.blogposts')

@@ -2495,9 +2499,16 @@ def test_bulk_round_trip_blogposts_with_max_connections(self):

         @jira_ticket CASSANDRA-10938
         """
+        batch_size_warn_threshold_in_kb = '10'
+        native_transport_max_concurrent_connections = '12'
+        if self.supports_guardrails:  # batch size thresholds moved to guardrails in 4.0
+            config_opts = {'guardrails': {'batch_size_warn_threshold_in_kb': batch_size_warn_threshold_in_kb},
+                           'native_transport_max_concurrent_connections': native_transport_max_concurrent_connections}
+        else:
+            config_opts = {'native_transport_max_concurrent_connections': native_transport_max_concurrent_connections,
+                           'batch_size_warn_threshold_in_kb': batch_size_warn_threshold_in_kb}
         self._test_bulk_round_trip(nodes=3, partitioner="murmur3", num_operations=10000,
-                                   configuration_options={'native_transport_max_concurrent_connections': '12',
-                                                          'batch_size_warn_threshold_in_kb': '10'},
+                                   configuration_options=config_opts,
                                    profile=os.path.join(os.path.dirname(os.path.realpath(__file__)), 'blogposts.yaml'),
                                    stress_table='stresscql.blogposts',
                                    copy_to_options={'NUMPROCESSES': 5, 'MAXATTEMPTS': 20},

@@ -2827,8 +2838,13 @@ def test_copy_from_with_large_cql_rows(self):
         @jira_ticket CASSANDRA-11474
         """
         num_records = 100
-        self.prepare(nodes=1, configuration_options={'batch_size_warn_threshold_in_kb': '1',  # warn with 1kb and fail
-                                                     'batch_size_fail_threshold_in_kb': '5'})  # with 5kb size batches
+        batch_size_warn_threshold_in_kb = '1'  # warn with 1kb and fail
+        batch_size_fail_threshold_in_kb = '5'  # with 5kb size batches
+        config_opts = {'batch_size_warn_threshold_in_kb': batch_size_warn_threshold_in_kb,
+                       'batch_size_fail_threshold_in_kb': batch_size_fail_threshold_in_kb}
+        if self.supports_guardrails:  # batch size thresholds moved to guardrails in 4.0
+            config_opts = {'guardrails': config_opts}
+        self.prepare(nodes=1, configuration_options=config_opts)

         logger.debug('Running stress')
         stress_table_name = 'standard1'
dtest_setup.py

Lines changed: 4 additions & 0 deletions

@@ -332,6 +332,10 @@ def dump_jfr_recording(self, nodes):
     def supports_v5_protocol(self, cluster_version):
         return cluster_version >= LooseVersion('4.0')

+    def supports_guardrails(self):
+        return self.cluster.version() >= LooseVersion('4.0')
+
+
     def cleanup_last_test_dir(self):
         if os.path.exists(self.last_test_dir):
             os.remove(self.last_test_dir)
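
The call sites in this commit all use the new helper the same way: build the flat pre-4.0 option dict, then nest it under a 'guardrails' key when the cluster is 4.0 or newer. A distilled sketch of that recurring pattern (the option name here is a placeholder for any threshold that moved under guardrails):

    config_opts = {'batch_size_warn_threshold_in_kb': '10'}  # placeholder option
    if self.supports_guardrails:  # 4.0+ reads these thresholds from the guardrails section
        config_opts = {'guardrails': config_opts}
    self.cluster.set_configuration_options(config_opts)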

guardrails_test.py

Lines changed: 99 additions & 0 deletions

@@ -0,0 +1,99 @@
+import logging
+import time
+import pytest
+import re
+
+from cassandra import InvalidRequest
+
+from dtest import Tester, create_ks
+from tools.assertions import assert_one
+
+since = pytest.mark.since
+logger = logging.getLogger(__name__)
+
+
+class BaseGuardrailsTester(Tester):
+
+    def prepare(self, rf=1, options=None, nodes=3, install_byteman=False, extra_jvm_args=None, **kwargs):
+        if options is None:
+            options = {}
+
+        if extra_jvm_args is None:
+            extra_jvm_args = []
+
+        cluster = self.cluster
+        cluster.set_log_level('TRACE')
+        cluster.populate(nodes, install_byteman=install_byteman)
+        if options:
+            cluster.set_configuration_options(values=options)
+
+        cluster.start(jvm_args=extra_jvm_args)
+        node1 = cluster.nodelist()[0]
+
+        session = self.patient_cql_connection(node1, **kwargs)
+        create_ks(session, 'ks', rf)
+
+        return session
+
+
+@since('4.0')
+class TestGuardrails(BaseGuardrailsTester):
+
+    def test_disk_usage_guardrail(self):
+        """
+        Test that the disk usage guardrail warns when usage exceeds the warn threshold and rejects writes when it exceeds the failure threshold
+        """
+        self.fixture_dtest_setup.ignore_log_patterns = ["Write request failed because disk usage exceeds failure threshold"]
+        guardrails_config = {'guardrails': {'disk_usage_percentage_warn_threshold': 98,
+                                            'disk_usage_percentage_failure_threshold': 99}}
+
+        logger.debug("prepare 2-node cluster with rf=1 and guardrails enabled")
+        session = self.prepare(rf=1, nodes=2, options=guardrails_config, extra_jvm_args=['-Dcassandra.disk_usage.monitor_interval_ms=100'], install_byteman=True)
+        node1, node2 = self.cluster.nodelist()
+        session.execute("CREATE TABLE t (id int PRIMARY KEY, v int)")
+
+        logger.debug("Inject FULL to node1, expect log on node1 and node2 rejects writes")
+        mark = node1.mark_log()
+        self.disk_usage_injection(node1, "full", False)
+        node1.watch_log_for("Adding state DISK_USAGE: FULL", filename='debug.log', from_mark=mark, timeout=10)
+
+        # verify node2 will reject writes if node1 is the replica
+        session2 = self.patient_exclusive_cql_connection(node2, keyspace="ks")
+        rows = 100
+        failed = 0
+        for x in range(rows):
+            try:
+                session2.execute("INSERT INTO t(id, v) VALUES({v}, {v})".format(v=x))
+            except InvalidRequest as e:
+                assert re.search("Write request failed because disk usage exceeds failure threshold", str(e))
+                failed = failed + 1
+
+        assert rows != failed, "Expect node2 rejects some writes, but rejected all"
+        assert 0 != failed, "Expect node2 rejects some writes, but rejected nothing"
+        assert_one(session2, "SELECT COUNT(*) FROM t", [rows - failed])
+
+        logger.debug("Inject STUFFED to node1, node2 should warn client")
+        session2.execute("TRUNCATE t")
+        mark = node1.mark_log()
+        self.disk_usage_injection(node1, "stuffed")
+        node1.watch_log_for("Adding state DISK_USAGE: STUFFED", filename='debug.log', from_mark=mark, timeout=10)
+
+        warnings = 0
+        for x in range(rows):
+            fut = session2.execute_async("INSERT INTO t(id, v) VALUES({v}, {v})".format(v=x))
+            fut.result()
+            if fut.warnings:
+                assert ["Replica disk usage exceeds warn threshold"] == fut.warnings
+                warnings = warnings + 1
+
+        assert rows != warnings, "Expect node2 emits some warnings, but got all warnings"
+        assert 0 != warnings, "Expect node2 emits some warnings, but got no warnings"
+        assert_one(session2, "SELECT COUNT(*) FROM t", [rows])
+
+        session.cluster.shutdown()
+        session2.cluster.shutdown()
+
+    def disk_usage_injection(self, node, state, clear_byteman=True):
+        if clear_byteman:
+            node.byteman_submit(['-u'])
+        node.byteman_submit(["./byteman/guardrails/disk_usage_{}.btm".format(state)])

paging_test.py

Lines changed: 17 additions & 9 deletions

@@ -18,6 +18,7 @@
                               assert_one, assert_lists_equal_ignoring_order)
 from tools.data import rows_to_list
 from tools.datahelp import create_rows, flatten_into_set, parse_data_into_dicts
+from tools.misc import restart_cluster_and_update_config
 from tools.paging import PageAssertionMixin, PageFetcher

 since = pytest.mark.since

@@ -3423,19 +3424,26 @@ def test_failure_threshold_deletions(self):
         supports_v5_protocol = self.supports_v5_protocol(self.cluster.version())

         self.fixture_dtest_setup.allow_log_errors = True
-        self.cluster.set_configuration_options(
-            values={'tombstone_failure_threshold': 500}
-        )
+        if self.supports_guardrails:
+            config_opts = {'guardrails': {'tombstone_failure_threshold': 500,
+                                          'tombstone_warn_threshold': -1,
+                                          'write_consistency_levels_disallowed': {}}}
+        else:
+            config_opts = {'tombstone_failure_threshold': 500}
+        restart_cluster_and_update_config(self.cluster, config_opts)
         self.session = self.prepare()
         self.setup_data()

-        # Add more data
+        if self.supports_guardrails:
+            # cell tombstones are not counted towards the threshold, so we delete rows
+            query = "delete from paging_test where id = 1 and mytext = '{}'"
+        else:
+            # Add more data
+            query = "insert into paging_test (id, mytext, col1) values (1, '{}', null)"
+
         values = [uuid.uuid4() for i in range(3000)]
         for value in values:
-            self.session.execute(SimpleStatement(
-                "insert into paging_test (id, mytext, col1) values (1, '{}', null) ".format(
-                    value
-                ),
+            self.session.execute(SimpleStatement(query.format(value),
                                                  consistency_level=CL.ALL
                                                  ))

@@ -3456,7 +3464,7 @@ def test_failure_threshold_deletions(self):
             failure_msg = ("Scanned over.* tombstones in test_paging_size."
                            "paging_test.* query aborted")
         else:
-            failure_msg = ("Scanned over.* tombstones during query.* query aborted")
+            failure_msg = ("Scanned over.* (tombstones|tombstone rows) during query.* query aborted")

         self.cluster.wait_for_any_log(failure_msg, 25)
pushed_notifications_test.py

Lines changed: 21 additions & 16 deletions

@@ -388,13 +388,18 @@ def test_tombstone_failure_threshold_message(self):
         have_v5_protocol = self.supports_v5_protocol(self.cluster.version())

         self.fixture_dtest_setup.allow_log_errors = True
-        self.cluster.set_configuration_options(
-            values={
-                'tombstone_failure_threshold': 500,
-                'read_request_timeout_in_ms': 30000,  # 30 seconds
-                'range_request_timeout_in_ms': 40000
-            }
-        )
+
+        if self.supports_guardrails:
+            config_options = {'guardrails': {'tombstone_warn_threshold': -1,
+                                             'tombstone_failure_threshold': 500},
+                              'read_request_timeout_in_ms': 30000,  # 30 seconds
+                              'range_request_timeout_in_ms': 40000}
+        else:
+            config_options = {'tombstone_failure_threshold': 500,
+                              'read_request_timeout_in_ms': 30000,  # 30 seconds
+                              'range_request_timeout_in_ms': 40000}
+
+        self.cluster.set_configuration_options(values=config_options)
         self.cluster.populate(3).start()
         node1, node2, node3 = self.cluster.nodelist()
         proto_version = 5 if have_v5_protocol else None

@@ -407,17 +412,17 @@ def test_tombstone_failure_threshold_message(self):
             "PRIMARY KEY (id, mytext) )"
         )

-        # Add data with tombstones
+        if self.supports_guardrails:
+            # cell tombstones are not counted towards the threshold, so we delete rows
+            query = "delete from test where id = 1 and mytext = '{}'"
+        else:
+            # Add data with tombstones
+            query = "insert into test (id, mytext, col1) values (1, '{}', null)"
         values = [str(i) for i in range(1000)]
         for value in values:
-            session.execute(SimpleStatement(
-                "insert into test (id, mytext, col1) values (1, '{}', null) ".format(
-                    value
-                ),
-                consistency_level=CL.ALL
-            ))
-
-        failure_msg = ("Scanned over.* tombstones.* query aborted")
+            session.execute(SimpleStatement(query.format(value), consistency_level=CL.ALL))
+
+        failure_msg = ("Scanned over.* (tombstones|tombstone rows).* query aborted")

         @pytest.mark.timeout(25)
         def read_failure_query():

read_failures_test.py

Lines changed: 13 additions & 4 deletions

@@ -4,6 +4,7 @@
 from cassandra import ConsistencyLevel, ReadFailure, ReadTimeout
 from cassandra.policies import FallthroughRetryPolicy
 from cassandra.query import SimpleStatement
+from distutils.version import LooseVersion

 from dtest import Tester

@@ -21,7 +22,9 @@ class TestReadFailures(Tester):
     @pytest.fixture(autouse=True)
     def fixture_add_additional_log_patterns(self, fixture_dtest_setup):
         fixture_dtest_setup.ignore_log_patterns = (
-            "Scanned over [1-9][0-9]* tombstones",  # This is expected when testing read failures due to tombstones
+            # These are expected when testing read failures due to tombstones
+            "Scanned over [1-9][0-9]* tombstones",
+            "Scanned over [1-9][0-9]* tombstone rows",
         )
         return fixture_dtest_setup

@@ -33,9 +36,15 @@ def fixture_dtest_setup_params(self):
         self.expected_expt = ReadFailure

     def _prepare_cluster(self):
-        self.cluster.set_configuration_options(
-            values={'tombstone_failure_threshold': self.tombstone_failure_threshold}
-        )
+        if self.supports_guardrails:
+            self.cluster.set_configuration_options(
+                values={'guardrails': {'tombstone_warn_threshold': -1,
+                                       'tombstone_failure_threshold': self.tombstone_failure_threshold}}
+            )
+        else:
+            self.cluster.set_configuration_options(
+                values={'tombstone_failure_threshold': self.tombstone_failure_threshold}
+            )
         self.cluster.populate(3)
         self.cluster.start()
         self.nodes = list(self.cluster.nodes.values())

tools/misc.py

Lines changed: 11 additions & 0 deletions

@@ -157,3 +157,14 @@ def add_skip(cls, reason=""):
     else:
         cls.pytestmark = [pytest.mark.skip(reason)]
     return cls
+
+
+def restart_cluster_and_update_config(cluster, config):
+    """
+    Takes a new config and applies it to a cluster. We need to restart
+    for it to take effect. We _could_ take a node here, but we don't want to.
+    If you really want to change the config of just one node, use JMX.
+    """
+    cluster.stop()
+    cluster.set_configuration_options(values=config)
+    cluster.start()
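
paging_test.py above imports this helper to swap in a new tombstone threshold before preparing its session. A minimal usage sketch, with an illustrative option value:

    from tools.misc import restart_cluster_and_update_config

    # Stop the cluster, rewrite its configuration, and start it again so
    # the new value takes effect on every node.
    restart_cluster_and_update_config(self.cluster, {'tombstone_failure_threshold': 500})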
