Support for RM_Call on blocking commands (#11568)

Allow running blocking commands from within a module using `RM_Call`.

Today, when `RM_Call` is used, the fake client that is used to run command
is marked with `CLIENT_DENY_BLOCKING` flag. This flag tells the command
that it is not allowed to block the client and in case it needs to block, it must
fallback to some alternative (either return error or perform some default behavior).
For example, `BLPOP` fallback to simple `LPOP` if it is not allowed to block.

All the commands must respect the `CLIENT_DENY_BLOCKING` flag (including
module commands). When the command invocation finished, Redis asserts that
the client was not blocked.

This PR introduces the ability to call blocking command using `RM_Call` by
passing a callback that will be called when the client will get unblocked.
In order to do that, the user must explicitly say that he allow to perform blocking
command by passing a new format specifier argument, `K`, to the `RM_Call`
function. This new flag will tell Redis that it is allow to run blocking command
and block the client. In case the command got blocked, Redis will return a new
type of call reply (`REDISMODULE_REPLY_PROMISE`). This call reply indicates
that the command got blocked and the user can set the on_unblocked handler using
`RM_CallReplyPromiseSetUnblockHandler`.

When clients gets unblocked, it eventually reaches `processUnblockedClients` function.
This is where we check if the client is a fake module client and if it is, we call the unblock
callback instead of performing the usual unblock operations.

**Notice**: `RM_CallReplyPromiseSetUnblockHandler` must be called atomically
along side the command invocation (without releasing the Redis lock in between).
In addition, unlike other CallReply types, the promise call reply must be released
by the module when the Redis GIL is acquired.

The module can abort the execution on the blocking command (if it was not yet
executed) using `RM_CallReplyPromiseAbort`. the API will return `REDISMODULE_OK`
on success and `REDISMODULE_ERR` if the operation is already executed.
**Notice** that in case of misbehave module, Abort might finished successfully but the
operation will not really be aborted. This can only happened if the module do not respect
the disconnect callback of the blocked client. 
For pure Redis commands this can not happened.

### Atomicity Guarantees

The API promise that the unblock handler will run atomically as an execution unit.
This means that all the operation performed on the unblock handler will be wrapped
with a multi exec transaction when replicated to the replica and AOF.
The API **do not** grantee any other atomicity properties such as when the unblock
handler will be called. This gives us the flexibility to strengthen the grantees (or not)
in the future if we will decide that we need a better guarantees.

That said, the implementation **does** provide a better guarantees when performing
pure Redis blocking command like `BLPOP`. In this case the unblock handler will run
atomically with the operation that got unblocked (for example, in case of `BLPOP`, the
unblock handler will run atomically with the `LPOP` operation that run when the command
got unblocked). This is an implementation detail that might be change in the future and the
module writer should not count on that.

### Calling blocking commands while running on script mode (`S`)

`RM_Call` script mode (`S`) was introduced on #0372. It is used for usecases where the
command that was invoked on `RM_Call` comes from a user input and we want to make
sure the user will not run dangerous commands like `shutdown`. Some command, such
as `BLPOP`, are marked with `NO_SCRIPT` flag, which means they will not be allowed on
script mode. Those commands are marked with  `NO_SCRIPT` just because they are
blocking commands and not because they are dangerous. Now that we can run blocking
commands on RM_Call, there is no real reason not to allow such commands on script mode.

The underline problem is that the `NO_SCRIPT` flag is abused to also mark some of the
blocking commands (notice that those commands know not to block the client if it is not
allowed to do so, and have a fallback logic to such cases. So even if those commands
were not marked with `NO_SCRIPT` flag, it would not harm Redis, and today we can
already run those commands within multi exec).

In addition, not all blocking commands are marked with `NO_SCRIPT` flag, for example
`blmpop` are not marked and can run from within a script.

Those facts shows that there are some ambiguity about the meaning of the `NO_SCRIPT`
flag, and its not fully clear where it should be use.

The PR suggest that blocking commands should not be marked with `NO_SCRIPT` flag,
those commands should handle `CLIENT_DENY_BLOCKING` flag and only block when
it's safe (like they already does today). To achieve that, the PR removes the `NO_SCRIPT`
flag from the following commands:
* `blmove`
* `blpop`
* `brpop`
* `brpoplpush`
* `bzpopmax`
* `bzpopmin`
* `wait`

This might be considered a breaking change as now, on scripts, instead of getting
`command is not allowed from script` error, the user will get some fallback behavior
base on the command implementation. That said, the change matches the behavior
of scripts and multi exec with respect to those commands and allow running them on
`RM_Call` even when script mode is used.

### Additional RedisModule API and changes

* `RM_BlockClientSetPrivateData` - Set private data on the blocked client without the
  need to unblock the client. This allows up to set the promise CallReply as the private
  data of the blocked client and abort it if the client gets disconnected.
* `RM_BlockClientGetPrivateData` - Return the current private data set on a blocked client.
  We need it so we will have access to this private data on the disconnect callback.
* On RM_Call, the returned reply will be added to the auto memory context only if auto
  memory is enabled, this allows us to keep the call reply for longer time then the context
  lifetime and does not force an unneeded borrow relationship between the CallReply and
  the RedisModuleContext.
This commit is contained in:
Meir Shpilraien (Spielrein) 2023-03-16 14:04:31 +02:00 committed by GitHub
parent 484b73a842
commit d0da0a6a3f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 1025 additions and 105 deletions

View File

@ -52,5 +52,6 @@ $TCLSH tests/test_helper.tcl \
--single unit/moduleapi/publish \
--single unit/moduleapi/usercall \
--single unit/moduleapi/postnotifications \
--single unit/moduleapi/async_rm_call \
--single unit/moduleapi/moduleauth \
"${@}"

View File

@ -79,6 +79,7 @@ void initClientBlockingState(client *c) {
c->bstate.numreplicas = 0;
c->bstate.reploffset = 0;
c->bstate.unblock_on_nokey = 0;
c->bstate.async_rm_call_handle = NULL;
}
/* Block a client for the specific operation type. Once the CLIENT_BLOCKED
@ -92,7 +93,7 @@ void blockClient(client *c, int btype) {
c->flags |= CLIENT_BLOCKED;
c->bstate.btype = btype;
server.blocked_clients++;
if (!(c->flags & CLIENT_MODULE)) server.blocked_clients++; /* We count blocked client stats on regular clients and not on module clients */
server.blocked_clients_by_type[btype]++;
addClientToTimeoutTable(c);
}
@ -131,6 +132,13 @@ void processUnblockedClients(void) {
listDelNode(server.unblocked_clients,ln);
c->flags &= ~CLIENT_UNBLOCKED;
if (c->flags & CLIENT_MODULE) {
if (!(c->flags & CLIENT_BLOCKED)) {
moduleCallCommandUnblockedHandler(c);
}
continue;
}
/* Process remaining data in the input buffer, unless the client
* is blocked again. Actually processInputBuffer() checks that the
* client is not blocked before to proceed, but things may change and
@ -172,7 +180,7 @@ void queueClientForReprocessing(client *c) {
/* Unblock a client calling the right function depending on the kind
* of operation the client is blocking for. */
void unblockClient(client *c) {
void unblockClient(client *c, int queue_for_reprocessing) {
if (c->bstate.btype == BLOCKED_LIST ||
c->bstate.btype == BLOCKED_ZSET ||
c->bstate.btype == BLOCKED_STREAM) {
@ -205,13 +213,13 @@ void unblockClient(client *c) {
/* Clear the flags, and put the client in the unblocked list so that
* we'll process new commands in its query buffer ASAP. */
server.blocked_clients--;
if (!(c->flags & CLIENT_MODULE)) server.blocked_clients--; /* We count blocked client stats on regular clients and not on module clients */
server.blocked_clients_by_type[c->bstate.btype]--;
c->flags &= ~CLIENT_BLOCKED;
c->bstate.btype = BLOCKED_NONE;
c->bstate.unblock_on_nokey = 0;
removeClientFromTimeoutTable(c);
queueClientForReprocessing(c);
if (queue_for_reprocessing) queueClientForReprocessing(c);
}
/* This function gets called when a blocked client timed out in order to
@ -247,7 +255,7 @@ void replyToClientsBlockedOnShutdown(void) {
client *c = listNodeValue(ln);
if (c->flags & CLIENT_BLOCKED && c->bstate.btype == BLOCKED_SHUTDOWN) {
addReplyError(c, "Errors trying to SHUTDOWN. Check logs.");
unblockClient(c);
unblockClient(c, 1);
}
}
}
@ -632,12 +640,30 @@ static void unblockClientOnKey(client *c, robj *key) {
/* We need to unblock the client before calling processCommandAndResetClient
* because it checks the CLIENT_BLOCKED flag */
unblockClient(c);
unblockClient(c, 0);
/* In case this client was blocked on keys during command
* we need to re process the command again */
if (c->flags & CLIENT_PENDING_COMMAND) {
c->flags &= ~CLIENT_PENDING_COMMAND;
/* We want the command processing and the unblock handler (see RM_Call 'K' option)
* to run atomically, this is why we must enter the execution unit here before
* running the command, and exit the execution unit after calling the unblock handler (if exists).
* Notice that we also must set the current client so it will be available
* when we will try to send the the client side caching notification (done on 'afterCommand'). */
client *old_client = server.current_client;
server.current_client = c;
enterExecutionUnit(1, 0);
processCommandAndResetClient(c);
if (!(c->flags & CLIENT_BLOCKED)) {
if (c->flags & CLIENT_MODULE) {
moduleCallCommandUnblockedHandler(c);
} else {
queueClientForReprocessing(c);
}
}
exitExecutionUnit();
afterCommand(c);
server.current_client = old_client;
}
}
@ -673,7 +699,7 @@ void unblockClientOnTimeout(client *c) {
replyToBlockedClientTimedOut(c);
if (c->flags & CLIENT_PENDING_COMMAND)
c->flags &= ~CLIENT_PENDING_COMMAND;
unblockClient(c);
unblockClient(c, 1);
}
/* Unblock a client which is currently Blocked with error.
@ -684,7 +710,7 @@ void unblockClientOnError(client *c, const char *err_str) {
updateStatsOnUnblock(c, 0, 0, 1);
if (c->flags & CLIENT_PENDING_COMMAND)
c->flags &= ~CLIENT_PENDING_COMMAND;
unblockClient(c);
unblockClient(c, 1);
}
/* sets blocking_keys to the total number of keys which has at least one client blocked on them

View File

@ -234,6 +234,10 @@ void freeCallReply(CallReply *rep) {
return;
}
if (rep->flags & REPLY_FLAG_PARSED) {
if (rep->type == REDISMODULE_REPLY_PROMISE) {
zfree(rep);
return;
}
freeCallReplyInternal(rep);
}
sdsfree(rep->original_proto);
@ -242,6 +246,17 @@ void freeCallReply(CallReply *rep) {
zfree(rep);
}
CallReply *callReplyCreatePromise(void *private_data) {
CallReply *res = zmalloc(sizeof(*res));
res->type = REDISMODULE_REPLY_PROMISE;
/* Mark the reply as parsed so there will be not attempt to parse
* it when calling reply API such as freeCallReply.
* Also mark the reply as root so freeCallReply will not ignore it. */
res->flags |= REPLY_FLAG_PARSED | REPLY_FLAG_ROOT;
res->private_data = private_data;
return res;
}
static const ReplyParserCallbacks DefaultParserCallbacks = {
.null_callback = callReplyNull,
.bulk_string_callback = callReplyBulkString,

View File

@ -33,6 +33,7 @@
#include "resp_parser.h"
typedef struct CallReply CallReply;
typedef void (*RedisModuleOnUnblocked)(void *ctx, CallReply *reply, void *private_data);
CallReply *callReplyCreate(sds reply, list *deferred_error_list, void *private_data);
CallReply *callReplyCreateError(sds reply, void *private_data);
@ -54,5 +55,6 @@ void *callReplyGetPrivateData(CallReply *rep);
int callReplyIsResp3(CallReply *rep);
list *callReplyDeferredErrorList(CallReply *rep);
void freeCallReply(CallReply *rep);
CallReply *callReplyCreatePromise(void *private_data);
#endif /* SRC_CALL_REPLY_H_ */

View File

@ -7263,7 +7263,7 @@ struct redisCommand redisCommandTable[] = {
{"ttl","Get the time to live for a key in seconds","O(1)","1.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_GENERIC,TTL_History,TTL_tips,ttlCommand,2,CMD_READONLY|CMD_FAST,ACL_CATEGORY_KEYSPACE,{{NULL,CMD_KEY_RO|CMD_KEY_ACCESS,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=TTL_Args},
{"type","Determine the type stored at key","O(1)","1.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_GENERIC,TYPE_History,TYPE_tips,typeCommand,2,CMD_READONLY|CMD_FAST,ACL_CATEGORY_KEYSPACE,{{NULL,CMD_KEY_RO,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=TYPE_Args},
{"unlink","Delete a key asynchronously in another thread. Otherwise it is just as DEL, but non blocking.","O(1) for each key removed regardless of its size. Then the command does O(N) work in a different thread in order to reclaim memory, where N is the number of allocations the deleted objects where composed of.","4.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_GENERIC,UNLINK_History,UNLINK_tips,unlinkCommand,-2,CMD_WRITE|CMD_FAST,ACL_CATEGORY_KEYSPACE,{{NULL,CMD_KEY_RM|CMD_KEY_DELETE,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={-1,1,0}}},.args=UNLINK_Args},
{"wait","Wait for the synchronous replication of all the write commands sent in the context of the current connection","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_GENERIC,WAIT_History,WAIT_tips,waitCommand,3,CMD_NOSCRIPT,ACL_CATEGORY_CONNECTION,.args=WAIT_Args},
{"wait","Wait for the synchronous replication of all the write commands sent in the context of the current connection","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_GENERIC,WAIT_History,WAIT_tips,waitCommand,3,0,ACL_CATEGORY_CONNECTION,.args=WAIT_Args},
{"waitaof","Wait for all write commands sent in the context of the current connection to be synced to AOF of local host and/or replicas","O(1)","7.2.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_GENERIC,WAITAOF_History,WAITAOF_tips,waitaofCommand,4,CMD_NOSCRIPT,ACL_CATEGORY_CONNECTION,.args=WAITAOF_Args},
/* geo */
{"geoadd","Add one or more geospatial items in the geospatial index represented using a sorted set","O(log(N)) for each item added, where N is the number of elements in the sorted set.","3.2.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_GEO,GEOADD_History,GEOADD_tips,geoaddCommand,-5,CMD_WRITE|CMD_DENYOOM,ACL_CATEGORY_GEO,{{NULL,CMD_KEY_RW|CMD_KEY_UPDATE,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=GEOADD_Args},
@ -7300,11 +7300,11 @@ struct redisCommand redisCommandTable[] = {
{"pfmerge","Merge N different HyperLogLogs into a single one.","O(N) to merge N HyperLogLogs, but with high constant times.","2.8.9",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_HYPERLOGLOG,PFMERGE_History,PFMERGE_tips,pfmergeCommand,-2,CMD_WRITE|CMD_DENYOOM,ACL_CATEGORY_HYPERLOGLOG,{{NULL,CMD_KEY_RW|CMD_KEY_ACCESS|CMD_KEY_INSERT,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}},{NULL,CMD_KEY_RO|CMD_KEY_ACCESS,KSPEC_BS_INDEX,.bs.index={2},KSPEC_FK_RANGE,.fk.range={-1,1,0}}},.args=PFMERGE_Args},
{"pfselftest","An internal command for testing HyperLogLog values","N/A","2.8.9",CMD_DOC_SYSCMD,NULL,NULL,COMMAND_GROUP_HYPERLOGLOG,PFSELFTEST_History,PFSELFTEST_tips,pfselftestCommand,1,CMD_ADMIN,ACL_CATEGORY_HYPERLOGLOG},
/* list */
{"blmove","Pop an element from a list, push it to another list and return it; or block until one is available","O(1)","6.2.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_LIST,BLMOVE_History,BLMOVE_tips,blmoveCommand,6,CMD_WRITE|CMD_DENYOOM|CMD_NOSCRIPT|CMD_BLOCKING,ACL_CATEGORY_LIST,{{NULL,CMD_KEY_RW|CMD_KEY_ACCESS|CMD_KEY_DELETE,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}},{NULL,CMD_KEY_RW|CMD_KEY_INSERT,KSPEC_BS_INDEX,.bs.index={2},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=BLMOVE_Args},
{"blmove","Pop an element from a list, push it to another list and return it; or block until one is available","O(1)","6.2.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_LIST,BLMOVE_History,BLMOVE_tips,blmoveCommand,6,CMD_WRITE|CMD_DENYOOM|CMD_BLOCKING,ACL_CATEGORY_LIST,{{NULL,CMD_KEY_RW|CMD_KEY_ACCESS|CMD_KEY_DELETE,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}},{NULL,CMD_KEY_RW|CMD_KEY_INSERT,KSPEC_BS_INDEX,.bs.index={2},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=BLMOVE_Args},
{"blmpop","Pop elements from a list, or block until one is available","O(N+M) where N is the number of provided keys and M is the number of elements returned.","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_LIST,BLMPOP_History,BLMPOP_tips,blmpopCommand,-5,CMD_WRITE|CMD_BLOCKING,ACL_CATEGORY_LIST,{{NULL,CMD_KEY_RW|CMD_KEY_ACCESS|CMD_KEY_DELETE,KSPEC_BS_INDEX,.bs.index={2},KSPEC_FK_KEYNUM,.fk.keynum={0,1,1}}},blmpopGetKeys,.args=BLMPOP_Args},
{"blpop","Remove and get the first element in a list, or block until one is available","O(N) where N is the number of provided keys.","2.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_LIST,BLPOP_History,BLPOP_tips,blpopCommand,-3,CMD_WRITE|CMD_NOSCRIPT|CMD_BLOCKING,ACL_CATEGORY_LIST,{{NULL,CMD_KEY_RW|CMD_KEY_ACCESS|CMD_KEY_DELETE,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={-2,1,0}}},.args=BLPOP_Args},
{"brpop","Remove and get the last element in a list, or block until one is available","O(N) where N is the number of provided keys.","2.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_LIST,BRPOP_History,BRPOP_tips,brpopCommand,-3,CMD_WRITE|CMD_NOSCRIPT|CMD_BLOCKING,ACL_CATEGORY_LIST,{{NULL,CMD_KEY_RW|CMD_KEY_ACCESS|CMD_KEY_DELETE,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={-2,1,0}}},.args=BRPOP_Args},
{"brpoplpush","Pop an element from a list, push it to another list and return it; or block until one is available","O(1)","2.2.0",CMD_DOC_DEPRECATED,"`BLMOVE` with the `RIGHT` and `LEFT` arguments","6.2.0",COMMAND_GROUP_LIST,BRPOPLPUSH_History,BRPOPLPUSH_tips,brpoplpushCommand,4,CMD_WRITE|CMD_DENYOOM|CMD_NOSCRIPT|CMD_BLOCKING,ACL_CATEGORY_LIST,{{NULL,CMD_KEY_RW|CMD_KEY_ACCESS|CMD_KEY_DELETE,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}},{NULL,CMD_KEY_RW|CMD_KEY_INSERT,KSPEC_BS_INDEX,.bs.index={2},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=BRPOPLPUSH_Args},
{"blpop","Remove and get the first element in a list, or block until one is available","O(N) where N is the number of provided keys.","2.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_LIST,BLPOP_History,BLPOP_tips,blpopCommand,-3,CMD_WRITE|CMD_BLOCKING,ACL_CATEGORY_LIST,{{NULL,CMD_KEY_RW|CMD_KEY_ACCESS|CMD_KEY_DELETE,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={-2,1,0}}},.args=BLPOP_Args},
{"brpop","Remove and get the last element in a list, or block until one is available","O(N) where N is the number of provided keys.","2.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_LIST,BRPOP_History,BRPOP_tips,brpopCommand,-3,CMD_WRITE|CMD_BLOCKING,ACL_CATEGORY_LIST,{{NULL,CMD_KEY_RW|CMD_KEY_ACCESS|CMD_KEY_DELETE,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={-2,1,0}}},.args=BRPOP_Args},
{"brpoplpush","Pop an element from a list, push it to another list and return it; or block until one is available","O(1)","2.2.0",CMD_DOC_DEPRECATED,"`BLMOVE` with the `RIGHT` and `LEFT` arguments","6.2.0",COMMAND_GROUP_LIST,BRPOPLPUSH_History,BRPOPLPUSH_tips,brpoplpushCommand,4,CMD_WRITE|CMD_DENYOOM|CMD_BLOCKING,ACL_CATEGORY_LIST,{{NULL,CMD_KEY_RW|CMD_KEY_ACCESS|CMD_KEY_DELETE,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}},{NULL,CMD_KEY_RW|CMD_KEY_INSERT,KSPEC_BS_INDEX,.bs.index={2},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=BRPOPLPUSH_Args},
{"lindex","Get an element from a list by its index","O(N) where N is the number of elements to traverse to get to the element at index. This makes asking for the first or the last element of the list O(1).","1.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_LIST,LINDEX_History,LINDEX_tips,lindexCommand,3,CMD_READONLY,ACL_CATEGORY_LIST,{{NULL,CMD_KEY_RO|CMD_KEY_ACCESS,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=LINDEX_Args},
{"linsert","Insert an element before or after another element in a list","O(N) where N is the number of elements to traverse before seeing the value pivot. This means that inserting somewhere on the left end on the list (head) can be considered O(1) and inserting somewhere on the right end (tail) is O(N).","2.2.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_LIST,LINSERT_History,LINSERT_tips,linsertCommand,5,CMD_WRITE|CMD_DENYOOM,ACL_CATEGORY_LIST,{{NULL,CMD_KEY_RW|CMD_KEY_INSERT,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=LINSERT_Args},
{"llen","Get the length of a list","O(1)","1.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_LIST,LLEN_History,LLEN_tips,llenCommand,2,CMD_READONLY|CMD_FAST,ACL_CATEGORY_LIST,{{NULL,CMD_KEY_RO,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=LLEN_Args},
@ -7393,8 +7393,8 @@ struct redisCommand redisCommandTable[] = {
{"sunionstore","Add multiple sets and store the resulting set in a key","O(N) where N is the total number of elements in all given sets.","1.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SET,SUNIONSTORE_History,SUNIONSTORE_tips,sunionstoreCommand,-3,CMD_WRITE|CMD_DENYOOM,ACL_CATEGORY_SET,{{NULL,CMD_KEY_OW|CMD_KEY_UPDATE,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}},{NULL,CMD_KEY_RO|CMD_KEY_ACCESS,KSPEC_BS_INDEX,.bs.index={2},KSPEC_FK_RANGE,.fk.range={-1,1,0}}},.args=SUNIONSTORE_Args},
/* sorted_set */
{"bzmpop","Remove and return members with scores in a sorted set or block until one is available","O(K) + O(M*log(N)) where K is the number of provided keys, N being the number of elements in the sorted set, and M being the number of elements popped.","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SORTED_SET,BZMPOP_History,BZMPOP_tips,bzmpopCommand,-5,CMD_WRITE|CMD_BLOCKING,ACL_CATEGORY_SORTEDSET,{{NULL,CMD_KEY_RW|CMD_KEY_ACCESS|CMD_KEY_DELETE,KSPEC_BS_INDEX,.bs.index={2},KSPEC_FK_KEYNUM,.fk.keynum={0,1,1}}},blmpopGetKeys,.args=BZMPOP_Args},
{"bzpopmax","Remove and return the member with the highest score from one or more sorted sets, or block until one is available","O(log(N)) with N being the number of elements in the sorted set.","5.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SORTED_SET,BZPOPMAX_History,BZPOPMAX_tips,bzpopmaxCommand,-3,CMD_WRITE|CMD_NOSCRIPT|CMD_FAST|CMD_BLOCKING,ACL_CATEGORY_SORTEDSET,{{NULL,CMD_KEY_RW|CMD_KEY_ACCESS|CMD_KEY_DELETE,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={-2,1,0}}},.args=BZPOPMAX_Args},
{"bzpopmin","Remove and return the member with the lowest score from one or more sorted sets, or block until one is available","O(log(N)) with N being the number of elements in the sorted set.","5.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SORTED_SET,BZPOPMIN_History,BZPOPMIN_tips,bzpopminCommand,-3,CMD_WRITE|CMD_NOSCRIPT|CMD_FAST|CMD_BLOCKING,ACL_CATEGORY_SORTEDSET,{{NULL,CMD_KEY_RW|CMD_KEY_ACCESS|CMD_KEY_DELETE,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={-2,1,0}}},.args=BZPOPMIN_Args},
{"bzpopmax","Remove and return the member with the highest score from one or more sorted sets, or block until one is available","O(log(N)) with N being the number of elements in the sorted set.","5.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SORTED_SET,BZPOPMAX_History,BZPOPMAX_tips,bzpopmaxCommand,-3,CMD_WRITE|CMD_FAST|CMD_BLOCKING,ACL_CATEGORY_SORTEDSET,{{NULL,CMD_KEY_RW|CMD_KEY_ACCESS|CMD_KEY_DELETE,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={-2,1,0}}},.args=BZPOPMAX_Args},
{"bzpopmin","Remove and return the member with the lowest score from one or more sorted sets, or block until one is available","O(log(N)) with N being the number of elements in the sorted set.","5.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SORTED_SET,BZPOPMIN_History,BZPOPMIN_tips,bzpopminCommand,-3,CMD_WRITE|CMD_FAST|CMD_BLOCKING,ACL_CATEGORY_SORTEDSET,{{NULL,CMD_KEY_RW|CMD_KEY_ACCESS|CMD_KEY_DELETE,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={-2,1,0}}},.args=BZPOPMIN_Args},
{"zadd","Add one or more members to a sorted set, or update its score if it already exists","O(log(N)) for each item added, where N is the number of elements in the sorted set.","1.2.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SORTED_SET,ZADD_History,ZADD_tips,zaddCommand,-4,CMD_WRITE|CMD_DENYOOM|CMD_FAST,ACL_CATEGORY_SORTEDSET,{{NULL,CMD_KEY_RW|CMD_KEY_UPDATE,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=ZADD_Args},
{"zcard","Get the number of members in a sorted set","O(1)","1.2.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SORTED_SET,ZCARD_History,ZCARD_tips,zcardCommand,2,CMD_READONLY|CMD_FAST,ACL_CATEGORY_SORTEDSET,{{NULL,CMD_KEY_RO,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=ZCARD_Args},
{"zcount","Count the members in a sorted set with scores within the given values","O(log(N)) with N being the number of elements in the sorted set.","2.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SORTED_SET,ZCOUNT_History,ZCOUNT_tips,zcountCommand,4,CMD_READONLY|CMD_FAST,ACL_CATEGORY_SORTEDSET,{{NULL,CMD_KEY_RO|CMD_KEY_ACCESS,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=ZCOUNT_Args},

View File

@ -9,7 +9,6 @@
"command_flags": [
"WRITE",
"DENYOOM",
"NOSCRIPT",
"BLOCKING"
],
"acl_categories": [

View File

@ -14,7 +14,6 @@
],
"command_flags": [
"WRITE",
"NOSCRIPT",
"BLOCKING"
],
"acl_categories": [

View File

@ -14,7 +14,6 @@
],
"command_flags": [
"WRITE",
"NOSCRIPT",
"BLOCKING"
],
"acl_categories": [

View File

@ -20,7 +20,6 @@
"command_flags": [
"WRITE",
"DENYOOM",
"NOSCRIPT",
"BLOCKING"
],
"acl_categories": [

View File

@ -14,7 +14,6 @@
],
"command_flags": [
"WRITE",
"NOSCRIPT",
"FAST",
"BLOCKING"
],

View File

@ -14,7 +14,6 @@
],
"command_flags": [
"WRITE",
"NOSCRIPT",
"FAST",
"BLOCKING"
],

View File

@ -7,7 +7,6 @@
"arity": 3,
"function": "waitCommand",
"command_flags": [
"NOSCRIPT"
],
"acl_categories": [
"CONNECTION"

View File

@ -394,6 +394,7 @@ typedef struct RedisModuleServerInfoData {
#define REDISMODULE_ARGV_CALL_REPLIES_AS_ERRORS (1<<8)
#define REDISMODULE_ARGV_RESPECT_DENY_OOM (1<<9)
#define REDISMODULE_ARGV_DRY_RUN (1<<10)
#define REDISMODULE_ARGV_ALLOW_BLOCK (1<<11)
/* Determine whether Redis should signalModifiedKey implicitly.
* In case 'ctx' has no 'module' member (and therefore no module->options),
@ -469,6 +470,15 @@ struct ModuleConfig {
RedisModule *module;
};
typedef struct RedisModuleAsyncRMCallPromise{
size_t ref_count;
void *private_data;
RedisModule *module;
RedisModuleOnUnblocked on_unblocked;
client *c;
RedisModuleCtx *ctx;
} RedisModuleAsyncRMCallPromise;
/* --------------------------------------------------------------------------
* Prototypes
* -------------------------------------------------------------------------- */
@ -492,7 +502,7 @@ static struct redisCommandArg *moduleCopyCommandArgs(RedisModuleCommandArg *args
const RedisModuleCommandInfoVersion *version);
static redisCommandArgType moduleConvertArgType(RedisModuleCommandArgType type, int *error);
static int moduleConvertArgFlags(int flags);
void moduleCreateContext(RedisModuleCtx *out_ctx, RedisModule *module, int ctx_flags);
/* --------------------------------------------------------------------------
* ## Heap allocation raw functions
*
@ -619,6 +629,16 @@ client *moduleAllocTempClient(user *user) {
return c;
}
static void freeRedisModuleAsyncRMCallPromise(RedisModuleAsyncRMCallPromise *promise) {
if (--promise->ref_count > 0) {
return;
}
/* When the promise is finally freed it can not have a client attached to it.
* Either releasing the client or RM_CallReplyPromiseAbort would have removed it. */
serverAssert(!promise->c);
zfree(promise);
}
void moduleReleaseTempClient(client *c) {
if (moduleTempClientCount == moduleTempClientCap) {
moduleTempClientCap = moduleTempClientCap ? moduleTempClientCap*2 : 32;
@ -632,6 +652,12 @@ void moduleReleaseTempClient(client *c) {
c->flags = CLIENT_MODULE;
c->user = NULL; /* Root user */
c->cmd = c->lastcmd = c->realcmd = NULL;
if (c->bstate.async_rm_call_handle) {
RedisModuleAsyncRMCallPromise *promise = c->bstate.async_rm_call_handle;
promise->c = NULL; /* Remove the client from the promise so it will no longer be possible to abort it. */
freeRedisModuleAsyncRMCallPromise(promise);
c->bstate.async_rm_call_handle = NULL;
}
moduleTempClients[moduleTempClientCount++] = c;
}
@ -771,7 +797,7 @@ void modulePostExecutionUnitOperations() {
void moduleFreeContext(RedisModuleCtx *ctx) {
/* See comment in moduleCreateContext */
if (!(ctx->flags & (REDISMODULE_CTX_THREAD_SAFE|REDISMODULE_CTX_COMMAND))) {
server.execution_nesting--;
exitExecutionUnit();
postExecutionUnitOperations();
}
autoMemoryCollect(ctx);
@ -796,6 +822,42 @@ void moduleFreeContext(RedisModuleCtx *ctx) {
freeClient(ctx->client);
}
static CallReply *moduleParseReply(client *c, RedisModuleCtx *ctx) {
/* Convert the result of the Redis command into a module reply. */
sds proto = sdsnewlen(c->buf,c->bufpos);
c->bufpos = 0;
while(listLength(c->reply)) {
clientReplyBlock *o = listNodeValue(listFirst(c->reply));
proto = sdscatlen(proto,o->buf,o->used);
listDelNode(c->reply,listFirst(c->reply));
}
CallReply *reply = callReplyCreate(proto, c->deferred_reply_errors, ctx);
c->deferred_reply_errors = NULL; /* now the responsibility of the reply object. */
return reply;
}
void moduleCallCommandUnblockedHandler(client *c) {
RedisModuleCtx ctx;
RedisModuleAsyncRMCallPromise *promise = c->bstate.async_rm_call_handle;
serverAssert(promise);
RedisModule *module = promise->module;
if (!promise->on_unblocked) {
moduleReleaseTempClient(c);
return; /* module did not set any unblock callback. */
}
moduleCreateContext(&ctx, module, REDISMODULE_CTX_TEMP_CLIENT);
selectDb(ctx.client, c->db->id);
CallReply *reply = moduleParseReply(c, &ctx);
module->in_call++;
promise->on_unblocked(&ctx, reply, promise->private_data);
module->in_call--;
moduleFreeContext(&ctx);
moduleReleaseTempClient(c);
}
/* Create a module ctx and keep track of the nesting level.
*
* Note: When creating ctx for threads (RM_GetThreadSafeContext and
@ -832,7 +894,7 @@ void moduleCreateContext(RedisModuleCtx *out_ctx, RedisModule *module, int ctx_f
* 2. If we are running in a thread (execution_nesting will be dealt with
* when locking/unlocking the GIL) */
if (!(ctx_flags & (REDISMODULE_CTX_THREAD_SAFE|REDISMODULE_CTX_COMMAND))) {
server.execution_nesting++;
enterExecutionUnit(1, 0);
}
}
@ -5632,9 +5694,20 @@ void RM_FreeCallReply(RedisModuleCallReply *reply) {
/* This is a wrapper for the recursive free reply function. This is needed
* in order to have the first level function to return on nested replies,
* but only if called by the module API. */
RedisModuleCtx *ctx = callReplyGetPrivateData(reply);
RedisModuleCtx *ctx = NULL;
if(callReplyType(reply) == REDISMODULE_REPLY_PROMISE) {
RedisModuleAsyncRMCallPromise *promise = callReplyGetPrivateData(reply);
ctx = promise->ctx;
freeRedisModuleAsyncRMCallPromise(promise);
} else {
ctx = callReplyGetPrivateData(reply);
}
freeCallReply(reply);
autoMemoryFreed(ctx,REDISMODULE_AM_REPLY,reply);
if (ctx) {
autoMemoryFreed(ctx,REDISMODULE_AM_REPLY,reply);
}
}
/* Return the reply type as one of the following:
@ -5651,7 +5724,8 @@ void RM_FreeCallReply(RedisModuleCallReply *reply) {
* - REDISMODULE_REPLY_DOUBLE
* - REDISMODULE_REPLY_BIG_NUMBER
* - REDISMODULE_REPLY_VERBATIM_STRING
* - REDISMODULE_REPLY_ATTRIBUTE */
* - REDISMODULE_REPLY_ATTRIBUTE
* - REDISMODULE_REPLY_PROMISE */
int RM_CallReplyType(RedisModuleCallReply *reply) {
return callReplyType(reply);
}
@ -5734,6 +5808,39 @@ int RM_CallReplyAttributeElement(RedisModuleCallReply *reply, size_t idx, RedisM
return REDISMODULE_ERR;
}
/* Set unblock handler (callback and private data) on the given promise RedisModuleCallReply.
* The given reply must be of promise type (REDISMODULE_REPLY_PROMISE). */
void RM_CallReplyPromiseSetUnblockHandler(RedisModuleCallReply *reply, RedisModuleOnUnblocked on_unblock, void *private_data) {
RedisModuleAsyncRMCallPromise *promise = callReplyGetPrivateData(reply);
promise->on_unblocked = on_unblock;
promise->private_data = private_data;
}
/* Abort the execution of a given promise RedisModuleCallReply.
* return REDMODULE_OK in case the abort was done successfully and REDISMODULE_ERR
* if its not possible to abort the execution (execution already finished).
* In case the execution was aborted (REDMODULE_OK was returned), the private_data out parameter
* will be set with the value of the private data that was given on 'RM_CallReplyPromiseSetUnblockHandler'
* so the caller will be able to release the private data.
*
* If the execution was aborted successfully, it is promised that the unblock handler will not be called.
* That said, it is possible that the abort operation will successes but the operation will still continue.
* This can happened if, for example, a module implements some blocking command and does not respect the
* disconnect callback. For pure Redis commands this can not happened.*/
int RM_CallReplyPromiseAbort(RedisModuleCallReply *reply, void **private_data) {
RedisModuleAsyncRMCallPromise *promise = callReplyGetPrivateData(reply);
if (!promise->c) return REDISMODULE_ERR; /* Promise can not be aborted, either already aborted or already finished. */
if (!(promise->c->flags & CLIENT_BLOCKED)) return REDISMODULE_ERR; /* Client is not blocked anymore, can not abort it. */
/* Client is still blocked, remove it from any blocking state and release it. */
if (private_data) *private_data = promise->private_data;
promise->private_data = NULL;
promise->on_unblocked = NULL;
unblockClient(promise->c, 0);
moduleReleaseTempClient(promise->c);
return REDISMODULE_OK;
}
/* Return the pointer and length of a string or error reply. */
const char *RM_CallReplyStringPtr(RedisModuleCallReply *reply, size_t *len) {
size_t private_len;
@ -5781,6 +5888,7 @@ void RM_SetContextUser(RedisModuleCtx *ctx, const RedisModuleUser *user) {
* "0" -> REDISMODULE_ARGV_RESP_AUTO
* "C" -> REDISMODULE_ARGV_RUN_AS_USER
* "M" -> REDISMODULE_ARGV_RESPECT_DENY_OOM
* "K" -> REDISMODULE_ARGV_ALLOW_BLOCK
*
* On error (format specifier error) NULL is returned and nothing is
* allocated. On success the argument vector is returned. */
@ -5855,6 +5963,8 @@ robj **moduleCreateArgvFromUserFormat(const char *cmdname, const char *fmt, int
if (flags) (*flags) |= REDISMODULE_ARGV_CALL_REPLIES_AS_ERRORS;
} else if (*p == 'D') {
if (flags) (*flags) |= (REDISMODULE_ARGV_DRY_RUN | REDISMODULE_ARGV_CALL_REPLIES_AS_ERRORS);
} else if (*p == 'K') {
if (flags) (*flags) |= REDISMODULE_ARGV_ALLOW_BLOCK;
} else {
goto fmterr;
}
@ -5919,6 +6029,34 @@ fmterr:
* If everything succeeded, it will return with a NULL, otherwise it will
* return with a CallReply object denoting the error, as if it was called with
* the 'E' code.
* * 'K' -- Allow running blocking commands. If enabled and the command gets blocked, a
* special REDISMODULE_REPLY_PROMISE will be returned. This reply type
* indicates that the command was blocked and the reply will be given asynchronously.
* The module can use this reply object to set a handler which will be called when
* the command gets unblocked using RedisModule_CallReplyPromiseSetUnblockHandler.
* The handler must be set immediately after the command invocation (without releasing
* the Redis lock in between). If the handler is not set, the blocking command will
* still continue its execution but the reply will be ignored (fire and forget),
* notice that this is dangerous in case of role change, as explained below.
* The module can use RedisModule_CallReplyPromiseAbort to abort the command invocation
* if it was not yet finished (see RedisModule_CallReplyPromiseAbort documentation for more
* details). It is also the module's responsibility to abort the execution on role change, either by using
* server event (to get notified when the instance becomes a replica) or relying on the disconnect
* callback of the original client. Failing to do so can result in a write operation on a replica.
* Unlike other call replies, promise call reply **must** be freed while the Redis GIL is locked.
* Notice that on unblocking, the only promise is that the unblock handler will be called,
* If the blocking RM_Call caused the module to also block some real client (using RM_BlockClient),
* it is the module responsibility to unblock this client on the unblock handler.
* On the unblock handler it is only allowed to perform the following:
* * Calling additional Redis commands using RM_Call
* * Open keys using RM_OpenKey
* * Replicate data to the replica or AOF
*
* Specifically, it is not allowed to call any Redis module API which are client related such as:
* * RM_Reply* API's
* * RM_BlockClient
* * RM_GetCurrentUserName
*
* * **...**: The actual arguments to the Redis command.
*
* On success a RedisModuleCallReply object is returned, otherwise
@ -5978,8 +6116,10 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch
c = moduleAllocTempClient(user);
/* We do not want to allow block, the module do not expect it */
c->flags |= CLIENT_DENY_BLOCKING;
if (!(flags & REDISMODULE_ARGV_ALLOW_BLOCK)) {
/* We do not want to allow block, the module do not expect it */
c->flags |= CLIENT_DENY_BLOCKING;
}
c->db = ctx->client->db;
c->argv = argv;
/* We have to assign argv_len, which is equal to argc in that case (RM_Call)
@ -6212,24 +6352,39 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch
call(c,call_flags);
server.replication_allowed = prev_replication_allowed;
serverAssert((c->flags & CLIENT_BLOCKED) == 0);
/* Convert the result of the Redis command into a module reply. */
sds proto = sdsnewlen(c->buf,c->bufpos);
c->bufpos = 0;
while(listLength(c->reply)) {
clientReplyBlock *o = listNodeValue(listFirst(c->reply));
proto = sdscatlen(proto,o->buf,o->used);
listDelNode(c->reply,listFirst(c->reply));
if (c->flags & CLIENT_BLOCKED) {
serverAssert(flags & REDISMODULE_ARGV_ALLOW_BLOCK);
serverAssert(ctx->module);
RedisModuleAsyncRMCallPromise *promise = zmalloc(sizeof(RedisModuleAsyncRMCallPromise));
*promise = (RedisModuleAsyncRMCallPromise) {
/* We start with ref_count value of 2 because this object is held
* by the promise CallReply and the fake client that was used to execute the command. */
.ref_count = 2,
.module = ctx->module,
.on_unblocked = NULL,
.private_data = NULL,
.c = c,
.ctx = (ctx->flags & REDISMODULE_CTX_AUTO_MEMORY) ? ctx : NULL,
};
reply = callReplyCreatePromise(promise);
c->bstate.async_rm_call_handle = promise;
if (!(call_flags & CMD_CALL_PROPAGATE_AOF)) {
/* No need for AOF propagation, set the relevant flags of the client */
c->flags |= CLIENT_MODULE_PREVENT_AOF_PROP;
}
if (!(call_flags & CMD_CALL_PROPAGATE_REPL)) {
/* No need for replication propagation, set the relevant flags of the client */
c->flags |= CLIENT_MODULE_PREVENT_REPL_PROP;
}
c = NULL; /* Make sure not to free the client */
} else {
reply = moduleParseReply(c, (ctx->flags & REDISMODULE_CTX_AUTO_MEMORY) ? ctx : NULL);
}
reply = callReplyCreate(proto, c->deferred_reply_errors, ctx);
c->deferred_reply_errors = NULL; /* now the responsibility of the reply object. */
cleanup:
if (reply) autoMemoryAdd(ctx,REDISMODULE_AM_REPLY,reply);
if (ctx->module) ctx->module->in_call--;
moduleReleaseTempClient(c);
if (c) moduleReleaseTempClient(c);
return reply;
}
@ -7710,6 +7865,16 @@ RedisModuleBlockedClient *RM_BlockClientOnAuth(RedisModuleCtx *ctx, RedisModuleA
return bc;
}
/* Get the private data that was previusely set on a blocked client */
void *RM_BlockClientGetPrivateData(RedisModuleBlockedClient *blocked_client) {
return blocked_client->privdata;
}
/* Set private data on a blocked client */
void RM_BlockClientSetPrivateData(RedisModuleBlockedClient *blocked_client, void *private_data) {
blocked_client->privdata = private_data;
}
/* This call is similar to RedisModule_BlockClient(), however in this case we
* don't just block the client, but also ask Redis to unblock it automatically
* once certain keys become "ready", that is, contain more data.
@ -7959,7 +8124,7 @@ void moduleHandleBlockedClients(void) {
* to NULL, because if we reached this point, the client was
* properly unblocked by the module. */
bc->disconnect_callback = NULL;
unblockClient(c);
unblockClient(c, 1);
/* Put the client in the list of clients that need to write
* if there are pending replies here. This is needed since
* during a non blocking command the client may receive output. */
@ -8145,8 +8310,7 @@ void moduleGILAfterLock() {
serverAssert(server.execution_nesting == 0);
/* Bump up the nesting level to prevent immediate propagation
* of possible RM_Call from th thread */
server.execution_nesting++;
updateCachedTime(0);
enterExecutionUnit(1, 0);
}
/* Acquire the server lock before executing a thread safe API call.
@ -8184,7 +8348,7 @@ void moduleGILBeforeUnlock() {
/* Restore nesting level and propagate pending commands
* (because it's unclear when thread safe contexts are
* released we have to propagate here). */
server.execution_nesting--;
exitExecutionUnit();
postExecutionUnitOperations();
}
@ -8294,8 +8458,10 @@ int RM_SubscribeToKeyspaceEvents(RedisModuleCtx *ctx, int types, RedisModuleNoti
void firePostExecutionUnitJobs() {
/* Avoid propagation of commands.
* In that way, postExecutionUnitOperations will prevent
* recursive calls to firePostExecutionUnitJobs. */
server.execution_nesting++;
* recursive calls to firePostExecutionUnitJobs.
* This is a special case where we need to increase 'execution_nesting'
* but we do not want to update the cached time */
enterExecutionUnit(0, 0);
while (listLength(modulePostExecUnitJobs) > 0) {
listNode *ln = listFirst(modulePostExecUnitJobs);
RedisModulePostExecUnitJob *job = listNodeValue(ln);
@ -8311,7 +8477,7 @@ void firePostExecutionUnitJobs() {
moduleFreeContext(&ctx);
zfree(job);
}
server.execution_nesting--;
exitExecutionUnit();
}
/* When running inside a key space notification callback, it is dangerous and highly discouraged to perform any write
@ -8374,8 +8540,11 @@ void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid)
*
* In order to do that we increment the execution_nesting counter, thus
* preventing postExecutionUnitOperations (from within moduleFreeContext)
* from propagating commands from CB. */
server.execution_nesting++;
* from propagating commands from CB.
*
* This is a special case where we need to increase 'execution_nesting'
* but we do not want to update the cached time */
enterExecutionUnit(0, 0);
listIter li;
listNode *ln;
@ -8406,7 +8575,7 @@ void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid)
}
}
server.execution_nesting--;
exitExecutionUnit();
}
/* Unsubscribe any notification subscribers this module has upon unloading */
@ -13087,6 +13256,8 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(CallReplySetElement);
REGISTER_API(CallReplyMapElement);
REGISTER_API(CallReplyAttributeElement);
REGISTER_API(CallReplyPromiseSetUnblockHandler);
REGISTER_API(CallReplyPromiseAbort);
REGISTER_API(CallReplyAttribute);
REGISTER_API(CallReplyType);
REGISTER_API(CallReplyLength);
@ -13201,6 +13372,8 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(GetKeyNameFromDigest);
REGISTER_API(GetDbIdFromDigest);
REGISTER_API(BlockClient);
REGISTER_API(BlockClientGetPrivateData);
REGISTER_API(BlockClientSetPrivateData);
REGISTER_API(BlockClientOnAuth);
REGISTER_API(UnblockClient);
REGISTER_API(IsBlockedReplyRequest);

View File

@ -1574,7 +1574,7 @@ void freeClient(client *c) {
c->querybuf = NULL;
/* Deallocate structures used to block on blocking ops. */
if (c->flags & CLIENT_BLOCKED) unblockClient(c);
if (c->flags & CLIENT_BLOCKED) unblockClient(c, 1);
dictRelease(c->bstate.keys);
/* UNWATCH all the keys */
@ -1715,6 +1715,11 @@ void logInvalidUseAndFreeClientAsync(client *c, const char *fmt, ...) {
* The input client argument: c, may be NULL in case the previous client was
* freed before the call. */
int beforeNextClient(client *c) {
/* Notice, this code is also called from 'processUnblockedClients'.
* But in case of a module blocked client (see RM_Call 'K' flag) we do not reach this code path.
* So whenever we change the code here we need to consider if we need this change on module
* blocked client as well */
/* Skip the client processing if we're in an IO thread, in that case we'll perform
this operation later (this function is called again) in the fan-in stage of the threading mechanism */
if (io_threads_op != IO_THREADS_OP_IDLE)
@ -2448,6 +2453,10 @@ int processCommandAndResetClient(client *c) {
* the client. Returns C_ERR if the client is no longer valid after executing
* the command, and C_OK for all other cases. */
int processPendingCommandAndInputBuffer(client *c) {
/* Notice, this code is also called from 'processUnblockedClients'.
* But in case of a module blocked client (see RM_Call 'K' flag) we do not reach this code path.
* So whenever we change the code here we need to consider if we need this change on module
* blocked client as well */
if (c->flags & CLIENT_PENDING_COMMAND) {
c->flags &= ~CLIENT_PENDING_COMMAND;
if (processCommandAndResetClient(c) == C_ERR) {
@ -3905,7 +3914,7 @@ void unblockPostponedClients() {
listRewind(server.postponed_clients, &li);
while ((ln = listNext(&li)) != NULL) {
client *c = listNodeValue(ln);
unblockClient(c);
unblockClient(c, 1);
}
}

View File

@ -93,6 +93,7 @@ typedef long long ustime_t;
#define REDISMODULE_REPLY_BIG_NUMBER 9
#define REDISMODULE_REPLY_VERBATIM_STRING 10
#define REDISMODULE_REPLY_ATTRIBUTE 11
#define REDISMODULE_REPLY_PROMISE 12
/* Postponed array length. */
#define REDISMODULE_POSTPONED_ARRAY_LEN -1 /* Deprecated, please use REDISMODULE_POSTPONED_LEN */
@ -916,6 +917,7 @@ typedef int (*RedisModuleConfigSetNumericFunc)(const char *name, long long val,
typedef int (*RedisModuleConfigSetBoolFunc)(const char *name, int val, void *privdata, RedisModuleString **err);
typedef int (*RedisModuleConfigSetEnumFunc)(const char *name, int val, void *privdata, RedisModuleString **err);
typedef int (*RedisModuleConfigApplyFunc)(RedisModuleCtx *ctx, void *privdata, RedisModuleString **err);
typedef void (*RedisModuleOnUnblocked)(RedisModuleCtx *ctx, RedisModuleCallReply *reply, void *private_data);
typedef int (*RedisModuleAuthCallback)(RedisModuleCtx *ctx, RedisModuleString *username, RedisModuleString *password, RedisModuleString **err);
typedef struct RedisModuleTypeMethods {
@ -994,6 +996,8 @@ REDISMODULE_API const char* (*RedisModule_CallReplyVerbatim)(RedisModuleCallRepl
REDISMODULE_API RedisModuleCallReply * (*RedisModule_CallReplySetElement)(RedisModuleCallReply *reply, size_t idx) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_CallReplyMapElement)(RedisModuleCallReply *reply, size_t idx, RedisModuleCallReply **key, RedisModuleCallReply **val) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_CallReplyAttributeElement)(RedisModuleCallReply *reply, size_t idx, RedisModuleCallReply **key, RedisModuleCallReply **val) REDISMODULE_ATTR;
REDISMODULE_API void (*RedisModule_CallReplyPromiseSetUnblockHandler)(RedisModuleCallReply *reply, RedisModuleOnUnblocked on_unblock, void *private_data) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_CallReplyPromiseAbort)(RedisModuleCallReply *reply, void **private_data) REDISMODULE_ATTR;
REDISMODULE_API RedisModuleCallReply * (*RedisModule_CallReplyAttribute)(RedisModuleCallReply *reply) REDISMODULE_ATTR;
REDISMODULE_API size_t (*RedisModule_CallReplyLength)(RedisModuleCallReply *reply) REDISMODULE_ATTR;
REDISMODULE_API RedisModuleCallReply * (*RedisModule_CallReplyArrayElement)(RedisModuleCallReply *reply, size_t idx) REDISMODULE_ATTR;
@ -1207,6 +1211,8 @@ REDISMODULE_API int (*RedisModule_GetServerVersion)() REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_GetTypeMethodVersion)() REDISMODULE_ATTR;
REDISMODULE_API void (*RedisModule_Yield)(RedisModuleCtx *ctx, int flags, const char *busy_reply) REDISMODULE_ATTR;
REDISMODULE_API RedisModuleBlockedClient * (*RedisModule_BlockClient)(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms) REDISMODULE_ATTR;
REDISMODULE_API void * (*RedisModule_BlockClientGetPrivateData)(RedisModuleBlockedClient *blocked_client) REDISMODULE_ATTR;
REDISMODULE_API void (*RedisModule_BlockClientSetPrivateData)(RedisModuleBlockedClient *blocked_client, void *private_data) REDISMODULE_ATTR;
REDISMODULE_API RedisModuleBlockedClient * (*RedisModule_BlockClientOnAuth)(RedisModuleCtx *ctx, RedisModuleAuthCallback reply_callback, void (*free_privdata)(RedisModuleCtx*,void*)) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_UnblockClient)(RedisModuleBlockedClient *bc, void *privdata) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_IsBlockedReplyRequest)(RedisModuleCtx *ctx) REDISMODULE_ATTR;
@ -1373,6 +1379,8 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(CallReplySetElement);
REDISMODULE_GET_API(CallReplyMapElement);
REDISMODULE_GET_API(CallReplyAttributeElement);
REDISMODULE_GET_API(CallReplyPromiseSetUnblockHandler);
REDISMODULE_GET_API(CallReplyPromiseAbort);
REDISMODULE_GET_API(CallReplyAttribute);
REDISMODULE_GET_API(CallReplyType);
REDISMODULE_GET_API(CallReplyLength);
@ -1563,6 +1571,8 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(ThreadSafeContextTryLock);
REDISMODULE_GET_API(ThreadSafeContextUnlock);
REDISMODULE_GET_API(BlockClient);
REDISMODULE_GET_API(BlockClientGetPrivateData);
REDISMODULE_GET_API(BlockClientSetPrivateData);
REDISMODULE_GET_API(BlockClientOnAuth);
REDISMODULE_GET_API(UnblockClient);
REDISMODULE_GET_API(IsBlockedReplyRequest);

View File

@ -3629,7 +3629,7 @@ void processClientsWaitingReplicas(void) {
if (is_wait_aof && c->bstate.numlocal && !server.aof_enabled) {
addReplyError(c, "WAITAOF cannot be used when numlocal is set but appendonly is disabled.");
unblockClient(c);
unblockClient(c, 1);
continue;
}
@ -3679,7 +3679,7 @@ void processClientsWaitingReplicas(void) {
addReplyLongLong(c, numreplicas);
}
unblockClient(c);
unblockClient(c, 1);
}
}

View File

@ -1133,6 +1133,26 @@ void updateCachedTime(int update_daylight_info) {
updateCachedTimeWithUs(update_daylight_info, us);
}
/* Performing required operations in order to enter an execution unit.
* In general, if we are already inside an execution unit then there is nothing to do,
* otherwise we need to update cache times so the same cached time will be used all over
* the execution unit.
* update_cached_time - if 0, will not update the cached time even if required.
* us - if not zero, use this time for cached time, otherwise get current time. */
void enterExecutionUnit(int update_cached_time, long long us) {
if (server.execution_nesting++ == 0 && update_cached_time) {
if (us == 0) {
us = ustime();
}
updateCachedTimeWithUs(0, us);
server.cmd_time_snapshot = server.mstime;
}
}
void exitExecutionUnit() {
--server.execution_nesting;
}
void checkChildrenDone(void) {
int statloc = 0;
pid_t pid;
@ -3464,7 +3484,7 @@ void call(client *c, int flags) {
* and a client which is reprocessing command again (after being unblocked).
* Blocked clients can be blocked in different places and not always it means the call() function has been
* called. For example this is required for avoiding double logging to monitors.*/
int reprocessing_command = ((!server.execution_nesting) && (c->flags & CLIENT_EXECUTING_COMMAND)) ? 1 : 0;
int reprocessing_command = flags & CMD_CALL_REPROCESSING;
/* Initialization: clear the flags that must be set by the command on
* demand, and initialize the array for additional commands propagation. */
@ -3483,14 +3503,13 @@ void call(client *c, int flags) {
incrCommandStatsOnError(NULL, 0);
const long long call_timer = ustime();
enterExecutionUnit(1, call_timer);
/* Update cache time, and indicate we are starting command execution.
* in case we have nested calls we want to update only on the first call */
if (server.execution_nesting++ == 0) {
updateCachedTimeWithUs(0,call_timer);
server.cmd_time_snapshot = server.mstime;
c->flags |= CLIENT_EXECUTING_COMMAND;
}
/* setting the CLIENT_EXECUTING_COMMAND flag so we will avoid
* sending client side caching message in the middle of a command reply.
* In case of blocking commands, the flag will be un-set only after successfully
* re-processing and unblock the client.*/
c->flags |= CLIENT_EXECUTING_COMMAND;
monotime monotonic_start = 0;
if (monotonicGetType() == MONOTONIC_CLOCK_HW)
@ -3498,12 +3517,11 @@ void call(client *c, int flags) {
c->cmd->proc(c);
if (--server.execution_nesting == 0) {
/* In case client is blocked after trying to execute the command,
* it means the execution is not yet completed and we MIGHT reprocess the command in the future. */
if (!(c->flags & CLIENT_BLOCKED))
c->flags &= ~(CLIENT_EXECUTING_COMMAND);
}
exitExecutionUnit();
/* In case client is blocked after trying to execute the command,
* it means the execution is not yet completed and we MIGHT reprocess the command in the future. */
if (!(c->flags & CLIENT_BLOCKED)) c->flags &= ~(CLIENT_EXECUTING_COMMAND);
/* In order to avoid performance implication due to querying the clock using a system call 3 times,
* we use a monotonic clock, when we are sure its cost is very low, and fall back to non-monotonic call otherwise. */
@ -3600,10 +3618,12 @@ void call(client *c, int flags) {
/* However prevent AOF / replication propagation if the command
* implementation called preventCommandPropagation() or similar,
* or if we don't have the call() flags to do so. */
if (c->flags & CLIENT_PREVENT_REPL_PROP ||
if (c->flags & CLIENT_PREVENT_REPL_PROP ||
c->flags & CLIENT_MODULE_PREVENT_REPL_PROP ||
!(flags & CMD_CALL_PROPAGATE_REPL))
propagate_flags &= ~PROPAGATE_REPL;
if (c->flags & CLIENT_PREVENT_AOF_PROP ||
if (c->flags & CLIENT_PREVENT_AOF_PROP ||
c->flags & CLIENT_MODULE_PREVENT_AOF_PROP ||
!(flags & CMD_CALL_PROPAGATE_AOF))
propagate_flags &= ~PROPAGATE_AOF;
@ -4115,7 +4135,9 @@ int processCommand(client *c) {
queueMultiCommand(c, cmd_flags);
addReply(c,shared.queued);
} else {
call(c,CMD_CALL_FULL);
int flags = CMD_CALL_FULL;
if (client_reprocessing_command) flags |= CMD_CALL_REPROCESSING;
call(c,flags);
if (listLength(server.ready_keys))
handleClientsBlockedOnKeys();
}

View File

@ -394,6 +394,8 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
#define CLIENT_PUSHING (1ULL<<46) /* This client is pushing notifications. */
#define CLIENT_MODULE_AUTH_HAS_RESULT (1ULL<<47) /* Indicates a client in the middle of module based
auth had been authenticated from the Module. */
#define CLIENT_MODULE_PREVENT_AOF_PROP (1ULL<<48) /* Module client do not want to propagate to AOF */
#define CLIENT_MODULE_PREVENT_REPL_PROP (1ULL<<49) /* Module client do not want to propagate to replica */
/* Client block type (btype field in client structure)
* if CLIENT_BLOCKED flag is set. */
@ -584,6 +586,7 @@ typedef enum {
#define CMD_CALL_NONE 0
#define CMD_CALL_PROPAGATE_AOF (1<<0)
#define CMD_CALL_PROPAGATE_REPL (1<<1)
#define CMD_CALL_REPROCESSING (1<<2)
#define CMD_CALL_PROPAGATE (CMD_CALL_PROPAGATE_AOF|CMD_CALL_PROPAGATE_REPL)
#define CMD_CALL_FULL (CMD_CALL_PROPAGATE)
#define CMD_CALL_FROM_MODULE (1<<2) /* From RM_Call */
@ -1024,6 +1027,10 @@ typedef struct blockingState {
void *module_blocked_handle; /* RedisModuleBlockedClient structure.
which is opaque for the Redis core, only
handled in module.c. */
void *async_rm_call_handle; /* RedisModuleAsyncRMCallPromise structure.
which is opaque for the Redis core, only
handled in module.c. */
} blockingState;
/* The following structure represents a node in the server.ready_keys list,
@ -2488,6 +2495,7 @@ void moduleTypeNameByID(char *name, uint64_t moduleid);
const char *moduleTypeModuleName(moduleType *mt);
const char *moduleNameFromCommand(struct redisCommand *cmd);
void moduleFreeContext(struct RedisModuleCtx *ctx);
void moduleCallCommandUnblockedHandler(client *c);
void unblockClientFromModule(client *c);
void moduleHandleBlockedClients(void);
void moduleBlockedClientTimedOut(client *c);
@ -3069,6 +3077,8 @@ void adjustOpenFilesLimit(void);
void incrementErrorCount(const char *fullerr, size_t namelen);
void closeListeningSockets(int unlink_unix_socket);
void updateCachedTime(int update_daylight_info);
void enterExecutionUnit(int update_cached_time, long long us);
void exitExecutionUnit();
void resetServerStats(void);
void activeDefragCycle(void);
unsigned int getLRUClock(void);
@ -3370,7 +3380,7 @@ typedef struct luaScript {
void processUnblockedClients(void);
void initClientBlockingState(client *c);
void blockClient(client *c, int btype);
void unblockClient(client *c);
void unblockClient(client *c, int queue_for_reprocessing);
void unblockClientOnTimeout(client *c);
void unblockClientOnError(client *c, const char *err_str);
void queueClientForReprocessing(client *c);

View File

@ -219,6 +219,257 @@ int do_rm_call(RedisModuleCtx *ctx, RedisModuleString **argv, int argc){
return REDISMODULE_OK;
}
static void rm_call_async_send_reply(RedisModuleCtx *ctx, RedisModuleCallReply *reply) {
RedisModule_ReplyWithCallReply(ctx, reply);
RedisModule_FreeCallReply(reply);
}
/* Called when the command that was blocked on 'RM_Call' gets unblocked
* and send the reply to the blocked client. */
static void rm_call_async_on_unblocked(RedisModuleCtx *ctx, RedisModuleCallReply *reply, void *private_data) {
UNUSED(ctx);
RedisModuleBlockedClient *bc = private_data;
RedisModuleCtx *bctx = RedisModule_GetThreadSafeContext(bc);
rm_call_async_send_reply(bctx, reply);
RedisModule_FreeThreadSafeContext(bctx);
RedisModule_UnblockClient(bc, RedisModule_BlockClientGetPrivateData(bc));
}
int do_rm_call_async_fire_and_forget(RedisModuleCtx *ctx, RedisModuleString **argv, int argc){
UNUSED(argv);
UNUSED(argc);
if(argc < 2){
return RedisModule_WrongArity(ctx);
}
const char* cmd = RedisModule_StringPtrLen(argv[1], NULL);
RedisModuleCallReply* rep = RedisModule_Call(ctx, cmd, "!KEv", argv + 2, argc - 2);
if(RedisModule_CallReplyType(rep) != REDISMODULE_REPLY_PROMISE) {
RedisModule_ReplyWithCallReply(ctx, rep);
} else {
RedisModule_ReplyWithSimpleString(ctx, "Blocked");
}
RedisModule_FreeCallReply(rep);
return REDISMODULE_OK;
}
static void do_rm_call_async_free_pd(RedisModuleCtx * ctx, void *pd) {
UNUSED(ctx);
RedisModule_FreeCallReply(pd);
}
static void do_rm_call_async_disconnect(RedisModuleCtx *ctx, struct RedisModuleBlockedClient *bc) {
UNUSED(ctx);
RedisModuleCallReply* rep = RedisModule_BlockClientGetPrivateData(bc);
RedisModule_CallReplyPromiseAbort(rep, NULL);
RedisModule_FreeCallReply(rep);
RedisModule_AbortBlock(bc);
}
/*
* Callback for do_rm_call_async / do_rm_call_async_script_mode
* Gets the command to invoke as the first argument to the command and runs it,
* passing the rest of the arguments to the command invocation.
* If the command got blocked, blocks the client and unblock it when the command gets unblocked,
* this allows check the K (allow blocking) argument to RM_Call.
*/
int do_rm_call_async(RedisModuleCtx *ctx, RedisModuleString **argv, int argc){
UNUSED(argv);
UNUSED(argc);
if(argc < 2){
return RedisModule_WrongArity(ctx);
}
size_t format_len = 0;
char format[6] = {0};
if (!(RedisModule_GetContextFlags(ctx) & REDISMODULE_CTX_FLAGS_DENY_BLOCKING)) {
/* We are allowed to block the client so we can allow RM_Call to also block us */
format[format_len++] = 'K';
}
const char* invoked_cmd = RedisModule_StringPtrLen(argv[0], NULL);
if (strcasecmp(invoked_cmd, "do_rm_call_async_script_mode") == 0) {
format[format_len++] = 'S';
}
format[format_len++] = 'E';
format[format_len++] = 'v';
if (strcasecmp(invoked_cmd, "do_rm_call_async_no_replicate") != 0) {
/* Notice, without the '!' flag we will have inconsistency between master and replica.
* This is used only to check '!' flag correctness on blocked commands. */
format[format_len++] = '!';
}
const char* cmd = RedisModule_StringPtrLen(argv[1], NULL);
RedisModuleCallReply* rep = RedisModule_Call(ctx, cmd, format, argv + 2, argc - 2);
if(RedisModule_CallReplyType(rep) != REDISMODULE_REPLY_PROMISE) {
rm_call_async_send_reply(ctx, rep);
} else {
RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx, NULL, NULL, do_rm_call_async_free_pd, 0);
RedisModule_SetDisconnectCallback(bc, do_rm_call_async_disconnect);
RedisModule_BlockClientSetPrivateData(bc, rep);
RedisModule_CallReplyPromiseSetUnblockHandler(rep, rm_call_async_on_unblocked, bc);
}
return REDISMODULE_OK;
}
/* Private data for wait_and_do_rm_call_async that holds information about:
* 1. the block client, to unblock when done.
* 2. the arguments, contains the command to run using RM_Call */
typedef struct WaitAndDoRMCallCtx {
RedisModuleBlockedClient *bc;
RedisModuleString **argv;
int argc;
} WaitAndDoRMCallCtx;
/*
* This callback will be called when the 'wait' command invoke on 'wait_and_do_rm_call_async' will finish.
* This callback will continue the execution flow just like 'do_rm_call_async' command.
*/
static void wait_and_do_rm_call_async_on_unblocked(RedisModuleCtx *ctx, RedisModuleCallReply *reply, void *private_data) {
WaitAndDoRMCallCtx *wctx = private_data;
if (RedisModule_CallReplyType(reply) != REDISMODULE_REPLY_INTEGER) {
goto done;
}
if (RedisModule_CallReplyInteger(reply) != 1) {
goto done;
}
RedisModule_FreeCallReply(reply);
reply = NULL;
const char* cmd = RedisModule_StringPtrLen(wctx->argv[0], NULL);
reply = RedisModule_Call(ctx, cmd, "!EKv", wctx->argv + 1, wctx->argc - 1);
done:
if(RedisModule_CallReplyType(reply) != REDISMODULE_REPLY_PROMISE) {
RedisModuleCtx *bctx = RedisModule_GetThreadSafeContext(wctx->bc);
rm_call_async_send_reply(bctx, reply);
RedisModule_FreeThreadSafeContext(bctx);
RedisModule_UnblockClient(wctx->bc, NULL);
} else {
RedisModule_CallReplyPromiseSetUnblockHandler(reply, rm_call_async_on_unblocked, wctx->bc);
RedisModule_FreeCallReply(reply);
}
for (int i = 0 ; i < wctx->argc ; ++i) {
RedisModule_FreeString(NULL, wctx->argv[i]);
}
RedisModule_Free(wctx->argv);
RedisModule_Free(wctx);
}
/*
* Callback for wait_and_do_rm_call
* Gets the command to invoke as the first argument, runs 'wait'
* command (using the K flag to RM_Call). Once the wait finished, runs the
* command that was given (just like 'do_rm_call_async').
*/
int wait_and_do_rm_call_async(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
UNUSED(argv);
UNUSED(argc);
if(argc < 2){
return RedisModule_WrongArity(ctx);
}
int flags = RedisModule_GetContextFlags(ctx);
if (flags & REDISMODULE_CTX_FLAGS_DENY_BLOCKING) {
return RedisModule_ReplyWithError(ctx, "Err can not run wait, blocking is not allowed.");
}
RedisModuleCallReply* rep = RedisModule_Call(ctx, "wait", "!EKcc", "1", "0");
if(RedisModule_CallReplyType(rep) != REDISMODULE_REPLY_PROMISE) {
rm_call_async_send_reply(ctx, rep);
} else {
RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx, NULL, NULL, NULL, 0);
WaitAndDoRMCallCtx *wctx = RedisModule_Alloc(sizeof(*wctx));
*wctx = (WaitAndDoRMCallCtx){
.bc = bc,
.argv = RedisModule_Alloc((argc - 1) * sizeof(RedisModuleString*)),
.argc = argc - 1,
};
for (int i = 1 ; i < argc ; ++i) {
wctx->argv[i - 1] = RedisModule_HoldString(NULL, argv[i]);
}
RedisModule_CallReplyPromiseSetUnblockHandler(rep, wait_and_do_rm_call_async_on_unblocked, wctx);
RedisModule_FreeCallReply(rep);
}
return REDISMODULE_OK;
}
static void blpop_and_set_multiple_keys_on_unblocked(RedisModuleCtx *ctx, RedisModuleCallReply *reply, void *private_data) {
/* ignore the reply */
RedisModule_FreeCallReply(reply);
WaitAndDoRMCallCtx *wctx = private_data;
for (int i = 0 ; i < wctx->argc ; i += 2) {
RedisModuleCallReply* rep = RedisModule_Call(ctx, "set", "!ss", wctx->argv[i], wctx->argv[i + 1]);
RedisModule_FreeCallReply(rep);
}
RedisModuleCtx *bctx = RedisModule_GetThreadSafeContext(wctx->bc);
RedisModule_ReplyWithSimpleString(bctx, "OK");
RedisModule_FreeThreadSafeContext(bctx);
RedisModule_UnblockClient(wctx->bc, NULL);
for (int i = 0 ; i < wctx->argc ; ++i) {
RedisModule_FreeString(NULL, wctx->argv[i]);
}
RedisModule_Free(wctx->argv);
RedisModule_Free(wctx);
}
/*
* Performs a blpop command on a given list and when unblocked set multiple string keys.
* This command allows checking that the unblock callback is performed as a unit
* and its effect are replicated to the replica and AOF wrapped with multi exec.
*/
int blpop_and_set_multiple_keys(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
UNUSED(argv);
UNUSED(argc);
if(argc < 2 || argc % 2 != 0){
return RedisModule_WrongArity(ctx);
}
int flags = RedisModule_GetContextFlags(ctx);
if (flags & REDISMODULE_CTX_FLAGS_DENY_BLOCKING) {
return RedisModule_ReplyWithError(ctx, "Err can not run wait, blocking is not allowed.");
}
RedisModuleCallReply* rep = RedisModule_Call(ctx, "blpop", "!EKsc", argv[1], "0");
if(RedisModule_CallReplyType(rep) != REDISMODULE_REPLY_PROMISE) {
rm_call_async_send_reply(ctx, rep);
} else {
RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx, NULL, NULL, NULL, 0);
WaitAndDoRMCallCtx *wctx = RedisModule_Alloc(sizeof(*wctx));
*wctx = (WaitAndDoRMCallCtx){
.bc = bc,
.argv = RedisModule_Alloc((argc - 2) * sizeof(RedisModuleString*)),
.argc = argc - 2,
};
for (int i = 0 ; i < argc - 2 ; ++i) {
wctx->argv[i] = RedisModule_HoldString(NULL, argv[i + 2]);
}
RedisModule_CallReplyPromiseSetUnblockHandler(rep, blpop_and_set_multiple_keys_on_unblocked, wctx);
RedisModule_FreeCallReply(rep);
}
return REDISMODULE_OK;
}
/* simulate a blocked client replying to a thread safe context without creating a thread */
int do_fake_bg_true(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
UNUSED(argv);
@ -316,6 +567,30 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
"write", 0, 0, 0) == REDISMODULE_ERR)
return REDISMODULE_ERR;
if (RedisModule_CreateCommand(ctx, "do_rm_call_async", do_rm_call_async,
"write", 0, 0, 0) == REDISMODULE_ERR)
return REDISMODULE_ERR;
if (RedisModule_CreateCommand(ctx, "do_rm_call_async_script_mode", do_rm_call_async,
"write", 0, 0, 0) == REDISMODULE_ERR)
return REDISMODULE_ERR;
if (RedisModule_CreateCommand(ctx, "do_rm_call_async_no_replicate", do_rm_call_async,
"write", 0, 0, 0) == REDISMODULE_ERR)
return REDISMODULE_ERR;
if (RedisModule_CreateCommand(ctx, "do_rm_call_fire_and_forget", do_rm_call_async_fire_and_forget,
"write", 0, 0, 0) == REDISMODULE_ERR)
return REDISMODULE_ERR;
if (RedisModule_CreateCommand(ctx, "wait_and_do_rm_call", wait_and_do_rm_call_async,
"write", 0, 0, 0) == REDISMODULE_ERR)
return REDISMODULE_ERR;
if (RedisModule_CreateCommand(ctx, "blpop_and_set_multiple_keys", blpop_and_set_multiple_keys,
"write", 0, 0, 0) == REDISMODULE_ERR)
return REDISMODULE_ERR;
if (RedisModule_CreateCommand(ctx, "do_bg_rm_call", do_bg_rm_call, "", 0, 0, 0) == REDISMODULE_ERR)
return REDISMODULE_ERR;

View File

@ -0,0 +1,380 @@
set testmodule [file normalize tests/modules/blockedclient.so]
set testmodule2 [file normalize tests/modules/postnotifications.so]
set testmodule3 [file normalize tests/modules/blockonkeys.so]
start_server {tags {"modules"}} {
r module load $testmodule
test {Locked GIL acquisition from async RM_Call} {
assert_equal {OK} [r do_rm_call_async acquire_gil]
}
test "Blpop on async RM_Call fire and forget" {
assert_equal {Blocked} [r do_rm_call_fire_and_forget blpop l 0]
r lpush l a
assert_equal {0} [r llen l]
}
foreach cmd {do_rm_call_async do_rm_call_async_script_mode} {
test "Blpop on async RM_Call using $cmd" {
set rd [redis_deferring_client]
$rd $cmd blpop l 0
wait_for_blocked_clients_count 1
r lpush l a
assert_equal [$rd read] {l a}
wait_for_blocked_clients_count 0
}
test "Brpop on async RM_Call using $cmd" {
set rd [redis_deferring_client]
$rd $cmd brpop l 0
wait_for_blocked_clients_count 1
r lpush l a
assert_equal [$rd read] {l a}
wait_for_blocked_clients_count 0
}
test "Brpoplpush on async RM_Call using $cmd" {
set rd [redis_deferring_client]
$rd $cmd brpoplpush l1 l2 0
wait_for_blocked_clients_count 1
r lpush l1 a
assert_equal [$rd read] {a}
wait_for_blocked_clients_count 0
r lpop l2
} {a}
test "Blmove on async RM_Call using $cmd" {
set rd [redis_deferring_client]
$rd $cmd blmove l1 l2 LEFT LEFT 0
wait_for_blocked_clients_count 1
r lpush l1 a
assert_equal [$rd read] {a}
wait_for_blocked_clients_count 0
r lpop l2
} {a}
test "Bzpopmin on async RM_Call using $cmd" {
set rd [redis_deferring_client]
$rd $cmd bzpopmin s 0
wait_for_blocked_clients_count 1
r zadd s 10 foo
assert_equal [$rd read] {s foo 10}
wait_for_blocked_clients_count 0
}
test "Bzpopmax on async RM_Call using $cmd" {
set rd [redis_deferring_client]
$rd $cmd bzpopmax s 0
wait_for_blocked_clients_count 1
r zadd s 10 foo
assert_equal [$rd read] {s foo 10}
wait_for_blocked_clients_count 0
}
}
test {Nested async RM_Call} {
set rd [redis_deferring_client]
$rd do_rm_call_async do_rm_call_async do_rm_call_async do_rm_call_async blpop l 0
wait_for_blocked_clients_count 1
r lpush l a
assert_equal [$rd read] {l a}
wait_for_blocked_clients_count 0
}
test {Test multiple async RM_Call waiting on the same event} {
set rd1 [redis_deferring_client]
set rd2 [redis_deferring_client]
$rd1 do_rm_call_async do_rm_call_async do_rm_call_async do_rm_call_async blpop l 0
$rd2 do_rm_call_async do_rm_call_async do_rm_call_async do_rm_call_async blpop l 0
wait_for_blocked_clients_count 2
r lpush l element element
assert_equal [$rd1 read] {l element}
assert_equal [$rd2 read] {l element}
wait_for_blocked_clients_count 0
}
test {async RM_Call calls RM_Call} {
assert_equal {PONG} [r do_rm_call_async do_rm_call ping]
}
test {async RM_Call calls background RM_Call calls RM_Call} {
assert_equal {PONG} [r do_rm_call_async do_bg_rm_call do_rm_call ping]
}
test {async RM_Call calls background RM_Call calls RM_Call calls async RM_Call} {
assert_equal {PONG} [r do_rm_call_async do_bg_rm_call do_rm_call do_rm_call_async ping]
}
test {async RM_Call inside async RM_Call callback} {
set rd [redis_deferring_client]
$rd wait_and_do_rm_call blpop l 0
wait_for_blocked_clients_count 1
start_server {} {
test "Connect a replica to the master instance" {
r slaveof [srv -1 host] [srv -1 port]
wait_for_condition 50 100 {
[s role] eq {slave} &&
[string match {*master_link_status:up*} [r info replication]]
} else {
fail "Can't turn the instance into a replica"
}
}
assert_equal {1} [r -1 lpush l a]
assert_equal [$rd read] {l a}
}
wait_for_blocked_clients_count 0
}
test {Become replica while having async RM_Call running} {
r flushall
set rd [redis_deferring_client]
$rd do_rm_call_async blpop l 0
wait_for_blocked_clients_count 1
#become a replica of a not existing redis
r replicaof localhost 30000
catch {[$rd read]} e
assert_match {UNBLOCKED force unblock from blocking operation*} $e
wait_for_blocked_clients_count 0
r replicaof no one
r lpush l 1
# make sure the async rm_call was aborted
assert_equal [r llen l] {1}
}
test {Pipeline with blocking RM_Call} {
r flushall
set rd [redis_deferring_client]
set buf ""
append buf "do_rm_call_async blpop l 0\r\n"
append buf "ping\r\n"
$rd write $buf
$rd flush
wait_for_blocked_clients_count 1
# release the blocked client
r lpush l 1
assert_equal [$rd read] {l 1}
assert_equal [$rd read] {PONG}
wait_for_blocked_clients_count 0
}
test {blocking RM_Call abort} {
r flushall
set rd [redis_deferring_client]
$rd client id
set client_id [$rd read]
$rd do_rm_call_async blpop l 0
wait_for_blocked_clients_count 1
r client kill ID $client_id
assert_error {*error reading reply*} {$rd read}
wait_for_blocked_clients_count 0
r lpush l 1
# make sure the async rm_call was aborted
assert_equal [r llen l] {1}
}
}
start_server {tags {"modules"}} {
r module load $testmodule
test {Test basic replication stream on unblock handler} {
r flushall
set repl [attach_to_replication_stream]
set rd [redis_deferring_client]
$rd do_rm_call_async blpop l 0
wait_for_blocked_clients_count 1
r lpush l a
assert_equal [$rd read] {l a}
assert_replication_stream $repl {
{select *}
{lpush l a}
{lpop l}
}
close_replication_stream $repl
wait_for_blocked_clients_count 0
}
test {Test unblock handler are executed as a unit} {
r flushall
set repl [attach_to_replication_stream]
set rd [redis_deferring_client]
$rd blpop_and_set_multiple_keys l x 1 y 2
wait_for_blocked_clients_count 1
r lpush l a
assert_equal [$rd read] {OK}
assert_replication_stream $repl {
{select *}
{lpush l a}
{multi}
{lpop l}
{set x 1}
{set y 2}
{exec}
}
close_replication_stream $repl
wait_for_blocked_clients_count 0
}
test {Test no propagation of blocking command} {
r flushall
set repl [attach_to_replication_stream]
set rd [redis_deferring_client]
$rd do_rm_call_async_no_replicate blpop l 0
wait_for_blocked_clients_count 1
r lpush l a
assert_equal [$rd read] {l a}
# make sure the lpop are not replicated
r set x 1
assert_replication_stream $repl {
{select *}
{lpush l a}
{set x 1}
}
close_replication_stream $repl
wait_for_blocked_clients_count 0
}
}
start_server {tags {"modules"}} {
r module load $testmodule
r module load $testmodule2
test {Test unblock handler are executed as a unit with key space notifications} {
r flushall
set repl [attach_to_replication_stream]
set rd [redis_deferring_client]
$rd blpop_and_set_multiple_keys l string_foo 1 string_bar 2
wait_for_blocked_clients_count 1
r lpush l a
assert_equal [$rd read] {OK}
assert_replication_stream $repl {
{select *}
{lpush l a}
{multi}
{lpop l}
{set string_foo 1}
{set string_bar 2}
{incr string_changed{string_foo}}
{incr string_changed{string_bar}}
{incr string_total}
{incr string_total}
{exec}
}
close_replication_stream $repl
wait_for_blocked_clients_count 0
}
test {Test unblock handler are executed as a unit with lazy expire} {
r flushall
r DEBUG SET-ACTIVE-EXPIRE 0
set repl [attach_to_replication_stream]
set rd [redis_deferring_client]
$rd blpop_and_set_multiple_keys l string_foo 1 string_bar 2
wait_for_blocked_clients_count 1
r lpush l a
assert_equal [$rd read] {OK}
# set expiration on string_foo
r pexpire string_foo 1
after 10
# now the key should have been expired
$rd blpop_and_set_multiple_keys l string_foo 1 string_bar 2
wait_for_blocked_clients_count 1
r lpush l a
assert_equal [$rd read] {OK}
assert_replication_stream $repl {
{select *}
{lpush l a}
{multi}
{lpop l}
{set string_foo 1}
{set string_bar 2}
{incr string_changed{string_foo}}
{incr string_changed{string_bar}}
{incr string_total}
{incr string_total}
{exec}
{pexpireat string_foo *}
{lpush l a}
{multi}
{lpop l}
{del string_foo}
{set string_foo 1}
{set string_bar 2}
{incr expired}
{incr string_changed{string_foo}}
{incr string_changed{string_bar}}
{incr string_total}
{incr string_total}
{exec}
}
close_replication_stream $repl
r DEBUG SET-ACTIVE-EXPIRE 1
}
wait_for_blocked_clients_count 0
}
start_server {tags {"modules"}} {
r module load $testmodule
r module load $testmodule3
test {Test unblock handler on module blocked on keys} {
set rd [redis_deferring_client]
r fsl.push l 1
$rd do_rm_call_async FSL.BPOPGT l 3 0
wait_for_blocked_clients_count 1
r fsl.push l 2
r fsl.push l 3
r fsl.push l 4
assert_equal [$rd read] {4}
wait_for_blocked_clients_count 0
}
}

View File

@ -217,41 +217,45 @@ start_server {tags {"scripting"}} {
} {*execution time*}
}
test {EVAL - Scripts can't run blpop command} {
set e {}
catch {run_script {return redis.pcall('blpop','x',0)} 1 x} e
set e
} {*not allowed*}
test {EVAL - Scripts do not block on blpop command} {
r lpush l 1
r lpop l
run_script {return redis.pcall('blpop','l',0)} 1 l
} {}
test {EVAL - Scripts can't run brpop command} {
set e {}
catch {run_script {return redis.pcall('brpop','empty_list',0)} 1 empty_list} e
set e
} {*not allowed*}
test {EVAL - Scripts do not block on brpop command} {
r lpush l 1
r lpop l
run_script {return redis.pcall('brpop','l',0)} 1 l
} {}
test {EVAL - Scripts can't run brpoplpush command} {
set e {}
catch {run_script {return redis.pcall('brpoplpush','empty_list1{t}', 'empty_list2{t}',0)} 2 empty_list1{t} empty_list2{t}} e
set e
} {*not allowed*}
test {EVAL - Scripts do not block on brpoplpush command} {
r lpush empty_list1{t} 1
r lpop empty_list1{t}
run_script {return redis.pcall('brpoplpush','empty_list1{t}', 'empty_list2{t}',0)} 2 empty_list1{t} empty_list2{t}
} {}
test {EVAL - Scripts can't run blmove command} {
set e {}
catch {run_script {return redis.pcall('blmove','empty_list1{t}', 'empty_list2{t}', 'LEFT', 'LEFT', 0)} 2 empty_list1{t} empty_list2{t}} e
set e
} {*not allowed*}
test {EVAL - Scripts do not block on blmove command} {
r lpush empty_list1{t} 1
r lpop empty_list1{t}
run_script {return redis.pcall('blmove','empty_list1{t}', 'empty_list2{t}', 'LEFT', 'LEFT', 0)} 2 empty_list1{t} empty_list2{t}
} {}
test {EVAL - Scripts can't run bzpopmin command} {
set e {}
catch {run_script {return redis.pcall('bzpopmin','empty_zset', 0)} 1 empty_zset} e
set e
} {*not allowed*}
test {EVAL - Scripts do not block on bzpopmin command} {
r zadd empty_zset 10 foo
r zmpop 1 empty_zset MIN
run_script {return redis.pcall('bzpopmin','empty_zset', 0)} 1 empty_zset
} {}
test {EVAL - Scripts can't run bzpopmax command} {
set e {}
catch {run_script {return redis.pcall('bzpopmax','empty_zset', 0)} 1 empty_zset} e
set e
} {*not allowed*}
test {EVAL - Scripts do not block on bzpopmax command} {
r zadd empty_zset 10 foo
r zmpop 1 empty_zset MIN
run_script {return redis.pcall('bzpopmax','empty_zset', 0)} 1 empty_zset
} {}
test {EVAL - Scripts do not block on wait} {
run_script {return redis.pcall('wait','1','0')} 0
} {0}
test {EVAL - Scripts can't run XREAD and XREADGROUP with BLOCK option} {
r del s