18
18
19
19
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.MessageFormatter;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecordSerde;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;

import java.util.Objects;

import static java.nio.charset.StandardCharsets.UTF_8;
33
36
34
- public abstract class ApiMessageFormatter implements MessageFormatter {
35
-
37
+ public abstract class CoordinatorRecordMessageFormatter implements MessageFormatter {
36
38
private static final String TYPE = "type" ;
37
39
private static final String VERSION = "version" ;
38
40
private static final String DATA = "data" ;
39
41
private static final String KEY = "key" ;
40
42
private static final String VALUE = "value" ;
41
- static final String UNKNOWN = "unknown" ;
43
+
44
+ private final CoordinatorRecordSerde serde ;
45
+
46
+ public CoordinatorRecordMessageFormatter (CoordinatorRecordSerde serde ) {
47
+ this .serde = serde ;
48
+ }
42
49
43
50
@ Override
44
51
public void writeTo (ConsumerRecord <byte [], byte []> consumerRecord , PrintStream output ) {
52
+ if (Objects .isNull (consumerRecord .key ())) return ;
53
+
45
54
ObjectNode json = new ObjectNode (JsonNodeFactory .instance );
55
+ try {
56
+ CoordinatorRecord record = serde .deserialize (
57
+ ByteBuffer .wrap (consumerRecord .key ()),
58
+ consumerRecord .value () != null ? ByteBuffer .wrap (consumerRecord .value ()) : null
59
+ );
60
+
61
+ if (!isRecordTypeAllowed (record .key ().apiKey ())) return ;
46
62
47
- byte [] key = consumerRecord . key ();
48
- if ( Objects . nonNull ( key )) {
49
- short keyVersion = ByteBuffer . wrap ( key ). getShort ();
50
- JsonNode dataNode = readToKeyJson ( ByteBuffer . wrap ( key ));
63
+ json
64
+ . putObject ( KEY )
65
+ . put ( TYPE , record . key (). apiKey ())
66
+ . set ( DATA , keyAsJson ( record . key () ));
51
67
52
- if (dataNode instanceof NullNode ) {
53
- return ;
68
+ if (Objects .nonNull (record .value ())) {
69
+ json
70
+ .putObject (VALUE )
71
+ .put (VERSION , record .value ().version ())
72
+ .set (DATA , valueAsJson (record .value ().message (), record .value ().version ()));
73
+ } else {
74
+ json .set (VALUE , NullNode .getInstance ());
54
75
}
55
- json .putObject (KEY )
56
- .put (TYPE , keyVersion )
57
- .set (DATA , dataNode );
58
- } else {
76
+ } catch (CoordinatorRecordSerde .UnknownRecordTypeException ex ) {
59
77
return ;
60
- }
61
-
62
- byte [] value = consumerRecord .value ();
63
- if (Objects .nonNull (value )) {
64
- short valueVersion = ByteBuffer .wrap (value ).getShort ();
65
- JsonNode dataNode = readToValueJson (ByteBuffer .wrap (value ));
66
-
67
- json .putObject (VALUE )
68
- .put (VERSION , valueVersion )
69
- .set (DATA , dataNode );
70
- } else {
71
- json .set (VALUE , NullNode .getInstance ());
78
+ } catch (RuntimeException ex ) {
79
+ throw new RuntimeException ("Could not read record at offset " + consumerRecord .offset () +
80
+ " due to: " + ex .getMessage (), ex );
72
81
}
73
82
74
83
try {
@@ -78,6 +87,7 @@ public void writeTo(ConsumerRecord<byte[], byte[]> consumerRecord, PrintStream o
78
87
}
79
88
}
80
89
81
- protected abstract JsonNode readToKeyJson (ByteBuffer byteBuffer );
82
- protected abstract JsonNode readToValueJson (ByteBuffer byteBuffer );
90
    /**
     * Tells whether records of the given type should be rendered by this formatter.
     * When this returns false the record is skipped entirely.
     *
     * @param recordType the api key taken from the deserialized record's key
     * @return true if the record should be written to the output
     */
    protected abstract boolean isRecordTypeAllowed(short recordType);

    /**
     * Renders the deserialized record key as JSON.
     *
     * @param message the deserialized key message
     * @return the JSON representation of the key
     */
    protected abstract JsonNode keyAsJson(ApiMessage message);

    /**
     * Renders the deserialized record value as JSON.
     *
     * @param message the deserialized value message
     * @param version the serialization version the value was read with
     * @return the JSON representation of the value
     */
    protected abstract JsonNode valueAsJson(ApiMessage message, short version);
83
93
}
0 commit comments