Skip to content

Commit 411f50e

Browse files
check for thread interrupt in process
1 parent 64b5aac commit 411f50e

File tree

4 files changed

+116
-2
lines changed

4 files changed

+116
-2
lines changed

Diff for: src/main/java/redis/clients/jedis/JedisPubSubBase.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ private void process() {
172172
} else {
173173
throw new JedisException("Unknown message type: " + reply);
174174
}
175-
} while (isSubscribed());
175+
} while (!Thread.currentThread().isInterrupted() && isSubscribed());
176176

177177
// /* Invalidate instance since this thread is no longer listening */
178178
// this.client = null;

Diff for: src/main/java/redis/clients/jedis/JedisShardedPubSubBase.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ private void process() {
9999
} else {
100100
throw new JedisException("Unknown message type: " + reply);
101101
}
102-
} while (isSubscribed());
102+
} while (!Thread.currentThread().isInterrupted() && isSubscribed());
103103

104104
// /* Invalidate instance since this thread is no longer listening */
105105
// this.client = null;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package redis.clients.jedis;
2+
3+
import junit.framework.TestCase;
4+
import redis.clients.jedis.util.SafeEncoder;
5+
6+
import java.util.Arrays;
7+
import java.util.List;
8+
import java.util.concurrent.CountDownLatch;
9+
import java.util.concurrent.TimeUnit;
10+
11+
import static org.mockito.Mockito.mock;
12+
import static org.mockito.Mockito.when;
13+
import static redis.clients.jedis.Protocol.ResponseKeyword.*;
14+
15+
public class JedisPubSubBaseTest extends TestCase {
16+
17+
public void testProceed_givenThreadInterrupt_exitLoop() throws InterruptedException {
18+
// setup
19+
final JedisPubSubBase<String> pubSub = new JedisPubSubBase<String>() {
20+
21+
@Override
22+
public void onMessage(String channel, String message) {
23+
fail("this should not happen when thread is interrupted");
24+
}
25+
26+
@Override
27+
protected String encode(byte[] raw) {
28+
return SafeEncoder.encode(raw);
29+
}
30+
};
31+
32+
final Connection mockConnection = mock(Connection.class);
33+
final List<Object> mockSubscribe = Arrays.asList(
34+
SUBSCRIBE.getRaw(), "channel".getBytes(), 1L
35+
);
36+
final List<Object> mockResponse = Arrays.asList(
37+
MESSAGE.getRaw(), "channel".getBytes(), "message".getBytes()
38+
);
39+
40+
when(mockConnection.getUnflushedObject()).
41+
42+
thenReturn(mockSubscribe, mockResponse);
43+
44+
45+
final CountDownLatch countDownLatch = new CountDownLatch(1);
46+
// action
47+
final Thread thread = new Thread(() -> {
48+
Thread.currentThread().interrupt();
49+
pubSub.proceed(mockConnection, "channel");
50+
51+
countDownLatch.countDown();
52+
});
53+
thread.start();
54+
55+
assertTrue(countDownLatch.await(10, TimeUnit.MILLISECONDS));
56+
57+
}
58+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package redis.clients.jedis;
2+
3+
import junit.framework.TestCase;
4+
5+
import java.util.Arrays;
6+
import java.util.List;
7+
import java.util.concurrent.CountDownLatch;
8+
import java.util.concurrent.TimeUnit;
9+
10+
import static org.mockito.Mockito.mock;
11+
import static org.mockito.Mockito.when;
12+
import static redis.clients.jedis.Protocol.ResponseKeyword.SMESSAGE;
13+
import static redis.clients.jedis.Protocol.ResponseKeyword.SSUBSCRIBE;
14+
15+
public class JedisShardedPubSubBaseTest extends TestCase {
16+
17+
public void testProceed_givenThreadInterrupt_exitLoop() throws InterruptedException {
18+
// setup
19+
final JedisShardedPubSubBase<String> pubSub = new JedisShardedPubSubBase<String>() {
20+
21+
@Override
22+
public void onSMessage(String channel, String message) {
23+
fail("this should not happen when thread is interrupted");
24+
}
25+
26+
@Override
27+
protected String encode(byte[] raw) {
28+
return new String(raw);
29+
}
30+
31+
};
32+
33+
final Connection mockConnection = mock(Connection.class);
34+
final List<Object> mockSubscribe = Arrays.asList(
35+
SSUBSCRIBE.getRaw(), "channel".getBytes(), 1L
36+
);
37+
final List<Object> mockResponse = Arrays.asList(
38+
SMESSAGE.getRaw(), "channel".getBytes(), "message".getBytes()
39+
);
40+
when(mockConnection.getUnflushedObject()).thenReturn(mockSubscribe, mockResponse);
41+
42+
43+
final CountDownLatch countDownLatch = new CountDownLatch(1);
44+
// action
45+
final Thread thread = new Thread(() -> {
46+
Thread.currentThread().interrupt();
47+
pubSub.proceed(mockConnection, "channel");
48+
49+
countDownLatch.countDown();
50+
});
51+
thread.start();
52+
53+
assertTrue(countDownLatch.await(10, TimeUnit.MILLISECONDS));
54+
55+
}
56+
}

0 commit comments

Comments
 (0)