Skip to content

Commit 62be041

Browse files
yangxiaohui-collyangxiaohui
andauthored
[ISSUE #7948] Prevent invoking the queryMessage method lead to OOM (#9265)
Co-authored-by: yangxiaohui <[email protected]>
1 parent c5cd32a commit 62be041

File tree

2 files changed

+50
-2
lines changed

2 files changed

+50
-2
lines changed

store/src/main/java/org/apache/rocketmq/store/index/IndexService.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -164,11 +164,10 @@ public void destroy() {
164164
}
165165

166166
public QueryOffsetResult queryOffset(String topic, String key, int maxNum, long begin, long end) {
167-
List<Long> phyOffsets = new ArrayList<>(maxNum);
168-
169167
long indexLastUpdateTimestamp = 0;
170168
long indexLastUpdatePhyoffset = 0;
171169
maxNum = Math.min(maxNum, this.defaultMessageStore.getMessageStoreConfig().getMaxMsgsNumBatch());
170+
List<Long> phyOffsets = new ArrayList<>(maxNum);
172171
try {
173172
this.readWriteLock.readLock().lock();
174173
if (!this.indexFileList.isEmpty()) {
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.rocketmq.store.index;
19+
20+
import org.apache.rocketmq.common.BrokerConfig;
21+
import org.apache.rocketmq.store.DefaultMessageStore;
22+
import org.apache.rocketmq.store.config.MessageStoreConfig;
23+
import org.apache.rocketmq.store.stats.BrokerStatsManager;
24+
import org.junit.Test;
25+
26+
import java.util.concurrent.ConcurrentHashMap;
27+
28+
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
29+
30+
31+
public class IndexServiceTest {
32+
33+
@Test
34+
public void testQueryOffsetThrow() throws Exception {
35+
assertDoesNotThrow(() -> {
36+
DefaultMessageStore store = new DefaultMessageStore(
37+
new MessageStoreConfig(),
38+
new BrokerStatsManager(new BrokerConfig()),
39+
null,
40+
new BrokerConfig(),
41+
new ConcurrentHashMap<>()
42+
);
43+
44+
IndexService indexService = new IndexService(store);
45+
indexService.queryOffset("test", "", Integer.MAX_VALUE, 10, 100);
46+
});
47+
}
48+
49+
}

0 commit comments

Comments
 (0)