add test for live stream analysis. bugfixes

This commit is contained in:
Alexandre Storelli 2019-03-12 13:20:21 +01:00
parent 770fafad46
commit 8e603b32b5
7 changed files with 244 additions and 20 deletions

View File

@ -32,8 +32,8 @@ abr.on("data", function(obj) {
//log.info("status=" + JSON.stringify(Object.assign(obj, { audio: undefined }), null, "\t"));
});
abr.on("close", function() {
log.info("analyser closed");
abr.on("end", function() {
log.info("analyser ended");
});
//setTimeout(abr.stopDl, 15000);
//setTimeout(abr.stopDl, 15000);

6
package-lock.json generated
View File

@ -5206,6 +5206,12 @@
"signal-exit": "^3.0.2"
}
},
"wtfnode": {
"version": "0.8.0",
"resolved": "https://registry.npmjs.org/wtfnode/-/wtfnode-0.8.0.tgz",
"integrity": "sha512-A5jm/0REykxUac1q4Q5kv+hDIiacvqVpwIoXzCQcRL7syeEKucVVOxyLLrt+jIiZoXfla3lnsxUw/cmWXIaGWA==",
"dev": true
},
"xdg-basedir": {
"version": "3.0.0",
"resolved": "https://registry.npmjs.org/xdg-basedir/-/xdg-basedir-3.0.0.tgz",

View File

@ -4,7 +4,7 @@
"description": "Distinguish ads, talk and music in radios",
"main": "post-processing.js",
"scripts": {
"test": "mocha --delay"
"test": "mocha --delay test/file.js && mocha --delay test/online.js"
},
"keywords": [
"adblock",
@ -32,6 +32,7 @@
"electron-builder": "^20.38.5",
"electron-prebuilt": "^1.4.13",
"electron-rebuild": "^1.8.4",
"mocha": "^6.0.2"
"mocha": "^6.0.2",
"wtfnode": "^0.8.0"
}
}

View File

@ -371,7 +371,7 @@ class Analyser extends Readable {
metadataPath: undefined
});
self.push({ liveResult: obj });
self.push({ liveResult: obj, metadataPath: metadataPath });
if (!self.config.saveMetadata) return;
if (!metadataPath) {
@ -474,14 +474,11 @@ class Analyser extends Readable {
{ file: self.config.modelFile, tar: false, callback: self.predictor.refreshPredictorMl },
{ file: self.config.modelFile.replace('model.json', 'group1-shard1of1'), tar: false, callback: self.predictor.refreshPredictorMl },
{ file: self.config.hotlistFile, tar: true, callback: self.predictor.refreshPredictorHotlist },
]
});
}
checkMetadataUpdates(self.predictor.refreshMetadata);
}, self.config.modelUpdateInterval * 60000);
}
})();

View File

@ -27,7 +27,6 @@ class MlPredictor extends Writable {
this.ready = false; // becomes true when ML model is loaded
//this.ready2 = false; // becomes true when audio data is piped to this module. managed externally
//this.finalCallback = null;
this.readyToCallFinal = false;
this.modelFile = options.modelFile;
}

View File

