File tree 1 file changed +5
-0
lines changed
store/src/main/java/org/apache/rocketmq/store
1 file changed +5
-0
lines changed Original file line number Diff line number Diff line change 22
22
import org .apache .rocketmq .common .constant .LoggerName ;
23
23
import org .apache .rocketmq .logging .InternalLogger ;
24
24
import org .apache .rocketmq .logging .InternalLoggerFactory ;
25
+ import org .apache .rocketmq .store .config .BrokerRole ;
25
26
import org .apache .rocketmq .store .config .StorePathConfigHelper ;
26
27
27
28
public class ConsumeQueue {
@@ -397,6 +398,10 @@ public void putMessagePositionInfoWrapper(DispatchRequest request) {
397
398
boolean result = this .putMessagePositionInfo (request .getCommitLogOffset (),
398
399
request .getMsgSize (), tagsCode , request .getConsumeQueueOffset ());
399
400
if (result ) {
401
+ if (this .defaultMessageStore .getMessageStoreConfig ().getBrokerRole () == BrokerRole .SLAVE ||
402
+ this .defaultMessageStore .getMessageStoreConfig ().isEnableDLegerCommitLog ()) {
403
+ this .defaultMessageStore .getStoreCheckpoint ().setPhysicMsgTimestamp (request .getStoreTimestamp ());
404
+ }
400
405
this .defaultMessageStore .getStoreCheckpoint ().setLogicsMsgTimestamp (request .getStoreTimestamp ());
401
406
return ;
402
407
} else {
You can’t perform that action at this time.
0 commit comments