Skip to content

Commit 78db573

Browse files
maschadachingbrain
andauthored
feat!: measure transfer perf over time (#2067)
Measures upload/download speed separately and also over time rather than in total. Closes #2064 BREAKING CHANGE: `measurePerformance` now returns an async generator that yields `PerfOutput`s and no longer accepts the `startTime` parameter --------- Co-authored-by: Alex Potsides <[email protected]>
1 parent 8b82e68 commit 78db573

File tree

7 files changed

+403
-348
lines changed

7 files changed

+403
-348
lines changed

packages/protocol-perf/README.md

+44-33
Original file line numberDiff line numberDiff line change
@@ -7,43 +7,54 @@
77
88
# About
99

10-
The `performanceService` implements the [perf protocol](https://github.com/libp2p/specs/blob/master/perf/perf.md), which is used to measure performance within and across libp2p implementations
11-
addresses.
10+
The PerfService implements the [perf protocol](https://github.com/libp2p/specs/blob/master/perf/perf.md), which can be used to measure transfer performance within and across libp2p implementations.
1211

1312
## Example
1413

1514
```typescript
16-
import { createLibp2p } from 'libp2p'
17-
import { perfService } from '@libp2p/perf'
18-
19-
const node = await createLibp2p({
20-
service: [
21-
perfService()
22-
]
23-
})
24-
```
25-
26-
The `measurePerformance` function can be used to measure the latency and throughput of a connection.
27-
server. This will not work in browsers.
28-
29-
## Example
30-
31-
```typescript
32-
import { createLibp2p } from 'libp2p'
33-
import { perfService } from 'libp2p/perf'
34-
35-
const node = await createLibp2p({
36-
services: [
37-
perf: perfService()
38-
]
39-
})
40-
41-
const connection = await node.dial(multiaddr(multiaddrAddress))
42-
43-
const startTime = Date.now()
44-
45-
await node.services.perf.measurePerformance(startTime, connection, BigInt(uploadBytes), BigInt(downloadBytes))
46-
15+
import { noise } from '@chainsafe/libp2p-noise'
16+
import { yamux } from '@chainsafe/libp2p-yamux'
17+
import { mplex } from '@libp2p/mplex'
18+
import { tcp } from '@libp2p/tcp'
19+
import { createLibp2p, type Libp2p } from 'libp2p'
20+
import { plaintext } from 'libp2p/insecure'
21+
import { perfService, type PerfService } from '@libp2p/perf'
22+
23+
const ONE_MEG = 1024 * 1024
24+
const UPLOAD_BYTES = ONE_MEG * 1024
25+
const DOWNLOAD_BYTES = ONE_MEG * 1024
26+
27+
async function createNode (): Promise<Libp2p<{ perf: PerfService }>> {
28+
return createLibp2p({
29+
addresses: {
30+
listen: [
31+
'/ip4/0.0.0.0/tcp/0'
32+
]
33+
},
34+
transports: [
35+
tcp()
36+
],
37+
connectionEncryption: [
38+
noise(), plaintext()
39+
],
40+
streamMuxers: [
41+
yamux(), mplex()
42+
],
43+
services: {
44+
perf: perfService()
45+
}
46+
})
47+
}
48+
49+
const libp2p1 = await createNode()
50+
const libp2p2 = await createNode()
51+
52+
for await (const output of libp2p1.services.perf.measurePerformance(libp2p2.getMultiaddrs()[0], UPLOAD_BYTES, DOWNLOAD_BYTES)) {
53+
console.info(output)
54+
}
55+
56+
await libp2p1.stop()
57+
await libp2p2.stop()
4758
```
4859

4960
# API Docs

packages/protocol-perf/package.json

+7-12
Original file line numberDiff line numberDiff line change
@@ -48,22 +48,17 @@
4848
"renderResults": "node dist/src/renderResults.js"
4949
},
5050
"dependencies": {
51-
"@chainsafe/libp2p-noise": "^13.0.0",
52-
"@chainsafe/libp2p-yamux": "^5.0.0",
53-
"@libp2p/crypto": "^2.0.7",
5451
"@libp2p/interface": "^0.1.5",
55-
"@libp2p/interface-compliance-tests": "^4.1.3",
5652
"@libp2p/interface-internal": "^0.1.8",
57-
"@libp2p/interfaces": "3.3.2",
5853
"@libp2p/logger": "^3.0.5",
59-
"@libp2p/peer-id-factory": "^3.0.7",
60-
"@libp2p/tcp": "^8.0.11",
61-
"@multiformats/multiaddr": "^12.1.5",
62-
"libp2p": "^0.46.18",
63-
"uint8arrays": "^4.0.6",
64-
"yargs": "^17.7.2"
54+
"@multiformats/multiaddr": "^12.1.10",
55+
"it-pushable": "^3.2.1"
6556
},
6657
"devDependencies": {
67-
"aegir": "^41.0.2"
58+
"@libp2p/interface-compliance-tests": "^4.1.3",
59+
"aegir": "^41.0.2",
60+
"it-last": "^3.0.3",
61+
"it-pair": "^2.0.6",
62+
"sinon-ts": "^2.0.0"
6863
}
6964
}
+4-1
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,5 @@
11
export const PROTOCOL_NAME = '/perf/1.0.0'
2-
export const WRITE_BLOCK_SIZE = BigInt(64 << 10)
2+
export const WRITE_BLOCK_SIZE = 64 << 10
3+
export const MAX_INBOUND_STREAMS = 1
4+
export const MAX_OUTBOUND_STREAMS = 1
5+
export const RUN_ON_TRANSIENT_CONNECTION = false

packages/protocol-perf/src/index.ts

+68-160
Original file line numberDiff line numberDiff line change
@@ -1,194 +1,102 @@
11
/**
22
* @packageDocumentation
33
*
4-
* The `performanceService` implements the [perf protocol](https://github.com/libp2p/specs/blob/master/perf/perf.md), which is used to measure performance within and across libp2p implementations
5-
* addresses.
4+
* The {@link PerfService} implements the [perf protocol](https://github.com/libp2p/specs/blob/master/perf/perf.md), which can be used to measure transfer performance within and across libp2p implementations.
65
*
76
* @example
87
*
98
* ```typescript
10-
* import { createLibp2p } from 'libp2p'
11-
* import { perfService } from '@libp2p/perf'
9+
* import { noise } from '@chainsafe/libp2p-noise'
10+
* import { yamux } from '@chainsafe/libp2p-yamux'
11+
* import { mplex } from '@libp2p/mplex'
12+
* import { tcp } from '@libp2p/tcp'
13+
* import { createLibp2p, type Libp2p } from 'libp2p'
14+
* import { plaintext } from 'libp2p/insecure'
15+
* import { perfService, type PerfService } from '@libp2p/perf'
1216
*
13-
* const node = await createLibp2p({
14-
* service: [
15-
* perfService()
16-
* ]
17-
* })
18-
* ```
19-
*
20-
* The `measurePerformance` function can be used to measure the latency and throughput of a connection.
21-
* server. This will not work in browsers.
17+
* const ONE_MEG = 1024 * 1024
18+
* const UPLOAD_BYTES = ONE_MEG * 1024
19+
* const DOWNLOAD_BYTES = ONE_MEG * 1024
2220
*
23-
* @example
24-
*
25-
* ```typescript
26-
* import { createLibp2p } from 'libp2p'
27-
* import { perfService } from 'libp2p/perf'
21+
* async function createNode (): Promise<Libp2p<{ perf: PerfService }>> {
22+
* return createLibp2p({
23+
* addresses: {
24+
* listen: [
25+
* '/ip4/0.0.0.0/tcp/0'
26+
* ]
27+
* },
28+
* transports: [
29+
* tcp()
30+
* ],
31+
* connectionEncryption: [
32+
* noise(), plaintext()
33+
* ],
34+
* streamMuxers: [
35+
* yamux(), mplex()
36+
* ],
37+
* services: {
38+
* perf: perfService()
39+
* }
40+
* })
41+
* }
2842
*
29-
* const node = await createLibp2p({
30-
* services: [
31-
* perf: perfService()
32-
* ]
33-
* })
43+
* const libp2p1 = await createNode()
44+
* const libp2p2 = await createNode()
3445
*
35-
* const connection = await node.dial(multiaddr(multiaddrAddress))
36-
*
37-
* const startTime = Date.now()
38-
*
39-
* await node.services.perf.measurePerformance(startTime, connection, BigInt(uploadBytes), BigInt(downloadBytes))
46+
* for await (const output of libp2p1.services.perf.measurePerformance(libp2p2.getMultiaddrs()[0], UPLOAD_BYTES, DOWNLOAD_BYTES)) {
47+
* console.info(output)
48+
* }
4049
*
50+
* await libp2p1.stop()
51+
* await libp2p2.stop()
4152
* ```
4253
*/
4354

44-
import { logger } from '@libp2p/logger'
45-
import { PROTOCOL_NAME, WRITE_BLOCK_SIZE } from './constants.js'
46-
import type { Connection } from '@libp2p/interface/connection'
47-
import type { Startable } from '@libp2p/interface/startable'
55+
import { PerfService as PerfServiceClass } from './perf-service.js'
56+
import type { AbortOptions } from '@libp2p/interface'
4857
import type { ConnectionManager } from '@libp2p/interface-internal/connection-manager'
49-
import type { IncomingStreamData, Registrar } from '@libp2p/interface-internal/registrar'
50-
import type { AbortOptions } from '@libp2p/interfaces'
51-
52-
const log = logger('libp2p:perf')
53-
54-
export const defaultInit: PerfServiceInit = {
55-
protocolName: '/perf/1.0.0',
56-
writeBlockSize: BigInt(64 << 10)
58+
import type { Registrar } from '@libp2p/interface-internal/registrar'
59+
import type { Multiaddr } from '@multiformats/multiaddr'
60+
61+
export interface PerfOptions extends AbortOptions {
62+
/**
63+
* By default measuring perf should include the time it takes to establish a
64+
* connection, so a new connection will be opened for every performance run.
65+
*
66+
* To override this and re-use an existing connection if one is present, pass
67+
* `true` here. (default: false)
68+
*/
69+
reuseExistingConnection?: boolean
5770
}
5871

5972
export interface PerfService {
60-
measurePerformance(startTime: number, connection: Connection, sendBytes: bigint, recvBytes: bigint, options?: AbortOptions): Promise<number>
73+
measurePerformance(multiaddr: Multiaddr, sendBytes: number, recvBytes: number, options?: PerfOptions): AsyncGenerator<PerfOutput>
74+
}
75+
76+
export interface PerfOutput {
77+
type: 'intermediary' | 'final'
78+
timeSeconds: number
79+
uploadBytes: number
80+
downloadBytes: number
6181
}
6282

6383
export interface PerfServiceInit {
6484
protocolName?: string
6585
maxInboundStreams?: number
6686
maxOutboundStreams?: number
67-
timeout?: number
68-
writeBlockSize?: bigint
87+
runOnTransientConnection?: boolean
88+
89+
/**
90+
* Data sent/received will be sent in chunks of this size (default: 64KiB)
91+
*/
92+
writeBlockSize?: number
6993
}
7094

7195
export interface PerfServiceComponents {
7296
registrar: Registrar
7397
connectionManager: ConnectionManager
7498
}
7599

76-
class DefaultPerfService implements Startable, PerfService {
77-
public readonly protocol: string
78-
private readonly components: PerfServiceComponents
79-
private started: boolean
80-
private readonly databuf: ArrayBuffer
81-
private readonly writeBlockSize: bigint
82-
83-
constructor (components: PerfServiceComponents, init: PerfServiceInit) {
84-
this.components = components
85-
this.started = false
86-
this.protocol = init.protocolName ?? PROTOCOL_NAME
87-
this.writeBlockSize = init.writeBlockSize ?? WRITE_BLOCK_SIZE
88-
this.databuf = new ArrayBuffer(Number(init.writeBlockSize))
89-
}
90-
91-
async start (): Promise<void> {
92-
await this.components.registrar.handle(this.protocol, (data: IncomingStreamData) => {
93-
void this.handleMessage(data).catch((err) => {
94-
log.error('error handling perf protocol message', err)
95-
})
96-
})
97-
this.started = true
98-
}
99-
100-
async stop (): Promise<void> {
101-
await this.components.registrar.unhandle(this.protocol)
102-
this.started = false
103-
}
104-
105-
isStarted (): boolean {
106-
return this.started
107-
}
108-
109-
async handleMessage (data: IncomingStreamData): Promise<void> {
110-
const { stream } = data
111-
112-
const writeBlockSize = this.writeBlockSize
113-
114-
let bytesToSendBack: bigint | null = null
115-
116-
for await (const buf of stream.source) {
117-
if (bytesToSendBack === null) {
118-
bytesToSendBack = BigInt(buf.getBigUint64(0, false))
119-
}
120-
// Ingest all the bufs and wait for the read side to close
121-
}
122-
123-
const uint8Buf = new Uint8Array(this.databuf)
124-
125-
if (bytesToSendBack === null) {
126-
throw new Error('bytesToSendBack was not set')
127-
}
128-
129-
await stream.sink(async function * () {
130-
while (bytesToSendBack > 0n) {
131-
let toSend: bigint = writeBlockSize
132-
if (toSend > bytesToSendBack) {
133-
toSend = bytesToSendBack
134-
}
135-
bytesToSendBack = bytesToSendBack - toSend
136-
yield uint8Buf.subarray(0, Number(toSend))
137-
}
138-
}())
139-
}
140-
141-
async measurePerformance (startTime: number, connection: Connection, sendBytes: bigint, recvBytes: bigint, options: AbortOptions = {}): Promise<number> {
142-
log('opening stream on protocol %s to %p', this.protocol, connection.remotePeer)
143-
144-
const uint8Buf = new Uint8Array(this.databuf)
145-
146-
const writeBlockSize = this.writeBlockSize
147-
148-
const stream = await connection.newStream([this.protocol], options)
149-
150-
// Convert sendBytes to uint64 big endian buffer
151-
const view = new DataView(this.databuf)
152-
view.setBigInt64(0, recvBytes, false)
153-
154-
log('sending %i bytes to %p', sendBytes, connection.remotePeer)
155-
try {
156-
await stream.sink((async function * () {
157-
// Send the number of bytes to receive
158-
yield uint8Buf.subarray(0, 8)
159-
// Send the number of bytes to send
160-
while (sendBytes > 0n) {
161-
let toSend: bigint = writeBlockSize
162-
if (toSend > sendBytes) {
163-
toSend = sendBytes
164-
}
165-
sendBytes = sendBytes - toSend
166-
yield uint8Buf.subarray(0, Number(toSend))
167-
}
168-
})())
169-
170-
// Read the received bytes
171-
let actualRecvdBytes = BigInt(0)
172-
for await (const buf of stream.source) {
173-
actualRecvdBytes += BigInt(buf.length)
174-
}
175-
176-
if (actualRecvdBytes !== recvBytes) {
177-
throw new Error(`Expected to receive ${recvBytes} bytes, but received ${actualRecvdBytes}`)
178-
}
179-
} catch (err) {
180-
log('error sending %i bytes to %p: %s', sendBytes, connection.remotePeer, err)
181-
throw err
182-
} finally {
183-
log('performed %s to %p', this.protocol, connection.remotePeer)
184-
await stream.close()
185-
}
186-
187-
// Return the latency
188-
return Date.now() - startTime
189-
}
190-
}
191-
192100
export function perfService (init: PerfServiceInit = {}): (components: PerfServiceComponents) => PerfService {
193-
return (components) => new DefaultPerfService(components, init)
101+
return (components) => new PerfServiceClass(components, init)
194102
}

0 commit comments

Comments
 (0)