Skip to content

Commit

Permalink
GH-2994: Add RabbitAmqpListenerContainer infrastructure
Browse files Browse the repository at this point in the history
Related to: #2994

* Add `RabbitAmqpMessageListener` for RabbitMQ AMQP 1.0 native message consumption
* Add `RabbitAmqpMessageListenerAdapter` for `@RabbitLister` API
* Add `RabbitAmqpListenerContainer` and respective `RabbitAmqpListenerContainerFactory`
* Add `AmqpAcknowledgment` as a general abstraction.
In the `RabbitAmqpListenerContainer` delegates to the `Consumer.Context` for manual settlement
* Extract `RabbitAmqpUtils` for conversion to/from AMQP 1.0 native message
* Add convenient `ContainerUtils.isImmediateAcknowledge()` and `ContainerUtils.isAmqpReject()` utilities
* Expose `AmqpAcknowledgment` as a `MessageProperties.amqpAcknowledgment` for generic `MessageListener` use-cases
* Remove `io.micrometer` dependecies from the `spring-rabbitmq-client` module since metrics and observation handled
thoroughly in the `com.rabbitmq.client:amqp-client`

Not tests for the listener yet.
Therefore no fixing for the issue.
  • Loading branch information
artembilan committed Feb 28, 2025
1 parent 6f0a338 commit 5be96cb
Show file tree
Hide file tree
Showing 13 changed files with 888 additions and 97 deletions.
5 changes: 0 additions & 5 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,6 @@ project('spring-rabbitmq-client') {
dependencies {
api project(':spring-rabbit')
api "com.rabbitmq.client:amqp-client:$rabbitmqAmqpClientVersion"
api 'io.micrometer:micrometer-observation'

testApi project(':spring-rabbit-junit')

Expand All @@ -488,10 +487,6 @@ project('spring-rabbitmq-client') {
testImplementation 'org.testcontainers:rabbitmq'
testImplementation 'org.testcontainers:junit-jupiter'
testImplementation 'org.apache.logging.log4j:log4j-slf4j-impl'
testImplementation 'io.micrometer:micrometer-observation-test'
testImplementation 'io.micrometer:micrometer-tracing-bridge-brave'
testImplementation 'io.micrometer:micrometer-tracing-test'
testImplementation 'io.micrometer:micrometer-tracing-integration-test'
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright 2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.amqp.core;

/**
* An abstraction over acknowledgments.
*
* @author Artem Bilan
*
* @since 4.0
*/
@FunctionalInterface
public interface AmqpAcknowledgment {

/**
* Acknowledge the message.
* @param status the status.
*/
void acknowledge(Status status);

default void acknowledge() {
acknowledge(Status.ACCEPT);
}

enum Status {

/**
* Mark the message as accepted.
*/
ACCEPT,

/**
* Mark the message as rejected.
*/
REJECT,

/**
* Reject the message and requeue so that it will be redelivered.
*/
REQUEUE

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,8 @@ public class MessageProperties implements Serializable {

private transient @Nullable Object targetBean;

private transient @Nullable AmqpAcknowledgment amqpAcknowledgment;

public void setHeader(String key, Object value) {
this.headers.put(key, value);
}
Expand Down Expand Up @@ -641,6 +643,25 @@ public void setProjectionUsed(boolean projectionUsed) {
}
}

/**
* Return the {@link AmqpAcknowledgment} for consumer if any.
* @return the {@link AmqpAcknowledgment} for consumer if any.
* @since 4.0
*/
public @Nullable AmqpAcknowledgment getAmqpAcknowledgment() {
return this.amqpAcknowledgment;
}

/**
* Set an {@link AmqpAcknowledgment} for manual acks in the target message processor.
* This is only in-application a consumer side logic.
* @param amqpAcknowledgment the {@link AmqpAcknowledgment} to use in the application.
* @since 4.0
*/
public void setAmqpAcknowledgment(AmqpAcknowledgment amqpAcknowledgment) {
this.amqpAcknowledgment = amqpAcknowledgment;
}

@Override // NOSONAR complexity
public int hashCode() {
final int prime = 31;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -752,7 +752,7 @@ protected boolean isNoLocal() {
* to be sent to the dead letter exchange. Setting to false causes all rejections to not
* be requeued. When true, the default can be overridden by the listener throwing an
* {@link AmqpRejectAndDontRequeueException}. Default true.
* @param defaultRequeueRejected true to reject by default.
* @param defaultRequeueRejected true to requeue by default.
*/
public void setDefaultRequeueRejected(boolean defaultRequeueRejected) {
this.defaultRequeueRejected = defaultRequeueRejected;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2022 the original author or authors.
* Copyright 2018-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,8 +17,10 @@
package org.springframework.amqp.rabbit.listener.support;

import org.apache.commons.logging.Log;
import org.jspecify.annotations.Nullable;

import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.ImmediateAcknowledgeAmqpException;
import org.springframework.amqp.ImmediateRequeueAmqpException;
import org.springframework.amqp.rabbit.listener.exception.MessageRejectedWhileStoppingException;

Expand Down Expand Up @@ -59,7 +61,11 @@ else if (t instanceof ImmediateRequeueAmqpException) {
shouldRequeue = true;
break;
}
t = t.getCause();
Throwable cause = t.getCause();
if (cause == t) {
break;
}
t = cause;
}
if (logger.isDebugEnabled()) {
logger.debug("Rejecting messages (requeue=" + shouldRequeue + ")");
Expand All @@ -75,8 +81,41 @@ else if (t instanceof ImmediateRequeueAmqpException) {
* @since 2.2
*/
public static boolean isRejectManual(Throwable ex) {
return ex instanceof AmqpRejectAndDontRequeueException aradrex
&& aradrex.isRejectManual();
AmqpRejectAndDontRequeueException amqpRejectAndDontRequeueException =
findInCause(ex, AmqpRejectAndDontRequeueException.class);
return amqpRejectAndDontRequeueException != null && amqpRejectAndDontRequeueException.isRejectManual();
}

/**
* Return true for {@link ImmediateAcknowledgeAmqpException}.
* @param ex the exception to traverse.
* @return true if an {@link ImmediateAcknowledgeAmqpException} is present in the cause chain.
* @since 4.0
*/
public static boolean isImmediateAcknowledge(Throwable ex) {
return findInCause(ex, ImmediateAcknowledgeAmqpException.class) != null;
}

/**
* Return true for {@link AmqpRejectAndDontRequeueException}.
* @param ex the exception to traverse.
* @return true if an {@link AmqpRejectAndDontRequeueException} is present in the cause chain.
* @since 4.0
*/
public static boolean isAmqpReject(Throwable ex) {
return findInCause(ex, AmqpRejectAndDontRequeueException.class) != null;
}

@SuppressWarnings("unchecked")
private static <T extends Throwable> @Nullable T findInCause(Throwable throwable, Class<T> exceptionToFind) {
if (exceptionToFind.isAssignableFrom(throwable.getClass())) {
return (T) throwable;
}
Throwable cause = throwable.getCause();
if (cause == null || cause == throwable) {
return null;
}
return findInCause(cause, exceptionToFind);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,7 @@

package org.springframework.amqp.rabbitmq.client;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Date;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -182,38 +178,20 @@ public CompletableFuture<Boolean> send(String exchange, @Nullable String routing
private CompletableFuture<Boolean> doSend(@Nullable String exchange, @Nullable String routingKey,
@Nullable String queue, Message message) {

MessageProperties messageProperties = message.getMessageProperties();

com.rabbitmq.client.amqp.Message amqpMessage =
this.publisher.message(message.getBody())
.contentEncoding(messageProperties.getContentEncoding())
.contentType(messageProperties.getContentType())
.messageId(messageProperties.getMessageId())
.correlationId(messageProperties.getCorrelationId())
.priority(messageProperties.getPriority().byteValue())
.replyTo(messageProperties.getReplyTo());

com.rabbitmq.client.amqp.Message amqpMessage = this.publisher.message();
com.rabbitmq.client.amqp.Message.MessageAddressBuilder address = amqpMessage.toAddress();

Map<String, @Nullable Object> headers = messageProperties.getHeaders();
if (!headers.isEmpty()) {
headers.forEach((key, val) -> mapProp(key, val, amqpMessage));
}

JavaUtils.INSTANCE
.acceptIfNotNull(messageProperties.getUserId(),
(userId) -> amqpMessage.userId(userId.getBytes(StandardCharsets.UTF_8)))
.acceptIfNotNull(messageProperties.getTimestamp(),
(timestamp) -> amqpMessage.creationTime(timestamp.getTime()))
.acceptIfNotNull(messageProperties.getExpiration(),
(expiration) -> amqpMessage.absoluteExpiryTime(Long.parseLong(expiration)))
.acceptIfNotNull(exchange, address::exchange)
.acceptIfNotNull(routingKey, address::key)
.acceptIfNotNull(queue, address::queue);

amqpMessage = address.message();

RabbitAmqpUtils.toAmqpMessage(message, amqpMessage);

CompletableFuture<Boolean> publishResult = new CompletableFuture<>();

this.publisher.publish(address.message(),
this.publisher.publish(amqpMessage,
(context) -> {
switch (context.status()) {
case ACCEPTED -> publishResult.complete(true);
Expand Down Expand Up @@ -299,7 +277,7 @@ public CompletableFuture<Message> receive(String queueName) {
.priority(10)
.messageHandler((context, message) -> {
context.accept();
messageFuture.complete(fromAmqpMessage(message));
messageFuture.complete(RabbitAmqpUtils.fromAmqpMessage(message, null));
})
.build();

Expand Down Expand Up @@ -452,62 +430,4 @@ public <C> CompletableFuture<C> convertSendAndReceiveAsType(String exchange, Str
throw new UnsupportedOperationException();
}

private static void mapProp(String key, @Nullable Object val, com.rabbitmq.client.amqp.Message amqpMessage) {
if (val == null) {
return;
}
if (val instanceof String string) {
amqpMessage.property(key, string);
}
else if (val instanceof Long longValue) {
amqpMessage.property(key, longValue);
}
else if (val instanceof Integer intValue) {
amqpMessage.property(key, intValue);
}
else if (val instanceof Short shortValue) {
amqpMessage.property(key, shortValue);
}
else if (val instanceof Byte byteValue) {
amqpMessage.property(key, byteValue);
}
else if (val instanceof Double doubleValue) {
amqpMessage.property(key, doubleValue);
}
else if (val instanceof Float floatValue) {
amqpMessage.property(key, floatValue);
}
else if (val instanceof Character character) {
amqpMessage.property(key, character);
}
else if (val instanceof UUID uuid) {
amqpMessage.property(key, uuid);
}
else if (val instanceof byte[] bytes) {
amqpMessage.property(key, bytes);
}
else if (val instanceof Boolean booleanValue) {
amqpMessage.property(key, booleanValue);
}
}

private static Message fromAmqpMessage(com.rabbitmq.client.amqp.Message amqpMessage) {
MessageProperties messageProperties = new MessageProperties();

JavaUtils.INSTANCE
.acceptIfNotNull(amqpMessage.messageIdAsString(), messageProperties::setMessageId)
.acceptIfNotNull(amqpMessage.userId(),
(usr) -> messageProperties.setUserId(new String(usr, StandardCharsets.UTF_8)))
.acceptIfNotNull(amqpMessage.correlationIdAsString(), messageProperties::setCorrelationId)
.acceptIfNotNull(amqpMessage.contentType(), messageProperties::setContentType)
.acceptIfNotNull(amqpMessage.contentEncoding(), messageProperties::setContentEncoding)
.acceptIfNotNull(amqpMessage.absoluteExpiryTime(),
(exp) -> messageProperties.setExpiration(Long.toString(exp)))
.acceptIfNotNull(amqpMessage.creationTime(), (time) -> messageProperties.setTimestamp(new Date(time)));

amqpMessage.forEachProperty(messageProperties::setHeader);

return new Message(amqpMessage.body(), messageProperties);
}

}
Loading

0 comments on commit 5be96cb

Please sign in to comment.