Cleanup: propagate and alsoPropagate do not need redisCommand (#9502)

The `cmd` argument was completely unused, and all the code that bothered to pass it was unnecessary.
This is a prepartion for a future commit that treats subcommands as commands
This commit is contained in:
guybe7 2021-09-15 11:53:42 +02:00 committed by GitHub
parent 03fcc211de
commit 7759ec7c43
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 30 additions and 75 deletions

View File

@ -802,7 +802,7 @@ int loadAppendOnlyFile(char *filename) {
goto cleanup;
}
if (cmd == server.multiCommand) valid_before_multi = valid_up_to;
if (cmd->proc == multiCommand) valid_before_multi = valid_up_to;
/* Run the command in the context of a fake client */
fakeClient->cmd = fakeClient->lastcmd = cmd;

View File

@ -345,15 +345,10 @@ void serveClientsBlockedOnSortedSetKey(robj *o, readyList *rl) {
/* Replicate the command. */
robj *argv[2];
struct redisCommand *cmd = where == ZSET_MIN ?
server.zpopminCommand :
server.zpopmaxCommand;
argv[0] = createStringObject(cmd->name,strlen(cmd->name));
argv[0] = where == ZSET_MIN ? shared.zpopmin : shared.zpopmax;
argv[1] = rl->key;
incrRefCount(rl->key);
propagate(cmd,receiver->db->id,
argv,2,PROPAGATE_AOF|PROPAGATE_REPL);
decrRefCount(argv[0]);
propagate(receiver->db->id,argv,2,PROPAGATE_AOF|PROPAGATE_REPL);
decrRefCount(argv[1]);
}
}

View File

