Skip to content

Commit f0cae5b

Browse files
committed
[ISSUE apache#8974] Support recalling of delay message, unit test and integration test
1 parent 041b414 commit f0cae5b

File tree

17 files changed

+1204
-1
lines changed

17 files changed

+1204
-1
lines changed

acl/src/test/java/org/apache/rocketmq/acl/plain/PlainAccessResourceTest.java

+37
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,15 @@
1919

2020
import java.util.HashMap;
2121
import java.util.Map;
22+
import apache.rocketmq.v2.RecallMessageRequest;
23+
import apache.rocketmq.v2.Resource;
24+
import com.google.protobuf.GeneratedMessageV3;
25+
import org.apache.rocketmq.acl.common.AuthenticationHeader;
2226
import org.apache.rocketmq.acl.common.Permission;
2327
import org.apache.rocketmq.common.MixAll;
2428
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
2529
import org.apache.rocketmq.remoting.protocol.RequestCode;
30+
import org.apache.rocketmq.remoting.protocol.header.RecallMessageRequestHeader;
2631
import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader;
2732
import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeaderV2;
2833
import org.junit.Assert;
@@ -33,6 +38,8 @@ public class PlainAccessResourceTest {
3338
public static final String DEFAULT_PRODUCER_GROUP = "PID_acl";
3439
public static final String DEFAULT_CONSUMER_GROUP = "GID_acl";
3540
public static final String DEFAULT_REMOTE_ADDR = "192.128.1.1";
41+
public static final String AUTH_HEADER =
42+
"Signature Credential=1234567890/test, SignedHeaders=host, Signature=1234567890";
3643

3744
@Test
3845
public void testParseSendNormal() {
@@ -93,4 +100,34 @@ public void testParseSendRetryV2() {
93100

94101
Assert.assertEquals(permMap, accessResource.getResourcePermMap());
95102
}
103+
104+
@Test
105+
public void testParseRecallMessage() {
106+
// remoting
107+
RecallMessageRequestHeader requestHeader = new RecallMessageRequestHeader();
108+
requestHeader.setTopic(DEFAULT_TOPIC);
109+
requestHeader.setProducerGroup(DEFAULT_PRODUCER_GROUP);
110+
requestHeader.setRecallHandle("handle");
111+
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.RECALL_MESSAGE, requestHeader);
112+
request.makeCustomHeaderToNet();
113+
114+
PlainAccessResource accessResource = PlainAccessResource.parse(request, DEFAULT_REMOTE_ADDR);
115+
Assert.assertTrue(Permission.PUB == accessResource.getResourcePermMap().get(DEFAULT_TOPIC));
116+
117+
// grpc
118+
GeneratedMessageV3 grpcRequest = RecallMessageRequest.newBuilder()
119+
.setTopic(Resource.newBuilder().setName(DEFAULT_TOPIC).build())
120+
.setRecallHandle("handle")
121+
.build();
122+
accessResource = PlainAccessResource.parse(grpcRequest, mockAuthenticationHeader());
123+
Assert.assertTrue(Permission.PUB == accessResource.getResourcePermMap().get(DEFAULT_TOPIC));
124+
}
125+
126+
private AuthenticationHeader mockAuthenticationHeader() {
127+
return AuthenticationHeader.builder()
128+
.remoteAddress(DEFAULT_REMOTE_ADDR)
129+
.authorization(AUTH_HEADER)
130+
.datetime("datetime")
131+
.build();
132+
}
96133
}

auth/src/test/java/org/apache/rocketmq/auth/authorization/builder/DefaultAuthorizationContextBuilderTest.java

+31
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import apache.rocketmq.v2.Publishing;
2929
import apache.rocketmq.v2.QueryAssignmentRequest;
3030
import apache.rocketmq.v2.QueryRouteRequest;
31+
import apache.rocketmq.v2.RecallMessageRequest;
3132
import apache.rocketmq.v2.ReceiveMessageRequest;
3233
import apache.rocketmq.v2.Resource;
3334
import apache.rocketmq.v2.SendMessageRequest;
@@ -65,6 +66,7 @@
6566
import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader;
6667
import org.apache.rocketmq.remoting.protocol.header.QueryConsumerOffsetRequestHeader;
6768
import org.apache.rocketmq.remoting.protocol.header.QueryMessageRequestHeader;
69+
import org.apache.rocketmq.remoting.protocol.header.RecallMessageRequestHeader;
6870
import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader;
6971
import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeaderV2;
7072
import org.apache.rocketmq.remoting.protocol.header.UnregisterClientRequestHeader;
@@ -122,6 +124,19 @@ public void buildGrpc() {
122124
Assert.assertEquals(result.get(0).getChannelId(), "channel-id");
123125
Assert.assertEquals(result.get(0).getRpcCode(), SendMessageRequest.getDescriptor().getFullName());
124126

127+
request = RecallMessageRequest.newBuilder()
128+
.setTopic(Resource.newBuilder().setName("topic").build())
129+
.setRecallHandle("handle")
130+
.build();
131+
result = builder.build(metadata, request);
132+
Assert.assertEquals(1, result.size());
133+
Assert.assertEquals(result.get(0).getSubject().getSubjectKey(), "User:rocketmq");
134+
Assert.assertEquals(result.get(0).getResource().getResourceKey(), "Topic:topic");
135+
Assert.assertTrue(result.get(0).getActions().containsAll(Arrays.asList(Action.PUB)));
136+
Assert.assertEquals(result.get(0).getSourceIp(), "192.168.0.1");
137+
Assert.assertEquals(result.get(0).getChannelId(), "channel-id");
138+
Assert.assertEquals(result.get(0).getRpcCode(), RecallMessageRequest.getDescriptor().getFullName());
139+
125140
request = EndTransactionRequest.newBuilder()
126141
.setTopic(Resource.newBuilder().setName("topic").build())
127142
.build();
@@ -315,6 +330,22 @@ public void buildRemoting() {
315330
Assert.assertEquals("Group:group", result.get(0).getResource().getResourceKey());
316331
Assert.assertTrue(result.get(0).getActions().containsAll(Arrays.asList(Action.SUB)));
317332

333+
RecallMessageRequestHeader recallMessageRequestHeader = new RecallMessageRequestHeader();
334+
recallMessageRequestHeader.setTopic("topic");
335+
recallMessageRequestHeader.setRecallHandle("handle");
336+
request = RemotingCommand.createRequestCommand(RequestCode.RECALL_MESSAGE, recallMessageRequestHeader);
337+
request.setVersion(441);
338+
request.addExtField("AccessKey", "rocketmq");
339+
request.makeCustomHeaderToNet();
340+
result = builder.build(channelHandlerContext, request);
341+
Assert.assertEquals(1, result.size());
342+
Assert.assertEquals("User:rocketmq", result.get(0).getSubject().getSubjectKey());
343+
Assert.assertEquals("Topic:topic", result.get(0).getResource().getResourceKey());
344+
Assert.assertTrue(result.get(0).getActions().containsAll(Arrays.asList(Action.PUB)));
345+
Assert.assertEquals("192.168.0.1", result.get(0).getSourceIp());
346+
Assert.assertEquals("channel-id", result.get(0).getChannelId());
347+
Assert.assertEquals(RequestCode.RECALL_MESSAGE + "", result.get(0).getRpcCode());
348+
318349
EndTransactionRequestHeader endTransactionRequestHeader = new EndTransactionRequestHeader();
319350
endTransactionRequestHeader.setTopic("topic");
320351
request = RemotingCommand.createRequestCommand(RequestCode.END_TRANSACTION, endTransactionRequestHeader);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,241 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.rocketmq.broker.processor;
19+
20+
import io.netty.channel.Channel;
21+
import io.netty.channel.ChannelHandlerContext;
22+
import org.apache.rocketmq.broker.BrokerController;
23+
import org.apache.rocketmq.broker.topic.TopicConfigManager;
24+
import org.apache.rocketmq.common.BrokerConfig;
25+
import org.apache.rocketmq.common.TopicConfig;
26+
import org.apache.rocketmq.common.message.MessageAccessor;
27+
import org.apache.rocketmq.common.message.MessageConst;
28+
import org.apache.rocketmq.common.message.MessageDecoder;
29+
import org.apache.rocketmq.common.message.MessageExt;
30+
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
31+
import org.apache.rocketmq.common.producer.RecallMessageHandle;
32+
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
33+
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
34+
import org.apache.rocketmq.remoting.protocol.RequestCode;
35+
import org.apache.rocketmq.remoting.protocol.ResponseCode;
36+
import org.apache.rocketmq.remoting.protocol.header.RecallMessageRequestHeader;
37+
import org.apache.rocketmq.remoting.protocol.header.RecallMessageResponseHeader;
38+
import org.apache.rocketmq.store.AppendMessageResult;
39+
import org.apache.rocketmq.store.AppendMessageStatus;
40+
import org.apache.rocketmq.store.MessageStore;
41+
import org.apache.rocketmq.store.PutMessageResult;
42+
import org.apache.rocketmq.store.PutMessageStatus;
43+
import org.apache.rocketmq.store.config.BrokerRole;
44+
import org.apache.rocketmq.store.config.MessageStoreConfig;
45+
import org.apache.rocketmq.store.stats.BrokerStatsManager;
46+
import org.junit.Assert;
47+
import org.junit.Before;
48+
import org.junit.Test;
49+
import org.junit.runner.RunWith;
50+
import org.mockito.Mock;
51+
import org.mockito.junit.MockitoJUnitRunner;
52+
53+
import java.util.Arrays;
54+
import java.util.List;
55+
import java.util.Map;
56+
57+
import static org.mockito.ArgumentMatchers.any;
58+
import static org.mockito.Mockito.times;
59+
import static org.mockito.Mockito.when;
60+
import static org.mockito.Mockito.verify;
61+
62+
@RunWith(MockitoJUnitRunner.class)
63+
public class RecallMessageProcessorTest {
64+
private static final String TOPIC = "topic";
65+
private static final String BROKER_NAME = "brokerName";
66+
67+
private RecallMessageProcessor recallMessageProcessor;
68+
@Mock
69+
private BrokerConfig brokerConfig;
70+
@Mock
71+
private BrokerController brokerController;
72+
@Mock
73+
private ChannelHandlerContext handlerContext;
74+
@Mock
75+
private MessageStoreConfig messageStoreConfig;
76+
@Mock
77+
private TopicConfigManager topicConfigManager;
78+
@Mock
79+
private MessageStore messageStore;
80+
@Mock
81+
private BrokerStatsManager brokerStatsManager;
82+
@Mock
83+
private Channel channel;
84+
85+
@Before
86+
public void init() throws IllegalAccessException, NoSuchFieldException {
87+
when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
88+
when(brokerController.getTopicConfigManager()).thenReturn(topicConfigManager);
89+
when(brokerController.getMessageStore()).thenReturn(messageStore);
90+
when(brokerController.getBrokerConfig()).thenReturn(brokerConfig);
91+
when(brokerConfig.getBrokerName()).thenReturn(BROKER_NAME);
92+
when(brokerController.getBrokerStatsManager()).thenReturn(brokerStatsManager);
93+
when(handlerContext.channel()).thenReturn(channel);
94+
recallMessageProcessor = new RecallMessageProcessor(brokerController);
95+
}
96+
97+
@Test
98+
public void testBuildMessage() {
99+
String timestampStr = String.valueOf(System.currentTimeMillis());
100+
String id = "id";
101+
RecallMessageHandle.HandleV1 handle = new RecallMessageHandle.HandleV1(TOPIC, "brokerName", timestampStr, id);
102+
MessageExtBrokerInner msg =
103+
recallMessageProcessor.buildMessage(handlerContext, new RecallMessageRequestHeader(), handle);
104+
105+
Assert.assertEquals(TOPIC, msg.getTopic());
106+
Map<String, String> properties = MessageDecoder.string2messageProperties(msg.getPropertiesString());
107+
Assert.assertEquals(timestampStr, properties.get(MessageConst.PROPERTY_TIMER_DELIVER_MS));
108+
Assert.assertEquals(id, properties.get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
109+
Assert.assertEquals(id, properties.get(MessageConst.PROPERTY_TIMER_DEL_UNIQKEY));
110+
}
111+
112+
@Test
113+
public void testHandlePutMessageResult() {
114+
MessageExt message = new MessageExt();
115+
MessageAccessor.putProperty(message, MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, "id");
116+
RemotingCommand response = RemotingCommand.createResponseCommand(RecallMessageResponseHeader.class);
117+
recallMessageProcessor.handlePutMessageResult(null, null, response, message, handlerContext, 0L);
118+
Assert.assertEquals(ResponseCode.SYSTEM_ERROR, response.getCode());
119+
120+
List<PutMessageStatus> okStatus = Arrays.asList(PutMessageStatus.PUT_OK, PutMessageStatus.FLUSH_DISK_TIMEOUT,
121+
PutMessageStatus.FLUSH_SLAVE_TIMEOUT, PutMessageStatus.SLAVE_NOT_AVAILABLE);
122+
123+
for (PutMessageStatus status : PutMessageStatus.values()) {
124+
PutMessageResult putMessageResult =
125+
new PutMessageResult(status, new AppendMessageResult(AppendMessageStatus.PUT_OK));
126+
recallMessageProcessor.handlePutMessageResult(putMessageResult, null, response, message, handlerContext, 0L);
127+
if (okStatus.contains(status)) {
128+
Assert.assertEquals(ResponseCode.SUCCESS, response.getCode());
129+
RecallMessageResponseHeader responseHeader = (RecallMessageResponseHeader) response.readCustomHeader();
130+
Assert.assertEquals("id", responseHeader.getMsgId());
131+
} else {
132+
Assert.assertEquals(ResponseCode.SYSTEM_ERROR, response.getCode());
133+
}
134+
}
135+
}
136+
137+
@Test
138+
public void testProcessRequest_invalidStatus() throws RemotingCommandException {
139+
RemotingCommand request = mockRequest(0, TOPIC, TOPIC, "id", BROKER_NAME);
140+
RemotingCommand response;
141+
142+
// role slave
143+
when(messageStoreConfig.getBrokerRole()).thenReturn(BrokerRole.SLAVE);
144+
response = recallMessageProcessor.processRequest(handlerContext, request);
145+
Assert.assertEquals(ResponseCode.SLAVE_NOT_AVAILABLE, response.getCode());
146+
147+
// not reach startTimestamp
148+
when(messageStoreConfig.getBrokerRole()).thenReturn(BrokerRole.SYNC_MASTER);
149+
when(messageStore.now()).thenReturn(0L);
150+
when(brokerConfig.getStartAcceptSendRequestTimeStamp()).thenReturn(System.currentTimeMillis());
151+
response = recallMessageProcessor.processRequest(handlerContext, request);
152+
Assert.assertEquals(ResponseCode.SERVICE_NOT_AVAILABLE, response.getCode());
153+
}
154+
155+
@Test
156+
public void testProcessRequest_notWriteable() throws RemotingCommandException {
157+
when(brokerConfig.getBrokerPermission()).thenReturn(4);
158+
when(brokerConfig.isAllowRecallWhenBrokerNotWriteable()).thenReturn(false);
159+
RemotingCommand request = mockRequest(0, TOPIC, TOPIC, "id", BROKER_NAME);
160+
RemotingCommand response = recallMessageProcessor.processRequest(handlerContext, request);
161+
Assert.assertEquals(ResponseCode.SERVICE_NOT_AVAILABLE, response.getCode());
162+
}
163+
164+
@Test
165+
public void testProcessRequest_topicNotFound_or_notMatch() throws RemotingCommandException {
166+
when(brokerConfig.getBrokerPermission()).thenReturn(6);
167+
RemotingCommand request;
168+
RemotingCommand response;
169+
170+
// not found
171+
request = mockRequest(0, TOPIC, TOPIC, "id", BROKER_NAME);
172+
response = recallMessageProcessor.processRequest(handlerContext, request);
173+
Assert.assertEquals(ResponseCode.TOPIC_NOT_EXIST, response.getCode());
174+
175+
// not match
176+
when(topicConfigManager.selectTopicConfig(TOPIC)).thenReturn(new TopicConfig(TOPIC));
177+
request = mockRequest(0, TOPIC, "anotherTopic", "id", BROKER_NAME);
178+
response = recallMessageProcessor.processRequest(handlerContext, request);
179+
Assert.assertEquals(ResponseCode.ILLEGAL_OPERATION, response.getCode());
180+
}
181+
182+
@Test
183+
public void testProcessRequest_brokerNameNotMatch() throws RemotingCommandException {
184+
when(brokerConfig.getBrokerPermission()).thenReturn(6);
185+
when(topicConfigManager.selectTopicConfig(TOPIC)).thenReturn(new TopicConfig(TOPIC));
186+
187+
RemotingCommand request = mockRequest(0, TOPIC, "anotherTopic", "id", BROKER_NAME + "_other");
188+
RemotingCommand response = recallMessageProcessor.processRequest(handlerContext, request);
189+
Assert.assertEquals(ResponseCode.ILLEGAL_OPERATION, response.getCode());
190+
}
191+
192+
@Test
193+
public void testProcessRequest_timestampInvalid() throws RemotingCommandException {
194+
when(brokerConfig.getBrokerPermission()).thenReturn(6);
195+
when(topicConfigManager.selectTopicConfig(TOPIC)).thenReturn(new TopicConfig(TOPIC));
196+
RemotingCommand request;
197+
RemotingCommand response;
198+
199+
// past timestamp
200+
request = mockRequest(0, TOPIC, TOPIC, "id", BROKER_NAME);
201+
response = recallMessageProcessor.processRequest(handlerContext, request);
202+
Assert.assertEquals(ResponseCode.ILLEGAL_OPERATION, response.getCode());
203+
204+
// timestamp overflow
205+
when(messageStoreConfig.getTimerMaxDelaySec()).thenReturn(86400);
206+
request = mockRequest(System.currentTimeMillis() + 86400 * 2 * 1000, TOPIC, TOPIC, "id", BROKER_NAME);
207+
response = recallMessageProcessor.processRequest(handlerContext, request);
208+
Assert.assertEquals(ResponseCode.ILLEGAL_OPERATION, response.getCode());
209+
}
210+
211+
@Test
212+
public void testProcessRequest_success() throws RemotingCommandException {
213+
when(brokerConfig.getBrokerPermission()).thenReturn(6);
214+
when(topicConfigManager.selectTopicConfig(TOPIC)).thenReturn(new TopicConfig(TOPIC));
215+
when(messageStoreConfig.getTimerMaxDelaySec()).thenReturn(86400);
216+
when(messageStore.putMessage(any())).thenReturn(
217+
new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK)));
218+
219+
String msgId = "msgId";
220+
RemotingCommand request = mockRequest(System.currentTimeMillis() + 90 * 1000, TOPIC, TOPIC, msgId, BROKER_NAME);
221+
RemotingCommand response = recallMessageProcessor.processRequest(handlerContext, request);
222+
RecallMessageResponseHeader responseHeader = (RecallMessageResponseHeader) response.readCustomHeader();
223+
Assert.assertEquals(ResponseCode.SUCCESS, response.getCode());
224+
Assert.assertEquals(msgId, responseHeader.getMsgId());
225+
verify(messageStore, times(1)).putMessage(any());
226+
}
227+
228+
private RemotingCommand mockRequest(long timestamp, String requestTopic, String handleTopic,
229+
String msgId, String brokerName) {
230+
String handle =
231+
RecallMessageHandle.HandleV1.buildHandle(handleTopic, brokerName, String.valueOf(timestamp), msgId);
232+
RecallMessageRequestHeader requestHeader = new RecallMessageRequestHeader();
233+
requestHeader.setProducerGroup("group");
234+
requestHeader.setTopic(requestTopic);
235+
requestHeader.setRecallHandle(handle);
236+
requestHeader.setBrokerName(brokerName);
237+
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.RECALL_MESSAGE, requestHeader);
238+
request.makeCustomHeaderToNet();
239+
return request;
240+
}
241+
}

0 commit comments

Comments
 (0)