1
1
/*
2
- * Copyright 2002-2025 the original author or authors.
2
+ * Copyright 2002-2024 the original author or authors.
3
3
*
4
4
* Licensed under the Apache License, Version 2.0 (the "License");
5
5
* you may not use this file except in compliance with the License.
21
21
import java .util .LinkedHashSet ;
22
22
import java .util .List ;
23
23
import java .util .Set ;
24
- import java .util .concurrent .atomic .AtomicReference ;
25
24
import java .util .function .Consumer ;
26
25
27
26
import org .jspecify .annotations .Nullable ;
@@ -72,19 +71,20 @@ public class ResponseBodyEmitter {
72
71
73
72
private @ Nullable Handler handler ;
74
73
75
- private final AtomicReference <State > state = new AtomicReference <>(State .START );
76
-
77
74
/** Store send data before handler is initialized. */
78
75
private final Set <DataWithMediaType > earlySendAttempts = new LinkedHashSet <>(8 );
79
76
77
+ /** Store successful completion before the handler is initialized. */
78
+ private boolean complete ;
79
+
80
80
/** Store an error before the handler is initialized. */
81
81
private @ Nullable Throwable failure ;
82
82
83
- private final TimeoutCallback timeoutCallback = new TimeoutCallback ();
83
+ private final DefaultCallback timeoutCallback = new DefaultCallback ();
84
84
85
85
private final ErrorCallback errorCallback = new ErrorCallback ();
86
86
87
- private final CompletionCallback completionCallback = new CompletionCallback ();
87
+ private final DefaultCallback completionCallback = new DefaultCallback ();
88
88
89
89
90
90
/**
@@ -124,7 +124,7 @@ synchronized void initialize(Handler handler) throws IOException {
124
124
this .earlySendAttempts .clear ();
125
125
}
126
126
127
- if (this .state . get () == State . COMPLETE ) {
127
+ if (this .complete ) {
128
128
if (this .failure != null ) {
129
129
this .handler .completeWithError (this .failure );
130
130
}
@@ -139,12 +139,11 @@ synchronized void initialize(Handler handler) throws IOException {
139
139
}
140
140
}
141
141
142
- void initializeWithError (Throwable ex ) {
143
- if (this .state .compareAndSet (State .START , State .COMPLETE )) {
144
- this .failure = ex ;
145
- this .earlySendAttempts .clear ();
146
- this .errorCallback .accept (ex );
147
- }
142
+ synchronized void initializeWithError (Throwable ex ) {
143
+ this .complete = true ;
144
+ this .failure = ex ;
145
+ this .earlySendAttempts .clear ();
146
+ this .errorCallback .accept (ex );
148
147
}
149
148
150
149
/**
@@ -182,7 +181,8 @@ public void send(Object object) throws IOException {
182
181
* @throws java.lang.IllegalStateException wraps any other errors
183
182
*/
184
183
public synchronized void send (Object object , @ Nullable MediaType mediaType ) throws IOException {
185
- assertNotComplete ();
184
+ Assert .state (!this .complete , () -> "ResponseBodyEmitter has already completed" +
185
+ (this .failure != null ? " with error: " + this .failure : "" ));
186
186
if (this .handler != null ) {
187
187
try {
188
188
this .handler .send (object , mediaType );
@@ -209,13 +209,9 @@ public synchronized void send(Object object, @Nullable MediaType mediaType) thro
209
209
* @since 6.0.12
210
210
*/
211
211
public synchronized void send (Set <DataWithMediaType > items ) throws IOException {
212
- assertNotComplete ();
213
- sendInternal (items );
214
- }
215
-
216
- private void assertNotComplete () {
217
- Assert .state (this .state .get () == State .START , () -> "ResponseBodyEmitter has already completed" +
212
+ Assert .state (!this .complete , () -> "ResponseBodyEmitter has already completed" +
218
213
(this .failure != null ? " with error: " + this .failure : "" ));
214
+ sendInternal (items );
219
215
}
220
216
221
217
private void sendInternal (Set <DataWithMediaType > items ) throws IOException {
@@ -246,8 +242,9 @@ private void sendInternal(Set<DataWithMediaType> items) throws IOException {
246
242
* to complete request processing. It should not be used after container
247
243
* related events such as an error while {@link #send(Object) sending}.
248
244
*/
249
- public void complete () {
250
- if (trySetComplete () && this .handler != null ) {
245
+ public synchronized void complete () {
246
+ this .complete = true ;
247
+ if (this .handler != null ) {
251
248
this .handler .complete ();
252
249
}
253
250
}
@@ -263,26 +260,20 @@ public void complete() {
263
260
* container related events such as an error while
264
261
* {@link #send(Object) sending}.
265
262
*/
266
- public void completeWithError (Throwable ex ) {
267
- if (trySetComplete ()) {
268
- this .failure = ex ;
269
- if (this .handler != null ) {
270
- this .handler .completeWithError (ex );
271
- }
263
+ public synchronized void completeWithError (Throwable ex ) {
264
+ this .complete = true ;
265
+ this .failure = ex ;
266
+ if (this .handler != null ) {
267
+ this .handler .completeWithError (ex );
272
268
}
273
269
}
274
270
275
- private boolean trySetComplete () {
276
- return (this .state .compareAndSet (State .START , State .COMPLETE ) ||
277
- (this .state .compareAndSet (State .TIMEOUT , State .COMPLETE )));
278
- }
279
-
280
271
/**
281
272
* Register code to invoke when the async request times out. This method is
282
273
* called from a container thread when an async request times out.
283
274
* <p>As of 6.2, one can register multiple callbacks for this event.
284
275
*/
285
- public void onTimeout (Runnable callback ) {
276
+ public synchronized void onTimeout (Runnable callback ) {
286
277
this .timeoutCallback .addDelegate (callback );
287
278
}
288
279
@@ -293,7 +284,7 @@ public void onTimeout(Runnable callback) {
293
284
* <p>As of 6.2, one can register multiple callbacks for this event.
294
285
* @since 5.0
295
286
*/
296
- public void onError (Consumer <Throwable > callback ) {
287
+ public synchronized void onError (Consumer <Throwable > callback ) {
297
288
this .errorCallback .addDelegate (callback );
298
289
}
299
290
@@ -304,7 +295,7 @@ public void onError(Consumer<Throwable> callback) {
304
295
* detecting that a {@code ResponseBodyEmitter} instance is no longer usable.
305
296
* <p>As of 6.2, one can register multiple callbacks for this event.
306
297
*/
307
- public void onCompletion (Runnable callback ) {
298
+ public synchronized void onCompletion (Runnable callback ) {
308
299
this .completionCallback .addDelegate (callback );
309
300
}
310
301
@@ -371,80 +362,39 @@ public Object getData() {
371
362
}
372
363
373
364
374
- private class TimeoutCallback implements Runnable {
365
+ private class DefaultCallback implements Runnable {
375
366
376
- private final List <Runnable > delegates = new ArrayList <>(1 );
367
+ private List <Runnable > delegates = new ArrayList <>(1 );
377
368
378
- public synchronized void addDelegate (Runnable delegate ) {
369
+ public void addDelegate (Runnable delegate ) {
379
370
this .delegates .add (delegate );
380
371
}
381
372
382
373
@ Override
383
374
public void run () {
384
- if (ResponseBodyEmitter .this .state .compareAndSet (State .START , State .TIMEOUT )) {
385
- for (Runnable delegate : this .delegates ) {
386
- delegate .run ();
387
- }
375
+ ResponseBodyEmitter .this .complete = true ;
376
+ for (Runnable delegate : this .delegates ) {
377
+ delegate .run ();
388
378
}
389
379
}
390
380
}
391
381
392
382
393
383
private class ErrorCallback implements Consumer <Throwable > {
394
384
395
- private final List <Consumer <Throwable >> delegates = new ArrayList <>(1 );
385
+ private List <Consumer <Throwable >> delegates = new ArrayList <>(1 );
396
386
397
- public synchronized void addDelegate (Consumer <Throwable > callback ) {
387
+ public void addDelegate (Consumer <Throwable > callback ) {
398
388
this .delegates .add (callback );
399
389
}
400
390
401
391
@ Override
402
392
public void accept (Throwable t ) {
403
- if (ResponseBodyEmitter .this .state .compareAndSet (State .START , State .COMPLETE )) {
404
- for (Consumer <Throwable > delegate : this .delegates ) {
405
- delegate .accept (t );
406
- }
393
+ ResponseBodyEmitter .this .complete = true ;
394
+ for (Consumer <Throwable > delegate : this .delegates ) {
395
+ delegate .accept (t );
407
396
}
408
397
}
409
398
}
410
399
411
-
412
- private class CompletionCallback implements Runnable {
413
-
414
- private final List <Runnable > delegates = new ArrayList <>(1 );
415
-
416
- public synchronized void addDelegate (Runnable delegate ) {
417
- this .delegates .add (delegate );
418
- }
419
-
420
- @ Override
421
- public void run () {
422
- if (ResponseBodyEmitter .this .state .compareAndSet (State .START , State .COMPLETE )) {
423
- for (Runnable delegate : this .delegates ) {
424
- delegate .run ();
425
- }
426
- }
427
- }
428
- }
429
-
430
-
431
- /**
432
- * Represents a state for {@link ResponseBodyEmitter}.
433
- * <p><pre>
434
- * START ----+
435
- * | |
436
- * v |
437
- * TIMEOUT |
438
- * | |
439
- * v |
440
- * COMPLETE <--+
441
- * </pre>
442
- * @since 6.2.4
443
- */
444
- private enum State {
445
- START ,
446
- TIMEOUT , // handling a timeout
447
- COMPLETE
448
- }
449
-
450
400
}
0 commit comments