// You cannot select more than 25 topics
// Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
// 283 lines
// 5.4 KiB
// 283 lines
// 5.4 KiB
'use strict'

/* eslint-disable no-var */

// reusify supplies the object pool that fastqueue uses to recycle Task
// instances (see `reusify(Task)`, `cache.get()` and `cache.release()`).
var reusify = require('reusify')

/**
 * Creates a callback-based work queue that runs at most `concurrency`
 * invocations of `worker` at the same time.
 *
 * The `context` argument is optional: when the first argument is a function
 * it is taken as the worker and the remaining arguments shift left.
 *
 * @param {Object|null} context - `this` value for `worker` and task callbacks.
 * @param {Function} worker - called as `worker(value, cb)`; it must invoke
 *   `cb(err, result)` exactly once when the job is finished.
 * @param {number} concurrency - maximum number of workers in flight.
 * @returns {Object} the queue object (push, unshift, pause, resume, idle,
 *   length, getQueue, running, kill, killAndDrain, error, plus the
 *   overridable hooks `drain`, `saturated` and `empty`).
 * @throws {Error} when `concurrency` is lower than 1.
 */
function fastqueue (context, worker, concurrency) {
  if (typeof context === 'function') {
    concurrency = worker
    worker = context
    context = null
  }

  // A concurrency of exactly 1 is valid, so the message must say so.
  if (concurrency < 1) {
    throw new Error('fastqueue concurrency must be equal to or greater than 1')
  }

  var cache = reusify(Task)
  // Pending tasks form a singly linked list through Task#next.
  var queueHead = null
  var queueTail = null
  // Number of worker invocations currently in flight.
  var _running = 0
  var errorHandler = null

  var self = {
    push: push,
    drain: noop,
    saturated: noop,
    pause: pause,
    paused: false,
    concurrency: concurrency,
    running: running,
    resume: resume,
    idle: idle,
    length: length,
    getQueue: getQueue,
    unshift: unshift,
    empty: noop,
    kill: kill,
    killAndDrain: killAndDrain,
    error: error
  }

  return self

  // Returns how many workers are currently in flight.
  function running () {
    return _running
  }

  // Stops new tasks from being started; tasks already running are unaffected.
  function pause () {
    self.paused = true
  }

  // Counts the tasks waiting in the linked list (running tasks are excluded).
  function length () {
    var current = queueHead
    var counter = 0

    while (current) {
      current = current.next
      counter++
    }

    return counter
  }

  // Returns the values of the waiting tasks, head first.
  function getQueue () {
    var current = queueHead
    var tasks = []

    while (current) {
      tasks.push(current.value)
      current = current.next
    }

    return tasks
  }

  // Leaves the paused state and starts as many waiting tasks as the
  // concurrency limit still allows.
  function resume () {
    if (!self.paused) return
    self.paused = false

    if (queueHead === null) {
      // Nothing is waiting: a balanced increment/release pair lets release()
      // fire drain() once if no worker is in flight either.
      _running++
      release()
      return
    }

    // Only fill the free slots; tasks that kept running while paused still
    // count against the limit. The paused check stops the loop if a worker
    // pauses the queue again synchronously.
    while (!self.paused && queueHead && _running < self.concurrency) {
      _running++
      release()
    }
  }

  // True when nothing is running and nothing is waiting.
  function idle () {
    return _running === 0 && self.length() === 0
  }

  // Takes a Task from the pool and fills in every field worked() reads.
  // Shared by push and unshift so both always carry the current error handler
  // instead of whatever a recycled Task was last given.
  function acquire (value, done) {
    var current = cache.get()

    current.context = context
    current.release = release
    current.value = value
    current.callback = done || noop
    current.errorHandler = errorHandler

    return current
  }

  // Adds a task at the end of the queue, or runs it right away when a slot is
  // free and the queue is not paused.
  function push (value, done) {
    var current = acquire(value, done)

    if (_running === self.concurrency || self.paused) {
      if (queueTail) {
        queueTail.next = current
        queueTail = current
      } else {
        queueHead = current
        queueTail = current
        self.saturated()
      }
    } else {
      _running++
      worker.call(context, current.value, current.worked)
    }
  }

  // Adds a task at the front of the queue, or runs it right away when a slot
  // is free and the queue is not paused.
  function unshift (value, done) {
    var current = acquire(value, done)

    if (_running === self.concurrency || self.paused) {
      if (queueHead) {
        current.next = queueHead
        queueHead = current
      } else {
        queueHead = current
        queueTail = current
        self.saturated()
      }
    } else {
      _running++
      worker.call(context, current.value, current.worked)
    }
  }

  // Called when a worker finishes (holder is the finished Task) or from
  // resume() (no holder). Recycles the Task, then either hands the freed slot
  // to the next waiting task or gives the slot back.
  function release (holder) {
    if (holder) {
      cache.release(holder)
    }
    var next = queueHead
    if (next) {
      if (!self.paused) {
        if (queueTail === queueHead) {
          queueTail = null
        }
        queueHead = next.next
        next.next = null
        // The slot is reused directly, so _running is left untouched.
        worker.call(context, next.value, next.worked)
        if (queueTail === null) {
          self.empty()
        }
      } else {
        _running--
      }
    } else if (--_running === 0) {
      self.drain()
    }
  }

  // Drops every waiting task and disables the drain hook.
  function kill () {
    queueHead = null
    queueTail = null
    self.drain = noop
  }

  // Drops every waiting task, fires the drain hook once, then disables it.
  function killAndDrain () {
    queueHead = null
    queueTail = null
    self.drain()
    self.drain = noop
  }

  // Registers a handler that is attached to every task queued from now on.
  function error (handler) {
    errorHandler = handler
  }
}

