Fix blocking commands timeout is reset due to re-processing command (#13004)

In #11012, we will reprocess command when client is unblocked on keys,
in some blocking commands, for example, in the XREADGROUP BLOCK
scenario,
because of the re-processing command, we will recalculate the block
timeout,
causing the blocking time to be reset.

This commit add a new CLIENT_REPROCESSING_COMMAND clent flag, explicitly
let the command know that it is being re-processed, later in
blockForKeys
we will not reset the timeout.

Affected BLOCK cases: 
- list / zset / stream, added test cases for each.

Unaffected cases:
- module (never re-process the commands).
- WAIT / WAITAOF (never re-process the commands).

Fixes #12998.
This commit is contained in:
Binbin 2024-01-30 17:32:59 +08:00 committed by GitHub
parent af7ceeb765
commit 492021db95
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 105 additions and 8 deletions

View File

@ -370,7 +370,12 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo
list *l;
int j;
c->bstate.timeout = timeout;
if (!(c->flags & CLIENT_REPROCESSING_COMMAND)) {
/* If the client is re-processing the command, we do not set the timeout
* because we need to retain the client's original timeout. */
c->bstate.timeout = timeout;
}
for (j = 0; j < numkeys; j++) {
/* If the key already exists in the dictionary ignore it. */
if (!(client_blocked_entry = dictAddRaw(c->bstate.keys,keys[j],NULL))) {
@ -392,7 +397,6 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo
listAddNodeTail(l,c);
dictSetVal(c->bstate.keys,client_blocked_entry,listLast(l));
/* We need to add the key to blocking_keys_unblock_on_nokey, if the client
* wants to be awakened if key is deleted (like XREADGROUP) */
if (unblock_on_nokey) {

View File

@ -7802,15 +7802,15 @@ RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdF
bc->background_timer = 0;
bc->background_duration = 0;
c->bstate.timeout = 0;
mstime_t timeout = 0;
if (timeout_ms) {
mstime_t now = mstime();
if (timeout_ms > LLONG_MAX - now) {
if (timeout_ms > LLONG_MAX - now) {
c->bstate.module_blocked_handle = NULL;
addReplyError(c, "timeout is out of range"); /* 'timeout_ms+now' would overflow */
return bc;
}
c->bstate.timeout = timeout_ms + now;
timeout = timeout_ms + now;
}
if (islua || ismulti) {
@ -7826,7 +7826,7 @@ RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdF
addReplyError(c, "Clients undergoing module based authentication can only be blocked on auth");
} else {
if (keys) {
blockForKeys(c,BLOCKED_MODULE,keys,numkeys,c->bstate.timeout,flags&REDISMODULE_BLOCK_UNBLOCK_DELETED);
blockForKeys(c,BLOCKED_MODULE,keys,numkeys,timeout,flags&REDISMODULE_BLOCK_UNBLOCK_DELETED);
} else {
blockClient(c,BLOCKED_MODULE);
}

View File

@ -3646,12 +3646,20 @@ void call(client *c, int flags) {
* re-processing and unblock the client.*/
c->flags |= CLIENT_EXECUTING_COMMAND;
/* Setting the CLIENT_REPROCESSING_COMMAND flag so that during the actual
* processing of the command proc, the client is aware that it is being
* re-processed. */
if (reprocessing_command) c->flags |= CLIENT_REPROCESSING_COMMAND;
monotime monotonic_start = 0;
if (monotonicGetType() == MONOTONIC_CLOCK_HW)
monotonic_start = getMonotonicUs();
c->cmd->proc(c);
/* Clear the CLIENT_REPROCESSING_COMMAND flag after the proc is executed. */
if (reprocessing_command) c->flags &= ~CLIENT_REPROCESSING_COMMAND;
exitExecutionUnit();
/* In case client is blocked after trying to execute the command,
@ -3709,7 +3717,7 @@ void call(client *c, int flags) {
/* Send the command to clients in MONITOR mode if applicable,
* since some administrative commands are considered too dangerous to be shown.
* Other exceptions is a client which is unblocked and retring to process the command
* Other exceptions is a client which is unblocked and retrying to process the command
* or we are currently in the process of loading AOF. */
if (update_command_stats && !reprocessing_command &&
!(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN))) {

View File

@ -400,6 +400,7 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
auth had been authenticated from the Module. */
#define CLIENT_MODULE_PREVENT_AOF_PROP (1ULL<<48) /* Module client do not want to propagate to AOF */
#define CLIENT_MODULE_PREVENT_REPL_PROP (1ULL<<49) /* Module client do not want to propagate to replica */
#define CLIENT_REPROCESSING_COMMAND (1ULL<<50) /* The client is re-processing the command. */
/* Client block type (btype field in client structure)
* if CLIENT_BLOCKED flag is set. */

View File

@ -1186,6 +1186,34 @@ foreach {pop} {BLPOP BLMPOP_LEFT} {
r select 9
} {OK} {singledb:skip needs:debug}
test {BLPOP unblock but the key is expired and then block again - reprocessing command} {
r flushall
r debug set-active-expire 0
set rd [redis_deferring_client]
set start [clock milliseconds]
$rd blpop mylist 1
wait_for_blocked_clients_count 1
# The exec will try to awake the blocked client, but the key is expired,
# so the client will be blocked again during the command reprocessing.
r multi
r rpush mylist a
r pexpire mylist 100
r debug sleep 0.2
r exec
assert_equal {} [$rd read]
set end [clock milliseconds]
# In the past, this time would have been 1000+200, in order to avoid
# timing issues, we increase the range a bit.
assert_range [expr $end-$start] 1000 1100
r debug set-active-expire 1
$rd close
} {0} {needs:debug}
foreach {pop} {BLPOP BLMPOP_LEFT} {
test "$pop when new key is moved into place" {
set rd [redis_deferring_client]

View File

@ -475,7 +475,7 @@ start_server {
$rd close
}
test {Blocking XREADGROUP for stream key that has clients blocked on list - avoid endless loop} {
test {Blocking XREADGROUP for stream key that has clients blocked on stream - avoid endless loop} {
r DEL mystream
r XGROUP CREATE mystream mygroup $ MKSTREAM
@ -498,6 +498,34 @@ start_server {
assert_equal [r ping] {PONG}
}
test {Blocking XREADGROUP for stream key that has clients blocked on stream - reprocessing command} {
r DEL mystream
r XGROUP CREATE mystream mygroup $ MKSTREAM
set rd1 [redis_deferring_client]
set rd2 [redis_deferring_client]
$rd1 xreadgroup GROUP mygroup myuser BLOCK 0 STREAMS mystream >
wait_for_blocked_clients_count 1
set start [clock milliseconds]
$rd2 xreadgroup GROUP mygroup myuser BLOCK 1000 STREAMS mystream >
wait_for_blocked_clients_count 2
# After a while call xadd and let rd2 re-process the command.
after 200
r xadd mystream * field value
assert_equal {} [$rd2 read]
set end [clock milliseconds]
# In the past, this time would have been 1000+200, in order to avoid
# timing issues, we increase the range a bit.
assert_range [expr $end-$start] 1000 1100
$rd1 close
$rd2 close
}
test {XGROUP DESTROY should unblock XREADGROUP with -NOGROUP} {
r config resetstat
r del mystream

View File

@ -1989,6 +1989,34 @@ start_server {tags {"zset"}} {
}
}
test {BZPOPMIN unblock but the key is expired and then block again - reprocessing command} {
r flushall
r debug set-active-expire 0
set rd [redis_deferring_client]
set start [clock milliseconds]
$rd bzpopmin zset{t} 1
wait_for_blocked_clients_count 1
# The exec will try to awake the blocked client, but the key is expired,
# so the client will be blocked again during the command reprocessing.
r multi
r zadd zset{t} 1 one
r pexpire zset{t} 100
r debug sleep 0.2
r exec
assert_equal {} [$rd read]
set end [clock milliseconds]
# In the past, this time would have been 1000+200, in order to avoid
# timing issues, we increase the range a bit.
assert_range [expr $end-$start] 1000 1100
r debug set-active-expire 1
$rd close
} {0} {needs:debug}
test "BZPOPMIN with same key multiple times should work" {
set rd [redis_deferring_client]
r del z1{t} z2{t}