bugfixes about event ordering

This commit is contained in:
Alexandre Storelli 2019-02-22 12:19:22 +01:00
parent d72c1aa253
commit c4300806ac
5 changed files with 56 additions and 26 deletions

View File

@ -26,8 +26,13 @@ const consts = {
ML_CONFIDENCE_THRESHOLD: 0.65,
HOTLIST_CONFIDENCE_THRESHOLD: 0.5,
FINAL_CONFIDENCE_THRESHOLD: 0.40,
MINIMUM_BUFFER: 2, // in seconds. some radio streams have very small buffers. just like players
// that wait for a minimal buffer before playing, wait for N seconds before streaming data.
MINIMUM_BUFFER: 2, // in seconds.
// Some radios have a very small buffer, down to zero.
// But browsers such as Firefox and Chrome only start playing after a 2 second buffer.
// So we artificially delay the predictions.
// VLC player, however, plays without such delay.
DOWNSTREAM_LATENCY: 500 // in milliseconds. broadcast the prediction result N ms before it should be applied by the players of the end users.
}
@ -118,8 +123,14 @@ class PostProcessor extends Transform {
if (this.config.verbose) log.debug("---------------------");
const now = +new Date();
this.slotCounter++;
this.cache.unshift({ ts: now, audio: null, ml: null, hotlist: null, tBuf: tBuffer, n: this.slotCounter });
this.cache.unshift({ ts: null, audio: null, ml: null, hotlist: null, tBuf: tBuffer, n: this.slotCounter });
if (this.cache[1]) {
this.cache[1].ts = now;
} else { // happens only at first startup.
this.cache[0].ts = now;
}
if (this.config.fileMode) {
if (this.cache.length >= 5) {
@ -411,7 +422,7 @@ class Analyser extends Readable {
if (self.config.modelUpdates) {
await checkModelUpdates(self.country, self.name, self.config.modelPath);
} else {
log.info(self.country + '_' + self.name + ' module updates are disabled');
log.info(self.country + '_' + self.name + ' model updates are disabled');
}
await checkMetadataUpdates();

View File

@ -6,7 +6,7 @@
"use strict";
const sqlite3 = require("sqlite3").verbose();
const { Transform } = require("stream");
const { Writable } = require("stream");
const { log } = require("abr-log")("pred-hotlist");
const Codegen = require("stream-audio-fingerprint");
const async = require("async");
@ -25,7 +25,7 @@ const consts = {
}
}
class Hotlist extends Transform {
class Hotlist extends Writable {
constructor(options) {
super({ objectMode: true });
this.country = options.country;
@ -131,8 +131,7 @@ class Hotlist extends Transform {
let hcodes = this.fingerbuffer.hcodes;
this.fingerbuffer = { tcodes: [], hcodes: [] };
if (!tcodes.length) {
this.push({ type: "hotlist", data: consts.EMPTY_OUTPUT });
if (callback) callback();
if (callback) callback(null, consts.EMPTY_OUTPUT);
return log.warn("onFingers: " + this.country + "_" + this.name + " no fingerprints to search");
}
@ -155,8 +154,7 @@ class Hotlist extends Transform {
if (err) return log.error("onFingers: " + self.country + "_" + self.name + " query error=" + err);
if (!res || !res.length) {
//log.warn("onFingers: no results for a query of " + tcodes.length);
self.push({ type: "hotlist", data: consts.EMPTY_OUTPUT });
if (callback) callback();
if (callback) callback(null, consts.EMPTY_OUTPUT);
return
}
@ -270,8 +268,7 @@ class Hotlist extends Transform {
softmaxraw: softmax,
}
self.push({ type: "hotlist", data: output });
if (callback) callback();
if (callback) callback(null, output);
});
}

View File

@ -155,12 +155,26 @@ class PredictorFile {
function(cb) {
if (!self.config.enablePredictorMl) return setImmediate(cb);
self.mlPredictor.write(dataObj.data);
self.mlPredictor.predict(cb);
self.mlPredictor.predict(function(err, data) {
if (!err && data) {
self.listener.write({ type: "ml", data });
} else {
log.warn("skip ml result because err=" + err + " data=" + JSON.stringify(data));
}
cb(err);
});
},
function(cb) {
if (!self.config.enablePredictorHotlist) return setImmediate(cb);
self.hotlist.write(dataObj.data);
self.hotlist.onFingers(cb);
self.hotlist.onFingers(function(err, data) {
if (!err && data) {
self.listener.write({ type: "hotlist", data });
} else {
log.warn("skip hotlist result because err=" + err + " data=" + JSON.stringify(data));
}
cb(err);
});
}
], function(err) {
@ -188,7 +202,6 @@ class PredictorFile {
name: this.name,
fileDB: this.modelPath + '/' + this.country + '_' + this.name + '.sqlite'
});
this.hotlist.pipe(this.listener);
} else {
this.hotlist = null;
}
@ -206,7 +219,6 @@ class PredictorFile {
}
callback();
});
this.mlPredictor.pipe(this.listener);
} else {
this.mlPredictor = null;
}

View File

@ -5,13 +5,13 @@
// Copyright (c) 2018 Alexandre Storelli
"use strict";
const { Transform } = require("stream");
const { Writable } = require("stream");
const cp = require("child_process");
const { log } = require("abr-log")("pred-ml");
const zerorpc = require("zerorpc");
const fs = require("fs");
class MlPredictor extends Transform {
class MlPredictor extends Writable {
constructor(options) {
super({ readableObjectMode: true });
this.canonical = options.country + "_" + options.name;
@ -165,7 +165,6 @@ class MlPredictor extends Transform {
lenPcm: results.lenpcm
}
self.push({ type:"ml", data: outData, array: true });
callback(null, outData);
});
}

View File

@ -177,10 +177,26 @@ class Predictor {
async.parallel([
function(cb) {
return self.config.enablePredictorMl && self.mlPredictor.ready ? self.mlPredictor.predict(cb) : setImmediate(cb);
if (!self.config.enablePredictorMl || !self.mlPredictor.ready) return setImmediate(cb);
self.mlPredictor.predict(function(err, data) {
if (!err && data) {
self.listener.write({ type: "ml", data });
} else {
log.warn("skip ml result because err=" + err + " data=" + JSON.stringify(data));
}
cb(err);
});
},
function(cb) {
return self.config.enablePredictorHotlist ? self.hotlist.onFingers(cb) : setImmediate(cb);
if (!self.config.enablePredictorHotlist) return setImmediate(cb);
self.hotlist.onFingers(function(err, data) {
if (!err && data) {
self.listener.write({ type: "hotlist", data });
} else {
log.warn("skip hotlist result because err=" + err + " data=" + JSON.stringify(data));
}
cb(err);
});
}
], function(err) {
@ -264,7 +280,6 @@ class Predictor {
refreshPredictorHotlist() {
log.info(this.canonical + " refresh hotlist predictor");
if (this.hotlist) {
this.hotlist.unpipe(this.listener);
this.decoder.stdout.unpipe(this.hotlist);
this.hotlist.destroy();
delete this.hotlist;
@ -275,7 +290,6 @@ class Predictor {
name: this.name,
fileDB: this.modelPath + '/' + this.country + '_' + this.name + '.sqlite'
});
this.hotlist.pipe(this.listener);
this.decoder.stdout.pipe(this.hotlist);
} else {
this.hotlist = null;
@ -290,7 +304,6 @@ class Predictor {
country: this.country,
name: this.name,
});
this.mlPredictor.pipe(this.listener);
} else if (this.mlPredictor.ready2) {
this.decoder.stdout.unpipe(this.mlPredictor);
}
@ -301,7 +314,6 @@ class Predictor {
this.mlPredictor.load(this.modelPath + '/' + this.country + '_' + this.name + '.keras', function(err) {
if (err && ("" + err).indexOf("Lost remote after 30000ms") >= 0) {
log.warn(self.canonical + " lost remote Python worker. will restart it");
self.mlPredictor.unpipe(self.listener);
self.mlPredictor.destroy();
self.refreshPredictorMl();
@ -320,7 +332,6 @@ class Predictor {
});
} else {
if (this.mlPredictor) {
this.mlPredictor.unpipe(this.listener);
if (this.mlPredictor.ready2) this.decoder.stdout.unpipe(this.mlPredictor);
this.mlPredictor.destroy();
this.mlPredictor = null;