Merge branch 'unstable' into dict-split-by-slot

This commit is contained in:
Vitaly Arbuzov 2023-05-16 11:28:01 -07:00
commit 0cdafe3cc8
10 changed files with 232 additions and 40 deletions

View File

@ -726,3 +726,14 @@ nodataerr:
"No samples available for event '%s'", (char*) c->argv[2]->ptr);
}
/* Record one duration sample for the given eventloop duration type.
 *
 * type     - index into server.duration_stats. Values outside the range
 *            [0, EL_DURATION_TYPE_NUM) are silently ignored.
 * duration - length of the measured interval, as a monotime value
 *            (presumably microseconds, since monotime readings are taken
 *            with getMonotonicUs() -- confirm against the callers).
 *
 * Updates the sample count, the running sum and the maximum seen so far
 * for that type. */
void durationAddSample(int type, monotime duration) {
    /* 'type' is a signed int: reject negative values as well as values past
     * the end of the array before using it as an index. */
    if (type < 0 || type >= EL_DURATION_TYPE_NUM) {
        return;
    }
    durationStats *ds = &server.duration_stats[type];
    ds->cnt++;
    ds->sum += duration;
    if (duration > ds->max) {
        ds->max = duration;
    }
}

View File

@ -89,4 +89,20 @@ void latencyAddSample(const char *event, mstime_t latency);
#define latencyRemoveNestedEvent(event_var,nested_var) \
event_var += nested_var;
/* Accumulated statistics for one category of measured durations.
 * The fields are updated together by durationAddSample(). */
typedef struct durationStats {
/* Number of samples added so far. */
unsigned long long cnt;
/* Sum of the durations of all samples added so far. */
unsigned long long sum;
/* Largest single sample duration seen so far. */
unsigned long long max;
} durationStats;
/* Categories of eventloop time for which duration statistics are kept.
 * Each value is used as the 'type' argument of durationAddSample(), which
 * uses it as an index into the per-category statistics array. */
typedef enum {
EL_DURATION_TYPE_EL = 0, // cumulative time duration metric of the whole eventloop
EL_DURATION_TYPE_CMD, // cumulative time duration metric of executing commands
EL_DURATION_TYPE_AOF, // cumulative time duration metric of flushing AOF in eventloop
EL_DURATION_TYPE_CRON, // cumulative time duration metric of cron (serverCron and beforeSleep, but excluding IO and AOF)
/* Not a category: the number of categories above. Must stay last. */
EL_DURATION_TYPE_NUM
} DurationType;
void durationAddSample(int type, monotime duration);
#endif /* __LATENCY_H */

View File

