Skip to content

Commit 3b829f1

Browse files
committed
simplifying sync with sender
1 parent cb2ce0f commit 3b829f1

File tree

6 files changed

+178
-172
lines changed

6 files changed

+178
-172
lines changed

Network/HTTP2/Client/Run.hs

+5-4
Original file line numberDiff line numberDiff line change
@@ -230,14 +230,15 @@ sendRequest Config{..} ctx@Context{..} strm OutObj{..} =
230230
q <- sendStreaming ctx strm strmbdy
231231
let next = nextForStreaming q
232232
return (Just next, Just q)
233-
((var, sync), out) <-
234-
prepareSync strm (OHeader outObjHeaders mnext outObjTrailers) mtbq
233+
let ot = OHeader outObjHeaders mnext outObjTrailers
234+
(var, out) <- makeOutput strm ot
235235
atomically $ do
236236
sidOK <- readTVar outputQStreamID
237237
check (sidOK == sid)
238-
enqueueOutputSTM outputQ out
239238
writeTVar outputQStreamID (sid + 2)
240-
syncWithSender ctx strm var sync
239+
enqueueOutputSTM outputQ out
240+
lc <- newLoopCheck strm mtbq
241+
syncWithSender' ctx var lc
241242
where
242243
label = "H2 request sender for stream " ++ show (streamNumber strm)
243244

Network/HTTP2/H2/Sender.hs

+45-52
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ frameSender
7777
x <- atomically $ dequeue off
7878
case x of
7979
C ctl -> flushN off >> control ctl >> loop 0
80-
O out -> outputOrEnqueueAgain out off >>= flushIfNecessary >>= loop
80+
O out -> outputAndSync out off >>= flushIfNecessary >>= loop
8181
Flush -> flushN off >> loop 0
8282

8383
-- Flush the connection buffer to the socket, where the first 'n' bytes of
@@ -139,29 +139,31 @@ frameSender
139139
Just siz -> setLimitForEncoding siz encodeDynamicTable
140140

141141
----------------------------------------------------------------
142-
outputOrEnqueueAgain :: Output -> Offset -> IO Offset
143-
outputOrEnqueueAgain out@(Output strm otyp sync) off = E.handle (\e -> resetStream strm InternalError e >> return off) $ do
142+
-- INVARIANT
143+
--
144+
-- Both the stream window and the connection window are open.
145+
----------------------------------------------------------------
146+
outputAndSync :: Output -> Offset -> IO Offset
147+
outputAndSync out@(Output strm otyp sync) off = E.handle (\e -> resetStream strm InternalError e >> return off) $ do
144148
state <- readStreamState strm
145149
if isHalfClosedLocal state
146150
then return off
147151
else case otyp of
148-
OHeader hdr mnext tlrmkr ->
149-
-- Send headers immediately, without waiting for data
150-
-- No need to check the streaming window (applies to DATA frames only)
151-
outputHeader strm hdr mnext tlrmkr sync off
152+
OHeader hdr mnext tlrmkr -> do
153+
(off', mout') <- outputHeader strm hdr mnext tlrmkr sync off
154+
case mout' of
155+
Nothing -> sync Done
156+
Just out' -> sync $ Cont out'
157+
return off'
152158
_ -> do
153-
-- The 'sync' function usage constraints hold here: We
154-
-- just popped off the only 'Output' for this stream,
155-
-- and we only enqueue a new output (in 'output') if
156-
-- 'sync' returns 'True'
157-
ok <- sync $ Just otyp
158-
if ok
159-
then do
160-
sws <- getStreamWindowSize strm
161-
cws <- getConnectionWindowSize ctx -- not 0
162-
let lim = min cws sws
163-
output out off lim
164-
else return off
159+
sws <- getStreamWindowSize strm
160+
cws <- getConnectionWindowSize ctx -- not 0
161+
let lim = min cws sws
162+
(off', mout') <- output out off lim
163+
case mout' of
164+
Nothing -> sync Done
165+
Just out' -> sync $ Cont out'
166+
return off'
165167

