Skip to content

Recover rxflow on closing #126

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 12 additions & 9 deletions Network/HTTP2/Client/Run.hs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,9 @@ run cconf@ClientConfig{..} conf client = do
clientCore ctx mgr req processResponse = do
strm <- sendRequest ctx mgr scheme authority req
rsp <- getResponse strm
processResponse rsp
x <- processResponse rsp
adjustRxWindow ctx strm
return x
runClient ctx mgr = do
x <- client (clientCore ctx mgr) $ aux ctx
waitCounter0 mgr
Expand Down Expand Up @@ -205,14 +207,15 @@ sendStreaming Context{..} mgr req sid newstrm strmbdy = do
forkManagedUnmask mgr $ \unmask -> do
decrementedCounter <- newIORef False
let decCounterOnce = do
alreadyDecremented <- atomicModifyIORef decrementedCounter $ \b -> (True, b)
unless alreadyDecremented $ decCounter mgr
let iface = OutBodyIface {
outBodyUnmask = unmask
, outBodyPush = \b -> atomically $ writeTBQueue tbq (StreamingBuilder b Nothing)
, outBodyPushFinal = \b -> atomically $ writeTBQueue tbq (StreamingBuilder b (Just decCounterOnce))
, outBodyFlush = atomically $ writeTBQueue tbq StreamingFlush
}
alreadyDecremented <- atomicModifyIORef decrementedCounter $ \b -> (True, b)
unless alreadyDecremented $ decCounter mgr
let iface =
OutBodyIface
{ outBodyUnmask = unmask
, outBodyPush = \b -> atomically $ writeTBQueue tbq (StreamingBuilder b Nothing)
, outBodyPushFinal = \b -> atomically $ writeTBQueue tbq (StreamingBuilder b (Just decCounterOnce))
, outBodyFlush = atomically $ writeTBQueue tbq StreamingFlush
}
finished = atomically $ writeTBQueue tbq $ StreamingFinished decCounterOnce
incCounter mgr
strmbdy iface `finally` finished
Expand Down
17 changes: 5 additions & 12 deletions Network/HTTP2/H2/Receiver.hs
Original file line number Diff line number Diff line change
Expand Up @@ -190,11 +190,12 @@ processState (Open _ (NoBody tbl@(_, reqvt))) ctx@Context{..} strm@Stream{stream
return False

-- Transition (process2)
processState (Open hcl (HasBody tbl@(_, reqvt))) ctx@Context{..} strm@Stream{streamInput} _streamId = do
processState (Open hcl (HasBody tbl@(_, reqvt))) ctx@Context{..} strm@Stream{streamInput, streamRxQ} _streamId = do
let mcl = fst <$> (getFieldValue tokenContentLength reqvt >>= C8.readInt)
bodyLength <- newIORef 0
tlr <- newIORef Nothing
q <- newTQueueIO
writeIORef streamRxQ $ Just q
setStreamState ctx strm $ Open hcl (Body q mcl bodyLength tlr)
-- FLOW CONTROL: WINDOW_UPDATE 0: recv: announcing my limit properly
-- FLOW CONTROL: WINDOW_UPDATE: recv: announcing my limit properly
Expand Down Expand Up @@ -612,17 +613,9 @@ stream x FrameHeader{streamId} _ _ _ _ =
----------------------------------------------------------------

-- | Type for input streaming.
data Source
= Source
(Int -> IO ())
(TQueue (Either E.SomeException (ByteString, Bool)))
(IORef ByteString)
(IORef Bool)

mkSource
:: TQueue (Either E.SomeException (ByteString, Bool))
-> (Int -> IO ())
-> IO Source
data Source = Source (Int -> IO ()) RxQ (IORef ByteString) (IORef Bool)

mkSource :: RxQ -> (Int -> IO ()) -> IO Source
mkSource q inform = Source inform q <$> newIORef "" <*> newIORef False

readSource :: Source -> IO (ByteString, Bool)
Expand Down
2 changes: 2 additions & 0 deletions Network/HTTP2/H2/Stream.hs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ newOddStream sid txwin rxwin =
<*> newEmptyMVar
<*> newTVarIO (newTxFlow txwin)
<*> newIORef (newRxFlow rxwin)
<*> newIORef Nothing

newEvenStream :: StreamId -> WindowSize -> WindowSize -> IO Stream
newEvenStream sid txwin rxwin =
Expand All @@ -60,6 +61,7 @@ newEvenStream sid txwin rxwin =
<*> newEmptyMVar
<*> newTVarIO (newTxFlow txwin)
<*> newIORef (newRxFlow rxwin)
<*> newIORef Nothing

----------------------------------------------------------------

Expand Down
3 changes: 3 additions & 0 deletions Network/HTTP2/H2/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -149,12 +149,15 @@ instance Show StreamState where

----------------------------------------------------------------

type RxQ = TQueue (Either E.SomeException (ByteString, Bool))

data Stream = Stream
{ streamNumber :: StreamId
, streamState :: IORef StreamState
, streamInput :: MVar (Either SomeException InpObj) -- Client only
, streamTxFlow :: TVar TxFlow
, streamRxFlow :: IORef RxFlow
, streamRxQ :: IORef (Maybe RxQ)
}

instance Show Stream where
Expand Down
24 changes: 23 additions & 1 deletion Network/HTTP2/H2/Window.hs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE BangPatterns #-}

module Network.HTTP2.H2.Window where

import qualified Data.ByteString as BS
import Data.IORef
import Network.Control
import qualified UnliftIO.Exception as E
Expand Down Expand Up @@ -79,3 +80,24 @@ informWindowUpdate Context{controlQ, rxFlow} Stream{streamNumber, streamRxFlow}
let frame = windowUpdateFrame streamNumber ws
cframe = CFrames Nothing [frame]
enqueueControl controlQ cframe

adjustRxWindow :: Context -> Stream -> IO ()
adjustRxWindow ctx stream@Stream{streamRxQ} = do
mq <- readIORef streamRxQ
case mq of
Nothing -> return ()
Just q -> do
len <- readQ q
informWindowUpdate ctx stream len
where
readQ q = atomically $ loop 0
where
loop !total = do
meb <- tryReadTQueue q
case meb of
Just (Right (bs, _)) -> loop (total + BS.length bs)
Just le@(Left _) -> do
-- reserving HTTP2Error
writeTQueue q le
return total
_ -> return total
2 changes: 1 addition & 1 deletion Network/HTTP2/Server/Run.hs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ run sconf@ServerConfig{numberOfWorkers} conf server = do
when ok $ do
(ctx, mgr) <- setup sconf conf
let wc = fromContext ctx
setAction mgr $ worker wc mgr server
setAction mgr $ worker ctx wc mgr server
replicateM_ numberOfWorkers $ spawnAction mgr
runH2 conf ctx mgr

Expand Down
14 changes: 5 additions & 9 deletions Network/HTTP2/Server/Worker.hs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE PatternGuards #-}
{-# LANGUAGE RecordWildCards #-}
Expand All @@ -15,7 +14,6 @@ import Network.HTTP.Semantics.IO
import Network.HTTP.Semantics.Server
import Network.HTTP.Semantics.Server.Internal
import Network.HTTP.Types
import Network.Socket (SockAddr)
import qualified System.TimeManager as T
import UnliftIO.Exception (SomeException (..))
import qualified UnliftIO.Exception as E
Expand All @@ -33,8 +31,6 @@ data WorkerConf a = WorkerConf
, workerCleanup :: a -> IO ()
, isPushable :: IO Bool
, makePushStream :: a -> PushPromise -> IO (StreamId, a)
, mySockAddr :: SockAddr
, peerSockAddr :: SockAddr
}

fromContext :: Context -> WorkerConf Stream
Expand All @@ -54,8 +50,6 @@ fromContext ctx@Context{..} =
(_, newstrm) <- openEvenStreamWait ctx
let pid = streamNumber pstrm
return (pid, newstrm)
, mySockAddr = mySockAddr
, peerSockAddr = peerSockAddr
}

----------------------------------------------------------------
Expand Down Expand Up @@ -162,8 +156,8 @@ response wc@WorkerConf{..} mgr th tconf strm (Request req) (Response rsp) pps =
(_, reqvt) = inpObjHeaders req

-- | Worker for server applications.
worker :: WorkerConf a -> Manager -> Server -> Action
worker wc@WorkerConf{..} mgr server = do
worker :: Context -> WorkerConf Stream -> Manager -> Server -> Action
worker ctx@Context{..} wc@WorkerConf{..} mgr server = do
sinfo <- newStreamInfo
tcont <- newThreadContinue
timeoutKillThread mgr $ go sinfo tcont
Expand All @@ -178,7 +172,9 @@ worker wc@WorkerConf{..} mgr server = do
T.resume th
T.tickle th
let aux = Aux th mySockAddr peerSockAddr
server (Request req') aux $ response wc mgr th tcont strm (Request req')
r <- server (Request req') aux $ response wc mgr th tcont strm (Request req')
adjustRxWindow ctx strm
return r
cont1 <- case ex of
Right () -> return True
Left e@(SomeException _)
Expand Down
Loading