Skip to content

Commit cee1963

Browse files
authored
[ISSUE #9241] RocksDBConsumeQueueStore do not need to update StoreCheckpoint (#9242)
1 parent b695652 commit cee1963

File tree

2 files changed

+48
-52
lines changed

2 files changed

+48
-52
lines changed

store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java

Lines changed: 48 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,26 @@
1616
*/
1717
package org.apache.rocketmq.store.dledger;
1818

19+
import io.openmessaging.storage.dledger.AppendFuture;
20+
import io.openmessaging.storage.dledger.BatchAppendFuture;
21+
import io.openmessaging.storage.dledger.DLedgerConfig;
22+
import io.openmessaging.storage.dledger.DLedgerServer;
23+
import io.openmessaging.storage.dledger.entry.DLedgerEntry;
24+
import io.openmessaging.storage.dledger.protocol.AppendEntryRequest;
25+
import io.openmessaging.storage.dledger.protocol.AppendEntryResponse;
26+
import io.openmessaging.storage.dledger.protocol.BatchAppendEntryRequest;
27+
import io.openmessaging.storage.dledger.protocol.DLedgerResponseCode;
28+
import io.openmessaging.storage.dledger.store.file.DLedgerMmapFileStore;
29+
import io.openmessaging.storage.dledger.store.file.MmapFile;
30+
import io.openmessaging.storage.dledger.store.file.MmapFileList;
31+
import io.openmessaging.storage.dledger.store.file.SelectMmapBufferResult;
32+
import io.openmessaging.storage.dledger.utils.DLedgerUtils;
1933
import java.net.Inet6Address;
2034
import java.net.InetSocketAddress;
2135
import java.nio.ByteBuffer;
2236
import java.util.LinkedList;
2337
import java.util.List;
2438
import java.util.concurrent.CompletableFuture;
25-
2639
import org.apache.rocketmq.common.UtilAll;
2740
import org.apache.rocketmq.common.message.MessageDecoder;
2841
import org.apache.rocketmq.common.message.MessageExtBatch;
@@ -43,21 +56,6 @@
4356
import org.apache.rocketmq.store.logfile.MappedFile;
4457
import org.rocksdb.RocksDBException;
4558

46-
import io.openmessaging.storage.dledger.AppendFuture;
47-
import io.openmessaging.storage.dledger.BatchAppendFuture;
48-
import io.openmessaging.storage.dledger.DLedgerConfig;
49-
import io.openmessaging.storage.dledger.DLedgerServer;
50-
import io.openmessaging.storage.dledger.entry.DLedgerEntry;
51-
import io.openmessaging.storage.dledger.protocol.AppendEntryRequest;
52-
import io.openmessaging.storage.dledger.protocol.AppendEntryResponse;
53-
import io.openmessaging.storage.dledger.protocol.BatchAppendEntryRequest;
54-
import io.openmessaging.storage.dledger.protocol.DLedgerResponseCode;
55-
import io.openmessaging.storage.dledger.store.file.DLedgerMmapFileStore;
56-
import io.openmessaging.storage.dledger.store.file.MmapFile;
57-
import io.openmessaging.storage.dledger.store.file.MmapFileList;
58-
import io.openmessaging.storage.dledger.store.file.SelectMmapBufferResult;
59-
import io.openmessaging.storage.dledger.utils.DLedgerUtils;
60-
6159
/**
6260
* Store all metadata downtime for recovery, data protection reliability
6361
*/
@@ -428,47 +426,54 @@ private void setRecoverPosition() {
428426
log.info("Will set the initial commitlog offset={} for dledger", dividedCommitlogOffset);
429427
}
430428

431-
private boolean isMmapFileMatchedRecover(final MmapFile mmapFile) {
429+
private boolean isMmapFileMatchedRecover(final MmapFile mmapFile) throws RocksDBException {
432430
ByteBuffer byteBuffer = mmapFile.sliceByteBuffer();
433431

434432
int magicCode = byteBuffer.getInt(DLedgerEntry.BODY_OFFSET + MessageDecoder.MESSAGE_MAGIC_CODE_POSITION);
435433
if (magicCode != MESSAGE_MAGIC_CODE) {
436434
return false;
437435
}
438436

439-
int storeTimestampPosition;
440-
int sysFlag = byteBuffer.getInt(DLedgerEntry.BODY_OFFSET + MessageDecoder.SYSFLAG_POSITION);
441-
if ((sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0) {
442-
storeTimestampPosition = MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSITION;
437+
if (this.defaultMessageStore.getMessageStoreConfig().isEnableRocksDBStore()) {
438+
final long maxPhyOffsetInConsumeQueue = this.defaultMessageStore.getQueueStore().getMaxPhyOffsetInConsumeQueue();
439+
long phyOffset = byteBuffer.getLong(DLedgerEntry.BODY_OFFSET + MessageDecoder.MESSAGE_PHYSIC_OFFSET_POSITION);
440+
if (phyOffset <= maxPhyOffsetInConsumeQueue) {
441+
log.info("find check. beginPhyOffset: {}, maxPhyOffsetInConsumeQueue: {}", phyOffset, maxPhyOffsetInConsumeQueue);
442+
return true;
443+
}
443444
} else {
444-
// v6 address is 12 byte larger than v4
445-
storeTimestampPosition = MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSITION + 12;
446-
}
445+
int storeTimestampPosition;
446+
int sysFlag = byteBuffer.getInt(DLedgerEntry.BODY_OFFSET + MessageDecoder.SYSFLAG_POSITION);
447+
if ((sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0) {
448+
storeTimestampPosition = MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSITION;
449+
} else {
450+
// v6 address is 12 byte larger than v4
451+
storeTimestampPosition = MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSITION + 12;
452+
}
447453

448-
long storeTimestamp = byteBuffer.getLong(DLedgerEntry.BODY_OFFSET + storeTimestampPosition);
449-
if (storeTimestamp == 0) {
450-
return false;
451-
}
454+
long storeTimestamp = byteBuffer.getLong(DLedgerEntry.BODY_OFFSET + storeTimestampPosition);
455+
if (storeTimestamp == 0) {
456+
return false;
457+
}
452458

453-
if (this.defaultMessageStore.getMessageStoreConfig().isMessageIndexEnable()
459+
if (this.defaultMessageStore.getMessageStoreConfig().isMessageIndexEnable()
454460
&& this.defaultMessageStore.getMessageStoreConfig().isMessageIndexSafe()) {
455-
if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestampIndex()) {
456-
log.info("dledger find check timestamp, {} {}",
457-
storeTimestamp,
458-
UtilAll.timeMillisToHumanString(storeTimestamp));
459-
return true;
460-
}
461-
} else {
462-
if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestamp()) {
463-
log.info("dledger find check timestamp, {} {}",
464-
storeTimestamp,
465-
UtilAll.timeMillisToHumanString(storeTimestamp));
466-
return true;
461+
if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestampIndex()) {
462+
log.info("dledger find check timestamp, {} {}",
463+
storeTimestamp,
464+
UtilAll.timeMillisToHumanString(storeTimestamp));
465+
return true;
466+
}
467+
} else {
468+
if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestamp()) {
469+
log.info("dledger find check timestamp, {} {}",
470+
storeTimestamp,
471+
UtilAll.timeMillisToHumanString(storeTimestamp));
472+
return true;
473+
}
467474
}
468475
}
469-
470476
return false;
471-
472477
}
473478

474479
@Override

store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import java.util.concurrent.Executors;
3030
import java.util.concurrent.ScheduledExecutorService;
3131
import java.util.concurrent.TimeUnit;
32-
3332
import java.util.concurrent.atomic.AtomicReference;
3433
import javax.annotation.Nonnull;
3534
import org.apache.commons.io.FileUtils;
@@ -46,7 +45,6 @@
4645
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
4746
import org.apache.rocketmq.store.DefaultMessageStore;
4847
import org.apache.rocketmq.store.DispatchRequest;
49-
import org.apache.rocketmq.store.config.BrokerRole;
5048
import org.apache.rocketmq.store.config.StorePathConfigHelper;
5149
import org.apache.rocketmq.store.exception.ConsumeQueueException;
5250
import org.apache.rocketmq.store.exception.StoreException;
@@ -265,13 +263,6 @@ private boolean putMessagePosition0(List<DispatchRequest> requests) {
265263
this.rocksDBStorage.batchPut(writeBatch);
266264

267265
this.rocksDBConsumeQueueOffsetTable.putHeapMaxCqOffset(tempTopicQueueMaxOffsetMap);
268-
269-
long storeTimeStamp = requests.get(size - 1).getStoreTimestamp();
270-
if (this.messageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE
271-
|| this.messageStore.getMessageStoreConfig().isEnableDLegerCommitLog()) {
272-
this.messageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimeStamp);
273-
}
274-
this.messageStore.getStoreCheckpoint().setLogicsMsgTimestamp(storeTimeStamp);
275266
notifyMessageArriveAndClear(requests);
276267
return true;
277268
} catch (Exception e) {

0 commit comments

Comments
 (0)