45
45
import java .math .BigInteger ;
46
46
import java .util .Collections ;
47
47
import java .util .Set ;
48
+ import java .util .concurrent .Semaphore ;
48
49
import java .util .concurrent .TimeUnit ;
49
50
import java .util .concurrent .atomic .AtomicInteger ;
50
51
53
54
import static org .junit .Assert .fail ;
54
55
import static org .mockito .Matchers .any ;
55
56
import static org .mockito .Matchers .anyString ;
57
+ import static org .mockito .Mockito .doNothing ;
56
58
import static org .mockito .Mockito .mock ;
57
59
import static org .mockito .Mockito .when ;
58
60
@@ -135,8 +137,10 @@ public void testHangingRepair() throws ReaperException, InterruptedException {
135
137
context .repairManager = new RepairManager ();
136
138
context .repairManager .initializeThreadPool (1 , 500 , TimeUnit .MILLISECONDS , 1 , TimeUnit .MILLISECONDS );
137
139
140
+ final Semaphore mutex = new Semaphore (0 );
141
+
138
142
context .jmxConnectionFactory = new JmxConnectionFactory () {
139
- final AtomicInteger repairAttempts = new AtomicInteger (0 );
143
+ final AtomicInteger repairAttempts = new AtomicInteger (1 );
140
144
141
145
@ Override
142
146
public JmxProxy connect (final Optional <RepairStatusHandler > handler , String host )
@@ -146,9 +150,10 @@ public JmxProxy connect(final Optional<RepairStatusHandler> handler, String host
146
150
when (jmx .isConnectionAlive ()).thenReturn (true );
147
151
when (jmx .tokenRangeToEndpoint (anyString (), any (RingRange .class )))
148
152
.thenReturn (Lists .newArrayList ("" ));
153
+ //doNothing().when(jmx).cancelAllRepairs();
149
154
when (jmx .triggerRepair (any (BigInteger .class ), any (BigInteger .class ), anyString (),
150
- Matchers .<RepairParallelism >any (),
151
- Sets .newHashSet (anyString ()))).then (
155
+ Matchers .<RepairParallelism >any (),
156
+ Sets .newHashSet (anyString ()))).then (
152
157
new Answer <Integer >() {
153
158
@ Override
154
159
public Integer answer (InvocationOnMock invocation ) throws Throwable {
@@ -157,7 +162,7 @@ public Integer answer(InvocationOnMock invocation) throws Throwable {
157
162
158
163
final int repairNumber = repairAttempts .getAndIncrement ();
159
164
switch (repairNumber ) {
160
- case 0 :
165
+ case 1 :
161
166
new Thread () {
162
167
@ Override
163
168
public void run () {
@@ -168,20 +173,22 @@ public void run() {
168
173
}
169
174
}.start ();
170
175
break ;
171
- case 1 :
176
+ case 2 :
172
177
new Thread () {
173
178
@ Override
174
179
public void run () {
175
180
handler .get ()
176
181
.handle (repairNumber , ActiveRepairService .Status .STARTED , null );
177
182
assertEquals (RepairSegment .State .RUNNING ,
178
- storage .getRepairSegment (SEGMENT_ID ).get ().getState ());
183
+ storage .getRepairSegment (SEGMENT_ID ).get ().getState ());
179
184
handler .get ()
180
185
.handle (repairNumber , ActiveRepairService .Status .SESSION_SUCCESS , null );
181
- assertEquals (RepairSegment .State .RUNNING ,
182
- storage .getRepairSegment (SEGMENT_ID ).get ().getState ());
186
+ assertEquals (RepairSegment .State .DONE ,
187
+ storage .getRepairSegment (SEGMENT_ID ).get ().getState ());
183
188
handler .get ()
184
189
.handle (repairNumber , ActiveRepairService .Status .FINISHED , null );
190
+ mutex .release ();
191
+ System .out .println ("MUTEX RELEASED" );
185
192
}
186
193
}.start ();
187
194
break ;
@@ -198,7 +205,9 @@ public void run() {
198
205
199
206
// TODO: refactor so that we can properly wait for the repair runner to finish rather than
200
207
// TODO: using this sleep().
201
- Thread .sleep (600 );
208
+ mutex .acquire ();
209
+ System .out .println ("MUTEX ACQUIRED" );
210
+ Thread .sleep (100 );
202
211
assertEquals (RepairRun .RunState .DONE , storage .getRepairRun (RUN_ID ).get ().getRunState ());
203
212
}
204
213
0 commit comments