Skip to content

Commit a4eb369

Browse files
authored
Merge f484582 into 024e066
2 parents 024e066 + f484582 commit a4eb369

File tree

4 files changed

+191
-357
lines changed

4 files changed

+191
-357
lines changed
+4-157
Original file line numberDiff line numberDiff line change
@@ -1,162 +1,9 @@
11
package redis.clients.jedis;
22

3-
import static redis.clients.jedis.Protocol.ResponseKeyword.*;
3+
public abstract class BinaryJedisPubSub extends JedisPubSubBase<byte[]> {
44

5-
import java.util.Arrays;
6-
import java.util.List;
7-
8-
import redis.clients.jedis.Protocol.Command;
9-
import redis.clients.jedis.exceptions.JedisException;
10-
import redis.clients.jedis.util.SafeEncoder;
11-
12-
public abstract class BinaryJedisPubSub {
13-
private int subscribedChannels = 0;
14-
private Connection client;
15-
16-
public void onMessage(byte[] channel, byte[] message) {
17-
}
18-
19-
public void onPMessage(byte[] pattern, byte[] channel, byte[] message) {
20-
}
21-
22-
public void onSubscribe(byte[] channel, int subscribedChannels) {
23-
}
24-
25-
public void onUnsubscribe(byte[] channel, int subscribedChannels) {
26-
}
27-
28-
public void onPUnsubscribe(byte[] pattern, int subscribedChannels) {
29-
}
30-
31-
public void onPSubscribe(byte[] pattern, int subscribedChannels) {
32-
}
33-
34-
public void onPong(byte[] pattern) {
35-
}
36-
37-
public void unsubscribe() {
38-
client.sendCommand(Command.UNSUBSCRIBE);
39-
client.flush();
40-
}
41-
42-
public void unsubscribe(byte[]... channels) {
43-
client.sendCommand(Command.UNSUBSCRIBE, channels);
44-
client.flush();
45-
}
46-
47-
public void subscribe(byte[]... channels) {
48-
client.sendCommand(Command.SUBSCRIBE, channels);
49-
client.flush();
50-
}
51-
52-
public void psubscribe(byte[]... patterns) {
53-
client.sendCommand(Command.PSUBSCRIBE, patterns);
54-
client.flush();
55-
}
56-
57-
public void punsubscribe() {
58-
client.sendCommand(Command.PUNSUBSCRIBE);
59-
client.flush();
60-
}
61-
62-
public void punsubscribe(byte[]... patterns) {
63-
client.sendCommand(Command.PUNSUBSCRIBE, patterns);
64-
client.flush();
65-
}
66-
67-
public void ping() {
68-
client.sendCommand(Command.PING);
69-
client.flush();
70-
}
71-
72-
public void ping(byte[] argument) {
73-
client.sendCommand(Command.PING, argument);
74-
client.flush();
75-
}
76-
77-
public boolean isSubscribed() {
78-
return subscribedChannels > 0;
79-
}
80-
81-
public void proceedWithPatterns(Connection client, byte[]... patterns) {
82-
this.client = client;
83-
this.client.setTimeoutInfinite();
84-
try {
85-
psubscribe(patterns);
86-
process();
87-
} finally {
88-
this.client.rollbackTimeout();
89-
}
90-
}
91-
92-
public void proceed(Connection client, byte[]... channels) {
93-
this.client = client;
94-
this.client.setTimeoutInfinite();
95-
try {
96-
subscribe(channels);
97-
process();
98-
} finally {
99-
this.client.rollbackTimeout();
100-
}
101-
}
102-
103-
private void process() {
104-
do {
105-
Object reply = client.getUnflushedObject();
106-
107-
if (reply instanceof List) {
108-
List<Object> listReply = (List<Object>) reply;
109-
final Object firstObj = listReply.get(0);
110-
if (!(firstObj instanceof byte[])) {
111-
throw new JedisException("Unknown message type: " + firstObj);
112-
}
113-
final byte[] resp = (byte[]) firstObj;
114-
if (Arrays.equals(SUBSCRIBE.getRaw(), resp)) {
115-
subscribedChannels = ((Long) listReply.get(2)).intValue();
116-
final byte[] bchannel = (byte[]) listReply.get(1);
117-
onSubscribe(bchannel, subscribedChannels);
118-
} else if (Arrays.equals(UNSUBSCRIBE.getRaw(), resp)) {
119-
subscribedChannels = ((Long) listReply.get(2)).intValue();
120-
final byte[] bchannel = (byte[]) listReply.get(1);
121-
onUnsubscribe(bchannel, subscribedChannels);
122-
} else if (Arrays.equals(MESSAGE.getRaw(), resp)) {
123-
final byte[] bchannel = (byte[]) listReply.get(1);
124-
final byte[] bmesg = (byte[]) listReply.get(2);
125-
onMessage(bchannel, bmesg);
126-
} else if (Arrays.equals(PMESSAGE.getRaw(), resp)) {
127-
final byte[] bpattern = (byte[]) listReply.get(1);
128-
final byte[] bchannel = (byte[]) listReply.get(2);
129-
final byte[] bmesg = (byte[]) listReply.get(3);
130-
onPMessage(bpattern, bchannel, bmesg);
131-
} else if (Arrays.equals(PSUBSCRIBE.getRaw(), resp)) {
132-
subscribedChannels = ((Long) listReply.get(2)).intValue();
133-
final byte[] bpattern = (byte[]) listReply.get(1);
134-
onPSubscribe(bpattern, subscribedChannels);
135-
} else if (Arrays.equals(PUNSUBSCRIBE.getRaw(), resp)) {
136-
subscribedChannels = ((Long) listReply.get(2)).intValue();
137-
final byte[] bpattern = (byte[]) listReply.get(1);
138-
onPUnsubscribe(bpattern, subscribedChannels);
139-
} else if (Arrays.equals(PONG.getRaw(), resp)) {
140-
final byte[] bpattern = (byte[]) listReply.get(1);
141-
onPong(bpattern);
142-
} else {
143-
throw new JedisException("Unknown message type: " + firstObj);
144-
}
145-
} else if (reply instanceof byte[]) {
146-
byte[] resp = (byte[]) reply;
147-
String str = SafeEncoder.encode(resp);
148-
if ("PONG".equalsIgnoreCase(str)) {
149-
onPong(null);
150-
} else {
151-
onPong(resp);
152-
}
153-
} else {
154-
throw new JedisException("Unknown message type: " + reply);
155-
}
156-
} while (isSubscribed());
157-
}
158-
159-
public int getSubscribedChannels() {
160-
return subscribedChannels;
5+
@Override
6+
protected final byte[] encode(byte[] raw) {
7+
return raw;
1618
}
1629
}

Diff for: src/main/java/redis/clients/jedis/JedisPubSub.java

+4-197
Original file line numberDiff line numberDiff line change
@@ -1,204 +1,11 @@
11
package redis.clients.jedis;
22

3-
import static redis.clients.jedis.Protocol.ResponseKeyword.*;
4-
5-
import java.util.Arrays;
6-
import java.util.List;
7-
8-
import redis.clients.jedis.Protocol.Command;
9-
import redis.clients.jedis.exceptions.JedisConnectionException;
10-
import redis.clients.jedis.exceptions.JedisException;
113
import redis.clients.jedis.util.SafeEncoder;
124

13-
public abstract class JedisPubSub {
14-
15-
private static final String JEDIS_SUBSCRIPTION_MESSAGE = "JedisPubSub is not subscribed to a Jedis instance.";
16-
private int subscribedChannels = 0;
17-
private volatile Connection client;
18-
19-
public void onMessage(String channel, String message) {
20-
}
21-
22-
public void onPMessage(String pattern, String channel, String message) {
23-
}
24-
25-
public void onSubscribe(String channel, int subscribedChannels) {
26-
}
27-
28-
public void onUnsubscribe(String channel, int subscribedChannels) {
29-
}
30-
31-
public void onPUnsubscribe(String pattern, int subscribedChannels) {
32-
}
33-
34-
public void onPSubscribe(String pattern, int subscribedChannels) {
35-
}
36-
37-
public void onPong(String pattern) {
38-
}
39-
40-
public void unsubscribe() {
41-
if (client == null) {
42-
throw new JedisConnectionException(JEDIS_SUBSCRIPTION_MESSAGE);
43-
}
44-
client.sendCommand(Command.UNSUBSCRIBE);
45-
client.flush();
46-
}
47-
48-
public void unsubscribe(String... channels) {
49-
if (client == null) {
50-
throw new JedisConnectionException(JEDIS_SUBSCRIPTION_MESSAGE);
51-
}
52-
client.sendCommand(Command.UNSUBSCRIBE, channels);
53-
client.flush();
54-
}
55-
56-
public void subscribe(String... channels) {
57-
if (client == null) {
58-
throw new JedisConnectionException(JEDIS_SUBSCRIPTION_MESSAGE);
59-
}
60-
client.sendCommand(Command.SUBSCRIBE, channels);
61-
client.flush();
62-
}
63-
64-
public void psubscribe(String... patterns) {
65-
if (client == null) {
66-
throw new JedisConnectionException(JEDIS_SUBSCRIPTION_MESSAGE);
67-
}
68-
client.sendCommand(Command.PSUBSCRIBE, patterns);
69-
client.flush();
70-
}
71-
72-
public void punsubscribe() {
73-
if (client == null) {
74-
throw new JedisConnectionException(JEDIS_SUBSCRIPTION_MESSAGE);
75-
}
76-
client.sendCommand(Command.PUNSUBSCRIBE);
77-
client.flush();
78-
}
79-
80-
public void punsubscribe(String... patterns) {
81-
if (client == null) {
82-
throw new JedisConnectionException(JEDIS_SUBSCRIPTION_MESSAGE);
83-
}
84-
client.sendCommand(Command.PUNSUBSCRIBE, patterns);
85-
client.flush();
86-
}
87-
88-
public void ping() {
89-
if (client == null) {
90-
throw new JedisConnectionException(JEDIS_SUBSCRIPTION_MESSAGE);
91-
}
92-
client.sendCommand(Command.PING);
93-
client.flush();
94-
}
95-
96-
public void ping(String argument) {
97-
if (client == null) {
98-
throw new JedisConnectionException(JEDIS_SUBSCRIPTION_MESSAGE);
99-
}
100-
client.sendCommand(Command.PING, argument);
101-
client.flush();
102-
}
103-
104-
public boolean isSubscribed() {
105-
return subscribedChannels > 0;
106-
}
107-
108-
public void proceedWithPatterns(Connection client, String... patterns) {
109-
this.client = client;
110-
this.client.setTimeoutInfinite();
111-
try {
112-
psubscribe(patterns);
113-
process();
114-
} finally {
115-
this.client.rollbackTimeout();
116-
}
117-
}
118-
119-
public void proceed(Connection client, String... channels) {
120-
this.client = client;
121-
this.client.setTimeoutInfinite();
122-
try {
123-
subscribe(channels);
124-
process();
125-
} finally {
126-
this.client.rollbackTimeout();
127-
}
128-
}
129-
130-
// private void process(Client client) {
131-
private void process() {
132-
133-
do {
134-
Object reply = client.getUnflushedObject();
135-
136-
if (reply instanceof List) {
137-
List<Object> listReply = (List<Object>) reply;
138-
final Object firstObj = listReply.get(0);
139-
if (!(firstObj instanceof byte[])) {
140-
throw new JedisException("Unknown message type: " + firstObj);
141-
}
142-
final byte[] resp = (byte[]) firstObj;
143-
if (Arrays.equals(SUBSCRIBE.getRaw(), resp)) {
144-
subscribedChannels = ((Long) listReply.get(2)).intValue();
145-
final byte[] bchannel = (byte[]) listReply.get(1);
146-
final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel);
147-
onSubscribe(strchannel, subscribedChannels);
148-
} else if (Arrays.equals(UNSUBSCRIBE.getRaw(), resp)) {
149-
subscribedChannels = ((Long) listReply.get(2)).intValue();
150-
final byte[] bchannel = (byte[]) listReply.get(1);
151-
final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel);
152-
onUnsubscribe(strchannel, subscribedChannels);
153-
} else if (Arrays.equals(MESSAGE.getRaw(), resp)) {
154-
final byte[] bchannel = (byte[]) listReply.get(1);
155-
final byte[] bmesg = (byte[]) listReply.get(2);
156-
final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel);
157-
final String strmesg = (bmesg == null) ? null : SafeEncoder.encode(bmesg);
158-
onMessage(strchannel, strmesg);
159-
} else if (Arrays.equals(PMESSAGE.getRaw(), resp)) {
160-
final byte[] bpattern = (byte[]) listReply.get(1);
161-
final byte[] bchannel = (byte[]) listReply.get(2);
162-
final byte[] bmesg = (byte[]) listReply.get(3);
163-
final String strpattern = (bpattern == null) ? null : SafeEncoder.encode(bpattern);
164-
final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel);
165-
final String strmesg = (bmesg == null) ? null : SafeEncoder.encode(bmesg);
166-
onPMessage(strpattern, strchannel, strmesg);
167-
} else if (Arrays.equals(PSUBSCRIBE.getRaw(), resp)) {
168-
subscribedChannels = ((Long) listReply.get(2)).intValue();
169-
final byte[] bpattern = (byte[]) listReply.get(1);
170-
final String strpattern = (bpattern == null) ? null : SafeEncoder.encode(bpattern);
171-
onPSubscribe(strpattern, subscribedChannels);
172-
} else if (Arrays.equals(PUNSUBSCRIBE.getRaw(), resp)) {
173-
subscribedChannels = ((Long) listReply.get(2)).intValue();
174-
final byte[] bpattern = (byte[]) listReply.get(1);
175-
final String strpattern = (bpattern == null) ? null : SafeEncoder.encode(bpattern);
176-
onPUnsubscribe(strpattern, subscribedChannels);
177-
} else if (Arrays.equals(PONG.getRaw(), resp)) {
178-
final byte[] bpattern = (byte[]) listReply.get(1);
179-
final String strpattern = (bpattern == null) ? null : SafeEncoder.encode(bpattern);
180-
onPong(strpattern);
181-
} else {
182-
throw new JedisException("Unknown message type: " + firstObj);
183-
}
184-
} else if (reply instanceof byte[]) {
185-
byte[] resp = (byte[]) reply;
186-
String str = SafeEncoder.encode(resp);
187-
if ("PONG".equalsIgnoreCase(str)) {
188-
onPong(null);
189-
} else {
190-
onPong(str);
191-
}
192-
} else {
193-
throw new JedisException("Unknown message type: " + reply);
194-
}
195-
} while (isSubscribed());
196-
197-
// /* Invalidate instance since this thread is no longer listening */
198-
// this.client = null;
199-
}
5+
public abstract class JedisPubSub extends JedisPubSubBase<String> {
2006

201-
public int getSubscribedChannels() {
202-
return subscribedChannels;
7+
@Override
8+
protected final String encode(byte[] raw) {
9+
return SafeEncoder.encode(raw);
20310
}
20411
}

0 commit comments

Comments
 (0)