Module API to allow writes after key space notification hooks (#11199)

### Summary of API additions

* `RedisModule_AddPostNotificationJob` - new API to call inside a key space
  notification (and on more locations in the future) and allow to add a post job as describe above.
* New module option, `REDISMODULE_OPTIONS_ALLOW_NESTED_KEYSPACE_NOTIFICATIONS`,
  allows to disable Redis protection of nested key-space notifications.
* `RedisModule_GetModuleOptionsAll` - gets the mask of all supported module options so a module
  will be able to check if a given option is supported by the current running Redis instance.

### Background

The following PR is a proposal of handling write operations inside module key space notifications.
After a lot of discussions we came to a conclusion that module should not perform any write
operations on key space notification.

Some examples of issues that such write operation can cause are describe on the following links:

* Bad replication oreder - https://github.com/redis/redis/pull/10969
* Used after free - https://github.com/redis/redis/pull/10969#issuecomment-1223771006
* Used after free - https://github.com/redis/redis/pull/9406#issuecomment-1221684054

There are probably more issues that are yet to be discovered. The underline problem with writing
inside key space notification is that the notification runs synchronously, this means that the notification
code will be executed in the middle on Redis logic (commands logic, eviction, expire).
Redis **do not assume** that the data might change while running the logic and such changes
can crash Redis or cause unexpected behaviour.

The solution is to state that modules **should not** perform any write command inside key space
notification (we can chose whether or not we want to force it). To still cover the use-case where
module wants to perform a write operation as a reaction to key space notifications, we introduce
a new API , `RedisModule_AddPostNotificationJob`, that allows to register a callback that will be
called by Redis when the following conditions hold:

* It is safe to perform any write operation.
* The job will be called atomically along side the operation that triggers it (in our case, key
  space notification).

Module can use this new API to safely perform any write operation and still achieve atomicity
between the notification and the write.

Although currently the API is supported on key space notifications, the API is written in a generic
way so that in the future we will be able to use it on other places (server events for example).

### Technical Details

Whenever a module uses `RedisModule_AddPostNotificationJob` the callback is added to a list
of callbacks (called `modulePostExecUnitJobs`) that need to be invoke after the current execution
unit ends (whether its a command, eviction, or active expire). In order to trigger those callback
atomically with the notification effect, we call those callbacks on `postExecutionUnitOperations`
(which was `propagatePendingCommands` before this PR). The new function fires the post jobs
and then calls `propagatePendingCommands`.

If the callback perform more operations that triggers more key space notifications. Those keys
space notifications might register more callbacks. Those callbacks will be added to the end
of `modulePostExecUnitJobs` list and will be invoke atomically after the current callback ends.
This raises a concerns of entering an infinite loops, we consider infinite loops as a logical bug
that need to be fixed in the module, an attempt to protect against infinite loops by halting the
execution could result in violation of the feature correctness and so **Redis will make no attempt
to protect the module from infinite loops**

In addition, currently key space notifications are not nested. Some modules might want to allow
nesting key-space notifications. To allow that and keep backward compatibility, we introduce a
new module option called `REDISMODULE_OPTIONS_ALLOW_NESTED_KEYSPACE_NOTIFICATIONS`.
Setting this option will disable the Redis key-space notifications nesting protection and will
pass this responsibility to the module.

### Redis infrastructure

This PR promotes the existing `propagatePendingCommands` to an "Execution Unit" concept,
which is called after each atomic unit of execution,

Co-authored-by: Oran Agra <oran@redislabs.com>
Co-authored-by: Yossi Gottlieb <yossigo@gmail.com>
Co-authored-by: Madelyn Olson <34459052+madolson@users.noreply.github.com>
This commit is contained in:
Meir Shpilraien (Spielrein) 2022-11-24 19:00:04 +02:00 committed by GitHub
parent ae1de54900
commit abc345ad28
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 651 additions and 14 deletions

View File

@ -51,4 +51,5 @@ $TCLSH tests/test_helper.tcl \
--single unit/moduleapi/timer \
--single unit/moduleapi/publish \
--single unit/moduleapi/usercall \
--single unit/moduleapi/postnotifications \
"${@}"

View File

@ -635,7 +635,7 @@ void handleClientsBlockedOnKeys(void) {
if (!o) {
/* Edge case: If lookupKeyReadWithFlags decides to expire the key we have to
* take care of the propagation here, because afterCommand wasn't called */
propagatePendingCommands();
postExecutionUnitOperations();
} else {
if (o->type == OBJ_LIST)
serveClientsBlockedOnListKey(o,rl);

View File

@ -7361,8 +7361,9 @@ unsigned int delKeysInSlot(unsigned int hashslot) {
propagateDeletion(&server.db[0], key, server.lazyfree_lazy_server_del);
signalModifiedKey(NULL, &server.db[0], key);
moduleNotifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, server.db[0].id);
postExecutionUnitOperations();
decrRefCount(key);
propagatePendingCommands();
postExecutionUnitOperations();
j++;
server.dirty++;
}

View File

@ -691,7 +691,7 @@ int performEvictions(void) {
notifyKeyspaceEvent(NOTIFY_EVICTED, "evicted",
keyobj, db->id);
propagateDeletion(db,keyobj,server.lazyfree_lazy_eviction);
propagatePendingCommands();
postExecutionUnitOperations();
decrRefCount(keyobj);
keys_freed++;

View File

@ -267,7 +267,7 @@ void activeExpireCycle(int type) {
if (activeExpireCycleTryExpire(db,e,now)) {
expired++;
/* Propagate the DEL command */
propagatePendingCommands();
postExecutionUnitOperations();
}
if (ttl > 0) {
/* We want the average TTL of keys yet

View File

@ -294,6 +294,9 @@ static pthread_mutex_t moduleGIL = PTHREAD_MUTEX_INITIALIZER;
/* Function pointer type for keyspace event notification subscriptions from modules. */
typedef int (*RedisModuleNotificationFunc) (RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key);
/* Function pointer type for post jobs */
typedef void (*RedisModulePostNotificationJobFunc) (RedisModuleCtx *ctx, void *pd);
/* Keyspace notification subscriber information.
* See RM_SubscribeToKeyspaceEvents() for more information. */
typedef struct RedisModuleKeyspaceSubscriber {
@ -308,9 +311,21 @@ typedef struct RedisModuleKeyspaceSubscriber {
int active;
} RedisModuleKeyspaceSubscriber;
typedef struct RedisModulePostExecUnitJob {
/* The module subscribed to the event */
RedisModule *module;
RedisModulePostNotificationJobFunc callback;
void *pd;
void (*free_pd)(void*);
int dbid;
} RedisModulePostExecUnitJob;
/* The module keyspace notification subscribers list */
static list *moduleKeyspaceSubscribers;
/* The module post keyspace jobs list */
static list *modulePostExecUnitJobs;
/* Data structures related to the exported dictionary data structure. */
typedef struct RedisModuleDict {
rax *rax; /* The radix tree. */
@ -729,8 +744,9 @@ void moduleFreeContext(RedisModuleCtx *ctx) {
/* Modules take care of their own propagation, when we are
* outside of call() context (timers, events, etc.). */
if (--server.module_ctx_nesting == 0) {
if (!server.core_propagates)
propagatePendingCommands();
if (!server.core_propagates) {
postExecutionUnitOperations();
}
if (server.busy_module_yield_flags) {
blockingOperationEnds();
server.busy_module_yield_flags = BUSY_MODULE_YIELD_NONE;
@ -2207,7 +2223,13 @@ void RM_Yield(RedisModuleCtx *ctx, int flags, const char *busy_reply) {
* REDISMODULE_OPTIONS_HANDLE_REPL_ASYNC_LOAD:
* Setting this flag indicates module awareness of diskless async replication (repl-diskless-load=swapdb)
* and that redis could be serving reads during replication instead of blocking with LOADING status.
*/
*
* REDISMODULE_OPTIONS_ALLOW_NESTED_KEYSPACE_NOTIFICATIONS:
* Declare that the module wants to get nested key-space notifications.
* By default, Redis will not fire key-space notifications that happened inside
* a key-space notification callback. This flag allows to change this behavior
* and fire nested key-space notifications. Notice: if enabled, the module
* should protected itself from infinite recursion. */
void RM_SetModuleOptions(RedisModuleCtx *ctx, int options) {
ctx->module->options = options;
}
@ -7905,7 +7927,7 @@ void moduleGILBeforeUnlock() {
* (because it's u clear when thread safe contexts are
* released we have to propagate here). */
server.module_ctx_nesting--;
propagatePendingCommands();
postExecutionUnitOperations();
if (server.busy_module_yield_flags) {
blockingOperationEnds();
@ -8000,6 +8022,12 @@ void moduleReleaseGIL(void) {
* so notification callbacks must to be fast, or they would slow Redis down.
* If you need to take long actions, use threads to offload them.
*
* Moreover, the fact that the notification is executed synchronously means
* that the notification code will be executed in the middle on Redis logic
* (commands logic, eviction, expire). Changing the key space while the logic
* runs is dangerous and discouraged. In order to react to key space events with
* write actions, please refer to `RM_AddPostExecutionUnitJob`.
*
* See https://redis.io/topics/notifications for more information.
*/
int RM_SubscribeToKeyspaceEvents(RedisModuleCtx *ctx, int types, RedisModuleNotificationFunc callback) {
@ -8013,6 +8041,53 @@ int RM_SubscribeToKeyspaceEvents(RedisModuleCtx *ctx, int types, RedisModuleNoti
return REDISMODULE_OK;
}
void firePostExecutionUnitJobs() {
/* Avoid propagation of commands. */
server.in_nested_call++;
while (listLength(modulePostExecUnitJobs) > 0) {
listNode *ln = listFirst(modulePostExecUnitJobs);
RedisModulePostExecUnitJob *job = listNodeValue(ln);
listDelNode(modulePostExecUnitJobs, ln);
RedisModuleCtx ctx;
moduleCreateContext(&ctx, job->module, REDISMODULE_CTX_TEMP_CLIENT);
selectDb(ctx.client, job->dbid);
job->callback(&ctx, job->pd);
if (job->free_pd) job->free_pd(job->pd);
moduleFreeContext(&ctx);
zfree(job);
}
server.in_nested_call--;
}
/* When running inside a key space notification callback, it is dangerous and highly discouraged to perform any write
* operation (See `RM_SubscribeToKeyspaceEvents`). In order to still perform write actions in this scenario,
* Redis provides `RM_AddPostNotificationJob` API. The API allows to register a job callback which Redis will call
* when the following condition are promised to be fulfilled:
* 1. It is safe to perform any write operation.
* 2. The job will be called atomically along side the key space notification.
*
* Notice, one job might trigger key space notifications that will trigger more jobs.
* This raises a concerns of entering an infinite loops, we consider infinite loops
* as a logical bug that need to be fixed in the module, an attempt to protect against
* infinite loops by halting the execution could result in violation of the feature correctness
* and so Redis will make no attempt to protect the module from infinite loops.
*
* 'free_pd' can be NULL and in such case will not be used. */
int RM_AddPostNotificationJob(RedisModuleCtx *ctx, RedisModulePostNotificationJobFunc callback, void *privdata, void (*free_privdata)(void*)) {
RedisModulePostExecUnitJob *job = zmalloc(sizeof(*job));
job->module = ctx->module;
job->callback = callback;
job->pd = privdata;
job->free_pd = free_privdata;
job->dbid = ctx->client->db->id;
listAddNodeTail(modulePostExecUnitJobs, job);
return REDISMODULE_OK;
}
/* Get the configured bitmap of notify-keyspace-events (Could be used
* for additional filtering in RedisModuleNotificationFunc) */
int RM_GetNotifyKeyspaceEvents() {
@ -8045,7 +8120,8 @@ void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid)
RedisModuleKeyspaceSubscriber *sub = ln->value;
/* Only notify subscribers on events matching the registration,
* and avoid subscribers triggering themselves */
if ((sub->event_mask & type) && sub->active == 0) {
if ((sub->event_mask & type) &&
(sub->active == 0 || (sub->module->options & REDISMODULE_OPTIONS_ALLOW_NESTED_KEYSPACE_NOTIFICATIONS))) {
RedisModuleCtx ctx;
moduleCreateContext(&ctx, sub->module, REDISMODULE_CTX_TEMP_CLIENT);
selectDb(ctx.client, dbid);
@ -11116,6 +11192,8 @@ void moduleInitModulesSystem(void) {
/* Set up the keyspace notification subscriber list and static client */
moduleKeyspaceSubscribers = listCreate();
modulePostExecUnitJobs = listCreate();
/* Set up filter list */
moduleCommandFilters = listCreate();
@ -12234,6 +12312,23 @@ int RM_GetLFU(RedisModuleKey *key, long long *lfu_freq) {
* ## Miscellaneous APIs
* -------------------------------------------------------------------------- */
/**
* Returns the full module options flags mask, using the return value
* the module can check if a certain set of module options are supported
* by the redis server version in use.
* Example:
*
* int supportedFlags = RM_GetModuleOptionsAll();
* if (supportedFlags & REDISMODULE_OPTIONS_ALLOW_NESTED_KEYSPACE_NOTIFICATIONS) {
* // REDISMODULE_OPTIONS_ALLOW_NESTED_KEYSPACE_NOTIFICATIONS is supported
* } else{
* // REDISMODULE_OPTIONS_ALLOW_NESTED_KEYSPACE_NOTIFICATIONS is not supported
* }
*/
int RM_GetModuleOptionsAll() {
return _REDISMODULE_OPTIONS_FLAGS_NEXT - 1;
}
/**
* Returns the full ContextFlags mask, using the return value
* the module can check if a certain set of flags are supported
@ -12825,6 +12920,7 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(NotifyKeyspaceEvent);
REGISTER_API(GetNotifyKeyspaceEvents);
REGISTER_API(SubscribeToKeyspaceEvents);
REGISTER_API(AddPostNotificationJob);
REGISTER_API(RegisterClusterMessageReceiver);
REGISTER_API(SendClusterMessage);
REGISTER_API(GetClusterNodeInfo);
@ -12932,6 +13028,7 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(AuthenticateClientWithACLUser);
REGISTER_API(AuthenticateClientWithUser);
REGISTER_API(GetContextFlagsAll);
REGISTER_API(GetModuleOptionsAll);
REGISTER_API(GetKeyspaceNotificationFlagsAll);
REGISTER_API(IsSubEventSupported);
REGISTER_API(GetServerVersion);

View File

@ -261,6 +261,15 @@ typedef uint64_t RedisModuleTimerID;
/* Declare that the module can handle diskless async replication with RedisModule_SetModuleOptions. */
#define REDISMODULE_OPTIONS_HANDLE_REPL_ASYNC_LOAD (1<<2)
/* Declare that the module want to get nested key space notifications.
* If enabled, the module is responsible to break endless loop. */
#define REDISMODULE_OPTIONS_ALLOW_NESTED_KEYSPACE_NOTIFICATIONS (1<<3)
/* Next option flag, must be updated when adding new module flags above!
* This flag should not be used directly by the module.
* Use RedisModule_GetModuleOptionsAll instead. */
#define _REDISMODULE_OPTIONS_FLAGS_NEXT (1<<4)
/* Definitions for RedisModule_SetCommandInfo. */
typedef enum {
@ -834,6 +843,7 @@ typedef struct RedisModuleKeyOptCtx RedisModuleKeyOptCtx;
typedef int (*RedisModuleCmdFunc)(RedisModuleCtx *ctx, RedisModuleString **argv, int argc);
typedef void (*RedisModuleDisconnectFunc)(RedisModuleCtx *ctx, RedisModuleBlockedClient *bc);
typedef int (*RedisModuleNotificationFunc)(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key);
typedef void (*RedisModulePostNotificationJobFunc) (RedisModuleCtx *ctx, void *pd);
typedef void *(*RedisModuleTypeLoadFunc)(RedisModuleIO *rdb, int encver);
typedef void (*RedisModuleTypeSaveFunc)(RedisModuleIO *rdb, void *value);
typedef int (*RedisModuleTypeAuxLoadFunc)(RedisModuleIO *rdb, int encver, int when);
@ -1146,6 +1156,7 @@ REDISMODULE_API void (*RedisModule_ScanCursorDestroy)(RedisModuleScanCursor *cur
REDISMODULE_API int (*RedisModule_Scan)(RedisModuleCtx *ctx, RedisModuleScanCursor *cursor, RedisModuleScanCB fn, void *privdata) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_ScanKey)(RedisModuleKey *key, RedisModuleScanCursor *cursor, RedisModuleScanKeyCB fn, void *privdata) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_GetContextFlagsAll)() REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_GetModuleOptionsAll)() REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_GetKeyspaceNotificationFlagsAll)() REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_IsSubEventSupported)(RedisModuleEvent event, uint64_t subevent) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_GetServerVersion)() REDISMODULE_ATTR;
@ -1167,6 +1178,7 @@ REDISMODULE_API void (*RedisModule_ThreadSafeContextLock)(RedisModuleCtx *ctx) R
REDISMODULE_API int (*RedisModule_ThreadSafeContextTryLock)(RedisModuleCtx *ctx) REDISMODULE_ATTR;
REDISMODULE_API void (*RedisModule_ThreadSafeContextUnlock)(RedisModuleCtx *ctx) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_SubscribeToKeyspaceEvents)(RedisModuleCtx *ctx, int types, RedisModuleNotificationFunc cb) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_AddPostNotificationJob)(RedisModuleCtx *ctx, RedisModulePostNotificationJobFunc callback, void *pd, void (*free_pd)(void*)) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_NotifyKeyspaceEvent)(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_GetNotifyKeyspaceEvents)() REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_BlockedClientDisconnected)(RedisModuleCtx *ctx) REDISMODULE_ATTR;
@ -1490,6 +1502,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(Scan);
REDISMODULE_GET_API(ScanKey);
REDISMODULE_GET_API(GetContextFlagsAll);
REDISMODULE_GET_API(GetModuleOptionsAll);
REDISMODULE_GET_API(GetKeyspaceNotificationFlagsAll);
REDISMODULE_GET_API(IsSubEventSupported);
REDISMODULE_GET_API(GetServerVersion);
@ -1512,6 +1525,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(BlockedClientMeasureTimeEnd);
REDISMODULE_GET_API(SetDisconnectCallback);
REDISMODULE_GET_API(SubscribeToKeyspaceEvents);
REDISMODULE_GET_API(AddPostNotificationJob);
REDISMODULE_GET_API(NotifyKeyspaceEvent);
REDISMODULE_GET_API(GetNotifyKeyspaceEvents);
REDISMODULE_GET_API(BlockedClientDisconnected);

View File

@ -3226,7 +3226,7 @@ void updateCommandLatencyHistogram(struct hdr_histogram **latency_histogram, int
/* Handle the alsoPropagate() API to handle commands that want to propagate
* multiple separated commands. Note that alsoPropagate() is not affected
* by CLIENT_PREVENT_PROP flag. */
void propagatePendingCommands() {
static void propagatePendingCommands() {
if (server.also_propagate.numops == 0)
return;
@ -3262,6 +3262,31 @@ void propagatePendingCommands() {
redisOpArrayFree(&server.also_propagate);
}
/* Performs operations that should be performed after an execution unit ends.
* Execution unit is a code that should be done atomically.
* Execution units can be nested and are not necessarily starts with Redis command.
*
* For example the following is a logical unit:
* active expire ->
* trigger del notification of some module ->
* accessing a key ->
* trigger key miss notification of some other module
*
* What we want to achieve is that the entire execution unit will be done atomically,
* currently with respect to replication and post jobs, but in the future there might
* be other considerations. So we basically want the `postUnitOperations` to trigger
* after the entire chain finished.
*
* Current, in order to avoid massive code changes that could be risky to cherry-pick,
* we count on the mechanism we already have such as `server.core_propagation`,
* `server.module_ctx_nesting`, and `server.in_nested_call`. We understand that we probably
* do not need all of those variable and we will make an attempt to re-arrange it on unstable
* branch. */
void postExecutionUnitOperations() {
firePostExecutionUnitJobs();
propagatePendingCommands();
}
/* Increment the command failure counters (either rejected_calls or failed_calls).
* The decision which counter to increment is done using the flags argument, options are:
* * ERROR_COMMAND_REJECTED - update rejected_calls
@ -3576,8 +3601,9 @@ void afterCommand(client *c) {
/* If we are at the top-most call() we can propagate what we accumulated.
* Should be done before trackingHandlePendingKeyInvalidations so that we
* reply to client before invalidating cache (makes more sense) */
if (server.core_propagates)
propagatePendingCommands();
if (server.core_propagates) {
postExecutionUnitOperations();
}
/* Flush pending invalidation messages only when we are not in nested call.
* So the messages are not interleaved with transaction response. */
trackingHandlePendingKeyInvalidations();

View File

@ -2424,6 +2424,7 @@ void moduleAcquireGIL(void);
int moduleTryAcquireGIL(void);
void moduleReleaseGIL(void);
void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid);
void firePostExecutionUnitJobs();
void moduleCallCommandFilters(client *c);
void ModuleForkDoneHandler(int exitcode, int bysignal);
int TerminateModuleForkChild(int child_pid, int wait);
@ -2946,7 +2947,7 @@ void startCommandExecution();
int incrCommandStatsOnError(struct redisCommand *cmd, int flags);
void call(client *c, int flags);
void alsoPropagate(int dbid, robj **argv, int argc, int target);
void propagatePendingCommands();
void postExecutionUnitOperations();
void redisOpArrayFree(redisOpArray *oa);
void forceCommandPropagation(client *c, int flags);
void preventCommandPropagation(client *c);

View File

@ -59,7 +59,8 @@ TEST_MODULES = \
moduleconfigs.so \
moduleconfigstwo.so \
publish.so \
usercall.so
usercall.so \
postnotifications.so
.PHONY: all

View File

@ -131,6 +131,49 @@ static int KeySpace_NotificationModuleKeyMiss(RedisModuleCtx *ctx, int type, con
return REDISMODULE_OK;
}
static int KeySpace_NotificationModuleString(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) {
REDISMODULE_NOT_USED(type);
REDISMODULE_NOT_USED(event);
RedisModuleKey *redis_key = RedisModule_OpenKey(ctx, key, REDISMODULE_READ);
size_t len = 0;
/* RedisModule_StringDMA could change the data format and cause the old robj to be freed.
* This code verifies that such format change will not cause any crashes.*/
char *data = RedisModule_StringDMA(redis_key, &len, REDISMODULE_READ);
int res = strncmp(data, "dummy", 5);
REDISMODULE_NOT_USED(res);
RedisModule_CloseKey(redis_key);
return REDISMODULE_OK;
}
static void KeySpace_PostNotificationStringFreePD(void *pd) {
RedisModule_FreeString(NULL, pd);
}
static void KeySpace_PostNotificationString(RedisModuleCtx *ctx, void *pd) {
REDISMODULE_NOT_USED(ctx);
RedisModuleCallReply* rep = RedisModule_Call(ctx, "incr", "!s", pd);
RedisModule_FreeCallReply(rep);
}
static int KeySpace_NotificationModuleStringPostNotificationJob(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) {
REDISMODULE_NOT_USED(ctx);
REDISMODULE_NOT_USED(type);
REDISMODULE_NOT_USED(event);
const char *key_str = RedisModule_StringPtrLen(key, NULL);
if (strncmp(key_str, "string1_", 8) != 0) {
return REDISMODULE_OK;
}
RedisModuleString *new_key = RedisModule_CreateStringPrintf(NULL, "string_changed{%s}", key_str);
RedisModule_AddPostNotificationJob(ctx, KeySpace_PostNotificationString, new_key, KeySpace_PostNotificationStringFreePD);
return REDISMODULE_OK;
}
static int KeySpace_NotificationModule(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) {
REDISMODULE_NOT_USED(ctx);
REDISMODULE_NOT_USED(type);
@ -312,6 +355,14 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
return REDISMODULE_ERR;
}
if(RedisModule_SubscribeToKeyspaceEvents(ctx, REDISMODULE_NOTIFY_STRING, KeySpace_NotificationModuleString) != REDISMODULE_OK){
return REDISMODULE_ERR;
}
if(RedisModule_SubscribeToKeyspaceEvents(ctx, REDISMODULE_NOTIFY_STRING, KeySpace_NotificationModuleStringPostNotificationJob) != REDISMODULE_OK){
return REDISMODULE_ERR;
}
if (RedisModule_CreateCommand(ctx,"keyspace.notify", cmdNotify,"",0,0,0) == REDISMODULE_ERR){
return REDISMODULE_ERR;
}

View File

@ -0,0 +1,236 @@
/* This module is used to test the server post keyspace jobs API.
*
* -----------------------------------------------------------------------------
*
* Copyright (c) 2020, Meir Shpilraien <meir at redislabs dot com>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
/* This module allow to verify 'RedisModule_AddPostNotificationJob' by registering to 3
* key space event:
* * STRINGS - the module register to all strings notifications and set post notification job
* that increase a counter indicating how many times the string key was changed.
* In addition, it increase another counter that counts the total changes that
* was made on all strings keys.
* * EXPIRED - the module register to expired event and set post notification job that that
* counts the total number of expired events.
* * EVICTED - the module register to evicted event and set post notification job that that
* counts the total number of evicted events.
*
* In addition, the module register a new command, 'postnotification.async_set', that performs a set
* command from a background thread. This allows to check the 'RedisModule_AddPostNotificationJob' on
* notifications that was triggered on a background thread. */
#define _BSD_SOURCE
#define _DEFAULT_SOURCE /* For usleep */
#include "redismodule.h"
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
static void KeySpace_PostNotificationStringFreePD(void *pd) {
RedisModule_FreeString(NULL, pd);
}
static void KeySpace_PostNotificationReadKey(RedisModuleCtx *ctx, void *pd) {
RedisModuleCallReply* rep = RedisModule_Call(ctx, "get", "!s", pd);
RedisModule_FreeCallReply(rep);
}
static void KeySpace_PostNotificationString(RedisModuleCtx *ctx, void *pd) {
REDISMODULE_NOT_USED(ctx);
RedisModuleCallReply* rep = RedisModule_Call(ctx, "incr", "!s", pd);
RedisModule_FreeCallReply(rep);
}
static int KeySpace_NotificationExpired(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key){
REDISMODULE_NOT_USED(type);
REDISMODULE_NOT_USED(event);
REDISMODULE_NOT_USED(key);
RedisModuleString *new_key = RedisModule_CreateString(NULL, "expired", 7);
RedisModule_AddPostNotificationJob(ctx, KeySpace_PostNotificationString, new_key, KeySpace_PostNotificationStringFreePD);
return REDISMODULE_OK;
}
static int KeySpace_NotificationEvicted(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key){
REDISMODULE_NOT_USED(type);
REDISMODULE_NOT_USED(event);
REDISMODULE_NOT_USED(key);
const char *key_str = RedisModule_StringPtrLen(key, NULL);
if (strncmp(key_str, "evicted", 7) == 0) {
return REDISMODULE_OK; /* do not count the evicted key */
}
RedisModuleString *new_key = RedisModule_CreateString(NULL, "evicted", 7);
RedisModule_AddPostNotificationJob(ctx, KeySpace_PostNotificationString, new_key, KeySpace_PostNotificationStringFreePD);
return REDISMODULE_OK;
}
static int KeySpace_NotificationString(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key){
REDISMODULE_NOT_USED(ctx);
REDISMODULE_NOT_USED(type);
REDISMODULE_NOT_USED(event);
const char *key_str = RedisModule_StringPtrLen(key, NULL);
if (strncmp(key_str, "string_", 7) != 0) {
return REDISMODULE_OK;
}
if (strcmp(key_str, "string_total") == 0) {
return REDISMODULE_OK;
}
RedisModuleString *new_key;
if (strncmp(key_str, "string_changed{", 15) == 0) {
new_key = RedisModule_CreateString(NULL, "string_total", 12);
} else {
new_key = RedisModule_CreateStringPrintf(NULL, "string_changed{%s}", key_str);
}
RedisModule_AddPostNotificationJob(ctx, KeySpace_PostNotificationString, new_key, KeySpace_PostNotificationStringFreePD);
return REDISMODULE_OK;
}
static int KeySpace_LazyExpireInsidePostNotificationJob(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key){
REDISMODULE_NOT_USED(ctx);
REDISMODULE_NOT_USED(type);
REDISMODULE_NOT_USED(event);
const char *key_str = RedisModule_StringPtrLen(key, NULL);
if (strncmp(key_str, "read_", 5) != 0) {
return REDISMODULE_OK;
}
RedisModuleString *new_key = RedisModule_CreateString(NULL, key_str + 5, strlen(key_str) - 5);;
RedisModule_AddPostNotificationJob(ctx, KeySpace_PostNotificationReadKey, new_key, KeySpace_PostNotificationStringFreePD);
return REDISMODULE_OK;
}
static int KeySpace_NestedNotification(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key){
REDISMODULE_NOT_USED(ctx);
REDISMODULE_NOT_USED(type);
REDISMODULE_NOT_USED(event);
const char *key_str = RedisModule_StringPtrLen(key, NULL);
if (strncmp(key_str, "write_sync_", 11) != 0) {
return REDISMODULE_OK;
}
/* This test was only meant to check REDISMODULE_OPTIONS_ALLOW_NESTED_KEYSPACE_NOTIFICATIONS.
* In general it is wrong and discourage to perform any writes inside a notification callback. */
RedisModuleString *new_key = RedisModule_CreateString(NULL, key_str + 11, strlen(key_str) - 11);;
RedisModuleCallReply* rep = RedisModule_Call(ctx, "set", "!sc", new_key, "1");
RedisModule_FreeCallReply(rep);
RedisModule_FreeString(NULL, new_key);
return REDISMODULE_OK;
}
static void *KeySpace_PostNotificationsAsyncSetInner(void *arg) {
RedisModuleBlockedClient *bc = arg;
RedisModuleCtx *ctx = RedisModule_GetThreadSafeContext(bc);
RedisModule_ThreadSafeContextLock(ctx);
RedisModuleCallReply* rep = RedisModule_Call(ctx, "set", "!cc", "string_x", "1");
RedisModule_ThreadSafeContextUnlock(ctx);
RedisModule_ReplyWithCallReply(ctx, rep);
RedisModule_FreeCallReply(rep);
RedisModule_UnblockClient(bc, NULL);
RedisModule_FreeThreadSafeContext(ctx);
return NULL;
}
static int KeySpace_PostNotificationsAsyncSet(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
REDISMODULE_NOT_USED(argv);
if (argc != 1)
return RedisModule_WrongArity(ctx);
pthread_t tid;
RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx,NULL,NULL,NULL,0);
if (pthread_create(&tid,NULL,KeySpace_PostNotificationsAsyncSetInner,bc) != 0) {
RedisModule_AbortBlock(bc);
return RedisModule_ReplyWithError(ctx,"-ERR Can't start thread");
}
return REDISMODULE_OK;
}
/* This function must be present on each Redis module. It is used in order to
* register the commands into the Redis server. */
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc);
if (RedisModule_Init(ctx,"postnotifications",1,REDISMODULE_APIVER_1) == REDISMODULE_ERR){
return REDISMODULE_ERR;
}
if (!(RedisModule_GetModuleOptionsAll() & REDISMODULE_OPTIONS_ALLOW_NESTED_KEYSPACE_NOTIFICATIONS)) {
return REDISMODULE_ERR;
}
RedisModule_SetModuleOptions(ctx, REDISMODULE_OPTIONS_ALLOW_NESTED_KEYSPACE_NOTIFICATIONS);
if(RedisModule_SubscribeToKeyspaceEvents(ctx, REDISMODULE_NOTIFY_STRING, KeySpace_NotificationString) != REDISMODULE_OK){
return REDISMODULE_ERR;
}
if(RedisModule_SubscribeToKeyspaceEvents(ctx, REDISMODULE_NOTIFY_STRING, KeySpace_LazyExpireInsidePostNotificationJob) != REDISMODULE_OK){
return REDISMODULE_ERR;
}
if(RedisModule_SubscribeToKeyspaceEvents(ctx, REDISMODULE_NOTIFY_STRING, KeySpace_NestedNotification) != REDISMODULE_OK){
return REDISMODULE_ERR;
}
if(RedisModule_SubscribeToKeyspaceEvents(ctx, REDISMODULE_NOTIFY_EXPIRED, KeySpace_NotificationExpired) != REDISMODULE_OK){
return REDISMODULE_ERR;
}
if(RedisModule_SubscribeToKeyspaceEvents(ctx, REDISMODULE_NOTIFY_EVICTED, KeySpace_NotificationEvicted) != REDISMODULE_OK){
return REDISMODULE_ERR;
}
if (RedisModule_CreateCommand(ctx, "postnotification.async_set", KeySpace_PostNotificationsAsyncSet,
"write", 0, 0, 0) == REDISMODULE_ERR){
return REDISMODULE_ERR;
}
return REDISMODULE_OK;
}
int RedisModule_OnUnload(RedisModuleCtx *ctx) {
REDISMODULE_NOT_USED(ctx);
return REDISMODULE_OK;
}

View File

@ -97,5 +97,9 @@ tags "modules" {
test "Unload the module - testkeyspace" {
assert_equal {OK} [r module unload testkeyspace]
}
test "Verify RM_StringDMA with expiration are not causing invalid memory access" {
assert_equal {OK} [r set x 1 EX 1]
}
}
}

View File

@ -0,0 +1,205 @@
set testmodule [file normalize tests/modules/postnotifications.so]
tags "modules" {
start_server [list overrides [list loadmodule "$testmodule"]] {
test {Test write on post notification callback} {
set repl [attach_to_replication_stream]
r set string_x 1
assert_equal {1} [r get string_changed{string_x}]
assert_equal {1} [r get string_total]
r set string_x 2
assert_equal {2} [r get string_changed{string_x}]
assert_equal {2} [r get string_total]
assert_replication_stream $repl {
{multi}
{select *}
{set string_x 1}
{incr string_changed{string_x}}
{incr string_total}
{exec}
{multi}
{set string_x 2}
{incr string_changed{string_x}}
{incr string_total}
{exec}
}
close_replication_stream $repl
}
test {Test write on post notification callback from module thread} {
r flushall
set repl [attach_to_replication_stream]
assert_equal {OK} [r postnotification.async_set]
assert_equal {1} [r get string_changed{string_x}]
assert_equal {1} [r get string_total]
assert_replication_stream $repl {
{multi}
{select *}
{set string_x 1}
{incr string_changed{string_x}}
{incr string_total}
{exec}
}
close_replication_stream $repl
}
test {Test active expire} {
r flushall
set repl [attach_to_replication_stream]
r set x 1
r pexpire x 10
wait_for_condition 100 50 {
[r keys expired] == {expired}
} else {
puts [r keys *]
fail "Failed waiting for x to expired"
}
assert_replication_stream $repl {
{select *}
{set x 1}
{pexpireat x *}
{multi}
{del x}
{incr expired}
{exec}
}
close_replication_stream $repl
}
test {Test lazy expire} {
r flushall
r DEBUG SET-ACTIVE-EXPIRE 0
set repl [attach_to_replication_stream]
r set x 1
r pexpire x 1
after 10
assert_equal {} [r get x]
assert_replication_stream $repl {
{select *}
{set x 1}
{pexpireat x *}
{multi}
{del x}
{incr expired}
{exec}
}
close_replication_stream $repl
r DEBUG SET-ACTIVE-EXPIRE 1
} {OK} {needs:debug}
test {Test lazy expire inside post job notification} {
r flushall
r DEBUG SET-ACTIVE-EXPIRE 0
set repl [attach_to_replication_stream]
r set x 1
r pexpire x 1
after 10
assert_equal {OK} [r set read_x 1]
assert_replication_stream $repl {
{select *}
{set x 1}
{pexpireat x *}
{multi}
{set read_x 1}
{del x}
{incr expired}
{exec}
}
close_replication_stream $repl
r DEBUG SET-ACTIVE-EXPIRE 1
} {OK} {needs:debug}
test {Test nested keyspace notification} {
r flushall
set repl [attach_to_replication_stream]
assert_equal {OK} [r set write_sync_write_sync_x 1]
assert_replication_stream $repl {
{multi}
{select *}
{set x 1}
{set write_sync_x 1}
{set write_sync_write_sync_x 1}
{exec}
}
close_replication_stream $repl
}
test {Test eviction} {
r flushall
set repl [attach_to_replication_stream]
r set x 1
r config set maxmemory-policy allkeys-random
r config set maxmemory 1
assert_error {OOM *} {r set y 1}
assert_replication_stream $repl {
{select *}
{set x 1}
{multi}
{del x}
{incr evicted}
{exec}
}
close_replication_stream $repl
} {} {needs:config-maxmemory}
}
}
set testmodule2 [file normalize tests/modules/keyspace_events.so]
tags "modules" {
start_server [list overrides [list loadmodule "$testmodule"]] {
r module load $testmodule2
test {Test write on post notification callback} {
set repl [attach_to_replication_stream]
r set string_x 1
assert_equal {1} [r get string_changed{string_x}]
assert_equal {1} [r get string_total]
r set string_x 2
assert_equal {2} [r get string_changed{string_x}]
assert_equal {2} [r get string_total]
r set string1_x 1
assert_equal {1} [r get string_changed{string1_x}]
assert_equal {3} [r get string_total]
assert_replication_stream $repl {
{multi}
{select *}
{set string_x 1}
{incr string_changed{string_x}}
{incr string_total}
{exec}
{multi}
{set string_x 2}
{incr string_changed{string_x}}
{incr string_total}
{exec}
{multi}
{set string1_x 1}
{incr string_changed{string1_x}}
{incr string_total}
{exec}
}
close_replication_stream $repl
}
}
}