Anomaly Detection MVP (#11548)

* Add support for feature extraction and K-Means clustering.

This patch adds support for performing feature extraction and running the
K-Means clustering algorithm on the extracted features.

We use the open-source dlib library, added as a new git submodule, to
compute the K-Means cluster centers.
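
As a rough illustration, the sketch below shows how cluster centers can
be computed with dlib and how a new sample could be scored by its
distance to the nearest center. It is a minimal example, not the actual
ml/kmeans/KMeans.cc code; the feature width, the cluster count and the
exact dlib helpers used here are assumptions.

    // Minimal sketch: cluster feature vectors with dlib and score a new
    // sample by its distance to the nearest cluster center.
    #include <dlib/clustering.h>
    #include <algorithm>
    #include <vector>

    using Sample = dlib::matrix<double, 6, 1>;  // 6 features per sample (placeholder)

    double anomalyScore(const std::vector<Sample> &TrainingSamples, const Sample &S) {
        std::vector<Sample> Centers;

        // Pick initial centers and run k-means for a bounded number of iterations.
        dlib::pick_initial_centers(2, Centers, TrainingSamples);
        dlib::find_clusters_using_kmeans(TrainingSamples, Centers, 1000 /* max iterations */);

        // Euclidean distance to the closest center acts as the anomaly score.
        double MinDist = dlib::length(S - Centers[0]);
        for (const Sample &C : Centers)
            MinDist = std::min(MinDist, dlib::length(S - C));
        return MinDist;
    }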

The build system has been updated to recognize two new options:

    1) --enable-ml: build an agent with ml functionality, and
    2) --enable-ml-tests: support running tests with the `-W mltest`
       option in netdata.

The second flag is meant only for internal use. To build tests successfully,
you need to install the GoogleTest framework on your machine.

* Boilerplate code to track hosts/dims and init ML config options.

New opaque pointer fields are added to the database's host and dimension
data structures. These fields point to C++ wrapper classes that will be
used to store ML-related information in follow-up patches.

The ML functionality needs to iterate over all tracked dimensions twice
per second. To avoid locking the entire DB multiple times, we use a
separate dictionary to add/remove dimensions as they are created/deleted
by the database.
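
The sketch below shows the intended shape of this bridging with
self-contained, hypothetical stand-ins; the real glue code lives in
ml/ml.cc and operates on the actual RRDHOST/RRDDIM structures.

    // Minimal sketch of the opaque-pointer pattern: the C-side structs only
    // carry untyped handles, while the C++ side owns the wrapper objects.
    typedef void *ml_host_t;                      // as declared in database/rrd.h

    struct rrdhost_stub { ml_host_t ml_host; };   // hypothetical stand-in for RRDHOST

    namespace ml {
    class Host {
    public:
        explicit Host(rrdhost_stub *RH) : RH(RH) {}
    private:
        rrdhost_stub *RH;
    };
    } // namespace ml

    extern "C" void ml_new_host(rrdhost_stub *RH) {
        RH->ml_host = static_cast<ml_host_t>(new ml::Host(RH));
    }

    extern "C" void ml_delete_host(rrdhost_stub *RH) {
        delete static_cast<ml::Host *>(RH->ml_host);
        RH->ml_host = nullptr;
    }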

A global configuration object is initialized during agent startup. It
allows users to specify ML-related configuration options, e.g. hosts and
charts to skip from training.

* Add support for training and prediction of dimensions.

Every new host spawns a training thread which is used to train the model
of each dimension.

Training of dimensions is done in a non-batching mode, so that the CPU,
RAM and disk utilization of the training code itself does not affect the
generated ML models.

For performance reasons, prediction is done at the time a new value
is pushed into the database. The alternative option, i.e. maintaining a
separate thread for prediction, would be ~3-4x slower and would
increase locking contention considerably.

For similar performance reasons, we use a custom function to unpack
storage_numbers into doubles instead of long doubles.

* Add data structures required by the anomaly detector.

This patch adds two data structures that will be used by the anomaly
detector in follow-up patches.

The first data structure is a circular bit buffer that is used to count
the number of set bits over time.

The second data structure represents an expandable, rolling window that
tracks set/unset bits. It is explicitly modeled as a finite-state
machine in order to make the anomaly detector's behaviour easier to test
and reason about.

* Add anomaly detection thread.

This patch creates a new anomaly detection thread per host. Each thread
maintains a BitRateWindow which is updated every second based on the
anomaly status of the corresponding host.

Based on the updated status of the anomaly window, we can identify the
existence/absence of an anomaly event, its start/end time, and the
dimensions that participate in it.
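
A self-contained sketch of how such a per-second loop can derive event
boundaries from the rolling window is shown below. This is an assumed,
simplified structure with placeholder names; the real loop lives in
ml/Host.cc and uses the BitRateWindow class added by this patch.

    // Every second, turn the host's anomaly rate into a bit, feed it into a
    // rolling window, and report an event when the window drops back below
    // the threshold.
    #include <chrono>
    #include <cstddef>
    #include <cstdio>
    #include <ctime>
    #include <thread>

    static double currentHostAnomalyRate() { return 0.0; }  // hypothetical stand-in

    static void detectionLoop(double RateThreshold, std::size_t MinWindowLength) {
        std::size_t WindowLength = 0, SetBits = 0;
        bool InEvent = false;
        std::time_t EventStartT = 0;

        for (;;) {
            std::this_thread::sleep_for(std::chrono::seconds(1));

            bool Bit = currentHostAnomalyRate() >= RateThreshold;
            SetBits += Bit;
            WindowLength += 1;

            bool AboveThreshold = WindowLength >= MinWindowLength &&
                                  (double) SetBits / WindowLength >= RateThreshold;

            if (AboveThreshold && !InEvent) {
                InEvent = true;
                EventStartT = std::time(nullptr) - WindowLength;
            } else if (!AboveThreshold && InEvent) {
                // The window fell below the threshold: the event just ended.
                std::printf("anomaly event: [%ld, %ld]\n",
                            (long) EventStartT, (long) std::time(nullptr));
                InEvent = false;
                SetBits = 0;
                WindowLength = 0;
            }
        }
    }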

* Create/insert/query anomaly events from the SQLite DB.

* Create anomaly event endpoints.

This patch adds two endpoints to expose information about anomaly
events. The first endpoint returns the list of anomaly events within a
specified time range. The second endpoint provides detailed information
about a single anomaly event, i.e. the list of anomalous dimensions in
that event along with their anomaly rate.

The `anomaly-bit` option has been added to the `/data` endpoint in order
to allow users to get the anomaly status of individual dimensions per
second.
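
For example, a query such as /api/v1/data?chart=system.cpu&options=anomaly-bit
(the exact URL shape is assumed from netdata's standard data API) would
return the per-second anomaly status of the chart's dimensions instead
of their collected values.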

* Fix build failures on Ubuntu 16.04 & CentOS 7.

These distros do not have toolchains with C++11 enabled by default.
Replacing nullptr with NULL should fix the build problems on these
platforms when the ML feature is not enabled.

* Fix `make dist` to include ML makefiles and dlib sources.

Currently, we add ml/kmeans/dlib to EXTRA_DIST. We might want to
generate an explicit list of source files in the future, in order to
bring down the generated archive's file size.

* Small changes to make the LGTM & Codacy bots happy.

- Cast unused result of function calls to void.
- Pass a const-ref string to Database's constructor.
- Reduce the scope of a local variable in the anomaly detector.

* Add user configuration option to enable/disable anomaly detection.

* Do not log dimension-specific operations.

Training and prediction operations happen every second for each
dimension. To make it easier to run anomaly detection for many charts &
dimensions with this PR, I've removed logs that would otherwise flood
the log files.

* Reset dimensions' bit counter when not above anomaly rate threshold.

* Update the default config options with real values.

With this patch the default configuration options will match the ones
we want our users to use by default.

* Update conditions for creating new ML dimensions.

1. Skip dimensions with update_every != 1,
2. Skip dimensions that come from the ML charts.

With this filtering in place, any configuration value for the
relevant simple_pattern expressions will work correctly.
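
A minimal sketch of this filtering is shown below; the shape is assumed,
since the real check lives in ml/ml.cc, which this diff does not show.

    // Decide whether an ML dimension should be created for a newly added
    // database dimension. The two conditions mirror the list above.
    #include <string>

    static bool shouldCreateMLDimension(unsigned UpdateEvery, const std::string &ChartType) {
        // 1) Only dimensions collected once per second are trained.
        if (UpdateEvery != 1)
            return false;

        // 2) Never train the ML charts themselves, otherwise the detector
        //    would feed its own output back into its input.
        if (ChartType == "anomaly_detection")
            return false;

        return true;
    }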

* Teach buildinfo{,json} about the ML feature.

* Set --enable-ml by default in the configuration options.

This patch is only meant for testing the building of the ML functionality
on Github. It will be reverted once tests pass successfully.

* Minor build system fixes.

- Add path to json header
- Enable C++ linker when ML functionality is enabled
- Rename ml/ml-dummy.cc to ml/ml-dummy.c

* Revert "Set --enable-ml by default in the configuration options."

This reverts commit 28206952a59a577675c86194f2590ec63b60506c.

We pass all Github checks when building the ML functionality, except for
those that run on CentOS 7 due to not having a C++11 toolchain.

* Check for missing dlib and nlohmann files.

We simply check the single-source files upon which our build system
depends. If they are missing, an error message notifies the user
about missing git submodules which are required for the ML
functionality.

* Allow users to specify the maximum number of KMeans iterations.

* Use dlib v19.10

v19.22 broke compatibility with CentOS 7's g++. The anomaly detection
code was developed against v19.10, which is the version used by most
Debian and Ubuntu releases that are not past EOL.

We observed no performance improvements or regressions specific to the
K-Means algorithm between the two versions.

* Detect and use the -std=c++11 flag when building anomaly detection.

This patch automatically adds the -std=c++11 flag when building netdata
with the ML functionality, if it is supported by the user's toolchain.

With this change we are able to build the agent correctly on CentOS 7.

* Restructure configuration options.

- update default values,
- clamp values to min/max defaults,
- validate and identify conflicting values.

* Add update_every configuration option.

Considering that the MVP does not support per-host configuration
options, the update_every option will be used to filter which hosts to train.

With this change anomaly detection will be supported on:

    - Single nodes with update_every != 1, and
    - Children nodes with a common update_every value that might differ from
      the value of the parent node.

* Reorganize anomaly detection charts.

This follows Andrew's suggestion to have four charts to show the number
of anomalous/normal dimensions, the anomaly rate, the detector's window
length, and the events that occur in the prediction step.

Context and family values, along with the necessary information in the
dashboard_info.js file, will be updated in a follow-up commit.

* Do not dump anomaly event info in logs.

* Automatically handle low "train every secs" configuration values.

If a user specifies a very low value for "train every secs", then it is
possible that the time it takes to train a dimension is higher than its
allotted time.

In that case, we want the training thread to:

    - Reduce its CPU usage per second, and
    - Allow the prediction thread to proceed.

We achieve this by limiting the training time of a single dimension to
half the time allotted to it. This means that the training thread will
never consume more than 50% of a single core.
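
A minimal sketch of this throttling, assuming it is implemented by
sleeping after each dimension for at least as long as the training
itself took (placeholder names, not the actual training-thread code):

    #include <algorithm>
    #include <chrono>
    #include <thread>

    template <typename TrainFn>
    void trainOneDimensionThrottled(TrainFn Train, std::chrono::microseconds Allotted) {
        auto StartTP = std::chrono::steady_clock::now();
        Train();  // train a single dimension
        auto TrainingDuration = std::chrono::steady_clock::now() - StartTP;

        // Sleep for the remainder of the slot, but never less than the time
        // spent training: the thread uses at most 50% of a single core and
        // the prediction thread always gets a chance to run.
        auto Remaining = std::chrono::duration_cast<std::chrono::steady_clock::duration>(Allotted)
                         - TrainingDuration;
        std::this_thread::sleep_for(std::max(TrainingDuration, Remaining));
    }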

* Automatically detect if ML functionality should be enabled.

With these changes, we enable ML if:

    - The user has not explicitly specified --disable-ml, and
    - Git submodules have been checked out properly, and
    - The toolchain supports C++11.

If the user has explicitly specified --enable-ml, the build fails if
git submodules are missing, or the toolchain does not support C++11.

* Disable anomaly detection by default.

* Do not update charts in locked region.

* Cleanup code reading configuration options.

* Enable C++ linker when building ML.

* Disable ML functionality for CMake builds.

* Skip LGTM for dlib and nlohmann libraries.

* Do not build ML if libuuid is missing.

* Fix dlib path in LGTM's yaml config file.

* Add chart to track duration of prediction step.

* Add chart to track duration of training step.

* Limit the number of dimensions in an anomaly event.

This will ensure our JSON results won't grow without limit. With the
default ML configuration options, a newly-installed Netdata agent trains
approximately 1700 dimensions. The hard limit is set to 2000 dimensions,
which:

    - Is well above the default number of dimensions we train,
    - If it is ever reached, means that the user accidentally set a very
      low anomaly rate threshold, and
    - Means that, since we sort the result by anomaly score, the cutoff
      dimensions will be the least anomalous, i.e. the least important to
      investigate.
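
A sketch of the cutoff, assuming the event's dimensions are kept as
(dimension id, anomaly score) pairs; the real code builds the JSON
result for an anomaly event and is not shown in this diff.

    // Sort an event's dimensions by anomaly score and keep at most the 2000
    // most anomalous ones, so the JSON result stays bounded.
    #include <algorithm>
    #include <cstddef>
    #include <string>
    #include <utility>
    #include <vector>

    typedef std::pair<std::string, double> DimensionScore;

    static const std::size_t MaxDimensionsPerEvent = 2000;

    std::vector<DimensionScore> limitEventDimensions(std::vector<DimensionScore> Dims) {
        std::sort(Dims.begin(), Dims.end(),
                  [](const DimensionScore &L, const DimensionScore &R) {
                      return L.second > R.second;
                  });

        if (Dims.size() > MaxDimensionsPerEvent)
            Dims.resize(MaxDimensionsPerEvent);

        return Dims;
    }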

* Add information about the ML charts.

* Update family value in ML charts.

This fix will allow us to show the individual charts in the RHS Anomaly
Detection submenu.

* Rename chart type

s/anomalydetection/anomaly_detection/g

* Expose ML feat in /info endpoint.

* Export ML config through /info endpoint.

* Fix CentOS 7 build.

* Reduce the critical region of a host's lock.

Before this change, each host had a single, dedicated lock to protect
its map of dimensions from additions/deletions while training and
detecting anomalies. This was problematic because training a single
dimension can take several seconds on nodes that are under heavy load.

After this change, the host's lock protects only the insertion/deletion
of new dimensions, and the prediction step. For the training of dimensions
we use a dedicated lock per dimension, which is responsible for protecting
the dimension from deletion while training.

Prediction is fast enough, even on slow machines or under heavy load,
which allows us to use the host's main lock and avoid increasing the
complexity of our implementation in the anomaly detector.
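
The sketch below illustrates the intended locking scheme with simplified,
self-contained stand-ins. The real code in ml/Host.cc also guards the
per-dimension lock map itself, as shown in addDimension()/removeDimension().

    #include <map>
    #include <mutex>
    #include <vector>

    struct Dim { void train() {} };  // hypothetical stand-in for ml::Dimension

    struct HostSketch {
        std::mutex HostMutex;                  // protects insert/delete and prediction
        std::map<Dim *, std::mutex> LocksMap;  // one lock per dimension, held while training
        std::vector<Dim *> Dimensions;

        void trainAll() {
            std::vector<Dim *> Candidates;
            {
                // Short critical region: just copy the candidate set.
                std::lock_guard<std::mutex> Lock(HostMutex);
                Candidates = Dimensions;
            }

            for (Dim *D : Candidates) {
                // Training may take seconds; only this dimension's lock is
                // held, so the DB and the prediction thread keep progressing.
                std::lock_guard<std::mutex> Lock(LocksMap[D]);
                D->train();
            }
        }
    };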

* Improve the way we track the anomaly detector's performance.

This change allows us to:

    - track the total training time per update_every period,
    - track the maximum training time of a single dimension per
      update_every period, and
    - export the current number of total, anomalous, normal dimensions
      to the /info endpoint.

Also, now that we use a dedicated lock per dimension, we can train
continuously under heavy load without having to sleep in order to yield
the training thread and allow the prediction thread to progress.

* Use samples instead of seconds in ML configuration.

This commit changes the way we handle the user's ML configuration
options. Instead of treating values as seconds, we interpret all inputs
as a number of update_every periods. This allows us to enable anomaly
detection on hosts that have update_every != 1 second, and still produce
a model for training/prediction & detection that behaves in the expected
way.
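
For example, with "train every" set to 3600 and update_every = 2, each
dimension is re-trained every 3600 collected samples, i.e. every 7200
seconds of wall-clock time (see TrainEvery = Cfg.TrainEvery * updateEvery()
in ml/Dimension.h).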

Tested by running anomaly detection on an agent with update_every = [1,
2, 4] seconds.

* Remove unnecessary log message in detection thread

* Move ML configuration to global section.

* Update web/gui/dashboard_info.js

Co-authored-by: Andrew Maguire <andrewm4894@gmail.com>

* Fix typo

Co-authored-by: Andrew Maguire <andrewm4894@gmail.com>

* Rebase.

* Use negative logic for anomaly bit.

* Add info for prediction_stats and training_stats charts.

* Disable ML on PPC64EL.

The CI test fails with -std=c++11 and requires -std=gnu++11 instead.
However, it's not easy to quickly append the required flag to CXXFLAGS.
For the time being, we simply disable ML on PPC64EL; if any users
require this functionality, we can fix it in the future.

* Add comment on why we disable ML on PPC64EL.

Co-authored-by: Andrew Maguire <andrewm4894@gmail.com>
vkalintiris 2021-10-27 09:26:21 +03:00 committed by GitHub
parent a350fb92f9
commit 9ed4cea590
50 changed files with 3143 additions and 21 deletions


@ -57,7 +57,7 @@ jobs:
- name: Configure
run: |
autoreconf -ivf
./configure
./configure --disable-ml
# XXX: Work-around for bug with libbson-1.0 in Ubuntu 18.04
# See: https://bugs.launchpad.net/ubuntu/+source/libmongoc/+bug/1790771
# https://jira.mongodb.org/browse/CDRIVER-2818

.gitmodules (9 lines changed)

@ -4,3 +4,12 @@
[submodule "aclk/aclk-schemas"]
path = aclk/aclk-schemas
url = https://github.com/netdata/aclk-schemas.git
[submodule "ml/kmeans/dlib"]
path = ml/kmeans/dlib
url = https://github.com/davisking/dlib.git
shallow = true
ignore = dirty
[submodule "ml/json"]
path = ml/json
url = https://github.com/nlohmann/json.git
shallow = true


@ -17,6 +17,8 @@ path_classifiers:
- collectors/node.d.plugin/node_modules/extend.js
- collectors/node.d.plugin/node_modules/net-snmp.js
- collectors/node.d.plugin/node_modules/pixl-xml.js
- ml/kmeans/dlib/
- ml/json/
- web/gui/lib/
- web/gui/src/
- web/gui/css/


@ -945,6 +945,11 @@ set(DAEMON_FILES
daemon/unit_test.h
)
set(ML_FILES
ml/ml.h
ml/ml-dummy.c
)
set(NETDATA_FILES
collectors/all.h
${DAEMON_FILES}
@ -954,6 +959,7 @@ set(NETDATA_FILES
${CHECKS_PLUGIN_FILES}
${HEALTH_PLUGIN_FILES}
${IDLEJITTER_PLUGIN_FILES}
${ML_FILES}
${PLUGINSD_PLUGIN_FILES}
${RRD_PLUGIN_FILES}
${REGISTRY_PLUGIN_FILES}


@ -39,6 +39,7 @@ EXTRA_DIST = \
build/m4/ax_c_mallopt.m4 \
build/m4/tcmalloc.m4 \
build/m4/ax_c__generic.m4 \
ml/kmeans/dlib \
README.md \
LICENSE \
REDISTRIBUTED.md \
@ -117,6 +118,7 @@ SUBDIRS += \
claim \
parser \
spawn \
ml \
$(NULL)
if ENABLE_ACLK
@ -233,6 +235,44 @@ HEALTH_PLUGIN_FILES = \
health/health_log.c \
$(NULL)
ML_FILES = \
ml/ml.h \
ml/ml-dummy.c \
$(NULL)
if ENABLE_ML
ML_FILES += \
ml/BitBufferCounter.h \
ml/BitBufferCounter.cc \
ml/BitRateWindow.h \
ml/BitRateWindow.cc \
ml/Config.h \
ml/Config.cc \
ml/Database.h \
ml/Database.cc \
ml/Dimension.cc \
ml/Dimension.h \
ml/Host.h \
ml/Host.cc \
ml/Query.h \
ml/kmeans/KMeans.h \
ml/kmeans/KMeans.cc \
ml/kmeans/SamplesBuffer.h \
ml/kmeans/SamplesBuffer.cc \
ml/kmeans/dlib/dlib/all/source.cpp \
ml/json/single_include/nlohmann/json.hpp \
ml/ml.cc \
ml/ml-private.h \
$(NULL)
endif
if ENABLE_ML_TESTS
ML_TESTS_FILES = \
ml/kmeans/Tests.cc \
ml/Tests.cc \
$(NULL)
endif
IDLEJITTER_PLUGIN_FILES = \
collectors/idlejitter.plugin/plugin_idlejitter.c \
collectors/idlejitter.plugin/plugin_idlejitter.h \
@ -863,6 +903,8 @@ NETDATA_FILES = \
$(EXPORTING_ENGINE_FILES) \
$(CHECKS_PLUGIN_FILES) \
$(HEALTH_PLUGIN_FILES) \
$(ML_FILES) \
$(ML_TESTS_FILES) \
$(IDLEJITTER_PLUGIN_FILES) \
$(PLUGINSD_PLUGIN_FILES) \
$(REGISTRY_PLUGIN_FILES) \
@ -944,6 +986,11 @@ if ACLK_NG
$(NULL)
endif
if ENABLE_ML_TESTS
netdata_LDADD += $(OPTIONAL_ML_TESTS_LIBS) \
$(NULL)
endif
if ACLK_LEGACY
netdata_LDADD += \
$(abs_top_srcdir)/externaldeps/mosquitto/libmosquitto.a \


@ -5,12 +5,6 @@
#include "libnetdata/libnetdata.h"
#include "mqtt_wss_client.h"
// CentOS 7 has older version that doesn't define this
// same goes for MacOS
#ifndef UUID_STR_LEN
#define UUID_STR_LEN 37
#endif
// Helper stuff which should not have any further inside ACLK dependency
// and are supposed not to be needed outside of ACLK


@ -184,6 +184,18 @@ AC_ARG_WITH(
[with_bundled_protobuf="$withval"],
[with_bundled_protobuf="detect"]
)
AC_ARG_ENABLE(
[ml],
[AS_HELP_STRING([--enable-ml], [Enable anomaly detection @<:@default autodetect@:>@])],
,
[enable_ml="detect"]
)
AC_ARG_ENABLE(
[ml_tests],
[AS_HELP_STRING([--enable-ml-tests], [Enable anomaly detection tests @<:@no@:>@])],
[enable_ml_tests="yes"],
[enable_ml_tests="no"]
)
# -----------------------------------------------------------------------------
# Enforce building with C99, bail early if we can't.
@ -1189,6 +1201,90 @@ fi
AC_MSG_RESULT([${enable_plugin_perf}])
AM_CONDITIONAL([ENABLE_PLUGIN_PERF], [test "${enable_plugin_perf}" = "yes"])
# -----------------------------------------------------------------------------
# gtest/gmock
AC_MSG_CHECKING([if gtest and gmock can be found])
PKG_CHECK_MODULES([GTEST], [gtest], [have_gtest=yes], [have_gtest=no])
PKG_CHECK_MODULES([GMOCK], [gmock], [have_gmock=yes], [have_gmock=no])
if test "${have_gtest}" = "yes" -a "${have_gmock}" = "yes"; then
OPTIONAL_GTEST_CFLAGS="${GTEST_CFLAGS} ${GMOCK_CFLAGS}"
OPTIONAL_GTEST_LIBS="${GTEST_LIBS} ${GMOCK_LIBS}"
have_gtest="yes"
else
have_gtest="no"
fi
# -----------------------------------------------------------------------------
# ml - anomaly detection
# Check if uuid is availabe. Fail if ML was explicitly requested.
if test "${enable_ml}" = "yes" -a "${have_uuid}" != "yes"; then
AC_MSG_ERROR([You have explicitly requested --enable-ml functionality but libuuid can not be found."])
fi
# Check if submodules have not been fetched. Fail if ML was explicitly requested.
AC_MSG_CHECKING([if git submodules are present for machine learning functionality])
if test -f "ml/kmeans/dlib/dlib/all/source.cpp" -a -f "ml/json/single_include/nlohmann/json.hpp"; then
AC_MSG_RESULT([yes])
have_ml_submodules="yes"
else
AC_MSG_RESULT([no])
have_ml_submodules="no"
fi
if test "${enable_ml}" = "yes" -a "${have_ml_submodules}" = "no"; then
AC_MSG_ERROR([You have explicitly requested --enable-ml functionality but it cannot be built because the required git submodules are missing.])
fi
# Check if C++ toolchain does not support C++11. Fail if ML was explicitly requested.
AC_LANG_PUSH([C++])
AX_CHECK_COMPILE_FLAG([-std=c++11], [have_cxx11=yes], [have_cxx11=no])
AC_LANG_POP([C++])
# PPC64LE needs -std=gnu++11 in order to build dlib. However, the rest of
# the agent's components use and have been tested only with -std=c++11.
# Skip ML compilation on that CPU until we reorganize and test the C++ flags.
if test "${host_cpu}" = "powerpc64le"; then
have_cxx11="no"
fi
if test "${enable_ml}" = "yes" -a "${have_cxx11}" = "no"; then
AC_MSG_ERROR([You have explicitly requested --enable-ml functionality but it cannot be built without a C++11 toolchain.])
else
CXX11FLAG="$CXX11FLAG -std=c++11"
fi
# Decide if we should build ML
if test "${enable_ml}" != "no" -a "${have_ml_submodules}" = "yes" -a "${have_cxx11}" = "yes" -a "${have_uuid}" = "yes"; then
build_ml="yes"
else
build_ml="no"
fi
AM_CONDITIONAL([ENABLE_ML], [test "${build_ml}" = "yes"])
if test "${build_ml}" = "yes"; then
AC_DEFINE([ENABLE_ML], [1], [anomaly detection usability])
OPTIONAL_ML_CFLAGS="-DDLIB_NO_GUI_SUPPORT -I \$(abs_top_srcdir)/ml/kmeans/dlib"
OPTIONAL_ML_LIBS=""
fi
# Decide if we should build ML tests.
if test "${build_ml}" = "yes" -a "${enable_ml_tests}" = "yes" -a "${have_gtest}" = "yes"; then
build_ml_tests="yes"
else
build_ml_tests="no"
fi
AM_CONDITIONAL([ENABLE_ML_TESTS], [test "${build_ml_tests}" = "yes"])
if test "${build_ml_tests}" = "yes"; then
AC_DEFINE([ENABLE_ML_TESTS], [1], [anomaly detection tests])
OPTIONAL_ML_TESTS_CFLAGS="${OPTIONAL_GTEST_CFLAGS}"
OPTIONAL_ML_TESTS_LIBS="${OPTIONAL_GTEST_LIBS}"
fi
# -----------------------------------------------------------------------------
# ebpf.plugin
@ -1557,7 +1653,8 @@ AC_MSG_RESULT([${enable_lto}])
AM_CONDITIONAL([ENABLE_CXX_LINKER], [test "${enable_backend_kinesis}" = "yes" \
-o "${enable_exporting_pubsub}" = "yes" \
-o "${enable_backend_prometheus_remote_write}" = "yes" \
-o "${new_cloud_protocol}" = "yes"])
-o "${new_cloud_protocol}" = "yes" \
-o "${build_ml}" = "yes"])
AC_DEFINE_UNQUOTED([NETDATA_USER], ["${with_user}"], [use this user to drop privileged])
@ -1589,7 +1686,7 @@ CFLAGS="${CFLAGS} ${OPTIONAL_PROTOBUF_CFLAGS} ${OPTIONAL_MATH_CFLAGS} ${OPTIONAL
${OPTIONAL_LIBCAP_CFLAGS} ${OPTIONAL_IPMIMONITORING_CFLAGS} ${OPTIONAL_CUPS_CFLAGS} ${OPTIONAL_XENSTAT_FLAGS} \
${OPTIONAL_KINESIS_CFLAGS} ${OPTIONAL_PUBSUB_CFLAGS} ${OPTIONAL_PROMETHEUS_REMOTE_WRITE_CFLAGS} \
${OPTIONAL_MONGOC_CFLAGS} ${LWS_CFLAGS} ${OPTIONAL_JSONC_STATIC_CFLAGS} ${OPTIONAL_BPF_CFLAGS} ${OPTIONAL_JUDY_CFLAGS} \
${OPTIONAL_ACLK_NG_CFLAGS}"
${OPTIONAL_ACLK_NG_CFLAGS} ${OPTIONAL_ML_CFLAGS} ${OPTIONAL_ML_TESTS_CFLAGS}"
CXXFLAGS="${CFLAGS} ${CXX11FLAG}"
@ -1642,6 +1739,12 @@ AC_SUBST([OPTIONAL_LWS_LIBS])
AC_SUBST([OPTIONAL_ACLK_NG_CFLAGS])
AC_SUBST([OPTIONAL_PROTOBUF_CFLAGS])
AC_SUBST([OPTIONAL_PROTOBUF_LIBS])
AC_SUBST([OPTIONAL_GTEST_CFLAGS])
AC_SUBST([OPTIONAL_GTEST_LIBS])
AC_SUBST([OPTIONAL_ML_CFLAGS])
AC_SUBST([OPTIONAL_ML_LIBS])
AC_SUBST([OPTIONAL_ML_TESTS_CFLAGS])
AC_SUBST([OPTIONAL_ML_TESTS_LIBS])
# -----------------------------------------------------------------------------
# Check if cmocka is available - needed for unit testing
@ -1731,6 +1834,8 @@ AC_CONFIG_FILES([
exporting/tests/Makefile
health/Makefile
health/notifications/Makefile
ml/Makefile
ml/kmeans/Makefile
libnetdata/Makefile
libnetdata/tests/Makefile
libnetdata/adaptive_resortable_list/Makefile


@ -37,6 +37,12 @@
#define FEAT_NATIVE_HTTPS 0
#endif
#ifdef ENABLE_ML
#define FEAT_ML 1
#else
#define FEAT_ML 0
#endif
// Optional libraries
#ifdef ENABLE_JSONC
@ -224,6 +230,7 @@ void print_build_info(void) {
printf(" ACLK-NG New Cloud Protocol: %s\n", FEAT_YES_NO(NEW_CLOUD_PROTO));
printf(" ACLK Legacy: %s\n", FEAT_YES_NO(FEAT_ACLK_LEGACY));
printf(" TLS Host Verification: %s\n", FEAT_YES_NO(FEAT_TLS_HOST_VERIFY));
printf(" Machine Learning: %s\n", FEAT_YES_NO(FEAT_ML));
printf("Libraries:\n");
printf(" jemalloc: %s\n", FEAT_YES_NO(FEAT_JEMALLOC));
@ -282,7 +289,8 @@ void print_build_info_json(void) {
printf(" \"aclk-ng-new-cloud-proto\": %s,\n", FEAT_JSON_BOOL(NEW_CLOUD_PROTO));
printf(" \"aclk-legacy\": %s,\n", FEAT_JSON_BOOL(FEAT_ACLK_LEGACY));
printf(" \"tls-host-verify\": %s\n", FEAT_JSON_BOOL(FEAT_TLS_HOST_VERIFY));
printf(" \"tls-host-verify\": %s,\n", FEAT_JSON_BOOL(FEAT_TLS_HOST_VERIFY));
printf(" \"machine-learning\": %s\n", FEAT_JSON_BOOL(FEAT_ML));
printf(" },\n");
printf(" \"libs\": {\n");
@ -337,6 +345,7 @@ void analytics_build_info(BUFFER *b) {
if(NEW_CLOUD_PROTO) buffer_strcat (b, "|New Cloud Protocol Support");
if(FEAT_ACLK_LEGACY) buffer_strcat (b, "|ACLK Legacy");
if(FEAT_TLS_HOST_VERIFY) buffer_strcat (b, "|TLS Host Verification");
if(FEAT_ML) buffer_strcat (b, "|Machine Learning");
if(FEAT_JEMALLOC) buffer_strcat (b, "|jemalloc");
if(FEAT_JSONC) buffer_strcat (b, "|JSON-C");


@ -44,6 +44,9 @@
// health monitoring and alarm notifications
#include "health/health.h"
// anomaly detection
#include "ml/ml.h"
// the netdata registry
// the registry is actually an API feature
#include "registry/registry.h"


@ -826,6 +826,11 @@ int main(int argc, char **argv) {
fprintf(stderr, "\n\nALL TESTS PASSED\n\n");
return 0;
}
#ifdef ENABLE_ML_TESTS
else if(strcmp(optarg, "mltest") == 0) {
return test_ml(argc, argv);
}
#endif
#ifdef ENABLE_DBENGINE
else if(strncmp(optarg, createdataset_string, strlen(createdataset_string)) == 0) {
optarg += strlen(createdataset_string);
@ -1144,6 +1149,10 @@ int main(int argc, char **argv) {
set_silencers_filename();
health_initialize_global_silencers();
// --------------------------------------------------------------------
// Initialize ML configuration
ml_init();
// --------------------------------------------------------------------
// setup process signals


@ -3,6 +3,8 @@
#ifndef NETDATA_RRDENGINELIB_H
#define NETDATA_RRDENGINELIB_H
#include "libnetdata/libnetdata.h"
/* Forward declarations */
struct rrdeng_page_descr;
struct rrdengine_instance;
@ -12,10 +14,6 @@ struct rrdengine_instance;
#define BITS_PER_ULONG (sizeof(unsigned long) * 8)
#ifndef UUID_STR_LEN
#define UUID_STR_LEN (37)
#endif
/* Taken from linux kernel */
#define BUILD_BUG_ON(condition) ((void)sizeof(char[1 - 2*!!(condition)]))
@ -141,4 +139,4 @@ extern char *get_rrdeng_statistics(struct rrdengine_instance *ctx, char *str, si
extern int compute_multidb_diskspace();
extern int is_legacy_child(const char *machine_guid);
#endif /* NETDATA_RRDENGINELIB_H */
#endif /* NETDATA_RRDENGINELIB_H */


@ -15,6 +15,9 @@ typedef struct rrdcalctemplate RRDCALCTEMPLATE;
typedef struct alarm_entry ALARM_ENTRY;
typedef struct context_param CONTEXT_PARAM;
typedef void *ml_host_t;
typedef void *ml_dimension_t;
// forward declarations
struct rrddim_volatile;
struct rrdset_volatile;
@ -421,6 +424,8 @@ struct rrddim_volatile {
// get the timestamp of the first entry of this metric
time_t (*oldest_time)(RRDDIM *rd);
} query_ops;
ml_dimension_t ml_dimension;
};
// ----------------------------------------------------------------------------
@ -876,6 +881,10 @@ struct rrdhost {
netdata_rwlock_t rrdhost_rwlock; // lock for this RRDHOST (protects rrdset_root linked list)
// ------------------------------------------------------------------------
// ML handle
ml_host_t ml_host;
// ------------------------------------------------------------------------
// Support for host-level labels
struct label_index labels;


@ -386,7 +386,7 @@ RRDDIM *rrddim_add_custom(RRDSET *st, const char *id, const char *name, collecte
rd->last_collected_time.tv_sec = 0;
rd->last_collected_time.tv_usec = 0;
rd->rrdset = st;
rd->state = mallocz(sizeof(*rd->state));
rd->state = callocz(1, sizeof(*rd->state));
#ifdef ENABLE_ACLK
rd->state->aclk_live_status = -1;
#endif
@ -454,6 +454,8 @@ RRDDIM *rrddim_add_custom(RRDSET *st, const char *id, const char *name, collecte
calc_link_to_rrddim(rd);
ml_new_dimension(rd);
rrdset_unlock(st);
#ifdef ENABLE_ACLK
rrdset_flag_clear(st, RRDSET_FLAG_ACLK);
@ -466,6 +468,8 @@ RRDDIM *rrddim_add_custom(RRDSET *st, const char *id, const char *name, collecte
void rrddim_free_custom(RRDSET *st, RRDDIM *rd, int db_rotated)
{
ml_delete_dimension(rd);
#ifndef ENABLE_ACLK
UNUSED(db_rotated);
#endif


@ -382,6 +382,8 @@ RRDHOST *rrdhost_create(const char *hostname,
else localhost = host;
}
ml_new_host(host);
info("Host '%s' (at registry as '%s') with guid '%s' initialized"
", os '%s'"
", timezone '%s'"
@ -906,6 +908,8 @@ void rrdhost_free(RRDHOST *host) {
rrdeng_exit(host->rrdeng_ctx);
#endif
ml_delete_host(host);
// ------------------------------------------------------------------------
// remove it from the indexes


@ -1237,13 +1237,22 @@ static inline size_t rrdset_done_interpolate(
}
if(unlikely(!store_this_entry)) {
(void) ml_is_anomalous(rd, 0, false);
rd->state->collect_ops.store_metric(rd, next_store_ut, SN_EMPTY_SLOT);
// rd->values[current_entry] = SN_EMPTY_SLOT;
continue;
}
if(likely(rd->updated && rd->collections_counter > 1 && iterations < st->gap_when_lost_iterations_above)) {
rd->state->collect_ops.store_metric(rd, next_store_ut, pack_storage_number(new_value, storage_flags));
uint32_t dim_storage_flags = storage_flags;
if (ml_is_anomalous(rd, new_value, true)) {
// clear anomaly bit: 0 -> is anomalous, 1 -> not anomalous
dim_storage_flags &= ~ ((uint32_t) SN_ANOMALY_BIT);
}
rd->state->collect_ops.store_metric(rd, next_store_ut, pack_storage_number(new_value, dim_storage_flags));
// rd->values[current_entry] = pack_storage_number(new_value, storage_flags );
rd->last_stored_value = new_value;
@ -1255,9 +1264,9 @@ static inline size_t rrdset_done_interpolate(
, unpack_storage_number(rd->values[current_entry]), new_value
);
#endif
}
else {
(void) ml_is_anomalous(rd, 0, false);
#ifdef NETDATA_INTERNAL_CHECKS
rrdset_debug(st, "%s: STORE[%ld] = NON EXISTING "


@ -796,6 +796,7 @@ void appconfig_generate(struct config *root, BUFFER *wb, int only_changed)
|| !strcmp(co->name, CONFIG_SECTION_BACKEND)
|| !strcmp(co->name, CONFIG_SECTION_STREAM)
|| !strcmp(co->name, CONFIG_SECTION_HOST_LABEL)
|| !strcmp(co->name, CONFIG_SECTION_ML)
)
pri = 0;
else if(!strncmp(co->name, "plugin:", 7)) pri = 1;


@ -91,6 +91,7 @@
#define CONFIG_SECTION_HEALTH "health"
#define CONFIG_SECTION_BACKEND "backend"
#define CONFIG_SECTION_STREAM "stream"
#define CONFIG_SECTION_ML "ml"
#define CONFIG_SECTION_EXPORTING "exporting:global"
#define CONFIG_SECTION_PROMETHEUS "prometheus:exporter"
#define CONFIG_SECTION_HOST_LABEL "host labels"


@ -53,6 +53,7 @@ extern "C" {
#include <pthread.h>
#include <errno.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdarg.h>
@ -90,6 +91,12 @@ extern "C" {
#include <uv.h>
#include <assert.h>
// CentOS 7 has older version that doesn't define this
// same goes for MacOS
#ifndef UUID_STR_LEN
#define UUID_STR_LEN (37)
#endif
#ifdef HAVE_NETINET_IN_H
#include <netinet/in.h>
#endif

ml/BitBufferCounter.cc (new file, 29 lines)

@ -0,0 +1,29 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "BitBufferCounter.h"
using namespace ml;
std::vector<bool> BitBufferCounter::getBuffer() const {
std::vector<bool> Buffer;
for (size_t Idx = start(); Idx != (start() + size()); Idx++)
Buffer.push_back(V[Idx % V.size()]);
return Buffer;
}
void BitBufferCounter::insert(bool Bit) {
if (N >= V.size())
NumSetBits -= (V[start()] == true);
NumSetBits += (Bit == true);
V[N++ % V.size()] = Bit;
}
void BitBufferCounter::print(std::ostream &OS) const {
std::vector<bool> Buffer = getBuffer();
for (bool B : Buffer)
OS << B;
}

ml/BitBufferCounter.h (new file, 54 lines)

@ -0,0 +1,54 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#ifndef BIT_BUFFER_COUNTER_H
#define BIT_BUFFER_COUNTER_H
#include "ml-private.h"
namespace ml {
class BitBufferCounter {
public:
BitBufferCounter(size_t Capacity) : V(Capacity, 0), NumSetBits(0), N(0) {}
std::vector<bool> getBuffer() const;
void insert(bool Bit);
void print(std::ostream &OS) const;
bool isFilled() const {
return N >= V.size();
}
size_t numSetBits() const {
return NumSetBits;
}
private:
inline size_t size() const {
return N < V.size() ? N : V.size();
}
inline size_t start() const {
if (N <= V.size())
return 0;
return N % V.size();
}
private:
std::vector<bool> V;
size_t NumSetBits;
size_t N;
};
} // namespace ml
inline std::ostream& operator<<(std::ostream &OS, const ml::BitBufferCounter &BBC) {
BBC.print(OS);
return OS;
}
#endif /* BIT_BUFFER_COUNTER_H */

ml/BitRateWindow.cc (new file, 75 lines)

@ -0,0 +1,75 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "BitRateWindow.h"
using namespace ml;
std::pair<BitRateWindow::Edge, size_t> BitRateWindow::insert(bool Bit) {
Edge E;
BBC.insert(Bit);
switch (CurrState) {
case State::NotFilled: {
if (BBC.isFilled()) {
if (BBC.numSetBits() < SetBitsThreshold) {
CurrState = State::BelowThreshold;
} else {
CurrState = State::AboveThreshold;
}
} else {
CurrState = State::NotFilled;
}
E = {State::NotFilled, CurrState};
break;
} case State::BelowThreshold: {
if (BBC.numSetBits() >= SetBitsThreshold) {
CurrState = State::AboveThreshold;
}
E = {State::BelowThreshold, CurrState};
break;
} case State::AboveThreshold: {
if ((BBC.numSetBits() < SetBitsThreshold) ||
(CurrLength == MaxLength)) {
CurrState = State::Idle;
}
E = {State::AboveThreshold, CurrState};
break;
} case State::Idle: {
if (CurrLength == IdleLength) {
CurrState = State::NotFilled;
}
E = {State::Idle, CurrState};
break;
}
}
Action A = EdgeActions[E];
size_t L = (this->*A)(E.first, Bit);
return {E, L};
}
void BitRateWindow::print(std::ostream &OS) const {
switch (CurrState) {
case State::NotFilled:
OS << "NotFilled";
break;
case State::BelowThreshold:
OS << "BelowThreshold";
break;
case State::AboveThreshold:
OS << "AboveThreshold";
break;
case State::Idle:
OS << "Idle";
break;
default:
OS << "UnknownState";
break;
}
OS << ": " << BBC << " (Current Length: " << CurrLength << ")";
}

ml/BitRateWindow.h (new file, 170 lines)

@ -0,0 +1,170 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#ifndef BIT_RATE_WINDOW_H
#define BIT_RATE_WINDOW_H
#include "BitBufferCounter.h"
#include "ml-private.h"
namespace ml {
class BitRateWindow {
public:
enum class State {
NotFilled,
BelowThreshold,
AboveThreshold,
Idle
};
using Edge = std::pair<State, State>;
using Action = size_t (BitRateWindow::*)(State PrevState, bool NewBit);
private:
std::map<Edge, Action> EdgeActions = {
// From == To
{
Edge(State::NotFilled, State::NotFilled),
&BitRateWindow::onRoundtripNotFilled,
},
{
Edge(State::BelowThreshold, State::BelowThreshold),
&BitRateWindow::onRoundtripBelowThreshold,
},
{
Edge(State::AboveThreshold, State::AboveThreshold),
&BitRateWindow::onRoundtripAboveThreshold,
},
{
Edge(State::Idle, State::Idle),
&BitRateWindow::onRoundtripIdle,
},
// NotFilled => {BelowThreshold, AboveThreshold}
{
Edge(State::NotFilled, State::BelowThreshold),
&BitRateWindow::onNotFilledToBelowThreshold
},
{
Edge(State::NotFilled, State::AboveThreshold),
&BitRateWindow::onNotFilledToAboveThreshold
},
// BelowThreshold => AboveThreshold
{
Edge(State::BelowThreshold, State::AboveThreshold),
&BitRateWindow::onBelowToAboveThreshold
},
// AboveThreshold => Idle
{
Edge(State::AboveThreshold, State::Idle),
&BitRateWindow::onAboveThresholdToIdle
},
// Idle => NotFilled
{
Edge(State::Idle, State::NotFilled),
&BitRateWindow::onIdleToNotFilled
},
};
public:
BitRateWindow(size_t MinLength, size_t MaxLength, size_t IdleLength,
size_t SetBitsThreshold) :
MinLength(MinLength), MaxLength(MaxLength), IdleLength(IdleLength),
SetBitsThreshold(SetBitsThreshold),
CurrState(State::NotFilled), CurrLength(0), BBC(MinLength) {}
std::pair<Edge, size_t> insert(bool Bit);
void print(std::ostream &OS) const;
private:
size_t onRoundtripNotFilled(State PrevState, bool NewBit) {
(void) PrevState, (void) NewBit;
CurrLength += 1;
return CurrLength;
}
size_t onRoundtripBelowThreshold(State PrevState, bool NewBit) {
(void) PrevState, (void) NewBit;
CurrLength = MinLength;
return CurrLength;
}
size_t onRoundtripAboveThreshold(State PrevState, bool NewBit) {
(void) PrevState, (void) NewBit;
CurrLength += 1;
return CurrLength;
}
size_t onRoundtripIdle(State PrevState, bool NewBit) {
(void) PrevState, (void) NewBit;
CurrLength += 1;
return CurrLength;
}
size_t onNotFilledToBelowThreshold(State PrevState, bool NewBit) {
(void) PrevState, (void) NewBit;
CurrLength = MinLength;
return CurrLength;
}
size_t onNotFilledToAboveThreshold(State PrevState, bool NewBit) {
(void) PrevState, (void) NewBit;
CurrLength += 1;
return CurrLength;
}
size_t onBelowToAboveThreshold(State PrevState, bool NewBit) {
(void) PrevState, (void) NewBit;
CurrLength = MinLength;
return CurrLength;
}
size_t onAboveThresholdToIdle(State PrevState, bool NewBit) {
(void) PrevState, (void) NewBit;
size_t PrevLength = CurrLength;
CurrLength = 1;
return PrevLength;
}
size_t onIdleToNotFilled(State PrevState, bool NewBit) {
(void) PrevState, (void) NewBit;
BBC = BitBufferCounter(MinLength);
BBC.insert(NewBit);
CurrLength = 1;
return CurrLength;
}
private:
size_t MinLength;
size_t MaxLength;
size_t IdleLength;
size_t SetBitsThreshold;
State CurrState;
size_t CurrLength;
BitBufferCounter BBC;
};
} // namespace ml
inline std::ostream& operator<<(std::ostream &OS, const ml::BitRateWindow BRW) {
BRW.print(OS);
return OS;
}
#endif /* BIT_RATE_WINDOW_H */

ml/Config.cc (new file, 128 lines)

@ -0,0 +1,128 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "Config.h"
#include "ml-private.h"
using namespace ml;
/*
* Global configuration instance to be shared between training and
* prediction threads.
*/
Config ml::Cfg;
template <typename T>
static T clamp(const T& Value, const T& Min, const T& Max) {
return std::max(Min, std::min(Value, Max));
}
/*
* Initialize global configuration variable.
*/
void Config::readMLConfig(void) {
const char *ConfigSectionML = CONFIG_SECTION_ML;
bool EnableAnomalyDetection = config_get_boolean(ConfigSectionML, "enabled", false);
/*
* Read values
*/
unsigned MaxTrainSamples = config_get_number(ConfigSectionML, "maximum num samples to train", 4 * 3600);
unsigned MinTrainSamples = config_get_number(ConfigSectionML, "minimum num samples to train", 1 * 3600);
unsigned TrainEvery = config_get_number(ConfigSectionML, "train every", 1 * 3600);
unsigned DiffN = config_get_number(ConfigSectionML, "num samples to diff", 1);
unsigned SmoothN = config_get_number(ConfigSectionML, "num samples to smooth", 3);
unsigned LagN = config_get_number(ConfigSectionML, "num samples to lag", 5);
unsigned MaxKMeansIters = config_get_number(ConfigSectionML, "maximum number of k-means iterations", 1000);
double DimensionAnomalyScoreThreshold = config_get_float(ConfigSectionML, "dimension anomaly score threshold", 0.99);
double HostAnomalyRateThreshold = config_get_float(ConfigSectionML, "host anomaly rate threshold", 0.01);
double ADMinWindowSize = config_get_float(ConfigSectionML, "minimum window size", 30);
double ADMaxWindowSize = config_get_float(ConfigSectionML, "maximum window size", 600);
double ADIdleWindowSize = config_get_float(ConfigSectionML, "idle window size", 30);
double ADWindowRateThreshold = config_get_float(ConfigSectionML, "window minimum anomaly rate", 0.25);
double ADDimensionRateThreshold = config_get_float(ConfigSectionML, "anomaly event min dimension rate threshold", 0.05);
std::string HostsToSkip = config_get(ConfigSectionML, "hosts to skip from training", "!*");
std::string ChartsToSkip = config_get(ConfigSectionML, "charts to skip from training",
"!system.* !cpu.* !mem.* !disk.* !disk_* "
"!ip.* !ipv4.* !ipv6.* !net.* !net_* !netfilter.* "
"!services.* !apps.* !groups.* !user.* !ebpf.* !netdata.* *");
std::stringstream SS;
SS << netdata_configured_cache_dir << "/anomaly-detection.db";
Cfg.AnomalyDBPath = SS.str();
/*
* Clamp
*/
MaxTrainSamples = clamp(MaxTrainSamples, 1 * 3600u, 6 * 3600u);
MinTrainSamples = clamp(MinTrainSamples, 1 * 3600u, 6 * 3600u);
TrainEvery = clamp(TrainEvery, 1 * 3600u, 6 * 3600u);
DiffN = clamp(DiffN, 0u, 1u);
SmoothN = clamp(SmoothN, 0u, 5u);
LagN = clamp(LagN, 0u, 5u);
MaxKMeansIters = clamp(MaxKMeansIters, 500u, 1000u);
DimensionAnomalyScoreThreshold = clamp(DimensionAnomalyScoreThreshold, 0.01, 5.00);
HostAnomalyRateThreshold = clamp(HostAnomalyRateThreshold, 0.01, 1.0);
ADMinWindowSize = clamp(ADMinWindowSize, 30.0, 300.0);
ADMaxWindowSize = clamp(ADMaxWindowSize, 60.0, 900.0);
ADIdleWindowSize = clamp(ADIdleWindowSize, 30.0, 900.0);
ADWindowRateThreshold = clamp(ADWindowRateThreshold, 0.01, 0.99);
ADDimensionRateThreshold = clamp(ADDimensionRateThreshold, 0.01, 0.99);
/*
* Validate
*/
if (MinTrainSamples >= MaxTrainSamples) {
error("invalid min/max train samples found (%d >= %d)", MinTrainSamples, MaxTrainSamples);
MinTrainSamples = 1 * 3600;
MaxTrainSamples = 4 * 3600;
}
if (ADMinWindowSize >= ADMaxWindowSize) {
error("invalid min/max anomaly window size found (%lf >= %lf)", ADMinWindowSize, ADMaxWindowSize);
ADMinWindowSize = 30.0;
ADMaxWindowSize = 600.0;
}
/*
* Assign to config instance
*/
Cfg.EnableAnomalyDetection = EnableAnomalyDetection;
Cfg.MaxTrainSamples = MaxTrainSamples;
Cfg.MinTrainSamples = MinTrainSamples;
Cfg.TrainEvery = TrainEvery;
Cfg.DiffN = DiffN;
Cfg.SmoothN = SmoothN;
Cfg.LagN = LagN;
Cfg.MaxKMeansIters = MaxKMeansIters;
Cfg.DimensionAnomalyScoreThreshold = DimensionAnomalyScoreThreshold;
Cfg.HostAnomalyRateThreshold = HostAnomalyRateThreshold;
Cfg.ADMinWindowSize = ADMinWindowSize;
Cfg.ADMaxWindowSize = ADMaxWindowSize;
Cfg.ADIdleWindowSize = ADIdleWindowSize;
Cfg.ADWindowRateThreshold = ADWindowRateThreshold;
Cfg.ADDimensionRateThreshold = ADDimensionRateThreshold;
Cfg.SP_HostsToSkip = simple_pattern_create(HostsToSkip.c_str(), NULL, SIMPLE_PATTERN_EXACT);
Cfg.SP_ChartsToSkip = simple_pattern_create(ChartsToSkip.c_str(), NULL, SIMPLE_PATTERN_EXACT);
}

ml/Config.h (new file, 45 lines)

@ -0,0 +1,45 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#ifndef ML_CONFIG_H
#define ML_CONFIG_H
#include "ml-private.h"
namespace ml {
class Config {
public:
bool EnableAnomalyDetection;
unsigned MaxTrainSamples;
unsigned MinTrainSamples;
unsigned TrainEvery;
unsigned DiffN;
unsigned SmoothN;
unsigned LagN;
unsigned MaxKMeansIters;
double DimensionAnomalyScoreThreshold;
double HostAnomalyRateThreshold;
double ADMinWindowSize;
double ADMaxWindowSize;
double ADIdleWindowSize;
double ADWindowRateThreshold;
double ADDimensionRateThreshold;
SIMPLE_PATTERN *SP_HostsToSkip;
SIMPLE_PATTERN *SP_ChartsToSkip;
std::string AnomalyDBPath;
void readMLConfig();
};
extern Config Cfg;
} // namespace ml
#endif /* ML_CONFIG_H */

ml/Database.cc (new file, 127 lines)

@ -0,0 +1,127 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "Database.h"
const char *ml::Database::SQL_CREATE_ANOMALIES_TABLE =
"CREATE TABLE IF NOT EXISTS anomaly_events( "
" anomaly_detector_name text NOT NULL, "
" anomaly_detector_version int NOT NULL, "
" host_id text NOT NULL, "
" after int NOT NULL, "
" before int NOT NULL, "
" anomaly_event_info text, "
" PRIMARY KEY( "
" anomaly_detector_name, anomaly_detector_version, "
" host_id, after, before "
" ) "
");";
const char *ml::Database::SQL_INSERT_ANOMALY =
"INSERT INTO anomaly_events( "
" anomaly_detector_name, anomaly_detector_version, "
" host_id, after, before, anomaly_event_info) "
"VALUES (?1, ?2, ?3, ?4, ?5, ?6);";
const char *ml::Database::SQL_SELECT_ANOMALY =
"SELECT anomaly_event_info FROM anomaly_events WHERE"
" anomaly_detector_name == ?1 AND"
" anomaly_detector_version == ?2 AND"
" host_id == ?3 AND"
" after == ?4 AND"
" before == ?5;";
const char *ml::Database::SQL_SELECT_ANOMALY_EVENTS =
"SELECT after, before FROM anomaly_events WHERE"
" anomaly_detector_name == ?1 AND"
" anomaly_detector_version == ?2 AND"
" host_id == ?3 AND"
" after >= ?4 AND"
" before <= ?5;";
using namespace ml;
bool Statement::prepare(sqlite3 *Conn) {
if (!Conn)
return false;
if (ParsedStmt)
return true;
int RC = sqlite3_prepare_v2(Conn, RawStmt, -1, &ParsedStmt, nullptr);
if (RC == SQLITE_OK)
return true;
std::string Msg = "Statement \"%s\" preparation failed due to \"%s\"";
error(Msg.c_str(), RawStmt, sqlite3_errstr(RC));
return false;
}
bool Statement::bindValue(size_t Pos, const std::string &Value) {
int RC = sqlite3_bind_text(ParsedStmt, Pos, Value.c_str(), -1, SQLITE_TRANSIENT);
if (RC == SQLITE_OK)
return true;
error("Failed to bind text '%s' (pos = %zu) in statement '%s'.", Value.c_str(), Pos, RawStmt);
return false;
}
bool Statement::bindValue(size_t Pos, const int Value) {
int RC = sqlite3_bind_int(ParsedStmt, Pos, Value);
if (RC == SQLITE_OK)
return true;
error("Failed to bind integer %d (pos = %zu) in statement '%s'.", Value, Pos, RawStmt);
return false;
}
bool Statement::resetAndClear(bool Ret) {
int RC = sqlite3_reset(ParsedStmt);
if (RC != SQLITE_OK) {
error("Could not reset statement: '%s'", RawStmt);
return false;
}
RC = sqlite3_clear_bindings(ParsedStmt);
if (RC != SQLITE_OK) {
error("Could not clear bindings in statement: '%s'", RawStmt);
return false;
}
return Ret;
}
Database::Database(const std::string &Path) {
// Get sqlite3 connection handle.
int RC = sqlite3_open(Path.c_str(), &Conn);
if (RC != SQLITE_OK) {
std::string Msg = "Failed to initialize ML DB at %s, due to \"%s\"";
error(Msg.c_str(), Path.c_str(), sqlite3_errstr(RC));
sqlite3_close(Conn);
Conn = nullptr;
return;
}
// Create anomaly events table if it does not exist.
char *ErrMsg;
RC = sqlite3_exec(Conn, SQL_CREATE_ANOMALIES_TABLE, nullptr, nullptr, &ErrMsg);
if (RC == SQLITE_OK)
return;
error("SQLite error during database initialization, rc = %d (%s)", RC, ErrMsg);
error("SQLite failed statement: %s", SQL_CREATE_ANOMALIES_TABLE);
sqlite3_free(ErrMsg);
sqlite3_close(Conn);
Conn = nullptr;
}
Database::~Database() {
if (!Conn)
return;
int RC = sqlite3_close(Conn);
if (RC != SQLITE_OK)
error("Could not close connection properly (rc=%d)", RC);
}

ml/Database.h (new file, 131 lines)

@ -0,0 +1,131 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#ifndef ML_DATABASE_H
#define ML_DATABASE_H
#include "Dimension.h"
#include "ml-private.h"
#include "json/single_include/nlohmann/json.hpp"
namespace ml {
class Statement {
public:
using RowCallback = std::function<void(sqlite3_stmt *Stmt)>;
public:
Statement(const char *RawStmt) : RawStmt(RawStmt), ParsedStmt(nullptr) {}
template<typename ...ArgTypes>
bool exec(sqlite3 *Conn, RowCallback RowCb, ArgTypes ...Args) {
if (!prepare(Conn))
return false;
switch (bind(1, Args...)) {
case 0:
return false;
case sizeof...(Args):
break;
default:
return resetAndClear(false);
}
while (true) {
switch (int RC = sqlite3_step(ParsedStmt)) {
case SQLITE_BUSY: case SQLITE_LOCKED:
usleep(SQLITE_INSERT_DELAY * USEC_PER_MS);
continue;
case SQLITE_ROW:
RowCb(ParsedStmt);
continue;
case SQLITE_DONE:
return resetAndClear(true);
default:
error("Stepping through '%s' returned rc=%d", RawStmt, RC);
return resetAndClear(false);
}
}
}
~Statement() {
if (!ParsedStmt)
return;
int RC = sqlite3_finalize(ParsedStmt);
if (RC != SQLITE_OK)
error("Could not properly finalize statement (rc=%d)", RC);
}
private:
bool prepare(sqlite3 *Conn);
bool bindValue(size_t Pos, const int Value);
bool bindValue(size_t Pos, const std::string &Value);
template<typename ArgType, typename ...ArgTypes>
size_t bind(size_t Pos, ArgType T) {
return bindValue(Pos, T);
}
template<typename ArgType, typename ...ArgTypes>
size_t bind(size_t Pos, ArgType T, ArgTypes ...Args) {
return bindValue(Pos, T) + bind(Pos + 1, Args...);
}
bool resetAndClear(bool Ret);
private:
const char *RawStmt;
sqlite3_stmt *ParsedStmt;
};
class Database {
private:
static const char *SQL_CREATE_ANOMALIES_TABLE;
static const char *SQL_INSERT_ANOMALY;
static const char *SQL_SELECT_ANOMALY;
static const char *SQL_SELECT_ANOMALY_EVENTS;
public:
Database(const std::string &Path);
~Database();
template<typename ...ArgTypes>
bool insertAnomaly(ArgTypes... Args) {
Statement::RowCallback RowCb = [](sqlite3_stmt *Stmt) { (void) Stmt; };
return InsertAnomalyStmt.exec(Conn, RowCb, Args...);
}
template<typename ...ArgTypes>
bool getAnomalyInfo(nlohmann::json &Json, ArgTypes&&... Args) {
Statement::RowCallback RowCb = [&](sqlite3_stmt *Stmt) {
const char *Text = static_cast<const char *>(sqlite3_column_blob(Stmt, 0));
Json = nlohmann::json::parse(Text);
};
return GetAnomalyInfoStmt.exec(Conn, RowCb, Args...);
}
template<typename ...ArgTypes>
bool getAnomaliesInRange(std::vector<std::pair<time_t, time_t>> &V, ArgTypes&&... Args) {
Statement::RowCallback RowCb = [&](sqlite3_stmt *Stmt) {
V.push_back({
sqlite3_column_int64(Stmt, 0),
sqlite3_column_int64(Stmt, 1)
});
};
return GetAnomaliesInRangeStmt.exec(Conn, RowCb, Args...);
}
private:
sqlite3 *Conn;
Statement InsertAnomalyStmt{SQL_INSERT_ANOMALY};
Statement GetAnomalyInfoStmt{SQL_SELECT_ANOMALY};
Statement GetAnomaliesInRangeStmt{SQL_SELECT_ANOMALY_EVENTS};
};
}
#endif /* ML_DATABASE_H */

ml/Dimension.cc (new file, 169 lines)

@ -0,0 +1,169 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "Config.h"
#include "Dimension.h"
#include "Query.h"
using namespace ml;
/*
* Copy of the unpack_storage_number which allows us to convert
* a storage_number to double.
*/
static CalculatedNumber unpack_storage_number_dbl(storage_number value) {
if(!value)
return 0;
int sign = 0, exp = 0;
int factor = 10;
// bit 32 = 0:positive, 1:negative
if(unlikely(value & (1 << 31)))
sign = 1;
// bit 31 = 0:divide, 1:multiply
if(unlikely(value & (1 << 30)))
exp = 1;
// bit 27 SN_EXISTS_100
if(unlikely(value & (1 << 26)))
factor = 100;
// bit 26 SN_EXISTS_RESET
// bit 25 SN_ANOMALY_BIT
// bit 30, 29, 28 = (multiplier or divider) 0-7 (8 total)
int mul = (value & ((1<<29)|(1<<28)|(1<<27))) >> 27;
// bit 24 to bit 1 = the value, so remove all other bits
value ^= value & ((1<<31)|(1<<30)|(1<<29)|(1<<28)|(1<<27)|(1<<26)|(1<<25)|(1<<24));
CalculatedNumber CN = value;
if(exp) {
for(; mul; mul--)
CN *= factor;
}
else {
for( ; mul ; mul--)
CN /= 10;
}
if(sign)
CN = -CN;
return CN;
}
std::pair<CalculatedNumber *, size_t>
TrainableDimension::getCalculatedNumbers() {
size_t MinN = Cfg.MinTrainSamples;
size_t MaxN = Cfg.MaxTrainSamples;
// Figure out what our time window should be.
time_t BeforeT = now_realtime_sec() - 1;
time_t AfterT = BeforeT - (MaxN * updateEvery());
BeforeT -= (BeforeT % updateEvery());
AfterT -= (AfterT % updateEvery());
BeforeT = std::min(BeforeT, latestTime());
AfterT = std::max(AfterT, oldestTime());
if (AfterT >= BeforeT)
return { nullptr, 0 };
CalculatedNumber *CNs = new CalculatedNumber[MaxN * (Cfg.LagN + 1)]();
// Start the query.
unsigned Idx = 0;
unsigned CollectedValues = 0;
unsigned TotalValues = 0;
CalculatedNumber LastValue = std::numeric_limits<CalculatedNumber>::quiet_NaN();
Query Q = Query(getRD());
Q.init(AfterT, BeforeT);
while (!Q.isFinished()) {
if (Idx == MaxN)
break;
auto P = Q.nextMetric();
storage_number SN = P.second;
if (does_storage_number_exist(SN)) {
CNs[Idx] = unpack_storage_number_dbl(SN);
LastValue = CNs[Idx];
CollectedValues++;
} else
CNs[Idx] = LastValue;
Idx++;
}
TotalValues = Idx;
if (CollectedValues < MinN) {
delete[] CNs;
return { nullptr, 0 };
}
// Find first non-NaN value.
for (Idx = 0; std::isnan(CNs[Idx]); Idx++, TotalValues--) { }
// Overwrite NaN values.
if (Idx != 0)
memmove(CNs, &CNs[Idx], sizeof(CalculatedNumber) * TotalValues);
return { CNs, TotalValues };
}
MLResult TrainableDimension::trainModel() {
auto P = getCalculatedNumbers();
CalculatedNumber *CNs = P.first;
unsigned N = P.second;
if (!CNs)
return MLResult::MissingData;
SamplesBuffer SB = SamplesBuffer(CNs, N, 1, Cfg.DiffN, Cfg.SmoothN, Cfg.LagN);
KM.train(SB, Cfg.MaxKMeansIters);
Trained = true;
delete[] CNs;
return MLResult::Success;
}
void PredictableDimension::addValue(CalculatedNumber Value, bool Exists) {
if (!Exists) {
CNs.clear();
return;
}
unsigned N = Cfg.DiffN + Cfg.SmoothN + Cfg.LagN;
if (CNs.size() < N) {
CNs.push_back(Value);
return;
}
std::rotate(std::begin(CNs), std::begin(CNs) + 1, std::end(CNs));
CNs[N - 1] = Value;
}
std::pair<MLResult, bool> PredictableDimension::predict() {
unsigned N = Cfg.DiffN + Cfg.SmoothN + Cfg.LagN;
if (CNs.size() != N)
return { MLResult::MissingData, AnomalyBit };
CalculatedNumber *TmpCNs = new CalculatedNumber[N * (Cfg.LagN + 1)]();
std::memcpy(TmpCNs, CNs.data(), N * sizeof(CalculatedNumber));
SamplesBuffer SB = SamplesBuffer(TmpCNs, N, 1, Cfg.DiffN, Cfg.SmoothN, Cfg.LagN);
AnomalyScore = computeAnomalyScore(SB);
delete[] TmpCNs;
if (AnomalyScore == std::numeric_limits<CalculatedNumber>::quiet_NaN())
return { MLResult::NaN, AnomalyBit };
AnomalyBit = AnomalyScore >= (100 * Cfg.DimensionAnomalyScoreThreshold);
return { MLResult::Success, AnomalyBit };
}

ml/Dimension.h (new file, 124 lines)

@ -0,0 +1,124 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#ifndef ML_DIMENSION_H
#define ML_DIMENSION_H
#include "BitBufferCounter.h"
#include "Config.h"
#include "ml-private.h"
namespace ml {
class RrdDimension {
public:
RrdDimension(RRDDIM *RD) : RD(RD), Ops(&RD->state->query_ops) {
std::stringstream SS;
SS << RD->rrdset->id << "|" << RD->name;
ID = SS.str();
}
RRDDIM *getRD() const { return RD; }
time_t latestTime() { return Ops->latest_time(RD); }
time_t oldestTime() { return Ops->oldest_time(RD); }
unsigned updateEvery() const { return RD->update_every; }
const std::string getID() const { return ID; }
virtual ~RrdDimension() {}
private:
RRDDIM *RD;
struct rrddim_volatile::rrddim_query_ops *Ops;
std::string ID;
};
enum class MLResult {
Success = 0,
MissingData,
NaN,
};
class TrainableDimension : public RrdDimension {
public:
TrainableDimension(RRDDIM *RD) :
RrdDimension(RD), TrainEvery(Cfg.TrainEvery * updateEvery()) {}
MLResult trainModel();
CalculatedNumber computeAnomalyScore(SamplesBuffer &SB) {
return Trained ? KM.anomalyScore(SB) : 0.0;
}
bool shouldTrain(const TimePoint &TP) const {
return (LastTrainedAt + TrainEvery) < TP;
}
bool isTrained() const { return Trained; }
double updateTrainingDuration(double Duration) {
return TrainingDuration.exchange(Duration);
}
private:
std::pair<CalculatedNumber *, size_t> getCalculatedNumbers();
public:
TimePoint LastTrainedAt{Seconds{0}};
private:
Seconds TrainEvery;
KMeans KM;
std::atomic<bool> Trained{false};
std::atomic<double> TrainingDuration{0.0};
};
class PredictableDimension : public TrainableDimension {
public:
PredictableDimension(RRDDIM *RD) : TrainableDimension(RD) {}
std::pair<MLResult, bool> predict();
void addValue(CalculatedNumber Value, bool Exists);
bool isAnomalous() { return AnomalyBit; }
private:
CalculatedNumber AnomalyScore{0.0};
std::atomic<bool> AnomalyBit{false};
std::vector<CalculatedNumber> CNs;
};
class DetectableDimension : public PredictableDimension {
public:
DetectableDimension(RRDDIM *RD) : PredictableDimension(RD) {}
std::pair<bool, double> detect(size_t WindowLength, bool Reset) {
bool AnomalyBit = isAnomalous();
if (Reset)
NumSetBits = BBC.numSetBits();
NumSetBits += AnomalyBit;
BBC.insert(AnomalyBit);
double AnomalyRate = static_cast<double>(NumSetBits) / WindowLength;
return { AnomalyBit, AnomalyRate };
}
private:
BitBufferCounter BBC{static_cast<size_t>(Cfg.ADMinWindowSize)};
size_t NumSetBits{0};
};
using Dimension = DetectableDimension;
} // namespace ml
#endif /* ML_DIMENSION_H */

ml/Host.cc (new file, 458 lines)

@ -0,0 +1,458 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include <dlib/statistics.h>
#include "Config.h"
#include "Host.h"
#include "json/single_include/nlohmann/json.hpp"
using namespace ml;
static void updateDimensionsChart(RRDHOST *RH,
collected_number NumTrainedDimensions,
collected_number NumNormalDimensions,
collected_number NumAnomalousDimensions) {
static thread_local RRDSET *RS = nullptr;
static thread_local RRDDIM *NumTotalDimensionsRD = nullptr;
static thread_local RRDDIM *NumTrainedDimensionsRD = nullptr;
static thread_local RRDDIM *NumNormalDimensionsRD = nullptr;
static thread_local RRDDIM *NumAnomalousDimensionsRD = nullptr;
if (!RS) {
RS = rrdset_create(
RH, // host
"anomaly_detection", // type
"dimensions", // id
NULL, // name
"dimensions", // family
NULL, // ctx
"Anomaly detection dimensions", // title
"dimensions", // units
"netdata", // plugin
"ml", // module
39183, // priority
RH->rrd_update_every, // update_every
RRDSET_TYPE_LINE // chart_type
);
NumTotalDimensionsRD = rrddim_add(RS, "total", NULL,
1, 1, RRD_ALGORITHM_ABSOLUTE);
NumTrainedDimensionsRD = rrddim_add(RS, "trained", NULL,
1, 1, RRD_ALGORITHM_ABSOLUTE);
NumNormalDimensionsRD = rrddim_add(RS, "normal", NULL,
1, 1, RRD_ALGORITHM_ABSOLUTE);
NumAnomalousDimensionsRD = rrddim_add(RS, "anomalous", NULL,
1, 1, RRD_ALGORITHM_ABSOLUTE);
} else
rrdset_next(RS);
rrddim_set_by_pointer(RS, NumTotalDimensionsRD, NumNormalDimensions + NumAnomalousDimensions);
rrddim_set_by_pointer(RS, NumTrainedDimensionsRD, NumTrainedDimensions);
rrddim_set_by_pointer(RS, NumNormalDimensionsRD, NumNormalDimensions);
rrddim_set_by_pointer(RS, NumAnomalousDimensionsRD, NumAnomalousDimensions);
rrdset_done(RS);
}
static void updateRateChart(RRDHOST *RH, collected_number AnomalyRate) {
static thread_local RRDSET *RS = nullptr;
static thread_local RRDDIM *AnomalyRateRD = nullptr;
if (!RS) {
RS = rrdset_create(
RH, // host
"anomaly_detection", // type
"anomaly_rate", // id
NULL, // name
"anomaly_rate", // family
NULL, // ctx
"Percentage of anomalous dimensions", // title
"percentage", // units
"netdata", // plugin
"ml", // module
39184, // priority
RH->rrd_update_every, // update_every
RRDSET_TYPE_LINE // chart_type
);
AnomalyRateRD = rrddim_add(RS, "anomaly_rate", NULL,
1, 1, RRD_ALGORITHM_ABSOLUTE);
} else
rrdset_next(RS);
rrddim_set_by_pointer(RS, AnomalyRateRD, AnomalyRate);
rrdset_done(RS);
}
static void updateWindowLengthChart(RRDHOST *RH, collected_number WindowLength) {
static thread_local RRDSET *RS = nullptr;
static thread_local RRDDIM *WindowLengthRD = nullptr;
if (!RS) {
RS = rrdset_create(
RH, // host
"anomaly_detection", // type
"detector_window", // id
NULL, // name
"detector_window", // family
NULL, // ctx
"Anomaly detector window length", // title
"seconds", // units
"netdata", // plugin
"ml", // module
39185, // priority
RH->rrd_update_every, // update_every
RRDSET_TYPE_LINE // chart_type
);
WindowLengthRD = rrddim_add(RS, "duration", NULL,
1, 1, RRD_ALGORITHM_ABSOLUTE);
} else
rrdset_next(RS);
rrddim_set_by_pointer(RS, WindowLengthRD, WindowLength * RH->rrd_update_every);
rrdset_done(RS);
}
static void updateEventsChart(RRDHOST *RH,
std::pair<BitRateWindow::Edge, size_t> P,
bool ResetBitCounter,
bool NewAnomalyEvent) {
static thread_local RRDSET *RS = nullptr;
static thread_local RRDDIM *AboveThresholdRD = nullptr;
static thread_local RRDDIM *ResetBitCounterRD = nullptr;
static thread_local RRDDIM *NewAnomalyEventRD = nullptr;
if (!RS) {
RS = rrdset_create(
RH, // host
"anomaly_detection", // type
"detector_events", // id
NULL, // name
"detector_events", // family
NULL, // ctx
"Anomaly events triggered", // title
"boolean", // units
"netdata", // plugin
"ml", // module
39186, // priority
RH->rrd_update_every, // update_every
RRDSET_TYPE_LINE // chart_type
);
AboveThresholdRD = rrddim_add(RS, "above_threshold", NULL,
1, 1, RRD_ALGORITHM_ABSOLUTE);
ResetBitCounterRD = rrddim_add(RS, "reset_bit_counter", NULL,
1, 1, RRD_ALGORITHM_ABSOLUTE);
NewAnomalyEventRD = rrddim_add(RS, "new_anomaly_event", NULL,
1, 1, RRD_ALGORITHM_ABSOLUTE);
} else
rrdset_next(RS);
BitRateWindow::Edge E = P.first;
bool AboveThreshold = E.second == BitRateWindow::State::AboveThreshold;
rrddim_set_by_pointer(RS, AboveThresholdRD, AboveThreshold);
rrddim_set_by_pointer(RS, ResetBitCounterRD, ResetBitCounter);
rrddim_set_by_pointer(RS, NewAnomalyEventRD, NewAnomalyEvent);
rrdset_done(RS);
}
static void updateDetectionChart(RRDHOST *RH, collected_number PredictionDuration) {
static thread_local RRDSET *RS = nullptr;
static thread_local RRDDIM *PredictionDurationRD = nullptr;
if (!RS) {
RS = rrdset_create(
RH, // host
"anomaly_detection", // type
"prediction_stats", // id
NULL, // name
"prediction_stats", // family
NULL, // ctx
"Time it took to run prediction", // title
"milliseconds", // units
"netdata", // plugin
"ml", // module
39187, // priority
RH->rrd_update_every, // update_every
RRDSET_TYPE_LINE // chart_type
);
PredictionDurationRD = rrddim_add(RS, "duration", NULL,
1, 1, RRD_ALGORITHM_ABSOLUTE);
} else
rrdset_next(RS);
rrddim_set_by_pointer(RS, PredictionDurationRD, PredictionDuration);
rrdset_done(RS);
}
static void updateTrainingChart(RRDHOST *RH,
collected_number TotalTrainingDuration,
collected_number MaxTrainingDuration)
{
static thread_local RRDSET *RS = nullptr;
static thread_local RRDDIM *TotalTrainingDurationRD = nullptr;
static thread_local RRDDIM *MaxTrainingDurationRD = nullptr;
if (!RS) {
RS = rrdset_create(
RH, // host
"anomaly_detection", // type
"training_stats", // id
NULL, // name
"training_stats", // family
NULL, // ctx
"Training step statistics", // title
"milliseconds", // units
"netdata", // plugin
"ml", // module
39188, // priority
RH->rrd_update_every, // update_every
RRDSET_TYPE_LINE // chart_type
);
TotalTrainingDurationRD = rrddim_add(RS, "total_training_duration", NULL,
1, 1, RRD_ALGORITHM_ABSOLUTE);
MaxTrainingDurationRD = rrddim_add(RS, "max_training_duration", NULL,
1, 1, RRD_ALGORITHM_ABSOLUTE);
} else
rrdset_next(RS);
rrddim_set_by_pointer(RS, TotalTrainingDurationRD, TotalTrainingDuration);
rrddim_set_by_pointer(RS, MaxTrainingDurationRD, MaxTrainingDuration);
rrdset_done(RS);
}
void RrdHost::addDimension(Dimension *D) {
std::lock_guard<std::mutex> Lock(Mutex);
DimensionsMap[D->getRD()] = D;
// Default construct mutex for dimension
LocksMap[D];
}
void RrdHost::removeDimension(Dimension *D) {
// Remove the dimension from the hosts map.
{
std::lock_guard<std::mutex> Lock(Mutex);
DimensionsMap.erase(D->getRD());
}
// Delete the dimension by locking the mutex that protects it.
{
std::lock_guard<std::mutex> Lock(LocksMap[D]);
delete D;
}
// Remove the lock entry for the deleted dimension.
{
std::lock_guard<std::mutex> Lock(Mutex);
LocksMap.erase(D);
}
}
void RrdHost::getConfigAsJson(nlohmann::json &Json) const {
Json["version"] = 1;
Json["enabled"] = Cfg.EnableAnomalyDetection;
Json["min-train-samples"] = Cfg.MinTrainSamples;
Json["max-train-samples"] = Cfg.MaxTrainSamples;
Json["train-every"] = Cfg.TrainEvery;
Json["diff-n"] = Cfg.DiffN;
Json["smooth-n"] = Cfg.SmoothN;
Json["lag-n"] = Cfg.LagN;
Json["max-kmeans-iters"] = Cfg.MaxKMeansIters;
Json["dimension-anomaly-score-threshold"] = Cfg.DimensionAnomalyScoreThreshold;
Json["host-anomaly-rate-threshold"] = Cfg.HostAnomalyRateThreshold;
Json["min-window-size"] = Cfg.ADMinWindowSize;
Json["max-window-size"] = Cfg.ADMaxWindowSize;
Json["idle-window-size"] = Cfg.ADIdleWindowSize;
Json["window-rate-threshold"] = Cfg.ADWindowRateThreshold;
Json["dimension-rate-threshold"] = Cfg.ADDimensionRateThreshold;
}
std::pair<Dimension *, Duration<double>>
TrainableHost::findDimensionToTrain(const TimePoint &NowTP) {
std::lock_guard<std::mutex> Lock(Mutex);
Duration<double> AllottedDuration = Duration<double>{Cfg.TrainEvery * updateEvery()} / (DimensionsMap.size() + 1);
for (auto &DP : DimensionsMap) {
Dimension *D = DP.second;
if (D->shouldTrain(NowTP)) {
LocksMap[D].lock();
return { D, AllottedDuration };
}
}
return { nullptr, AllottedDuration };
}
void TrainableHost::trainDimension(Dimension *D, const TimePoint &NowTP) {
if (D == nullptr)
return;
D->LastTrainedAt = NowTP + Seconds{D->updateEvery()};
TimePoint StartTP = SteadyClock::now();
D->trainModel();
Duration<double> Duration = SteadyClock::now() - StartTP;
D->updateTrainingDuration(Duration.count());
{
std::lock_guard<std::mutex> Lock(Mutex);
LocksMap[D].unlock();
}
}
void TrainableHost::train() {
Duration<double> MaxSleepFor = Seconds{updateEvery()};
while (!netdata_exit) {
TimePoint NowTP = SteadyClock::now();
auto P = findDimensionToTrain(NowTP);
trainDimension(P.first, NowTP);
Duration<double> AllottedDuration = P.second;
Duration<double> RealDuration = SteadyClock::now() - NowTP;
Duration<double> SleepFor;
if (RealDuration >= AllottedDuration)
continue;
SleepFor = std::min(AllottedDuration - RealDuration, MaxSleepFor);
std::this_thread::sleep_for(SleepFor);
}
}
void DetectableHost::detectOnce() {
auto P = BRW.insert(AnomalyRate >= Cfg.HostAnomalyRateThreshold);
BitRateWindow::Edge Edge = P.first;
size_t WindowLength = P.second;
bool ResetBitCounter = (Edge.first != BitRateWindow::State::AboveThreshold);
bool NewAnomalyEvent = (Edge.first == BitRateWindow::State::AboveThreshold) &&
(Edge.second == BitRateWindow::State::Idle);
std::vector<std::pair<double, std::string>> DimsOverThreshold;
size_t NumAnomalousDimensions = 0;
size_t NumNormalDimensions = 0;
size_t NumTrainedDimensions = 0;
double TotalTrainingDuration = 0.0;
double MaxTrainingDuration = 0.0;
{
std::lock_guard<std::mutex> Lock(Mutex);
DimsOverThreshold.reserve(DimensionsMap.size());
for (auto &DP : DimensionsMap) {
Dimension *D = DP.second;
auto P = D->detect(WindowLength, ResetBitCounter);
bool IsAnomalous = P.first;
double AnomalyRate = P.second;
NumTrainedDimensions += D->isTrained();
double DimTrainingDuration = D->updateTrainingDuration(0.0);
MaxTrainingDuration = std::max(MaxTrainingDuration, DimTrainingDuration);
TotalTrainingDuration += DimTrainingDuration;
if (IsAnomalous)
NumAnomalousDimensions += 1;
if (NewAnomalyEvent && (AnomalyRate >= Cfg.ADDimensionRateThreshold))
DimsOverThreshold.push_back({ AnomalyRate, D->getID() });
}
if (NumAnomalousDimensions)
AnomalyRate = static_cast<double>(NumAnomalousDimensions) / DimensionsMap.size();
else
AnomalyRate = 0.0;
NumNormalDimensions = DimensionsMap.size() - NumAnomalousDimensions;
}
this->NumAnomalousDimensions = NumAnomalousDimensions;
this->NumNormalDimensions = NumNormalDimensions;
this->NumTrainedDimensions = NumTrainedDimensions;
updateDimensionsChart(getRH(), NumTrainedDimensions, NumNormalDimensions, NumAnomalousDimensions);
updateRateChart(getRH(), AnomalyRate * 100.0);
updateWindowLengthChart(getRH(), WindowLength);
updateEventsChart(getRH(), P, ResetBitCounter, NewAnomalyEvent);
updateTrainingChart(getRH(), TotalTrainingDuration * 1000.0, MaxTrainingDuration * 1000.0);
if (!NewAnomalyEvent || (DimsOverThreshold.size() == 0))
return;
std::sort(DimsOverThreshold.begin(), DimsOverThreshold.end());
std::reverse(DimsOverThreshold.begin(), DimsOverThreshold.end());
// Make sure the JSON response won't grow beyond a specific number
// of dimensions. Log an error message if this happens, because it
// most likely means that the user specified a very-low anomaly rate
// threshold.
size_t NumMaxDimsOverThreshold = 2000;
if (DimsOverThreshold.size() > NumMaxDimsOverThreshold) {
error("Found %zu dimensions over threshold. Reducing JSON result to %zu dimensions.",
DimsOverThreshold.size(), NumMaxDimsOverThreshold);
DimsOverThreshold.resize(NumMaxDimsOverThreshold);
}
nlohmann::json JsonResult = DimsOverThreshold;
time_t Before = now_realtime_sec();
time_t After = Before - (WindowLength * updateEvery());
DB.insertAnomaly("AD1", 1, getUUID(), After, Before, JsonResult.dump(4));
}
void DetectableHost::detect() {
std::this_thread::sleep_for(Seconds{10});
while (!netdata_exit) {
TimePoint StartTP = SteadyClock::now();
detectOnce();
TimePoint EndTP = SteadyClock::now();
Duration<double> Dur = EndTP - StartTP;
updateDetectionChart(getRH(), Dur.count() * 1000);
std::this_thread::sleep_for(Seconds{updateEvery()});
}
}
void DetectableHost::getDetectionInfoAsJson(nlohmann::json &Json) const {
Json["anomalous-dimensions"] = NumAnomalousDimensions;
Json["normal-dimensions"] = NumNormalDimensions;
Json["total-dimensions"] = NumAnomalousDimensions + NumNormalDimensions;
Json["trained-dimensions"] = NumTrainedDimensions;
}
void DetectableHost::startAnomalyDetectionThreads() {
TrainingThread = std::thread(&TrainableHost::train, this);
DetectionThread = std::thread(&DetectableHost::detect, this);
}
void DetectableHost::stopAnomalyDetectionThreads() {
TrainingThread.join();
DetectionThread.join();
}
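A note on how the training loop above paces itself: findDimensionToTrain() allots each pass Cfg.TrainEvery * updateEvery() / (number of dimensions + 1) seconds, so a full sweep over all dimensions takes roughly one TrainEvery period, and train() sleeps away whatever is left of each slot (capped at one updateEvery()). As a purely hypothetical illustration, if TrainEvery * updateEvery() came to 3600 seconds and the host tracked 99 dimensions, each training slot would be allotted 3600 / 100 = 36 seconds.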

ml/Host.h Normal file

@ -0,0 +1,104 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#ifndef ML_HOST_H
#define ML_HOST_H
#include "BitRateWindow.h"
#include "Config.h"
#include "Database.h"
#include "Dimension.h"
#include "ml-private.h"
namespace ml {
class RrdHost {
public:
RrdHost(RRDHOST *RH) : RH(RH) {}
RRDHOST *getRH() { return RH; }
unsigned updateEvery() { return RH->rrd_update_every; }
std::string getUUID() {
char S[UUID_STR_LEN];
uuid_unparse_lower(RH->host_uuid, S);
return S;
}
void addDimension(Dimension *D);
void removeDimension(Dimension *D);
void getConfigAsJson(nlohmann::json &Json) const;
virtual ~RrdHost() {};
protected:
RRDHOST *RH;
// Protect dimension and lock maps
std::mutex Mutex;
std::map<RRDDIM *, Dimension *> DimensionsMap;
std::map<Dimension *, std::mutex> LocksMap;
};
class TrainableHost : public RrdHost {
public:
TrainableHost(RRDHOST *RH) : RrdHost(RH) {}
void train();
private:
std::pair<Dimension *, Duration<double>> findDimensionToTrain(const TimePoint &NowTP);
void trainDimension(Dimension *D, const TimePoint &NowTP);
};
class DetectableHost : public TrainableHost {
public:
DetectableHost(RRDHOST *RH) : TrainableHost(RH) {}
void startAnomalyDetectionThreads();
void stopAnomalyDetectionThreads();
template<typename ...ArgTypes>
bool getAnomalyInfo(ArgTypes&&... Args) {
return DB.getAnomalyInfo(Args...);
}
template<typename ...ArgTypes>
bool getAnomaliesInRange(ArgTypes&&... Args) {
return DB.getAnomaliesInRange(Args...);
}
void getDetectionInfoAsJson(nlohmann::json &Json) const;
private:
void detect();
void detectOnce();
private:
std::thread TrainingThread;
std::thread DetectionThread;
BitRateWindow BRW{
static_cast<size_t>(Cfg.ADMinWindowSize),
static_cast<size_t>(Cfg.ADMaxWindowSize),
static_cast<size_t>(Cfg.ADIdleWindowSize),
static_cast<size_t>(Cfg.ADMinWindowSize * Cfg.ADWindowRateThreshold)
};
CalculatedNumber AnomalyRate{0.0};
size_t NumAnomalousDimensions{0};
size_t NumNormalDimensions{0};
size_t NumTrainedDimensions{0};
Database DB{Cfg.AnomalyDBPath};
};
using Host = DetectableHost;
} // namespace ml
#endif /* ML_HOST_H */
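The BitRateWindow member shows how the detector consumes the configuration: the window grows from ADMinWindowSize up to ADMaxWindowSize seconds, idles for ADIdleWindowSize seconds after an event, and uses ADMinWindowSize * ADWindowRateThreshold as its set-bits threshold. Since DetectableHost::detectOnce() inserts one bit per second (set when the host anomaly rate is at or above HostAnomalyRateThreshold), the window enters the AboveThreshold state once at least that many seconds inside the minimum-length window were anomalous, and a new anomaly event is recorded on the later AboveThreshold to Idle transition. With hypothetical values of ADMinWindowSize = 120 and ADWindowRateThreshold = 0.25 (an illustration, not the defaults), 30 anomalous seconds are enough to open an event window.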

ml/Makefile.am Normal file

@ -0,0 +1,8 @@
# SPDX-License-Identifier: GPL-3.0-or-later
AUTOMAKE_OPTIONS = subdir-objects
MAINTAINERCLEANFILES = $(srcdir)/Makefile.in
SUBDIRS = \
kmeans \
$(NULL)

ml/Query.h Normal file

@ -0,0 +1,49 @@
#ifndef QUERY_H
#define QUERY_H
#include "ml-private.h"
namespace ml {
class Query {
public:
Query(RRDDIM *RD) : RD(RD) {
Ops = &RD->state->query_ops;
}
time_t latestTime() {
return Ops->latest_time(RD);
}
time_t oldestTime() {
return Ops->oldest_time(RD);
}
void init(time_t AfterT, time_t BeforeT) {
Ops->init(RD, &Handle, AfterT, BeforeT);
}
bool isFinished() {
return Ops->is_finished(&Handle);
}
std::pair<time_t, storage_number> nextMetric() {
time_t CurrT;
storage_number SN = Ops->next_metric(&Handle, &CurrT);
return { CurrT, SN };
}
~Query() {
Ops->finalize(&Handle);
}
private:
RRDDIM *RD;
struct rrddim_volatile::rrddim_query_ops *Ops;
struct rrddim_query_handle Handle;
};
} // namespace ml
#endif /* QUERY_H */
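Query is a thin RAII wrapper over a dimension's query_ops. A minimal sketch of the intended call sequence, assuming RD is a valid RRDDIM (illustrative only, not part of the patch):

#include "ml/Query.h"

#include <algorithm>
#include <utility>
#include <vector>

// Collect the (timestamp, storage_number) pairs stored for one dimension,
// clamped to the time range the database actually holds.
static std::vector<std::pair<time_t, storage_number>>
collectValues(RRDDIM *RD, time_t AfterT, time_t BeforeT) {
    ml::Query Q(RD);

    AfterT = std::max(AfterT, Q.oldestTime());
    BeforeT = std::min(BeforeT, Q.latestTime());

    std::vector<std::pair<time_t, storage_number>> Values;
    Q.init(AfterT, BeforeT);
    while (!Q.isFinished())
        Values.push_back(Q.nextMetric());

    return Values; // Q's destructor finalizes the query handle.
}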

ml/Tests.cc Normal file

@ -0,0 +1,301 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "BitBufferCounter.h"
#include "BitRateWindow.h"
#include "gtest/gtest.h"
using namespace ml;
TEST(BitBufferCounterTest, Cap_4) {
size_t Capacity = 4;
BitBufferCounter BBC(Capacity);
// No bits set
EXPECT_EQ(BBC.numSetBits(), 0);
// All ones
for (size_t Idx = 0; Idx != (2 * Capacity); Idx++) {
BBC.insert(true);
EXPECT_EQ(BBC.numSetBits(), std::min(Idx + 1, Capacity));
}
// All zeroes
for (size_t Idx = 0; Idx != Capacity; Idx++) {
BBC.insert(false);
if (Idx < Capacity)
EXPECT_EQ(BBC.numSetBits(), Capacity - (Idx + 1));
else
EXPECT_EQ(BBC.numSetBits(), 0);
}
// Even ones/zeroes
for (size_t Idx = 0; Idx != (2 * Capacity); Idx++)
BBC.insert(Idx % 2 == 0);
EXPECT_EQ(BBC.numSetBits(), Capacity / 2);
}
using State = BitRateWindow::State;
using Edge = BitRateWindow::Edge;
using Result = std::pair<Edge, size_t>;
TEST(BitRateWindowTest, Cycles) {
/* Test the FSM by going through its two cycles:
* 1) NotFilled -> AboveThreshold -> Idle -> NotFilled
* 2) NotFilled -> BelowThreshold -> AboveThreshold -> Idle -> NotFilled
*
* Check the window's length on every new state transition.
*/
size_t MinLength = 4, MaxLength = 6, IdleLength = 5;
size_t SetBitsThreshold = 3;
Result R;
BitRateWindow BRW(MinLength, MaxLength, IdleLength, SetBitsThreshold);
/*
* 1st cycle
*/
// NotFilled -> AboveThreshold
R = BRW.insert(true);
EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::NotFilled));
R = BRW.insert(true);
EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::NotFilled));
R = BRW.insert(true);
EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::NotFilled));
R = BRW.insert(true);
EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::AboveThreshold));
EXPECT_EQ(R.second, MinLength);
// AboveThreshold -> Idle
R = BRW.insert(true);
EXPECT_EQ(R.first, std::make_pair(State::AboveThreshold, State::AboveThreshold));
R = BRW.insert(true);
EXPECT_EQ(R.first, std::make_pair(State::AboveThreshold, State::AboveThreshold));
R = BRW.insert(true);
EXPECT_EQ(R.first, std::make_pair(State::AboveThreshold, State::Idle));
EXPECT_EQ(R.second, MaxLength);
// Idle -> NotFilled
R = BRW.insert(true);
EXPECT_EQ(R.first, std::make_pair(State::Idle, State::Idle));
R = BRW.insert(true);
EXPECT_EQ(R.first, std::make_pair(State::Idle, State::Idle));
R = BRW.insert(true);
EXPECT_EQ(R.first, std::make_pair(State::Idle, State::Idle));
R = BRW.insert(true);
EXPECT_EQ(R.first, std::make_pair(State::Idle, State::Idle));
R = BRW.insert(true);
EXPECT_EQ(R.first, std::make_pair(State::Idle, State::NotFilled));
EXPECT_EQ(R.second, 1);
// NotFilled -> AboveThreshold
R = BRW.insert(true);
EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::NotFilled));
R = BRW.insert(true);
EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::NotFilled));
R = BRW.insert(true);
EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::AboveThreshold));
EXPECT_EQ(R.second, MinLength);
/*
* 2nd cycle
*/
BRW = BitRateWindow(MinLength, MaxLength, IdleLength, SetBitsThreshold);
// NotFilled -> BelowThreshold
R = BRW.insert(false);
EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::NotFilled));
R = BRW.insert(false);
EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::NotFilled));
R = BRW.insert(false);
EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::NotFilled));
R = BRW.insert(false);
EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::BelowThreshold));
EXPECT_EQ(R.second, MinLength);
// BelowThreshold -> BelowThreshold:
// Check the state's self loop by adding set bits that will keep the
// bit buffer below the specified threshold.
//
for (size_t Idx = 0; Idx != 2 * MaxLength; Idx++) {
R = BRW.insert(Idx % 2 == 0);
EXPECT_EQ(R.first, std::make_pair(State::BelowThreshold, State::BelowThreshold));
EXPECT_EQ(R.second, MinLength);
}
// Verify that at the end of the loop the internal bit buffer contains
// "1010". Do so by adding one set bit and checking that we remain below
// the specified threshold.
R = BRW.insert(true);
EXPECT_EQ(R.first, std::make_pair(State::BelowThreshold, State::BelowThreshold));
EXPECT_EQ(R.second, MinLength);
// BelowThreshold -> AboveThreshold
R = BRW.insert(true);
EXPECT_EQ(R.first, std::make_pair(State::BelowThreshold, State::AboveThreshold));
EXPECT_EQ(R.second, MinLength);
// AboveThreshold -> Idle:
// Do the transition without filling the max window size this time.
R = BRW.insert(false);
EXPECT_EQ(R.first, std::make_pair(State::AboveThreshold, State::Idle));
EXPECT_EQ(R.second, MinLength);
// Idle -> NotFilled
R = BRW.insert(false);
EXPECT_EQ(R.first, std::make_pair(State::Idle, State::Idle));
R = BRW.insert(false);
EXPECT_EQ(R.first, std::make_pair(State::Idle, State::Idle));
R = BRW.insert(false);
EXPECT_EQ(R.first, std::make_pair(State::Idle, State::Idle));
R = BRW.insert(false);
EXPECT_EQ(R.first, std::make_pair(State::Idle, State::Idle));
R = BRW.insert(false);
EXPECT_EQ(R.first, std::make_pair(State::Idle, State::NotFilled));
EXPECT_EQ(R.second, 1);
// NotFilled -> AboveThreshold
R = BRW.insert(true);
EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::NotFilled));
R = BRW.insert(true);
EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::NotFilled));
R = BRW.insert(true);
EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::AboveThreshold));
EXPECT_EQ(R.second, MinLength);
}
TEST(BitRateWindowTest, ConsecutiveOnes) {
size_t MinLength = 120, MaxLength = 240, IdleLength = 30;
size_t SetBitsThreshold = 30;
Result R;
BitRateWindow BRW(MinLength, MaxLength, IdleLength, SetBitsThreshold);
for (size_t Idx = 0; Idx != MaxLength; Idx++)
R = BRW.insert(false);
EXPECT_EQ(R.first, std::make_pair(State::BelowThreshold, State::BelowThreshold));
EXPECT_EQ(R.second, MinLength);
for (size_t Idx = 0; Idx != SetBitsThreshold; Idx++) {
EXPECT_EQ(R.first, std::make_pair(State::BelowThreshold, State::BelowThreshold));
R = BRW.insert(true);
}
EXPECT_EQ(R.first, std::make_pair(State::BelowThreshold, State::AboveThreshold));
EXPECT_EQ(R.second, MinLength);
// At this point the window's buffer contains:
// (MinLength - SetBitsThreshold = 90) 0s, followed by
// (SetBitsThreshold = 30) 1s.
//
// To go below the threshold, we need to add (90 + 1) more 0s to the window's
// buffer. At that point, the window's buffer will contain:
// (SetBitsThreshold - 1 = 29) 1s, followed by
// (MinLength - SetBitsThreshold + 1 = 91) 0s.
//
// Right before adding the last 0, we expect the window's length to be equal to 210,
// because the bit buffer has gone through these bits:
// (MinLength - SetBitsThreshold = 90) 0s, followed by
// (SetBitsThreshold = 30) 1s, followed by
// (MinLength - SetBitsThreshold = 90) 0s.
for (size_t Idx = 0; Idx != (MinLength - SetBitsThreshold); Idx++) {
R = BRW.insert(false);
EXPECT_EQ(R.first, std::make_pair(State::AboveThreshold, State::AboveThreshold));
}
EXPECT_EQ(R.second, 2 * MinLength - SetBitsThreshold);
R = BRW.insert(false);
EXPECT_EQ(R.first, std::make_pair(State::AboveThreshold, State::Idle));
// Continue with the Idle -> NotFilled edge.
for (size_t Idx = 0; Idx != IdleLength - 1; Idx++) {
R = BRW.insert(false);
EXPECT_EQ(R.first, std::make_pair(State::Idle, State::Idle));
}
R = BRW.insert(false);
EXPECT_EQ(R.first, std::make_pair(State::Idle, State::NotFilled));
EXPECT_EQ(R.second, 1);
}
TEST(BitRateWindowTest, WithHoles) {
size_t MinLength = 120, MaxLength = 240, IdleLength = 30;
size_t SetBitsThreshold = 30;
Result R;
BitRateWindow BRW(MinLength, MaxLength, IdleLength, SetBitsThreshold);
for (size_t Idx = 0; Idx != MaxLength; Idx++)
R = BRW.insert(false);
for (size_t Idx = 0; Idx != SetBitsThreshold / 3; Idx++)
R = BRW.insert(true);
for (size_t Idx = 0; Idx != SetBitsThreshold / 3; Idx++)
R = BRW.insert(false);
for (size_t Idx = 0; Idx != SetBitsThreshold / 3; Idx++)
R = BRW.insert(true);
for (size_t Idx = 0; Idx != SetBitsThreshold / 3; Idx++)
R = BRW.insert(false);
for (size_t Idx = 0; Idx != SetBitsThreshold / 3; Idx++)
R = BRW.insert(true);
EXPECT_EQ(R.first, std::make_pair(State::BelowThreshold, State::AboveThreshold));
EXPECT_EQ(R.second, MinLength);
// The window's bit buffer contains:
// 70 0s, 10 1s, 10 0s, 10 1s, 10 0s, 10 1s.
// Where: 70 = MinLength - (5 / 3) * SetBitsThreshold, i.e. we need
// to add (70 + 1) more zeros to make the bit buffer go below the
// threshold and then the window's length should be:
// 70 + 50 + 70 = 190.
BitRateWindow::Edge E;
do {
R = BRW.insert(false);
E = R.first;
} while (E.first != State::AboveThreshold || E.second != State::Idle);
EXPECT_EQ(R.second, 2 * MinLength - (5 * SetBitsThreshold) / 3);
}
TEST(BitRateWindowTest, MinWindow) {
size_t MinLength = 120, MaxLength = 240, IdleLength = 30;
size_t SetBitsThreshold = 30;
Result R;
BitRateWindow BRW(MinLength, MaxLength, IdleLength, SetBitsThreshold);
BRW.insert(true);
BRW.insert(false);
for (size_t Idx = 2; Idx != SetBitsThreshold; Idx++)
BRW.insert(true);
for (size_t Idx = SetBitsThreshold; Idx != MinLength - 1; Idx++)
BRW.insert(false);
R = BRW.insert(true);
EXPECT_EQ(R.first, std::make_pair(State::NotFilled, State::AboveThreshold));
EXPECT_EQ(R.second, MinLength);
R = BRW.insert(false);
EXPECT_EQ(R.first, std::make_pair(State::AboveThreshold, State::Idle));
}
TEST(BitRateWindowTest, MaxWindow) {
size_t MinLength = 100, MaxLength = 200, IdleLength = 30;
size_t SetBitsThreshold = 50;
Result R;
BitRateWindow BRW(MinLength, MaxLength, IdleLength, SetBitsThreshold);
for (size_t Idx = 0; Idx != MaxLength; Idx++)
R = BRW.insert(Idx % 2 == 0);
EXPECT_EQ(R.first, std::make_pair(State::AboveThreshold, State::AboveThreshold));
EXPECT_EQ(R.second, MaxLength);
R = BRW.insert(false);
EXPECT_EQ(R.first, std::make_pair(State::AboveThreshold, State::Idle));
}
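For readers connecting these tests to the detector: the second element of the pair returned by BitRateWindow::insert() is the window's current length, and on the AboveThreshold to Idle transition it is the final length of the event window. DetectableHost::detectOnce() uses exactly that value to derive the event's time range, i.e. After = Before - WindowLength * updateEvery().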

ml/json Submodule

@ -0,0 +1 @@
Subproject commit 0b345b20c888f7dc8888485768e4bf9a6be29de0

ml/kmeans/KMeans.cc Normal file

@ -0,0 +1,55 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "KMeans.h"
#include <dlib/clustering.h>
void KMeans::train(SamplesBuffer &SB, size_t MaxIterations) {
std::vector<DSample> Samples = SB.preprocess();
MinDist = std::numeric_limits<CalculatedNumber>::max();
MaxDist = std::numeric_limits<CalculatedNumber>::min();
{
std::lock_guard<std::mutex> Lock(Mutex);
ClusterCenters.clear();
dlib::pick_initial_centers(NumClusters, ClusterCenters, Samples);
dlib::find_clusters_using_kmeans(Samples, ClusterCenters, MaxIterations);
for (const auto &S : Samples) {
CalculatedNumber MeanDist = 0.0;
for (const auto &KMCenter : ClusterCenters)
MeanDist += dlib::length(KMCenter - S);
MeanDist /= NumClusters;
if (MeanDist < MinDist)
MinDist = MeanDist;
if (MeanDist > MaxDist)
MaxDist = MeanDist;
}
}
}
CalculatedNumber KMeans::anomalyScore(SamplesBuffer &SB) {
std::vector<DSample> DSamples = SB.preprocess();
std::unique_lock<std::mutex> Lock(Mutex, std::defer_lock);
if (!Lock.try_lock())
return std::numeric_limits<CalculatedNumber>::quiet_NaN();
CalculatedNumber MeanDist = 0.0;
for (const auto &CC: ClusterCenters)
MeanDist += dlib::length(CC - DSamples.back());
MeanDist /= NumClusters;
if (MaxDist == MinDist)
return 0.0;
CalculatedNumber AnomalyScore = 100.0 * std::abs((MeanDist - MinDist) / (MaxDist - MinDist));
return (AnomalyScore > 100.0) ? 100.0 : AnomalyScore;
}

ml/kmeans/KMeans.h Normal file

@ -0,0 +1,34 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#ifndef KMEANS_H
#define KMEANS_H
#include <atomic>
#include <vector>
#include <limits>
#include <mutex>
#include "SamplesBuffer.h"
class KMeans {
public:
KMeans(size_t NumClusters = 2) : NumClusters(NumClusters) {
MinDist = std::numeric_limits<CalculatedNumber>::max();
MaxDist = std::numeric_limits<CalculatedNumber>::min();
};
void train(SamplesBuffer &SB, size_t MaxIterations);
CalculatedNumber anomalyScore(SamplesBuffer &SB);
private:
size_t NumClusters;
std::vector<DSample> ClusterCenters;
CalculatedNumber MinDist;
CalculatedNumber MaxDist;
std::mutex Mutex;
};
#endif /* KMEANS_H */
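To see how the pieces fit together: both train() and anomalyScore() call preprocess() on the SamplesBuffer they are handed (SamplesBuffer is defined further below), and preprocess() may run only once per buffer, so training and scoring need separate buffers. A rough sketch under those assumptions (the sizes mirror the NumSamples * NumDimsPerSample * (LagN + 1) allocation used by the unit tests; it is not part of the patch):

#include "ml/kmeans/KMeans.h"

#include <vector>

void kmeansSketch(const std::vector<CalculatedNumber> &RawValues) {
    const size_t NumSamples = 16, NumDimsPerSample = 1;
    const size_t DiffN = 1, SmoothN = 3, LagN = 3; // SamplesBuffer's defaults
    const size_t N = NumSamples * NumDimsPerSample * (LagN + 1);

    // Training: copy the collected values into a fresh buffer and fit the model.
    CalculatedNumber *TrainCNs = new CalculatedNumber[N]();
    for (size_t Idx = 0; Idx != NumSamples && Idx != RawValues.size(); Idx++)
        TrainCNs[Idx] = RawValues[Idx];
    SamplesBuffer TrainSB(TrainCNs, NumSamples, NumDimsPerSample, DiffN, SmoothN, LagN);

    KMeans Model; // two cluster centers by default
    Model.train(TrainSB, /* MaxIterations = */ 1000);

    // Scoring: a separate buffer holding the most recent values.
    CalculatedNumber *ScoreCNs = new CalculatedNumber[N]();
    for (size_t Idx = 0; Idx != NumSamples && Idx != RawValues.size(); Idx++)
        ScoreCNs[Idx] = RawValues[Idx];
    SamplesBuffer ScoreSB(ScoreCNs, NumSamples, NumDimsPerSample, DiffN, SmoothN, LagN);

    // Returns 0.0 .. 100.0, or NaN if a concurrent train() holds the model's mutex.
    CalculatedNumber Score = Model.anomalyScore(ScoreSB);
    (void) Score;

    delete[] TrainCNs;
    delete[] ScoreCNs;
}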

ml/kmeans/Makefile.am Normal file

@ -0,0 +1,4 @@
# SPDX-License-Identifier: GPL-3.0-or-later
AUTOMAKE_OPTIONS = subdir-objects
MAINTAINERCLEANFILES = $(srcdir)/Makefile.in

ml/kmeans/SamplesBuffer.cc Normal file

@ -0,0 +1,144 @@
// SPDX-License-Identifier: GPL-3.0-or-later
//
#include "SamplesBuffer.h"
#include <fstream>
#include <sstream>
#include <string>
void Sample::print(std::ostream &OS) const {
for (size_t Idx = 0; Idx != NumDims - 1; Idx++)
OS << CNs[Idx] << ", ";
OS << CNs[NumDims - 1];
}
void SamplesBuffer::print(std::ostream &OS) const {
for (size_t Idx = Preprocessed ? (DiffN + (SmoothN - 1) + (LagN)) : 0;
Idx != NumSamples; Idx++) {
Sample S = Preprocessed ? getPreprocessedSample(Idx) : getSample(Idx);
OS << S << std::endl;
}
}
std::vector<Sample> SamplesBuffer::getPreprocessedSamples() const {
std::vector<Sample> V;
for (size_t Idx = Preprocessed ? (DiffN + (SmoothN - 1) + (LagN)) : 0;
Idx != NumSamples; Idx++) {
Sample S = Preprocessed ? getPreprocessedSample(Idx) : getSample(Idx);
V.push_back(S);
}
return V;
}
void SamplesBuffer::diffSamples() {
// Pandas' DataFrame default behaviour is to subtract each element from
// itself. For us `DiffN = 0` means "disable diff-ing" when preprocessing
// the samples buffer. This deviation will make it easier for us to test
// the KMeans implementation.
if (DiffN == 0)
return;
for (size_t Idx = 0; Idx != (NumSamples - DiffN); Idx++) {
size_t High = (NumSamples - 1) - Idx;
size_t Low = High - DiffN;
Sample LHS = getSample(High);
Sample RHS = getSample(Low);
LHS.diff(RHS);
}
}
void SamplesBuffer::smoothSamples() {
// Holds the mean value of each window
CalculatedNumber *AccCNs = new CalculatedNumber[NumDimsPerSample]();
Sample Acc(AccCNs, NumDimsPerSample);
// Used to avoid clobbering the accumulator when moving the window
CalculatedNumber *TmpCNs = new CalculatedNumber[NumDimsPerSample]();
Sample Tmp(TmpCNs, NumDimsPerSample);
CalculatedNumber Factor = (CalculatedNumber) 1 / SmoothN;
// Calculate the value of the 1st window
for (size_t Idx = 0; Idx != std::min(SmoothN, NumSamples); Idx++) {
Tmp.add(getSample(NumSamples - (Idx + 1)));
}
Acc.add(Tmp);
Acc.scale(Factor);
// Move the window and update the samples
for (size_t Idx = NumSamples; Idx != (DiffN + SmoothN - 1); Idx--) {
Sample S = getSample(Idx - 1);
// Tmp <- Next window (if any)
if (Idx >= (SmoothN + 1)) {
Tmp.diff(S);
Tmp.add(getSample(Idx - (SmoothN + 1)));
}
// S <- Acc
S.copy(Acc);
// Acc <- Tmp
Acc.copy(Tmp);
Acc.scale(Factor);
}
delete[] AccCNs;
delete[] TmpCNs;
}
void SamplesBuffer::lagSamples() {
if (LagN == 0)
return;
for (size_t Idx = NumSamples; Idx != LagN; Idx--) {
Sample PS = getPreprocessedSample(Idx - 1);
PS.lag(getSample(Idx - 1), LagN);
}
}
std::vector<DSample> SamplesBuffer::preprocess() {
assert(Preprocessed == false);
std::vector<DSample> DSamples;
size_t OutN = NumSamples;
// Diff
if (DiffN >= OutN)
return DSamples;
OutN -= DiffN;
diffSamples();
// Smooth
if (SmoothN == 0 || SmoothN > OutN)
return DSamples;
OutN -= (SmoothN - 1);
smoothSamples();
// Lag
if (LagN >= OutN)
return DSamples;
OutN -= LagN;
lagSamples();
DSamples.reserve(OutN);
Preprocessed = true;
for (size_t Idx = NumSamples - OutN; Idx != NumSamples; Idx++) {
DSample DS;
DS.set_size(NumDimsPerSample * (LagN + 1));
const Sample PS = getPreprocessedSample(Idx);
PS.initDSample(DS);
DSamples.push_back(DS);
}
return DSamples;
}

ml/kmeans/SamplesBuffer.h Normal file

@ -0,0 +1,140 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#ifndef SAMPLES_BUFFER_H
#define SAMPLES_BUFFER_H
#include <iostream>
#include <vector>
#include <cassert>
#include <cstdlib>
#include <cstring>
#include <dlib/matrix.h>
typedef double CalculatedNumber;
typedef dlib::matrix<CalculatedNumber, 0, 1> DSample;
class Sample {
public:
Sample(CalculatedNumber *Buf, size_t N) : CNs(Buf), NumDims(N) {}
void initDSample(DSample &DS) const {
for (size_t Idx = 0; Idx != NumDims; Idx++)
DS(Idx) = CNs[Idx];
}
void add(const Sample &RHS) const {
assert(NumDims == RHS.NumDims);
for (size_t Idx = 0; Idx != NumDims; Idx++)
CNs[Idx] += RHS.CNs[Idx];
};
void diff(const Sample &RHS) const {
assert(NumDims == RHS.NumDims);
for (size_t Idx = 0; Idx != NumDims; Idx++)
CNs[Idx] -= RHS.CNs[Idx];
};
void copy(const Sample &RHS) const {
assert(NumDims == RHS.NumDims);
std::memcpy(CNs, RHS.CNs, NumDims * sizeof(CalculatedNumber));
}
void scale(CalculatedNumber Factor) {
for (size_t Idx = 0; Idx != NumDims; Idx++)
CNs[Idx] *= Factor;
}
void lag(const Sample &S, size_t LagN) {
size_t N = S.NumDims;
for (size_t Idx = 0; Idx != (LagN + 1); Idx++) {
Sample Src(S.CNs - (Idx * N), N);
Sample Dst(CNs + (Idx * N), N);
Dst.copy(Src);
}
}
const CalculatedNumber *getCalculatedNumbers() const {
return CNs;
};
void print(std::ostream &OS) const;
private:
CalculatedNumber *CNs;
size_t NumDims;
};
inline std::ostream& operator<<(std::ostream &OS, const Sample &S) {
S.print(OS);
return OS;
}
class SamplesBuffer {
public:
SamplesBuffer(CalculatedNumber *CNs,
size_t NumSamples, size_t NumDimsPerSample,
size_t DiffN = 1, size_t SmoothN = 3, size_t LagN = 3) :
CNs(CNs), NumSamples(NumSamples), NumDimsPerSample(NumDimsPerSample),
DiffN(DiffN), SmoothN(SmoothN), LagN(LagN),
BytesPerSample(NumDimsPerSample * sizeof(CalculatedNumber)),
Preprocessed(false) {};
std::vector<DSample> preprocess();
std::vector<Sample> getPreprocessedSamples() const;
size_t capacity() const { return NumSamples; }
void print(std::ostream &OS) const;
private:
size_t getSampleOffset(size_t Index) const {
assert(Index < NumSamples);
return Index * NumDimsPerSample;
}
size_t getPreprocessedSampleOffset(size_t Index) const {
assert(Index < NumSamples);
return getSampleOffset(Index) * (LagN + 1);
}
void setSample(size_t Index, const Sample &S) const {
size_t Offset = getSampleOffset(Index);
std::memcpy(&CNs[Offset], S.getCalculatedNumbers(), BytesPerSample);
}
const Sample getSample(size_t Index) const {
size_t Offset = getSampleOffset(Index);
return Sample(&CNs[Offset], NumDimsPerSample);
};
const Sample getPreprocessedSample(size_t Index) const {
size_t Offset = getPreprocessedSampleOffset(Index);
return Sample(&CNs[Offset], NumDimsPerSample * (LagN + 1));
};
void diffSamples();
void smoothSamples();
void lagSamples();
private:
CalculatedNumber *CNs;
size_t NumSamples;
size_t NumDimsPerSample;
size_t DiffN;
size_t SmoothN;
size_t LagN;
size_t BytesPerSample;
bool Preprocessed;
};
inline std::ostream& operator<<(std::ostream& OS, const SamplesBuffer &SB) {
SB.print(OS);
return OS;
}
#endif /* SAMPLES_BUFFER_H */

ml/kmeans/Tests.cc Normal file

@ -0,0 +1,143 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "ml/ml-private.h"
#include <gtest/gtest.h>
/*
* The SamplesBuffer class implements the functionality of the following python
* code:
* >> df = pd.DataFrame(data=samples)
* >> df = df.diff(diff_n).dropna()
* >> df = df.rolling(smooth_n).mean().dropna()
* >> df = pd.concat([df.shift(n) for n in range(lag_n + 1)], axis=1).dropna()
*
* Its correctness has been verified by automatically generating random
* data frames in Python and comparing them with the correspondent preprocessed
* SampleBuffers.
*
* The following tests are meant to catch unintended changes in the SamplesBuffer
* implementation. For development purposes, one should compare changes against
* the aforementioned python code.
*/
TEST(SamplesBufferTest, NS_8_NDPS_1_DN_1_SN_3_LN_3) {
size_t NumSamples = 8, NumDimsPerSample = 1;
size_t DiffN = 1, SmoothN = 3, LagN = 3;
size_t N = NumSamples * NumDimsPerSample * (LagN + 1);
CalculatedNumber *CNs = new CalculatedNumber[N]();
CNs[0] = 0.7568336679490107;
CNs[1] = 0.4814406581763254;
CNs[2] = 0.40073555156221874;
CNs[3] = 0.5973257298194408;
CNs[4] = 0.5334727814345868;
CNs[5] = 0.2632477193454843;
CNs[6] = 0.2684839023122384;
CNs[7] = 0.851332948637479;
SamplesBuffer SB(CNs, NumSamples, NumDimsPerSample, DiffN, SmoothN, LagN);
SB.preprocess();
std::vector<Sample> Samples = SB.getPreprocessedSamples();
EXPECT_EQ(Samples.size(), 2);
Sample S0 = Samples[0];
const CalculatedNumber *S0_CNs = S0.getCalculatedNumbers();
Sample S1 = Samples[1];
const CalculatedNumber *S1_CNs = S1.getCalculatedNumbers();
EXPECT_NEAR(S0_CNs[0], -0.109614, 0.001);
EXPECT_NEAR(S0_CNs[1], -0.0458293, 0.001);
EXPECT_NEAR(S0_CNs[2], 0.017344, 0.001);
EXPECT_NEAR(S0_CNs[3], -0.0531693, 0.001);
EXPECT_NEAR(S1_CNs[0], 0.105953, 0.001);
EXPECT_NEAR(S1_CNs[1], -0.109614, 0.001);
EXPECT_NEAR(S1_CNs[2], -0.0458293, 0.001);
EXPECT_NEAR(S1_CNs[3], 0.017344, 0.001);
delete[] CNs;
}
TEST(SamplesBufferTest, NS_8_NDPS_1_DN_2_SN_3_LN_2) {
size_t NumSamples = 8, NumDimsPerSample = 1;
size_t DiffN = 2, SmoothN = 3, LagN = 2;
size_t N = NumSamples * NumDimsPerSample * (LagN + 1);
CalculatedNumber *CNs = new CalculatedNumber[N]();
CNs[0] = 0.20511885291342846;
CNs[1] = 0.13151717360306558;
CNs[2] = 0.6017085062423134;
CNs[3] = 0.46256882933941545;
CNs[4] = 0.7887758447877941;
CNs[5] = 0.9237989080034406;
CNs[6] = 0.15552559051428083;
CNs[7] = 0.6309750314597955;
SamplesBuffer SB(CNs, NumSamples, NumDimsPerSample, DiffN, SmoothN, LagN);
SB.preprocess();
std::vector<Sample> Samples = SB.getPreprocessedSamples();
EXPECT_EQ(Samples.size(), 2);
Sample S0 = Samples[0];
const CalculatedNumber *S0_CNs = S0.getCalculatedNumbers();
Sample S1 = Samples[1];
const CalculatedNumber *S1_CNs = S1.getCalculatedNumbers();
EXPECT_NEAR(S0_CNs[0], 0.005016, 0.001);
EXPECT_NEAR(S0_CNs[1], 0.326450, 0.001);
EXPECT_NEAR(S0_CNs[2], 0.304903, 0.001);
EXPECT_NEAR(S1_CNs[0], -0.154948, 0.001);
EXPECT_NEAR(S1_CNs[1], 0.005016, 0.001);
EXPECT_NEAR(S1_CNs[2], 0.326450, 0.001);
delete[] CNs;
}
TEST(SamplesBufferTest, NS_8_NDPS_3_DN_2_SN_4_LN_1) {
size_t NumSamples = 8, NumDimsPerSample = 3;
size_t DiffN = 2, SmoothN = 4, LagN = 1;
size_t N = NumSamples * NumDimsPerSample * (LagN + 1);
CalculatedNumber *CNs = new CalculatedNumber[N]();
CNs[0] = 0.34310900399667765; CNs[1] = 0.14694315994488194; CNs[2] = 0.8246677800938796;
CNs[3] = 0.48249504592307835; CNs[4] = 0.23241087965531182; CNs[5] = 0.9595348555892567;
CNs[6] = 0.44281094035598334; CNs[7] = 0.5143142171362715; CNs[8] = 0.06391303014242555;
CNs[9] = 0.7460491027783901; CNs[10] = 0.43887217459032923; CNs[11] = 0.2814395025355999;
CNs[12] = 0.9231114281214198; CNs[13] = 0.326882401786898; CNs[14] = 0.26747939220376216;
CNs[15] = 0.7787571209969636; CNs[16] =0.5851700001235088; CNs[17] = 0.34410728945321567;
CNs[18] = 0.9394494507088997; CNs[19] =0.17567223681734334; CNs[20] = 0.42732886195446984;
CNs[21] = 0.9460522396152958; CNs[22] =0.23462747016780894; CNs[23] = 0.35983249900892145;
SamplesBuffer SB(CNs, NumSamples, NumDimsPerSample, DiffN, SmoothN, LagN);
SB.preprocess();
std::vector<Sample> Samples = SB.getPreprocessedSamples();
EXPECT_EQ(Samples.size(), 2);
Sample S0 = Samples[0];
const CalculatedNumber *S0_CNs = S0.getCalculatedNumbers();
Sample S1 = Samples[1];
const CalculatedNumber *S1_CNs = S1.getCalculatedNumbers();
EXPECT_NEAR(S0_CNs[0], 0.198225, 0.001);
EXPECT_NEAR(S0_CNs[1], 0.003529, 0.001);
EXPECT_NEAR(S0_CNs[2], -0.063003, 0.001);
EXPECT_NEAR(S0_CNs[3], 0.219066, 0.001);
EXPECT_NEAR(S0_CNs[4], 0.133175, 0.001);
EXPECT_NEAR(S0_CNs[5], -0.293154, 0.001);
EXPECT_NEAR(S1_CNs[0], 0.174160, 0.001);
EXPECT_NEAR(S1_CNs[1], -0.135722, 0.001);
EXPECT_NEAR(S1_CNs[2], 0.110452, 0.001);
EXPECT_NEAR(S1_CNs[3], 0.198225, 0.001);
EXPECT_NEAR(S1_CNs[4], 0.003529, 0.001);
EXPECT_NEAR(S1_CNs[5], -0.063003, 0.001);
delete[] CNs;
}
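A quick way to sanity-check the EXPECT_EQ(Samples.size(), 2) assertions above: preprocess() drops DiffN samples for the differencing step, SmoothN - 1 for the rolling mean, and LagN for the lag concatenation, so the number of preprocessed samples is NumSamples - DiffN - (SmoothN - 1) - LagN. For the three tests that works out to 8 - 1 - 2 - 3, 8 - 2 - 2 - 2 and 8 - 2 - 3 - 1, i.e. two samples each, matching what the pandas pipeline leaves after its dropna() calls.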

ml/kmeans/dlib Submodule

@ -0,0 +1 @@
Subproject commit 021cbbb1c2ddec39d8dd4cb6abfbbafdf1cf4482

ml/ml-dummy.c Normal file

@ -0,0 +1,38 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "ml.h"
#if !defined(ENABLE_ML)
void ml_init(void) {}
void ml_new_host(RRDHOST *RH) { (void) RH; }
void ml_delete_host(RRDHOST *RH) { (void) RH; }
char *ml_get_host_info(RRDHOST *RH) { (void) RH; return NULL; }
void ml_new_dimension(RRDDIM *RD) { (void) RD; }
void ml_delete_dimension(RRDDIM *RD) { (void) RD; }
bool ml_is_anomalous(RRDDIM *RD, double Value, bool Exists) {
(void) RD; (void) Value; (void) Exists;
return false;
}
char *ml_get_anomaly_events(RRDHOST *RH, const char *AnomalyDetectorName,
int AnomalyDetectorVersion, time_t After, time_t Before) {
(void) RH; (void) AnomalyDetectorName;
(void) AnomalyDetectorVersion; (void) After; (void) Before;
return NULL;
}
char *ml_get_anomaly_event_info(RRDHOST *RH, const char *AnomalyDetectorName,
int AnomalyDetectorVersion, time_t After, time_t Before) {
(void) RH; (void) AnomalyDetectorName;
(void) AnomalyDetectorVersion; (void) After; (void) Before;
return NULL;
}
#endif

ml/ml-private.h Normal file

@ -0,0 +1,26 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#ifndef ML_PRIVATE_H
#define ML_PRIVATE_H
#include "kmeans/KMeans.h"
#include "ml/ml.h"
#include <chrono>
#include <map>
#include <mutex>
#include <sstream>
namespace ml {
using SteadyClock = std::chrono::steady_clock;
using TimePoint = std::chrono::time_point<SteadyClock>;
template<typename T>
using Duration = std::chrono::duration<T>;
using Seconds = std::chrono::seconds;
} // namespace ml
#endif /* ML_PRIVATE_H */

ml/ml.cc Normal file

@ -0,0 +1,153 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "Config.h"
#include "Dimension.h"
#include "Host.h"
using namespace ml;
/*
* Assumptions:
* 1) hosts outlive their sets, and sets outlive their dimensions,
* 2) dimensions always have a set that has a host.
*/
void ml_init(void) {
Cfg.readMLConfig();
}
void ml_new_host(RRDHOST *RH) {
if (!Cfg.EnableAnomalyDetection)
return;
if (simple_pattern_matches(Cfg.SP_HostsToSkip, RH->hostname))
return;
Host *H = new Host(RH);
RH->ml_host = static_cast<ml_host_t>(H);
H->startAnomalyDetectionThreads();
}
void ml_delete_host(RRDHOST *RH) {
Host *H = static_cast<Host *>(RH->ml_host);
if (!H)
return;
H->stopAnomalyDetectionThreads();
delete H;
RH->ml_host = nullptr;
}
void ml_new_dimension(RRDDIM *RD) {
RRDSET *RS = RD->rrdset;
Host *H = static_cast<Host *>(RD->rrdset->rrdhost->ml_host);
if (!H)
return;
if (static_cast<unsigned>(RD->update_every) != H->updateEvery())
return;
if (simple_pattern_matches(Cfg.SP_ChartsToSkip, RS->name))
return;
Dimension *D = new Dimension(RD);
RD->state->ml_dimension = static_cast<ml_dimension_t>(D);
H->addDimension(D);
}
void ml_delete_dimension(RRDDIM *RD) {
Dimension *D = static_cast<Dimension *>(RD->state->ml_dimension);
if (!D)
return;
Host *H = static_cast<Host *>(RD->rrdset->rrdhost->ml_host);
H->removeDimension(D);
RD->state->ml_dimension = nullptr;
}
char *ml_get_host_info(RRDHOST *RH) {
nlohmann::json ConfigJson;
if (RH && RH->ml_host) {
Host *H = static_cast<Host *>(RH->ml_host);
H->getConfigAsJson(ConfigJson);
H->getDetectionInfoAsJson(ConfigJson);
} else {
ConfigJson["enabled"] = false;
}
return strdup(ConfigJson.dump(2, '\t').c_str());
}
bool ml_is_anomalous(RRDDIM *RD, double Value, bool Exists) {
Dimension *D = static_cast<Dimension *>(RD->state->ml_dimension);
if (!D)
return false;
D->addValue(Value, Exists);
bool Result = D->predict().second;
return Result;
}
char *ml_get_anomaly_events(RRDHOST *RH, const char *AnomalyDetectorName,
int AnomalyDetectorVersion, time_t After, time_t Before) {
if (!RH || !RH->ml_host) {
error("No host");
return nullptr;
}
Host *H = static_cast<Host *>(RH->ml_host);
std::vector<std::pair<time_t, time_t>> TimeRanges;
bool Res = H->getAnomaliesInRange(TimeRanges, AnomalyDetectorName,
AnomalyDetectorVersion,
H->getUUID(),
After, Before);
if (!Res) {
error("DB result is empty");
return nullptr;
}
nlohmann::json Json = TimeRanges;
return strdup(Json.dump(4).c_str());
}
char *ml_get_anomaly_event_info(RRDHOST *RH, const char *AnomalyDetectorName,
int AnomalyDetectorVersion, time_t After, time_t Before) {
if (!RH || !RH->ml_host) {
error("No host");
return nullptr;
}
Host *H = static_cast<Host *>(RH->ml_host);
nlohmann::json Json;
bool Res = H->getAnomalyInfo(Json, AnomalyDetectorName,
AnomalyDetectorVersion,
H->getUUID(),
After, Before);
if (!Res) {
error("DB result is empty");
return nullptr;
}
return strdup(Json.dump(4, '\t').c_str());
}
#if defined(ENABLE_ML_TESTS)
#include "gtest/gtest.h"
int test_ml(int argc, char *argv[]) {
(void) argc;
(void) argv;
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
#endif // ENABLE_ML_TESTS

ml/ml.h Normal file

@ -0,0 +1,41 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#ifndef NETDATA_ML_H
#define NETDATA_ML_H
#ifdef __cplusplus
extern "C" {
#endif
#include "daemon/common.h"
typedef void* ml_host_t;
typedef void* ml_dimension_t;
void ml_init(void);
void ml_new_host(RRDHOST *RH);
void ml_delete_host(RRDHOST *RH);
char *ml_get_host_info(RRDHOST *RH);
void ml_new_dimension(RRDDIM *RD);
void ml_delete_dimension(RRDDIM *RD);
bool ml_is_anomalous(RRDDIM *RD, double value, bool exists);
char *ml_get_anomaly_events(RRDHOST *RH, const char *AnomalyDetectorName,
int AnomalyDetectorVersion, time_t After, time_t Before);
char *ml_get_anomaly_event_info(RRDHOST *RH, const char *AnomalyDetectorName,
int AnomalyDetectorVersion, time_t After, time_t Before);
#if defined(ENABLE_ML_TESTS)
int test_ml(int argc, char *argv[]);
#endif
#ifdef __cplusplus
};
#endif
#endif /* NETDATA_ML_H */


@ -241,6 +241,8 @@ USAGE: ${PROGRAM} [options]
--disable-backend-mongodb
--enable-lto Enable Link-Time-Optimization. Default: disabled
--disable-lto
--enable-ml Enable anomaly detection with machine learning. (Default: autodetect)
--disable-ml
--disable-x86-sse Disable SSE instructions. By default SSE optimizations are enabled.
--use-system-lws Use a system copy of libwebsockets instead of bundling our own (default is to use the bundled copy).
--use-system-protobuf Use a system copy of libprotobuf instead of bundling our own (default is to use the bundled copy).
@ -329,6 +331,10 @@ while [ -n "${1}" ]; do
"--enable-backend-mongodb") NETDATA_CONFIGURE_OPTIONS="${NETDATA_CONFIGURE_OPTIONS//--enable-backend-mongodb/} --enable-backend-mongodb" ;;
"--disable-backend-mongodb") NETDATA_CONFIGURE_OPTIONS="${NETDATA_CONFIGURE_OPTIONS//--disable-backend-mongodb/} --disable-backend-mongodb" ;;
"--enable-lto") NETDATA_CONFIGURE_OPTIONS="${NETDATA_CONFIGURE_OPTIONS//--enable-lto/} --enable-lto" ;;
"--enable-ml") NETDATA_CONFIGURE_OPTIONS="${NETDATA_CONFIGURE_OPTIONS//--enable-ml/} --enable-ml" ;;
"--disable-ml") NETDATA_CONFIGURE_OPTIONS="${NETDATA_CONFIGURE_OPTIONS//--disable-ml/} --disable-ml" ;;
"--enable-ml-tests") NETDATA_CONFIGURE_OPTIONS="${NETDATA_CONFIGURE_OPTIONS//--enable-ml-tests/} --enable-ml-tests" ;;
"--disable-ml-tests") NETDATA_CONFIGURE_OPTIONS="${NETDATA_CONFIGURE_OPTIONS//--disable-ml-tests/} --disable-ml-tests" ;;
"--disable-lto") NETDATA_CONFIGURE_OPTIONS="${NETDATA_CONFIGURE_OPTIONS//--disable-lto/} --disable-lto" ;;
"--disable-x86-sse") NETDATA_CONFIGURE_OPTIONS="${NETDATA_CONFIGURE_OPTIONS//--disable-x86-sse/} --disable-x86-sse" ;;
"--disable-telemetry") NETDATA_DISABLE_TELEMETRY=1 ;;


@ -389,6 +389,7 @@ static inline void do_dimension_variablestep(
, long dim_id_in_rrdr
, time_t after_wanted
, time_t before_wanted
, uint32_t options
){
// RRDSET *st = r->st;
@ -445,7 +446,11 @@ static inline void do_dimension_variablestep(
// db_now has a different value than above
if (likely(now >= db_now)) {
if (likely(does_storage_number_exist(n_curr))) {
value = unpack_storage_number(n_curr);
if (options & RRDR_OPTION_ANOMALY_BIT)
value = (n_curr & SN_ANOMALY_BIT) ? 0.0 : 100.0;
else
value = unpack_storage_number(n_curr);
if (likely(value != 0.0))
values_in_group_non_zero++;
@ -530,6 +535,7 @@ static inline void do_dimension_fixedstep(
, long dim_id_in_rrdr
, time_t after_wanted
, time_t before_wanted
, uint32_t options
){
RRDSET *st = r->st;
@ -593,7 +599,11 @@ static inline void do_dimension_fixedstep(
error("INTERNAL CHECK: Unaligned query for %s, database time: %ld, expected time: %ld", rd->id, (long)handle.rrdeng.now, (long)now);
}
#endif
value = unpack_storage_number(n);
if (options & RRDR_OPTION_ANOMALY_BIT)
value = (n & SN_ANOMALY_BIT) ? 0.0 : 100.0;
else
value = unpack_storage_number(n);
if(likely(value != 0.0))
values_in_group_non_zero++;
@ -1100,6 +1110,7 @@ static RRDR *rrd2rrdr_fixedstep(
, c
, after_wanted
, before_wanted
, options
);
if(r->od[c] & RRDR_DIMENSION_NONZERO)
@ -1476,6 +1487,7 @@ static RRDR *rrd2rrdr_variablestep(
, c
, after_wanted
, before_wanted
, options
);
if(r->od[c] & RRDR_DIMENSION_NONZERO)
@ -1644,4 +1656,4 @@ RRDR *rrd2rrdr(
return rrd2rrdr_fixedstep(st, points_requested, after_requested, before_requested, group_method,
resampling_time_requested, options, dimensions,
rrd_update_every, first_entry_t, last_entry_t, absolute_period_requested, context_param_list);
}
}


@ -24,6 +24,7 @@ typedef enum rrdr_options {
RRDR_OPTION_MATCH_NAMES = 0x00008000, // when filtering dimensions, match only names
RRDR_OPTION_CUSTOM_VARS = 0x00010000, // when wrapping response in a JSON, return custom variables in response
RRDR_OPTION_ALLOW_PAST = 0x00020000, // The after parameter can extend in the past before the first entry
RRDR_OPTION_ANOMALY_BIT = 0x00040000, // Return the anomaly bit stored in each collected_number
} RRDR_OPTIONS;
typedef enum rrdr_value_flag {


@ -36,6 +36,7 @@ static struct {
, {"match-names" , 0 , RRDR_OPTION_MATCH_NAMES}
, {"showcustomvars" , 0 , RRDR_OPTION_CUSTOM_VARS}
, {"allow_past" , 0 , RRDR_OPTION_ALLOW_PAST}
, {"anomaly-bit" , 0 , RRDR_OPTION_ANOMALY_BIT}
, { NULL, 0, 0}
};
@ -1088,12 +1089,110 @@ inline int web_client_api_request_v1_info_fill_buffer(RRDHOST *host, BUFFER *wb)
buffer_strcat(wb, "\t\"metrics-count\": ");
analytics_get_data(analytics_data.netdata_metrics_count, wb);
buffer_strcat(wb, ",\n");
#if defined(ENABLE_ML)
char *ml_info = ml_get_host_info(host);
buffer_strcat(wb, "\t\"ml-info\": ");
buffer_strcat(wb, ml_info);
buffer_strcat(wb, "\n");
free(ml_info);
#endif
buffer_strcat(wb, "}");
return 0;
}
#if defined(ENABLE_ML)
int web_client_api_request_v1_anomaly_events(RRDHOST *host, struct web_client *w, char *url) {
if (!netdata_ready)
return HTTP_RESP_BACKEND_FETCH_FAILED;
uint32_t after = 0, before = 0;
while (url) {
char *value = mystrsep(&url, "&");
if (!value || !*value)
continue;
char *name = mystrsep(&value, "=");
if (!name || !*name)
continue;
if (!value || !*value)
continue;
if (!strcmp(name, "after"))
after = (uint32_t) (strtoul(value, NULL, 0) / 1000);
else if (!strcmp(name, "before"))
before = (uint32_t) (strtoul(value, NULL, 0) / 1000);
}
char *s;
if (!before || !after)
s = strdup("{\"error\": \"missing after/before parameters\" }\n");
else {
s = ml_get_anomaly_events(host, "AD1", 1, after, before);
if (!s)
s = strdup("{\"error\": \"json string is empty\" }\n");
}
BUFFER *wb = w->response.data;
buffer_flush(wb);
wb->contenttype = CT_APPLICATION_JSON;
buffer_strcat(wb, s);
buffer_no_cacheable(wb);
freez(s);
return HTTP_RESP_OK;
}
int web_client_api_request_v1_anomaly_event_info(RRDHOST *host, struct web_client *w, char *url) {
if (!netdata_ready)
return HTTP_RESP_BACKEND_FETCH_FAILED;
uint32_t after = 0, before = 0;
while (url) {
char *value = mystrsep(&url, "&");
if (!value || !*value)
continue;
char *name = mystrsep(&value, "=");
if (!name || !*name)
continue;
if (!value || !*value)
continue;
if (!strcmp(name, "after"))
after = (uint32_t) strtoul(value, NULL, 0);
else if (!strcmp(name, "before"))
before = (uint32_t) strtoul(value, NULL, 0);
}
char *s;
if (!before || !after)
s = strdup("{\"error\": \"missing after/before parameters\" }\n");
else {
s = ml_get_anomaly_event_info(host, "AD1", 1, after, before);
if (!s)
s = strdup("{\"error\": \"json string is empty\" }\n");
}
BUFFER *wb = w->response.data;
buffer_flush(wb);
wb->contenttype = CT_APPLICATION_JSON;
buffer_strcat(wb, s);
buffer_no_cacheable(wb);
freez(s);
return HTTP_RESP_OK;
}
#endif // defined(ENABLE_ML)
inline int web_client_api_request_v1_info(RRDHOST *host, struct web_client *w, char *url) {
(void)url;
if (!netdata_ready) return HTTP_RESP_BACKEND_FETCH_FAILED;
@ -1148,6 +1247,12 @@ static struct api_command {
{ "alarm_variables", 0, WEB_CLIENT_ACL_DASHBOARD, web_client_api_request_v1_alarm_variables },
{ "alarm_count", 0, WEB_CLIENT_ACL_DASHBOARD, web_client_api_request_v1_alarm_count },
{ "allmetrics", 0, WEB_CLIENT_ACL_DASHBOARD, web_client_api_request_v1_allmetrics },
#if defined(ENABLE_ML)
{ "anomaly_events", 0, WEB_CLIENT_ACL_DASHBOARD, web_client_api_request_v1_anomaly_events },
{ "anomaly_event_info", 0, WEB_CLIENT_ACL_DASHBOARD, web_client_api_request_v1_anomaly_event_info },
#endif
{ "manage/health", 0, WEB_CLIENT_ACL_MGMT, web_client_api_request_v1_mgmt_health },
{ "aclk", 0, WEB_CLIENT_ACL_DASHBOARD, web_client_api_request_v1_aclk_state },
// terminator
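Hypothetical requests against the new endpoints (host, chart and timestamps are placeholders): GET /api/v1/anomaly_events?after=1618160000000&before=1618163600000 returns the recorded anomalous time ranges; note that this handler divides after/before by 1000, so it expects millisecond timestamps. GET /api/v1/anomaly_event_info?after=1618160000&before=1618163600 takes the same parameters in seconds and returns the JSON stored for the matching event, i.e. the dimensions whose anomaly rate reached ADDimensionRateThreshold, sorted by rate. The per-second anomaly bit of a single dimension is available through the existing data endpoint, e.g. /api/v1/data?chart=system.cpu&options=anomaly-bit.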


@ -698,6 +698,11 @@ netdataDashboard.menu = {
info: 'Z scores scores relating to key system metrics.'
},
'anomaly_detection': {
title: 'Anomaly Detection',
icon: '<i class="fas fa-brain"></i>',
info: 'Charts relating to anomaly detection, increased <code>anomalous</code> dimensions or a higher than usual <code>anomaly_rate</code> could be signs of some abnormal behaviour. Read our <a href="https://learn.netdata.cloud/guides/monitor/anomaly-detection" target="_blank">anomaly detection guide</a> for more details.'
},
};
@ -6344,4 +6349,28 @@ netdataDashboard.context = {
'See <a href="https://www.freedesktop.org/software/systemd/man/systemd.slice.html#" target="_blank"> systemd.slice(5)</a>.'
},
'anomaly_detection.dimensions': {
info: 'Total count of dimensions considered anomalous or normal. '
},
'anomaly_detection.anomaly_rate': {
info: 'Percentage of anomalous dimensions. '
},
'anomaly_detection.detector_window': {
info: 'The length of the active window used by the detector. '
},
'anomaly_detection.detector_events': {
info: 'Flags (0 or 1) to show when an anomaly event has been triggered by the detector. '
},
'anomaly_detection.prediction_stats': {
info: 'Diagnostic metrics relating to prediction time of anomaly detection. '
},
'anomaly_detection.training_stats': {
info: 'Diagnostic metrics relating to training time of anomaly detection. '
},
};