Skip to content

Commit adf692e

Browse files
committed
Merge branch 'master' into key-arg-prefix
2 parents d9770a9 + a02b2eb commit adf692e

File tree

85 files changed

+14388
-98
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

85 files changed

+14388
-98
lines changed

pom.xml

+2-2
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@
189189
</plugin>
190190
<plugin>
191191
<artifactId>maven-compiler-plugin</artifactId>
192-
<version>3.12.1</version>
192+
<version>3.13.0</version>
193193
<configuration>
194194
<source>1.8</source>
195195
<target>1.8</target>
@@ -300,7 +300,7 @@
300300
<!--Sign the components - this is required by maven central for releases -->
301301
<plugin>
302302
<artifactId>maven-gpg-plugin</artifactId>
303-
<version>3.2.0</version>
303+
<version>3.2.1</version>
304304
<configuration>
305305
<gpgArguments>
306306
<arg>--pinentry-mode</arg>

src/main/java/redis/clients/jedis/BuilderFactory.java

+33-4
Original file line numberDiff line numberDiff line change
@@ -1420,10 +1420,10 @@ public List<Map.Entry<String, List<StreamEntry>>> build(Object data) {
14201420
.collect(Collectors.toList());
14211421
} else {
14221422
List<Map.Entry<String, List<StreamEntry>>> result = new ArrayList<>(list.size());
1423-
for (Object streamObj : list) {
1424-
List<Object> stream = (List<Object>) streamObj;
1425-
String streamKey = STRING.build(stream.get(0));
1426-
List<StreamEntry> streamEntries = STREAM_ENTRY_LIST.build(stream.get(1));
1423+
for (Object anObj : list) {
1424+
List<Object> streamObj = (List<Object>) anObj;
1425+
String streamKey = STRING.build(streamObj.get(0));
1426+
List<StreamEntry> streamEntries = STREAM_ENTRY_LIST.build(streamObj.get(1));
14271427
result.add(KeyValue.of(streamKey, streamEntries));
14281428
}
14291429
return result;
@@ -1436,6 +1436,35 @@ public String toString() {
14361436
}
14371437
};
14381438

1439+
public static final Builder<Map<String, List<StreamEntry>>> STREAM_READ_MAP_RESPONSE
1440+
= new Builder<Map<String, List<StreamEntry>>>() {
1441+
@Override
1442+
public Map<String, List<StreamEntry>> build(Object data) {
1443+
if (data == null) return null;
1444+
List list = (List) data;
1445+
if (list.isEmpty()) return Collections.emptyMap();
1446+
1447+
if (list.get(0) instanceof KeyValue) {
1448+
return ((List<KeyValue>) list).stream()
1449+
.collect(Collectors.toMap(kv -> STRING.build(kv.getKey()), kv -> STREAM_ENTRY_LIST.build(kv.getValue())));
1450+
} else {
1451+
Map<String, List<StreamEntry>> result = new HashMap<>(list.size());
1452+
for (Object anObj : list) {
1453+
List<Object> streamObj = (List<Object>) anObj;
1454+
String streamKey = STRING.build(streamObj.get(0));
1455+
List<StreamEntry> streamEntries = STREAM_ENTRY_LIST.build(streamObj.get(1));
1456+
result.put(streamKey, streamEntries);
1457+
}
1458+
return result;
1459+
}
1460+
}
1461+
1462+
@Override
1463+
public String toString() {
1464+
return "Map<String, List<StreamEntry>>";
1465+
}
1466+
};
1467+
14391468
public static final Builder<List<StreamPendingEntry>> STREAM_PENDING_ENTRY_LIST = new Builder<List<StreamPendingEntry>>() {
14401469
@Override
14411470
@SuppressWarnings("unchecked")

src/main/java/redis/clients/jedis/CommandObjects.java

+21
Original file line numberDiff line numberDiff line change
@@ -2672,6 +2672,15 @@ public final CommandObject<List<Map.Entry<String, List<StreamEntry>>>> xread(
26722672
return new CommandObject<>(args, BuilderFactory.STREAM_READ_RESPONSE);
26732673
}
26742674

2675+
public final CommandObject<Map<String, List<StreamEntry>>> xreadAsMap(
2676+
XReadParams xReadParams, Map<String, StreamEntryID> streams) {
2677+
CommandArguments args = commandArguments(XREAD).addParams(xReadParams).add(STREAMS);
2678+
Set<Map.Entry<String, StreamEntryID>> entrySet = streams.entrySet();
2679+
entrySet.forEach(entry -> args.key(entry.getKey()));
2680+
entrySet.forEach(entry -> args.add(entry.getValue()));
2681+
return new CommandObject<>(args, BuilderFactory.STREAM_READ_MAP_RESPONSE);
2682+
}
2683+
26752684
public final CommandObject<List<Map.Entry<String, List<StreamEntry>>>> xreadGroup(
26762685
String groupName, String consumer, XReadGroupParams xReadGroupParams,
26772686
Map<String, StreamEntryID> streams) {
@@ -2684,6 +2693,18 @@ public final CommandObject<List<Map.Entry<String, List<StreamEntry>>>> xreadGrou
26842693
return new CommandObject<>(args, BuilderFactory.STREAM_READ_RESPONSE);
26852694
}
26862695

2696+
public final CommandObject<Map<String, List<StreamEntry>>> xreadGroupAsMap(
2697+
String groupName, String consumer, XReadGroupParams xReadGroupParams,
2698+
Map<String, StreamEntryID> streams) {
2699+
CommandArguments args = commandArguments(XREADGROUP)
2700+
.add(GROUP).add(groupName).add(consumer)
2701+
.addParams(xReadGroupParams).add(STREAMS);
2702+
Set<Map.Entry<String, StreamEntryID>> entrySet = streams.entrySet();
2703+
entrySet.forEach(entry -> args.key(entry.getKey()));
2704+
entrySet.forEach(entry -> args.add(entry.getValue()));
2705+
return new CommandObject<>(args, BuilderFactory.STREAM_READ_MAP_RESPONSE);
2706+
}
2707+
26872708
public final CommandObject<List<Object>> xread(XReadParams xReadParams, Map.Entry<byte[], byte[]>... streams) {
26882709
CommandArguments args = commandArguments(XREAD).addParams(xReadParams).add(STREAMS);
26892710
for (Map.Entry<byte[], byte[]> entry : streams) {

src/main/java/redis/clients/jedis/Jedis.java

+15-3
Original file line numberDiff line numberDiff line change
@@ -9394,6 +9394,12 @@ public List<Map.Entry<String, List<StreamEntry>>> xread(final XReadParams xReadP
93949394
return connection.executeCommand(commandObjects.xread(xReadParams, streams));
93959395
}
93969396

9397+
@Override
9398+
public Map<String, List<StreamEntry>> xreadAsMap(final XReadParams xReadParams, final Map<String, StreamEntryID> streams) {
9399+
checkIsInMultiOrPipeline();
9400+
return connection.executeCommand(commandObjects.xreadAsMap(xReadParams, streams));
9401+
}
9402+
93979403
@Override
93989404
public long xack(final String key, final String group, final StreamEntryID... ids) {
93999405
checkIsInMultiOrPipeline();
@@ -9450,13 +9456,19 @@ public long xtrim(final String key, final XTrimParams params) {
94509456
}
94519457

94529458
@Override
9453-
public List<Map.Entry<String, List<StreamEntry>>> xreadGroup(final String groupName,
9454-
final String consumer, final XReadGroupParams xReadGroupParams,
9455-
final Map<String, StreamEntryID> streams) {
9459+
public List<Map.Entry<String, List<StreamEntry>>> xreadGroup(final String groupName, final String consumer,
9460+
final XReadGroupParams xReadGroupParams, final Map<String, StreamEntryID> streams) {
94569461
checkIsInMultiOrPipeline();
94579462
return connection.executeCommand(commandObjects.xreadGroup(groupName, consumer, xReadGroupParams, streams));
94589463
}
94599464

9465+
@Override
9466+
public Map<String, List<StreamEntry>> xreadGroupAsMap(final String groupName, final String consumer,
9467+
final XReadGroupParams xReadGroupParams, final Map<String, StreamEntryID> streams) {
9468+
checkIsInMultiOrPipeline();
9469+
return connection.executeCommand(commandObjects.xreadGroupAsMap(groupName, consumer, xReadGroupParams, streams));
9470+
}
9471+
94609472
@Override
94619473
public StreamPendingSummary xpending(final String key, final String groupName) {
94629474
checkIsInMultiOrPipeline();

src/main/java/redis/clients/jedis/JedisClusterInfoCache.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -11,22 +11,26 @@
1111
import java.util.Map;
1212
import java.util.Map.Entry;
1313
import java.util.Set;
14+
1415
import java.util.concurrent.Executors;
1516
import java.util.concurrent.ScheduledExecutorService;
1617
import java.util.concurrent.TimeUnit;
1718
import java.util.concurrent.locks.Lock;
1819
import java.util.concurrent.locks.ReentrantLock;
1920
import java.util.concurrent.locks.ReentrantReadWriteLock;
20-
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
2121

22+
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
2223
import org.slf4j.Logger;
2324
import org.slf4j.LoggerFactory;
25+
26+
import redis.clients.jedis.annots.Internal;
2427
import redis.clients.jedis.exceptions.JedisClusterOperationException;
2528
import redis.clients.jedis.exceptions.JedisException;
2629
import redis.clients.jedis.util.SafeEncoder;
2730

2831
import static redis.clients.jedis.JedisCluster.INIT_NO_ERROR_PROPERTY;
2932

33+
@Internal
3034
public class JedisClusterInfoCache {
3135

3236
private static final Logger logger = LoggerFactory.getLogger(JedisClusterInfoCache.class);

src/main/java/redis/clients/jedis/MultiClusterClientConfig.java

+2
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import java.util.Arrays;
88
import java.util.List;
99

10+
import redis.clients.jedis.annots.Experimental;
1011
import redis.clients.jedis.exceptions.JedisConnectionException;
1112
import redis.clients.jedis.exceptions.JedisValidationException;
1213

@@ -25,6 +26,7 @@
2526
* <p>
2627
*/
2728
// TODO: move
29+
@Experimental
2830
public final class MultiClusterClientConfig {
2931

3032
private static final int RETRY_MAX_ATTEMPTS_DEFAULT = 3;

src/main/java/redis/clients/jedis/PipeliningBase.java

+25
Original file line numberDiff line numberDiff line change
@@ -1536,11 +1536,21 @@ public Response<List<Map.Entry<String, List<StreamEntry>>>> xread(XReadParams xR
15361536
return appendCommand(commandObjects.xread(xReadParams, streams));
15371537
}
15381538

1539+
@Override
1540+
public Response<Map<String, List<StreamEntry>>> xreadAsMap(XReadParams xReadParams, Map<String, StreamEntryID> streams) {
1541+
return appendCommand(commandObjects.xreadAsMap(xReadParams, streams));
1542+
}
1543+
15391544
@Override
15401545
public Response<List<Map.Entry<String, List<StreamEntry>>>> xreadGroup(String groupName, String consumer, XReadGroupParams xReadGroupParams, Map<String, StreamEntryID> streams) {
15411546
return appendCommand(commandObjects.xreadGroup(groupName, consumer, xReadGroupParams, streams));
15421547
}
15431548

1549+
@Override
1550+
public Response<Map<String, List<StreamEntry>>> xreadGroupAsMap(String groupName, String consumer, XReadGroupParams xReadGroupParams, Map<String, StreamEntryID> streams) {
1551+
return appendCommand(commandObjects.xreadGroupAsMap(groupName, consumer, xReadGroupParams, streams));
1552+
}
1553+
15441554
@Override
15451555
public Response<Object> eval(String script) {
15461556
return appendCommand(commandObjects.eval(script));
@@ -3907,6 +3917,16 @@ public Response<String> tsDeleteRule(String sourceKey, String destKey) {
39073917
public Response<List<String>> tsQueryIndex(String... filters) {
39083918
return appendCommand(commandObjects.tsQueryIndex(filters));
39093919
}
3920+
3921+
@Override
3922+
public Response<TSInfo> tsInfo(String key) {
3923+
return appendCommand(commandObjects.tsInfo(key));
3924+
}
3925+
3926+
@Override
3927+
public Response<TSInfo> tsInfoDebug(String key) {
3928+
return appendCommand(commandObjects.tsInfoDebug(key));
3929+
}
39103930
// RedisTimeSeries commands
39113931

39123932
// RedisBloom commands
@@ -4015,6 +4035,11 @@ public Response<Boolean> cfExists(String key, String item) {
40154035
return appendCommand(commandObjects.cfExists(key, item));
40164036
}
40174037

4038+
@Override
4039+
public Response<List<Boolean>> cfMExists(String key, String... items) {
4040+
return appendCommand(commandObjects.cfMExists(key, items));
4041+
}
4042+
40184043
@Override
40194044
public Response<Boolean> cfDel(String key, String item) {
40204045
return appendCommand(commandObjects.cfDel(key, item));

src/main/java/redis/clients/jedis/StreamEntryID.java

+48-9
Original file line numberDiff line numberDiff line change
@@ -80,9 +80,7 @@ private void readObject(java.io.ObjectInputStream in) throws IOException, ClassN
8080
/**
8181
* Should be used only with XADD
8282
*
83-
* <code>
84-
* XADD mystream * field1 value1
85-
* </code>
83+
* {@code XADD mystream * field1 value1}
8684
*/
8785
public static final StreamEntryID NEW_ENTRY = new StreamEntryID() {
8886

@@ -97,11 +95,31 @@ public String toString() {
9795
/**
9896
* Should be used only with XGROUP CREATE
9997
*
100-
* <code>
101-
* XGROUP CREATE mystream consumer-group-name $
102-
* </code>
98+
* {@code XGROUP CREATE mystream consumer-group-name $}
10399
*/
104-
public static final StreamEntryID LAST_ENTRY = new StreamEntryID() {
100+
public static final StreamEntryID XGROUP_LAST_ENTRY = new StreamEntryID() {
101+
102+
private static final long serialVersionUID = 1L;
103+
104+
@Override
105+
public String toString() {
106+
return "$";
107+
}
108+
};
109+
110+
/**
111+
* @deprecated Use {@link StreamEntryID#XGROUP_LAST_ENTRY} for XGROUP CREATE command or
112+
* {@link StreamEntryID#XREAD_NEW_ENTRY} for XREAD command.
113+
*/
114+
@Deprecated
115+
public static final StreamEntryID LAST_ENTRY = XGROUP_LAST_ENTRY;
116+
117+
/**
118+
* Should be used only with XREAD
119+
*
120+
* {@code XREAD BLOCK 5000 COUNT 100 STREAMS mystream $}
121+
*/
122+
public static final StreamEntryID XREAD_NEW_ENTRY = new StreamEntryID() {
105123

106124
private static final long serialVersionUID = 1L;
107125

@@ -114,9 +132,9 @@ public String toString() {
114132
/**
115133
* Should be used only with XREADGROUP
116134
* <p>
117-
* {@code XREADGROUP $GroupName $ConsumerName BLOCK 2000 COUNT 10 STREAMS mystream >}
135+
* {@code XREADGROUP GROUP mygroup myconsumer STREAMS mystream >}
118136
*/
119-
public static final StreamEntryID UNRECEIVED_ENTRY = new StreamEntryID() {
137+
public static final StreamEntryID XREADGROUP_UNDELIVERED_ENTRY = new StreamEntryID() {
120138

121139
private static final long serialVersionUID = 1L;
122140

@@ -126,6 +144,12 @@ public String toString() {
126144
}
127145
};
128146

147+
/**
148+
* @deprecated Use {@link StreamEntryID#XREADGROUP_UNDELIVERED_ENTRY}.
149+
*/
150+
@Deprecated
151+
public static final StreamEntryID UNRECEIVED_ENTRY = XREADGROUP_UNDELIVERED_ENTRY;
152+
129153
/**
130154
* Can be used in XRANGE, XREVRANGE and XPENDING commands.
131155
*/
@@ -151,4 +175,19 @@ public String toString() {
151175
return "+";
152176
}
153177
};
178+
179+
/**
180+
* Should be used only with XREAD
181+
*
182+
* {@code XREAD STREAMS mystream +}
183+
*/
184+
public static final StreamEntryID XREAD_LAST_ENTRY = new StreamEntryID() {
185+
186+
private static final long serialVersionUID = 1L;
187+
188+
@Override
189+
public String toString() {
190+
return "+";
191+
}
192+
};
154193
}

src/main/java/redis/clients/jedis/UnifiedJedis.java

+14-1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
1010
import org.json.JSONArray;
1111

12+
import redis.clients.jedis.annots.Experimental;
1213
import redis.clients.jedis.args.*;
1314
import redis.clients.jedis.bloom.*;
1415
import redis.clients.jedis.commands.JedisCommands;
@@ -192,6 +193,7 @@ public UnifiedJedis(ConnectionProvider provider, int maxAttempts, Duration maxTo
192193
* by using simple configuration which is passed through from Resilience4j - https://resilience4j.readme.io/docs
193194
* <p>
194195
*/
196+
@Experimental
195197
public UnifiedJedis(MultiClusterPooledConnectionProvider provider) {
196198
this(new CircuitBreakerCommandExecutor(provider), provider);
197199
}
@@ -457,7 +459,7 @@ public long pexpireAt(byte[] key, long millisecondsTimestamp) {
457459

458460
@Override
459461
public long pexpireAt(byte[] key, long millisecondsTimestamp, ExpiryOption expiryOption) {
460-
return executeCommand(commandObjects.expireAt(key, millisecondsTimestamp, expiryOption));
462+
return executeCommand(commandObjects.pexpireAt(key, millisecondsTimestamp, expiryOption));
461463
}
462464

463465
@Override
@@ -3064,12 +3066,23 @@ public List<Map.Entry<String, List<StreamEntry>>> xread(XReadParams xReadParams,
30643066
return executeCommand(commandObjects.xread(xReadParams, streams));
30653067
}
30663068

3069+
@Override
3070+
public Map<String, List<StreamEntry>> xreadAsMap(XReadParams xReadParams, Map<String, StreamEntryID> streams) {
3071+
return executeCommand(commandObjects.xreadAsMap(xReadParams, streams));
3072+
}
3073+
30673074
@Override
30683075
public List<Map.Entry<String, List<StreamEntry>>> xreadGroup(String groupName, String consumer,
30693076
XReadGroupParams xReadGroupParams, Map<String, StreamEntryID> streams) {
30703077
return executeCommand(commandObjects.xreadGroup(groupName, consumer, xReadGroupParams, streams));
30713078
}
30723079

3080+
@Override
3081+
public Map<String, List<StreamEntry>> xreadGroupAsMap(String groupName, String consumer,
3082+
XReadGroupParams xReadGroupParams, Map<String, StreamEntryID> streams) {
3083+
return executeCommand(commandObjects.xreadGroupAsMap(groupName, consumer, xReadGroupParams, streams));
3084+
}
3085+
30733086
@Override
30743087
public byte[] xadd(byte[] key, XAddParams params, Map<byte[], byte[]> hash) {
30753088
return executeCommand(commandObjects.xadd(key, params, hash));
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package redis.clients.jedis.annots;
2+
3+
import java.lang.annotation.Documented;
4+
import java.lang.annotation.ElementType;
5+
import java.lang.annotation.Target;
6+
7+
/**
8+
* Annotation to mark classes for experimental development.
9+
* <p>
10+
* Classes with this annotation may be renamed, changed or even removed in a future version. This
11+
* annotation doesn't mean that the implementation has an 'experimental' quality.
12+
* <p>
13+
* If a type is marked with this annotation, all its members are considered experimental.
14+
*/
15+
@Documented
16+
@Target({ElementType.TYPE, ElementType.METHOD, ElementType.FIELD, ElementType.CONSTRUCTOR})
17+
public @interface Experimental { }

0 commit comments

Comments
 (0)