Skip to content

Commit dc1e9c0

Browse files
committed
Detect final element
Closes #114.
1 parent 187fb46 commit dc1e9c0

File tree

10 files changed

+200
-92
lines changed

10 files changed

+200
-92
lines changed

.github/workflows/haskell-ci.yml

+6-22
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@
88
#
99
# For more information, see https://github.com/haskell-CI/haskell-ci
1010
#
11-
# version: 0.19.20240501
11+
# version: 0.19.20240514
1212
#
13-
# REGENDATA ("0.19.20240501",["github","cabal.project.ci"])
13+
# REGENDATA ("0.19.20240514",["github","cabal.project.ci"])
1414
#
1515
name: Haskell-CI
1616
on:
@@ -28,9 +28,9 @@ jobs:
2828
strategy:
2929
matrix:
3030
include:
31-
- compiler: ghc-9.10.0.20240426
31+
- compiler: ghc-9.10.1
3232
compilerKind: ghc
33-
compilerVersion: 9.10.0.20240426
33+
compilerVersion: 9.10.1
3434
setup-method: ghcup
3535
allow-failure: false
3636
- compiler: ghc-9.8.2
@@ -67,7 +67,6 @@ jobs:
6767
mkdir -p "$HOME/.ghcup/bin"
6868
curl -sL https://downloads.haskell.org/ghcup/0.1.20.0/x86_64-linux-ghcup-0.1.20.0 > "$HOME/.ghcup/bin/ghcup"
6969
chmod a+x "$HOME/.ghcup/bin/ghcup"
70-
"$HOME/.ghcup/bin/ghcup" config add-release-channel https://raw.githubusercontent.com/haskell/ghcup-metadata/master/ghcup-prereleases-0.0.8.yaml;
7170
"$HOME/.ghcup/bin/ghcup" install ghc "$HCVER" || (cat "$HOME"/.ghcup/logs/*.* && false)
7271
"$HOME/.ghcup/bin/ghcup" install cabal 3.10.2.0 || (cat "$HOME"/.ghcup/logs/*.* && false)
7372
apt-get update
@@ -94,7 +93,7 @@ jobs:
9493
echo "HCNUMVER=$HCNUMVER" >> "$GITHUB_ENV"
9594
echo "ARG_TESTS=--enable-tests" >> "$GITHUB_ENV"
9695
echo "ARG_BENCH=--enable-benchmarks" >> "$GITHUB_ENV"
97-
if [ $((HCNUMVER >= 91000)) -ne 0 ] ; then echo "HEADHACKAGE=true" >> "$GITHUB_ENV" ; else echo "HEADHACKAGE=false" >> "$GITHUB_ENV" ; fi
96+
echo "HEADHACKAGE=false" >> "$GITHUB_ENV"
9897
echo "ARG_COMPILER=--$HCKIND --with-compiler=$HC" >> "$GITHUB_ENV"
9998
echo "GHCJSARITH=0" >> "$GITHUB_ENV"
10099
env:
@@ -123,18 +122,6 @@ jobs:
123122
repository hackage.haskell.org
124123
url: http://hackage.haskell.org/
125124
EOF
126-
if $HEADHACKAGE; then
127-
cat >> $CABAL_CONFIG <<EOF
128-
repository head.hackage.ghc.haskell.org
129-
url: https://ghc.gitlab.haskell.org/head.hackage/
130-
secure: True
131-
root-keys: 7541f32a4ccca4f97aea3b22f5e593ba2c0267546016b992dfadcd2fe944e55d
132-
26021a13b401500c8eb2761ca95c61f2d625bfef951b939a8124ed12ecf07329
133-
f76d08be13e9a61a377a85e2fb63f4c5435d40f8feb3e12eb05905edb8cdea89
134-
key-threshold: 3
135-
active-repositories: hackage.haskell.org, head.hackage.ghc.haskell.org:override
136-
EOF
137-
fi
138125
cat >> $CABAL_CONFIG <<EOF
139126
program-default-options
140127
ghc-options: $GHCJOBS +RTS -M3G -RTS
@@ -191,10 +178,7 @@ jobs:
191178
flags: +build-demo +build-stress-test +snappy
192179
ghc-options: -Werror
193180
EOF
194-
if $HEADHACKAGE; then
195-
echo "allow-newer: $($HCPKG list --simple-output | sed -E 's/([a-zA-Z-]+)-[0-9.]+/*:\1,/g')" >> cabal.project
196-
fi
197-
$HCPKG list --simple-output --names-only | perl -ne 'for (split /\s+/) { print "constraints: $_ installed\n" unless /^(grapesy)$/; }' >> cabal.project.local
181+
$HCPKG list --simple-output --names-only | perl -ne 'for (split /\s+/) { print "constraints: any.$_ installed\n" unless /^(grapesy)$/; }' >> cabal.project.local
198182
cat cabal.project
199183
cat cabal.project.local
200184
- name: dump install plan

grapesy.cabal

+2-2
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ library
194194
, exceptions >= 0.10 && < 0.11
195195
, hashable >= 1.3 && < 1.5
196196
, http-types >= 0.12 && < 0.13
197-
, http2 >= 5.2.1 && < 5.3
197+
, http2 >= 5.2.2 && < 5.3
198198
, http2-tls >= 0.2.11 && < 0.3
199199
, lens >= 5.0 && < 5.4
200200
, mtl >= 2.2 && < 2.4
@@ -305,7 +305,7 @@ test-suite test-grapesy
305305
, containers >= 0.6 && < 0.8
306306
, exceptions >= 0.10 && < 0.11
307307
, http-types >= 0.12 && < 0.13
308-
, http2 >= 5.2.1 && < 5.3
308+
, http2 >= 5.2.2 && < 5.3
309309
, mtl >= 2.2 && < 2.4
310310
, network >= 3.1 && < 3.3
311311
, proto-lens-runtime >= 0.7 && < 0.8

src/Network/GRPC/Client.hs

-4
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,3 @@ import Network.GRPC.Util.TLS qualified as Util.TLS
187187
-- The specialized functions from "Network.GRPC.Client.StreamType" take care of
188188
-- this; if these functions are not applicable, users may wish to use
189189
-- 'recvFinalOutput'.
190-
--
191-
-- **KNOWN LIMITATION**: for _incoming_ messages @grapesy@ cannot currently
192-
-- make this distinction. For detailed discussion, see
193-
-- <https://github.com/well-typed/grapesy/issues/114>.

test-grapesy/Test/Driver/Dialogue/Execution.hs

+6-5
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,12 @@ clientLocal clock call = \(LocalSteps steps) ->
206206
expect (tick, action) (== ResponseInitialMetadata expectedMetadata) $
207207
receivedMetadata
208208
Send (FinalElem a b) -> do
209-
-- <https://github.com/well-typed/grapesy/issues/114>
209+
-- On the client side, when the server sends the final message, we
210+
-- will receive that final message in one HTTP data frame, and then
211+
-- the trailers in another. This means that when we get the message,
212+
-- we do not yet know if this is in fact the last.
213+
-- (This is different on the server side, because gRPC does not
214+
-- support trailers on the client side.)
210215
reactToServer tick $ Send (StreamElem a)
211216
reactToServer tick $ Send (NoMoreElems b)
212217
Send expectedElem -> do
@@ -401,10 +406,6 @@ serverLocal clock call = \(LocalSteps steps) -> do
401406
case action of
402407
Initiate _ ->
403408
error "serverLocal: unexpected ClientInitiateRequest"
404-
Send (FinalElem a b) -> do
405-
-- <https://github.com/well-typed/grapesy/issues/114>
406-
reactToClient tick $ Send (StreamElem a)
407-
reactToClient tick $ Send (NoMoreElems b)
408409
Send expectedElem -> do
409410
mInp <- liftIO $ try $ within timeoutReceive action $
410411
Server.Binary.recvInput call

test-grapesy/Test/Prop/IncrementalParsing.hs

+45-13
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,10 @@ tests = testGroup "Test.Prop.IncrementalParsing" [
2424
testProperty "parser" test_parser
2525
]
2626

27-
test_parser :: Input -> [ChunkSize] -> PhraseSize -> Property
28-
test_parser input splits phraseSize =
27+
test_parser :: MarkLast -> Input -> [ChunkSize] -> PhraseSize -> Property
28+
test_parser markLast input splits phraseSize =
2929
counterexample ("chunks: " ++ show chunks)
30-
$ case processAll chunks processPhrase (parsePhrase phraseSize) of
30+
$ case processAll markLast chunks phraseSize of
3131
Left err ->
3232
counterexample ("Unexpected failure " ++ show err) $ False
3333
Right unconsumed ->
@@ -64,29 +64,33 @@ runPure chunks =
6464
. flip runStateT chunks
6565
. unwrapPure
6666

67-
getChunk :: Pure Strict.ByteString
68-
getChunk =
67+
getChunk :: MarkLast -> Pure (Strict.ByteString, Bool)
68+
getChunk (MarkLast markLast) =
6969
state $ \case
70-
[] -> (BS.Strict.empty, [])
71-
c:cs -> (c, cs)
70+
[] -> ((BS.Strict.empty, True), [])
71+
[c] -> ((c, markLast), [])
72+
c:cs -> ((c, False), cs)
7273

7374
processAll ::
74-
[Strict.ByteString]
75-
-> (a -> Pure ())
76-
-> Parser String a
75+
MarkLast
76+
-> [Strict.ByteString]
77+
-> PhraseSize
7778
-> Either String Lazy.ByteString
78-
processAll chunks processOne p =
79+
processAll markLast chunks phraseSize =
7980
runPure chunks aux >>= verifyAllChunksConsumed
8081
where
82+
p :: Parser String [Word8]
83+
p = parsePhrase phraseSize
84+
8185
-- 'processAll' does not assume that the monad @m@ in which it is executed
8286
-- has any way of reporting errors: if there is a parse failure during
8387
-- execution, this failure is returned as a value. For the specific case of
8488
-- 'Pure', however, we /can/ throw errors in @m@ (to allow 'processOne' to
8589
-- throw errors), so we can reuse that also for any parse failures.
8690
aux :: Pure Lazy.ByteString
8791
aux =
88-
Parser.processAll getChunk processOne p
89-
>>= either throwError return
92+
Parser.processAll (getChunk markLast) processPhrase processPhrase p
93+
>>= throwParseErrors
9094

9195
-- 'processAll' should run until all chunks are used
9296
verifyAllChunksConsumed ::
@@ -99,6 +103,28 @@ processAll chunks processOne p =
99103
| otherwise
100104
= Left "not all chunks consumed"
101105

106+
throwParseErrors :: Parser.ProcessResult String () -> Pure Lazy.ByteString
107+
throwParseErrors (Parser.ProcessError err) =
108+
throwError err
109+
throwParseErrors (Parser.ProcessedWithFinal () bs) = do
110+
unless canMarkFinal $ throwError "Unexpected ProcessedWithFinal"
111+
return bs
112+
throwParseErrors (Parser.ProcessedWithoutFinal bs) = do
113+
when canMarkFinal $ throwError "Unexpected ProcessedWithoutFinal"
114+
return bs
115+
116+
-- We can mark the final phrase as final if the final chunk is marked as
117+
-- final, and when we get that chunk, it contains at least one phrase.
118+
canMarkFinal :: Bool
119+
canMarkFinal = and [
120+
getMarkLast markLast
121+
, case reverse chunks of
122+
[] -> False
123+
c:cs -> let left = sum (map BS.Strict.length cs)
124+
`mod` getPhraseSize phraseSize
125+
in (left + BS.Strict.length c) >= getPhraseSize phraseSize
126+
]
127+
102128
{-------------------------------------------------------------------------------
103129
Test input
104130
@@ -111,6 +137,8 @@ processAll chunks processOne p =
111137
```
112138
113139
* We split this input into non-empty chunks of varying sizes @[ChunkSize]@.
140+
We sometimes mark the last chunk as being the last, and sometimes don't
141+
(see <https://github.com/well-typed/grapesy/issues/114>).
114142
115143
* We then choose a non-zero 'PhraseSize' @n@. The idea is that the parser
116144
splits the input into phrases of @n@ bytes
@@ -134,6 +162,7 @@ processAll chunks processOne p =
134162
(in 'processAll') that all input chunks are fed to the parser.
135163
-------------------------------------------------------------------------------}
136164

165+
newtype MarkLast = MarkLast { getMarkLast :: Bool } deriving (Show)
137166
newtype Input = Input { getInputBytes :: [Word8] } deriving (Show)
138167
newtype ChunkSize = ChunkSize { getChunkSize :: Int } deriving (Show)
139168
newtype PhraseSize = PhraseSize { getPhraseSize :: Int } deriving (Show)
@@ -179,6 +208,8 @@ processPhrase phrase =
179208
Arbitrary instances
180209
-------------------------------------------------------------------------------}
181210

211+
deriving newtype instance Arbitrary MarkLast
212+
182213
instance Arbitrary Input where
183214
arbitrary = sized $ \n -> do
184215
len <- choose (0, n * 100)
@@ -187,3 +218,4 @@ instance Arbitrary Input where
187218

188219
deriving via Positive Int instance Arbitrary ChunkSize
189220
deriving via Positive Int instance Arbitrary PhraseSize
221+

test-grapesy/Test/Sanity/EndOfStream.hs

+33-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ tests = testGroup "Test.Sanity.EndOfStream" [
2929
, testCase "recvTrailers" test_recvTrailers
3030
]
3131
, testGroup "server" [
32-
testCase "recvEndOfInput" test_recvEndOfInput
32+
testCase "recvInput" test_recvInput
33+
, testCase "recvEndOfInput" test_recvEndOfInput
3334
]
3435
]
3536

@@ -155,6 +156,37 @@ serverStreamingHandler = Server.streamingRpcHandler $
155156
Server tests
156157
-------------------------------------------------------------------------------}
157158

159+
-- | Test that the final element is marked as 'FinalElem'
160+
--
161+
-- Verifies that <https://github.com/well-typed/grapesy/issues/114> is solved.
162+
--
163+
-- NOTE: There is no client equivalent for this test. On the client side, the
164+
-- server will send trailers, and so /cannot/ make the final data frame as
165+
-- end-of-stream.
166+
test_recvInput :: Assertion
167+
test_recvInput = testClientServer $ ClientServerTest {
168+
config = def
169+
, server = [Server.someRpcHandler handler]
170+
, client = simpleTestClient $ \conn ->
171+
Client.withRPC conn def (Proxy @Trivial) $ \call -> do
172+
Client.sendFinalInput call BS.Lazy.empty
173+
_resp <- Client.recvFinalOutput call
174+
return ()
175+
}
176+
where
177+
handler :: Server.RpcHandler IO Trivial
178+
handler = Server.mkRpcHandler $ \call -> do
179+
x <- Server.recvInput call
180+
181+
-- The purpose of this test:
182+
case x of
183+
FinalElem{} ->
184+
return ()
185+
_otherwise ->
186+
assertFailure "Expected FinalElem"
187+
188+
Server.sendFinalOutput call (mempty, NoMetadata)
189+
158190
-- | Test that 'recvEndOfInput' does /not/ throw an exception, even if the
159191
-- previous 'recvNextInput' /happened/ to give us the final input.
160192
--

test-grapesy/Test/Sanity/StreamingType/CustomFormat.hs

+2-1
Original file line numberDiff line numberDiff line change
@@ -232,10 +232,11 @@ test_calculator_cbor = do
232232
biDiStreamingSumHandler = Server.streamingRpcHandler $
233233
Server.mkBiDiStreaming $ \recv send ->
234234
let
235+
go :: Int -> IO ()
235236
go acc =
236237
recv >>= \case
237238
NoMoreElems _ -> return ()
238-
FinalElem n _ -> send (acc + n) >> go (acc + n)
239+
FinalElem n _ -> send (acc + n)
239240
StreamElem n -> send (acc + n) >> go (acc + n)
240241
in
241242
go 0

util/Network/GRPC/Util/HTTP2/Stream.hs

+4-4
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ data OutputStream = OutputStream {
4040
}
4141

4242
data InputStream = InputStream {
43-
_getChunk :: HasCallStack => IO Strict.ByteString
43+
_getChunk :: HasCallStack => IO (Strict.ByteString, Bool)
4444
, _getTrailers :: HasCallStack => IO [HTTP.Header]
4545
}
4646

@@ -54,7 +54,7 @@ writeChunk = _writeChunk
5454
flush :: HasCallStack => OutputStream -> IO ()
5555
flush = _flush
5656

57-
getChunk :: HasCallStack => InputStream -> IO Strict.ByteString
57+
getChunk :: HasCallStack => InputStream -> IO (Strict.ByteString, Bool)
5858
getChunk = _getChunk
5959

6060
getTrailers :: HasCallStack => InputStream -> IO [HTTP.Header]
@@ -68,7 +68,7 @@ serverInputStream :: Server.Request -> IO InputStream
6868
serverInputStream req = do
6969
return InputStream {
7070
_getChunk = wrapStreamExceptionsWith ClientDisconnected $
71-
Server.getRequestBodyChunk req
71+
Server.getRequestBodyChunk' req
7272
, _getTrailers = wrapStreamExceptionsWith ClientDisconnected $
7373
maybe [] fromHeaderTable <$>
7474
Server.getRequestTrailers req
@@ -132,7 +132,7 @@ clientInputStream :: Client.Response -> IO InputStream
132132
clientInputStream resp = do
133133
return InputStream {
134134
_getChunk = wrapStreamExceptionsWith ServerDisconnected $
135-
Client.getResponseBodyChunk resp
135+
Client.getResponseBodyChunk' resp
136136
, _getTrailers = wrapStreamExceptionsWith ServerDisconnected $
137137
maybe [] fromHeaderTable <$>
138138
Client.getResponseTrailers resp

0 commit comments

Comments
 (0)