@ -1451,7 +1451,7 @@ void propagateExpire(redisDb *db, robj *key, int lazy) {
* Even if module executed a command without asking for propagation. */
int prev_replication_allowed = server.replication_allowed;
server.replication_allowed = 1;
propagate(server.delCommand,db->id,argv,2,PROPAGATE_AOF|PROPAGATE_REPL);
propagate(db->id,argv,2,PROPAGATE_AOF|PROPAGATE_REPL);
server.replication_allowed = prev_replication_allowed;
decrRefCount(argv[0]);

View File

@ -630,8 +630,7 @@ void moduleHandlePropagationAfterCommandCallback(RedisModuleCtx *ctx) {
/* Handle the replication of the final EXEC, since whatever a command
* emits is always wrapped around MULTI/EXEC. */
alsoPropagate(server.execCommand,c->db->id,&shared.exec,1,
PROPAGATE_AOF|PROPAGATE_REPL);
alsoPropagate(c->db->id,&shared.exec,1,PROPAGATE_AOF|PROPAGATE_REPL);
afterPropagateExec();
/* If this is not a module command context (but is instead a simple
@ -645,7 +644,7 @@ void moduleHandlePropagationAfterCommandCallback(RedisModuleCtx *ctx) {
redisOp *rop = &server.also_propagate.ops[j];
int target = rop->target;
if (target)
propagate(rop->cmd,rop->dbid,rop->argv,rop->argc,target);
propagate(rop->dbid,rop->argv,rop->argc,target);
}
redisOpArrayFree(&server.also_propagate);
/* Restore the previous oparray in case of nexted use of the API. */
@ -2260,10 +2259,10 @@ int RM_Replicate(RedisModuleCtx *ctx, const char *cmdname, const char *fmt, ...)
* will stop being used, so accumulating stuff does not make much sense,
* nor we could easily use the alsoPropagate() API from threads. */
if (ctx->flags & REDISMODULE_CTX_THREAD_SAFE) {
propagate(cmd,ctx->client->db->id,argv,argc,target);
propagate(ctx->client->db->id,argv,argc,target);
} else {
moduleReplicateMultiIfNeeded(ctx);
alsoPropagate(cmd,ctx->client->db->id,argv,argc,target);
alsoPropagate(ctx->client->db->id,argv,argc,target);
}
/* Release the argv. */
@ -2285,7 +2284,7 @@ int RM_Replicate(RedisModuleCtx *ctx, const char *cmdname, const char *fmt, ...)
*
* The function always returns REDISMODULE_OK. */
int RM_ReplicateVerbatim(RedisModuleCtx *ctx) {
alsoPropagate(ctx->client->cmd,ctx->client->db->id,
alsoPropagate(ctx->client->db->id,
ctx->client->argv,ctx->client->argc,
PROPAGATE_AOF|PROPAGATE_REPL);
server.dirty++;

View File

@ -129,13 +129,11 @@ void afterPropagateExec() {
* implementation for more information. */
void execCommandPropagateMulti(int dbid) {
beforePropagateMulti();
propagate(server.multiCommand,dbid,&shared.multi,1,
PROPAGATE_AOF|PROPAGATE_REPL);
propagate(dbid,&shared.multi,1,PROPAGATE_AOF|PROPAGATE_REPL);
}
void execCommandPropagateExec(int dbid) {
propagate(server.execCommand,dbid,&shared.exec,1,
PROPAGATE_AOF|PROPAGATE_REPL);
propagate(dbid,&shared.exec,1,PROPAGATE_AOF|PROPAGATE_REPL);
afterPropagateExec();
}

View File

@ -3001,6 +3001,8 @@ void createSharedObjects(void) {
shared.persist = createStringObject("PERSIST",7);
shared.set = createStringObject("SET",3);
shared.eval = createStringObject("EVAL",4);
shared.zpopmin = createStringObject("ZPOPMIN",7);
shared.zpopmax = createStringObject("ZPOPMAX",7);
/* Shared command argument */
shared.left = createStringObject("left",4);
@ -3157,21 +3159,6 @@ void initServerConfig(void) {
server.commands = dictCreate(&commandTableDictType);
server.orig_commands = dictCreate(&commandTableDictType);
populateCommandTable();
server.delCommand = lookupCommandByCString("del");
server.multiCommand = lookupCommandByCString("multi");
server.lpushCommand = lookupCommandByCString("lpush");
server.lpopCommand = lookupCommandByCString("lpop");
server.rpopCommand = lookupCommandByCString("rpop");
server.zpopminCommand = lookupCommandByCString("zpopmin");
server.zpopmaxCommand = lookupCommandByCString("zpopmax");
server.sremCommand = lookupCommandByCString("srem");
server.execCommand = lookupCommandByCString("exec");
server.expireCommand = lookupCommandByCString("expire");
server.pexpireCommand = lookupCommandByCString("pexpire");
server.xclaimCommand = lookupCommandByCString("xclaim");
server.xgroupCommand = lookupCommandByCString("xgroup");
server.rpoplpushCommand = lookupCommandByCString("rpoplpush");
server.lmoveCommand = lookupCommandByCString("lmove");
/* Debugging */
server.watchdog_period = 0;
@ -4013,14 +4000,11 @@ void redisOpArrayInit(redisOpArray *oa) {
oa->numops = 0;
}
int redisOpArrayAppend(redisOpArray *oa, struct redisCommand *cmd, int dbid,
robj **argv, int argc, int target)
{
int redisOpArrayAppend(redisOpArray *oa, int dbid, robj **argv, int argc, int target) {
redisOp *op;
oa->ops = zrealloc(oa->ops,sizeof(redisOp)*(oa->numops+1));
op = oa->ops+oa->numops;
op->cmd = cmd;
op->dbid = dbid;
op->argv = argv;
op->argc = argc;
@ -4089,11 +4073,7 @@ struct redisCommand *lookupCommandOrOriginal(sds name) {
* command execution, for example when serving a blocked client, you
* want to use propagate().
*/
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
int flags)
{
UNUSED(cmd);
void propagate(int dbid, robj **argv, int argc, int flags) {
if (!server.replication_allowed)
return;
@ -4118,8 +4098,7 @@ void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
/* Used inside commands to schedule the propagation of additional commands
* after the current command is propagated to AOF / Replication.
*
* 'cmd' must be a pointer to the Redis command to replicate, dbid is the
* database ID the command should be propagated into.
* dbid is the database ID the command should be propagated into.
* Arguments of the command to propagate are passed as an array of redis
* objects pointers of len 'argc', using the 'argv' vector.
*
@ -4127,9 +4106,7 @@ void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
* so it is up to the caller to release the passed argv (but it is usually
* stack allocated). The function automatically increments ref count of
* passed objects, so the caller does not need to. */
void alsoPropagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
int target)
{
void alsoPropagate(int dbid, robj **argv, int argc, int target) {
robj **argvcopy;
int j;
@ -4140,7 +4117,7 @@ void alsoPropagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
argvcopy[j] = argv[j];
incrRefCount(argv[j]);
}
redisOpArrayAppend(&server.also_propagate,cmd,dbid,argvcopy,argc,target);
redisOpArrayAppend(&server.also_propagate,dbid,argvcopy,argc,target);
}
/* It is possible to call the function forceCommandPropagation() inside a
@ -4345,7 +4322,7 @@ void call(client *c, int flags) {
* propagation is needed. Note that modules commands handle replication
* in an explicit way, so we never replicate them automatically. */
if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE))
propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
propagate(c->db->id,c->argv,c->argc,propagate_flags);
}
/* Restore the old replication flags, since call() can be executed
@ -4385,7 +4362,7 @@ void call(client *c, int flags) {
if (!(flags&CMD_CALL_PROPAGATE_AOF)) target &= ~PROPAGATE_AOF;
if (!(flags&CMD_CALL_PROPAGATE_REPL)) target &= ~PROPAGATE_REPL;
if (target)
propagate(rop->cmd,rop->dbid,rop->argv,rop->argc,target);
propagate(rop->dbid,rop->argv,rop->argc,target);
}
if (multi_emitted) {

View File

@ -1095,7 +1095,6 @@ extern clientBufferLimitsConfig clientBufferLimitsDefaults[CLIENT_TYPE_OBUF_COUN
typedef struct redisOp {
robj **argv;
int argc, dbid, target;
struct redisCommand *cmd;
} redisOp;
/* Defines an array of Redis operations. There is an API to add to this
@ -1311,12 +1310,6 @@ struct redisServer {
off_t loading_loaded_bytes;
time_t loading_start_time;
off_t loading_process_events_interval_bytes;
/* Fast pointers to often looked up command */
struct redisCommand *delCommand, *multiCommand, *lpushCommand,
*lpopCommand, *rpopCommand, *zpopminCommand,
*zpopmaxCommand, *sremCommand, *execCommand,
*expireCommand, *pexpireCommand, *xclaimCommand,
*xgroupCommand, *rpoplpushCommand, *lmoveCommand;
/* Fields used only for stats */
time_t stat_starttime; /* Server start time */
long long stat_numcommands; /* Number of processed commands */
@ -2380,8 +2373,8 @@ struct redisCommand *lookupCommand(sds name);
struct redisCommand *lookupCommandByCString(const char *s);
struct redisCommand *lookupCommandOrOriginal(sds name);
void call(client *c, int flags);
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int flags);
void alsoPropagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int target);
void propagate(int dbid, robj **argv, int argc, int flags);
void alsoPropagate(int dbid, robj **argv, int argc, int target);
void redisOpArrayInit(redisOpArray *oa);
void redisOpArrayFree(redisOpArray *oa);
void forceCommandPropagation(client *c, int flags);

View File

@ -962,8 +962,6 @@ void serveClientBlockedOnList(client *receiver, robj *o, robj *key, robj *dstkey
if (dstkey == NULL) {
/* Propagate the [LR]POP operation. */
struct redisCommand *cmd = (wherefrom == LIST_HEAD) ?
server.lpopCommand : server.rpopCommand;
argv[0] = (wherefrom == LIST_HEAD) ? shared.lpop :
shared.rpop;
argv[1] = key;
@ -976,7 +974,7 @@ void serveClientBlockedOnList(client *receiver, robj *o, robj *key, robj *dstkey
serverAssert(llen > 0);
argv[2] = createStringObjectFromLongLong((count > llen) ? llen : count);
propagate(cmd, db->id, argv, 3, PROPAGATE_AOF|PROPAGATE_REPL);
propagate(db->id, argv, 3, PROPAGATE_AOF|PROPAGATE_REPL);
decrRefCount(argv[2]);
/* Pop a range of elements in a nested arrays way. */
@ -984,7 +982,7 @@ void serveClientBlockedOnList(client *receiver, robj *o, robj *key, robj *dstkey
return;
}
propagate(cmd, db->id, argv, 2, PROPAGATE_AOF|PROPAGATE_REPL);
propagate(db->id, argv, 2, PROPAGATE_AOF|PROPAGATE_REPL);
/* BRPOP/BLPOP */
value = listTypePop(o, wherefrom);
@ -1015,10 +1013,7 @@ void serveClientBlockedOnList(client *receiver, robj *o, robj *key, robj *dstkey
argv[2] = dstkey;
argv[3] = getStringObjectFromListPosition(wherefrom);
argv[4] = getStringObjectFromListPosition(whereto);
propagate(isbrpoplpush ? server.rpoplpushCommand : server.lmoveCommand,
db->id,argv,(isbrpoplpush ? 3 : 5),
PROPAGATE_AOF|
PROPAGATE_REPL);
propagate(db->id,argv,(isbrpoplpush ? 3 : 5),PROPAGATE_AOF|PROPAGATE_REPL);
/* Notify event ("lpush" or "rpush" was notified by lmoveHandlePush). */
notifyKeyspaceEvent(NOTIFY_LIST,wherefrom == LIST_TAIL ? "rpop" : "lpop",

View File

@ -533,8 +533,7 @@ void spopWithCountCommand(client *c) {
/* Replicate/AOF this command as an SREM operation */
propargv[2] = objele;
alsoPropagate(server.sremCommand,c->db->id,propargv,3,
PROPAGATE_AOF|PROPAGATE_REPL);
alsoPropagate(c->db->id,propargv,3,PROPAGATE_AOF|PROPAGATE_REPL);
decrRefCount(objele);
}
} else {
@ -576,8 +575,7 @@ void spopWithCountCommand(client *c) {
/* Replicate/AOF this command as an SREM operation */
propargv[2] = objele;
alsoPropagate(server.sremCommand,c->db->id,propargv,3,
PROPAGATE_AOF|PROPAGATE_REPL);
alsoPropagate(c->db->id,propargv,3,PROPAGATE_AOF|PROPAGATE_REPL);
decrRefCount(objele);
}
setTypeReleaseIterator(si);

View File

@ -1355,7 +1355,7 @@ void streamPropagateXCLAIM(client *c, robj *key, streamCG *group, robj *groupnam
* the command execution context. Moreover this will just alter the
* consumer group state, and we don't need MULTI/EXEC wrapping because
* there is no message state cross-message atomicity required. */
propagate(server.xclaimCommand,c->db->id,argv,14,PROPAGATE_AOF|PROPAGATE_REPL);
propagate(c->db->id,argv,14,PROPAGATE_AOF|PROPAGATE_REPL);
decrRefCount(argv[3]);
decrRefCount(argv[7]);
decrRefCount(argv[9]);
@ -1380,7 +1380,7 @@ void streamPropagateGroupID(client *c, robj *key, streamCG *group, robj *groupna
* the command execution context. Moreover this will just alter the
* consumer group state, and we don't need MULTI/EXEC wrapping because
* there is no message state cross-message atomicity required. */
propagate(server.xgroupCommand,c->db->id,argv,5,PROPAGATE_AOF|PROPAGATE_REPL);
propagate(c->db->id,argv,5,PROPAGATE_AOF|PROPAGATE_REPL);
decrRefCount(argv[4]);
}
@ -1402,7 +1402,7 @@ void streamPropagateConsumerCreation(client *c, robj *key, robj *groupname, sds
* the command execution context. Moreover this will just alter the
* consumer group state, and we don't need MULTI/EXEC wrapping because
* there is no message state cross-message atomicity required. */
propagate(server.xgroupCommand,c->db->id,argv,5,PROPAGATE_AOF|PROPAGATE_REPL);
propagate(c->db->id,argv,5,PROPAGATE_AOF|PROPAGATE_REPL);
decrRefCount(argv[4]);
}