put ml computations in a different thread

Alexandre Storelli 2019-03-11 14:08:40 +01:00
parent f7b2ef34ac
commit e5f1340675
4 changed files with 262 additions and 159 deletions

build.sh (12 changes) Normal file → Executable file

@@ -1,16 +1,16 @@
# Node.JS part
mkdir ../build
cp node_modules/zeromq/build/Release/zmq.node ../build/.
#cp node_modules/zeromq/build/Release/zmq.node ../build/.
cp node_modules/sqlite3/lib/binding/node-v64-linux-x64/node_sqlite3.node ../build/.
pkg demo.js && cp demo-linux ../build/.
# Python part
cd predictor-ml/
pyinstaller mlpredict.spec
cd ../../build
ln -s ../adblockradio/predictor-ml/dist .
#cd predictor-ml/
#pyinstaller mlpredict.spec
#cd ../../build
#ln -s ../adblockradio/predictor-ml/dist .
# now run the demo
cd ../build/
./demo-linux
./demo-linux

package-lock.json (7 changes) generated

@@ -3258,8 +3258,13 @@
"resolved": "https://registry.npmjs.org/stream-audio-fingerprint/-/stream-audio-fingerprint-1.0.4.tgz",
"integrity": "sha512-FSnM7XIlm6+MNx1JvVvhcZVJYvfZer/2o3hmLzYwAGIhJp5gXINftpYiy0z+p2RZMdQ60ROgH/lGIYATZyunfA==",
"requires": {
"dsp.js": "git+https://git@github.com/corbanbrook/dsp.js.git#051f341d8e1d05515efecb1851bbc862decf02bc",
"node-png": "^0.4.3"
},
"dependencies": {
"dsp.js": {
"version": "git+https://git@github.com/corbanbrook/dsp.js.git#051f341d8e1d05515efecb1851bbc862decf02bc",
"from": "git+https://git@github.com/corbanbrook/dsp.js.git#051f341d8e1d05515efecb1851bbc862decf02bc"
}
}
},
"stream-parser": {

predictor-ml/ml-worker.js (200 changes) Normal file

@@ -0,0 +1,200 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
// Copyright (c) 2018 Alexandre Storelli
"use strict";
const { log } = require("abr-log")("pred-ml-worker");
//const fs = require("fs");
//global.fetch = require('node-fetch'); // tensorflow-js uses browser API fetch. This is a polyfill for usage in Node
//const tf = require('@tensorflow/tfjs');
const tf = require('@tensorflow/tfjs-node');
const assert = require('assert');
// Input audio sampling rate (Hz). Note audio is assumed to be 16-bit.
const SAMPLING_RATE = 22050;
// Compute each MFCC frame with a window of that length (in seconds)
const MFCC_WINLEN = 0.05;
// How many seconds to step between each MFCC frame
const MFCC_WINSTEP = 0.02;
// Window of audio data sent for each LSTM prediction, in seconds
// Equivalent variable in the Python code: nnXLenT
const LSTM_INTAKE_SECONDS = 4;
// Number of MFCC frames read for each LSTM prediction
// With MFCC_WINSTEP = 0.02 and LSTM_INTAKE_SECONDS = 4, it is equal to 200.
const LSTM_INTAKE_FRAMES = Math.floor(LSTM_INTAKE_SECONDS / MFCC_WINSTEP);
// Compute one LSTM prediction every N seconds.
// It means that if you call predict more often than every LSTM_STEP_SECONDS,
// your result will be made of a single LSTM prediction.
// If you call predict on a larger buffer, your result will be the average of several LSTM predictions.
// Equivalent variable in the Python code: nnXStepT
const LSTM_STEP_SECONDS = 0.19*4;
// Number of MFCC frames between consecutive LSTM predictions
// With MFCC_WINSTEP = 0.02 and LSTM_STEP_SECONDS at 0.76, it is equal to 38
// Equivalent variable in the Python code: nnXStep
const LSTM_STEP_FRAMES = Math.round(LSTM_STEP_SECONDS / MFCC_WINSTEP);
const mfcc = require("./mfcc.js")(SAMPLING_RATE, MFCC_WINLEN, MFCC_WINSTEP);
log.info('Child process spawned with the following configuration:');
log.info('modelFile: ' + process.env.modelFile);
assert(process.env.modelFile);
log.info('canonical: ' + process.env.canonical);
assert(process.env.canonical);
let model = null;
let newBuf = null;
let workingBuf = null;
let verbose = false;
function parse(msg) {
try {
return JSON.parse(msg);
} catch (e) {
log.error(process.env.canonical + ' error parsing msg. msg=' + msg);
return null;
}
}
function send(msg) {
process.send(JSON.stringify(msg));
}
(async function() {
const handler = tf.io.fileSystem(process.env.modelFile); // see https://stackoverflow.com/a/53766926/5317732
model = await tf.loadModel(handler);
// load model from remote file
//const path = 'https://www.adblockradio.com/models/' + canonical + '/model.json';
//model = await tf.loadModel(path);
log.info(process.env.canonical + ': ML model loaded');
send({ type: 'loading', err: null, loaded: true });
})();
process.on('message', function(msg) {
msg = parse(msg);
if (msg === null) return; // ignore unparseable messages instead of crashing below
if (msg.type === 'write') {
assert(msg.buf);
assert.equal(msg.buf.type, 'Buffer'); // JSON.stringify represents Buffers as { type: 'Buffer', data: [...] }
newBuf = newBuf ? Buffer.concat([newBuf, Buffer.from(msg.buf.data)]) : Buffer.from(msg.buf.data);
//log.debug("write " + buf.length / 2 + " samples to the buffer. now " + newBuf.length / 2 + " samples in it");
} else if (msg.type === 'predict') {
if (!newBuf) {
log.warn("empty buffer. skip");
return send({ type: msg.type, err: 'empty buffer. skip' });
} else if (!model) {
log.warn("model is not ready. skip");
return send({ type: msg.type, err: 'model is not ready. skip' });
}
const nSamples = newBuf.length / 2;
const duration = nSamples / SAMPLING_RATE;
if (verbose) log.debug("will analyse " + duration + " s (" + nSamples + " samples)");
// compute RMS for volume normalization
let s = 0;
for (let i=0; i<nSamples; i++) {
s += Math.pow(newBuf.readInt16LE(2*i), 2);
}
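// rms below is the level in dB re one 16-bit quantization step (full scale is about 90.3 dB).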
const rms = isNaN(s) ? 70 : 20 * Math.log10(Math.sqrt(s/nSamples));
if (verbose) log.debug("segment rms=" + Math.round(rms*100)/100 + " dB");
// We take the amount of data necessary to generate a new prediction,
// even if the last prediction was not long ago.
// That means keeping enough data points to fill an analysis window,
// then appending the new points received since the last prediction.
// Factor 2 comes from the fact that audio is 16 bit.
// The number of LSTM predictions will depend on LSTM_STEP_SECONDS
const cropBufLen = 2*Math.floor(LSTM_INTAKE_SECONDS * SAMPLING_RATE) + newBuf.length;
workingBuf = workingBuf ? Buffer.concat([workingBuf, newBuf]) : newBuf;
newBuf = null;
if (workingBuf.length > cropBufLen) {
if (verbose) log.debug("Working buf will be truncated from " + (workingBuf.length / 2) + " samples to " + cropBufLen);
workingBuf = workingBuf.slice(-cropBufLen);
if (verbose) log.debug("working buf new length=" + (workingBuf.length / 2));
} else if (workingBuf.length <= 2 * MFCC_WINLEN * SAMPLING_RATE) {
log.warn("Working buffer is too short. Keep it but abort prediction now.");
return send({ type: msg.type, err: 'Working buffer is too short. Keep it but abort prediction now.' });
}
const ceps = mfcc(workingBuf); // call here mfcc.js
const nWin = ceps.length;
if (nWin < LSTM_INTAKE_FRAMES) {
// audio input is shorter than LSTM window
// left-pad with identical frames
const nMissingFrames = LSTM_INTAKE_FRAMES - nWin;
if (verbose) log.warn(nMissingFrames + " frames missing to fit lstm intake")
const refFrame = ceps[0].slice();
for (let i=0; i<nMissingFrames; i++) {
ceps.unshift(refFrame);
}
}
if (verbose) log.debug("ceps.l=" + ceps.length + " intake_frames=" + LSTM_INTAKE_FRAMES + " step_frames=" + LSTM_INTAKE_FRAMES);
const nLSTMPredictions = Math.floor((ceps.length - LSTM_INTAKE_FRAMES) / LSTM_STEP_FRAMES) + 1;
if (verbose) log.debug(ceps.length + " frames will be sent to LSTM, in " + nLSTMPredictions + " chunks.");
const MLInputData = new Array(nLSTMPredictions);
for (let i=0; i<nLSTMPredictions; i++) {
MLInputData[i] = ceps.slice(i*LSTM_STEP_FRAMES, i*LSTM_STEP_FRAMES + LSTM_INTAKE_FRAMES);
}
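// MLInputData has shape [nLSTMPredictions, LSTM_INTAKE_FRAMES, nCeps]:
// one overlapping window of MFCC frames per LSTM prediction.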
const tfResults = model.predict(tf.tensor3d(MLInputData));
const flatResultsRaw = tfResults.as1D().dataSync();
// TF.js data is a 1D array. Convert it to a nLSTMPredictions * 3 2D array.
const resultsRaw = new Array(nLSTMPredictions).fill(0).map(function(__, index) {
return flatResultsRaw.slice(index*3, (index+1)*3);
});
// Average the results across LSTM predictions, to get a 1D array with 3 elements.
let maxResult = 0;
let indexMaxResult = -1;
const resultsAvg = new Array(3).fill(0).map(function(__, index) {
let sum = 0;
for (let i=index; i<flatResultsRaw.length; i=i+3) {
sum += flatResultsRaw[i];
}
if (sum > maxResult) {
maxResult = sum;
indexMaxResult = index;
}
return sum / nLSTMPredictions;
});
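// Confidence compares the winner's average softmax with the runner-up's:
// it tends to 1 when the winning class dominates and to 0 when the top two are close.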
const secondMaxResult = Math.max(...resultsAvg.slice(0, indexMaxResult).concat(resultsAvg.slice(indexMaxResult + 1)));
const confidence = 1 - Math.exp(1 - maxResult / nLSTMPredictions / secondMaxResult);
if (verbose) {
log.debug("ResultsRaw:");
console.log(resultsRaw);
log.debug("ResultsAvg:");
console.log(resultsAvg);
log.debug("pred class = " + indexMaxResult + " with softmax = " + maxResult/nLSTMPredictions);
log.debug("second class is " + secondMaxResult + ". confidence = " + confidence);
}
const outData = {
type: indexMaxResult,
confidence: confidence,
softmaxraw: resultsAvg.concat([0]), // the last class is about jingles. ML does not detect them.
//date: new Date(stream.lastData.getTime() + Math.round(stream.tBuffer*1000)),
gain: rms,
lenPcm: workingBuf.length
};
send({ type: msg.type, err: null, outData });
}
});
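For context, the worker's JSON message protocol can be driven from a bare Node script along the following lines. This is only a sketch, not part of the commit; the canonical name and model path are placeholder values.

const cp = require('child_process');

// Fork the worker with the two env vars it asserts on (placeholder values).
const worker = cp.fork('./predictor-ml/ml-worker.js', {
	env: { canonical: 'myradio', modelFile: './model/model.json' }
});

worker.on('message', function(msg) {
	msg = JSON.parse(msg); // the worker stringifies every message it sends
	if (msg.type === 'loading' && msg.loaded) {
		// JSON.stringify serializes the Buffer as { type: 'Buffer', data: [...] },
		// which is what the worker's 'write' handler expects.
		worker.send(JSON.stringify({ type: 'write', buf: Buffer.alloc(2 * 22050) })); // 1 s of 16-bit silence
		worker.send(JSON.stringify({ type: 'predict' }));
	} else if (msg.type === 'predict') {
		console.log(msg.err || msg.outData); // outData: { type, confidence, softmaxraw, gain, lenPcm }
		worker.kill();
	}
});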

predictor-ml/ml.js

@@ -7,42 +7,17 @@
"use strict";
const { Writable } = require("stream");
const { log } = require("abr-log")("pred-ml");
//const fs = require("fs");
//global.fetch = require('node-fetch'); // tensorflow-js uses browser API fetch. This is a polyfill for usage in Node
//const tf = require('@tensorflow/tfjs');
const tf = require('@tensorflow/tfjs-node');
// Input audio sampling rate (Hz). Note audio is assumed to be 16-bit.
const SAMPLING_RATE = 22050;
// Compute each MFCC frame with a window of that length (in seconds)
const MFCC_WINLEN = 0.05;
// How many seconds to step between each MFCC frame
const MFCC_WINSTEP = 0.02;
// Window of audio data sent for each LSTM prediction, in seconds
// Equivalent variable in the Python code: nnXLenT
const LSTM_INTAKE_SECONDS = 4;
// Number of MFCC frames read for each LSTM prediction
// With MFCC_WINSTEP = 0.02 and LSTM_INTAKE_SECONDS = 4, it is equal to 200.
const LSTM_INTAKE_FRAMES = Math.floor(LSTM_INTAKE_SECONDS / MFCC_WINSTEP);
// Compute one LSTM prediction every N seconds.
// It means that if you call predict more often than every LSTM_STEP_SECONDS,
// your result will be made of a single LSTM prediction.
// If you call predict on a larger buffer, your result will be the average of several LSTM predictions.
// Equivalent variable in the Python code: nnXStepT
const LSTM_STEP_SECONDS = 0.19*4;
// Number of MFCC frames between consecutive LSTM predictions
// With MFCC_WINSTEP = 0.02 and LSTM_STEP_SECONDS at 0.76, it is equal to 38
// Equivalent variable in the Python code: nnXStep
const LSTM_STEP_FRAMES = Math.round(LSTM_STEP_SECONDS / MFCC_WINSTEP);
const mfcc = require("./mfcc.js")(SAMPLING_RATE, MFCC_WINLEN, MFCC_WINSTEP);
const cp = require("child_process");
const assert = require("assert");
function parse(msg) {
try {
return JSON.parse(msg);
} catch (e) {
log.error('could not parse response. msg=' + msg);
return null;
}
}
class MlPredictor extends Writable {
constructor(options) {
@@ -57,135 +32,58 @@ class MlPredictor extends Writable {
}
async load() {
// load model from local file
const handler = tf.io.fileSystem(this.modelFile); // see https://stackoverflow.com/a/53766926/5317732
this.model = await tf.loadModel(handler);
const self = this;
return new Promise(function(resolve, reject) {
self.child = cp.fork('./predictor-ml/ml-worker.js', {
env: {
canonical: self.canonical,
modelFile: self.modelFile,
}
});
// load model from remote file
//const path = 'https://www.adblockradio.com/models/' + this.canonical + '/model.json';
//this.model = await tf.loadModel(path);
log.info(this.canonical + ': ML model loaded');
this.ready = true;
self.child.once('message', function(msg) {
msg = parse(msg);
assert.equal(msg.type, 'loading');
if (msg.err) {
log.warn(self.canonical + ' could not load model: ' + JSON.stringify(msg));
return reject();
}
self.ready = msg.loaded;
log.info(self.canonical + ' loaded=' + self.ready);
resolve();
});
});
}
_write(buf, enc, next) {
this.newBuf = this.newBuf ? Buffer.concat([this.newBuf, buf]) : buf;
//log.debug("write " + buf.length / 2 + " samples to the buffer. now " + this.newBuf.length / 2 + " samples in it");
if (this.child && this.ready) {
this.child.send(JSON.stringify({
type: 'write',
buf: buf,
}));
}
next();
}
predict(callback) {
if (!this.newBuf) {
log.warn("empty buffer. skip");
return setImmediate(callback);
} else if (!this.model) {
log.warn("model is not ready. skip");
return setImmediate(callback);
if (this.child && this.ready) {
this.child.send(JSON.stringify({
type: 'predict',
}));
const self = this;
this.child.once('message', function(msg) {
msg = parse(msg);
assert.equal(msg.type, 'predict');
if (msg.err) log.warn(self.canonical + ' skipped prediction: ' + JSON.stringify(msg));
callback(null, msg.outData);
});
}
const nSamples = this.newBuf.length / 2;
const duration = nSamples / SAMPLING_RATE;
if (this.verbose) log.debug("will analyse " + duration + " s (" + nSamples + " samples)");
// compute RMS for volume normalization
let s = 0;
for (let i=0; i<nSamples; i++) {
s += Math.pow(this.newBuf.readInt16LE(2*i), 2);
}
const rms = isNaN(s) ? 70 : 20 * Math.log10(Math.sqrt(s/nSamples));
if (this.verbose) log.debug("segment rms=" + Math.round(rms*100)/100 + " dB");
// We take the amount of data necessary to generate a new prediction,
// even if the last prediction was not long ago.
// That means keeping enough data points to fill an analysis window,
// then appending the new points received since the last prediction.
// Factor 2 comes from the fact that audio is 16 bit.
// The number of LSTM predictions will depend on LSTM_STEP_SECONDS
const cropBufLen = 2*Math.floor(LSTM_INTAKE_SECONDS * SAMPLING_RATE) + this.newBuf.length;
this.workingBuf = this.workingBuf ? Buffer.concat([this.workingBuf, this.newBuf]) : this.newBuf;
this.newBuf = null;
if (this.workingBuf.length > cropBufLen) {
if (this.verbose) log.debug("Working buf will be truncated from " + (this.workingBuf.length / 2) + " samples to " + cropBufLen);
this.workingBuf = this.workingBuf.slice(-cropBufLen);
if (this.verbose) log.debug("working buf new length=" + (this.workingBuf.length / 2));
} else if (this.workingBuf.length <= 2 * MFCC_WINLEN * SAMPLING_RATE) {
log.warn("Working buffer is too short. Keep it but abort prediction now.");
return setImmediate(callback);
}
const ceps = mfcc(this.workingBuf); // call here mfcc.js
const nWin = ceps.length;
if (nWin < LSTM_INTAKE_FRAMES) {
// audio input is shorter than LSTM window
// left-pad with identical frames
const nMissingFrames = LSTM_INTAKE_FRAMES - nWin;
if (this.verbose) log.warn(nMissingFrames + " frames missing to fit lstm intake")
const refFrame = ceps[0].slice();
for (let i=0; i<nMissingFrames; i++) {
ceps.unshift(refFrame);
}
}
if (this.verbose) log.debug("ceps.l=" + ceps.length + " intake_frames=" + LSTM_INTAKE_FRAMES + " step_frames=" + LSTM_INTAKE_FRAMES);
const nLSTMPredictions = Math.floor((ceps.length - LSTM_INTAKE_FRAMES) / LSTM_STEP_FRAMES) + 1;
if (this.verbose) log.debug(ceps.length + " frames will be sent to LSTM, in " + nLSTMPredictions + " chunks.");
const MLInputData = new Array(nLSTMPredictions);
for (let i=0; i<nLSTMPredictions; i++) {
MLInputData[i] = ceps.slice(i*LSTM_STEP_FRAMES, i*LSTM_STEP_FRAMES + LSTM_INTAKE_FRAMES);
}
const tfResults = this.model.predict(tf.tensor3d(MLInputData));
const flatResultsRaw = tfResults.as1D().dataSync();
// TF.js data is a 1D array. Convert it to a nLSTMPredictions * 3 2D array.
const resultsRaw = new Array(nLSTMPredictions).fill(0).map(function(__, index) {
return flatResultsRaw.slice(index*3, (index+1)*3);
});
// Average the results across LSTM predictions, to get a 1D array with 3 elements.
let maxResult = 0;
let indexMaxResult = -1;
const resultsAvg = new Array(3).fill(0).map(function(__, index) {
let sum = 0;
for (let i=index; i<flatResultsRaw.length; i=i+3) {
sum += flatResultsRaw[i];
}
if (sum > maxResult) {
maxResult = sum;
indexMaxResult = index;
}
return sum / nLSTMPredictions;
});
const secondMaxResult = Math.max(...resultsAvg.slice(0, indexMaxResult).concat(resultsAvg.slice(indexMaxResult + 1)));
const confidence = 1 - Math.exp(1 - maxResult / nLSTMPredictions / secondMaxResult);
if (this.verbose) {
log.debug("ResultsRaw:");
console.log(resultsRaw);
log.debug("ResultsAvg:");
console.log(resultsAvg);
log.debug("pred class = " + indexMaxResult + " with softmax = " + maxResult/nLSTMPredictions);
log.debug("second class is " + secondMaxResult + ". confidence = " + confidence);
}
const outData = {
type: indexMaxResult,
confidence: confidence,
softmaxraw: resultsAvg.concat([0]), // the last class is about jingles. ML does not detect them.
//date: new Date(stream.lastData.getTime() + Math.round(stream.tBuffer*1000)),
gain: rms,
lenPcm: this.workingBuf.length
};
//setImmediate(function() {
callback(null, outData);
//});
}
_final() {
// pass
if (this.child) {
this.child.kill();
}
}
}
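
With this change, MlPredictor becomes a thin IPC wrapper around the forked worker. A minimal usage sketch follows; the option names canonical and modelFile are inferred from how the class uses them, and the module is assumed to export the class:

const MlPredictor = require('./predictor-ml/ml.js');

(async function() {
	const predictor = new MlPredictor({
		canonical: 'myradio',            // placeholder station name
		modelFile: './model/model.json'  // placeholder model path
	});
	await predictor.load(); // forks ml-worker.js and resolves on its 'loading' message

	// Pipe 22050 Hz, 16-bit mono PCM into the writable stream.
	process.stdin.pipe(predictor);

	// Ask for a prediction every 4 s; results come back from the worker via IPC.
	setInterval(function() {
		predictor.predict(function(err, outData) {
			if (outData) console.log('class=' + outData.type + ' confidence=' + outData.confidence);
		});
	}, 4000);
})();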