Skip to content

Commit ecd3007

Browse files
committed
[ISSUE apache#9152] The getConsumeStats supports inputting multiple topics
1 parent a275510 commit ecd3007

File tree

3 files changed

+86
-25
lines changed

3 files changed

+86
-25
lines changed

broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java

+37-22
Original file line numberDiff line numberDiff line change
@@ -1947,37 +1947,21 @@ private RemotingCommand getConsumeStats(ChannelHandlerContext ctx,
19471947
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
19481948
try {
19491949
final GetConsumeStatsRequestHeader requestHeader = request.decodeCommandCustomHeader(GetConsumeStatsRequestHeader.class);
1950-
ConsumeStats consumeStats = new ConsumeStats();
1950+
List<String> topicListProvided = requestHeader.fetchTopicList();
1951+
String topicProvided = requestHeader.getTopic();
1952+
String group = requestHeader.getConsumerGroup();
19511953

1952-
Set<String> topics = new HashSet<>();
1953-
if (UtilAll.isBlank(requestHeader.getTopic())) {
1954-
topics = this.brokerController.getConsumerOffsetManager().whichTopicByConsumer(requestHeader.getConsumerGroup());
1955-
} else {
1956-
topics.add(requestHeader.getTopic());
1957-
}
1954+
ConsumeStats consumeStats = new ConsumeStats();
1955+
Set<String> topicsForCollecting = getTopicsForCollecting(topicListProvided, topicProvided, group);
19581956

1959-
for (String topic : topics) {
1957+
for (String topic : topicsForCollecting) {
19601958
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
19611959
if (null == topicConfig) {
19621960
LOGGER.warn("AdminBrokerProcessor#getConsumeStats: topic config does not exist, topic={}", topic);
19631961
continue;
19641962
}
19651963

19661964
TopicQueueMappingDetail mappingDetail = this.brokerController.getTopicQueueMappingManager().getTopicQueueMapping(topic);
1967-
1968-
{
1969-
SubscriptionData findSubscriptionData =
1970-
this.brokerController.getConsumerManager().findSubscriptionData(requestHeader.getConsumerGroup(), topic);
1971-
1972-
if (null == findSubscriptionData
1973-
&& this.brokerController.getConsumerManager().findSubscriptionDataCount(requestHeader.getConsumerGroup()) > 0) {
1974-
LOGGER.warn(
1975-
"AdminBrokerProcessor#getConsumeStats: topic does not exist in consumer group's subscription, "
1976-
+ "topic={}, consumer group={}", topic, requestHeader.getConsumerGroup());
1977-
continue;
1978-
}
1979-
}
1980-
19811965
for (int i = 0; i < topicConfig.getReadQueueNums(); i++) {
19821966
MessageQueue mq = new MessageQueue();
19831967
mq.setTopic(topic);
@@ -2038,6 +2022,37 @@ private RemotingCommand getConsumeStats(ChannelHandlerContext ctx,
20382022
return response;
20392023
}
20402024

2025+
private Set<String> getTopicsForCollecting(List<String> topicListProvided, String topicProvided, String group) {
2026+
Set<String> topicsForCollecting = new HashSet<>();
2027+
if (!topicListProvided.isEmpty()) {
2028+
// if topic list is provided, only collect the topics in the list
2029+
// and ignore subscription check
2030+
topicsForCollecting.addAll(topicListProvided);
2031+
} else {
2032+
// In order to be compatible with the old logic,
2033+
// even if the topic has been provided here, the subscription will be checked.
2034+
if (UtilAll.isBlank(topicProvided)) {
2035+
topicsForCollecting.addAll(
2036+
this.brokerController.getConsumerOffsetManager().whichTopicByConsumer(group));
2037+
} else {
2038+
topicsForCollecting.add(topicProvided);
2039+
}
2040+
int subscriptionCount = this.brokerController.getConsumerManager().findSubscriptionDataCount(group);
2041+
Iterator<String> iterator = topicsForCollecting.iterator();
2042+
while (iterator.hasNext()) {
2043+
String topic = iterator.next();
2044+
SubscriptionData findSubscriptionData = this.brokerController.getConsumerManager().findSubscriptionData(group, topic);
2045+
if (findSubscriptionData == null && subscriptionCount > 0) {
2046+
LOGGER.warn(
2047+
"AdminBrokerProcessor#getConsumeStats: topic does not exist in consumer group's subscription, topic={}, consumer group={}",
2048+
topic, group);
2049+
iterator.remove();
2050+
}
2051+
}
2052+
}
2053+
return topicsForCollecting;
2054+
}
2055+
20412056
private RemotingCommand getAllConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request) {
20422057
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
20432058

client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java

+13-2
Original file line numberDiff line numberDiff line change
@@ -1748,16 +1748,27 @@ public TopicStatsTable getTopicStatsInfo(final String addr, final String topic,
17481748
public ConsumeStats getConsumeStats(final String addr, final String consumerGroup, final long timeoutMillis)
17491749
throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException,
17501750
MQBrokerException {
1751-
return getConsumeStats(addr, consumerGroup, null, timeoutMillis);
1751+
return getConsumeStats(addr, consumerGroup, null, null, timeoutMillis);
1752+
}
1753+
1754+
public ConsumeStats getConsumeStats(final String addr, final String consumerGroup, final List<String> topicList,
1755+
final long timeoutMillis) throws RemotingSendRequestException, RemotingConnectException, RemotingTimeoutException, MQBrokerException, InterruptedException {
1756+
return getConsumeStats(addr, consumerGroup, null, topicList, timeoutMillis);
17521757
}
17531758

17541759
public ConsumeStats getConsumeStats(final String addr, final String consumerGroup, final String topic,
1755-
final long timeoutMillis)
1760+
final long timeoutMillis) throws RemotingSendRequestException, RemotingConnectException, RemotingTimeoutException, MQBrokerException, InterruptedException {
1761+
return getConsumeStats(addr, consumerGroup, topic, null, timeoutMillis);
1762+
}
1763+
1764+
public ConsumeStats getConsumeStats(final String addr, final String consumerGroup, final String topic,
1765+
final List<String> topicList, final long timeoutMillis)
17561766
throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException,
17571767
MQBrokerException {
17581768
GetConsumeStatsRequestHeader requestHeader = new GetConsumeStatsRequestHeader();
17591769
requestHeader.setConsumerGroup(consumerGroup);
17601770
requestHeader.setTopic(topic);
1771+
requestHeader.updateTopicList(topicList);
17611772

17621773
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUME_STATS, requestHeader);
17631774

remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/GetConsumeStatsRequestHeader.java

+36-1
Original file line numberDiff line numberDiff line change
@@ -17,27 +17,62 @@
1717
package org.apache.rocketmq.remoting.protocol.header;
1818

1919
import com.google.common.base.MoreObjects;
20+
import java.util.Arrays;
21+
import java.util.Collections;
22+
import java.util.List;
23+
import org.apache.commons.lang3.StringUtils;
2024
import org.apache.rocketmq.common.action.Action;
2125
import org.apache.rocketmq.common.action.RocketMQAction;
2226
import org.apache.rocketmq.common.resource.ResourceType;
2327
import org.apache.rocketmq.common.resource.RocketMQResource;
2428
import org.apache.rocketmq.remoting.annotation.CFNotNull;
2529
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
26-
import org.apache.rocketmq.remoting.rpc.TopicRequestHeader;
2730
import org.apache.rocketmq.remoting.protocol.RequestCode;
31+
import org.apache.rocketmq.remoting.rpc.TopicRequestHeader;
2832

2933
@RocketMQAction(value = RequestCode.GET_CONSUME_STATS, action = Action.GET)
3034
public class GetConsumeStatsRequestHeader extends TopicRequestHeader {
35+
private static final String TOPIC_NAME_SEPARATOR = ";";
36+
3137
@CFNotNull
3238
@RocketMQResource(ResourceType.GROUP)
3339
private String consumerGroup;
40+
3441
@RocketMQResource(ResourceType.TOPIC)
3542
private String topic;
3643

44+
// if topicList is provided, topic will be ignored
45+
@RocketMQResource(ResourceType.TOPIC)
46+
private String topicList;
47+
3748
@Override
3849
public void checkFields() throws RemotingCommandException {
3950
}
4051

52+
public List<String> fetchTopicList() {
53+
if (StringUtils.isBlank(topicList)) {
54+
return Collections.emptyList();
55+
}
56+
return Arrays.asList(StringUtils.split(topicList, TOPIC_NAME_SEPARATOR));
57+
}
58+
59+
public void updateTopicList(List<String> topicList) {
60+
if (topicList == null) {
61+
return;
62+
}
63+
StringBuilder sb = new StringBuilder();
64+
topicList.forEach(topic -> sb.append(topic).append(TOPIC_NAME_SEPARATOR));
65+
this.setTopicList(sb.toString());
66+
}
67+
68+
public String getTopicList() {
69+
return topicList;
70+
}
71+
72+
public void setTopicList(String topicList) {
73+
this.topicList = topicList;
74+
}
75+
4176
public String getConsumerGroup() {
4277
return consumerGroup;
4378
}

0 commit comments

Comments
 (0)