Enhance mem_usage/free_effort/unlink/copy callbacks and add GetDbFromIO api. (#8999)

Create new module type enhanced callbacks: mem_usage2, free_effort2, unlink2, copy2.
These will be given a context point from which the module can obtain the key name and database id.
In addition the digest and defrag context can now be used to obtain the key name and database id.
This commit is contained in:
chenyang8094 2021-06-16 14:45:49 +08:00 committed by GitHub
parent 7f342020dc
commit e0cd3ad0de
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 1255 additions and 92 deletions

View File

@ -36,4 +36,5 @@ $TCLSH tests/test_helper.tcl \
--single unit/moduleapi/hash \
--single unit/moduleapi/zset \
--single unit/moduleapi/stream \
--single unit/moduleapi/datatype2 \
"${@}"

View File

@ -1386,11 +1386,11 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) {
/* Call the module type callback in order to rewrite a data type
* that is exported by a module and is not handled by Redis itself.
* The function returns 0 on error, 1 on success. */
int rewriteModuleObject(rio *r, robj *key, robj *o) {
int rewriteModuleObject(rio *r, robj *key, robj *o, int dbid) {
RedisModuleIO io;
moduleValue *mv = o->ptr;
moduleType *mt = mv->type;
moduleInitIOContext(io,mt,r,key);
moduleInitIOContext(io,mt,r,key,dbid);
mt->aof_rewrite(&io,key,mv->value);
if (io.ctx) {
moduleFreeContext(io.ctx);
@ -1464,7 +1464,7 @@ int rewriteAppendOnlyFileRio(rio *aof) {
} else if (o->type == OBJ_STREAM) {
if (rewriteStreamObject(aof,&key,o) == 0) goto werr;
} else if (o->type == OBJ_MODULE) {
if (rewriteModuleObject(aof,&key,o) == 0) goto werr;
if (rewriteModuleObject(aof,&key,o,j) == 0) goto werr;
} else {
serverPanic("Unknown object type");
}

View File

@ -5062,7 +5062,7 @@ NULL
/* Generates a DUMP-format representation of the object 'o', adding it to the
* io stream pointed by 'rio'. This function can't fail. */
void createDumpPayload(rio *payload, robj *o, robj *key) {
void createDumpPayload(rio *payload, robj *o, robj *key, int dbid) {
unsigned char buf[2];
uint64_t crc;
@ -5070,7 +5070,7 @@ void createDumpPayload(rio *payload, robj *o, robj *key) {
* byte followed by the serialized object. This is understood by RESTORE. */
rioInitWithBuffer(payload,sdsempty());
serverAssert(rdbSaveObjectType(payload,o));
serverAssert(rdbSaveObject(payload,o,key));
serverAssert(rdbSaveObject(payload,o,key,dbid));
/* Write the footer, this is how it looks like:
* ----------------+---------------------+---------------+
@ -5131,7 +5131,7 @@ void dumpCommand(client *c) {
}
/* Create the DUMP encoded representation. */
createDumpPayload(&payload,o,c->argv[1]);
createDumpPayload(&payload,o,c->argv[1],c->db->id);
/* Transfer to the client */
addReplyBulkSds(c,payload.io.buffer.ptr);
@ -5203,7 +5203,7 @@ void restoreCommand(client *c) {
rioInitWithBuffer(&payload,c->argv[3]->ptr);
if (((type = rdbLoadObjectType(&payload)) == -1) ||
((obj = rdbLoadObject(type,&payload,key->ptr)) == NULL))
((obj = rdbLoadObject(type,&payload,key->ptr,c->db->id)) == NULL))
{
addReplyError(c,"Bad data format");
return;
@ -5523,7 +5523,7 @@ try_again:
/* Emit the payload argument, that is the serialized object using
* the DUMP format. */
createDumpPayload(&payload,ov[j],kv[j]);
createDumpPayload(&payload,ov[j],kv[j],dbid);
serverAssertWithInfo(c,NULL,
rioWriteBulkString(&cmd,payload.io.buffer.ptr,
sdslen(payload.io.buffer.ptr)));

View File

@ -233,11 +233,11 @@ void dbOverwrite(redisDb *db, robj *key, robj *val) {
/* Although the key is not really deleted from the database, we regard
overwrite as two steps of unlink+add, so we still need to call the unlink
callback of the module. */
moduleNotifyKeyUnlink(key,old);
moduleNotifyKeyUnlink(key,old,db->id);
dictSetVal(db->dict, de, val);
if (server.lazyfree_lazy_server_del) {
freeObjAsync(key,old);
freeObjAsync(key,old,db->id);
dictSetVal(db->dict, &auxentry, NULL);
}
@ -319,7 +319,7 @@ int dbSyncDelete(redisDb *db, robj *key) {
if (de) {
robj *val = dictGetVal(de);
/* Tells the module that the key has been unlinked from the database. */
moduleNotifyKeyUnlink(key,val);
moduleNotifyKeyUnlink(key,val,db->id);
dictFreeUnlinkedEntry(db->dict,de);
if (server.cluster_enabled) slotToKeyDel(key->ptr);
return 1;
@ -1277,7 +1277,7 @@ void copyCommand(client *c) {
case OBJ_HASH: newobj = hashTypeDup(o); break;
case OBJ_STREAM: newobj = streamDup(o); break;
case OBJ_MODULE:
newobj = moduleTypeDupOrReply(c, key, newkey, o);
newobj = moduleTypeDupOrReply(c, key, newkey, dst->id, o);
if (!newobj) return;
break;
default:

View File

@ -249,7 +249,7 @@ void xorObjectDigest(redisDb *db, robj *keyobj, unsigned char *digest, robj *o)
}
streamIteratorStop(&si);
} else if (o->type == OBJ_MODULE) {
RedisModuleDigest md = {{0},{0}};
RedisModuleDigest md = {{0},{0},keyobj,db->id};
moduleValue *mv = o->ptr;
moduleType *mt = mv->type;
moduleInitDigestContext(md);
@ -607,7 +607,7 @@ NULL
"encoding:%s serializedlength:%zu "
"lru:%d lru_seconds_idle:%llu%s",
(void*)val, val->refcount,
strenc, rdbSavedObjectLen(val, c->argv[2]),
strenc, rdbSavedObjectLen(val, c->argv[2], c->db->id),
val->lru, estimateObjectIdleTime(val)/1000, extra);
} else if (!strcasecmp(c->argv[1]->ptr,"sdslen") && c->argc == 3) {
dictEntry *de;

View File

@ -803,7 +803,7 @@ long defragModule(redisDb *db, dictEntry *kde) {
serverAssert(obj->type == OBJ_MODULE);
long defragged = 0;
if (!moduleDefragValue(dictGetKey(kde), obj, &defragged))
if (!moduleDefragValue(dictGetKey(kde), obj, &defragged, db->id))
defragLater(db, kde);
return defragged;
@ -945,7 +945,7 @@ long defragOtherGlobals() {
/* returns 0 more work may or may not be needed (see non-zero cursor),
* and 1 if time is up and more work is needed. */
int defragLaterItem(dictEntry *de, unsigned long *cursor, long long endtime) {
int defragLaterItem(dictEntry *de, unsigned long *cursor, long long endtime, int dbid) {
if (de) {
robj *ob = dictGetVal(de);
if (ob->type == OBJ_LIST) {
@ -959,7 +959,7 @@ int defragLaterItem(dictEntry *de, unsigned long *cursor, long long endtime) {
} else if (ob->type == OBJ_STREAM) {
return scanLaterStreamListpacks(ob, cursor, endtime, &server.stat_active_defrag_hits);
} else if (ob->type == OBJ_MODULE) {
return moduleLateDefrag(dictGetKey(de), ob, cursor, endtime, &server.stat_active_defrag_hits);
return moduleLateDefrag(dictGetKey(de), ob, cursor, endtime, &server.stat_active_defrag_hits, dbid);
} else {
*cursor = 0; /* object type may have changed since we schedule it for later */
}
@ -1008,7 +1008,7 @@ int defragLaterStep(redisDb *db, long long endtime) {
key_defragged = server.stat_active_defrag_hits;
do {
int quit = 0;
if (defragLaterItem(de, &defrag_later_cursor, endtime))
if (defragLaterItem(de, &defrag_later_cursor, endtime,db->id))
quit = 1; /* time is up, we didn't finish all the work */
/* Once in 16 scan iterations, 512 pointer reallocations, or 64 fields

View File

@ -90,7 +90,7 @@ void lazyfreeResetStats() {
*
* For lists the function returns the number of elements in the quicklist
* representing the list. */
size_t lazyfreeGetFreeEffort(robj *key, robj *obj) {
size_t lazyfreeGetFreeEffort(robj *key, robj *obj, int dbid) {
if (obj->type == OBJ_LIST) {
quicklist *ql = obj->ptr;
return ql->len;
@ -128,16 +128,10 @@ size_t lazyfreeGetFreeEffort(robj *key, robj *obj) {
}
return effort;
} else if (obj->type == OBJ_MODULE) {
moduleValue *mv = obj->ptr;
moduleType *mt = mv->type;
if (mt->free_effort != NULL) {
size_t effort = mt->free_effort(key,mv->value);
/* If the module's free_effort returns 0, it will use asynchronous free
memory by default */
return effort == 0 ? ULONG_MAX : effort;
} else {
return 1;
}
size_t effort = moduleGetFreeEffort(key, obj, dbid);
/* If the module's free_effort returns 0, we will use asynchronous free
* memory by default. */
return effort == 0 ? ULONG_MAX : effort;
} else {
return 1; /* Everything else is a single allocation. */
}
@ -161,9 +155,9 @@ int dbAsyncDelete(redisDb *db, robj *key) {
robj *val = dictGetVal(de);
/* Tells the module that the key has been unlinked from the database. */
moduleNotifyKeyUnlink(key,val);
moduleNotifyKeyUnlink(key,val,db->id);
size_t free_effort = lazyfreeGetFreeEffort(key,val);
size_t free_effort = lazyfreeGetFreeEffort(key,val,db->id);
/* If releasing the object is too much work, do it in the background
* by adding the object to the lazy free list.
@ -192,8 +186,8 @@ int dbAsyncDelete(redisDb *db, robj *key) {
}
/* Free an object, if the object is huge enough, free it in async way. */
void freeObjAsync(robj *key, robj *obj) {
size_t free_effort = lazyfreeGetFreeEffort(key,obj);
void freeObjAsync(robj *key, robj *obj, int dbid) {
size_t free_effort = lazyfreeGetFreeEffort(key,obj,dbid);
if (free_effort > LAZYFREE_THRESHOLD && obj->refcount == 1) {
atomicIncr(lazyfree_objects,1);
bioCreateLazyFreeJob(lazyfreeFreeObject,1,obj);

View File

@ -382,7 +382,15 @@ typedef struct RedisModuleUser {
user *user; /* Reference to the real redis user */
} RedisModuleUser;
/* This is a structure used to export some meta-information such as dbid to the module. */
typedef struct RedisModuleKeyOptCtx {
struct redisObject *from_key, *to_key; /* Optional name of key processed, NULL when unknown.
In most cases, only 'from_key' is valid, but in callbacks
such as `copy2`, both 'from_key' and 'to_key' are valid. */
int from_dbid, to_dbid; /* The dbid of the key being processed, -1 when unknown.
In most cases, only 'from_dbid' is valid, but in callbacks such
as `copy2`, 'from_dbid' and 'to_dbid' are both valid. */
} RedisModuleKeyOptCtx;
/* --------------------------------------------------------------------------
* Prototypes
* -------------------------------------------------------------------------- */
@ -2433,6 +2441,25 @@ RedisModuleString *RM_RandomKey(RedisModuleCtx *ctx) {
return key;
}
/* Returns the name of the key currently being processed. */
const RedisModuleString *RM_GetKeyNameFromOptCtx(RedisModuleKeyOptCtx *ctx) {
return ctx->from_key;
}
/* Returns the name of the target key currently being processed. */
const RedisModuleString *RM_GetToKeyNameFromOptCtx(RedisModuleKeyOptCtx *ctx) {
return ctx->to_key;
}
/* Returns the dbid currently being processed. */
int RM_GetDbIdFromOptCtx(RedisModuleKeyOptCtx *ctx) {
return ctx->from_dbid;
}
/* Returns the target dbid currently being processed. */
int RM_GetToDbIdFromOptCtx(RedisModuleKeyOptCtx *ctx) {
return ctx->to_dbid;
}
/* --------------------------------------------------------------------------
* ## Key API for String type
*
@ -4374,14 +4401,21 @@ const char *moduleTypeModuleName(moduleType *mt) {
/* Create a copy of a module type value using the copy callback. If failed
* or not supported, produce an error reply and return NULL.
*/
robj *moduleTypeDupOrReply(client *c, robj *fromkey, robj *tokey, robj *value) {
robj *moduleTypeDupOrReply(client *c, robj *fromkey, robj *tokey, int todb, robj *value) {
moduleValue *mv = value->ptr;
moduleType *mt = mv->type;
if (!mt->copy) {
if (!mt->copy && !mt->copy2) {
addReplyError(c, "not supported for this module key");
return NULL;
}
void *newval = mt->copy(fromkey, tokey, mv->value);
void *newval = NULL;
if (mt->copy2 != NULL) {
RedisModuleKeyOptCtx ctx = {fromkey, tokey, c->db->id, todb};
newval = mt->copy2(&ctx, mv->value);
} else {
newval = mt->copy(fromkey, tokey, mv->value);
}
if (!newval) {
addReplyError(c, "module key failed to copy");
return NULL;
@ -4431,6 +4465,12 @@ robj *moduleTypeDupOrReply(client *c, robj *fromkey, robj *tokey, robj *value) {
* .unlink = myType_UnlinkCallBack,
* .copy = myType_CopyCallback,
* .defrag = myType_DefragCallback
*
* // Enhanced optional fields
* .mem_usage2 = myType_MemUsageCallBack2,
* .free_effort2 = myType_FreeEffortCallBack2,
* .unlink2 = myType_UnlinkCallBack2,
* .copy2 = myType_CopyCallback2,
* }
*
* * **rdb_load**: A callback function pointer that loads data from RDB files.
@ -4472,6 +4512,15 @@ robj *moduleTypeDupOrReply(client *c, robj *fromkey, robj *tokey, robj *value) {
* NOTE: The value is passed as a `void**` and the function is expected to update the
* pointer if the top-level value pointer is defragmented and consequently changes.
*
* * **mem_usage2**: Similar to `mem_usage`, but provides the `RedisModuleKeyOptCtx` parameter
* so that meta information such as key name and db id can be obtained.
* * **free_effort2**: Similar to `free_effort`, but provides the `RedisModuleKeyOptCtx` parameter
* so that meta information such as key name and db id can be obtained.
* * **unlink2**: Similar to `unlink`, but provides the `RedisModuleKeyOptCtx` parameter
* so that meta information such as key name and db id can be obtained.
* * **copy2**: Similar to `copy`, but provides the `RedisModuleKeyOptCtx` parameter
* so that meta information such as key names and db ids can be obtained.
*
* Note: the module name "AAAAAAAAA" is reserved and produces an error, it
* happens to be pretty lame as well.
*
@ -4517,6 +4566,12 @@ moduleType *RM_CreateDataType(RedisModuleCtx *ctx, const char *name, int encver,
moduleTypeCopyFunc copy;
moduleTypeDefragFunc defrag;
} v3;
struct {
moduleTypeMemUsageFunc2 mem_usage2;
moduleTypeFreeEffortFunc2 free_effort2;
moduleTypeUnlinkFunc2 unlink2;
moduleTypeCopyFunc2 copy2;
} v4;
} *tms = (struct typemethods*) typemethods_ptr;
moduleType *mt = zcalloc(sizeof(*mt));
@ -4539,6 +4594,12 @@ moduleType *RM_CreateDataType(RedisModuleCtx *ctx, const char *name, int encver,
mt->copy = tms->v3.copy;
mt->defrag = tms->v3.defrag;
}
if (tms->version >= 4) {
mt->mem_usage2 = tms->v4.mem_usage2;
mt->unlink2 = tms->v4.unlink2;
mt->free_effort2 = tms->v4.free_effort2;
mt->copy2 = tms->v4.copy2;
}
memcpy(mt->name,name,sizeof(mt->name));
listAddNodeTail(ctx->module->types,mt);
return mt;
@ -4977,7 +5038,7 @@ void *RM_LoadDataTypeFromString(const RedisModuleString *str, const moduleType *
void *ret;
rioInitWithBuffer(&payload, str->ptr);
moduleInitIOContext(io,(moduleType *)mt,&payload,NULL);
moduleInitIOContext(io,(moduleType *)mt,&payload,NULL,-1);
/* All RM_Save*() calls always write a version 2 compatible format, so we
* need to make sure we read the same.
@ -5003,7 +5064,7 @@ RedisModuleString *RM_SaveDataTypeToString(RedisModuleCtx *ctx, void *data, cons
RedisModuleIO io;
rioInitWithBuffer(&payload,sdsempty());
moduleInitIOContext(io,(moduleType *)mt,&payload,NULL);
moduleInitIOContext(io,(moduleType *)mt,&payload,NULL,-1);
mt->rdb_save(&io,data);
if (io.ctx) {
moduleFreeContext(io.ctx);
@ -5018,6 +5079,15 @@ RedisModuleString *RM_SaveDataTypeToString(RedisModuleCtx *ctx, void *data, cons
}
}
/* Returns the name of the key currently being processed. */
const RedisModuleString *RM_GetKeyNameFromDigest(RedisModuleDigest *dig) {
return dig->key;
}
/* Returns the database id of the key currently being processed. */
int RM_GetDbIdFromDigest(RedisModuleDigest *dig) {
return dig->dbid;
}
/* --------------------------------------------------------------------------
* ## AOF API for modules data types
* -------------------------------------------------------------------------- */
@ -5087,9 +5157,8 @@ RedisModuleCtx *RM_GetContextFromIO(RedisModuleIO *io) {
return io->ctx;
}
/* Returns a RedisModuleString with the name of the key currently saving or
* loading, when an IO data type callback is called. There is no guarantee
* that the key name is always available, so this may return NULL.
/* Returns the name of the key currently being processed.
* There is no guarantee that the key name is always available, so this may return NULL.
*/
const RedisModuleString *RM_GetKeyNameFromIO(RedisModuleIO *io) {
return io->key;
@ -5100,6 +5169,18 @@ const RedisModuleString *RM_GetKeyNameFromModuleKey(RedisModuleKey *key) {
return key ? key->key : NULL;
}
/* Returns a database id of the key from RedisModuleKey. */
int RM_GetDbIdFromModuleKey(RedisModuleKey *key) {
return key ? key->db->id : -1;
}
/* Returns the database id of the key currently being processed.
* There is no guarantee that this info is always available, so this may return -1.
*/
int RM_GetDbIdFromIO(RedisModuleIO *io) {
return io->dbid;
}
/* --------------------------------------------------------------------------
* ## Logging
* -------------------------------------------------------------------------- */
@ -8346,16 +8427,55 @@ void processModuleLoadingProgressEvent(int is_aof) {
/* When a module key is deleted (in dbAsyncDelete/dbSyncDelete/dbOverwrite), it
* will be called to tell the module which key is about to be released. */
void moduleNotifyKeyUnlink(robj *key, robj *val) {
void moduleNotifyKeyUnlink(robj *key, robj *val, int dbid) {
if (val->type == OBJ_MODULE) {
moduleValue *mv = val->ptr;
moduleType *mt = mv->type;
if (mt->unlink != NULL) {
/* We prefer to use the enhanced version. */
if (mt->unlink2 != NULL) {
RedisModuleKeyOptCtx ctx = {key, NULL, dbid, -1};
mt->unlink2(&ctx,mv->value);
} else if (mt->unlink != NULL) {
mt->unlink(key,mv->value);
}
}
}
/* Return the free_effort of the module, it will automatically choose to call
* `free_effort` or `free_effort2`, and the default return value is 1.
* value of 0 means very high effort (always asynchronous freeing). */
size_t moduleGetFreeEffort(robj *key, robj *val, int dbid) {
moduleValue *mv = val->ptr;
moduleType *mt = mv->type;
size_t effort = 1;
/* We prefer to use the enhanced version. */
if (mt->free_effort2 != NULL) {
RedisModuleKeyOptCtx ctx = {key, NULL, dbid, -1};
effort = mt->free_effort2(&ctx,mv->value);
} else if (mt->free_effort != NULL) {
effort = mt->free_effort(key,mv->value);
}
return effort;
}
/* Return the memory usage of the module, it will automatically choose to call
* `mem_usage` or `mem_usage2`, and the default return value is 0. */
size_t moduleGetMemUsage(robj *key, robj *val, int dbid) {
moduleValue *mv = val->ptr;
moduleType *mt = mv->type;
size_t size = 0;
/* We prefer to use the enhanced version. */
if (mt->mem_usage2 != NULL) {
RedisModuleKeyOptCtx ctx = {key, NULL, dbid, -1};
size = mt->mem_usage2(&ctx,mv->value);
} else if (mt->mem_usage != NULL) {
size = mt->mem_usage(mv->value);
}
return size;
}
/* --------------------------------------------------------------------------
* Modules API internals
* -------------------------------------------------------------------------- */
@ -9041,6 +9161,8 @@ typedef struct RedisModuleDefragCtx {
long defragged;
long long int endtime;
unsigned long *cursor;
struct redisObject *key; /* Optional name of key processed, NULL when unknown. */
int dbid; /* The dbid of the key being processed, -1 when unknown. */
} RedisModuleDefragCtx;
/* Register a defrag callback for global data, i.e. anything that the module
@ -9152,11 +9274,11 @@ RedisModuleString *RM_DefragRedisModuleString(RedisModuleDefragCtx *ctx, RedisMo
* Returns a zero value (and initializes the cursor) if no more needs to be done,
* or a non-zero value otherwise.
*/
int moduleLateDefrag(robj *key, robj *value, unsigned long *cursor, long long endtime, long long *defragged) {
int moduleLateDefrag(robj *key, robj *value, unsigned long *cursor, long long endtime, long long *defragged, int dbid) {
moduleValue *mv = value->ptr;
moduleType *mt = mv->type;
RedisModuleDefragCtx defrag_ctx = { 0, endtime, cursor };
RedisModuleDefragCtx defrag_ctx = { 0, endtime, cursor, key, dbid};
/* Invoke callback. Note that the callback may be missing if the key has been
* replaced with a different type since our last visit.
@ -9180,7 +9302,7 @@ int moduleLateDefrag(robj *key, robj *value, unsigned long *cursor, long long en
* Returns 1 if the operation has been completed or 0 if it needs to
* be scheduled for late defrag.
*/
int moduleDefragValue(robj *key, robj *value, long *defragged) {
int moduleDefragValue(robj *key, robj *value, long *defragged, int dbid) {
moduleValue *mv = value->ptr;
moduleType *mt = mv->type;
@ -9200,16 +9322,14 @@ int moduleDefragValue(robj *key, robj *value, long *defragged) {
* necessary schedule it for defragLater instead of quick immediate
* defrag.
*/
if (mt->free_effort) {
size_t effort = mt->free_effort(key, mv->value);
if (!effort)
effort = SIZE_MAX;
if (effort > server.active_defrag_max_scan_fields) {
return 0; /* Defrag later */
}
size_t effort = moduleGetFreeEffort(key, value, dbid);
if (!effort)
effort = SIZE_MAX;
if (effort > server.active_defrag_max_scan_fields) {
return 0; /* Defrag later */
}
RedisModuleDefragCtx defrag_ctx = { 0, 0, NULL };
RedisModuleDefragCtx defrag_ctx = { 0, 0, NULL, key, dbid};
mt->defrag(&defrag_ctx, key, &mv->value);
(*defragged) += defrag_ctx.defragged;
return 1;
@ -9225,7 +9345,7 @@ long moduleDefragGlobals(void) {
struct RedisModule *module = dictGetVal(de);
if (!module->defrag_cb)
continue;
RedisModuleDefragCtx defrag_ctx = { 0, 0, NULL };
RedisModuleDefragCtx defrag_ctx = { 0, 0, NULL, NULL, -1};
module->defrag_cb(&defrag_ctx);
defragged += defrag_ctx.defragged;
}
@ -9234,6 +9354,20 @@ long moduleDefragGlobals(void) {
return defragged;
}
/* Returns the name of the key currently being processed.
* There is no guarantee that the key name is always available, so this may return NULL.
*/
const RedisModuleString *RM_GetKeyNameFromDefragCtx(RedisModuleDefragCtx *ctx) {
return ctx->key;
}
/* Returns the database id of the key currently being processed.
* There is no guarantee that this info is always available, so this may return -1.
*/
int RM_GetDbIdFromDefragCtx(RedisModuleDefragCtx *ctx) {
return ctx->dbid;
}
/* Register all the APIs we export. Keep this function at the end of the
* file so that's easy to seek it to add new entries. */
void moduleRegisterCoreAPI(void) {
@ -9374,6 +9508,16 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(GetContextFromIO);
REGISTER_API(GetKeyNameFromIO);
REGISTER_API(GetKeyNameFromModuleKey);
REGISTER_API(GetDbIdFromModuleKey);
REGISTER_API(GetDbIdFromIO);
REGISTER_API(GetKeyNameFromOptCtx);
REGISTER_API(GetToKeyNameFromOptCtx);
REGISTER_API(GetDbIdFromOptCtx);
REGISTER_API(GetToDbIdFromOptCtx);
REGISTER_API(GetKeyNameFromDefragCtx);
REGISTER_API(GetDbIdFromDefragCtx);
REGISTER_API(GetKeyNameFromDigest);
REGISTER_API(GetDbIdFromDigest);
REGISTER_API(BlockClient);
REGISTER_API(UnblockClient);
REGISTER_API(IsBlockedReplyRequest);

View File

@ -790,7 +790,7 @@ size_t streamRadixTreeMemoryUsage(rax *rax) {
* case of aggregated data types where only "sample_size" elements
* are checked and averaged to estimate the total size. */
#define OBJ_COMPUTE_SIZE_DEF_SAMPLES 5 /* Default sample size. */
size_t objectComputeSize(robj *o, size_t sample_size) {
size_t objectComputeSize(robj *key, robj *o, size_t sample_size, int dbid) {
sds ele, ele2;
dict *d;
dictIterator *di;
@ -941,13 +941,7 @@ size_t objectComputeSize(robj *o, size_t sample_size) {
raxStop(&ri);
}
} else if (o->type == OBJ_MODULE) {
moduleValue *mv = o->ptr;
moduleType *mt = mv->type;
if (mt->mem_usage != NULL) {
asize = mt->mem_usage(mv->value);
} else {
asize = 0;
}
asize = moduleGetMemUsage(key, o, dbid);
} else {
serverPanic("Unknown object type");
}
@ -1336,7 +1330,7 @@ NULL
addReplyNull(c);
return;
}
size_t usage = objectComputeSize(dictGetVal(de),samples);
size_t usage = objectComputeSize(c->argv[2],dictGetVal(de),samples,c->db->id);
usage += sdsZmallocSize(dictGetKey(de));
usage += sizeof(dictEntry);
addReplyLongLong(c,usage);

View File

@ -792,7 +792,7 @@ size_t rdbSaveStreamConsumers(rio *rdb, streamCG *cg) {
/* Save a Redis object.
* Returns -1 on error, number of bytes written on success. */
ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key) {
ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid) {
ssize_t n = 0, nwritten = 0;
if (o->type == OBJ_STRING) {
@ -1032,7 +1032,7 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key) {
* to call the right module during loading. */
int retval = rdbSaveLen(rdb,mt->id);
if (retval == -1) return -1;
moduleInitIOContext(io,mt,rdb,key);
moduleInitIOContext(io,mt,rdb,key,dbid);
io.bytes += retval;
/* Then write the module-specific representation + EOF marker. */
@ -1058,8 +1058,8 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key) {
* the rdbSaveObject() function. Currently we use a trick to get
* this length with very little changes to the code. In the future
* we could switch to a faster solution. */
size_t rdbSavedObjectLen(robj *o, robj *key) {
ssize_t len = rdbSaveObject(NULL,o,key);
size_t rdbSavedObjectLen(robj *o, robj *key, int dbid) {
ssize_t len = rdbSaveObject(NULL,o,key,dbid);
serverAssertWithInfo(NULL,o,len != -1);
return len;
}
@ -1067,7 +1067,7 @@ size_t rdbSavedObjectLen(robj *o, robj *key) {
/* Save a key-value pair, with expire time, type, key, value.
* On error -1 is returned.
* On success if the key was actually saved 1 is returned. */
int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime) {
int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime, int dbid) {
int savelru = server.maxmemory_policy & MAXMEMORY_FLAG_LRU;
int savelfu = server.maxmemory_policy & MAXMEMORY_FLAG_LFU;
@ -1100,7 +1100,7 @@ int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime) {
/* Save type, key, value */
if (rdbSaveObjectType(rdb,val) == -1) return -1;
if (rdbSaveStringObject(rdb,key) == -1) return -1;
if (rdbSaveObject(rdb,val,key) == -1) return -1;
if (rdbSaveObject(rdb,val,key,dbid) == -1) return -1;
/* Delay return if required (for testing) */
if (server.rdb_key_save_delay)
@ -1163,7 +1163,7 @@ ssize_t rdbSaveSingleModuleAux(rio *rdb, int when, moduleType *mt) {
RedisModuleIO io;
int retval = rdbSaveType(rdb, RDB_OPCODE_MODULE_AUX);
if (retval == -1) return -1;
moduleInitIOContext(io,mt,rdb,NULL);
moduleInitIOContext(io,mt,rdb,NULL,-1);
io.bytes += retval;
/* Write the "module" identifier as prefix, so that we'll be able
@ -1251,7 +1251,7 @@ int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) {
initStaticStringObject(key,keystr);
expire = getExpire(db,&key);
if (rdbSaveKeyValuePair(rdb,&key,o,expire) == -1) goto werr;
if (rdbSaveKeyValuePair(rdb,&key,o,expire,j) == -1) goto werr;
/* When this RDB is produced as part of an AOF rewrite, move
* accumulated diff from parent to child while rewriting in
@ -1510,7 +1510,7 @@ robj *rdbLoadCheckModuleValue(rio *rdb, char *modulename) {
/* Load a Redis object of the specified type from the specified file.
* On success a newly allocated object is returned, otherwise NULL. */
robj *rdbLoadObject(int rdbtype, rio *rdb, sds key) {
robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid) {
robj *o = NULL, *ele, *dec;
uint64_t len;
unsigned int i;
@ -2184,7 +2184,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key) {
RedisModuleIO io;
robj keyobj;
initStaticStringObject(keyobj,key);
moduleInitIOContext(io,mt,rdb,&keyobj);
moduleInitIOContext(io,mt,rdb,&keyobj,dbid);
io.ver = (rdbtype == RDB_TYPE_MODULE) ? 1 : 2;
/* Call the rdb_load method of the module providing the 10 bit
* encoding version in the lower 10 bits of the module ID. */
@ -2495,7 +2495,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
}
RedisModuleIO io;
moduleInitIOContext(io,mt,rdb,NULL);
moduleInitIOContext(io,mt,rdb,NULL,-1);
io.ver = 2;
/* Call the rdb_load method of the module providing the 10 bit
* encoding version in the lower 10 bits of the module ID. */
@ -2526,7 +2526,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
if ((key = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL)
goto eoferr;
/* Read value */
if ((val = rdbLoadObject(type,rdb,key)) == NULL) {
if ((val = rdbLoadObject(type,rdb,key,db->id)) == NULL) {
sdsfree(key);
goto eoferr;
}

View File

@ -143,11 +143,11 @@ int rdbSaveBackground(char *filename, rdbSaveInfo *rsi);
int rdbSaveToSlavesSockets(rdbSaveInfo *rsi);
void rdbRemoveTempFile(pid_t childpid, int from_signal);
int rdbSave(char *filename, rdbSaveInfo *rsi);
ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key);
size_t rdbSavedObjectLen(robj *o, robj *key);
robj *rdbLoadObject(int type, rio *rdb, sds key);
ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid);
size_t rdbSavedObjectLen(robj *o, robj *key, int dbid);
robj *rdbLoadObject(int type, rio *rdb, sds key, int dbid);
void backgroundSaveDoneHandler(int exitcode, int bysignal);
int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime);
int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime,int dbid);
ssize_t rdbSaveSingleModuleAux(rio *rdb, int when, moduleType *mt);
robj *rdbLoadCheckModuleValue(rio *rdb, char *modulename);
robj *rdbLoadStringObject(rio *rdb);

View File

@ -184,6 +184,7 @@ void rdbCheckSetupSignals(void) {
* otherwise the already open file 'fp' is checked. */
int redis_check_rdb(char *rdbfilename, FILE *fp) {
uint64_t dbid;
int selected_dbid = -1;
int type, rdbver;
char buf[1024];
long long expiretime, now = mstime();
@ -251,6 +252,7 @@ int redis_check_rdb(char *rdbfilename, FILE *fp) {
if ((dbid = rdbLoadLen(&rdb,NULL)) == RDB_LENERR)
goto eoferr;
rdbCheckInfo("Selecting DB ID %llu", (unsigned long long)dbid);
selected_dbid = dbid;
continue; /* Read type again. */
} else if (type == RDB_OPCODE_RESIZEDB) {
/* RESIZEDB: Hint about the size of the keys in the currently
@ -308,7 +310,7 @@ int redis_check_rdb(char *rdbfilename, FILE *fp) {
rdbstate.keys++;
/* Read value */
rdbstate.doing = RDB_CHECK_DOING_READ_OBJECT_VALUE;
if ((val = rdbLoadObject(type,&rdb,key->ptr)) == NULL) goto eoferr;
if ((val = rdbLoadObject(type,&rdb,key->ptr,selected_dbid)) == NULL) goto eoferr;
/* Check if the key already expired. */
if (expiretime != -1 && expiretime < now)
rdbstate.already_expired++;

View File

@ -16,7 +16,7 @@
/* Version of the RedisModuleTypeMethods structure. Once the RedisModuleTypeMethods
* structure is changed, this version number needs to be changed synchronistically. */
#define REDISMODULE_TYPE_METHOD_VERSION 3
#define REDISMODULE_TYPE_METHOD_VERSION 4
/* API flags and constants */
#define REDISMODULE_READ (1<<0)
@ -520,6 +520,7 @@ typedef struct RedisModuleServerInfoData RedisModuleServerInfoData;
typedef struct RedisModuleScanCursor RedisModuleScanCursor;
typedef struct RedisModuleDefragCtx RedisModuleDefragCtx;
typedef struct RedisModuleUser RedisModuleUser;
typedef struct RedisModuleKeyOptCtx RedisModuleKeyOptCtx;
typedef int (*RedisModuleCmdFunc)(RedisModuleCtx *ctx, RedisModuleString **argv, int argc);
typedef void (*RedisModuleDisconnectFunc)(RedisModuleCtx *ctx, RedisModuleBlockedClient *bc);
@ -530,11 +531,15 @@ typedef int (*RedisModuleTypeAuxLoadFunc)(RedisModuleIO *rdb, int encver, int wh
typedef void (*RedisModuleTypeAuxSaveFunc)(RedisModuleIO *rdb, int when);
typedef void (*RedisModuleTypeRewriteFunc)(RedisModuleIO *aof, RedisModuleString *key, void *value);
typedef size_t (*RedisModuleTypeMemUsageFunc)(const void *value);
typedef size_t (*RedisModuleTypeMemUsageFunc2)(RedisModuleKeyOptCtx *ctx, const void *value);
typedef void (*RedisModuleTypeDigestFunc)(RedisModuleDigest *digest, void *value);
typedef void (*RedisModuleTypeFreeFunc)(void *value);
typedef size_t (*RedisModuleTypeFreeEffortFunc)(RedisModuleString *key, const void *value);
typedef size_t (*RedisModuleTypeFreeEffortFunc2)(RedisModuleKeyOptCtx *ctx, const void *value);
typedef void (*RedisModuleTypeUnlinkFunc)(RedisModuleString *key, const void *value);
typedef void (*RedisModuleTypeUnlinkFunc2)(RedisModuleKeyOptCtx *ctx, const void *value);
typedef void *(*RedisModuleTypeCopyFunc)(RedisModuleString *fromkey, RedisModuleString *tokey, const void *value);
typedef void *(*RedisModuleTypeCopyFunc2)(RedisModuleKeyOptCtx *ctx, const void *value);
typedef int (*RedisModuleTypeDefragFunc)(RedisModuleDefragCtx *ctx, RedisModuleString *key, void **value);
typedef void (*RedisModuleClusterMessageReceiver)(RedisModuleCtx *ctx, const char *sender_id, uint8_t type, const unsigned char *payload, uint32_t len);
typedef void (*RedisModuleTimerProc)(RedisModuleCtx *ctx, void *data);
@ -561,6 +566,10 @@ typedef struct RedisModuleTypeMethods {
RedisModuleTypeUnlinkFunc unlink;
RedisModuleTypeCopyFunc copy;
RedisModuleTypeDefragFunc defrag;
RedisModuleTypeMemUsageFunc2 mem_usage2;
RedisModuleTypeFreeEffortFunc2 free_effort2;
RedisModuleTypeUnlinkFunc2 unlink2;
RedisModuleTypeCopyFunc2 copy2;
} RedisModuleTypeMethods;
#define REDISMODULE_GET_API(name) \
@ -716,10 +725,18 @@ REDISMODULE_API int (*RedisModule_StringCompare)(RedisModuleString *a, RedisModu
REDISMODULE_API RedisModuleCtx * (*RedisModule_GetContextFromIO)(RedisModuleIO *io) REDISMODULE_ATTR;
REDISMODULE_API const RedisModuleString * (*RedisModule_GetKeyNameFromIO)(RedisModuleIO *io) REDISMODULE_ATTR;
REDISMODULE_API const RedisModuleString * (*RedisModule_GetKeyNameFromModuleKey)(RedisModuleKey *key) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_GetDbIdFromModuleKey)(RedisModuleKey *key) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_GetDbIdFromIO)(RedisModuleIO *io) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_GetDbIdFromOptCtx)(RedisModuleKeyOptCtx *ctx) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_GetToDbIdFromOptCtx)(RedisModuleKeyOptCtx *ctx) REDISMODULE_ATTR;
REDISMODULE_API const RedisModuleString * (*RedisModule_GetKeyNameFromOptCtx)(RedisModuleKeyOptCtx *ctx) REDISMODULE_ATTR;
REDISMODULE_API const RedisModuleString * (*RedisModule_GetToKeyNameFromOptCtx)(RedisModuleKeyOptCtx *ctx) REDISMODULE_ATTR;
REDISMODULE_API long long (*RedisModule_Milliseconds)(void) REDISMODULE_ATTR;
REDISMODULE_API void (*RedisModule_DigestAddStringBuffer)(RedisModuleDigest *md, unsigned char *ele, size_t len) REDISMODULE_ATTR;
REDISMODULE_API void (*RedisModule_DigestAddLongLong)(RedisModuleDigest *md, long long ele) REDISMODULE_ATTR;
REDISMODULE_API void (*RedisModule_DigestEndSequence)(RedisModuleDigest *md) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_GetDbIdFromDigest)(RedisModuleDigest *dig) REDISMODULE_ATTR;
REDISMODULE_API const RedisModuleString * (*RedisModule_GetKeyNameFromDigest)(RedisModuleDigest *dig) REDISMODULE_ATTR;
REDISMODULE_API RedisModuleDict * (*RedisModule_CreateDict)(RedisModuleCtx *ctx) REDISMODULE_ATTR;
REDISMODULE_API void (*RedisModule_FreeDict)(RedisModuleCtx *ctx, RedisModuleDict *d) REDISMODULE_ATTR;
REDISMODULE_API uint64_t (*RedisModule_DictSize)(RedisModuleDict *d) REDISMODULE_ATTR;
@ -843,6 +860,8 @@ REDISMODULE_API RedisModuleString *(*RedisModule_DefragRedisModuleString)(RedisM
REDISMODULE_API int (*RedisModule_DefragShouldStop)(RedisModuleDefragCtx *ctx) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_DefragCursorSet)(RedisModuleDefragCtx *ctx, unsigned long cursor) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_DefragCursorGet)(RedisModuleDefragCtx *ctx, unsigned long *cursor) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_GetDbIdFromDefragCtx)(RedisModuleDefragCtx *ctx) REDISMODULE_ATTR;
REDISMODULE_API const RedisModuleString * (*RedisModule_GetKeyNameFromDefragCtx)(RedisModuleDefragCtx *ctx) REDISMODULE_ATTR;
#endif
#define RedisModule_IsAOFClient(id) ((id) == UINT64_MAX)
@ -989,10 +1008,18 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(GetContextFromIO);
REDISMODULE_GET_API(GetKeyNameFromIO);
REDISMODULE_GET_API(GetKeyNameFromModuleKey);
REDISMODULE_GET_API(GetDbIdFromModuleKey);
REDISMODULE_GET_API(GetDbIdFromIO);
REDISMODULE_GET_API(GetKeyNameFromOptCtx);
REDISMODULE_GET_API(GetToKeyNameFromOptCtx);
REDISMODULE_GET_API(GetDbIdFromOptCtx);
REDISMODULE_GET_API(GetToDbIdFromOptCtx);
REDISMODULE_GET_API(Milliseconds);
REDISMODULE_GET_API(DigestAddStringBuffer);
REDISMODULE_GET_API(DigestAddLongLong);
REDISMODULE_GET_API(DigestEndSequence);
REDISMODULE_GET_API(GetKeyNameFromDigest);
REDISMODULE_GET_API(GetDbIdFromDigest);
REDISMODULE_GET_API(CreateDict);
REDISMODULE_GET_API(FreeDict);
REDISMODULE_GET_API(DictSize);
@ -1116,6 +1143,8 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(DefragShouldStop);
REDISMODULE_GET_API(DefragCursorSet);
REDISMODULE_GET_API(DefragCursorGet);
REDISMODULE_GET_API(GetKeyNameFromDefragCtx);
REDISMODULE_GET_API(GetDbIdFromDefragCtx);
#endif
if (RedisModule_IsModuleNameBusy && RedisModule_IsModuleNameBusy(name)) return REDISMODULE_ERR;

View File

@ -550,6 +550,7 @@ struct moduleLoadQueueEntry;
struct redisObject;
struct RedisModuleDefragCtx;
struct RedisModuleInfoCtx;
struct RedisModuleKeyOptCtx;
/* Each module type implementation should export a set of methods in order
* to serialize and deserialize the value in the RDB file, rewrite the AOF
@ -569,6 +570,11 @@ typedef void *(*moduleTypeCopyFunc)(struct redisObject *fromkey, struct redisObj
typedef int (*moduleTypeDefragFunc)(struct RedisModuleDefragCtx *ctx, struct redisObject *key, void **value);
typedef void (*RedisModuleInfoFunc)(struct RedisModuleInfoCtx *ctx, int for_crash_report);
typedef void (*RedisModuleDefragFunc)(struct RedisModuleDefragCtx *ctx);
typedef size_t (*moduleTypeMemUsageFunc2)(struct RedisModuleKeyOptCtx *ctx, const void *value);
typedef void (*moduleTypeFreeFunc2)(struct RedisModuleKeyOptCtx *ctx, void *value);
typedef size_t (*moduleTypeFreeEffortFunc2)(struct RedisModuleKeyOptCtx *ctx, const void *value);
typedef void (*moduleTypeUnlinkFunc2)(struct RedisModuleKeyOptCtx *ctx, void *value);
typedef void *(*moduleTypeCopyFunc2)(struct RedisModuleKeyOptCtx *ctx, const void *value);
/* This callback type is called by moduleNotifyUserChanged() every time
* a user authenticated via the module API is associated with a different
@ -594,6 +600,10 @@ typedef struct RedisModuleType {
moduleTypeDefragFunc defrag;
moduleTypeAuxLoadFunc aux_load;
moduleTypeAuxSaveFunc aux_save;
moduleTypeMemUsageFunc2 mem_usage2;
moduleTypeFreeEffortFunc2 free_effort2;
moduleTypeUnlinkFunc2 unlink2;
moduleTypeCopyFunc2 copy2;
int aux_save_triggers;
char name[10]; /* 9 bytes name + null term. Charset: A-Z a-z 0-9 _- */
} moduleType;
@ -650,17 +660,19 @@ typedef struct RedisModuleIO {
* 2 (current version with opcodes annotation). */
struct RedisModuleCtx *ctx; /* Optional context, see RM_GetContextFromIO()*/
struct redisObject *key; /* Optional name of key processed */
} RedisModuleIO;
int dbid; /* The dbid of the key being processed, -1 when unknown. */
} RedisModuleIO;
/* Macro to initialize an IO context. Note that the 'ver' field is populated
* inside rdb.c according to the version of the value to load. */
#define moduleInitIOContext(iovar,mtype,rioptr,keyptr) do { \
#define moduleInitIOContext(iovar,mtype,rioptr,keyptr,db) do { \
iovar.rio = rioptr; \
iovar.type = mtype; \
iovar.bytes = 0; \
iovar.error = 0; \
iovar.ver = 0; \
iovar.key = keyptr; \
iovar.dbid = db; \
iovar.ctx = NULL; \
} while(0)
@ -672,6 +684,8 @@ typedef struct RedisModuleIO {
typedef struct RedisModuleDigest {
unsigned char o[20]; /* Ordered elements. */
unsigned char x[20]; /* Xored elements. */
struct redisObject *key; /* Optional name of key processed */
int dbid; /* The dbid of the key being processed */
} RedisModuleDigest;
/* Just start with a digest composed of all zero bytes. */
@ -1817,10 +1831,12 @@ int moduleTryServeClientBlockedOnKey(client *c, robj *key);
void moduleUnblockClient(client *c);
int moduleClientIsBlockedOnKeys(client *c);
void moduleNotifyUserChanged(client *c);
void moduleNotifyKeyUnlink(robj *key, robj *val);
robj *moduleTypeDupOrReply(client *c, robj *fromkey, robj *tokey, robj *value);
int moduleDefragValue(robj *key, robj *obj, long *defragged);
int moduleLateDefrag(robj *key, robj *value, unsigned long *cursor, long long endtime, long long *defragged);
void moduleNotifyKeyUnlink(robj *key, robj *val, int dbid);
size_t moduleGetFreeEffort(robj *key, robj *val, int dbid);
size_t moduleGetMemUsage(robj *key, robj *val, int dbid);
robj *moduleTypeDupOrReply(client *c, robj *fromkey, robj *tokey, int todb, robj *value);
int moduleDefragValue(robj *key, robj *obj, long *defragged, int dbid);
int moduleLateDefrag(robj *key, robj *value, unsigned long *cursor, long long endtime, long long *defragged, int dbid);
long moduleDefragGlobals(void);
/* Utils */
@ -2406,7 +2422,7 @@ void slotToKeyFlush(int async);
size_t lazyfreeGetPendingObjectsCount(void);
size_t lazyfreeGetFreedObjectsCount(void);
void lazyfreeResetStats(void);
void freeObjAsync(robj *key, robj *obj);
void freeObjAsync(robj *key, robj *obj, int dbid);
void freeSlotsToKeysMapAsync(rax *rt);
void freeSlotsToKeysMap(rax *rt, int async);

View File

@ -1137,7 +1137,7 @@ void sunionDiffGenericCommand(client *c, robj **setkeys, int setnum,
sdsfree(ele);
}
setTypeReleaseIterator(si);
server.lazyfree_lazy_server_del ? freeObjAsync(NULL, dstset) :
server.lazyfree_lazy_server_del ? freeObjAsync(NULL, dstset, -1) :
decrRefCount(dstset);
} else {
/* If we have a target key where to store the resulting set

View File

@ -28,6 +28,7 @@ TEST_MODULES = \
blockonbackground.so \
scan.so \
datatype.so \
datatype2.so \
auth.so \
keyspace_events.so \
blockedclient.so \
@ -37,7 +38,7 @@ TEST_MODULES = \
defragtest.so \
hash.so \
zset.so \
stream.so
stream.so \
.PHONY: all

735
tests/modules/datatype2.c Normal file
View File

@ -0,0 +1,735 @@
/* This module is used to test a use case of a module that stores information
* about keys in global memory, and relies on the enhanced data type callbacks to
* get key name and dbid on various operations.
*
* it simulates a simple memory allocator. The smallest allocation unit of
* the allocator is a mem block with a size of 4KB. Multiple mem blocks are combined
* using a linked list. These linked lists are placed in a global dict named 'mem_pool'.
* Each db has a 'mem_pool'. You can use the 'mem.alloc' command to allocate a specified
* number of mem blocks, and use 'mem.free' to release the memory. Use 'mem.write', 'mem.read'
* to write and read the specified mem block (note that each mem block can only be written once).
* Use 'mem.usage' to get the memory usage under different dbs, and it will return the size
* mem blocks and used mem blocks under the db.
* The specific structure diagram is as follows:
*
*
* Global variables of the module:
*
* mem blocks link
*
*
* k1 4KB4KB4KB
*
*
*
* k2 4KB4KB
* db0
*
*
* k3 4KB4KB4KB
* db1 null
*
* dict
*
* db2
*
*
*
* db3 null k1 4KB4KB4KB
*
*
* mem_pool[MAX_DB]
* k2 4KB4KB
*
*
* dict
*
*
* Keys in redis database:
*
*
* size
* used
* mask
*
* MemAllocObject size
* k1 used
* mask
*
* size MemAllocObject
* k2 used k1
* mask
*
* MemAllocObject
* k3 k2
*
*
* redis db[0] size redis db[1] size
* used used
* mask mask
*
* MemAllocObject MemAllocObject
*
**/
#include "redismodule.h"
#include <stdio.h>
#include <stdlib.h>
#include <ctype.h>
#include <string.h>
#include <stdint.h>
static RedisModuleType *MemAllocType;
#define MAX_DB 16
RedisModuleDict *mem_pool[MAX_DB];
typedef struct MemAllocObject {
long long size;
long long used;
uint64_t mask;
} MemAllocObject;
MemAllocObject *createMemAllocObject(void) {
MemAllocObject *o = RedisModule_Calloc(1, sizeof(*o));
return o;
}
/*---------------------------- mem block apis ------------------------------------*/
#define BLOCK_SIZE 4096
struct MemBlock {
char block[BLOCK_SIZE];
struct MemBlock *next;
};
void MemBlockFree(struct MemBlock *head) {
if (head) {
struct MemBlock *block = head->next, *next;
RedisModule_Free(head);
while (block) {
next = block->next;
RedisModule_Free(block);
block = next;
}
}
}
struct MemBlock *MemBlockCreate(long long num) {
if (num <= 0) {
return NULL;
}
struct MemBlock *head = RedisModule_Calloc(1, sizeof(struct MemBlock));
struct MemBlock *block = head;
while (--num) {
block->next = RedisModule_Calloc(1, sizeof(struct MemBlock));
block = block->next;
}
return head;
}
long long MemBlockNum(const struct MemBlock *head) {
long long num = 0;
const struct MemBlock *block = head;
while (block) {
num++;
block = block->next;
}
return num;
}
size_t MemBlockWrite(struct MemBlock *head, long long block_index, const char *data, size_t size) {
size_t w_size = 0;
struct MemBlock *block = head;
while (block_index-- && block) {
block = block->next;
}
if (block) {
size = size > BLOCK_SIZE ? BLOCK_SIZE:size;
memcpy(block->block, data, size);
w_size += size;
}
return w_size;
}
int MemBlockRead(struct MemBlock *head, long long block_index, char *data, size_t size) {
size_t r_size = 0;
struct MemBlock *block = head;
while (block_index-- && block) {
block = block->next;
}
if (block) {
size = size > BLOCK_SIZE ? BLOCK_SIZE:size;
memcpy(data, block->block, size);
r_size += size;
}
return r_size;
}
void MemPoolFreeDb(RedisModuleCtx *ctx, int dbid) {
RedisModuleString *key;
void *tdata;
RedisModuleDictIter *iter = RedisModule_DictIteratorStartC(mem_pool[dbid], "^", NULL, 0);
while((key = RedisModule_DictNext(ctx, iter, &tdata)) != NULL) {
MemBlockFree((struct MemBlock *)tdata);
}
RedisModule_DictIteratorStop(iter);
RedisModule_FreeDict(NULL, mem_pool[dbid]);
mem_pool[dbid] = RedisModule_CreateDict(NULL);
}
struct MemBlock *MemBlockClone(const struct MemBlock *head) {
struct MemBlock *newhead = NULL;
if (head) {
newhead = RedisModule_Calloc(1, sizeof(struct MemBlock));
memcpy(newhead->block, head->block, BLOCK_SIZE);
struct MemBlock *newblock = newhead;
const struct MemBlock *oldblock = head->next;
while (oldblock) {
newblock->next = RedisModule_Calloc(1, sizeof(struct MemBlock));
newblock = newblock->next;
memcpy(newblock->block, oldblock->block, BLOCK_SIZE);
oldblock = oldblock->next;
}
}
return newhead;
}
/*---------------------------- event handler ------------------------------------*/
void swapDbCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data) {
REDISMODULE_NOT_USED(ctx);
REDISMODULE_NOT_USED(e);
REDISMODULE_NOT_USED(sub);
RedisModuleSwapDbInfo *ei = data;
// swap
RedisModuleDict *tmp = mem_pool[ei->dbnum_first];
mem_pool[ei->dbnum_first] = mem_pool[ei->dbnum_second];
mem_pool[ei->dbnum_second] = tmp;
}
void flushdbCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data) {
REDISMODULE_NOT_USED(ctx);
REDISMODULE_NOT_USED(e);
int i;
RedisModuleFlushInfo *fi = data;
if (sub == REDISMODULE_SUBEVENT_FLUSHDB_START) {
if (fi->dbnum != -1) {
MemPoolFreeDb(ctx, fi->dbnum);
} else {
for (i = 0; i < MAX_DB; i++) {
MemPoolFreeDb(ctx, i);
}
}
}
}
/*---------------------------- command implementation ------------------------------------*/
/* MEM.ALLOC key block_num */
int MemAlloc_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
RedisModule_AutoMemory(ctx);
if (argc != 3) {
return RedisModule_WrongArity(ctx);
}
long long block_num;
if ((RedisModule_StringToLongLong(argv[2], &block_num) != REDISMODULE_OK) || block_num <= 0) {
return RedisModule_ReplyWithError(ctx, "ERR invalid block_num: must be a value greater than 0");
}
RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_READ | REDISMODULE_WRITE);
int type = RedisModule_KeyType(key);
if (type != REDISMODULE_KEYTYPE_EMPTY && RedisModule_ModuleTypeGetType(key) != MemAllocType) {
return RedisModule_ReplyWithError(ctx, REDISMODULE_ERRORMSG_WRONGTYPE);
}
MemAllocObject *o;
if (type == REDISMODULE_KEYTYPE_EMPTY) {
o = createMemAllocObject();
RedisModule_ModuleTypeSetValue(key, MemAllocType, o);
} else {
o = RedisModule_ModuleTypeGetValue(key);
}
struct MemBlock *mem = MemBlockCreate(block_num);
RedisModule_Assert(mem != NULL);
RedisModule_DictSet(mem_pool[RedisModule_GetSelectedDb(ctx)], argv[1], mem);
o->size = block_num;
o->used = 0;
o->mask = 0;
RedisModule_ReplyWithLongLong(ctx, block_num);
RedisModule_ReplicateVerbatim(ctx);
return REDISMODULE_OK;
}
/* MEM.FREE key */
int MemFree_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
RedisModule_AutoMemory(ctx);
if (argc != 2) {
return RedisModule_WrongArity(ctx);
}
RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_READ);
int type = RedisModule_KeyType(key);
if (type != REDISMODULE_KEYTYPE_EMPTY && RedisModule_ModuleTypeGetType(key) != MemAllocType) {
return RedisModule_ReplyWithError(ctx, REDISMODULE_ERRORMSG_WRONGTYPE);
}
int ret = 0;
MemAllocObject *o;
if (type == REDISMODULE_KEYTYPE_EMPTY) {
RedisModule_ReplyWithLongLong(ctx, ret);
return REDISMODULE_OK;
} else {
o = RedisModule_ModuleTypeGetValue(key);
}
int nokey;
struct MemBlock *mem = (struct MemBlock *)RedisModule_DictGet(mem_pool[RedisModule_GetSelectedDb(ctx)], argv[1], &nokey);
if (!nokey && mem) {
MemBlockFree(mem);
o->used = 0;
o->size = 0;
o->mask = 0;
ret = 1;
}
RedisModule_ReplyWithLongLong(ctx, ret);
RedisModule_ReplicateVerbatim(ctx);
return REDISMODULE_OK;
}
/* MEM.WRITE key block_index data */
int MemWrite_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
RedisModule_AutoMemory(ctx);
if (argc != 4) {
return RedisModule_WrongArity(ctx);
}
long long block_index;
if ((RedisModule_StringToLongLong(argv[2], &block_index) != REDISMODULE_OK) || block_index < 0) {
return RedisModule_ReplyWithError(ctx, "ERR invalid block_index: must be a value greater than 0");
}
RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_READ | REDISMODULE_WRITE);
int type = RedisModule_KeyType(key);
if (type != REDISMODULE_KEYTYPE_EMPTY && RedisModule_ModuleTypeGetType(key) != MemAllocType) {
return RedisModule_ReplyWithError(ctx, REDISMODULE_ERRORMSG_WRONGTYPE);
}
MemAllocObject *o;
if (type == REDISMODULE_KEYTYPE_EMPTY) {
return RedisModule_ReplyWithError(ctx, "ERR Memory has not been allocated");
} else {
o = RedisModule_ModuleTypeGetValue(key);
}
if (o->mask & (1UL << block_index)) {
return RedisModule_ReplyWithError(ctx, "ERR block is busy");
}
int ret = 0;
int nokey;
struct MemBlock *mem = (struct MemBlock *)RedisModule_DictGet(mem_pool[RedisModule_GetSelectedDb(ctx)], argv[1], &nokey);
if (!nokey && mem) {
size_t len;
const char *buf = RedisModule_StringPtrLen(argv[3], &len);
ret = MemBlockWrite(mem, block_index, buf, len);
o->mask |= (1UL << block_index);
o->used++;
}
RedisModule_ReplyWithLongLong(ctx, ret);
RedisModule_ReplicateVerbatim(ctx);
return REDISMODULE_OK;
}
/* MEM.READ key block_index */
int MemRead_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
RedisModule_AutoMemory(ctx);
if (argc != 3) {
return RedisModule_WrongArity(ctx);
}
long long block_index;
if ((RedisModule_StringToLongLong(argv[2], &block_index) != REDISMODULE_OK) || block_index < 0) {
return RedisModule_ReplyWithError(ctx, "ERR invalid block_index: must be a value greater than 0");
}
RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_READ);
int type = RedisModule_KeyType(key);
if (type != REDISMODULE_KEYTYPE_EMPTY && RedisModule_ModuleTypeGetType(key) != MemAllocType) {
return RedisModule_ReplyWithError(ctx, REDISMODULE_ERRORMSG_WRONGTYPE);
}
MemAllocObject *o;
if (type == REDISMODULE_KEYTYPE_EMPTY) {
return RedisModule_ReplyWithError(ctx, "ERR Memory has not been allocated");
} else {
o = RedisModule_ModuleTypeGetValue(key);
}
if (!(o->mask & (1UL << block_index))) {
return RedisModule_ReplyWithNull(ctx);
}
int nokey;
struct MemBlock *mem = (struct MemBlock *)RedisModule_DictGet(mem_pool[RedisModule_GetSelectedDb(ctx)], argv[1], &nokey);
RedisModule_Assert(nokey == 0 && mem != NULL);
char buf[BLOCK_SIZE];
MemBlockRead(mem, block_index, buf, sizeof(buf));
/* Assuming that the contents are all c-style strings */
RedisModule_ReplyWithStringBuffer(ctx, buf, strlen(buf));
return REDISMODULE_OK;
}
/* MEM.USAGE dbid */
int MemUsage_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
RedisModule_AutoMemory(ctx);
if (argc != 2) {
return RedisModule_WrongArity(ctx);
}
long long dbid;
if ((RedisModule_StringToLongLong(argv[1], (long long *)&dbid) != REDISMODULE_OK)) {
return RedisModule_ReplyWithError(ctx, "ERR invalid value: must be a integer");
}
if (dbid < 0 || dbid >= MAX_DB) {
return RedisModule_ReplyWithError(ctx, "ERR dbid out of range");
}
long long size = 0, used = 0;
void *data;
RedisModuleString *key;
RedisModuleDictIter *iter = RedisModule_DictIteratorStartC(mem_pool[dbid], "^", NULL, 0);
while((key = RedisModule_DictNext(ctx, iter, &data)) != NULL) {
int dbbackup = RedisModule_GetSelectedDb(ctx);
RedisModule_SelectDb(ctx, dbid);
RedisModuleKey *openkey = RedisModule_OpenKey(ctx, key, REDISMODULE_READ);
int type = RedisModule_KeyType(openkey);
RedisModule_Assert(type != REDISMODULE_KEYTYPE_EMPTY && RedisModule_ModuleTypeGetType(openkey) == MemAllocType);
MemAllocObject *o = RedisModule_ModuleTypeGetValue(openkey);
used += o->used;
size += o->size;
RedisModule_CloseKey(openkey);
RedisModule_SelectDb(ctx, dbbackup);
}
RedisModule_DictIteratorStop(iter);
RedisModule_ReplyWithArray(ctx, 4);
RedisModule_ReplyWithSimpleString(ctx, "total");
RedisModule_ReplyWithLongLong(ctx, size);
RedisModule_ReplyWithSimpleString(ctx, "used");
RedisModule_ReplyWithLongLong(ctx, used);
return REDISMODULE_OK;
}
/* MEM.ALLOCANDWRITE key block_num block_index data block_index data ... */
int MemAllocAndWrite_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
RedisModule_AutoMemory(ctx);
if (argc < 3) {
return RedisModule_WrongArity(ctx);
}
long long block_num;
if ((RedisModule_StringToLongLong(argv[2], &block_num) != REDISMODULE_OK) || block_num <= 0) {
return RedisModule_ReplyWithError(ctx, "ERR invalid block_num: must be a value greater than 0");
}
RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_READ | REDISMODULE_WRITE);
int type = RedisModule_KeyType(key);
if (type != REDISMODULE_KEYTYPE_EMPTY && RedisModule_ModuleTypeGetType(key) != MemAllocType) {
return RedisModule_ReplyWithError(ctx, REDISMODULE_ERRORMSG_WRONGTYPE);
}
MemAllocObject *o;
if (type == REDISMODULE_KEYTYPE_EMPTY) {
o = createMemAllocObject();
RedisModule_ModuleTypeSetValue(key, MemAllocType, o);
} else {
o = RedisModule_ModuleTypeGetValue(key);
}
struct MemBlock *mem = MemBlockCreate(block_num);
RedisModule_Assert(mem != NULL);
RedisModule_DictSet(mem_pool[RedisModule_GetSelectedDb(ctx)], argv[1], mem);
o->used = 0;
o->mask = 0;
o->size = block_num;
int i = 3;
long long block_index;
for (; i < argc; i++) {
/* Security is guaranteed internally, so no security check. */
RedisModule_StringToLongLong(argv[i], &block_index);
size_t len;
const char * buf = RedisModule_StringPtrLen(argv[i + 1], &len);
MemBlockWrite(mem, block_index, buf, len);
o->used++;
o->mask |= (1UL << block_index);
}
RedisModule_ReplyWithSimpleString(ctx, "OK");
RedisModule_ReplicateVerbatim(ctx);
return REDISMODULE_OK;
}
/*---------------------------- type callbacks ------------------------------------*/
void *MemAllocRdbLoad(RedisModuleIO *rdb, int encver) {
if (encver != 0) {
return NULL;
}
MemAllocObject *o = createMemAllocObject();
o->size = RedisModule_LoadSigned(rdb);
o->used = RedisModule_LoadSigned(rdb);
o->mask = RedisModule_LoadUnsigned(rdb);
const RedisModuleString *key = RedisModule_GetKeyNameFromIO(rdb);
int dbid = RedisModule_GetDbIdFromIO(rdb);
if (o->size) {
size_t size;
char *tmpbuf;
long long num = o->size;
struct MemBlock *head = RedisModule_Calloc(1, sizeof(struct MemBlock));
tmpbuf = RedisModule_LoadStringBuffer(rdb, &size);
memcpy(head->block, tmpbuf, size > BLOCK_SIZE ? BLOCK_SIZE:size);
RedisModule_Free(tmpbuf);
struct MemBlock *block = head;
while (--num) {
block->next = RedisModule_Calloc(1, sizeof(struct MemBlock));
block = block->next;
tmpbuf = RedisModule_LoadStringBuffer(rdb, &size);
memcpy(block->block, tmpbuf, size > BLOCK_SIZE ? BLOCK_SIZE:size);
RedisModule_Free(tmpbuf);
}
RedisModule_DictSet(mem_pool[dbid], (RedisModuleString *)key, head);
}
return o;
}
void MemAllocRdbSave(RedisModuleIO *rdb, void *value) {
MemAllocObject *o = value;
RedisModule_SaveSigned(rdb, o->size);
RedisModule_SaveSigned(rdb, o->used);
RedisModule_SaveUnsigned(rdb, o->mask);
const RedisModuleString *key = RedisModule_GetKeyNameFromIO(rdb);
int dbid = RedisModule_GetDbIdFromIO(rdb);
if (o->size) {
int nokey;
struct MemBlock *mem = (struct MemBlock *)RedisModule_DictGet(mem_pool[dbid], (RedisModuleString *)key, &nokey);
RedisModule_Assert(nokey == 0 && mem != NULL);
struct MemBlock *block = mem;
while (block) {
RedisModule_SaveStringBuffer(rdb, block->block, BLOCK_SIZE);
block = block->next;
}
}
}
void MemAllocAofRewrite(RedisModuleIO *aof, RedisModuleString *key, void *value) {
MemAllocObject *o = (MemAllocObject *)value;
if (o->size) {
int dbid = RedisModule_GetDbIdFromIO(aof);
int nokey;
size_t i = 0, j = 0;
struct MemBlock *mem = (struct MemBlock *)RedisModule_DictGet(mem_pool[dbid], (RedisModuleString *)key, &nokey);
RedisModule_Assert(nokey == 0 && mem != NULL);
size_t array_size = o->size * 2;
RedisModuleString ** string_array = RedisModule_Calloc(array_size, sizeof(RedisModuleString *));
while (mem) {
string_array[i] = RedisModule_CreateStringFromLongLong(NULL, j);
string_array[i + 1] = RedisModule_CreateString(NULL, mem->block, BLOCK_SIZE);
mem = mem->next;
i += 2;
j++;
}
RedisModule_EmitAOF(aof, "mem.allocandwrite", "slv", key, o->size, string_array, array_size);
for (i = 0; i < array_size; i++) {
RedisModule_FreeString(NULL, string_array[i]);
}
RedisModule_Free(string_array);
} else {
RedisModule_EmitAOF(aof, "mem.allocandwrite", "sl", key, o->size);
}
}
void MemAllocFree(void *value) {
RedisModule_Free(value);
}
void MemAllocUnlink(RedisModuleString *key, const void *value) {
REDISMODULE_NOT_USED(key);
REDISMODULE_NOT_USED(value);
/* When unlink and unlink2 exist at the same time, we will only call unlink2. */
RedisModule_Assert(0);
}
void MemAllocUnlink2(RedisModuleKeyOptCtx *ctx, const void *value) {
MemAllocObject *o = (MemAllocObject *)value;
const RedisModuleString *key = RedisModule_GetKeyNameFromOptCtx(ctx);
int dbid = RedisModule_GetDbIdFromOptCtx(ctx);
if (o->size) {
void *oldval;
RedisModule_DictDel(mem_pool[dbid], (RedisModuleString *)key, &oldval);
RedisModule_Assert(oldval != NULL);
MemBlockFree((struct MemBlock *)oldval);
}
}
void MemAllocDigest(RedisModuleDigest *md, void *value) {
MemAllocObject *o = (MemAllocObject *)value;
RedisModule_DigestAddLongLong(md, o->size);
RedisModule_DigestAddLongLong(md, o->used);
RedisModule_DigestAddLongLong(md, o->mask);
int dbid = RedisModule_GetDbIdFromDigest(md);
const RedisModuleString *key = RedisModule_GetKeyNameFromDigest(md);
if (o->size) {
int nokey;
struct MemBlock *mem = (struct MemBlock *)RedisModule_DictGet(mem_pool[dbid], (RedisModuleString *)key, &nokey);
RedisModule_Assert(nokey == 0 && mem != NULL);
struct MemBlock *block = mem;
while (block) {
RedisModule_DigestAddStringBuffer(md, (unsigned char *)block->block, BLOCK_SIZE);
block = block->next;
}
}
}
void *MemAllocCopy2(RedisModuleKeyOptCtx *ctx, const void *value) {
const MemAllocObject *old = value;
MemAllocObject *new = createMemAllocObject();
new->size = old->size;
new->used = old->used;
new->mask = old->mask;
int from_dbid = RedisModule_GetDbIdFromOptCtx(ctx);
int to_dbid = RedisModule_GetToDbIdFromOptCtx(ctx);
const RedisModuleString *fromkey = RedisModule_GetKeyNameFromOptCtx(ctx);
const RedisModuleString *tokey = RedisModule_GetToKeyNameFromOptCtx(ctx);
if (old->size) {
int nokey;
struct MemBlock *oldmem = (struct MemBlock *)RedisModule_DictGet(mem_pool[from_dbid], (RedisModuleString *)fromkey, &nokey);
RedisModule_Assert(nokey == 0 && oldmem != NULL);
struct MemBlock *newmem = MemBlockClone(oldmem);
RedisModule_Assert(newmem != NULL);
RedisModule_DictSet(mem_pool[to_dbid], (RedisModuleString *)tokey, newmem);
}
return new;
}
size_t MemAllocMemUsage2(RedisModuleKeyOptCtx *ctx, const void *value) {
REDISMODULE_NOT_USED(ctx);
uint64_t size = 0;
MemAllocObject *o = (MemAllocObject *)value;
size += sizeof(*o);
size += o->size * sizeof(struct MemBlock);
return size;
}
size_t MemAllocMemFreeEffort2(RedisModuleKeyOptCtx *ctx, const void *value) {
REDISMODULE_NOT_USED(ctx);
MemAllocObject *o = (MemAllocObject *)value;
return o->size;
}
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc);
if (RedisModule_Init(ctx, "datatype2", 1,REDISMODULE_APIVER_1) == REDISMODULE_ERR) {
return REDISMODULE_ERR;
}
RedisModuleTypeMethods tm = {
.version = REDISMODULE_TYPE_METHOD_VERSION,
.rdb_load = MemAllocRdbLoad,
.rdb_save = MemAllocRdbSave,
.aof_rewrite = MemAllocAofRewrite,
.free = MemAllocFree,
.digest = MemAllocDigest,
.unlink = MemAllocUnlink,
// .defrag = MemAllocDefrag, // Tested in defragtest.c
.unlink2 = MemAllocUnlink2,
.copy2 = MemAllocCopy2,
.mem_usage2 = MemAllocMemUsage2,
.free_effort2 = MemAllocMemFreeEffort2,
};
MemAllocType = RedisModule_CreateDataType(ctx, "mem_alloc", 0, &tm);
if (MemAllocType == NULL) {
return REDISMODULE_ERR;
}
if (RedisModule_CreateCommand(ctx, "mem.alloc", MemAlloc_RedisCommand, "write deny-oom", 1, 1, 1) == REDISMODULE_ERR) {
return REDISMODULE_ERR;
}
if (RedisModule_CreateCommand(ctx, "mem.free", MemFree_RedisCommand, "write deny-oom", 1, 1, 1) == REDISMODULE_ERR) {
return REDISMODULE_ERR;
}
if (RedisModule_CreateCommand(ctx, "mem.write", MemWrite_RedisCommand, "write deny-oom", 1, 1, 1) == REDISMODULE_ERR) {
return REDISMODULE_ERR;
}
if (RedisModule_CreateCommand(ctx, "mem.read", MemRead_RedisCommand, "readonly", 1, 1, 1) == REDISMODULE_ERR) {
return REDISMODULE_ERR;
}
if (RedisModule_CreateCommand(ctx, "mem.usage", MemUsage_RedisCommand, "readonly", 1, 1, 1) == REDISMODULE_ERR) {
return REDISMODULE_ERR;
}
/* used for internal aof rewrite */
if (RedisModule_CreateCommand(ctx, "mem.allocandwrite", MemAllocAndWrite_RedisCommand, "write deny-oom", 1, 1, 1) == REDISMODULE_ERR) {
return REDISMODULE_ERR;
}
for(int i = 0; i < MAX_DB; i++){
mem_pool[i] = RedisModule_CreateDict(NULL);
}
RedisModule_SubscribeToServerEvent(ctx, RedisModuleEvent_FlushDB, flushdbCallback);
RedisModule_SubscribeToServerEvent(ctx, RedisModuleEvent_SwapDB, swapDbCallback);
return REDISMODULE_OK;
}

View File

@ -3,6 +3,7 @@
#define REDISMODULE_EXPERIMENTAL_API
#include "redismodule.h"
#include <stdlib.h>
static RedisModuleType *FragType;
@ -147,6 +148,9 @@ int FragDefrag(RedisModuleDefragCtx *ctx, RedisModuleString *key, void **value)
unsigned long i = 0;
int steps = 0;
int dbid = RedisModule_GetDbIdFromDefragCtx(ctx);
RedisModule_Assert(dbid != -1);
/* Attempt to get cursor, validate it's what we're exepcting */
if (RedisModule_DefragCursorGet(ctx, &i) == REDISMODULE_OK) {
if (i > 0) datatype_resumes++;

View File

@ -0,0 +1,243 @@
set testmodule [file normalize tests/modules/datatype2.so]
start_server {tags {"modules"}} {
r module load $testmodule
test "datatype2: test mem alloc and free" {
r flushall
r select 0
assert_equal 3 [r mem.alloc k1 3]
assert_equal 2 [r mem.alloc k2 2]
r select 1
assert_equal 1 [r mem.alloc k1 1]
assert_equal 5 [r mem.alloc k2 5]
r select 0
assert_equal 1 [r mem.free k1]
assert_equal 1 [r mem.free k2]
r select 1
assert_equal 1 [r mem.free k1]
assert_equal 1 [r mem.free k2]
}
test "datatype2: test del and unlink" {
r flushall
assert_equal 100 [r mem.alloc k1 100]
assert_equal 60 [r mem.alloc k2 60]
assert_equal 1 [r unlink k1]
assert_equal 1 [r del k2]
}
test "datatype2: test read and write" {
r flushall
assert_equal 3 [r mem.alloc k1 3]
set data datatype2
assert_equal [string length $data] [r mem.write k1 0 $data]
assert_equal $data [r mem.read k1 0]
}
test "datatype2: test rdb save and load" {
r flushall
r select 0
set data k1
assert_equal 3 [r mem.alloc k1 3]
assert_equal [string length $data] [r mem.write k1 1 $data]
set data k2
assert_equal 2 [r mem.alloc k2 2]
assert_equal [string length $data] [r mem.write k2 0 $data]
r select 1
set data k3
assert_equal 3 [r mem.alloc k3 3]
assert_equal [string length $data] [r mem.write k3 1 $data]
set data k4
assert_equal 2 [r mem.alloc k4 2]
assert_equal [string length $data] [r mem.write k4 0 $data]
r bgsave
waitForBgsave r
r debug reload
r select 0
assert_equal k1 [r mem.read k1 1]
assert_equal k2 [r mem.read k2 0]
r select 1
assert_equal k3 [r mem.read k3 1]
assert_equal k4 [r mem.read k4 0]
}
test "datatype2: test aof rewrite" {
r flushall
r select 0
set data k1
assert_equal 3 [r mem.alloc k1 3]
assert_equal [string length $data] [r mem.write k1 1 $data]
set data k2
assert_equal 2 [r mem.alloc k2 2]
assert_equal [string length $data] [r mem.write k2 0 $data]
r select 1
set data k3
assert_equal 3 [r mem.alloc k3 3]
assert_equal [string length $data] [r mem.write k3 1 $data]
set data k4
assert_equal 2 [r mem.alloc k4 2]
assert_equal [string length $data] [r mem.write k4 0 $data]
r bgrewriteaof
waitForBgrewriteaof r
r debug loadaof
r select 0
assert_equal k1 [r mem.read k1 1]
assert_equal k2 [r mem.read k2 0]
r select 1
assert_equal k3 [r mem.read k3 1]
assert_equal k4 [r mem.read k4 0]
}
test "datatype2: test copy" {
r flushall
r select 0
set data k1
assert_equal 3 [r mem.alloc k1 3]
assert_equal [string length $data] [r mem.write k1 1 $data]
assert_equal $data [r mem.read k1 1]
set data k2
assert_equal 2 [r mem.alloc k2 2]
assert_equal [string length $data] [r mem.write k2 0 $data]
assert_equal $data [r mem.read k2 0]
r select 1
set data k3
assert_equal 3 [r mem.alloc k3 3]
assert_equal [string length $data] [r mem.write k3 1 $data]
set data k4
assert_equal 2 [r mem.alloc k4 2]
assert_equal [string length $data] [r mem.write k4 0 $data]
assert_equal {total 5 used 2} [r mem.usage 0]
assert_equal {total 5 used 2} [r mem.usage 1]
r select 0
assert_equal 1 [r copy k1 k3]
assert_equal k1 [r mem.read k3 1]
assert_equal {total 8 used 3} [r mem.usage 0]
assert_equal 1 [r copy k2 k1 db 1]
r select 1
assert_equal k2 [r mem.read k1 0]
assert_equal {total 8 used 3} [r mem.usage 0]
assert_equal {total 7 used 3} [r mem.usage 1]
}
test "datatype2: test swapdb" {
r flushall
r select 0
set data k1
assert_equal 5 [r mem.alloc k1 5]
assert_equal [string length $data] [r mem.write k1 1 $data]
assert_equal $data [r mem.read k1 1]
set data k2
assert_equal 4 [r mem.alloc k2 4]
assert_equal [string length $data] [r mem.write k2 0 $data]
assert_equal $data [r mem.read k2 0]
r select 1
set data k1
assert_equal 3 [r mem.alloc k3 3]
assert_equal [string length $data] [r mem.write k3 1 $data]
set data k2
assert_equal 2 [r mem.alloc k4 2]
assert_equal [string length $data] [r mem.write k4 0 $data]
assert_equal {total 9 used 2} [r mem.usage 0]
assert_equal {total 5 used 2} [r mem.usage 1]
assert_equal OK [r swapdb 0 1]
assert_equal {total 9 used 2} [r mem.usage 1]
assert_equal {total 5 used 2} [r mem.usage 0]
}
test "datatype2: test digest" {
r flushall
r select 0
set data k1
assert_equal 3 [r mem.alloc k1 3]
assert_equal [string length $data] [r mem.write k1 1 $data]
assert_equal $data [r mem.read k1 1]
set data k2
assert_equal 2 [r mem.alloc k2 2]
assert_equal [string length $data] [r mem.write k2 0 $data]
assert_equal $data [r mem.read k2 0]
r select 1
set data k1
assert_equal 3 [r mem.alloc k1 3]
assert_equal [string length $data] [r mem.write k1 1 $data]
assert_equal $data [r mem.read k1 1]
set data k2
assert_equal 2 [r mem.alloc k2 2]
assert_equal [string length $data] [r mem.write k2 0 $data]
assert_equal $data [r mem.read k2 0]
r select 0
set digest0 [r debug digest]
r select 1
set digest1 [r debug digest]
assert_equal $digest0 $digest1
}
test "datatype2: test memusage" {
r flushall
set data k1
assert_equal 3 [r mem.alloc k1 3]
assert_equal [string length $data] [r mem.write k1 1 $data]
assert_equal $data [r mem.read k1 1]
set data k2
assert_equal 3 [r mem.alloc k2 3]
assert_equal [string length $data] [r mem.write k2 0 $data]
assert_equal $data [r mem.read k2 0]
assert_equal [r memory usage k1] [r memory usage k2]
}
}