Skip to content

Commit 24c4401

Browse files
committed
Merge branch '6.2.x'
2 parents 8bc99fa + 3afd551 commit 24c4401

File tree

3 files changed

+98
-44
lines changed

3 files changed

+98
-44
lines changed

Diff for: spring-core/src/main/java/org/springframework/core/task/SimpleAsyncTaskExecutor.java

+23-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2024 the original author or authors.
2+
* Copyright 2002-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -87,6 +87,8 @@ public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator
8787

8888
private @Nullable Set<Thread> activeThreads;
8989

90+
private boolean rejectTasksWhenLimitReached = false;
91+
9092
private volatile boolean active = true;
9193

9294

@@ -184,6 +186,17 @@ public void setTaskTerminationTimeout(long timeout) {
184186
this.activeThreads = (timeout > 0 ? ConcurrentHashMap.newKeySet() : null);
185187
}
186188

189+
/**
190+
* Specify whether to reject tasks when the concurrency limit has been reached,
191+
* throwing {@link TaskRejectedException} on any further submission attempts.
192+
* <p>The default is {@code false}, blocking the caller until the submission can
193+
* be accepted. Switch this to {@code true} for immediate rejection instead.
194+
* @since 6.2.6
195+
*/
196+
public void setRejectTasksWhenLimitReached(boolean rejectTasksWhenLimitReached) {
197+
this.rejectTasksWhenLimitReached = rejectTasksWhenLimitReached;
198+
}
199+
187200
/**
188201
* Set the maximum number of parallel task executions allowed.
189202
* The default of -1 indicates no concurrency limit at all.
@@ -350,13 +363,21 @@ public void close() {
350363
* making {@code beforeAccess()} and {@code afterAccess()}
351364
* visible to the surrounding class.
352365
*/
353-
private static class ConcurrencyThrottleAdapter extends ConcurrencyThrottleSupport {
366+
private class ConcurrencyThrottleAdapter extends ConcurrencyThrottleSupport {
354367

355368
@Override
356369
protected void beforeAccess() {
357370
super.beforeAccess();
358371
}
359372

373+
@Override
374+
protected void onLimitReached() {
375+
if (rejectTasksWhenLimitReached) {
376+
throw new TaskRejectedException("Concurrency limit reached: " + getConcurrencyLimit());
377+
}
378+
super.onLimitReached();
379+
}
380+
360381
@Override
361382
protected void afterAccess() {
362383
super.afterAccess();

Diff for: spring-core/src/main/java/org/springframework/util/ConcurrencyThrottleSupport.java

+32-21
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2024 the original author or authors.
2+
* Copyright 2002-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -105,6 +105,7 @@ public boolean isThrottleActive() {
105105
/**
106106
* To be invoked before the main execution logic of concrete subclasses.
107107
* <p>This implementation applies the concurrency throttle.
108+
* @see #onLimitReached()
108109
* @see #afterAccess()
109110
*/
110111
protected void beforeAccess() {
@@ -113,29 +114,12 @@ protected void beforeAccess() {
113114
"Currently no invocations allowed - concurrency limit set to NO_CONCURRENCY");
114115
}
115116
if (this.concurrencyLimit > 0) {
116-
boolean debug = logger.isDebugEnabled();
117117
this.concurrencyLock.lock();
118118
try {
119-
boolean interrupted = false;
120-
while (this.concurrencyCount >= this.concurrencyLimit) {
121-
if (interrupted) {
122-
throw new IllegalStateException("Thread was interrupted while waiting for invocation access, " +
123-
"but concurrency limit still does not allow for entering");
124-
}
125-
if (debug) {
126-
logger.debug("Concurrency count " + this.concurrencyCount +
127-
" has reached limit " + this.concurrencyLimit + " - blocking");
128-
}
129-
try {
130-
this.concurrencyCondition.await();
131-
}
132-
catch (InterruptedException ex) {
133-
// Re-interrupt current thread, to allow other threads to react.
134-
Thread.currentThread().interrupt();
135-
interrupted = true;
136-
}
119+
if (this.concurrencyCount >= this.concurrencyLimit) {
120+
onLimitReached();
137121
}
138-
if (debug) {
122+
if (logger.isDebugEnabled()) {
139123
logger.debug("Entering throttle at concurrency count " + this.concurrencyCount);
140124
}
141125
this.concurrencyCount++;
@@ -146,6 +130,33 @@ protected void beforeAccess() {
146130
}
147131
}
148132

133+
/**
134+
* Triggered by {@link #beforeAccess()} when the concurrency limit has been reached.
135+
* The default implementation blocks until the concurrency count allows for entering.
136+
* @since 6.2.6
137+
*/
138+
protected void onLimitReached() {
139+
boolean interrupted = false;
140+
while (this.concurrencyCount >= this.concurrencyLimit) {
141+
if (interrupted) {
142+
throw new IllegalStateException("Thread was interrupted while waiting for invocation access, " +
143+
"but concurrency limit still does not allow for entering");
144+
}
145+
if (logger.isDebugEnabled()) {
146+
logger.debug("Concurrency count " + this.concurrencyCount +
147+
" has reached limit " + this.concurrencyLimit + " - blocking");
148+
}
149+
try {
150+
this.concurrencyCondition.await();
151+
}
152+
catch (InterruptedException ex) {
153+
// Re-interrupt current thread, to allow other threads to react.
154+
Thread.currentThread().interrupt();
155+
interrupted = true;
156+
}
157+
}
158+
}
159+
149160
/**
150161
* To be invoked after the main execution logic of concrete subclasses.
151162
* @see #beforeAccess()

Diff for: spring-core/src/test/java/org/springframework/core/task/SimpleAsyncTaskExecutorTests.java

+43-21
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2023 the original author or authors.
2+
* Copyright 2002-2025 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -21,6 +21,7 @@
2121
import org.springframework.util.ConcurrencyThrottleSupport;
2222

2323
import static org.assertj.core.api.Assertions.assertThat;
24+
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
2425
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
2526
import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
2627

@@ -31,6 +32,23 @@
3132
*/
3233
class SimpleAsyncTaskExecutorTests {
3334

35+
@Test
36+
void isActiveUntilClose() {
37+
SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor();
38+
assertThat(executor.isActive()).isTrue();
39+
assertThat(executor.isThrottleActive()).isFalse();
40+
executor.close();
41+
assertThat(executor.isActive()).isFalse();
42+
assertThat(executor.isThrottleActive()).isFalse();
43+
}
44+
45+
@Test
46+
void throwsExceptionWhenSuppliedWithNullRunnable() {
47+
try (SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor()) {
48+
assertThatIllegalArgumentException().isThrownBy(() -> executor.execute(null));
49+
}
50+
}
51+
3452
@Test
3553
void cannotExecuteWhenConcurrencyIsSwitchedOff() {
3654
try (SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor()) {
@@ -41,35 +59,34 @@ void cannotExecuteWhenConcurrencyIsSwitchedOff() {
4159
}
4260

4361
@Test
44-
void throttleIsNotActiveByDefault() {
62+
void taskRejectedWhenConcurrencyLimitReached() {
4563
try (SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor()) {
46-
assertThat(executor.isThrottleActive()).as("Concurrency throttle must not default to being active (on)").isFalse();
64+
executor.setConcurrencyLimit(1);
65+
executor.setRejectTasksWhenLimitReached(true);
66+
assertThat(executor.isThrottleActive()).isTrue();
67+
executor.execute(new NoOpRunnable());
68+
assertThatExceptionOfType(TaskRejectedException.class).isThrownBy(() -> executor.execute(new NoOpRunnable()));
4769
}
4870
}
4971

5072
@Test
5173
void threadNameGetsSetCorrectly() {
52-
final String customPrefix = "chankPop#";
53-
final Object monitor = new Object();
54-
SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor(customPrefix);
55-
ThreadNameHarvester task = new ThreadNameHarvester(monitor);
56-
executeAndWait(executor, task, monitor);
57-
assertThat(task.getThreadName()).startsWith(customPrefix);
74+
String customPrefix = "chankPop#";
75+
Object monitor = new Object();
76+
try (SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor(customPrefix)) {
77+
ThreadNameHarvester task = new ThreadNameHarvester(monitor);
78+
executeAndWait(executor, task, monitor);
79+
assertThat(task.getThreadName()).startsWith(customPrefix);
80+
}
5881
}
5982

6083
@Test
6184
void threadFactoryOverridesDefaults() {
62-
final Object monitor = new Object();
63-
SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor(runnable -> new Thread(runnable, "test"));
64-
ThreadNameHarvester task = new ThreadNameHarvester(monitor);
65-
executeAndWait(executor, task, monitor);
66-
assertThat(task.getThreadName()).isEqualTo("test");
67-
}
68-
69-
@Test
70-
void throwsExceptionWhenSuppliedWithNullRunnable() {
71-
try (SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor()) {
72-
assertThatIllegalArgumentException().isThrownBy(() -> executor.execute(null));
85+
Object monitor = new Object();
86+
try (SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor(runnable -> new Thread(runnable, "test"))) {
87+
ThreadNameHarvester task = new ThreadNameHarvester(monitor);
88+
executeAndWait(executor, task, monitor);
89+
assertThat(task.getThreadName()).isEqualTo("test");
7390
}
7491
}
7592

@@ -89,7 +106,12 @@ private static final class NoOpRunnable implements Runnable {
89106

90107
@Override
91108
public void run() {
92-
// no-op
109+
try {
110+
Thread.sleep(1000);
111+
}
112+
catch (InterruptedException ex) {
113+
Thread.currentThread().interrupt();
114+
}
93115
}
94116
}
95117

0 commit comments

Comments
 (0)