Skip to content

Commit 2089abd

Browse files
authored
[ISSUE #9080] Fix tranfer logic when get large messages from cache in tiered storage (#9079)
1 parent 1c35adb commit 2089abd

File tree

2 files changed

+6
-0
lines changed

2 files changed

+6
-0
lines changed

tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java

+5
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ public class MessageStoreFetcherImpl implements MessageStoreFetcher {
5656
private final String brokerName;
5757
private final MetadataStore metadataStore;
5858
private final MessageStoreConfig storeConfig;
59+
private final org.apache.rocketmq.store.config.MessageStoreConfig messageStoreConfig;
5960
private final TieredMessageStore messageStore;
6061
private final IndexService indexService;
6162
private final FlatFileStore flatFileStore;
@@ -71,6 +72,7 @@ public MessageStoreFetcherImpl(TieredMessageStore messageStore, MessageStoreConf
7172
FlatFileStore flatFileStore, IndexService indexService) {
7273

7374
this.storeConfig = storeConfig;
75+
this.messageStoreConfig = messageStore.getMessageStoreConfig();
7476
this.brokerName = storeConfig.getBrokerName();
7577
this.flatFileStore = flatFileStore;
7678
this.messageStore = messageStore;
@@ -148,6 +150,9 @@ protected GetMessageResultExt getMessageFromCache(
148150
if (result.getMessageCount() == maxCount) {
149151
break;
150152
}
153+
if (result.getBufferTotalSize() >= messageStoreConfig.getMaxTransferBytesOnMessageInMemory()) {
154+
break;
155+
}
151156
}
152157
result.setStatus(result.getMessageCount() > 0 ?
153158
GetMessageStatus.FOUND : GetMessageStatus.NO_MATCHED_MESSAGE);

tieredstore/src/test/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImplTest.java

+1
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ public void dispatchFromCommitLogTest() throws Exception {
106106
Mockito.when(messageStore.getStoreExecutor()).thenReturn(executor);
107107
Mockito.when(messageStore.getFlatFileStore()).thenReturn(fileStore);
108108
Mockito.when(messageStore.getIndexService()).thenReturn(indexService);
109+
Mockito.when(messageStore.getMessageStoreConfig()).thenReturn(new org.apache.rocketmq.store.config.MessageStoreConfig());
109110

110111
// mock message
111112
ByteBuffer buffer = MessageFormatUtilTest.buildMockedMessageBuffer();

0 commit comments

Comments
 (0)