# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from kafkatest.services.kafka import KafkaService, quorum, consumer_group
from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.utils import is_int

from ducktape.tests.test import Test
from ducktape.mark import matrix
from ducktape.mark.resource import cluster
from ducktape.utils.util import wait_until

import time

class EligibleLeaderReplicasTest(Test):
    """
    Eligible leader replicas (ELR) test: verifies that an ELR election can take place after all replicas
    of a partition go offline. The test first cleanly shuts down two replicas and then uncleanly shuts down
    the leader. The two cleanly shut down replicas are then restarted; one of them should be in the ELR set
    and become the new leader.
    """

    def __init__(self, test_context):
        """:type test_context: ducktape.tests.test.TestContext"""
        super(EligibleLeaderReplicasTest, self).__init__(test_context=test_context)

        self.topic = "input-topic"

        self.num_brokers = 6

        # Test parameters
        self.num_partitions = 1
        self.num_seed_messages = 10000

        self.progress_timeout_sec = 60
        self.consumer_group = "elr-test-consumer-group"
        self.broker_startup_timeout_sec = 120

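    # Produce num_seed_messages integer payloads with an idempotent producer and return the acked messages.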
    def seed_messages(self, topic, num_seed_messages):
        seed_timeout_sec = 10000
        seed_producer = VerifiableProducer(context=self.test_context,
                                           num_nodes=1,
                                           kafka=self.kafka,
                                           topic=topic,
                                           message_validator=is_int,
                                           max_messages=num_seed_messages,
                                           enable_idempotence=True)
        seed_producer.start()
        wait_until(lambda: seed_producer.num_acked >= num_seed_messages,
                   timeout_sec=seed_timeout_sec,
                   err_msg="Producer failed to produce %d messages in %ds." %
                           (num_seed_messages, seed_timeout_sec))
        return seed_producer.acked

    def get_messages_from_topic(self, topic, num_messages, group_protocol):
        consumer = self.start_consumer(topic, group_id="verifying_consumer", group_protocol=group_protocol)
        return self.drain_consumer(consumer, num_messages)

    def stop_broker(self, node, clean_shutdown):
        if clean_shutdown:
            self.kafka.stop_node(node, clean_shutdown=True, timeout_sec=self.broker_startup_timeout_sec)
        else:
            self.kafka.stop_node(node, clean_shutdown=False)
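            # After a hard kill, wait for the process to disappear, then sleep past the broker session
            # timeout so the controller has fenced the dead broker before the test proceeds (an assumption
            # about why the original sleep is sized this way).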
            gracePeriodSecs = 5
            brokerSessionTimeoutSecs = 18
            wait_until(lambda: not self.kafka.pids(node),
                       timeout_sec=brokerSessionTimeoutSecs + gracePeriodSecs,
                       err_msg="Failed to see timely disappearance of process for hard-killed broker %s" % str(node.account))
            time.sleep(brokerSessionTimeoutSecs + gracePeriodSecs)

    def start_consumer(self, topic_to_read, group_id, group_protocol):
        consumer = ConsoleConsumer(context=self.test_context,
                                   num_nodes=1,
                                   kafka=self.kafka,
                                   topic=topic_to_read,
                                   group_id=group_id,
                                   message_validator=is_int,
                                   from_beginning=True,
                                   isolation_level="read_committed",
                                   consumer_properties=consumer_group.maybe_set_group_protocol(group_protocol))
        consumer.start()
        # Ensure that the consumer is up.
        consumer_start_timeout_sec = 180
        wait_until(lambda: len(consumer.messages_consumed[1]) > 0,
                   timeout_sec=consumer_start_timeout_sec,
                   err_msg="Consumer failed to consume any messages for %ds" % consumer_start_timeout_sec)
        return consumer

    def drain_consumer(self, consumer, num_messages):
        # Wait until we read at least the expected number of messages.
        drain_timeout_sec = 90
        wait_until(lambda: len(consumer.messages_consumed[1]) >= num_messages,
                   timeout_sec=drain_timeout_sec,
                   err_msg="Consumer consumed only %d out of %d messages in %ds" %
                           (len(consumer.messages_consumed[1]), num_messages, drain_timeout_sec))
        consumer.stop()
        return consumer.messages_consumed[1]

    def setup_topics(self):
        self.kafka.topics = {
            self.topic: {
                "partitions": self.num_partitions,
                "replication-factor": 3,
                "configs": {
                    "min.insync.replicas": 2
                }
            }
        }

    @cluster(num_nodes=9)
    @matrix(metadata_quorum=[quorum.isolated_kraft],
            group_protocol=consumer_group.all_group_protocols)
    def test_basic_eligible_leader_replicas(self, metadata_quorum, group_protocol=None):
        self.kafka = KafkaService(self.test_context,
                                  num_nodes=self.num_brokers,
                                  zk=None,
                                  controller_num_nodes_override=1)
        security_protocol = 'PLAINTEXT'

        self.kafka.security_protocol = security_protocol
        self.kafka.interbroker_security_protocol = security_protocol
        self.kafka.logs["kafka_data_1"]["collect_default"] = True
        self.kafka.logs["kafka_data_2"]["collect_default"] = True
        self.kafka.logs["kafka_operational_logs_debug"]["collect_default"] = True

        self.setup_topics()
        self.kafka.start(timeout_sec=self.broker_startup_timeout_sec)

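        # Enable the eligible leader replicas feature before producing any data.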
        self.kafka.run_features_command("upgrade", "eligible.leader.replicas.version", 1)
        input_messages = self.seed_messages(self.topic, self.num_seed_messages)
        isr = self.kafka.isr_idx_list(self.topic, 0)

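        # Cleanly shut down two of the three replicas (broker ids are 1-based, hence the "- 1" when
        # indexing into self.kafka.nodes). Once the ISR shrinks below min.insync.replicas, the cleanly
        # stopped replicas can be tracked in the ELR set.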
        self.stop_broker(self.kafka.nodes[isr[1] - 1], True)
        self.stop_broker(self.kafka.nodes[isr[2] - 1], True)

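        # Wait until only one replica (the leader) remains in the ISR.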
        wait_until(lambda: len(self.kafka.isr_idx_list(self.topic, 0)) == 1,
                   timeout_sec=60,
                   err_msg="Timed out waiting for the partition to have only 1 ISR")

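        # The sole remaining ISR member must be the leader; shut it down uncleanly so the partition is
        # left with no live replicas.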
        self.stop_broker(self.kafka.nodes[isr[0] - 1], False)

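        # Restart the cleanly shut down replicas. One of them should be in the ELR set and get elected
        # leader, making the partition available again.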
        self.kafka.start_node(self.kafka.nodes[isr[1] - 1], timeout_sec=self.broker_startup_timeout_sec)
        self.kafka.start_node(self.kafka.nodes[isr[2] - 1], timeout_sec=self.broker_startup_timeout_sec)

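        # Read the topic back and verify that no acked message was lost across the unclean failover.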
        output_messages_set = set(self.get_messages_from_topic(self.topic, self.num_seed_messages, group_protocol))
        input_message_set = set(input_messages)

        assert input_message_set == output_messages_set, \
            "Input and output message sets are not equal. Num input messages: %d. Num output messages: %d" % \
            (len(input_message_set), len(output_messages_set))