Skip to content

Commit 13c641e

Browse files
authored
chore: simplify streaming http response code and use instances of pubsub tracker instead of a singleton (#3719)
Simplifies deps, pass peer id as string instead of cid.
1 parent 1523e33 commit 13c641e

File tree

5 files changed

+84
-83
lines changed

5 files changed

+84
-83
lines changed

src/pubsub/index.js

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,18 @@
11
'use strict'
22

3+
const SubscriptionTracker = require('./subscription-tracker')
4+
35
/**
46
* @param {import('../types').Options} config
57
*/
6-
module.exports = config => ({
7-
ls: require('./ls')(config),
8-
peers: require('./peers')(config),
9-
publish: require('./publish')(config),
10-
subscribe: require('./subscribe')(config),
11-
unsubscribe: require('./unsubscribe')(config)
12-
})
8+
module.exports = config => {
9+
const subscriptionTracker = new SubscriptionTracker()
10+
11+
return {
12+
ls: require('./ls')(config),
13+
peers: require('./peers')(config),
14+
publish: require('./publish')(config),
15+
subscribe: require('./subscribe')(config, subscriptionTracker),
16+
unsubscribe: require('./unsubscribe')(config, subscriptionTracker)
17+
}
18+
}

src/pubsub/subscribe.js

Lines changed: 61 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
const uint8ArrayFromString = require('uint8arrays/from-string')
44
const uint8ArrayToString = require('uint8arrays/to-string')
55
const log = require('debug')('ipfs-http-client:pubsub:subscribe')
6-
const SubscriptionTracker = require('./subscription-tracker')
76
const configure = require('../lib/configure')
87
const toUrlSearchParams = require('../lib/to-url-search-params')
98

@@ -12,70 +11,75 @@ const toUrlSearchParams = require('../lib/to-url-search-params')
1211
* @typedef {import('ipfs-core-types/src/pubsub').Message} Message
1312
* @typedef {(err: Error, fatal: boolean, msg?: Message) => void} ErrorHandlerFn
1413
* @typedef {import('ipfs-core-types/src/pubsub').API<HTTPClientExtraOptions & { onError?: ErrorHandlerFn }>} PubsubAPI
14+
* @typedef {import('../types').Options} Options
1515
*/
1616

17-
module.exports = configure((api, options) => {
18-
const subsTracker = SubscriptionTracker.singleton()
19-
20-
/**
21-
* @type {PubsubAPI["subscribe"]}
22-
*/
23-
async function subscribe (topic, handler, options = {}) { // eslint-disable-line require-await
24-
options.signal = subsTracker.subscribe(topic, handler, options.signal)
25-
26-
/** @type {(value?: any) => void} */
27-
let done
28-
/** @type {(error: Error) => void} */
29-
let fail
30-
31-
const result = new Promise((resolve, reject) => {
32-
done = resolve
33-
fail = reject
34-
})
35-
36-
// In Firefox, the initial call to fetch does not resolve until some data
37-
// is received. If this doesn't happen within 1 second assume success
38-
const ffWorkaround = setTimeout(() => done(), 1000)
39-
40-
// Do this async to not block Firefox
41-
setTimeout(() => {
42-
api.post('pubsub/sub', {
43-
timeout: options.timeout,
44-
signal: options.signal,
45-
searchParams: toUrlSearchParams({
46-
arg: topic,
47-
...options
48-
}),
49-
headers: options.headers
17+
/**
18+
* @param {Options} options
19+
* @param {import('./subscription-tracker')} subsTracker
20+
*/
21+
module.exports = (options, subsTracker) => {
22+
return configure((api) => {
23+
/**
24+
* @type {PubsubAPI["subscribe"]}
25+
*/
26+
async function subscribe (topic, handler, options = {}) { // eslint-disable-line require-await
27+
options.signal = subsTracker.subscribe(topic, handler, options.signal)
28+
29+
/** @type {(value?: any) => void} */
30+
let done
31+
/** @type {(error: Error) => void} */
32+
let fail
33+
34+
const result = new Promise((resolve, reject) => {
35+
done = resolve
36+
fail = reject
5037
})
51-
.catch((err) => {
52-
// Initial subscribe fail, ensure we clean up
53-
subsTracker.unsubscribe(topic, handler)
5438

55-
fail(err)
39+
// In Firefox, the initial call to fetch does not resolve until some data
40+
// is received. If this doesn't happen within 1 second assume success
41+
const ffWorkaround = setTimeout(() => done(), 1000)
42+
43+
// Do this async to not block Firefox
44+
setTimeout(() => {
45+
api.post('pubsub/sub', {
46+
timeout: options.timeout,
47+
signal: options.signal,
48+
searchParams: toUrlSearchParams({
49+
arg: topic,
50+
...options
51+
}),
52+
headers: options.headers
5653
})
57-
.then((response) => {
58-
clearTimeout(ffWorkaround)
59-
60-
if (!response) {
61-
// if there was no response, the subscribe failed
62-
return
63-
}
64-
65-
readMessages(response, {
66-
onMessage: handler,
67-
onEnd: () => subsTracker.unsubscribe(topic, handler),
68-
onError: options.onError
54+
.catch((err) => {
55+
// Initial subscribe fail, ensure we clean up
56+
subsTracker.unsubscribe(topic, handler)
57+
58+
fail(err)
6959
})
60+
.then((response) => {
61+
clearTimeout(ffWorkaround)
7062

71-
done()
72-
})
73-
}, 0)
63+
if (!response) {
64+
// if there was no response, the subscribe failed
65+
return
66+
}
7467

75-
return result
76-
}
77-
return subscribe
78-
})
68+
readMessages(response, {
69+
onMessage: handler,
70+
onEnd: () => subsTracker.unsubscribe(topic, handler),
71+
onError: options.onError
72+
})
73+
74+
done()
75+
})
76+
}, 0)
77+
78+
return result
79+
}
80+
return subscribe
81+
})(options)
82+
}
7983

8084
/**
8185
* @param {import('ipfs-utils/src/types').ExtendedResponse} response

src/pubsub/subscription-tracker.js

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,6 @@ class SubscriptionTracker {
1616
this._subs = new Map()
1717
}
1818

19-
static singleton () {
20-
if (SubscriptionTracker.instance) return SubscriptionTracker.instance
21-
SubscriptionTracker.instance = new SubscriptionTracker()
22-
return SubscriptionTracker.instance
23-
}
24-
2519
/**
2620
* @param {string} topic
2721
* @param {MessageHandlerFn} handler
@@ -63,13 +57,12 @@ class SubscriptionTracker {
6357
unsubs = subs
6458
}
6559

60+
if (!(this._subs.get(topic) || []).length) {
61+
this._subs.delete(topic)
62+
}
63+
6664
unsubs.forEach(s => s.controller.abort())
6765
}
6866
}
6967

70-
/**
71-
* @type {SubscriptionTracker | null}
72-
*/
73-
SubscriptionTracker.instance = null
74-
7568
module.exports = SubscriptionTracker

src/pubsub/unsubscribe.js

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,16 @@
11
'use strict'
22

3-
const SubscriptionTracker = require('./subscription-tracker')
4-
53
/**
64
* @typedef {import('../types').HTTPClientExtraOptions} HTTPClientExtraOptions
75
* @typedef {import('ipfs-core-types/src/pubsub').API<HTTPClientExtraOptions>} PubsubAPI
6+
* @typedef {import('../types').Options} Options
87
*/
98

109
/**
11-
* @param {import('../types').Options} config
10+
* @param {Options} options
11+
* @param {import('./subscription-tracker')} subsTracker
1212
*/
13-
module.exports = config => {
14-
const subsTracker = SubscriptionTracker.singleton()
15-
13+
module.exports = (options, subsTracker) => {
1614
/**
1715
* @type {PubsubAPI["unsubscribe"]}
1816
*/

src/refs/index.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ const toUrlSearchParams = require('../lib/to-url-search-params')
1010
* @typedef {import('ipfs-core-types/src/refs').API<HTTPClientExtraOptions>} RefsAPI
1111
*/
1212

13-
module.exports = configure((api, options) => {
13+
module.exports = configure((api, opts) => {
1414
/**
1515
* @type {RefsAPI["refs"]}
1616
*/
@@ -34,6 +34,6 @@ module.exports = configure((api, options) => {
3434
}
3535

3636
return Object.assign(refs, {
37-
local: require('./local')(options)
37+
local: require('./local')(opts)
3838
})
3939
})

0 commit comments

Comments
 (0)