Allows ACLK NG and Legacy to coexist (#11225)

This commit is contained in:
Timotej S 2021-06-14 10:38:58 +02:00 committed by GitHub
parent f71036cdec
commit 59af90b08c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
41 changed files with 997 additions and 1224 deletions

View File

@ -52,7 +52,7 @@ jobs:
- name: Configure
run: |
autoreconf -ivf
./configure --without-aclk-ng
./configure
# XXX: Work-around for bug with libbson-1.0 in Ubuntu 18.04
# See: https://bugs.launchpad.net/ubuntu/+source/libmongoc/+bug/1790771
# https://jira.mongodb.org/browse/CDRIVER-2818

View File

@ -730,12 +730,22 @@ set(BACKENDS_PLUGIN_FILES
set(CLAIM_PLUGIN_FILES
claim/claim.c
claim/claim.h
aclk/legacy/aclk_rrdhost_state.h
aclk/legacy/aclk_common.c
aclk/legacy/aclk_common.h
)
set(ACLK_PLUGIN_FILES
set(ACLK_ALWAYS_BUILD
aclk/aclk_rrdhost_state.h
aclk/aclk_api.c
aclk/aclk_api.h
aclk/aclk_proxy.c
aclk/aclk_proxy.h
)
set(ACLK_COMMON_FILES
aclk/aclk_collector_list.c
aclk/aclk_collector_list.h
)
set(ACLK_LEGACY_FILES
aclk/legacy/agent_cloud_link.c
aclk/legacy/agent_cloud_link.h
aclk/legacy/aclk_query.c
@ -750,6 +760,40 @@ set(ACLK_PLUGIN_FILES
aclk/legacy/aclk_stats.h
aclk/legacy/aclk_rx_msgs.c
aclk/legacy/aclk_rx_msgs.h
aclk/legacy/aclk_common.c
aclk/legacy/aclk_common.h
)
set(ACLK_NG_FILES
aclk/aclk.c
aclk/aclk.h
aclk/aclk_util.c
aclk/aclk_util.h
aclk/aclk_stats.c
aclk/aclk_stats.h
aclk/aclk_query.c
aclk/aclk_query.h
aclk/aclk_query_queue.c
aclk/aclk_query_queue.h
aclk/aclk_otp.c
aclk/aclk_otp.h
aclk/aclk_tx_msgs.c
aclk/aclk_tx_msgs.h
aclk/aclk_rx_msgs.c
aclk/aclk_rx_msgs.h
aclk/https_client.c
aclk/https_client.h
mqtt_websockets/src/mqtt_wss_client.c
mqtt_websockets/src/include/mqtt_wss_client.h
mqtt_websockets/src/mqtt_wss_log.c
mqtt_websockets/src/include/mqtt_wss_log.h
mqtt_websockets/src/ws_client.c
mqtt_websockets/src/include/ws_client.h
mqtt_websockets/c-rbuf/src/ringbuffer.c
mqtt_websockets/c-rbuf/include/ringbuffer.h
mqtt_websockets/c-rbuf/src/ringbuffer_internal.h
mqtt_websockets/MQTT-C/src/mqtt.c
mqtt_websockets/MQTT-C/include/mqtt.h
)
set(SPAWN_PLUGIN_FILES
@ -759,7 +803,7 @@ set(SPAWN_PLUGIN_FILES
spawn/spawn.h
)
set(ACLK_STATIC_LIBS
set(ACLK_LEGACY_STATIC_LIBS
${CMAKE_SOURCE_DIR}/externaldeps/mosquitto/libmosquitto.a
${CMAKE_SOURCE_DIR}/externaldeps/libwebsockets/libwebsockets.a
)
@ -967,21 +1011,21 @@ ENDIF()
set(NETDATA_COMMON_LIBRARIES ${NETDATA_COMMON_LIBRARIES} m ${CMAKE_THREAD_LIBS_INIT})
set(ACLK_CAN_BUILD 1)
set(ACLK_LEGACY_CAN_BUILD 1)
if(NOT EXISTS "${CMAKE_SOURCE_DIR}/externaldeps/mosquitto/libmosquitto.a")
message(WARNING "Static build of mosquitto not found. Disabling ACLK")
set(ACLK_CAN_BUILD 0)
set(ACLK_LEGACY_CAN_BUILD 0)
ENDIF()
if(NOT EXISTS "${CMAKE_SOURCE_DIR}/externaldeps/libwebsockets/libwebsockets.a")
message(WARNING "Static build of libwebsockets not found. Disabling ACLK")
set(ACLK_CAN_BUILD 0)
set(ACLK_LEGACY_CAN_BUILD 0)
ENDIF()
IF(ACLK_CAN_BUILD)
message(STATUS "agent-cloud-link: enabled")
list(APPEND NETDATA_FILES ${ACLK_PLUGIN_FILES})
list(APPEND NETDATA_COMMON_LIBRARIES ${ACLK_STATIC_LIBS})
IF(ACLK_LEGACY_CAN_BUILD)
message(STATUS "agent-cloud-link Legacy: enabled")
list(APPEND NETDATA_FILES ${ACLK_LEGACY_FILES})
list(APPEND NETDATA_COMMON_LIBRARIES ${ACLK_LEGACY_STATIC_LIBS})
include_directories(BEFORE ${CMAKE_SOURCE_DIR}/externaldeps/libwebsockets/include)
IF(LINUX AND CAP_FOUND)
list(APPEND NETDATA_COMMON_LIBRARIES ${CAP_LIBRARIES})
@ -989,9 +1033,16 @@ IF(ACLK_CAN_BUILD)
list(APPEND NETDATA_COMMON_CFLAGS ${CAP_CFLAGS_OTHER})
ENDIF()
ELSE()
message(STATUS "agent-cloud-link: disabled")
message(STATUS "agent-cloud-link Legacy: disabled")
ENDIF()
list(APPEND NETDATA_FILES ${ACLK_ALWAYS_BUILD})
list(APPEND NETDATA_FILES ${ACLK_NG_FILES})
list(APPEND NETDATA_FILES ${ACLK_COMMON_FILES})
include_directories(BEFORE ${CMAKE_SOURCE_DIR}/mqtt_websockets/MQTT-C/include)
include_directories(BEFORE ${CMAKE_SOURCE_DIR}/mqtt_websockets/src/include)
include_directories(BEFORE ${CMAKE_SOURCE_DIR}/mqtt_websockets/c-rbuf/include)
# -----------------------------------------------------------------------------
# netdata

View File

@ -114,7 +114,7 @@ SUBDIRS += \
spawn \
$(NULL)
if !ACLK_NG
if ENABLE_ACLK
SUBDIRS += \
aclk/legacy \
$(NULL)
@ -540,7 +540,7 @@ PARSER_FILES = \
$(NULL)
if ACLK_NG
ACLK_FILES = \
ACLK_NG_FILES = \
aclk/aclk.c \
aclk/aclk.h \
aclk/aclk_util.c \
@ -551,8 +551,6 @@ ACLK_FILES = \
aclk/aclk_query.h \
aclk/aclk_query_queue.c \
aclk/aclk_query_queue.h \
aclk/aclk_collector_list.c \
aclk/aclk_collector_list.h \
aclk/aclk_otp.c \
aclk/aclk_otp.h \
aclk/aclk_tx_msgs.c \
@ -573,17 +571,25 @@ ACLK_FILES = \
mqtt_websockets/MQTT-C/src/mqtt.c \
mqtt_websockets/MQTT-C/include/mqtt.h \
$(NULL)
else #ACLK_NG
ACLK_FILES = \
aclk/legacy/aclk_rrdhost_state.h \
aclk/legacy/aclk_common.c \
aclk/legacy/aclk_common.h \
aclk/legacy/aclk_stats.c \
aclk/legacy/aclk_stats.h \
$(NULL)
endif #ACLK_NG
if ENABLE_ACLK
ACLK_FILES += \
ACLK_COMMON_FILES = \
aclk/aclk_collector_list.c \
aclk/aclk_collector_list.h \
$(NULL)
endif
ACLK_ALWAYS_BUILD_FILES = \
aclk/aclk_rrdhost_state.h \
aclk/aclk_api.c \
aclk/aclk_api.h \
aclk/aclk_proxy.c \
aclk/aclk_proxy.h \
$(NULL)
if ACLK_LEGACY
ACLK_LEGACY_FILES = \
aclk/legacy/agent_cloud_link.c \
aclk/legacy/agent_cloud_link.h \
aclk/legacy/aclk_query.c \
@ -596,9 +602,12 @@ ACLK_FILES += \
aclk/legacy/aclk_lws_wss_client.h \
aclk/legacy/aclk_lws_https_client.c \
aclk/legacy/aclk_lws_https_client.h \
aclk/legacy/aclk_common.c \
aclk/legacy/aclk_common.h \
aclk/legacy/aclk_stats.c \
aclk/legacy/aclk_stats.h \
$(NULL)
endif #ENABLE_ACLK
endif #ACLK_NG
endif #ACLK_LEGACY
SPAWN_PLUGIN_FILES = \
spawn/spawn.c \
@ -710,7 +719,10 @@ NETDATA_FILES = \
$(WEB_PLUGIN_FILES) \
$(CLAIM_FILES) \
$(PARSER_FILES) \
$(ACLK_FILES) \
$(ACLK_ALWAYS_BUILD_FILES) \
$(ACLK_COMMON_FILES) \
$(ACLK_LEGACY_FILES) \
$(ACLK_NG_FILES) \
$(SPAWN_PLUGIN_FILES) \
$(NULL)
@ -770,16 +782,14 @@ netdata_LDADD = \
$(NETDATA_COMMON_LIBS) \
$(NULL)
if !ACLK_NG
if ENABLE_ACLK
if ACLK_LEGACY
netdata_LDADD += \
$(abs_top_srcdir)/externaldeps/mosquitto/libmosquitto.a \
$(OPTIONAL_LIBCAP_LIBS) \
$(OPTIONAL_LWS_LIBS) \
$(NETDATA_COMMON_LIBS) \
$(NULL)
endif #ENABLE_ACLK
endif #ACLK_NG
endif #ACLK_LEGACY
if ENABLE_CXX_LINKER
netdata_LINK = $(CXXLD) $(CXXFLAGS) $(LDFLAGS) -o $@

View File

@ -13,6 +13,8 @@
#include "aclk_collector_list.h"
#include "https_client.h"
#include "aclk_proxy.h"
#ifdef ACLK_LOG_CONVERSATION_DIR
#include <sys/types.h>
#include <sys/stat.h>
@ -21,19 +23,10 @@
#define ACLK_STABLE_TIMEOUT 3 // Minimum delay to mark AGENT as stable
//TODO remove most (as in 99.999999999%) of this crap
int aclk_connected = 0;
int aclk_disable_runtime = 0;
int aclk_disable_single_updates = 0;
int aclk_kill_link = 0;
int aclk_pubacks_per_conn = 0; // How many PubAcks we got since MQTT conn est.
time_t aclk_block_until = 0;
usec_t aclk_session_us = 0; // Used by the mqtt layer
time_t aclk_session_sec = 0; // Used by the mqtt layer
aclk_env_t *aclk_env = NULL;
mqtt_wss_client mqttwss_client;
@ -43,22 +36,12 @@ netdata_mutex_t aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER;
#define ACLK_SHARED_STATE_UNLOCK netdata_mutex_unlock(&aclk_shared_state_mutex)
struct aclk_shared_state aclk_shared_state = {
.agent_state = AGENT_INITIALIZING,
.agent_state = ACLK_HOST_INITIALIZING,
.last_popcorn_interrupt = 0,
.mqtt_shutdown_msg_id = -1,
.mqtt_shutdown_msg_rcvd = 0
};
void aclk_single_update_disable()
{
aclk_disable_single_updates = 1;
}
void aclk_single_update_enable()
{
aclk_disable_single_updates = 0;
}
//ENDTODO
static RSA *aclk_private_key = NULL;
@ -301,7 +284,7 @@ static int handle_connection(mqtt_wss_client client)
inline static int aclk_popcorn_check_bump()
{
ACLK_SHARED_STATE_LOCK;
if (unlikely(aclk_shared_state.agent_state == AGENT_INITIALIZING)) {
if (unlikely(aclk_shared_state.agent_state == ACLK_HOST_INITIALIZING)) {
aclk_shared_state.last_popcorn_interrupt = now_realtime_sec();
ACLK_SHARED_STATE_UNLOCK;
return 1;
@ -340,7 +323,7 @@ static inline void mqtt_connected_actions(mqtt_wss_client client)
aclk_pubacks_per_conn = 0;
ACLK_SHARED_STATE_LOCK;
if (aclk_shared_state.agent_state != AGENT_INITIALIZING) {
if (aclk_shared_state.agent_state != ACLK_HOST_INITIALIZING) {
error("Sending `connect` payload immediately as popcorning was finished already.");
queue_connect_payloads();
}
@ -360,13 +343,13 @@ static int wait_popcorning_finishes(mqtt_wss_client client, struct aclk_query_th
int need_wait;
while (!netdata_exit) {
ACLK_SHARED_STATE_LOCK;
if (likely(aclk_shared_state.agent_state != AGENT_INITIALIZING)) {
if (likely(aclk_shared_state.agent_state != ACLK_HOST_INITIALIZING)) {
ACLK_SHARED_STATE_UNLOCK;
return 0;
}
elapsed = now_realtime_sec() - aclk_shared_state.last_popcorn_interrupt;
if (elapsed >= ACLK_STABLE_TIMEOUT) {
aclk_shared_state.agent_state = AGENT_STABLE;
aclk_shared_state.agent_state = ACLK_HOST_STABLE;
ACLK_SHARED_STATE_UNLOCK;
error("ACLK localhost popocorn finished");
if (unlikely(!query_threads->thread_list))
@ -721,10 +704,10 @@ exit:
// fix this in both old and new ACLK
extern void health_alarm_entry2json_nolock(BUFFER *wb, ALARM_ENTRY *ae, RRDHOST *host);
void aclk_alarm_reload(void)
void ng_aclk_alarm_reload(void)
{
ACLK_SHARED_STATE_LOCK;
if (unlikely(aclk_shared_state.agent_state == AGENT_INITIALIZING)) {
if (unlikely(aclk_shared_state.agent_state == ACLK_HOST_INITIALIZING)) {
ACLK_SHARED_STATE_UNLOCK;
return;
}
@ -733,7 +716,7 @@ void aclk_alarm_reload(void)
aclk_queue_query(aclk_query_new(METADATA_ALARMS));
}
int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae)
int ng_aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae)
{
BUFFER *local_buffer;
json_object *msg;
@ -742,7 +725,7 @@ int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae)
return 0;
ACLK_SHARED_STATE_LOCK;
if (unlikely(aclk_shared_state.agent_state == AGENT_INITIALIZING)) {
if (unlikely(aclk_shared_state.agent_state == ACLK_HOST_INITIALIZING)) {
ACLK_SHARED_STATE_UNLOCK;
return 0;
}
@ -764,7 +747,7 @@ int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae)
return 0;
}
int aclk_update_chart(RRDHOST *host, char *chart_name, int create)
int ng_aclk_update_chart(RRDHOST *host, char *chart_name, int create)
{
struct aclk_query *query;
@ -788,7 +771,7 @@ int aclk_update_chart(RRDHOST *host, char *chart_name, int create)
* Add a new collector to the list
* If it exists, update the chart count
*/
void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name)
void ng_aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name)
{
struct aclk_query *query;
struct _collector *tmp_collector;
@ -831,7 +814,7 @@ void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *modu
* This function will release the memory used and schedule
* a cloud update
*/
void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name)
void ng_aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name)
{
struct aclk_query *query;
struct _collector *tmp_collector;
@ -871,27 +854,3 @@ void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *modu
query->data.metadata_alarms.initial_on_connect = 0;
aclk_queue_query(query);
}
struct label *add_aclk_host_labels(struct label *label) {
#ifdef ENABLE_ACLK
ACLK_PROXY_TYPE aclk_proxy;
char *proxy_str;
aclk_get_proxy(&aclk_proxy);
switch(aclk_proxy) {
case PROXY_TYPE_SOCKS5:
proxy_str = "SOCKS5";
break;
case PROXY_TYPE_HTTP:
proxy_str = "HTTP";
break;
default:
proxy_str = "none";
break;
}
label = add_label_to_list(label, "_aclk_impl", "Next Generation", LABEL_SOURCE_AUTO);
return add_label_to_list(label, "_aclk_proxy", proxy_str, LABEL_SOURCE_AUTO);
#else
return label;
#endif
}

View File

@ -2,57 +2,24 @@
#ifndef ACLK_H
#define ACLK_H
typedef struct aclk_rrdhost_state {
char *claimed_id; // Claimed ID if host has one otherwise NULL
} aclk_rrdhost_state;
#include "daemon/common.h"
#include "aclk_util.h"
// version for aclk legacy (old cloud arch)
#define ACLK_VERSION 2
// Define ACLK Feature Version Boundaries Here
#define ACLK_V_COMPRESSION 2
#include "aclk_rrdhost_state.h"
// How many MQTT PUBACKs we need to get to consider connection
// stable for the purposes of TBEB (truncated binary exponential backoff)
#define ACLK_PUBACKS_CONN_STABLE 3
// TODO get rid of this shit
extern int aclk_disable_runtime;
extern int aclk_disable_single_updates;
extern int aclk_kill_link;
extern int aclk_connected;
extern time_t aclk_block_until;
extern usec_t aclk_session_us;
extern time_t aclk_session_sec;
extern aclk_env_t *aclk_env;
void *aclk_main(void *ptr);
void aclk_single_update_disable();
void aclk_single_update_enable();
#define NETDATA_ACLK_HOOK \
{ .name = "ACLK_Main", \
.config_section = NULL, \
.config_name = NULL, \
.enabled = 1, \
.thread = NULL, \
.init_routine = NULL, \
.start_routine = aclk_main },
extern netdata_mutex_t aclk_shared_state_mutex;
#define ACLK_SHARED_STATE_LOCK netdata_mutex_lock(&aclk_shared_state_mutex)
#define ACLK_SHARED_STATE_UNLOCK netdata_mutex_unlock(&aclk_shared_state_mutex)
typedef enum aclk_agent_state {
AGENT_INITIALIZING,
AGENT_STABLE
} ACLK_AGENT_STATE;
extern struct aclk_shared_state {
ACLK_AGENT_STATE agent_state;
time_t last_popcorn_interrupt;
@ -65,20 +32,15 @@ extern struct aclk_shared_state {
int mqtt_shutdown_msg_rcvd;
} aclk_shared_state;
void aclk_alarm_reload(void);
int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae);
void ng_aclk_alarm_reload(void);
int ng_aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae);
// TODO this is for bacward compatibility with ACLK legacy
#define ACLK_CMD_CHART 1
#define ACLK_CMD_CHARTDEL 0
/* Informs ACLK about created/deleted chart
* @param create 0 - if chart was deleted, other if chart created
*/
int aclk_update_chart(RRDHOST *host, char *chart_name, int create);
int ng_aclk_update_chart(RRDHOST *host, char *chart_name, int create);
void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name);
void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name);
struct label *add_aclk_host_labels(struct label *label);
void ng_aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name);
void ng_aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name);
#endif /* ACLK_H */

179
aclk/aclk_api.c Normal file
View File

@ -0,0 +1,179 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "libnetdata/libnetdata.h"
#include "database/rrd.h"
#ifdef ACLK_NG
#include "aclk.h"
#endif
#ifdef ACLK_LEGACY
#include "legacy/agent_cloud_link.h"
#endif
int aclk_connected = 0;
int aclk_kill_link = 0;
usec_t aclk_session_us = 0;
time_t aclk_session_sec = 0;
int aclk_disable_runtime = 0;
int aclk_disable_single_updates = 0;
int aclk_stats_enabled;
#ifdef ACLK_LEGACY
int aclk_ng = 0;
#else
int aclk_ng = 1;
#endif
#define ACLK_IMPL_KEY_NAME "aclk implementation"
#ifdef ENABLE_ACLK
void *aclk_starter(void *ptr) {
char *aclk_impl_req = config_get(CONFIG_SECTION_CLOUD, ACLK_IMPL_KEY_NAME, "legacy");
if (!strcasecmp(aclk_impl_req, "ng")) {
aclk_ng = 1;
} else if (!strcasecmp(aclk_impl_req, "legacy")) {
aclk_ng = 0;
} else {
error("Unknown value \"%s\" of key \"" ACLK_IMPL_KEY_NAME "\" in section \"" CONFIG_SECTION_CLOUD "\". Using ACLK %s.", aclk_impl_req, aclk_ng ? "NG" : "Legacy");
}
#ifndef ACLK_NG
if (aclk_ng) {
error("Configuration requests ACLK-NG but it is not available in this agent. Switching to Legacy.");
aclk_ng = 0;
}
#endif
#ifndef ACLK_LEGACY
if (!aclk_ng) {
error("Configuration requests ACLK Legacy but it is not available in this agent. Switching to NG.");
aclk_ng = 1;
}
#endif
#ifdef ACLK_NG
if (aclk_ng)
return aclk_main(ptr);
#endif
#ifdef ACLK_LEGACY
if (!aclk_ng)
return legacy_aclk_main(ptr);
#endif
error_report("No ACLK could be started");
return NULL;
}
void aclk_single_update_disable()
{
aclk_disable_single_updates = 1;
}
void aclk_single_update_enable()
{
aclk_disable_single_updates = 0;
}
void aclk_alarm_reload(void)
{
#ifdef ACLK_NG
if (aclk_ng)
ng_aclk_alarm_reload();
#endif
#ifdef ACLK_LEGACY
if (!aclk_ng)
legacy_aclk_alarm_reload();
#endif
}
int aclk_update_chart(RRDHOST *host, char *chart_name, int create)
{
#ifdef ACLK_NG
if (aclk_ng)
return ng_aclk_update_chart(host, chart_name, create);
#endif
#ifdef ACLK_LEGACY
if (!aclk_ng)
return legacy_aclk_update_chart(host, chart_name, create);
#endif
error_report("No usable aclk_update_chart implementation");
return 1;
}
int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae)
{
#ifdef ACLK_NG
if (aclk_ng)
return ng_aclk_update_alarm(host, ae);
#endif
#ifdef ACLK_LEGACY
if (!aclk_ng)
return legacy_aclk_update_alarm(host, ae);
#endif
error_report("No usable aclk_update_alarm implementation");
return 1;
}
void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name)
{
#ifdef ACLK_NG
if (aclk_ng)
return ng_aclk_add_collector(host, plugin_name, module_name);
#endif
#ifdef ACLK_LEGACY
if (!aclk_ng)
return legacy_aclk_add_collector(host, plugin_name, module_name);
#endif
error_report("No usable aclk_add_collector implementation");
}
void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name)
{
#ifdef ACLK_NG
if (aclk_ng)
return ng_aclk_del_collector(host, plugin_name, module_name);
#endif
#ifdef ACLK_LEGACY
if (!aclk_ng)
return legacy_aclk_del_collector(host, plugin_name, module_name);
#endif
error_report("No usable aclk_del_collector implementation");
}
#endif /* ENABLE_ACLK */
struct label *add_aclk_host_labels(struct label *label) {
#ifdef ACLK_NG
label = add_label_to_list(label, "_aclk_ng_available", "true", LABEL_SOURCE_AUTO);
#else
label = add_label_to_list(label, "_aclk_ng_available", "false", LABEL_SOURCE_AUTO);
#endif
#ifdef ACLK_LEGACY
label = add_label_to_list(label, "_aclk_legacy_available", "true", LABEL_SOURCE_AUTO);
#else
label = add_label_to_list(label, "_aclk_legacy_available", "false", LABEL_SOURCE_AUTO);
#endif
#ifdef ENABLE_ACLK
ACLK_PROXY_TYPE aclk_proxy;
char *proxy_str;
aclk_get_proxy(&aclk_proxy);
switch(aclk_proxy) {
case PROXY_TYPE_SOCKS5:
proxy_str = "SOCKS5";
break;
case PROXY_TYPE_HTTP:
proxy_str = "HTTP";
break;
default:
proxy_str = "none";
break;
}
label = add_label_to_list(label, "_aclk_impl", aclk_ng ? "Next Generation" : "Legacy", LABEL_SOURCE_AUTO);
label = add_label_to_list(label, "_aclk_proxy", proxy_str, LABEL_SOURCE_AUTO);
#endif
return label;
}

51
aclk/aclk_api.h Normal file
View File

@ -0,0 +1,51 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#ifndef ACLK_API_H
#define ACLK_API_H
#include "libnetdata/libnetdata.h"
#include "aclk_proxy.h"
// TODO get rid global vars as soon as
// ACLK Legacy is removed
extern int aclk_connected;
extern int aclk_kill_link;
extern usec_t aclk_session_us;
extern time_t aclk_session_sec;
extern int aclk_disable_runtime;
extern int aclk_disable_single_updates;
extern int aclk_stats_enabled;
extern int aclk_ng;
#ifdef ENABLE_ACLK
void *aclk_starter(void *ptr);
void aclk_single_update_disable();
void aclk_single_update_enable();
void aclk_alarm_reload(void);
int aclk_update_chart(RRDHOST *host, char *chart_name, int create);
int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae);
void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name);
void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name);
#define NETDATA_ACLK_HOOK \
{ .name = "ACLK_Main", \
.config_section = NULL, \
.config_name = NULL, \
.enabled = 1, \
.thread = NULL, \
.init_routine = NULL, \
.start_routine = aclk_starter },
#endif
struct label *add_aclk_host_labels(struct label *label);
#endif /* ACLK_API_H */