// Shared do-nothing function used as the default for hooks and callbacks.
function noop () {}

/**
 * Reusable holder for one queued job.
 *
 * Fields are filled in by the queue before the job is scheduled:
 * `value` (worker input), `callback` (completion callback), `next` (link to
 * the following waiting task), `release` (queue function invoked when the job
 * ends), `context` (`this` for the callback) and `errorHandler` (optional).
 *
 * `worked` is created once per instance and bound to it through the closure,
 * so it can be handed to the worker as a plain function.
 */
function Task () {
  this.value = null
  this.callback = noop
  this.next = null
  this.release = noop
  this.context = null
  this.errorHandler = null

  var self = this

  // Completion callback given to the worker as `cb(err, result)`.
  this.worked = function worked (err, result) {
    var callback = self.callback
    var errorHandler = self.errorHandler
    var val = self.value
    // Clear per-job state before running user code, so the instance is clean
    // by the time release() hands it back for reuse.
    self.value = null
    self.callback = noop
    // The handler, when set, is invoked for every completion; `err` is
    // whatever the worker passed and may be null.
    if (errorHandler) {
      errorHandler(err, val)
    }
    callback.call(self.context, err, result)
    self.release(self)
  }
}

/**
 * Promise flavour of fastqueue: `worker` returns a promise, and `push` /
 * `unshift` return a promise for that job's outcome.
 *
 * The `context` argument is optional: when the first argument is a function
 * it is taken as the worker and the remaining arguments shift left.
 *
 * @param {Object|null} context - `this` value for `worker`.
 * @param {Function} worker - called as `worker(value)`; must return a thenable.
 * @param {number} concurrency - maximum number of workers in flight.
 * @returns {Object} the fastqueue object with `push` and `unshift` replaced by
 *   promise-returning versions and an extra `drained()` method.
 */
function queueAsPromised (context, worker, concurrency) {
  if (typeof context === 'function') {
    concurrency = worker
    worker = context
    context = null
  }

  // Adapts the promise-returning worker to the (value, cb) shape fastqueue
  // expects; a rejection reason is passed to cb as the error.
  function asyncWrapper (arg, cb) {
    worker.call(this, arg)
      .then(function (res) {
        cb(null, res)
      }, cb)
  }

  var queue = fastqueue(context, asyncWrapper, concurrency)

  // Keep the callback-based originals before overriding them.
  var pushCb = queue.push
  var unshiftCb = queue.unshift

  queue.push = push
  queue.unshift = unshift
  queue.drained = drained

  return queue

  // Queues `value` at the end; resolves or rejects with the worker's outcome.
  function push (value) {
    var p = new Promise(function (resolve, reject) {
      pushCb(value, function (err, result) {
        if (err) {
          reject(err)
          return
        }
        resolve(result)
      })
    })

    // Let's fork the promise chain to
    // make the error bubble up to the user but
    // not lead to a unhandledRejection
    p.catch(noop)

    return p
  }

  // Queues `value` at the front; resolves or rejects with the worker's outcome.
  function unshift (value) {
    var p = new Promise(function (resolve, reject) {
      unshiftCb(value, function (err, result) {
        if (err) {
          reject(err)
          return
        }
        resolve(result)
      })
    })

    // Let's fork the promise chain to
    // make the error bubble up to the user but
    // not lead to a unhandledRejection
    p.catch(noop)

    return p
  }

  // Returns a promise that resolves once the queue has drained.
  function drained () {
    // An idle queue will not fire drain() again on its own, so waiting for
    // the hook would leave the promise pending forever.
    if (queue.idle()) {
      return Promise.resolve()
    }

    var previousDrain = queue.drain

    return new Promise(function (resolve) {
      // Chain onto any drain hook that was already installed.
      queue.drain = function () {
        previousDrain()
        resolve()
      }
    })
  }
}

// The callback-style factory is the module's main export; the
// promise-returning factory is exposed as its `promise` property.
module.exports = fastqueue
module.exports.promise = queueAsPromised