Skip to content

Commit 7c04f6b

Browse files
mp911dechristophstrobl
authored andcommitted
Accept CompletableFuture subtypes for Lettuce pipelining.
We now no longer require RedisCommand but resort to CompletableFuture as the general asynchronous result type for Lettuce pipelining to allow subtypes such as PipelinedRedisFuture. Closes: #2888 Original Pull Request: #2889
1 parent eac1356 commit 7c04f6b

File tree

4 files changed

+60
-17
lines changed

4 files changed

+60
-17
lines changed

Diff for: src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnection.java

+20-4
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,9 @@
4949
import java.util.List;
5050
import java.util.Map;
5151
import java.util.Queue;
52+
import java.util.concurrent.CompletableFuture;
5253
import java.util.concurrent.ConcurrentHashMap;
54+
import java.util.concurrent.ExecutionException;
5355
import java.util.concurrent.Future;
5456
import java.util.concurrent.TimeUnit;
5557
import java.util.concurrent.atomic.AtomicLong;
@@ -581,7 +583,7 @@ public List<Object> closePipeline() {
581583
pipeliningFlushState = null;
582584
isPipelined = false;
583585

584-
List<io.lettuce.core.protocol.RedisCommand<?, ?, ?>> futures = new ArrayList<>(ppline.size());
586+
List<CompletableFuture<?>> futures = new ArrayList<>(ppline.size());
585587

586588
for (LettuceResult<?, ?> result : ppline) {
587589
futures.add(result.getResultHolder());
@@ -598,10 +600,24 @@ public List<Object> closePipeline() {
598600
if (done) {
599601
for (LettuceResult<?, ?> result : ppline) {
600602

601-
if (result.getResultHolder().getOutput().hasError()) {
603+
CompletableFuture<?> resultHolder = result.getResultHolder();
604+
if (resultHolder.isCompletedExceptionally()) {
605+
606+
String message;
607+
if (resultHolder instanceof io.lettuce.core.protocol.RedisCommand<?, ?, ?> rc) {
608+
message = rc.getOutput().getError();
609+
} else {
610+
try {
611+
resultHolder.get();
612+
message = "";
613+
} catch (InterruptedException ignore) {
614+
message = "";
615+
} catch (ExecutionException e) {
616+
message = e.getCause().getMessage();
617+
}
618+
}
602619

603-
Exception exception = new InvalidDataAccessApiUsageException(result.getResultHolder()
604-
.getOutput().getError());
620+
Exception exception = new InvalidDataAccessApiUsageException(message);
605621

606622
// remember only the first error
607623
if (problem == null) {

Diff for: src/main/java/org/springframework/data/redis/connection/lettuce/LettuceResult.java

+4-5
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,7 @@
1515
*/
1616
package org.springframework.data.redis.connection.lettuce;
1717

18-
import io.lettuce.core.protocol.RedisCommand;
19-
18+
import java.util.concurrent.CompletableFuture;
2019
import java.util.concurrent.Future;
2120
import java.util.function.Supplier;
2221

@@ -34,7 +33,7 @@
3433
* @since 2.1
3534
*/
3635
@SuppressWarnings("rawtypes")
37-
class LettuceResult<T, R> extends FutureResult<RedisCommand<?, T, ?>> {
36+
class LettuceResult<T, R> extends FutureResult<CompletableFuture<T>> {
3837

3938
private final boolean convertPipelineAndTxResults;
4039

@@ -51,15 +50,15 @@ class LettuceResult<T, R> extends FutureResult<RedisCommand<?, T, ?>> {
5150
LettuceResult(Future<T> resultHolder, Supplier<R> defaultReturnValue, boolean convertPipelineAndTxResults,
5251
@Nullable Converter<T, R> converter) {
5352

54-
super((RedisCommand) resultHolder, converter, defaultReturnValue);
53+
super((CompletableFuture<T>) resultHolder, converter, defaultReturnValue);
5554
this.convertPipelineAndTxResults = convertPipelineAndTxResults;
5655
}
5756

5857
@Nullable
5958
@Override
6059
@SuppressWarnings("unchecked")
6160
public T get() {
62-
return (T) getResultHolder().getOutput().get();
61+
return (T) getResultHolder().join();
6362
}
6463

6564
@Override

Diff for: src/test/java/org/springframework/data/redis/connection/ClusterTestVariables.java

+1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ public abstract class ClusterTestVariables {
2525
public static final String KEY_1 = "key1";
2626
public static final String KEY_2 = "key2";
2727
public static final String KEY_3 = "key3";
28+
public static final String KEY_4 = "key4";
2829

2930
public static final String VALUE_1 = "value1";
3031
public static final String VALUE_2 = "value2";

Diff for: src/test/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnectionTests.java

+35-8
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ public class LettuceClusterConnectionTests implements ClusterConnectionTests {
8383
private static final byte[] KEY_1_BYTES = LettuceConverters.toBytes(KEY_1);
8484
private static final byte[] KEY_2_BYTES = LettuceConverters.toBytes(KEY_2);
8585
private static final byte[] KEY_3_BYTES = LettuceConverters.toBytes(KEY_3);
86+
private static final byte[] KEY_4_BYTES = LettuceConverters.toBytes(KEY_4);
8687

8788
private static final byte[] SAME_SLOT_KEY_1_BYTES = LettuceConverters.toBytes(SAME_SLOT_KEY_1);
8889
private static final byte[] SAME_SLOT_KEY_2_BYTES = LettuceConverters.toBytes(SAME_SLOT_KEY_2);
@@ -91,6 +92,7 @@ public class LettuceClusterConnectionTests implements ClusterConnectionTests {
9192
private static final byte[] VALUE_1_BYTES = LettuceConverters.toBytes(VALUE_1);
9293
private static final byte[] VALUE_2_BYTES = LettuceConverters.toBytes(VALUE_2);
9394
private static final byte[] VALUE_3_BYTES = LettuceConverters.toBytes(VALUE_3);
95+
private static final byte[] VALUE_4_BYTES = LettuceConverters.toBytes(VALUE_4);
9496

9597
private static final GeoLocation<String> ARIGENTO = new GeoLocation<>("arigento", POINT_ARIGENTO);
9698
private static final GeoLocation<String> CATANIA = new GeoLocation<>("catania", POINT_CATANIA);
@@ -179,7 +181,32 @@ void shouldCreateClusterConnectionWithPooling() {
179181
} finally {
180182
factory.destroy();
181183
}
184+
}
185+
186+
@Test // GH-2888
187+
void shouldPipelineAdvancedClusterApi() {
188+
189+
LettuceConnectionFactory factory = createConnectionFactory();
190+
191+
ConnectionVerifier.create(factory) //
192+
.execute(connection -> {
193+
194+
connection.set(KEY_1_BYTES, VALUE_1_BYTES);
195+
connection.set(KEY_2_BYTES, VALUE_2_BYTES);
196+
connection.set(KEY_4_BYTES, VALUE_4_BYTES);
197+
198+
connection.openPipeline();
199+
connection.keyCommands().randomKey();
200+
connection.stringCommands().mGet(KEY_1_BYTES, KEY_2_BYTES);
182201

202+
List<Object> objects = connection.closePipeline();
203+
204+
assertThat(objects).hasSize(1);
205+
assertThat(objects).element(0).isInstanceOf(List.class);
206+
207+
List<Object> mget = (List<Object>) objects.get(0);
208+
assertThat(mget).containsExactly(VALUE_1_BYTES, VALUE_2_BYTES);
209+
}).verifyAndClose();
183210
}
184211

185212
@Test // DATAREDIS-315
@@ -2821,13 +2848,13 @@ void bitFieldIncrByWithOverflowShouldWorkCorrectly() {
28212848

28222849
assertThat(clusterConnection.stringCommands().bitField(LettuceConverters.toBytes(KEY_1),
28232850
create().incr(unsigned(2)).valueAt(BitFieldSubCommands.Offset.offset(102L)).overflow(FAIL).by(1L)))
2824-
.containsExactly(1L);
2851+
.containsExactly(1L);
28252852
assertThat(clusterConnection.stringCommands().bitField(LettuceConverters.toBytes(KEY_1),
28262853
create().incr(unsigned(2)).valueAt(BitFieldSubCommands.Offset.offset(102L)).overflow(FAIL).by(1L)))
2827-
.containsExactly(2L);
2854+
.containsExactly(2L);
28282855
assertThat(clusterConnection.stringCommands().bitField(LettuceConverters.toBytes(KEY_1),
28292856
create().incr(unsigned(2)).valueAt(BitFieldSubCommands.Offset.offset(102L)).overflow(FAIL).by(1L)))
2830-
.containsExactly(3L);
2857+
.containsExactly(3L);
28312858
assertThat(clusterConnection.stringCommands().bitField(LettuceConverters.toBytes(KEY_1),
28322859
create().incr(unsigned(2)).valueAt(BitFieldSubCommands.Offset.offset(102L)).overflow(FAIL).by(1L))).isNotNull();
28332860
}
@@ -2837,7 +2864,7 @@ void bitfieldShouldAllowMultipleSubcommands() {
28372864

28382865
assertThat(clusterConnection.stringCommands().bitField(LettuceConverters.toBytes(KEY_1),
28392866
create().incr(signed(5)).valueAt(BitFieldSubCommands.Offset.offset(100L)).by(1L).get(unsigned(4)).valueAt(0L)))
2840-
.containsExactly(1L, 0L);
2867+
.containsExactly(1L, 0L);
28412868
}
28422869

28432870
@Test // DATAREDIS-562
@@ -2847,13 +2874,13 @@ void bitfieldShouldWorkUsingNonZeroBasedOffset() {
28472874
clusterConnection.stringCommands().bitField(LettuceConverters.toBytes(KEY_1),
28482875
create().set(INT_8).valueAt(BitFieldSubCommands.Offset.offset(0L).multipliedByTypeLength()).to(100L)
28492876
.set(INT_8).valueAt(BitFieldSubCommands.Offset.offset(1L).multipliedByTypeLength()).to(200L)))
2850-
.containsExactly(0L, 0L);
2877+
.containsExactly(0L, 0L);
28512878
assertThat(
28522879
clusterConnection.stringCommands()
28532880
.bitField(LettuceConverters.toBytes(KEY_1),
28542881
create().get(INT_8).valueAt(BitFieldSubCommands.Offset.offset(0L).multipliedByTypeLength()).get(INT_8)
2855-
.valueAt(BitFieldSubCommands.Offset.offset(1L).multipliedByTypeLength()))).containsExactly(100L,
2856-
-56L);
2882+
.valueAt(BitFieldSubCommands.Offset.offset(1L).multipliedByTypeLength())))
2883+
.containsExactly(100L, -56L);
28572884
}
28582885

28592886
@Test // DATAREDIS-1103
@@ -2864,7 +2891,7 @@ void setKeepTTL() {
28642891

28652892
assertThat(
28662893
clusterConnection.stringCommands().set(KEY_1_BYTES, VALUE_2_BYTES, Expiration.keepTtl(), SetOption.upsert()))
2867-
.isTrue();
2894+
.isTrue();
28682895

28692896
assertThat(nativeConnection.ttl(KEY_1)).isCloseTo(expireSeconds, Offset.offset(5L));
28702897
assertThat(nativeConnection.get(KEY_1)).isEqualTo(VALUE_2);

0 commit comments

Comments
 (0)