Stop training thread from processing training requests once cancelled. (#14423)
This commit is contained in:
parent
828af8182f
commit
3c73da09c3
17
ml/Host.cc
17
ml/Host.cc
|
@ -223,6 +223,11 @@ void Host::train() {
|
|||
TrainingRequest TrainingReq = P.first;
|
||||
size_t Size = P.second;
|
||||
|
||||
if (ThreadsCancelled) {
|
||||
info("Stopping training thread because it was cancelled.");
|
||||
break;
|
||||
}
|
||||
|
||||
usec_t AllottedUT = (Cfg.TrainEvery * RH->rrd_update_every * USEC_PER_SEC) / Size;
|
||||
if (AllottedUT > USEC_PER_SEC)
|
||||
AllottedUT = USEC_PER_SEC;
|
||||
|
@ -340,11 +345,13 @@ void Host::startAnomalyDetectionThreads() {
|
|||
|
||||
char Tag[NETDATA_THREAD_TAG_MAX + 1];
|
||||
|
||||
// #define ML_DISABLE_JOINING
|
||||
|
||||
snprintfz(Tag, NETDATA_THREAD_TAG_MAX, "MLTR[%s]", rrdhost_hostname(RH));
|
||||
netdata_thread_create(&TrainingThread, Tag, NETDATA_THREAD_OPTION_DEFAULT, train_main, static_cast<void *>(this));
|
||||
netdata_thread_create(&TrainingThread, Tag, NETDATA_THREAD_OPTION_JOINABLE, train_main, static_cast<void *>(this));
|
||||
|
||||
snprintfz(Tag, NETDATA_THREAD_TAG_MAX, "MLDT[%s]", rrdhost_hostname(RH));
|
||||
netdata_thread_create(&DetectionThread, Tag, NETDATA_THREAD_OPTION_DEFAULT, detect_main, static_cast<void *>(this));
|
||||
netdata_thread_create(&DetectionThread, Tag, NETDATA_THREAD_OPTION_JOINABLE, detect_main, static_cast<void *>(this));
|
||||
}
|
||||
|
||||
void Host::stopAnomalyDetectionThreads(bool join) {
|
||||
|
@ -362,7 +369,7 @@ void Host::stopAnomalyDetectionThreads(bool join) {
|
|||
netdata_thread_cancel(DetectionThread);
|
||||
}
|
||||
|
||||
if(join && !ThreadsJoined) {
|
||||
if (join && !ThreadsJoined) {
|
||||
ThreadsJoined = true;
|
||||
ThreadsRunning = false;
|
||||
|
||||
|
@ -374,7 +381,7 @@ void Host::stopAnomalyDetectionThreads(bool join) {
|
|||
// to enable again:
|
||||
// NETDATA_THREAD_OPTION_DEFAULT needs to become NETDATA_THREAD_OPTION_JOINABLE
|
||||
|
||||
//netdata_thread_join(TrainingThread, nullptr);
|
||||
//netdata_thread_join(DetectionThread, nullptr);
|
||||
netdata_thread_join(TrainingThread, nullptr);
|
||||
netdata_thread_join(DetectionThread, nullptr);
|
||||
}
|
||||
}
|
||||
|
|
11
ml/Queue.h
11
ml/Queue.h
|
@ -32,8 +32,15 @@ public:
|
|||
while (Q.empty()) {
|
||||
pthread_cond_wait(&CV, M.inner());
|
||||
|
||||
if (Exit)
|
||||
pthread_exit(nullptr);
|
||||
if (Exit) {
|
||||
// This should happen only when we are destroying a host.
|
||||
// Callers should use a flag dedicated to checking if we
|
||||
// are about to delete the host or exit the agent. The original
|
||||
// implementation would call pthread_exit which would cause
|
||||
// the queue's mutex to be destroyed twice (and fail on the
|
||||
// 2nd time)
|
||||
return { T(), 0 };
|
||||
}
|
||||
}
|
||||
|
||||
T V = Q.front();
|
||||
|
|
Loading…
Reference in New Issue