Skip to content

feat: add stream middleware #3173

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/dictionary.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@ additionals
SECG
Certicom
RSAES
unuse
15 changes: 14 additions & 1 deletion packages/interface-compliance-tests/src/mocks/registrar.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import { mergeOptions } from '@libp2p/utils/merge-options'
import type { Connection, PeerId, Topology, IncomingStreamData, StreamHandler, StreamHandlerOptions, StreamHandlerRecord } from '@libp2p/interface'
import type { Connection, PeerId, Topology, IncomingStreamData, StreamHandler, StreamHandlerOptions, StreamHandlerRecord, StreamMiddleware } from '@libp2p/interface'
import type { Registrar } from '@libp2p/interface-internal'

export class MockRegistrar implements Registrar {
private readonly topologies = new Map<string, Array<{ id: string, topology: Topology }>>()
private readonly handlers = new Map<string, StreamHandlerRecord>()
private readonly middleware = new Map<string, StreamMiddleware[]>()

getProtocols (): string[] {
return Array.from(this.handlers.keys()).sort()
Expand Down Expand Up @@ -69,6 +70,18 @@ export class MockRegistrar implements Registrar {
getTopologies (protocol: string): Topology[] {
return (this.topologies.get(protocol) ?? []).map(t => t.topology)
}

use (protocol: string, middleware: StreamMiddleware[]): void {
this.middleware.set(protocol, middleware)
}

unuse (protocol: string): void {
this.middleware.delete(protocol)
}

getMiddleware (protocol: string): StreamMiddleware[] {
return this.middleware.get(protocol) ?? []
}
}

export function mockRegistrar (): Registrar {
Expand Down
26 changes: 25 additions & 1 deletion packages/interface-internal/src/registrar.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { StreamHandler, StreamHandlerOptions, StreamHandlerRecord, Topology, IncomingStreamData } from '@libp2p/interface'
import type { StreamHandler, StreamHandlerOptions, StreamHandlerRecord, Topology, IncomingStreamData, StreamMiddleware } from '@libp2p/interface'
import type { AbortOptions } from '@multiformats/multiaddr'

export type {
Expand Down Expand Up @@ -69,6 +69,30 @@ export interface Registrar {
*/
getHandler(protocol: string): StreamHandlerRecord

/**
* Retrieve any registered middleware for a given protocol.
*
* @param protocol - The protocol to fetch middleware for
* @returns A list of `StreamMiddleware` implementations
*/
use(protocol: string, middleware: StreamMiddleware[]): void

/**
* Retrieve any registered middleware for a given protocol.
*
* @param protocol - The protocol to fetch middleware for
* @returns A list of `StreamMiddleware` implementations
*/
unuse(protocol: string): void

/**
* Retrieve any registered middleware for a given protocol.
*
* @param protocol - The protocol to fetch middleware for
* @returns A list of `StreamMiddleware` implementations
*/
getMiddleware(protocol: string): StreamMiddleware[]

/**
* Register a topology handler for a protocol - the topology will be
* invoked when peers are discovered on the network that support the
Expand Down
29 changes: 28 additions & 1 deletion packages/interface/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import type { PeerInfo } from './peer-info.js'
import type { PeerRouting } from './peer-routing.js'
import type { Address, Peer, PeerStore } from './peer-store.js'
import type { Startable } from './startable.js'
import type { StreamHandler, StreamHandlerOptions } from './stream-handler.js'
import type { StreamHandler, StreamHandlerOptions, StreamMiddleware } from './stream-handler.js'
import type { Topology } from './topology.js'
import type { Listener, OutboundConnectionUpgradeEvents } from './transport.js'
import type { Multiaddr } from '@multiformats/multiaddr'
Expand Down Expand Up @@ -720,6 +720,33 @@ export interface Libp2p<T extends ServiceMap = ServiceMap> extends Startable, Ty
*/
unregister(id: string): void

/**
* Registers one or more middleware implementations that will be invoked for
* incoming and outgoing protocol streams that match the passed protocol.
*
* @example
*
* ```TypeScript
* libp2p.use('/my/protocol/1.0.0', (stream, connection, next) => {
* // do something with stream and/or connection
* next(stream, connection)
* })
* ```
*/
use (protocol: string, middleware: StreamMiddleware | StreamMiddleware[]): void

/**
* Deregisters all middleware for the passed protocol.
*
* @example
*
* ```TypeScript
* libp2p.unuse('/my/protocol/1.0.0')
* // any previously registered middleware will no longer be invoked
* ```
*/
unuse (protocol: string): void

/**
* Returns the public key for the passed PeerId. If the PeerId is of the 'RSA'
* type this may mean searching the routing if the peer's key is not present
Expand Down
12 changes: 12 additions & 0 deletions packages/interface/src/stream-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@ export interface StreamHandler {
(data: IncomingStreamData): void
}

/**
* Stream middleware allows accessing stream data outside of the stream handler
*/
export interface StreamMiddleware {
(stream: Stream, connection: Connection, next: (stream: Stream, connection: Connection) => void): void
}

export interface StreamHandlerOptions extends AbortOptions {
/**
* How many incoming streams can be open for this protocol at the same time on each connection
Expand All @@ -46,6 +53,11 @@ export interface StreamHandlerOptions extends AbortOptions {
* protocol(s), the existing handler will be discarded.
*/
force?: true

/**
* Middleware allows accessing stream data outside of the stream handler
*/
middleware?: StreamMiddleware[]
}

export interface StreamHandlerRecord {
Expand Down
10 changes: 9 additions & 1 deletion packages/libp2p/src/libp2p.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import { userAgent } from './user-agent.js'
import * as pkg from './version.js'
import type { Components } from './components.js'
import type { Libp2p as Libp2pInterface, Libp2pInit } from './index.js'
import type { PeerRouting, ContentRouting, Libp2pEvents, PendingDial, ServiceMap, AbortOptions, ComponentLogger, Logger, Connection, NewStreamOptions, Stream, Metrics, PeerId, PeerInfo, PeerStore, Topology, Libp2pStatus, IsDialableOptions, DialOptions, PublicKey, Ed25519PeerId, Secp256k1PeerId, RSAPublicKey, RSAPeerId, URLPeerId, Ed25519PublicKey, Secp256k1PublicKey, StreamHandler, StreamHandlerOptions } from '@libp2p/interface'
import type { PeerRouting, ContentRouting, Libp2pEvents, PendingDial, ServiceMap, AbortOptions, ComponentLogger, Logger, Connection, NewStreamOptions, Stream, Metrics, PeerId, PeerInfo, PeerStore, Topology, Libp2pStatus, IsDialableOptions, DialOptions, PublicKey, Ed25519PeerId, Secp256k1PeerId, RSAPublicKey, RSAPeerId, URLPeerId, Ed25519PublicKey, Secp256k1PublicKey, StreamHandler, StreamHandlerOptions, StreamMiddleware } from '@libp2p/interface'
import type { Multiaddr } from '@multiformats/multiaddr'

export class Libp2p<T extends ServiceMap = ServiceMap> extends TypedEventEmitter<Libp2pEvents> implements Libp2pInterface<T> {
Expand Down Expand Up @@ -402,6 +402,14 @@ export class Libp2p<T extends ServiceMap = ServiceMap> extends TypedEventEmitter
this.components.registrar.unregister(id)
}

use (protocol: string, middleware: StreamMiddleware | StreamMiddleware[]): void {
this.components.registrar.use(protocol, Array.isArray(middleware) ? middleware : [middleware])
}

unuse (protocol: string): void {
this.components.registrar.unuse(protocol)
}

async isDialable (multiaddr: Multiaddr, options: IsDialableOptions = {}): Promise<boolean> {
return this.components.connectionManager.isDialable(multiaddr, options)
}
Expand Down
16 changes: 15 additions & 1 deletion packages/libp2p/src/registrar.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { InvalidParametersError } from '@libp2p/interface'
import { mergeOptions } from '@libp2p/utils/merge-options'
import { trackedMap } from '@libp2p/utils/tracked-map'
import * as errorsJs from './errors.js'
import type { IdentifyResult, Libp2pEvents, Logger, PeerUpdate, PeerId, PeerStore, Topology, StreamHandler, StreamHandlerRecord, StreamHandlerOptions, AbortOptions, Metrics } from '@libp2p/interface'
import type { IdentifyResult, Libp2pEvents, Logger, PeerUpdate, PeerId, PeerStore, Topology, StreamHandler, StreamHandlerRecord, StreamHandlerOptions, AbortOptions, Metrics, StreamMiddleware } from '@libp2p/interface'
import type { Registrar as RegistrarInterface } from '@libp2p/interface-internal'
import type { ComponentLogger } from '@libp2p/logger'
import type { TypedEventTarget } from 'main-event'
Expand All @@ -26,10 +26,12 @@ export class Registrar implements RegistrarInterface {
private readonly topologies: Map<string, Map<string, Topology>>
private readonly handlers: Map<string, StreamHandlerRecord>
private readonly components: RegistrarComponents
private readonly middleware: Map<string, StreamMiddleware[]>

constructor (components: RegistrarComponents) {
this.components = components
this.log = components.logger.forComponent('libp2p:registrar')
this.middleware = new Map()
this.topologies = new Map()
components.metrics?.registerMetricGroup('libp2p_registrar_topologies', {
calculate: () => {
Expand Down Expand Up @@ -165,6 +167,18 @@ export class Registrar implements RegistrarInterface {
}
}

use (protocol: string, middleware: StreamMiddleware[]): void {
this.middleware.set(protocol, middleware)
}

unuse (protocol: string): void {
this.middleware.delete(protocol)
}

getMiddleware (protocol: string): StreamMiddleware[] {
return this.middleware.get(protocol) ?? []
}

/**
* Remove a disconnected peer from the record
*/
Expand Down
41 changes: 37 additions & 4 deletions packages/libp2p/src/upgrader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ export class Upgrader implements UpgraderInterface {

let muxer: StreamMuxer | undefined
let newStream: ((multicodecs: string[], options?: AbortOptions) => Promise<Stream>) | undefined
let connection: Connection // eslint-disable-line prefer-const
let connection: Connection

if (muxerFactory != null) {
// Create the muxer
Expand Down Expand Up @@ -488,7 +488,7 @@ export class Upgrader implements UpgraderInterface {
}

connection.log.trace('starting new stream for protocols %s', protocols)
const muxedStream = await muxer.newStream()
let muxedStream = await muxer.newStream()
connection.log.trace('started new stream %s for protocols %s', muxedStream.id, protocols)

try {
Expand Down Expand Up @@ -556,6 +556,23 @@ export class Upgrader implements UpgraderInterface {

this.components.metrics?.trackProtocolStream(muxedStream, connection)

const middleware = this.components.registrar.getMiddleware(protocol)

middleware.push((stream, connection, next) => {
next(stream, connection)
})

let i = 0

while (i < middleware.length) {
// eslint-disable-next-line no-loop-func
middleware[i](muxedStream, connection, (s, c) => {
muxedStream = s
connection = c
i++
})
}

return muxedStream
} catch (err: any) {
connection.log.error('could not create new outbound stream on connection %s %a for protocols %s - %e', direction === 'inbound' ? 'from' : 'to', opts.maConn.remoteAddr, protocols, err)
Expand Down Expand Up @@ -652,14 +669,30 @@ export class Upgrader implements UpgraderInterface {
* Routes incoming streams to the correct handler
*/
_onStream (opts: OnStreamOptions): void {
const { connection, stream, protocol } = opts
let { connection, stream, protocol } = opts
const { handler, options } = this.components.registrar.getHandler(protocol)

if (connection.limits != null && options.runOnLimitedConnection !== true) {
throw new LimitedConnectionError('Cannot open protocol stream on limited connection')
}

handler({ connection, stream })
const middleware = this.components.registrar.getMiddleware(protocol)

middleware.push((stream, connection, next) => {
handler({ connection, stream })
next(stream, connection)
})

let i = 0

while (i < middleware.length) {
// eslint-disable-next-line no-loop-func
middleware[i](stream, connection, (s, c) => {
stream = s
connection = c
i++
})
}
}

/**
Expand Down
Loading