View File

@ -31,6 +31,8 @@ struct _collector {
struct _collector *next;
};
extern struct _collector *collector_list;
struct _collector *_add_collector(const char *hostname, const char *plugin_name, const char *module_name);
struct _collector *_del_collector(const char *hostname, const char *plugin_name, const char *module_name);
void _reset_collector_list();

View File

@ -2,6 +2,8 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#include "aclk_otp.h"
#include "aclk_util.h"
#include "aclk.h"
#include "daemon/common.h"

View File

@ -6,6 +6,7 @@
#include "daemon/common.h"
#include "https_client.h"
#include "aclk_util.h"
int aclk_get_mqtt_otp(RSA *p_key, char **mqtt_id, char **mqtt_usr, char **mqtt_pass, url_t *target);
int aclk_get_env(aclk_env_t *env, const char *aclk_hostname, int aclk_port);

186
aclk/aclk_proxy.c Normal file
View File

@ -0,0 +1,186 @@
#include "aclk_proxy.h"
#include "daemon/common.h"
#define ACLK_PROXY_ENV "env"
#define ACLK_PROXY_CONFIG_VAR "proxy"
struct {
ACLK_PROXY_TYPE type;
const char *url_str;
} supported_proxy_types[] = {
{ .type = PROXY_TYPE_SOCKS5, .url_str = "socks5" ACLK_PROXY_PROTO_ADDR_SEPARATOR },
{ .type = PROXY_TYPE_SOCKS5, .url_str = "socks5h" ACLK_PROXY_PROTO_ADDR_SEPARATOR },
{ .type = PROXY_TYPE_HTTP, .url_str = "http" ACLK_PROXY_PROTO_ADDR_SEPARATOR },
{ .type = PROXY_TYPE_UNKNOWN, .url_str = NULL },
};
const char *aclk_proxy_type_to_s(ACLK_PROXY_TYPE *type)
{
switch (*type) {
case PROXY_DISABLED:
return "disabled";
case PROXY_TYPE_HTTP:
return "HTTP";
case PROXY_TYPE_SOCKS5:
return "SOCKS";
default:
return "Unknown";
}
}
static inline ACLK_PROXY_TYPE aclk_find_proxy(const char *string)
{
int i = 0;
while (supported_proxy_types[i].url_str) {
if (!strncmp(supported_proxy_types[i].url_str, string, strlen(supported_proxy_types[i].url_str)))
return supported_proxy_types[i].type;
i++;
}
return PROXY_TYPE_UNKNOWN;
}
ACLK_PROXY_TYPE aclk_verify_proxy(const char *string)
{
if (!string)
return PROXY_TYPE_UNKNOWN;
while (*string == 0x20)
string++;
if (!*string)
return PROXY_TYPE_UNKNOWN;
return aclk_find_proxy(string);
}
// helper function to censor user&password
// for logging purposes
void safe_log_proxy_censor(char *proxy)
{
size_t length = strlen(proxy);
char *auth = proxy + length - 1;
char *cur;
while ((auth >= proxy) && (*auth != '@'))
auth--;
//if not found or @ is first char do nothing
if (auth <= proxy)
return;
cur = strstr(proxy, ACLK_PROXY_PROTO_ADDR_SEPARATOR);
if (!cur)
cur = proxy;
else
cur += strlen(ACLK_PROXY_PROTO_ADDR_SEPARATOR);
while (cur < auth) {
*cur = 'X';
cur++;
}
}
static inline void safe_log_proxy_error(char *str, const char *proxy)
{
char *log = strdupz(proxy);
safe_log_proxy_censor(log);
error("%s Provided Value:\"%s\"", str, log);
freez(log);
}
static inline int check_socks_enviroment(const char **proxy)
{
char *tmp = getenv("socks_proxy");
if (!tmp)
return 1;
if (aclk_verify_proxy(tmp) == PROXY_TYPE_SOCKS5) {
*proxy = tmp;
return 0;
}
safe_log_proxy_error(
"Environment var \"socks_proxy\" defined but of unknown format. Supported syntax: \"socks5[h]://[user:pass@]host:ip\".",
tmp);
return 1;
}
static inline int check_http_enviroment(const char **proxy)
{
char *tmp = getenv("http_proxy");
if (!tmp)
return 1;
if (aclk_verify_proxy(tmp) == PROXY_TYPE_HTTP) {
*proxy = tmp;
return 0;
}
safe_log_proxy_error(
"Environment var \"http_proxy\" defined but of unknown format. Supported syntax: \"http[s]://[user:pass@]host:ip\".",
tmp);
return 1;
}
const char *aclk_lws_wss_get_proxy_setting(ACLK_PROXY_TYPE *type)
{
const char *proxy = config_get(CONFIG_SECTION_CLOUD, ACLK_PROXY_CONFIG_VAR, ACLK_PROXY_ENV);
*type = PROXY_DISABLED;
if (strcmp(proxy, "none") == 0)
return proxy;
if (strcmp(proxy, ACLK_PROXY_ENV) == 0) {
if (check_socks_enviroment(&proxy) == 0) {
#ifdef LWS_WITH_SOCKS5
*type = PROXY_TYPE_SOCKS5;
return proxy;
#else
safe_log_proxy_error("socks_proxy environment variable set to use SOCKS5 proxy "
"but Libwebsockets used doesn't have SOCKS5 support built in. "
"Ignoring and checking for other options.",
proxy);
#endif
}
if (check_http_enviroment(&proxy) == 0)
*type = PROXY_TYPE_HTTP;
return proxy;
}
*type = aclk_verify_proxy(proxy);
#ifndef LWS_WITH_SOCKS5
if (*type == PROXY_TYPE_SOCKS5) {
safe_log_proxy_error(
"Config var \"" ACLK_PROXY_CONFIG_VAR
"\" set to use SOCKS5 proxy but Libwebsockets used is built without support for SOCKS proxy. ACLK will be disabled.",
proxy);
}
#endif
if (*type == PROXY_TYPE_UNKNOWN) {
*type = PROXY_DISABLED;
safe_log_proxy_error(
"Config var \"" ACLK_PROXY_CONFIG_VAR
"\" defined but of unknown format. Supported syntax: \"socks5[h]://[user:pass@]host:ip\".",
proxy);
}
return proxy;
}
// helper function to read settings only once (static)
// as claiming, challenge/response and ACLK
// read the same thing, no need to parse again
const char *aclk_get_proxy(ACLK_PROXY_TYPE *type)
{
static const char *proxy = NULL;
static ACLK_PROXY_TYPE proxy_type = PROXY_NOT_SET;
if (proxy_type == PROXY_NOT_SET)
proxy = aclk_lws_wss_get_proxy_setting(&proxy_type);
*type = proxy_type;
return proxy;
}

22
aclk/aclk_proxy.h Normal file
View File

@ -0,0 +1,22 @@
#ifndef ACLK_PROXY_H
#define ACLK_PROXY_H
#include <config.h>
#define ACLK_PROXY_PROTO_ADDR_SEPARATOR "://"
typedef enum aclk_proxy_type {
PROXY_TYPE_UNKNOWN = 0,
PROXY_TYPE_SOCKS5,
PROXY_TYPE_HTTP,
PROXY_DISABLED,
PROXY_NOT_SET,
} ACLK_PROXY_TYPE;
const char *aclk_proxy_type_to_s(ACLK_PROXY_TYPE *type);
ACLK_PROXY_TYPE aclk_verify_proxy(const char *string);
const char *aclk_lws_wss_get_proxy_setting(ACLK_PROXY_TYPE *type);
void safe_log_proxy_censor(char *proxy);
const char *aclk_get_proxy(ACLK_PROXY_TYPE *type);
#endif /* ACLK_PROXY_H */

View File

@ -3,6 +3,7 @@
#include "libnetdata/libnetdata.h"
#ifdef ACLK_LEGACY
typedef enum aclk_cmd {
ACLK_CMD_CLOUD,
ACLK_CMD_ONCONNECT,
@ -20,23 +21,24 @@ typedef enum aclk_metadata_state {
ACLK_METADATA_CMD_QUEUED,
ACLK_METADATA_SENT
} ACLK_METADATA_STATE;
#endif
typedef enum aclk_agent_state {
ACLK_HOST_INITIALIZING,
ACLK_HOST_STABLE
} ACLK_POPCORNING_STATE;
} ACLK_AGENT_STATE;
typedef struct aclk_rrdhost_state {
char *claimed_id; // Claimed ID if host has one otherwise NULL
#ifdef ENABLE_ACLK
#ifdef ACLK_LEGACY
// per child popcorning
ACLK_POPCORNING_STATE state;
ACLK_AGENT_STATE state;
ACLK_METADATA_STATE metadata;
time_t timestamp_created;
time_t t_last_popcorn_update;
#endif /* ENABLE_ACLK */
#endif /* ACLK_LEGACY */
} aclk_rrdhost_state;
#endif /* ACLK_RRDHOST_STATE_H */

View File

