Fixes around clients that must be obeyed. Replica report disk errors in PING. (#10603)

This PR unifies all the places that test if the current client is the
master client or AOF client, and uses a method to test that on all of
these.

Other than some refactoring, these are the actual implications:
- Replicas **don't** ignore disk error when processing commands not
  coming from their master.
  **This is important for PING to be used for health check of replicas**
- SETRANGE, APPEND, SETBIT, BITFIELD don't do proto_max_bulk_len check for AOF
- RM_Call in SCRIPT_MODE ignores disk error when coming from master /
  AOF
- RM_Call in cluster mode ignores slot check when processing AOF
- Scripts ignore disk error when processing AOF
- Scripts **don't** ignore disk error on a replica, if the command comes
  from clients other than the master
- SCRIPT KILL won't kill script coming from AOF
- Scripts **don't** skip OOM check on replica if the command comes from
  clients other than the master

Note that Script, AOF, and module clients don't reach processCommand,
which is why some of the changes don't actually have any implications.

Note, reverting the change done to processCommand in 2f4240b9d9
should be dead code due to the above mentioned fact.
This commit is contained in:
Oran Agra 2022-04-20 11:11:21 +03:00 committed by GitHub
parent 046654a70e
commit ee220599b0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 26 additions and 23 deletions

View File

@ -430,7 +430,7 @@ int getBitOffsetFromArgument(client *c, robj *o, uint64_t *offset, int hash, int
if (usehash) loffset *= bits;
/* Limit offset to server.proto_max_bulk_len (512MB in bytes by default) */
if (loffset < 0 || (!(c->flags & CLIENT_MASTER) && (loffset >> 3) >= server.proto_max_bulk_len))
if (loffset < 0 || (!mustObeyClient(c) && (loffset >> 3) >= server.proto_max_bulk_len))
{
addReplyError(c,err);
return C_ERR;

View File

@ -5821,8 +5821,9 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch
}
int deny_write_type = writeCommandsDeniedByDiskError();
int obey_client = mustObeyClient(server.current_client);
if (deny_write_type != DISK_ERROR_TYPE_NONE) {
if (deny_write_type != DISK_ERROR_TYPE_NONE && !obey_client) {
errno = ENOSPC;
if (error_as_call_replies) {
sds msg = writeCommandsGetDiskErrorMessage(deny_write_type);
@ -5864,7 +5865,7 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch
/* If this is a Redis Cluster node, we need to make sure the module is not
* trying to access non-local keys, with the exception of commands
* received from our master. */
if (server.cluster_enabled && !(ctx->client->flags & CLIENT_MASTER)) {
if (server.cluster_enabled && !mustObeyClient(ctx->client)) {
int error_code;
/* Duplicate relevant flags in the module client. */
c->flags &= ~(CLIENT_READONLY|CLIENT_ASKING);

View File

@ -114,6 +114,7 @@ int scriptPrepareForRun(scriptRunCtx *run_ctx, client *engine_client, client *ca
int running_stale = server.masterhost &&
server.repl_state != REPL_STATE_CONNECTED &&
server.repl_serve_stale_data == 0;
int obey_client = mustObeyClient(caller);
if (!(script_flags & SCRIPT_FLAG_EVAL_COMPAT_MODE)) {
if ((script_flags & SCRIPT_FLAG_NO_CLUSTER) && server.cluster_enabled) {
@ -139,16 +140,14 @@ int scriptPrepareForRun(scriptRunCtx *run_ctx, client *engine_client, client *ca
* 1. we are not a readonly replica
* 2. no disk error detected
* 3. command is not `fcall_ro`/`eval[sha]_ro` */
if (server.masterhost && server.repl_slave_ro && caller->id != CLIENT_ID_AOF
&& !(caller->flags & CLIENT_MASTER))
{
if (server.masterhost && server.repl_slave_ro && !obey_client) {
addReplyError(caller, "Can not run script with write flag on readonly replica");
return C_ERR;
}
/* Deny writes if we're unale to persist. */
int deny_write_type = writeCommandsDeniedByDiskError();
if (deny_write_type != DISK_ERROR_TYPE_NONE && server.masterhost == NULL) {
if (deny_write_type != DISK_ERROR_TYPE_NONE && !obey_client) {
if (deny_write_type == DISK_ERROR_TYPE_RDB)
addReplyError(caller, "-MISCONF Redis is configured to save RDB snapshots, "
"but it's currently unable to persist to disk. "
@ -269,7 +268,7 @@ void scriptKill(client *c, int is_eval) {
addReplyError(c, "-NOTBUSY No scripts in execution right now.");
return;
}
if (curr_run_ctx->original_client->flags & CLIENT_MASTER) {
if (mustObeyClient(curr_run_ctx->original_client)) {
addReplyError(c,
"-UNKILLABLE The busy script was sent by a master instance in the context of replication and cannot be killed.");
}
@ -334,8 +333,8 @@ static int scriptVerifyWriteCommandAllow(scriptRunCtx *run_ctx, char **err) {
* of this script. */
int deny_write_type = writeCommandsDeniedByDiskError();
if (server.masterhost && server.repl_slave_ro && run_ctx->original_client->id != CLIENT_ID_AOF
&& !(run_ctx->original_client->flags & CLIENT_MASTER))
if (server.masterhost && server.repl_slave_ro &&
!mustObeyClient(run_ctx->original_client))
{
*err = sdsdup(shared.roslaveerr->ptr);
return C_ERR;
@ -380,8 +379,7 @@ static int scriptVerifyOOM(scriptRunCtx *run_ctx, char **err) {
* in the middle. */
if (server.maxmemory && /* Maxmemory is actually enabled. */
run_ctx->original_client->id != CLIENT_ID_AOF && /* Don't care about mem if loading from AOF. */
!server.masterhost && /* Slave must execute the script. */
!mustObeyClient(run_ctx->original_client) && /* Don't care about mem for replicas or AOF. */
!(run_ctx->flags & SCRIPT_WRITE_DIRTY) && /* Script had no side effects so far. */
server.script_oom && /* Detected OOM when script start. */
(run_ctx->c->cmd->flags & CMD_DENYOOM))
@ -394,7 +392,7 @@ static int scriptVerifyOOM(scriptRunCtx *run_ctx, char **err) {
}
static int scriptVerifyClusterState(client *c, client *original_c, sds *err) {
if (!server.cluster_enabled || original_c->id == CLIENT_ID_AOF || (original_c->flags & CLIENT_MASTER)) {
if (!server.cluster_enabled || mustObeyClient(original_c)) {
return C_OK;
}
/* If this is a Redis Cluster node, we need to make sure the script is not

View File

@ -2976,6 +2976,11 @@ struct redisCommand *lookupCommandOrOriginal(robj **argv ,int argc) {
return cmd;
}
/* Commands arriving from the master client or AOF client, should never be rejected. */
int mustObeyClient(client *c) {
return c->id == CLIENT_ID_AOF || c->flags & CLIENT_MASTER;
}
static int shouldPropagate(int target) {
if (!server.replication_allowed || target == PROPAGATE_NONE || server.loading)
return 0;
@ -3567,6 +3572,7 @@ int processCommand(client *c) {
(c->cmd->proc == execCommand && (c->mstate.cmd_flags & (CMD_WRITE | CMD_MAY_REPLICATE)));
int is_deny_async_loading_command = (c->cmd->flags & CMD_NO_ASYNC_LOADING) ||
(c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_NO_ASYNC_LOADING));
int obey_client = mustObeyClient(c);
if (authRequired(c)) {
/* AUTH and HELLO and no auth commands are valid even in
@ -3618,9 +3624,7 @@ int processCommand(client *c) {
* 1) The sender of this command is our master.
* 2) The command has no key arguments. */
if (server.cluster_enabled &&
!(c->flags & CLIENT_MASTER) &&
!(c->flags & CLIENT_SCRIPT &&
server.script_caller->flags & CLIENT_MASTER) &&
!mustObeyClient(c) &&
!(!(c->cmd->flags&CMD_MOVABLE_KEYS) && c->cmd->key_specs_num == 0 &&
c->cmd->proc != execCommand))
{
@ -3704,10 +3708,10 @@ int processCommand(client *c) {
if (server.tracking_clients) trackingLimitUsedSlots();
/* Don't accept write commands if there are problems persisting on disk
* and if this is a master instance. */
* unless coming from our master. */
int deny_write_type = writeCommandsDeniedByDiskError();
if (deny_write_type != DISK_ERROR_TYPE_NONE &&
server.masterhost == NULL &&
!obey_client &&
(is_write_command ||c->cmd->proc == pingCommand))
{
sds err = writeCommandsGetDiskErrorMessage(deny_write_type);
@ -3725,7 +3729,7 @@ int processCommand(client *c) {
/* Don't accept write commands if this is a read only slave. But
* accept write commands if this is our master. */
if (server.masterhost && server.repl_slave_ro &&
!(c->flags & CLIENT_MASTER) &&
!obey_client &&
is_write_command)
{
rejectCommand(c, shared.roslaveerr);

View File

@ -2881,7 +2881,7 @@ int prepareForShutdown(int flags);
void replyToClientsBlockedOnShutdown(void);
int abortShutdown(void);
void afterCommand(client *c);
int inNestedCall(void);
int mustObeyClient(client *c);
#ifdef __GNUC__
void _serverLog(int level, const char *fmt, ...)
__attribute__((format(printf, 2, 3)));

View File

@ -1000,7 +1000,7 @@ static int streamParseAddOrTrimArgsOrReply(client *c, streamAddTrimArgs *args, i
return -1;
}
if (c == server.master || c->id == CLIENT_ID_AOF) {
if (mustObeyClient(c)) {
/* If command came from master or from AOF we must not enforce maxnodes
* (The maxlen/minid argument was re-written to make sure there's no
* inconsistency). */

View File

@ -38,7 +38,7 @@ int getGenericCommand(client *c);
*----------------------------------------------------------------------------*/
static int checkStringLength(client *c, long long size) {
if (!(c->flags & CLIENT_MASTER) && size > server.proto_max_bulk_len) {
if (!mustObeyClient(c) && size > server.proto_max_bulk_len) {
addReplyError(c,"string exceeds maximum allowed size (proto-max-bulk-len)");
return C_ERR;
}

View File

@ -58,7 +58,7 @@ int clientsCronHandleTimeout(client *c, mstime_t now_ms) {
if (server.maxidletime &&
/* This handles the idle clients connection timeout if set. */
!(c->flags & CLIENT_SLAVE) && /* No timeout for slaves and monitors */
!(c->flags & CLIENT_MASTER) && /* No timeout for masters */
!mustObeyClient(c) && /* No timeout for masters and AOF */
!(c->flags & CLIENT_BLOCKED) && /* No timeout for BLPOP */
!(c->flags & CLIENT_PUBSUB) && /* No timeout for Pub/Sub clients */
(now - c->lastinteraction > server.maxidletime))