Improving the ACLK performance - initial changes (#8399)

Add an inspection point for VerneMQ in the local dev env. Remove the bottleneck in sending websocket messages, at the expense of increased CPU load. Fix the message encoding. Add support for stress testing - it is still enabled in the main loop, so it will fire stress-testing payloads once the ACLK connection is established.

Next patch will integrate the socket polling properly to reduce the CPU overhead and remove the stress testing payloads.
This commit is contained in:
Andrew Moss 2020-03-14 07:35:49 +01:00 committed by GitHub
parent c999f89754
commit 87a0559ec3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 192 additions and 31 deletions

View File

@ -13,3 +13,4 @@ exclude_paths:
- web/gui/src/**
- web/gui/main.js
- tests/**
- aclk/tests/**

View File

@ -10,14 +10,57 @@ struct aclk_lws_wss_perconnect_data {
int todo;
};
struct lws_wss_packet_buffer {
unsigned char *data;
size_t data_size;
struct lws_wss_packet_buffer *next;
};
static struct aclk_lws_wss_engine_instance *engine_instance = NULL;
/*
 * Report the current depth of the websocket transport queues.
 *
 * Every output pointer is optional; pass NULL for a value that is not needed.
 *
 *   write_len        receives the number of packets in the write list
 *   write_len_bytes  receives the sum of data_size over the packets in the write list
 *   read_len         receives the number of elements waiting in the read ring buffer
 *
 * Each requested output is zeroed first, so all of them read 0 when no engine
 * instance exists. The write list is walked while holding write_buf_mutex and
 * the ring buffer is queried while holding read_buf_mutex.
 */
void lws_wss_check_queues(size_t *write_len, size_t *write_len_bytes, size_t *read_len)
{
    if (write_len != NULL)
        *write_len = 0;
    if (write_len_bytes != NULL)
        *write_len_bytes = 0;
    if (read_len != NULL)
        *read_len = 0;

    if (engine_instance == NULL)
        return;

    // One walk of the list serves both write-side outputs, whichever of them was requested.
    if (write_len != NULL || write_len_bytes != NULL) {
        struct lws_wss_packet_buffer *pkt;
        size_t packets = 0, bytes = 0;

        aclk_lws_mutex_lock(&engine_instance->write_buf_mutex);
        for (pkt = engine_instance->write_buffer_head; pkt != NULL; pkt = pkt->next) {
            packets++;
            bytes += pkt->data_size;
        }
        aclk_lws_mutex_unlock(&engine_instance->write_buf_mutex);

        if (write_len != NULL)
            *write_len = packets;
        if (write_len_bytes != NULL)
            *write_len_bytes = bytes;
    }

    if (read_len != NULL) {
        aclk_lws_mutex_lock(&engine_instance->read_buf_mutex);
        *read_len = lws_ring_get_count_waiting_elements(engine_instance->read_ringbuffer, NULL);
        aclk_lws_mutex_unlock(&engine_instance->read_buf_mutex);
    }
}
static inline struct lws_wss_packet_buffer *lws_wss_packet_buffer_new(void *data, size_t size)
{
struct lws_wss_packet_buffer *new = callocz(1, sizeof(struct lws_wss_packet_buffer));
@ -25,6 +68,7 @@ static inline struct lws_wss_packet_buffer *lws_wss_packet_buffer_new(void *data
new->data = mallocz(LWS_PRE + size);
memcpy(new->data + LWS_PRE, data, size);
new->data_size = size;
new->written = 0;
}
return new;
}
@ -355,7 +399,7 @@ static int aclk_lws_wss_callback(struct lws *wsi, enum lws_callback_reasons reas
{
UNUSED(user);
struct lws_wss_packet_buffer *data;
int retval = 0;
int retval = 0, rc;
// Callback servicing is forced when we are closed from above.
if (engine_instance->upstream_reconnect_request) {
@ -372,12 +416,24 @@ static int aclk_lws_wss_callback(struct lws *wsi, enum lws_callback_reasons reas
switch (reason) {
case LWS_CALLBACK_CLIENT_WRITEABLE:
aclk_lws_mutex_lock(&engine_instance->write_buf_mutex);
data = lws_wss_packet_buffer_pop(&engine_instance->write_buffer_head);
data = engine_instance->write_buffer_head;
if (likely(data)) {
lws_write(wsi, data->data + LWS_PRE, data->data_size, LWS_WRITE_BINARY);
lws_wss_packet_buffer_free(data);
size_t bytes_left = data->data_size - data->written;
if ( bytes_left > 65536 )
bytes_left = 65536;
rc = lws_write(wsi, data->data + LWS_PRE + data->written, bytes_left, LWS_WRITE_BINARY);
error("lws_write(req=%u,written=%u) %zu of %zu",bytes_left, rc, data->written,data->data_size,rc);
data->written += bytes_left;
if (data->written == data->data_size)
{
lws_wss_packet_buffer_pop(&engine_instance->write_buffer_head);
lws_wss_packet_buffer_free(data);
}
if (engine_instance->write_buffer_head)
{
error("Req write");
lws_callback_on_writable(engine_instance->lws_wsi);
}
}
aclk_lws_mutex_unlock(&engine_instance->write_buf_mutex);
return retval;
@ -487,7 +543,13 @@ abort:
/*
 * Run one pass of the libwebsockets service loop for the ACLK transport.
 *
 * Nothing happens when no engine instance exists. While a websocket handle
 * (lws_wsi) is present, lws_cancel_service() is called on the context and a
 * writable callback is requested for the handle before lws_service() is
 * invoked with a timeout argument of 0.
 */
void aclk_lws_wss_service_loop()
{
    if (!engine_instance)
        return;

    if (engine_instance->lws_wsi) {
        lws_cancel_service(engine_instance->lws_context);
        lws_callback_on_writable(engine_instance->lws_wsi);
    }

    lws_service(engine_instance->lws_context, 0);
}
// in case the MQTT connection disconnect while lws transport is still operational

View File

@ -31,7 +31,11 @@ struct aclk_lws_wss_engine_callbacks {
void (*connection_closed)();
};
struct lws_wss_packet_buffer;
// One outgoing websocket packet; packets are chained into a singly linked list via `next`.
struct lws_wss_packet_buffer {
// Buffer of LWS_PRE + data_size bytes; the payload starts at data + LWS_PRE.
unsigned char *data;
// data_size: payload length in bytes; written: payload bytes already handed to lws_write().
size_t data_size, written;
// Following packet in the list, or NULL for the last one.
struct lws_wss_packet_buffer *next;
};
struct aclk_lws_wss_engine_instance {
//target host/port for connection
@ -73,6 +77,7 @@ void aclk_lws_wss_mqtt_layer_disconect_notif();
void aclk_lws_connection_established();
void aclk_lws_connection_data_received();
void aclk_lws_connection_closed();
void lws_wss_check_queues(size_t *write_len, size_t *write_len_bytes, size_t *read_len);
#endif

View File

@ -1281,7 +1281,7 @@ void aclk_get_challenge(char *aclk_hostname, char *aclk_port)
return;
}
if (challenge.result == NULL ) {
error("Could not retrieve challenge from auth response");
error("Could not retrieve challenge from auth response: %s", payload);
return;
}
@ -1362,6 +1362,7 @@ static void aclk_try_to_connect(char *hostname, char *port, int port_num)
*
* @return It always returns NULL
*/
void lws_wss_check_queues(size_t *write_len, size_t *write_len_bytes, size_t *read_len);
void *aclk_main(void *ptr)
{
struct netdata_static_thread *query_thread;
@ -1410,9 +1411,10 @@ void *aclk_main(void *ptr)
while (!netdata_exit) {
static int first_init = 0;
info("loop state first_init_%d connected=%d connecting=%d", first_init, aclk_connected, aclk_connecting);
sleep_usec(USEC_PER_MS * 500);
size_t write_q, write_q_bytes, read_q;
lws_wss_check_queues(&write_q, &write_q_bytes, &read_q);
info("loop state first_init_%d connected=%d connecting=%d wq=%zu (%zu-bytes) rq=%zu",
first_init, aclk_connected, aclk_connecting, write_q, read_q);
if (unlikely(!aclk_connected)) {
if (unlikely(!first_init)) {
aclk_try_to_connect(aclk_hostname, aclk_port, port_num);
@ -1439,7 +1441,10 @@ void *aclk_main(void *ptr)
}
_link_event_loop();
sleep_usec(USEC_PER_MS * 100);
//sleep_usec(USEC_PER_MS * 50);
static int stress_counter = 0;
if (stress_counter++ % 100 == 0 && write_q==0)
aclk_send_stress_test(2000000);
// TODO: Move to on-connect
if (unlikely(!aclk_subscribed)) {
@ -1498,7 +1503,7 @@ int aclk_send_message(char *sub_topic, char *message, char *msg_id)
}
ACLK_LOCK;
rc = _link_send_message(final_topic, message, &mid);
rc = _link_send_message(final_topic, (unsigned char *)message, &mid);
// TODO: link the msg_id with the mid so we can trace it
ACLK_UNLOCK;
@ -1603,8 +1608,6 @@ inline void aclk_create_header(BUFFER *dest, char *type, char *msg_id)
debug(D_ACLK, "Sending v%d msgid [%s] type [%s] time [%ld]", ACLK_VERSION, msg_id, type, time_created);
}
//#define EYE_FRIENDLY
/*
* Take a buffer, encode it and rewrite it
*
@ -1612,10 +1615,6 @@ inline void aclk_create_header(BUFFER *dest, char *type, char *msg_id)
BUFFER *aclk_encode_response(BUFFER *contents)
{
#ifdef EYE_FRIENDLY
return contents;
#else
char *tmp_buffer = mallocz(contents->len * 2);
char *src, *dst;
@ -1652,7 +1651,6 @@ BUFFER *aclk_encode_response(BUFFER *contents)
freez(tmp_buffer);
return contents;
#endif
}
/*
@ -1684,7 +1682,7 @@ void aclk_send_alarm_metadata()
debug(D_ACLK, "Metadata %s with alarms_active has %zu bytes", msg_id, local_buffer->len);
buffer_sprintf(local_buffer, "\n}\n}");
aclk_send_message(ACLK_ALARMS_TOPIC, aclk_encode_response(local_buffer)->buffer, msg_id);
aclk_send_message(ACLK_ALARMS_TOPIC, local_buffer->buffer, msg_id);
debug(D_ACLK, "Metadata %s encoded has %zu bytes", msg_id, local_buffer->len);
freez(msg_id);
@ -1711,7 +1709,7 @@ int aclk_send_info_metadata()
buffer_sprintf(local_buffer, "\n}\n}");
debug(D_ACLK, "Metadata %s with chart has %zu bytes", msg_id, local_buffer->len);
aclk_send_message(ACLK_METADATA_TOPIC, aclk_encode_response(local_buffer)->buffer, msg_id);
aclk_send_message(ACLK_METADATA_TOPIC, local_buffer->buffer, msg_id);
debug(D_ACLK, "Metadata %s encoded has %zu bytes", msg_id, local_buffer->len);
freez(msg_id);
@ -1719,10 +1717,30 @@ int aclk_send_info_metadata()
return 0;
}
void aclk_send_stress_test(size_t size)
{
char *buffer = mallocz(size);
if (buffer != NULL)
{
for(size_t i=0; i<size; i++)
buffer[i] = 'x';
buffer[size-1] = 0;
time_t time_created = now_realtime_sec();
sprintf(buffer,"{\"type\":\"stress\", \"timestamp\":%ld,\"payload\":", time_created);
buffer[strlen(buffer)] = '"';
buffer[size-2] = '}';
buffer[size-3] = '"';
aclk_send_message(ACLK_METADATA_TOPIC, buffer, NULL);
error("Sending stress of size %zu at time %ld", size, time_created);
}
free(buffer);
}
// Send info metadata message to the cloud if the link is established
// or on request
int aclk_send_metadata()
{
aclk_send_info_metadata();
aclk_send_alarm_metadata();
@ -1774,7 +1792,7 @@ int aclk_send_single_chart(char *hostname, char *chart)
rrdset2json(st, local_buffer, NULL, NULL, 1);
buffer_sprintf(local_buffer, "\t\n}");
aclk_send_message(ACLK_CHART_TOPIC, aclk_encode_response(local_buffer)->buffer, msg_id);
aclk_send_message(ACLK_CHART_TOPIC, local_buffer->buffer, msg_id);
freez(msg_id);
buffer_free(local_buffer);
@ -1833,7 +1851,7 @@ int aclk_update_alarm(RRDHOST *host, ALARM_ENTRY *ae)
netdata_rwlock_unlock(&host->health_log.alarm_log_rwlock);
buffer_sprintf(local_buffer, "\n}");
aclk_queue_query(ACLK_ALARMS_TOPIC, NULL, msg_id, aclk_encode_response(local_buffer)->buffer, 0, 1, ACLK_CMD_ALARM);
aclk_queue_query(ACLK_ALARMS_TOPIC, NULL, msg_id, local_buffer->buffer, 0, 1, ACLK_CMD_ALARM);
freez(msg_id);
buffer_free(local_buffer);

View File

@ -275,7 +275,7 @@ int _link_subscribe(char *topic, int qos)
*
*/
int _link_send_message(char *topic, char *message, int *mid)
int _link_send_message(char *topic, unsigned char *message, int *mid)
{
int rc;
@ -285,7 +285,7 @@ int _link_send_message(char *topic, char *message, int *mid)
return rc;
int msg_len = strlen(message);
error("Sending MQTT len=%d starts %02x %02x %02x", msg_len, message[0], message[1], message[2]);
rc = mosquitto_publish(mosq, mid, topic, msg_len, message, ACLK_QOS, 0);
// TODO: Add better handling -- error will flood the logfile here

View File

@ -14,7 +14,7 @@ int mqtt_attempt_connection(char *aclk_hostname, int aclk_port, char *username,
//int _link_lib_init();
int _mqtt_lib_init();
int _link_subscribe(char *topic, int qos);
int _link_send_message(char *topic, char *message, int *mid);
int _link_send_message(char *topic, unsigned char *message, int *mid);
const char *_link_strerror(int rc);
int aclk_handle_cloud_request(char *);

4
aclk/tests/launch-paho.sh Executable file
View File

@ -0,0 +1,4 @@
#!/usr/bin/env bash
# Build the paho-client image from paho.Dockerfile and run it interactively.
# HOST_HOSTNAME is the first dotted-quad address found on the first line of the
# `ping -c1 <hostname>.local` output; it is passed to the build as a build arg.
docker build -f paho.Dockerfile . --build-arg "HOST_HOSTNAME=$(ping -c1 "$(hostname).local" | head -n1 | grep -o '[0-9]*\.[0-9]*\.[0-9]*\.[0-9]*')" -t paho-client
docker run -it paho-client

View File

@ -0,0 +1,57 @@
import ssl
import paho.mqtt.client as mqtt
import json
import time
import sys
def on_connect(mqttc, obj, flags, rc):
if rc==0:
print("Successful connection", flush=True)
else :
print(f"Connection error rc={rc}", flush=True)
mqttc.subscribe("/agent/#",0)
def on_disconnect(mqttc, obj, flags, rc=None):
    """Report a disconnect and its result code.

    The callback accepts both a three-argument call ``(client, userdata, rc)``
    and the original four-argument form ``(client, userdata, flags, rc)``.
    NOTE(review): paho-mqtt 1.x is documented to use the three-argument form
    for MQTT 3.1.1 clients - confirm against the installed version.
    """
    if rc is None:
        # Three-argument call: the third positional value is the result code.
        rc = flags
    print("disconnected rc: "+str(rc), flush=True)
def on_message(mqttc, obj, msg):
    """Log each received message and, when it carries a numeric timestamp, its delay.

    The payload is decoded as JSON. Payloads that fail to decode, are not JSON
    objects, or lack a numeric "timestamp" are reported and skipped instead of
    raising from inside the callback.
    """
    print(f"{msg.topic} {len(msg.payload)}-bytes qos={msg.qos}", flush=True)
    try:
        print(f"Trying decode of {msg.payload[:60]}",flush=True)
        api_msg = json.loads(msg.payload)
    except Exception as e:
        print(e,flush=True)
        return
    if not isinstance(api_msg, dict):
        print(f"Ignoring non-object JSON payload of type {type(api_msg).__name__}", flush=True)
        return
    ts = api_msg.get("timestamp")
    mtype = api_msg.get("type")
    print(f"Message {mtype} time={ts} size {len(api_msg)}", flush=True)
    # bool is a subclass of int, so exclude it explicitly before doing arithmetic.
    if isinstance(ts, bool) or not isinstance(ts, (int, float)):
        return
    now = time.time()
    print(f"Current {now} -> Delay {now-ts}", flush=True)
def on_publish(mqttc, obj, mid):
print("mid: "+str(mid), flush=True)
def on_subscribe(mqttc, obj, mid, granted_qos):
print("Subscribed: "+str(mid)+" "+str(granted_qos), flush=True)
def on_log(mqttc, obj, level, string):
print(string)
# Script entry: the broker host is taken from the first command-line argument.
print(f"Starting paho-inspection on {sys.argv[1]}", flush=True)
# MQTT over websockets, with a fixed client id.
mqttc = mqtt.Client(transport='websockets',client_id="paho")
#mqttc.tls_set(certfile="server.crt", keyfile="server.key", cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLS, ciphers=None)
#mqttc.tls_set(ca_certs="server.crt", cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLS, ciphers=None)
# TLS is enabled but certificate verification is switched off (CERT_NONE plus tls_insecure_set).
mqttc.tls_set(cert_reqs=ssl.CERT_NONE, tls_version=ssl.PROTOCOL_TLS, ciphers=None)
mqttc.tls_insecure_set(True)
# Register the callbacks defined above; on_log is defined but not registered here.
mqttc.on_message = on_message
mqttc.on_connect = on_connect
mqttc.on_disconnect = on_disconnect
mqttc.on_publish = on_publish
mqttc.on_subscribe = on_subscribe
mqttc.username_pw_set("paho","paho")
# Port 8443 with a keepalive of 60.
mqttc.connect(sys.argv[1], 8443, 60)
#mqttc.publish("/agent/mine","Test1")
#mqttc.subscribe("$SYS/#", 0)
# NOTE(review): this is printed as soon as connect() returns; the connection result itself is reported by on_connect.
print("Connected succesfully, monitoring /agent/#", flush=True)
mqttc.loop_forever()

View File

@ -0,0 +1,14 @@
# Image that runs paho-inspection.py against the host given at build time.
FROM archlinux/base:latest
# Refresh the package databases and upgrade, then install pip and the paho-mqtt client.
RUN pacman -Syyu --noconfirm
RUN pacman --noconfirm --needed -S python-pip
RUN pip install paho-mqtt
RUN mkdir -p /opt/paho
COPY paho-inspection.py /opt/paho/
WORKDIR /opt/paho
# HOST_HOSTNAME is supplied with --build-arg and stored in the file `host` inside the image.
ARG HOST_HOSTNAME
RUN echo $HOST_HOSTNAME >host
# At container start the stored value is passed to the script as its first argument.
CMD ["/bin/bash", "-c", "/usr/sbin/python paho-inspection.py $(cat host)"]