Skip to content

Commit 047dfcc

Browse files
committed
[ISSUE apache#9213] Fix get the earliest time error when data is clean up in tiered storage
1 parent 5936695 commit 047dfcc

File tree

2 files changed

+11
-3
lines changed

2 files changed

+11
-3
lines changed

tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -318,12 +318,14 @@ public CompletableFuture<Long> getEarliestMessageTimeAsync(String topic, int que
318318
return fetcher.getEarliestMessageTimeAsync(topic, queueId)
319319
.thenApply(time -> {
320320
Attributes latencyAttributes = TieredStoreMetricsManager.newAttributesBuilder()
321-
.put(TieredStoreMetricsConstant.LABEL_OPERATION, TieredStoreMetricsConstant.OPERATION_API_GET_EARLIEST_MESSAGE_TIME)
321+
.put(TieredStoreMetricsConstant.LABEL_OPERATION,
322+
TieredStoreMetricsConstant.OPERATION_API_GET_EARLIEST_MESSAGE_TIME)
322323
.put(TieredStoreMetricsConstant.LABEL_TOPIC, topic)
323324
.build();
324325
TieredStoreMetricsManager.apiLatency.record(stopwatch.elapsed(TimeUnit.MILLISECONDS), latencyAttributes);
325326
if (time < 0) {
326-
log.debug("GetEarliestMessageTimeAsync failed, try to get earliest message time from next store: topic: {}, queue: {}",
327+
log.debug("GetEarliestMessageTimeAsync failed, " +
328+
"try to get earliest message time from next store: topic: {}, queue: {}",
327329
topic, queueId);
328330
return finalNextEarliestMessageTime != Long.MAX_VALUE ? finalNextEarliestMessageTime : -1;
329331
}

tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -375,12 +375,18 @@ public CompletableFuture<GetMessageResult> getMessageAsync(
375375
@Override
376376
public CompletableFuture<Long> getEarliestMessageTimeAsync(String topic, int queueId) {
377377
FlatMessageFile flatFile = flatFileStore.getFlatFile(new MessageQueue(topic, brokerName, queueId));
378+
378379
if (flatFile == null) {
379380
return CompletableFuture.completedFuture(-1L);
380381
}
381382

382-
// read from timestamp to timestamp + length
383383
int length = MessageFormatUtil.STORE_TIMESTAMP_POSITION + 8;
384+
if (flatFile.getConsumeQueueCommitOffset() <= flatFile.getConsumeQueueMinOffset() ||
385+
flatFile.getCommitLogCommitOffset() - flatFile.getCommitLogMinOffset() <= length) {
386+
return CompletableFuture.completedFuture(-1L);
387+
}
388+
389+
// read from timestamp to timestamp + length
384390
return flatFile.getCommitLogAsync(flatFile.getCommitLogMinOffset(), length)
385391
.thenApply(MessageFormatUtil::getStoreTimeStamp);
386392
}

0 commit comments

Comments
 (0)