Refine Cluster pipelining using Lettuce #2889

Closed
wants to merge 3 commits
2 changes: 1 addition & 1 deletion pom.xml
@@ -5,7 +5,7 @@

<groupId>org.springframework.data</groupId>
<artifactId>spring-data-redis</artifactId>
<version>3.3.0-SNAPSHOT</version>
<version>3.3.0-GH-2888-SNAPSHOT</version>

<name>Spring Data Redis</name>
<description>Spring Data module for Redis</description>
3 changes: 3 additions & 0 deletions src/main/antora/modules/ROOT/pages/redis/cluster.adoc
@@ -129,3 +129,6 @@ clusterOps.shutdown(NODE_7379); <1>

<1> Shut down node at 7379 and cross fingers there is a replica in place that can take over.
====

NOTE: Redis Cluster pipelining is currently only supported through the Lettuce driver. The following commands are not supported in a pipeline when the keys involved map to different slots (cross-slot keys): `rename`, `renameNX`, `sort`, `bLPop`, `bRPop`, `rPopLPush`, `bRPopLPush`, `info`, `sMove`, `sInter`, `sInterStore`, `sUnion`, `sUnionStore`, `sDiff`, `sDiffStore`.
Same-slot keys are fully supported.
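For illustration, a minimal sketch of a cluster pipeline that stays within a single slot by using hash tags, so none of the restrictions above apply. It assumes a Lettuce-backed `StringRedisTemplate` created from a hypothetical `clusterConnectionFactory`; the key names are illustrative only.

[source,java]
----
StringRedisTemplate template = new StringRedisTemplate(clusterConnectionFactory);

List<Object> results = template.executePipelined((RedisCallback<Object>) connection -> {
  StringRedisConnection stringConn = (StringRedisConnection) connection;

  // The {user:42} hash tag routes both keys to the same slot, so even
  // rename is available inside the pipeline.
  stringConn.set("{user:42}:name", "alice");
  stringConn.rename("{user:42}:name", "{user:42}:username");
  return null;
});
----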
8 changes: 7 additions & 1 deletion src/main/antora/modules/ROOT/pages/redis/pipelining.adoc
@@ -20,7 +20,9 @@ List<Object> results = stringRedisTemplate.executePipelined(
});
----

The preceding example runs a bulk right pop of items from a queue in a pipeline. The `results` `List` contains all of the popped items. `RedisTemplate` uses its value, hash key, and hash value serializers to deserialize all results before returning, so the returned items in the preceding example are Strings. There are additional `executePipelined` methods that let you pass a custom serializer for pipelined results.
The preceding example runs a bulk right pop of items from a queue in a pipeline.
The `results` `List` contains all the popped items. `RedisTemplate` uses its value, hash key, and hash value serializers to deserialize all results before returning, so the returned items in the preceding example are Strings.
There are additional `executePipelined` methods that let you pass a custom serializer for pipelined results.

Note that the value returned from the `RedisCallback` is required to be `null`, as this value is discarded in favor of returning the results of the pipelined commands.
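A minimal sketch of such a custom-serializer variant, reusing the `stringRedisTemplate` from the example above and returning `null` from the callback as required; the queue name is illustrative only.

[source,java]
----
List<Object> results = stringRedisTemplate.executePipelined((RedisCallback<Object>) connection -> {
  StringRedisConnection stringConn = (StringRedisConnection) connection;
  for (int i = 0; i < 5; i++) {
    stringConn.rPush("myqueue", String.valueOf(i));
  }
  return null; // must be null; the pipelined results are returned instead
}, RedisSerializer.string());
----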

@@ -35,3 +37,7 @@ factory.setPipeliningFlushPolicy(PipeliningFlushPolicy.buffered(3)); <1>
----
<1> Buffer locally and flush after every 3rd command.
====

NOTE: Pipelining against Redis Cluster is currently only supported through the Lettuce driver; with other drivers, pipelining is limited to Redis Standalone.
The following commands are not supported in a pipeline when the keys involved map to different slots (cross-slot keys): `rename`, `renameNX`, `sort`, `bLPop`, `bRPop`, `rPopLPush`, `bRPopLPush`, `info`, `sMove`, `sInter`, `sInterStore`, `sUnion`, `sUnionStore`, `sDiff`, `sDiffStore`.
Same-slot keys are fully supported.
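To illustrate the note above, a minimal sketch of connection-level pipelining against Redis Cluster, assuming a `LettuceConnectionFactory` named `clusterConnectionFactory` (an illustrative name) configured with a `RedisClusterConfiguration`; hash-tagged keys keep all commands in one slot.

[source,java]
----
RedisConnection connection = clusterConnectionFactory.getConnection();

try {
  connection.openPipeline();

  // Commands are queued; replies are only materialized by closePipeline().
  connection.stringCommands().set("{user:42}:a".getBytes(), "1".getBytes());
  connection.stringCommands().get("{user:42}:a".getBytes());

  List<Object> results = connection.closePipeline();
} finally {
  connection.close();
}
----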
LettuceClusterKeyCommands.java
@@ -18,11 +18,8 @@
import io.lettuce.core.KeyScanCursor;
import io.lettuce.core.ScanArgs;

import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;

import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.data.redis.connection.ClusterSlotHashUtil;
@@ -50,47 +47,6 @@ class LettuceClusterKeyCommands extends LettuceKeyCommands {
this.connection = connection;
}

@Override
public byte[] randomKey() {

List<RedisClusterNode> nodes = connection.clusterGetNodes();
Set<RedisClusterNode> inspectedNodes = new HashSet<>(nodes.size());

do {

RedisClusterNode node = nodes.get(ThreadLocalRandom.current().nextInt(nodes.size()));

while (inspectedNodes.contains(node)) {
node = nodes.get(ThreadLocalRandom.current().nextInt(nodes.size()));
}
inspectedNodes.add(node);
byte[] key = randomKey(node);

if (key != null && key.length > 0) {
return key;
}
} while (nodes.size() != inspectedNodes.size());

return null;
}

@Override
public Set<byte[]> keys(byte[] pattern) {

Assert.notNull(pattern, "Pattern must not be null");

Collection<List<byte[]>> keysPerNode = connection.getClusterCommandExecutor()
.executeCommandOnAllNodes((LettuceClusterCommandCallback<List<byte[]>>) connection -> connection.keys(pattern))
.resultsAsList();

Set<byte[]> keys = new HashSet<>();

for (List<byte[]> keySet : keysPerNode) {
keys.addAll(keySet);
}
return keys;
}

@Override
public void rename(byte[] oldKey, byte[] newKey) {

LettuceClusterServerCommands.java
@@ -18,7 +18,6 @@
import io.lettuce.core.api.sync.RedisServerCommands;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -34,7 +33,6 @@
import org.springframework.data.redis.connection.lettuce.LettuceClusterConnection.LettuceClusterCommandCallback;
import org.springframework.data.redis.core.types.RedisClientInfo;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;

/**
* @author Mark Paluch
@@ -71,37 +69,11 @@ public void save(RedisClusterNode node) {
executeCommandOnSingleNode(RedisServerCommands::save, node);
}

@Override
public Long dbSize() {

Collection<Long> dbSizes = executeCommandOnAllNodes(RedisServerCommands::dbsize).resultsAsList();

if (CollectionUtils.isEmpty(dbSizes)) {
return 0L;
}

Long size = 0L;
for (Long value : dbSizes) {
size += value;
}
return size;
}

@Override
public Long dbSize(RedisClusterNode node) {
return executeCommandOnSingleNode(RedisServerCommands::dbsize, node).getValue();
}

@Override
public void flushDb() {
executeCommandOnAllNodes(RedisServerCommands::flushdb);
}

@Override
public void flushDb(FlushOption option) {
executeCommandOnAllNodes(it -> it.flushdb(LettuceConverters.toFlushMode(option)));
}

@Override
public void flushDb(RedisClusterNode node) {
executeCommandOnSingleNode(RedisServerCommands::flushdb, node);
@@ -112,16 +84,6 @@ public void flushDb(RedisClusterNode node, FlushOption option) {
executeCommandOnSingleNode(it -> it.flushdb(LettuceConverters.toFlushMode(option)), node);
}

@Override
public void flushAll() {
executeCommandOnAllNodes(RedisServerCommands::flushall);
}

@Override
public void flushAll(FlushOption option) {
executeCommandOnAllNodes(it -> it.flushall(LettuceConverters.toFlushMode(option)));
}

@Override
public void flushAll(RedisClusterNode node) {
executeCommandOnSingleNode(RedisServerCommands::flushall, node);
LettuceClusterStringCommands.java
@@ -15,11 +15,6 @@
*/
package org.springframework.data.redis.connection.lettuce;

import java.util.Map;

import org.springframework.data.redis.connection.ClusterSlotHashUtil;
import org.springframework.util.Assert;

/**
* @author Christoph Strobl
* @author Mark Paluch
@@ -31,21 +26,4 @@ class LettuceClusterStringCommands extends LettuceStringCommands {
super(connection);
}

@Override
public Boolean mSetNX(Map<byte[], byte[]> tuples) {

Assert.notNull(tuples, "Tuples must not be null");

if (ClusterSlotHashUtil.isSameSlotForAllKeys(tuples.keySet().toArray(new byte[tuples.keySet().size()][]))) {
return super.mSetNX(tuples);
}

boolean result = true;
for (Map.Entry<byte[], byte[]> entry : tuples.entrySet()) {
if (!setNX(entry.getKey(), entry.getValue()) && result) {
result = false;
}
}
return result;
}
}
LettuceConnection.java
@@ -49,7 +49,9 @@
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -102,8 +104,8 @@
*/
public class LettuceConnection extends AbstractRedisConnection {

private static final ExceptionTranslationStrategy EXCEPTION_TRANSLATION =
new FallbackExceptionTranslationStrategy(LettuceExceptionConverter.INSTANCE);
private static final ExceptionTranslationStrategy EXCEPTION_TRANSLATION = new FallbackExceptionTranslationStrategy(
LettuceExceptionConverter.INSTANCE);

static final RedisCodec<byte[], byte[]> CODEC = ByteArrayCodec.INSTANCE;

@@ -189,8 +191,8 @@ public LettuceConnection(@Nullable StatefulRedisConnection<byte[], byte[]> share
/**
* Creates a new {@link LettuceConnection}.
*
* @param sharedConnection A native connection that is shared with other {@link LettuceConnection}s.
* Should not be used for transactions or blocking operations.
* @param sharedConnection A native connection that is shared with other {@link LettuceConnection}s. Should not be
* used for transactions or blocking operations.
* @param timeout The connection timeout (in milliseconds)
* @param client The {@link RedisClient} to use when making pub/sub connections.
* @param defaultDbIndex The db index to use along with {@link RedisClient} when establishing a dedicated connection.
@@ -209,8 +211,8 @@ public LettuceConnection(@Nullable StatefulRedisConnection<byte[], byte[]> share
/**
* Creates a new {@link LettuceConnection}.
*
* @param sharedConnection A native connection that is shared with other {@link LettuceConnection}s.
* Should not be used for transactions or blocking operations.
* @param sharedConnection A native connection that is shared with other {@link LettuceConnection}s. Should not be
* used for transactions or blocking operations.
* @param connectionProvider connection provider to obtain and release native connections.
* @param timeout The connection timeout (in milliseconds)
* @param defaultDbIndex The db index to use along with {@link RedisClient} when establishing a dedicated connection.
@@ -225,8 +227,8 @@ public LettuceConnection(@Nullable StatefulRedisConnection<byte[], byte[]> share
/**
* Creates a new {@link LettuceConnection}.
*
* @param sharedConnection A native connection that is shared with other {@link LettuceConnection}s.
* Should not be used for transactions or blocking operations.
* @param sharedConnection A native connection that is shared with other {@link LettuceConnection}s. Should not be
* used for transactions or blocking operations.
* @param connectionProvider connection provider to obtain and release native connections.
* @param timeout The connection timeout (in milliseconds)
* @param defaultDbIndex The db index to use along with {@link RedisClient} when establishing a dedicated connection.
@@ -453,24 +455,19 @@ private LettuceInvoker doInvoke(RedisClusterAsyncCommands<byte[], byte[]> connec

<T, R> LettuceResult<T, R> newLettuceResult(Future<T> resultHolder, Converter<T, R> converter) {

return LettuceResultBuilder.<T, R>forResponse(resultHolder)
.mappedWith(converter)
.convertPipelineAndTxResults(this.convertPipelineAndTxResults)
.build();
return LettuceResultBuilder.<T, R> forResponse(resultHolder).mappedWith(converter)
.convertPipelineAndTxResults(this.convertPipelineAndTxResults).build();
}

<T, R> LettuceResult<T, R> newLettuceResult(Future<T> resultHolder, Converter<T, R> converter,
Supplier<R> defaultValue) {

return LettuceResultBuilder.<T, R>forResponse(resultHolder)
.mappedWith(converter)
.convertPipelineAndTxResults(this.convertPipelineAndTxResults)
.defaultNullTo(defaultValue)
.build();
return LettuceResultBuilder.<T, R> forResponse(resultHolder).mappedWith(converter)
.convertPipelineAndTxResults(this.convertPipelineAndTxResults).defaultNullTo(defaultValue).build();
}

<T, R> LettuceResult<T, R> newLettuceStatusResult(Future<T> resultHolder) {
return LettuceResultBuilder.<T, R>forResponse(resultHolder).buildStatusResult();
return LettuceResultBuilder.<T, R> forResponse(resultHolder).buildStatusResult();
}

void pipeline(LettuceResult<?, ?> result) {
@@ -583,7 +580,7 @@ public List<Object> closePipeline() {
pipeliningFlushState = null;
isPipelined = false;

List<io.lettuce.core.protocol.RedisCommand<?, ?, ?>> futures = new ArrayList<>(ppline.size());
List<CompletableFuture<?>> futures = new ArrayList<>(ppline.size());

for (LettuceResult<?, ?> result : ppline) {
futures.add(result.getResultHolder());
@@ -600,10 +597,24 @@ if (done) {
if (done) {
for (LettuceResult<?, ?> result : ppline) {

if (result.getResultHolder().getOutput().hasError()) {
CompletableFuture<?> resultHolder = result.getResultHolder();
if (resultHolder.isCompletedExceptionally()) {

String message;
if (resultHolder instanceof io.lettuce.core.protocol.RedisCommand<?, ?, ?> rc) {
message = rc.getOutput().getError();
} else {
try {
resultHolder.get();
message = "";
} catch (InterruptedException ignore) {
message = "";
} catch (ExecutionException e) {
message = e.getCause().getMessage();
}
}

Exception exception = new InvalidDataAccessApiUsageException(result.getResultHolder()
.getOutput().getError());
Exception exception = new InvalidDataAccessApiUsageException(message);

// remember only the first error
if (problem == null) {
@@ -684,8 +695,8 @@ public List<Object> exec() {
LettuceTransactionResultConverter resultConverter = new LettuceTransactionResultConverter(
new LinkedList<>(txResults), exceptionConverter);

pipeline(newLettuceResult(exec, source ->
resultConverter.convert(LettuceConverters.transactionResultUnwrapper().convert(source))));
pipeline(newLettuceResult(exec,
source -> resultConverter.convert(LettuceConverters.transactionResultUnwrapper().convert(source))));

return null;
}
@@ -837,8 +848,7 @@ <T> T failsafeReadScanValues(List<?> source, @SuppressWarnings("rawtypes") @Null

try {
return (T) (converter != null ? converter.convert(source) : source);
} catch (IndexOutOfBoundsException ignore) {
}
} catch (IndexOutOfBoundsException ignore) {}

return null;
}
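The error handling above boils down to the following standalone sketch (class and method names here are illustrative, not part of the change): when a pipelined result's `CompletableFuture` completes exceptionally, the message of its cause is used for the `InvalidDataAccessApiUsageException` that records the failure.

[source,java]
----
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

class PipelineErrorExtraction {

  // Mirrors the closePipeline() change: a result holder that completed
  // exceptionally yields the cause's message, otherwise there is no error.
  static String errorMessageOf(CompletableFuture<?> resultHolder) {

    if (!resultHolder.isCompletedExceptionally()) {
      return null;
    }

    try {
      resultHolder.get();
      return "";
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt();
      return "";
    } catch (ExecutionException ex) {
      return ex.getCause().getMessage();
    }
  }
}
----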
LettuceResult.java
@@ -15,8 +15,7 @@
*/
package org.springframework.data.redis.connection.lettuce;

import io.lettuce.core.protocol.RedisCommand;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.function.Supplier;

@@ -34,7 +33,7 @@
* @since 2.1
*/
@SuppressWarnings("rawtypes")
class LettuceResult<T, R> extends FutureResult<RedisCommand<?, T, ?>> {
class LettuceResult<T, R> extends FutureResult<CompletableFuture<T>> {

private final boolean convertPipelineAndTxResults;

@@ -51,15 +50,15 @@ class LettuceResult<T, R> extends FutureResult<RedisCommand<?, T, ?>> {
LettuceResult(Future<T> resultHolder, Supplier<R> defaultReturnValue, boolean convertPipelineAndTxResults,
@Nullable Converter<T, R> converter) {

super((RedisCommand) resultHolder, converter, defaultReturnValue);
super((CompletableFuture<T>) resultHolder, converter, defaultReturnValue);
this.convertPipelineAndTxResults = convertPipelineAndTxResults;
}

@Nullable
@Override
@SuppressWarnings("unchecked")
public T get() {
return (T) getResultHolder().getOutput().get();
return (T) getResultHolder().join();
}

@Override
ClusterTestVariables.java
@@ -25,6 +25,7 @@ public abstract class ClusterTestVariables {
public static final String KEY_1 = "key1";
public static final String KEY_2 = "key2";
public static final String KEY_3 = "key3";
public static final String KEY_4 = "key4";

public static final String VALUE_1 = "value1";
public static final String VALUE_2 = "value2";