Skip to content

Commit 765f03d

Browse files
committed
added integration tests
1 parent a4d5f90 commit 765f03d

12 files changed

+217
-38
lines changed

config/sink.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,5 @@ name=sink
22
connector.class=com.redislabs.kafkaconnect.RedisEnterpriseSinkConnector
33
tasks.max=1
44
topics=test
5-
stream.name.format=test2
5+
redis.stream.name=test2
66
redis.uri=redis://localhost:6379

config/source.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,5 @@ name=source
22
connector.class=com.redislabs.kafkaconnect.RedisEnterpriseSourceConnector
33
tasks.max=1
44
topic=test
5-
stream=test
5+
redis.stream.name=test
66
redis.uri=redis://localhost:6379

pom.xml

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -71,13 +71,7 @@
7171
<dependency>
7272
<groupId>com.redislabs</groupId>
7373
<artifactId>spring-batch-redis</artifactId>
74-
<version>2.9.6-SNAPSHOT</version>
75-
<exclusions>
76-
<exclusion>
77-
<groupId>io.netty</groupId>
78-
<artifactId>*</artifactId>
79-
</exclusion>
80-
</exclusions>
74+
<version>2.10.2</version>
8175
</dependency>
8276

8377
<dependency>
@@ -157,6 +151,9 @@
157151
<![CDATA[This connector is <a href="https://redislabs.com/company/support/">supported by Redis Labs</a> as part of a
158152
<a href="https://redislabs.com/redis-enterprise">Redis Enterprise</a> license.]]>
159153
</supportSummary>
154+
<excludes>
155+
<exclude>io.netty:*</exclude>
156+
</excludes>
160157
</configuration>
161158
</execution>
162159
</executions>

src/main/java/com/redislabs/kafkaconnect/source/RedisEnterpriseSourceConfig.java renamed to src/main/java/com/redislabs/kafkaconnect/RedisEnterpriseSourceConfig.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package com.redislabs.kafkaconnect.source;
16+
package com.redislabs.kafkaconnect;
1717

