Modules: add defrag API support. (#8149)

Add a new set of defrag functions that take a defrag context and allow
defragmenting memory blocks and RedisModuleStrings.

Modules can register a defrag callback which will be invoked when the
defrag process handles globals.

Modules with custom data types can also register a datatype-specific
defrag callback which is invoked for keys that require defragmentation.
The callback and associated functions support both one-step and
multi-step options, depending on the complexity of the key as exposed by
the free_effort callback.
This commit is contained in:
Yossi Gottlieb 2020-12-13 09:56:01 +02:00 committed by GitHub
parent ddd43b6bc3
commit 63c1303cfb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 569 additions and 8 deletions

View File

@ -29,4 +29,5 @@ $TCLSH tests/test_helper.tcl \
--single unit/moduleapi/blockedclient \
--single unit/moduleapi/getkeys \
--single unit/moduleapi/test_lazyfree \
--single unit/moduleapi/defrag \
"${@}"

View File

@ -794,6 +794,20 @@ long defragStream(redisDb *db, dictEntry *kde) {
return defragged;
}
/* Defragment a key whose value is a module data type. The work is either
 * carried out right away or the key is queued for a later pass.
 * Returns the number of pointers defragged. */
long defragModule(redisDb *db, dictEntry *kde) {
    long hits = 0;
    robj *val = dictGetVal(kde);

    serverAssert(val->type == OBJ_MODULE);

    /* A zero result means the value could not be handled in this pass and
     * has to be scheduled for later. */
    int done = moduleDefragValue(dictGetKey(kde), val, &hits);
    if (!done) defragLater(db, kde);

    return hits;
}
/* for each key we scan in the main dict, this function will attempt to defrag
* all the various pointers it has. Returns a stat of how many pointers were
* moved. */
@ -865,8 +879,7 @@ long defragKey(redisDb *db, dictEntry *de) {
} else if (ob->type == OBJ_STREAM) {
defragged += defragStream(db, de);
} else if (ob->type == OBJ_MODULE) {
/* Currently defragmenting modules private data types
* is not supported. */
defragged += defragModule(db, de);
} else {
serverPanic("Unknown object type");
}
@ -928,6 +941,7 @@ long defragOtherGlobals() {
* that remain static for a long time */
defragged += activeDefragSdsDict(server.lua_scripts, DEFRAG_SDS_DICT_VAL_IS_STROB);
defragged += activeDefragSdsListAndDict(server.repl_scriptcache_fifo, server.repl_scriptcache_dict, DEFRAG_SDS_DICT_NO_VAL);
defragged += moduleDefragGlobals();
return defragged;
}
@ -946,6 +960,8 @@ int defragLaterItem(dictEntry *de, unsigned long *cursor, long long endtime) {
server.stat_active_defrag_hits += scanLaterHash(ob, cursor);
} else if (ob->type == OBJ_STREAM) {
return scanLaterStraemListpacks(ob, cursor, endtime, &server.stat_active_defrag_hits);
} else if (ob->type == OBJ_MODULE) {
return moduleLateDefrag(dictGetKey(de), ob, cursor, endtime, &server.stat_active_defrag_hits);
} else {
*cursor = 0; /* object type may have changed since we schedule it for later */
}
@ -1184,4 +1200,15 @@ void activeDefragCycle(void) {
/* Not implemented yet. */
}
/* Stub variant of activeDefragAlloc(): never relocates anything and always
 * reports "not moved" by returning NULL.
 * NOTE(review): this appears to live in the build branch without active
 * defrag support (closed by the #endif below) - confirm against the
 * enclosing #if. */
void *activeDefragAlloc(void *ptr) {
    UNUSED(ptr);

    return NULL;
}
/* Stub variant of activeDefragStringOb(): leaves the object and the
 * 'defragged' counter untouched and always returns NULL.
 * NOTE(review): this appears to live in the build branch without active
 * defrag support (closed by the #endif below) - confirm against the
 * enclosing #if. */
robj *activeDefragStringOb(robj *ob, long *defragged) {
    UNUSED(defragged);
    UNUSED(ob);

    return NULL;
}
#endif

View File

@ -50,6 +50,7 @@ typedef struct RedisModuleInfoCtx {
} RedisModuleInfoCtx;
typedef void (*RedisModuleInfoFunc)(RedisModuleInfoCtx *ctx, int for_crash_report);
/* Callback a module registers with RM_RegisterDefragFunc() to defragment
 * its global allocations. The return type is 'int' to match the public
 * declaration in redismodule.h, so module callbacks are invoked through a
 * compatible function pointer type. */
typedef int (*RedisModuleDefragFunc)(struct RedisModuleDefragCtx *ctx);
/* This structure represents a module inside the system. */
struct RedisModule {
@ -66,6 +67,7 @@ struct RedisModule {
int options; /* Module options and capabilities. */
int blocked_clients; /* Count of RedisModuleBlockedClient in this module. */
RedisModuleInfoFunc info_cb; /* Callback for module to add INFO fields. */
RedisModuleDefragFunc defrag_cb; /* Callback for global data defrag. */
};
typedef struct RedisModule RedisModule;
@ -875,6 +877,7 @@ void RM_SetModuleAttribs(RedisModuleCtx *ctx, const char *name, int ver, int api
module->in_hook = 0;
module->options = 0;
module->info_cb = 0;
module->defrag_cb = 0;
ctx->module = module;
}
@ -3694,9 +3697,10 @@ robj *moduleTypeDupOrReply(client *c, robj *fromkey, robj *tokey, robj *value) {
* .mem_usage = myType_MemUsageCallBack,
* .aux_load = myType_AuxRDBLoadCallBack,
* .aux_save = myType_AuxRDBSaveCallBack,
* .free_effort = myType_FreeEffortCallBack
* .unlink = myType_UnlinkCallBack
* .copy = myType_CopyCallback
* .free_effort = myType_FreeEffortCallBack,
* .unlink = myType_UnlinkCallBack,
* .copy = myType_CopyCallback,
* .defrag = myType_DefragCallback
* }
*
* * **rdb_load**: A callback function pointer that loads data from RDB files.
@ -3723,6 +3727,21 @@ robj *moduleTypeDupOrReply(client *c, robj *fromkey, robj *tokey, robj *value) {
* Note: if the target key exists and is being overwritten, the copy callback will be
* called first, followed by a free callback to the value that is being replaced.
*
* * **defrag**: A callback function pointer that is used to request the module to defrag
* a key. The module should then iterate pointers and call the relevant RM_Defrag*()
* functions to defragment pointers or complex types. The module should continue
* iterating as long as RM_DefragShouldStop() returns a zero value, and return a
* zero value if finished or non-zero value if more work is left to be done. If more work
* needs to be done, RM_DefragCursorSet() and RM_DefragCursorGet() can be used to track
* this work across different calls.
* Normally, the defrag mechanism invokes the callback without a time limit, so
* RM_DefragShouldStop() always returns zero. The "late defrag" mechanism which has
* a time limit and provides cursor support is used only for keys that are determined
* to have significant internal complexity. To determine this, the defrag mechanism
* uses the free_effort callback and the 'active-defrag-max-scan-fields' config directive.
* NOTE: The value is passed as a void** and the function is expected to update the
* pointer if the top-level value pointer is defragmented and consequently changes.
*
* Note: the module name "AAAAAAAAA" is reserved and produces an error, it
* happens to be pretty lame as well.
*
@ -3766,6 +3785,7 @@ moduleType *RM_CreateDataType(RedisModuleCtx *ctx, const char *name, int encver,
moduleTypeFreeEffortFunc free_effort;
moduleTypeUnlinkFunc unlink;
moduleTypeCopyFunc copy;
moduleTypeDefragFunc defrag;
} v3;
} *tms = (struct typemethods*) typemethods_ptr;
@ -3787,6 +3807,7 @@ moduleType *RM_CreateDataType(RedisModuleCtx *ctx, const char *name, int encver,
mt->free_effort = tms->v3.free_effort;
mt->unlink = tms->v3.unlink;
mt->copy = tms->v3.copy;
mt->defrag = tms->v3.defrag;
}
memcpy(mt->name,name,sizeof(mt->name));
listAddNodeTail(ctx->module->types,mt);
@ -8165,6 +8186,205 @@ int *RM_GetCommandKeys(RedisModuleCtx *ctx, RedisModuleString **argv, int argc,
return res;
}
/* Defrag context: carries the state the RM_Defrag*() functions need while a
 * module defrag callback is running. The field order matters because the
 * structure is set up with positional initializers. */
typedef struct RedisModuleDefragCtx {
    long defragged;         /* Pointers moved so far during this callback. */
    long long endtime;      /* Deadline checked against ustime(), 0 = no limit. */
    unsigned long *cursor;  /* Cursor storage, NULL when cursors are unavailable. */
} RedisModuleDefragCtx;
/* Register a defrag callback for global data, that is, for anything the
 * module allocates that does not belong to a specific data type.
 *
 * A previously registered callback is replaced. Always returns
 * REDISMODULE_OK.
 */
int RM_RegisterDefragFunc(RedisModuleCtx *ctx, RedisModuleDefragFunc cb) {
    struct RedisModule *owner = ctx->module;

    owner->defrag_cb = cb;
    return REDISMODULE_OK;
}
/* A data type defrag callback that walks a complex structure should call
 * this function periodically. It returns zero (false) while the callback
 * may keep working, and a non-zero value (true) once it should stop.
 *
 * A callback that has to stop can save its position with
 * RM_DefragCursorSet() and pick it up again on the next invocation with
 * RM_DefragCursorGet().
 *
 * If the callback stops while work remains it should return 1, otherwise
 * it should return 0.
 *
 * NOTE: Every call has a cost, so modules should perform a small batch of
 * work between two consecutive calls rather than calling it per item.
 */
int RM_DefragShouldStop(RedisModuleDefragCtx *ctx) {
    /* No deadline configured: the callback is never asked to stop. */
    if (ctx->endtime == 0) return 0;

    return ctx->endtime < ustime();
}
/* Save an arbitrary cursor value so that it can be retrieved later.
 *
 * Call this only after RM_DefragShouldStop() returned a non-zero value and
 * the defrag callback is about to return before it finished iterating its
 * data type.
 *
 * Cursors are only available while late defrag is performed. Late defrag is
 * chosen for keys whose type implements the free_effort callback and whose
 * reported free_effort exceeds the 'active-defrag-max-scan-fields'
 * configuration directive.
 *
 * Smaller keys, keys of types without free_effort, and the global defrag
 * callback are never run in late-defrag mode. In all those cases this
 * function returns REDISMODULE_ERR.
 *
 * The module may use the cursor to encode its progress inside the value.
 * It may also keep extra cursor-related state of its own and treat the
 * cursor merely as a marker telling when the traversal of a new key
 * starts. This works because the API guarantees that several keys are
 * never defragmented concurrently.
 */
int RM_DefragCursorSet(RedisModuleDefragCtx *ctx, unsigned long cursor) {
    unsigned long *slot = ctx->cursor;

    if (slot == NULL) return REDISMODULE_ERR;

    *slot = cursor;
    return REDISMODULE_OK;
}
/* Retrieve the cursor value previously saved with RM_DefragCursorSet().
 *
 * When the current callback is not part of a late defrag operation,
 * REDISMODULE_ERR is returned and 'cursor' is left untouched, so its value
 * should be ignored. See RM_DefragCursorSet() for more details on defrag
 * cursors.
 */
int RM_DefragCursorGet(RedisModuleDefragCtx *ctx, unsigned long *cursor) {
    const unsigned long *slot = ctx->cursor;

    if (slot == NULL) return REDISMODULE_ERR;

    *cursor = *slot;
    return REDISMODULE_OK;
}
/* Defrag a memory block that was allocated with RM_Alloc, RM_Calloc, etc.
 * Defragmenting means allocating a new block and copying the contents
 * over, much like realloc() does.
 *
 * NULL is returned when the block did not need to be defragmented; in that
 * case nothing else happens.
 *
 * A non-NULL return value is the new location of the block. The caller has
 * to switch to it, update every reference to the old pointer, and never use
 * the old pointer again.
 */
void *RM_DefragAlloc(RedisModuleDefragCtx *ctx, void *ptr) {
    void *moved = activeDefragAlloc(ptr);

    /* Only actual relocations count as defrag hits. */
    if (moved != NULL) ctx->defragged += 1;

    return moved;
}
/* Defrag a RedisModuleString, i.e. a string object created through the
 * string API (for instance RM_CreateStringFromLongLong) rather than a raw
 * RM_Alloc block. See RM_DefragAlloc() for more information on how the
 * defragmentation process works.
 *
 * NOTE: Only strings that have a single reference can be defragged.
 * This usually means that strings retained with RM_RetainString or
 * RM_HoldString may not be defragmentable. Command argvs are an exception:
 * when retained by the module they end up with a single reference, because
 * the reference on the Redis side is dropped as soon as the command
 * callback returns.
 */
RedisModuleString *RM_DefragRedisModuleString(RedisModuleDefragCtx *ctx, RedisModuleString *str) {
    long *hits = &ctx->defragged;
    RedisModuleString *moved = activeDefragStringOb(str, hits);

    return moved;
}
/* Run one late-defrag step on a module data type key.
 *
 * The type's defrag callback is invoked with the given time limit and
 * cursor, and the pointers it moved are added to '*defragged'.
 *
 * Returns zero (after resetting '*cursor' to 0) when nothing is left to do,
 * or a non-zero value when the key needs to be visited again.
 */
int moduleLateDefrag(robj *key, robj *value, unsigned long *cursor, long long endtime, long long *defragged) {
    moduleValue *mv = value->ptr;
    moduleType *mt = mv->type;
    RedisModuleDefragCtx dctx = { .defragged = 0, .endtime = endtime, .cursor = cursor };

    /* The callback can be missing when the key was replaced with a value of
     * a different type since it was scheduled; that counts as "finished". */
    int more = 0;
    if (mt->defrag != NULL) more = mt->defrag(&dctx, key, &mv->value);

    *defragged += dctx.defragged;

    if (more) return 1;

    *cursor = 0; /* No more work to do */
    return 0;
}
/* Try to defrag the value of a module data type key. Depending on how
 * complex the value is, the work is either done immediately or left for the
 * late defrag mechanism.
 *
 * Returns 1 when the operation is complete, or 0 when the key has to be
 * scheduled for late defrag.
 */
int moduleDefragValue(robj *key, robj *value, long *defragged) {
    moduleValue *mv = value->ptr;
    moduleType *mt = mv->type;

    /* The moduleValue wrapper is defragged unconditionally, even for types
     * that provide no defrag callback. */
    moduleValue *moved = activeDefragAlloc(mv);
    if (moved != NULL) {
        mv = moved;
        value->ptr = moved;
        *defragged += 1;
    }

    if (mt->defrag == NULL) return 1;

    /* free_effort tells how complex the value is. Values above the
     * configured threshold are deferred to defragLater instead of being
     * processed in one quick pass. */
    if (mt->free_effort != NULL) {
        size_t effort = mt->free_effort(key, mv->value);

        /* A reported effort of zero is treated as the maximum effort. */
        if (effort == 0) effort = SIZE_MAX;

        if (effort > server.active_defrag_max_scan_fields)
            return 0; /* Defrag later */
    }

    /* Immediate mode: no time limit and no cursor. */
    RedisModuleDefragCtx dctx = { .defragged = 0, .endtime = 0, .cursor = NULL };
    mt->defrag(&dctx, key, &mv->value);
    *defragged += dctx.defragged;

    return 1;
}
/* Invoke the global defrag callback of every loaded module that registered
 * one with RM_RegisterDefragFunc().
 *
 * Each callback runs with a fresh context that has no time limit and no
 * cursor. Returns the total number of pointers the callbacks reported as
 * defragged.
 */
long moduleDefragGlobals(void) {
    dictIterator *di = dictGetIterator(modules);
    dictEntry *de;
    long defragged = 0;

    while ((de = dictNext(di)) != NULL) {
        struct RedisModule *module = dictGetVal(de);
        if (!module->defrag_cb)
            continue;
        RedisModuleDefragCtx defrag_ctx = { 0, 0, NULL };
        module->defrag_cb(&defrag_ctx);
        defragged += defrag_ctx.defragged;
    }
    /* The iterator is heap allocated: release it, otherwise every call to
     * this function leaks one iterator. */
    dictReleaseIterator(di);

    return defragged;
}
/* Register all the APIs we export. Keep this function at the end of the
* file so that's easy to seek it to add new entries. */
void moduleRegisterCoreAPI(void) {
@ -8408,4 +8628,10 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(GetClientCertificate);
REGISTER_API(GetCommandKeys);
REGISTER_API(GetTypeMethodVersion);
REGISTER_API(RegisterDefragFunc);
REGISTER_API(DefragAlloc);
REGISTER_API(DefragRedisModuleString);
REGISTER_API(DefragShouldStop);
REGISTER_API(DefragCursorSet);
REGISTER_API(DefragCursorGet);
}

View File

@ -225,6 +225,7 @@ typedef struct RedisModuleEvent {
} RedisModuleEvent;
struct RedisModuleCtx;
struct RedisModuleDefragCtx;
typedef void (*RedisModuleEventCallback)(struct RedisModuleCtx *ctx, RedisModuleEvent eid, uint64_t subevent, void *data);
static const RedisModuleEvent
@ -479,6 +480,7 @@ typedef struct RedisModuleCommandFilter RedisModuleCommandFilter;
typedef struct RedisModuleInfoCtx RedisModuleInfoCtx;
typedef struct RedisModuleServerInfoData RedisModuleServerInfoData;
typedef struct RedisModuleScanCursor RedisModuleScanCursor;
typedef struct RedisModuleDefragCtx RedisModuleDefragCtx;
typedef struct RedisModuleUser RedisModuleUser;
typedef int (*RedisModuleCmdFunc)(RedisModuleCtx *ctx, RedisModuleString **argv, int argc);
@ -495,6 +497,7 @@ typedef void (*RedisModuleTypeFreeFunc)(void *value);
typedef size_t (*RedisModuleTypeFreeEffortFunc)(RedisModuleString *key, const void *value);
typedef void (*RedisModuleTypeUnlinkFunc)(RedisModuleString *key, const void *value);
typedef void *(*RedisModuleTypeCopyFunc)(RedisModuleString *fromkey, RedisModuleString *tokey, const void *value);
typedef int (*RedisModuleTypeDefragFunc)(RedisModuleDefragCtx *ctx, RedisModuleString *key, void **value);
typedef void (*RedisModuleClusterMessageReceiver)(RedisModuleCtx *ctx, const char *sender_id, uint8_t type, const unsigned char *payload, uint32_t len);
typedef void (*RedisModuleTimerProc)(RedisModuleCtx *ctx, void *data);
typedef void (*RedisModuleCommandFilterFunc) (RedisModuleCommandFilterCtx *filter);
@ -503,6 +506,7 @@ typedef void (*RedisModuleInfoFunc)(RedisModuleInfoCtx *ctx, int for_crash_repor
typedef void (*RedisModuleScanCB)(RedisModuleCtx *ctx, RedisModuleString *keyname, RedisModuleKey *key, void *privdata);
typedef void (*RedisModuleScanKeyCB)(RedisModuleKey *key, RedisModuleString *field, RedisModuleString *value, void *privdata);
typedef void (*RedisModuleUserChangedFunc) (uint64_t client_id, void *privdata);
typedef int (*RedisModuleDefragFunc)(RedisModuleDefragCtx *ctx);
typedef struct RedisModuleTypeMethods {
uint64_t version;
@ -518,6 +522,7 @@ typedef struct RedisModuleTypeMethods {
RedisModuleTypeFreeEffortFunc free_effort;
RedisModuleTypeUnlinkFunc unlink;
RedisModuleTypeCopyFunc copy;
RedisModuleTypeDefragFunc defrag;
} RedisModuleTypeMethods;
#define REDISMODULE_GET_API(name) \
@ -776,6 +781,12 @@ REDISMODULE_API int (*RedisModule_AuthenticateClientWithUser)(RedisModuleCtx *ct
REDISMODULE_API int (*RedisModule_DeauthenticateAndCloseClient)(RedisModuleCtx *ctx, uint64_t client_id) REDISMODULE_ATTR;
REDISMODULE_API RedisModuleString * (*RedisModule_GetClientCertificate)(RedisModuleCtx *ctx, uint64_t id) REDISMODULE_ATTR;
REDISMODULE_API int *(*RedisModule_GetCommandKeys)(RedisModuleCtx *ctx, RedisModuleString **argv, int argc, int *num_keys) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_RegisterDefragFunc)(RedisModuleCtx *ctx, RedisModuleDefragFunc func) REDISMODULE_ATTR;
REDISMODULE_API void *(*RedisModule_DefragAlloc)(RedisModuleDefragCtx *ctx, void *ptr) REDISMODULE_ATTR;
REDISMODULE_API RedisModuleString *(*RedisModule_DefragRedisModuleString)(RedisModuleDefragCtx *ctx, RedisModuleString *str) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_DefragShouldStop)(RedisModuleDefragCtx *ctx) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_DefragCursorSet)(RedisModuleDefragCtx *ctx, unsigned long cursor) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_DefragCursorGet)(RedisModuleDefragCtx *ctx, unsigned long *cursor) REDISMODULE_ATTR;
#endif
#define RedisModule_IsAOFClient(id) ((id) == CLIENT_ID_AOF)
@ -1025,6 +1036,12 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(AuthenticateClientWithUser);
REDISMODULE_GET_API(GetClientCertificate);
REDISMODULE_GET_API(GetCommandKeys);
REDISMODULE_GET_API(RegisterDefragFunc);
REDISMODULE_GET_API(DefragAlloc);
REDISMODULE_GET_API(DefragRedisModuleString);
REDISMODULE_GET_API(DefragShouldStop);
REDISMODULE_GET_API(DefragCursorSet);
REDISMODULE_GET_API(DefragCursorGet);
#endif
if (RedisModule_IsModuleNameBusy && RedisModule_IsModuleNameBusy(name)) return REDISMODULE_ERR;

View File

@ -514,6 +514,7 @@ struct RedisModuleIO;
struct RedisModuleDigest;
struct RedisModuleCtx;
struct redisObject;
struct RedisModuleDefragCtx;
/* Each module type implementation should export a set of methods in order
* to serialize and deserialize the value in the RDB file, rewrite the AOF
@ -530,6 +531,7 @@ typedef void (*moduleTypeFreeFunc)(void *value);
typedef size_t (*moduleTypeFreeEffortFunc)(struct redisObject *key, const void *value);
typedef void (*moduleTypeUnlinkFunc)(struct redisObject *key, void *value);
typedef void *(*moduleTypeCopyFunc)(struct redisObject *fromkey, struct redisObject *tokey, const void *value);
typedef int (*moduleTypeDefragFunc)(struct RedisModuleDefragCtx *ctx, struct redisObject *key, void **value);
/* This callback type is called by moduleNotifyUserChanged() every time
* a user authenticated via the module API is associated with a different
@ -552,6 +554,7 @@ typedef struct RedisModuleType {
moduleTypeFreeEffortFunc free_effort;
moduleTypeUnlinkFunc unlink;
moduleTypeCopyFunc copy;
moduleTypeDefragFunc defrag;
moduleTypeAuxLoadFunc aux_load;
moduleTypeAuxSaveFunc aux_save;
int aux_save_triggers;
@ -1699,6 +1702,9 @@ int moduleClientIsBlockedOnKeys(client *c);
void moduleNotifyUserChanged(client *c);
void moduleNotifyKeyUnlink(robj *key, robj *val);
robj *moduleTypeDupOrReply(client *c, robj *fromkey, robj *tokey, robj *value);
int moduleDefragValue(robj *key, robj *obj, long *defragged);
int moduleLateDefrag(robj *key, robj *value, unsigned long *cursor, long long endtime, long long *defragged);
long moduleDefragGlobals(void);
/* Utils */
long long ustime(void);
@ -2138,6 +2144,8 @@ void freeMemoryOverheadData(struct redisMemOverhead *mh);
void checkChildrenDone(void);
int setOOMScoreAdj(int process_class);
void rejectCommandFormat(client *c, const char *fmt, ...);
void *activeDefragAlloc(void *ptr);
robj *activeDefragStringOb(robj* ob, long *defragged);
#define RESTART_SERVER_NONE 0
#define RESTART_SERVER_GRACEFULLY (1<<0) /* Do proper shutdown. */

View File

@ -28,6 +28,7 @@ TEST_MODULES = \
getkeys.so \
test_lazyfree.so \
timer.so \
defragtest.so
.PHONY: all

234
tests/modules/defragtest.c Normal file
View File

@ -0,0 +1,234 @@
/* A module that implements defrag callback mechanisms.
*/
#define REDISMODULE_EXPERIMENTAL_API
#include "redismodule.h"
static RedisModuleType *FragType;
/* Value stored under keys of the test data type: an array of separately
 * allocated blocks that the defrag callback walks over. */
struct FragObject {
    unsigned long int len;  /* Number of entries in 'values'. */
    void **values;          /* Array of 'len' individually allocated blocks. */
    int maxstep;            /* Step limit per defrag callback run, 0 = no limit. */
};
/* Cursor most recently passed to RedisModule_DefragCursorSet(); the next
 * callback invocation checks that it receives this same value. */
unsigned long last_set_cursor = 0;

/* Statistics about the data type defrag callback, exposed through INFO. */
unsigned long datatype_attempts = 0;
unsigned long datatype_defragged = 0;
unsigned long datatype_resumes = 0;
unsigned long datatype_wrong_cursor = 0;

/* Statistics about the global defrag callback, exposed through INFO. */
unsigned long global_attempts = 0;
unsigned long global_defragged = 0;

/* Module-global strings handled by the global defrag callback. */
int global_strings_len = 0;
RedisModuleString **global_strings = NULL;
/* Allocate 'count' module strings (holding the numbers 0..count-1) and
 * publish them through global_strings / global_strings_len. */
static void createGlobalStrings(RedisModuleCtx *ctx, int count)
{
    RedisModuleString **strings = RedisModule_Alloc(sizeof(RedisModuleString *) * count);

    int idx = 0;
    while (idx < count) {
        strings[idx] = RedisModule_CreateStringFromLongLong(ctx, idx);
        idx++;
    }

    global_strings = strings;
    global_strings_len = count;
}
/* Global defrag callback: offers every global string to the defragger and
 * keeps the attempt / success counters up to date. Always returns 0. */
static int defragGlobalStrings(RedisModuleDefragCtx *ctx)
{
    int idx;

    for (idx = 0; idx < global_strings_len; idx++) {
        RedisModuleString *moved =
            RedisModule_DefragRedisModuleString(ctx, global_strings[idx]);
        global_attempts++;

        /* NULL means the string stayed where it was. */
        if (moved == NULL) continue;

        global_strings[idx] = moved;
        global_defragged++;
    }

    return 0;
}
/* INFO callback: reports the module's defrag counters in a "stats"
 * section. */
static void FragInfo(RedisModuleInfoCtx *ctx, int for_crash_report) {
    REDISMODULE_NOT_USED(for_crash_report);

    /* Field name / current value pairs, listed in the order they are
     * emitted. */
    struct {
        char *name;
        unsigned long value;
    } fields[] = {
        { "datatype_attempts",     datatype_attempts },
        { "datatype_defragged",    datatype_defragged },
        { "datatype_resumes",      datatype_resumes },
        { "datatype_wrong_cursor", datatype_wrong_cursor },
        { "global_attempts",       global_attempts },
        { "global_defragged",      global_defragged },
    };
    unsigned int nfields = sizeof(fields) / sizeof(fields[0]);

    RedisModule_InfoAddSection(ctx, "stats");
    for (unsigned int f = 0; f < nfields; f++)
        RedisModule_InfoAddFieldLongLong(ctx, fields[f].name, fields[f].value);
}
/* Build a FragObject holding 'len' zero-initialized blocks of 'size' bytes
 * each. 'maxstep' is stored as is and later limits how many blocks a single
 * defrag callback invocation processes (0 disables the limit).
 * Ownership of the returned object passes to the caller; FragFree()
 * releases it. */
struct FragObject *createFragObject(unsigned long len, unsigned long size, int maxstep) {
    struct FragObject *o = RedisModule_Alloc(sizeof(*o));
    o->len = len;
    /* Size the array by its own element type rather than by an unrelated
     * pointer type that merely happens to have the same size. */
    o->values = RedisModule_Alloc(sizeof(*o->values) * len);
    o->maxstep = maxstep;

    for (unsigned long i = 0; i < len; i++) {
        o->values[i] = RedisModule_Calloc(1, size);
    }

    return o;
}
/* FRAG.RESETSTATS
 *
 * Zeroes every counter reported by the module's INFO section and replies
 * with OK. The command takes no arguments. */
static int fragResetStatsCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
    REDISMODULE_NOT_USED(argc);
    REDISMODULE_NOT_USED(argv);

    /* Data type callback counters. */
    datatype_attempts = datatype_defragged = 0;
    datatype_resumes = datatype_wrong_cursor = 0;

    /* Global callback counters. */
    global_attempts = global_defragged = 0;

    RedisModule_ReplyWithSimpleString(ctx, "OK");
    return REDISMODULE_OK;
}
/* FRAG.CREATE key len size maxstep
 *
 * Creates a FragObject with 'len' blocks of 'size' bytes under 'key', which
 * must not exist yet. 'maxstep' limits how many blocks one defrag callback
 * invocation handles (0 = unlimited). Replies with OK on success or with an
 * error describing the first invalid argument. */
