Skip to content

Commit 7f040e3

Browse files
committed
simplifying sync with sender
1 parent ac64d27 commit 7f040e3

File tree

6 files changed

+230
-209
lines changed

6 files changed

+230
-209
lines changed

Network/HTTP2/Client/Run.hs

+56-41
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,10 @@ run cconf@ClientConfig{..} conf client = do
8484
{ auxPossibleClientStreams = possibleClientStream ctx
8585
}
8686
clientCore ctx req processResponse = do
87-
strm <- sendRequest conf ctx scheme authority req
87+
(strm, moutobj) <- makeStream ctx scheme authority req
88+
case moutobj of
89+
Nothing -> return ()
90+
Just outobj -> sendRequest conf ctx strm outobj
8891
rsp <- getResponse strm
8992
x <- processResponse rsp
9093
adjustRxWindow ctx strm
@@ -109,7 +112,10 @@ runIO cconf@ClientConfig{..} conf@Config{..} action = do
109112
ctx@Context{..} <- setup cconf conf
110113
let putB bs = enqueueControl controlQ $ CFrames Nothing [bs]
111114
putR req = do
112-
strm <- sendRequest conf ctx scheme authority req
115+
(strm, moutobj) <- makeStream ctx scheme authority req
116+
case moutobj of
117+
Nothing -> return ()
118+
Just outobj -> sendRequest conf ctx strm outobj
113119
return (streamNumber strm, strm)
114120
get = getResponse
115121
create = openOddStreamWait ctx
@@ -165,23 +171,22 @@ runH2 conf ctx runClient = do
165171
Left () -> do
166172
wait runningClient
167173

