Skip to content

Commit e642157

Browse files
committed
added a base implementation for unsaturation event caolan#868
updating the README fix readme
1 parent 5da468f commit e642157

File tree

3 files changed

+57
-2
lines changed

3 files changed

+57
-2
lines changed

README.md

+3-2
Original file line numberDiff line numberDiff line change
@@ -1191,8 +1191,9 @@ methods:
11911191
the `worker` has finished processing the task. Instead of a single task, a `tasks` array
11921192
can be submitted. The respective callback is used for every task in the list.
11931193
* `unshift(task, [callback])` - add a new task to the front of the `queue`.
1194-
* `saturated` - a callback that is called when the `queue` length hits the `concurrency` limit,
1195-
and further tasks will be queued.
1194+
* `saturated` - a callback that is called when the `queue` length hits the `concurrency` limit, and further tasks will be queued.
1195+
* `unsaturated` - a callback that is called when the `queue` length is less than the `concurrency` & `buffer` limits, and further tasks will not be queued.
1196+
* `buffer` A minimum threshold buffer in order to say that the `queue` is `unsaturated`.
11961197
* `empty` - a callback that is called when the last item from the `queue` is given to a `worker`.
11971198
* `drain` - a callback that is called when the last item from the `queue` has returned from the `worker`.
11981199
* `paused` - a boolean for determining whether the queue is in a paused state

lib/internal/queue.js

+5
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ export default function queue(worker, concurrency, payload) {
4545
if (q.tasks.length === q.concurrency) {
4646
q.saturated();
4747
}
48+
if (q.tasks.length <= (q.concurrency - q.buffer) ) {
49+
q.unsaturated();
50+
}
4851
});
4952
setImmediate(q.process);
5053
}
@@ -78,6 +81,8 @@ export default function queue(worker, concurrency, payload) {
7881
concurrency: concurrency,
7982
payload: payload,
8083
saturated: noop,
84+
unsaturated:noop,
85+
buffer: concurrency / 4,
8186
empty: noop,
8287
drain: noop,
8388
started: false,

mocha_test/queue.js

+49
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
var async = require('../lib');
2+
var expect = require('chai').expect;
3+
4+
5+
describe('queue', function(){
6+
context('q.unsaturated(): ',function() {
7+
it('should have a default buffer property that equals 25% of the concurrenct rate', function(done){
8+
var q = async.queue(function(task, cb) {
9+
// nop
10+
calls.push('process ' + task);
11+
async.setImmediate(cb);
12+
}, 10);
13+
expect(q.buffer).to.equal(2.5);
14+
done();
15+
});
16+
it('should allow a user to change the buffer property', function(done){
17+
var q = async.queue(function(task, cb) {
18+
// nop
19+
calls.push('process ' + task);
20+
async.setImmediate(cb);
21+
}, 10);
22+
q.buffer = 4;
23+
expect(q.buffer).to.not.equal(2.5);
24+
expect(q.buffer).to.equal(4);
25+
done();
26+
});
27+
it('should call the unsaturated callback if tasks length is less than concurrency minus buffer', function(done){
28+
var calls = [];
29+
var q = async.queue(function(task, cb) {
30+
// nop
31+
calls.push('process ' + task);
32+
async.setImmediate(cb);
33+
}, 10);
34+
q.unsaturated = function() {
35+
calls.push('unsaturated');
36+
};
37+
q.empty = function() {
38+
expect(calls.indexOf('unsaturated')).to.be.above(-1);
39+
done();
40+
};
41+
q.push('foo0', function () {calls.push('foo0 cb');});
42+
q.push('foo1', function () {calls.push('foo1 cb');});
43+
q.push('foo2', function () {calls.push('foo2 cb');});
44+
q.push('foo3', function () {calls.push('foo3 cb');});
45+
q.push('foo4', function () {calls.push('foo4 cb');});
46+
});
47+
});
48+
});
49+

0 commit comments

Comments
 (0)