Skip to content

Commit d068411

Browse files
authored
Merge pull request #45827 from ozangunalp/messaging_context_propagation
Context propagation for Messaging
2 parents ee21feb + 3153ee8 commit d068411

File tree

20 files changed

+1350
-25
lines changed

20 files changed

+1350
-25
lines changed

docs/src/main/asciidoc/messaging.adoc

+95
Original file line numberDiff line numberDiff line change
@@ -562,6 +562,7 @@ public class StreamProcessor {
562562
}
563563
----
564564

565+
[[execution_model]]
565566
== Execution Model
566567

567568
Quarkus Messaging sits on top of the xref:quarkus-reactive-architecture.adoc#engine[reactive engine] of Quarkus and leverages link:{eclipse-vertx}[Eclipse Vert.x] to dispatch messages for processing.
@@ -634,6 +635,100 @@ Depending on the broker technology, this can be useful to increase the applicati
634635
while still preserving the partial order of messages received in different copies.
635636
This is the case, for example, for Kafka, where multiple consumers can consume different topic partitions.
636637

638+
== Context Propagation
639+
640+
In Quarkus Messaging, the default mechanism for propagating context between different processing stages is the
641+
link:https://smallrye.io/smallrye-reactive-messaging/latest/concepts/message-context[message context].
642+
This provides a consistent way to pass context information along with the message as it flows through different stages.
643+
644+
=== Interaction with Mutiny and MicroProfile Context Propagation
645+
646+
Mutiny, which is the foundation of reactive programming in Quarkus, is integrated with the MicroProfile Context Propagation.
647+
This integration enables automatic capturing and restoring of context across asynchronous boundaries.
648+
To learn more about context propagation in Quarkus and Mutiny, refer to the xref:context-propagation.adoc[Context Propagation] guide.
649+
650+
However, Quarkus Messaging needs to coordinate multiple asynchronous boundaries.
651+
This is why the default context propagation can result in unexpected behavior in some cases, especially using `Emitters`.
652+
653+
To ensure consistent behavior, Quarkus Messaging disables the propagation of any context during message dispatching, through internal channels or connectors.
654+
This means that Emitters won't capture the caller context, and incoming channels won't dispatch messages by activating a context (ex. the request context).
655+
656+
For example, you might want to propagate the caller context from an incoming HTTP request to the message processing stage.
657+
For emitters, instead of using the regular `Emitter` or `MutinyEmitter`, you can inject the `ContextualEmitter` to make sure the message captures the caller context.
658+
This ensures consistent and predictable behaviour by relying on the message context handling provided by the framework.
659+
660+
For example, let `RequestScopedBean` a request-scoped bean, `ContextualEmitter` can be used to dispatch messages locally through the internal channel `app`:
661+
662+
[source, java]
663+
----
664+
import jakarta.inject.Inject;
665+
import jakarta.ws.rs.Consumes;
666+
import jakarta.ws.rs.POST;
667+
import jakarta.ws.rs.Path;
668+
import jakarta.ws.rs.core.MediaType;
669+
670+
import org.eclipse.microprofile.reactive.messaging.Channel;
671+
672+
import io.quarkus.logging.Log;
673+
import io.quarkus.smallrye.reactivemessaging.runtime.ContextualEmitter;
674+
import io.smallrye.mutiny.Uni;
675+
import io.vertx.core.Context;
676+
import io.vertx.core.Vertx;
677+
678+
@Path("/")
679+
public class Resource {
680+
681+
@Channel("app")
682+
ContextualEmitter<String> emitter;
683+
684+
@Inject
685+
RequestScopedBean requestScopedBean;
686+
687+
@POST
688+
@Path("/send")
689+
public void send(String message) {
690+
requestScopedBean.setValue("Hello");
691+
emitter.sendAndAwait(message);
692+
}
693+
694+
}
695+
----
696+
697+
Then the request-scoped bean can be accessed in the message processing stage, regardless of the <<execution_model>>:
698+
699+
[source, java]
700+
----
701+
import jakarta.enterprise.context.ApplicationScoped;
702+
import jakarta.inject.Inject;
703+
704+
import org.eclipse.microprofile.reactive.messaging.Incoming;
705+
706+
import io.quarkus.logging.Log;
707+
import io.smallrye.reactive.messaging.annotations.Blocking;
708+
709+
710+
@ApplicationScoped
711+
public class Processor {
712+
713+
@Inject
714+
RequestScopedBean requestScopedBean;
715+
716+
@Incoming("app")
717+
@Blocking
718+
public void process(String message) {
719+
Log.infof("Message %s from request %s", message, requestScopedBean.getValue());
720+
}
721+
722+
}
723+
----
724+
725+
=== Request Context Activation
726+
727+
In some cases, you might need to activate the request context while processing messages consumed from a broker.
728+
While using `@ActivateRequestContext` on the `@Incoming` method is an option, it's lifecycle does not follow that of a Quarkus Messaging message.
729+
For incoming channels, you can enable the request scope activation with the build time property `quarkus.messaging.request-scoped.enabled=true`.
730+
This will activate the request context for each message processed by the incoming channel, and close the context once the message is processed.
731+
637732
== Health Checks
638733

