detect if the disk cannot keep up with data collection (#7139)

* Adjust dbengine flushing speed more dynamically

* Added error tracking statistics for failure to flush events

* Added alarm for dbengine flushing errors

* Improved dbengine accounting for commited to be written pages
This commit is contained in:
Markos Fountoulakis 2019-10-24 19:43:09 +03:00 committed by GitHub
parent a6229b245c
commit 88f966593a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 122 additions and 51 deletions

View File

@ -381,7 +381,7 @@ declare -A configs_signatures=(
['7deb236ec68a512b9bdd18e6a51d76f7']='python.d/mysql.conf'
['7e5fc1644aa7a54f9dbb1bd102521b09']='health.d/memcached.conf'
['7f13631183fbdf79c21c8e5a171e9b34']='health.d/zfs.conf'
['e48b89d4a97b96acf9a88970ab858c3b']='health.d/dbengine.conf'
['8edc8c73a8f3ca40b32e27fe452c70f3']='health.d/dbengine.conf'
['7fb8184d56a27040e73261ed9c6fc76f']='health_alarm_notify.conf'
['80266bddd3df374923c750a6de91d120']='health.d/apache.conf'
['803a7f9dcb942eeac0fd764b9e3e38ca']='fping.conf'

View File

@ -544,7 +544,7 @@ void global_statistics_charts(void) {
if (host->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE) {
++hosts_with_dbengine;
/* get localhost's DB engine's statistics */
rrdeng_get_33_statistics(host->rrdeng_ctx, local_stats_array);
rrdeng_get_35_statistics(host->rrdeng_ctx, local_stats_array);
for (i = 0 ; i < RRDENG_NR_STATS ; ++i) {
/* aggregate statistics across hosts */
stats_array[i] += local_stats_array[i];
@ -775,6 +775,7 @@ void global_statistics_charts(void) {
static RRDSET *st_errors = NULL;
static RRDDIM *rd_fs_errors = NULL;
static RRDDIM *rd_io_errors = NULL;
static RRDDIM *rd_flushing_errors = NULL;
if (unlikely(!st_errors)) {
st_errors = rrdset_create_localhost(
@ -794,12 +795,14 @@ void global_statistics_charts(void) {
rd_io_errors = rrddim_add(st_errors, "I/O errors", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
rd_fs_errors = rrddim_add(st_errors, "FS errors", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
rd_flushing_errors = rrddim_add(st_errors, "flushing errors", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
}
else
rrdset_next(st_errors);
rrddim_set_by_pointer(st_errors, rd_io_errors, (collected_number)stats_array[30]);
rrddim_set_by_pointer(st_errors, rd_fs_errors, (collected_number)stats_array[31]);
rrddim_set_by_pointer(st_errors, rd_flushing_errors, (collected_number)stats_array[34]);
rrdset_done(st_errors);
}

View File

@ -214,7 +214,7 @@ static void pg_cache_release_pages(struct rrdengine_instance *ctx, unsigned numb
* This function returns the maximum number of pages allowed in the page cache.
* The caller must hold the page cache lock.
*/
static inline unsigned long pg_cache_hard_limit(struct rrdengine_instance *ctx)
unsigned long pg_cache_hard_limit(struct rrdengine_instance *ctx)
{
/* it's twice the number of producers since we pin 2 pages per producer */
return ctx->max_cache_pages + 2 * (unsigned long)ctx->stats.metric_API_producers;
@ -225,7 +225,7 @@ static inline unsigned long pg_cache_hard_limit(struct rrdengine_instance *ctx)
* number of pages below that number.
* The caller must hold the page cache lock.
*/
static inline unsigned long pg_cache_soft_limit(struct rrdengine_instance *ctx)
unsigned long pg_cache_soft_limit(struct rrdengine_instance *ctx)
{
/* it's twice the number of producers since we pin 2 pages per producer */
return ctx->cache_pages_low_watermark + 2 * (unsigned long)ctx->stats.metric_API_producers;
@ -1029,14 +1029,14 @@ static void init_replaceQ(struct rrdengine_instance *ctx)
assert(0 == uv_rwlock_init(&pg_cache->replaceQ.lock));
}
static void init_commited_page_index(struct rrdengine_instance *ctx)
static void init_committed_page_index(struct rrdengine_instance *ctx)
{
struct page_cache *pg_cache = &ctx->pg_cache;
pg_cache->commited_page_index.JudyL_array = (Pvoid_t) NULL;
assert(0 == uv_rwlock_init(&pg_cache->commited_page_index.lock));
pg_cache->commited_page_index.latest_corr_id = 0;
pg_cache->commited_page_index.nr_commited_pages = 0;
pg_cache->committed_page_index.JudyL_array = (Pvoid_t) NULL;
assert(0 == uv_rwlock_init(&pg_cache->committed_page_index.lock));
pg_cache->committed_page_index.latest_corr_id = 0;
pg_cache->committed_page_index.nr_committed_pages = 0;
}
void init_page_cache(struct rrdengine_instance *ctx)
@ -1049,7 +1049,7 @@ void init_page_cache(struct rrdengine_instance *ctx)
init_metrics_index(ctx);
init_replaceQ(ctx);
init_commited_page_index(ctx);
init_committed_page_index(ctx);
}
void free_page_cache(struct rrdengine_instance *ctx)
@ -1062,9 +1062,9 @@ void free_page_cache(struct rrdengine_instance *ctx)
struct rrdeng_page_descr *descr;
struct page_cache_descr *pg_cache_descr;
/* Free commited page index */
ret_Judy = JudyLFreeArray(&pg_cache->commited_page_index.JudyL_array, PJE0);
assert(NULL == pg_cache->commited_page_index.JudyL_array);
/* Free committed page index */
ret_Judy = JudyLFreeArray(&pg_cache->committed_page_index.JudyL_array, PJE0);
assert(NULL == pg_cache->committed_page_index.JudyL_array);
bytes_freed += ret_Judy;
for (page_index = pg_cache->metrics_index.last_page_index ;

View File

@ -110,7 +110,7 @@ struct pg_cache_metrics_index {
};
/* gathers dirty pages to be written on disk */
struct pg_cache_commited_page_index {
struct pg_cache_committed_page_index {
uv_rwlock_t lock;
Pvoid_t JudyL_array;
@ -122,7 +122,7 @@ struct pg_cache_commited_page_index {
*/
Word_t latest_corr_id;
unsigned nr_commited_pages;
unsigned nr_committed_pages;
};
/*
@ -140,7 +140,7 @@ struct page_cache { /* TODO: add statistics */
uv_rwlock_t pg_cache_rwlock; /* page cache lock */
struct pg_cache_metrics_index metrics_index;
struct pg_cache_commited_page_index commited_page_index;
struct pg_cache_committed_page_index committed_page_index;
struct pg_cache_replaceQ replaceQ;
unsigned page_descriptors;
@ -182,6 +182,8 @@ extern void init_page_cache(struct rrdengine_instance *ctx);
extern void free_page_cache(struct rrdengine_instance *ctx);
extern void pg_cache_add_new_metric_time(struct pg_cache_page_index *page_index, struct rrdeng_page_descr *descr);
extern void pg_cache_update_metric_times(struct pg_cache_page_index *page_index);
extern unsigned long pg_cache_hard_limit(struct rrdengine_instance *ctx);
extern unsigned long pg_cache_soft_limit(struct rrdengine_instance *ctx);
static inline void
pg_cache_atomic_get_pg_info(struct rrdeng_page_descr *descr, usec_t *end_timep, uint32_t *page_lengthp)

View File

@ -5,9 +5,8 @@
rrdeng_stats_t global_io_errors = 0;
rrdeng_stats_t global_fs_errors = 0;
rrdeng_stats_t global_pg_cache_warnings = 0;
rrdeng_stats_t global_pg_cache_errors = 0;
rrdeng_stats_t rrdeng_reserved_file_descriptors = 0;
rrdeng_stats_t global_flushing_errors = 0;
void sanity_check(void)
{
@ -253,6 +252,7 @@ void flush_pages_cb(uv_fs_t* req)
{
struct rrdengine_worker_config* wc = req->loop->data;
struct rrdengine_instance *ctx = wc->ctx;
struct page_cache *pg_cache = &ctx->pg_cache;
struct extent_io_descriptor *xt_io_descr;
struct rrdeng_page_descr *descr;
struct page_cache_descr *pg_cache_descr;
@ -290,6 +290,10 @@ void flush_pages_cb(uv_fs_t* req)
uv_fs_req_cleanup(req);
free(xt_io_descr->buf);
freez(xt_io_descr);
uv_rwlock_wrlock(&pg_cache->committed_page_index.lock);
pg_cache->committed_page_index.nr_committed_pages -= count;
uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock);
}
/*
@ -323,14 +327,14 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct
if (force) {
debug(D_RRDENGINE, "Asynchronous flushing of extent has been forced by page pressure.");
}
uv_rwlock_wrlock(&pg_cache->commited_page_index.lock);
uv_rwlock_wrlock(&pg_cache->committed_page_index.lock);
for (Index = 0, count = 0, uncompressed_payload_length = 0,
PValue = JudyLFirst(pg_cache->commited_page_index.JudyL_array, &Index, PJE0),
PValue = JudyLFirst(pg_cache->committed_page_index.JudyL_array, &Index, PJE0),
descr = unlikely(NULL == PValue) ? NULL : *PValue ;
descr != NULL && count != MAX_PAGES_PER_EXTENT ;
PValue = JudyLNext(pg_cache->commited_page_index.JudyL_array, &Index, PJE0),
PValue = JudyLNext(pg_cache->committed_page_index.JudyL_array, &Index, PJE0),
descr = unlikely(NULL == PValue) ? NULL : *PValue) {
uint8_t page_write_pending;
@ -350,12 +354,11 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct
rrdeng_page_descr_mutex_unlock(ctx, descr);
if (page_write_pending) {
ret = JudyLDel(&pg_cache->commited_page_index.JudyL_array, Index, PJE0);
ret = JudyLDel(&pg_cache->committed_page_index.JudyL_array, Index, PJE0);
assert(1 == ret);
--pg_cache->commited_page_index.nr_commited_pages;
}
}
uv_rwlock_wrunlock(&pg_cache->commited_page_index.lock);
uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock);
if (!count) {
debug(D_RRDENGINE, "%s: no pages eligible for flushing.", __func__);
@ -648,6 +651,9 @@ void async_cb(uv_async_t *handle)
debug(D_RRDENGINE, "%s called, active=%d.", __func__, uv_is_active((uv_handle_t *)handle));
}
/* Flushes dirty pages when timer expires */
#define TIMER_PERIOD_MS (1000)
void timer_cb(uv_timer_t* handle)
{
struct rrdengine_worker_config* wc = handle->data;
@ -657,12 +663,31 @@ void timer_cb(uv_timer_t* handle)
rrdeng_test_quota(wc);
debug(D_RRDENGINE, "%s: timeout reached.", __func__);
if (likely(!wc->now_deleting.data)) {
unsigned total_bytes, bytes_written;
/* There is free space so we can write to disk */
struct rrdengine_instance *ctx = wc->ctx;
struct page_cache *pg_cache = &ctx->pg_cache;
unsigned long total_bytes, bytes_written, nr_committed_pages, bytes_to_write = 0, producers, low_watermark,
high_watermark;
uv_rwlock_wrlock(&pg_cache->committed_page_index.lock);
nr_committed_pages = pg_cache->committed_page_index.nr_committed_pages;
uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock);
producers = ctx->stats.metric_API_producers;
/* are flushable pages more than 25% of the maximum page cache size */
high_watermark = (ctx->max_cache_pages * 25LLU) / 100;
low_watermark = (ctx->max_cache_pages * 5LLU) / 100; /* 5%, must be smaller than high_watermark */
if (nr_committed_pages > producers &&
/* committed to be written pages are more than the produced number */
nr_committed_pages - producers > high_watermark) {
/* Flushing speed must increase to stop page cache from filling with dirty pages */
bytes_to_write = (nr_committed_pages - producers - low_watermark) * RRDENG_BLOCK_SIZE;
}
bytes_to_write = MAX(DATAFILE_IDEAL_IO_SIZE, bytes_to_write);
debug(D_RRDENGINE, "Flushing pages to disk.");
for (total_bytes = bytes_written = do_flush_pages(wc, 0, NULL) ;
bytes_written && (total_bytes < DATAFILE_IDEAL_IO_SIZE) ;
bytes_written && (total_bytes < bytes_to_write) ;
total_bytes += bytes_written) {
bytes_written = do_flush_pages(wc, 0, NULL);
}
@ -675,9 +700,6 @@ void timer_cb(uv_timer_t* handle)
#endif
}
/* Flushes dirty pages when timer expires */
#define TIMER_PERIOD_MS (1000)
#define MAX_CMD_BATCH_SIZE (256)
void rrdeng_worker(void* arg)
@ -771,8 +793,8 @@ void rrdeng_worker(void* arg)
/* First I/O should be enough to call completion */
bytes_written = do_flush_pages(wc, 1, cmd.completion);
if (bytes_written) {
while (do_flush_pages(wc, 1, NULL)) {
; /* Force flushing of all commited pages. */
while (do_flush_pages(wc, 1, NULL) && likely(!wc->now_deleting.data)) {
; /* Force flushing of all committed pages if there is free space. */
}
}
break;
@ -789,7 +811,7 @@ void rrdeng_worker(void* arg)
}
info("Shutting down RRD engine event loop.");
while (do_flush_pages(wc, 1, NULL)) {
; /* Force flushing of all commited pages. */
; /* Force flushing of all committed pages. */
}
wal_flush_transaction_buffer(wc);
uv_run(loop, UV_RUN_DEFAULT);

View File

@ -148,6 +148,7 @@ struct rrdengine_statistics {
rrdeng_stats_t page_cache_descriptors;
rrdeng_stats_t io_errors;
rrdeng_stats_t fs_errors;
rrdeng_stats_t flushing_errors;
};
/* I/O errors global counter */
@ -156,6 +157,8 @@ extern rrdeng_stats_t global_io_errors;
extern rrdeng_stats_t global_fs_errors;
/* number of File-Descriptors that have been reserved by dbengine */
extern rrdeng_stats_t rrdeng_reserved_file_descriptors;
/* inability to flush global counter */
extern rrdeng_stats_t global_flushing_errors;
struct rrdengine_instance {
struct rrdengine_worker_config worker_config;

View File

@ -173,9 +173,9 @@ void rrdeng_store_metric_next(RRDDIM *rd, usec_t point_in_time, storage_number n
handle->descr = descr;
uv_rwlock_wrlock(&pg_cache->commited_page_index.lock);
handle->page_correlation_id = pg_cache->commited_page_index.latest_corr_id++;
uv_rwlock_wrunlock(&pg_cache->commited_page_index.lock);
uv_rwlock_wrlock(&pg_cache->committed_page_index.lock);
handle->page_correlation_id = pg_cache->committed_page_index.latest_corr_id++;
uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock);
if (0 == rd->rrdset->rrddim_page_alignment) {
/* this is the leading dimension that defines chart alignment */
@ -614,18 +614,31 @@ void rrdeng_commit_page(struct rrdengine_instance *ctx, struct rrdeng_page_descr
{
struct page_cache *pg_cache = &ctx->pg_cache;
Pvoid_t *PValue;
unsigned nr_committed_pages;
if (unlikely(NULL == descr)) {
debug(D_RRDENGINE, "%s: page descriptor is NULL, page has already been force-commited.", __func__);
debug(D_RRDENGINE, "%s: page descriptor is NULL, page has already been force-committed.", __func__);
return;
}
assert(descr->page_length);
uv_rwlock_wrlock(&pg_cache->commited_page_index.lock);
PValue = JudyLIns(&pg_cache->commited_page_index.JudyL_array, page_correlation_id, PJE0);
uv_rwlock_wrlock(&pg_cache->committed_page_index.lock);
PValue = JudyLIns(&pg_cache->committed_page_index.JudyL_array, page_correlation_id, PJE0);
*PValue = descr;
++pg_cache->commited_page_index.nr_commited_pages;
uv_rwlock_wrunlock(&pg_cache->commited_page_index.lock);
nr_committed_pages = ++pg_cache->committed_page_index.nr_committed_pages;
uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock);
if (nr_committed_pages >= (pg_cache_hard_limit(ctx) - (unsigned long)ctx->stats.metric_API_producers) / 2) {
/* 50% of pages have not been committed yet */
if (0 == (unsigned long)ctx->stats.flushing_errors) {
/* only print the first time */
error("Failed to flush quickly enough in dbengine instance \"%s\""
". Metric data will not be stored in the database"
", please reduce disk load or use a faster disk.", ctx->dbfiles_path);
}
rrd_stat_atomic_add(&ctx->stats.flushing_errors, 1);
rrd_stat_atomic_add(&global_flushing_errors, 1);
}
pg_cache_put(ctx, descr);
}
@ -674,7 +687,7 @@ void *rrdeng_get_page(struct rrdengine_instance *ctx, uuid_t *id, usec_t point_i
* You must not change the indices of the statistics or user code will break.
* You must not exceed RRDENG_NR_STATS or it will crash.
*/
void rrdeng_get_33_statistics(struct rrdengine_instance *ctx, unsigned long long *array)
void rrdeng_get_35_statistics(struct rrdengine_instance *ctx, unsigned long long *array)
{
struct page_cache *pg_cache = &ctx->pg_cache;
@ -682,7 +695,7 @@ void rrdeng_get_33_statistics(struct rrdengine_instance *ctx, unsigned long long
array[1] = (uint64_t)ctx->stats.metric_API_consumers;
array[2] = (uint64_t)pg_cache->page_descriptors;
array[3] = (uint64_t)pg_cache->populated_pages;
array[4] = (uint64_t)pg_cache->commited_page_index.nr_commited_pages;
array[4] = (uint64_t)pg_cache->committed_page_index.nr_committed_pages;
array[5] = (uint64_t)ctx->stats.pg_cache_insertions;
array[6] = (uint64_t)ctx->stats.pg_cache_deletions;
array[7] = (uint64_t)ctx->stats.pg_cache_hits;
@ -711,7 +724,9 @@ void rrdeng_get_33_statistics(struct rrdengine_instance *ctx, unsigned long long
array[30] = (uint64_t)global_io_errors;
array[31] = (uint64_t)global_fs_errors;
array[32] = (uint64_t)rrdeng_reserved_file_descriptors;
assert(RRDENG_NR_STATS == 33);
array[33] = (uint64_t)ctx->stats.flushing_errors;
array[34] = (uint64_t)global_flushing_errors;
assert(RRDENG_NR_STATS == 35);
}
/* Releases reference to page */

View File

@ -8,7 +8,7 @@
#define RRDENG_MIN_PAGE_CACHE_SIZE_MB (8)
#define RRDENG_MIN_DISK_SPACE_MB (256)
#define RRDENG_NR_STATS (33)
#define RRDENG_NR_STATS (35)
#define RRDENG_FD_BUDGET_PER_INSTANCE (50)
@ -41,7 +41,7 @@ extern int rrdeng_load_metric_is_finished(struct rrddim_query_handle *rrdimm_han
extern void rrdeng_load_metric_finalize(struct rrddim_query_handle *rrdimm_handle);
extern time_t rrdeng_metric_latest_time(RRDDIM *rd);
extern time_t rrdeng_metric_oldest_time(RRDDIM *rd);
extern void rrdeng_get_33_statistics(struct rrdengine_instance *ctx, unsigned long long *array);
extern void rrdeng_get_35_statistics(struct rrdengine_instance *ctx, unsigned long long *array);
/* must call once before using anything */
extern int rrdeng_init(struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned page_cache_mb,

View File

@ -131,7 +131,7 @@ char *get_rrdeng_statistics(struct rrdengine_instance *ctx, char *str, size_t si
"page_cache_total_pages: %ld\n"
"page_cache_descriptors: %ld\n"
"page_cache_populated_pages: %ld\n"
"page_cache_commited_pages: %ld\n"
"page_cache_committed_pages: %ld\n"
"page_cache_insertions: %ld\n"
"page_cache_deletions: %ld\n"
"page_cache_hits: %ld\n"
@ -153,13 +153,20 @@ char *get_rrdeng_statistics(struct rrdengine_instance *ctx, char *str, size_t si
"datafile_creations: %ld\n"
"datafile_deletions: %ld\n"
"journalfile_creations: %ld\n"
"journalfile_deletions: %ld\n",
"journalfile_deletions: %ld\n"
"io_errors: %ld\n"
"fs_errors: %ld\n"
"global_io_errors: %ld\n"
"global_fs_errors: %ld\n"
"rrdeng_reserved_file_descriptors: %ld\n"
"flushing_errors: %ld\n"
"global_flushing_errors: %ld\n",
(long)ctx->stats.metric_API_producers,
(long)ctx->stats.metric_API_consumers,
(long)pg_cache->page_descriptors,
(long)ctx->stats.page_cache_descriptors,
(long)pg_cache->populated_pages,
(long)pg_cache->commited_page_index.nr_commited_pages,
(long)pg_cache->committed_page_index.nr_committed_pages,
(long)ctx->stats.pg_cache_insertions,
(long)ctx->stats.pg_cache_deletions,
(long)ctx->stats.pg_cache_hits,
@ -181,7 +188,14 @@ char *get_rrdeng_statistics(struct rrdengine_instance *ctx, char *str, size_t si
(long)ctx->stats.datafile_creations,
(long)ctx->stats.datafile_deletions,
(long)ctx->stats.journalfile_creations,
(long)ctx->stats.journalfile_deletions
(long)ctx->stats.journalfile_deletions,
(long)ctx->stats.io_errors,
(long)ctx->stats.fs_errors,
(long)global_io_errors,
(long)global_fs_errors,
(long)rrdeng_reserved_file_descriptors,
(long)ctx->stats.flushing_errors,
(long)global_flushing_errors
);
return str;
}

View File

@ -23,4 +23,16 @@ lookup: sum -10m unaligned of I/O errors
crit: $this > 0
delay: down 1h multiplier 1.5 max 3h
info: number of IO errors dbengine came across the last 10 minutes (CRC errors, out of space, bad disk etc)
to: sysadmin
to: sysadmin
alarm: 10min_dbengine_global_flushing_errors
on: netdata.dbengine_global_errors
os: linux freebsd macos
hosts: *
lookup: sum -10m unaligned of flushing errors
units: errors
every: 3s
crit: $this > 0
delay: down 1h multiplier 1.5 max 3h
info: number of times in the last 10 minutes that the dbengine failed to completely flush data to disk, metric data will not be stored in the database, please reduce disk load or use a faster disk
to: sysadmin