@@ -15,31 +15,33 @@
 */
package com.redis.kafka.connect.sink;

-import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.Map.Entry;
+import java.util.stream.Collector;
import java.util.stream.Collectors;

+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.DataException;
+import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.storage.Converter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ExecutionContext;
-import org.springframework.util.Assert;
+import org.springframework.util.CollectionUtils;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonProcessingException;
@@ -65,8 +67,9 @@

import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.KeyValue;
+import io.lettuce.core.RedisCommandTimeoutException;
+import io.lettuce.core.RedisConnectionException;
import io.lettuce.core.codec.ByteArrayCodec;
-import io.netty.util.internal.StringUtil;

public class RedisSinkTask extends SinkTask {

@@ -76,6 +79,9 @@ public class RedisSinkTask extends SinkTask {

    private static final ObjectMapper objectMapper = objectMapper();

+    private static final Collector<SinkOffsetState, ?, Map<String, String>> offsetCollector = Collectors
+            .toMap(RedisSinkTask::offsetKey, RedisSinkTask::offsetValue);
+
    private RedisSinkConfig config;

    private AbstractRedisClient client;
@@ -107,42 +113,45 @@ public void start(final Map<String, String> props) {
        client = config.client();
        connection = RedisModulesUtils.connection(client);
        writer = new OperationItemWriter<>(client, ByteArrayCodec.INSTANCE, operation());
-        writer.setMultiExec(config.isMultiexec());
+        writer.setMultiExec(config.isMultiExec());
        writer.setWaitReplicas(config.getWaitReplicas());
        writer.setWaitTimeout(config.getWaitTimeout());
        writer.setPoolSize(config.getPoolSize());
        writer.open(new ExecutionContext());
-        final java.util.Set<TopicPartition> assignment = this.context.assignment();
-        if (!assignment.isEmpty()) {
-            Map<TopicPartition, Long> partitionOffsets = new HashMap<>(assignment.size());
-            for (SinkOffsetState state : offsetStates(assignment)) {
-                partitionOffsets.put(state.topicPartition(), state.offset());
-                log.info("Requesting offset {} for {}", state.offset(), state.topicPartition());
-            }
-            for (TopicPartition topicPartition : assignment) {
-                partitionOffsets.putIfAbsent(topicPartition, 0L);
-            }
-            this.context.offset(partitionOffsets);
+        java.util.Set<TopicPartition> assignment = this.context.assignment();
+        if (CollectionUtils.isEmpty(assignment)) {
+            return;
+        }
+        Map<TopicPartition, Long> partitionOffsets = new HashMap<>(assignment.size());
+        for (SinkOffsetState state : offsetStates(assignment)) {
+            partitionOffsets.put(state.topicPartition(), state.offset());
+            log.info("Requesting offset {} for {}", state.offset(), state.topicPartition());
+        }
+        for (TopicPartition topicPartition : assignment) {
+            partitionOffsets.putIfAbsent(topicPartition, 0L);
        }
+        this.context.offset(partitionOffsets);
    }

    private Collection<SinkOffsetState> offsetStates(java.util.Set<TopicPartition> assignment) {
-        Collection<SinkOffsetState> offsetStates = new ArrayList<>();
-        String[] partitionKeys = assignment.stream().map(a -> offsetKey(a.topic(), a.partition())).toArray(String[]::new);
+        String[] partitionKeys = assignment.stream().map(this::offsetKey).toArray(String[]::new);
        List<KeyValue<String, String>> values = connection.sync().mget(partitionKeys);
-        for (KeyValue<String, String> value : values) {
-            if (value.hasValue()) {
-                try {
-                    offsetStates.add(objectMapper.readValue(value.getValue(), SinkOffsetState.class));
-                } catch (IOException e) {
-                    throw new DataException(e);
-                }
-            }
+        return values.stream().filter(KeyValue::hasValue).map(this::offsetState).collect(Collectors.toList());
+    }
+
+    private String offsetKey(TopicPartition partition) {
+        return offsetKey(partition.topic(), partition.partition());
+    }
+
+    private SinkOffsetState offsetState(KeyValue<String, String> value) {
+        try {
+            return objectMapper.readValue(value.getValue(), SinkOffsetState.class);
+        } catch (JsonProcessingException e) {
+            throw new DataException("Could not parse sink offset state", e);
        }
-        return offsetStates;
    }

-    private String offsetKey(String topic, Integer partition) {
+    private static String offsetKey(String topic, Integer partition) {
        return String.format(OFFSET_KEY_FORMAT, topic, partition);
    }

@@ -208,9 +217,6 @@ private byte[] value(SinkRecord sinkRecord) {

    private byte[] jsonValue(SinkRecord sinkRecord) {
        Object value = sinkRecord.value();
-        if (value == null) {
-            return null;
-        }
        if (value instanceof byte[]) {
            return (byte[]) value;
        }
@@ -262,9 +268,6 @@ private String keyspace(SinkRecord sinkRecord) {
    }

    private byte[] bytes(String source, Object input) {
-        if (input == null) {
-            return null;
-        }
        if (input instanceof byte[]) {
            return (byte[]) input;
        }
@@ -283,9 +286,6 @@ private byte[] collectionKey(SinkRecord sinkRecord) {
    @SuppressWarnings("unchecked")
    private Map<byte[], byte[]> map(SinkRecord sinkRecord) {
        Object value = sinkRecord.value();
-        if (value == null) {
-            return null;
-        }
        if (value instanceof Struct) {
            Map<byte[], byte[]> body = new LinkedHashMap<>();
            Struct struct = (Struct) value;
@@ -311,16 +311,13 @@ private Map<byte[], byte[]> map(SinkRecord sinkRecord) {
    public void stop() {
        if (writer != null) {
            writer.close();
-            writer = null;
        }
        if (connection != null) {
            connection.close();
-            connection = null;
        }
        if (client != null) {
            client.shutdown();
            client.getResources().shutdown();
-            client = null;
        }
    }

@@ -330,36 +327,37 @@ public void put(final Collection<SinkRecord> records) {
        try {
            writer.write(new ArrayList<>(records));
            log.info("Wrote {} records", records.size());
-        } catch (Exception e) {
-            log.warn("Could not write {} records", records.size(), e);
+        } catch (RedisConnectionException e) {
+            throw new RetriableException("Could not get connection to Redis", e);
+        } catch (RedisCommandTimeoutException e) {
+            throw new RetriableException("Timeout while writing sink records", e);
        }
-        Map<TopicPartition, Long> data = new ConcurrentHashMap<>(100);
-        for (SinkRecord sinkRecord : records) {
-            Assert.isTrue(!StringUtil.isNullOrEmpty(sinkRecord.topic()), "topic cannot be null or empty.");
-            Assert.notNull(sinkRecord.kafkaPartition(), "partition cannot be null.");
-            Assert.isTrue(sinkRecord.kafkaOffset() >= 0, "offset must be greater than or equal 0.");
-            TopicPartition partition = new TopicPartition(sinkRecord.topic(), sinkRecord.kafkaPartition());
-            long current = data.getOrDefault(partition, Long.MIN_VALUE);
-            if (sinkRecord.kafkaOffset() > current) {
-                data.put(partition, sinkRecord.kafkaOffset());
-            }
-        }
-        List<SinkOffsetState> offsetData = data.entrySet().stream().map(e -> SinkOffsetState.of(e.getKey(), e.getValue()))
-                .collect(Collectors.toList());
-        if (!offsetData.isEmpty()) {
-            Map<String, String> offsets = new LinkedHashMap<>(offsetData.size());
-            for (SinkOffsetState e : offsetData) {
-                String key = offsetKey(e.topic(), e.partition());
-                String value;
-                try {
-                    value = objectMapper.writeValueAsString(e);
-                } catch (JsonProcessingException e1) {
-                    throw new DataException(e1);
-                }
-                offsets.put(key, value);
-                log.trace("put() - Setting offset: {}", e);
-            }
+    }
+
+    @Override
+    public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
+        Map<String, String> offsets = currentOffsets.entrySet().stream().map(this::offsetState).collect(offsetCollector);
+        log.trace("Writing offsets: {}", offsets);
+        try {
            connection.sync().mset(offsets);
+        } catch (RedisCommandTimeoutException e) {
+            throw new RetriableException("Could not write offsets", e);
+        }
+    }
+
+    private SinkOffsetState offsetState(Entry<TopicPartition, OffsetAndMetadata> entry) {
+        return SinkOffsetState.of(entry.getKey(), entry.getValue().offset());
+    }
+
+    private static String offsetKey(SinkOffsetState state) {
+        return offsetKey(state.topic(), state.partition());
+    }
+
+    private static String offsetValue(SinkOffsetState state) {
+        try {
+            return objectMapper.writeValueAsString(state);
+        } catch (JsonProcessingException e) {
+            throw new DataException("Could not serialize sink offset state", e);
        }
    }

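The net effect of this commit: put() now only writes records, surfacing connection and timeout problems as RetriableException so Kafka Connect retries the batch, while offset bookkeeping moves into flush(), where each partition's latest offset is serialized to JSON and written to Redis in a single MSET (and read back with MGET on task start). The snippet below is a minimal standalone sketch of that round trip, not the connector's own code; the key format and the OffsetEntry payload are illustrative assumptions, since the real OFFSET_KEY_FORMAT constant and SinkOffsetState class are not shown in this diff.

import java.util.LinkedHashMap;
import java.util.Map;

import com.fasterxml.jackson.databind.ObjectMapper;

import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;

public class OffsetFlushSketch {

    // Illustrative stand-in for the connector's SinkOffsetState (assumption).
    public static class OffsetEntry {
        public String topic;
        public int partition;
        public long offset;
    }

    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();

        OffsetEntry entry = new OffsetEntry();
        entry.topic = "orders";
        entry.partition = 0;
        entry.offset = 42L;

        // Hypothetical key format; the real OFFSET_KEY_FORMAT is defined elsewhere in the class.
        String key = String.format("__kafka.offset.%s.%s", entry.topic, entry.partition);

        Map<String, String> offsets = new LinkedHashMap<>();
        offsets.put(key, mapper.writeValueAsString(entry));

        RedisClient client = RedisClient.create("redis://localhost:6379");
        try (StatefulRedisConnection<String, String> connection = client.connect()) {
            // flush(): one MSET persists the latest offset per topic-partition.
            connection.sync().mset(offsets);
            // start(): MGET the same keys to seed the task's requested offsets.
            System.out.println(connection.sync().mget(key));
        } finally {
            client.shutdown();
        }
    }
}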