@ -93,7 +93,7 @@ class Predictor {
});
this.dl.pause();
this.decoder = require('child_process').spawn('ffmpeg', [
this.decoder = cp.spawn('ffmpeg', [
'-i', 'pipe:0',
'-acodec', 'pcm_s16le',
'-ar', 22050,
@ -112,7 +112,11 @@ class Predictor {
this.dbs = null;
this.dl.on("metadata", function(metadata) {
log.info(self.canonical + " metadata=" + JSON.stringify(metadata));
self.listener.write({ type: "dlinfo", data: metadata });
if (self.listener.writable) {
self.listener.write({ type: "dlinfo", data: metadata });
} else {
log.warn("Could not pass metadata to listener because it is not writable");
}
self.audioExt = metadata.ext;
if (!self.dbs) {
@ -180,10 +184,10 @@ class Predictor {
function(cb) {
if (!self.config.enablePredictorMl || !self.mlPredictor.ready) return setImmediate(cb);
self.mlPredictor.predict(function(err, data) {
if (!err && data) {
if (!err && data && self.listener.writable) {
self.listener.write({ type: "ml", data });
} else {
log.warn("skip ml result because err=" + err + " data=" + JSON.stringify(data));
log.warn("skip ml result because err=" + err + " data=" + JSON.stringify(data) + " writable=" + self.listener.writable);
}
cb(err);
});
@ -191,10 +195,10 @@ class Predictor {
function(cb) {
if (!self.config.enablePredictorHotlist) return setImmediate(cb);
self.hotlist.onFingers(function(err, data) {
if (!err && data) {
if (!err && data && self.listener.writable) {
self.listener.write({ type: "hotlist", data });
} else {
log.warn("skip hotlist result because err=" + err + " data=" + JSON.stringify(data));
log.warn("skip hotlist result because err=" + err + " data=" + JSON.stringify(data) + " writable=" + self.listener.writable);
}
cb(err);
});
@ -266,8 +270,8 @@ class Predictor {
const path = dir + now.toISOString();
//log.debug("saveAudioSegment: path=" + path);
cp.exec("mkdir -p \"" + dir + "\"", function(error, stdout, stderr) {
if (error) log.error("warning, could not create path " + path);
fs.mkdir(dir, { recursive: true }, function(err) {
if (err && !("" + err).includes('EEXIST')) log.error("warning, could not create path " + path + " err=" + err);
self.dbs = {
audio: self.config.saveAudio ? new fs.createWriteStream(path + "." + self.audioExt) : null,
metadataPath: path + ".json"
@ -358,7 +362,7 @@ class Predictor {
this.dl.stopDl();
log.debug("will stop decoder");
this.decoder.kill();
this.decoder.stdin.end();
if (this.hotlist) {
log.debug("will close hotlist");
@ -373,7 +377,6 @@ class Predictor {
log.debug("no ML predictor to close");
}
}
}

218
test/online.js Normal file
View File

@ -0,0 +1,218 @@
"use strict";
const { log } = require("abr-log")("test-online");
const { Analyser } = require("../post-processing.js");
const fs = require("fs-extra");
const assert = require("assert");
const cluster = require("cluster");
const TEST_ML = true;
const TEST_HOTLIST = true;
const PRED_INTERVAL = 1; // in seconds
if (cluster.isMaster) {
const CLOSE_DELAY = 15000;
const TIMEOUT = 10000; // ms counted in addition to CLOSE_DELAY
let cp = null;
let gotData = false;
let finished = false;
let hasErrors = false;
let stopped = false;
let exited = false;
let exitCode = null;
let timeout = null;
let timedOut = false;
let metaFiles = [];
let metaFilesContent = null;
let metaFilesSane = null;
setTimeout(function() {
log.info('stop the stream analysis');
stopped = true;
cp.send({ action: 'stop' });
timeout = setTimeout(function() {
log.warn('child process has not exited properly. kill it.');
cp.kill();
timedOut = true;
run();
}, TIMEOUT)
}, CLOSE_DELAY);
cp = cluster.fork();
cp.on('message', function(msg) {
if (msg.type === 'data') {
if (msg.data) {
gotData = true;
if (msg.data.metadataPath) {
log.debug("metafile=" + msg.data.metadataPath);
if (!metaFiles.includes(msg.data.metadataPath)) metaFiles.push(msg.data.metadataPath);
}
}
} else if (msg.type === 'stop') {
stopped = true;
} else if (msg.type === 'end') {
finished = true;
}
});
cp.on('error', function(err) {
log.error('child process had an error: ' + err);
hasErrors = true;
});
cp.on('exit', function(code) {
exited = true;
exitCode = code;
if (timeout) clearTimeout(timeout);
(async function() {
metaFilesContent = new Array(metaFiles.length);
metaFilesSane = new Array(metaFiles.length);
for (let i=0; i<metaFiles.length; i++) {
try {
log.debug("Read " + metaFiles[i]);
metaFilesContent[i] = await fs.readFile(metaFiles[i]);
metaFilesContent[i] = JSON.parse(metaFilesContent[i]);
metaFilesSane[i] = true;
} catch (e) {
log.warn("could not read " + metaFiles[i] + " err=" + e);
metaFilesSane[i] = false;
}
}
run();
})();
});
describe('Live stream analysis', function() {
it("should have emitted data", function() {
assert(gotData);
});
it("should have emitted an end event", function() {
assert(finished);
});
it("should not have thrown errors", function() {
assert(!hasErrors);
});
it("should have exited properly", function() {
assert(stopped);
assert(exited);
assert.equal(exitCode, 0);
assert(!timedOut);
});
it("should write results in JSON format", function() {
assert(metaFiles.length);
for (let i=0; i<metaFiles.length; i++) {
assert(metaFilesSane[i]);
const c = metaFilesContent[i];
assert(c);
assert(!isNaN(c.predictorStartTime));
assert(c.country);
assert(c.name);
assert(c.streamInfo);
assert(c.streamInfo.url);
assert(!isNaN(c.streamInfo.bitrate));
assert(c.streamInfo.favicon);
assert(c.streamInfo.homepage);
assert(c.streamInfo.audioExt);
assert(c.predictions);
for (let j=0; j<c.predictions.length; j++) {
const p = c.predictions[j];
assert(['0-ads', '1-speech', '2-music', '3-jingles', 'unsure'].includes(p.class));
assert(p.playTime);
assert(!isNaN(p.tBuffer));
// ML module is usually not ready at startup of live stream analysis
if (TEST_ML && p.ml) {
assert(p.gain > 60 && p.gain < 85);
assert(p.ml);
assert(['0-ads', '1-speech', '2-music'].includes(p.ml.class));
assert(p.ml.softmaxraw);
assert.equal(p.ml.softmaxraw.length, 4);
assert(p.ml.softmax);
assert.equal(p.ml.softmax.length, 4);
assert(!isNaN(p.ml.slotsFuture));
assert(!isNaN(p.ml.slotsPast));
} else {
assert.equal(p.ml, null);
}
if (TEST_HOTLIST) {
assert(p.hotlist);
assert(['0-ads', '1-speech', '2-music', '3-jingles', 'unsure'].includes(p.hotlist.class));
assert(p.hotlist.softmaxraw);
assert.equal(p.hotlist.softmaxraw.length, 4);
assert(p.hotlist.softmax);
assert.equal(p.hotlist.softmax.length, 4);
assert(!isNaN(p.hotlist.matchesSync));
assert(!isNaN(p.hotlist.matchesTotal));
assert(!isNaN(p.hotlist.confidence1));
assert(0 <= p.hotlist.confidence1);
assert(p.hotlist.confidence1 <= 1);
assert(!isNaN(p.hotlist.confidence2));
assert(0 <= p.hotlist.confidence2);
assert(p.hotlist.confidence2 <= 1);
} else {
assert.equal(p.hotlist, null);
}
}
}
});
});
} else {
const abr = new Analyser({
country: 'France',
name: 'RTL',
config: {
predInterval: PRED_INTERVAL,
saveDuration: 10,
enablePredictorHotlist: TEST_HOTLIST,
enablePredictorMl: TEST_ML,
saveAudio: true,
saveMetadata: true,
fetchMetadata: true,
verbose: false,
}
});
abr.on("data", function(obj) {
obj.liveResult.audio = "[redacted]";
//log.info(obj.metadataPath);
log.info(JSON.stringify(obj.liveResult, null, "\t"));
process.send({ type: 'data', data: obj });
});
abr.on("end", function() {
process.send({ type: 'end' });
log.info("analyser ended");
process.disconnect(); // otherwise the IPC prevents the subprocess from gracefully exiting
});
process.on('message', function(msg) {
if (msg && msg.action === 'stop') {
abr.stopDl();
}
});
}