21
21
import java .util .LinkedHashSet ;
22
22
import java .util .List ;
23
23
import java .util .Set ;
24
- import java .util .concurrent .atomic .AtomicBoolean ;
25
24
import java .util .function .Consumer ;
26
25
27
26
import org .springframework .http .MediaType ;
@@ -77,7 +76,7 @@ public class ResponseBodyEmitter {
77
76
private final Set <DataWithMediaType > earlySendAttempts = new LinkedHashSet <>(8 );
78
77
79
78
/** Store successful completion before the handler is initialized. */
80
- private final AtomicBoolean complete = new AtomicBoolean () ;
79
+ private boolean complete ;
81
80
82
81
/** Store an error before the handler is initialized. */
83
82
@ Nullable
@@ -128,7 +127,7 @@ synchronized void initialize(Handler handler) throws IOException {
128
127
this .earlySendAttempts .clear ();
129
128
}
130
129
131
- if (this .complete . get () ) {
130
+ if (this .complete ) {
132
131
if (this .failure != null ) {
133
132
this .handler .completeWithError (this .failure );
134
133
}
@@ -143,12 +142,11 @@ synchronized void initialize(Handler handler) throws IOException {
143
142
}
144
143
}
145
144
146
- void initializeWithError (Throwable ex ) {
147
- if (this .complete .compareAndSet (false , true )) {
148
- this .failure = ex ;
149
- this .earlySendAttempts .clear ();
150
- this .errorCallback .accept (ex );
151
- }
145
+ synchronized void initializeWithError (Throwable ex ) {
146
+ this .complete = true ;
147
+ this .failure = ex ;
148
+ this .earlySendAttempts .clear ();
149
+ this .errorCallback .accept (ex );
152
150
}
153
151
154
152
/**
@@ -186,7 +184,7 @@ public void send(Object object) throws IOException {
186
184
* @throws java.lang.IllegalStateException wraps any other errors
187
185
*/
188
186
public synchronized void send (Object object , @ Nullable MediaType mediaType ) throws IOException {
189
- Assert .state (!this .complete . get () , () -> "ResponseBodyEmitter has already completed" +
187
+ Assert .state (!this .complete , () -> "ResponseBodyEmitter has already completed" +
190
188
(this .failure != null ? " with error: " + this .failure : "" ));
191
189
if (this .handler != null ) {
192
190
try {
@@ -214,7 +212,7 @@ public synchronized void send(Object object, @Nullable MediaType mediaType) thro
214
212
* @since 6.0.12
215
213
*/
216
214
public synchronized void send (Set <DataWithMediaType > items ) throws IOException {
217
- Assert .state (!this .complete . get () , () -> "ResponseBodyEmitter has already completed" +
215
+ Assert .state (!this .complete , () -> "ResponseBodyEmitter has already completed" +
218
216
(this .failure != null ? " with error: " + this .failure : "" ));
219
217
sendInternal (items );
220
218
}
@@ -247,8 +245,9 @@ private void sendInternal(Set<DataWithMediaType> items) throws IOException {
247
245
* to complete request processing. It should not be used after container
248
246
* related events such as an error while {@link #send(Object) sending}.
249
247
*/
250
- public void complete () {
251
- if (this .complete .compareAndSet (false , true ) && this .handler != null ) {
248
+ public synchronized void complete () {
249
+ this .complete = true ;
250
+ if (this .handler != null ) {
252
251
this .handler .complete ();
253
252
}
254
253
}
@@ -264,12 +263,11 @@ public void complete() {
264
263
* container related events such as an error while
265
264
* {@link #send(Object) sending}.
266
265
*/
267
- public void completeWithError (Throwable ex ) {
268
- if (this .complete .compareAndSet (false , true )) {
269
- this .failure = ex ;
270
- if (this .handler != null ) {
271
- this .handler .completeWithError (ex );
272
- }
266
+ public synchronized void completeWithError (Throwable ex ) {
267
+ this .complete = true ;
268
+ this .failure = ex ;
269
+ if (this .handler != null ) {
270
+ this .handler .completeWithError (ex );
273
271
}
274
272
}
275
273
@@ -278,7 +276,7 @@ public void completeWithError(Throwable ex) {
278
276
* called from a container thread when an async request times out.
279
277
* <p>As of 6.2, one can register multiple callbacks for this event.
280
278
*/
281
- public void onTimeout (Runnable callback ) {
279
+ public synchronized void onTimeout (Runnable callback ) {
282
280
this .timeoutCallback .addDelegate (callback );
283
281
}
284
282
@@ -289,7 +287,7 @@ public void onTimeout(Runnable callback) {
289
287
* <p>As of 6.2, one can register multiple callbacks for this event.
290
288
* @since 5.0
291
289
*/
292
- public void onError (Consumer <Throwable > callback ) {
290
+ public synchronized void onError (Consumer <Throwable > callback ) {
293
291
this .errorCallback .addDelegate (callback );
294
292
}
295
293
@@ -300,7 +298,7 @@ public void onError(Consumer<Throwable> callback) {
300
298
* detecting that a {@code ResponseBodyEmitter} instance is no longer usable.
301
299
* <p>As of 6.2, one can register multiple callbacks for this event.
302
300
*/
303
- public void onCompletion (Runnable callback ) {
301
+ public synchronized void onCompletion (Runnable callback ) {
304
302
this .completionCallback .addDelegate (callback );
305
303
}
306
304
@@ -371,15 +369,15 @@ public MediaType getMediaType() {
371
369
372
370
private class DefaultCallback implements Runnable {
373
371
374
- private final List <Runnable > delegates = new ArrayList <>(1 );
372
+ private List <Runnable > delegates = new ArrayList <>(1 );
375
373
376
- public synchronized void addDelegate (Runnable delegate ) {
374
+ public void addDelegate (Runnable delegate ) {
377
375
this .delegates .add (delegate );
378
376
}
379
377
380
378
@ Override
381
379
public void run () {
382
- ResponseBodyEmitter .this .complete . compareAndSet ( false , true ) ;
380
+ ResponseBodyEmitter .this .complete = true ;
383
381
for (Runnable delegate : this .delegates ) {
384
382
delegate .run ();
385
383
}
@@ -389,15 +387,15 @@ public void run() {
389
387
390
388
private class ErrorCallback implements Consumer <Throwable > {
391
389
392
- private final List <Consumer <Throwable >> delegates = new ArrayList <>(1 );
390
+ private List <Consumer <Throwable >> delegates = new ArrayList <>(1 );
393
391
394
- public synchronized void addDelegate (Consumer <Throwable > callback ) {
392
+ public void addDelegate (Consumer <Throwable > callback ) {
395
393
this .delegates .add (callback );
396
394
}
397
395
398
396
@ Override
399
397
public void accept (Throwable t ) {
400
- ResponseBodyEmitter .this .complete . compareAndSet ( false , true ) ;
398
+ ResponseBodyEmitter .this .complete = true ;
401
399
for (Consumer <Throwable > delegate : this .delegates ) {
402
400
delegate .accept (t );
403
401
}
0 commit comments