Skip to content

Commit 024e066

Browse files
authored
Pub/Sub on RESP3 (#3388)
1 parent e6341ad commit 024e066

File tree

6 files changed

+127
-87
lines changed

6 files changed

+127
-87
lines changed

src/main/java/redis/clients/jedis/BinaryJedisPubSub.java

+50-35
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import redis.clients.jedis.Protocol.Command;
99
import redis.clients.jedis.exceptions.JedisException;
10+
import redis.clients.jedis.util.SafeEncoder;
1011

1112
public abstract class BinaryJedisPubSub {
1213
private int subscribedChannels = 0;
@@ -101,42 +102,56 @@ public void proceed(Connection client, byte[]... channels) {
101102

102103
private void process() {
103104
do {
104-
List<Object> reply = client.getUnflushedObjectMultiBulkReply();
105-
final Object firstObj = reply.get(0);
106-
if (!(firstObj instanceof byte[])) {
107-
throw new JedisException("Unknown message type: " + firstObj);
108-
}
109-
final byte[] resp = (byte[]) firstObj;
110-
if (Arrays.equals(SUBSCRIBE.getRaw(), resp)) {
111-
subscribedChannels = ((Long) reply.get(2)).intValue();
112-
final byte[] bchannel = (byte[]) reply.get(1);
113-
onSubscribe(bchannel, subscribedChannels);
114-
} else if (Arrays.equals(UNSUBSCRIBE.getRaw(), resp)) {
115-
subscribedChannels = ((Long) reply.get(2)).intValue();
116-
final byte[] bchannel = (byte[]) reply.get(1);
117-
onUnsubscribe(bchannel, subscribedChannels);
118-
} else if (Arrays.equals(MESSAGE.getRaw(), resp)) {
119-
final byte[] bchannel = (byte[]) reply.get(1);
120-
final byte[] bmesg = (byte[]) reply.get(2);
121-
onMessage(bchannel, bmesg);
122-
} else if (Arrays.equals(PMESSAGE.getRaw(), resp)) {
123-
final byte[] bpattern = (byte[]) reply.get(1);
124-
final byte[] bchannel = (byte[]) reply.get(2);
125-
final byte[] bmesg = (byte[]) reply.get(3);
126-
onPMessage(bpattern, bchannel, bmesg);
127-
} else if (Arrays.equals(PSUBSCRIBE.getRaw(), resp)) {
128-
subscribedChannels = ((Long) reply.get(2)).intValue();
129-
final byte[] bpattern = (byte[]) reply.get(1);
130-
onPSubscribe(bpattern, subscribedChannels);
131-
} else if (Arrays.equals(PUNSUBSCRIBE.getRaw(), resp)) {
132-
subscribedChannels = ((Long) reply.get(2)).intValue();
133-
final byte[] bpattern = (byte[]) reply.get(1);
134-
onPUnsubscribe(bpattern, subscribedChannels);
135-
} else if (Arrays.equals(PONG.getRaw(), resp)) {
136-
final byte[] bpattern = (byte[]) reply.get(1);
137-
onPong(bpattern);
105+
Object reply = client.getUnflushedObject();
106+
107+
if (reply instanceof List) {
108+
List<Object> listReply = (List<Object>) reply;
109+
final Object firstObj = listReply.get(0);
110+
if (!(firstObj instanceof byte[])) {
111+
throw new JedisException("Unknown message type: " + firstObj);
112+
}
113+
final byte[] resp = (byte[]) firstObj;
114+
if (Arrays.equals(SUBSCRIBE.getRaw(), resp)) {
115+
subscribedChannels = ((Long) listReply.get(2)).intValue();
116+
final byte[] bchannel = (byte[]) listReply.get(1);
117+
onSubscribe(bchannel, subscribedChannels);
118+
} else if (Arrays.equals(UNSUBSCRIBE.getRaw(), resp)) {
119+
subscribedChannels = ((Long) listReply.get(2)).intValue();
120+
final byte[] bchannel = (byte[]) listReply.get(1);
121+
onUnsubscribe(bchannel, subscribedChannels);
122+
} else if (Arrays.equals(MESSAGE.getRaw(), resp)) {
123+
final byte[] bchannel = (byte[]) listReply.get(1);
124+
final byte[] bmesg = (byte[]) listReply.get(2);
125+
onMessage(bchannel, bmesg);
126+
} else if (Arrays.equals(PMESSAGE.getRaw(), resp)) {
127+
final byte[] bpattern = (byte[]) listReply.get(1);
128+
final byte[] bchannel = (byte[]) listReply.get(2);
129+
final byte[] bmesg = (byte[]) listReply.get(3);
130+
onPMessage(bpattern, bchannel, bmesg);
131+
} else if (Arrays.equals(PSUBSCRIBE.getRaw(), resp)) {
132+
subscribedChannels = ((Long) listReply.get(2)).intValue();
133+
final byte[] bpattern = (byte[]) listReply.get(1);
134+
onPSubscribe(bpattern, subscribedChannels);
135+
} else if (Arrays.equals(PUNSUBSCRIBE.getRaw(), resp)) {
136+
subscribedChannels = ((Long) listReply.get(2)).intValue();
137+
final byte[] bpattern = (byte[]) listReply.get(1);
138+
onPUnsubscribe(bpattern, subscribedChannels);
139+
} else if (Arrays.equals(PONG.getRaw(), resp)) {
140+
final byte[] bpattern = (byte[]) listReply.get(1);
141+
onPong(bpattern);
142+
} else {
143+
throw new JedisException("Unknown message type: " + firstObj);
144+
}
145+
} else if (reply instanceof byte[]) {
146+
byte[] resp = (byte[]) reply;
147+
String str = SafeEncoder.encode(resp);
148+
if ("PONG".equalsIgnoreCase(str)) {
149+
onPong(null);
150+
} else {
151+
onPong(resp);
152+
}
138153
} else {
139-
throw new JedisException("Unknown message type: " + firstObj);
154+
throw new JedisException("Unknown message type: " + reply);
140155
}
141156
} while (isSubscribed());
142157
}

src/main/java/redis/clients/jedis/Connection.java

+11-3
Original file line numberDiff line numberDiff line change
@@ -302,14 +302,19 @@ public List<byte[]> getBinaryMultiBulkReply() {
302302
return (List<byte[]>) readProtocolWithCheckingBroken();
303303
}
304304

305+
// @SuppressWarnings("unchecked")
306+
// public List<Object> getUnflushedObjectMultiBulkReply() {
307+
// return (List<Object>) readProtocolWithCheckingBroken();
308+
// }
309+
//
305310
@SuppressWarnings("unchecked")
306-
public List<Object> getUnflushedObjectMultiBulkReply() {
307-
return (List<Object>) readProtocolWithCheckingBroken();
311+
public Object getUnflushedObject() {
312+
return readProtocolWithCheckingBroken();
308313
}
309314

310315
public List<Object> getObjectMultiBulkReply() {
311316
flush();
312-
return getUnflushedObjectMultiBulkReply();
317+
return (List<Object>) readProtocolWithCheckingBroken();
313318
}
314319

315320
@SuppressWarnings("unchecked")
@@ -339,6 +344,9 @@ protected Object readProtocolWithCheckingBroken() {
339344

340345
try {
341346
return Protocol.read(inputStream);
347+
// Object read = Protocol.read(inputStream);
348+
// System.out.println(SafeEncoder.encodeObject(read));
349+
// return read;
342350
} catch (JedisConnectionException exc) {
343351
broken = true;
344352
throw exc;

src/main/java/redis/clients/jedis/JedisPubSub.java

+59-46
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ public void onPSubscribe(String pattern, int subscribedChannels) {
3535
}
3636

3737
public void onPong(String pattern) {
38-
3938
}
4039

4140
public void unsubscribe() {
@@ -132,52 +131,66 @@ public void proceed(Connection client, String... channels) {
132131
private void process() {
133132

134133
do {
135-
List<Object> reply = client.getUnflushedObjectMultiBulkReply();
136-
final Object firstObj = reply.get(0);
137-
if (!(firstObj instanceof byte[])) {
138-
throw new JedisException("Unknown message type: " + firstObj);
139-
}
140-
final byte[] resp = (byte[]) firstObj;
141-
if (Arrays.equals(SUBSCRIBE.getRaw(), resp)) {
142-
subscribedChannels = ((Long) reply.get(2)).intValue();
143-
final byte[] bchannel = (byte[]) reply.get(1);
144-
final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel);
145-
onSubscribe(strchannel, subscribedChannels);
146-
} else if (Arrays.equals(UNSUBSCRIBE.getRaw(), resp)) {
147-
subscribedChannels = ((Long) reply.get(2)).intValue();
148-
final byte[] bchannel = (byte[]) reply.get(1);
149-
final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel);
150-
onUnsubscribe(strchannel, subscribedChannels);
151-
} else if (Arrays.equals(MESSAGE.getRaw(), resp)) {
152-
final byte[] bchannel = (byte[]) reply.get(1);
153-
final byte[] bmesg = (byte[]) reply.get(2);
154-
final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel);
155-
final String strmesg = (bmesg == null) ? null : SafeEncoder.encode(bmesg);
156-
onMessage(strchannel, strmesg);
157-
} else if (Arrays.equals(PMESSAGE.getRaw(), resp)) {
158-
final byte[] bpattern = (byte[]) reply.get(1);
159-
final byte[] bchannel = (byte[]) reply.get(2);
160-
final byte[] bmesg = (byte[]) reply.get(3);
161-
final String strpattern = (bpattern == null) ? null : SafeEncoder.encode(bpattern);
162-
final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel);
163-
final String strmesg = (bmesg == null) ? null : SafeEncoder.encode(bmesg);
164-
onPMessage(strpattern, strchannel, strmesg);
165-
} else if (Arrays.equals(PSUBSCRIBE.getRaw(), resp)) {
166-
subscribedChannels = ((Long) reply.get(2)).intValue();
167-
final byte[] bpattern = (byte[]) reply.get(1);
168-
final String strpattern = (bpattern == null) ? null : SafeEncoder.encode(bpattern);
169-
onPSubscribe(strpattern, subscribedChannels);
170-
} else if (Arrays.equals(PUNSUBSCRIBE.getRaw(), resp)) {
171-
subscribedChannels = ((Long) reply.get(2)).intValue();
172-
final byte[] bpattern = (byte[]) reply.get(1);
173-
final String strpattern = (bpattern == null) ? null : SafeEncoder.encode(bpattern);
174-
onPUnsubscribe(strpattern, subscribedChannels);
175-
} else if (Arrays.equals(PONG.getRaw(), resp)) {
176-
final byte[] bpattern = (byte[]) reply.get(1);
177-
final String strpattern = (bpattern == null) ? null : SafeEncoder.encode(bpattern);
178-
onPong(strpattern);
134+
Object reply = client.getUnflushedObject();
135+
136+
if (reply instanceof List) {
137+
List<Object> listReply = (List<Object>) reply;
138+
final Object firstObj = listReply.get(0);
139+
if (!(firstObj instanceof byte[])) {
140+
throw new JedisException("Unknown message type: " + firstObj);
141+
}
142+
final byte[] resp = (byte[]) firstObj;
143+
if (Arrays.equals(SUBSCRIBE.getRaw(), resp)) {
144+
subscribedChannels = ((Long) listReply.get(2)).intValue();
145+
final byte[] bchannel = (byte[]) listReply.get(1);
146+
final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel);
147+
onSubscribe(strchannel, subscribedChannels);
148+
} else if (Arrays.equals(UNSUBSCRIBE.getRaw(), resp)) {
149+
subscribedChannels = ((Long) listReply.get(2)).intValue();
150+
final byte[] bchannel = (byte[]) listReply.get(1);
151+
final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel);
152+
onUnsubscribe(strchannel, subscribedChannels);
153+
} else if (Arrays.equals(MESSAGE.getRaw(), resp)) {
154+
final byte[] bchannel = (byte[]) listReply.get(1);
155+
final byte[] bmesg = (byte[]) listReply.get(2);
156+
final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel);
157+
final String strmesg = (bmesg == null) ? null : SafeEncoder.encode(bmesg);
158+
onMessage(strchannel, strmesg);
159+
} else if (Arrays.equals(PMESSAGE.getRaw(), resp)) {
160+
final byte[] bpattern = (byte[]) listReply.get(1);
161+
final byte[] bchannel = (byte[]) listReply.get(2);
162+
final byte[] bmesg = (byte[]) listReply.get(3);
163+
final String strpattern = (bpattern == null) ? null : SafeEncoder.encode(bpattern);
164+
final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel);
165+
final String strmesg = (bmesg == null) ? null : SafeEncoder.encode(bmesg);
166+
onPMessage(strpattern, strchannel, strmesg);
167+
} else if (Arrays.equals(PSUBSCRIBE.getRaw(), resp)) {
168+
subscribedChannels = ((Long) listReply.get(2)).intValue();
169+
final byte[] bpattern = (byte[]) listReply.get(1);
170+
final String strpattern = (bpattern == null) ? null : SafeEncoder.encode(bpattern);
171+
onPSubscribe(strpattern, subscribedChannels);
172+
} else if (Arrays.equals(PUNSUBSCRIBE.getRaw(), resp)) {
173+
subscribedChannels = ((Long) listReply.get(2)).intValue();
174+
final byte[] bpattern = (byte[]) listReply.get(1);
175+
final String strpattern = (bpattern == null) ? null : SafeEncoder.encode(bpattern);
176+
onPUnsubscribe(strpattern, subscribedChannels);
177+
} else if (Arrays.equals(PONG.getRaw(), resp)) {
178+
final byte[] bpattern = (byte[]) listReply.get(1);
179+
final String strpattern = (bpattern == null) ? null : SafeEncoder.encode(bpattern);
180+
onPong(strpattern);
181+
} else {
182+
throw new JedisException("Unknown message type: " + firstObj);
183+
}
184+
} else if (reply instanceof byte[]) {
185+
byte[] resp = (byte[]) reply;
186+
String str = SafeEncoder.encode(resp);
187+
if ("PONG".equalsIgnoreCase(str)) {
188+
onPong(null);
189+
} else {
190+
onPong(str);
191+
}
179192
} else {
180-
throw new JedisException("Unknown message type: " + firstObj);
193+
throw new JedisException("Unknown message type: " + reply);
181194
}
182195
} while (isSubscribed());
183196

src/main/java/redis/clients/jedis/Protocol.java

+5
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ public final class Protocol {
3030
public static final byte COMMA_BYTE = ',';
3131
public static final byte DOLLAR_BYTE = '$';
3232
public static final byte EQUAL_BYTE = '=';
33+
public static final byte GRATER_THAN_BYTE = '>';
3334
public static final byte HASH_BYTE = '#';
3435
public static final byte LEFT_BRACE_BYTE = '(';
3536
public static final byte MINUS_BYTE = '-';
@@ -159,6 +160,10 @@ private static Object process(final RedisInputStream is) {
159160
num = is.readIntCrLf();
160161
if (num == -1) return null;
161162
return processMultiBulkReply(num, is);
163+
case GRATER_THAN_BYTE:
164+
num = is.readIntCrLf();
165+
if (num == -1) return null;
166+
return processMultiBulkReply(num, is);
162167
case MINUS_BYTE:
163168
processError(is);
164169
return null;

src/test/java/redis/clients/jedis/commands/jedis/JedisCommandsTestBase.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
import redis.clients.jedis.HostAndPort;
88
import redis.clients.jedis.Jedis;
99
import redis.clients.jedis.HostAndPorts;
10-
import redis.clients.jedis.RedisProtocol;
1110

1211
public abstract class JedisCommandsTestBase {
1312

@@ -21,7 +20,8 @@ public JedisCommandsTestBase() {
2120

2221
@Before
2322
public void setUp() throws Exception {
24-
jedis = new Jedis(hnp, DefaultJedisClientConfig.builder().protocol(RedisProtocol.RESP3).timeoutMillis(500).password("foobared").build());
23+
//jedis = new Jedis(hnp, DefaultJedisClientConfig.builder().timeoutMillis(500).password("foobared").build());
24+
jedis = new Jedis(hnp, DefaultJedisClientConfig.builder().resp3().timeoutMillis(500).password("foobared").build());
2525
jedis.flushAll();
2626
}
2727

src/test/java/redis/clients/jedis/commands/jedis/PublishSubscribeCommandsTest.java

-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import redis.clients.jedis.exceptions.JedisConnectionException;
2525
import redis.clients.jedis.util.SafeEncoder;
2626

27-
@org.junit.Ignore // TODO:
2827
public class PublishSubscribeCommandsTest extends JedisCommandsTestBase {
2928
private void publishOne(final String channel, final String message) {
3029
Thread t = new Thread(new Runnable() {

0 commit comments

Comments
 (0)