@ -4,10 +4,13 @@
#include "aclk_stats.h"
#include "aclk_query_queue.h"
#include "aclk.h"
#define ACLK_V2_PAYLOAD_SEPARATOR "\x0D\x0A\x0D\x0A"
#define ACLK_CLOUD_REQ_V2_PREFIX "GET /api/v1/"
#define ACLK_V_COMPRESSION 2
struct aclk_request {
char *type_id;
char *msg_id;
@ -18,7 +21,7 @@ struct aclk_request {
int max_version;
};
int cloud_to_agent_parse(JSON_ENTRY *e)
static int cloud_to_agent_parse(JSON_ENTRY *e)
{
struct aclk_request *data = e->callback_data;
@ -108,7 +111,7 @@ static inline int aclk_v2_payload_get_query(const char *payload, char **query_ur
}
#define HTTP_CHECK_AGENT_INITIALIZED() ACLK_SHARED_STATE_LOCK;\
if (unlikely(aclk_shared_state.agent_state == AGENT_INITIALIZING)) {\
if (unlikely(aclk_shared_state.agent_state == ACLK_HOST_INITIALIZING)) {\
debug(D_ACLK, "Ignoring \"http\" cloud request; agent not in stable state");\
ACLK_SHARED_STATE_UNLOCK;\
return 1;\

View File

@ -4,8 +4,6 @@
netdata_mutex_t aclk_stats_mutex = NETDATA_MUTEX_INITIALIZER;
int aclk_stats_enabled;
int query_thread_count;
// data ACLK stats need per query thread

View File

@ -13,8 +13,6 @@ extern netdata_mutex_t aclk_stats_mutex;
#define ACLK_STATS_LOCK netdata_mutex_lock(&aclk_stats_mutex)
#define ACLK_STATS_UNLOCK netdata_mutex_unlock(&aclk_stats_mutex)
extern int aclk_stats_enabled;
struct aclk_stats_thread {
netdata_thread_t *thread;
int query_thread_count;

View File

@ -4,11 +4,15 @@
#include "daemon/common.h"
#include "aclk_util.h"
#include "aclk_stats.h"
#include "aclk.h"
#ifndef __GNUC__
#pragma region aclk_tx_msgs helper functions
#endif
// version for aclk legacy (old cloud arch)
#define ACLK_VERSION 2
static void aclk_send_message_subtopic(mqtt_wss_client client, json_object *msg, enum aclk_topics subtopic)
{
uint16_t packet_id;

View File

@ -2,8 +2,6 @@
#include "aclk_util.h"
#include <stdio.h>
#include "daemon/common.h"
// CentOS 7 has older version that doesn't define this
@ -315,189 +313,6 @@ unsigned long int aclk_tbeb_delay(int reset, int base, unsigned long int min, un
return delay;
}
#define ACLK_PROXY_PROTO_ADDR_SEPARATOR "://"
#define ACLK_PROXY_ENV "env"
#define ACLK_PROXY_CONFIG_VAR "proxy"
struct {
ACLK_PROXY_TYPE type;
const char *url_str;
} supported_proxy_types[] = {
{ .type = PROXY_TYPE_SOCKS5, .url_str = "socks5" ACLK_PROXY_PROTO_ADDR_SEPARATOR },
{ .type = PROXY_TYPE_SOCKS5, .url_str = "socks5h" ACLK_PROXY_PROTO_ADDR_SEPARATOR },
{ .type = PROXY_TYPE_HTTP, .url_str = "http" ACLK_PROXY_PROTO_ADDR_SEPARATOR },
{ .type = PROXY_TYPE_UNKNOWN, .url_str = NULL },
};
const char *aclk_proxy_type_to_s(ACLK_PROXY_TYPE *type)
{
switch (*type) {
case PROXY_DISABLED:
return "disabled";
case PROXY_TYPE_HTTP:
return "HTTP";
case PROXY_TYPE_SOCKS5:
return "SOCKS";
default:
return "Unknown";
}
}
static inline ACLK_PROXY_TYPE aclk_find_proxy(const char *string)
{
int i = 0;
while (supported_proxy_types[i].url_str) {
if (!strncmp(supported_proxy_types[i].url_str, string, strlen(supported_proxy_types[i].url_str)))
return supported_proxy_types[i].type;
i++;
}
return PROXY_TYPE_UNKNOWN;
}
ACLK_PROXY_TYPE aclk_verify_proxy(const char *string)
{
if (!string)
return PROXY_TYPE_UNKNOWN;
while (*string == 0x20 && *string!=0) // Help coverity (compiler will remove)
string++;
if (!*string)
return PROXY_TYPE_UNKNOWN;
return aclk_find_proxy(string);
}
// helper function to censor user&password
// for logging purposes
void safe_log_proxy_censor(char *proxy)
{
size_t length = strlen(proxy);
char *auth = proxy + length - 1;
char *cur;
while ((auth >= proxy) && (*auth != '@'))
auth--;
//if not found or @ is first char do nothing
if (auth <= proxy)
return;
cur = strstr(proxy, ACLK_PROXY_PROTO_ADDR_SEPARATOR);
if (!cur)
cur = proxy;
else
cur += strlen(ACLK_PROXY_PROTO_ADDR_SEPARATOR);
while (cur < auth) {
*cur = 'X';
cur++;
}
}
static inline void safe_log_proxy_error(char *str, const char *proxy)
{
char *log = strdupz(proxy);
safe_log_proxy_censor(log);
error("%s Provided Value:\"%s\"", str, log);
freez(log);
}
static inline int check_socks_enviroment(const char **proxy)
{
char *tmp = getenv("socks_proxy");
if (!tmp)
return 1;
if (aclk_verify_proxy(tmp) == PROXY_TYPE_SOCKS5) {
*proxy = tmp;
return 0;
}
safe_log_proxy_error(
"Environment var \"socks_proxy\" defined but of unknown format. Supported syntax: \"socks5[h]://[user:pass@]host:ip\".",
tmp);
return 1;
}
static inline int check_http_enviroment(const char **proxy)
{
char *tmp = getenv("http_proxy");
if (!tmp)
return 1;
if (aclk_verify_proxy(tmp) == PROXY_TYPE_HTTP) {
*proxy = tmp;
return 0;
}
safe_log_proxy_error(
"Environment var \"http_proxy\" defined but of unknown format. Supported syntax: \"http[s]://[user:pass@]host:ip\".",
tmp);
return 1;
}
const char *aclk_lws_wss_get_proxy_setting(ACLK_PROXY_TYPE *type)
{
const char *proxy = config_get(CONFIG_SECTION_CLOUD, ACLK_PROXY_CONFIG_VAR, ACLK_PROXY_ENV);
*type = PROXY_DISABLED;
if (strcmp(proxy, "none") == 0)
return proxy;
if (strcmp(proxy, ACLK_PROXY_ENV) == 0) {
if (check_socks_enviroment(&proxy) == 0) {
#ifdef LWS_WITH_SOCKS5
*type = PROXY_TYPE_SOCKS5;
return proxy;
#else
safe_log_proxy_error("socks_proxy environment variable set to use SOCKS5 proxy "
"but Libwebsockets used doesn't have SOCKS5 support built in. "
"Ignoring and checking for other options.",
proxy);
#endif
}
if (check_http_enviroment(&proxy) == 0)
*type = PROXY_TYPE_HTTP;
return proxy;
}
*type = aclk_verify_proxy(proxy);
#ifndef LWS_WITH_SOCKS5
if (*type == PROXY_TYPE_SOCKS5) {
safe_log_proxy_error(
"Config var \"" ACLK_PROXY_CONFIG_VAR
"\" set to use SOCKS5 proxy but Libwebsockets used is built without support for SOCKS proxy. ACLK will be disabled.",
proxy);
}
#endif
if (*type == PROXY_TYPE_UNKNOWN) {
*type = PROXY_DISABLED;
safe_log_proxy_error(
"Config var \"" ACLK_PROXY_CONFIG_VAR
"\" defined but of unknown format. Supported syntax: \"socks5[h]://[user:pass@]host:ip\".",
proxy);
}
return proxy;
}
// helper function to read settings only once (static)
// as claiming, challenge/response and ACLK
// read the same thing, no need to parse again
const char *aclk_get_proxy(ACLK_PROXY_TYPE *type)
{
static const char *proxy = NULL;
static ACLK_PROXY_TYPE proxy_type = PROXY_NOT_SET;
if (proxy_type == PROXY_NOT_SET)
proxy = aclk_lws_wss_get_proxy_setting(&proxy_type);
*type = proxy_type;
return proxy;
}
#define HTTP_PROXY_PREFIX "http://"
void aclk_set_proxy(char **ohost, int *port, enum mqtt_wss_proxy_type *type)

View File

@ -78,20 +78,6 @@ int aclk_get_conv_log_next();
unsigned long int aclk_tbeb_delay(int reset, int base, unsigned long int min, unsigned long int max);
#define aclk_tbeb_reset(x) aclk_tbeb_delay(1, 0, 0, 0)
typedef enum aclk_proxy_type {
PROXY_TYPE_UNKNOWN = 0,
PROXY_TYPE_SOCKS5,
PROXY_TYPE_HTTP,
PROXY_DISABLED,
PROXY_NOT_SET,
} ACLK_PROXY_TYPE;
const char *aclk_proxy_type_to_s(ACLK_PROXY_TYPE *type);
ACLK_PROXY_TYPE aclk_verify_proxy(const char *string);
const char *aclk_lws_wss_get_proxy_setting(ACLK_PROXY_TYPE *type);
void safe_log_proxy_censor(char *proxy);
const char *aclk_get_proxy(ACLK_PROXY_TYPE *type);
void aclk_set_proxy(char **ohost, int *port, enum mqtt_wss_proxy_type *type);
#endif /* ACLK_UTIL_H */

View File

@ -6,196 +6,13 @@
#include <libwebsockets.h>
#endif
netdata_mutex_t aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER;
netdata_mutex_t legacy_aclk_shared_state_mutex = NETDATA_MUTEX_INITIALIZER;
int aclk_disable_runtime = 0;
int aclk_kill_link = 0;
struct aclk_shared_state aclk_shared_state = {
struct legacy_aclk_shared_state legacy_aclk_shared_state = {
.version_neg = 0,
.version_neg_wait_till = 0
};
struct {
ACLK_PROXY_TYPE type;
const char *url_str;
} supported_proxy_types[] = {
{ .type = PROXY_TYPE_SOCKS5, .url_str = "socks5" ACLK_PROXY_PROTO_ADDR_SEPARATOR },
{ .type = PROXY_TYPE_SOCKS5, .url_str = "socks5h" ACLK_PROXY_PROTO_ADDR_SEPARATOR },
{ .type = PROXY_TYPE_HTTP, .url_str = "http" ACLK_PROXY_PROTO_ADDR_SEPARATOR },
{ .type = PROXY_TYPE_UNKNOWN, .url_str = NULL },
};
const char *aclk_proxy_type_to_s(ACLK_PROXY_TYPE *type)
{
switch (*type) {
case PROXY_DISABLED:
return "disabled";
case PROXY_TYPE_HTTP:
return "HTTP";
case PROXY_TYPE_SOCKS5:
return "SOCKS";
default:
return "Unknown";
}
}
static inline ACLK_PROXY_TYPE aclk_find_proxy(const char *string)
{
int i = 0;
while (supported_proxy_types[i].url_str) {
if (!strncmp(supported_proxy_types[i].url_str, string, strlen(supported_proxy_types[i].url_str)))
return supported_proxy_types[i].type;
i++;
}
return PROXY_TYPE_UNKNOWN;
}
ACLK_PROXY_TYPE aclk_verify_proxy(const char *string)
{
if (!string)
return PROXY_TYPE_UNKNOWN;
while (*string == 0x20 && *string!=0) // Help coverity (compiler will remove)
string++;
if (!*string)
return PROXY_TYPE_UNKNOWN;
return aclk_find_proxy(string);
}
// helper function to censor user&password
// for logging purposes
void safe_log_proxy_censor(char *proxy)
{
size_t length = strlen(proxy);
char *auth = proxy + length - 1;
char *cur;
while ((auth >= proxy) && (*auth != '@'))
auth--;
//if not found or @ is first char do nothing
if (auth <= proxy)
return;
cur = strstr(proxy, ACLK_PROXY_PROTO_ADDR_SEPARATOR);
if (!cur)
cur = proxy;
else
cur += strlen(ACLK_PROXY_PROTO_ADDR_SEPARATOR);
while (cur < auth) {
*cur = 'X';
cur++;
}
}
static inline void safe_log_proxy_error(char *str, const char *proxy)
{
char *log = strdupz(proxy);
safe_log_proxy_censor(log);
error("%s Provided Value:\"%s\"", str, log);
freez(log);
}
static inline int check_socks_environment(const char **proxy)
{
char *tmp = getenv("socks_proxy");
if (!tmp)
return 1;
if (aclk_verify_proxy(tmp) == PROXY_TYPE_SOCKS5) {
*proxy = tmp;
return 0;
}
safe_log_proxy_error(
"Environment var \"socks_proxy\" defined but of unknown format. Supported syntax: \"socks5[h]://[user:pass@]host:ip\".",
tmp);
return 1;
}
static inline int check_http_environment(const char **proxy)
{
char *tmp = getenv("http_proxy");
if (!tmp)
return 1;
if (aclk_verify_proxy(tmp) == PROXY_TYPE_HTTP) {
*proxy = tmp;
return 0;
}
safe_log_proxy_error(
"Environment var \"http_proxy\" defined but of unknown format. Supported syntax: \"http[s]://[user:pass@]host:ip\".",
tmp);
return 1;
}
const char *aclk_lws_wss_get_proxy_setting(ACLK_PROXY_TYPE *type)
{
const char *proxy = config_get(CONFIG_SECTION_CLOUD, ACLK_PROXY_CONFIG_VAR, ACLK_PROXY_ENV);
*type = PROXY_DISABLED;
if (strcmp(proxy, "none") == 0)
return proxy;
if (strcmp(proxy, ACLK_PROXY_ENV) == 0) {
if (check_socks_environment(&proxy) == 0) {
#ifdef LWS_WITH_SOCKS5
*type = PROXY_TYPE_SOCKS5;
return proxy;
#else
safe_log_proxy_error("socks_proxy environment variable set to use SOCKS5 proxy "
"but Libwebsockets used doesn't have SOCKS5 support built in. "
"Ignoring and checking for other options.",
proxy);
#endif
}
if (check_http_environment(&proxy) == 0)
*type = PROXY_TYPE_HTTP;
return proxy;
}
*type = aclk_verify_proxy(proxy);
#ifndef LWS_WITH_SOCKS5
if (*type == PROXY_TYPE_SOCKS5) {
safe_log_proxy_error(
"Config var \"" ACLK_PROXY_CONFIG_VAR
"\" set to use SOCKS5 proxy but Libwebsockets used is built without support for SOCKS proxy. ACLK will be disabled.",
proxy);
}
#endif
if (*type == PROXY_TYPE_UNKNOWN) {
*type = PROXY_DISABLED;
safe_log_proxy_error(
"Config var \"" ACLK_PROXY_CONFIG_VAR
"\" defined but of unknown format. Supported syntax: \"socks5[h]://[user:pass@]host:ip\".",
proxy);
}
return proxy;
}
// helper function to read settings only once (static)
// as claiming, challenge/response and ACLK
// read the same thing, no need to parse again
const char *aclk_get_proxy(ACLK_PROXY_TYPE *type)
{
static const char *proxy = NULL;
static ACLK_PROXY_TYPE proxy_type = PROXY_NOT_SET;
if (proxy_type == PROXY_NOT_SET)
proxy = aclk_lws_wss_get_proxy_setting(&proxy_type);
*type = proxy_type;
return proxy;
}
int aclk_decode_base_url(char *url, char **aclk_hostname, int *aclk_port)
{
int pos = 0;
@ -234,27 +51,3 @@ int aclk_decode_base_url(char *url, char **aclk_hostname, int *aclk_port)
info("Setting ACLK target host=%s port=%d from %s", *aclk_hostname, *aclk_port, url);
return 0;
}
struct label *add_aclk_host_labels(struct label *label) {
#ifdef ENABLE_ACLK
ACLK_PROXY_TYPE aclk_proxy;
char *proxy_str;
aclk_get_proxy(&aclk_proxy);
switch(aclk_proxy) {
case PROXY_TYPE_SOCKS5:
proxy_str = "SOCKS5";
break;
case PROXY_TYPE_HTTP:
proxy_str = "HTTP";
break;
default:
proxy_str = "none";
break;
}
label = add_label_to_list(label, "_aclk_impl", "Legacy", LABEL_SOURCE_AUTO);
return add_label_to_list(label, "_aclk_proxy", proxy_str, LABEL_SOURCE_AUTO);
#else
return label;
#endif
}

View File

@ -1,12 +1,12 @@
#ifndef ACLK_COMMON_H
#define ACLK_COMMON_H
#include "aclk_rrdhost_state.h"
#include "../aclk_rrdhost_state.h"
#include "daemon/common.h"
extern netdata_mutex_t aclk_shared_state_mutex;
#define ACLK_SHARED_STATE_LOCK netdata_mutex_lock(&aclk_shared_state_mutex)
#define ACLK_SHARED_STATE_UNLOCK netdata_mutex_unlock(&aclk_shared_state_mutex)
extern netdata_mutex_t legacy_aclk_shared_state_mutex;
#define legacy_aclk_shared_state_LOCK netdata_mutex_lock(&legacy_aclk_shared_state_mutex)
#define legacy_aclk_shared_state_UNLOCK netdata_mutex_unlock(&legacy_aclk_shared_state_mutex)
// minimum and maximum supported version of ACLK
// in this version of agent
@ -33,7 +33,7 @@ extern netdata_mutex_t aclk_shared_state_mutex;
#define ACLK_IS_HOST_INITIALIZING(host) (host->aclk_state.state == ACLK_HOST_INITIALIZING)
#define ACLK_IS_HOST_POPCORNING(host) (ACLK_IS_HOST_INITIALIZING(host) && host->aclk_state.t_last_popcorn_update)
extern struct aclk_shared_state {
extern struct legacy_aclk_shared_state {
// optimization to avoid looping trough hosts
// every time Query Thread wakes up
RRDHOST *next_popcorn_host;
@ -42,31 +42,10 @@ extern struct aclk_shared_state {
// protect by lock otherwise
int version_neg;
usec_t version_neg_wait_till;
} aclk_shared_state;
typedef enum aclk_proxy_type {
PROXY_TYPE_UNKNOWN = 0,
PROXY_TYPE_SOCKS5,
PROXY_TYPE_HTTP,
PROXY_DISABLED,
PROXY_NOT_SET,
} ACLK_PROXY_TYPE;
extern int aclk_kill_link; // Tells the agent to tear down the link
extern int aclk_disable_runtime;
} legacy_aclk_shared_state;
const char *aclk_proxy_type_to_s(ACLK_PROXY_TYPE *type);
#define ACLK_PROXY_PROTO_ADDR_SEPARATOR "://"
#define ACLK_PROXY_ENV "env"
#define ACLK_PROXY_CONFIG_VAR "proxy"
ACLK_PROXY_TYPE aclk_verify_proxy(const char *string);
const char *aclk_lws_wss_get_proxy_setting(ACLK_PROXY_TYPE *type);
void safe_log_proxy_censor(char *proxy);
int aclk_decode_base_url(char *url, char **aclk_hostname, int *aclk_port);
const char *aclk_get_proxy(ACLK_PROXY_TYPE *type);
struct label *add_aclk_host_labels(struct label *label);
#endif //ACLK_COMMON_H

View File

@ -2,13 +2,7 @@
#define ACLK_LWS_HTTPS_CLIENT_INTERNAL
#include "aclk_lws_https_client.h"
#ifndef ACLK_NG
#include "aclk_common.h"
#else
#include "../aclk.h"
#endif
#include "aclk_lws_wss_client.h"
#define SMALL_BUFFER 16

View File

@ -6,6 +6,7 @@
#include "daemon/common.h"
#include "aclk_common.h"
#include "aclk_stats.h"
#include "../aclk_proxy.h"
extern int aclk_shutting_down;
@ -450,9 +451,9 @@ static int aclk_lws_wss_callback(struct lws *wsi, enum lws_callback_reasons reas
if (n>=0) {
data->written += n;
if (aclk_stats_enabled) {
ACLK_STATS_LOCK;
aclk_metrics_per_sample.write_q_consumed += n;
ACLK_STATS_UNLOCK;
LEGACY_ACLK_STATS_LOCK;
legacy_aclk_metrics_per_sample.write_q_consumed += n;
LEGACY_ACLK_STATS_UNLOCK;
}
}
//error("lws_write(req=%u,written=%u) %zu of %zu",bytes_left, rc, data->written,data->data_size,rc);
@ -473,9 +474,9 @@ static int aclk_lws_wss_callback(struct lws *wsi, enum lws_callback_reasons reas
retval = 1;
aclk_lws_mutex_unlock(&engine_instance->read_buf_mutex);
if (aclk_stats_enabled) {
ACLK_STATS_LOCK;
aclk_metrics_per_sample.read_q_added += len;
ACLK_STATS_UNLOCK;
LEGACY_ACLK_STATS_LOCK;
legacy_aclk_metrics_per_sample.read_q_added += len;
LEGACY_ACLK_STATS_UNLOCK;
}
// to future myself -> do not call this while read lock is active as it will eventually
@ -553,9 +554,9 @@ int aclk_lws_wss_client_write(void *buf, size_t count)
aclk_lws_mutex_unlock(&engine_instance->write_buf_mutex);
if (aclk_stats_enabled) {
ACLK_STATS_LOCK;
aclk_metrics_per_sample.write_q_added += count;
ACLK_STATS_UNLOCK;
LEGACY_ACLK_STATS_LOCK;
legacy_aclk_metrics_per_sample.write_q_added += count;
LEGACY_ACLK_STATS_UNLOCK;
}
lws_callback_on_writable(engine_instance->lws_wsi);
@ -584,9 +585,9 @@ int aclk_lws_wss_client_read(void *buf, size_t count)
engine_instance->data_to_read = 0;
if (aclk_stats_enabled) {
ACLK_STATS_LOCK;
aclk_metrics_per_sample.read_q_consumed += data_to_be_read;
ACLK_STATS_UNLOCK;
LEGACY_ACLK_STATS_LOCK;
legacy_aclk_metrics_per_sample.read_q_consumed += data_to_be_read;
LEGACY_ACLK_STATS_UNLOCK;
}
abort:

View File

@ -2,15 +2,16 @@
#include "aclk_query.h"
#include "aclk_stats.h"
#include "aclk_rx_msgs.h"
#include "agent_cloud_link.h"
#define WEB_HDR_ACCEPT_ENC "Accept-Encoding:"
pthread_cond_t query_cond_wait = PTHREAD_COND_INITIALIZER;
pthread_mutex_t query_lock_wait = PTHREAD_MUTEX_INITIALIZER;
#define QUERY_THREAD_LOCK pthread_mutex_lock(&query_lock_wait)
#define QUERY_THREAD_UNLOCK pthread_mutex_unlock(&query_lock_wait)
#define ACLK_QUERY_THREAD_NAME "ACLK_Query"
volatile int aclk_connected = 0;
pthread_cond_t legacy_query_cond_wait = PTHREAD_COND_INITIALIZER;
pthread_mutex_t legacy_query_lock_wait = PTHREAD_MUTEX_INITIALIZER;
#define LEGACY_QUERY_THREAD_LOCK pthread_mutex_lock(&legacy_query_lock_wait)
#define LEGACY_QUERY_THREAD_UNLOCK pthread_mutex_unlock(&legacy_query_lock_wait)
#ifndef __GNUC__
#pragma region ACLK_QUEUE
@ -188,7 +189,7 @@ aclk_query_find(char *topic, void *data, char *msg_id, char *query, ACLK_CMD cmd
* Add a query to execute, the result will be send to the specified topic
*/
int aclk_queue_query(char *topic, void *data, char *msg_id, char *query, int run_after, int internal, ACLK_CMD aclk_cmd)
int legacy_aclk_queue_query(char *topic, void *data, char *msg_id, char *query, int run_after, int internal, ACLK_CMD aclk_cmd)
{
struct aclk_query *new_query, *tmp_query;
@ -205,7 +206,7 @@ int aclk_queue_query(char *topic, void *data, char *msg_id, char *query, int run
if (unlikely(tmp_query)) {
if (tmp_query->run_after == run_after) {
ACLK_QUEUE_UNLOCK;
QUERY_THREAD_WAKEUP;
LEGACY_QUERY_THREAD_WAKEUP;
return 0;
}
@ -220,9 +221,9 @@ int aclk_queue_query(char *topic, void *data, char *msg_id, char *query, int run
}
if (aclk_stats_enabled) {
ACLK_STATS_LOCK;
aclk_metrics_per_sample.queries_queued++;
ACLK_STATS_UNLOCK;
LEGACY_ACLK_STATS_LOCK;
legacy_aclk_metrics_per_sample.queries_queued++;
LEGACY_ACLK_STATS_UNLOCK;
}
new_query = callocz(1, sizeof(struct aclk_query));
@ -255,7 +256,7 @@ int aclk_queue_query(char *topic, void *data, char *msg_id, char *query, int run
aclk_queue.aclk_query_tail = new_query;
aclk_queue.count++;
ACLK_QUEUE_UNLOCK;
QUERY_THREAD_WAKEUP;
LEGACY_QUERY_THREAD_WAKEUP;
return 0;
}
@ -264,7 +265,7 @@ int aclk_queue_query(char *topic, void *data, char *msg_id, char *query, int run
aclk_queue.count++;
ACLK_QUEUE_UNLOCK;
QUERY_THREAD_WAKEUP;
LEGACY_QUERY_THREAD_WAKEUP;
return 0;
}
@ -332,12 +333,12 @@ static char *aclk_encode_response(char *src, size_t content_size, int keep_newli
static usec_t aclk_web_api_request_v1(RRDHOST *host, struct web_client *w, char *url, usec_t q_created)
{
usec_t t = now_boottime_usec();
aclk_metric_mat_update(&aclk_metrics_per_sample.cloud_q_recvd_to_processed, t - q_created);
legacy_aclk_metric_mat_update(&legacy_aclk_metrics_per_sample.cloud_q_recvd_to_processed, t - q_created);
w->response.code = web_client_api_request_v1(host, w, url);
t = now_boottime_usec() - t;
aclk_metric_mat_update(&aclk_metrics_per_sample.cloud_q_db_query_time, t);
legacy_aclk_metric_mat_update(&legacy_aclk_metrics_per_sample.cloud_q_db_query_time, t);
return t;
}
@ -375,7 +376,7 @@ static int aclk_execute_query(struct aclk_query *this_query)
buffer_flush(local_buffer);
local_buffer->contenttype = CT_APPLICATION_JSON;
aclk_create_header(local_buffer, "http", this_query->msg_id, 0, 0, aclk_shared_state.version_neg);
aclk_create_header(local_buffer, "http", this_query->msg_id, 0, 0, legacy_aclk_shared_state.version_neg);
buffer_strcat(local_buffer, ",\n\t\"payload\": ");
char *encoded_response = aclk_encode_response(w->response.data->buffer, w->response.data->len, 0);
char *encoded_header = aclk_encode_response(w->response.header_output->buffer, w->response.header_output->len, 1);
@ -510,7 +511,7 @@ static int aclk_execute_query_v2(struct aclk_query *this_query)
local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
local_buffer->contenttype = CT_APPLICATION_JSON;
aclk_create_header(local_buffer, "http", this_query->msg_id, 0, 0, aclk_shared_state.version_neg);
aclk_create_header(local_buffer, "http", this_query->msg_id, 0, 0, legacy_aclk_shared_state.version_neg);
buffer_sprintf(local_buffer, ",\"t-exec\": %llu,\"t-rx\": %llu,\"http-code\": %d", t, this_query->created, w->response.code);
buffer_strcat(local_buffer, "}\x0D\x0A\x0D\x0A");
buffer_strcat(local_buffer, w->response.header_output->buffer);
@ -607,7 +608,7 @@ static int aclk_process_query(struct aclk_query_thread *t_info)
case ACLK_CMD_ONCONNECT:
ACLK_HOST_PTR_COMPULSORY("ACLK_CMD_ONCONNECT");
#if ACLK_VERSION_MIN < ACLK_V_CHILDRENSTATE
if (host != localhost && aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE) {
if (host != localhost && legacy_aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE) {
error("We are not allowed to send connect message in ACLK version before %d", ACLK_V_CHILDRENSTATE);
break;
}
@ -638,7 +639,7 @@ static int aclk_process_query(struct aclk_query_thread *t_info)
debug(D_ACLK, "EXECUTING a chart delete command");
//TODO: This send the info metadata for now
aclk_send_info_metadata(ACLK_METADATA_SENT, host);
legacy_aclk_send_info_metadata(ACLK_METADATA_SENT, host);
break;
case ACLK_CMD_ALARM:
@ -673,10 +674,10 @@ static int aclk_process_query(struct aclk_query_thread *t_info)
debug(D_ACLK, "Query #%ld (%s) done", query_count, this_query->topic);
if (aclk_stats_enabled) {
ACLK_STATS_LOCK;
aclk_metrics_per_sample.queries_dispatched++;
aclk_queries_per_thread[t_info->idx]++;
ACLK_STATS_UNLOCK;
LEGACY_ACLK_STATS_LOCK;
legacy_aclk_metrics_per_sample.queries_dispatched++;
legacy_aclk_queries_per_thread[t_info->idx]++;
LEGACY_ACLK_STATS_UNLOCK;
if (likely(getrusage_called_this_tick[t_info->idx] < MAX_GETRUSAGE_CALLS_PER_TICK)) {
getrusage(RUSAGE_THREAD, &rusage_per_thread[t_info->idx]);
@ -690,7 +691,7 @@ static int aclk_process_query(struct aclk_query_thread *t_info)
return 1;
}
void aclk_query_threads_cleanup(struct aclk_query_threads *query_threads)
void legacy_aclk_query_threads_cleanup(struct aclk_query_threads *query_threads)
{
if (query_threads && query_threads->thread_list) {
for (int i = 0; i < query_threads->count; i++) {
@ -708,7 +709,7 @@ void aclk_query_threads_cleanup(struct aclk_query_threads *query_threads)
}
#define TASK_LEN_MAX 16
void aclk_query_threads_start(struct aclk_query_threads *query_threads)
void legacy_aclk_query_threads_start(struct aclk_query_threads *query_threads)
{
info("Starting %d query threads.", query_threads->count);
@ -717,10 +718,10 @@ void aclk_query_threads_start(struct aclk_query_threads *query_threads)
for (int i = 0; i < query_threads->count; i++) {
query_threads->thread_list[i].idx = i; //thread needs to know its index for statistics
if(unlikely(snprintf(thread_name, TASK_LEN_MAX, "%s_%d", ACLK_THREAD_NAME, i) < 0))
if(unlikely(snprintf(thread_name, TASK_LEN_MAX, "%s_%d", ACLK_QUERY_THREAD_NAME, i) < 0))
error("snprintf encoding error");
netdata_thread_create(
&query_threads->thread_list[i].thread, thread_name, NETDATA_THREAD_OPTION_JOINABLE, aclk_query_main_thread,
&query_threads->thread_list[i].thread, thread_name, NETDATA_THREAD_OPTION_JOINABLE, legacy_aclk_query_main_thread,
&query_threads->thread_list[i]);
}
}
@ -730,10 +731,10 @@ void aclk_query_threads_start(struct aclk_query_threads *query_threads)
* returns actual/updated popcorning state
*/
ACLK_POPCORNING_STATE aclk_host_popcorn_check(RRDHOST *host)
ACLK_AGENT_STATE aclk_host_popcorn_check(RRDHOST *host)
{
rrdhost_aclk_state_lock(host);
ACLK_POPCORNING_STATE ret = host->aclk_state.state;
ACLK_AGENT_STATE ret = host->aclk_state.state;
if (host->aclk_state.state != ACLK_HOST_INITIALIZING){
rrdhost_aclk_state_unlock(host);
return ret;
@ -766,7 +767,7 @@ ACLK_POPCORNING_STATE aclk_host_popcorn_check(RRDHOST *host)
* of no new collectors coming in in order to mark the agent
* as stable (set agent_state = AGENT_STABLE)
*/
void *aclk_query_main_thread(void *ptr)
void *legacy_aclk_query_main_thread(void *ptr)
{
struct aclk_query_thread *info = ptr;
@ -785,10 +786,10 @@ void *aclk_query_main_thread(void *ptr)
sleep(1);
continue;
}
ACLK_SHARED_STATE_LOCK;
if (unlikely(!aclk_shared_state.version_neg)) {
if (!aclk_shared_state.version_neg_wait_till || aclk_shared_state.version_neg_wait_till > now_monotonic_usec()) {
ACLK_SHARED_STATE_UNLOCK;
legacy_aclk_shared_state_LOCK;
if (unlikely(!legacy_aclk_shared_state.version_neg)) {
if (!legacy_aclk_shared_state.version_neg_wait_till || legacy_aclk_shared_state.version_neg_wait_till > now_monotonic_usec()) {
legacy_aclk_shared_state_UNLOCK;
info("Waiting for ACLK Version Negotiation message from Cloud");
sleep(1);
continue;
@ -796,14 +797,14 @@ void *aclk_query_main_thread(void *ptr)
errno = 0;
error("ACLK version negotiation failed. No reply to \"hello\" with \"version\" from cloud in time of %ds."
" Reverting to default ACLK version of %d.", VERSION_NEG_TIMEOUT, ACLK_VERSION_MIN);
aclk_shared_state.version_neg = ACLK_VERSION_MIN;
aclk_set_rx_handlers(aclk_shared_state.version_neg);
legacy_aclk_shared_state.version_neg = ACLK_VERSION_MIN;
aclk_set_rx_handlers(legacy_aclk_shared_state.version_neg);
}
ACLK_SHARED_STATE_UNLOCK;
legacy_aclk_shared_state_UNLOCK;
rrdhost_aclk_state_lock(localhost);
if (unlikely(localhost->aclk_state.metadata == ACLK_METADATA_REQUIRED)) {
if (unlikely(aclk_queue_query("on_connect", localhost, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) {
if (unlikely(legacy_aclk_queue_query("on_connect", localhost, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) {
rrdhost_aclk_state_unlock(localhost);
errno = 0;
error("ACLK failed to queue on_connect command");
@ -814,25 +815,25 @@ void *aclk_query_main_thread(void *ptr)
}
rrdhost_aclk_state_unlock(localhost);
ACLK_SHARED_STATE_LOCK;
if (aclk_shared_state.next_popcorn_host && aclk_host_popcorn_check(aclk_shared_state.next_popcorn_host) == ACLK_HOST_STABLE) {
aclk_queue_query("on_connect", aclk_shared_state.next_popcorn_host, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT);
aclk_shared_state.next_popcorn_host = NULL;
legacy_aclk_shared_state_LOCK;
if (legacy_aclk_shared_state.next_popcorn_host && aclk_host_popcorn_check(legacy_aclk_shared_state.next_popcorn_host) == ACLK_HOST_STABLE) {
legacy_aclk_queue_query("on_connect", legacy_aclk_shared_state.next_popcorn_host, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT);
legacy_aclk_shared_state.next_popcorn_host = NULL;
aclk_update_next_child_to_popcorn();
}
ACLK_SHARED_STATE_UNLOCK;
legacy_aclk_shared_state_UNLOCK;
while (aclk_process_query(info)) {
// Process all commands
};
QUERY_THREAD_LOCK;
LEGACY_QUERY_THREAD_LOCK;
// TODO: Need to check if there are queries awaiting already
if (unlikely(pthread_cond_wait(&query_cond_wait, &query_lock_wait)))
if (unlikely(pthread_cond_wait(&legacy_query_cond_wait, &legacy_query_lock_wait)))
sleep_usec(USEC_PER_SEC * 1);
QUERY_THREAD_UNLOCK;
LEGACY_QUERY_THREAD_UNLOCK;
}
return NULL;

View File

@ -10,14 +10,11 @@
#define MAX_GETRUSAGE_CALLS_PER_TICK 5 // Maximum number of times getrusage can be called per tick, per thread.
extern pthread_cond_t query_cond_wait;
extern pthread_mutex_t query_lock_wait;
extern pthread_cond_t legacy_query_cond_wait;
extern pthread_mutex_t legacy_query_lock_wait;
extern uint8_t *getrusage_called_this_tick;
#define QUERY_THREAD_WAKEUP pthread_cond_signal(&query_cond_wait)
#define QUERY_THREAD_WAKEUP_ALL pthread_cond_broadcast(&query_cond_wait)
extern volatile int aclk_connected;
#define LEGACY_QUERY_THREAD_WAKEUP pthread_cond_signal(&legacy_query_cond_wait)
#define LEGACY_QUERY_THREAD_WAKEUP_ALL pthread_cond_broadcast(&legacy_query_cond_wait)
struct aclk_query_thread {
netdata_thread_t thread;
int idx;
@ -34,11 +31,11 @@ struct aclk_cloud_req_v2 {
char *query_endpoint;
};
void *aclk_query_main_thread(void *ptr);
int aclk_queue_query(char *token, void *data, char *msg_type, char *query, int run_after, int internal, ACLK_CMD cmd);
void *legacy_aclk_query_main_thread(void *ptr);
int legacy_aclk_queue_query(char *token, void *data, char *msg_type, char *query, int run_after, int internal, ACLK_CMD cmd);
void aclk_query_threads_start(struct aclk_query_threads *query_threads);
void aclk_query_threads_cleanup(struct aclk_query_threads *query_threads);
void legacy_aclk_query_threads_start(struct aclk_query_threads *query_threads);
void legacy_aclk_query_threads_cleanup(struct aclk_query_threads *query_threads);
unsigned int aclk_query_size();
#endif //NETDATA_AGENT_CLOUD_LINK_H

View File

@ -4,6 +4,7 @@
#include "aclk_common.h"
#include "aclk_stats.h"
#include "aclk_query.h"
#include "agent_cloud_link.h"
#ifndef UUID_STR_LEN
#define UUID_STR_LEN 37
@ -107,7 +108,7 @@ static int aclk_handle_cloud_request_v1(struct aclk_request *cloud_to_agent, cha
error(
"Received \"http\" message from Cloud with version %d, but ACLK version %d is used",
cloud_to_agent->version,
aclk_shared_state.version_neg);
legacy_aclk_shared_state.version_neg);
return 1;
}
@ -126,14 +127,14 @@ static int aclk_handle_cloud_request_v1(struct aclk_request *cloud_to_agent, cha
return 1;
}
if (unlikely(aclk_queue_query(cloud_to_agent->callback_topic, NULL, cloud_to_agent->msg_id, cloud_to_agent->payload, 0, 0, ACLK_CMD_CLOUD)))
if (unlikely(legacy_aclk_queue_query(cloud_to_agent->callback_topic, NULL, cloud_to_agent->msg_id, cloud_to_agent->payload, 0, 0, ACLK_CMD_CLOUD)))
debug(D_ACLK, "ACLK failed to queue incoming \"http\" message");
if (aclk_stats_enabled) {
ACLK_STATS_LOCK;
aclk_metrics_per_sample.cloud_req_v1++;
aclk_metrics_per_sample.cloud_req_ok++;
ACLK_STATS_UNLOCK;
LEGACY_ACLK_STATS_LOCK;
legacy_aclk_metrics_per_sample.cloud_req_v1++;
legacy_aclk_metrics_per_sample.cloud_req_ok++;
LEGACY_ACLK_STATS_UNLOCK;
}
return 0;
@ -181,11 +182,11 @@ static int aclk_handle_cloud_request_v2(struct aclk_request *cloud_to_agent, cha
}
// we do this here due to cloud_req being taken over by query thread
// which if crazy quick can free it after aclk_queue_query
// which if crazy quick can free it after legacy_aclk_queue_query
stat_idx = aclk_cloud_req_type_to_idx(cloud_req->query_endpoint);
// aclk_queue_query takes ownership of data pointer
if (unlikely(aclk_queue_query(
// legacy_aclk_queue_query takes ownership of data pointer
if (unlikely(legacy_aclk_queue_query(
cloud_to_agent->callback_topic, cloud_req, cloud_to_agent->msg_id, cloud_to_agent->payload, 0, 0,
ACLK_CMD_CLOUD_QUERY_2))) {
error("ACLK failed to queue incoming \"http\" v2 message");
@ -193,11 +194,11 @@ static int aclk_handle_cloud_request_v2(struct aclk_request *cloud_to_agent, cha
}
if (aclk_stats_enabled) {
ACLK_STATS_LOCK;
aclk_metrics_per_sample.cloud_req_v2++;
aclk_metrics_per_sample.cloud_req_ok++;
aclk_metrics_per_sample.cloud_req_by_type[stat_idx]++;
ACLK_STATS_UNLOCK;
LEGACY_ACLK_STATS_LOCK;
legacy_aclk_metrics_per_sample.cloud_req_v2++;
legacy_aclk_metrics_per_sample.cloud_req_ok++;
legacy_aclk_metrics_per_sample.cloud_req_by_type[stat_idx]++;
LEGACY_ACLK_STATS_UNLOCK;
}
return 0;
@ -258,19 +259,19 @@ static int aclk_handle_version_response(struct aclk_request *cloud_to_agent, cha
version = MIN(cloud_to_agent->max_version, ACLK_VERSION_MAX);
ACLK_SHARED_STATE_LOCK;
if (unlikely(now_monotonic_usec() > aclk_shared_state.version_neg_wait_till)) {
legacy_aclk_shared_state_LOCK;
if (unlikely(now_monotonic_usec() > legacy_aclk_shared_state.version_neg_wait_till)) {
errno = 0;
error("The \"version\" message came too late ignoring.");
goto err_cleanup;
}
if (unlikely(aclk_shared_state.version_neg)) {
if (unlikely(legacy_aclk_shared_state.version_neg)) {
errno = 0;
error("Version has already been set to %d", aclk_shared_state.version_neg);
error("Version has already been set to %d", legacy_aclk_shared_state.version_neg);
goto err_cleanup;
}
aclk_shared_state.version_neg = version;
ACLK_SHARED_STATE_UNLOCK;
legacy_aclk_shared_state.version_neg = version;
legacy_aclk_shared_state_UNLOCK;
info("Choosing version %d of ACLK", version);
@ -279,7 +280,7 @@ static int aclk_handle_version_response(struct aclk_request *cloud_to_agent, cha
return 0;
err_cleanup:
ACLK_SHARED_STATE_UNLOCK;
legacy_aclk_shared_state_UNLOCK;
return 1;
}
@ -288,31 +289,31 @@ typedef struct aclk_incoming_msg_type{
int(*fnc)(struct aclk_request *, char *);
}aclk_incoming_msg_type;
aclk_incoming_msg_type aclk_incoming_msg_types_v1[] = {
aclk_incoming_msg_type legacy_aclk_incoming_msg_types_v1[] = {
{ .name = "http", .fnc = aclk_handle_cloud_request_v1 },
{ .name = "version", .fnc = aclk_handle_version_response },
{ .name = NULL, .fnc = NULL }
};
aclk_incoming_msg_type aclk_incoming_msg_types_compression[] = {
aclk_incoming_msg_type legacy_aclk_incoming_msg_types_compression[] = {
{ .name = "http", .fnc = aclk_handle_cloud_request_v2 },
{ .name = "version", .fnc = aclk_handle_version_response },
{ .name = NULL, .fnc = NULL }
};
struct aclk_incoming_msg_type *aclk_incoming_msg_types = aclk_incoming_msg_types_v1;
struct aclk_incoming_msg_type *legacy_aclk_incoming_msg_types = legacy_aclk_incoming_msg_types_v1;
void aclk_set_rx_handlers(int version)
{
if(version >= ACLK_V_COMPRESSION) {
aclk_incoming_msg_types = aclk_incoming_msg_types_compression;
legacy_aclk_incoming_msg_types = legacy_aclk_incoming_msg_types_compression;
return;
}
aclk_incoming_msg_types = aclk_incoming_msg_types_v1;
legacy_aclk_incoming_msg_types = legacy_aclk_incoming_msg_types_v1;
}
int aclk_handle_cloud_message(char *payload)
int legacy_aclk_handle_cloud_message(char *payload)
{
struct aclk_request cloud_to_agent;
memset(&cloud_to_agent, 0, sizeof(struct aclk_request));
@ -325,7 +326,7 @@ int aclk_handle_cloud_message(char *payload)
debug(D_ACLK, "ACLK incoming message (%s)", payload);
int rc = json_parse(payload, &cloud_to_agent, cloud_to_agent_parse);
int rc = json_parse(payload, &cloud_to_agent, legacy_cloud_to_agent_parse);
if (unlikely(rc != JSON_OK)) {
errno = 0;
@ -339,22 +340,22 @@ int aclk_handle_cloud_message(char *payload)
goto err_cleanup;
}
if (!aclk_shared_state.version_neg && strcmp(cloud_to_agent.type_id, "version")) {
if (!legacy_aclk_shared_state.version_neg && strcmp(cloud_to_agent.type_id, "version")) {
error("Only \"version\" message is allowed before popcorning and version negotiation is finished. Ignoring");
goto err_cleanup;
}
for (int i = 0; aclk_incoming_msg_types[i].name; i++) {
if (strcmp(cloud_to_agent.type_id, aclk_incoming_msg_types[i].name) == 0) {
if (likely(!aclk_incoming_msg_types[i].fnc(&cloud_to_agent, payload))) {
for (int i = 0; legacy_aclk_incoming_msg_types[i].name; i++) {
if (strcmp(cloud_to_agent.type_id, legacy_aclk_incoming_msg_types[i].name) == 0) {
if (likely(!legacy_aclk_incoming_msg_types[i].fnc(&cloud_to_agent, payload))) {
// in case of success handler is supposed to clean up after itself
// or as in the case of aclk_handle_cloud_request take
// ownership of the pointers (done to avoid copying)
// see what `aclk_queue_query` parameter `internal` does
// see what `legacy_aclk_queue_query` parameter `internal` does
// NEVER CONTINUE THIS LOOP AFTER CALLING FUNCTION!!!
// msg handlers (namely aclk_handle_version_response)
// can freely change what aclk_incoming_msg_types points to
// can freely change what legacy_aclk_incoming_msg_types points to
// so either exit or restart this for loop
freez(cloud_to_agent.type_id);
return 0;
@ -378,9 +379,9 @@ err_cleanup:
err_cleanup_nojson:
if (aclk_stats_enabled) {
ACLK_STATS_LOCK;
aclk_metrics_per_sample.cloud_req_err++;
ACLK_STATS_UNLOCK;
LEGACY_ACLK_STATS_LOCK;
legacy_aclk_metrics_per_sample.cloud_req_err++;
LEGACY_ACLK_STATS_UNLOCK;
}
return 1;

View File

@ -6,7 +6,7 @@
#include "daemon/common.h"
#include "libnetdata/libnetdata.h"
int aclk_handle_cloud_message(char *payload);
int legacy_aclk_handle_cloud_message(char *payload);
void aclk_set_rx_handlers(int version);

View File

@ -1,33 +1,31 @@
#include "aclk_stats.h"
netdata_mutex_t aclk_stats_mutex = NETDATA_MUTEX_INITIALIZER;
netdata_mutex_t legacy_aclk_stats_mutex = NETDATA_MUTEX_INITIALIZER;
int aclk_stats_enabled;
int query_thread_count;
int legacy_query_thread_count;
// data ACLK stats need per query thread
struct aclk_qt_data {
struct legacy_aclk_qt_data {
RRDDIM *dim;
} *aclk_qt_data = NULL;
} *legacy_aclk_qt_data = NULL;
// ACLK per query thread cpu stats
struct aclk_cpu_data {
struct legacy_aclk_cpu_data {
RRDDIM *user;
RRDDIM *system;
RRDSET *st;
} *aclk_cpu_data = NULL;
} *legacy_aclk_cpu_data = NULL;
uint32_t *aclk_queries_per_thread = NULL;
uint32_t *aclk_queries_per_thread_sample = NULL;
uint32_t *legacy_aclk_queries_per_thread = NULL;
uint32_t *legacy_aclk_queries_per_thread_sample = NULL;
struct rusage *rusage_per_thread;
uint8_t *getrusage_called_this_tick = NULL;
struct aclk_metrics aclk_metrics = {
static struct legacy_aclk_metrics legacy_aclk_metrics = {
.online = 0,
};
struct aclk_metrics_per_sample aclk_metrics_per_sample;
struct legacy_aclk_metrics_per_sample legacy_aclk_metrics_per_sample;
struct aclk_mat_metrics aclk_mat_metrics = {
#ifdef NETDATA_INTERNAL_CHECKS
@ -61,20 +59,20 @@ struct aclk_mat_metrics aclk_mat_metrics = {
"by query thread (just before passing to the database)." }
};
void aclk_metric_mat_update(struct aclk_metric_mat_data *metric, usec_t measurement)
void legacy_aclk_metric_mat_update(struct aclk_metric_mat_data *metric, usec_t measurement)
{
if (aclk_stats_enabled) {
ACLK_STATS_LOCK;
LEGACY_ACLK_STATS_LOCK;
if (metric->max < measurement)
metric->max = measurement;
metric->total += measurement;
metric->count++;
ACLK_STATS_UNLOCK;
LEGACY_ACLK_STATS_UNLOCK;
}
}
static void aclk_stats_collect(struct aclk_metrics_per_sample *per_sample, struct aclk_metrics *permanent)
static void aclk_stats_collect(struct legacy_aclk_metrics_per_sample *per_sample, struct legacy_aclk_metrics *permanent)
{
static RRDSET *st_aclkstats = NULL;
static RRDDIM *rd_online_status = NULL;
@ -93,7 +91,7 @@ static void aclk_stats_collect(struct aclk_metrics_per_sample *per_sample, struc
rrdset_done(st_aclkstats);
}
static void aclk_stats_query_queue(struct aclk_metrics_per_sample *per_sample)
static void aclk_stats_query_queue(struct legacy_aclk_metrics_per_sample *per_sample)
{
static RRDSET *st_query_thread = NULL;
static RRDDIM *rd_queued = NULL;
@ -115,7 +113,7 @@ static void aclk_stats_query_queue(struct aclk_metrics_per_sample *per_sample)
rrdset_done(st_query_thread);
}
static void aclk_stats_write_q(struct aclk_metrics_per_sample *per_sample)
static void aclk_stats_write_q(struct legacy_aclk_metrics_per_sample *per_sample)
{
static RRDSET *st = NULL;
static RRDDIM *rd_wq_add = NULL;
@ -137,7 +135,7 @@ static void aclk_stats_write_q(struct aclk_metrics_per_sample *per_sample)
rrdset_done(st);
}
static void aclk_stats_read_q(struct aclk_metrics_per_sample *per_sample)
static void aclk_stats_read_q(struct legacy_aclk_metrics_per_sample *per_sample)
{
static RRDSET *st = NULL;
static RRDDIM *rd_rq_add = NULL;
@ -159,7 +157,7 @@ static void aclk_stats_read_q(struct aclk_metrics_per_sample *per_sample)
rrdset_done(st);
}
static void aclk_stats_cloud_req(struct aclk_metrics_per_sample *per_sample)
static void aclk_stats_cloud_req(struct legacy_aclk_metrics_per_sample *per_sample)
{
static RRDSET *st = NULL;
static RRDDIM *rd_rq_ok = NULL;
@ -181,7 +179,7 @@ static void aclk_stats_cloud_req(struct aclk_metrics_per_sample *per_sample)
rrdset_done(st);
}
static void aclk_stats_cloud_req_version(struct aclk_metrics_per_sample *per_sample)
static void aclk_stats_cloud_req_version(struct legacy_aclk_metrics_per_sample *per_sample)
{
static RRDSET *st = NULL;
static RRDDIM *rd_rq_v1 = NULL;
@ -223,7 +221,7 @@ int aclk_cloud_req_type_to_idx(const char *name)
return 0;
}
static void aclk_stats_cloud_req_cmd(struct aclk_metrics_per_sample *per_sample)
static void aclk_stats_cloud_req_cmd(struct legacy_aclk_metrics_per_sample *per_sample)
{
static RRDSET *st;
static int initialized = 0;
@ -258,16 +256,16 @@ static void aclk_stats_query_threads(uint32_t *queries_per_thread)
"netdata", "aclk_query_threads", NULL, "aclk", NULL, "Queries Processed Per Thread", "req/s",
"netdata", "stats", 200008, localhost->rrd_update_every, RRDSET_TYPE_STACKED);
for (int i = 0; i < query_thread_count; i++) {
for (int i = 0; i < legacy_query_thread_count; i++) {
if (snprintf(dim_name, MAX_DIM_NAME, "Query %d", i) < 0)
error("snprintf encoding error");
aclk_qt_data[i].dim = rrddim_add(st, dim_name, NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
legacy_aclk_qt_data[i].dim = rrddim_add(st, dim_name, NULL, 1, localhost->rrd_update_every, RRD_ALGORITHM_ABSOLUTE);
}
} else
rrdset_next(st);
for (int i = 0; i < query_thread_count; i++) {
rrddim_set_by_pointer(st, aclk_qt_data[i].dim, queries_per_thread[i]);
for (int i = 0; i < legacy_query_thread_count; i++) {
rrddim_set_by_pointer(st, legacy_aclk_qt_data[i].dim, queries_per_thread[i]);
}
rrdset_done(st);
@ -301,59 +299,59 @@ static void aclk_stats_cpu_threads(void)
char id[100 + 1];
char title[100 + 1];
for (int i = 0; i < query_thread_count; i++) {
if (unlikely(!aclk_cpu_data[i].st)) {
for (int i = 0; i < legacy_query_thread_count; i++) {
if (unlikely(!legacy_aclk_cpu_data[i].st)) {
snprintfz(id, 100, "aclk_thread%d_cpu", i);
snprintfz(title, 100, "Cpu Usage For Thread No %d", i);
aclk_cpu_data[i].st = rrdset_create_localhost(
legacy_aclk_cpu_data[i].st = rrdset_create_localhost(
"netdata", id, NULL, "aclk", NULL, title, "milliseconds/s",
"netdata", "stats", 200020 + i, localhost->rrd_update_every, RRDSET_TYPE_STACKED);
aclk_cpu_data[i].user = rrddim_add(aclk_cpu_data[i].st, "user", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
aclk_cpu_data[i].system = rrddim_add(aclk_cpu_data[i].st, "system", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
legacy_aclk_cpu_data[i].user = rrddim_add(legacy_aclk_cpu_data[i].st, "user", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
legacy_aclk_cpu_data[i].system = rrddim_add(legacy_aclk_cpu_data[i].st, "system", NULL, 1, 1000, RRD_ALGORITHM_INCREMENTAL);
} else
rrdset_next(aclk_cpu_data[i].st);
rrdset_next(legacy_aclk_cpu_data[i].st);
}
for (int i = 0; i < query_thread_count; i++) {
rrddim_set_by_pointer(aclk_cpu_data[i].st, aclk_cpu_data[i].user, rusage_per_thread[i].ru_utime.tv_sec * 1000000ULL + rusage_per_thread[i].ru_utime.tv_usec);
rrddim_set_by_pointer(aclk_cpu_data[i].st, aclk_cpu_data[i].system, rusage_per_thread[i].ru_stime.tv_sec * 1000000ULL + rusage_per_thread[i].ru_stime.tv_usec);
rrdset_done(aclk_cpu_data[i].st);
for (int i = 0; i < legacy_query_thread_count; i++) {
rrddim_set_by_pointer(legacy_aclk_cpu_data[i].st, legacy_aclk_cpu_data[i].user, rusage_per_thread[i].ru_utime.tv_sec * 1000000ULL + rusage_per_thread[i].ru_utime.tv_usec);
rrddim_set_by_pointer(legacy_aclk_cpu_data[i].st, legacy_aclk_cpu_data[i].system, rusage_per_thread[i].ru_stime.tv_sec * 1000000ULL + rusage_per_thread[i].ru_stime.tv_usec);
rrdset_done(legacy_aclk_cpu_data[i].st);
}
}
void aclk_stats_thread_cleanup()
void legacy_aclk_stats_thread_cleanup()
{
freez(aclk_qt_data);
freez(aclk_queries_per_thread);
freez(aclk_queries_per_thread_sample);
freez(aclk_cpu_data);
freez(legacy_aclk_qt_data);
freez(legacy_aclk_queries_per_thread);
freez(legacy_aclk_queries_per_thread_sample);
freez(legacy_aclk_cpu_data);
freez(rusage_per_thread);
}
void *aclk_stats_main_thread(void *ptr)
void *legacy_aclk_stats_main_thread(void *ptr)
{
struct aclk_stats_thread *args = ptr;
query_thread_count = args->query_thread_count;
aclk_qt_data = callocz(query_thread_count, sizeof(struct aclk_qt_data));
aclk_cpu_data = callocz(query_thread_count, sizeof(struct aclk_cpu_data));
aclk_queries_per_thread = callocz(query_thread_count, sizeof(uint32_t));
aclk_queries_per_thread_sample = callocz(query_thread_count, sizeof(uint32_t));
rusage_per_thread = callocz(query_thread_count, sizeof(struct rusage));
getrusage_called_this_tick = callocz(query_thread_count, sizeof(uint8_t));
legacy_query_thread_count = args->query_thread_count;
legacy_aclk_qt_data = callocz(legacy_query_thread_count, sizeof(struct legacy_aclk_qt_data));
legacy_aclk_cpu_data = callocz(legacy_query_thread_count, sizeof(struct legacy_aclk_cpu_data));
legacy_aclk_queries_per_thread = callocz(legacy_query_thread_count, sizeof(uint32_t));
legacy_aclk_queries_per_thread_sample = callocz(legacy_query_thread_count, sizeof(uint32_t));
rusage_per_thread = callocz(legacy_query_thread_count, sizeof(struct rusage));
getrusage_called_this_tick = callocz(legacy_query_thread_count, sizeof(uint8_t));
heartbeat_t hb;
heartbeat_init(&hb);
usec_t step_ut = localhost->rrd_update_every * USEC_PER_SEC;
memset(&aclk_metrics_per_sample, 0, sizeof(struct aclk_metrics_per_sample));
memset(&legacy_aclk_metrics_per_sample, 0, sizeof(struct legacy_aclk_metrics_per_sample));
struct aclk_metrics_per_sample per_sample;
struct aclk_metrics permanent;
struct legacy_aclk_metrics_per_sample per_sample;
struct legacy_aclk_metrics permanent;
while (!netdata_exit) {
netdata_thread_testcancel();
@ -363,17 +361,17 @@ void *aclk_stats_main_thread(void *ptr)
heartbeat_next(&hb, step_ut);
if (netdata_exit) break;
ACLK_STATS_LOCK;
LEGACY_ACLK_STATS_LOCK;
// to not hold lock longer than necessary, especially not to hold it
// during database rrd* operations
memcpy(&per_sample, &aclk_metrics_per_sample, sizeof(struct aclk_metrics_per_sample));
memcpy(&permanent, &aclk_metrics, sizeof(struct aclk_metrics));
memset(&aclk_metrics_per_sample, 0, sizeof(struct aclk_metrics_per_sample));
memcpy(&per_sample, &legacy_aclk_metrics_per_sample, sizeof(struct legacy_aclk_metrics_per_sample));
memcpy(&permanent, &legacy_aclk_metrics, sizeof(struct legacy_aclk_metrics));
memset(&legacy_aclk_metrics_per_sample, 0, sizeof(struct legacy_aclk_metrics_per_sample));
memcpy(aclk_queries_per_thread_sample, aclk_queries_per_thread, sizeof(uint32_t) * query_thread_count);
memset(aclk_queries_per_thread, 0, sizeof(uint32_t) * query_thread_count);
memset(getrusage_called_this_tick, 0, sizeof(uint8_t) * query_thread_count);
ACLK_STATS_UNLOCK;
memcpy(legacy_aclk_queries_per_thread_sample, legacy_aclk_queries_per_thread, sizeof(uint32_t) * legacy_query_thread_count);
memset(legacy_aclk_queries_per_thread, 0, sizeof(uint32_t) * legacy_query_thread_count);
memset(getrusage_called_this_tick, 0, sizeof(uint8_t) * legacy_query_thread_count);
LEGACY_ACLK_STATS_UNLOCK;
aclk_stats_collect(&per_sample, &permanent);
aclk_stats_query_queue(&per_sample);
@ -386,7 +384,7 @@ void *aclk_stats_main_thread(void *ptr)
aclk_stats_cloud_req_cmd(&per_sample);
aclk_stats_query_threads(aclk_queries_per_thread_sample);
aclk_stats_query_threads(legacy_aclk_queries_per_thread_sample);
aclk_stats_cpu_threads();
@ -400,14 +398,14 @@ void *aclk_stats_main_thread(void *ptr)
return 0;
}
void aclk_stats_upd_online(int online) {
void legacy_aclk_stats_upd_online(int online) {
if(!aclk_stats_enabled)
return;
ACLK_STATS_LOCK;
aclk_metrics.online = online;
LEGACY_ACLK_STATS_LOCK;
legacy_aclk_metrics.online = online;
if(!online)
aclk_metrics_per_sample.offline_during_sample = 1;
ACLK_STATS_UNLOCK;
legacy_aclk_metrics_per_sample.offline_during_sample = 1;
LEGACY_ACLK_STATS_UNLOCK;
}

View File

@ -9,12 +9,10 @@
#define ACLK_STATS_THREAD_NAME "ACLK_Stats"
extern netdata_mutex_t aclk_stats_mutex;
extern netdata_mutex_t legacy_aclk_stats_mutex;
#define ACLK_STATS_LOCK netdata_mutex_lock(&aclk_stats_mutex)
#define ACLK_STATS_UNLOCK netdata_mutex_unlock(&aclk_stats_mutex)
extern int aclk_stats_enabled;
#define LEGACY_ACLK_STATS_LOCK netdata_mutex_lock(&legacy_aclk_stats_mutex)
#define LEGACY_ACLK_STATS_UNLOCK netdata_mutex_unlock(&legacy_aclk_stats_mutex)
struct aclk_stats_thread {
netdata_thread_t *thread;
@ -22,7 +20,7 @@ struct aclk_stats_thread {
};
// preserve between samples
struct aclk_metrics {
struct legacy_aclk_metrics {
volatile uint8_t online;
};
@ -53,7 +51,7 @@ extern struct aclk_mat_metrics {
struct aclk_metric_mat cloud_q_recvd_to_processed;
} aclk_mat_metrics;
void aclk_metric_mat_update(struct aclk_metric_mat_data *metric, usec_t measurement);
void legacy_aclk_metric_mat_update(struct aclk_metric_mat_data *metric, usec_t measurement);
#define ACLK_STATS_CLOUD_REQ_TYPE_CNT 7
// if you change update cloud_req_type_names
@ -61,7 +59,7 @@ void aclk_metric_mat_update(struct aclk_metric_mat_data *metric, usec_t measurem
int aclk_cloud_req_type_to_idx(const char *name);
// reset to 0 on every sample
extern struct aclk_metrics_per_sample {
extern struct legacy_aclk_metrics_per_sample {
/* in the unlikely event of ACLK disconnecting
and reconnecting under 1 sampling rate
we want to make sure we record the disconnection
@ -90,13 +88,13 @@ extern struct aclk_metrics_per_sample {
#endif
struct aclk_metric_mat_data cloud_q_db_query_time;
struct aclk_metric_mat_data cloud_q_recvd_to_processed;
} aclk_metrics_per_sample;
} legacy_aclk_metrics_per_sample;
extern uint32_t *aclk_queries_per_thread;
extern uint32_t *legacy_aclk_queries_per_thread;
extern struct rusage *rusage_per_thread;
void *aclk_stats_main_thread(void *ptr);
void aclk_stats_thread_cleanup();
void aclk_stats_upd_online(int online);
void *legacy_aclk_stats_main_thread(void *ptr);
void legacy_aclk_stats_thread_cleanup();
void legacy_aclk_stats_upd_online(int online);
#endif /* NETDATA_ACLK_STATS_H */

View File

@ -6,6 +6,7 @@
#include "aclk_query.h"
#include "aclk_common.h"
#include "aclk_stats.h"
#include "../aclk_collector_list.h"
#ifdef ENABLE_ACLK
#include <libwebsockets.h>
@ -15,46 +16,20 @@ int aclk_shutting_down = 0;
// Other global state
static int aclk_subscribed = 0;
static int aclk_disable_single_updates = 0;
static char *aclk_username = NULL;
static char *aclk_password = NULL;
static char *global_base_topic = NULL;
static int aclk_connecting = 0;
int aclk_force_reconnect = 0; // Indication from lower layers
usec_t aclk_session_us = 0; // Used by the mqtt layer
time_t aclk_session_sec = 0; // Used by the mqtt layer
static netdata_mutex_t aclk_mutex = NETDATA_MUTEX_INITIALIZER;
static netdata_mutex_t collector_mutex = NETDATA_MUTEX_INITIALIZER;
#define ACLK_LOCK netdata_mutex_lock(&aclk_mutex)
#define ACLK_UNLOCK netdata_mutex_unlock(&aclk_mutex)
#define COLLECTOR_LOCK netdata_mutex_lock(&collector_mutex)
#define COLLECTOR_UNLOCK netdata_mutex_unlock(&collector_mutex)
void lws_wss_check_queues(size_t *write_len, size_t *write_len_bytes, size_t *read_len);
void aclk_lws_wss_destroy_context();
/*
* Maintain a list of collectors and chart count
* If all the charts of a collector are deleted
* then a new metadata dataset must be send to the cloud
*
*/
struct _collector {
time_t created;
uint32_t count; //chart count
uint32_t hostname_hash;
uint32_t plugin_hash;
uint32_t module_hash;
char *hostname;
char *plugin_name;
char *module_name;
struct _collector *next;
};
struct _collector *collector_list = NULL;
char *create_uuid()
{
@ -67,7 +42,7 @@ char *create_uuid()
return uuid_str;
}
int cloud_to_agent_parse(JSON_ENTRY *e)
int legacy_cloud_to_agent_parse(JSON_ENTRY *e)
{
struct aclk_request *data = e->callback_data;
@ -247,202 +222,10 @@ char *get_topic(char *sub_topic, char *final_topic, int max_size)
return final_topic;
}
#ifndef __GNUC__
#pragma region ACLK Internal Collector Tracking
#endif
/*
* Free a collector structure
*/
static void _free_collector(struct _collector *collector)
{
if (likely(collector->plugin_name))
freez(collector->plugin_name);
if (likely(collector->module_name))
freez(collector->module_name);
if (likely(collector->hostname))
freez(collector->hostname);
freez(collector);
}
/*
* This will report the collector list
*
*/
#ifdef ACLK_DEBUG
static void _dump_collector_list()
{
struct _collector *tmp_collector;
COLLECTOR_LOCK;
info("DUMPING ALL COLLECTORS");
if (unlikely(!collector_list || !collector_list->next)) {
COLLECTOR_UNLOCK;
info("DUMPING ALL COLLECTORS -- nothing found");
return;
}
// Note that the first entry is "dummy"
tmp_collector = collector_list->next;
while (tmp_collector) {
info(
"COLLECTOR %s : [%s:%s] count = %u", tmp_collector->hostname,
tmp_collector->plugin_name ? tmp_collector->plugin_name : "",
tmp_collector->module_name ? tmp_collector->module_name : "", tmp_collector->count);
tmp_collector = tmp_collector->next;
}
info("DUMPING ALL COLLECTORS DONE");
COLLECTOR_UNLOCK;
}
#endif
/*
* This will cleanup the collector list
*
*/
static void _reset_collector_list()
{
struct _collector *tmp_collector, *next_collector;
COLLECTOR_LOCK;
if (unlikely(!collector_list || !collector_list->next)) {
COLLECTOR_UNLOCK;
return;
}
// Note that the first entry is "dummy"
tmp_collector = collector_list->next;
collector_list->count = 0;
collector_list->next = NULL;
// We broke the link; we can unlock
COLLECTOR_UNLOCK;
while (tmp_collector) {
next_collector = tmp_collector->next;
_free_collector(tmp_collector);
tmp_collector = next_collector;
}
}
/*
* Find a collector (if it exists)
* Must lock before calling this
* If last_collector is not null, it will return the previous collector in the linked
* list (used in collector delete)
*/
static struct _collector *_find_collector(
const char *hostname, const char *plugin_name, const char *module_name, struct _collector **last_collector)
{
struct _collector *tmp_collector, *prev_collector;
uint32_t plugin_hash;
uint32_t module_hash;
uint32_t hostname_hash;
if (unlikely(!collector_list)) {
collector_list = callocz(1, sizeof(struct _collector));
return NULL;
}
if (unlikely(!collector_list->next))
return NULL;
plugin_hash = plugin_name ? simple_hash(plugin_name) : 1;
module_hash = module_name ? simple_hash(module_name) : 1;
hostname_hash = simple_hash(hostname);
// Note that the first entry is "dummy"
tmp_collector = collector_list->next;
prev_collector = collector_list;
while (tmp_collector) {
if (plugin_hash == tmp_collector->plugin_hash && module_hash == tmp_collector->module_hash &&
hostname_hash == tmp_collector->hostname_hash && (!strcmp(hostname, tmp_collector->hostname)) &&
(!plugin_name || !tmp_collector->plugin_name || !strcmp(plugin_name, tmp_collector->plugin_name)) &&
(!module_name || !tmp_collector->module_name || !strcmp(module_name, tmp_collector->module_name))) {
if (unlikely(last_collector))
*last_collector = prev_collector;
return tmp_collector;
}
prev_collector = tmp_collector;
tmp_collector = tmp_collector->next;
}
return tmp_collector;
}
/*
* Called to delete a collector
* It will reduce the count (chart_count) and will remove it
* from the linked list if the count reaches zero
* The structure will be returned to the caller to free
* the resources
*
*/
static struct _collector *_del_collector(const char *hostname, const char *plugin_name, const char *module_name)
{
struct _collector *tmp_collector, *prev_collector = NULL;
tmp_collector = _find_collector(hostname, plugin_name, module_name, &prev_collector);
if (likely(tmp_collector)) {
--tmp_collector->count;
if (unlikely(!tmp_collector->count))
prev_collector->next = tmp_collector->next;
}
return tmp_collector;
}
/*
* Add a new collector (plugin / module) to the list
* If it already exists just update the chart count
*
* Lock before calling
*/
static struct _collector *_add_collector(const char *hostname, const char *plugin_name, const char *module_name)
{
struct _collector *tmp_collector;
tmp_collector = _find_collector(hostname, plugin_name, module_name, NULL);
if (unlikely(!tmp_collector)) {
tmp_collector = callocz(1, sizeof(struct _collector));
tmp_collector->hostname_hash = simple_hash(hostname);
tmp_collector->plugin_hash = plugin_name ? simple_hash(plugin_name) : 1;
tmp_collector->module_hash = module_name ? simple_hash(module_name) : 1;
tmp_collector->hostname = strdupz(hostname);
tmp_collector->plugin_name = plugin_name ? strdupz(plugin_name) : NULL;
tmp_collector->module_name = module_name ? strdupz(module_name) : NULL;
tmp_collector->next = collector_list->next;
collector_list->next = tmp_collector;
}
tmp_collector->count++;
debug(
D_ACLK, "ADD COLLECTOR %s [%s:%s] -- chart %u", hostname, plugin_name ? plugin_name : "*",
module_name ? module_name : "*", tmp_collector->count);
return tmp_collector;
}
#ifndef __GNUC__
#pragma endregion
#endif
/* Avoids the need to scan trough all RRDHOSTS
* every time any Query Thread Wakes Up
* (every time we need to check child popcorn expiry)
* call with ACLK_SHARED_STATE_LOCK held
* call with legacy_aclk_shared_state_LOCK held
*/
void aclk_update_next_child_to_popcorn(void)
{
@ -462,19 +245,19 @@ void aclk_update_next_child_to_popcorn(void)
any = 1;
if (unlikely(!aclk_shared_state.next_popcorn_host)) {
aclk_shared_state.next_popcorn_host = host;
if (unlikely(!legacy_aclk_shared_state.next_popcorn_host)) {
legacy_aclk_shared_state.next_popcorn_host = host;
rrdhost_aclk_state_unlock(host);
continue;
}
if (aclk_shared_state.next_popcorn_host->aclk_state.t_last_popcorn_update > host->aclk_state.t_last_popcorn_update)
aclk_shared_state.next_popcorn_host = host;
if (legacy_aclk_shared_state.next_popcorn_host->aclk_state.t_last_popcorn_update > host->aclk_state.t_last_popcorn_update)
legacy_aclk_shared_state.next_popcorn_host = host;
rrdhost_aclk_state_unlock(host);
}
if(!any)
aclk_shared_state.next_popcorn_host = NULL;
legacy_aclk_shared_state.next_popcorn_host = NULL;
rrd_unlock();
}
@ -487,7 +270,7 @@ static int aclk_popcorn_check_bump(RRDHOST *host)
{
time_t now = now_monotonic_sec();
int updated = 0, ret;
ACLK_SHARED_STATE_LOCK;
legacy_aclk_shared_state_LOCK;
rrdhost_aclk_state_lock(host);
ret = ACLK_IS_HOST_INITIALIZING(host);
@ -502,12 +285,12 @@ static int aclk_popcorn_check_bump(RRDHOST *host)
if (host != localhost && updated)
aclk_update_next_child_to_popcorn();
ACLK_SHARED_STATE_UNLOCK;
legacy_aclk_shared_state_UNLOCK;
return ret;
}
rrdhost_aclk_state_unlock(host);
ACLK_SHARED_STATE_UNLOCK;
legacy_aclk_shared_state_UNLOCK;
return ret;
}
@ -523,13 +306,13 @@ static void aclk_start_host_popcorning(RRDHOST *host)
{
usec_t now = now_monotonic_sec();
info("Starting ACLK popcorn timer for host \"%s\" with GUID \"%s\"", host->hostname, host->machine_guid);
ACLK_SHARED_STATE_LOCK;
legacy_aclk_shared_state_LOCK;
rrdhost_aclk_state_lock(host);
if (host == localhost && !ACLK_IS_HOST_INITIALIZING(host)) {
errno = 0;
error("Localhost is allowed to do popcorning only once after startup!");
rrdhost_aclk_state_unlock(host);
ACLK_SHARED_STATE_UNLOCK;
legacy_aclk_shared_state_UNLOCK;
return;
}
@ -539,16 +322,16 @@ static void aclk_start_host_popcorning(RRDHOST *host)
rrdhost_aclk_state_unlock(host);
if (host != localhost)
aclk_update_next_child_to_popcorn();
ACLK_SHARED_STATE_UNLOCK;
legacy_aclk_shared_state_UNLOCK;
}
static void aclk_stop_host_popcorning(RRDHOST *host)
{
ACLK_SHARED_STATE_LOCK;
legacy_aclk_shared_state_LOCK;
rrdhost_aclk_state_lock(host);
if (!ACLK_IS_HOST_POPCORNING(host)) {
rrdhost_aclk_state_unlock(host);
ACLK_SHARED_STATE_UNLOCK;
legacy_aclk_shared_state_UNLOCK;
return;
}
@ -557,18 +340,18 @@ static void aclk_stop_host_popcorning(RRDHOST *host)
host->aclk_state.metadata = ACLK_METADATA_REQUIRED;
rrdhost_aclk_state_unlock(host);
if(host == aclk_shared_state.next_popcorn_host) {
aclk_shared_state.next_popcorn_host = NULL;
if(host == legacy_aclk_shared_state.next_popcorn_host) {
legacy_aclk_shared_state.next_popcorn_host = NULL;
aclk_update_next_child_to_popcorn();
}
ACLK_SHARED_STATE_UNLOCK;
legacy_aclk_shared_state_UNLOCK;
}
/*
* Add a new collector to the list
* If it exists, update the chart count
*/
void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name)
void legacy_aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name)
{
struct _collector *tmp_collector;
if (unlikely(!netdata_ready)) {
@ -589,7 +372,7 @@ void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *modu
if(aclk_popcorn_check_bump(host))
return;
if (unlikely(aclk_queue_query("collector", host, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT)))
if (unlikely(legacy_aclk_queue_query("collector", host, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT)))
debug(D_ACLK, "ACLK failed to queue on_connect command on collector addition");
}
@ -601,7 +384,7 @@ void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *modu
* This function will release the memory used and schedule
* a cloud update
*/
void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name)
void legacy_aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name)
{
struct _collector *tmp_collector;
if (unlikely(!netdata_ready)) {
@ -628,7 +411,7 @@ void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *modu
if (aclk_popcorn_check_bump(host))
return;
if (unlikely(aclk_queue_query("collector", host, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT)))
if (unlikely(legacy_aclk_queue_query("collector", host, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT)))
debug(D_ACLK, "ACLK failed to queue on_connect command on collector deletion");
}
@ -639,7 +422,7 @@ static void aclk_graceful_disconnect()
// Send a graceful disconnect message
BUFFER *b = buffer_create(512);
aclk_create_header(b, "disconnect", NULL, 0, 0, aclk_shared_state.version_neg);
aclk_create_header(b, "disconnect", NULL, 0, 0, legacy_aclk_shared_state.version_neg);
buffer_strcat(b, ",\n\t\"payload\": \"graceful\"}");
aclk_send_message(ACLK_METADATA_TOPIC, (char*)buffer_tostring(b), NULL);
buffer_free(b);
@ -963,10 +746,10 @@ static void aclk_try_to_connect(char *hostname, int port)
aclk_connecting = 1;
create_publish_base_topic();
ACLK_SHARED_STATE_LOCK;
aclk_shared_state.version_neg = 0;
aclk_shared_state.version_neg_wait_till = 0;
ACLK_SHARED_STATE_UNLOCK;
legacy_aclk_shared_state_LOCK;
legacy_aclk_shared_state.version_neg = 0;
legacy_aclk_shared_state.version_neg_wait_till = 0;
legacy_aclk_shared_state_UNLOCK;
rc = mqtt_attempt_connection(hostname, port, aclk_username, aclk_password);
if (unlikely(rc)) {
@ -981,10 +764,10 @@ static inline void aclk_hello_msg()
char *msg_id = create_uuid();
ACLK_SHARED_STATE_LOCK;
aclk_shared_state.version_neg = 0;
aclk_shared_state.version_neg_wait_till = now_monotonic_usec() + USEC_PER_SEC * VERSION_NEG_TIMEOUT;
ACLK_SHARED_STATE_UNLOCK;
legacy_aclk_shared_state_LOCK;
legacy_aclk_shared_state.version_neg = 0;
legacy_aclk_shared_state.version_neg_wait_till = now_monotonic_usec() + USEC_PER_SEC * VERSION_NEG_TIMEOUT;
legacy_aclk_shared_state_UNLOCK;
//Hello message is versioned separately from the rest of the protocol
aclk_create_header(buf, "hello", msg_id, 0, 0, ACLK_VERSION_NEG_VERSION);
@ -1004,7 +787,7 @@ static inline void aclk_hello_msg()
*
* @return It always returns NULL
*/
void *aclk_main(void *ptr)
void *legacy_aclk_main(void *ptr)
{
struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
struct aclk_query_threads query_threads;
@ -1065,7 +848,7 @@ void *aclk_main(void *ptr)
stats_thread->thread = mallocz(sizeof(netdata_thread_t));
stats_thread->query_thread_count = query_threads.count;
netdata_thread_create(
stats_thread->thread, ACLK_STATS_THREAD_NAME, NETDATA_THREAD_OPTION_JOINABLE, aclk_stats_main_thread,
stats_thread->thread, ACLK_STATS_THREAD_NAME, NETDATA_THREAD_OPTION_JOINABLE, legacy_aclk_stats_main_thread,
stats_thread);
}
@ -1165,20 +948,20 @@ void *aclk_main(void *ptr)
}
if (unlikely(!query_threads.thread_list)) {
aclk_query_threads_start(&query_threads);
legacy_aclk_query_threads_start(&query_threads);
}
time_t now = now_monotonic_sec();
if(aclk_connected && last_periodic_query_wakeup < now) {
// to make `aclk_queue_query()` param `run_after` work
// to make `legacy_aclk_queue_query()` param `run_after` work
// also makes per child popcorning work
last_periodic_query_wakeup = now;
QUERY_THREAD_WAKEUP;
LEGACY_QUERY_THREAD_WAKEUP;
}
} // forever
exited:
// Wakeup query thread to cleanup
QUERY_THREAD_WAKEUP_ALL;
LEGACY_QUERY_THREAD_WAKEUP_ALL;
freez(aclk_username);
freez(aclk_password);
@ -1192,18 +975,18 @@ exited:
if (agent_id && aclk_connected) {
freez(agent_id);
// Wakeup thread to cleanup
QUERY_THREAD_WAKEUP;
LEGACY_QUERY_THREAD_WAKEUP;
aclk_graceful_disconnect();
}
aclk_query_threads_cleanup(&query_threads);
legacy_aclk_query_threads_cleanup(&query_threads);
_reset_collector_list();
freez(collector_list);
if(aclk_stats_enabled) {
netdata_thread_join(*stats_thread->thread, NULL);
aclk_stats_thread_cleanup();
legacy_aclk_stats_thread_cleanup();
freez(stats_thread->thread);
freez(stats_thread);
}
@ -1306,12 +1089,12 @@ void aclk_connect()
{
info("Connection detected (%u queued queries)", aclk_query_size());
aclk_stats_upd_online(1);
legacy_aclk_stats_upd_online(1);
aclk_connected = 1;
aclk_reconnect_delay(0);
QUERY_THREAD_WAKEUP;
LEGACY_QUERY_THREAD_WAKEUP;
return;
}
@ -1321,7 +1104,7 @@ void aclk_disconnect()
if (likely(aclk_connected))
info("Disconnect detected (%u queued queries)", aclk_query_size());
aclk_stats_upd_online(0);
legacy_aclk_stats_upd_online(0);
aclk_subscribed = 0;
rrdhost_aclk_state_lock(localhost);
@ -1372,7 +1155,7 @@ inline void aclk_create_header(BUFFER *dest, char *type, char *msg_id, time_t ts
*/
void health_active_log_alarms_2json(RRDHOST *host, BUFFER *wb);
void aclk_send_alarm_metadata(ACLK_METADATA_STATE metadata_submitted)
void legacy_aclk_send_alarm_metadata(ACLK_METADATA_STATE metadata_submitted)
{
BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
@ -1388,9 +1171,9 @@ void aclk_send_alarm_metadata(ACLK_METADATA_STATE metadata_submitted)
// session.
if (metadata_submitted == ACLK_METADATA_SENT)
aclk_create_header(local_buffer, "connect_alarms", msg_id, 0, 0, aclk_shared_state.version_neg);
aclk_create_header(local_buffer, "connect_alarms", msg_id, 0, 0, legacy_aclk_shared_state.version_neg);
else
aclk_create_header(local_buffer, "connect_alarms", msg_id, aclk_session_sec, aclk_session_us, aclk_shared_state.version_neg);
aclk_create_header(local_buffer, "connect_alarms", msg_id, aclk_session_sec, aclk_session_us, legacy_aclk_shared_state.version_neg);
buffer_strcat(local_buffer, ",\n\t\"payload\": ");
@ -1418,7 +1201,7 @@ void aclk_send_alarm_metadata(ACLK_METADATA_STATE metadata_submitted)
* /api/v1/info
* charts
*/
int aclk_send_info_metadata(ACLK_METADATA_STATE metadata_submitted, RRDHOST *host)
int legacy_aclk_send_info_metadata(ACLK_METADATA_STATE metadata_submitted, RRDHOST *host)
{
BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
@ -1433,9 +1216,9 @@ int aclk_send_info_metadata(ACLK_METADATA_STATE metadata_submitted, RRDHOST *hos
// a fake on_connect message then use the real timestamp to indicate it is within the existing
// session.
if (metadata_submitted == ACLK_METADATA_SENT)
aclk_create_header(local_buffer, "update", msg_id, 0, 0, aclk_shared_state.version_neg);
aclk_create_header(local_buffer, "update", msg_id, 0, 0, legacy_aclk_shared_state.version_neg);
else
aclk_create_header(local_buffer, "connect", msg_id, aclk_session_sec, aclk_session_us, aclk_shared_state.version_neg);
aclk_create_header(local_buffer, "connect", msg_id, aclk_session_sec, aclk_session_us, legacy_aclk_shared_state.version_neg);
buffer_strcat(local_buffer, ",\n\t\"payload\": ");
buffer_sprintf(local_buffer, "{\n\t \"info\" : ");
@ -1459,14 +1242,14 @@ int aclk_send_info_child_connection(RRDHOST *host, ACLK_CMD cmd)
BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
local_buffer->contenttype = CT_APPLICATION_JSON;
if(aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE)
fatal("This function should not be called if ACLK version is less than %d (current %d)", ACLK_V_CHILDRENSTATE, aclk_shared_state.version_neg);
if(legacy_aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE)
fatal("This function should not be called if ACLK version is less than %d (current %d)", ACLK_V_CHILDRENSTATE, legacy_aclk_shared_state.version_neg);
debug(D_ACLK, "Sending Child Disconnect");
char *msg_id = create_uuid();
aclk_create_header(local_buffer, cmd == ACLK_CMD_CHILD_CONNECT ? "child_connect" : "child_disconnect", msg_id, 0, 0, aclk_shared_state.version_neg);
aclk_create_header(local_buffer, cmd == ACLK_CMD_CHILD_CONNECT ? "child_connect" : "child_disconnect", msg_id, 0, 0, legacy_aclk_shared_state.version_neg);
buffer_strcat(local_buffer, ",\"payload\":");
@ -1489,7 +1272,7 @@ int aclk_send_info_child_connection(RRDHOST *host, ACLK_CMD cmd)
void aclk_host_state_update(RRDHOST *host, ACLK_CMD cmd)
{
#if ACLK_VERSION_MIN < ACLK_V_CHILDRENSTATE
if (aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE)
if (legacy_aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE)
return;
#else
#warning "This check became unnecessary. Remove"
@ -1502,12 +1285,12 @@ void aclk_host_state_update(RRDHOST *host, ACLK_CMD cmd)
case ACLK_CMD_CHILD_CONNECT:
debug(D_ACLK, "Child Connected %s %s.", host->hostname, host->machine_guid);
aclk_start_host_popcorning(host);
aclk_queue_query("add_child", host, NULL, NULL, 0, 1, ACLK_CMD_CHILD_CONNECT);
legacy_aclk_queue_query("add_child", host, NULL, NULL, 0, 1, ACLK_CMD_CHILD_CONNECT);
break;
case ACLK_CMD_CHILD_DISCONNECT:
debug(D_ACLK, "Child Disconnected %s %s.", host->hostname, host->machine_guid);
aclk_stop_host_popcorning(host);
aclk_queue_query("del_child", host, NULL, NULL, 0, 1, ACLK_CMD_CHILD_DISCONNECT);
legacy_aclk_queue_query("del_child", host, NULL, NULL, 0, 1, ACLK_CMD_CHILD_DISCONNECT);
break;
default:
error("Unknown command for aclk_host_state_update %d.", (int)cmd);
@ -1537,31 +1320,21 @@ void aclk_send_stress_test(size_t size)
// or on request
int aclk_send_metadata(ACLK_METADATA_STATE state, RRDHOST *host)
{
aclk_send_info_metadata(state, host);
legacy_aclk_send_info_metadata(state, host);
if(host == localhost)
aclk_send_alarm_metadata(state);
legacy_aclk_send_alarm_metadata(state);
return 0;
}
void aclk_single_update_disable()
{
aclk_disable_single_updates = 1;
}
void aclk_single_update_enable()
{
aclk_disable_single_updates = 0;
}
// Triggered by a health reload, sends the alarm metadata
void aclk_alarm_reload()
void legacy_aclk_alarm_reload()
{
if (unlikely(aclk_host_initializing(localhost)))
return;
if (unlikely(aclk_queue_query("on_connect", localhost, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) {
if (unlikely(legacy_aclk_queue_query("on_connect", localhost, NULL, NULL, 0, 1, ACLK_CMD_ONCONNECT))) {
if (likely(aclk_connected)) {
errno = 0;
error("ACLK failed to queue on_connect command on alarm reload");
@ -1585,7 +1358,7 @@ int aclk_send_single_chart(RRDHOST *host, char *chart)
buffer_flush(local_buffer);
local_buffer->contenttype = CT_APPLICATION_JSON;
aclk_create_header(local_buffer, "chart", msg_id, 0, 0, aclk_shared_state.version_neg);
aclk_create_header(local_buffer, "chart", msg_id, 0, 0, legacy_aclk_shared_state.version_neg);
buffer_strcat(local_buffer, ",\n\t\"payload\": ");
rrdset2json(st, local_buffer, NULL, NULL, 1);
@ -1598,7 +1371,7 @@ int aclk_send_single_chart(RRDHOST *host, char *chart)
return 0;
}
int aclk_update_chart(RRDHOST *host, char *chart_name, ACLK_CMD aclk_cmd)
int legacy_aclk_update_chart(RRDHOST *host, char *chart_name, int create)
{
#ifndef ENABLE_ACLK
UNUSED(host);
@ -1611,7 +1384,7 @@ int aclk_update_chart(RRDHOST *host, char *chart_name, ACLK_CMD aclk_cmd)
if (!netdata_cloud_setting)
return 0;
if (aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE && host != localhost)
if (legacy_aclk_shared_state.version_neg < ACLK_V_CHILDRENSTATE && host != localhost)
return 0;
if (aclk_host_initializing(localhost))
@ -1623,7 +1396,7 @@ int aclk_update_chart(RRDHOST *host, char *chart_name, ACLK_CMD aclk_cmd)
if (aclk_popcorn_check_bump(host))
return 0;
if (unlikely(aclk_queue_query("_chart", host, NULL, chart_name, 0, 1, aclk_cmd))) {
if (unlikely(legacy_aclk_queue_query("_chart", host, NULL, chart_name, 0, 1, create ? ACLK_CMD_CHART : ACLK_CMD_CHARTDEL))) {
if (likely(aclk_connected)) {
errno = 0;
error("ACLK failed to queue chart_update command");
@ -1634,7 +1407,7 @@ int aclk_update_chart(RRDHOST *host, char *chart_name, ACLK_CMD aclk_cmd)
#endif
}
int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae)
int legacy_aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae)
{
BUFFER *local_buffer = NULL;
@ -1661,7 +1434,7 @@ int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae)
char *msg_id = create_uuid();
buffer_flush(local_buffer);
aclk_create_header(local_buffer, "status-change", msg_id, 0, 0, aclk_shared_state.version_neg);
aclk_create_header(local_buffer, "status-change", msg_id, 0, 0, legacy_aclk_shared_state.version_neg);
buffer_strcat(local_buffer, ",\n\t\"payload\": ");
netdata_rwlock_rdlock(&host->health_log.alarm_log_rwlock);
@ -1670,7 +1443,7 @@ int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae)
buffer_sprintf(local_buffer, "\n}");
if (unlikely(aclk_queue_query(ACLK_ALARMS_TOPIC, NULL, msg_id, local_buffer->buffer, 0, 1, ACLK_CMD_ALARM))) {
if (unlikely(legacy_aclk_queue_query(ACLK_ALARMS_TOPIC, NULL, msg_id, local_buffer->buffer, 0, 1, ACLK_CMD_ALARM))) {
if (likely(aclk_connected)) {
errno = 0;
error("ACLK failed to queue alarm_command on alarm_update");

View File

@ -7,7 +7,6 @@
#include "mqtt.h"
#include "aclk_common.h"
#define ACLK_THREAD_NAME "ACLK_Query"
#define ACLK_CHART_TOPIC "outbound/meta"
#define ACLK_ALARMS_TOPIC "outbound/alarms"
#define ACLK_METADATA_TOPIC "outbound/meta"
@ -18,7 +17,6 @@
#define ACLK_INITIALIZATION_WAIT 60 // Wait for link to initialize in seconds (per msg)
#define ACLK_INITIALIZATION_SLEEP_WAIT 1 // Wait time @ spin lock for MQTT initialization in seconds
#define ACLK_QOS 1
#define ACLK_PING_INTERVAL 60
#define ACLK_LOOP_TIMEOUT 5 // seconds to wait for operations in the library loop
@ -42,16 +40,7 @@ struct aclk_request {
typedef enum aclk_init_action { ACLK_INIT, ACLK_REINIT } ACLK_INIT_ACTION;
void *aclk_main(void *ptr);
#define NETDATA_ACLK_HOOK \
{ .name = "ACLK_Main", \
.config_section = NULL, \
.config_name = NULL, \
.enabled = 1, \
.thread = NULL, \
.init_routine = NULL, \
.start_routine = aclk_main },
void *legacy_aclk_main(void *ptr);
extern int aclk_send_message(char *sub_topic, char *message, char *msg_id);
extern int aclk_send_message_bin(char *sub_topic, const void *message, size_t len, char *msg_id);
@ -62,29 +51,27 @@ char *create_uuid();
// callbacks for agent cloud link
int aclk_subscribe(char *topic, int qos);
int cloud_to_agent_parse(JSON_ENTRY *e);
int legacy_cloud_to_agent_parse(JSON_ENTRY *e);
void aclk_disconnect();
void aclk_connect();
int aclk_send_metadata(ACLK_METADATA_STATE state, RRDHOST *host);
int aclk_send_info_metadata(ACLK_METADATA_STATE metadata_submitted, RRDHOST *host);
void aclk_send_alarm_metadata(ACLK_METADATA_STATE metadata_submitted);
int legacy_aclk_send_info_metadata(ACLK_METADATA_STATE metadata_submitted, RRDHOST *host);
void legacy_aclk_send_alarm_metadata(ACLK_METADATA_STATE metadata_submitted);
int aclk_wait_for_initialization();
char *create_publish_base_topic();
int aclk_send_single_chart(RRDHOST *host, char *chart);
int aclk_update_chart(RRDHOST *host, char *chart_name, ACLK_CMD aclk_cmd);
int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae);
int legacy_aclk_update_chart(RRDHOST *host, char *chart_name, int create);
int legacy_aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae);
void aclk_create_header(BUFFER *dest, char *type, char *msg_id, time_t ts_secs, usec_t ts_us, int version);
int aclk_handle_cloud_message(char *payload);
void aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name);
void aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name);
void aclk_alarm_reload();
int legacy_aclk_handle_cloud_message(char *payload);
void legacy_aclk_add_collector(RRDHOST *host, const char *plugin_name, const char *module_name);
void legacy_aclk_del_collector(RRDHOST *host, const char *plugin_name, const char *module_name);
void legacy_aclk_alarm_reload(void);
unsigned long int aclk_reconnect_delay(int mode);
extern void health_alarm_entry2json_nolock(BUFFER *wb, ALARM_ENTRY *ae, RRDHOST *host);
void aclk_single_update_enable();
void aclk_single_update_disable();
void aclk_host_state_update(RRDHOST *host, ACLK_CMD cmd);
int aclk_send_info_child_connection(RRDHOST *host, ACLK_CMD cmd);

View File

@ -7,6 +7,10 @@
#include "aclk_stats.h"
#include "aclk_rx_msgs.h"
#include "agent_cloud_link.h"
#define ACLK_QOS 1
extern usec_t aclk_session_us;
extern time_t aclk_session_sec;
@ -27,7 +31,7 @@ void mqtt_message_callback(struct mosquitto *mosq, void *obj, const struct mosqu
UNUSED(mosq);
UNUSED(obj);
aclk_handle_cloud_message(msg->payload);
legacy_aclk_handle_cloud_message(msg->payload);
}
void publish_callback(struct mosquitto *mosq, void *obj, int rc)
@ -44,7 +48,7 @@ void publish_callback(struct mosquitto *mosq, void *obj, int rc)
info("Publish_callback: mid=%d latency=%" PRId64 "ms", rc, diff);
aclk_metric_mat_update(&aclk_metrics_per_sample.latency, diff);
legacy_aclk_metric_mat_update(&legacy_aclk_metrics_per_sample.latency, diff);
#endif
return;
}

View File

@ -19,7 +19,7 @@ const char *_link_strerror(int rc);
int _link_set_lwt(char *topic, int qos);
int aclk_handle_cloud_message(char *);
int legacy_aclk_handle_cloud_message(char *);
extern char *get_topic(char *sub_topic, char *final_topic, int max_size);
#endif //NETDATA_MQTT_H

View File

@ -2,11 +2,7 @@
#include "claim.h"
#include "registry/registry_internals.h"
#ifndef ACLK_NG
#include "aclk/legacy/aclk_common.h"
#else
#include "aclk/aclk.h"
#endif
#include "aclk/aclk_api.h"
char *claiming_pending_arguments = NULL;

View File

@ -193,12 +193,20 @@ AC_ARG_ENABLE(
[ enable_cloud="detect" ]
)
AC_ARG_WITH(
[aclk-legacy],
[AS_HELP_STRING([--with-aclk-legacy],
[Requires Legacy ACLK to be used even in case ACLK-NG can run on this system])],
[aclk_legacy="$withval"],
[aclk_legacy="detect"]
)
AC_ARG_WITH(
[aclk-ng],
[AS_HELP_STRING([--with-aclk-ng],
[Requires ACLK-NG to be used even in case ACLK Legacy can run on this system])],
[aclk_ng="$withval"],
[aclk_ng="fallback"]
[aclk_ng="detect"]
)
if test "${enable_cloud}" = "no"; then
@ -634,11 +642,64 @@ AM_CONDITIONAL([ENABLE_CAPABILITY], [test "${with_libcap}" = "yes"])
# -----------------------------------------------------------------------------
# ACLK
AC_MSG_CHECKING([if cloud functionality should be enabled])
AC_MSG_CHECKING([if Cloud functionality should be enabled])
AC_MSG_RESULT([${enable_cloud}])
if test "$enable_cloud" != "no" -a "$aclk_ng" != "yes"; then
# just to have all messages that can fail ACLK build in one place
# so it is easier to see why it can't be built
if test "$aclk_ng" = "no"; then
AC_DEFINE([ACLK_NG_DISABLED], [1], [ACLK NG was disabled by user request])
fi
if test "$aclk_legacy" = "no"; then
AC_DEFINE([ACLK_LEGACY_DISABLED], [1], [ACLK Legacy was disabled by user request])
fi
if test "$enable_cloud" = "no" -a "$aclk_legacy" = "yes"; then
AC_MSG_ERROR([--disable-cloud && --with-aclk-legacy not allowed together (such configuration is self contradicting)])
fi
if test "$enable_cloud" = "no" -a "$aclk_ng" = "yes"; then
AC_MSG_ERROR([--disable-cloud && --with-aclk-ng not allowed together (such configuration is self contradicting)])
fi
if test "$enable_cloud" != "no" -a "$aclk_ng" != "no"; then
AC_MSG_NOTICE([Checking if ACLK Next Generation can be built])
can_enable_ng="yes"
AC_MSG_CHECKING([if git submodules present for ACLK Next Generation])
if test -f "mqtt_websockets/src/mqtt_wss_client.c"; then
AC_MSG_RESULT([yes])
else
AC_MSG_RESULT([no])
can_enable_ng="no"
fi
AC_MSG_CHECKING([if SSL available for ACLK Next Generation])
if test -n "${SSL_LIBS}"; then
AC_MSG_RESULT([yes])
OPTIONAL_SSL_CFLAGS="${SSL_CFLAGS}"
OPTIONAL_SSL_LIBS="${SSL_LIBS}"
else
AC_MSG_RESULT([no])
fi
AC_MSG_CHECKING([if JSON-C available for ACLK Next Generation])
if test "$enable_jsonc" != "yes"; then
AC_MSG_RESULT([no])
can_enable_ng="no"
else
AC_MSG_RESULT([yes])
fi
AC_MSG_CHECKING([ACLK Next Generation can be built])
AC_MSG_RESULT([${can_enable_ng}])
if test "$can_enable_ng" = "no" -a "$aclk_ng" = "yes"; then
AC_MSG_ERROR([You have requested --with-aclk-ng but it can't be built. See reasons in lines above])
fi
if test "$can_enable_ng" = "yes"; then
aclk_ng="yes"
enable_aclk="yes"
AC_DEFINE([ACLK_NG], [1], [ACLK Next Generation Should be used])
AC_DEFINE([ENABLE_ACLK], [1], [netdata ACLK])
OPTIONAL_ACLK_NG_CFLAGS="-I \$(abs_top_srcdir)/mqtt_websockets/src/include -I \$(abs_top_srcdir)/mqtt_websockets/c-rbuf/include -I \$(abs_top_srcdir)/mqtt_websockets/MQTT-C/include"
fi
fi
if test "$enable_cloud" != "no" -a "$aclk_legacy" != "no"; then
AC_MSG_NOTICE([Checking if ACLK Legacy can be built])
if test -n "${SSL_LIBS}"; then
OPTIONAL_SSL_CFLAGS="${SSL_CFLAGS}"
OPTIONAL_SSL_LIBS="${SSL_LIBS}"
@ -665,7 +726,7 @@ if test "$enable_cloud" != "no" -a "$aclk_ng" != "yes"; then
AC_MSG_RESULT([${HAVE_libmosquitto_a}])
if test "${with_bundled_lws}" = "yes"; then
AC_MSG_CHECKING([if libwebsockets static lib is present])
AC_MSG_CHECKING([if libwebsockets static lib is present for ACLK Legacy])
if test -f "externaldeps/libwebsockets/libwebsockets.a"; then
LWS_CFLAGS="-I \$(abs_top_srcdir)/externaldeps/libwebsockets/include"
OPTIONAL_LWS_LIBS="\$(abs_top_srcdir)/externaldeps/libwebsockets/libwebsockets.a"
@ -677,7 +738,7 @@ if test "$enable_cloud" != "no" -a "$aclk_ng" != "yes"; then
# as currently this is default we prefer building netdata without ACLK
# instead of error fail
AC_MSG_RESULT([no])
AC_MSG_WARN([You required static libwebsockets to be used but we can't use it. Disabling ACLK])
AC_MSG_WARN([You required static libwebsockets to be used but we can't use it. Disabling ACLK Legacy])
fi
else
AC_CHECK_LIB([websockets],
@ -686,7 +747,7 @@ if test "$enable_cloud" != "no" -a "$aclk_ng" != "yes"; then
[AC_DEFINE([ACLK_NO_LWS], [1], [usable system libwebsockets was not found during build.])])
fi
if test "${build_target}" = "linux" -a "${enable_cloud}" != "no"; then
if test "${build_target}" = "linux"; then
if test "${have_libcap}" = "yes" -a "${with_libcap}" = "no"; then
AC_MSG_ERROR([agent-cloud-link can't be built without libcap. Disable it by --disable-cloud or enable libcap])
fi
@ -696,85 +757,33 @@ if test "$enable_cloud" != "no" -a "$aclk_ng" != "yes"; then
fi
# next 2 lines are just to have info for ACLK dependencies in common place
AC_MSG_CHECKING([if json-c available for ACLK])
AC_MSG_CHECKING([if json-c available for ACLK Legacy])
AC_MSG_RESULT([${enable_jsonc}])
test "${enable_cloud}" = "yes" -a "${enable_jsonc}" = "no" && \
AC_MSG_ERROR([You have asked for ACLK to be built but no json-c available. ACLK requires json-c])
AC_MSG_CHECKING([if netdata agent-cloud-link can be enabled])
AC_MSG_CHECKING([if netdata ACLK Legacy can be built])
if test "${HAVE_libmosquitto_a}" = "yes" -a -n "${OPTIONAL_LWS_LIBS}" -a -n "${SSL_LIBS}" -a "${enable_jsonc}" = "yes"; then
can_enable_aclk="yes"
can_build_legacy="yes"
else
can_enable_aclk="no"
can_build_legacy="no"
fi
AC_MSG_RESULT([${can_enable_aclk}])
AC_MSG_RESULT([${can_build_legacy}])
# TODO fix this (you need to try fallback)
test "${enable_cloud}" = "yes" -a "${can_enable_aclk}" = "no" && \
AC_MSG_ERROR([User required agent-cloud-link but it can't be built!])
AC_MSG_CHECKING([if netdata agent-cloud-link should/will be enabled])
if test "${enable_cloud}" = "detect"; then
enable_aclk=$can_enable_aclk
else
enable_aclk=$enable_cloud
if test "$can_build_legacy" = "no" -a "$aclk_legacy" = "yes"; then
AC_MSG_ERROR([You have requested --with-aclk-legacy but it can't be built. See reasons in lines above])
fi
if test "${enable_aclk}" = "yes"; then
AC_DEFINE([ENABLE_ACLK], [1], [netdata ACLK])
fi
AC_MSG_RESULT([${enable_aclk}])
fi
if test "$enable_cloud" = "no" -a "$aclk_ng" = "yes"; then
AC_MSG_ERROR([--disable-cloud && --aclk-ng not allowed together (such configuration is self contradicting)])
fi
if test "$enable_cloud" != "no" -a "$aclk_ng" != "no"; then
can_enable_ng="yes"
AC_MSG_CHECKING([if git submodules present for ACLK Next Generation])
if test -f "mqtt_websockets/src/mqtt_wss_client.c"; then
AC_MSG_RESULT([yes])
else
AC_MSG_RESULT([no])
can_enable_ng="no"
fi
AC_MSG_CHECKING([if SSL available for ACLK Next Generation])
if test -n "${SSL_LIBS}"; then
AC_MSG_RESULT([yes])
OPTIONAL_SSL_CFLAGS="${SSL_CFLAGS}"
OPTIONAL_SSL_LIBS="${SSL_LIBS}"
else
AC_MSG_RESULT([no])
fi
AC_MSG_CHECKING([if JSON-C available for ACLK Next Generation])
if test "$enable_jsonc" != "yes"; then
AC_MSG_RESULT([no])
can_enable_ng="no"
else
AC_MSG_RESULT([yes])
fi
AC_MSG_CHECKING([ACLK Next Generation can be built])
AC_MSG_RESULT([${can_enable_ng}])
if test "$aclk_ng" = "yes" -a "$can_enable_ng" != "yes"; then
AC_MSG_ERROR([ACLK-NG requested but can't be built])
fi
if test "$aclk_ng" != "yes" -a "$enable_aclk" == "no" -a "$can_enable_ng" = "yes"; then #default "fallback"
AC_MSG_NOTICE([ACLK Legacy could not be built. Trying ACLK-NG as fallback.])
aclk_ng="yes"
fi
if test "$aclk_ng" = "yes"; then
AC_DEFINE([ACLK_NG], [1], [ACLK Next Generation Should be used])
if test "$can_build_legacy" = "yes"; then
AC_DEFINE([ACLK_LEGACY], [1], [ACLK Legacy Should be used])
AC_DEFINE([ENABLE_ACLK], [1], [netdata ACLK])
aclk_legacy="yes"
enable_aclk="yes"
OPTIONAL_ACLK_NG_CFLAGS="-I \$(abs_top_srcdir)/mqtt_websockets/src/include -I \$(abs_top_srcdir)/mqtt_websockets/c-rbuf/include -I \$(abs_top_srcdir)/mqtt_websockets/MQTT-C/include"
fi
fi
AC_SUBST([enable_cloud])
AC_SUBST([enable_aclk])
AM_CONDITIONAL([ACLK_NG], [test "${aclk_ng}" = "yes"])
AM_CONDITIONAL([ACLK_LEGACY], [test "${aclk_legacy}" = "yes"])
AM_CONDITIONAL([ENABLE_ACLK], [test "${enable_aclk}" = "yes"])
# -----------------------------------------------------------------------------

View File

@ -9,13 +9,7 @@
#ifdef ENABLE_ACLK
#define FEAT_CLOUD 1
#define FEAT_CLOUD_MSG ""
#ifdef ACLK_NG
#define ACLK_IMPL "Next Generation"
#else
#define ACLK_IMPL "Legacy"
#endif
#else
#define ACLK_IMPL ""
#ifdef DISABLE_CLOUD
#define FEAT_CLOUD 0
#define FEAT_CLOUD_MSG "(by user request)"
@ -69,29 +63,29 @@
#define FEAT_LIBCAP 0
#endif
#ifndef ACLK_NG
#ifdef ACLK_NO_LIBMOSQ
#define FEAT_MOSQUITTO 0
#else
#define FEAT_MOSQUITTO 1
#endif
#ifndef ACLK_LEGACY_DISABLED
#ifdef ACLK_NO_LIBMOSQ
#define FEAT_MOSQUITTO 0
#else
#define FEAT_MOSQUITTO 1
#endif
#ifdef ACLK_NO_LWS
#define FEAT_LWS 0
#define FEAT_LWS_MSG ""
#else
#ifdef ENABLE_ACLK
#include <libwebsockets.h>
#endif
#ifdef BUNDLED_LWS
#define FEAT_LWS 1
#define FEAT_LWS_MSG "static"
#else
#define FEAT_LWS 1
#define FEAT_LWS_MSG "shared-lib"
#endif
#endif
#endif /* ACLK_NG */
#ifdef ACLK_NO_LWS
#define FEAT_LWS 0
#define FEAT_LWS_MSG ""
#else
#ifdef ACLK_LEGACY
#include <libwebsockets.h>
#endif
#ifdef BUNDLED_LWS
#define FEAT_LWS 1
#define FEAT_LWS_MSG "static"
#else
#define FEAT_LWS 1
#define FEAT_LWS_MSG "shared-lib"
#endif
#endif
#endif /* ACLK_LEGACY_DISABLED */
#ifdef NETDATA_WITH_ZLIB
#define FEAT_ZLIB 1
@ -199,6 +193,18 @@
#define FEAT_REMOTE_WRITE 0
#endif
#ifdef ACLK_NG
#define FEAT_ACLK_NG 1
#else
#define FEAT_ACLK_NG 0
#endif
#ifdef ACLK_LEGACY
#define FEAT_ACLK_LEGACY 1
#else
#define FEAT_ACLK_LEGACY 0
#endif
#define FEAT_YES_NO(x) ((x) ? "YES" : "NO")
void print_build_info(void) {
@ -208,9 +214,8 @@ void print_build_info(void) {
printf(" dbengine: %s\n", FEAT_YES_NO(FEAT_DBENGINE));
printf(" Native HTTPS: %s\n", FEAT_YES_NO(FEAT_NATIVE_HTTPS));
printf(" Netdata Cloud: %s %s\n", FEAT_YES_NO(FEAT_CLOUD), FEAT_CLOUD_MSG);
#if FEAT_CLOUD == 1
printf(" Cloud Implementation: %s\n", ACLK_IMPL);
#endif
printf(" ACLK Next Generation: %s\n", FEAT_YES_NO(FEAT_ACLK_NG));
printf(" ACLK Legacy: %s\n", FEAT_YES_NO(FEAT_ACLK_LEGACY));
printf(" TLS Host Verification: %s\n", FEAT_YES_NO(FEAT_TLS_HOST_VERIFY));
printf("Libraries:\n");
@ -219,8 +224,8 @@ void print_build_info(void) {
printf(" libcap: %s\n", FEAT_YES_NO(FEAT_LIBCAP));
printf(" libcrypto: %s\n", FEAT_YES_NO(FEAT_CRYPTO));
printf(" libm: %s\n", FEAT_YES_NO(FEAT_LIBM));
#ifndef ACLK_NG
#if defined(ENABLE_ACLK)
#ifndef ACLK_LEGACY_DISABLED
#if defined(ACLK_LEGACY)
printf(" LWS: %s %s v%d.%d.%d\n", FEAT_YES_NO(FEAT_LWS), FEAT_LWS_MSG, LWS_LIBRARY_VERSION_MAJOR, LWS_LIBRARY_VERSION_MINOR, LWS_LIBRARY_VERSION_PATCH);
#else
printf(" LWS: %s %s\n", FEAT_YES_NO(FEAT_LWS), FEAT_LWS_MSG);
@ -266,9 +271,9 @@ void print_build_info_json(void) {
#else
printf(" \"cloud-disabled\": false,\n");
#endif
#if FEAT_CLOUD == 1
printf(" \"cloud-implementation\": \"%s\",\n", ACLK_IMPL);
#endif
printf(" \"aclk-ng\": \"%s\",\n", FEAT_JSON_BOOL(FEAT_ACLK_NG));
printf(" \"aclk-legacy\": \"%s\",\n", FEAT_JSON_BOOL(FEAT_ACLK_LEGACY));
printf(" \"tls-host-verify\": %s\n", FEAT_JSON_BOOL(FEAT_TLS_HOST_VERIFY));
printf(" },\n");
@ -328,8 +333,8 @@ void analytics_build_info(BUFFER *b) {
if(FEAT_CRYPTO) buffer_strcat (b, "|libcrypto");
if(FEAT_LIBM) buffer_strcat (b, "|libm");
#ifndef ACLK_NG
#if defined(ENABLE_ACLK)
#ifndef ACLK_LEGACY_DISABLED
#if defined(ENABLE_ACLK) && defined(ACLK_LEGACY)
{
char buf[20];
snprintfz(buf, 19, "|LWS v%d.%d.%d", LWS_LIBRARY_VERSION_MAJOR, LWS_LIBRARY_VERSION_MINOR, LWS_LIBRARY_VERSION_PATCH);

View File

@ -66,10 +66,8 @@
#include "claim/claim.h"
// netdata agent cloud link
#ifndef ACLK_NG
#include "aclk/legacy/agent_cloud_link.h"
#else
#include "aclk/aclk.h"
#ifdef ENABLE_ACLK
#include "aclk/aclk_api.h"
#endif
// global GUID map functions

View File

@ -34,12 +34,7 @@ struct pg_cache_page_index;
#include "rrdcalc.h"
#include "rrdcalctemplate.h"
#include "streaming/rrdpush.h"
#ifndef ACLK_NG
#include "aclk/legacy/aclk_rrdhost_state.h"
#else
#include "aclk/aclk.h"
#endif
#include "aclk/aclk_rrdhost_state.h"
enum {
CONTEXT_FLAGS_ARCHIVE = 0x01,

View File

@ -938,7 +938,7 @@ RRDSET *rrdset_create_custom(
rrdhost_cleanup_obsolete_charts(host);
#ifdef ENABLE_ACLK
if (host->obsolete_count)
aclk_update_chart(st->rrdhost, "dummy-chart", ACLK_CMD_CHARTDEL);
aclk_update_chart(st->rrdhost, "dummy-chart", 0);
#endif
rrdhost_unlock(host);
@ -1384,7 +1384,7 @@ void rrdset_done(RRDSET *st) {
#ifdef ENABLE_ACLK
if (unlikely(rrdset_flag_check(st, RRDSET_FLAG_ACLK))) {
rrdset_flag_clear(st, RRDSET_FLAG_ACLK);
aclk_update_chart(st->rrdhost, st->id, ACLK_CMD_CHART);
aclk_update_chart(st->rrdhost, st->id, 1);
}
#endif

View File

@ -215,7 +215,7 @@ USAGE: ${PROGRAM} [options]
--disable-ebpf Disable eBPF Kernel plugin (Default: enabled)
--disable-cloud Disable all Netdata Cloud functionality.
--require-cloud Fail the install if it can't build Netdata Cloud support.
--aclk-ng Forces build of ACLK Next Generation which is fallback by default.
--aclk-legacy Forces build of ACLK Legacy which is fallback by default.
--enable-plugin-freeipmi Enable the FreeIPMI plugin. Default: enable it when libipmimonitoring is available.
--disable-plugin-freeipmi
--disable-https Explicitly disable TLS support
@ -320,9 +320,9 @@ while [ -n "${1}" ]; do
"--disable-go") NETDATA_DISABLE_GO=1 ;;
"--enable-ebpf") NETDATA_DISABLE_EBPF=0 ;;
"--disable-ebpf") NETDATA_DISABLE_EBPF=1 NETDATA_CONFIGURE_OPTIONS="${NETDATA_CONFIGURE_OPTIONS//--disable-ebpf/} --disable-ebpf" ;;
"--aclk-ng")
NETDATA_ACLK_NG=1
NETDATA_CONFIGURE_OPTIONS="${NETDATA_CONFIGURE_OPTIONS//--with-aclk-ng/} --with-aclk-ng"
"--aclk-ng") ;;
"--aclk-legacy")
NETDATA_CONFIGURE_OPTIONS="${NETDATA_CONFIGURE_OPTIONS//--with-aclk-legacy/} --with-aclk-legacy"
;;
"--disable-cloud")
if [ -n "${NETDATA_REQUIRE_CLOUD}" ]; then
@ -572,7 +572,7 @@ copy_libmosquitto() {
}
bundle_libmosquitto() {
if [ -n "${NETDATA_DISABLE_CLOUD}" ] || [ -n "${NETDATA_ACLK_NG}" ]; then
if [ -n "${NETDATA_DISABLE_CLOUD}" ]; then
echo "Skipping libmosquitto"
return 0
fi
@ -674,7 +674,8 @@ copy_libwebsockets() {
}
bundle_libwebsockets() {
if [ -n "${NETDATA_DISABLE_CLOUD}" ] || [ -n "${USE_SYSTEM_LWS}" ] || [ -n "${NETDATA_ACLK_NG}" ]; then
if [ -n "${NETDATA_DISABLE_CLOUD}" ] || [ -n "${USE_SYSTEM_LWS}" ]; then
echo "Skipping libwebsockets"
return 0
fi

View File

@ -980,10 +980,22 @@ inline int web_client_api_request_v1_info_fill_buffer(RRDHOST *host, BUFFER *wb)
#ifdef ENABLE_ACLK
buffer_strcat(wb, "\t\"cloud-available\": true,\n");
#ifdef ACLK_NG
buffer_strcat(wb, "\t\"aclk-implementation\": \"Next Generation\",\n");
buffer_strcat(wb, "\t\"aclk-ng-available\": true,\n");
#else
buffer_strcat(wb, "\t\"aclk-implementation\": \"legacy\",\n");
buffer_strcat(wb, "\t\"aclk-ng-available\": false,\n");
#endif
#ifdef ACLK_LEGACY
buffer_strcat(wb, "\t\"aclk-legacy-available\": true,\n");
#else
buffer_strcat(wb, "\t\"aclk-legacy-available\": false,\n");
#endif
buffer_strcat(wb, "\t\"aclk-implementation\": \"");
if (aclk_ng) {
buffer_strcat(wb, "Next Generation");
} else {
buffer_strcat(wb, "legacy");
}
buffer_strcat(wb, "\",\n");
#else
buffer_strcat(wb, "\t\"cloud-available\": false,\n");
#endif