cleanup and optimize rrdeng_load_metric_next() (#12966)
* cleanup and optimize rrdeng_load_metric_next() * fixed typo
This commit is contained in:
parent
78d5eb937a
commit
68283aa057
|
@ -52,6 +52,7 @@ struct rrdeng_query_handle {
|
|||
storage_number *page;
|
||||
usec_t page_end_time;
|
||||
uint32_t page_length;
|
||||
usec_t dt;
|
||||
};
|
||||
|
||||
typedef enum {
|
||||
|
|
|
@ -554,98 +554,101 @@ void rrdeng_load_metric_init(RRDDIM *rd, struct rrddim_query_handle *rrdimm_hand
|
|||
handle->next_page_time = INVALID_TIME;
|
||||
}
|
||||
|
||||
/* Returns the metric and sets its timestamp into current_time */
|
||||
storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle, time_t *current_time)
|
||||
{
|
||||
static int rrdeng_load_page_next(struct rrddim_query_handle *rrdimm_handle, unsigned *position_ptr) {
|
||||
struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrdimm_handle->handle;
|
||||
struct rrdengine_instance *ctx;
|
||||
struct rrdeng_page_descr *descr;
|
||||
storage_number *page, ret;
|
||||
unsigned position, entries;
|
||||
usec_t next_page_time = 0, current_position_time, page_end_time = 0;
|
||||
|
||||
struct rrdengine_instance *ctx = handle->ctx;
|
||||
struct rrdeng_page_descr *descr = handle->descr;
|
||||
|
||||
uint32_t page_length;
|
||||
usec_t page_end_time;
|
||||
unsigned position;
|
||||
|
||||
if (unlikely(INVALID_TIME == handle->next_page_time)) {
|
||||
return SN_EMPTY_SLOT;
|
||||
}
|
||||
ctx = handle->ctx;
|
||||
if (unlikely(NULL == (descr = handle->descr))) {
|
||||
/* it's the first call */
|
||||
next_page_time = handle->next_page_time * USEC_PER_SEC;
|
||||
} else {
|
||||
// pg_cache_atomic_get_pg_info(descr, &page_end_time, &page_length);
|
||||
page_end_time = handle->page_end_time;
|
||||
page_length = handle->page_length;
|
||||
page = handle->page;
|
||||
}
|
||||
position = handle->position + 1;
|
||||
if (likely(descr)) {
|
||||
// Drop old page's reference
|
||||
|
||||
if (unlikely(NULL == descr ||
|
||||
position >= (page_length / sizeof(storage_number)))) {
|
||||
/* We need to get a new page */
|
||||
if (descr) {
|
||||
/* Drop old page's reference */
|
||||
#ifdef NETDATA_INTERNAL_CHECKS
|
||||
rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, -1);
|
||||
#endif
|
||||
|
||||
pg_cache_put(ctx, descr);
|
||||
handle->descr = NULL;
|
||||
handle->next_page_time = (page_end_time / USEC_PER_SEC) + 1;
|
||||
if (unlikely(handle->next_page_time > rrdimm_handle->end_time)) {
|
||||
handle->next_page_time = (handle->page_end_time / USEC_PER_SEC) + 1;
|
||||
|
||||
if (unlikely(handle->next_page_time > rrdimm_handle->end_time))
|
||||
goto no_more_metrics;
|
||||
}
|
||||
next_page_time = handle->next_page_time * USEC_PER_SEC;
|
||||
}
|
||||
|
||||
descr = pg_cache_lookup_next(ctx, handle->page_index, &handle->page_index->id,
|
||||
next_page_time, rrdimm_handle->end_time * USEC_PER_SEC);
|
||||
if (NULL == descr) {
|
||||
usec_t next_page_time = handle->next_page_time * USEC_PER_SEC;
|
||||
descr = pg_cache_lookup_next(ctx, handle->page_index, &handle->page_index->id, next_page_time, rrdimm_handle->end_time * USEC_PER_SEC);
|
||||
if (NULL == descr)
|
||||
goto no_more_metrics;
|
||||
}
|
||||
|
||||
#ifdef NETDATA_INTERNAL_CHECKS
|
||||
rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, 1);
|
||||
#endif
|
||||
|
||||
handle->descr = descr;
|
||||
pg_cache_atomic_get_pg_info(descr, &page_end_time, &page_length);
|
||||
if (unlikely(INVALID_TIME == descr->start_time ||
|
||||
INVALID_TIME == page_end_time)) {
|
||||
if (unlikely(INVALID_TIME == descr->start_time || INVALID_TIME == page_end_time))
|
||||
goto no_more_metrics;
|
||||
}
|
||||
|
||||
if (unlikely(descr->start_time != page_end_time && next_page_time > descr->start_time)) {
|
||||
/* we're in the middle of the page somewhere */
|
||||
entries = page_length / sizeof(storage_number);
|
||||
// we're in the middle of the page somewhere
|
||||
unsigned entries = page_length / sizeof(storage_number);
|
||||
position = ((uint64_t)(next_page_time - descr->start_time)) * (entries - 1) /
|
||||
(page_end_time - descr->start_time);
|
||||
} else {
|
||||
position = 0;
|
||||
}
|
||||
else
|
||||
position = 0;
|
||||
|
||||
handle->page_end_time = page_end_time;
|
||||
handle->page_length = page_length;
|
||||
page = handle->page = descr->pg_cache_descr->page;
|
||||
handle->page = descr->pg_cache_descr->page;
|
||||
usec_t entries = page_length / sizeof(storage_number);
|
||||
if (likely(entries > 1))
|
||||
handle->dt = (page_end_time - descr->start_time) / (entries - 1);
|
||||
else
|
||||
handle->dt = 0;
|
||||
|
||||
*position_ptr = position;
|
||||
return 0;
|
||||
|
||||
no_more_metrics:
|
||||
return 1;
|
||||
}
|
||||
|
||||
// page = descr->pg_cache_descr->page;
|
||||
ret = page[position];
|
||||
entries = page_length / sizeof(storage_number);
|
||||
/* Returns the metric and sets its timestamp into current_time */
|
||||
storage_number rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle, time_t *current_time) {
|
||||
struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrdimm_handle->handle;
|
||||
|
||||
if (entries > 1) {
|
||||
usec_t dt;
|
||||
if (unlikely(INVALID_TIME == handle->next_page_time))
|
||||
return SN_EMPTY_SLOT;
|
||||
|
||||
dt = (page_end_time - descr->start_time) / (entries - 1);
|
||||
current_position_time = descr->start_time + position * dt;
|
||||
} else {
|
||||
current_position_time = descr->start_time;
|
||||
struct rrdeng_page_descr *descr = handle->descr;
|
||||
|
||||
storage_number *page = handle->page;
|
||||
unsigned position = handle->position + 1;
|
||||
|
||||
if (unlikely(!descr || position >= (handle->page_length / sizeof(storage_number)))) {
|
||||
// We need to get a new page
|
||||
if(rrdeng_load_page_next(rrdimm_handle, &position))
|
||||
goto no_more_metrics;
|
||||
|
||||
descr = handle->descr;
|
||||
page = handle->page;
|
||||
}
|
||||
|
||||
storage_number ret = page[position];
|
||||
handle->position = position;
|
||||
handle->now = current_position_time / USEC_PER_SEC;
|
||||
/* fatal_assert(handle->now >= rrdimm_handle->start_time && handle->now <= rrdimm_handle->end_time);
|
||||
The above assertion is an approximation and needs to take update_every into account */
|
||||
if (unlikely(handle->now >= rrdimm_handle->end_time)) {
|
||||
/* next calls will not load any more metrics */
|
||||
time_t now = handle->now = (descr->start_time + position * handle->dt) / USEC_PER_SEC;
|
||||
|
||||
if (unlikely(now >= rrdimm_handle->end_time)) {
|
||||
// next calls will not load any more metrics
|
||||
handle->next_page_time = INVALID_TIME;
|
||||
}
|
||||
*current_time = handle->now;
|
||||
|
||||
*current_time = now;
|
||||
return ret;
|
||||
|
||||
no_more_metrics:
|
||||
|
|
Loading…
Reference in New Issue