Skip to content

Commit 0d544f1

Browse files
committed
GH-1218: Backport Schema Fixes
Resolves #1218 - batch properties
1 parent 15be46f commit 0d544f1

File tree

5 files changed

+54
-12
lines changed

5 files changed

+54
-12
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/ConnectionFactoryParser.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -54,6 +54,8 @@ class ConnectionFactoryParser extends AbstractSingleBeanDefinitionParser {
5454

5555
private static final String PUBLISHER_CONFIRMS = "publisher-confirms";
5656

57+
private static final String CONFIRM_TYPE = "confirm-type";
58+
5759
private static final String PUBLISHER_RETURNS = "publisher-returns";
5860

5961
private static final String REQUESTED_HEARTBEAT = "requested-heartbeat";
@@ -114,7 +116,7 @@ protected void doParse(Element element, ParserContext parserContext, BeanDefinit
114116
NamespaceUtils.setValueIfAttributeDefined(builder, element, FACTORY_TIMEOUT, "channelCheckoutTimeout");
115117
NamespaceUtils.setValueIfAttributeDefined(builder, element, CONNECTION_LIMIT);
116118
NamespaceUtils.setReferenceIfAttributeDefined(builder, element, "connection-name-strategy");
117-
NamespaceUtils.setValueIfAttributeDefined(builder, element, "confirm-type", "publisherConfirmType");
119+
NamespaceUtils.setValueIfAttributeDefined(builder, element, CONFIRM_TYPE, "publisherConfirmType");
118120
}
119121

120122
}

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/RabbitNamespaceUtils.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -74,6 +74,10 @@ public final class RabbitNamespaceUtils {
7474

7575
private static final String TRANSACTION_SIZE_ATTRIBUTE = "transaction-size";
7676

77+
private static final String CONSUMER_BATCH_ENABLED_ATTRIBUTE = "consumer-batch-enabled";
78+
79+
private static final String BATCH_SIZE_ATTRIBUTE = "batch-size";
80+
7781
private static final String PHASE_ATTRIBUTE = "phase";
7882

7983
private static final String AUTO_STARTUP_ATTRIBUTE = "auto-startup";
@@ -216,9 +220,23 @@ public static BeanDefinition parseContainer(Element containerEle, ParserContext
216220
containerDef.getPropertyValues().add("channelTransacted", new TypedStringValue(channelTransacted));
217221
}
218222

223+
String consumerBatch = containerEle.getAttribute(CONSUMER_BATCH_ENABLED_ATTRIBUTE);
224+
if (StringUtils.hasText(consumerBatch)) {
225+
containerDef.getPropertyValues().add("consumerBatchEnabled", new TypedStringValue(consumerBatch));
226+
}
227+
228+
String batchSize = containerEle.getAttribute(BATCH_SIZE_ATTRIBUTE);
229+
if (StringUtils.hasText(batchSize)) {
230+
containerDef.getPropertyValues().add("batchSize", new TypedStringValue(batchSize));
231+
}
232+
219233
String transactionSize = containerEle.getAttribute(TRANSACTION_SIZE_ATTRIBUTE);
220234
if (StringUtils.hasText(transactionSize)) {
221-
containerDef.getPropertyValues().add("txSize", new TypedStringValue(transactionSize));
235+
if (StringUtils.hasText(batchSize)) {
236+
parserContext.getReaderContext().error(
237+
"Listener Container - cannot have both 'batch-size' and 'transaction-size'", containerEle);
238+
}
239+
containerDef.getPropertyValues().add("batchSize", new TypedStringValue(transactionSize));
222240
}
223241

224242
String requeueRejected = containerEle.getAttribute(REQUEUE_REJECTED_ATTRIBUTE);

spring-rabbit/src/main/resources/org/springframework/amqp/rabbit/config/spring-rabbit-2.2.xsd

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -736,20 +736,40 @@
736736
<xsd:attribute name="transaction-size" type="xsd:string">
737737
<xsd:annotation>
738738
<xsd:documentation><![CDATA[
739+
Deprecated. Synonym to 'batch-size'.
740+
]]></xsd:documentation>
741+
</xsd:annotation>
742+
</xsd:attribute>
743+
<xsd:attribute name="consumer-batch-enabled" default="false">
744+
<xsd:annotation>
745+
<xsd:documentation><![CDATA[
746+
Enable consumer-side batching according to 'batch-size' and 'receive-timeout'.
747+
Only applies when the container type is 'simple'.
748+
]]></xsd:documentation>
749+
</xsd:annotation>
750+
<xsd:simpleType>
751+
<xsd:union memberTypes="xsd:boolean xsd:string" />
752+
</xsd:simpleType>
753+
</xsd:attribute>
754+
<xsd:attribute name="batch-size" type="xsd:string">
755+
<xsd:annotation>
756+
<xsd:documentation><![CDATA[
739757
Tells the container how many messages to process in a single transaction (if the channel is transactional). For
740-
best results it should be less than or equal to the prefetch count. Also used to determine how often acks
741-
are sent when using AUTO acknowledge mode.
758+
best results it should be less than or equal to the prefetch count.
759+
Also used to determine how often acks are sent when using AUTO acknowledge mode.
760+
Also used to determine the batch size when 'consumer-batch-enabled' is 'true'.
742761
Only applies when the container type is 'simple'.
743762
]]></xsd:documentation>
744763
</xsd:annotation>
745764
</xsd:attribute>
746765
<xsd:attribute name="receive-timeout" type="xsd:string">
747766
<xsd:annotation>
748767
<xsd:documentation><![CDATA[
749-
The time for which a consumer waits for a message; used together with 'transaction-size' to determine whether
750-
or not a consumer is idle, why dynamic concurrency is enabled using 'max-concurrency'. When 'transaction-size'
751-
is greater than 1 (default), acks for processed message(s) can be delayed up to ('transaction-size' - 1) *
752-
this value because the ack is sent after `transaction-size` attempts to receive a message have occurred.
768+
The time for which a consumer waits for a message; used together with 'batch-size' to determine whether
769+
or not a consumer is idle, why dynamic concurrency is enabled using 'max-concurrency'. When 'batch-size'
770+
is greater than 1 (default), acks for processed message(s) can be delayed up to ('batch-size' - 1) *
771+
this value because the ack is sent after 'batch-size' attempts to receive a message have occurred.
772+
Also used to deliver a short batch when 'consumer-batch-enabled' is 'true'.
753773
Only applies when the container type is 'simple'.
754774
]]></xsd:documentation>
755775
</xsd:annotation>

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/config/ListenerContainerParserTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2010-2019 the original author or authors.
2+
* Copyright 2010-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -180,6 +180,7 @@ public void testParseWithTx() {
180180
SimpleMessageListenerContainer container = beanFactory.getBean("container6", SimpleMessageListenerContainer.class);
181181
assertThat(container.isChannelTransacted()).isTrue();
182182
assertThat(ReflectionTestUtils.getField(container, "batchSize")).isEqualTo(5);
183+
assertThat(ReflectionTestUtils.getField(container, "consumerBatchEnabled")).isEqualTo(Boolean.TRUE);
183184
}
184185

185186
@Test

spring-rabbit/src/test/resources/org/springframework/amqp/rabbit/config/ListenerContainerParserTests-context.xml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@
4040
<rabbit:listener id="container5" queues="foo" ref="testBean" method="handle"/>
4141
</rabbit:listener-container>
4242

43-
<rabbit:listener-container connection-factory="connectionFactory" channel-transacted="true" transaction-size="5" >
43+
<rabbit:listener-container connection-factory="connectionFactory" channel-transacted="true" batch-size="5"
44+
consumer-batch-enabled="true" >
4445
<rabbit:listener id="container6" queues="foo" ref="testBean" method="handle"/>
4546
</rabbit:listener-container>
4647

0 commit comments

Comments
 (0)