Commit 42b0c5f

docs: add documentation on how reliable messaging is implemented
1 parent 7ea0884 commit 42b0c5f

2 files changed: +179 -9 lines changed

docs/reliable.md (new file, +167 lines)

# Reliable Sync

The state of any CRDT instance is eventually consistent with the state of all
other instances, but only if every update is eventually delivered. If we miss
one or more updates, we can no longer be sure that all instances converge to
the same state. Guaranteed delivery is not a feature of the default
*ActionCable* implementation in Rails. Therefore, we built a mechanism on top
of Rails ActionCable that guarantees `at-least-once` delivery.

`yrb-actioncable` extends ActionCable with a sync and a reliable sync
mechanism. This document describes the parts that make a sync reliable. To be
considered an (effective) reliable transport, it must provide the following
features:

- `at-least-once` delivery of new updates from any client
- `at-least-once` distribution of updates from the server
- No delays in the delivery of messages (happy path)

This extension does not provide any ordering guarantees. This is acceptable
because integrating updates into a Y.js document is idempotent: applying the
same update multiple times will not change the state.

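A minimal sketch of this idempotency using the public Y.js API
(`encodeStateAsUpdate` and `applyUpdate`); the document field name is made up
for illustration:

```ts
import * as Y from "yjs";

// Produce an update on one document …
const source = new Y.Doc();
source.getText("content").insert(0, "hello");
const update = Y.encodeStateAsUpdate(source);

// … and apply it to another document twice.
const replica = new Y.Doc();
Y.applyUpdate(replica, update);
Y.applyUpdate(replica, update); // duplicate delivery: no effect

console.log(replica.getText("content").toString()); // "hello"
```
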
To achieve the guarantees described above, we must maintain a message queue
that can deliver messages to more than just the clients that are currently
considered _live_. ActionCable relies on Redis Pub/Sub, which only guarantees
`at-most-once` delivery, and it is possible that messages get dropped
(temporary disconnect, node crashing, …).

To work around these limitations, we use the concept of _Streams_. Every
unique document must maintain its own _stream_ of updates. A stream is an
append-only data structure, and once an update is appended, it becomes
immutable.

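The server-side implementation lives in Ruby; purely as an illustration of the
concept, here is a TypeScript sketch using an `ioredis`-style client (the
stream key and field names are made up):

```ts
import Redis from "ioredis";

const redis = new Redis();

// Append an encoded update to the per-document, append-only stream.
// XADD with "*" lets Redis assign a monotonically increasing entry id.
async function appendUpdate(documentId: string, encodedUpdate: string) {
  const entryId = await redis.xadd(
    `document:${documentId}:updates`, // one stream per document (assumed key)
    "*",
    "update",
    encodedUpdate
  );
  return entryId; // e.g. "1690000000000-0"
}
```
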
We guarantee that an update is appended to a _stream_ by `acknowledging` the
receipt and persistence of the update in the stream. If a message does not get
acknowledged, we make sure to send it again, and we retry for as long as
necessary until the message is acknowledged.

It is important to understand that the client and server implementations are
different. As there are many clients and (conceptually) one server, it is far
more important that all updates produced by any client eventually end up on
the server, to be re-distributed to all other clients immediately, than that a
client never misses an update from the server (as long as the client catches
up eventually).

Therefore, there is a relatively small timespan for the server to acknowledge
message receipt before a client tries again. The client must implement an
exponential backoff mechanism with a maximum number of retries so as not to
overwhelm the server, and it must eventually stop trying when the maximum
number of retries is reached. At this point the client can be considered
offline and essentially needs to resync its complete state to the server to be
considered online again.

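A minimal sketch of such client-side backoff; the `send` and `markOffline`
helpers, constants, and function names are illustrative, not the provider's
actual API, and the real provider drives retries from a periodic function (see
the Acknowledge section below):

```ts
const MAX_RETRIES = 8;
const BASE_DELAY_MS = 100;

// Retry an unacknowledged message with exponential backoff. Once the maximum
// number of retries is exhausted, the client is considered offline and must
// perform a full resync before it participates again.
function scheduleRetry(
  message: Uint8Array,
  attempt: number,
  send: (message: Uint8Array) => void,
  markOffline: () => void
) {
  if (attempt >= MAX_RETRIES) {
    markOffline(); // full resync required to become "online" again
    return;
  }

  const delay = BASE_DELAY_MS * 2 ** attempt; // 100 ms, 200 ms, 400 ms, …
  setTimeout(() => send(message), delay);
}
```
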
In the other direction, when a client does not immediately acknowledge an
update distributed to it, the server does not retry right away, but instead
tracks the current offset of the client in the stream. This is conceptually
similar to how consumers and consumer groups work in Kafka and Redis Streams.

## Tracker

A tracker is implemented as a sorted set. The sort order (score) is the
normalized stream offset of a client. In the case of Redis, this is a padded
integer created from the `entry_id` returned by invoking the `#xadd` method.

The `entry_id` returned by the `stream` append operation is guaranteed to be a
monotonic counter. As we cannot sort on the format as provided, we pad the
right-hand sequence counter with enough digits to be sure that there is never
going to be a conflict. For this format to break, a client would need to
produce more than 999 messages within `1 ms`.

```
entry_id = client.xadd(…) # 123456-0 -> 123456000
entry_id = client.xadd(…) # 123456-1 -> 123456001
```

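A sketch of how that normalization could be computed, assuming the
`<ms timestamp>-<sequence>` entry-id format of Redis Streams (the function
name is illustrative):

```ts
// Convert a Redis stream entry id ("<ms>-<seq>") into a single sortable
// integer score by padding the sequence part to three digits.
function normalizeEntryId(entryId: string): number {
  const [timestamp, sequence] = entryId.split("-");
  return Number(timestamp) * 1000 + Number(sequence);
}

normalizeEntryId("123456-0"); // 123456000
normalizeEntryId("123456-1"); // 123456001
```
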
The item that is tracked is not the user object, but the connection. This is
necessary to support scenarios where a user has the same document open in
multiple browsers or tabs.

A connection is added to the tracker as soon as the connection `subscribes` to
a `channel`, and a `channel` must always have a `1:1` relation with a
`Document`. The connection is removed from the tracker as soon as the
connection gets dropped (`unsubscribed`).

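Conceptually, the bookkeeping looks like this; the channel callbacks are
implemented in Ruby, so this `ioredis`-style TypeScript sketch with made-up
key names and an assumed initial score of `0` is only an illustration:

```ts
import Redis from "ioredis";

const redis = new Redis();

// On subscribe: start tracking the connection in the sorted set.
// Starting at score 0 (nothing acknowledged yet) is an assumption.
async function trackConnection(documentId: string, connectionId: string) {
  await redis.zadd(`document:${documentId}:tracker`, 0, connectionId);
}

// On unsubscribe / dropped connection: stop tracking it.
async function untrackConnection(documentId: string, connectionId: string) {
  await redis.zrem(`document:${documentId}:tracker`, connectionId);
}
```
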
## Garbage Collection

The _reliable_ sync mechanism adds state to the server just for the purpose of
guaranteeing delivery. Without care, memory usage can easily balloon for both
the stream and the tracker. The assumption is that the volume of updates grows
linearly with the number of users. To reduce the state kept in memory to a
minimum, we use the tracker to truncate the stream, and a heuristic to collect
garbage from the tracker.

### Truncate the stream

The stream is periodically (or manually) truncated using the _minimum score_
currently stored in the tracker. This means that when all clients have
acknowledged all updates, the size of the stream is `0`; conversely, when at
least one client hasn't acknowledged any update, the size of the stream equals
the number of updates. The actual size of the stream will vary based on
several factors (a sketch of the truncation follows the list below):

1. Length of the _interval_ (higher values = more memory)
2. Health of clients (unhealthy clients > 0 = more memory)

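As a conceptual sketch, again assuming an `ioredis`-style client, the made-up
key names, and the padded-score convention from above (the real implementation
lives on the Ruby side, and `XTRIM MINID` requires Redis 6.2+):

```ts
import Redis from "ioredis";

const redis = new Redis();

// Drop every stream entry that all tracked connections have already
// acknowledged, i.e. everything below the minimum score in the tracker.
async function truncateStream(documentId: string) {
  const [, minScore] = await redis.zrange(
    `document:${documentId}:tracker`, 0, 0, "WITHSCORES"
  );
  if (minScore === undefined) return; // no tracked connections

  // Undo the padding to get back a stream entry id ("<ms>-<seq>").
  const score = Number(minScore);
  const minEntryId = `${Math.floor(score / 1000)}-${score % 1000}`;

  // XTRIM MINID removes all entries with an id lower than the threshold
  // (the threshold entry itself is kept; this sketch glosses over that).
  await redis.xtrim(`document:${documentId}:updates`, "MINID", minEntryId);
}
```
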
### Garbage collect clients in the tracker

Due to the second scenario (at least one client does not acknowledge any
update), we need to make sure that the tracker is cleaned of clients that are
not in a healthy state. To determine whether a client is unhealthy, we use two
heuristics:

1. We assume that clients become _unhealthy_ when they fall too far behind other clients (relative delta)
2. We assume that clients become _unhealthy_ when they haven't consumed any updates for a given timespan (absolute delta)

Because the tracker is implemented as a sorted set, and the score is
essentially a padded UNIX epoch, we can check both heuristics with low runtime
complexity.

For 1), we measure the delta between the client with the highest score and all
other clients. Every client that exceeds a threshold (e.g., 30 seconds) is
evicted from the tracker. This can easily be implemented with a
`ZREMRANGEBYSCORE` in Redis. This is the only heuristic applied as long as
there is at least one healthy client.

For 2), in cases where no client is healthy, we make sure that the delta
between any client and the current server UNIX epoch does not exceed a certain
threshold (e.g., 30 minutes). The second threshold must be higher than the
first one and should be selected carefully based on use cases (e.g., being
temporarily offline for a few minutes due to a Wi-Fi disconnect on a train).
We can, again, use `ZREMRANGEBYSCORE` to evict clients from the tracker that
exceed the threshold.

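Each heuristic boils down to a single `ZREMRANGEBYSCORE` call. A sketch with an
`ioredis`-style client, illustrative thresholds and key names, and both
heuristics applied unconditionally (i.e., ignoring the healthy/unhealthy case
distinction described above):

```ts
import Redis from "ioredis";

const redis = new Redis();

const RELATIVE_DELTA_MS = 30 * 1000;      // e.g., 30 s behind the best client
const ABSOLUTE_DELTA_MS = 30 * 60 * 1000; // e.g., 30 min behind the server clock

async function evictUnhealthyClients(documentId: string) {
  const tracker = `document:${documentId}:tracker`;

  // Heuristic 1 (relative delta): evict everything too far behind the
  // highest-scored client. Scores are padded entry ids, i.e. millisecond
  // epochs multiplied by 1000, hence the extra factor.
  const [, maxScore] = await redis.zrevrange(tracker, 0, 0, "WITHSCORES");
  if (maxScore !== undefined) {
    await redis.zremrangebyscore(
      tracker, "-inf", `(${Number(maxScore) - RELATIVE_DELTA_MS * 1000}`
    );
  }

  // Heuristic 2 (absolute delta): evict everything too far behind the
  // current server time.
  await redis.zremrangebyscore(
    tracker, "-inf", `(${(Date.now() - ABSOLUTE_DELTA_MS) * 1000}`
  );
}
```
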
## Operations

### Acknowledge

Every message exchanged between client and server is identified by an `op`
type. The `ack` operation usually consists of one field: `id`. This allows the
sender to treat a message as delivered.

For the client, this results in clearing the acknowledged message from the
_send buffer_; the server will update (move) the tracker offset for the client
that acknowledged the given `id`.

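On the server side, "moving" the offset maps naturally onto a sorted-set
update. A sketch, again `ioredis`-style with illustrative names, reusing the
padding from above (`ZADD GT` requires Redis 6.2+):

```ts
import Redis from "ioredis";

const redis = new Redis();

// Move the tracker offset of a connection forward to the acknowledged entry.
// The GT flag only updates the score when the new score is greater, so a late
// or duplicate ack can never move a connection backwards.
async function acknowledge(documentId: string, connectionId: string, entryId: string) {
  const score = normalizeEntryId(entryId); // see the padding sketch above
  await redis.zadd(`document:${documentId}:tracker`, "GT", score, connectionId);
}

function normalizeEntryId(entryId: string): number {
  const [timestamp, sequence] = entryId.split("-");
  return Number(timestamp) * 1000 + Number(sequence);
}
```
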
The client retry mechanism is implemented as follows:

A function is periodically called to retry any message that is still in the
_send buffer_. To let the client establish a relation between the `acknowledge`
operation payload and the actual message acknowledged by the server, we use a
logical clock instead of the stream offset when sending messages from the
client side.

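A minimal sketch of such a send buffer keyed by a logical clock; the names are
illustrative and do not mirror the provider's internals exactly:

```ts
// Messages waiting for an ack, keyed by the client-side logical clock.
const sendBuffer = new Map<number, Uint8Array>();
let clock = 0;

function sendReliably(
  update: Uint8Array,
  transmit: (clock: number, update: Uint8Array) => void
) {
  const id = ++clock; // logical clock, not the stream offset
  sendBuffer.set(id, update);
  transmit(id, update);
}

// Invoked when the server acknowledges a message.
function onAck(ackedClock: number) {
  sendBuffer.delete(ackedClock);
}

// Called periodically: re-send everything not yet acknowledged.
function retryPending(transmit: (clock: number, update: Uint8Array) => void) {
  for (const [id, update] of sendBuffer) {
    transmit(id, update);
  }
}
```
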
The implementation uses UNIX epochs to determine whether the maximum number of
retries has been reached. The length of the interval for the periodically
called `retry` function must exceed the maximum time that the retry mechanism
can take to terminate.

The server does not implement a retry mechanism, but will re-send all messages
that haven't been acknowledged by a connection whenever a new incoming message
is distributed to that client. This results in relatively little overhead for
reliable messaging on the server side, which should help scale the messaging
system to many clients.

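Conceptually, the server only needs to read everything behind the connection's
tracked offset when it is about to transmit anyway. A sketch, `ioredis`-style
with illustrative key names, reusing the padded-score convention from above
(exclusive `XRANGE` starts require Redis 6.2+):

```ts
import Redis from "ioredis";

const redis = new Redis();

// Collect every stream entry the connection has not acknowledged yet, so the
// new update and any older unacknowledged ones can be transmitted together.
async function pendingUpdatesFor(documentId: string, connectionId: string) {
  const tracker = `document:${documentId}:tracker`;
  const stream = `document:${documentId}:updates`;

  const score = await redis.zscore(tracker, connectionId);
  // Undo the padding; "(" makes the range exclusive so the last acknowledged
  // entry itself is not re-sent. Untracked connections read from the start.
  const lastAcked =
    score === null
      ? "-"
      : `(${Math.floor(Number(score) / 1000)}-${Number(score) % 1000}`;

  return redis.xrange(stream, lastAcked, "+");
}
```
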
### Update

Incoming updates are relayed (re-broadcast) to all clients immediately. The
update operation implements a small optimization: it uses the identifier of a
connection to determine the `origin` of a message, and the actual `transmit`
implementation then does not send a message back to its origin. Strictly
speaking this is not a functional requirement, but it makes the messaging more
efficient and easier to debug.

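A sketch of that origin check with hypothetical names; the provider's actual
transmit logic lives in the channel implementation:

```ts
// Relay an update to every subscribed connection except the one it came from.
function broadcast(
  connections: Map<string, (payload: string) => void>,
  origin: string,
  payload: string
) {
  for (const [connectionId, transmit] of connections) {
    if (connectionId === origin) continue; // skip the sender
    transmit(payload);
  }
}
```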

packages/yrb-actioncable/src/reliable-websocket-provider.ts (+12 -9)

```diff
@@ -54,7 +54,7 @@ enum Op {
 
 type OpPayload =
   { op: Op.Ack, clock: number } |
-  { op: Op.Update, update: string, id: string }
+  { op: Op.Update, updates: string[], id: string }
 
 const permissionDeniedHandler = (
   provider: ReliableWebsocketProvider,
@@ -133,7 +133,7 @@ export class ReliableWebsocketProvider {
   bcconnected: boolean;
   private readonly disableBc: boolean;
   private _synced: boolean;
-  private retryInterval: Timer | undefined;
+  private retryInterval: NodeJS.Timer | number | undefined;
 
   constructor(
     doc: Doc,
@@ -292,13 +292,16 @@ export class ReliableWebsocketProvider {
        console.log(`buffer size=${provider.buffer.size}`, provider.buffer);
        break;
      case Op.Update:
-       const encodedUpdate = message.update;
-       const update = decodeBase64ToBinary(encodedUpdate);
-       const encoder = provider.process(update, true);
-       if (encodingLength(encoder) > 1) {
-         provider.send(toUint8Array(encoder));
-       }
-       this.perform(Op.Ack, {id: message.id});
+       // TODO: make sure to only create one encoder, integrate all updates
+       // send one ack operation, including all IDs
+       message.updates.forEach((encodedUpdate) => {
+         const update = decodeBase64ToBinary(encodedUpdate);
+         const encoder = provider.process(update, true);
+         if (encodingLength(encoder) > 1) {
+           provider.send(toUint8Array(encoder));
+         }
+         this.perform(Op.Ack, {id: message.id});
+       });
        break;
     }
   },
```
