Skip to content

Commit 88a6402

Browse files
authored
MINOR: Update kafka-console-share-consumer to pass in delivery count. (#19447)
There was a bug in `ConsoleShareConsumer` where the delivery count received in the `ShareFetchResponse` was not sent to the formatter to be shown to the user. PR fixes the bug by passing in the delivery count in a different constructor for `ConsumerRecord`. Reviewers: Andrew Schofield <[email protected]>
1 parent 21a080f commit 88a6402

File tree

2 files changed

+45
-1
lines changed

2 files changed

+45
-1
lines changed

Diff for: tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleShareConsumer.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ static void process(int maxMessages, MessageFormatter formatter, ConsumerWrapper
111111
messageCount += 1;
112112
try {
113113
formatter.writeTo(new ConsumerRecord<>(msg.topic(), msg.partition(), msg.offset(), msg.timestamp(), msg.timestampType(),
114-
0, 0, msg.key(), msg.value(), msg.headers(), Optional.empty()), output);
114+
0, 0, msg.key(), msg.value(), msg.headers(), Optional.empty(), msg.deliveryCount()), output);
115115
consumer.acknowledge(msg, acknowledgeType);
116116
} catch (Throwable t) {
117117
if (rejectMessageOnError) {

Diff for: tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleShareConsumerTest.java

+44
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,21 @@
2222
import org.apache.kafka.clients.consumer.ShareConsumer;
2323
import org.apache.kafka.common.MessageFormatter;
2424
import org.apache.kafka.common.errors.TimeoutException;
25+
import org.apache.kafka.common.header.internals.RecordHeaders;
26+
import org.apache.kafka.common.record.RecordBatch;
27+
import org.apache.kafka.common.record.TimestampType;
2528
import org.apache.kafka.common.utils.Time;
2629
import org.apache.kafka.server.util.MockTime;
2730

2831
import org.junit.jupiter.api.BeforeEach;
2932
import org.junit.jupiter.api.Test;
33+
import org.mockito.ArgumentCaptor;
3034

3135
import java.io.PrintStream;
3236
import java.time.Duration;
37+
import java.util.Optional;
3338

39+
import static org.junit.jupiter.api.Assertions.assertEquals;
3440
import static org.junit.jupiter.api.Assertions.assertThrows;
3541
import static org.mockito.ArgumentMatchers.any;
3642
import static org.mockito.ArgumentMatchers.eq;
@@ -131,4 +137,42 @@ public void testRejectMessageOnError() {
131137

132138
consumer.cleanup();
133139
}
140+
141+
@Test
142+
public void shouldUpgradeDeliveryCount() {
143+
// Mock dependencies
144+
ConsoleShareConsumer.ConsumerWrapper consumer = mock(ConsoleShareConsumer.ConsumerWrapper.class);
145+
MessageFormatter formatter = mock(MessageFormatter.class);
146+
PrintStream printStream = mock(PrintStream.class);
147+
148+
short deliveryCount = 1;
149+
// Mock a ConsumerRecord with a delivery count
150+
ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(
151+
"test-topic", 0, 0, RecordBatch.NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE, 0,
152+
0, new byte[0], new byte[0], new RecordHeaders(), Optional.empty(), Optional.of(deliveryCount)
153+
);
154+
155+
// Mock consumer behavior
156+
when(consumer.receive()).thenReturn(record);
157+
158+
// Process the record
159+
ConsoleShareConsumer.process(1, formatter, consumer, printStream, false, AcknowledgeType.ACCEPT);
160+
161+
// Capture the actual ConsumerRecord passed to formatter.writeTo
162+
ArgumentCaptor<ConsumerRecord> captor = ArgumentCaptor.forClass(ConsumerRecord.class);
163+
verify(formatter).writeTo(captor.capture(), eq(printStream));
164+
165+
// Assert that the captured ConsumerRecord matches the expected values
166+
ConsumerRecord<byte[], byte[]> capturedRecord = captor.getValue();
167+
assertEquals("test-topic", capturedRecord.topic());
168+
assertEquals(0, capturedRecord.partition());
169+
assertEquals(0, capturedRecord.offset());
170+
assertEquals(deliveryCount, capturedRecord.deliveryCount().orElse((short) 0));
171+
172+
// Verify that the consumer acknowledges the record
173+
verify(consumer).acknowledge(record, AcknowledgeType.ACCEPT);
174+
175+
// Cleanup
176+
consumer.cleanup();
177+
}
134178
}

0 commit comments

Comments
 (0)