168-
sendRequest
169-
:: Config
170-
-> Context
174+
makeStream
175+
:: Context
171176
-> Scheme
172177
-> Authority
173178
-> Request
174-
-> IO Stream
175-
sendRequest conf ctx@Context{..} scheme auth (Request req) = do
179+
-> IO (Stream, Maybe OutObj)
180+
makeStream ctx@Context{..} scheme auth (Request req) = do
176181
-- Checking push promises
177182
let hdr0 = outObjHeaders req
178-
method = fromMaybe (error "sendRequest:method") $ lookup ":method" hdr0
179-
path = fromMaybe (error "sendRequest:path") $ lookup ":path" hdr0
183+
method = fromMaybe (error "makeStream:method") $ lookup ":method" hdr0
184+
path = fromMaybe (error "makeStream:path") $ lookup ":path" hdr0
180185
mstrm0 <- lookupEvenCache evenStreamTable method path
181186
case mstrm0 of
182187
Just strm0 -> do
183188
deleteEvenCache evenStreamTable method path
184-
return strm0
189+
return (strm0, Nothing)
185190
Nothing -> do
186191
-- Arch/Sender is originally implemented for servers where
187192
-- the ordering of responses can be out-of-order.
@@ -200,38 +205,48 @@ sendRequest conf ctx@Context{..} scheme auth (Request req) = do
200205
| otherwise = hdr1
201206
req' = req{outObjHeaders = hdr2}
202207
-- FLOW CONTROL: SETTINGS_MAX_CONCURRENT_STREAMS: send: respecting peer's limit
203-
(sid, newstrm) <- openOddStreamWait ctx
204-
sendHeaderBody conf ctx sid newstrm req'
205-
return newstrm
208+
(_sid, newstrm) <- openOddStreamWait ctx
209+
return (newstrm, Just req')
206210

207-
sendHeaderBody :: Config -> Context -> StreamId -> Stream -> OutObj -> IO ()
208-
sendHeaderBody Config{..} ctx@Context{..} sid newstrm OutObj{..} = do
209-
(mnext, mtbq) <- case outObjBody of
210-
OutBodyNone -> return (Nothing, Nothing)
211-
OutBodyFile (FileSpec path fileoff bytecount) -> do
212-
(pread, sentinel) <- confPositionReadMaker path
213-
let next = fillFileBodyGetNext pread fileoff bytecount sentinel
214-
return (Just next, Nothing)
215-
OutBodyBuilder builder -> do
216-
let next = fillBuilderBodyGetNext builder
217-
return (Just next, Nothing)
218-
OutBodyStreaming strmbdy -> do
219-
q <- sendStreaming ctx newstrm $ \iface ->
220-
outBodyUnmask iface $ strmbdy (outBodyPush iface) (outBodyFlush iface)
221-
let next = nextForStreaming q
222-
return (Just next, Just q)
223-
OutBodyStreamingIface strmbdy -> do
224-
q <- sendStreaming ctx newstrm strmbdy
225-
let next = nextForStreaming q
226-
return (Just next, Just q)
227-
((var, sync), out) <-
228-
prepareSync newstrm (OHeader outObjHeaders mnext outObjTrailers) mtbq
229-
atomically $ do
230-
sidOK <- readTVar outputQStreamID
231-
check (sidOK == sid)
232-
enqueueOutputSTM outputQ out
233-
writeTVar outputQStreamID (sid + 2)
234-
forkManaged threadManager "H2 worker" $ syncWithSender ctx newstrm var sync
211+
sendRequest :: Config -> Context -> Stream -> OutObj -> IO ()
212+
sendRequest Config{..} ctx@Context{..} strm OutObj{..} =
213+
forkManaged threadManager label $ do
214+
let sid = streamNumber strm
215+
(mnext, mtbq) <- case outObjBody of
216+
OutBodyNone -> return (Nothing, Nothing)
217+
OutBodyFile (FileSpec path fileoff bytecount) -> do
218+
(pread, sentinel) <- confPositionReadMaker path
219+
let next = fillFileBodyGetNext pread fileoff bytecount sentinel
220+
return (Just next, Nothing)
221+
OutBodyBuilder builder -> do
222+
let next = fillBuilderBodyGetNext builder
223+
return (Just next, Nothing)
224+
OutBodyStreaming strmbdy -> do
225+
q <- sendStreaming ctx strm $ \iface ->
226+
outBodyUnmask iface $ strmbdy (outBodyPush iface) (outBodyFlush iface)
227+
let next = nextForStreaming q
228+
return (Just next, Just q)
229+
OutBodyStreamingIface strmbdy -> do
230+
q <- sendStreaming ctx strm strmbdy
231+
let next = nextForStreaming q
232+
return (Just next, Just q)
233+
let ot = (OHeader outObjHeaders mnext outObjTrailers)
234+
(var, out) <- makeOutput strm ot
235+
atomically $ do
236+
sidOK <- readTVar outputQStreamID
237+
check (sidOK == sid)
238+
writeTVar outputQStreamID (sid + 2)
239+
enqueueOutputSTM outputQ out
240+
tovar <- newTVarIO False
241+
let lc =
242+
LoopCheck
243+
{ lcTBQ = mtbq
244+
, lcTimeout = tovar
245+
, lcWindow = streamTxFlow strm
246+
}
247+
syncWithSender' ctx var lc
248+
where
249+
label = "H2 request sender for stream " ++ show (streamNumber strm)
235250

236251
sendStreaming
237252
:: Context

Network/HTTP2/H2/Sender.hs

+45-52
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ frameSender
7777
x <- atomically $ dequeue off
7878
case x of
7979
C ctl -> flushN off >> control ctl >> loop 0
80-
O out -> outputOrEnqueueAgain out off >>= flushIfNecessary >>= loop
80+
O out -> outputAndSync out off >>= flushIfNecessary >>= loop
8181
Flush -> flushN off >> loop 0
8282

8383
-- Flush the connection buffer to the socket, where the first 'n' bytes of
@@ -139,29 +139,31 @@ frameSender
139139
Just siz -> setLimitForEncoding siz encodeDynamicTable
140140

141141
----------------------------------------------------------------
142-
outputOrEnqueueAgain :: Output -> Offset -> IO Offset
143-
outputOrEnqueueAgain out@(Output strm otyp sync) off = E.handle (\e -> resetStream strm InternalError e >> return off) $ do
142+
-- INVARIANT
143+
--
144+
-- Both the stream window and the connection window are open.
145+
----------------------------------------------------------------
146+
outputAndSync :: Output -> Offset -> IO Offset
147+
outputAndSync out@(Output strm otyp sync) off = E.handle (\e -> resetStream strm InternalError e >> return off) $ do
144148
state <- readStreamState strm
145149
if isHalfClosedLocal state
146150
then return off
147151
else case otyp of
148-
OHeader hdr mnext tlrmkr ->
149-
-- Send headers immediately, without waiting for data
150-
-- No need to check the streaming window (applies to DATA frames only)
151-
outputHeader strm hdr mnext tlrmkr sync off
152+
OHeader hdr mnext tlrmkr -> do
153+
(off', mout') <- outputHeader strm hdr mnext tlrmkr sync off
154+
case mout' of
155+
Nothing -> sync Done
156+
Just out' -> sync $ Cont out'
157+
return off'
152158
_ -> do
153-
-- The 'sync' function usage constraints hold here: We
154-
-- just popped off the only 'Output' for this stream,
155-
-- and we only enqueue a new output (in 'output') if
156-
-- 'sync' returns 'True'
157-
ok <- sync $ Just otyp
158-
if ok
159-
then do
160-
sws <- getStreamWindowSize strm
161-
cws <- getConnectionWindowSize ctx -- not 0
162-
let lim = min cws sws
163-
output out off lim
164-
else return off
159+
sws <- getStreamWindowSize strm
160+
cws <- getConnectionWindowSize ctx -- not 0
161+
let lim = min cws sws
162+
(off', mout') <- output out off lim
163+
case mout' of
164+
Nothing -> sync Done
165+
Just out' -> sync $ Cont out'
166+
return off'
165167

166168
resetStream :: Stream -> ErrorCode -> E.SomeException -> IO ()
167169
resetStream strm err e = do
@@ -175,9 +177,9 @@ frameSender
175177
-> [Header]
176178
-> Maybe DynaNext
177179
-> TrailersMaker
178-
-> (Maybe OutputType -> IO Bool)
180+
-> (Sync -> IO ())
179181
-> Offset
180-
-> IO Offset
182+
-> IO (Offset, Maybe Output)
181183
outputHeader strm hdr mnext tlrmkr sync off0 = do
182184
-- Header frame and Continuation frame
183185
let sid = streamNumber strm
@@ -186,19 +188,19 @@ frameSender
186188
off' <- headerContinue sid ths endOfStream off0
187189
-- halfClosedLocal calls closed which removes
188190
-- the stream from stream table.
189-
when endOfStream $ do
190-
halfClosedLocal ctx strm Finished
191-
void $ sync Nothing
192191
off <- flushIfNecessary off'
193192
case mnext of
194-
Nothing -> return off
193+
Nothing -> do
194+
-- endOfStream
195+
halfClosedLocal ctx strm Finished
196+
return (off, Nothing)
195197
Just next -> do
196198
let out' = Output strm (ONext next tlrmkr) sync
197-
outputOrEnqueueAgain out' off
199+
return (off, Just out')
198200

199201
----------------------------------------------------------------
200-
output :: Output -> Offset -> WindowSize -> IO Offset
201-
output out@(Output strm (ONext curr tlrmkr) sync) off0 lim = do
202+
output :: Output -> Offset -> WindowSize -> IO (Offset, Maybe Output)
203+
output out@(Output strm (ONext curr tlrmkr) _) off0 lim = do
202204
-- Data frame payload
203205
buflim <- readIORef outputBufferLimit
204206
let payloadOff = off0 + frameHeaderLength
@@ -208,13 +210,12 @@ frameSender
208210
case next of
209211
Next datPayloadLen reqflush mnext -> do
210212
NextTrailersMaker tlrmkr' <- runTrailersMaker tlrmkr datBuf datPayloadLen
211-
fillDataHeaderEnqueueNext
213+
fillDataHeader
212214
strm
213215
off0
214216
datPayloadLen
215217
mnext
216218
tlrmkr'
217-
sync
218219
out
219220
reqflush
220221
CancelNext mErr -> do
@@ -233,15 +234,14 @@ frameSender
233234
resetStream strm InternalError err
234235
Nothing ->
235236
resetStream strm Cancel (E.toException CancelledStream)
236-
return off0
237-
output (Output strm (OPush ths pid) sync) off0 _lim = do
237+
return (off0, Nothing)
238+
output (Output strm (OPush ths pid) _) off0 _lim = do
238239
-- Creating a push promise header
239240
-- Frame id should be associated stream id from the client.
240241
let sid = streamNumber strm
241242
len <- pushPromise pid sid ths off0
242243
off <- flushIfNecessary $ off0 + frameHeaderLength + len
243-
_ <- sync Nothing
244-
return off
244+
return (off, Nothing)
245245
output _ _ _ = undefined -- never reached
246246

247247
----------------------------------------------------------------
@@ -285,23 +285,21 @@ frameSender
285285
continue off' ths' FrameContinuation
286286

287287
----------------------------------------------------------------
288-
fillDataHeaderEnqueueNext
288+
fillDataHeader
289289
:: Stream
290290
-> Offset
291291
-> Int
292292
-> Maybe DynaNext
293293
-> (Maybe ByteString -> IO NextTrailersMaker)
294-
-> (Maybe OutputType -> IO Bool)
295294
-> Output
296295
-> Bool
297-
-> IO Offset
298-
fillDataHeaderEnqueueNext
296+
-> IO (Offset, Maybe Output)
297+
fillDataHeader
299298
strm@Stream{streamNumber}
300299
off
301300
datPayloadLen
302301
Nothing
303302
tlrmkr
304-
sync
305303
_
306304
reqflush = do
307305
let buf = confWriteBuffer `plusPtr` off
@@ -321,41 +319,37 @@ frameSender
321319
else
322320
return off
323321
off'' <- handleTrailers mtrailers off'
324-
_ <- sync Nothing
325322
halfClosedLocal ctx strm Finished
326323
if reqflush
327324
then do
328325
flushN off''
329-
return 0
330-
else return off''
326+
return (0, Nothing)
327+
else return (off'', Nothing)
331328
where
332329
handleTrailers Nothing off0 = return off0
333330
handleTrailers (Just trailers) off0 = do
334331
(ths, _) <- toTokenHeaderTable trailers
335332
headerContinue streamNumber ths True {- endOfStream -} off0
336-
fillDataHeaderEnqueueNext
333+
fillDataHeader
337334
_
338335
off
339336
0
340337
(Just next)
341338
tlrmkr
342-
_
343339
out
344340
reqflush = do
345341
let out' = out{outputType = ONext next tlrmkr}
346-
enqueueOutput outputQ out'
347342
if reqflush
348343
then do
349344
flushN off
350-
return 0
351-
else return off
352-
fillDataHeaderEnqueueNext
345+
return (0, Just out')
346+
else return (off, Just out')
347+
fillDataHeader
353348
strm@Stream{streamNumber}
354349
off
355350
datPayloadLen
356351
(Just next)
357352
tlrmkr
358-
_
359353
out
360354
reqflush = do
361355
let buf = confWriteBuffer `plusPtr` off
@@ -364,12 +358,11 @@ frameSender
364358
fillFrameHeader FrameData datPayloadLen streamNumber flag buf
365359
decreaseWindowSize ctx strm datPayloadLen
366360
let out' = out{outputType = ONext next tlrmkr}
367-
enqueueOutput outputQ out'
368361
if reqflush
369362
then do
370363
flushN off'
371-
return 0
372-
else return off'
364+
return (0, Just out')
365+
else return (off', Just out')
373366

374367
----------------------------------------------------------------
375368
pushPromise :: StreamId -> StreamId -> TokenHeaderList -> Offset -> IO Int

0 commit comments

Comments
 (0)