21
21
import java .util .LinkedHashSet ;
22
22
import java .util .List ;
23
23
import java .util .Set ;
24
+ import java .util .concurrent .atomic .AtomicBoolean ;
24
25
import java .util .function .Consumer ;
25
26
26
27
import org .springframework .http .MediaType ;
@@ -76,7 +77,7 @@ public class ResponseBodyEmitter {
76
77
private final Set <DataWithMediaType > earlySendAttempts = new LinkedHashSet <>(8 );
77
78
78
79
/** Store successful completion before the handler is initialized. */
79
- private boolean complete ;
80
+ private final AtomicBoolean complete = new AtomicBoolean () ;
80
81
81
82
/** Store an error before the handler is initialized. */
82
83
@ Nullable
@@ -127,7 +128,7 @@ synchronized void initialize(Handler handler) throws IOException {
127
128
this .earlySendAttempts .clear ();
128
129
}
129
130
130
- if (this .complete ) {
131
+ if (this .complete . get () ) {
131
132
if (this .failure != null ) {
132
133
this .handler .completeWithError (this .failure );
133
134
}
@@ -142,11 +143,12 @@ synchronized void initialize(Handler handler) throws IOException {
142
143
}
143
144
}
144
145
145
- synchronized void initializeWithError (Throwable ex ) {
146
- this .complete = true ;
147
- this .failure = ex ;
148
- this .earlySendAttempts .clear ();
149
- this .errorCallback .accept (ex );
146
+ void initializeWithError (Throwable ex ) {
147
+ if (this .complete .compareAndSet (false , true )) {
148
+ this .failure = ex ;
149
+ this .earlySendAttempts .clear ();
150
+ this .errorCallback .accept (ex );
151
+ }
150
152
}
151
153
152
154
/**
@@ -184,7 +186,7 @@ public void send(Object object) throws IOException {
184
186
* @throws java.lang.IllegalStateException wraps any other errors
185
187
*/
186
188
public synchronized void send (Object object , @ Nullable MediaType mediaType ) throws IOException {
187
- Assert .state (!this .complete , () -> "ResponseBodyEmitter has already completed" +
189
+ Assert .state (!this .complete . get () , () -> "ResponseBodyEmitter has already completed" +
188
190
(this .failure != null ? " with error: " + this .failure : "" ));
189
191
if (this .handler != null ) {
190
192
try {
@@ -212,7 +214,7 @@ public synchronized void send(Object object, @Nullable MediaType mediaType) thro
212
214
* @since 6.0.12
213
215
*/
214
216
public synchronized void send (Set <DataWithMediaType > items ) throws IOException {
215
- Assert .state (!this .complete , () -> "ResponseBodyEmitter has already completed" +
217
+ Assert .state (!this .complete . get () , () -> "ResponseBodyEmitter has already completed" +
216
218
(this .failure != null ? " with error: " + this .failure : "" ));
217
219
sendInternal (items );
218
220
}
@@ -245,9 +247,8 @@ private void sendInternal(Set<DataWithMediaType> items) throws IOException {
245
247
* to complete request processing. It should not be used after container
246
248
* related events such as an error while {@link #send(Object) sending}.
247
249
*/
248
- public synchronized void complete () {
249
- this .complete = true ;
250
- if (this .handler != null ) {
250
+ public void complete () {
251
+ if (this .complete .compareAndSet (false , true ) && this .handler != null ) {
251
252
this .handler .complete ();
252
253
}
253
254
}
@@ -263,11 +264,12 @@ public synchronized void complete() {
263
264
* container related events such as an error while
264
265
* {@link #send(Object) sending}.
265
266
*/
266
- public synchronized void completeWithError (Throwable ex ) {
267
- this .complete = true ;
268
- this .failure = ex ;
269
- if (this .handler != null ) {
270
- this .handler .completeWithError (ex );
267
+ public void completeWithError (Throwable ex ) {
268
+ if (this .complete .compareAndSet (false , true )) {
269
+ this .failure = ex ;
270
+ if (this .handler != null ) {
271
+ this .handler .completeWithError (ex );
272
+ }
271
273
}
272
274
}
273
275
@@ -276,7 +278,7 @@ public synchronized void completeWithError(Throwable ex) {
276
278
* called from a container thread when an async request times out.
277
279
* <p>As of 6.2, one can register multiple callbacks for this event.
278
280
*/
279
- public synchronized void onTimeout (Runnable callback ) {
281
+ public void onTimeout (Runnable callback ) {
280
282
this .timeoutCallback .addDelegate (callback );
281
283
}
282
284
@@ -287,7 +289,7 @@ public synchronized void onTimeout(Runnable callback) {
287
289
* <p>As of 6.2, one can register multiple callbacks for this event.
288
290
* @since 5.0
289
291
*/
290
- public synchronized void onError (Consumer <Throwable > callback ) {
292
+ public void onError (Consumer <Throwable > callback ) {
291
293
this .errorCallback .addDelegate (callback );
292
294
}
293
295
@@ -298,7 +300,7 @@ public synchronized void onError(Consumer<Throwable> callback) {
298
300
* detecting that a {@code ResponseBodyEmitter} instance is no longer usable.
299
301
* <p>As of 6.2, one can register multiple callbacks for this event.
300
302
*/
301
- public synchronized void onCompletion (Runnable callback ) {
303
+ public void onCompletion (Runnable callback ) {
302
304
this .completionCallback .addDelegate (callback );
303
305
}
304
306
@@ -369,15 +371,15 @@ public MediaType getMediaType() {
369
371
370
372
private class DefaultCallback implements Runnable {
371
373
372
- private List <Runnable > delegates = new ArrayList <>(1 );
374
+ private final List <Runnable > delegates = new ArrayList <>(1 );
373
375
374
- public void addDelegate (Runnable delegate ) {
376
+ public synchronized void addDelegate (Runnable delegate ) {
375
377
this .delegates .add (delegate );
376
378
}
377
379
378
380
@ Override
379
381
public void run () {
380
- ResponseBodyEmitter .this .complete = true ;
382
+ ResponseBodyEmitter .this .complete . compareAndSet ( false , true ) ;
381
383
for (Runnable delegate : this .delegates ) {
382
384
delegate .run ();
383
385
}
@@ -387,15 +389,15 @@ public void run() {
387
389
388
390
private class ErrorCallback implements Consumer <Throwable > {
389
391
390
- private List <Consumer <Throwable >> delegates = new ArrayList <>(1 );
392
+ private final List <Consumer <Throwable >> delegates = new ArrayList <>(1 );
391
393
392
- public void addDelegate (Consumer <Throwable > callback ) {
394
+ public synchronized void addDelegate (Consumer <Throwable > callback ) {
393
395
this .delegates .add (callback );
394
396
}
395
397
396
398
@ Override
397
399
public void accept (Throwable t ) {
398
- ResponseBodyEmitter .this .complete = true ;
400
+ ResponseBodyEmitter .this .complete . compareAndSet ( false , true ) ;
399
401
for (Consumer <Throwable > delegate : this .delegates ) {
400
402
delegate .accept (t );
401
403
}
0 commit comments