// netdata/database/engine/journalfile.c
// SPDX-License-Identifier: GPL-3.0-or-later
#include "rrdengine.h"
static void flush_transaction_buffer_cb(uv_fs_t* req)
{
    struct generic_io_descriptor *io_descr = req->data;
    struct rrdengine_worker_config* wc = req->loop->data;
    struct rrdengine_instance *ctx = wc->ctx;

    if (req->result < 0) {
        ++ctx->stats.io_errors;
        rrd_stat_atomic_add(&global_io_errors, 1);
        error("%s: uv_fs_write: %s", __func__, uv_strerror((int)req->result));
    } else {
        debug(D_RRDENGINE, "%s: Journal block was written to disk.", __func__);
    }

    uv_fs_req_cleanup(req);
    free(io_descr->buf);
    freez(io_descr);
}

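/*
 * Flushes the commit log buffer to the current journal file with an
 * asynchronous write. Any unused tail of the buffer is marked as padding.
 */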
/* Careful to always call this before creating a new journal file */
void wal_flush_transaction_buffer(struct rrdengine_worker_config* wc)
{
    struct rrdengine_instance *ctx = wc->ctx;
    int ret;
    struct generic_io_descriptor *io_descr;
    unsigned pos, size;
    struct rrdengine_journalfile *journalfile;

    if (unlikely(NULL == ctx->commit_log.buf || 0 == ctx->commit_log.buf_pos)) {
        return;
    }
    /* care with outstanding transactions when switching journal files */
    journalfile = ctx->datafiles.last->journalfile;

    io_descr = mallocz(sizeof(*io_descr));
    pos = ctx->commit_log.buf_pos;
    size = ctx->commit_log.buf_size;
    if (pos < size) {
        /* simulate an empty transaction to skip the rest of the block */
        *(uint8_t *) (ctx->commit_log.buf + pos) = STORE_PADDING;
    }
    io_descr->buf = ctx->commit_log.buf;
    io_descr->bytes = size;
    io_descr->pos = journalfile->pos;
    io_descr->req.data = io_descr;
    io_descr->completion = NULL;

    io_descr->iov = uv_buf_init((void *)io_descr->buf, size);
    ret = uv_fs_write(wc->loop, &io_descr->req, journalfile->file, &io_descr->iov, 1,
                      journalfile->pos, flush_transaction_buffer_cb);
    fatal_assert(-1 != ret);
    journalfile->pos += RRDENG_BLOCK_SIZE;
    ctx->disk_space += RRDENG_BLOCK_SIZE;
    ctx->commit_log.buf = NULL;
    ctx->stats.io_write_bytes += RRDENG_BLOCK_SIZE;
    ++ctx->stats.io_write_requests;
}

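/*
 * Reserves size bytes in the commit log buffer and returns a pointer to the
 * reserved region. If the current buffer cannot fit the request it is flushed
 * first and a new, aligned buffer is allocated.
 */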
void *wal_get_transaction_buffer(struct rrdengine_worker_config* wc, unsigned size)
{
    struct rrdengine_instance *ctx = wc->ctx;
    int ret;
    unsigned buf_pos = 0, buf_size;

    fatal_assert(size);
    if (ctx->commit_log.buf) {
        unsigned remaining;

        buf_pos = ctx->commit_log.buf_pos;
        buf_size = ctx->commit_log.buf_size;
        remaining = buf_size - buf_pos;
        if (size > remaining) {
            /* we need a new buffer */
            wal_flush_transaction_buffer(wc);
        }
    }
    if (NULL == ctx->commit_log.buf) {
        buf_size = ALIGN_BYTES_CEILING(size);
        ret = posix_memalign((void *)&ctx->commit_log.buf, RRDFILE_ALIGNMENT, buf_size);
        if (unlikely(ret)) {
            fatal("posix_memalign:%s", strerror(ret));
        }
        memset(ctx->commit_log.buf, 0, buf_size);
        buf_pos = ctx->commit_log.buf_pos = 0;
        ctx->commit_log.buf_size = buf_size;
    }
    ctx->commit_log.buf_pos += size;

    return ctx->commit_log.buf + buf_pos;
}

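/* Builds the full path of the journal (WAL) file that corresponds to datafile. */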
void generate_journalfilepath(struct rrdengine_datafile *datafile, char *str, size_t maxlen)
{
    (void) snprintfz(str, maxlen, "%s/" WALFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL WALFILE_EXTENSION,
                     datafile->ctx->dbfiles_path, datafile->tier, datafile->fileno);
}

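/* Initializes an in-memory journal file descriptor and links it to its datafile. */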
void journalfile_init(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
{
    journalfile->file = (uv_file)0;
    journalfile->pos = 0;
    journalfile->datafile = datafile;
}

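/* Closes the journal file descriptor. Returns 0 on success or a negative libuv error code. */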
int close_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
{
    struct rrdengine_instance *ctx = datafile->ctx;
    uv_fs_t req;
    int ret;
    char path[RRDENG_PATH_MAX];

    generate_journalfilepath(datafile, path, sizeof(path));

    ret = uv_fs_close(NULL, &req, journalfile->file, NULL);
    if (ret < 0) {
        error("uv_fs_close(%s): %s", path, uv_strerror(ret));
        ++ctx->stats.fs_errors;
        rrd_stat_atomic_add(&global_fs_errors, 1);
    }
    uv_fs_req_cleanup(&req);

    return ret;
}

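/* Unlinks the journal file from the filesystem without closing its file descriptor. */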
int unlink_journal_file(struct rrdengine_journalfile *journalfile)
{
    struct rrdengine_datafile *datafile = journalfile->datafile;
    struct rrdengine_instance *ctx = datafile->ctx;
    uv_fs_t req;
    int ret;
    char path[RRDENG_PATH_MAX];

    generate_journalfilepath(datafile, path, sizeof(path));

    ret = uv_fs_unlink(NULL, &req, path, NULL);
    if (ret < 0) {
        error("uv_fs_unlink(%s): %s", path, uv_strerror(ret));
        ++ctx->stats.fs_errors;
        rrd_stat_atomic_add(&global_fs_errors, 1);
    }
    uv_fs_req_cleanup(&req);

    ++ctx->stats.journalfile_deletions;

    return ret;
}

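/* Truncates, closes and unlinks the journal file. Returns the result of the last operation. */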
int destroy_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
{
    struct rrdengine_instance *ctx = datafile->ctx;
    uv_fs_t req;
    int ret;
    char path[RRDENG_PATH_MAX];

    generate_journalfilepath(datafile, path, sizeof(path));

    ret = uv_fs_ftruncate(NULL, &req, journalfile->file, 0, NULL);
    if (ret < 0) {
        error("uv_fs_ftruncate(%s): %s", path, uv_strerror(ret));
        ++ctx->stats.fs_errors;
        rrd_stat_atomic_add(&global_fs_errors, 1);
    }
    uv_fs_req_cleanup(&req);

    ret = uv_fs_close(NULL, &req, journalfile->file, NULL);
    if (ret < 0) {
        error("uv_fs_close(%s): %s", path, uv_strerror(ret));
        ++ctx->stats.fs_errors;
        rrd_stat_atomic_add(&global_fs_errors, 1);
    }
    uv_fs_req_cleanup(&req);

    ret = uv_fs_unlink(NULL, &req, path, NULL);
    if (ret < 0) {
        error("uv_fs_unlink(%s): %s", path, uv_strerror(ret));
        ++ctx->stats.fs_errors;
        rrd_stat_atomic_add(&global_fs_errors, 1);
    }
    uv_fs_req_cleanup(&req);

    ++ctx->stats.journalfile_deletions;

    return ret;
}

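/*
 * Creates a new journal file with direct I/O and writes its superblock.
 * Returns 0 on success or a negative error code; if the superblock write
 * fails the partially created file is destroyed.
 */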
int create_journal_file(struct rrdengine_journalfile *journalfile, struct rrdengine_datafile *datafile)
{
    struct rrdengine_instance *ctx = datafile->ctx;
    uv_fs_t req;
    uv_file file;
    int ret, fd;
    struct rrdeng_jf_sb *superblock;
    uv_buf_t iov;
    char path[RRDENG_PATH_MAX];

    generate_journalfilepath(datafile, path, sizeof(path));
    fd = open_file_direct_io(path, O_CREAT | O_RDWR | O_TRUNC, &file);
    if (fd < 0) {
        ++ctx->stats.fs_errors;
        rrd_stat_atomic_add(&global_fs_errors, 1);
        return fd;
    }
    journalfile->file = file;
    ++ctx->stats.journalfile_creations;

    ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock));
    if (unlikely(ret)) {
        fatal("posix_memalign:%s", strerror(ret));
    }
    memset(superblock, 0, sizeof(*superblock));
    (void) strncpy(superblock->magic_number, RRDENG_JF_MAGIC, RRDENG_MAGIC_SZ);
    (void) strncpy(superblock->version, RRDENG_JF_VER, RRDENG_VER_SZ);

    iov = uv_buf_init((void *)superblock, sizeof(*superblock));

    ret = uv_fs_write(NULL, &req, file, &iov, 1, 0, NULL);
    if (ret < 0) {
        fatal_assert(req.result < 0);
        error("uv_fs_write: %s", uv_strerror(ret));
        ++ctx->stats.io_errors;
        rrd_stat_atomic_add(&global_io_errors, 1);
    }
    uv_fs_req_cleanup(&req);
    free(superblock);
    if (ret < 0) {
        destroy_journal_file(journalfile, datafile);
        return ret;
    }

    journalfile->pos = sizeof(*superblock);
    ctx->stats.io_write_bytes += sizeof(*superblock);
    ++ctx->stats.io_write_requests;

    return 0;
}

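/*
 * Reads and validates the journal file superblock (magic number and version).
 * Returns 0 when valid, UV_EINVAL or a read error otherwise.
 */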
static int check_journal_file_superblock(uv_file file)
{
    int ret;
    struct rrdeng_jf_sb *superblock;
    uv_buf_t iov;
    uv_fs_t req;

    ret = posix_memalign((void *)&superblock, RRDFILE_ALIGNMENT, sizeof(*superblock));
    if (unlikely(ret)) {
        fatal("posix_memalign:%s", strerror(ret));
    }
    iov = uv_buf_init((void *)superblock, sizeof(*superblock));

    ret = uv_fs_read(NULL, &req, file, &iov, 1, 0, NULL);
    if (ret < 0) {
        error("uv_fs_read: %s", uv_strerror(ret));
        uv_fs_req_cleanup(&req);
        goto error;
    }
    fatal_assert(req.result >= 0);
    uv_fs_req_cleanup(&req);

    if (strncmp(superblock->magic_number, RRDENG_JF_MAGIC, RRDENG_MAGIC_SZ) ||
        strncmp(superblock->version, RRDENG_JF_VER, RRDENG_VER_SZ)) {
        error("File has invalid superblock.");
        ret = UV_EINVAL;
    } else {
        ret = 0;
    }
error:
    free(superblock);
    return ret;
}

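/*
 * Rebuilds the page cache metadata for one STORE_DATA transaction payload:
 * validates every page descriptor, creates a page index for UUIDs seen for
 * the first time and registers the valid pages with their extent.
 */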
static void restore_extent_metadata(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile,
                                    void *buf, unsigned max_size)
{
    static BITMAP256 page_error_map;
    struct page_cache *pg_cache = &ctx->pg_cache;
    unsigned i, count, payload_length, descr_size, valid_pages;
    struct rrdeng_page_descr *descr;
    struct extent_info *extent;
    /* persistent structures */
    struct rrdeng_jf_store_data *jf_metric_data;

    jf_metric_data = buf;
    count = jf_metric_data->number_of_pages;
    descr_size = sizeof(*jf_metric_data->descr) * count;
    payload_length = sizeof(*jf_metric_data) + descr_size;
    if (payload_length > max_size) {
        error("Corrupted transaction payload.");
        return;
    }

    extent = mallocz(sizeof(*extent) + count * sizeof(extent->pages[0]));
    extent->offset = jf_metric_data->extent_offset;
    extent->size = jf_metric_data->extent_size;
    extent->datafile = journalfile->datafile;
    extent->next = NULL;

    for (i = 0, valid_pages = 0 ; i < count ; ++i) {
        uuid_t *temp_id;
        Pvoid_t *PValue;
        struct pg_cache_page_index *page_index = NULL;
        uint8_t page_type = jf_metric_data->descr[i].type;

        if (page_type > PAGE_TYPE_MAX) {
            if (!bitmap256_get_bit(&page_error_map, page_type)) {
                error("Unknown page type %d encountered.", page_type);
                bitmap256_set_bit(&page_error_map, page_type, 1);
            }
            continue;
        }
        uint64_t start_time = jf_metric_data->descr[i].start_time;
        uint64_t end_time = jf_metric_data->descr[i].end_time;

        if (unlikely(start_time > end_time)) {
            error("Invalid page encountered, start time %" PRIu64 " > end time %" PRIu64, start_time, end_time);
            continue;
        }

        if (unlikely(start_time == end_time)) {
            size_t entries = jf_metric_data->descr[i].page_length / page_type_size[page_type];
            if (unlikely(entries > 1)) {
                error("Invalid page encountered, start time %" PRIu64 " = end time but %zu entries were found", start_time, entries);
                continue;
            }
        }

        temp_id = (uuid_t *)jf_metric_data->descr[i].uuid;

        uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
        PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, temp_id, sizeof(uuid_t));
        if (likely(NULL != PValue)) {
            page_index = *PValue;
        }
        uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
        if (NULL == PValue) {
            /* First time we see the UUID */
            uv_rwlock_wrlock(&pg_cache->metrics_index.lock);
            PValue = JudyHSIns(&pg_cache->metrics_index.JudyHS_array, temp_id, sizeof(uuid_t), PJE0);
            fatal_assert(NULL == *PValue); /* TODO: figure out concurrency model */
            *PValue = page_index = create_page_index(temp_id);
            page_index->prev = pg_cache->metrics_index.last_page_index;
            pg_cache->metrics_index.last_page_index = page_index;
            uv_rwlock_wrunlock(&pg_cache->metrics_index.lock);
        }

        descr = pg_cache_create_descr();
        descr->page_length = jf_metric_data->descr[i].page_length;
        descr->start_time = start_time;
        descr->end_time = end_time;
        descr->id = &page_index->id;
        descr->extent = extent;
        descr->type = page_type;
        extent->pages[valid_pages++] = descr;
        pg_cache_insert(ctx, page_index, descr);
    }

    extent->number_of_pages = valid_pages;

    if (likely(valid_pages))
        df_extent_insert(extent);
    else
        freez(extent);
}

/*
 * Replays transaction by interpreting up to max_size bytes from buf.
 * Sets id to the current transaction id or to 0 if unknown.
 * Returns size of transaction record or 0 for unknown size.
 */
static unsigned replay_transaction(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile,
                                   void *buf, uint64_t *id, unsigned max_size)
{
    unsigned payload_length, size_bytes;
    int ret;
    /* persistent structures */
    struct rrdeng_jf_transaction_header *jf_header;
    struct rrdeng_jf_transaction_trailer *jf_trailer;
    uLong crc;

    *id = 0;
    jf_header = buf;
    if (STORE_PADDING == jf_header->type) {
        debug(D_RRDENGINE, "Skipping padding.");
        return 0;
    }
    if (sizeof(*jf_header) > max_size) {
        error("Corrupted transaction record, skipping.");
        return 0;
    }
    *id = jf_header->id;
    payload_length = jf_header->payload_length;
    size_bytes = sizeof(*jf_header) + payload_length + sizeof(*jf_trailer);
    if (size_bytes > max_size) {
        error("Corrupted transaction record, skipping.");
        return 0;
    }
    jf_trailer = buf + sizeof(*jf_header) + payload_length;
    crc = crc32(0L, Z_NULL, 0);
    crc = crc32(crc, buf, sizeof(*jf_header) + payload_length);
    ret = crc32cmp(jf_trailer->checksum, crc);
    debug(D_RRDENGINE, "Transaction %"PRIu64" was read from disk. CRC32 check: %s", *id, ret ? "FAILED" : "SUCCEEDED");
    if (unlikely(ret)) {
        error("Transaction %"PRIu64" was read from disk. CRC32 check: FAILED", *id);
        return size_bytes;
    }
    switch (jf_header->type) {
    case STORE_DATA:
        debug(D_RRDENGINE, "Replaying transaction %"PRIu64"", jf_header->id);
        restore_extent_metadata(ctx, journalfile, buf + sizeof(*jf_header), payload_length);
        break;
    default:
        error("Unknown transaction type. Skipping record.");
        break;
    }

    return size_bytes;
}

#define READAHEAD_BYTES (RRDENG_BLOCK_SIZE * 256)

/*
 * Iterates journal file transactions and populates the page cache.
 * Page cache must already be initialized.
 * Returns the maximum transaction id it discovered.
 */
static uint64_t iterate_transactions(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile)
{
    uv_file file;
    uint64_t file_size;//, data_file_size;
    int ret;
    uint64_t pos, pos_i, max_id, id;
    unsigned size_bytes;
    void *buf;
    uv_buf_t iov;
    uv_fs_t req;

    file = journalfile->file;
    file_size = journalfile->pos;
    //data_file_size = journalfile->datafile->pos; TODO: utilize this?
    max_id = 1;
    bool journal_is_mmapped = (journalfile->data != NULL);
    if (unlikely(!journal_is_mmapped)) {
        ret = posix_memalign((void *)&buf, RRDFILE_ALIGNMENT, READAHEAD_BYTES);
        if (unlikely(ret))
            fatal("posix_memalign:%s", strerror(ret));
    }
    else
        buf = journalfile->data + sizeof(struct rrdeng_jf_sb);
    for (pos = sizeof(struct rrdeng_jf_sb) ; pos < file_size ; pos += READAHEAD_BYTES) {
        size_bytes = MIN(READAHEAD_BYTES, file_size - pos);
        if (unlikely(!journal_is_mmapped)) {
            iov = uv_buf_init(buf, size_bytes);
            ret = uv_fs_read(NULL, &req, file, &iov, 1, pos, NULL);
            if (ret < 0) {
                error("uv_fs_read: pos=%" PRIu64 ", %s", pos, uv_strerror(ret));
                uv_fs_req_cleanup(&req);
                goto skip_file;
            }
            fatal_assert(req.result >= 0);
            uv_fs_req_cleanup(&req);
            ++ctx->stats.io_read_requests;
            ctx->stats.io_read_bytes += size_bytes;
        }
        for (pos_i = 0 ; pos_i < size_bytes ; ) {
            unsigned max_size;

            /* limit each transaction to the bytes remaining in this chunk */
            max_size = size_bytes - pos_i;
            ret = replay_transaction(ctx, journalfile, buf + pos_i, &id, max_size);
            if (!ret) /* TODO: support transactions bigger than 4K */
                /* unknown transaction size, move on to the next block */
                pos_i = ALIGN_BYTES_FLOOR(pos_i + RRDENG_BLOCK_SIZE);
            else
                pos_i += ret;
            max_id = MAX(max_id, id);
        }
        if (likely(journal_is_mmapped))
            buf += size_bytes;
    }
skip_file:
    if (unlikely(!journal_is_mmapped))
        free(buf);
    return max_id;
}

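/*
 * Loads an existing journal file, validates its superblock and replays its
 * transactions to rebuild the in-memory page cache metadata.
 * Returns 0 on success or an error code on failure.
 */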
int load_journal_file(struct rrdengine_instance *ctx, struct rrdengine_journalfile *journalfile,
                      struct rrdengine_datafile *datafile)
{
    uv_fs_t req;
    uv_file file;
    int ret, fd, error;
    uint64_t file_size, max_id;
    char path[RRDENG_PATH_MAX];

    generate_journalfilepath(datafile, path, sizeof(path));
    fd = open_file_direct_io(path, O_RDWR, &file);
    if (fd < 0) {
        ++ctx->stats.fs_errors;
        rrd_stat_atomic_add(&global_fs_errors, 1);
        return fd;
    }
    info("Loading journal file \"%s\".", path);

    ret = check_file_properties(file, &file_size, sizeof(struct rrdeng_jf_sb));
    if (ret)
        goto error;
    file_size = ALIGN_BYTES_FLOOR(file_size);

    ret = check_journal_file_superblock(file);
    if (ret)
        goto error;
    ctx->stats.io_read_bytes += sizeof(struct rrdeng_jf_sb);
    ++ctx->stats.io_read_requests;

    journalfile->file = file;
    journalfile->pos = file_size;

    journalfile->data = netdata_mmap(path, file_size, MAP_SHARED, 0);
    info("Loading journal file \"%s\" using %s.", path, journalfile->data ? "MMAP" : "uv_fs_read");

    max_id = iterate_transactions(ctx, journalfile);

    ctx->commit_log.transaction_id = MAX(ctx->commit_log.transaction_id, max_id + 1);

    info("Journal file \"%s\" loaded (size:%"PRIu64").", path, file_size);
    if (likely(journalfile->data))
        munmap(journalfile->data, file_size);
    return 0;

error:
    error = ret;
    ret = uv_fs_close(NULL, &req, file, NULL);
    if (ret < 0) {
        error("uv_fs_close(%s): %s", path, uv_strerror(ret));
        ++ctx->stats.fs_errors;
        rrd_stat_atomic_add(&global_fs_errors, 1);
    }
    uv_fs_req_cleanup(&req);
    return error;
}

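/* Resets the commit log state; transaction ids start from 1. */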
void init_commit_log(struct rrdengine_instance *ctx)
{
    ctx->commit_log.buf = NULL;
    ctx->commit_log.buf_pos = 0;
    ctx->commit_log.transaction_id = 1;
}