16
16
17
17
package io .mantisrx .server .worker ;
18
18
19
- import static junit .framework .Assert .assertEquals ;
19
+ import static org .junit .Assert .assertEquals ;
20
+ import static org .junit .Assert .assertNotNull ;
20
21
import static org .junit .Assert .assertTrue ;
21
22
22
23
import io .mantisrx .runtime .MantisJobDurationType ;
@@ -61,7 +62,7 @@ public void convertJobSchedulingInfoToWorkerMapTest() {
61
62
62
63
List <WorkerInfo > workersForStage1 = workerMap .getWorkersForStage (1 );
63
64
64
- assertTrue (workersForStage1 != null );
65
+ assertNotNull (workersForStage1 );
65
66
assertEquals (2 , workersForStage1 .size ());
66
67
67
68
for (int i = 0 ; i < workersForStage1 .size (); i ++) {
@@ -75,7 +76,7 @@ public void convertJobSchedulingInfoToWorkerMapTest() {
75
76
76
77
List <WorkerInfo > workersForStage2 = workerMap .getWorkersForStage (2 );
77
78
78
- assertTrue (workersForStage2 != null );
79
+ assertNotNull (workersForStage2 );
79
80
assertEquals (4 , workersForStage2 .size ());
80
81
81
82
for (int i = 0 ; i < workersForStage2 .size (); i ++) {
@@ -156,16 +157,11 @@ WorkerAssignments createWorkerAssignments(int stageNo, int noWorkers) {
156
157
@ Test
157
158
public void deferTest () throws InterruptedException {
158
159
159
- Subscription subscribe1 = getObs4 ().subscribeOn (Schedulers .io ()).subscribe ((t ) -> {
160
- System .out .println ("In 1 -> " + t );
161
- });
160
+ Subscription subscribe1 = getObs4 ().subscribeOn (Schedulers .io ()).subscribe ((t ) -> System .out .println ("In 1 -> " + t ));
162
161
163
162
Thread .sleep (5000 );
164
163
165
- Subscription subscribe2 = getObs4 ().subscribeOn (Schedulers .io ()).subscribe ((t ) -> {
166
- System .out .println ("In 2 -> " + t );
167
- });
168
-
164
+ Subscription subscribe2 = getObs4 ().subscribeOn (Schedulers .io ()).subscribe ((t ) -> System .out .println ("In 2 -> " + t ));
169
165
170
166
Thread .sleep (5000 );
171
167
subscribe1 .unsubscribe ();
@@ -174,69 +170,49 @@ public void deferTest() throws InterruptedException {
174
170
subscribe2 .unsubscribe ();
175
171
176
172
Thread .sleep (5000 );
177
- Subscription subscribe3 = getObs4 ().subscribeOn (Schedulers .io ()).subscribe ((t ) -> {
178
- System .out .println ("In 3 -> " + t );
179
- });
173
+ Subscription subscribe3 = getObs4 ().subscribeOn (Schedulers .io ()).subscribe ((t ) -> System .out .println ("In 3 -> " + t ));
180
174
Thread .sleep (5000 );
181
175
subscribe3 .unsubscribe ();
182
176
Thread .sleep (10000 );
183
177
}
184
178
185
179
Observable <Long > getObs () {
186
- Observable <Long > oLong = Observable .defer (() -> {
187
- return Observable .interval (1 , TimeUnit .SECONDS ).doOnNext ((e ) -> {
188
- System .out .println ("Minted " + e );
189
- }).share ();
190
- }).doOnSubscribe (() -> {
191
- System .out .println ("Subscribed111" + System .currentTimeMillis ());
192
- }).doOnUnsubscribe (() -> {
193
- System .out .println ("UnSubscribed111" + System .currentTimeMillis ());
194
- });
180
+ Observable <Long > oLong =
181
+ Observable .defer (() -> Observable .interval (1 , TimeUnit .SECONDS )
182
+ .doOnNext ((e ) -> System .out .println ("Minted " + e )).share ())
183
+ .doOnSubscribe (() -> System .out .println ("Subscribed111" + System .currentTimeMillis ()))
184
+ .doOnUnsubscribe (() -> System .out .println ("UnSubscribed111" + System .currentTimeMillis ()));
195
185
return oLong ;
196
186
}
197
187
198
188
Observable <Long > getObs2 () {
199
189
200
190
return Observable .interval (1 , TimeUnit .SECONDS )
201
- .doOnNext ((e ) -> {
202
- System .out .println ("Minted " + e );
203
- })
191
+ .doOnNext ((e ) -> System .out .println ("Minted " + e ))
204
192
.share ()
205
- .doOnSubscribe (() -> {
206
- System .out .println ("Subscribed111" + System .currentTimeMillis ());
207
- }).doOnUnsubscribe (() -> {
208
- System .out .println ("UnSubscribed111" + System .currentTimeMillis ());
209
- })
210
-
211
- ;
193
+ .doOnSubscribe (() -> System .out .println ("Subscribed111" + System .currentTimeMillis ()))
194
+ .doOnUnsubscribe (() -> System .out .println ("UnSubscribed111" + System .currentTimeMillis ()));
212
195
213
196
}
214
197
215
198
Observable <Long > getObs3 () {
216
199
217
- return Observable .range (1 , 100 ).doOnNext ((e ) -> {
218
- System .out .println ("Minted " + e );
219
- }).map ((i ) -> {
220
- return new Long (i );
221
- }).share ()
222
- .doOnSubscribe (() -> {
223
- System .out .println ("Subscribed111" + System .currentTimeMillis ());
224
- }).doOnUnsubscribe (() -> {
225
- System .out .println ("UnSubscribed111" + System .currentTimeMillis ());
226
- });
200
+ return Observable .range (1 , 100 )
201
+ .doOnNext ((e ) -> System .out .println ("Minted " + e ))
202
+ .map (Long ::new )
203
+ .share ()
204
+ .doOnSubscribe (() -> System .out .println ("Subscribed111" + System .currentTimeMillis ()))
205
+ .doOnUnsubscribe (() -> System .out .println ("UnSubscribed111" + System .currentTimeMillis ()));
227
206
228
207
}
229
208
230
209
Observable <Long > getObs4 () {
231
210
BehaviorSubject <Long > o = BehaviorSubject .create ();
232
- Observable .interval (1 , TimeUnit .SECONDS ).doOnNext ((e ) -> {
233
- System .out .println ("Minted " + e );
234
- }).doOnSubscribe (() -> {
235
- System .out .println ("Subscribed111" + System .currentTimeMillis ());
236
- }).doOnUnsubscribe (() -> {
237
- System .out .println ("UnSubscribed111" + System .currentTimeMillis ());
238
- })
239
- .subscribe (o );
211
+ Observable .interval (1 , TimeUnit .SECONDS )
212
+ .doOnNext ((e ) -> System .out .println ("Minted " + e ))
213
+ .doOnSubscribe (() -> System .out .println ("Subscribed111" + System .currentTimeMillis ()))
214
+ .doOnUnsubscribe (() -> System .out .println ("UnSubscribed111" + System .currentTimeMillis ()))
215
+ .subscribe (o );
240
216
241
217
return o ;
242
218
0 commit comments