Skip to content

Commit 26e31c6

Browse files
committed
Added keys reader to source connector. Resolves #2
1 parent fb4cde7 commit 26e31c6

File tree

9 files changed

+152
-34
lines changed

9 files changed

+152
-34
lines changed

pom.xml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,9 @@
6262
</repositories>
6363
<dependencies>
6464
<dependency>
65-
<groupId>io.lettuce</groupId>
66-
<artifactId>lettuce-core</artifactId>
67-
<version>6.1.3.RELEASE</version>
65+
<groupId>com.redislabs</groupId>
66+
<artifactId>mesclun</artifactId>
67+
<version>1.3.4</version>
6868
</dependency>
6969
<dependency>
7070
<groupId>org.slf4j</groupId>

src/main/java/com/redislabs/kafka/connect/sink/RedisEnterpriseSinkTask.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
public class RedisEnterpriseSinkTask extends SinkTask {
6060

6161
private static final Logger log = LoggerFactory.getLogger(RedisEnterpriseSinkTask.class);
62+
private static final String OFFSET_KEY_FORMAT = "com.redislabs.kafka.connect.sink.offset.%s.%s";
6263

6364
private RedisClient client;
6465
private RedisEnterpriseSinkConfig config;
@@ -82,7 +83,7 @@ public void start(final Map<String, String> props) {
8283
final java.util.Set<TopicPartition> assignment = this.context.assignment();
8384
if (!assignment.isEmpty()) {
8485
List<SinkOffsetState> offsetStates = new ArrayList<>();
85-
String[] partitionKeys = assignment.stream().map(this::offsetKey).toArray(String[]::new);
86+
String[] partitionKeys = assignment.stream().map(a -> offsetKey(a.topic(), a.partition())).toArray(String[]::new);
8687
List<KeyValue<String, String>> values = connection.sync().mget(partitionKeys);
8788
for (KeyValue<String, String> value : values) {
8889
if (value.hasValue()) {
@@ -108,8 +109,8 @@ public void start(final Map<String, String> props) {
108109
}
109110
}
110111

111-
private String offsetKey(TopicPartition topicPartition) {
112-
return String.format("__kafka.offset.%s.%s", topicPartition.topic(), topicPartition.partition());
112+
private String offsetKey(String topic, Integer partition) {
113+
return String.format(OFFSET_KEY_FORMAT, topic, partition);
113114
}
114115

115116
private OperationItemWriter.RedisOperation<byte[], byte[], SinkRecord> operation() {
@@ -231,7 +232,7 @@ public void put(final Collection<SinkRecord> records) {
231232
if (!offsetData.isEmpty()) {
232233
Map<String, String> offsets = new LinkedHashMap<>(offsetData.size());
233234
for (SinkOffsetState e : offsetData) {
234-
String key = String.format("__kafka.offset.%s.%s", e.topic(), e.partition());
235+
String key = offsetKey(e.topic(), e.partition());
235236
String value;
236237
try {
237238
value = ObjectMapperFactory.INSTANCE.writeValueAsString(e);

src/main/java/com/redislabs/kafka/connect/source/RedisEnterpriseSourceConfig.java

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,35 @@
1616
package com.redislabs.kafka.connect.source;
1717

1818
import com.github.jcustenborder.kafka.connect.utils.config.ConfigKeyBuilder;
19+
import com.github.jcustenborder.kafka.connect.utils.config.ConfigUtils;
20+
import com.github.jcustenborder.kafka.connect.utils.config.validators.Validators;
1921
import com.redislabs.kafka.connect.common.RedisEnterpriseConfig;
2022
import org.apache.kafka.common.config.ConfigDef;
2123

24+
import java.util.Arrays;
25+
import java.util.Collections;
26+
import java.util.List;
2227
import java.util.Map;
2328

2429
public class RedisEnterpriseSourceConfig extends RedisEnterpriseConfig {
2530

31+
2632
public static final String TOKEN_STREAM = "${stream}";
2733

34+
public static final String READER_TYPE = "redis.reader";
35+
public static final String READER_DEFAULT = ReaderType.KEYS.name();
36+
public static final String READER_DOC = "Source from which to read Redis records. " + ReaderType.KEYS + ": generate records from key events and respective values generated from write operations in the Redis database. " + ReaderType.STREAM + ": read messages from a Redis stream";
37+
38+
public static final String KEYS_PATTERN = "redis.keys.pattern";
39+
public static final String KEYS_PATTERN_DEFAULT = "*";
40+
public static final String KEYS_PATTERN_DOC = "Generate records only for key names that match the given glob-style pattern";
41+
42+
public static final String KEYS_EVENT_TYPES = "redis.keys.eventtypes";
43+
public static final String KEYS_EVENT_TYPES_DEFAULT = "";
44+
public static final String KEYS_EVENT_TYPES_DOC = "Comma-separated event types for the key reader. The list may contain one or more commands or events (default: none)";
45+
2846
public static final String STREAM_NAME = "redis.stream.name";
47+
public static final String STREAM_NAME_DEFAULT = "com.redislabs.kafka.connect.source.stream";
2948
public static final String STREAM_NAME_DOC = "Name of the Redis stream to read from";
3049

3150
public static final String STREAM_OFFSET = "redis.stream.offset";
@@ -42,8 +61,11 @@ public class RedisEnterpriseSourceConfig extends RedisEnterpriseConfig {
4261

4362
public static final String TOPIC_NAME_FORMAT = "topic.name";
4463
public static final String TOPIC_NAME_FORMAT_DEFAULT = TOKEN_STREAM;
45-
public static final String TOPIC_NAME_FORMAT_DOC = "A format string for the destination topic name, which may contain '${stream}' as a " + "placeholder for the originating stream name.\n" + "For example, ``redis_${stream}`` for the stream 'orders' will map to the topic name " + "'redis_orders'.";
64+
public static final String TOPIC_NAME_FORMAT_DOC = String.format("A format string for the destination topic name, which may contain '%s' as a placeholder for the originating stream name. For example `redis_%s` for the stream 'orders' will map to the topic name 'redis_orders'.", TOKEN_STREAM, TOKEN_STREAM);
4665

66+
private final ReaderType readerType;
67+
private final List<String> keysEventTypes;
68+
private final String keysPattern;
4769
private final String streamName;
4870
private final String streamOffset;
4971
private final Long streamCount;
@@ -52,13 +74,35 @@ public class RedisEnterpriseSourceConfig extends RedisEnterpriseConfig {
5274

5375
public RedisEnterpriseSourceConfig(Map<?, ?> originals) {
5476
super(new RedisEnterpriseSourceConfigDef(), originals);
77+
this.readerType = ConfigUtils.getEnum(ReaderType.class, this, READER_TYPE);
78+
this.keysEventTypes = splitCommaSeparated(getString(KEYS_EVENT_TYPES));
79+
this.keysPattern = getString(KEYS_PATTERN);
5580
this.streamName = getString(STREAM_NAME);
5681
this.streamOffset = getString(STREAM_OFFSET);
5782
this.streamCount = getLong(STREAM_COUNT);
5883
this.streamBlock = getLong(STREAM_BLOCK);
5984
this.topicNameFormat = getString(TOPIC_NAME_FORMAT);
6085
}
6186

87+
private List<String> splitCommaSeparated(String string) {
88+
if (string.trim().isEmpty()) {
89+
return Collections.emptyList();
90+
}
91+
return Arrays.asList(string.split(","));
92+
}
93+
94+
public List<String> getKeysEventTypes() {
95+
return keysEventTypes;
96+
}
97+
98+
public ReaderType getReaderType() {
99+
return readerType;
100+
}
101+
102+
public String getKeysPattern() {
103+
return keysPattern;
104+
}
105+
62106
public Long getStreamBlock() {
63107
return streamBlock;
64108
}
@@ -91,14 +135,20 @@ public RedisEnterpriseSourceConfigDef(ConfigDef base) {
91135
}
92136

93137
private void define() {
94-
define(ConfigKeyBuilder.of(STREAM_NAME, ConfigDef.Type.STRING).documentation(STREAM_NAME_DOC).importance(ConfigDef.Importance.HIGH).validator(ConfigDef.CompositeValidator.of(new ConfigDef.NonNullValidator(), new ConfigDef.NonEmptyString())).build());
138+
define(ConfigKeyBuilder.of(READER_TYPE, ConfigDef.Type.STRING).documentation(READER_DOC).defaultValue(READER_DEFAULT).importance(ConfigDef.Importance.HIGH).validator(Validators.validEnum(ReaderType.class)).build());
139+
define(ConfigKeyBuilder.of(KEYS_EVENT_TYPES, Type.STRING).documentation(KEYS_EVENT_TYPES_DOC).defaultValue(KEYS_EVENT_TYPES_DEFAULT).importance(Importance.MEDIUM).build());
140+
define(ConfigKeyBuilder.of(KEYS_PATTERN, Type.STRING).documentation(KEYS_PATTERN_DOC).defaultValue(KEYS_PATTERN_DEFAULT).importance(Importance.MEDIUM).build());
141+
define(ConfigKeyBuilder.of(STREAM_NAME, ConfigDef.Type.STRING).documentation(STREAM_NAME_DOC).defaultValue(STREAM_NAME_DEFAULT).importance(ConfigDef.Importance.HIGH).build());
95142
define(ConfigKeyBuilder.of(STREAM_OFFSET, ConfigDef.Type.STRING).documentation(STREAM_OFFSET_DOC).defaultValue(STREAM_OFFSET_DEFAULT).importance(ConfigDef.Importance.MEDIUM).build());
96143
define(ConfigKeyBuilder.of(STREAM_COUNT, ConfigDef.Type.LONG).defaultValue(STREAM_COUNT_DEFAULT).importance(ConfigDef.Importance.LOW).documentation(STREAM_COUNT_DOC).validator(ConfigDef.Range.atLeast(1L)).build());
97144
define(ConfigKeyBuilder.of(STREAM_BLOCK, ConfigDef.Type.LONG).defaultValue(STREAM_BLOCK_DEFAULT).importance(ConfigDef.Importance.LOW).documentation(STREAM_BLOCK_DOC).validator(ConfigDef.Range.atLeast(1L)).build());
98145
define(ConfigKeyBuilder.of(TOPIC_NAME_FORMAT, ConfigDef.Type.STRING).defaultValue(TOPIC_NAME_FORMAT_DEFAULT).importance(ConfigDef.Importance.MEDIUM).documentation(TOPIC_NAME_FORMAT_DOC).build());
99146
}
100147

148+
}
101149

150+
public enum ReaderType {
151+
KEYS, STREAM
102152
}
103153

104154
}

src/main/java/com/redislabs/kafka/connect/source/RedisEnterpriseSourceTask.java

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,12 @@
1515
*/
1616
package com.redislabs.kafka.connect.source;
1717

18+
import com.google.common.base.Charsets;
1819
import com.redislabs.kafka.connect.RedisEnterpriseSourceConnector;
19-
import io.lettuce.core.RedisClient;
20+
import com.redislabs.mesclun.RedisModulesClient;
21+
import com.redislabs.mesclun.api.StatefulRedisModulesConnection;
22+
import com.redislabs.mesclun.api.sync.RedisGearsCommands;
23+
import com.redislabs.mesclun.gears.RedisGearsUtils;
2024
import io.lettuce.core.StreamMessage;
2125
import io.lettuce.core.XReadArgs;
2226
import org.apache.kafka.connect.data.Schema;
@@ -35,19 +39,23 @@
3539
import java.util.Collections;
3640
import java.util.List;
3741
import java.util.Map;
42+
import java.util.stream.Collectors;
3843

3944
public class RedisEnterpriseSourceTask extends SourceTask {
4045

4146
private static final Logger log = LoggerFactory.getLogger(RedisEnterpriseSourceTask.class);
4247

43-
public static final String STREAM_FIELD = "stream";
48+
public static final String OFFSET = "stream";
4449
public static final String OFFSET_FIELD = "offset";
45-
50+
private static final String KEYSREADER_STREAM = "${stream}";
51+
private static final String KEYSREADER_PATTERN = "${prefix}";
52+
private static final String KEYSREADER_EVENTTYPES = "${eventTypes}";
53+
private static final String EVENTTYPES_NONE = "None";
4654
private static final Schema KEY_SCHEMA = Schema.STRING_SCHEMA;
4755
private static final String VALUE_SCHEMA_NAME = "com.redislabs.kafka.connect.EventValue";
4856
private static final Schema VALUE_SCHEMA = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA).name(VALUE_SCHEMA_NAME);
4957

50-
private RedisClient client;
58+
private RedisModulesClient client;
5159
private RedisEnterpriseSourceConfig sourceConfig;
5260
private StreamItemReader reader;
5361
private Map<String, String> offsetKey;
@@ -60,8 +68,17 @@ public String version() {
6068
@Override
6169
public void start(Map<String, String> props) {
6270
this.sourceConfig = new RedisEnterpriseSourceConfig(props);
63-
this.client = RedisClient.create(sourceConfig.getRedisUri());
64-
this.offsetKey = Collections.singletonMap(STREAM_FIELD, sourceConfig.getStreamName());
71+
this.client = RedisModulesClient.create(sourceConfig.getRedisUri());
72+
if (sourceConfig.getReaderType() == RedisEnterpriseSourceConfig.ReaderType.KEYS) {
73+
StatefulRedisModulesConnection<String, String> connection = client.connect();
74+
RedisGearsCommands<String, String> sync = connection.sync();
75+
String function = RedisGearsUtils.toString(getClass().getClassLoader().getResourceAsStream("keysreader.py"), Charsets.UTF_8);
76+
function = function.replace(KEYSREADER_STREAM, sourceConfig.getStreamName());
77+
function = function.replace(KEYSREADER_EVENTTYPES, toPyArray(sourceConfig.getKeysEventTypes()));
78+
function = function.replace(KEYSREADER_PATTERN, sourceConfig.getKeysPattern());
79+
sync.pyExecute(function);
80+
}
81+
this.offsetKey = Collections.singletonMap(OFFSET, sourceConfig.getStreamName());
6582
String offset = sourceConfig.getStreamOffset();
6683
if (context != null) {
6784
Map<String, Object> storedOffset = context.offsetStorageReader().offset(offsetKey);
@@ -82,6 +99,13 @@ public void start(Map<String, String> props) {
8299
this.reader.open(new ExecutionContext());
83100
}
84101

102+
private String toPyArray(List<String> eventTypes) {
103+
if (eventTypes.isEmpty()) {
104+
return EVENTTYPES_NONE;
105+
}
106+
return eventTypes.stream().map(s -> "'" + s + "'").collect(Collectors.joining(",", "[", "]"));
107+
}
108+
85109
@Override
86110
public void stop() {
87111
if (reader != null) {

src/main/resources/keysreader.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
# Capture each keyspace event and store it in a stream
2+
GB('KeysReader') \
3+
.foreach(lambda x:
4+
execute('XADD', '${stream}', '*', 'key', x['key'], 'value', x['value'], 'type', x['type'], 'event', x['event'])) \
5+
.register(prefix='${prefix}',
6+
mode='sync',
7+
eventTypes=${eventTypes},
8+
readValue=True)

src/test/java/com/redislabs/kafka/connect/AbstractRedisEnterpriseIT.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package com.redislabs.kafka.connect;
22

33
import com.redislabs.testcontainers.RedisEnterpriseContainer;
4+
import com.redislabs.testcontainers.RedisModulesContainer;
45
import com.redislabs.testcontainers.RedisServer;
6+
import com.redislabs.testcontainers.support.enterprise.rest.Database;
57
import io.lettuce.core.AbstractRedisClient;
68
import io.lettuce.core.RedisClient;
79
import io.lettuce.core.api.StatefulConnection;
@@ -36,7 +38,8 @@
3638
public class AbstractRedisEnterpriseIT {
3739

3840
@Container
39-
private static final RedisEnterpriseContainer REDIS_ENTERPRISE = new RedisEnterpriseContainer();
41+
private static final RedisEnterpriseContainer REDIS_ENTERPRISE = new RedisEnterpriseContainer().withModules(Database.Module.GEARS);
42+
// private static final RedisModulesContainer REDIS_ENTERPRISE = new RedisModulesContainer();
4043

4144
static Stream<RedisServer> redisServers() {
4245
return Stream.of(REDIS_ENTERPRISE);

src/test/java/com/redislabs/kafka/connect/RedisEnterpriseSourceConnectorTest.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,7 @@ public void testConfig() {
2020
Map<String, ConfigValue> results = config.validateAll(new HashMap<>());
2121
ConfigValue value = results.get(RedisEnterpriseSourceConfig.STREAM_NAME);
2222
Assertions.assertEquals(RedisEnterpriseSourceConfig.STREAM_NAME, value.name());
23-
Assertions.assertNull(value.value());
24-
String expected = "Missing required configuration \"" + RedisEnterpriseSourceConfig.STREAM_NAME + "\" which has no default value.";
25-
Assertions.assertEquals(expected, value.errorMessages().get(0));
23+
Assertions.assertEquals(RedisEnterpriseSourceConfig.STREAM_NAME_DEFAULT, value.value());
2624
}
2725

2826
@Test

src/test/java/com/redislabs/kafka/connect/RedisEnterpriseSourceTaskIT.java

Lines changed: 49 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,34 +3,34 @@
33
import com.redislabs.kafka.connect.source.RedisEnterpriseSourceConfig;
44
import com.redislabs.kafka.connect.source.RedisEnterpriseSourceTask;
55
import com.redislabs.testcontainers.RedisServer;
6+
import io.lettuce.core.RedisClient;
7+
import io.lettuce.core.RedisURI;
8+
import io.lettuce.core.codec.ByteArrayCodec;
69
import org.apache.kafka.connect.source.SourceRecord;
710
import org.junit.jupiter.api.AfterEach;
811
import org.junit.jupiter.api.Assertions;
912
import org.junit.jupiter.api.BeforeEach;
1013
import org.junit.jupiter.params.ParameterizedTest;
1114
import org.junit.jupiter.params.provider.MethodSource;
1215

13-
import java.util.HashMap;
16+
import java.util.Arrays;
1417
import java.util.List;
1518
import java.util.Map;
1619

1720
import static org.junit.jupiter.api.Assertions.assertEquals;
1821

1922
public class RedisEnterpriseSourceTaskIT extends AbstractRedisEnterpriseIT {
2023

21-
private static final String STREAM = "stream1";
22-
2324
private RedisEnterpriseSourceTask task;
2425

2526
@BeforeEach
2627
public void createTask() {
2728
task = new RedisEnterpriseSourceTask();
2829
}
2930

30-
private void startTask(RedisServer redis) {
31-
final Map<String, String> config = new HashMap<>();
31+
private void startTask(RedisServer redis, String... props) {
32+
Map<String, String> config = map(props);
3233
config.put(RedisEnterpriseSourceConfig.REDIS_URI, redis.getRedisURI());
33-
config.put(RedisEnterpriseSourceConfig.STREAM_NAME, STREAM);
3434
task.start(config);
3535
}
3636

@@ -42,19 +42,53 @@ public void teardown() {
4242
@SuppressWarnings("unchecked")
4343
@ParameterizedTest
4444
@MethodSource("redisServers")
45-
public void pollRetrievesStreamMessages(RedisServer redis) throws InterruptedException {
46-
startTask(redis);
47-
syncStream(redis).xadd(STREAM, map("field1", "value1", "field2", "value2"));
48-
syncStream(redis).xadd(STREAM, map("field1", "value1", "field2", "value2"));
49-
syncStream(redis).xadd(STREAM, map("field1", "value1", "field2", "value2"));
45+
public void pollStream(RedisServer redis) throws InterruptedException {
46+
final String stream = "stream1";
47+
startTask(redis, RedisEnterpriseSourceConfig.READER_TYPE, RedisEnterpriseSourceConfig.ReaderType.STREAM.name(), RedisEnterpriseSourceConfig.STREAM_NAME, stream);
48+
String field1 = "field1";
49+
String value1 = "value1";
50+
String field2 = "field2";
51+
String value2 = "value2";
52+
syncStream(redis).xadd(stream, map(field1, value1, field2, value2));
53+
syncStream(redis).xadd(stream, map(field1, value1, field2, value2));
54+
syncStream(redis).xadd(stream, map(field1, value1, field2, value2));
5055
Thread.sleep(100);
51-
final List<SourceRecord> sourceRecords = task.poll();
56+
List<SourceRecord> sourceRecords = task.poll();
5257
assertEquals(3, sourceRecords.size());
5358
for (SourceRecord record : sourceRecords) {
54-
Assertions.assertEquals(STREAM, record.topic());
59+
Assertions.assertEquals(stream, record.topic());
5560
Map<String, String> map = (Map<String, String>) record.value();
56-
Assertions.assertEquals("value1", map.get("field1"));
57-
Assertions.assertEquals("value2", map.get("field2"));
61+
Assertions.assertEquals(value1, map.get(field1));
62+
Assertions.assertEquals(value2, map.get(field2));
63+
}
64+
}
65+
66+
@SuppressWarnings("unchecked")
67+
@ParameterizedTest
68+
@MethodSource("redisServers")
69+
public void pollKeys(RedisServer redis) throws InterruptedException {
70+
startTask(redis);
71+
Thread.sleep(1000);
72+
String stringKey = "key:1";
73+
String stringValue = "my string";
74+
syncString(redis).set(stringKey, stringValue);
75+
String hashKey = "key:2";
76+
syncHash(redis).hset(hashKey, map("field1", "value1", "field2", "value2"));
77+
Thread.sleep(1000);
78+
List<SourceRecord> sourceRecords = task.poll();
79+
assertEquals(2, sourceRecords.size());
80+
SourceRecord stringRecord = sourceRecords.get(0);
81+
Map<String, String> stringBody = (Map<String, String>) stringRecord.value();
82+
assertEquals("string", stringBody.get("type"));
83+
assertEquals(stringKey, stringBody.get("key"));
84+
assertEquals(stringValue, stringBody.get("value"));
85+
}
86+
87+
public static void main(String[] args) {
88+
RedisClient client = RedisClient.create(RedisURI.create("localhost", 12000));
89+
List<byte[]> keys = client.connect(new ByteArrayCodec()).sync().keys("*".getBytes());
90+
for (byte[] key : keys) {
91+
System.out.println(Arrays.hashCode(key));
5892
}
5993
}
6094
}

0 commit comments

Comments
 (0)