Skip to content
This repository was archived by the owner on Feb 12, 2024. It is now read-only.

Commit 0266abf

Browse files
Gozalaachingbrain
andauthored
fix: stalling subscription on (node) http-client when daemon is stopped (#3468)
This change fixes #3465 by upgrading to a temporary fork of node-fetch with node-fetch/node-fetch#1172 applied. Co-authored-by: achingbrain <[email protected]>
1 parent e294067 commit 0266abf

File tree

11 files changed

+120
-20
lines changed

11 files changed

+120
-20
lines changed

examples/browser-ipns-publish/package.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
"human-crypto-keys": "^0.1.4",
1717
"ipfs": "^0.55.2",
1818
"ipfs-http-client": "^50.1.0",
19-
"ipfs-utils": "^7.0.0",
19+
"ipfs-utils": "^8.1.2",
2020
"ipns": "^0.11.0",
2121
"it-last": "^1.0.4",
2222
"p-retry": "^4.2.0",

examples/custom-libp2p/package.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
"license": "MIT",
1212
"dependencies": {
1313
"ipfs": "^0.55.2",
14-
"libp2p": "^0.31.5",
14+
"libp2p": "^0.31.6",
1515
"libp2p-bootstrap": "^0.12.3",
1616
"libp2p-kad-dht": "^0.22.0",
1717
"libp2p-mdns": "^0.16.0",

packages/interface-ipfs-core/package.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@
4747
"err-code": "^3.0.1",
4848
"ipfs-unixfs": "^4.0.3",
4949
"ipfs-unixfs-importer": "^7.0.3",
50-
"ipfs-utils": "^7.0.0",
50+
"ipfs-utils": "^8.1.2",
5151
"ipld-block": "^0.11.0",
5252
"ipld-dag-cbor": "^1.0.0",
5353
"ipld-dag-pb": "^0.22.1",

packages/ipfs-cli/package.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343
"ipfs-daemon": "^0.7.0",
4444
"ipfs-http-client": "^50.1.0",
4545
"ipfs-repo": "^9.1.6",
46-
"ipfs-utils": "^7.0.0",
46+
"ipfs-utils": "^8.1.2",
4747
"ipld-dag-cbor": "^1.0.0",
4848
"ipld-dag-pb": "^0.22.1",
4949
"it-all": "^1.0.4",

packages/ipfs-core-utils/package.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@
4848
"err-code": "^3.0.1",
4949
"ipfs-core-types": "^0.5.0",
5050
"ipfs-unixfs": "^4.0.3",
51-
"ipfs-utils": "^7.0.0",
51+
"ipfs-utils": "^8.1.2",
5252
"it-all": "^1.0.4",
5353
"it-map": "^1.0.4",
5454
"it-peekable": "^1.0.1",

packages/ipfs-core/package.json

+3-3
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@
7878
"ipfs-unixfs": "^4.0.3",
7979
"ipfs-unixfs-exporter": "^5.0.3",
8080
"ipfs-unixfs-importer": "^7.0.3",
81-
"ipfs-utils": "^7.0.0",
81+
"ipfs-utils": "^8.1.2",
8282
"ipld": "^0.30.0",
8383
"ipld-block": "^0.11.0",
8484
"ipld-dag-cbor": "^1.0.0",
@@ -94,11 +94,11 @@
9494
"it-map": "^1.0.4",
9595
"it-pipe": "^1.1.0",
9696
"just-safe-set": "^2.2.1",
97-
"libp2p": "^0.31.5",
97+
"libp2p": "^0.31.6",
9898
"libp2p-bootstrap": "^0.12.3",
9999
"libp2p-crypto": "^0.19.3",
100100
"libp2p-floodsub": "^0.25.1",
101-
"libp2p-gossipsub": "^0.9.0",
101+
"libp2p-gossipsub": "^0.9.2",
102102
"libp2p-kad-dht": "^0.22.0",
103103
"libp2p-mdns": "^0.16.0",
104104
"libp2p-mplex": "^0.10.2",

packages/ipfs-daemon/package.json

+3-3
Original file line numberDiff line numberDiff line change
@@ -38,17 +38,17 @@
3838
"ipfs-http-client": "^50.1.0",
3939
"ipfs-http-gateway": "^0.4.1",
4040
"ipfs-http-server": "^0.5.0",
41-
"ipfs-utils": "^7.0.0",
41+
"ipfs-utils": "^8.1.2",
4242
"just-safe-set": "^2.2.1",
43-
"libp2p": "^0.31.5",
43+
"libp2p": "^0.31.6",
4444
"libp2p-delegated-content-routing": "^0.10.0",
4545
"libp2p-delegated-peer-routing": "^0.9.0",
4646
"libp2p-webrtc-star": "^0.22.2",
4747
"multiaddr": "^9.0.1"
4848
},
4949
"devDependencies": {
5050
"aegir": "^33.0.0",
51-
"node-fetch": "^2.6.1",
51+
"node-fetch": "npm:@achingbrain/node-fetch@^2.6.4",
5252
"ws": "^7.3.1"
5353
},
5454
"optionalDependencies": {

packages/ipfs-http-client/package.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@
5252
"ipfs-core-types": "^0.5.0",
5353
"ipfs-core-utils": "^0.8.1",
5454
"ipfs-unixfs": "^4.0.3",
55-
"ipfs-utils": "^7.0.0",
55+
"ipfs-utils": "^8.1.2",
5656
"ipld-block": "^0.11.0",
5757
"ipld-dag-cbor": "^1.0.0",
5858
"ipld-dag-pb": "^0.22.1",

packages/ipfs-http-client/src/pubsub/subscribe.js

+23-7
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ module.exports = configure((api, options) => {
6262
return
6363
}
6464

65-
readMessages(response.ndjson(), {
65+
readMessages(response, {
6666
onMessage: handler,
6767
onEnd: () => subsTracker.unsubscribe(topic, handler),
6868
onError: options.onError
@@ -78,17 +78,17 @@ module.exports = configure((api, options) => {
7878
})
7979

8080
/**
81-
* @param {*} msgStream
81+
* @param {import('ipfs-utils/src/types').ExtendedResponse} response
8282
* @param {object} options
8383
* @param {(message: Message) => void} options.onMessage
8484
* @param {() => void} options.onEnd
8585
* @param {ErrorHandlerFn} [options.onError]
8686
*/
87-
async function readMessages (msgStream, { onMessage, onEnd, onError }) {
87+
async function readMessages (response, { onMessage, onEnd, onError }) {
8888
onError = onError || log
8989

9090
try {
91-
for await (const msg of msgStream) {
91+
for await (const msg of response.ndjson()) {
9292
try {
9393
if (!msg.from) {
9494
continue
@@ -106,12 +106,28 @@ async function readMessages (msgStream, { onMessage, onEnd, onError }) {
106106
}
107107
}
108108
} catch (err) {
109-
// FIXME: In testing with Chrome, err.type is undefined (should not be!)
110-
// Temporarily use the name property instead.
111-
if (err.type !== 'aborted' && err.name !== 'AbortError') {
109+
if (!isAbortError(err)) {
112110
onError(err, true) // Fatal
113111
}
114112
} finally {
115113
onEnd()
116114
}
117115
}
116+
117+
/**
118+
* @param {Error & {type?:string}} error
119+
* @returns {boolean}
120+
*/
121+
const isAbortError = error => {
122+
switch (error.type) {
123+
case 'aborted':
124+
return true
125+
// It is `abort` in Electron instead of `aborted`
126+
case 'abort':
127+
return true
128+
default:
129+
// FIXME: In testing with Chrome, err.type is undefined (should not be!)
130+
// Temporarily use the name property instead.
131+
return error.name === 'AbortError'
132+
}
133+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/* eslint-env mocha */
2+
'use strict'
3+
4+
const { expect } = require('aegir/utils/chai')
5+
const { AbortController } = require('native-abort-controller')
6+
7+
const f = require('./utils/factory')()
8+
9+
describe('.pubsub', function () {
10+
this.timeout(20 * 1000)
11+
describe('.subscribe', () => {
12+
let ipfs
13+
let ctl
14+
15+
beforeEach(async function () {
16+
this.timeout(30 * 1000) // slow CI
17+
18+
ctl = await await f.spawn({
19+
args: '--enable-pubsub-experiment'
20+
})
21+
22+
ipfs = ctl.api
23+
})
24+
25+
afterEach(() => f.clean())
26+
27+
it('.onError when connection is closed', async () => {
28+
const topic = 'gossipboom'
29+
let messageCount = 0
30+
let onError
31+
const error = new Promise(resolve => { onError = resolve })
32+
33+
await ipfs.pubsub.subscribe(topic, message => {
34+
messageCount++
35+
36+
if (messageCount === 2) {
37+
// Stop the daemon
38+
ctl.stop().catch()
39+
}
40+
}, {
41+
onError
42+
})
43+
44+
await ipfs.pubsub.publish(topic, 'hello')
45+
await ipfs.pubsub.publish(topic, 'bye')
46+
47+
await expect(error).to.eventually.be.fulfilled().and.to.be.instanceOf(Error)
48+
})
49+
50+
it('does not call onError when aborted', async () => {
51+
const controller = new AbortController()
52+
const topic = 'gossipabort'
53+
const messages = []
54+
let onError
55+
let onReceived
56+
57+
const received = new Promise(resolve => { onReceived = resolve })
58+
const error = new Promise(resolve => { onError = resolve })
59+
60+
await ipfs.pubsub.subscribe(topic, message => {
61+
messages.push(message)
62+
if (messages.length === 2) {
63+
onReceived()
64+
}
65+
}, {
66+
onError,
67+
signal: controller.signal
68+
})
69+
70+
await ipfs.pubsub.publish(topic, 'hello')
71+
await ipfs.pubsub.publish(topic, 'bye')
72+
73+
await received
74+
controller.abort()
75+
76+
// Stop the daemon
77+
await ctl.stop()
78+
// Just to make sure no error is caused by above line
79+
setTimeout(onError, 200, 'aborted')
80+
81+
await expect(error).to.eventually.be.fulfilled().and.to.equal('aborted')
82+
})
83+
})
84+
})

packages/ipfs/package.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@
5757
"ipfs-core-types": "^0.5.0",
5858
"ipfs-http-client": "^50.1.0",
5959
"ipfs-interop": "^5.0.2",
60-
"ipfs-utils": "^7.0.0",
60+
"ipfs-utils": "^8.1.2",
6161
"ipfsd-ctl": "^8.0.1",
6262
"iso-url": "^1.0.0",
6363
"libp2p-webrtc-star": "^0.22.2",

0 commit comments

Comments
 (0)