Skip to content

Commit bf19079

Browse files
brandboatjanchilling
authored andcommitted
KAFKA-18409: ShareGroupStateMessageFormatter should use CoordinatorRecordMessageFormatter (apache#18510)
ShareGroupStateMessageFormatter should extend CoordinatorRecordMessageFormatter in order to have a consistent handling of records of coordinators. Reviewers: Ken Huang <[email protected]>, David Jacot <[email protected]>
1 parent 5c37896 commit bf19079

File tree

2 files changed

+53
-171
lines changed

2 files changed

+53
-171
lines changed

tools/src/main/java/org/apache/kafka/tools/consumer/group/share/ShareGroupStateMessageFormatter.java

+14-142
Original file line numberDiff line numberDiff line change
@@ -16,162 +16,34 @@
1616
*/
1717
package org.apache.kafka.tools.consumer.group.share;
1818

19-
import org.apache.kafka.clients.consumer.ConsumerRecord;
20-
import org.apache.kafka.common.MessageFormatter;
21-
import org.apache.kafka.common.errors.UnsupportedVersionException;
2219
import org.apache.kafka.common.protocol.ApiMessage;
23-
import org.apache.kafka.common.protocol.ByteBufferAccessor;
24-
import org.apache.kafka.coordinator.share.generated.CoordinatorRecordType;
25-
import org.apache.kafka.coordinator.share.generated.ShareSnapshotKey;
26-
import org.apache.kafka.coordinator.share.generated.ShareSnapshotKeyJsonConverter;
27-
import org.apache.kafka.coordinator.share.generated.ShareSnapshotValue;
28-
import org.apache.kafka.coordinator.share.generated.ShareSnapshotValueJsonConverter;
29-
import org.apache.kafka.coordinator.share.generated.ShareUpdateKey;
30-
import org.apache.kafka.coordinator.share.generated.ShareUpdateKeyJsonConverter;
31-
import org.apache.kafka.coordinator.share.generated.ShareUpdateValue;
32-
import org.apache.kafka.coordinator.share.generated.ShareUpdateValueJsonConverter;
20+
import org.apache.kafka.coordinator.share.ShareCoordinatorRecordSerde;
21+
import org.apache.kafka.coordinator.share.generated.CoordinatorRecordJsonConverters;
3322
import org.apache.kafka.tools.consumer.CoordinatorRecordMessageFormatter;
3423

3524
import com.fasterxml.jackson.databind.JsonNode;
36-
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
37-
import com.fasterxml.jackson.databind.node.NullNode;
38-
import com.fasterxml.jackson.databind.node.ObjectNode;
39-
import com.fasterxml.jackson.databind.node.TextNode;
40-
41-
import java.io.IOException;
42-
import java.io.PrintStream;
43-
import java.nio.ByteBuffer;
44-
import java.util.Objects;
45-
import java.util.Optional;
46-
47-
import static java.nio.charset.StandardCharsets.UTF_8;
4825

4926
/**
5027
* Formatter for records of in __share_group_state topic.
5128
*/
52-
public class ShareGroupStateMessageFormatter implements MessageFormatter {
53-
54-
private static final String VERSION = "version";
55-
private static final String DATA = "data";
56-
private static final String KEY = "key";
57-
private static final String VALUE = "value";
58-
private static final String UNKNOWN = "unknown";
59-
60-
@Override
61-
public void writeTo(ConsumerRecord<byte[], byte[]> consumerRecord, PrintStream output) {
62-
ObjectNode json = new ObjectNode(JsonNodeFactory.instance);
63-
64-
byte[] key = consumerRecord.key();
65-
short keyVersion = -1;
66-
if (Objects.nonNull(key)) {
67-
keyVersion = ByteBuffer.wrap(key).getShort();
68-
JsonNode dataNode = readToKeyJson(ByteBuffer.wrap(key), keyVersion);
69-
70-
if (dataNode instanceof NullNode) {
71-
return;
72-
}
73-
json.putObject(KEY)
74-
.put(VERSION, keyVersion)
75-
.set(DATA, dataNode);
76-
} else {
77-
json.set(KEY, NullNode.getInstance());
78-
}
79-
80-
byte[] value = consumerRecord.value();
81-
if (Objects.nonNull(value)) {
82-
short valueVersion = ByteBuffer.wrap(value).getShort();
83-
JsonNode dataNode = readToValueJson(ByteBuffer.wrap(value), keyVersion, valueVersion);
84-
85-
json.putObject(VALUE)
86-
.put(VERSION, valueVersion)
87-
.set(DATA, dataNode);
88-
} else {
89-
json.set(VALUE, NullNode.getInstance());
90-
}
29+
public class ShareGroupStateMessageFormatter extends CoordinatorRecordMessageFormatter {
9130

92-
try {
93-
output.write(json.toString().getBytes(UTF_8));
94-
} catch (IOException e) {
95-
throw new RuntimeException(e);
96-
}
31+
public ShareGroupStateMessageFormatter() {
32+
super(new ShareCoordinatorRecordSerde());
9733
}
9834

99-
private JsonNode readToKeyJson(ByteBuffer byteBuffer, short version) {
100-
return readToSnapshotMessageKey(byteBuffer)
101-
.map(logKey -> transferKeyMessageToJsonNode(logKey, version))
102-
.orElseGet(() -> new TextNode(UNKNOWN));
103-
}
104-
105-
private Optional<ApiMessage> readToSnapshotMessageKey(ByteBuffer byteBuffer) {
106-
short version = byteBuffer.getShort();
107-
try {
108-
switch (CoordinatorRecordType.fromId(version)) {
109-
case SHARE_SNAPSHOT:
110-
return Optional.of(new ShareSnapshotKey(new ByteBufferAccessor(byteBuffer), version));
111-
case SHARE_UPDATE:
112-
return Optional.of(new ShareUpdateKey(new ByteBufferAccessor(byteBuffer), version));
113-
default:
114-
return Optional.empty();
115-
}
116-
} catch (UnsupportedVersionException ex) {
117-
return Optional.empty();
118-
}
119-
}
120-
121-
private JsonNode transferKeyMessageToJsonNode(ApiMessage logKey, short keyVersion) {
122-
if (logKey instanceof ShareSnapshotKey) {
123-
return ShareSnapshotKeyJsonConverter.write((ShareSnapshotKey) logKey, keyVersion);
124-
} else if (logKey instanceof ShareUpdateKey) {
125-
return ShareUpdateKeyJsonConverter.write((ShareUpdateKey) logKey, keyVersion);
126-
}
127-
return new TextNode(UNKNOWN);
128-
}
129-
130-
/**
131-
* Here the valueVersion is not enough to identity the deserializer for the ByteBuffer.
132-
* This is because both {@link ShareSnapshotValue} and {@link ShareUpdateValue} have version 0
133-
* as per RPC spec.
134-
* To differentiate, we need to use the corresponding key versions. This is acceptable as
135-
* the records will always appear in pairs (key, value). However, this means that we cannot
136-
* extend {@link CoordinatorRecordMessageFormatter} as it requires overriding
137-
* readToValueJson whose signature does not allow for passing keyversion.
138-
*
139-
* @param byteBuffer - Represents the raw data read from the topic
140-
* @param keyVersion - Version of the actual key component of the data read from topic
141-
* @param valueVersion - Version of the actual value component of the data read from topic
142-
* @return JsonNode corresponding to the raw data value component
143-
*/
144-
protected JsonNode readToValueJson(ByteBuffer byteBuffer, short keyVersion, short valueVersion) {
145-
return readToSnapshotMessageValue(byteBuffer, keyVersion)
146-
.map(logValue -> transferValueMessageToJsonNode(logValue, valueVersion))
147-
.orElseGet(() -> new TextNode(UNKNOWN));
35+
@Override
36+
protected boolean isRecordTypeAllowed(short recordType) {
37+
return true;
14838
}
14939

150-
private JsonNode transferValueMessageToJsonNode(ApiMessage logValue, short version) {
151-
if (logValue instanceof ShareSnapshotValue) {
152-
return ShareSnapshotValueJsonConverter.write((ShareSnapshotValue) logValue, version);
153-
} else if (logValue instanceof ShareUpdateValue) {
154-
return ShareUpdateValueJsonConverter.write((ShareUpdateValue) logValue, version);
155-
}
156-
return new TextNode(UNKNOWN);
40+
@Override
41+
protected JsonNode keyAsJson(ApiMessage message) {
42+
return CoordinatorRecordJsonConverters.writeRecordKeyAsJson(message);
15743
}
15844

159-
private Optional<ApiMessage> readToSnapshotMessageValue(ByteBuffer byteBuffer, short keyVersion) {
160-
short version = byteBuffer.getShort();
161-
// Check the key version here as that will determine which type
162-
// of value record to fetch. Both share update and share snapshot
163-
// value records can have the same version.
164-
try {
165-
switch (CoordinatorRecordType.fromId(keyVersion)) {
166-
case SHARE_SNAPSHOT:
167-
return Optional.of(new ShareSnapshotValue(new ByteBufferAccessor(byteBuffer), version));
168-
case SHARE_UPDATE:
169-
return Optional.of(new ShareUpdateValue(new ByteBufferAccessor(byteBuffer), version));
170-
default:
171-
return Optional.empty();
172-
}
173-
} catch (UnsupportedVersionException ex) {
174-
return Optional.empty();
175-
}
45+
@Override
46+
protected JsonNode valueAsJson(ApiMessage message, short version) {
47+
return CoordinatorRecordJsonConverters.writeRecordValueAsJson(message, version);
17648
}
17749
}

tools/src/test/java/org/apache/kafka/tools/consumer/group/share/ShareGroupStateMessageFormatterTest.java

+39-29
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
import org.apache.kafka.coordinator.share.generated.ShareUpdateValue;
3131
import org.apache.kafka.server.share.SharePartitionKey;
3232
import org.apache.kafka.server.share.persister.PersisterStateBatch;
33+
import org.apache.kafka.tools.consumer.CoordinatorRecordMessageFormatter;
34+
import org.apache.kafka.tools.consumer.CoordinatorRecordMessageFormatterTest;
3335

3436
import org.junit.jupiter.params.ParameterizedTest;
3537
import org.junit.jupiter.params.provider.Arguments;
@@ -46,7 +48,7 @@
4648
import static org.junit.jupiter.api.Assertions.assertEquals;
4749
import static org.junit.jupiter.api.Assertions.assertThrows;
4850

49-
public class ShareGroupStateMessageFormatterTest {
51+
public class ShareGroupStateMessageFormatterTest extends CoordinatorRecordMessageFormatterTest {
5052
private static final SharePartitionKey KEY_1 = SharePartitionKey.getInstance("gs1", Uuid.fromString("gtb2stGYRk-vWZ2zAozmoA"), 0);
5153
private static final ShareGroupOffset SHARE_GROUP_OFFSET_1 = new ShareGroupOffset.Builder()
5254
.setSnapshotEpoch(0)
@@ -132,28 +134,51 @@ public class ShareGroupStateMessageFormatterTest {
132134
.collect(Collectors.toList())
133135
);
134136

135-
private static Stream<Arguments> parameters() {
137+
@Override
138+
protected CoordinatorRecordMessageFormatter formatter() {
139+
return new ShareGroupStateMessageFormatter();
140+
}
141+
142+
@Override
143+
protected Stream<Arguments> parameters() {
136144
return Stream.of(
137145
Arguments.of(
138146
MessageUtil.toVersionPrefixedByteBuffer((short) 0, SHARE_SNAPSHOT_KEY).array(),
139147
MessageUtil.toVersionPrefixedByteBuffer((short) 0, SHARE_SNAPSHOT_VALUE).array(),
140-
"{\"key\":{\"version\":0,\"data\":{\"groupId\":\"gs1\",\"topicId\":\"gtb2stGYRk-vWZ2zAozmoA\",\"partition\":0}},\"value\":{\"version\":0,\"data\":{\"snapshotEpoch\":0,\"stateEpoch\":1,\"leaderEpoch\":20,\"startOffset\":50,\"stateBatches\":[{\"firstOffset\":100,\"lastOffset\":200,\"deliveryState\":1,\"deliveryCount\":10},{\"firstOffset\":201,\"lastOffset\":210,\"deliveryState\":2,\"deliveryCount\":10}]}}}"
148+
"""
149+
{"key":{"type":0,"data":{"groupId":"gs1","topicId":"gtb2stGYRk-vWZ2zAozmoA","partition":0}},
150+
"value":{"version":0,
151+
"data":{"snapshotEpoch":0,
152+
"stateEpoch":1,
153+
"leaderEpoch":20,
154+
"startOffset":50,
155+
"stateBatches":[{"firstOffset":100,"lastOffset":200,"deliveryState":1,"deliveryCount":10},
156+
{"firstOffset":201,"lastOffset":210,"deliveryState":2,"deliveryCount":10}]}}}
157+
"""
141158
),
142159
Arguments.of(
143160
MessageUtil.toVersionPrefixedByteBuffer((short) 1, SHARE_UPDATE_KEY).array(),
144161
MessageUtil.toVersionPrefixedByteBuffer((short) 0, SHARE_UPDATE_VALUE).array(),
145-
"{\"key\":{\"version\":1,\"data\":{\"groupId\":\"gs2\",\"topicId\":\"r9Nq4xGAQf28jvu36t7gQQ\",\"partition\":0}},\"value\":{\"version\":0,\"data\":{\"snapshotEpoch\":1,\"leaderEpoch\":25,\"startOffset\":55,\"stateBatches\":[{\"firstOffset\":100,\"lastOffset\":150,\"deliveryState\":1,\"deliveryCount\":12},{\"firstOffset\":151,\"lastOffset\":200,\"deliveryState\":2,\"deliveryCount\":15}]}}}"
162+
"""
163+
{"key":{"type":1,"data":{"groupId":"gs2","topicId":"r9Nq4xGAQf28jvu36t7gQQ","partition":0}},
164+
"value":{"version":0,
165+
"data":{"snapshotEpoch":1,
166+
"leaderEpoch":25,
167+
"startOffset":55,
168+
"stateBatches":[{"firstOffset":100,"lastOffset":150,"deliveryState":1,"deliveryCount":12},
169+
{"firstOffset":151,"lastOffset":200,"deliveryState":2,"deliveryCount":15}]}}}
170+
"""
146171
),
147172
// wrong versions
148173
Arguments.of(
149174
MessageUtil.toVersionPrefixedByteBuffer((short) 10, SHARE_SNAPSHOT_KEY).array(),
150175
MessageUtil.toVersionPrefixedByteBuffer((short) 0, SHARE_SNAPSHOT_VALUE).array(),
151-
"{\"key\":{\"version\":10,\"data\":\"unknown\"},\"value\":{\"version\":0,\"data\":\"unknown\"}}"
176+
""
152177
),
153178
Arguments.of(
154179
MessageUtil.toVersionPrefixedByteBuffer((short) 15, SHARE_UPDATE_KEY).array(),
155180
MessageUtil.toVersionPrefixedByteBuffer((short) 0, SHARE_UPDATE_VALUE).array(),
156-
"{\"key\":{\"version\":15,\"data\":\"unknown\"},\"value\":{\"version\":0,\"data\":\"unknown\"}}"
181+
""
157182
)
158183
);
159184
}
@@ -164,37 +189,22 @@ private static Stream<Arguments> exceptions() {
164189
Arguments.of(
165190
MessageUtil.toVersionPrefixedByteBuffer((short) 0, SHARE_SNAPSHOT_KEY).array(),
166191
MessageUtil.toVersionPrefixedByteBuffer((short) 0, SHARE_UPDATE_VALUE).array(),
167-
new RuntimeException("non-nullable field stateBatches was serialized as null")
192+
new RuntimeException("""
193+
Could not read record at offset 0 due to: \
194+
Could not read record with version 0 from value's buffer due to: \
195+
non-nullable field stateBatches was serialized as null.""")
168196
),
169197
Arguments.of(
170198
MessageUtil.toVersionPrefixedByteBuffer((short) 1, SHARE_UPDATE_KEY).array(),
171199
MessageUtil.toVersionPrefixedByteBuffer((short) 0, SHARE_SNAPSHOT_VALUE).array(),
172-
new RuntimeException("non-nullable field stateBatches was serialized as null")
200+
new RuntimeException("""
201+
Could not read record at offset 0 due to: \
202+
Could not read record with version 0 from value's buffer due to: \
203+
non-nullable field stateBatches was serialized as null.""")
173204
)
174205
);
175206
}
176207

177-
@ParameterizedTest
178-
@MethodSource("parameters")
179-
public void testShareGroupStateMessageFormatter(
180-
byte[] keyBuffer,
181-
byte[] valueBuffer,
182-
String expectedOutput
183-
) {
184-
ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(
185-
Topic.SHARE_GROUP_STATE_TOPIC_NAME, 0, 0,
186-
0L, TimestampType.CREATE_TIME, 0,
187-
0, keyBuffer, valueBuffer,
188-
new RecordHeaders(), Optional.empty());
189-
190-
try (MessageFormatter formatter = new ShareGroupStateMessageFormatter()) {
191-
formatter.configure(emptyMap());
192-
ByteArrayOutputStream out = new ByteArrayOutputStream();
193-
formatter.writeTo(record, new PrintStream(out));
194-
assertEquals(expectedOutput, out.toString());
195-
}
196-
}
197-
198208
@ParameterizedTest
199209
@MethodSource("exceptions")
200210
public void testShareGroupStateMessageFormatterException(

0 commit comments

Comments
 (0)