
Commit ba014f0

Julien Ruaux committed

feat: Added ack policy option. Resolves #17

1 parent 8899079

9 files changed: +220 -100 lines

src/main/java/com/redis/kafka/connect/RedisSinkConnector.java

Lines changed: 1 addition & 0 deletions

@@ -47,6 +47,7 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
 
 	@Override
 	public void stop() {
+		// Do nothing
 	}
 
 	@Override

src/main/java/com/redis/kafka/connect/RedisSourceConnector.java

Lines changed: 1 addition & 0 deletions

@@ -75,6 +75,7 @@ private Map<String, String> taskConfig(List<String> patterns) {
 
 	@Override
 	public void stop() {
+		// Do nothing
 	}
 
 	@Override

src/main/java/com/redis/kafka/connect/source/AbstractSourceRecordReader.java

Lines changed: 0 additions & 38 deletions
This file was deleted.

src/main/java/com/redis/kafka/connect/source/KeySourceRecordReader.java

Lines changed: 38 additions & 21 deletions

@@ -1,16 +1,18 @@
 package com.redis.kafka.connect.source;
 
 import java.time.Clock;
-import java.time.Duration;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import org.apache.commons.pool2.impl.GenericObjectPool;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.springframework.batch.item.ExecutionContext;
+import org.springframework.util.Assert;
 
 import com.redis.lettucemod.util.RedisModulesUtils;
 import com.redis.spring.batch.RedisItemReader;
@@ -19,52 +21,63 @@
 import com.redis.spring.batch.common.JobRunner;
 import com.redis.spring.batch.reader.LiveReaderOptions;
 import com.redis.spring.batch.reader.LiveRedisItemReader;
-import com.redis.spring.batch.step.FlushingOptions;
 
 import io.lettuce.core.AbstractRedisClient;
 import io.lettuce.core.RedisURI;
 import io.lettuce.core.api.StatefulConnection;
 import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
 
-public class KeySourceRecordReader extends AbstractSourceRecordReader<DataStructure<String>> {
+public class KeySourceRecordReader implements SourceRecordReader {
 
 	private static final Schema KEY_SCHEMA = Schema.STRING_SCHEMA;
 	private static final Schema STRING_VALUE_SCHEMA = Schema.STRING_SCHEMA;
 	private static final String HASH_VALUE_SCHEMA_NAME = "com.redis.kafka.connect.HashEventValue";
 	private static final Schema HASH_VALUE_SCHEMA = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA)
 			.name(HASH_VALUE_SCHEMA_NAME).build();
 
+	private static JobRunner jobRunner;
+
+	private final Clock clock = Clock.systemDefaultZone();
+	protected final RedisSourceConfig config;
+	private final LiveReaderOptions options;
 	private final int batchSize;
 	private final String topic;
 	private LiveRedisItemReader<String, DataStructure<String>> reader;
-	private final Duration idleTimeout;
 	private AbstractRedisClient client;
 	private GenericObjectPool<StatefulConnection<String, String>> pool;
 	private StatefulRedisPubSubConnection<String, String> pubSubConnection;
-	Clock clock = Clock.systemDefaultZone();
 
-	public KeySourceRecordReader(RedisSourceConfig sourceConfig, Duration idleTimeout) {
-		super(sourceConfig);
-		this.topic = sourceConfig.getTopicName();
-		this.batchSize = Math.toIntExact(sourceConfig.getBatchSize());
-		this.idleTimeout = idleTimeout;
+	public KeySourceRecordReader(RedisSourceConfig config, LiveReaderOptions options) {
+		Assert.notNull(config, "Source connector config must not be null");
+		Assert.notNull(options, "Options must not be null");
+		this.config = config;
+		this.options = options;
+		this.topic = config.getTopicName();
+		this.batchSize = Math.toIntExact(config.getBatchSize());
 	}
 
 	@Override
-	public void open() throws Exception {
+	public void open() {
 		RedisURI uri = config.uri();
 		this.client = config.client(uri);
 		this.pool = config.pool(client);
 		this.pubSubConnection = RedisModulesUtils.pubSubConnection(client);
-		reader = RedisItemReader
-				.liveDataStructure(pool, JobRunner.inMemory(), pubSubConnection, uri.getDatabase(),
-						config.getKeyPatterns().toArray(new String[0]))
-				.options(LiveReaderOptions.builder()
-						.flushingOptions(FlushingOptions.builder().timeout(idleTimeout).build()).build())
-				.build();
+		checkJobRunner();
+		reader = RedisItemReader.liveDataStructure(pool, jobRunner, pubSubConnection, uri.getDatabase(),
+				config.getKeyPatterns().toArray(new String[0])).options(options).build();
		reader.open(new ExecutionContext());
 	}
 
+	private static void checkJobRunner() {
+		if (jobRunner == null) {
+			try {
+				jobRunner = JobRunner.inMemory();
+			} catch (Exception e) {
+				throw new ConnectException("Could not initialize in-memory job runner", e);
+			}
+		}
+	}
+
 	@Override
 	public void close() {
 		if (reader != null) {
@@ -91,12 +104,11 @@ public LiveRedisItemReader<String, DataStructure<String>> getReader() {
 	}
 
 	@Override
-	protected List<DataStructure<String>> doPoll() throws Exception {
-		return reader.read(batchSize);
+	public List<SourceRecord> poll() {
+		return reader.read(batchSize).stream().map(this::convert).collect(Collectors.toList());
 	}
 
-	@Override
-	protected SourceRecord convert(DataStructure<String> input) {
+	private SourceRecord convert(DataStructure<String> input) {
 		Map<String, ?> sourcePartition = new HashMap<>();
 		Map<String, ?> sourceOffset = new HashMap<>();
 		return new SourceRecord(sourcePartition, sourceOffset, topic, null, KEY_SCHEMA, input.getKey(), schema(input),
@@ -110,4 +122,9 @@ private Schema schema(DataStructure<String> input) {
 		return STRING_VALUE_SCHEMA;
 	}
 
+	@Override
+	public void commit(List<Map<String, ?>> sourceOffsets) {
+		// do nothing
+	}
+
 }
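
With AbstractSourceRecordReader removed, KeySourceRecordReader now implements SourceRecordReader directly and receives its LiveReaderOptions from the caller instead of building them from a raw idle timeout. A minimal usage sketch of the new lifecycle follows; it assumes an already-built RedisSourceConfig, and the 100 ms flushing interval is purely illustrative.

package com.redis.kafka.connect.source;

import java.time.Duration;
import java.util.Collections;
import java.util.List;

import org.apache.kafka.connect.source.SourceRecord;

import com.redis.spring.batch.reader.LiveReaderOptions;
import com.redis.spring.batch.step.FlushingOptions;

// Sketch only (not part of this commit): exercising the reworked reader lifecycle.
class KeyReaderLifecycleSketch {

	List<SourceRecord> pollOnce(RedisSourceConfig config) {
		// The reader now takes LiveReaderOptions instead of a Duration; 100 ms is an illustrative timeout.
		FlushingOptions flushing = FlushingOptions.builder().timeout(Duration.ofMillis(100)).build();
		LiveReaderOptions options = LiveReaderOptions.builder().flushingOptions(flushing).build();
		KeySourceRecordReader reader = new KeySourceRecordReader(config, options);
		reader.open();                              // connects and starts the live key reader
		List<SourceRecord> records = reader.poll(); // one batch of key events, already converted to SourceRecords
		reader.commit(Collections.emptyList());     // no-op for the keys reader
		reader.close();
		return records;
	}
}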

src/main/java/com/redis/kafka/connect/source/RedisSourceConfig.java

Lines changed: 23 additions & 3 deletions

@@ -32,6 +32,10 @@ public enum ReaderType {
 		KEYS, STREAM
 	}
 
+	public enum AckPolicy {
+		AUTO, EXPLICIT
+	}
+
 	public static final String TOKEN_STREAM = "${stream}";
 	public static final String TOKEN_TASK = "${task}";
 
@@ -58,6 +62,12 @@ public enum ReaderType {
 	public static final String STREAM_NAME_CONFIG = "redis.stream.name";
 	public static final String STREAM_NAME_DOC = "Name of the Redis stream to read from";
 
+	public static final String STREAM_ACK_CONFIG = "redis.stream.ack";
+	public static final AckPolicy STREAM_ACK_DEFAULT = AckPolicy.EXPLICIT;
+	public static final String STREAM_ACK_DOC = "Acknowledgment policy for stream messages. " + AckPolicy.EXPLICIT
+			+ " mode acks each message after it's been committed (at-least-once processing). " + AckPolicy.AUTO
+			+ " acks messages as soon as they're read (at-most-once processing).";
+
 	public static final String STREAM_OFFSET_CONFIG = "redis.stream.offset";
 	public static final String STREAM_OFFSET_DEFAULT = "0-0";
 	public static final String STREAM_OFFSET_DOC = "Stream offset to start reading from";
@@ -79,6 +89,7 @@ public enum ReaderType {
 	private final ReaderType readerType;
 	private final List<String> keyPatterns;
 	private final String streamName;
+	private final AckPolicy streamAckPolicy;
 	private final String streamOffset;
 	private final String streamConsumerGroup;
 	private final String streamConsumerName;
@@ -93,6 +104,7 @@ public RedisSourceConfig(Map<?, ?> originals) {
 		this.batchSize = getLong(BATCH_SIZE_CONFIG);
 		this.keyPatterns = getList(KEY_PATTERNS_CONFIG);
 		this.streamName = getString(STREAM_NAME_CONFIG);
+		this.streamAckPolicy = ConfigUtils.getEnum(AckPolicy.class, this, STREAM_ACK_CONFIG);
 		this.streamOffset = getString(STREAM_OFFSET_CONFIG);
 		this.streamConsumerGroup = getString(STREAM_CONSUMER_GROUP_CONFIG);
 		this.streamConsumerName = getString(STREAM_CONSUMER_NAME_CONFIG);
@@ -119,6 +131,10 @@ public String getStreamName() {
 		return streamName;
 	}
 
+	public AckPolicy getStreamAckPolicy() {
+		return streamAckPolicy;
+	}
+
 	public String getStreamOffset() {
 		return streamOffset;
 	}
@@ -159,6 +175,9 @@ private void define() {
 				.defaultValue(KEY_PATTERNS_DEFAULT).importance(Importance.MEDIUM).internalConfig(true).build());
 		define(ConfigKeyBuilder.of(STREAM_NAME_CONFIG, ConfigDef.Type.STRING).documentation(STREAM_NAME_DOC)
 				.importance(ConfigDef.Importance.HIGH).build());
+		define(ConfigKeyBuilder.of(STREAM_ACK_CONFIG, ConfigDef.Type.STRING).documentation(STREAM_ACK_DOC)
+				.defaultValue(STREAM_ACK_DEFAULT.name()).importance(ConfigDef.Importance.HIGH)
+				.validator(Validators.validEnum(AckPolicy.class)).build());
 		define(ConfigKeyBuilder.of(STREAM_OFFSET_CONFIG, ConfigDef.Type.STRING).documentation(STREAM_OFFSET_DOC)
 				.defaultValue(STREAM_OFFSET_DEFAULT).importance(ConfigDef.Importance.MEDIUM).build());
 		define(ConfigKeyBuilder.of(STREAM_CONSUMER_GROUP_CONFIG, ConfigDef.Type.STRING)
@@ -179,7 +198,7 @@ public int hashCode() {
 		final int prime = 31;
 		int result = super.hashCode();
 		result = prime * result + Objects.hash(batchSize, keyPatterns, readerType, streamBlock, streamConsumerGroup,
-				streamConsumerName, streamName, streamOffset, topicName);
+				streamConsumerName, streamName, streamAckPolicy, streamOffset, topicName);
 		return result;
 	}
 
@@ -196,8 +215,9 @@ public boolean equals(Object obj) {
 				&& readerType == other.readerType && Objects.equals(streamBlock, other.streamBlock)
 				&& Objects.equals(streamConsumerGroup, other.streamConsumerGroup)
 				&& Objects.equals(streamConsumerName, other.streamConsumerName)
-				&& Objects.equals(streamName, other.streamName) && Objects.equals(streamOffset, other.streamOffset)
-				&& Objects.equals(topicName, other.topicName);
+				&& Objects.equals(streamName, other.streamName)
+				&& Objects.equals(streamAckPolicy, other.streamAckPolicy)
+				&& Objects.equals(streamOffset, other.streamOffset) && Objects.equals(topicName, other.topicName);
 	}
 
 }
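
The new redis.stream.ack option is declared as a string config validated against the AckPolicy enum and defaults to EXPLICIT (at-least-once). Below is a small hedged sketch of how a caller would set it and read it back through this class; the incoming properties map stands in for the rest of the connector configuration, which is omitted here.

package com.redis.kafka.connect.source;

import java.util.HashMap;
import java.util.Map;

import com.redis.kafka.connect.source.RedisSourceConfig.AckPolicy;

// Sketch only (not part of this commit): opting into AUTO acks and reading the policy back.
class AckPolicyConfigSketch {

	AckPolicy resolveAckPolicy(Map<String, String> connectorProps) {
		Map<String, String> props = new HashMap<>(connectorProps);
		// Without this entry the declared default, AckPolicy.EXPLICIT, applies.
		props.put(RedisSourceConfig.STREAM_ACK_CONFIG, AckPolicy.AUTO.name());
		return new RedisSourceConfig(props).getStreamAckPolicy();
	}
}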

src/main/java/com/redis/kafka/connect/source/RedisSourceTask.java

Lines changed: 42 additions & 7 deletions

@@ -16,6 +16,7 @@
 package com.redis.kafka.connect.source;
 
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
@@ -24,12 +25,20 @@
 import org.apache.kafka.connect.source.SourceTask;
 
 import com.redis.kafka.connect.RedisSourceConnector;
+import com.redis.spring.batch.reader.LiveReaderOptions;
+import com.redis.spring.batch.step.FlushingOptions;
 
 public class RedisSourceTask extends SourceTask {
 
 	public static final String TASK_ID = "task.id";
 	public static final String KEYS_IDLE_TIMEOUT = "keys.idletimeout";
 
+	/**
+	 * The offsets that have been processed and that are to be acknowledged by the
+	 * reader in {@link RedisSourceTask#commit()}.
+	 */
+	private final List<Map<String, ?>> sourceOffsets = new ArrayList<>();
+
 	private SourceRecordReader reader;
 
 	@Override
@@ -53,14 +62,40 @@ public void start(Map<String, String> props) {
 
 	private SourceRecordReader reader(Map<String, String> props) {
 		RedisSourceConfig sourceConfig = new RedisSourceConfig(props);
-		if (sourceConfig.getReaderType() == RedisSourceConfig.ReaderType.STREAM) {
-			String taskIdString = props.get(TASK_ID);
-			int taskId = taskIdString == null ? 0 : Integer.parseInt(taskIdString);
-			return new StreamSourceRecordReader(sourceConfig, taskId);
+		switch (sourceConfig.getReaderType()) {
+		case STREAM:
+			String taskId = props.getOrDefault(TASK_ID, String.valueOf(0));
+			return new StreamSourceRecordReader(sourceConfig, Integer.parseInt(taskId));
+		case KEYS:
+			String idleMillis = props.getOrDefault(KEYS_IDLE_TIMEOUT,
+					String.valueOf(FlushingOptions.DEFAULT_FLUSHING_INTERVAL.toMillis()));
+			FlushingOptions flushingOptions = FlushingOptions.builder()
+					.timeout(Duration.ofMillis(Long.parseLong(idleMillis))).build();
+			LiveReaderOptions liveReaderOptions = LiveReaderOptions.builder().flushingOptions(flushingOptions).build();
+			return new KeySourceRecordReader(sourceConfig, liveReaderOptions);
+		default:
+			throw new IllegalArgumentException("Unknown reader type: " + sourceConfig.getReaderType());
+		}
+	}
+
+	private void addSourceOffset(Map<String, ?> sourceOffset) {
+		sourceOffsets.add(sourceOffset);
+	}
+
+	@Deprecated
+	@Override
+	public void commitRecord(SourceRecord sourceRecord) throws InterruptedException {
+		Map<String, ?> currentOffset = sourceRecord.sourceOffset();
+		if (currentOffset != null) {
+			addSourceOffset(currentOffset);
+		}
+	}
+
+	@Override
+	public void commit() throws InterruptedException {
+		if (reader != null) {
+			reader.commit(sourceOffsets);
 		}
-		String idleTimeoutString = props.get(KEYS_IDLE_TIMEOUT);
-		return new KeySourceRecordReader(sourceConfig,
-				idleTimeoutString == null ? null : Duration.ofMillis(Long.parseLong(idleTimeoutString)));
 	}
 
 	@Override
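
The task now buffers each record's source offset in commitRecord() and hands the whole batch to the reader's commit() when Kafka Connect calls commit(). That hook is what makes the EXPLICIT ack policy possible: nothing is acknowledged in Redis until Connect has committed the corresponding records. StreamSourceRecordReader itself is not shown in this commit, so the following is only a hypothetical sketch of what an explicit-ack commit might look like; the "offset" map key, the stream and group names, and the use of XACK are assumptions made for illustration, not code from this repository.

package com.redis.kafka.connect.source;

import java.util.List;
import java.util.Map;

import io.lettuce.core.api.sync.RedisStreamCommands;

// Hypothetical sketch (not the actual StreamSourceRecordReader): turning the offsets collected by
// RedisSourceTask#commitRecord into XACK calls under an EXPLICIT ack policy. The "offset" key used
// to hold the stream message id is an assumption for illustration only.
class ExplicitAckSketch {

	private final RedisStreamCommands<String, String> commands;
	private final String stream;
	private final String group;

	ExplicitAckSketch(RedisStreamCommands<String, String> commands, String stream, String group) {
		this.commands = commands;
		this.stream = stream;
		this.group = group;
	}

	// Mirrors SourceRecordReader#commit: acknowledge every message id the task has committed.
	void commit(List<Map<String, ?>> sourceOffsets) {
		String[] ids = sourceOffsets.stream()
				.map(offset -> String.valueOf(offset.get("offset"))) // assumed key holding the message id
				.toArray(String[]::new);
		if (ids.length > 0) {
			commands.xack(stream, group, ids);
		}
	}
}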
src/main/java/com/redis/kafka/connect/source/SourceRecordReader.java

Lines changed: 8 additions & 5 deletions

@@ -1,14 +1,17 @@
 package com.redis.kafka.connect.source;
 
-import org.apache.kafka.connect.source.SourceRecord;
-
 import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.connect.source.SourceRecord;
 
 public interface SourceRecordReader {
 
-	void open() throws Exception;
+	void open();
+
+	List<SourceRecord> poll();
 
-	List<SourceRecord> poll();
+	void close();
 
-	void close();
+	void commit(List<Map<String, ?>> sourceOffsets);
 }
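
The reshaped interface drops the checked exception from open() and adds commit(List<Map<String, ?>>), so every reader now takes part in the task's commit cycle, even if only as a no-op. A minimal sketch (not part of this commit) of what an implementation now has to provide:

package com.redis.kafka.connect.source;

import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.kafka.connect.source.SourceRecord;

// Minimal sketch of the new contract: open() no longer declares a checked exception,
// and commit() receives the source offsets the task has already committed.
class NoOpSourceRecordReader implements SourceRecordReader {

	@Override
	public void open() {
		// acquire connections and start readers here
	}

	@Override
	public List<SourceRecord> poll() {
		return Collections.emptyList();
	}

	@Override
	public void commit(List<Map<String, ?>> sourceOffsets) {
		// acknowledge committed offsets, or do nothing (as the keys reader does)
	}

	@Override
	public void close() {
		// release resources
	}
}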