1818
import com.redislabs.kafkaconnect.common.RedisEnterpriseConfigException;
1919
import lombok.Getter;
@@ -33,21 +33,21 @@ public class RedisEnterpriseSourceConfig extends AbstractConfig {
3333
public static final String REDIS_URI_DISPLAY = "Connection URI";
3434
public static final String REDIS_URI_DOC = "URI of the Redis Enterprise database to connect to, e.g. redis://redis-12000.redislabs.com:12000";
3535

36-
public static final String STREAM_NAME = "stream";
36+
public static final String STREAM_NAME = "redis.stream.name";
3737
public static final String STREAM_NAME_DISPLAY = "Stream name";
3838
public static final String STREAM_NAME_DOC = "Name of the Redis stream to read from";
3939

40-
public static final String STREAM_OFFSET = "stream.offset";
40+
public static final String STREAM_OFFSET = "redis.stream.offset";
4141
public static final String STREAM_OFFSET_DEFAULT = "0-0";
4242
public static final String STREAM_OFFSET_DISPLAY = "Stream offset";
4343
public static final String STREAM_OFFSET_DOC = "Stream offset to start reading from";
4444

45-
public static final String STREAM_COUNT = "stream.count";
45+
public static final String STREAM_COUNT = "redis.stream.count";
4646
public static final long STREAM_COUNT_DEFAULT = 50;
4747
public static final String STREAM_COUNT_DISPLAY = "The maximum batch size";
4848
public static final String STREAM_COUNT_DOC = "Maximum number of stream messages to include in a single read when polling for new data (XREAD [COUNT count]). This setting can be used to limit the amount of data buffered internally in the connector.";
4949

50-
public static final String STREAM_BLOCK = "stream.block";
50+
public static final String STREAM_BLOCK = "redis.stream.block";
5151
public static final long STREAM_BLOCK_DEFAULT = 100;
5252
public static final String STREAM_BLOCK_DISPLAY = "Max poll duration";
5353
public static final String STREAM_BLOCK_DOC = "The max amount of time in milliseconds to wait while polling for stream messages (XREAD [BLOCK milliseconds])";

src/main/java/com/redislabs/kafkaconnect/RedisEnterpriseSourceConnector.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,6 @@
1212
*/
1313
package com.redislabs.kafkaconnect;
1414

15-
import com.redislabs.kafkaconnect.source.RedisEnterpriseSourceConfig;
16-
import com.redislabs.kafkaconnect.source.RedisEnterpriseSourceTask;
1715
import org.apache.kafka.common.config.ConfigDef;
1816
import org.apache.kafka.common.utils.AppInfoParser;
1917
import org.apache.kafka.connect.connector.Task;

src/main/java/com/redislabs/kafkaconnect/source/RedisEnterpriseSourceTask.java renamed to src/main/java/com/redislabs/kafkaconnect/RedisEnterpriseSourceTask.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,8 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package com.redislabs.kafkaconnect.source;
16+
package com.redislabs.kafkaconnect;
1717

18-
import com.redislabs.kafkaconnect.RedisEnterpriseSourceConnector;
1918
import io.lettuce.core.RedisClient;
2019
import io.lettuce.core.StreamMessage;
2120
import io.lettuce.core.XReadArgs;

src/main/java/com/redislabs/kafkaconnect/sink/RedisEnterpriseSinkConfig.java

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
*/
1616
package com.redislabs.kafkaconnect.sink;
1717

18-
import com.redislabs.kafkaconnect.source.RedisEnterpriseSourceConfig;
18+
import com.redislabs.kafkaconnect.RedisEnterpriseSourceConfig;
1919
import lombok.Getter;
2020
import org.apache.kafka.common.config.AbstractConfig;
2121
import org.apache.kafka.common.config.ConfigDef;
@@ -26,32 +26,28 @@ public class RedisEnterpriseSinkConfig extends AbstractConfig {
2626

2727
public static final ConfigDef CONFIG_DEF = new RedisEnterpriseSinkConfigDef();
2828

29-
public static final String STREAM_NAME_FORMAT = "stream.name.format";
30-
public static final String STREAM_NAME_FORMAT_DEFAULT = "${topic}";
31-
public static final String STREAM_NAME_FORMAT_DOC = "A format string for the destination stream name, which may contain '${topic}' as a " + "placeholder for the originating topic name.\n" + "For example, ``kafka_${topic}`` for the topic 'orders' will map to the stream name " + "'kafka_orders'.";
32-
public static final String STREAM_NAME_FORMAT_DISPLAY = "Stream Name Format";
29+
public static final String STREAM_NAME = "redis.stream.name";
30+
public static final String STREAM_NAME_DEFAULT = "${topic}";
31+
public static final String STREAM_NAME_DOC = "A format string for the destination stream name, which may contain '${topic}' as a " + "placeholder for the originating topic name.\n" + "For example, ``kafka_${topic}`` for the topic 'orders' will map to the stream name " + "'kafka_orders'.";
32+
public static final String STREAM_NAME_DISPLAY = "Stream Name Format";
3333

34-
public static final String TRANSACTIONAL = "transactional";
35-
public static final String TRANSACTIONAL_DEFAULT = "false";
36-
public static final String TRANSACTIONAL_DOC = "Whether to execute Redis commands in multi/exec transactions.";
37-
public static final String TRANSACTIONAL_DISPLAY = "Use Transactions";
34+
public static final String MULTIEXEC = "redis.multiexec";
35+
public static final String MULTIEXEC_DEFAULT = "false";
36+
public static final String MULTIEXEC_DOC = "Whether to execute Redis commands in multi/exec transactions.";
37+
public static final String MULTIEXEC_DISPLAY = "Use Transactions";
3838

3939
@Getter
4040
private final String redisUri;
4141
@Getter
4242
private final String streamNameFormat;
4343
@Getter
44-
private final Boolean transactional;
44+
private final boolean multiexec;
4545

4646
public RedisEnterpriseSinkConfig(final Map<?, ?> originals) {
47-
this(originals, true);
48-
}
49-
50-
private RedisEnterpriseSinkConfig(final Map<?, ?> originals, final boolean validateAll) {
5147
super(CONFIG_DEF, originals, false);
5248
redisUri = getString(RedisEnterpriseSourceConfig.REDIS_URI);
53-
streamNameFormat = getString(STREAM_NAME_FORMAT).trim();
54-
transactional = getBoolean(TRANSACTIONAL);
49+
streamNameFormat = getString(STREAM_NAME).trim();
50+
multiexec = Boolean.TRUE.equals(getBoolean(MULTIEXEC));
5551
}
5652

5753

@@ -61,8 +57,8 @@ public RedisEnterpriseSinkConfigDef() {
6157
String group = "Redis Enterprise";
6258
int order = 0;
6359
define(RedisEnterpriseSourceConfig.REDIS_URI, Type.STRING, RedisEnterpriseSourceConfig.REDIS_URI_DEFAULT, Importance.HIGH, RedisEnterpriseSourceConfig.REDIS_URI_DOC, group, ++order, Width.MEDIUM, RedisEnterpriseSourceConfig.REDIS_URI_DISPLAY);
64-
define(STREAM_NAME_FORMAT, Type.STRING, STREAM_NAME_FORMAT_DEFAULT, Importance.MEDIUM, STREAM_NAME_FORMAT_DOC, group, ++order, Width.MEDIUM, STREAM_NAME_FORMAT_DISPLAY);
65-
define(TRANSACTIONAL, Type.BOOLEAN, TRANSACTIONAL_DEFAULT, Importance.MEDIUM, TRANSACTIONAL_DOC, group, ++order, Width.SHORT, TRANSACTIONAL_DISPLAY);
60+
define(STREAM_NAME, Type.STRING, STREAM_NAME_DEFAULT, Importance.MEDIUM, STREAM_NAME_DOC, group, ++order, Width.MEDIUM, STREAM_NAME_DISPLAY);
61+
define(MULTIEXEC, Type.BOOLEAN, MULTIEXEC_DEFAULT, Importance.MEDIUM, MULTIEXEC_DOC, group, ++order, Width.SHORT, MULTIEXEC_DISPLAY);
6662
}
6763

6864
}

src/main/java/com/redislabs/kafkaconnect/sink/RedisEnterpriseSinkTask.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public void start(final Map<String, String> props) {
4646
GenericObjectPoolConfig<StatefulRedisConnection<String, String>> poolConfig = new GenericObjectPoolConfig<>();
4747
poolConfig.setMaxTotal(1);
4848
pool = ConnectionPoolSupport.createGenericObjectPool(client::connect, poolConfig);
49-
writer = writer(pool, Boolean.TRUE.equals(sinkConfig.getTransactional()));
49+
writer = writer(pool, Boolean.TRUE.equals(sinkConfig.isMultiexec()));
5050
if (writer instanceof ItemStreamSupport) {
5151
((ItemStreamSupport) writer).open(new ExecutionContext());
5252
}
@@ -80,7 +80,7 @@ private Map<String, String> body(SinkRecord record) {
8080
return body;
8181
}
8282
if (value instanceof Map) {
83-
Map map = (Map) value;
83+
Map<?, ?> map = (Map<?, ?>) value;
8484
Map<String, String> body = new LinkedHashMap<>();
8585
map.forEach((k, v) -> body.put(String.valueOf(k), String.valueOf(v)));
8686
return body;
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package com.redislabs.kafkaconnect;
2+
3+
import io.lettuce.core.RedisClient;
4+
import io.lettuce.core.api.StatefulRedisConnection;
5+
import io.lettuce.core.api.sync.RedisCommands;
6+
import org.apache.kafka.connect.source.SourceRecord;
7+
import org.junit.jupiter.api.*;
8+
import org.springframework.util.Assert;
9+
import org.testcontainers.junit.jupiter.Container;
10+
import org.testcontainers.junit.jupiter.Testcontainers;
11+
12+
import java.util.HashMap;
13+
import java.util.LinkedHashMap;
14+
import java.util.List;
15+
import java.util.Map;
16+
17+
import static org.junit.jupiter.api.Assertions.assertEquals;
18+
19+
@Testcontainers
20+
public class AbstractRedisEnterpriseTest {
21+
22+
@Container
23+
private static final RedisContainer REDIS = new RedisContainer();
24+
25+
protected static String redisUri;
26+
protected static RedisClient redisClient;
27+
protected static StatefulRedisConnection<String, String> connection;
28+
protected static RedisCommands<String, String> syncCommands;
29+
30+
@BeforeAll
31+
static void setupAll() {
32+
redisUri = REDIS.getUri();
33+
redisClient = RedisClient.create(redisUri);
34+
connection = redisClient.connect();
35+
syncCommands = connection.sync();
36+
}
37+
38+
@AfterEach
39+
public void cleanupEach() {
40+
syncCommands.flushall();
41+
}
42+
43+
@AfterAll
44+
static void cleanupAll() {
45+
connection.close();
46+
redisClient.shutdown();
47+
}
48+
49+
public static Map<String, String> body(String... args) {
50+
Assert.notNull(args, "Body args cannot be null");
51+
Assert.isTrue(args.length % 2 == 0, "Body args length is not a multiple of 2");
52+
Map<String, String> body = new LinkedHashMap<>();
53+
for (int index = 0; index < args.length / 2; index++) {
54+
body.put(args[index * 2], args[index * 2 + 1]);
55+
}
56+
return body;
57+
}
58+
59+
60+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package com.redislabs.kafkaconnect;
2+
3+
import org.testcontainers.containers.GenericContainer;
4+
import org.testcontainers.containers.wait.strategy.Wait;
5+
import org.testcontainers.utility.DockerImageName;
6+
7+
public class RedisContainer extends GenericContainer<RedisContainer> {
8+
9+
10+
public RedisContainer() {
11+
super(DockerImageName.parse("redis:latest"));
12+
withExposedPorts(6379);
13+
waitingFor(Wait.forLogMessage(".*Ready to accept connections.*\\n", 1));
14+
}
15+
16+
public String getUri() {
17+
return "redis://" + this.getHost() + ":" + this.getFirstMappedPort();
18+
}
19+
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
package com.redislabs.kafkaconnect;
2+
3+
import com.redislabs.kafkaconnect.sink.RedisEnterpriseSinkTask;
4+
import io.lettuce.core.StreamMessage;
5+
import io.lettuce.core.XReadArgs;
6+
import org.apache.kafka.connect.data.Schema;
7+
import org.apache.kafka.connect.data.SchemaBuilder;
8+
import org.apache.kafka.connect.data.Struct;
9+
import org.apache.kafka.connect.sink.SinkRecord;
10+
import org.junit.jupiter.api.Assertions;
11+
import org.junit.jupiter.api.Test;
12+
13+
import java.util.Arrays;
14+
import java.util.HashMap;
15+
import java.util.List;
16+
import java.util.Map;
17+
18+
public class RedisEnterpriseSinkTaskTest extends AbstractRedisEnterpriseTest {
19+
20+
private static final String VALUE_SCHEMA_NAME = "com.redislabs.kafkaconnect.test.value";
21+
private static final Schema MAP_VALUE_SCHEMA = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA).name(VALUE_SCHEMA_NAME).build();
22+
private static final String FIELD1 = "field1";
23+
private static final String FIELD2 = "field2";
24+
private static final Schema STRUCT_VALUE_SCHEMA = SchemaBuilder.struct().field(FIELD1, Schema.STRING_SCHEMA).field(FIELD2, Schema.STRING_SCHEMA);
25+
26+
@Test
27+
public void putMaps() {
28+
final Map<String, String> config = new HashMap<>();
29+
config.put("redis.uri", redisUri);
30+
final String topic = "topic1";
31+
final int partition = 0;
32+
final Schema keySchema = null;
33+
final Object key = null;
34+
final Map<String, String> value = body(FIELD1, "value1", FIELD2, "value2");
35+
final long offset = 0L;
36+
37+
// Configure task and write records
38+
final RedisEnterpriseSinkTask sinkTask = new RedisEnterpriseSinkTask();
39+
sinkTask.start(config);
40+
sinkTask.put(Arrays.asList(new SinkRecord(topic, partition, keySchema, key, MAP_VALUE_SCHEMA, value, offset), new SinkRecord(topic, partition, keySchema, key, MAP_VALUE_SCHEMA, value, offset + 1)));
41+
List<StreamMessage<String, String>> messages = syncCommands.xread(XReadArgs.Builder.block(200).count(10), XReadArgs.StreamOffset.from(topic, "0-0"));
42+
Assertions.assertEquals(2, messages.size());
43+
for (StreamMessage<String, String> message : messages) {
44+
Assertions.assertEquals("value1", message.getBody().get(FIELD1));
45+
Assertions.assertEquals("value2", message.getBody().get(FIELD2));
46+
}
47+
}
48+
49+
@Test
50+
public void putStructs() {
51+
final Map<String, String> config = new HashMap<>();
52+
config.put("redis.uri", redisUri);
53+
final String topic = "topic1";
54+
final int partition = 0;
55+
final Schema keySchema = null;
56+
final Object key = null;
57+
58+
final Struct value = new Struct(STRUCT_VALUE_SCHEMA).put(FIELD1, "value1").put(FIELD2, "value2");
59+
final long offset = 0L;
60+
61+
// Configure task and write records
62+
final RedisEnterpriseSinkTask sinkTask = new RedisEnterpriseSinkTask();
63+
sinkTask.start(config);
64+
sinkTask.put(Arrays.asList(new SinkRecord(topic, partition, keySchema, key, STRUCT_VALUE_SCHEMA, value, offset), new SinkRecord(topic, partition, keySchema, key, STRUCT_VALUE_SCHEMA, value, offset + 1)));
65+
List<StreamMessage<String, String>> messages = syncCommands.xread(XReadArgs.Builder.block(200).count(10), XReadArgs.StreamOffset.from(topic, "0-0"));
66+
Assertions.assertEquals(2, messages.size());
67+
for (StreamMessage<String, String> message : messages) {
68+
Assertions.assertEquals("value1", message.getBody().get(FIELD1));
69+
Assertions.assertEquals("value2", message.getBody().get(FIELD2));
70+
}
71+
}
72+
73+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package com.redislabs.kafkaconnect;
2+
3+
import org.apache.kafka.connect.source.SourceRecord;
4+
import org.junit.jupiter.api.Assertions;
5+
import org.junit.jupiter.api.Test;
6+
7+
import java.util.HashMap;
8+
import java.util.List;
9+
import java.util.Map;
10+
11+
import static org.junit.jupiter.api.Assertions.assertEquals;
12+
13+
public class RedisEnterpriseSourceTaskTest extends AbstractRedisEnterpriseTest {
14+
15+
@Test
16+
public void pollRetrievesStreamMessages() throws InterruptedException {
17+
String topic = "topic1";
18+
String stream = "stream1";
19+
final RedisEnterpriseSourceTask sourceTask = new RedisEnterpriseSourceTask();
20+
final Map<String, String> config = new HashMap<>();
21+
config.put("topic", topic);
22+
config.put("redis.uri", redisUri);
23+
config.put("redis.stream.name", stream);
24+
sourceTask.start(config);
25+
syncCommands.xadd(stream, body("field1", "value1", "field2", "value2"));
26+
syncCommands.xadd(stream, body("field1", "value1", "field2", "value2"));
27+
syncCommands.xadd(stream, body("field1", "value1", "field2", "value2"));
28+
Thread.sleep(100);
29+
final List<SourceRecord> sourceRecords = sourceTask.poll();
30+
assertEquals(3, sourceRecords.size());
31+
for (SourceRecord record : sourceRecords) {
32+
Map<String, String> map = (Map<String, String>) record.value();
33+
Assertions.assertEquals("value1", map.get("field1"));
34+
Assertions.assertEquals("value2", map.get("field2"));
35+
}
36+
}
37+
}

0 commit comments

Comments
 (0)