This commit is contained in:
Tobias Koppers 2019-11-09 09:55:06 +01:00
parent d3aaa29f36
commit a9825c9bb1
1 changed files with 52 additions and 12 deletions

View File

@ -64,6 +64,7 @@ class AsyncQueue {
/** @type {AsyncQueueEntry<T, K, R>[]} */
this._queued = [];
this._activeTasks = 0;
this._finishedActiveTasks = 0;
this._willEnsureProcessing = false;
this._stopped = false;
@ -81,6 +82,7 @@ class AsyncQueue {
};
this._ensureProcessing = this._ensureProcessing.bind(this);
this._finishActiveTasks = this._finishActiveTasks.bind(this);
}
/**
@ -206,13 +208,37 @@ class AsyncQueue {
* @returns {void}
*/
_ensureProcessing() {
while (this._activeTasks < this._parallelism && this._queued.length > 0) {
const entry = this._queued.pop();
this._activeTasks++;
entry.state = PROCESSING_STATE;
this._startProcessing(entry);
this._activeTasks -= this._finishedActiveTasks;
this._finishedActiveTasks = 0;
const limit = Date.now() + 1000;
let i = 0;
do {
while (this._activeTasks < this._parallelism && this._queued.length > 0) {
const entry = this._queued.pop();
this._activeTasks++;
entry.state = PROCESSING_STATE;
this._startProcessing(entry);
i++;
}
if (
this._queued.length > 0 &&
this._finishedActiveTasks > 0 &&
Date.now() < limit
) {
console.log("more " + this._name + " " + this._finishedActiveTasks);
this._activeTasks -= this._finishedActiveTasks;
this._finishedActiveTasks = 0;
continue;
}
break;
// eslint-disable-next-line no-constant-condition
} while (true);
if (this._finishedActiveTasks > 0) {
setImmediate(this._ensureProcessing);
} else {
this._willEnsureProcessing = false;
}
this._willEnsureProcessing = false;
}
/**
@ -220,32 +246,35 @@ class AsyncQueue {
* @returns {void}
*/
_startProcessing(entry) {
let isSync = true;
this.hooks.beforeStart.callAsync(entry.item, err => {
if (err) {
this._handleResult(entry, err);
this._handleResult(entry, err, null, isSync);
return;
}
let inCallback = false;
try {
this._processor(entry.item, (e, r) => {
inCallback = true;
this._handleResult(entry, e, r);
this._handleResult(entry, e, r, isSync);
});
} catch (err) {
if (inCallback) throw err;
this._handleResult(entry, err, null);
this._handleResult(entry, err, null, isSync);
}
this.hooks.started.call(entry.item);
});
isSync = false;
}
/**
* @param {AsyncQueueEntry<T, K, R>} entry the entry
* @param {Error=} err error, if any
* @param {R=} result result, if any
* @param {boolean=} isSync true, if this result was created inside the ensure processing loop
* @returns {void}
*/
_handleResult(entry, err, result) {
_handleResult(entry, err, result, isSync) {
this.hooks.result.callAsync(entry.item, err, result, hookError => {
const error = hookError || err;
@ -256,8 +285,11 @@ class AsyncQueue {
entry.callbacks = undefined;
entry.result = result;
entry.error = error;
this._activeTasks--;
if (isSync) {
this._finishedActiveTasks++;
} else {
this._activeTasks--;
}
if (this._willEnsureProcessing === false && this._queued.length > 0) {
this._willEnsureProcessing = true;
setImmediate(this._ensureProcessing);
@ -270,6 +302,14 @@ class AsyncQueue {
}
}
});
isSync = false;
}
_finishActiveTasks() {
if (this._willEnsureProcessing === false && this._queued.length > 0) {
this._willEnsureProcessing = true;
setImmediate(this._ensureProcessing);
}
}
}