Skip to content

Commit 5c06681

Browse files
ozangunalpgsmet
authored andcommitted
Messaging Context propagation: Replace emitOn that requests upfront with operator that calls executor synchronously.
Fixes #47670 (cherry picked from commit daa3e28)
1 parent 42dc09d commit 5c06681

File tree

1 file changed

+44
-1
lines changed

1 file changed

+44
-1
lines changed

extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/ConnectorContextPropagationDecorator.java

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import java.util.List;
44
import java.util.Optional;
5+
import java.util.concurrent.Executor;
56

67
import jakarta.enterprise.context.ApplicationScoped;
78
import jakarta.inject.Inject;
@@ -11,6 +12,10 @@
1112
import org.eclipse.microprofile.reactive.messaging.Message;
1213

1314
import io.smallrye.mutiny.Multi;
15+
import io.smallrye.mutiny.helpers.ParameterValidation;
16+
import io.smallrye.mutiny.operators.multi.AbstractMultiOperator;
17+
import io.smallrye.mutiny.operators.multi.MultiOperatorProcessor;
18+
import io.smallrye.mutiny.subscription.MultiSubscriber;
1419
import io.smallrye.reactive.messaging.PublisherDecorator;
1520
import io.smallrye.reactive.messaging.SubscriberDecorator;
1621

@@ -32,7 +37,7 @@ public ConnectorContextPropagationDecorator(
3237
public Multi<? extends Message<?>> decorate(Multi<? extends Message<?>> publisher, List<String> channelName,
3338
boolean isConnector) {
3439
if (isConnector) {
35-
return publisher.emitOn(tc.currentContextExecutor());
40+
return new ContextPropagationOperator<>(publisher, tc);
3641
}
3742
return publisher;
3843
}
@@ -42,4 +47,42 @@ public int getPriority() {
4247
// Before the io.smallrye.reactive.messaging.providers.locals.ContextDecorator which has the priority 0
4348
return -100;
4449
}
50+
51+
public static class ContextPropagationOperator<T> extends AbstractMultiOperator<T, T> {
52+
53+
private final ThreadContext tc;
54+
55+
/**
56+
* Creates a new {@link AbstractMultiOperator} with the passed {@link Multi} as upstream.
57+
*
58+
* @param upstream the upstream, must not be {@code null}
59+
*/
60+
public ContextPropagationOperator(Multi<? extends T> upstream, ThreadContext tc) {
61+
super(upstream);
62+
this.tc = tc;
63+
}
64+
65+
@Override
66+
public void subscribe(MultiSubscriber<? super T> downstream) {
67+
ParameterValidation.nonNullNpe(downstream, "subscriber");
68+
upstream.subscribe().withSubscriber(new ContextPropagationProcessor<>(downstream, tc));
69+
}
70+
71+
static final class ContextPropagationProcessor<T> extends MultiOperatorProcessor<T, T> {
72+
73+
private final Executor tcExecutor;
74+
75+
public ContextPropagationProcessor(MultiSubscriber<? super T> downstream, ThreadContext tc) {
76+
super(downstream);
77+
this.tcExecutor = tc.currentContextExecutor();
78+
}
79+
80+
@Override
81+
public void onItem(T item) {
82+
// Even though the executor is called, this is a synchronous call
83+
tcExecutor.execute(() -> super.onItem(item));
84+
}
85+
86+
}
87+
}
4588
}

0 commit comments

Comments
 (0)