Skip to content

Commit f737ef3

Browse files
authored
KAFKA-18900: Implement share.acknowledgement.mode to choose acknowledgement mode (#19417)
Choose the acknowledgement mode based on the config (`share.acknowledgement.mode`) and not on the basis of how the user designs the application. - The default value of the config is `IMPLICIT`, so if any empty/null/invalid value is configured, then the mode defaults to `IMPLICIT`. - Removed AcknowledgementModes `UNKNOWN` and `PENDING` as they are no longer required. - Added code to ensure if the application has any unacknowledged records in a batch in "`explicit`" mode, then it will throw an `IllegalStateException`. The expectation is if the mode is "explicit", all the records received in that `poll()` would be acknowledged before the next call to `poll()`. - Modified the `ConsoleShareConsumer` to configure the mode to "explicit" as it was using the explicit mode of acknowledging records. Reviewers: Andrew Schofield <[email protected]>
1 parent 6c3995b commit f737ef3

File tree

8 files changed

+376
-204
lines changed

8 files changed

+376
-204
lines changed

Diff for: clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ShareConsumerTest.java

+47-69
Large diffs are not rendered by default.

Diff for: clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java

+10-12
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.kafka.clients.CommonClientConfigs;
2121
import org.apache.kafka.clients.MetadataRecoveryStrategy;
2222
import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy;
23+
import org.apache.kafka.clients.consumer.internals.ShareAcknowledgementMode;
2324
import org.apache.kafka.common.IsolationLevel;
2425
import org.apache.kafka.common.config.AbstractConfig;
2526
import org.apache.kafka.common.config.ConfigDef;
@@ -381,13 +382,10 @@ public class ConsumerConfig extends AbstractConfig {
381382
private static final String SECURITY_PROVIDERS_DOC = SecurityConfig.SECURITY_PROVIDERS_DOC;
382383

383384
/**
384-
* <code>share.acknowledgement.mode</code> is being evaluated as a new configuration to control the acknowledgement mode
385-
* for share consumers. It will be removed or converted to a proper configuration before release.
386-
* An alternative being considered is <code>enable.explicit.share.acknowledgement</code> as a boolean configuration.
385+
* <code>share.acknowledgement.mode</code>
387386
*/
388-
public static final String INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_CONFIG = "internal.share.acknowledgement.mode";
389-
private static final String INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_DOC = "Controls the acknowledgement mode for a share consumer." +
390-
" If unset, the acknowledgement mode of the consumer is decided by the method calls it uses to fetch and commit." +
387+
public static final String SHARE_ACKNOWLEDGEMENT_MODE_CONFIG = "share.acknowledgement.mode";
388+
private static final String SHARE_ACKNOWLEDGEMENT_MODE_DOC = "Controls the acknowledgement mode for a share consumer." +
391389
" If set to <code>implicit</code>, the acknowledgement mode of the consumer is implicit and it must not" +
392390
" use <code>org.apache.kafka.clients.consumer.ShareConsumer.acknowledge()</code> to acknowledge delivery of records. Instead," +
393391
" delivery is acknowledged implicitly on the next call to poll or commit." +
@@ -401,7 +399,7 @@ public class ConsumerConfig extends AbstractConfig {
401399
*/
402400
private static final List<String> CLASSIC_PROTOCOL_UNSUPPORTED_CONFIGS = List.of(
403401
GROUP_REMOTE_ASSIGNOR_CONFIG,
404-
INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_CONFIG
402+
SHARE_ACKNOWLEDGEMENT_MODE_CONFIG
405403
);
406404

407405
/**
@@ -411,7 +409,7 @@ public class ConsumerConfig extends AbstractConfig {
411409
PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
412410
HEARTBEAT_INTERVAL_MS_CONFIG,
413411
SESSION_TIMEOUT_MS_CONFIG,
414-
INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_CONFIG
412+
SHARE_ACKNOWLEDGEMENT_MODE_CONFIG
415413
);
416414

417415
static {
@@ -695,12 +693,12 @@ public class ConsumerConfig extends AbstractConfig {
695693
atLeast(0),
696694
Importance.LOW,
697695
CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC)
698-
.define(ConsumerConfig.INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_CONFIG,
696+
.define(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG,
699697
Type.STRING,
700-
null,
701-
in(null, "implicit", "explicit"),
698+
ShareAcknowledgementMode.IMPLICIT.name(),
699+
new ShareAcknowledgementMode.Validator(),
702700
Importance.MEDIUM,
703-
ConsumerConfig.INTERNAL_SHARE_ACKNOWLEDGEMENT_MODE_DOC);
701+
ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_DOC);
704702
}
705703

706704
@Override

Diff for: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaShareConsumer.java

+25-59
Original file line numberDiff line numberDiff line change
@@ -116,33 +116,33 @@
116116
* {@code group.share.record.lock.partition.limit}. By limiting the duration of the acquisition lock and automatically
117117
* releasing the locks, the broker ensures delivery progresses even in the presence of consumer failures.
118118
* <p>
119-
* The consumer can choose to use implicit or explicit acknowledgement of the records it processes.
120-
* <p>If the application calls {@link #acknowledge(ConsumerRecord, AcknowledgeType)} for any record in the batch,
121-
* it is using <em>explicit acknowledgement</em>. In this case:
122-
* <ul>
123-
* <li>The application calls {@link #commitSync()} or {@link #commitAsync()} which commits the acknowledgements to Kafka.
124-
* If any records in the batch were not acknowledged, they remain acquired and will be presented to the application
125-
* in response to a future poll.</li>
126-
* <li>The application calls {@link #poll(Duration)} without committing first, which commits the acknowledgements to
127-
* Kafka asynchronously. In this case, no exception is thrown by a failure to commit the acknowledgement.
128-
* If any records in the batch were not acknowledged, they remain acquired and will be presented to the application
129-
* in response to a future poll.</li>
130-
* <li>The application calls {@link #close()} which attempts to commit any pending acknowledgements and
131-
* releases any remaining acquired records.</li>
132-
* </ul>
133-
* If the application does not call {@link #acknowledge(ConsumerRecord, AcknowledgeType)} for any record in the batch,
134-
* it is using <em>implicit acknowledgement</em>. In this case:
119+
* The consumer can choose to use implicit or explicit acknowledgement of the records it processes by configuring the
120+
* consumer {@code share.acknowledgement.mode} property.
121+
* <p>
122+
* If the application sets the property to "implicit" or does not set it at all, then the consumer is using
123+
* <em>implicit acknowledgement</em>. In this mode, the application acknowledges delivery by:
135124
* <ul>
136-
* <li>The application calls {@link #commitSync()} or {@link #commitAsync()} which implicitly acknowledges all of
137-
* the delivered records as processed successfully and commits the acknowledgements to Kafka.</li>
138-
* <li>The application calls {@link #poll(Duration)} without committing, which also implicitly acknowledges all of
125+
* <li>Calling {@link #poll(Duration)} without committing, which also implicitly acknowledges all
139126
* the delivered records and commits the acknowledgements to Kafka asynchronously. In this case, no exception is
140127
* thrown by a failure to commit the acknowledgements.</li>
141-
* <li>The application calls {@link #close()} which releases any acquired records without acknowledgement.</li>
128+
* <li>Calling {@link #commitSync()} or {@link #commitAsync()} which implicitly acknowledges all
129+
* the delivered records as processed successfully and commits the acknowledgements to Kafka.</li>
130+
* <li>Calling {@link #close()} which releases any acquired records without acknowledgement.</li>
131+
* </ul>
132+
* If the application sets the property to "explicit", then the consumer is using <em>explicit acknowledgment</em>.
133+
* The application must acknowledge all records returned from {@link #poll(Duration)} using
134+
* {@link #acknowledge(ConsumerRecord, AcknowledgeType)} before its next call to {@link #poll(Duration)}.
135+
* If the application calls {@link #poll(Duration)} without having acknowledged all records, an
136+
* {@link IllegalStateException} is thrown. The remaining unacknowledged records can still be acknowledged.
137+
* In this mode, the application acknowledges delivery by:
138+
* <ul>
139+
* <li>Calling {@link #poll(Duration)} after it has acknowledged all records, which commits the acknowledgements
140+
* to Kafka asynchronously. In this case, no exception is thrown by a failure to commit the acknowledgements.</li>
141+
* <li>Calling {@link #commitSync()} or {@link #commitAsync()} which commits any pending
142+
* acknowledgements to Kafka.</li>
143+
* <li>Calling {@link #close()} which attempts to commit any pending acknowledgements and releases
144+
* any remaining acquired records.</li>
142145
* </ul>
143-
* <p>The consumer can optionally use the {@code internal.share.acknowledgement.mode} configuration property to choose
144-
* between implicit and explicit acknowledgement, specifying <code>"implicit"</code> or <code>"explicit"</code> as required.
145-
* <p>
146146
* The consumer guarantees that the records returned in the {@code ConsumerRecords} object for a specific topic-partition
147147
* are in order of increasing offset. For each topic-partition, Kafka guarantees that acknowledgements for the records
148148
* in a batch are performed atomically. This makes error handling significantly more straightforward because there can be
@@ -195,12 +195,14 @@
195195
*
196196
* <h4>Per-record acknowledgement (explicit acknowledgement)</h4>
197197
* This example demonstrates using different acknowledgement types depending on the outcome of processing the records.
198+
* Here the {@code share.acknowledgement.mode} property is set to "explicit" so the consumer must explicitly acknowledge each record.
198199
* <pre>
199200
* Properties props = new Properties();
200201
* props.setProperty(&quot;bootstrap.servers&quot;, &quot;localhost:9092&quot;);
201202
* props.setProperty(&quot;group.id&quot;, &quot;test&quot;);
202203
* props.setProperty(&quot;key.deserializer&quot;, &quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
203204
* props.setProperty(&quot;value.deserializer&quot;, &quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
205+
* props.setProperty(&quot;share.acknowledgement.mode&quot;, &quot;explicit&quot;);
204206
* KafkaShareConsumer&lt;String, String&gt; consumer = new KafkaShareConsumer&lt;&gt;(props);
205207
* consumer.subscribe(Arrays.asList(&quot;foo&quot;));
206208
* while (true) {
@@ -227,42 +229,6 @@
227229
* It is only once {@link #commitSync()} is called that the acknowledgements are committed by sending the new state
228230
* information to Kafka.
229231
*
230-
* <h4>Per-record acknowledgement, ending processing of the batch on an error (explicit acknowledgement)</h4>
231-
* This example demonstrates ending processing of a batch of records on the first error.
232-
* <pre>
233-
* Properties props = new Properties();
234-
* props.setProperty(&quot;bootstrap.servers&quot;, &quot;localhost:9092&quot;);
235-
* props.setProperty(&quot;group.id&quot;, &quot;test&quot;);
236-
* props.setProperty(&quot;key.deserializer&quot;, &quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
237-
* props.setProperty(&quot;value.deserializer&quot;, &quot;org.apache.kafka.common.serialization.StringDeserializer&quot;);
238-
* KafkaShareConsumer&lt;String, String&gt; consumer = new KafkaShareConsumer&lt;&gt;(props);
239-
* consumer.subscribe(Arrays.asList(&quot;foo&quot;));
240-
* while (true) {
241-
* ConsumerRecords&lt;String, String&gt; records = consumer.poll(Duration.ofMillis(100));
242-
* for (ConsumerRecord&lt;String, String&gt; record : records) {
243-
* try {
244-
* doProcessing(record);
245-
* consumer.acknowledge(record, AcknowledgeType.ACCEPT);
246-
* } catch (Exception e) {
247-
* consumer.acknowledge(record, AcknowledgeType.REJECT);
248-
* break;
249-
* }
250-
* }
251-
* consumer.commitSync();
252-
* }
253-
* </pre>
254-
* There are the following cases in this example:
255-
* <ol>
256-
* <li>The batch contains no records, in which case the application just polls again. The call to {@link #commitSync()}
257-
* just does nothing because the batch was empty.</li>
258-
* <li>All of the records in the batch are processed successfully. The calls to {@link #acknowledge(ConsumerRecord, AcknowledgeType)}
259-
* specifying {@code AcknowledgeType.ACCEPT} mark all records in the batch as successfully processed.</li>
260-
* <li>One of the records encounters an exception. The call to {@link #acknowledge(ConsumerRecord, AcknowledgeType)} specifying
261-
* {@code AcknowledgeType.REJECT} rejects that record. Earlier records in the batch have already been marked as successfully
262-
* processed. The call to {@link #commitSync()} commits the acknowledgements, but the records after the failed record
263-
* remain acquired as part of the same delivery attempt and will be presented to the application in response to another poll.</li>
264-
* </ol>
265-
*
266232
* <h3>Reading Transactional Records</h3>
267233
* The way that share groups handle transactional records is controlled by the {@code group.share.isolation.level}</code>
268234
* configuration property. In a share group, the isolation level applies to the entire share group, not just individual
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.clients.consumer.internals;
18+
19+
import org.apache.kafka.common.config.ConfigDef;
20+
import org.apache.kafka.common.config.ConfigException;
21+
import org.apache.kafka.common.utils.Utils;
22+
23+
import java.util.Arrays;
24+
import java.util.Locale;
25+
import java.util.Objects;
26+
import java.util.stream.Collectors;
27+
28+
public class ShareAcknowledgementMode {
29+
public enum AcknowledgementMode {
30+
IMPLICIT, EXPLICIT;
31+
32+
@Override
33+
public String toString() {
34+
return super.toString().toLowerCase(Locale.ROOT);
35+
}
36+
}
37+
38+
private final AcknowledgementMode acknowledgementMode;
39+
40+
public static final ShareAcknowledgementMode IMPLICIT = new ShareAcknowledgementMode(AcknowledgementMode.IMPLICIT);
41+
public static final ShareAcknowledgementMode EXPLICIT = new ShareAcknowledgementMode(AcknowledgementMode.EXPLICIT);
42+
43+
private ShareAcknowledgementMode(AcknowledgementMode acknowledgementMode) {
44+
this.acknowledgementMode = acknowledgementMode;
45+
}
46+
47+
/**
48+
* Returns the ShareAcknowledgementMode from the given string.
49+
*/
50+
public static ShareAcknowledgementMode fromString(String acknowledgementMode) {
51+
if (acknowledgementMode == null) {
52+
throw new IllegalArgumentException("Acknowledgement mode is null");
53+
}
54+
55+
if (Arrays.asList(Utils.enumOptions(AcknowledgementMode.class)).contains(acknowledgementMode)) {
56+
AcknowledgementMode mode = AcknowledgementMode.valueOf(acknowledgementMode.toUpperCase(Locale.ROOT));
57+
switch (mode) {
58+
case IMPLICIT:
59+
return IMPLICIT;
60+
case EXPLICIT:
61+
return EXPLICIT;
62+
default:
63+
throw new IllegalArgumentException("Invalid acknowledgement mode: " + acknowledgementMode);
64+
}
65+
} else {
66+
throw new IllegalArgumentException("Invalid acknowledgement mode: " + acknowledgementMode);
67+
}
68+
}
69+
70+
/**
71+
* Returns the name of the acknowledgement mode.
72+
*/
73+
public String name() {
74+
return acknowledgementMode.toString();
75+
}
76+
77+
@Override
78+
public boolean equals(Object o) {
79+
if (this == o) return true;
80+
if (o == null || getClass() != o.getClass()) return false;
81+
ShareAcknowledgementMode that = (ShareAcknowledgementMode) o;
82+
return acknowledgementMode == that.acknowledgementMode;
83+
}
84+
85+
@Override
86+
public int hashCode() {
87+
return Objects.hash(acknowledgementMode);
88+
}
89+
90+
@Override
91+
public String toString() {
92+
return "ShareAcknowledgementMode{" +
93+
"mode=" + acknowledgementMode +
94+
'}';
95+
}
96+
97+
public static class Validator implements ConfigDef.Validator {
98+
@Override
99+
public void ensureValid(String name, Object value) {
100+
String acknowledgementMode = (String) value;
101+
try {
102+
fromString(acknowledgementMode);
103+
} catch (Exception e) {
104+
throw new ConfigException(name, value, "Invalid value `" + acknowledgementMode + "` for configuration " +
105+
name + ". The value must either be 'implicit' or 'explicit'.");
106+
}
107+
}
108+
109+
@Override
110+
public String toString() {
111+
String values = Arrays.stream(ShareAcknowledgementMode.AcknowledgementMode.values())
112+
.map(ShareAcknowledgementMode.AcknowledgementMode::toString).collect(Collectors.joining(", "));
113+
return "[" + values + "]";
114+
}
115+
}
116+
}

0 commit comments

Comments
 (0)