639734
Together with the SmallRye Health extension, Quarkus Messaging extensions provide health check support per channel.

extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/ReactiveMessagingBuildTimeConfig.java

+7
Original file line numberDiff line numberDiff line change
@@ -27,4 +27,11 @@ public interface ReactiveMessagingBuildTimeConfig {
2727
@WithName("auto-connector-attachment")
2828
@WithDefault("true")
2929
boolean autoConnectorAttachment();
30+
31+
/**
32+
* Whether to enable the RequestScope context on a message context
33+
*/
34+
@WithName("request-scoped.enabled")
35+
@WithDefault("false")
36+
boolean activateRequestScopeEnabled();
3037
}

extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java

+9-3
Original file line numberDiff line numberDiff line change
@@ -70,13 +70,16 @@
7070
import io.quarkus.smallrye.reactivemessaging.deployment.items.InjectedChannelBuildItem;
7171
import io.quarkus.smallrye.reactivemessaging.deployment.items.InjectedEmitterBuildItem;
7272
import io.quarkus.smallrye.reactivemessaging.deployment.items.MediatorBuildItem;
73+
import io.quarkus.smallrye.reactivemessaging.runtime.ContextClearedDecorator;
74+
import io.quarkus.smallrye.reactivemessaging.runtime.ContextualEmitterFactory;
7375
import io.quarkus.smallrye.reactivemessaging.runtime.DuplicatedContextConnectorFactory;
7476
import io.quarkus.smallrye.reactivemessaging.runtime.DuplicatedContextConnectorFactoryInterceptor;
7577
import io.quarkus.smallrye.reactivemessaging.runtime.HealthCenterFilter;
7678
import io.quarkus.smallrye.reactivemessaging.runtime.HealthCenterInterceptor;
7779
import io.quarkus.smallrye.reactivemessaging.runtime.QuarkusMediatorConfiguration;
7880
import io.quarkus.smallrye.reactivemessaging.runtime.QuarkusWorkerPoolRegistry;
7981
import io.quarkus.smallrye.reactivemessaging.runtime.ReactiveMessagingConfiguration;
82+
import io.quarkus.smallrye.reactivemessaging.runtime.RequestScopedDecorator;
8083
import io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingLifecycle;
8184
import io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingRecorder;
8285
import io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingRecorder.SmallRyeReactiveMessagingContext;
@@ -112,11 +115,14 @@ FeatureBuildItem feature() {
112115
}
113116

