Skip to content
This repository was archived by the owner on Jun 26, 2023. It is now read-only.

fix: simpler pubsub #172

Merged
merged 1 commit into from
Feb 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 22 additions & 8 deletions packages/libp2p-interface-compliance-tests/src/mocks/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import type { PeerId } from '@libp2p/interfaces/src/peer-id'
import { mockMultiaddrConnection } from './multiaddr-connection.js'
import type { Registrar } from '@libp2p/interfaces/registrar'
import { mockRegistrar } from './registrar.js'
import { Listener } from '@libp2p/multistream-select'
import { Dialer, Listener } from '@libp2p/multistream-select'
import { logger } from '@libp2p/logger'
import { CustomEvent } from '@libp2p/interfaces'

Expand All @@ -33,7 +33,6 @@ export function mockConnection (maConn: MultiaddrConnection, opts: MockConnectio
const remotePeer = peerIdFromString(remotePeerIdStr)
const registry = new Map()
const streams: Stream[] = []
let streamId = 0
const direction = opts.direction ?? 'inbound'
const registrar = opts.registrar ?? mockRegistrar()

Expand Down Expand Up @@ -91,11 +90,17 @@ export function mockConnection (maConn: MultiaddrConnection, opts: MockConnectio
throw new Error('protocols must have a length')
}

const id = `${streamId++}`
const id = `${Math.random()}`
const stream: Stream = muxer.newStream(id)
const mss = new Dialer(stream)
const result = await mss.select(protocols)

const streamData: ProtocolStream = {
protocol: protocols[0],
stream
protocol: result.protocol,
stream: {
...stream,
...result.stream
}
}

registry.set(id, streamData)
Expand Down Expand Up @@ -129,15 +134,24 @@ export function mockStream (stream: Duplex<Uint8Array>): Stream {
}
}

export function connectionPair (peerA: PeerId, peerB: PeerId): [ Connection, Connection ] {
export interface Peer {
peerId: PeerId
registrar: Registrar
}

export function connectionPair (a: Peer, b: Peer): [ Connection, Connection ] {
const [peerBtoPeerA, peerAtoPeerB] = duplexPair<Uint8Array>()

return [
mockConnection(
mockMultiaddrConnection(peerBtoPeerA, peerA)
mockMultiaddrConnection(peerAtoPeerB, b.peerId), {
registrar: a.registrar
}
),
mockConnection(
mockMultiaddrConnection(peerAtoPeerB, peerB)
mockMultiaddrConnection(peerBtoPeerA, a.peerId), {
registrar: b.registrar
}
)
]
}
22 changes: 12 additions & 10 deletions packages/libp2p-interface-compliance-tests/src/mocks/registrar.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,18 +116,20 @@ export async function mockIncomingStreamEvent (protocol: string, conn: Connectio
})
}

export async function connectPeers (protocol: string, registrarA: Registrar, registrarB: Registrar, peerIdA: PeerId, peerIdB: PeerId) {
const topologyA = registrarA.getTopologies(protocol)[0]
const topologyB = registrarB.getTopologies(protocol)[0]
// const handlerA = registrarA.getHandler(protocol)
// const handlerB = registrarB.getHandler(protocol)
export interface Peer {
peerId: PeerId
registrar: Registrar
}

export async function connectPeers (protocol: string, a: Peer, b: Peer) {
// Notify peers of connection
const [bToA, aToB] = connectionPair(peerIdA, peerIdB)
const [aToB, bToA] = connectionPair(a, b)

await topologyA.onConnect(peerIdB, aToB)
// await handlerA(await mockIncomingStreamEvent(protocol, aToB, peerIdB))
for (const topology of a.registrar.getTopologies(protocol)) {
await topology.onConnect(b.peerId, aToB)
}

await topologyB.onConnect(peerIdA, bToA)
// await handlerB(await mockIncomingStreamEvent(protocol, bToA, peerIdA))
for (const topology of b.registrar.getTopologies(protocol)) {
await topology.onConnect(a.peerId, bToA)
}
}
22 changes: 13 additions & 9 deletions packages/libp2p-interface-compliance-tests/src/pubsub/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,20 @@ import sinon from 'sinon'
import pDefer from 'p-defer'
import pWaitFor from 'p-wait-for'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { mockRegistrar } from '../mocks/registrar.js'
import { createEd25519PeerId } from '@libp2p/peer-id-factory'
import delay from 'delay'
import { CustomEvent } from '@libp2p/interfaces'
import type { TestSetup } from '../index.js'
import type { PubSub, PubSubOptions } from '@libp2p/interfaces/pubsub'
import type { EventMap } from './index.js'
import type { Registrar } from '@libp2p/interfaces/src/registrar'
import { mockRegistrar } from '../mocks/registrar.js'
import { createEd25519PeerId } from '@libp2p/peer-id-factory'
import type { PubsubBaseProtocol } from '@libp2p/pubsub'

const topic = 'foo'
const data = uint8ArrayFromString('bar')

export default (common: TestSetup<PubSub<EventMap>, PubSubOptions>) => {
export default (common: TestSetup<PubsubBaseProtocol<EventMap>, PubSubOptions>) => {
describe('pubsub api', () => {
let pubsub: PubSub<EventMap>
let registrar: Registrar
Expand Down Expand Up @@ -60,20 +63,22 @@ export default (common: TestSetup<PubSub<EventMap>, PubSubOptions>) => {
}

await pubsub.start()
pubsub.subscribe(topic)
pubsub.addEventListener('topic', handler)
pubsub.addEventListener(topic, handler)

await pWaitFor(() => {
const topics = pubsub.getTopics()
return topics.length === 1 && topics[0] === topic
})

pubsub.unsubscribe(topic)
pubsub.removeEventListener(topic, handler)

await pWaitFor(() => pubsub.getTopics().length === 0)

// Publish to guarantee the handler is not called
await pubsub.publish(topic, data)
pubsub.dispatchEvent(new CustomEvent(topic, { detail: data }))

// handlers are called async
await delay(100)

await pubsub.stop()
})
Expand All @@ -83,13 +88,12 @@ export default (common: TestSetup<PubSub<EventMap>, PubSubOptions>) => {

await pubsub.start()

pubsub.subscribe(topic)
pubsub.addEventListener(topic, (evt) => {
const msg = evt.detail
expect(msg).to.not.eql(undefined)
defer.resolve()
})
await pubsub.publish(topic, data)
pubsub.dispatchEvent(new CustomEvent(topic, { detail: data }))
await defer.promise

await pubsub.stop()
Expand Down
Loading