166168
resetStream :: Stream -> ErrorCode -> E.SomeException -> IO ()
167169
resetStream strm err e = do
@@ -175,9 +177,9 @@ frameSender
175177
-> [Header]
176178
-> Maybe DynaNext
177179
-> TrailersMaker
178-
-> (Maybe OutputType -> IO Bool)
180+
-> (Sync -> IO ())
179181
-> Offset
180-
-> IO Offset
182+
-> IO (Offset, Maybe Output)
181183
outputHeader strm hdr mnext tlrmkr sync off0 = do
182184
-- Header frame and Continuation frame
183185
let sid = streamNumber strm
@@ -186,19 +188,19 @@ frameSender
186188
off' <- headerContinue sid ths endOfStream off0
187189
-- halfClosedLocal calls closed which removes
188190
-- the stream from stream table.
189-
when endOfStream $ do
190-
halfClosedLocal ctx strm Finished
191-
void $ sync Nothing
192191
off <- flushIfNecessary off'
193192
case mnext of
194-
Nothing -> return off
193+
Nothing -> do
194+
-- endOfStream
195+
halfClosedLocal ctx strm Finished
196+
return (off, Nothing)
195197
Just next -> do
196198
let out' = Output strm (ONext next tlrmkr) sync
197-
outputOrEnqueueAgain out' off
199+
return (off, Just out')
198200

199201
----------------------------------------------------------------
200-
output :: Output -> Offset -> WindowSize -> IO Offset
201-
output out@(Output strm (ONext curr tlrmkr) sync) off0 lim = do
202+
output :: Output -> Offset -> WindowSize -> IO (Offset, Maybe Output)
203+
output out@(Output strm (ONext curr tlrmkr) _) off0 lim = do
202204
-- Data frame payload
203205
buflim <- readIORef outputBufferLimit
204206
let payloadOff = off0 + frameHeaderLength
@@ -208,13 +210,12 @@ frameSender
208210
case next of
209211
Next datPayloadLen reqflush mnext -> do
210212
NextTrailersMaker tlrmkr' <- runTrailersMaker tlrmkr datBuf datPayloadLen
211-
fillDataHeaderEnqueueNext
213+
fillDataHeader
212214
strm
213215
off0
214216
datPayloadLen
215217
mnext
216218
tlrmkr'
217-
sync
218219
out
219220
reqflush
220221
CancelNext mErr -> do
@@ -233,15 +234,14 @@ frameSender
233234
resetStream strm InternalError err
234235
Nothing ->
235236
resetStream strm Cancel (E.toException CancelledStream)
236-
return off0
237-
output (Output strm (OPush ths pid) sync) off0 _lim = do
237+
return (off0, Nothing)
238+
output (Output strm (OPush ths pid) _) off0 _lim = do
238239
-- Creating a push promise header
239240
-- Frame id should be associated stream id from the client.
240241
let sid = streamNumber strm
241242
len <- pushPromise pid sid ths off0
242243
off <- flushIfNecessary $ off0 + frameHeaderLength + len
243-
_ <- sync Nothing
244-
return off
244+
return (off, Nothing)
245245
output _ _ _ = undefined -- never reached
246246

247247
----------------------------------------------------------------
@@ -285,23 +285,21 @@ frameSender
285285
continue off' ths' FrameContinuation
286286

287287
----------------------------------------------------------------
288-
fillDataHeaderEnqueueNext
288+
fillDataHeader
289289
:: Stream
290290
-> Offset
291291
-> Int
292292
-> Maybe DynaNext
293293
-> (Maybe ByteString -> IO NextTrailersMaker)
294-
-> (Maybe OutputType -> IO Bool)
295294
-> Output
296295
-> Bool
297-
-> IO Offset
298-
fillDataHeaderEnqueueNext
296+
-> IO (Offset, Maybe Output)
297+
fillDataHeader
299298
strm@Stream{streamNumber}
300299
off
301300
datPayloadLen
302301
Nothing
303302
tlrmkr
304-
sync
305303
_
306304
reqflush = do
307305
let buf = confWriteBuffer `plusPtr` off
@@ -321,41 +319,37 @@ frameSender
321319
else
322320
return off
323321
off'' <- handleTrailers mtrailers off'
324-
_ <- sync Nothing
325322
halfClosedLocal ctx strm Finished
326323
if reqflush
327324
then do
328325
flushN off''
329-
return 0
330-
else return off''
326+
return (0, Nothing)
327+
else return (off'', Nothing)
331328
where
332329
handleTrailers Nothing off0 = return off0
333330
handleTrailers (Just trailers) off0 = do
334331
(ths, _) <- toTokenHeaderTable trailers
335332
headerContinue streamNumber ths True {- endOfStream -} off0
336-
fillDataHeaderEnqueueNext
333+
fillDataHeader
337334
_
338335
off
339336
0
340337
(Just next)
341338
tlrmkr
342-
_
343339
out
344340
reqflush = do
345341
let out' = out{outputType = ONext next tlrmkr}
346-
enqueueOutput outputQ out'
347342
if reqflush
348343
then do
349344
flushN off
350-
return 0
351-
else return off
352-
fillDataHeaderEnqueueNext
345+
return (0, Just out')
346+
else return (off, Just out')
347+
fillDataHeader
353348
strm@Stream{streamNumber}
354349
off
355350
datPayloadLen
356351
(Just next)
357352
tlrmkr
358-
_
359353
out
360354
reqflush = do
361355
let buf = confWriteBuffer `plusPtr` off
@@ -364,12 +358,11 @@ frameSender
364358
fillFrameHeader FrameData datPayloadLen streamNumber flag buf
365359
decreaseWindowSize ctx strm datPayloadLen
366360
let out' = out{outputType = ONext next tlrmkr}
367-
enqueueOutput outputQ out'
368361
if reqflush
369362
then do
370363
flushN off'
371-
return 0
372-
else return off'
364+
return (0, Just out')
365+
else return (off', Just out')
373366

374367
----------------------------------------------------------------
375368
pushPromise :: StreamId -> StreamId -> TokenHeaderList -> Offset -> IO Int

0 commit comments

Comments
 (0)