114117
@BuildStep
115-
AdditionalBeanBuildItem beans() {
118+
void beans(BuildProducer<AdditionalBeanBuildItem> additionalBean, ReactiveMessagingBuildTimeConfig buildTimeConfig) {
116119
// We add the connector and channel qualifiers to make them part of the index.
117-
return new AdditionalBeanBuildItem(SmallRyeReactiveMessagingLifecycle.class, Connector.class,
120+
additionalBean.produce(new AdditionalBeanBuildItem(SmallRyeReactiveMessagingLifecycle.class, Connector.class,
118121
Channel.class, io.smallrye.reactive.messaging.annotations.Channel.class,
119-
QuarkusWorkerPoolRegistry.class);
122+
QuarkusWorkerPoolRegistry.class, ContextualEmitterFactory.class, ContextClearedDecorator.class));
123+
if (buildTimeConfig.activateRequestScopeEnabled()) {
124+
additionalBean.produce(new AdditionalBeanBuildItem(RequestScopedDecorator.class));
125+
}
120126
}
121127

122128
@BuildStep
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package io.quarkus.smallrye.reactivemessaging.runtime;
2+
3+
import java.util.List;
4+
5+
import jakarta.enterprise.context.ApplicationScoped;
6+
7+
import org.eclipse.microprofile.context.ThreadContext;
8+
import org.eclipse.microprofile.reactive.messaging.Message;
9+
10+
import io.smallrye.mutiny.Multi;
11+
import io.smallrye.reactive.messaging.PublisherDecorator;
12+
13+
@ApplicationScoped
14+
public class ContextClearedDecorator implements PublisherDecorator {
15+
16+
private final ThreadContext tc;
17+
18+
public ContextClearedDecorator() {
19+
tc = ThreadContext.builder()
20+
.propagated(ThreadContext.NONE)
21+
.cleared(ThreadContext.ALL_REMAINING)
22+
.build();
23+
}
24+
25+
@Override
26+
public Multi<? extends Message<?>> decorate(Multi<? extends Message<?>> publisher, List<String> channelName,
27+
boolean isConnector) {
28+
if (isConnector) {
29+
return publisher.emitOn(tc.currentContextExecutor());
30+
}
31+
return publisher;
32+
}
33+
34+
@Override
35+
public int getPriority() {
36+
// Before the io.smallrye.reactive.messaging.providers.locals.ContextDecorator which has the priority 0
37+
return -100;
38+
}
39+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package io.quarkus.smallrye.reactivemessaging.runtime;
2+
3+
import org.eclipse.microprofile.reactive.messaging.Message;
4+
5+
import io.smallrye.mutiny.Uni;
6+
import io.smallrye.reactive.messaging.EmitterType;
7+
8+
public interface ContextualEmitter<T> extends EmitterType {
9+
10+
Uni<Void> send(T payload);
11+
12+
void sendAndAwait(T payload);
13+
14+
<M extends Message<? extends T>> Uni<Void> sendMessage(M msg);
15+
16+
<M extends Message<? extends T>> void sendMessageAndAwait(M msg);
17+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package io.quarkus.smallrye.reactivemessaging.runtime;
2+
3+
import jakarta.enterprise.context.ApplicationScoped;
4+
import jakarta.enterprise.inject.Produces;
5+
import jakarta.enterprise.inject.Typed;
6+
import jakarta.enterprise.inject.spi.InjectionPoint;
7+
import jakarta.inject.Inject;
8+
9+
import org.eclipse.microprofile.reactive.messaging.Channel;
10+
11+
import io.smallrye.reactive.messaging.ChannelRegistry;
12+
import io.smallrye.reactive.messaging.EmitterConfiguration;
13+
import io.smallrye.reactive.messaging.EmitterFactory;
14+
import io.smallrye.reactive.messaging.annotations.EmitterFactoryFor;
15+
import io.smallrye.reactive.messaging.providers.extension.ChannelProducer;
16+
17+
@EmitterFactoryFor(ContextualEmitter.class)
18+
@ApplicationScoped
19+
public class ContextualEmitterFactory implements EmitterFactory<ContextualEmitterImpl<Object>> {
20+
21+
@Inject
22+
ChannelRegistry channelRegistry;
23+
24+
@Override
25+
public ContextualEmitterImpl<Object> createEmitter(EmitterConfiguration emitterConfiguration, long l) {
26+
return new ContextualEmitterImpl<>(emitterConfiguration, l);
27+
}
28+
29+
@Produces
30+
@Typed(ContextualEmitter.class)
31+
@Channel("") // Stream name is ignored during type-safe resolution
32+
<T> ContextualEmitter<T> produceEmitter(InjectionPoint injectionPoint) {
33+
String channelName = ChannelProducer.getChannelName(injectionPoint);
34+
return channelRegistry.getEmitter(channelName, ContextualEmitter.class);
35+
}
36+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package io.quarkus.smallrye.reactivemessaging.runtime;
2+
3+
import static io.smallrye.reactive.messaging.providers.i18n.ProviderExceptions.ex;
4+
5+
import org.eclipse.microprofile.reactive.messaging.Message;
6+
7+
import io.smallrye.common.annotation.CheckReturnValue;
8+
import io.smallrye.common.vertx.VertxContext;
9+
import io.smallrye.mutiny.Uni;
10+
import io.smallrye.reactive.messaging.EmitterConfiguration;
11+
import io.smallrye.reactive.messaging.providers.extension.AbstractEmitter;
12+
import io.smallrye.reactive.messaging.providers.locals.LocalContextMetadata;
13+
import io.vertx.core.Context;
14+
import io.vertx.core.Vertx;
15+
16+
public class ContextualEmitterImpl<T> extends AbstractEmitter<T> implements ContextualEmitter<T> {
17+
18+
public ContextualEmitterImpl(EmitterConfiguration configuration, long defaultBufferSize) {
19+
super(configuration, defaultBufferSize);
20+
}
21+
22+
@Override
23+
public void sendAndAwait(T payload) {
24+
sendMessage(Message.of(payload)).await().indefinitely();
25+
}
26+
27+
@Override
28+
public Uni<Void> send(T payload) {
29+
return sendMessage(Message.of(payload));
30+
}
31+
32+
@Override
33+
public <M extends Message<? extends T>> void sendMessageAndAwait(M msg) {
34+
sendMessage(msg).await().indefinitely();
35+
}
36+
37+
@Override
38+
@CheckReturnValue
39+
public <M extends Message<? extends T>> Uni<Void> sendMessage(M msg) {
40+
if (msg == null) {
41+
throw ex.illegalArgumentForNullValue();
42+
}
43+
44+
// If we are running on a Vert.x context, we need to capture the context to switch back during the emission.
45+
Context context = Vertx.currentContext();
46+
Uni<Void> uni = Uni.createFrom().emitter(e -> {
47+
try {
48+
Message<? extends T> message = msg;
49+
if (VertxContext.isDuplicatedContext(context)) {
50+
message = message.addMetadata(new LocalContextMetadata(context));
51+
}
52+
emit(message.withAck(() -> {
53+
e.complete(null);
54+
return msg.ack();
55+
})
56+
.withNack(t -> {
57+
e.fail(t);
58+
return msg.nack(t);
59+
}));
60+
} catch (Exception t) {
61+
// Capture synchronous exception and nack the message.
62+
msg.nack(t);
63+
throw t;
64+
}
65+
});
66+
if (context != null) {
67+
uni = uni.emitOn(runnable -> context.runOnContext(x -> runnable.run()));
68+
}
69+
return uni;
70+
}
71+
72+
}

0 commit comments

Comments
 (0)