Skip to content

Commit 10ea197

Browse files
authored
fix: do not wait for stream reads and writes at the same time (#2290)
achingbrain/it#109 will change the behaviour of byte streams to wait for the first read before resolving the promise returned from the first write in order to have guarentees that once the promise has resolved, the data has been sent so update the tests etc to remove deadlocks.
1 parent 09dd029 commit 10ea197

File tree

5 files changed

+32
-24
lines changed

5 files changed

+32
-24
lines changed

packages/connection-encrypter-plaintext/src/index.ts

+18-15
Original file line numberDiff line numberDiff line change
@@ -75,23 +75,26 @@ class Plaintext implements ConnectionEncrypter {
7575
type = KeyType.Secp256k1
7676
}
7777

78-
// Encode the public key and write it to the remote peer
79-
await pb.write({
80-
id: localId.toBytes(),
81-
pubkey: {
82-
Type: type,
83-
Data: localId.publicKey ?? new Uint8Array(0)
84-
}
85-
}, {
86-
signal
87-
})
88-
8978
this.log('write pubkey exchange to peer %p', remoteId)
9079

91-
// Get the Exchange message
92-
const response = await pb.read({
93-
signal
94-
})
80+
const [
81+
, response
82+
] = await Promise.all([
83+
// Encode the public key and write it to the remote peer
84+
pb.write({
85+
id: localId.toBytes(),
86+
pubkey: {
87+
Type: type,
88+
Data: localId.publicKey ?? new Uint8Array(0)
89+
}
90+
}, {
91+
signal
92+
}),
93+
// Get the Exchange message
94+
pb.read({
95+
signal
96+
})
97+
])
9598

9699
let peerId
97100
try {

packages/multistream-select/test/dialer.spec.ts

+2-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { logger } from '@libp2p/logger'
55
import { expect } from 'aegir/chai'
66
import randomBytes from 'iso-random-stream/src/random.js'
77
import all from 'it-all'
8+
import drain from 'it-drain'
89
import { duplexPair } from 'it-pair/duplex'
910
import { pipe } from 'it-pipe'
1011
import pTimeout from 'p-timeout'
@@ -29,7 +30,7 @@ describe('Dialer', () => {
2930

3031
// Ensure stream is usable after selection - send data outgoing -> incoming
3132
const input = [randomBytes(10), randomBytes(64), randomBytes(3)]
32-
void pipe(input, selection.stream)
33+
void pipe(input, selection.stream, drain)
3334

3435
// wait for incoming end to have completed negotiation
3536
await handled

packages/pnet/src/index.ts

+10-6
Original file line numberDiff line numberDiff line change
@@ -127,13 +127,17 @@ class PreSharedKeyConnectionProtector implements ConnectionProtector {
127127
const signal = AbortSignal.timeout(this.timeout)
128128

129129
const bytes = byteStream(connection)
130-
await bytes.write(localNonce, {
131-
signal
132-
})
133130

134-
const result = await bytes.read(NONCE_LENGTH, {
135-
signal
136-
})
131+
const [
132+
, result
133+
] = await Promise.all([
134+
bytes.write(localNonce, {
135+
signal
136+
}),
137+
bytes.read(NONCE_LENGTH, {
138+
signal
139+
})
140+
])
137141

138142
const remoteNonce = result.subarray()
139143

packages/protocol-identify/test/push.spec.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ describe('identify (push)', () => {
107107
const updatedAddress = multiaddr('/ip4/127.0.0.1/tcp/48322')
108108

109109
const pb = pbStream(stream)
110-
await pb.write({
110+
void pb.write({
111111
publicKey: remotePeer.publicKey,
112112
protocols: [
113113
updatedProtocol

packages/protocol-ping/test/index.spec.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ describe('ping', () => {
122122
const input = Uint8Array.from([0, 1, 2, 3, 4])
123123

124124
const b = byteStream(outgoingStream)
125-
await b.write(input)
125+
void b.write(input)
126126

127127
const output = await b.read()
128128

0 commit comments

Comments
 (0)