static int fragCreateCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
    if (argc != 5)
        return RedisModule_WrongArity(ctx);

    RedisModuleKey *key = RedisModule_OpenKey(ctx,argv[1],
                                              REDISMODULE_READ|REDISMODULE_WRITE);
    long long len = 0, size = 0, maxstep = 0;
    const char *err = NULL;

    /* Checks run in the same order as the replies were produced before, so
     * the first failing check still decides the error message. Negative
     * values are rejected because they would be converted to huge unsigned
     * sizes, and maxstep must additionally fit in an int. */
    if (RedisModule_KeyType(key) != REDISMODULE_KEYTYPE_EMPTY) {
        err = "ERR key exists";
    } else if (RedisModule_StringToLongLong(argv[2], &len) != REDISMODULE_OK ||
               len < 0) {
        err = "ERR invalid len";
    } else if (RedisModule_StringToLongLong(argv[3], &size) != REDISMODULE_OK ||
               size < 0) {
        err = "ERR invalid size";
    } else if (RedisModule_StringToLongLong(argv[4], &maxstep) != REDISMODULE_OK ||
               maxstep < 0 || (long long)(int)maxstep != maxstep) {
        err = "ERR invalid maxstep";
    }

    if (err != NULL) {
        /* Automatic memory management is not enabled for this module, so
         * the key handle has to be closed on error paths as well. */
        RedisModule_CloseKey(key);
        return RedisModule_ReplyWithError(ctx, err);
    }

    struct FragObject *o = createFragObject(len, size, (int)maxstep);
    RedisModule_ModuleTypeSetValue(key, FragType, o);
    RedisModule_ReplyWithSimpleString(ctx, "OK");
    RedisModule_CloseKey(key);

    return REDISMODULE_OK;
}
/* Free callback of the data type: releases every block, then the pointer
 * array, then the object itself. */
