Skip to content
This repository was archived by the owner on Feb 12, 2024. It is now read-only.

Commit 89fbff3

Browse files
committed
chore: address dirk review
1 parent 60c398d commit 89fbff3

File tree

8 files changed

+63
-74
lines changed

8 files changed

+63
-74
lines changed

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@
146146
"multihashes": "~0.4.14",
147147
"multihashing-async": "~0.6.0",
148148
"node-fetch": "^2.3.0",
149+
"p-queue": "^6.0.2",
149150
"peer-book": "~0.9.0",
150151
"peer-id": "~0.12.0",
151152
"peer-info": "~0.15.0",

src/core/config.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ const configSchema = s({
6767
}))
6868
})),
6969
Reprovider: optional(s({
70+
Delay: 'string?',
7071
Interval: 'string?',
7172
Strategy: 'string?'
7273
})),

src/core/provider/index.js

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,14 @@ class Provider {
1212
/**
1313
* Provider goal is to announce blocks to the network.
1414
* It keeps track of which blocks are provided, and allow them to be reprovided
15-
* @param {Libp2p} libp2p
16-
* @param {Blockstore} blockstore
17-
* @param {object} options
18-
* @memberof Provider
15+
* @param {Libp2p} libp2p libp2p instance
16+
* @param {Blockstore} blockstore blockstore instance
17+
* @param {object} options reprovider options
18+
* @param {string} options.delay reprovider initial delay in human friendly time
19+
* @param {string} options.interval reprovider interval in human friendly time
20+
* @param {string} options.strategy reprovider strategy
1921
*/
20-
constructor (libp2p, blockstore, options) {
22+
constructor (libp2p, blockstore, options = {}) {
2123
this._running = false
2224

2325
this._contentRouting = libp2p.contentRouting
@@ -38,11 +40,14 @@ class Provider {
3840

3941
this._running = true
4042

41-
// handle options
42-
const strategy = this._options.strategy || 'all'
43-
const humanInterval = this._options.Interval || '12h'
44-
const interval = await promisify((callback) => human(humanInterval, callback))()
43+
// handle options (config uses uppercase)
44+
const humanDelay = this._options.Delay || this._options.delay || '15s'
45+
const delay = await human(humanDelay)
46+
const humanInterval = this._options.Interval || this._options.interval || '12h'
47+
const interval = await human(humanInterval)
48+
const strategy = this._options.Strategy || this._options.strategy || 'all'
4549
const options = {
50+
delay,
4651
interval,
4752
strategy
4853
}
@@ -65,7 +70,7 @@ class Provider {
6570
}
6671

6772
/**
68-
* Announce block to the network and add and entry to the tracker
73+
* Announce block to the network
6974
* Takes a cid and makes an attempt to announce it to the network
7075
* @param {CID} cid
7176
*/
@@ -79,6 +84,14 @@ class Provider {
7984
})()
8085
}
8186

87+
/**
88+
* Find providers of a block in the network
89+
* @param {CID} cid cid of the block
90+
* @param {object} options
91+
* @param {number} options.timeout - how long the query should maximally run, in ms (default: 60000)
92+
* @param {number} options.maxNumProviders - maximum number of providers to find
93+
* @returns {Promise}
94+
*/
8295
async findProviders (cid, options) { // eslint-disable-line require-await
8396
if (!CID.isCID(cid)) {
8497
throw errCode('invalid CID to find', 'ERR_INVALID_CID')

src/core/provider/queue.js

Lines changed: 10 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
'use strict'
22

3-
const queue = require('async/queue')
3+
const PQueue = require('p-queue')
44

55
const debug = require('debug')
66
const log = debug('ipfs:provider')
@@ -17,71 +17,35 @@ class WorkerQueue {
1717
this._concurrency = concurrency
1818

1919
this.running = false
20-
this.queue = this._setupQueue()
21-
}
22-
23-
/**
24-
* Create the underlying async queue.
25-
* @returns {queue}
26-
*/
27-
_setupQueue () {
28-
const q = queue(async (block) => {
29-
await this._processNext(block)
30-
}, this._concurrency)
31-
32-
// If there is an error, stop the worker
33-
q.error = (err) => {
34-
log.error(err)
35-
this.stop(err)
36-
}
37-
38-
q.buffer = 0
39-
40-
return q
20+
this.queue = new PQueue({ concurrency })
4121
}
4222

4323
/**
4424
* Use the queue from async to keep `concurrency` amount items running
4525
* @param {Block[]} blocks
46-
* @returns {Promise}
4726
*/
4827
async execute (blocks) {
4928
this.running = true
5029

51-
// store the promise resolution functions to be resolved at end of queue
52-
this.execution = {}
53-
const execPromise = new Promise((resolve, reject) => Object.assign(this.execution, { resolve, reject }))
54-
55-
// When all blocks have been processed, stop the worker
56-
this.queue.drain = () => {
57-
log('queue:drain')
58-
this.stop()
59-
}
30+
// Fill queue with the processing blocks function
31+
this.queue.addAll(blocks.map((block) => async () => this._processNext(block))) // eslint-disable-line require-await
6032

61-
// Fill queue with blocks
62-
this.queue.push(blocks)
33+
// Wait for finishing
34+
await this.queue.onIdle()
6335

64-
await execPromise
36+
this.stop()
6537
}
6638

6739
/**
68-
* Stop the worker, optionally an error is thrown if received
69-
*
70-
* @param {object} error
40+
* Stop the worker
7141
*/
72-
stop (error) {
42+
stop () {
7343
if (!this.running) {
7444
return
7545
}
7646

7747
this.running = false
78-
this.queue.kill()
79-
80-
if (error) {
81-
this.execution && this.execution.reject(error)
82-
} else {
83-
this.execution && this.execution.resolve()
84-
}
48+
this.queue.clear()
8549
}
8650

8751
/**

src/core/provider/reprovider.js

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,15 @@ const WorkerQueue = require('./queue')
55

66
const { blockKeyToCid } = require('../utils')
77

8-
// const initialDelay = 15000
9-
const initialDelay = 3000
10-
118
class Reprovider {
129
/**
1310
* Reprovider goal is to reannounce blocks to the network.
1411
* @param {object} contentRouting
1512
* @param {Blockstore} blockstore
1613
* @param {object} options
17-
* @memberof Reprovider
14+
* @param {string} options.delay reprovider initial delay in human friendly time
15+
* @param {string} options.interval reprovider interval in human friendly time
16+
* @param {string} options.strategy reprovider strategy
1817
*/
1918
constructor (contentRouting, blockstore, options) {
2019
this._contentRouting = contentRouting
@@ -33,7 +32,7 @@ class Reprovider {
3332
// Start doing reprovides after the initial delay
3433
this._timeoutId = setTimeout(() => {
3534
this._runPeriodically()
36-
}, initialDelay)
35+
}, this._options.delay)
3736
}
3837

3938
/**
@@ -49,27 +48,34 @@ class Reprovider {
4948
}
5049

5150
/**
52-
* Run reprovide on every `options.interval` ms
51+
* Run reprovide on every `options.interval` ms recursively
5352
* @returns {void}
5453
*/
5554
async _runPeriodically () {
56-
while (this._timeoutId) {
57-
const blocks = await promisify((callback) => this._blockstore.query({}, callback))()
58-
59-
// TODO strategy logic here
60-
if (this._options.strategy === 'pinned') {
55+
// Verify if stopped
56+
if (!this._timeoutId) return
6157

62-
} else if (this._options.strategy === 'pinned') {
58+
// TODO strategy logic here
59+
const blocks = await promisify((callback) => this._blockstore.query({}, callback))()
6360

64-
}
61+
if (this._options.strategy === 'pinned') {
6562

66-
await this._worker.execute(blocks)
63+
} else if (this._options.strategy === 'pinned') {
6764

68-
// Each subsequent walk should run on a `this._options.interval` interval
69-
await new Promise(resolve => {
70-
this._timeoutId = setTimeout(resolve, this._options.interval)
71-
})
7265
}
66+
67+
// Verify if stopped
68+
if (!this._timeoutId) return
69+
70+
await this._worker.execute(blocks)
71+
72+
// Verify if stopped
73+
if (!this._timeoutId) return
74+
75+
// Each subsequent walk should run on a `this._options.interval` interval
76+
this._timeoutId = setTimeout(() => {
77+
this._runPeriodically()
78+
}, this._options.interval)
7379
}
7480

7581
/**

src/core/runtime/config-browser.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ module.exports = () => ({
2828
'/dns4/node1.preload.ipfs.io/tcp/443/wss/ipfs/Qmbut9Ywz9YEDrz8ySBSgWyJk41Uvm2QJPhwDJzJyGFsD6'
2929
],
3030
Reprovider: {
31+
Delay: '15s',
3132
Interval: '12h',
3233
Strategy: 'all'
3334
},

src/core/runtime/config-nodejs.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ module.exports = () => ({
4141
'/dns4/node1.preload.ipfs.io/tcp/443/wss/ipfs/Qmbut9Ywz9YEDrz8ySBSgWyJk41Uvm2QJPhwDJzJyGFsD6'
4242
],
4343
Reprovider: {
44+
Delay: '15s',
4445
Interval: '12h',
4546
Strategy: 'all'
4647
},

test/core/provider.spec.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,14 @@ const IPFS = require('../../src')
1313
const DaemonFactory = require('ipfsd-ctl')
1414
const df = DaemonFactory.create({ type: 'proc' })
1515

16+
const DELAY = '3s'
1617
const INTERVAL = '10s'
1718
const STRATEGY = 'all'
1819

1920
const config = {
2021
Bootstrap: [],
2122
Reprovider: {
23+
Delay: DELAY,
2224
Interval: INTERVAL,
2325
Strategy: STRATEGY
2426
}

0 commit comments

Comments
 (0)