40
40
import org .apache .rocketmq .store .stats .BrokerStatsManager ;
41
41
import org .junit .After ;
42
42
import org .junit .Before ;
43
+ import org .junit .Ignore ;
43
44
import org .junit .Test ;
44
45
import org .junit .runner .RunWith ;
45
46
import org .mockito .junit .MockitoJUnitRunner ;
@@ -123,6 +124,8 @@ public void testWriteAndRead() {
123
124
messageStore .putMessage (buildMessage ());
124
125
}
125
126
127
+ StoreTestUtil .waitCommitLogReput ((DefaultMessageStore ) messageStore );
128
+
126
129
for (long i = 0 ; i < totalMsgs ; i ++) {
127
130
GetMessageResult result = messageStore .getMessage ("GROUP_A" , "TOPIC_A" , 0 , i , 1024 * 1024 , null );
128
131
assertThat (result ).isNotNull ();
@@ -180,7 +183,8 @@ public void should_get_consume_queue_offset_successfully_when_incomming_by_times
180
183
int queueId = 0 ;
181
184
String topic = "FooBar" ;
182
185
AppendMessageResult [] appendMessageResults = putMessages (totalCount , topic , queueId , true );
183
- Thread .sleep (10 );
186
+ //Thread.sleep(10);
187
+ StoreTestUtil .waitCommitLogReput ((DefaultMessageStore ) messageStore );
184
188
185
189
ConsumeQueue consumeQueue = getDefaultMessageStore ().findConsumeQueue (topic , queueId );
186
190
for (AppendMessageResult appendMessageResult : appendMessageResults ) {
@@ -198,7 +202,8 @@ public void should_get_consume_queue_offset_successfully_when_timestamp_is_skewi
198
202
int queueId = 0 ;
199
203
String topic = "FooBar" ;
200
204
AppendMessageResult [] appendMessageResults = putMessages (totalCount , topic , queueId , true );
201
- Thread .sleep (10 );
205
+ //Thread.sleep(10);
206
+ StoreTestUtil .waitCommitLogReput ((DefaultMessageStore ) messageStore );
202
207
int skewing = 2 ;
203
208
204
209
ConsumeQueue consumeQueue = getDefaultMessageStore ().findConsumeQueue (topic , queueId );
@@ -222,7 +227,8 @@ public void should_get_min_of_max_consume_queue_offset_when_timestamp_s_skewing_
222
227
int queueId = 0 ;
223
228
String topic = "FooBar" ;
224
229
AppendMessageResult [] appendMessageResults = putMessages (totalCount , topic , queueId , true );
225
- Thread .sleep (10 );
230
+ //Thread.sleep(10);
231
+ StoreTestUtil .waitCommitLogReput ((DefaultMessageStore ) messageStore );
226
232
int skewing = 20000 ;
227
233
228
234
ConsumeQueue consumeQueue = getDefaultMessageStore ().findConsumeQueue (topic , queueId );
@@ -235,6 +241,9 @@ public void should_get_min_of_max_consume_queue_offset_when_timestamp_s_skewing_
235
241
assertThat (indexBuffer .getByteBuffer ().getInt ()).isEqualTo (appendMessageResults [totalCount - 1 ].getWroteBytes ());
236
242
assertThat (indexBuffer2 .getByteBuffer ().getLong ()).isEqualTo (appendMessageResults [0 ].getWroteOffset ());
237
243
assertThat (indexBuffer2 .getByteBuffer ().getInt ()).isEqualTo (appendMessageResults [0 ].getWroteBytes ());
244
+
245
+ indexBuffer .release ();
246
+ indexBuffer2 .release ();
238
247
}
239
248
}
240
249
@@ -245,7 +254,9 @@ public void should_return_zero_when_consume_queue_not_found() throws Interrupted
245
254
int wrongQueueId = 1 ;
246
255
String topic = "FooBar" ;
247
256
AppendMessageResult [] appendMessageResults = putMessages (totalCount , topic , queueId , false );
248
- Thread .sleep (10 );
257
+ //Thread.sleep(10);
258
+
259
+ StoreTestUtil .waitCommitLogReput ((DefaultMessageStore ) messageStore );
249
260
250
261
long offset = messageStore .getOffsetInQueueByTime (topic , wrongQueueId , appendMessageResults [0 ].getStoreTimestamp ());
251
262
@@ -259,7 +270,8 @@ public void should_return_negative_one_when_invoke_getMessageStoreTimeStamp_if_c
259
270
int wrongQueueId = 1 ;
260
271
String topic = "FooBar" ;
261
272
putMessages (totalCount , topic , queueId , false );
262
- Thread .sleep (10 );
273
+ //Thread.sleep(10);
274
+ StoreTestUtil .waitCommitLogReput ((DefaultMessageStore ) messageStore );
263
275
264
276
long messageStoreTimeStamp = messageStore .getMessageStoreTimeStamp (topic , wrongQueueId , 0 );
265
277
@@ -273,7 +285,9 @@ public void should_return_negative_one_when_invoke_getMessageStoreTimeStamp_if_c
273
285
int wrongQueueId = 1 ;
274
286
String topic = "FooBar" ;
275
287
putMessages (totalCount , topic , queueId , true );
276
- Thread .sleep (10 );
288
+ //Thread.sleep(10);
289
+
290
+ StoreTestUtil .waitCommitLogReput ((DefaultMessageStore ) messageStore );
277
291
278
292
long messageStoreTimeStamp = messageStore .getMessageStoreTimeStamp (topic , wrongQueueId , -1 );
279
293
@@ -287,7 +301,8 @@ public void should_get_message_store_timestamp_successfully_when_incomming_by_to
287
301
int queueId = 0 ;
288
302
String topic = "FooBar" ;
289
303
AppendMessageResult [] appendMessageResults = putMessages (totalCount , topic , queueId , false );
290
- Thread .sleep (10 );
304
+ //Thread.sleep(10);
305
+ StoreTestUtil .waitCommitLogReput ((DefaultMessageStore ) messageStore );
291
306
292
307
ConsumeQueue consumeQueue = getDefaultMessageStore ().findConsumeQueue (topic , queueId );
293
308
int minOffsetInQueue = (int )consumeQueue .getMinOffsetInQueue ();
@@ -310,7 +325,8 @@ public void should_get_store_time_successfully_when_invoke_getStoreTime_if_every
310
325
int queueId = 0 ;
311
326
String topic = "FooBar" ;
312
327
AppendMessageResult [] appendMessageResults = putMessages (totalCount , topic , queueId , false );
313
- Thread .sleep (10 );
328
+ //Thread.sleep(10);
329
+ StoreTestUtil .waitCommitLogReput ((DefaultMessageStore ) messageStore );
314
330
ConsumeQueue consumeQueue = messageStore .getConsumeQueue (topic , queueId );
315
331
316
332
for (int i = 0 ; i < totalCount ; i ++) {
@@ -412,6 +428,8 @@ private void verifyThatMasterIsFunctional(long totalMsgs, MessageStore master) {
412
428
master .putMessage (buildMessage ());
413
429
}
414
430
431
+ StoreTestUtil .waitCommitLogReput ((DefaultMessageStore ) messageStore );
432
+
415
433
for (long i = 0 ; i < totalMsgs ; i ++) {
416
434
GetMessageResult result = master .getMessage ("GROUP_A" , "TOPIC_A" , 0 , i , 1024 * 1024 , null );
417
435
assertThat (result ).isNotNull ();
@@ -432,16 +450,21 @@ public void testPullSize() throws Exception {
432
450
}
433
451
// wait for consume queue build
434
452
// the sleep time should be great than consume queue flush interval
435
- Thread .sleep (100 );
453
+ //Thread.sleep(100);
454
+ StoreTestUtil .waitCommitLogReput ((DefaultMessageStore ) messageStore );
436
455
String group = "simple" ;
437
456
GetMessageResult getMessageResult32 = messageStore .getMessage (group , topic , 0 , 0 , 32 , null );
438
457
assertThat (getMessageResult32 .getMessageBufferList ().size ()).isEqualTo (32 );
458
+ getMessageResult32 .release ();
439
459
440
460
GetMessageResult getMessageResult20 = messageStore .getMessage (group , topic , 0 , 0 , 20 , null );
441
461
assertThat (getMessageResult20 .getMessageBufferList ().size ()).isEqualTo (20 );
442
462
463
+ getMessageResult20 .release ();
443
464
GetMessageResult getMessageResult45 = messageStore .getMessage (group , topic , 0 , 0 , 10 , null );
444
465
assertThat (getMessageResult45 .getMessageBufferList ().size ()).isEqualTo (10 );
466
+ getMessageResult45 .release ();
467
+
445
468
}
446
469
447
470
@ Test
@@ -455,7 +478,9 @@ public void testRecover() throws Exception {
455
478
messageStore .putMessage (messageExtBrokerInner );
456
479
}
457
480
458
- Thread .sleep (100 );//wait for build consumer queue
481
+ // Thread.sleep(100);//wait for build consumer queue
482
+ StoreTestUtil .waitCommitLogReput ((DefaultMessageStore ) messageStore );
483
+
459
484
long maxPhyOffset = messageStore .getMaxPhyOffset ();
460
485
long maxCqOffset = messageStore .getMaxOffsetInQueue (topic , 0 );
461
486
@@ -475,7 +500,8 @@ public void testRecover() throws Exception {
475
500
messageExtBrokerInner .setQueueId (0 );
476
501
messageStore .putMessage (messageExtBrokerInner );
477
502
}
478
- Thread .sleep (100 );
503
+ //Thread.sleep(100);
504
+ StoreTestUtil .waitCommitLogReput ((DefaultMessageStore ) messageStore );
479
505
long secondLastPhyOffset = messageStore .getMaxPhyOffset ();
480
506
long secondLastCqOffset = messageStore .getMaxOffsetInQueue (topic , 0 );
481
507
@@ -504,7 +530,8 @@ public void testRecover() throws Exception {
504
530
messageExtBrokerInner .setQueueId (0 );
505
531
messageStore .putMessage (messageExtBrokerInner );
506
532
}
507
- Thread .sleep (100 );
533
+ //Thread.sleep(100);
534
+ StoreTestUtil .waitCommitLogReput ((DefaultMessageStore ) messageStore );
508
535
secondLastPhyOffset = messageStore .getMaxPhyOffset ();
509
536
secondLastCqOffset = messageStore .getMaxOffsetInQueue (topic , 0 );
510
537
0 commit comments