void FragFree(void *value) {
    struct FragObject *obj = value;
    unsigned long idx = 0;

    while (idx < obj->len) {
        RedisModule_Free(obj->values[idx]);
        idx++;
    }

    RedisModule_Free(obj->values);
    RedisModule_Free(obj);
}
/* free_effort callback of the data type: the effort equals the number of
 * blocks the object holds. */
size_t FragFreeEffort(RedisModuleString *key, const void *value) {
    REDISMODULE_NOT_USED(key);

    return ((const struct FragObject *)value)->len;
}
/* Defrag callback of the data type.
 *
 * Defragments the FragObject itself (updating '*value' when it moves) and
 * then each of its blocks, starting from the stored cursor when there is
 * one. The walk pauses when the object's 'maxstep' limit is exceeded or
 * when RedisModule_DefragShouldStop() asks for it, provided the position
 * can be saved in a cursor.
 *
 * Returns 1 when paused with work remaining, 0 when the whole object has
 * been processed. */
int FragDefrag(RedisModuleDefragCtx *ctx, RedisModuleString *key, void **value) {
    REDISMODULE_NOT_USED(key);
    unsigned long i = 0;
    int steps = 0;

    /* Attempt to get cursor, validate it's what we're expecting */
    if (RedisModule_DefragCursorGet(ctx, &i) == REDISMODULE_OK) {
        if (i > 0) datatype_resumes++;

        /* Validate we're expecting this cursor */
        if (i != last_set_cursor) datatype_wrong_cursor++;
    } else {
        if (last_set_cursor != 0) datatype_wrong_cursor++;
    }

    /* Attempt to defrag the object itself */
    datatype_attempts++;
    struct FragObject *o = RedisModule_DefragAlloc(ctx, *value);
    if (o == NULL) {
        /* Not defragged */
        o = *value;
    } else {
        /* Defragged */
        *value = o;
        datatype_defragged++;
    }

    /* Deep defrag now */
    for (; i < o->len; i++) {
        datatype_attempts++;
        void *new = RedisModule_DefragAlloc(ctx, o->values[i]);
        if (new) {
            o->values[i] = new;
            datatype_defragged++;
        }

        int pause = (o->maxstep && ++steps > o->maxstep) ||
                    ((i % 64 == 0) && RedisModule_DefragShouldStop(ctx));

        /* Only pause when the position could actually be stored. Without a
         * cursor the callback is not invoked again for this pass, so
         * returning early would leave the remaining blocks untouched and
         * record a cursor that was never saved. */
        if (pause && RedisModule_DefragCursorSet(ctx, i) == REDISMODULE_OK) {
            last_set_cursor = i;
            return 1;
        }
    }

    last_set_cursor = 0;
    return 0;
}
/* Module entry point.
 *
 * Expects exactly one load argument: the number of global strings to
 * create for the global defrag callback. Registers the "frag_type" data
 * type, the FRAG.CREATE and FRAG.RESETSTATS commands, the INFO callback and
 * the global defrag callback.
 *
 * Returns REDISMODULE_OK on success, REDISMODULE_ERR on any failure. */
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
    if (RedisModule_Init(ctx, "defragtest", 1, REDISMODULE_APIVER_1)
        == REDISMODULE_ERR) return REDISMODULE_ERR;

    if (RedisModule_GetTypeMethodVersion() < REDISMODULE_TYPE_METHOD_VERSION) {
        return REDISMODULE_ERR;
    }

    long long glen;
    if (argc != 1 || RedisModule_StringToLongLong(argv[0], &glen) == REDISMODULE_ERR) {
        return REDISMODULE_ERR;
    }

    /* createGlobalStrings() takes an int count: refuse negative values and
     * values that would be truncated by the conversion. */
    if (glen < 0 || (long long)(int)glen != glen) {
        return REDISMODULE_ERR;
    }

    RedisModuleTypeMethods tm = {
        .version = REDISMODULE_TYPE_METHOD_VERSION,
        .free = FragFree,
        .free_effort = FragFreeEffort,
        .defrag = FragDefrag
    };

    FragType = RedisModule_CreateDataType(ctx, "frag_type", 0, &tm);
    if (FragType == NULL) return REDISMODULE_ERR;

    if (RedisModule_CreateCommand(ctx, "frag.create",
                                  fragCreateCommand, "write deny-oom", 1, 1, 1) == REDISMODULE_ERR)
        return REDISMODULE_ERR;

    /* FRAG.RESETSTATS has no key arguments, hence the 0,0,0 key spec. */
    if (RedisModule_CreateCommand(ctx, "frag.resetstats",
                                  fragResetStatsCommand, "write deny-oom", 0, 0, 0) == REDISMODULE_ERR)
        return REDISMODULE_ERR;

    RedisModule_RegisterInfoFunc(ctx, FragInfo);
    RedisModule_RegisterDefragFunc(ctx, defragGlobalStrings);

    /* Allocate the global strings last, so that none of the failure paths
     * above leaves them behind. */
    createGlobalStrings(ctx, (int)glen);

    return REDISMODULE_OK;
}

