Stress test insertions into dbengine and bugfixes (#6814)
* Fix memory corruption during deallocation of page cache * Refactored dataset generator in order to support the upcoming self-validating stress test and multithreading. * Fix starvation in database engine loop when the command queues are continuously populated * Fixing disk quota limits for dbengine dataset generator
This commit is contained in:
parent
7bcb19fb8b
commit
4c9d6a3713
|
@ -1928,21 +1928,90 @@ error_out:
|
|||
return errors;
|
||||
}
|
||||
|
||||
void generate_dbengine_dataset(unsigned history_seconds)
|
||||
struct dbengine_chart_thread {
|
||||
uv_thread_t thread;
|
||||
RRDHOST *host;
|
||||
char *chartname; /* Will be prefixed by type, e.g. "example_local1.", "example_local2." etc */
|
||||
int dset_charts; /* number of charts */
|
||||
int dset_dims; /* dimensions per chart */
|
||||
int chart_i; /* current chart offset */
|
||||
time_t time_present; /* current virtual time of the benchmark */
|
||||
unsigned history_seconds; /* how far back in the past to go */
|
||||
};
|
||||
|
||||
collected_number generate_dbengine_chart_value(struct dbengine_chart_thread *thread_info, int dim_i,
|
||||
time_t time_current)
|
||||
{
|
||||
const int DSET_DIMS = 128;
|
||||
const uint64_t EXPECTED_COMPRESSION_RATIO = 94;
|
||||
collected_number value;
|
||||
|
||||
value = ((collected_number)time_current) * thread_info->chart_i;
|
||||
value += ((collected_number)time_current) * dim_i;
|
||||
value %= 1024LLU;
|
||||
|
||||
return value;
|
||||
}
|
||||
|
||||
static void generate_dbengine_chart(void *arg)
|
||||
{
|
||||
struct dbengine_chart_thread *thread_info = (struct dbengine_chart_thread *)arg;
|
||||
RRDHOST *host = thread_info->host;
|
||||
char *chartname = thread_info->chartname;
|
||||
const int DSET_DIMS = thread_info->dset_dims;
|
||||
unsigned history_seconds = thread_info->history_seconds;
|
||||
time_t time_present = thread_info->time_present;
|
||||
|
||||
int j, update_every = 1;
|
||||
RRDHOST *host = NULL;
|
||||
RRDSET *st;
|
||||
RRDDIM *rd[DSET_DIMS];
|
||||
char name[101];
|
||||
time_t time_current, time_present;
|
||||
char name[RRD_ID_LENGTH_MAX + 1];
|
||||
time_t time_current;
|
||||
|
||||
// create the chart
|
||||
snprintfz(name, RRD_ID_LENGTH_MAX, "example_local%d", thread_info->chart_i + 1);
|
||||
st = rrdset_create(host, name, chartname, chartname, "example", NULL, chartname, chartname, chartname, NULL, 1,
|
||||
update_every, RRDSET_TYPE_LINE);
|
||||
for (j = 0 ; j < DSET_DIMS ; ++j) {
|
||||
snprintfz(name, RRD_ID_LENGTH_MAX, "%s%d", chartname, j);
|
||||
|
||||
rd[j] = rrddim_add(st, name, NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
|
||||
}
|
||||
|
||||
// feed it with the test data
|
||||
time_current = time_present - history_seconds;
|
||||
for (j = 0 ; j < DSET_DIMS ; ++j) {
|
||||
rd[j]->last_collected_time.tv_sec =
|
||||
st->last_collected_time.tv_sec = st->last_updated.tv_sec = time_current;
|
||||
rd[j]->last_collected_time.tv_usec =
|
||||
st->last_collected_time.tv_usec = st->last_updated.tv_usec = 0;
|
||||
}
|
||||
for( ; time_current < time_present ; ++time_current) {
|
||||
st->usec_since_last_update = USEC_PER_SEC;
|
||||
|
||||
for (j = 0; j < DSET_DIMS; ++j) {
|
||||
collected_number value;
|
||||
|
||||
value = generate_dbengine_chart_value(thread_info, j, time_current);
|
||||
rrddim_set_by_pointer_fake_time(rd[j], value, time_current);
|
||||
}
|
||||
rrdset_done(st);
|
||||
}
|
||||
}
|
||||
|
||||
void generate_dbengine_dataset(unsigned history_seconds)
|
||||
{
|
||||
const int DSET_CHARTS = 16;
|
||||
const int DSET_DIMS = 128;
|
||||
const uint64_t EXPECTED_COMPRESSION_RATIO = 20;
|
||||
RRDHOST *host = NULL;
|
||||
struct dbengine_chart_thread thread_info[DSET_CHARTS];
|
||||
int i;
|
||||
time_t time_present;
|
||||
|
||||
default_rrd_memory_mode = RRD_MEMORY_MODE_DBENGINE;
|
||||
default_rrdeng_page_cache_mb = 128;
|
||||
// Worst case for uncompressible data
|
||||
default_rrdeng_disk_quota_mb = (((uint64_t)DSET_DIMS) * sizeof(storage_number) * history_seconds) / (1024 * 1024);
|
||||
default_rrdeng_disk_quota_mb = (((uint64_t)DSET_DIMS * DSET_CHARTS) * sizeof(storage_number) * history_seconds) /
|
||||
(1024 * 1024);
|
||||
default_rrdeng_disk_quota_mb -= default_rrdeng_disk_quota_mb * EXPECTED_COMPRESSION_RATIO / 100;
|
||||
|
||||
error_log_limit_unlimited();
|
||||
|
@ -1954,35 +2023,23 @@ void generate_dbengine_dataset(unsigned history_seconds)
|
|||
|
||||
fprintf(stderr, "\nRunning DB-engine workload generator\n");
|
||||
|
||||
// create the chart
|
||||
st = rrdset_create(host, "example", "random", "random", "example", NULL, "random", "random", "random",
|
||||
NULL, 1, update_every, RRDSET_TYPE_LINE);
|
||||
for (j = 0 ; j < DSET_DIMS ; ++j) {
|
||||
snprintfz(name, 100, "random%d", j);
|
||||
|
||||
rd[j] = rrddim_add(st, name, NULL, 1, 1, RRD_ALGORITHM_ABSOLUTE);
|
||||
}
|
||||
|
||||
time_present = now_realtime_sec();
|
||||
// feed it with the test data
|
||||
time_current = time_present - history_seconds;
|
||||
for (j = 0 ; j < DSET_DIMS ; ++j) {
|
||||
rd[j]->last_collected_time.tv_sec =
|
||||
st->last_collected_time.tv_sec = st->last_updated.tv_sec = time_current;
|
||||
rd[j]->last_collected_time.tv_usec =
|
||||
st->last_collected_time.tv_usec = st->last_updated.tv_usec = 0;
|
||||
for (i = 0 ; i < DSET_CHARTS ; ++i) {
|
||||
thread_info[i].host = host;
|
||||
thread_info[i].chartname = "random";
|
||||
thread_info[i].dset_charts = DSET_CHARTS;
|
||||
thread_info[i].chart_i = i;
|
||||
thread_info[i].dset_dims = DSET_DIMS;
|
||||
thread_info[i].history_seconds = history_seconds;
|
||||
thread_info[i].time_present = time_present;
|
||||
assert(0 == uv_thread_create(&thread_info[i].thread, generate_dbengine_chart, &thread_info[i]));
|
||||
}
|
||||
for (i = 0 ; i < DSET_CHARTS ; ++i) {
|
||||
assert(0 == uv_thread_join(&thread_info[i].thread));
|
||||
}
|
||||
for( ; time_current < time_present; ++time_current) {
|
||||
st->usec_since_last_update = USEC_PER_SEC;
|
||||
|
||||
for (j = 0; j < DSET_DIMS; ++j) {
|
||||
rrddim_set_by_pointer_fake_time(rd[j], (time_current + j) % 128, time_current);
|
||||
}
|
||||
rrdset_done(st);
|
||||
}
|
||||
rrd_wrlock();
|
||||
rrdhost_free(host);
|
||||
rrd_unlock();
|
||||
|
||||
}
|
||||
#endif
|
||||
#endif
|
||||
|
|
|
@ -216,6 +216,9 @@ static void pg_cache_release_pages(struct rrdengine_instance *ctx, unsigned numb
|
|||
static void pg_cache_reserve_pages(struct rrdengine_instance *ctx, unsigned number)
|
||||
{
|
||||
struct page_cache *pg_cache = &ctx->pg_cache;
|
||||
unsigned failures = 0;
|
||||
const unsigned FAILURES_CEILING = 10; /* truncates exponential backoff to (2^FAILURES_CEILING x slot) */
|
||||
unsigned long exp_backoff_slot_usec = USEC_PER_MS * 10;
|
||||
|
||||
assert(number < ctx->max_cache_pages);
|
||||
|
||||
|
@ -224,11 +227,13 @@ static void pg_cache_reserve_pages(struct rrdengine_instance *ctx, unsigned numb
|
|||
debug(D_RRDENGINE, "==Page cache full. Reserving %u pages.==",
|
||||
number);
|
||||
while (pg_cache->populated_pages + number >= ctx->max_cache_pages + 1) {
|
||||
|
||||
if (!pg_cache_try_evict_one_page_unsafe(ctx)) {
|
||||
/* failed to evict */
|
||||
struct completion compl;
|
||||
struct rrdeng_cmd cmd;
|
||||
|
||||
++failures;
|
||||
uv_rwlock_wrunlock(&pg_cache->pg_cache_rwlock);
|
||||
|
||||
init_completion(&compl);
|
||||
|
@ -240,6 +245,12 @@ static void pg_cache_reserve_pages(struct rrdengine_instance *ctx, unsigned numb
|
|||
wait_for_completion(&compl);
|
||||
destroy_completion(&compl);
|
||||
|
||||
if (unlikely(failures > 1)) {
|
||||
unsigned long slots;
|
||||
/* exponential backoff */
|
||||
slots = random() % (2LU << MIN(failures, FAILURES_CEILING));
|
||||
(void)sleep_usec(slots * exp_backoff_slot_usec);
|
||||
}
|
||||
uv_rwlock_wrlock(&pg_cache->pg_cache_rwlock);
|
||||
}
|
||||
}
|
||||
|
@ -1042,9 +1053,8 @@ void free_page_cache(struct rrdengine_instance *ctx)
|
|||
/* Find first page in range */
|
||||
Index = (Word_t) 0;
|
||||
PValue = JudyLFirst(page_index->JudyL_array, &Index, PJE0);
|
||||
if (likely(NULL != PValue)) {
|
||||
descr = *PValue;
|
||||
}
|
||||
descr = unlikely(NULL == PValue) ? NULL : *PValue;
|
||||
|
||||
while (descr != NULL) {
|
||||
/* Iterate all page descriptors of this metric */
|
||||
|
||||
|
|
|
@ -676,7 +676,7 @@ void timer_cb(uv_timer_t* handle)
|
|||
/* Flushes dirty pages when timer expires */
|
||||
#define TIMER_PERIOD_MS (1000)
|
||||
|
||||
#define CMD_BATCH_SIZE (256)
|
||||
#define MAX_CMD_BATCH_SIZE (256)
|
||||
|
||||
void rrdeng_worker(void* arg)
|
||||
{
|
||||
|
@ -687,6 +687,7 @@ void rrdeng_worker(void* arg)
|
|||
enum rrdeng_opcode opcode;
|
||||
uv_timer_t timer_req;
|
||||
struct rrdeng_cmd cmd;
|
||||
unsigned cmd_batch_size;
|
||||
|
||||
rrdeng_init_cmd_queue(wc);
|
||||
|
||||
|
@ -723,10 +724,20 @@ void rrdeng_worker(void* arg)
|
|||
shutdown = 0;
|
||||
while (shutdown == 0 || uv_loop_alive(loop)) {
|
||||
uv_run(loop, UV_RUN_DEFAULT);
|
||||
|
||||
/* wait for commands */
|
||||
cmd_batch_size = 0;
|
||||
do {
|
||||
/*
|
||||
* Avoid starving the loop when there are too many commands coming in.
|
||||
* timer_cb will interrupt the loop again to allow serving more commands.
|
||||
*/
|
||||
if (unlikely(cmd_batch_size >= MAX_CMD_BATCH_SIZE))
|
||||
break;
|
||||
|
||||
cmd = rrdeng_deq_cmd(wc);
|
||||
opcode = cmd.opcode;
|
||||
++cmd_batch_size;
|
||||
|
||||
switch (opcode) {
|
||||
case RRDENG_NOOP:
|
||||
|
|
Loading…
Reference in New Issue