Skip to content

Commit 7f3bbf0

Browse files
committed
Throw exception if echoed filters do not match requested filters
Fixes #217
1 parent 9f04877 commit 7f3bbf0

File tree

2 files changed

+26
-1
lines changed

2 files changed

+26
-1
lines changed

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,7 @@ private ClientReceiver createNativeReceiver(
219219
(ClientReceiver)
220220
ExceptionUtils.wrapGet(
221221
nativeSession.openReceiver(address, receiverOptions).openFuture());
222+
boolean filterOk = true;
222223
if (!filters.isEmpty()) {
223224
Map<String, String> remoteSourceFilters = receiver.source().filters();
224225
for (Map.Entry<String, Object> localEntry : localSourceFilters.entrySet()) {
@@ -227,9 +228,15 @@ private ClientReceiver createNativeReceiver(
227228
"Missing filter value in attach response: {} => {}",
228229
localEntry.getKey(),
229230
localEntry.getValue());
231+
filterOk = false;
230232
}
231233
}
232234
}
235+
if (!filterOk) {
236+
receiver.close();
237+
throw new AmqpException(
238+
"The sending endpoint filters do not match the receiving endpoint filters");
239+
}
233240
return receiver;
234241
} catch (ClientException e) {
235242
throw ExceptionUtils.convert(e, "Error while creating receiver from '%s'", address);

src/test/java/com/rabbitmq/client/amqp/impl/SourceFiltersTest.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import static org.assertj.core.api.Assertions.assertThat;
2929
import static org.assertj.core.api.Assertions.assertThatThrownBy;
3030

31+
import com.rabbitmq.client.amqp.AmqpException;
3132
import com.rabbitmq.client.amqp.Connection;
3233
import com.rabbitmq.client.amqp.Consumer;
3334
import com.rabbitmq.client.amqp.ConsumerBuilder;
@@ -472,7 +473,7 @@ void filterExpressionStringModifier() {
472473
@Test
473474
// TODO should be 4.2
474475
@BrokerVersionAtLeast(RABBITMQ_4_1_0)
475-
void filterExpressionSql() {
476+
void sqlFilterExpressionsShouldFilterMessages() {
476477
publish(1, m -> m.subject("abc 123"));
477478
publish(1, m -> m.subject("foo bar"));
478479
publish(1, m -> m.subject("ab 12"));
@@ -484,6 +485,23 @@ void filterExpressionSql() {
484485
msgs.forEach(m -> assertThat(m).hasSubject("foo bar"));
485486
}
486487

488+
@Test
489+
// TODO should be 4.2
490+
@BrokerVersionAtLeast(RABBITMQ_4_1_0)
491+
void incorrectFilterShouldThrowException() {
492+
assertThatThrownBy(
493+
() ->
494+
connection.consumerBuilder().queue(name).messageHandler((ctx, msg) -> {}).stream()
495+
.offset(FIRST)
496+
.filter()
497+
.sql("TRUE TRUE")
498+
.stream()
499+
.builder()
500+
.build())
501+
.isInstanceOf(AmqpException.class)
502+
.hasMessageContaining("filters do not match");
503+
}
504+
487505
void publish(int messageCount) {
488506
this.publish(messageCount, UnaryOperator.identity());
489507
}

0 commit comments

Comments
 (0)