Send host labels via exporting connectors (#7554)
* Add labels to the JSON exporting connector * Add labels to the Graphite exporting connector * Add labels to the OpenTSDB telnet exporting connector * Add labels to the OpenTSDB HTTP exporting connector * Replace control characters in JSON strings * Add unit tests
This commit is contained in:
parent
37edc6898b
commit
0fba85e2c2
|
@ -603,6 +603,7 @@ RRDSET *rrdset_create_custom(
|
|||
st->variables = NULL;
|
||||
st->alarms = NULL;
|
||||
st->flags = 0x00000000;
|
||||
st->exporting_flags = NULL;
|
||||
|
||||
if(memory_mode == RRD_MEMORY_MODE_RAM) {
|
||||
memset(st, 0, size);
|
||||
|
|
|
@ -13,44 +13,62 @@
|
|||
|
||||
extern struct config exporting_config;
|
||||
|
||||
#define EXPORTER_DATA_SOURCE "data source"
|
||||
#define EXPORTER_DATA_SOURCE_DEFAULT "average"
|
||||
#define EXPORTER_DATA_SOURCE "data source"
|
||||
#define EXPORTER_DATA_SOURCE_DEFAULT "average"
|
||||
|
||||
#define EXPORTER_DESTINATION "destination"
|
||||
#define EXPORTER_DESTINATION_DEFAULT "localhost"
|
||||
#define EXPORTER_DESTINATION "destination"
|
||||
#define EXPORTER_DESTINATION_DEFAULT "localhost"
|
||||
|
||||
#define EXPORTER_UPDATE_EVERY "update every"
|
||||
#define EXPORTER_UPDATE_EVERY_DEFAULT 10
|
||||
#define EXPORTER_UPDATE_EVERY "update every"
|
||||
#define EXPORTER_UPDATE_EVERY_DEFAULT 10
|
||||
|
||||
#define EXPORTER_BUF_ONFAIL "buffer on failures"
|
||||
#define EXPORTER_BUF_ONFAIL_DEFAULT 10
|
||||
#define EXPORTER_BUF_ONFAIL "buffer on failures"
|
||||
#define EXPORTER_BUF_ONFAIL_DEFAULT 10
|
||||
|
||||
#define EXPORTER_TIMEOUT_MS "timeout ms"
|
||||
#define EXPORTER_TIMEOUT_MS_DEFAULT 10000
|
||||
#define EXPORTER_TIMEOUT_MS "timeout ms"
|
||||
#define EXPORTER_TIMEOUT_MS_DEFAULT 10000
|
||||
|
||||
#define EXPORTER_SEND_CHART_MATCH "send charts matching"
|
||||
#define EXPORTER_SEND_CHART_MATCH_DEFAULT "*"
|
||||
#define EXPORTER_SEND_CHART_MATCH "send charts matching"
|
||||
#define EXPORTER_SEND_CHART_MATCH_DEFAULT "*"
|
||||
|
||||
#define EXPORTER_SEND_HOST_MATCH "send hosts matching"
|
||||
#define EXPORTER_SEND_HOST_MATCH_DEFAULT "localhost *"
|
||||
#define EXPORTER_SEND_HOST_MATCH "send hosts matching"
|
||||
#define EXPORTER_SEND_HOST_MATCH_DEFAULT "localhost *"
|
||||
|
||||
#define EXPORTER_SEND_NAMES "send names instead of ids"
|
||||
#define EXPORTER_SEND_NAMES_DEFAULT CONFIG_BOOLEAN_YES
|
||||
#define EXPORTER_SEND_CONFIGURED_LABELS "send configured labels"
|
||||
#define EXPORTER_SEND_CONFIGURED_LABELS_DEFAULT CONFIG_BOOLEAN_YES
|
||||
|
||||
#define EXPORTER_SEND_AUTOMATIC_LABELS "send automatic labels"
|
||||
#define EXPORTER_SEND_AUTOMATIC_LABELS_DEFAULT CONFIG_BOOLEAN_NO
|
||||
|
||||
#define EXPORTER_SEND_NAMES "send names instead of ids"
|
||||
#define EXPORTER_SEND_NAMES_DEFAULT CONFIG_BOOLEAN_YES
|
||||
|
||||
typedef enum exporting_options {
|
||||
EXPORTING_OPTION_NONE = 0,
|
||||
EXPORTING_OPTION_NONE = 0,
|
||||
|
||||
EXPORTING_SOURCE_DATA_AS_COLLECTED = (1 << 0),
|
||||
EXPORTING_SOURCE_DATA_AVERAGE = (1 << 1),
|
||||
EXPORTING_SOURCE_DATA_SUM = (1 << 2),
|
||||
EXPORTING_SOURCE_DATA_AS_COLLECTED = (1 << 0),
|
||||
EXPORTING_SOURCE_DATA_AVERAGE = (1 << 1),
|
||||
EXPORTING_SOURCE_DATA_SUM = (1 << 2),
|
||||
|
||||
EXPORTING_OPTION_SEND_NAMES = (1 << 16)
|
||||
EXPORTING_OPTION_SEND_CONFIGURED_LABELS = (1 << 3),
|
||||
EXPORTING_OPTION_SEND_AUTOMATIC_LABELS = (1 << 4),
|
||||
|
||||
EXPORTING_OPTION_SEND_NAMES = (1 << 16)
|
||||
} EXPORTING_OPTIONS;
|
||||
|
||||
#define EXPORTING_OPTIONS_SOURCE_BITS \
|
||||
(EXPORTING_SOURCE_DATA_AS_COLLECTED | EXPORTING_SOURCE_DATA_AVERAGE | EXPORTING_SOURCE_DATA_SUM)
|
||||
#define EXPORTING_OPTIONS_DATA_SOURCE(exporting_options) (exporting_options & EXPORTING_OPTIONS_SOURCE_BITS)
|
||||
|
||||
#define sending_labels_configured(instance) \
|
||||
(instance->config.options & (EXPORTING_OPTION_SEND_CONFIGURED_LABELS | EXPORTING_OPTION_SEND_AUTOMATIC_LABELS))
|
||||
|
||||
#define should_send_label(instance, label) \
|
||||
((instance->config.options & EXPORTING_OPTION_SEND_CONFIGURED_LABELS && \
|
||||
label->label_source == LABEL_SOURCE_NETDATA_CONF) || \
|
||||
(instance->config.options & EXPORTING_OPTION_SEND_AUTOMATIC_LABELS && \
|
||||
label->label_source != LABEL_SOURCE_NETDATA_CONF))
|
||||
|
||||
struct engine;
|
||||
|
||||
struct instance_config {
|
||||
|
@ -107,6 +125,8 @@ struct instance {
|
|||
int skip_host;
|
||||
int skip_chart;
|
||||
|
||||
BUFFER *labels;
|
||||
|
||||
time_t after;
|
||||
time_t before;
|
||||
|
||||
|
@ -174,6 +194,7 @@ int metric_formatting(struct engine *engine, RRDDIM *rd);
|
|||
int end_chart_formatting(struct engine *engine, RRDSET *st);
|
||||
int end_host_formatting(struct engine *engine, RRDHOST *host);
|
||||
int end_batch_formatting(struct engine *engine);
|
||||
int flush_host_labels(struct instance *instance, RRDHOST *host);
|
||||
|
||||
int exporting_discard_response(BUFFER *buffer, struct instance *instance);
|
||||
void simple_connector_receive_response(int *sock, struct instance *instance);
|
||||
|
|
|
@ -28,7 +28,7 @@ int init_graphite_connector(struct connector *connector)
|
|||
int init_graphite_instance(struct instance *instance)
|
||||
{
|
||||
instance->start_batch_formatting = NULL;
|
||||
instance->start_host_formatting = NULL;
|
||||
instance->start_host_formatting = format_host_labels_graphite_plaintext;
|
||||
instance->start_chart_formatting = NULL;
|
||||
|
||||
if (EXPORTING_OPTIONS_DATA_SOURCE(instance->config.options) == EXPORTING_SOURCE_DATA_AS_COLLECTED)
|
||||
|
@ -37,7 +37,7 @@ int init_graphite_instance(struct instance *instance)
|
|||
instance->metric_formatting = format_dimension_stored_graphite_plaintext;
|
||||
|
||||
instance->end_chart_formatting = NULL;
|
||||
instance->end_host_formatting = NULL;
|
||||
instance->end_host_formatting = flush_host_labels;
|
||||
instance->end_batch_formatting = NULL;
|
||||
|
||||
instance->buffer = (void *)buffer_create(0);
|
||||
|
@ -51,6 +51,60 @@ int init_graphite_instance(struct instance *instance)
|
|||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Copy a label value and substitute underscores in place of charachters which can't be used in Graphite output
|
||||
*
|
||||
* @param dst a destination string.
|
||||
* @param src a source string.
|
||||
* @param len the maximum number of characters copied.
|
||||
*/
|
||||
|
||||
void sanitize_graphite_label_value(char *dst, char *src, size_t len)
|
||||
{
|
||||
while (*src != '\0' && len) {
|
||||
if (isspace(*src) || *src == ';' || *src == '~')
|
||||
*dst++ = '_';
|
||||
else
|
||||
*dst++ = *src;
|
||||
src++;
|
||||
len--;
|
||||
}
|
||||
*dst = '\0';
|
||||
}
|
||||
|
||||
/**
|
||||
* Format host labels for JSON connector
|
||||
*
|
||||
* @param instance an instance data structure.
|
||||
* @param host a data collecting host.
|
||||
* @return Always returns 0.
|
||||
*/
|
||||
int format_host_labels_graphite_plaintext(struct instance *instance, RRDHOST *host)
|
||||
{
|
||||
if (!instance->labels)
|
||||
instance->labels = buffer_create(1024);
|
||||
|
||||
if (unlikely(!sending_labels_configured(instance)))
|
||||
return 0;
|
||||
|
||||
netdata_rwlock_rdlock(&host->labels_rwlock);
|
||||
for (struct label *label = host->labels; label; label = label->next) {
|
||||
if (!should_send_label(instance, label))
|
||||
continue;
|
||||
|
||||
char value[CONFIG_MAX_VALUE + 1];
|
||||
sanitize_graphite_label_value(value, label->value, CONFIG_MAX_VALUE);
|
||||
|
||||
if (*value) {
|
||||
buffer_strcat(instance->labels, ";");
|
||||
buffer_sprintf(instance->labels, "%s=%s", label->key, value);
|
||||
}
|
||||
}
|
||||
netdata_rwlock_unlock(&host->labels_rwlock);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Format dimension using collected data for Graphite connector
|
||||
*
|
||||
|
@ -78,13 +132,14 @@ int format_dimension_collected_graphite_plaintext(struct instance *instance, RRD
|
|||
|
||||
buffer_sprintf(
|
||||
instance->buffer,
|
||||
"%s.%s.%s.%s%s%s " COLLECTED_NUMBER_FORMAT " %llu\n",
|
||||
"%s.%s.%s.%s%s%s%s " COLLECTED_NUMBER_FORMAT " %llu\n",
|
||||
engine->config.prefix,
|
||||
engine->config.hostname,
|
||||
chart_name,
|
||||
dimension_name,
|
||||
(host->tags) ? ";" : "",
|
||||
(host->tags) ? host->tags : "",
|
||||
(instance->labels) ? buffer_tostring(instance->labels) : "",
|
||||
rd->last_collected_value,
|
||||
(unsigned long long)rd->last_collected_time.tv_sec);
|
||||
|
||||
|
@ -124,13 +179,14 @@ int format_dimension_stored_graphite_plaintext(struct instance *instance, RRDDIM
|
|||
|
||||
buffer_sprintf(
|
||||
instance->buffer,
|
||||
"%s.%s.%s.%s%s%s " CALCULATED_NUMBER_FORMAT " %llu\n",
|
||||
"%s.%s.%s.%s%s%s%s " CALCULATED_NUMBER_FORMAT " %llu\n",
|
||||
engine->config.prefix,
|
||||
engine->config.hostname,
|
||||
chart_name,
|
||||
dimension_name,
|
||||
(host->tags) ? ";" : "",
|
||||
(host->tags) ? host->tags : "",
|
||||
(instance->labels) ? buffer_tostring(instance->labels) : "",
|
||||
value,
|
||||
(unsigned long long)last_t);
|
||||
|
||||
|
|
|
@ -7,6 +7,10 @@
|
|||
|
||||
int init_graphite_connector(struct connector *connector);
|
||||
int init_graphite_instance(struct instance *instance);
|
||||
|
||||
void sanitize_graphite_label_value(char *dst, char *src, size_t len);
|
||||
int format_host_labels_graphite_plaintext(struct instance *instance, RRDHOST *host);
|
||||
|
||||
int format_dimension_collected_graphite_plaintext(struct instance *instance, RRDDIM *rd);
|
||||
int format_dimension_stored_graphite_plaintext(struct instance *instance, RRDDIM *rd);
|
||||
|
||||
|
|
|
@ -28,7 +28,7 @@ int init_json_connector(struct connector *connector)
|
|||
int init_json_instance(struct instance *instance)
|
||||
{
|
||||
instance->start_batch_formatting = NULL;
|
||||
instance->start_host_formatting = NULL;
|
||||
instance->start_host_formatting = format_host_labels_json_plaintext;
|
||||
instance->start_chart_formatting = NULL;
|
||||
|
||||
if (EXPORTING_OPTIONS_DATA_SOURCE(instance->config.options) == EXPORTING_SOURCE_DATA_AS_COLLECTED)
|
||||
|
@ -37,7 +37,7 @@ int init_json_instance(struct instance *instance)
|
|||
instance->metric_formatting = format_dimension_stored_json_plaintext;
|
||||
|
||||
instance->end_chart_formatting = NULL;
|
||||
instance->end_host_formatting = NULL;
|
||||
instance->end_host_formatting = flush_host_labels;
|
||||
instance->end_batch_formatting = NULL;
|
||||
|
||||
instance->buffer = (void *)buffer_create(0);
|
||||
|
@ -51,6 +51,44 @@ int init_json_instance(struct instance *instance)
|
|||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Format host labels for JSON connector
|
||||
*
|
||||
* @param instance an instance data structure.
|
||||
* @param host a data collecting host.
|
||||
* @return Always returns 0.
|
||||
*/
|
||||
int format_host_labels_json_plaintext(struct instance *instance, RRDHOST *host)
|
||||
{
|
||||
if (!instance->labels)
|
||||
instance->labels = buffer_create(1024);
|
||||
|
||||
if (unlikely(!sending_labels_configured(instance)))
|
||||
return 0;
|
||||
|
||||
buffer_strcat(instance->labels, "\"labels\":{");
|
||||
|
||||
int count = 0;
|
||||
netdata_rwlock_rdlock(&host->labels_rwlock);
|
||||
for (struct label *label = host->labels; label; label = label->next) {
|
||||
if (!should_send_label(instance, label))
|
||||
continue;
|
||||
|
||||
char value[CONFIG_MAX_VALUE * 2 + 1];
|
||||
sanitize_json_string(value, label->value, CONFIG_MAX_VALUE);
|
||||
if (count > 0)
|
||||
buffer_strcat(instance->labels, ",");
|
||||
buffer_sprintf(instance->labels, "\"%s\":\"%s\"", label->key, value);
|
||||
|
||||
count++;
|
||||
}
|
||||
netdata_rwlock_unlock(&host->labels_rwlock);
|
||||
|
||||
buffer_strcat(instance->labels, "},");
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Format dimension using collected data for JSON connector
|
||||
*
|
||||
|
@ -80,28 +118,32 @@ int format_dimension_collected_json_plaintext(struct instance *instance, RRDDIM
|
|||
|
||||
buffer_sprintf(
|
||||
instance->buffer,
|
||||
|
||||
"{"
|
||||
"\"prefix\":\"%s\","
|
||||
"\"hostname\":\"%s\","
|
||||
"%s%s%s"
|
||||
"%s"
|
||||
|
||||
"\"chart_id\":\"%s\","
|
||||
"\"chart_name\":\"%s\","
|
||||
"\"chart_family\":\"%s\","
|
||||
"\"chart_context\": \"%s\","
|
||||
"\"chart_context\":\"%s\","
|
||||
"\"chart_type\":\"%s\","
|
||||
"\"units\": \"%s\","
|
||||
"\"units\":\"%s\","
|
||||
|
||||
"\"id\":\"%s\","
|
||||
"\"name\":\"%s\","
|
||||
"\"value\":" COLLECTED_NUMBER_FORMAT ","
|
||||
|
||||
"\"timestamp\": %llu}\n",
|
||||
"\"timestamp\":%llu}\n",
|
||||
|
||||
engine->config.prefix,
|
||||
engine->config.hostname,
|
||||
tags_pre,
|
||||
tags,
|
||||
tags_post,
|
||||
instance->labels ? buffer_tostring(instance->labels) : "",
|
||||
|
||||
st->id,
|
||||
st->name,
|
||||
|
@ -158,6 +200,7 @@ int format_dimension_stored_json_plaintext(struct instance *instance, RRDDIM *rd
|
|||
"\"prefix\":\"%s\","
|
||||
"\"hostname\":\"%s\","
|
||||
"%s%s%s"
|
||||
"%s"
|
||||
|
||||
"\"chart_id\":\"%s\","
|
||||
"\"chart_name\":\"%s\","
|
||||
|
@ -171,11 +214,13 @@ int format_dimension_stored_json_plaintext(struct instance *instance, RRDDIM *rd
|
|||
"\"value\":" CALCULATED_NUMBER_FORMAT ","
|
||||
|
||||
"\"timestamp\": %llu}\n",
|
||||
|
||||
engine->config.prefix,
|
||||
engine->config.hostname,
|
||||
tags_pre,
|
||||
tags,
|
||||
tags_post,
|
||||
instance->labels ? buffer_tostring(instance->labels) : "",
|
||||
|
||||
st->id,
|
||||
st->name,
|
||||
|
|
|
@ -7,6 +7,9 @@
|
|||
|
||||
int init_json_connector(struct connector *connector);
|
||||
int init_json_instance(struct instance *instance);
|
||||
|
||||
int format_host_labels_json_plaintext(struct instance *instance, RRDHOST *host);
|
||||
|
||||
int format_dimension_collected_json_plaintext(struct instance *instance, RRDDIM *rd);
|
||||
int format_dimension_stored_json_plaintext(struct instance *instance, RRDDIM *rd);
|
||||
|
||||
|
|
|
@ -28,7 +28,7 @@ int init_opentsdb_connector(struct connector *connector)
|
|||
int init_opentsdb_telnet_instance(struct instance *instance)
|
||||
{
|
||||
instance->start_batch_formatting = NULL;
|
||||
instance->start_host_formatting = NULL;
|
||||
instance->start_host_formatting = format_host_labels_opentsdb_telnet;
|
||||
instance->start_chart_formatting = NULL;
|
||||
|
||||
if (EXPORTING_OPTIONS_DATA_SOURCE(instance->config.options) == EXPORTING_SOURCE_DATA_AS_COLLECTED)
|
||||
|
@ -37,7 +37,7 @@ int init_opentsdb_telnet_instance(struct instance *instance)
|
|||
instance->metric_formatting = format_dimension_stored_opentsdb_telnet;
|
||||
|
||||
instance->end_chart_formatting = NULL;
|
||||
instance->end_host_formatting = NULL;
|
||||
instance->end_host_formatting = flush_host_labels;
|
||||
instance->end_batch_formatting = NULL;
|
||||
|
||||
instance->buffer = (void *)buffer_create(0);
|
||||
|
@ -60,7 +60,7 @@ int init_opentsdb_telnet_instance(struct instance *instance)
|
|||
int init_opentsdb_http_instance(struct instance *instance)
|
||||
{
|
||||
instance->start_batch_formatting = NULL;
|
||||
instance->start_host_formatting = NULL;
|
||||
instance->start_host_formatting = format_host_labels_opentsdb_http;
|
||||
instance->start_chart_formatting = NULL;
|
||||
|
||||
if (EXPORTING_OPTIONS_DATA_SOURCE(instance->config.options) == EXPORTING_SOURCE_DATA_AS_COLLECTED)
|
||||
|
@ -69,7 +69,7 @@ int init_opentsdb_http_instance(struct instance *instance)
|
|||
instance->metric_formatting = format_dimension_stored_opentsdb_http;
|
||||
|
||||
instance->end_chart_formatting = NULL;
|
||||
instance->end_host_formatting = NULL;
|
||||
instance->end_host_formatting = flush_host_labels;
|
||||
instance->end_batch_formatting = NULL;
|
||||
|
||||
instance->buffer = (void *)buffer_create(0);
|
||||
|
@ -83,6 +83,58 @@ int init_opentsdb_http_instance(struct instance *instance)
|
|||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Copy a label value and substitute underscores in place of charachters which can't be used in OpenTSDB output
|
||||
*
|
||||
* @param dst a destination string.
|
||||
* @param src a source string.
|
||||
* @param len the maximum number of characters copied.
|
||||
*/
|
||||
|
||||
void sanitize_opentsdb_label_value(char *dst, char *src, size_t len)
|
||||
{
|
||||
while (*src != '\0' && len) {
|
||||
if (isalpha(*src) || isdigit(*src) || *src == '-' || *src == '_' || *src == '.' || *src == '/' || IS_UTF8_BYTE(*src))
|
||||
*dst++ = *src;
|
||||
else
|
||||
*dst++ = '_';
|
||||
src++;
|
||||
len--;
|
||||
}
|
||||
*dst = '\0';
|
||||
}
|
||||
|
||||
/**
|
||||
* Format host labels for JSON connector
|
||||
*
|
||||
* @param instance an instance data structure.
|
||||
* @param host a data collecting host.
|
||||
* @return Always returns 0.
|
||||
*/
|
||||
int format_host_labels_opentsdb_telnet(struct instance *instance, RRDHOST *host)
|
||||
{
|
||||
if (!instance->labels)
|
||||
instance->labels = buffer_create(1024);
|
||||
|
||||
if (unlikely(!sending_labels_configured(instance)))
|
||||
return 0;
|
||||
|
||||
netdata_rwlock_rdlock(&host->labels_rwlock);
|
||||
for (struct label *label = host->labels; label; label = label->next) {
|
||||
if (!should_send_label(instance, label))
|
||||
continue;
|
||||
|
||||
char value[CONFIG_MAX_VALUE + 1];
|
||||
sanitize_opentsdb_label_value(value, label->value, CONFIG_MAX_VALUE);
|
||||
|
||||
if (*value)
|
||||
buffer_sprintf(instance->labels, " %s=%s", label->key, value);
|
||||
}
|
||||
netdata_rwlock_unlock(&host->labels_rwlock);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Format dimension using collected data for OpenTSDB telnet connector
|
||||
*
|
||||
|
@ -110,7 +162,7 @@ int format_dimension_collected_opentsdb_telnet(struct instance *instance, RRDDIM
|
|||
|
||||
buffer_sprintf(
|
||||
instance->buffer,
|
||||
"put %s.%s.%s %llu " COLLECTED_NUMBER_FORMAT " host=%s%s%s\n",
|
||||
"put %s.%s.%s %llu " COLLECTED_NUMBER_FORMAT " host=%s%s%s%s\n",
|
||||
engine->config.prefix,
|
||||
chart_name,
|
||||
dimension_name,
|
||||
|
@ -118,7 +170,8 @@ int format_dimension_collected_opentsdb_telnet(struct instance *instance, RRDDIM
|
|||
rd->last_collected_value,
|
||||
engine->config.hostname,
|
||||
(host->tags) ? " " : "",
|
||||
(host->tags) ? host->tags : "");
|
||||
(host->tags) ? host->tags : "",
|
||||
(instance->labels) ? buffer_tostring(instance->labels) : "");
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -156,7 +209,7 @@ int format_dimension_stored_opentsdb_telnet(struct instance *instance, RRDDIM *r
|
|||
|
||||
buffer_sprintf(
|
||||
instance->buffer,
|
||||
"put %s.%s.%s %llu " CALCULATED_NUMBER_FORMAT " host=%s%s%s\n",
|
||||
"put %s.%s.%s %llu " CALCULATED_NUMBER_FORMAT " host=%s%s%s%s\n",
|
||||
engine->config.prefix,
|
||||
chart_name,
|
||||
dimension_name,
|
||||
|
@ -164,7 +217,8 @@ int format_dimension_stored_opentsdb_telnet(struct instance *instance, RRDDIM *r
|
|||
value,
|
||||
engine->config.hostname,
|
||||
(host->tags) ? " " : "",
|
||||
(host->tags) ? host->tags : "");
|
||||
(host->tags) ? host->tags : "",
|
||||
(instance->labels) ? buffer_tostring(instance->labels) : "");
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -192,6 +246,42 @@ static inline void opentsdb_build_message(BUFFER *buffer, char *message, const c
|
|||
message);
|
||||
}
|
||||
|
||||
/**
|
||||
* Format host labels for OpenTSDB HTTP connector
|
||||
*
|
||||
* @param instance an instance data structure.
|
||||
* @param host a data collecting host.
|
||||
* @return Always returns 0.
|
||||
*/
|
||||
int format_host_labels_opentsdb_http(struct instance *instance, RRDHOST *host)
|
||||
{
|
||||
if (!instance->labels)
|
||||
instance->labels = buffer_create(1024);
|
||||
|
||||
if (unlikely(!sending_labels_configured(instance)))
|
||||
return 0;
|
||||
|
||||
netdata_rwlock_rdlock(&host->labels_rwlock);
|
||||
for (struct label *label = host->labels; label; label = label->next) {
|
||||
if (!should_send_label(instance, label))
|
||||
continue;
|
||||
|
||||
char escaped_value[CONFIG_MAX_VALUE * 2 + 1];
|
||||
sanitize_json_string(escaped_value, label->value, CONFIG_MAX_VALUE);
|
||||
|
||||
char value[CONFIG_MAX_VALUE + 1];
|
||||
sanitize_opentsdb_label_value(value, escaped_value, CONFIG_MAX_VALUE);
|
||||
|
||||
if (*value) {
|
||||
buffer_strcat(instance->labels, ",");
|
||||
buffer_sprintf(instance->labels, "\"%s\":\"%s\"", label->key, value);
|
||||
}
|
||||
}
|
||||
netdata_rwlock_unlock(&host->labels_rwlock);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Format dimension using collected data for OpenTSDB HTTP connector
|
||||
*
|
||||
|
@ -226,7 +316,7 @@ int format_dimension_collected_opentsdb_http(struct instance *instance, RRDDIM *
|
|||
" \"timestamp\": %llu,"
|
||||
" \"value\": " COLLECTED_NUMBER_FORMAT ","
|
||||
" \"tags\": {"
|
||||
" \"host\": \"%s%s%s\""
|
||||
" \"host\": \"%s%s%s\"%s"
|
||||
" }"
|
||||
"}",
|
||||
engine->config.prefix,
|
||||
|
@ -236,7 +326,8 @@ int format_dimension_collected_opentsdb_http(struct instance *instance, RRDDIM *
|
|||
rd->last_collected_value,
|
||||
engine->config.hostname,
|
||||
(host->tags) ? " " : "",
|
||||
(host->tags) ? host->tags : "");
|
||||
(host->tags) ? host->tags : "",
|
||||
instance->labels ? buffer_tostring(instance->labels) : "");
|
||||
|
||||
if (length > 0) {
|
||||
opentsdb_build_message(instance->buffer, message, engine->config.hostname, length);
|
||||
|
@ -285,7 +376,7 @@ int format_dimension_stored_opentsdb_http(struct instance *instance, RRDDIM *rd)
|
|||
" \"timestamp\": %llu,"
|
||||
" \"value\": " CALCULATED_NUMBER_FORMAT ","
|
||||
" \"tags\": {"
|
||||
" \"host\": \"%s%s%s\""
|
||||
" \"host\": \"%s%s%s\"%s"
|
||||
" }"
|
||||
"}",
|
||||
engine->config.prefix,
|
||||
|
@ -295,7 +386,8 @@ int format_dimension_stored_opentsdb_http(struct instance *instance, RRDDIM *rd)
|
|||
value,
|
||||
engine->config.hostname,
|
||||
(host->tags) ? " " : "",
|
||||
(host->tags) ? host->tags : "");
|
||||
(host->tags) ? host->tags : "",
|
||||
instance->labels ? buffer_tostring(instance->labels) : "");
|
||||
|
||||
if (length > 0) {
|
||||
opentsdb_build_message(instance->buffer, message, engine->config.hostname, length);
|
||||
|
|
|
@ -9,6 +9,10 @@ int init_opentsdb_connector(struct connector *connector);
|
|||
int init_opentsdb_telnet_instance(struct instance *instance);
|
||||
int init_opentsdb_http_instance(struct instance *instance);
|
||||
|
||||
void sanitize_opentsdb_label_value(char *dst, char *src, size_t len);
|
||||
int format_host_labels_opentsdb_telnet(struct instance *instance, RRDHOST *host);
|
||||
int format_host_labels_opentsdb_http(struct instance *instance, RRDHOST *host);
|
||||
|
||||
int format_dimension_collected_opentsdb_telnet(struct instance *instance, RRDDIM *rd);
|
||||
int format_dimension_stored_opentsdb_telnet(struct instance *instance, RRDDIM *rd);
|
||||
|
||||
|
|
|
@ -388,6 +388,23 @@ int prepare_buffers(struct engine *engine)
|
|||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Flush a buffer with host labels
|
||||
*
|
||||
* @param instance an instance data structure.
|
||||
* @param host a data collecting host.
|
||||
* @return Always returns 0.
|
||||
*/
|
||||
int flush_host_labels(struct instance *instance, RRDHOST *host)
|
||||
{
|
||||
(void)host;
|
||||
|
||||
if (instance->labels)
|
||||
buffer_flush(instance->labels);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Notify workers
|
||||
*
|
||||
|
|
|
@ -318,6 +318,18 @@ struct engine *read_exporting_config()
|
|||
|
||||
tmp_instance->config.options = exporting_parse_data_source(data_source, tmp_instance->config.options);
|
||||
|
||||
if (exporter_get_boolean(
|
||||
instance_name, EXPORTER_SEND_CONFIGURED_LABELS, EXPORTER_SEND_CONFIGURED_LABELS_DEFAULT))
|
||||
tmp_instance->config.options |= EXPORTING_OPTION_SEND_CONFIGURED_LABELS;
|
||||
else
|
||||
tmp_instance->config.options &= ~EXPORTING_OPTION_SEND_CONFIGURED_LABELS;
|
||||
|
||||
if (exporter_get_boolean(
|
||||
instance_name, EXPORTER_SEND_AUTOMATIC_LABELS, EXPORTER_SEND_AUTOMATIC_LABELS_DEFAULT))
|
||||
tmp_instance->config.options |= EXPORTING_OPTION_SEND_AUTOMATIC_LABELS;
|
||||
else
|
||||
tmp_instance->config.options &= ~EXPORTING_OPTION_SEND_AUTOMATIC_LABELS;
|
||||
|
||||
if (exporter_get_boolean(instance_name, EXPORTER_SEND_NAMES, EXPORTER_SEND_NAMES_DEFAULT))
|
||||
tmp_instance->config.options |= EXPORTING_OPTION_SEND_NAMES;
|
||||
else
|
||||
|
|
|
@ -39,6 +39,18 @@ int setup_rrdhost()
|
|||
|
||||
localhost->tags = strdupz("TAG1=VALUE1 TAG2=VALUE2");
|
||||
|
||||
struct label *label = calloc(1, sizeof(struct label));
|
||||
label->key = strdupz("key1");
|
||||
label->value = strdupz("value1");
|
||||
label->label_source = LABEL_SOURCE_NETDATA_CONF;
|
||||
localhost->labels = label;
|
||||
|
||||
label = calloc(1, sizeof(struct label));
|
||||
label->key = strdupz("key2");
|
||||
label->value = strdupz("value2");
|
||||
label->label_source = LABEL_SOURCE_AUTO;
|
||||
localhost->labels->next = label;
|
||||
|
||||
localhost->rrdset_root = calloc(1, sizeof(RRDSET));
|
||||
RRDSET *st = localhost->rrdset_root;
|
||||
st->rrdhost = localhost;
|
||||
|
@ -80,6 +92,13 @@ int teardown_rrdhost()
|
|||
free((void *)st->name);
|
||||
free(st);
|
||||
|
||||
free(localhost->labels->next->key);
|
||||
free(localhost->labels->next->value);
|
||||
free(localhost->labels->next);
|
||||
free(localhost->labels->key);
|
||||
free(localhost->labels->value);
|
||||
free(localhost->labels);
|
||||
|
||||
free((void *)localhost->tags);
|
||||
free(localhost);
|
||||
|
||||
|
@ -103,6 +122,7 @@ int teardown_initialized_engine(void **state)
|
|||
struct engine *engine = *state;
|
||||
|
||||
teardown_rrdhost();
|
||||
buffer_free(engine->connector_root->instance_root->labels);
|
||||
buffer_free(engine->connector_root->instance_root->buffer);
|
||||
teardown_configured_engine(state);
|
||||
|
||||
|
|
|
@ -61,7 +61,9 @@ static void test_exporting_engine(void **state)
|
|||
expect_memory(__wrap_send_internal_metrics, engine, engine, sizeof(struct engine));
|
||||
will_return(__wrap_send_internal_metrics, 0);
|
||||
|
||||
void *ptr = malloc(sizeof(int));
|
||||
expect_function_call(__wrap_info_int);
|
||||
|
||||
void *ptr = malloc(sizeof(struct netdata_static_thread));
|
||||
assert_ptr_equal(exporting_main(ptr), NULL);
|
||||
assert_int_equal(engine->now, 2);
|
||||
free(ptr);
|
||||
|
@ -118,11 +120,11 @@ static void test_init_connectors(void **state)
|
|||
assert_ptr_equal(instance->next, NULL);
|
||||
assert_int_equal(instance->index, 0);
|
||||
assert_ptr_equal(instance->start_batch_formatting, NULL);
|
||||
assert_ptr_equal(instance->start_host_formatting, NULL);
|
||||
assert_ptr_equal(instance->start_host_formatting, format_host_labels_graphite_plaintext);
|
||||
assert_ptr_equal(instance->start_chart_formatting, NULL);
|
||||
assert_ptr_equal(instance->metric_formatting, format_dimension_collected_graphite_plaintext);
|
||||
assert_ptr_equal(instance->end_chart_formatting, NULL);
|
||||
assert_ptr_equal(instance->end_host_formatting, NULL);
|
||||
assert_ptr_equal(instance->end_host_formatting, flush_host_labels);
|
||||
assert_ptr_equal(instance->end_batch_formatting, NULL);
|
||||
|
||||
BUFFER *buffer = instance->buffer;
|
||||
|
@ -460,8 +462,8 @@ static void test_format_dimension_collected_json_plaintext(void **state)
|
|||
buffer_tostring(engine->connector_root->instance_root->buffer),
|
||||
"{\"prefix\":\"netdata\",\"hostname\":\"test-host\",\"host_tags\":\"TAG1=VALUE1 TAG2=VALUE2\","
|
||||
"\"chart_id\":\"chart_id\",\"chart_name\":\"chart_name\",\"chart_family\":\"(null)\","
|
||||
"\"chart_context\": \"(null)\",\"chart_type\":\"(null)\",\"units\": \"(null)\",\"id\":\"dimension_id\","
|
||||
"\"name\":\"dimension_name\",\"value\":123000321,\"timestamp\": 15051}\n");
|
||||
"\"chart_context\":\"(null)\",\"chart_type\":\"(null)\",\"units\":\"(null)\",\"id\":\"dimension_id\","
|
||||
"\"name\":\"dimension_name\",\"value\":123000321,\"timestamp\":15051}\n");
|
||||
}
|
||||
|
||||
static void test_format_dimension_stored_json_plaintext(void **state)
|
||||
|
@ -678,6 +680,103 @@ static void test_simple_connector_worker(void **state)
|
|||
simple_connector_worker(instance);
|
||||
}
|
||||
|
||||
static void test_sanitize_json_string(void **state)
|
||||
{
|
||||
(void)state;
|
||||
|
||||
char *src = "check \t\\\" string";
|
||||
char dst[19 + 1];
|
||||
|
||||
sanitize_json_string(dst, src, 19);
|
||||
|
||||
assert_string_equal(dst, "check _\\\\\\\" string");
|
||||
}
|
||||
|
||||
static void test_sanitize_graphite_label_value(void **state)
|
||||
{
|
||||
(void)state;
|
||||
|
||||
char *src = "check ;~ string";
|
||||
char dst[15 + 1];
|
||||
|
||||
sanitize_graphite_label_value(dst, src, 15);
|
||||
|
||||
assert_string_equal(dst, "check____string");
|
||||
}
|
||||
|
||||
static void test_sanitize_opentsdb_label_value(void **state)
|
||||
{
|
||||
(void)state;
|
||||
|
||||
char *src = "check \t\\\" #&$? -_./ string";
|
||||
char dst[26 + 1];
|
||||
|
||||
sanitize_opentsdb_label_value(dst, src, 26);
|
||||
|
||||
assert_string_equal(dst, "check__________-_./_string");
|
||||
}
|
||||
|
||||
static void test_format_host_labels_json_plaintext(void **state)
|
||||
{
|
||||
struct engine *engine = *state;
|
||||
struct instance *instance = engine->connector_root->instance_root;
|
||||
|
||||
instance->config.options |= EXPORTING_OPTION_SEND_CONFIGURED_LABELS;
|
||||
instance->config.options |= EXPORTING_OPTION_SEND_AUTOMATIC_LABELS;
|
||||
|
||||
assert_int_equal(format_host_labels_json_plaintext(instance, localhost), 0);
|
||||
assert_string_equal(buffer_tostring(instance->labels), "\"labels\":{\"key1\":\"value1\",\"key2\":\"value2\"},");
|
||||
}
|
||||
|
||||
static void test_format_host_labels_graphite_plaintext(void **state)
|
||||
{
|
||||
struct engine *engine = *state;
|
||||
struct instance *instance = engine->connector_root->instance_root;
|
||||
|
||||
instance->config.options |= EXPORTING_OPTION_SEND_CONFIGURED_LABELS;
|
||||
instance->config.options |= EXPORTING_OPTION_SEND_AUTOMATIC_LABELS;
|
||||
|
||||
assert_int_equal(format_host_labels_graphite_plaintext(instance, localhost), 0);
|
||||
assert_string_equal(buffer_tostring(instance->labels), ";key1=value1;key2=value2");
|
||||
}
|
||||
|
||||
static void test_format_host_labels_opentsdb_telnet(void **state)
|
||||
{
|
||||
struct engine *engine = *state;
|
||||
struct instance *instance = engine->connector_root->instance_root;
|
||||
|
||||
instance->config.options |= EXPORTING_OPTION_SEND_CONFIGURED_LABELS;
|
||||
instance->config.options |= EXPORTING_OPTION_SEND_AUTOMATIC_LABELS;
|
||||
|
||||
assert_int_equal(format_host_labels_opentsdb_telnet(instance, localhost), 0);
|
||||
assert_string_equal(buffer_tostring(instance->labels), " key1=value1 key2=value2");
|
||||
}
|
||||
|
||||
static void test_format_host_labels_opentsdb_http(void **state)
|
||||
{
|
||||
struct engine *engine = *state;
|
||||
struct instance *instance = engine->connector_root->instance_root;
|
||||
|
||||
instance->config.options |= EXPORTING_OPTION_SEND_CONFIGURED_LABELS;
|
||||
instance->config.options |= EXPORTING_OPTION_SEND_AUTOMATIC_LABELS;
|
||||
|
||||
assert_int_equal(format_host_labels_opentsdb_http(instance, localhost), 0);
|
||||
assert_string_equal(buffer_tostring(instance->labels), ",\"key1\":\"value1\",\"key2\":\"value2\"");
|
||||
}
|
||||
|
||||
static void test_flush_host_labels(void **state)
|
||||
{
|
||||
struct engine *engine = *state;
|
||||
struct instance *instance = engine->connector_root->instance_root;
|
||||
|
||||
instance->labels = buffer_create(12);
|
||||
buffer_strcat(instance->labels, "check string");
|
||||
assert_int_equal(buffer_strlen(instance->labels), 12);
|
||||
|
||||
assert_int_equal(flush_host_labels(instance, localhost), 0);
|
||||
assert_int_equal(buffer_strlen(instance->labels), 0);
|
||||
}
|
||||
|
||||
int main(void)
|
||||
{
|
||||
const struct CMUnitTest tests[] = {
|
||||
|
@ -734,5 +833,21 @@ int main(void)
|
|||
test_simple_connector_worker, setup_initialized_engine, teardown_initialized_engine),
|
||||
};
|
||||
|
||||
return cmocka_run_group_tests_name("exporting_engine", tests, NULL, NULL);
|
||||
const struct CMUnitTest label_tests[] = {
|
||||
cmocka_unit_test(test_sanitize_json_string),
|
||||
cmocka_unit_test(test_sanitize_graphite_label_value),
|
||||
cmocka_unit_test(test_sanitize_opentsdb_label_value),
|
||||
cmocka_unit_test_setup_teardown(
|
||||
test_format_host_labels_json_plaintext, setup_initialized_engine, teardown_initialized_engine),
|
||||
cmocka_unit_test_setup_teardown(
|
||||
test_format_host_labels_graphite_plaintext, setup_initialized_engine, teardown_initialized_engine),
|
||||
cmocka_unit_test_setup_teardown(
|
||||
test_format_host_labels_opentsdb_telnet, setup_initialized_engine, teardown_initialized_engine),
|
||||
cmocka_unit_test_setup_teardown(
|
||||
test_format_host_labels_opentsdb_http, setup_initialized_engine, teardown_initialized_engine),
|
||||
cmocka_unit_test_setup_teardown(test_flush_host_labels, setup_initialized_engine, teardown_initialized_engine),
|
||||
};
|
||||
|
||||
return cmocka_run_group_tests_name("exporting_engine", tests, NULL, NULL) +
|
||||
cmocka_run_group_tests_name("labels_in_exporting_engine", label_tests, NULL, NULL);
|
||||
}
|
||||
|
|
|
@ -246,12 +246,18 @@ static inline char *strncpyz(char *dst, const char *src, size_t n) {
|
|||
return p;
|
||||
}
|
||||
|
||||
static inline void escape_json_string(char *dst, char *src, size_t len) {
|
||||
static inline void sanitize_json_string(char *dst, char *src, size_t len) {
|
||||
while (*src != '\0' && len > 1) {
|
||||
if (*src == '\\' || *src == '\"' || *src < 0x1F) {
|
||||
*dst++ = '\\';
|
||||
*dst++ = *src++;
|
||||
len -= 2;
|
||||
if (*src < 0x1F) {
|
||||
*dst++ = '_';
|
||||
src++;
|
||||
len--;
|
||||
} else {
|
||||
*dst++ = '\\';
|
||||
*dst++ = *src++;
|
||||
len -= 2;
|
||||
}
|
||||
} else {
|
||||
*dst++ = *src++;
|
||||
len--;
|
||||
|
|
|
@ -780,7 +780,7 @@ inline void host_labels2json(RRDHOST *host, BUFFER *wb, size_t indentation) {
|
|||
buffer_strcat(wb, tabs);
|
||||
|
||||
char value[CONFIG_MAX_VALUE * 2 + 1];
|
||||
escape_json_string(value, label->value, CONFIG_MAX_VALUE * 2);
|
||||
sanitize_json_string(value, label->value, CONFIG_MAX_VALUE * 2);
|
||||
buffer_sprintf(wb, "\"%s\": \"%s\"", label->key, value);
|
||||
|
||||
count++;
|
||||
|
|
Loading…
Reference in New Issue