Skip to content

Commit 52619a0

Browse files
committed
CABI: unshare SharedStreamImpl.copy to simplify the unshared logic (no change)
1 parent 85ca43d commit 52619a0

File tree

2 files changed

+89
-51
lines changed

2 files changed

+89
-51
lines changed

design/mvp/CanonicalABI.md

Lines changed: 64 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1371,36 +1371,34 @@ Note that `cancel` and `close` notify in opposite directions:
13711371
* `close` *must not* be called on a readable or writable end with an operation
13721372
pending, and thus `close` notifies the opposite end.
13731373

1374-
Finally, the meat of the class is the `read` method that is called through the
1375-
abstract `ReadableStream` interface (by the host or another component). There
1376-
is also a symmetric `write` method that follows the same rules as `read`,
1377-
but in the opposite direction. Both are implemented by a single underlying
1378-
`copy` method parameterized by the direction of the copy:
1379-
```python
1380-
def read(self, inst, dst, on_copy, on_copy_done):
1381-
self.copy(inst, dst, on_copy, on_copy_done, self.pending_buffer, dst)
1382-
1383-
def write(self, inst, src, on_copy, on_copy_done):
1384-
self.copy(inst, src, on_copy, on_copy_done, src, self.pending_buffer)
1385-
1386-
def copy(self, inst, buffer, on_copy, on_copy_done, src, dst):
1374+
The `read` method implements the `ReadableStream.read` interface described
1375+
above and is called by either `stream.read` or the host, depending on who is
1376+
passed the readable end of the stream. If the reader is first to rendezvous,
1377+
then all the parameters are stored in the `pending_*` fields, requiring the
1378+
reader to wait for the writer to rendezvous. If the writer was first to
1379+
rendezvous, then there is already a pending `ReadableBuffer` to read from, and
1380+
so the reader copies as much as it can (which may be less than a full buffer's
1381+
worth) and eagerly completes the copy without blocking. In the final special
1382+
case where both the reader and pending writer have zero-length buffers, the
1383+
writer is notified, but the reader remains blocked:
1384+
```python
1385+
def read(self, inst, dst_buffer, on_copy, on_copy_done):
13871386
if self.closed_:
13881387
on_copy_done(CopyResult.CLOSED)
13891388
elif not self.pending_buffer:
1390-
self.set_pending(inst, buffer, on_copy, on_copy_done)
1389+
self.set_pending(inst, dst_buffer, on_copy, on_copy_done)
13911390
else:
1392-
assert(self.t == src.t == dst.t)
1391+
assert(self.t == dst_buffer.t == self.pending_buffer.t)
13931392
trap_if(inst is self.pending_inst and self.t is not None) # temporary
13941393
if self.pending_buffer.remain() > 0:
1395-
if buffer.remain() > 0:
1396-
dst.write(src.read(min(src.remain(), dst.remain())))
1394+
if dst_buffer.remain() > 0:
1395+
n = min(dst_buffer.remain(), self.pending_buffer.remain())
1396+
dst_buffer.write(self.pending_buffer.read(n))
13971397
self.pending_on_copy(self.reset_pending)
13981398
on_copy_done(CopyResult.COMPLETED)
1399-
elif buffer is src and buffer.remain() == 0 and self.pending_buffer.is_zero_length():
1400-
on_copy_done(CopyResult.COMPLETED)
14011399
else:
14021400
self.reset_and_notify_pending(CopyResult.COMPLETED)
1403-
self.set_pending(inst, buffer, on_copy, on_copy_done)
1401+
self.set_pending(inst, dst_buffer, on_copy, on_copy_done)
14041402
```
14051403
Currently, there is a trap when both the `read` and `write` come from the same
14061404
component instance and there is a non-empty element type. This trap will be
@@ -1409,25 +1407,52 @@ and lowering can alias the same memory, interleavings can be complex and must
14091407
be handled carefully. Future improvements to the Canonical ABI ([lazy lowering])
14101408
can greatly simplify this interleaving and be more practical to implement.
14111409

1412-
The meaning of a `read` or `write` when the length is `0` is that the caller is
1413-
querying the "readiness" of the other side. When a `0`-length read/write
1414-
rendezvous with a non-`0`-length read/write, only the `0`-length read/write
1415-
completes; the non-`0`-length read/write is kept pending (and ready for a
1416-
subsequent rendezvous).
1417-
1418-
In the corner case where a `0`-length read *and* write rendezvous, only the
1419-
*writer* is notified of readiness. To avoid livelock, the Canonical ABI
1420-
requires that a writer *must* (eventually) follow a completed `0`-length write
1421-
with a non-`0`-length write that is allowed to block (allowing the reader end
1422-
to run and rendezvous with its own non-`0`-length read). To implement a
1423-
traditional `O_NONBLOCK` `write()` or `sendmsg()` API, a writer can use a
1424-
buffering scheme in which, after `select()` (or a similar API) signals a file
1425-
descriptor is ready to write, the next `O_NONBLOCK` `write()`/`sendmsg()` on
1426-
that file descriptor copies to an internal buffer and suceeds, issuing an
1427-
`async` `stream.write` in the background and waiting for completion before
1428-
signalling readiness again. Note that buffering only occurs when streaming
1429-
between two components using non-blocking I/O; if either side is the host or a
1430-
component using blocking or completion-based I/O, no buffering is necessary.
1410+
The `write` method is symmetric to `read` (being given a `ReadableBuffer`
1411+
instead of a `WritableBuffer`) and is called by the `stream.write` built-in.
1412+
(noting that the host cannot be passed the writable end of a stream but may
1413+
instead *implement* the `ReadableStream` interface and pass the readable end
1414+
into a component). The steps for `write` are the same as `read` except for
1415+
when a zero-length `write` rendezvous with a zero-length `read`, in which case
1416+
the `write` eagerly completes, leaving the `read` pending:
1417+
```python
1418+
def write(self, inst, src_buffer, on_copy, on_copy_done):
1419+
if self.closed_:
1420+
on_copy_done(CopyResult.CLOSED)
1421+
elif not self.pending_buffer:
1422+
self.set_pending(inst, src_buffer, on_copy, on_copy_done)
1423+
else:
1424+
assert(self.t == src_buffer.t == self.pending_buffer.t)
1425+
trap_if(inst is self.pending_inst and self.t is not None) # temporary
1426+
if self.pending_buffer.remain() > 0:
1427+
if src_buffer.remain() > 0:
1428+
n = min(src_buffer.remain(), self.pending_buffer.remain())
1429+
self.pending_buffer.write(src_buffer.read(n))
1430+
self.pending_on_copy(self.reset_pending)
1431+
on_copy_done(CopyResult.COMPLETED)
1432+
elif src_buffer.is_zero_length() and self.pending_buffer.is_zero_length():
1433+
on_copy_done(CopyResult.COMPLETED)
1434+
else:
1435+
self.reset_and_notify_pending(CopyResult.COMPLETED)
1436+
self.set_pending(inst, src_buffer, on_copy, on_copy_done)
1437+
```
1438+
Putting together the behavior of zero-length `read` and `write` above, we can
1439+
see that, when *both* the reader and writer are zero-length, regardless of who
1440+
was first, the zero-length `write` always completes, leaving the zero-length
1441+
`read` pending. To avoid livelock, the Canonical ABI requires that a writer
1442+
*must* (eventually) follow a completed zero-length `write` with a
1443+
non-zero-length `write` that is allowed to block. This will break the loop,
1444+
notifying the reader end and allowing it to rendezvous with a non-zero-length
1445+
`read` and make progress. Based on this rule, to implement a traditional
1446+
`O_NONBLOCK` `write()` or `sendmsg()` API, a writer can use a buffering scheme
1447+
in which, after `select()` (or a similar API) signals a file descriptor is
1448+
ready to write, the next `O_NONBLOCK` `write()`/`sendmsg()` on that file
1449+
descriptor copies to an internal buffer and suceeds, issuing an `async`
1450+
`stream.write` in the background and waiting for completion before signalling
1451+
readiness again. Note that buffering only occurs when streaming between two
1452+
components using non-blocking I/O; if either side is the host or a component
1453+
using blocking or completion-based I/O, no buffering is necessary. This
1454+
buffering is analogous to the buffering performed in kernel memory by a
1455+
`pipe()`.
14311456

14321457
Given the above, we can define the `{Readable,Writable}StreamEnd` classes that
14331458
are actually stored in the component instance table. The classes are almost

design/mvp/canonical-abi/definitions.py

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -807,30 +807,43 @@ def close(self):
807807
def closed(self):
808808
return self.closed_
809809

810-
def read(self, inst, dst, on_copy, on_copy_done):
811-
self.copy(inst, dst, on_copy, on_copy_done, self.pending_buffer, dst)
812-
813-
def write(self, inst, src, on_copy, on_copy_done):
814-
self.copy(inst, src, on_copy, on_copy_done, src, self.pending_buffer)
810+
def read(self, inst, dst_buffer, on_copy, on_copy_done):
811+
if self.closed_:
812+
on_copy_done(CopyResult.CLOSED)
813+
elif not self.pending_buffer:
814+
self.set_pending(inst, dst_buffer, on_copy, on_copy_done)
815+
else:
816+
assert(self.t == dst_buffer.t == self.pending_buffer.t)
817+
trap_if(inst is self.pending_inst and self.t is not None) # temporary
818+
if self.pending_buffer.remain() > 0:
819+
if dst_buffer.remain() > 0:
820+
n = min(dst_buffer.remain(), self.pending_buffer.remain())
821+
dst_buffer.write(self.pending_buffer.read(n))
822+
self.pending_on_copy(self.reset_pending)
823+
on_copy_done(CopyResult.COMPLETED)
824+
else:
825+
self.reset_and_notify_pending(CopyResult.COMPLETED)
826+
self.set_pending(inst, dst_buffer, on_copy, on_copy_done)
815827

816-
def copy(self, inst, buffer, on_copy, on_copy_done, src, dst):
828+
def write(self, inst, src_buffer, on_copy, on_copy_done):
817829
if self.closed_:
818830
on_copy_done(CopyResult.CLOSED)
819831
elif not self.pending_buffer:
820-
self.set_pending(inst, buffer, on_copy, on_copy_done)
832+
self.set_pending(inst, src_buffer, on_copy, on_copy_done)
821833
else:
822-
assert(self.t == src.t == dst.t)
834+
assert(self.t == src_buffer.t == self.pending_buffer.t)
823835
trap_if(inst is self.pending_inst and self.t is not None) # temporary
824836
if self.pending_buffer.remain() > 0:
825-
if buffer.remain() > 0:
826-
dst.write(src.read(min(src.remain(), dst.remain())))
837+
if src_buffer.remain() > 0:
838+
n = min(src_buffer.remain(), self.pending_buffer.remain())
839+
self.pending_buffer.write(src_buffer.read(n))
827840
self.pending_on_copy(self.reset_pending)
828841
on_copy_done(CopyResult.COMPLETED)
829-
elif buffer is src and buffer.remain() == 0 and self.pending_buffer.is_zero_length():
842+
elif src_buffer.is_zero_length() and self.pending_buffer.is_zero_length():
830843
on_copy_done(CopyResult.COMPLETED)
831844
else:
832845
self.reset_and_notify_pending(CopyResult.COMPLETED)
833-
self.set_pending(inst, buffer, on_copy, on_copy_done)
846+
self.set_pending(inst, src_buffer, on_copy, on_copy_done)
834847

835848
class StreamEnd(Waitable):
836849
shared: ReadableStream

0 commit comments

Comments
 (0)