Skip to content

Commit 748ee70

Browse files
committed
Add RowStreamStateMachine
1 parent 7acca81 commit 748ee70

File tree

1 file changed

+163
-0
lines changed

1 file changed

+163
-0
lines changed
Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
import NIOCore
2+
3+
/// A sub state for receiving data rows. Stores whether the consumer has either signaled demand and whether the
4+
/// channel has issued `read` events.
5+
///
6+
/// This should be used as a SubStateMachine in QuerySubStateMachines.
7+
struct RowStreamStateMachine {
8+
9+
enum Action {
10+
case read
11+
case wait
12+
}
13+
14+
private enum State {
15+
/// The state machines expects further writes to `channelRead`. The writes are appended to the buffer.
16+
case waitingForRows(CircularBuffer<PSQLBackendMessage.DataRow>)
17+
/// The state machines expects a call to `demandMoreResponseBodyParts` or `read`. The buffer is
18+
/// empty. It is preserved for performance reasons.
19+
case waitingForReadOrDemand(CircularBuffer<PSQLBackendMessage.DataRow>)
20+
/// The state machines expects a call to `read`. The buffer is empty. It is preserved for performance reasons.
21+
case waitingForRead(CircularBuffer<PSQLBackendMessage.DataRow>)
22+
/// The state machines expects a call to `demandMoreResponseBodyParts`. The buffer is empty. It is
23+
/// preserved for performance reasons.
24+
case waitingForDemand(CircularBuffer<PSQLBackendMessage.DataRow>)
25+
26+
case modifying
27+
}
28+
29+
private var state: State
30+
31+
init() {
32+
self.state = .waitingForRows(CircularBuffer(initialCapacity: 32))
33+
}
34+
35+
mutating func receivedRow(_ newRow: PSQLBackendMessage.DataRow) {
36+
switch self.state {
37+
case .waitingForRows(var buffer):
38+
self.state = .modifying
39+
buffer.append(newRow)
40+
self.state = .waitingForRows(buffer)
41+
42+
// For all the following cases, please note:
43+
// Normally these code paths should never be hit. However there is one way to trigger
44+
// this:
45+
//
46+
// If the server decides to close a connection, NIO will forward all outstanding
47+
// `channelRead`s without waiting for a next `context.read` call. For this reason we might
48+
// receive new rows, when we don't expect them here.
49+
case .waitingForRead(var buffer):
50+
self.state = .modifying
51+
buffer.append(newRow)
52+
self.state = .waitingForRead(buffer)
53+
54+
case .waitingForDemand(var buffer):
55+
self.state = .modifying
56+
buffer.append(newRow)
57+
self.state = .waitingForDemand(buffer)
58+
59+
case .waitingForReadOrDemand(var buffer):
60+
self.state = .modifying
61+
buffer.append(newRow)
62+
self.state = .waitingForReadOrDemand(buffer)
63+
64+
case .modifying:
65+
preconditionFailure("Invalid state: \(self.state)")
66+
}
67+
}
68+
69+
mutating func channelReadComplete() -> CircularBuffer<PSQLBackendMessage.DataRow>? {
70+
switch self.state {
71+
case .waitingForRows(let buffer):
72+
if buffer.isEmpty {
73+
self.state = .waitingForRead(buffer)
74+
return nil
75+
} else {
76+
var newBuffer = buffer
77+
newBuffer.removeAll(keepingCapacity: true)
78+
self.state = .waitingForReadOrDemand(newBuffer)
79+
return buffer
80+
}
81+
82+
case .waitingForRead,
83+
.waitingForDemand,
84+
.waitingForReadOrDemand:
85+
preconditionFailure("How can we receive a body part, after a channelReadComplete, but no read has been forwarded yet. Invalid state: \(self.state)")
86+
87+
case .modifying:
88+
preconditionFailure("Invalid state: \(self.state)")
89+
}
90+
}
91+
92+
mutating func demandMoreResponseBodyParts() -> Action {
93+
switch self.state {
94+
case .waitingForDemand(let buffer):
95+
self.state = .waitingForRows(buffer)
96+
return .read
97+
98+
case .waitingForReadOrDemand(let buffer):
99+
self.state = .waitingForRead(buffer)
100+
return .wait
101+
102+
case .waitingForRead:
103+
// If we are `.waitingForRead`, no action needs to be taken. Demand has already been
104+
// signaled. Once we receive the next `read`, we will forward it, right away
105+
return .wait
106+
107+
case .waitingForRows:
108+
// If we are `.waitingForRows`, no action needs to be taken. As soon as we receive
109+
// the next `channelReadComplete` we will forward all buffered data
110+
return .wait
111+
112+
case .modifying:
113+
preconditionFailure("Invalid state: \(self.state)")
114+
}
115+
}
116+
117+
mutating func read() -> Action {
118+
switch self.state {
119+
case .waitingForRows:
120+
// This should never happen. But we don't want to precondition this behavior. Let's just
121+
// pass the read event on
122+
return .read
123+
124+
case .waitingForReadOrDemand(let buffer):
125+
self.state = .waitingForDemand(buffer)
126+
return .wait
127+
128+
case .waitingForRead(let buffer):
129+
self.state = .waitingForRows(buffer)
130+
return .read
131+
132+
case .waitingForDemand:
133+
// we have already received a read event. We will issue it as soon as we received demand
134+
// from the consumer
135+
return .wait
136+
137+
case .modifying:
138+
preconditionFailure("Invalid state: \(self.state)")
139+
}
140+
}
141+
142+
mutating func end() -> CircularBuffer<PSQLBackendMessage.DataRow> {
143+
switch self.state {
144+
case .waitingForRows(let buffer):
145+
return buffer
146+
147+
case .waitingForReadOrDemand(let buffer),
148+
.waitingForRead(let buffer),
149+
.waitingForDemand(let buffer):
150+
151+
// Normally this code path should never be hit. However there is one way to trigger
152+
// this:
153+
//
154+
// If the server decides to close a connection, NIO will forward all outstanding
155+
// `channelRead`s without waiting for a next `context.read` call. For this reason we might
156+
// receive a call to `end()`, when we don't expect it here.
157+
return buffer
158+
159+
case .modifying:
160+
preconditionFailure("Invalid state: \(self.state)")
161+
}
162+
}
163+
}

0 commit comments

Comments
 (0)