View File

@ -46,13 +46,17 @@ proc warnings_from_file {filename} {
join $result "\n"
}
# Return value for INFO property
proc status {r property} {
if {[regexp "\r\n$property:(.*?)\r\n" [{*}$r info] _ value]} {
# Extract the value of 'property' from the INFO output 'infostr'.
# Returns an empty string when the property is not found.
proc getInfoProperty {infostr property} {
    set value {}
    regexp "\r\n$property:(.*?)\r\n" $infostr _ value
    return $value
}
# Run INFO through the command prefix 'r' and return the value of 'property'
# taken from its output.
proc status {r property} {
    set infostr [{*}$r info]
    return [getInfoProperty $infostr $property]
}
proc waitForBgsave r {
while 1 {
if {[status r rdb_bgsave_in_progress] eq 1} {

View File

@ -0,0 +1,43 @@
set testmodule [file normalize tests/modules/defragtest.so]

start_server {tags {"modules"} overrides {{save ""}}} {
    r module load $testmodule 10000
    r config set hz 100
    r config set active-defrag-ignore-bytes 1
    r config set active-defrag-threshold-lower 0
    r config set active-defrag-cycle-min 99

    # Enabling active defrag fails when the server was built without defrag
    # support. In that case skip the tests instead of failing the whole file.
    catch {r config set activedefrag yes} e
    if {[lindex [r config get activedefrag] 1] eq "yes"} {

        test {Module defrag: simple key defrag works} {
            r frag.create key1 1 1000 0

            # Give the active defrag cycle time to visit the key.
            after 2000
            set info [r info defragtest_stats]
            assert {[getInfoProperty $info defragtest_datatype_attempts] > 0}
            assert_equal 0 [getInfoProperty $info defragtest_datatype_resumes]
        }

        test {Module defrag: late defrag with cursor works} {
            r flushdb
            r frag.resetstats

            # key can only be defragged in no less than 10 iterations
            # due to maxstep
            r frag.create key2 10000 100 1000

            after 2000
            set info [r info defragtest_stats]
            assert {[getInfoProperty $info defragtest_datatype_resumes] > 10}
            assert_equal 0 [getInfoProperty $info defragtest_datatype_wrong_cursor]
        }

        test {Module defrag: global defrag works} {
            r flushdb
            r frag.resetstats

            after 2000
            set info [r info defragtest_stats]
            assert {[getInfoProperty $info defragtest_global_attempts] > 0}
        }
    }
}