Adds some introspection into the MQTT_WSS (#14039)
This commit is contained in:
parent
cbbd338cad
commit
7d0ca0b83e
|
@ -1282,7 +1282,8 @@ ADD_LIBRARY(mqttwebsockets STATIC
|
|||
|
||||
target_compile_options(mqttwebsockets PUBLIC
|
||||
-DMQTT_WSS_CUSTOM_ALLOC
|
||||
-DRBUF_CUSTOM_MALLOC)
|
||||
-DRBUF_CUSTOM_MALLOC
|
||||
-DMQTT_WSS_CPUSTATS)
|
||||
|
||||
target_include_directories(mqttwebsockets PUBLIC
|
||||
${CMAKE_SOURCE_DIR}/aclk/helpers)
|
||||
|
|
|
@ -742,7 +742,7 @@ libmqttwebsockets_a_SOURCES = \
|
|||
mqtt_websockets/c_rhash/include/c_rhash.h \
|
||||
mqtt_websockets/c_rhash/src/c_rhash_internal.h
|
||||
|
||||
libmqttwebsockets_a_CFLAGS = $(CFLAGS) -DMQTT_WSS_CUSTOM_ALLOC -DRBUF_CUSTOM_MALLOC -I$(srcdir)/aclk/helpers -I$(srcdir)/mqtt_websockets/c_rhash/include
|
||||
libmqttwebsockets_a_CFLAGS = $(CFLAGS) -DMQTT_WSS_CUSTOM_ALLOC -DRBUF_CUSTOM_MALLOC -DMQTT_WSS_CPUSTATS -I$(srcdir)/aclk/helpers -I$(srcdir)/mqtt_websockets/c_rhash/include
|
||||
|
||||
if MQTT_WSS_DEBUG
|
||||
libmqttwebsockets_a_CFLAGS += -DMQTT_WSS_DEBUG
|
||||
|
|
|
@ -1,5 +1,7 @@
|
|||
// SPDX-License-Identifier: GPL-3.0-or-later
|
||||
|
||||
#define MQTT_WSS_CPUSTATS
|
||||
|
||||
#include "aclk_stats.h"
|
||||
|
||||
#include "aclk_query.h"
|
||||
|
@ -259,6 +261,23 @@ static void aclk_stats_mqtt_wss(struct mqtt_wss_stats *stats)
|
|||
static uint64_t sent = 0;
|
||||
static uint64_t recvd = 0;
|
||||
|
||||
static RRDSET *st_txbuf_perc = NULL;
|
||||
static RRDDIM *rd_txbuf_perc = NULL;
|
||||
|
||||
static RRDSET *st_txbuf = NULL;
|
||||
static RRDDIM *rd_tx_buffer_usable = NULL;
|
||||
static RRDDIM *rd_tx_buffer_reclaimable = NULL;
|
||||
static RRDDIM *rd_tx_buffer_used = NULL;
|
||||
static RRDDIM *rd_tx_buffer_free = NULL;
|
||||
static RRDDIM *rd_tx_buffer_size = NULL;
|
||||
|
||||
static RRDSET *st_timing = NULL;
|
||||
static RRDDIM *rd_keepalive = NULL;
|
||||
static RRDDIM *rd_read_socket = NULL;
|
||||
static RRDDIM *rd_write_socket = NULL;
|
||||
static RRDDIM *rd_process_websocket = NULL;
|
||||
static RRDDIM *rd_process_mqtt = NULL;
|
||||
|
||||
sent += stats->bytes_tx;
|
||||
recvd += stats->bytes_rx;
|
||||
|
||||
|
@ -271,10 +290,61 @@ static void aclk_stats_mqtt_wss(struct mqtt_wss_stats *stats)
|
|||
rd_recvd = rrddim_add(st, "received", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
|
||||
}
|
||||
|
||||
if (unlikely(!st_txbuf_perc)) {
|
||||
st_txbuf_perc = rrdset_create_localhost(
|
||||
"netdata", "aclk_mqtt_tx_perc", NULL, "aclk", NULL, "Actively used percentage of MQTT Tx Buffer,", "%",
|
||||
"netdata", "stats", 200012, localhost->rrd_update_every, RRDSET_TYPE_LINE);
|
||||
|
||||
rd_txbuf_perc = rrddim_add(st_txbuf_perc, "used", NULL, 1, 100, RRD_ALGORITHM_ABSOLUTE);
|
||||
}
|
||||
|
||||
if (unlikely(!st_txbuf)) {
|
||||
st_txbuf = rrdset_create_localhost(
|
||||
"netdata", "aclk_mqtt_tx_queue", NULL, "aclk", NULL, "State of transmit MQTT queue.", "B",
|
||||
"netdata", "stats", 200013, localhost->rrd_update_every, RRDSET_TYPE_LINE);
|
||||
|
||||
rd_tx_buffer_usable = rrddim_add(st_txbuf, "usable", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
|
||||
rd_tx_buffer_reclaimable = rrddim_add(st_txbuf, "reclaimable", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
|
||||
rd_tx_buffer_used = rrddim_add(st_txbuf, "used", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
|
||||
rd_tx_buffer_free = rrddim_add(st_txbuf, "free", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
|
||||
rd_tx_buffer_size = rrddim_add(st_txbuf, "size", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
|
||||
}
|
||||
|
||||
if (unlikely(!st_timing)) {
|
||||
st_timing = rrdset_create_localhost(
|
||||
"netdata", "aclk_mqtt_wss_time", NULL, "aclk", NULL, "Time spent handling MQTT, WSS, SSL and network communication.", "us",
|
||||
"netdata", "stats", 200014, localhost->rrd_update_every, RRDSET_TYPE_STACKED);
|
||||
|
||||
rd_keepalive = rrddim_add(st_timing, "keep-alive", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
|
||||
rd_read_socket = rrddim_add(st_timing, "socket_read_ssl", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
|
||||
rd_write_socket = rrddim_add(st_timing, "socket_write_ssl", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
|
||||
rd_process_websocket = rrddim_add(st_timing, "process_websocket", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
|
||||
rd_process_mqtt = rrddim_add(st_timing, "process_mqtt", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
|
||||
}
|
||||
|
||||
rrddim_set_by_pointer(st, rd_sent, sent);
|
||||
rrddim_set_by_pointer(st, rd_recvd, recvd);
|
||||
|
||||
float usage = ((float)stats->mqtt.tx_buffer_free + stats->mqtt.tx_buffer_reclaimable) / stats->mqtt.tx_buffer_size;
|
||||
usage = (1 - usage) * 10000;
|
||||
rrddim_set_by_pointer(st_txbuf_perc, rd_txbuf_perc, usage);
|
||||
|
||||
rrddim_set_by_pointer(st_txbuf, rd_tx_buffer_usable, stats->mqtt.tx_buffer_reclaimable + stats->mqtt.tx_buffer_free);
|
||||
rrddim_set_by_pointer(st_txbuf, rd_tx_buffer_reclaimable, stats->mqtt.tx_buffer_reclaimable);
|
||||
rrddim_set_by_pointer(st_txbuf, rd_tx_buffer_used, stats->mqtt.tx_buffer_used);
|
||||
rrddim_set_by_pointer(st_txbuf, rd_tx_buffer_free, stats->mqtt.tx_buffer_free);
|
||||
rrddim_set_by_pointer(st_txbuf, rd_tx_buffer_size, stats->mqtt.tx_buffer_size);
|
||||
|
||||
rrddim_set_by_pointer(st_timing, rd_keepalive, stats->time_keepalive);
|
||||
rrddim_set_by_pointer(st_timing, rd_read_socket, stats->time_read_socket);
|
||||
rrddim_set_by_pointer(st_timing, rd_write_socket, stats->time_write_socket);
|
||||
rrddim_set_by_pointer(st_timing, rd_process_websocket, stats->time_process_websocket);
|
||||
rrddim_set_by_pointer(st_timing, rd_process_mqtt, stats->time_process_mqtt);
|
||||
|
||||
rrdset_done(st);
|
||||
rrdset_done(st_txbuf_perc);
|
||||
rrdset_done(st_txbuf);
|
||||
rrdset_done(st_timing);
|
||||
}
|
||||
|
||||
void aclk_stats_thread_prepare(int query_thread_count, unsigned int proto_hdl_cnt)
|
||||
|
|
Loading…
Reference in New Issue