Skip to content

Commit 11f6edd

Browse files
authored
Merge pull request alibaba#620 from RongtongJin/polish_filter_example
[ISSUE alibaba#608] Polish filter example
2 parents a7b0c27 + e204376 commit 11f6edd

File tree

5 files changed

+76
-112
lines changed

5 files changed

+76
-112
lines changed

example/src/main/java/org/apache/rocketmq/example/filter/SqlConsumer.java renamed to example/src/main/java/org/apache/rocketmq/example/filter/SqlFilterConsumer.java

+10-19
Original file line numberDiff line numberDiff line change
@@ -17,28 +17,24 @@
1717

1818
package org.apache.rocketmq.example.filter;
1919

20+
import java.util.List;
2021
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
2122
import org.apache.rocketmq.client.consumer.MessageSelector;
2223
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
2324
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
2425
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
25-
import org.apache.rocketmq.client.exception.MQClientException;
2626
import org.apache.rocketmq.common.message.MessageExt;
2727

28-
import java.util.List;
28+
public class SqlFilterConsumer {
29+
30+
public static void main(String[] args) throws Exception {
2931

30-
public class SqlConsumer {
32+
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
3133

32-
public static void main(String[] args) {
33-
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
34-
try {
35-
consumer.subscribe("TopicTest",
36-
MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +
37-
"and (a is not null and a between 0 3)"));
38-
} catch (MQClientException e) {
39-
e.printStackTrace();
40-
return;
41-
}
34+
// Don't forget to set enablePropertyFilter=true in broker
35+
consumer.subscribe("SqlFilterTest",
36+
MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +
37+
"and (a is not null and a between 0 and 3)"));
4238

4339
consumer.registerMessageListener(new MessageListenerConcurrently() {
4440

@@ -50,12 +46,7 @@ public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
5046
}
5147
});
5248

53-
try {
54-
consumer.start();
55-
} catch (MQClientException e) {
56-
e.printStackTrace();
57-
return;
58-
}
49+
consumer.start();
5950
System.out.printf("Consumer Started.%n");
6051
}
6152
}

example/src/main/java/org/apache/rocketmq/example/filter/Producer.java renamed to example/src/main/java/org/apache/rocketmq/example/filter/SqlFilterProducer.java

+19-17
Original file line numberDiff line numberDiff line change
@@ -14,33 +14,35 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17+
1718
package org.apache.rocketmq.example.filter;
1819

19-
import org.apache.rocketmq.client.exception.MQClientException;
2020
import org.apache.rocketmq.client.producer.DefaultMQProducer;
2121
import org.apache.rocketmq.client.producer.SendResult;
2222
import org.apache.rocketmq.common.message.Message;
2323
import org.apache.rocketmq.remoting.common.RemotingHelper;
2424

25-
public class Producer {
26-
public static void main(String[] args) throws MQClientException, InterruptedException {
27-
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
25+
public class SqlFilterProducer {
26+
27+
public static void main(String[] args) throws Exception {
28+
29+
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
30+
2831
producer.start();
2932

30-
try {
31-
for (int i = 0; i < 6000000; i++) {
32-
Message msg = new Message("TopicFilter7",
33-
"TagA",
34-
"OrderID001",
35-
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
36-
37-
msg.putUserProperty("SequenceId", String.valueOf(i));
38-
SendResult sendResult = producer.send(msg);
39-
System.out.printf("%s%n", sendResult);
40-
}
41-
} catch (Exception e) {
42-
e.printStackTrace();
33+
String[] tags = new String[] {"TagA", "TagB", "TagC"};
34+
35+
for (int i = 0; i < 10; i++) {
36+
Message msg = new Message("SqlFilterTest",
37+
tags[i % tags.length],
38+
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
39+
);
40+
msg.putUserProperty("a", String.valueOf(i));
41+
42+
SendResult sendResult = producer.send(msg);
43+
System.out.printf("%s%n", sendResult);
4344
}
45+
4446
producer.shutdown();
4547
}
4648
}

example/src/main/java/org/apache/rocketmq/example/filter/SqlProducer.java

-67
This file was deleted.

example/src/main/java/org/apache/rocketmq/example/filter/Consumer.java renamed to example/src/main/java/org/apache/rocketmq/example/filter/TagFilterConsumer.java

+3-9
Original file line numberDiff line numberDiff line change
@@ -16,28 +16,22 @@
1616
*/
1717
package org.apache.rocketmq.example.filter;
1818

19-
import java.io.File;
2019
import java.io.IOException;
2120
import java.util.List;
2221
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
2322
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
2423
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
2524
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
2625
import org.apache.rocketmq.client.exception.MQClientException;
27-
import org.apache.rocketmq.common.MixAll;
2826
import org.apache.rocketmq.common.message.MessageExt;
2927

30-
public class Consumer {
28+
public class TagFilterConsumer {
3129

3230
public static void main(String[] args) throws InterruptedException, MQClientException, IOException {
33-
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupNamecc4");
3431

35-
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
36-
File classFile = new File(classLoader.getResource("MessageFilterImpl.java").getFile());
32+
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
3733

38-
String filterCode = MixAll.file2String(classFile);
39-
consumer.subscribe("TopicTest", "org.apache.rocketmq.example.filter.MessageFilterImpl",
40-
filterCode);
34+
consumer.subscribe("TagFilterTest", "TagA || TagC");
4135

4236
consumer.registerMessageListener(new MessageListenerConcurrently() {
4337

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.rocketmq.example.filter;
18+
19+
import org.apache.rocketmq.client.producer.DefaultMQProducer;
20+
import org.apache.rocketmq.client.producer.SendResult;
21+
import org.apache.rocketmq.common.message.Message;
22+
import org.apache.rocketmq.remoting.common.RemotingHelper;
23+
24+
public class TagFilterProducer {
25+
26+
public static void main(String[] args) throws Exception {
27+
28+
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
29+
producer.start();
30+
31+
String[] tags = new String[] {"TagA", "TagB", "TagC"};
32+
33+
for (int i = 0; i < 60; i++) {
34+
Message msg = new Message("TagFilterTest",
35+
tags[i % tags.length],
36+
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
37+
38+
SendResult sendResult = producer.send(msg);
39+
System.out.printf("%s%n", sendResult);
40+
}
41+
42+
producer.shutdown();
43+
}
44+
}

0 commit comments

Comments
 (0)