62
62
import java .util .Optional ;
63
63
import java .util .concurrent .CompletableFuture ;
64
64
import java .util .concurrent .ConcurrentHashMap ;
65
+ import java .util .concurrent .Executor ;
65
66
import java .util .concurrent .ExecutorService ;
66
67
import java .util .concurrent .RejectedExecutionException ;
67
68
import java .util .function .Function ;
@@ -80,16 +81,18 @@ static MediaType parseMediaType(String contentType) {
80
81
return result ;
81
82
}
82
83
83
- private abstract class OkHttpAsyncBody <T > implements AsyncBody {
84
+ abstract static class OkHttpAsyncBody <T > implements AsyncBody {
84
85
private final AsyncBody .Consumer <T > consumer ;
85
86
private final BufferedSource source ;
86
87
private final CompletableFuture <Void > done = new CompletableFuture <>();
87
88
private boolean consuming ;
88
89
private boolean requested ;
90
+ private Executor executor ;
89
91
90
- private OkHttpAsyncBody (AsyncBody .Consumer <T > consumer , BufferedSource source ) {
92
+ OkHttpAsyncBody (AsyncBody .Consumer <T > consumer , BufferedSource source , Executor executor ) {
91
93
this .consumer = consumer ;
92
94
this .source = source ;
95
+ this .executor = executor ;
93
96
}
94
97
95
98
@ Override
@@ -103,7 +106,7 @@ public void consume() {
103
106
}
104
107
try {
105
108
// consume should not block a caller, delegate to the dispatcher thread pool
106
- httpClient . dispatcher (). executorService () .execute (this ::doConsume );
109
+ executor .execute (this ::doConsume );
107
110
} catch (Exception e ) {
108
111
// executor is likely shutdown
109
112
Utils .closeQuietly (source );
@@ -125,6 +128,8 @@ private void doConsume() {
125
128
T value = process (source );
126
129
consumer .consume (value , this );
127
130
} else {
131
+ // even if we've read everything an explicit close is still needed
132
+ source .close ();
128
133
done .complete (null );
129
134
}
130
135
}
@@ -311,7 +316,8 @@ private okhttp3.Request.Builder newRequestBuilder() {
311
316
@ Override
312
317
public CompletableFuture <HttpResponse <AsyncBody >> consumeBytesDirect (StandardHttpRequest request ,
313
318
Consumer <List <ByteBuffer >> consumer ) {
314
- Function <BufferedSource , AsyncBody > handler = s -> new OkHttpAsyncBody <List <ByteBuffer >>(consumer , s ) {
319
+ Function <BufferedSource , AsyncBody > handler = s -> new OkHttpAsyncBody <List <ByteBuffer >>(consumer , s ,
320
+ this .httpClient .dispatcher ().executorService ()) {
315
321
@ Override
316
322
protected List <ByteBuffer > process (BufferedSource source ) throws IOException {
317
323
// read only what is available otherwise okhttp will block trying to read
0 commit comments