Skip to content

Commit cbd2ab9

Browse files
authored
Merge a6861ad into f86e1ba
2 parents f86e1ba + a6861ad commit cbd2ab9

File tree

4 files changed

+117
-2
lines changed

4 files changed

+117
-2
lines changed

Diff for: src/main/java/redis/clients/jedis/JedisPubSubBase.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ private void process() {
172172
} else {
173173
throw new JedisException("Unknown message type: " + reply);
174174
}
175-
} while (isSubscribed());
175+
} while (!Thread.currentThread().isInterrupted() && isSubscribed());
176176

177177
// /* Invalidate instance since this thread is no longer listening */
178178
// this.client = null;

Diff for: src/main/java/redis/clients/jedis/JedisShardedPubSubBase.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ private void process() {
9999
} else {
100100
throw new JedisException("Unknown message type: " + reply);
101101
}
102-
} while (isSubscribed());
102+
} while (!Thread.currentThread().isInterrupted() && isSubscribed());
103103

104104
// /* Invalidate instance since this thread is no longer listening */
105105
// this.client = null;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package redis.clients.jedis;
2+
3+
import junit.framework.TestCase;
4+
import redis.clients.jedis.util.SafeEncoder;
5+
6+
import java.util.Arrays;
7+
import java.util.List;
8+
import java.util.concurrent.CountDownLatch;
9+
import java.util.concurrent.TimeUnit;
10+
11+
import static org.mockito.Mockito.mock;
12+
import static org.mockito.Mockito.when;
13+
import static redis.clients.jedis.Protocol.ResponseKeyword.MESSAGE;
14+
import static redis.clients.jedis.Protocol.ResponseKeyword.SUBSCRIBE;
15+
16+
public class JedisPubSubBaseTest extends TestCase {
17+
18+
public void testProceed_givenThreadInterrupt_exitLoop() throws InterruptedException {
19+
// setup
20+
final JedisPubSubBase<String> pubSub = new JedisPubSubBase<String>() {
21+
22+
@Override
23+
public void onMessage(String channel, String message) {
24+
fail("this should not happen when thread is interrupted");
25+
}
26+
27+
@Override
28+
protected String encode(byte[] raw) {
29+
return SafeEncoder.encode(raw);
30+
}
31+
};
32+
33+
final Connection mockConnection = mock(Connection.class);
34+
final List<Object> mockSubscribe = Arrays.asList(
35+
SUBSCRIBE.getRaw(), "channel".getBytes(), 1L
36+
);
37+
final List<Object> mockResponse = Arrays.asList(
38+
MESSAGE.getRaw(), "channel".getBytes(), "message".getBytes()
39+
);
40+
41+
when(mockConnection.getUnflushedObject()).
42+
43+
thenReturn(mockSubscribe, mockResponse);
44+
45+
46+
final CountDownLatch countDownLatch = new CountDownLatch(1);
47+
// action
48+
final Thread thread = new Thread(() -> {
49+
Thread.currentThread().interrupt();
50+
pubSub.proceed(mockConnection, "channel");
51+
52+
countDownLatch.countDown();
53+
});
54+
thread.start();
55+
56+
assertTrue(countDownLatch.await(10, TimeUnit.MILLISECONDS));
57+
58+
}
59+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package redis.clients.jedis;
2+
3+
import junit.framework.TestCase;
4+
5+
import java.util.Arrays;
6+
import java.util.List;
7+
import java.util.concurrent.CountDownLatch;
8+
import java.util.concurrent.TimeUnit;
9+
10+
import static org.mockito.Mockito.mock;
11+
import static org.mockito.Mockito.when;
12+
import static redis.clients.jedis.Protocol.ResponseKeyword.SMESSAGE;
13+
import static redis.clients.jedis.Protocol.ResponseKeyword.SSUBSCRIBE;
14+
15+
public class JedisShardedPubSubBaseTest extends TestCase {
16+
17+
public void testProceed_givenThreadInterrupt_exitLoop() throws InterruptedException {
18+
// setup
19+
final JedisShardedPubSubBase<String> pubSub = new JedisShardedPubSubBase<String>() {
20+
21+
@Override
22+
public void onSMessage(String channel, String message) {
23+
fail("this should not happen when thread is interrupted");
24+
}
25+
26+
@Override
27+
protected String encode(byte[] raw) {
28+
return new String(raw);
29+
}
30+
31+
};
32+
33+
final Connection mockConnection = mock(Connection.class);
34+
final List<Object> mockSubscribe = Arrays.asList(
35+
SSUBSCRIBE.getRaw(), "channel".getBytes(), 1L
36+
);
37+
final List<Object> mockResponse = Arrays.asList(
38+
SMESSAGE.getRaw(), "channel".getBytes(), "message".getBytes()
39+
);
40+
when(mockConnection.getUnflushedObject()).thenReturn(mockSubscribe, mockResponse);
41+
42+
43+
final CountDownLatch countDownLatch = new CountDownLatch(1);
44+
// action
45+
final Thread thread = new Thread(() -> {
46+
Thread.currentThread().interrupt();
47+
pubSub.proceed(mockConnection, "channel");
48+
49+
countDownLatch.countDown();
50+
});
51+
thread.start();
52+
53+
assertTrue(countDownLatch.await(10, TimeUnit.MILLISECONDS));
54+
55+
}
56+
}

0 commit comments

Comments
 (0)