Skip to content

Commit 69fa75e

Browse files
authored
Event loop loom carrier (#11704)
* Netty 4.2 * Event loop loom carrier Adds support for running loom threads on the event loop. * fix continuation run * implement client affinity for carrier thread * update to final * adjust reflect config * adjust reflect config * improve client pool * new pool * progress * fix redispatch * progress * progress * docs * fixes * fix merge * avoid unnecessary wakeup * add suppression * fix todo * set max concurrent http1 connection count * checkstyle * remove pointless try * doc * limit loom nesting depth * changes * progress * move event loop to a dedicated virtual thread to avoid deadlocks * add legacy pool implementation * reimplement work spilling * Move back to newThreadPerTaskExecutor conditionally * separate io thread scheduler * config options, sticky loop * Netty 4.2.2.Final * IdleStateHandler now supports ticker * workaround for MemorySegment issue * conditional * better workaround * update version range * disable cleaner only until jdk 25 * cleanup * cleanup * cleanup * docs * thread names changed * thread names changed
1 parent f7b96d6 commit 69fa75e

File tree

21 files changed

+1364
-55
lines changed

21 files changed

+1364
-55
lines changed

context/src/main/java/io/micronaut/runtime/converters/time/TimeConverterRegistrar.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -355,6 +355,8 @@ private BiFunction<CharSequence, ConversionContext, Optional<Duration>> duration
355355
final String seq = g2 + matcher.group(3);
356356
if (seq.equals("ns")) {
357357
return Optional.of(Duration.ofNanos(Integer.parseInt(amount)));
358+
} else if (seq.equals("us")) {
359+
return Optional.of(Duration.ofNanos(Integer.parseInt(amount) * 1000L));
358360
}
359361
context.reject(
360362
value,

context/src/main/java/io/micronaut/scheduling/LoomSupport.java

Lines changed: 59 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import io.micronaut.context.condition.Condition;
1919
import io.micronaut.context.condition.ConditionContext;
20+
import io.micronaut.core.annotation.Experimental;
2021
import io.micronaut.core.annotation.Internal;
2122

2223
import java.lang.invoke.MethodHandle;
@@ -25,6 +26,7 @@
2526
import java.util.concurrent.ExecutorService;
2627
import java.util.concurrent.Executors;
2728
import java.util.concurrent.ThreadFactory;
29+
import java.util.function.Consumer;
2830

2931
/**
3032
* @since 4.0.0
@@ -37,14 +39,20 @@ public final class LoomSupport {
3739
private static final MethodHandle MH_NEW_THREAD_PER_TASK_EXECUTOR;
3840
private static final MethodHandle MH_OF_VIRTUAL;
3941
private static final MethodHandle MH_NAME;
42+
private static final MethodHandle MH_NAME_COUNT;
4043
private static final MethodHandle MH_FACTORY;
44+
private static final MethodHandle MH_UNSTARTED;
45+
private static final MethodHandle MH_IS_VIRTUAL;
4146

4247
static {
4348
boolean sup;
4449
MethodHandle newThreadPerTaskExecutor;
4550
MethodHandle ofVirtual;
4651
MethodHandle name;
52+
MethodHandle nameCount;
4753
MethodHandle factory;
54+
MethodHandle unstarted;
55+
MethodHandle isVirtual;
4856
try {
4957
newThreadPerTaskExecutor = MethodHandles.lookup()
5058
.findStatic(Executors.class, "newThreadPerTaskExecutor", MethodType.methodType(ExecutorService.class, ThreadFactory.class));
@@ -53,9 +61,15 @@ public final class LoomSupport {
5361
ofVirtual = MethodHandles.lookup()
5462
.findStatic(Thread.class, "ofVirtual", MethodType.methodType(ofVirtualCl));
5563
name = MethodHandles.lookup()
64+
.findVirtual(builderCl, "name", MethodType.methodType(builderCl, String.class));
65+
nameCount = MethodHandles.lookup()
5666
.findVirtual(builderCl, "name", MethodType.methodType(builderCl, String.class, long.class));
5767
factory = MethodHandles.lookup()
5868
.findVirtual(builderCl, "factory", MethodType.methodType(ThreadFactory.class));
69+
unstarted = MethodHandles.lookup()
70+
.findVirtual(builderCl, "unstarted", MethodType.methodType(Thread.class, Runnable.class));
71+
isVirtual = MethodHandles.lookup()
72+
.findVirtual(Thread.class, "isVirtual", MethodType.methodType(boolean.class));
5973

6074
// invoke, this will throw an UnsupportedOperationException if we don't have --enable-preview
6175
ofVirtual.invoke();
@@ -65,7 +79,10 @@ public final class LoomSupport {
6579
newThreadPerTaskExecutor = null;
6680
ofVirtual = null;
6781
name = null;
82+
nameCount = null;
6883
factory = null;
84+
unstarted = null;
85+
isVirtual = null;
6986
sup = false;
7087
failure = e;
7188
}
@@ -74,7 +91,10 @@ public final class LoomSupport {
7491
MH_NEW_THREAD_PER_TASK_EXECUTOR = newThreadPerTaskExecutor;
7592
MH_OF_VIRTUAL = ofVirtual;
7693
MH_NAME = name;
94+
MH_NAME_COUNT = nameCount;
7795
MH_FACTORY = factory;
96+
MH_UNSTARTED = unstarted;
97+
MH_IS_VIRTUAL = isVirtual;
7898
}
7999

80100
private LoomSupport() {
@@ -90,6 +110,36 @@ public static void checkSupported() {
90110
}
91111
}
92112

113+
@Experimental
114+
public static ThreadFactory newVirtualThreadFactory(String namePrefix, Consumer<Object> builderModifier) {
115+
checkSupported();
116+
try {
117+
Object builder = MH_OF_VIRTUAL.invoke();
118+
builder = MH_NAME_COUNT.invoke(builder, namePrefix, 1L);
119+
if (builderModifier != null) {
120+
builderModifier.accept(builder);
121+
}
122+
return (ThreadFactory) MH_FACTORY.invoke(builder);
123+
} catch (Throwable e) {
124+
throw new RuntimeException(e);
125+
}
126+
}
127+
128+
@Experimental
129+
public static Thread unstarted(String name, Consumer<Object> builderModifier, Runnable task) {
130+
checkSupported();
131+
try {
132+
Object builder = MH_OF_VIRTUAL.invoke();
133+
builder = MH_NAME.invoke(builder, name);
134+
if (builderModifier != null) {
135+
builderModifier.accept(builder);
136+
}
137+
return (Thread) MH_UNSTARTED.invoke(builder, task);
138+
} catch (Throwable e) {
139+
throw new RuntimeException(e);
140+
}
141+
}
142+
93143
public static ExecutorService newThreadPerTaskExecutor(ThreadFactory threadFactory) {
94144
checkSupported();
95145
try {
@@ -100,11 +150,16 @@ public static ExecutorService newThreadPerTaskExecutor(ThreadFactory threadFacto
100150
}
101151

102152
public static ThreadFactory newVirtualThreadFactory(String namePrefix) {
103-
checkSupported();
153+
return newVirtualThreadFactory(namePrefix, null);
154+
}
155+
156+
public static boolean isVirtual(Thread thread) {
157+
if (!isSupported()) {
158+
// reasonable default.
159+
return false;
160+
}
104161
try {
105-
Object builder = MH_OF_VIRTUAL.invoke();
106-
builder = MH_NAME.invoke(builder, namePrefix, 1L);
107-
return (ThreadFactory) MH_FACTORY.invoke(builder);
162+
return (boolean) MH_IS_VIRTUAL.invokeExact(thread);
108163
} catch (Throwable e) {
109164
throw new RuntimeException(e);
110165
}

context/src/main/java/io/micronaut/scheduling/executor/ExecutorFactory.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.concurrent.ExecutorService;
2828
import java.util.concurrent.Executors;
2929
import java.util.concurrent.ThreadFactory;
30+
import java.util.concurrent.ThreadLocalRandom;
3031

3132
/**
3233
* Constructs {@link ExecutorService} instances based on {@link UserExecutorConfiguration} instances.
@@ -65,7 +66,8 @@ protected ThreadFactory eventLoopGroupThreadFactory(ExecutorConfiguration config
6566
if (name == null) {
6667
name = "virtual";
6768
}
68-
return LoomSupport.newVirtualThreadFactory(name + "-executor");
69+
String prefix = name + "-executor-";
70+
return r -> LoomSupport.unstarted(prefix + ThreadLocalRandom.current().nextInt(), null, r);
6971
}
7072
if (name != null) {
7173
return new NamedThreadFactory(name + "-executor");
@@ -93,8 +95,11 @@ public ExecutorService executorService(ExecutorConfiguration executorConfigurati
9395
case WORK_STEALING:
9496
return Executors.newWorkStealingPool(executorConfiguration.getParallelism());
9597
case THREAD_PER_TASK:
96-
return LoomSupport.newThreadPerTaskExecutor(getThreadFactory(executorConfiguration));
97-
98+
if ("false".equals(System.getProperty("jdk.trackAllThreads"))) {
99+
return new FastThreadPerTaskExecutor(getThreadFactory(executorConfiguration));
100+
} else {
101+
return LoomSupport.newThreadPerTaskExecutor(getThreadFactory(executorConfiguration));
102+
}
98103
default:
99104
throw new IllegalStateException("Could not create Executor service for enum value: " + executorType);
100105
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Copyright 2017-2025 original authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.micronaut.scheduling.executor;
17+
18+
import io.micronaut.core.annotation.Internal;
19+
20+
import java.util.List;
21+
import java.util.concurrent.AbstractExecutorService;
22+
import java.util.concurrent.ThreadFactory;
23+
import java.util.concurrent.TimeUnit;
24+
25+
/**
26+
* Faster implementation of ThreadPerTaskExecutor that does not keep track of child threads.
27+
*
28+
* @since 4.9.0
29+
* @author Jonas Konrad
30+
*/
31+
@Internal
32+
final class FastThreadPerTaskExecutor extends AbstractExecutorService {
33+
private final ThreadFactory threadFactory;
34+
35+
FastThreadPerTaskExecutor(ThreadFactory threadFactory) {
36+
this.threadFactory = threadFactory;
37+
}
38+
39+
@Override
40+
public void shutdown() {
41+
}
42+
43+
@Override
44+
public List<Runnable> shutdownNow() {
45+
return List.of();
46+
}
47+
48+
@Override
49+
public boolean isShutdown() {
50+
return false;
51+
}
52+
53+
@Override
54+
public boolean isTerminated() {
55+
return false;
56+
}
57+
58+
@Override
59+
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
60+
return true;
61+
}
62+
63+
@Override
64+
public void execute(Runnable command) {
65+
threadFactory.newThread(command).start();
66+
}
67+
}

core/src/main/java/io/micronaut/core/propagation/ThreadContext.java

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import io.micronaut.core.annotation.Internal;
1919
import io.netty.util.concurrent.FastThreadLocal;
20+
import io.netty.util.concurrent.FastThreadLocalThread;
2021

2122
/**
2223
* This class holds the {@link ThreadLocal} for the propagated context, or the
@@ -30,45 +31,45 @@
3031
@SuppressWarnings("unchecked")
3132
final class ThreadContext {
3233
private static final Object FAST;
33-
private static final ThreadLocal<PropagatedContextImpl> SLOW;
34+
private static final ThreadLocal<PropagatedContextImpl> SLOW = new ThreadLocal<>() {
35+
@Override
36+
public String toString() {
37+
return "Micronaut Propagation Context";
38+
}
39+
};
3440

3541
static {
3642
Object fast;
37-
ThreadLocal<PropagatedContextImpl> slow;
3843
try {
3944
fast = new FastThreadLocal<PropagatedContextImpl>();
40-
slow = null;
4145
} catch (NoClassDefFoundError e) {
4246
fast = null;
43-
slow = new ThreadLocal<>() {
44-
@Override
45-
public String toString() {
46-
return "Micronaut Propagation Context";
47-
}
48-
};
4947
}
5048
FAST = fast;
51-
SLOW = slow;
49+
}
50+
51+
private static boolean useSlow() {
52+
return FAST == null || !(Thread.currentThread() instanceof FastThreadLocalThread);
5253
}
5354

5455
static void remove() {
55-
if (FAST == null) {
56+
if (useSlow()) {
5657
SLOW.remove();
5758
} else {
5859
((FastThreadLocal<PropagatedContextImpl>) FAST).remove();
5960
}
6061
}
6162

6263
static PropagatedContextImpl get() {
63-
if (FAST == null) {
64+
if (useSlow()) {
6465
return SLOW.get();
6566
} else {
6667
return ((FastThreadLocal<PropagatedContextImpl>) FAST).get();
6768
}
6869
}
6970

7071
static void set(PropagatedContextImpl value) {
71-
if (FAST == null) {
72+
if (useSlow()) {
7273
SLOW.set(value);
7374
} else {
7475
((FastThreadLocal<PropagatedContextImpl>) FAST).set(value);

http-client/src/main/java/io/micronaut/http/client/netty/Pool49.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@
2222
import io.micronaut.core.execution.ExecutionFlow;
2323
import io.micronaut.http.client.HttpClientConfiguration;
2424
import io.micronaut.http.client.exceptions.HttpClientException;
25+
import io.micronaut.http.netty.channel.loom.EventLoopVirtualThreadScheduler;
26+
import io.micronaut.http.netty.channel.loom.PrivateLoomSupport;
27+
import io.micronaut.scheduling.LoomSupport;
2528
import io.netty.channel.EventLoop;
2629
import io.netty.channel.SingleThreadIoEventLoop;
2730
import io.netty.util.concurrent.EventExecutor;
@@ -183,7 +186,19 @@ LocalPoolPair pickPreferredPool() throws HttpClientException {
183186
LocalPoolPair poolPair = null;
184187
var configLocality = connectionPoolConfiguration.getConnectionLocality();
185188
if (configLocality != HttpClientConfiguration.ConnectionPoolConfiguration.ConnectionLocality.IGNORE) {
186-
EventExecutor currentExecutor = ThreadExecutorMap.currentExecutor();
189+
190+
EventExecutor currentExecutor = null;
191+
192+
if (PrivateLoomSupport.isSupported() &&
193+
LoomSupport.isVirtual(Thread.currentThread()) &&
194+
PrivateLoomSupport.getScheduler(Thread.currentThread()) instanceof EventLoopVirtualThreadScheduler el) {
195+
currentExecutor = el.eventLoop();
196+
}
197+
198+
if (currentExecutor == null) {
199+
currentExecutor = ThreadExecutorMap.currentExecutor();
200+
}
201+
187202
if (currentExecutor == null) {
188203
for (LocalPoolPair pool : localPools) {
189204
if (pool.loop.inEventLoop()) {
@@ -194,7 +209,6 @@ LocalPoolPair pickPreferredPool() throws HttpClientException {
194209
} else {
195210
poolPair = localPoolsByLoop.get(currentExecutor);
196211
}
197-
198212
if (poolPair == null && configLocality == HttpClientConfiguration.ConnectionPoolConfiguration.ConnectionLocality.ENFORCED_ALWAYS) {
199213
throw new HttpClientException("Attempted to open a HTTP connection from thread " +
200214
Thread.currentThread() + " which is not part of the client event loop group, but configured the pool in locality mode ENFORCED_ALWAYS, which disallows " +

0 commit comments

Comments
 (0)