@ -11702,7 +11702,7 @@ void moduleNotifyKeyUnlink(robj *key, robj *val, int dbid, int flags) {
} else if (flags & DB_FLAG_KEY_OVERWRITE) {
subevent = REDISMODULE_SUBEVENT_KEY_OVERWRITTEN;
}
KeyInfo info = {dbid, key, val, REDISMODULE_WRITE};
KeyInfo info = {dbid, key, val, REDISMODULE_READ};
moduleFireServerEvent(REDISMODULE_EVENT_KEY, subevent, &info);
if (val->type == OBJ_MODULE) {

View File

@ -239,6 +239,7 @@ static struct config {
int get_functions_rdb_mode;
int stat_mode;
int scan_mode;
int count;
int intrinsic_latency_mode;
int intrinsic_latency_duration;
sds pattern;
@ -2727,6 +2728,8 @@ static int parseOptions(int argc, char **argv) {
} else if (!strcmp(argv[i],"--pattern") && !lastarg) {
sdsfree(config.pattern);
config.pattern = sdsnew(argv[++i]);
} else if (!strcmp(argv[i],"--count") && !lastarg) {
config.count = atoi(argv[++i]);
} else if (!strcmp(argv[i],"--quoted-pattern") && !lastarg) {
sdsfree(config.pattern);
config.pattern = unquoteCString(argv[++i]);
@ -3081,6 +3084,7 @@ version,tls_usage);
" --scan List all keys using the SCAN command.\n"
" --pattern <pat> Keys pattern when using the --scan, --bigkeys or --hotkeys\n"
" options (default: *).\n"
" --count <count> Count option when using the --scan, --bigkeys or --hotkeys (default: 10).\n"
" --quoted-pattern <pat> Same as --pattern, but the specified string can be\n"
" quoted, in order to pass an otherwise non binary-safe string.\n"
" --intrinsic-latency <sec> Run a test to measure intrinsic system latency.\n"
@ -3111,6 +3115,7 @@ version,tls_usage);
" redis-cli --quoted-input set '\"null-\\x00-separated\"' value\n"
" redis-cli --eval myscript.lua key1 key2 , arg1 arg2 arg3\n"
" redis-cli --scan --pattern '*:12345*'\n"
" redis-cli --scan --pattern '*:12345*' --count 100\n"
"\n"
" (Note: when using --eval the comma separates KEYS[] from ARGV[] items)\n"
"\n"
@ -8826,8 +8831,8 @@ static redisReply *sendScan(unsigned long long *it) {
redisReply *reply;
if (config.pattern)
reply = redisCommand(context, "SCAN %llu MATCH %b",
*it, config.pattern, sdslen(config.pattern));
reply = redisCommand(context, "SCAN %llu MATCH %b COUNT %d",
*it, config.pattern, sdslen(config.pattern), config.count);
else
reply = redisCommand(context,"SCAN %llu",*it);
@ -9770,6 +9775,7 @@ int main(int argc, char **argv) {
config.get_functions_rdb_mode = 0;
config.stat_mode = 0;
config.scan_mode = 0;
config.count = 10;
config.intrinsic_latency_mode = 0;
config.pattern = NULL;
config.rdb_filename = NULL;

View File

@ -717,22 +717,24 @@ int allPersistenceDisabled(void) {
/* ======================= Cron: called every 100 ms ======================== */
/* Add a sample to the operations per second array of samples. */
void trackInstantaneousMetric(int metric, long long current_reading) {
long long now = mstime();
long long t = now - server.inst_metric[metric].last_sample_time;
long long ops = current_reading -
server.inst_metric[metric].last_sample_count;
long long ops_sec;
ops_sec = t > 0 ? (ops*1000/t) : 0;
server.inst_metric[metric].samples[server.inst_metric[metric].idx] =
ops_sec;
server.inst_metric[metric].idx++;
server.inst_metric[metric].idx %= STATS_METRIC_SAMPLES;
server.inst_metric[metric].last_sample_time = now;
server.inst_metric[metric].last_sample_count = current_reading;
/* Record one sample of an instantaneous metric.
 *
 * The sample is the increase of the value since the previous call divided by
 * the increase of the base over the same window, scaled by 'factor':
 *
 *     sample = (current_value - last_value) * factor / (current_base - last_base)
 *
 * This is useful to record an operation count per second (base is a
 * timestamp), or the average time consumption of an operation (base is an
 * operation count).
 *
 * metric        - Index of the metric in server.inst_metric
 * current_value - Current reading of the dividend
 * current_base  - Current reading of the divisor
 * factor        - Multiplier applied to the value increase before dividing
 *
 * While the previously recorded base is not positive (for instance right
 * after it has been reset to 0) no sample is stored and the call only
 * remembers the current readings. If the base did not increase, the stored
 * sample is 0.
 * */
void trackInstantaneousMetric(int metric, long long current_value, long long current_base, long long factor) {
    long long prev_base = server.inst_metric[metric].last_sample_base;

    if (prev_base > 0) {
        long long delta_base = current_base - prev_base;
        long long delta_value = current_value - server.inst_metric[metric].last_sample_value;
        long long sample = 0;

        if (delta_base > 0) {
            sample = delta_value * factor / delta_base;
        }

        /* Store into the current slot, then advance the circular index. */
        int slot = server.inst_metric[metric].idx;
        server.inst_metric[metric].samples[slot] = sample;
        server.inst_metric[metric].idx = (slot + 1) % STATS_METRIC_SAMPLES;
    }

    /* Remember the current readings as the start of the next window. */
    server.inst_metric[metric].last_sample_value = current_value;
    server.inst_metric[metric].last_sample_base = current_base;
}
/* Return the mean of all the samples. */
@ -1308,6 +1310,8 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
/* for debug purposes: skip actual cron work if pause_cron is on */
if (server.pause_cron) return 1000/server.hz;
monotime cron_start = getMonotonicUs();
run_with_period(100) {
long long stat_net_input_bytes, stat_net_output_bytes;
long long stat_net_repl_input_bytes, stat_net_repl_output_bytes;
@ -1315,16 +1319,21 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
atomicGet(server.stat_net_output_bytes, stat_net_output_bytes);
atomicGet(server.stat_net_repl_input_bytes, stat_net_repl_input_bytes);
atomicGet(server.stat_net_repl_output_bytes, stat_net_repl_output_bytes);
trackInstantaneousMetric(STATS_METRIC_COMMAND,server.stat_numcommands);
trackInstantaneousMetric(STATS_METRIC_NET_INPUT,
stat_net_input_bytes + stat_net_repl_input_bytes);
trackInstantaneousMetric(STATS_METRIC_NET_OUTPUT,
stat_net_output_bytes + stat_net_repl_output_bytes);
trackInstantaneousMetric(STATS_METRIC_NET_INPUT_REPLICATION,
stat_net_repl_input_bytes);
trackInstantaneousMetric(STATS_METRIC_NET_OUTPUT_REPLICATION,
stat_net_repl_output_bytes);
monotime current_time = getMonotonicUs();
long long factor = 1000000; // us
trackInstantaneousMetric(STATS_METRIC_COMMAND, server.stat_numcommands, current_time, factor);
trackInstantaneousMetric(STATS_METRIC_NET_INPUT, stat_net_input_bytes + stat_net_repl_input_bytes,
current_time, factor);
trackInstantaneousMetric(STATS_METRIC_NET_OUTPUT, stat_net_output_bytes + stat_net_repl_output_bytes,
current_time, factor);
trackInstantaneousMetric(STATS_METRIC_NET_INPUT_REPLICATION, stat_net_repl_input_bytes, current_time,
factor);
trackInstantaneousMetric(STATS_METRIC_NET_OUTPUT_REPLICATION, stat_net_repl_output_bytes,
current_time, factor);
trackInstantaneousMetric(STATS_METRIC_EL_CYCLE, server.duration_stats[EL_DURATION_TYPE_EL].cnt,
current_time, factor);
trackInstantaneousMetric(STATS_METRIC_EL_DURATION, server.duration_stats[EL_DURATION_TYPE_EL].sum,
server.duration_stats[EL_DURATION_TYPE_EL].cnt, 1);
}
/* We have just LRU_BITS bits per object for LRU information.
@ -1537,6 +1546,9 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
&ei);
server.cronloops++;
server.el_cron_duration = getMonotonicUs() - cron_start;
return 1000/server.hz;
}
@ -1672,6 +1684,11 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
/* If any connection type(typical TLS) still has pending unread data don't sleep at all. */
aeSetDontWait(server.el, connTypeHasPendingData());
/* Record cron time in beforeSleep, which is the sum of active-expire, active-defrag and all other
* tasks done by cron and beforeSleep, but excluding read, write and AOF, that are counted by other
* sets of metrics. */
monotime cron_start_time_before_aof = getMonotonicUs();
/* Call the Redis Cluster before sleep function. Note that this function
* may change the state of Redis Cluster (from ok to fail or vice versa),
* so it's a good idea to call it before serving the unblocked clients
@ -1738,12 +1755,20 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
* since the unblocked clients may write data. */
handleClientsBlockedOnKeys();
/* Record time consumption of AOF writing. */
monotime aof_start_time = getMonotonicUs();
/* Record cron time in beforeSleep. This does not include the time consumed by AOF writing and IO writing below. */
monotime duration_before_aof = aof_start_time - cron_start_time_before_aof;
/* Write the AOF buffer on disk,
* must be done before handleClientsWithPendingWritesUsingThreads,
* in case of appendfsync=always. */
if (server.aof_state == AOF_ON || server.aof_state == AOF_WAIT_REWRITE)
flushAppendOnlyFile(0);
/* Record time consumption of AOF writing. */
durationAddSample(EL_DURATION_TYPE_AOF, getMonotonicUs() - aof_start_time);
/* Update the fsynced replica offset.
* If an initial rewrite is in progress then not all data is guaranteed to have actually been
* persisted to disk yet, so we cannot update the field. We will wait for the rewrite to complete. */
@ -1756,6 +1781,9 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
/* Handle writes with pending output buffers. */
handleClientsWithPendingWritesUsingThreads();
/* Record cron time in beforeSleep. This does not include the time consumed by AOF writing and IO writing above. */
monotime cron_start_time_after_write = getMonotonicUs();
/* Close clients that need to be closed asynchronous */
freeClientsInAsyncFreeQueue();
@ -1767,6 +1795,25 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
/* Disconnect some clients if they are consuming too much memory. */
evictClients();
/* Record cron time in beforeSleep. */
monotime duration_after_write = getMonotonicUs() - cron_start_time_after_write;
/* Record eventloop latency. */
if (server.el_start > 0) {
monotime el_duration = getMonotonicUs() - server.el_start;
durationAddSample(EL_DURATION_TYPE_EL, el_duration);
}
server.el_cron_duration += duration_before_aof + duration_after_write;
durationAddSample(EL_DURATION_TYPE_CRON, server.el_cron_duration);
server.el_cron_duration = 0;
/* Record max command count per cycle. */
if (server.stat_numcommands > server.el_cmd_cnt_start) {
long long el_command_cnt = server.stat_numcommands - server.el_cmd_cnt_start;
if (el_command_cnt > server.el_cmd_cnt_max) {
server.el_cmd_cnt_max = el_command_cnt;
}
}
/* Before we are going to sleep, let the threads access the dataset by
* releasing the GIL. Redis main thread will not touch anything at this
* time. */
@ -1797,6 +1844,10 @@ void afterSleep(struct aeEventLoop *eventLoop) {
latencyEndMonitor(latency);
latencyAddSampleIfNeeded("module-acquire-GIL",latency);
}
/* Set the eventloop start time. */
server.el_start = getMonotonicUs();
/* Set the eventloop command count at start. */
server.el_cmd_cnt_start = server.stat_numcommands;
}
/* Update the time cache. */
@ -2517,8 +2568,8 @@ void resetServerStats(void) {
atomicSet(server.stat_total_writes_processed, 0);
for (j = 0; j < STATS_METRIC_COUNT; j++) {
server.inst_metric[j].idx = 0;
server.inst_metric[j].last_sample_time = mstime();
server.inst_metric[j].last_sample_count = 0;
server.inst_metric[j].last_sample_base = 0;
server.inst_metric[j].last_sample_value = 0;
memset(server.inst_metric[j].samples,0,
sizeof(server.inst_metric[j].samples));
}
@ -2535,6 +2586,8 @@ void resetServerStats(void) {
server.aof_delayed_fsync = 0;
server.stat_reply_buffer_shrinks = 0;
server.stat_reply_buffer_expands = 0;
memset(server.duration_stats, 0, sizeof(durationStats) * EL_DURATION_TYPE_NUM);
server.el_cmd_cnt_max = 0;
lazyfreeResetStats();
}
@ -3150,7 +3203,7 @@ struct redisCommand *lookupCommandBySdsLogic(dict *commands, sds s) {
sds *strings = sdssplitlen(s,sdslen(s),"|",1,&argc);
if (strings == NULL)
return NULL;
if (argc > 2) {
if (argc < 1 || argc > 2) {
/* Currently we support just one level of subcommands */
sdsfreesplitres(strings,argc);
return NULL;
@ -3559,6 +3612,8 @@ void call(client *c, int flags) {
char *latency_event = (real_cmd->flags & CMD_FAST) ?
"fast-command" : "command";
latencyAddSampleIfNeeded(latency_event,duration/1000);
if (server.execution_nesting == 0)
durationAddSample(EL_DURATION_TYPE_CMD, duration);
}
/* Log the command into the Slow log if needed.
@ -5915,7 +5970,12 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) {
"io_threaded_reads_processed:%lld\r\n"
"io_threaded_writes_processed:%lld\r\n"
"reply_buffer_shrinks:%lld\r\n"
"reply_buffer_expands:%lld\r\n",
"reply_buffer_expands:%lld\r\n"
"eventloop_cycles:%llu\r\n"
"eventloop_duration_sum:%llu\r\n"
"eventloop_duration_cmd_sum:%llu\r\n"
"instantaneous_eventloop_cycles_per_sec:%llu\r\n"
"instantaneous_eventloop_duration_usec:%llu\r\n",
server.stat_numconnections,
server.stat_numcommands,
getInstantaneousMetric(STATS_METRIC_COMMAND),
@ -5965,7 +6025,12 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) {
server.stat_io_reads_processed,
server.stat_io_writes_processed,
server.stat_reply_buffer_shrinks,
server.stat_reply_buffer_expands);
server.stat_reply_buffer_expands,
server.duration_stats[EL_DURATION_TYPE_EL].cnt,
server.duration_stats[EL_DURATION_TYPE_EL].sum,
server.duration_stats[EL_DURATION_TYPE_CMD].sum,
getInstantaneousMetric(STATS_METRIC_EL_CYCLE),
getInstantaneousMetric(STATS_METRIC_EL_DURATION));
info = genRedisInfoStringACLStats(info);
}
@ -6215,6 +6280,21 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) {
0, /* not a crash report */
sections);
}
if (dictFind(section_dict, "debug") != NULL) {
if (sections++) info = sdscat(info,"\r\n");
info = sdscatprintf(info,
"# Debug\r\n"
"eventloop_duration_aof_sum:%llu\r\n"
"eventloop_duration_cron_sum:%llu\r\n"
"eventloop_duration_max:%llu\r\n"
"eventloop_cmd_per_cycle_max:%lld\r\n",
server.duration_stats[EL_DURATION_TYPE_AOF].sum,
server.duration_stats[EL_DURATION_TYPE_CRON].sum,
server.duration_stats[EL_DURATION_TYPE_EL].max,
server.el_cmd_cnt_max);
}
return info;
}

View File

@ -170,7 +170,9 @@ struct hdr_histogram;
#define STATS_METRIC_NET_OUTPUT 2 /* Bytes written to network. */
#define STATS_METRIC_NET_INPUT_REPLICATION 3 /* Bytes read to network during replication. */
#define STATS_METRIC_NET_OUTPUT_REPLICATION 4 /* Bytes written to network during replication. */
#define STATS_METRIC_COUNT 5
#define STATS_METRIC_EL_CYCLE 5 /* Number of eventloop cycles. */
#define STATS_METRIC_EL_DURATION 6 /* Average eventloop duration per cycle. */
#define STATS_METRIC_COUNT 7
/* Protocol and I/O related defines */
#define PROTO_IOBUF_LEN (1024*16) /* Generic I/O buffer size */
@ -1697,13 +1699,22 @@ struct redisServer {
/* The following two are used to track instantaneous metrics, like
* number of operations per second, network traffic. */
struct {
long long last_sample_time; /* Timestamp of last sample in ms */
long long last_sample_count;/* Count in last sample */
long long last_sample_base; /* The divisor of last sample window */
long long last_sample_value; /* The dividend of last sample window */
long long samples[STATS_METRIC_SAMPLES];
int idx;
} inst_metric[STATS_METRIC_COUNT];
long long stat_reply_buffer_shrinks; /* Total number of output buffer shrinks */
long long stat_reply_buffer_expands; /* Total number of output buffer expands */
monotime el_start;
/* The following two are used to record the max number of commands executed in one eventloop.
* Note that commands in transactions are also counted. */
long long el_cmd_cnt_start;
long long el_cmd_cnt_max;
/* The sum of active-expire, active-defrag and all other tasks done by cron and beforeSleep,
but excluding read, write and AOF, which are counted by other sets of metrics. */
monotime el_cron_duration;
durationStats duration_stats[EL_DURATION_TYPE_NUM];
/* Configuration */
int verbosity; /* Loglevel in redis.conf */

View File

@ -971,7 +971,7 @@ void lposCommand(client *c) {
if (!strcasecmp(opt,"RANK") && moreargs) {
j++;
if (getLongFromObjectOrReply(c, c->argv[j], &rank, NULL) != C_OK)
if (getRangeLongFromObjectOrReply(c, c->argv[j], -LONG_MAX, LONG_MAX, &rank, NULL) != C_OK)
return;
if (rank == 0) {
addReplyError(c,"RANK can't be zero: use 1 to start from "

View File

@ -274,5 +274,68 @@ start_server {tags {"info" "external:skip"}} {
$rd close
}
# Verify the cumulative eventloop counters exposed by INFO STATS: they must be
# positive, and after waiting a little over 100 ms they must have grown, but
# only by a bounded amount.
test {stats: eventloop metrics} {
set info1 [r info stats]
set cycle1 [getInfoProperty $info1 eventloop_cycles]
set el_sum1 [getInfoProperty $info1 eventloop_duration_sum]
set cmd_sum1 [getInfoProperty $info1 eventloop_duration_cmd_sum]
assert_morethan $cycle1 0
assert_morethan $el_sum1 0
assert_morethan $cmd_sum1 0
after 110 ;# default hz is 10, wait for a cron tick.
set info2 [r info stats]
set cycle2 [getInfoProperty $info2 eventloop_cycles]
set el_sum2 [getInfoProperty $info2 eventloop_duration_sum]
set cmd_sum2 [getInfoProperty $info2 eventloop_duration_cmd_sum]
assert_morethan $cycle2 $cycle1
assert_lessthan $cycle2 [expr $cycle1+10] ;# we expect 2 or 3 cycles here, but allow some tolerance
assert_morethan $el_sum2 $el_sum1
assert_lessthan $el_sum2 [expr $el_sum1+5000] ;# the sum is in usec: allow up to 5 ms of eventloop work during the wait
assert_morethan $cmd_sum2 $cmd_sum1
assert_lessthan $cmd_sum2 [expr $cmd_sum1+3000] ;# the sum is in usec: allow up to 3 ms of command execution during the wait
}
# Verify the instantaneous eventloop metrics exposed by INFO STATS after the
# stats have been reset and the sample array has been filled again.
test {stats: instantaneous metrics} {
r config resetstat
after 1600 ;# hz is 10, wait for 16 cron ticks so that the sample array is filled
set value [s instantaneous_eventloop_cycles_per_sec]
assert_morethan $value 0
assert_lessthan $value 15 ;# default hz is 10
set value [s instantaneous_eventloop_duration_usec]
assert_morethan $value 0
assert_lessthan $value 22000 ;# default hz is 10, so duration < 1000 / 10, allow some tolerance
}
# Verify the fields of the INFO DEBUG section: they must not show up in the
# default INFO output nor in INFO ALL, and when requested explicitly they must
# be sane and must not decrease after waiting for a cron tick.
test {stats: debug metrics} {
# make sure debug info is hidden
set info [r info]
assert_equal [getInfoProperty $info eventloop_duration_aof_sum] {}
set info_all [r info all]
assert_equal [getInfoProperty $info_all eventloop_duration_aof_sum] {}
set info1 [r info debug]
set aof1 [getInfoProperty $info1 eventloop_duration_aof_sum]
assert {$aof1 >= 0}
set cron1 [getInfoProperty $info1 eventloop_duration_cron_sum]
assert {$cron1 > 0}
set cycle_max1 [getInfoProperty $info1 eventloop_cmd_per_cycle_max]
assert {$cycle_max1 > 0}
set duration_max1 [getInfoProperty $info1 eventloop_duration_max]
assert {$duration_max1 > 0}
after 110 ;# hz is 10, wait for a cron tick.
set info2 [r info debug]
set aof2 [getInfoProperty $info2 eventloop_duration_aof_sum]
assert {$aof2 >= $aof1} ;# AOF is disabled, we expect $aof2 == $aof1, but allow some tolerance.
set cron2 [getInfoProperty $info2 eventloop_duration_cron_sum]
assert_morethan $cron2 $cron1
set cycle_max2 [getInfoProperty $info2 eventloop_cmd_per_cycle_max]
assert {$cycle_max2 >= $cycle_max1}
set duration_max2 [getInfoProperty $info2 eventloop_duration_max]
assert {$duration_max2 >= $duration_max1}
}
}
}

View File

@ -464,6 +464,7 @@ foreach {type large} [array get largevalue] {
assert {[r LPOS mylist c RANK -1] == 7}
assert {[r LPOS mylist c RANK -2] == 6}
assert_error "*RANK can't be zero: use 1 to start from the first match, 2 from the second ... or use negative to start*" {r LPOS mylist c RANK 0}
assert_error "*value is out of range*" {r LPOS mylist c RANK -9223372036854775808}
}
test {LPOS COUNT option} {

View File

@ -53,14 +53,18 @@ start_server {
assert_equal {16 17} [lsort [r smembers myset]]
}
test {SMISMEMBER against non set} {
test {SMISMEMBER SMEMBERS SCARD against non set} {
r lpush mylist foo
assert_error WRONGTYPE* {r smismember mylist bar}
assert_error WRONGTYPE* {r smembers mylist}
assert_error WRONGTYPE* {r scard mylist}
}
test {SMISMEMBER non existing key} {
test {SMISMEMBER SMEMBERS SCARD against non existing key} {
assert_equal {0} [r smismember myset1 foo]
assert_equal {0 0} [r smismember myset1 foo bar]
assert_equal {} [r smembers myset1]
assert_equal {0} [r scard myset1]
}
test {SMISMEMBER requires one or more members} {