Drop dirty dbengine pages if disk cannot keep up (#7777)

* Introduce dirty page pressure handling in the dbengine page cache: when the disk cannot keep up with the required flushing speed, dirty pages are invalidated (dropped) to unblock data collection.
Markos Fountoulakis 2020-02-06 21:58:13 +02:00 committed by GitHub
parent b2b3c18254
commit 6b119d9170
11 changed files with 375 additions and 116 deletions
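
In outline, the change works like this: rrdeng_commit_page() now compares the number of committed (dirty) pages against a new pg_cache_committed_hard_limit(); once that limit is reached, and if the instance is allowed to drop metrics under page cache pressure, it enqueues an RRDENG_INVALIDATE_OLDEST_MEMORY_PAGE command instead of letting dirty pages pile up and stall collection. The sketch below is a simplified, self-contained model of that decision path, using reduced stand-in structures rather than the real ones from the diff:

#include <stdio.h>

/* Reduced stand-ins for the dbengine context; the real fields live in
 * struct rrdengine_instance and its embedded statistics. */
struct ctx {
    unsigned long cache_pages_low_watermark;
    unsigned long metric_API_producers;   /* pages pinned by collectors */
    unsigned long nr_committed_pages;     /* dirty pages waiting to be flushed */
    int drop_metrics_under_page_cache_pressure;
};

/* Mirrors pg_cache_committed_hard_limit(): the low watermark plus half of
 * the producers' extra pinned pages. */
static unsigned long committed_hard_limit(struct ctx *c)
{
    return c->cache_pages_low_watermark + c->metric_API_producers / 2;
}

/* The decision taken in rrdeng_commit_page(): drop instead of blocking. */
static void on_page_committed(struct ctx *c)
{
    if (c->drop_metrics_under_page_cache_pressure &&
        c->nr_committed_pages >= committed_hard_limit(c)) {
        /* the real code enqueues RRDENG_INVALIDATE_OLDEST_MEMORY_PAGE here */
        printf("disk cannot keep up, invalidating the oldest committed page\n");
    }
}

int main(void)
{
    struct ctx c = { .cache_pages_low_watermark = 96, .metric_API_producers = 200,
                     .nr_committed_pages = 400, .drop_metrics_under_page_cache_pressure = 1 };
    on_page_committed(&c); /* limit = 96 + 200 / 2 = 196, so the drop path fires */
    return 0;
}

The diffs below add the limit itself, the invalidation worker thread, the new opcode, and the statistics and health alarms that expose the behaviour.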

View File

@@ -381,7 +381,7 @@ declare -A configs_signatures=(
['7deb236ec68a512b9bdd18e6a51d76f7']='python.d/mysql.conf'
['7e5fc1644aa7a54f9dbb1bd102521b09']='health.d/memcached.conf'
['7f13631183fbdf79c21c8e5a171e9b34']='health.d/zfs.conf'
['8edc8c73a8f3ca40b32e27fe452c70f3']='health.d/dbengine.conf'
['82f1dc0a477a175ae31d7b815411e44e']='health.d/dbengine.conf'
['7fb8184d56a27040e73261ed9c6fc76f']='health_alarm_notify.conf'
['80266bddd3df374923c750a6de91d120']='health.d/apache.conf'
['803a7f9dcb942eeac0fd764b9e3e38ca']='fping.conf'

View File

@@ -544,7 +544,7 @@ void global_statistics_charts(void) {
if (host->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE) {
++hosts_with_dbengine;
/* get localhost's DB engine's statistics */
rrdeng_get_35_statistics(host->rrdeng_ctx, local_stats_array);
rrdeng_get_37_statistics(host->rrdeng_ctx, local_stats_array);
for (i = 0 ; i < RRDENG_NR_STATS ; ++i) {
/* aggregate statistics across hosts */
stats_array[i] += local_stats_array[i];
@@ -558,6 +558,8 @@ void global_statistics_charts(void) {
stats_array[30] = local_stats_array[30];
stats_array[31] = local_stats_array[31];
stats_array[32] = local_stats_array[32];
stats_array[34] = local_stats_array[34];
stats_array[36] = local_stats_array[36];
// ----------------------------------------------------------------
@@ -642,7 +644,6 @@ void global_statistics_charts(void) {
old_misses = misses;
if (hits_delta + misses_delta) {
// allow negative savings
ratio = (hits_delta * 100 * 1000) / (hits_delta + misses_delta);
} else {
ratio = 0;
@@ -658,11 +659,10 @@ void global_statistics_charts(void) {
static RRDSET *st_pg_cache_pages = NULL;
static RRDDIM *rd_descriptors = NULL;
static RRDDIM *rd_populated = NULL;
static RRDDIM *rd_committed = NULL;
static RRDDIM *rd_insertions = NULL;
static RRDDIM *rd_deletions = NULL;
static RRDDIM *rd_dirty = NULL;
static RRDDIM *rd_backfills = NULL;
static RRDDIM *rd_evictions = NULL;
static RRDDIM *rd_used_by_collectors = NULL;
if (unlikely(!st_pg_cache_pages)) {
st_pg_cache_pages = rrdset_create_localhost(
@@ -671,7 +671,7 @@ void global_statistics_charts(void) {
, NULL
, "dbengine"
, NULL
, "NetData DB engine page statistics"
, "NetData dbengine page cache statistics"
, "pages"
, "netdata"
, "stats"
@@ -682,27 +682,68 @@ void global_statistics_charts(void) {
rd_descriptors = rrddim_add(st_pg_cache_pages, "descriptors", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
rd_populated = rrddim_add(st_pg_cache_pages, "populated", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
rd_committed = rrddim_add(st_pg_cache_pages, "committed", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
rd_insertions = rrddim_add(st_pg_cache_pages, "insertions", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
rd_deletions = rrddim_add(st_pg_cache_pages, "deletions", NULL, -1, 1, RRD_ALGORITHM_INCREMENTAL);
rd_dirty = rrddim_add(st_pg_cache_pages, "dirty", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
rd_backfills = rrddim_add(st_pg_cache_pages, "backfills", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
rd_evictions = rrddim_add(st_pg_cache_pages, "evictions", NULL, -1, 1, RRD_ALGORITHM_INCREMENTAL);
rd_used_by_collectors = rrddim_add(st_pg_cache_pages, "used_by_collectors", NULL, 1, 1,
RRD_ALGORITHM_ABSOLUTE);
}
else
rrdset_next(st_pg_cache_pages);
rrddim_set_by_pointer(st_pg_cache_pages, rd_descriptors, (collected_number)stats_array[27]);
rrddim_set_by_pointer(st_pg_cache_pages, rd_populated, (collected_number)stats_array[3]);
rrddim_set_by_pointer(st_pg_cache_pages, rd_committed, (collected_number)stats_array[4]);
rrddim_set_by_pointer(st_pg_cache_pages, rd_insertions, (collected_number)stats_array[5]);
rrddim_set_by_pointer(st_pg_cache_pages, rd_deletions, (collected_number)stats_array[6]);
rrddim_set_by_pointer(st_pg_cache_pages, rd_dirty, (collected_number)stats_array[0] + stats_array[4]);
rrddim_set_by_pointer(st_pg_cache_pages, rd_backfills, (collected_number)stats_array[9]);
rrddim_set_by_pointer(st_pg_cache_pages, rd_evictions, (collected_number)stats_array[10]);
rrddim_set_by_pointer(st_pg_cache_pages, rd_used_by_collectors, (collected_number)stats_array[0]);
rrdset_done(st_pg_cache_pages);
}
// ----------------------------------------------------------------
{
static RRDSET *st_long_term_pages = NULL;
static RRDDIM *rd_total = NULL;
static RRDDIM *rd_insertions = NULL;
static RRDDIM *rd_deletions = NULL;
static RRDDIM *rd_flushing_pressure_deletions = NULL;
if (unlikely(!st_long_term_pages)) {
st_long_term_pages = rrdset_create_localhost(
"netdata"
, "dbengine_long_term_page_stats"
, NULL
, "dbengine"
, NULL
, "NetData dbengine long-term page statistics"
, "pages"
, "netdata"
, "stats"
, 130505
, localhost->rrd_update_every
, RRDSET_TYPE_LINE
);
rd_total = rrddim_add(st_long_term_pages, "total", NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
rd_insertions = rrddim_add(st_long_term_pages, "insertions", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
rd_deletions = rrddim_add(st_long_term_pages, "deletions", NULL, -1, 1, RRD_ALGORITHM_INCREMENTAL);
rd_flushing_pressure_deletions = rrddim_add(st_long_term_pages, "flushing_pressure_deletions", NULL, -1,
1, RRD_ALGORITHM_INCREMENTAL);
}
else
rrdset_next(st_long_term_pages);
rrddim_set_by_pointer(st_long_term_pages, rd_total, (collected_number)stats_array[2]);
rrddim_set_by_pointer(st_long_term_pages, rd_insertions, (collected_number)stats_array[5]);
rrddim_set_by_pointer(st_long_term_pages, rd_deletions, (collected_number)stats_array[6]);
rrddim_set_by_pointer(st_long_term_pages, rd_flushing_pressure_deletions,
(collected_number)stats_array[36]);
rrdset_done(st_long_term_pages);
}
// ----------------------------------------------------------------
{
static RRDSET *st_io_stats = NULL;
static RRDDIM *rd_reads = NULL;
@@ -719,7 +760,7 @@ void global_statistics_charts(void) {
, "MiB/s"
, "netdata"
, "stats"
, 130505
, 130506
, localhost->rrd_update_every
, RRDSET_TYPE_LINE
);
@@ -753,7 +794,7 @@ void global_statistics_charts(void) {
, "operations/s"
, "netdata"
, "stats"
, 130506
, 130507
, localhost->rrd_update_every
, RRDSET_TYPE_LINE
);
@@ -775,7 +816,7 @@ void global_statistics_charts(void) {
static RRDSET *st_errors = NULL;
static RRDDIM *rd_fs_errors = NULL;
static RRDDIM *rd_io_errors = NULL;
static RRDDIM *rd_flushing_errors = NULL;
static RRDDIM *pg_cache_over_half_dirty_events = NULL;
if (unlikely(!st_errors)) {
st_errors = rrdset_create_localhost(
@@ -788,21 +829,22 @@ void global_statistics_charts(void) {
, "errors/s"
, "netdata"
, "stats"
, 130507
, 130508
, localhost->rrd_update_every
, RRDSET_TYPE_LINE
);
rd_io_errors = rrddim_add(st_errors, "I/O errors", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
rd_fs_errors = rrddim_add(st_errors, "FS errors", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
rd_flushing_errors = rrddim_add(st_errors, "flushing errors", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
rd_io_errors = rrddim_add(st_errors, "io_errors", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
rd_fs_errors = rrddim_add(st_errors, "fs_errors", NULL, 1, 1, RRD_ALGORITHM_INCREMENTAL);
pg_cache_over_half_dirty_events = rrddim_add(st_errors, "pg_cache_over_half_dirty_events", NULL, 1, 1,
RRD_ALGORITHM_INCREMENTAL);
}
else
rrdset_next(st_errors);
rrddim_set_by_pointer(st_errors, rd_io_errors, (collected_number)stats_array[30]);
rrddim_set_by_pointer(st_errors, rd_fs_errors, (collected_number)stats_array[31]);
rrddim_set_by_pointer(st_errors, rd_flushing_errors, (collected_number)stats_array[34]);
rrddim_set_by_pointer(st_errors, pg_cache_over_half_dirty_events, (collected_number)stats_array[34]);
rrdset_done(st_errors);
}
@@ -824,7 +866,7 @@ void global_statistics_charts(void) {
, "descriptors"
, "netdata"
, "stats"
, 130508
, 130509
, localhost->rrd_update_every
, RRDSET_TYPE_LINE
);
@@ -863,7 +905,7 @@ void global_statistics_charts(void) {
, "MiB"
, "netdata"
, "stats"
, 130509
, 130510
, localhost->rrd_update_every
, RRDSET_TYPE_STACKED
);

View File

@@ -1491,6 +1491,9 @@ static inline void rrddim_set_by_pointer_fake_time(RRDDIM *rd, collected_number
static RRDHOST *dbengine_rrdhost_find_or_create(char *name)
{
/* We don't want to drop metrics when generating load, we prefer to block data generation itself */
rrdeng_drop_metrics_under_page_cache_pressure = 0;
return rrdhost_find_or_create(
name
, name

View File

@@ -217,7 +217,6 @@ static void pg_cache_release_pages(struct rrdengine_instance *ctx, unsigned numb
/*
* This function returns the maximum number of pages allowed in the page cache.
* The caller must hold the page cache lock.
*/
unsigned long pg_cache_hard_limit(struct rrdengine_instance *ctx)
{
@@ -228,7 +227,6 @@ unsigned long pg_cache_hard_limit(struct rrdengine_instance *ctx)
/*
* This function returns the low watermark number of pages in the page cache. The page cache should strive to keep the
* number of pages below that number.
* The caller must hold the page cache lock.
*/
unsigned long pg_cache_soft_limit(struct rrdengine_instance *ctx)
{
@@ -236,6 +234,16 @@ unsigned long pg_cache_soft_limit(struct rrdengine_instance *ctx)
return ctx->cache_pages_low_watermark + 2 * (unsigned long)ctx->stats.metric_API_producers;
}
/*
* This function returns the maximum number of dirty pages that are committed to be written to disk allowed in the page
* cache.
*/
unsigned long pg_cache_committed_hard_limit(struct rrdengine_instance *ctx)
{
/* We remove the active pages of the producers from the calculation and only allow 50% of the extra pinned pages */
return ctx->cache_pages_low_watermark + (unsigned long)ctx->stats.metric_API_producers / 2;
}
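
For intuition, with hypothetical values cache_pages_low_watermark = 96 and metric_API_producers = 200, the limits above work out to:

pg_cache_soft_limit(ctx)           == 96 + 2 * 200 == 496 pages
pg_cache_committed_hard_limit(ctx) == 96 + 200 / 2 == 196 pages

so the committed (dirty) page population is capped well below what the cache may otherwise hold, leaving room for clean, evictable pages.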
/*
* This function will block until it reserves #number populated pages.
* It will trigger evictions or dirty page flushing if the pg_cache_hard_limit() limit is hit.
@@ -375,7 +383,11 @@ static int pg_cache_try_evict_one_page_unsafe(struct rrdengine_instance *ctx)
return 0;
}
void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr, uint8_t remove_dirty)
/*
* Callers of this function need to make sure they're not deleting the same descriptor concurrently
*/
void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr, uint8_t remove_dirty,
uint8_t is_exclusive_holder)
{
struct page_cache *pg_cache = &ctx->pg_cache;
struct page_cache_descr *pg_cache_descr = NULL;
@@ -408,11 +420,14 @@ void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_desc
rrdeng_page_descr_mutex_lock(ctx, descr);
pg_cache_descr = descr->pg_cache_descr;
while (!pg_cache_try_get_unsafe(descr, 1)) {
debug(D_RRDENGINE, "%s: Waiting for locked page:", __func__);
if (unlikely(debug_flags & D_RRDENGINE))
print_page_cache_descr(descr);
pg_cache_wait_event_unsafe(descr);
if (!is_exclusive_holder) {
/* If we don't hold an exclusive page reference get one */
while (!pg_cache_try_get_unsafe(descr, 1)) {
debug(D_RRDENGINE, "%s: Waiting for locked page:", __func__);
if (unlikely(debug_flags & D_RRDENGINE))
print_page_cache_descr(descr);
pg_cache_wait_event_unsafe(descr);
}
}
if (remove_dirty) {
pg_cache_descr->flags &= ~RRD_PAGE_DIRTY;
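
The new is_exclusive_holder parameter lets a caller that already owns the exclusive page reference skip the wait loop above; every other caller passes 0 and pg_cache_punch_hole() acquires the reference itself. The two call patterns, as they appear elsewhere in this commit:

/* dirty-page invalidation path: the reference was already taken with pg_cache_try_get_unsafe() */
pg_cache_punch_hole(ctx, descr, 1 /* remove_dirty */, 1 /* is_exclusive_holder */);

/* datafile deletion path: no reference held, so the function waits to acquire one */
pg_cache_punch_hole(ctx, descr, 0, 0);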

View File

@@ -163,7 +163,8 @@ extern void pg_cache_put_unsafe(struct rrdeng_page_descr *descr);
extern void pg_cache_put(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr);
extern void pg_cache_insert(struct rrdengine_instance *ctx, struct pg_cache_page_index *index,
struct rrdeng_page_descr *descr);
extern void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr, uint8_t remove_dirty);
extern void pg_cache_punch_hole(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr, uint8_t remove_dirty,
uint8_t is_exclusive_holder);
extern usec_t pg_cache_oldest_time_in_range(struct rrdengine_instance *ctx, uuid_t *id,
usec_t start_time, usec_t end_time);
extern void pg_cache_get_filtered_info_prev(struct rrdengine_instance *ctx, struct pg_cache_page_index *page_index,
@@ -185,6 +186,7 @@ extern void pg_cache_add_new_metric_time(struct pg_cache_page_index *page_index,
extern void pg_cache_update_metric_times(struct pg_cache_page_index *page_index);
extern unsigned long pg_cache_hard_limit(struct rrdengine_instance *ctx);
extern unsigned long pg_cache_soft_limit(struct rrdengine_instance *ctx);
extern unsigned long pg_cache_committed_hard_limit(struct rrdengine_instance *ctx);
static inline void
pg_cache_atomic_get_pg_info(struct rrdeng_page_descr *descr, usec_t *end_timep, uint32_t *page_lengthp)

View File

@@ -6,7 +6,8 @@
rrdeng_stats_t global_io_errors = 0;
rrdeng_stats_t global_fs_errors = 0;
rrdeng_stats_t rrdeng_reserved_file_descriptors = 0;
rrdeng_stats_t global_flushing_errors = 0;
rrdeng_stats_t global_pg_cache_over_half_dirty_events = 0;
rrdeng_stats_t global_flushing_pressure_page_deletions = 0;
static void sanity_check(void)
{
@@ -248,6 +249,109 @@ static void do_commit_transaction(struct rrdengine_worker_config* wc, uint8_t ty
}
}
static void after_invalidate_oldest_committed(struct rrdengine_worker_config* wc)
{
int error;
error = uv_thread_join(wc->now_invalidating_dirty_pages);
if (error) {
error("uv_thread_join(): %s", uv_strerror(error));
}
freez(wc->now_invalidating_dirty_pages);
wc->now_invalidating_dirty_pages = NULL;
wc->cleanup_thread_invalidating_dirty_pages = 0;
}
static void invalidate_oldest_committed(void *arg)
{
struct rrdengine_instance *ctx = arg;
struct rrdengine_worker_config *wc = &ctx->worker_config;
struct page_cache *pg_cache = &ctx->pg_cache;
int ret;
struct rrdeng_page_descr *descr;
struct page_cache_descr *pg_cache_descr;
Pvoid_t *PValue;
Word_t Index;
unsigned nr_committed_pages;
do {
uv_rwlock_wrlock(&pg_cache->committed_page_index.lock);
for (Index = 0,
PValue = JudyLFirst(pg_cache->committed_page_index.JudyL_array, &Index, PJE0),
descr = unlikely(NULL == PValue) ? NULL : *PValue;
descr != NULL;
PValue = JudyLNext(pg_cache->committed_page_index.JudyL_array, &Index, PJE0),
descr = unlikely(NULL == PValue) ? NULL : *PValue) {
assert(0 != descr->page_length);
rrdeng_page_descr_mutex_lock(ctx, descr);
pg_cache_descr = descr->pg_cache_descr;
if (!(pg_cache_descr->flags & RRD_PAGE_WRITE_PENDING) && pg_cache_try_get_unsafe(descr, 1)) {
rrdeng_page_descr_mutex_unlock(ctx, descr);
ret = JudyLDel(&pg_cache->committed_page_index.JudyL_array, Index, PJE0);
assert(1 == ret);
break;
}
rrdeng_page_descr_mutex_unlock(ctx, descr);
}
uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock);
if (!descr) {
info("Failed to invalidate any dirty pages to relieve page cache pressure.");
goto out;
}
pg_cache_punch_hole(ctx, descr, 1, 1);
uv_rwlock_wrlock(&pg_cache->committed_page_index.lock);
nr_committed_pages = --pg_cache->committed_page_index.nr_committed_pages;
uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock);
rrd_stat_atomic_add(&ctx->stats.flushing_pressure_page_deletions, 1);
rrd_stat_atomic_add(&global_flushing_pressure_page_deletions, 1);
} while (nr_committed_pages >= pg_cache_committed_hard_limit(ctx));
out:
wc->cleanup_thread_invalidating_dirty_pages = 1;
/* wake up event loop */
assert(0 == uv_async_send(&wc->async));
}
void rrdeng_invalidate_oldest_committed(struct rrdengine_worker_config* wc)
{
struct rrdengine_instance *ctx = wc->ctx;
struct page_cache *pg_cache = &ctx->pg_cache;
unsigned nr_committed_pages;
int error;
uv_rwlock_rdlock(&pg_cache->committed_page_index.lock);
nr_committed_pages = pg_cache->committed_page_index.nr_committed_pages;
uv_rwlock_rdunlock(&pg_cache->committed_page_index.lock);
if (nr_committed_pages >= pg_cache_committed_hard_limit(ctx)) {
/* delete the oldest page in memory */
if (wc->now_invalidating_dirty_pages) {
/* already deleting a page */
return;
}
errno = 0;
error("Failed to flush dirty buffers quickly enough in dbengine instance \"%s\". "
"Metric data are being deleted, please reduce disk load or use a faster disk.", ctx->dbfiles_path);
wc->now_invalidating_dirty_pages = mallocz(sizeof(*wc->now_invalidating_dirty_pages));
wc->cleanup_thread_invalidating_dirty_pages = 0;
error = uv_thread_create(wc->now_invalidating_dirty_pages, invalidate_oldest_committed, ctx);
if (error) {
error("uv_thread_create(): %s", uv_strerror(error));
freez(wc->now_invalidating_dirty_pages);
wc->now_invalidating_dirty_pages = NULL;
}
}
}
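
Both long-running jobs, datafile deletion and dirty-page invalidation, now follow the same hand-off pattern instead of uv_queue_work(): the event loop spawns a plain uv_thread_t, the thread raises its cleanup_thread_* flag and wakes the loop with uv_async_send(), and the loop joins and frees the thread handle in the matching after_*() function. The following is a minimal self-contained illustration of that pattern; the names (struct worker, work, async_cb) are illustrative, not netdata's:

#include <uv.h>
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>

struct worker {
    uv_async_t async;
    uv_thread_t *thread;        /* non-NULL while the worker thread is alive */
    unsigned long cleanup_flag; /* the thread sets this to 1 when it is done */
};

static void work(void *arg)
{
    struct worker *w = arg;
    /* ... blocking work: delete datafiles or invalidate dirty pages ... */
    w->cleanup_flag = 1;
    assert(0 == uv_async_send(&w->async)); /* wake up the event loop */
}

static void async_cb(uv_async_t *handle)
{
    struct worker *w = handle->data;
    if (w->cleanup_flag) {      /* mirrors rrdeng_cleanup_finished_threads() */
        assert(0 == uv_thread_join(w->thread));
        free(w->thread);
        w->thread = NULL;       /* unfreezes command processing */
        w->cleanup_flag = 0;
        uv_close((uv_handle_t *)&w->async, NULL);
        printf("worker joined and cleaned up\n");
    }
}

int main(void)
{
    struct worker w = { .thread = NULL, .cleanup_flag = 0 };
    uv_loop_t *loop = uv_default_loop();

    assert(0 == uv_async_init(loop, &w.async, async_cb));
    w.async.data = &w;
    w.thread = malloc(sizeof(*w.thread));
    assert(0 == uv_thread_create(w.thread, work, &w));

    uv_run(loop, UV_RUN_DEFAULT); /* returns once the async handle is closed */
    return 0;
}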
void flush_pages_cb(uv_fs_t* req)
{
struct rrdengine_worker_config* wc = req->loop->data;
@@ -294,6 +398,7 @@ void flush_pages_cb(uv_fs_t* req)
uv_rwlock_wrlock(&pg_cache->committed_page_index.lock);
pg_cache->committed_page_index.nr_committed_pages -= count;
uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock);
wc->inflight_dirty_pages -= count;
}
/*
@@ -366,6 +471,8 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct
complete(completion);
return 0;
}
wc->inflight_dirty_pages += count;
xt_io_descr = mallocz(sizeof(*xt_io_descr));
payload_offset = sizeof(*header) + count * sizeof(header->descr[0]);
switch (compression_algorithm) {
@@ -466,17 +573,15 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct
return ALIGN_BYTES_CEILING(size_bytes);
}
static void after_delete_old_data(uv_work_t *req, int status)
static void after_delete_old_data(struct rrdengine_worker_config* wc)
{
struct rrdengine_instance *ctx = req->data;
struct rrdengine_worker_config* wc = &ctx->worker_config;
struct rrdengine_instance *ctx = wc->ctx;
struct rrdengine_datafile *datafile;
struct rrdengine_journalfile *journalfile;
unsigned deleted_bytes, journalfile_bytes, datafile_bytes;
int ret;
int ret, error;
char path[RRDENG_PATH_MAX];
(void)status;
datafile = ctx->datafiles.first;
journalfile = datafile->journalfile;
datafile_bytes = datafile->pos;
@@ -503,15 +608,24 @@ static void after_delete_old_data(uv_work_t *req, int status)
ctx->disk_space -= deleted_bytes;
info("Reclaimed %u bytes of disk space.", deleted_bytes);
error = uv_thread_join(wc->now_deleting_files);
if (error) {
error("uv_thread_join(): %s", uv_strerror(error));
}
freez(wc->now_deleting_files);
/* unfreeze command processing */
wc->now_deleting.data = NULL;
/* wake up event loop */
assert(0 == uv_async_send(&wc->async));
wc->now_deleting_files = NULL;
wc->cleanup_thread_deleting_files = 0;
/* interrupt event loop */
uv_stop(wc->loop);
}
static void delete_old_data(uv_work_t *req)
static void delete_old_data(void *arg)
{
struct rrdengine_instance *ctx = req->data;
struct rrdengine_instance *ctx = arg;
struct rrdengine_worker_config* wc = &ctx->worker_config;
struct rrdengine_datafile *datafile;
struct extent_info *extent, *next;
struct rrdeng_page_descr *descr;
@@ -524,11 +638,14 @@ static void delete_old_data(uv_work_t *req)
count = extent->number_of_pages;
for (i = 0 ; i < count ; ++i) {
descr = extent->pages[i];
pg_cache_punch_hole(ctx, descr, 0);
pg_cache_punch_hole(ctx, descr, 0, 0);
}
next = extent->next;
freez(extent);
}
wc->cleanup_thread_deleting_files = 1;
/* wake up event loop */
assert(0 == uv_async_send(&wc->async));
}
void rrdeng_test_quota(struct rrdengine_worker_config* wc)
@@ -537,7 +654,7 @@ void rrdeng_test_quota(struct rrdengine_worker_config* wc)
struct rrdengine_datafile *datafile;
unsigned current_size, target_size;
uint8_t out_of_space, only_one_datafile;
int ret;
int ret, error;
out_of_space = 0;
if (unlikely(ctx->disk_space > ctx->max_disk_space)) {
@@ -559,7 +676,7 @@ void rrdeng_test_quota(struct rrdengine_worker_config* wc)
}
if (unlikely(out_of_space)) {
/* delete old data */
if (wc->now_deleting.data) {
if (wc->now_deleting_files) {
/* already deleting data */
return;
}
@@ -571,8 +688,33 @@ void rrdeng_test_quota(struct rrdengine_worker_config* wc)
}
info("Deleting data file \"%s/"DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION"\".",
ctx->dbfiles_path, ctx->datafiles.first->tier, ctx->datafiles.first->fileno);
wc->now_deleting.data = ctx;
assert(0 == uv_queue_work(wc->loop, &wc->now_deleting, delete_old_data, after_delete_old_data));
wc->now_deleting_files = mallocz(sizeof(*wc->now_deleting_files));
wc->cleanup_thread_deleting_files = 0;
error = uv_thread_create(wc->now_deleting_files, delete_old_data, ctx);
if (error) {
error("uv_thread_create(): %s", uv_strerror(error));
freez(wc->now_deleting_files);
wc->now_deleting_files = NULL;
}
}
}
static inline int rrdeng_threads_alive(struct rrdengine_worker_config* wc)
{
if (wc->now_invalidating_dirty_pages || wc->now_deleting_files) {
return 1;
}
return 0;
}
static void rrdeng_cleanup_finished_threads(struct rrdengine_worker_config* wc)
{
if (unlikely(wc->cleanup_thread_invalidating_dirty_pages)) {
after_invalidate_oldest_committed(wc);
}
if (unlikely(wc->cleanup_thread_deleting_files)) {
after_delete_old_data(wc);
}
}
@@ -662,34 +804,37 @@ void timer_cb(uv_timer_t* handle)
uv_update_time(handle->loop);
rrdeng_test_quota(wc);
debug(D_RRDENGINE, "%s: timeout reached.", __func__);
if (likely(!wc->now_deleting.data)) {
/* There is free space so we can write to disk */
if (likely(!wc->now_deleting_files && !wc->now_invalidating_dirty_pages)) {
/* There is free space so we can write to disk and we are not actively deleting dirty buffers */
struct rrdengine_instance *ctx = wc->ctx;
struct page_cache *pg_cache = &ctx->pg_cache;
unsigned long total_bytes, bytes_written, nr_committed_pages, bytes_to_write = 0, producers, low_watermark,
high_watermark;
uv_rwlock_wrlock(&pg_cache->committed_page_index.lock);
uv_rwlock_rdlock(&pg_cache->committed_page_index.lock);
nr_committed_pages = pg_cache->committed_page_index.nr_committed_pages;
uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock);
uv_rwlock_rdunlock(&pg_cache->committed_page_index.lock);
producers = ctx->stats.metric_API_producers;
/* are flushable pages more than 25% of the maximum page cache size */
high_watermark = (ctx->max_cache_pages * 25LLU) / 100;
low_watermark = (ctx->max_cache_pages * 5LLU) / 100; /* 5%, must be smaller than high_watermark */
if (nr_committed_pages > producers &&
/* committed to be written pages are more than the produced number */
nr_committed_pages - producers > high_watermark) {
/* Flushing speed must increase to stop page cache from filling with dirty pages */
bytes_to_write = (nr_committed_pages - producers - low_watermark) * RRDENG_BLOCK_SIZE;
}
bytes_to_write = MAX(DATAFILE_IDEAL_IO_SIZE, bytes_to_write);
/* Flush more pages only if disk can keep up */
if (wc->inflight_dirty_pages < high_watermark + producers) {
if (nr_committed_pages > producers &&
/* committed to be written pages are more than the produced number */
nr_committed_pages - producers > high_watermark) {
/* Flushing speed must increase to stop page cache from filling with dirty pages */
bytes_to_write = (nr_committed_pages - producers - low_watermark) * RRDENG_BLOCK_SIZE;
}
bytes_to_write = MAX(DATAFILE_IDEAL_IO_SIZE, bytes_to_write);
debug(D_RRDENGINE, "Flushing pages to disk.");
for (total_bytes = bytes_written = do_flush_pages(wc, 0, NULL) ;
bytes_written && (total_bytes < bytes_to_write) ;
total_bytes += bytes_written) {
bytes_written = do_flush_pages(wc, 0, NULL);
debug(D_RRDENGINE, "Flushing pages to disk.");
for (total_bytes = bytes_written = do_flush_pages(wc, 0, NULL);
bytes_written && (total_bytes < bytes_to_write);
total_bytes += bytes_written) {
bytes_written = do_flush_pages(wc, 0, NULL);
}
}
}
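
To put numbers on the watermark logic (hypothetical figures): with max_cache_pages = 4096, producers = 200 and nr_committed_pages = 1500,

high_watermark = 4096 * 25 / 100 = 1024 pages
low_watermark  = 4096 *  5 / 100 =  204 pages
bytes_to_write = (1500 - 200 - 204) * RRDENG_BLOCK_SIZE

and the flush is attempted only because inflight_dirty_pages is below high_watermark + producers = 1224, i.e. only while the disk is demonstrably keeping up with the I/O already handed to it.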
#ifdef NETDATA_INTERNAL_CHECKS
@@ -730,7 +875,12 @@ void rrdeng_worker(void* arg)
}
wc->async.data = wc;
wc->now_deleting.data = NULL;
wc->now_deleting_files = NULL;
wc->cleanup_thread_deleting_files = 0;
wc->now_invalidating_dirty_pages = NULL;
wc->cleanup_thread_invalidating_dirty_pages = 0;
wc->inflight_dirty_pages = 0;
/* dirty page flushing timer */
ret = uv_timer_init(loop, &timer_req);
@@ -746,8 +896,9 @@ void rrdeng_worker(void* arg)
assert(0 == uv_timer_start(&timer_req, timer_cb, TIMER_PERIOD_MS, TIMER_PERIOD_MS));
shutdown = 0;
while (shutdown == 0 || uv_loop_alive(loop)) {
while (likely(shutdown == 0 || rrdeng_threads_alive(wc))) {
uv_run(loop, UV_RUN_DEFAULT);
rrdeng_cleanup_finished_threads(wc);
/* wait for commands */
cmd_batch_size = 0;
@@ -769,14 +920,6 @@ void rrdeng_worker(void* arg)
break;
case RRDENG_SHUTDOWN:
shutdown = 1;
/*
* uv_async_send after uv_close does not seem to crash in linux at the moment,
* it is however undocumented behaviour and we need to be aware if this becomes
* an issue in the future.
*/
uv_close((uv_handle_t *)&wc->async, NULL);
assert(0 == uv_timer_stop(&timer_req));
uv_close((uv_handle_t *)&timer_req, NULL);
break;
case RRDENG_READ_PAGE:
do_read_extent(wc, &cmd.read_page.page_cache_descr, 1, 0);
@@ -788,16 +931,16 @@ void rrdeng_worker(void* arg)
do_commit_transaction(wc, STORE_DATA, NULL);
break;
case RRDENG_FLUSH_PAGES: {
unsigned bytes_written;
/* First I/O should be enough to call completion */
bytes_written = do_flush_pages(wc, 1, cmd.completion);
if (bytes_written) {
while (do_flush_pages(wc, 1, NULL) && likely(!wc->now_deleting.data)) {
; /* Force flushing of all committed pages if there is free space. */
}
if (wc->now_invalidating_dirty_pages) {
/* Do not flush if the disk cannot keep up */
complete(cmd.completion);
} else {
(void)do_flush_pages(wc, 1, cmd.completion);
}
break;
case RRDENG_INVALIDATE_OLDEST_MEMORY_PAGE:
rrdeng_invalidate_oldest_committed(wc);
break;
}
default:
debug(D_RRDENGINE, "%s: default.", __func__);
@@ -805,11 +948,19 @@ void rrdeng_worker(void* arg)
}
} while (opcode != RRDENG_NOOP);
}
/* cleanup operations of the event loop */
if (unlikely(wc->now_deleting.data)) {
info("Postponing shutting RRD engine event loop down until after datafile deletion is finished.");
}
info("Shutting down RRD engine event loop.");
/*
* uv_async_send after uv_close does not seem to crash in linux at the moment,
* it is however undocumented behaviour and we need to be aware if this becomes
* an issue in the future.
*/
uv_close((uv_handle_t *)&wc->async, NULL);
assert(0 == uv_timer_stop(&timer_req));
uv_close((uv_handle_t *)&timer_req, NULL);
while (do_flush_pages(wc, 1, NULL)) {
; /* Force flushing of all committed pages. */
}

View File

@@ -49,6 +49,7 @@ enum rrdeng_opcode {
RRDENG_COMMIT_PAGE,
RRDENG_FLUSH_PAGES,
RRDENG_SHUTDOWN,
RRDENG_INVALIDATE_OLDEST_MEMORY_PAGE,
RRDENG_MAX_OPCODE
};
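
The new opcode is queued like any other engine command. The producer side (rrdeng_commit_page(), shown further down) essentially does:

struct rrdeng_cmd cmd;
cmd.opcode = RRDENG_INVALIDATE_OLDEST_MEMORY_PAGE;
rrdeng_enq_cmd(&ctx->worker_config, &cmd);

and the worker's opcode switch dispatches it to rrdeng_invalidate_oldest_committed().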
@@ -102,7 +103,16 @@ struct rrdengine_worker_config {
uv_thread_t thread;
uv_loop_t* loop;
uv_async_t async;
uv_work_t now_deleting;
/* file deletion thread */
uv_thread_t *now_deleting_files;
unsigned long cleanup_thread_deleting_files; /* set to 0 when now_deleting_files is still running */
/* dirty page deletion thread */
uv_thread_t *now_invalidating_dirty_pages;
/* set to 0 when now_invalidating_dirty_pages is still running */
unsigned long cleanup_thread_invalidating_dirty_pages;
unsigned inflight_dirty_pages;
/* FIFO command queue */
uv_mutex_t cmd_mutex;
@@ -145,7 +155,8 @@ struct rrdengine_statistics {
rrdeng_stats_t page_cache_descriptors;
rrdeng_stats_t io_errors;
rrdeng_stats_t fs_errors;
rrdeng_stats_t flushing_errors;
rrdeng_stats_t pg_cache_over_half_dirty_events;
rrdeng_stats_t flushing_pressure_page_deletions;
};
/* I/O errors global counter */
@@ -154,13 +165,15 @@ extern rrdeng_stats_t global_io_errors;
extern rrdeng_stats_t global_fs_errors;
/* number of File-Descriptors that have been reserved by dbengine */
extern rrdeng_stats_t rrdeng_reserved_file_descriptors;
/* inability to flush global counter */
extern rrdeng_stats_t global_flushing_errors;
/* inability to flush global counters */
extern rrdeng_stats_t global_pg_cache_over_half_dirty_events;
extern rrdeng_stats_t global_flushing_pressure_page_deletions; /* number of deleted pages */
struct rrdengine_instance {
struct rrdengine_worker_config worker_config;
struct completion rrdengine_completion;
struct page_cache pg_cache;
uint8_t drop_metrics_under_page_cache_pressure; /* boolean */
uint8_t global_compress_alg;
struct transaction_commit_log commit_log;
struct rrdengine_datafile_list datafiles;

View File

@@ -6,6 +6,8 @@ static struct rrdengine_instance default_global_ctx;
int default_rrdeng_page_cache_mb = 32;
int default_rrdeng_disk_quota_mb = RRDENG_MIN_DISK_SPACE_MB;
/* Default behaviour is to unblock data collection if the page cache is full of dirty pages by dropping metrics */
uint8_t rrdeng_drop_metrics_under_page_cache_pressure = 1;
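
Since rrdeng_init() copies this global into each instance's ctx->drop_metrics_under_page_cache_pressure (see further down in this file), a caller that prefers blocking over data loss can clear it before initializing any instance, exactly as the stress-test harness above does:

rrdeng_drop_metrics_under_page_cache_pressure = 0; /* block data generation instead of dropping metrics */
/* ... then create hosts / call rrdeng_init() as usual ... */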
/*
* Gets a handle for storing metrics to the database.
@@ -107,7 +109,7 @@ void rrdeng_store_metric_flush_current_page(RRDDIM *rd)
if (unlikely(debug_flags & D_RRDENGINE))
print_page_cache_descr(descr);
pg_cache_put(ctx, descr);
pg_cache_punch_hole(ctx, descr, 1);
pg_cache_punch_hole(ctx, descr, 1, 0);
handle->prev_descr = NULL;
} else {
/* added 1 extra reference to keep 2 dirty pages pinned per metric, expected refcnt = 2 */
@@ -629,16 +631,27 @@ void rrdeng_commit_page(struct rrdengine_instance *ctx, struct rrdeng_page_descr
nr_committed_pages = ++pg_cache->committed_page_index.nr_committed_pages;
uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock);
if (nr_committed_pages >= (ctx->max_cache_pages) / 2 + (unsigned long)ctx->stats.metric_API_producers) {
/* 50% of pages have not been committed yet */
if (0 == (unsigned long)ctx->stats.flushing_errors) {
/* only print the first time */
error("Failed to flush dirty buffers quickly enough in dbengine instance \"%s\"."
"Metric data at risk of not being stored in the database, "
"please reduce disk load or use a faster disk.", ctx->dbfiles_path);
if (nr_committed_pages >= pg_cache_hard_limit(ctx) / 2) {
/* over 50% of pages have not been committed yet */
if (ctx->drop_metrics_under_page_cache_pressure &&
nr_committed_pages >= pg_cache_committed_hard_limit(ctx)) {
/* 100% of pages are dirty */
struct rrdeng_cmd cmd;
cmd.opcode = RRDENG_INVALIDATE_OLDEST_MEMORY_PAGE;
rrdeng_enq_cmd(&ctx->worker_config, &cmd);
} else {
if (0 == (unsigned long) ctx->stats.pg_cache_over_half_dirty_events) {
/* only print the first time */
errno = 0;
error("Failed to flush dirty buffers quickly enough in dbengine instance \"%s\". "
"Metric data at risk of not being stored in the database, "
"please reduce disk load or use a faster disk.", ctx->dbfiles_path);
}
rrd_stat_atomic_add(&ctx->stats.pg_cache_over_half_dirty_events, 1);
rrd_stat_atomic_add(&global_pg_cache_over_half_dirty_events, 1);
}
rrd_stat_atomic_add(&ctx->stats.flushing_errors, 1);
rrd_stat_atomic_add(&global_flushing_errors, 1);
}
pg_cache_put(ctx, descr);
@@ -688,7 +701,7 @@ void *rrdeng_get_page(struct rrdengine_instance *ctx, uuid_t *id, usec_t point_i
* You must not change the indices of the statistics or user code will break.
* You must not exceed RRDENG_NR_STATS or it will crash.
*/
void rrdeng_get_35_statistics(struct rrdengine_instance *ctx, unsigned long long *array)
void rrdeng_get_37_statistics(struct rrdengine_instance *ctx, unsigned long long *array)
{
struct page_cache *pg_cache = &ctx->pg_cache;
@@ -725,9 +738,11 @@ void rrdeng_get_35_statistics(struct rrdengine_instance *ctx, unsigned long long
array[30] = (uint64_t)global_io_errors;
array[31] = (uint64_t)global_fs_errors;
array[32] = (uint64_t)rrdeng_reserved_file_descriptors;
array[33] = (uint64_t)ctx->stats.flushing_errors;
array[34] = (uint64_t)global_flushing_errors;
assert(RRDENG_NR_STATS == 35);
array[33] = (uint64_t)ctx->stats.pg_cache_over_half_dirty_events;
array[34] = (uint64_t)global_pg_cache_over_half_dirty_events;
array[35] = (uint64_t)ctx->stats.flushing_pressure_page_deletions;
array[36] = (uint64_t)global_flushing_pressure_page_deletions;
assert(RRDENG_NR_STATS == 37);
}
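
A consumer reads the enlarged array exactly as before, with two extra pairs at the end; a short sketch matching how global_statistics_charts() uses it:

unsigned long long stats[RRDENG_NR_STATS]; /* RRDENG_NR_STATS is 37 after this commit */
rrdeng_get_37_statistics(ctx, stats);
/* stats[33], stats[34]: instance-local and global pg_cache_over_half_dirty_events */
/* stats[35], stats[36]: instance-local and global flushing_pressure_page_deletions */

Note that global_statistics_charts() copies the global variants (indices 34 and 36) rather than aggregating them across hosts, just like the other global counters at indices 30-32.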
/* Releases reference to page */
@@ -777,6 +792,7 @@ int rrdeng_init(struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned p
ctx->max_disk_space = disk_space_mb * 1048576LLU;
strncpyz(ctx->dbfiles_path, dbfiles_path, sizeof(ctx->dbfiles_path) - 1);
ctx->dbfiles_path[sizeof(ctx->dbfiles_path) - 1] = '\0';
ctx->drop_metrics_under_page_cache_pressure = rrdeng_drop_metrics_under_page_cache_pressure;
memset(&ctx->worker_config, 0, sizeof(ctx->worker_config));
ctx->worker_config.ctx = ctx;

View File

@@ -8,12 +8,13 @@
#define RRDENG_MIN_PAGE_CACHE_SIZE_MB (8)
#define RRDENG_MIN_DISK_SPACE_MB (256)
#define RRDENG_NR_STATS (35)
#define RRDENG_NR_STATS (37)
#define RRDENG_FD_BUDGET_PER_INSTANCE (50)
extern int default_rrdeng_page_cache_mb;
extern int default_rrdeng_disk_quota_mb;
extern uint8_t rrdeng_drop_metrics_under_page_cache_pressure;
struct rrdeng_region_info {
time_t start_time;
@@ -41,7 +42,7 @@ extern int rrdeng_load_metric_is_finished(struct rrddim_query_handle *rrdimm_han
extern void rrdeng_load_metric_finalize(struct rrddim_query_handle *rrdimm_handle);
extern time_t rrdeng_metric_latest_time(RRDDIM *rd);
extern time_t rrdeng_metric_oldest_time(RRDDIM *rd);
extern void rrdeng_get_35_statistics(struct rrdengine_instance *ctx, unsigned long long *array);
extern void rrdeng_get_37_statistics(struct rrdengine_instance *ctx, unsigned long long *array);
/* must call once before using anything */
extern int rrdeng_init(struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned page_cache_mb,

View File

@@ -159,8 +159,10 @@ char *get_rrdeng_statistics(struct rrdengine_instance *ctx, char *str, size_t si
"global_io_errors: %ld\n"
"global_fs_errors: %ld\n"
"rrdeng_reserved_file_descriptors: %ld\n"
"flushing_errors: %ld\n"
"global_flushing_errors: %ld\n",
"pg_cache_over_half_dirty_events: %ld\n"
"global_pg_cache_over_half_dirty_events: %ld\n"
"flushing_pressure_page_deletions: %ld\n"
"global_flushing_pressure_page_deletions: %ld\n",
(long)ctx->stats.metric_API_producers,
(long)ctx->stats.metric_API_consumers,
(long)pg_cache->page_descriptors,
@@ -194,8 +196,10 @@ char *get_rrdeng_statistics(struct rrdengine_instance *ctx, char *str, size_t si
(long)global_io_errors,
(long)global_fs_errors,
(long)rrdeng_reserved_file_descriptors,
(long)ctx->stats.flushing_errors,
(long)global_flushing_errors
(long)ctx->stats.pg_cache_over_half_dirty_events,
(long)global_pg_cache_over_half_dirty_events,
(long)ctx->stats.flushing_pressure_page_deletions,
(long)global_flushing_pressure_page_deletions
);
return str;
}

View File

@@ -5,7 +5,7 @@
on: netdata.dbengine_global_errors
os: linux freebsd macos
hosts: *
lookup: sum -10m unaligned of FS errors
lookup: sum -10m unaligned of fs_errors
units: errors
every: 10s
crit: $this > 0
@@ -17,7 +17,7 @@ lookup: sum -10m unaligned of FS errors
on: netdata.dbengine_global_errors
os: linux freebsd macos
hosts: *
lookup: sum -10m unaligned of I/O errors
lookup: sum -10m unaligned of io_errors
units: errors
every: 10s
crit: $this > 0
@@ -25,14 +25,26 @@ lookup: sum -10m unaligned of I/O errors
info: number of IO errors dbengine came across the last 10 minutes (CRC errors, out of space, bad disk etc)
to: sysadmin
alarm: 10min_dbengine_global_flushing_errors
alarm: 10min_dbengine_global_flushing_warnings
on: netdata.dbengine_global_errors
os: linux freebsd macos
hosts: *
lookup: sum -10m unaligned of flushing errors
lookup: sum -10m unaligned of pg_cache_over_half_dirty_events
units: errors
every: 3s
crit: $this > 0
every: 10s
warn: $this > 0
delay: down 1h multiplier 1.5 max 3h
info: number of times in the last 10 minutes that the dbengine failed to completely flush data to disk, metric data will not be stored in the database, please reduce disk load or use a faster disk
info: number of times in the last 10 minutes that dbengine dirty pages were over 50% of the instance's page cache, metric data at risk of not being stored in the database, please reduce disk load or use faster disks
to: sysadmin
alarm: 10min_dbengine_global_flushing_errors
on: netdata.dbengine_long_term_page_stats
os: linux freebsd macos
hosts: *
lookup: sum -10m unaligned of flushing_pressure_deletions
units: pages
every: 10s
crit: $this != 0
delay: down 1h multiplier 1.5 max 3h
info: number of pages deleted due to failure to flush data to disk in the last 10 minutes, metric data were lost to unblock data collection, please reduce disk load or use faster disks
to: sysadmin