1
+ package redis .clients .jedis ;
2
+
3
+ import junit .framework .TestCase ;
4
+
5
+ import java .util .Arrays ;
6
+ import java .util .List ;
7
+ import java .util .concurrent .CountDownLatch ;
8
+ import java .util .concurrent .TimeUnit ;
9
+
10
+ import static org .mockito .Mockito .mock ;
11
+ import static org .mockito .Mockito .when ;
12
+ import static redis .clients .jedis .Protocol .ResponseKeyword .SMESSAGE ;
13
+ import static redis .clients .jedis .Protocol .ResponseKeyword .SSUBSCRIBE ;
14
+
15
+ public class JedisShardedPubSubBaseTest extends TestCase {
16
+
17
+ public void testProceed () throws InterruptedException {
18
+ // setup
19
+ final JedisShardedPubSubBase <String > pubSub = new JedisShardedPubSubBase <String >() {
20
+
21
+ @ Override
22
+ public void onSMessage (String channel , String message ) {
23
+ fail ("this should not happen when thread is interrupted" );
24
+ }
25
+
26
+ @ Override
27
+ protected String encode (byte [] raw ) {
28
+ return new String (raw );
29
+ }
30
+
31
+ };
32
+
33
+ final Connection mockConnection = mock (Connection .class );
34
+ final List <Object > mockSubscribe = Arrays .asList (
35
+ SSUBSCRIBE .getRaw (), "channel" .getBytes (), 1L
36
+ );
37
+ final List <Object > mockResponse = Arrays .asList (
38
+ SMESSAGE .getRaw (), "channel" .getBytes (), "message" .getBytes ()
39
+ );
40
+ when (mockConnection .getUnflushedObject ()).thenReturn (mockSubscribe , mockResponse );
41
+
42
+
43
+ final CountDownLatch countDownLatch = new CountDownLatch (1 );
44
+ // action
45
+ final Thread thread = new Thread (() -> {
46
+ Thread .currentThread ().interrupt ();
47
+ pubSub .proceed (mockConnection , "channel" );
48
+
49
+ countDownLatch .countDown ();
50
+ });
51
+ thread .start ();
52
+
53
+ assertTrue (countDownLatch .await (10 , TimeUnit .MILLISECONDS ));
54
+
55
+ }
56
+ }
0 commit comments