Merge branch 'unstable' of https://github.com/antirez/redis into unstable

This commit is contained in:
BrotherGao 2018-12-29 11:41:12 +08:00
commit 9f4b121512
13 changed files with 778 additions and 287 deletions

View File

@ -1248,7 +1248,7 @@ void configSetCommand(client *c) {
if (server.maxmemory < zmalloc_used_memory()) {
serverLog(LL_WARNING,"WARNING: the new maxmemory value set via CONFIG SET is smaller than the current memory usage. This will result in key eviction and/or the inability to accept new write commands depending on the maxmemory-policy.");
}
freeMemoryIfNeeded();
freeMemoryIfNeededAndSafe();
}
} config_set_memory_field(
"proto-max-bulk-len",server.proto_max_bulk_len) {

View File

@ -212,7 +212,7 @@ void dbOverwrite(redisDb *db, robj *key, robj *val) {
* 2) clients WATCHing for the destination key notified.
* 3) The expire time of the key is reset (the key is made persistent).
*
* All the new keys in the database should be creted via this interface. */
* All the new keys in the database should be created via this interface. */
void setKey(redisDb *db, robj *key, robj *val) {
if (lookupKeyWrite(db,key) == NULL) {
dbAdd(db,key,val);

View File

@ -74,7 +74,7 @@ void xorDigest(unsigned char *digest, void *ptr, size_t len) {
digest[j] ^= hash[j];
}
void xorObjectDigest(unsigned char *digest, robj *o) {
void xorStringObjectDigest(unsigned char *digest, robj *o) {
o = getDecodedObject(o);
xorDigest(digest,o->ptr,sdslen(o->ptr));
decrRefCount(o);
@ -104,12 +104,151 @@ void mixDigest(unsigned char *digest, void *ptr, size_t len) {
SHA1Final(digest,&ctx);
}
void mixObjectDigest(unsigned char *digest, robj *o) {
void mixStringObjectDigest(unsigned char *digest, robj *o) {
o = getDecodedObject(o);
mixDigest(digest,o->ptr,sdslen(o->ptr));
decrRefCount(o);
}
/* This function computes the digest of a data structure stored in the
* object 'o'. It is the core of the DEBUG DIGEST command: when taking the
* digest of a whole dataset, we take the digest of the key and the value
* pair, and xor all those together.
*
* Note that this function does not reset the initial 'digest' passed, it
* will continue mixing this object digest to anything that was already
* present. */
void xorObjectDigest(redisDb *db, robj *keyobj, unsigned char *digest, robj *o) {
uint32_t aux = htonl(o->type);
mixDigest(digest,&aux,sizeof(aux));
long long expiretime = getExpire(db,keyobj);
char buf[128];
/* Save the key and associated value */
if (o->type == OBJ_STRING) {
mixStringObjectDigest(digest,o);
} else if (o->type == OBJ_LIST) {
listTypeIterator *li = listTypeInitIterator(o,0,LIST_TAIL);
listTypeEntry entry;
while(listTypeNext(li,&entry)) {
robj *eleobj = listTypeGet(&entry);
mixStringObjectDigest(digest,eleobj);
decrRefCount(eleobj);
}
listTypeReleaseIterator(li);
} else if (o->type == OBJ_SET) {
setTypeIterator *si = setTypeInitIterator(o);
sds sdsele;
while((sdsele = setTypeNextObject(si)) != NULL) {
xorDigest(digest,sdsele,sdslen(sdsele));
sdsfree(sdsele);
}
setTypeReleaseIterator(si);
} else if (o->type == OBJ_ZSET) {
unsigned char eledigest[20];
if (o->encoding == OBJ_ENCODING_ZIPLIST) {
unsigned char *zl = o->ptr;
unsigned char *eptr, *sptr;
unsigned char *vstr;
unsigned int vlen;
long long vll;
double score;
eptr = ziplistIndex(zl,0);
serverAssert(eptr != NULL);
sptr = ziplistNext(zl,eptr);
serverAssert(sptr != NULL);
while (eptr != NULL) {
serverAssert(ziplistGet(eptr,&vstr,&vlen,&vll));
score = zzlGetScore(sptr);
memset(eledigest,0,20);
if (vstr != NULL) {
mixDigest(eledigest,vstr,vlen);
} else {
ll2string(buf,sizeof(buf),vll);
mixDigest(eledigest,buf,strlen(buf));
}
snprintf(buf,sizeof(buf),"%.17g",score);
mixDigest(eledigest,buf,strlen(buf));
xorDigest(digest,eledigest,20);
zzlNext(zl,&eptr,&sptr);
}
} else if (o->encoding == OBJ_ENCODING_SKIPLIST) {
zset *zs = o->ptr;
dictIterator *di = dictGetIterator(zs->dict);
dictEntry *de;
while((de = dictNext(di)) != NULL) {
sds sdsele = dictGetKey(de);
double *score = dictGetVal(de);
snprintf(buf,sizeof(buf),"%.17g",*score);
memset(eledigest,0,20);
mixDigest(eledigest,sdsele,sdslen(sdsele));
mixDigest(eledigest,buf,strlen(buf));
xorDigest(digest,eledigest,20);
}
dictReleaseIterator(di);
} else {
serverPanic("Unknown sorted set encoding");
}
} else if (o->type == OBJ_HASH) {
hashTypeIterator *hi = hashTypeInitIterator(o);
while (hashTypeNext(hi) != C_ERR) {
unsigned char eledigest[20];
sds sdsele;
memset(eledigest,0,20);
sdsele = hashTypeCurrentObjectNewSds(hi,OBJ_HASH_KEY);
mixDigest(eledigest,sdsele,sdslen(sdsele));
sdsfree(sdsele);
sdsele = hashTypeCurrentObjectNewSds(hi,OBJ_HASH_VALUE);
mixDigest(eledigest,sdsele,sdslen(sdsele));
sdsfree(sdsele);
xorDigest(digest,eledigest,20);
}
hashTypeReleaseIterator(hi);
} else if (o->type == OBJ_STREAM) {
streamIterator si;
streamIteratorStart(&si,o->ptr,NULL,NULL,0);
streamID id;
int64_t numfields;
while(streamIteratorGetID(&si,&id,&numfields)) {
sds itemid = sdscatfmt(sdsempty(),"%U.%U",id.ms,id.seq);
mixDigest(digest,itemid,sdslen(itemid));
sdsfree(itemid);
while(numfields--) {
unsigned char *field, *value;
int64_t field_len, value_len;
streamIteratorGetField(&si,&field,&value,
&field_len,&value_len);
mixDigest(digest,field,field_len);
mixDigest(digest,value,value_len);
}
}
streamIteratorStop(&si);
} else if (o->type == OBJ_MODULE) {
RedisModuleDigest md;
moduleValue *mv = o->ptr;
moduleType *mt = mv->type;
moduleInitDigestContext(md);
if (mt->digest) {
mt->digest(&md,mv->value);
xorDigest(digest,md.x,sizeof(md.x));
}
} else {
serverPanic("Unknown object type");
}
/* If the key has an expire, add it to the mix */
if (expiretime != -1) xorDigest(digest,"!!expire!!",10);
}
/* Compute the dataset digest. Since keys, sets elements, hashes elements
* are not ordered, we use a trick: every aggregate digest is the xor
* of the digests of their elements. This way the order will not change
@ -118,7 +257,6 @@ void mixObjectDigest(unsigned char *digest, robj *o) {
* a different digest. */
void computeDatasetDigest(unsigned char *final) {
unsigned char digest[20];
char buf[128];
dictIterator *di = NULL;
dictEntry *de;
int j;
@ -141,7 +279,6 @@ void computeDatasetDigest(unsigned char *final) {
while((de = dictNext(di)) != NULL) {
sds key;
robj *keyobj, *o;
long long expiretime;
memset(digest,0,20); /* This key-val digest */
key = dictGetKey(de);
@ -150,134 +287,8 @@ void computeDatasetDigest(unsigned char *final) {
mixDigest(digest,key,sdslen(key));
o = dictGetVal(de);
xorObjectDigest(db,keyobj,digest,o);
aux = htonl(o->type);
mixDigest(digest,&aux,sizeof(aux));
expiretime = getExpire(db,keyobj);
/* Save the key and associated value */
if (o->type == OBJ_STRING) {
mixObjectDigest(digest,o);
} else if (o->type == OBJ_LIST) {
listTypeIterator *li = listTypeInitIterator(o,0,LIST_TAIL);
listTypeEntry entry;
while(listTypeNext(li,&entry)) {
robj *eleobj = listTypeGet(&entry);
mixObjectDigest(digest,eleobj);
decrRefCount(eleobj);
}
listTypeReleaseIterator(li);
} else if (o->type == OBJ_SET) {
setTypeIterator *si = setTypeInitIterator(o);
sds sdsele;
while((sdsele = setTypeNextObject(si)) != NULL) {
xorDigest(digest,sdsele,sdslen(sdsele));
sdsfree(sdsele);
}
setTypeReleaseIterator(si);
} else if (o->type == OBJ_ZSET) {
unsigned char eledigest[20];
if (o->encoding == OBJ_ENCODING_ZIPLIST) {
unsigned char *zl = o->ptr;
unsigned char *eptr, *sptr;
unsigned char *vstr;
unsigned int vlen;
long long vll;
double score;
eptr = ziplistIndex(zl,0);
serverAssert(eptr != NULL);
sptr = ziplistNext(zl,eptr);
serverAssert(sptr != NULL);
while (eptr != NULL) {
serverAssert(ziplistGet(eptr,&vstr,&vlen,&vll));
score = zzlGetScore(sptr);
memset(eledigest,0,20);
if (vstr != NULL) {
mixDigest(eledigest,vstr,vlen);
} else {
ll2string(buf,sizeof(buf),vll);
mixDigest(eledigest,buf,strlen(buf));
}
snprintf(buf,sizeof(buf),"%.17g",score);
mixDigest(eledigest,buf,strlen(buf));
xorDigest(digest,eledigest,20);
zzlNext(zl,&eptr,&sptr);
}
} else if (o->encoding == OBJ_ENCODING_SKIPLIST) {
zset *zs = o->ptr;
dictIterator *di = dictGetIterator(zs->dict);
dictEntry *de;
while((de = dictNext(di)) != NULL) {
sds sdsele = dictGetKey(de);
double *score = dictGetVal(de);
snprintf(buf,sizeof(buf),"%.17g",*score);
memset(eledigest,0,20);
mixDigest(eledigest,sdsele,sdslen(sdsele));
mixDigest(eledigest,buf,strlen(buf));
xorDigest(digest,eledigest,20);
}
dictReleaseIterator(di);
} else {
serverPanic("Unknown sorted set encoding");
}
} else if (o->type == OBJ_HASH) {
hashTypeIterator *hi = hashTypeInitIterator(o);
while (hashTypeNext(hi) != C_ERR) {
unsigned char eledigest[20];
sds sdsele;
memset(eledigest,0,20);
sdsele = hashTypeCurrentObjectNewSds(hi,OBJ_HASH_KEY);
mixDigest(eledigest,sdsele,sdslen(sdsele));
sdsfree(sdsele);
sdsele = hashTypeCurrentObjectNewSds(hi,OBJ_HASH_VALUE);
mixDigest(eledigest,sdsele,sdslen(sdsele));
sdsfree(sdsele);
xorDigest(digest,eledigest,20);
}
hashTypeReleaseIterator(hi);
} else if (o->type == OBJ_STREAM) {
streamIterator si;
streamIteratorStart(&si,o->ptr,NULL,NULL,0);
streamID id;
int64_t numfields;
while(streamIteratorGetID(&si,&id,&numfields)) {
sds itemid = sdscatfmt(sdsempty(),"%U.%U",id.ms,id.seq);
mixDigest(digest,itemid,sdslen(itemid));
sdsfree(itemid);
while(numfields--) {
unsigned char *field, *value;
int64_t field_len, value_len;
streamIteratorGetField(&si,&field,&value,
&field_len,&value_len);
mixDigest(digest,field,field_len);
mixDigest(digest,value,value_len);
}
}
streamIteratorStop(&si);
} else if (o->type == OBJ_MODULE) {
RedisModuleDigest md;
moduleValue *mv = o->ptr;
moduleType *mt = mv->type;
moduleInitDigestContext(md);
if (mt->digest) {
mt->digest(&md,mv->value);
xorDigest(digest,md.x,sizeof(md.x));
}
} else {
serverPanic("Unknown object type");
}
/* If the key has an expire, add it to the mix */
if (expiretime != -1) xorDigest(digest,"!!expire!!",10);
/* We can finally xor the key-val digest to the final digest */
xorDigest(final,digest,20);
decrRefCount(keyobj);
@ -293,6 +304,7 @@ void debugCommand(client *c) {
"CHANGE-REPL-ID -- Change the replication IDs of the instance. Dangerous, should be used only for testing the replication subsystem.",
"CRASH-AND-RECOVER <milliseconds> -- Hard crash and restart after <milliseconds> delay.",
"DIGEST -- Output a hex signature representing the current DB content.",
"DIGEST-VALUE <key-1> ... <key-N>-- Output a hex signature of the values of all the specified keys.",
"ERROR <string> -- Return a Redis protocol error with <string> as message. Useful for clients unit tests to simulate Redis errors.",
"LOG <message> -- write message to the server log.",
"HTSTATS <dbid> -- Return hash table statistics of the specified Redis database.",
@ -310,6 +322,7 @@ void debugCommand(client *c) {
"SLEEP <seconds> -- Stop the server for <seconds>. Decimals allowed.",
"STRUCTSIZE -- Return the size of different Redis core C structures.",
"ZIPLIST <key> -- Show low level info about the ziplist encoding.",
"STRINGMATCH-TEST -- Run a fuzz tester against the stringmatchlen() function.",
NULL
};
addReplyHelp(c, help);
@ -336,7 +349,6 @@ NULL
zfree(ptr);
addReply(c,shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr,"assert")) {
if (c->argc >= 3) c->argv[2] = tryObjectEncoding(c->argv[2]);
serverAssertWithInfo(c,c->argv[0],1 == 2);
} else if (!strcasecmp(c->argv[1]->ptr,"log") && c->argc == 3) {
serverLog(LL_WARNING, "DEBUG LOG: %s", (char*)c->argv[2]->ptr);
@ -495,15 +507,28 @@ NULL
}
addReply(c,shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr,"digest") && c->argc == 2) {
/* DEBUG DIGEST (form without keys specified) */
unsigned char digest[20];
sds d = sdsempty();
int j;
computeDatasetDigest(digest);
for (j = 0; j < 20; j++)
d = sdscatprintf(d, "%02x",digest[j]);
for (int i = 0; i < 20; i++) d = sdscatprintf(d, "%02x",digest[i]);
addReplyStatus(c,d);
sdsfree(d);
} else if (!strcasecmp(c->argv[1]->ptr,"digest-value") && c->argc >= 2) {
/* DEBUG DIGEST-VALUE key key key ... key. */
addReplyMultiBulkLen(c,c->argc-2);
for (int j = 2; j < c->argc; j++) {
unsigned char digest[20];
memset(digest,0,20); /* Start with a clean result */
robj *o = lookupKeyReadWithFlags(c->db,c->argv[j],LOOKUP_NOTOUCH);
if (o) xorObjectDigest(c->db,c->argv[j],digest,o);
sds d = sdsempty();
for (int i = 0; i < 20; i++) d = sdscatprintf(d, "%02x",digest[i]);
addReplyStatus(c,d);
sdsfree(d);
}
} else if (!strcasecmp(c->argv[1]->ptr,"sleep") && c->argc == 3) {
double dtime = strtod(c->argv[2]->ptr,NULL);
long long utime = dtime*1000000;
@ -595,6 +620,10 @@ NULL
changeReplicationId();
clearReplicationId2();
addReply(c,shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr,"stringmatch-test") && c->argc == 2)
{
stringmatchlen_fuzz_test();
addReplyStatus(c,"Apparently Redis did not crash: test passed");
} else {
addReplySubcommandSyntaxError(c);
return;

View File

@ -444,8 +444,8 @@ int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *lev
* Otehrwise if we are over the memory limit, but not enough memory
* was freed to return back under the limit, the function returns C_ERR. */
int freeMemoryIfNeeded(void) {
/* By default slaves should ignore maxmemory and just be masters excat
* copies. */
/* By default replicas should ignore maxmemory
* and just be masters exact copies. */
if (server.masterhost && server.repl_slave_ignore_maxmemory) return C_OK;
size_t mem_reported, mem_tofree, mem_freed;
@ -622,3 +622,14 @@ cant_free:
return C_ERR;
}
/* This is a wrapper for freeMemoryIfNeeded() that only really calls the
* function if right now there are the conditions to do so safely:
*
* - There must be no script in timeout condition.
* - Nor we are loading data right now.
*
*/
int freeMemoryIfNeededAndSafe(void) {
if (server.lua_timedout || server.loading) return C_OK;
return freeMemoryIfNeeded();
}

View File

@ -35,6 +35,7 @@
void initClientMultiState(client *c) {
c->mstate.commands = NULL;
c->mstate.count = 0;
c->mstate.cmd_flags = 0;
}
/* Release all the resources associated with MULTI/EXEC state */
@ -67,6 +68,7 @@ void queueMultiCommand(client *c) {
for (j = 0; j < c->argc; j++)
incrRefCount(mc->argv[j]);
c->mstate.count++;
c->mstate.cmd_flags |= c->cmd->flags;
}
void discardTransaction(client *c) {
@ -137,6 +139,21 @@ void execCommand(client *c) {
goto handle_monitor;
}
/* If there are write commands inside the transaction, and this is a read
* only slave, we want to send an error. This happens when the transaction
* was initiated when the instance was a master or a writable replica and
* then the configuration changed (for example instance was turned into
* a replica). */
if (!server.loading && server.masterhost && server.repl_slave_ro &&
!(c->flags & CLIENT_MASTER) && c->mstate.cmd_flags & CMD_WRITE)
{
addReplyError(c,
"Transaction contains write commands but instance "
"is now a read-only replica. EXEC aborted.");
discardTransaction(c);
goto handle_monitor;
}
/* Exec all the queued commands */
unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */
orig_argv = c->argv;

View File

@ -365,19 +365,13 @@ void addReplyErrorLength(client *c, const char *s, size_t len) {
* Where the master must propagate the first change even if the second
* will produce an error. However it is useful to log such events since
* they are rare and may hint at errors in a script or a bug in Redis. */
if (c->flags & (CLIENT_MASTER|CLIENT_SLAVE)) {
if (c->flags & (CLIENT_MASTER|CLIENT_SLAVE) && !(c->flags & CLIENT_MONITOR)) {
char* to = c->flags & CLIENT_MASTER? "master": "replica";
char* from = c->flags & CLIENT_MASTER? "replica": "master";
char *cmdname = c->lastcmd ? c->lastcmd->name : "<unknown>";
serverLog(LL_WARNING,"== CRITICAL == This %s is sending an error "
"to its %s: '%s' after processing the command "
"'%s'", from, to, s, cmdname);
/* Here we want to panic because when a master is sending an
* error to some slave in the context of replication, this can
* only create some kind of offset or data desynchronization. Better
* to catch it ASAP and crash instead of continuing. */
if (c->flags & CLIENT_SLAVE)
serverPanic("Continuing is unsafe: replication protocol violation.");
}
}
@ -1470,7 +1464,7 @@ void processInputBuffer(client *c) {
}
/* Trim to pos */
if (c->qb_pos) {
if (server.current_client != NULL && c->qb_pos) {
sdsrange(c->querybuf,c->qb_pos,-1);
c->qb_pos = 0;
}

View File

@ -67,6 +67,7 @@
#define REDIS_CLI_HISTFILE_DEFAULT ".rediscli_history"
#define REDIS_CLI_RCFILE_ENV "REDISCLI_RCFILE"
#define REDIS_CLI_RCFILE_DEFAULT ".redisclirc"
#define REDIS_CLI_AUTH_ENV "REDISCLI_AUTH"
#define CLUSTER_MANAGER_SLOTS 16384
#define CLUSTER_MANAGER_MIGRATE_TIMEOUT 60000
@ -116,6 +117,7 @@
#define CLUSTER_MANAGER_CMD_FLAG_REPLACE 1 << 6
#define CLUSTER_MANAGER_CMD_FLAG_COPY 1 << 7
#define CLUSTER_MANAGER_CMD_FLAG_COLOR 1 << 8
#define CLUSTER_MANAGER_CMD_FLAG_CHECK_OWNERS 1 << 9
#define CLUSTER_MANAGER_OPT_GETFRIENDS 1 << 0
#define CLUSTER_MANAGER_OPT_COLD 1 << 1
@ -1377,6 +1379,9 @@ static int parseOptions(int argc, char **argv) {
} else if (!strcmp(argv[i],"--cluster-use-empty-masters")) {
config.cluster_manager_command.flags |=
CLUSTER_MANAGER_CMD_FLAG_EMPTYMASTER;
} else if (!strcmp(argv[i],"--cluster-search-multiple-owners")) {
config.cluster_manager_command.flags |=
CLUSTER_MANAGER_CMD_FLAG_CHECK_OWNERS;
} else if (!strcmp(argv[i],"-v") || !strcmp(argv[i], "--version")) {
sds version = cliVersion();
printf("redis-cli %s\n", version);
@ -1419,6 +1424,14 @@ static int parseOptions(int argc, char **argv) {
return i;
}
static void parseEnv() {
/* Set auth from env, but do not overwrite CLI arguments if passed */
char *auth = getenv(REDIS_CLI_AUTH_ENV);
if (auth != NULL && config.auth == NULL) {
config.auth = auth;
}
}
static sds readArgFromStdin(void) {
char buf[1024];
sds arg = sdsempty();
@ -1446,6 +1459,9 @@ static void usage(void) {
" -p <port> Server port (default: 6379).\n"
" -s <socket> Server socket (overrides hostname and port).\n"
" -a <password> Password to use when connecting to the server.\n"
" You can also use the " REDIS_CLI_AUTH_ENV " environment\n"
" variable to pass this password more safely\n"
" (if both are used, this argument takes predecence).\n"
" -u <uri> Server URI.\n"
" -r <repeat> Execute specified command N times.\n"
" -i <interval> When -r is used, waits <interval> seconds per command.\n"
@ -1834,7 +1850,7 @@ static int evalMode(int argc, char **argv) {
if (eval_ldb) {
if (!config.eval_ldb) {
/* If the debugging session ended immediately, there was an
* error compiling the script. Show it and don't enter
* error compiling the script. Show it and they don't enter
* the REPL at all. */
printf("Eval debugging session can't start:\n");
cliReadReply(0);
@ -1917,6 +1933,8 @@ static dictType clusterManagerDictType = {
};
typedef int clusterManagerCommandProc(int argc, char **argv);
typedef int (*clusterManagerOnReplyError)(redisReply *reply,
clusterManagerNode *n, int bulk_idx);
/* Cluster Manager helper functions */
@ -1978,14 +1996,17 @@ typedef struct clusterManagerCommandDef {
clusterManagerCommandDef clusterManagerCommands[] = {
{"create", clusterManagerCommandCreate, -2, "host1:port1 ... hostN:portN",
"replicas <arg>"},
{"check", clusterManagerCommandCheck, -1, "host:port", NULL},
{"check", clusterManagerCommandCheck, -1, "host:port",
"search-multiple-owners"},
{"info", clusterManagerCommandInfo, -1, "host:port", NULL},
{"fix", clusterManagerCommandFix, -1, "host:port", NULL},
{"fix", clusterManagerCommandFix, -1, "host:port",
"search-multiple-owners"},
{"reshard", clusterManagerCommandReshard, -1, "host:port",
"from <arg>,to <arg>,slots <arg>,yes,timeout <arg>,pipeline <arg>"},
"from <arg>,to <arg>,slots <arg>,yes,timeout <arg>,pipeline <arg>,"
"replace"},
{"rebalance", clusterManagerCommandRebalance, -1, "host:port",
"weight <node1=w1...nodeN=wN>,use-empty-masters,"
"timeout <arg>,simulate,pipeline <arg>,threshold <arg>"},
"timeout <arg>,simulate,pipeline <arg>,threshold <arg>,replace"},
{"add-node", clusterManagerCommandAddNode, 2,
"new_host:new_port existing_host:existing_port", "slave,master-id <arg>"},
{"del-node", clusterManagerCommandDeleteNode, 2, "host:port node_id",NULL},
@ -2176,6 +2197,44 @@ static int clusterManagerCheckRedisReply(clusterManagerNode *n,
return 1;
}
/* Call MULTI command on a cluster node. */
static int clusterManagerStartTransaction(clusterManagerNode *node) {
redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "MULTI");
int success = clusterManagerCheckRedisReply(node, reply, NULL);
if (reply) freeReplyObject(reply);
return success;
}
/* Call EXEC command on a cluster node. */
static int clusterManagerExecTransaction(clusterManagerNode *node,
clusterManagerOnReplyError onerror)
{
redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "EXEC");
int success = clusterManagerCheckRedisReply(node, reply, NULL);
if (success) {
if (reply->type != REDIS_REPLY_ARRAY) {
success = 0;
goto cleanup;
}
size_t i;
for (i = 0; i < reply->elements; i++) {
redisReply *r = reply->element[i];
char *err = NULL;
success = clusterManagerCheckRedisReply(node, r, &err);
if (!success && onerror) success = onerror(r, node, i);
if (err) {
if (!success)
CLUSTER_MANAGER_PRINT_REPLY_ERROR(node, err);
zfree(err);
}
if (!success) break;
}
}
cleanup:
if (reply) freeReplyObject(reply);
return success;
}
static int clusterManagerNodeConnect(clusterManagerNode *node) {
if (node->context) redisFree(node->context);
node->context = redisConnect(node->ip, node->port);
@ -2710,6 +2769,55 @@ cleanup:
return success;
}
/* Get the node the slot is assigned to from the point of view of node *n.
* If the slot is unassigned or if the reply is an error, return NULL.
* Use the **err argument in order to check wether the slot is unassigned
* or the reply resulted in an error. */
static clusterManagerNode *clusterManagerGetSlotOwner(clusterManagerNode *n,
int slot, char **err)
{
assert(slot >= 0 && slot < CLUSTER_MANAGER_SLOTS);
clusterManagerNode *owner = NULL;
redisReply *reply = CLUSTER_MANAGER_COMMAND(n, "CLUSTER SLOTS");
if (clusterManagerCheckRedisReply(n, reply, err)) {
assert(reply->type == REDIS_REPLY_ARRAY);
size_t i;
for (i = 0; i < reply->elements; i++) {
redisReply *r = reply->element[i];
assert(r->type == REDIS_REPLY_ARRAY && r->elements >= 3);
int from, to;
from = r->element[0]->integer;
to = r->element[1]->integer;
if (slot < from || slot > to) continue;
redisReply *nr = r->element[2];
assert(nr->type == REDIS_REPLY_ARRAY && nr->elements >= 2);
char *name = NULL;
if (nr->elements >= 3)
name = nr->element[2]->str;
if (name != NULL)
owner = clusterManagerNodeByName(name);
else {
char *ip = nr->element[0]->str;
assert(ip != NULL);
int port = (int) nr->element[1]->integer;
listIter li;
listNode *ln;
listRewind(cluster_manager.nodes, &li);
while ((ln = listNext(&li)) != NULL) {
clusterManagerNode *nd = ln->value;
if (strcmp(nd->ip, ip) == 0 && port == nd->port) {
owner = nd;
break;
}
}
}
if (owner) break;
}
}
if (reply) freeReplyObject(reply);
return owner;
}
/* Set slot status to "importing" or "migrating" */
static int clusterManagerSetSlot(clusterManagerNode *node1,
clusterManagerNode *node2,
@ -2734,6 +2842,162 @@ cleanup:
return success;
}
static int clusterManagerClearSlotStatus(clusterManagerNode *node, int slot) {
redisReply *reply = CLUSTER_MANAGER_COMMAND(node,
"CLUSTER SETSLOT %d %s", slot, "STABLE");
int success = clusterManagerCheckRedisReply(node, reply, NULL);
if (reply) freeReplyObject(reply);
return success;
}
static int clusterManagerDelSlot(clusterManagerNode *node, int slot,
int ignore_unassigned_err)
{
redisReply *reply = CLUSTER_MANAGER_COMMAND(node,
"CLUSTER DELSLOTS %d", slot);
char *err = NULL;
int success = clusterManagerCheckRedisReply(node, reply, &err);
if (!success && reply && reply->type == REDIS_REPLY_ERROR &&
ignore_unassigned_err)
{
char *get_owner_err = NULL;
clusterManagerNode *assigned_to =
clusterManagerGetSlotOwner(node, slot, &get_owner_err);
if (!assigned_to) {
if (get_owner_err == NULL) success = 1;
else {
CLUSTER_MANAGER_PRINT_REPLY_ERROR(node, get_owner_err);
zfree(get_owner_err);
}
}
}
if (!success && err != NULL) {
CLUSTER_MANAGER_PRINT_REPLY_ERROR(node, err);
zfree(err);
}
if (reply) freeReplyObject(reply);
return success;
}
static int clusterManagerAddSlot(clusterManagerNode *node, int slot) {
redisReply *reply = CLUSTER_MANAGER_COMMAND(node,
"CLUSTER ADDSLOTS %d", slot);
int success = clusterManagerCheckRedisReply(node, reply, NULL);
if (reply) freeReplyObject(reply);
return success;
}
static signed int clusterManagerCountKeysInSlot(clusterManagerNode *node,
int slot)
{
redisReply *reply = CLUSTER_MANAGER_COMMAND(node,
"CLUSTER COUNTKEYSINSLOT %d", slot);
int count = -1;
int success = clusterManagerCheckRedisReply(node, reply, NULL);
if (success && reply->type == REDIS_REPLY_INTEGER) count = reply->integer;
if (reply) freeReplyObject(reply);
return count;
}
static int clusterManagerBumpEpoch(clusterManagerNode *node) {
redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "CLUSTER BUMPEPOCH");
int success = clusterManagerCheckRedisReply(node, reply, NULL);
if (reply) freeReplyObject(reply);
return success;
}
/* Callback used by clusterManagerSetSlotOwner transaction. It should ignore
* errors except for ADDSLOTS errors.
* Return 1 if the error should be ignored. */
static int clusterManagerOnSetOwnerErr(redisReply *reply,
clusterManagerNode *n, int bulk_idx)
{
UNUSED(reply);
UNUSED(n);
/* Only raise error when ADDSLOTS fail (bulk_idx == 1). */
return (bulk_idx != 1);
}
static int clusterManagerSetSlotOwner(clusterManagerNode *owner,
int slot,
int do_clear)
{
int success = clusterManagerStartTransaction(owner);
if (!success) return 0;
/* Ensure the slot is not already assigned. */
clusterManagerDelSlot(owner, slot, 1);
/* Add the slot and bump epoch. */
clusterManagerAddSlot(owner, slot);
if (do_clear) clusterManagerClearSlotStatus(owner, slot);
clusterManagerBumpEpoch(owner);
success = clusterManagerExecTransaction(owner, clusterManagerOnSetOwnerErr);
return success;
}
/* Get the hash for the values of the specified keys in *keys_reply for the
* specified nodes *n1 and *n2, by calling DEBUG DIGEST-VALUE redis command
* on both nodes. Every key with same name on both nodes but having different
* values will be added to the *diffs list. Return 0 in case of reply
* error. */
static int clusterManagerCompareKeysValues(clusterManagerNode *n1,
clusterManagerNode *n2,
redisReply *keys_reply,
list *diffs)
{
size_t i, argc = keys_reply->elements + 2;
static const char *hash_zero = "0000000000000000000000000000000000000000";
char **argv = zcalloc(argc * sizeof(char *));
size_t *argv_len = zcalloc(argc * sizeof(size_t));
argv[0] = "DEBUG";
argv_len[0] = 5;
argv[1] = "DIGEST-VALUE";
argv_len[1] = 12;
for (i = 0; i < keys_reply->elements; i++) {
redisReply *entry = keys_reply->element[i];
int idx = i + 2;
argv[idx] = entry->str;
argv_len[idx] = entry->len;
}
int success = 0;
void *_reply1 = NULL, *_reply2 = NULL;
redisReply *r1 = NULL, *r2 = NULL;
redisAppendCommandArgv(n1->context,argc, (const char**)argv,argv_len);
success = (redisGetReply(n1->context, &_reply1) == REDIS_OK);
if (!success) goto cleanup;
r1 = (redisReply *) _reply1;
redisAppendCommandArgv(n2->context,argc, (const char**)argv,argv_len);
success = (redisGetReply(n2->context, &_reply2) == REDIS_OK);
if (!success) goto cleanup;
r2 = (redisReply *) _reply2;
success = (r1->type != REDIS_REPLY_ERROR && r2->type != REDIS_REPLY_ERROR);
if (r1->type == REDIS_REPLY_ERROR) {
CLUSTER_MANAGER_PRINT_REPLY_ERROR(n1, r1->str);
success = 0;
}
if (r2->type == REDIS_REPLY_ERROR) {
CLUSTER_MANAGER_PRINT_REPLY_ERROR(n2, r2->str);
success = 0;
}
if (!success) goto cleanup;
assert(keys_reply->elements == r1->elements &&
keys_reply->elements == r2->elements);
for (i = 0; i < keys_reply->elements; i++) {
char *key = keys_reply->element[i]->str;
char *hash1 = r1->element[i]->str;
char *hash2 = r2->element[i]->str;
/* Ignore keys that don't exist in both nodes. */
if (strcmp(hash1, hash_zero) == 0 || strcmp(hash2, hash_zero) == 0)
continue;
if (strcmp(hash1, hash2) != 0) listAddNodeTail(diffs, key);
}
cleanup:
if (r1) freeReplyObject(r1);
if (r2) freeReplyObject(r2);
zfree(argv);
zfree(argv_len);
return success;
}
/* Migrate keys taken from reply->elements. It returns the reply from the
* MIGRATE command, or NULL if something goes wrong. If the argument 'dots'
* is not NULL, a dot will be printed for every migrated key. */
@ -2814,8 +3078,10 @@ static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source,
char **err)
{
int success = 1;
int do_fix = (config.cluster_manager_command.flags &
CLUSTER_MANAGER_CMD_FLAG_FIX);
int do_fix = config.cluster_manager_command.flags &
CLUSTER_MANAGER_CMD_FLAG_FIX;
int do_replace = config.cluster_manager_command.flags &
CLUSTER_MANAGER_CMD_FLAG_REPLACE;
while (1) {
char *dots = NULL;
redisReply *reply = NULL, *migrate_reply = NULL;
@ -2846,16 +3112,86 @@ static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source,
dots);
if (migrate_reply == NULL) goto next;
if (migrate_reply->type == REDIS_REPLY_ERROR) {
if (do_fix && strstr(migrate_reply->str, "BUSYKEY")) {
/* If the key already exists, try to migrate keys
* adding REPLACE option.
* If the key's slot is not served, try to assign slot
int is_busy = strstr(migrate_reply->str, "BUSYKEY") != NULL;
int not_served = 0;
if (!is_busy) {
/* Check if the slot is unassigned (not served) in the
* source node's configuration. */
char *get_owner_err = NULL;
clusterManagerNode *served_by =
clusterManagerGetSlotOwner(source, slot, &get_owner_err);
if (!served_by) {
if (get_owner_err == NULL) not_served = 1;
else {
CLUSTER_MANAGER_PRINT_REPLY_ERROR(source,
get_owner_err);
zfree(get_owner_err);
}
}
}
/* Try to handle errors. */
if (is_busy || not_served) {
/* If the key's slot is not served, try to assign slot
* to the target node. */
int is_busy = (strstr(migrate_reply->str, "BUSYKEY") != NULL);
if (strstr(migrate_reply->str, "slot not served") != NULL)
if (do_fix && not_served) {
clusterManagerLogWarn("*** Slot was not served, setting "
"owner to node %s:%d.\n",
target->ip, target->port);
clusterManagerSetSlot(source, target, slot, "node", NULL);
clusterManagerLogWarn("*** Target key exists. "
"Replacing it for FIX.\n");
}
/* If the key already exists in the target node (BUSYKEY),
* check whether its value is the same in both nodes.
* In case of equal values, retry migration with the
* REPLACE option.
* In case of different values:
* - If the migration is requested by the fix command, stop
* and warn the user.
* - In other cases (ie. reshard), proceed only if the user
* launched the command with the --cluster-replace option.*/
if (is_busy) {
clusterManagerLogWarn("\n*** Target key exists\n");
if (!do_replace) {
clusterManagerLogWarn("*** Checking key values on "
"both nodes...\n");
list *diffs = listCreate();
success = clusterManagerCompareKeysValues(source,
target, reply, diffs);
if (!success) {
clusterManagerLogErr("*** Value check failed!\n");
listRelease(diffs);
goto next;
}
if (listLength(diffs) > 0) {
success = 0;
clusterManagerLogErr(
"*** Found %d key(s) in both source node and "
"target node having different values.\n"
" Source node: %s:%d\n"
" Target node: %s:%d\n"
" Keys(s):\n",
listLength(diffs),
source->ip, source->port,
target->ip, target->port);
listIter dli;
listNode *dln;
listRewind(diffs, &dli);
while((dln = listNext(&dli)) != NULL) {
char *k = dln->value;
clusterManagerLogErr(" - %s\n", k);
}
clusterManagerLogErr("Please fix the above key(s) "
"manually and try again "
"or relaunch the command \n"
"with --cluster-replace "
"option to force key "
"overriding.\n");
listRelease(diffs);
goto next;
}
listRelease(diffs);
}
clusterManagerLogWarn("*** Replacing target keys...\n");
}
freeReplyObject(migrate_reply);
migrate_reply = clusterManagerMigrateKeysInReply(source,
target,
@ -3610,24 +3946,17 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
listRewind(none, &li);
while ((ln = listNext(&li)) != NULL) {
sds slot = ln->value;
int s = atoi(slot);
clusterManagerNode *n = clusterManagerNodeMasterRandom();
clusterManagerLogInfo(">>> Covering slot %s with %s:%d\n",
slot, n->ip, n->port);
/* Ensure the slot is not already assigned. */
redisReply *r = CLUSTER_MANAGER_COMMAND(n,
"CLUSTER DELSLOTS %s", slot);
if (r) freeReplyObject(r);
r = CLUSTER_MANAGER_COMMAND(n,
"CLUSTER ADDSLOTS %s", slot);
if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1;
if (r) freeReplyObject(r);
r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER BUMPEPOCH");
if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1;
if (r) freeReplyObject(r);
if (fixed < 0) goto cleanup;
if (!clusterManagerSetSlotOwner(n, s, 0)) {
fixed = -1;
goto cleanup;
}
/* Since CLUSTER ADDSLOTS succeeded, we also update the slot
* info into the node struct, in order to keep it synced */
n->slots[atoi(slot)] = 1;
n->slots[s] = 1;
fixed++;
}
}
@ -3635,7 +3964,7 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
/* Handle case "2": keys only in one node. */
if (listLength(single) > 0) {
printf("The following uncovered slots have keys in just one node:\n");
printf("The following uncovered slots have keys in just one node:\n");
clusterManagerPrintSlotsList(single);
if (confirmWithYes("Fix these slots by covering with those nodes?")){
listIter li;
@ -3643,6 +3972,7 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
listRewind(single, &li);
while ((ln = listNext(&li)) != NULL) {
sds slot = ln->value;
int s = atoi(slot);
dictEntry *entry = dictFind(clusterManagerUncoveredSlots, slot);
assert(entry != NULL);
list *nodes = (list *) dictGetVal(entry);
@ -3651,18 +3981,10 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
clusterManagerNode *n = fn->value;
clusterManagerLogInfo(">>> Covering slot %s with %s:%d\n",
slot, n->ip, n->port);
/* Ensure the slot is not already assigned. */
redisReply *r = CLUSTER_MANAGER_COMMAND(n,
"CLUSTER DELSLOTS %s", slot);
if (r) freeReplyObject(r);
r = CLUSTER_MANAGER_COMMAND(n,
"CLUSTER ADDSLOTS %s", slot);
if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1;
if (r) freeReplyObject(r);
r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER BUMPEPOCH");
if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1;
if (r) freeReplyObject(r);
if (fixed < 0) goto cleanup;
if (!clusterManagerSetSlotOwner(n, s, 0)) {
fixed = -1;
goto cleanup;
}
/* Since CLUSTER ADDSLOTS succeeded, we also update the slot
* info into the node struct, in order to keep it synced */
n->slots[atoi(slot)] = 1;
@ -3695,23 +4017,10 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
clusterManagerLogInfo(">>> Covering slot %s moving keys "
"to %s:%d\n", slot,
target->ip, target->port);
/* Ensure the slot is not already assigned. */
redisReply *r = CLUSTER_MANAGER_COMMAND(target,
"CLUSTER DELSLOTS %s", slot);
if (r) freeReplyObject(r);
r = CLUSTER_MANAGER_COMMAND(target,
"CLUSTER ADDSLOTS %s", slot);
if (!clusterManagerCheckRedisReply(target, r, NULL)) fixed = -1;
if (r) freeReplyObject(r);
if (fixed < 0) goto cleanup;
r = CLUSTER_MANAGER_COMMAND(target,
"CLUSTER SETSLOT %s %s", slot, "STABLE");
if (!clusterManagerCheckRedisReply(target, r, NULL)) fixed = -1;
if (r) freeReplyObject(r);
r = CLUSTER_MANAGER_COMMAND(target, "CLUSTER BUMPEPOCH");
if (!clusterManagerCheckRedisReply(target, r, NULL)) fixed = -1;
if (r) freeReplyObject(r);
if (fixed < 0) goto cleanup;
if (!clusterManagerSetSlotOwner(target, s, 1)) {
fixed = -1;
goto cleanup;
}
/* Since CLUSTER ADDSLOTS succeeded, we also update the slot
* info into the node struct, in order to keep it synced */
target->slots[atoi(slot)] = 1;
@ -3722,23 +4031,15 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
clusterManagerNode *src = nln->value;
if (src == target) continue;
/* Assign the slot to target node in the source node. */
redisReply *r = CLUSTER_MANAGER_COMMAND(src,
"CLUSTER SETSLOT %s %s %s", slot,
"NODE", target->name);
if (!clusterManagerCheckRedisReply(src, r, NULL))
if (!clusterManagerSetSlot(src, target, s, "NODE", NULL))
fixed = -1;
if (r) freeReplyObject(r);
if (fixed < 0) goto cleanup;
/* Set the source node in 'importing' state
* (even if we will actually migrate keys away)
* in order to avoid receiving redirections
* for MIGRATE. */
r = CLUSTER_MANAGER_COMMAND(src,
"CLUSTER SETSLOT %s %s %s", slot,
"IMPORTING", target->name);
if (!clusterManagerCheckRedisReply(src, r, NULL))
fixed = -1;
if (r) freeReplyObject(r);
if (!clusterManagerSetSlot(src, target, s,
"IMPORTING", NULL)) fixed = -1;
if (fixed < 0) goto cleanup;
int opts = CLUSTER_MANAGER_OPT_VERBOSE |
CLUSTER_MANAGER_OPT_COLD;
@ -3746,12 +4047,8 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
fixed = -1;
goto cleanup;
}
r = CLUSTER_MANAGER_COMMAND(src,
"CLUSTER SETSLOT %s %s", slot,
"STABLE");
if (!clusterManagerCheckRedisReply(src, r, NULL))
if (!clusterManagerClearSlotStatus(src, s))
fixed = -1;
if (r) freeReplyObject(r);
if (fixed < 0) goto cleanup;
}
fixed++;
@ -3875,24 +4172,9 @@ static int clusterManagerFixOpenSlot(int slot) {
// Use ADDSLOTS to assign the slot.
clusterManagerLogWarn("*** Configuring %s:%d as the slot owner\n",
owner->ip, owner->port);
redisReply *reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER "
"SETSLOT %d %s",
slot, "STABLE");
success = clusterManagerCheckRedisReply(owner, reply, NULL);
if (reply) freeReplyObject(reply);
success = clusterManagerClearSlotStatus(owner, slot);
if (!success) goto cleanup;
/* Ensure that the slot is unassigned before assigning it to the
* owner. */
reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER DELSLOTS %d", slot);
success = clusterManagerCheckRedisReply(owner, reply, NULL);
/* Ignore "already unassigned" error. */
if (!success && reply && reply->type == REDIS_REPLY_ERROR &&
strstr(reply->str, "already unassigned") != NULL) success = 1;
if (reply) freeReplyObject(reply);
if (!success) goto cleanup;
reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER ADDSLOTS %d", slot);
success = clusterManagerCheckRedisReply(owner, reply, NULL);
if (reply) freeReplyObject(reply);
success = clusterManagerSetSlotOwner(owner, slot, 0);
if (!success) goto cleanup;
/* Since CLUSTER ADDSLOTS succeeded, we also update the slot
* info into the node struct, in order to keep it synced */
@ -3900,9 +4182,7 @@ static int clusterManagerFixOpenSlot(int slot) {
/* Make sure this information will propagate. Not strictly needed
* since there is no past owner, so all the other nodes will accept
* whatever epoch this node will claim the slot with. */
reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER BUMPEPOCH");
success = clusterManagerCheckRedisReply(owner, reply, NULL);
if (reply) freeReplyObject(reply);
success = clusterManagerBumpEpoch(owner);
if (!success) goto cleanup;
/* Remove the owner from the list of migrating/importing
* nodes. */
@ -3922,16 +4202,10 @@ static int clusterManagerFixOpenSlot(int slot) {
* the owner has been set in the previous condition (owner == NULL). */
assert(owner != NULL);
listRewind(owners, &li);
redisReply *reply = NULL;
while ((ln = listNext(&li)) != NULL) {
clusterManagerNode *n = ln->value;
if (n == owner) continue;
reply = CLUSTER_MANAGER_COMMAND(n, "CLUSTER DELSLOTS %d", slot);
success = clusterManagerCheckRedisReply(n, reply, NULL);
/* Ignore "already unassigned" error. */
if (!success && reply && reply->type == REDIS_REPLY_ERROR &&
strstr(reply->str, "already unassigned") != NULL) success = 1;
if (reply) freeReplyObject(reply);
success = clusterManagerDelSlot(n, slot, 1);
if (!success) goto cleanup;
n->slots[slot] = 0;
/* Assign the slot to the owner in the node 'n' configuration.' */
@ -3955,6 +4229,7 @@ static int clusterManagerFixOpenSlot(int slot) {
clusterManagerLogInfo(">>> Case 1: Moving slot %d from "
"%s:%d to %s:%d\n", slot,
src->ip, src->port, dst->ip, dst->port);
move_opts |= CLUSTER_MANAGER_OPT_UPDATE;
success = clusterManagerMoveSlot(src, dst, slot, move_opts, NULL);
}
/* Case 2: There are multiple nodes that claim the slot as importing,
@ -3973,11 +4248,7 @@ static int clusterManagerFixOpenSlot(int slot) {
if (!success) goto cleanup;
clusterManagerLogInfo(">>> Setting %d as STABLE in "
"%s:%d\n", slot, n->ip, n->port);
redisReply *r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER SETSLOT %d %s",
slot, "STABLE");
success = clusterManagerCheckRedisReply(n, r, NULL);
if (r) freeReplyObject(r);
success = clusterManagerClearSlotStatus(n, slot);
if (!success) goto cleanup;
}
/* Since the slot has been moved in "cold" mode, ensure that all the
@ -3987,12 +4258,76 @@ static int clusterManagerFixOpenSlot(int slot) {
clusterManagerNode *n = ln->value;
if (n == owner) continue;
if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue;
redisReply *r = CLUSTER_MANAGER_COMMAND(n,
"CLUSTER SETSLOT %d %s %s", slot, "NODE", owner->name);
success = clusterManagerCheckRedisReply(n, r, NULL);
if (r) freeReplyObject(r);
success = clusterManagerSetSlot(n, owner, slot, "NODE", NULL);
if (!success) goto cleanup;
}
}
/* Case 3: The slot is in migrating state in one node but multiple
* other nodes claim to be in importing state and don't have any key in
* the slot. We search for the importing node having the same ID as
* the destination node of the migrating node.
* In that case we move the slot from the migrating node to this node and
* we close the importing states on all the other importing nodes.
* If no importing node has the same ID as the destination node of the
* migrating node, the slot's state is closed on both the migrating node
* and the importing nodes. */
else if (listLength(migrating) == 1 && listLength(importing) > 1) {
int try_to_fix = 1;
clusterManagerNode *src = listFirst(migrating)->value;
clusterManagerNode *dst = NULL;
sds target_id = NULL;
for (int i = 0; i < src->migrating_count; i += 2) {
sds migrating_slot = src->migrating[i];
if (atoi(migrating_slot) == slot) {
target_id = src->migrating[i + 1];
break;
}
}
assert(target_id != NULL);
listIter li;
listNode *ln;
listRewind(importing, &li);
while ((ln = listNext(&li)) != NULL) {
clusterManagerNode *n = ln->value;
int count = clusterManagerCountKeysInSlot(n, slot);
if (count > 0) {
try_to_fix = 0;
break;
}
if (strcmp(n->name, target_id) == 0) dst = n;
}
if (!try_to_fix) goto unhandled_case;
if (dst != NULL) {
clusterManagerLogInfo(">>> Case 3: Moving slot %d from %s:%d to "
"%s:%d and closing it on all the other "
"importing nodes.\n",
slot, src->ip, src->port,
dst->ip, dst->port);
/* Move the slot to the destination node. */
success = clusterManagerMoveSlot(src, dst, slot, move_opts, NULL);
if (!success) goto cleanup;
/* Close slot on all the other importing nodes. */
listRewind(importing, &li);
while ((ln = listNext(&li)) != NULL) {
clusterManagerNode *n = ln->value;
if (dst == n) continue;
success = clusterManagerClearSlotStatus(n, slot);
if (!success) goto cleanup;
}
} else {
clusterManagerLogInfo(">>> Case 3: Closing slot %d on both "
"migrating and importing nodes.\n", slot);
/* Close the slot on both the migrating node and the importing
* nodes. */
success = clusterManagerClearSlotStatus(src, slot);
if (!success) goto cleanup;
listRewind(importing, &li);
while ((ln = listNext(&li)) != NULL) {
clusterManagerNode *n = ln->value;
success = clusterManagerClearSlotStatus(n, slot);
if (!success) goto cleanup;
}
}
} else {
int try_to_close_slot = (listLength(importing) == 0 &&
listLength(migrating) == 1);
@ -4009,13 +4344,13 @@ static int clusterManagerFixOpenSlot(int slot) {
if (!success) goto cleanup;
}
}
/* Case 3: There are no slots claiming to be in importing state, but
* there is a migrating node that actually don't have any key or is the
* slot owner. We can just close the slot, probably a reshard interrupted
* in the middle. */
/* Case 4: There are no slots claiming to be in importing state, but
* there is a migrating node that actually don't have any key or is the
* slot owner. We can just close the slot, probably a reshard
* interrupted in the middle. */
if (try_to_close_slot) {
clusterManagerNode *n = listFirst(migrating)->value;
clusterManagerLogInfo(">>> Case 3: Closing slot %d on %s:%d\n",
clusterManagerLogInfo(">>> Case 4: Closing slot %d on %s:%d\n",
slot, n->ip, n->port);
redisReply *r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER SETSLOT %d %s",
slot, "STABLE");
@ -4023,6 +4358,7 @@ static int clusterManagerFixOpenSlot(int slot) {
if (r) freeReplyObject(r);
if (!success) goto cleanup;
} else {
unhandled_case:
success = 0;
clusterManagerLogErr("[ERR] Sorry, redis-cli can't fix this slot "
"yet (work in progress). Slot is set as "
@ -4040,17 +4376,55 @@ cleanup:
return success;
}
static int clusterManagerFixMultipleSlotOwners(int slot, list *owners) {
clusterManagerLogInfo(">>> Fixing multiple owners for slot %d...\n", slot);
int success = 0;
assert(listLength(owners) > 1);
clusterManagerNode *owner = clusterManagerGetNodeWithMostKeysInSlot(owners,
slot,
NULL);
if (!owner) owner = listFirst(owners)->value;
clusterManagerLogInfo(">>> Setting slot %d owner: %s:%d\n",
slot, owner->ip, owner->port);
/* Set the slot owner. */
if (!clusterManagerSetSlotOwner(owner, slot, 0)) return 0;
listIter li;
listNode *ln;
listRewind(cluster_manager.nodes, &li);
/* Update configuration in all the other master nodes by assigning the slot
* itself to the new owner, and by eventually migrating keys if the node
* has keys for the slot. */
while ((ln = listNext(&li)) != NULL) {
clusterManagerNode *n = ln->value;
if (n == owner) continue;
if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue;
int count = clusterManagerCountKeysInSlot(n, slot);
success = (count >= 0);
if (!success) break;
clusterManagerDelSlot(n, slot, 1);
if (!clusterManagerSetSlot(n, owner, slot, "node", NULL)) return 0;
if (count > 0) {
int opts = CLUSTER_MANAGER_OPT_VERBOSE |
CLUSTER_MANAGER_OPT_COLD;
success = clusterManagerMoveSlot(n, owner, slot, opts, NULL);
if (!success) break;
}
}
return success;
}
static int clusterManagerCheckCluster(int quiet) {
listNode *ln = listFirst(cluster_manager.nodes);
if (!ln) return 0;
int result = 1;
int do_fix = config.cluster_manager_command.flags &
CLUSTER_MANAGER_CMD_FLAG_FIX;
clusterManagerNode *node = ln->value;
clusterManagerLogInfo(">>> Performing Cluster Check (using node %s:%d)\n",
node->ip, node->port);
int result = 1, consistent = 0;
int do_fix = config.cluster_manager_command.flags &
CLUSTER_MANAGER_CMD_FLAG_FIX;
if (!quiet) clusterManagerShowNodes();
if (!clusterManagerIsConfigConsistent()) {
consistent = clusterManagerIsConfigConsistent();
if (!consistent) {
sds err = sdsnew("[ERR] Nodes don't agree about configuration!");
clusterManagerOnError(err);
result = 0;
@ -4058,7 +4432,7 @@ static int clusterManagerCheckCluster(int quiet) {
clusterManagerLogOk("[OK] All nodes agree about slots "
"configuration.\n");
}
// Check open slots
/* Check open slots */
clusterManagerLogInfo(">>> Check for open slots...\n");
listIter li;
listRewind(cluster_manager.nodes, &li);
@ -4077,7 +4451,7 @@ static int clusterManagerCheckCluster(int quiet) {
n->port);
for (i = 0; i < n->migrating_count; i += 2) {
sds slot = n->migrating[i];
dictAdd(open_slots, slot, sdsdup(n->migrating[i + 1]));
dictReplace(open_slots, slot, sdsdup(n->migrating[i + 1]));
char *fmt = (i > 0 ? ",%S" : "%S");
errstr = sdscatfmt(errstr, fmt, slot);
}
@ -4095,7 +4469,7 @@ static int clusterManagerCheckCluster(int quiet) {
n->port);
for (i = 0; i < n->importing_count; i += 2) {
sds slot = n->importing[i];
dictAdd(open_slots, slot, sdsdup(n->importing[i + 1]));
dictReplace(open_slots, slot, sdsdup(n->importing[i + 1]));
char *fmt = (i > 0 ? ",%S" : "%S");
errstr = sdscatfmt(errstr, fmt, slot);
}
@ -4117,7 +4491,7 @@ static int clusterManagerCheckCluster(int quiet) {
clusterManagerLogErr("%s.\n", (char *) errstr);
sdsfree(errstr);
if (do_fix) {
// Fix open slots.
/* Fix open slots. */
dictReleaseIterator(iter);
iter = dictGetIterator(open_slots);
while ((entry = dictNext(iter)) != NULL) {
@ -4152,6 +4526,54 @@ static int clusterManagerCheckCluster(int quiet) {
if (fixed > 0) result = 1;
}
}
int search_multiple_owners = config.cluster_manager_command.flags &
CLUSTER_MANAGER_CMD_FLAG_CHECK_OWNERS;
if (search_multiple_owners) {
/* Check whether there are multiple owners, even when slots are
* fully covered and there are no open slots. */
clusterManagerLogInfo(">>> Check for multiple slot owners...\n");
int slot = 0, slots_with_multiple_owners = 0;
for (; slot < CLUSTER_MANAGER_SLOTS; slot++) {
listIter li;
listNode *ln;
listRewind(cluster_manager.nodes, &li);
list *owners = listCreate();
while ((ln = listNext(&li)) != NULL) {
clusterManagerNode *n = ln->value;
if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue;
if (n->slots[slot]) listAddNodeTail(owners, n);
else {
/* Nodes having keys for the slot will be considered
* owners too. */
int count = clusterManagerCountKeysInSlot(n, slot);
if (count > 0) listAddNodeTail(owners, n);
}
}
if (listLength(owners) > 1) {
result = 0;
clusterManagerLogErr("[WARNING] Slot %d has %d owners:\n",
slot, listLength(owners));
listRewind(owners, &li);
while ((ln = listNext(&li)) != NULL) {
clusterManagerNode *n = ln->value;
clusterManagerLogErr(" %s:%d\n", n->ip, n->port);
}
slots_with_multiple_owners++;
if (do_fix) {
result = clusterManagerFixMultipleSlotOwners(slot, owners);
if (!result) {
clusterManagerLogErr("Failed to fix multiple owners "
"for slot %d\n", slot);
listRelease(owners);
break;
} else slots_with_multiple_owners--;
}
}
listRelease(owners);
}
if (slots_with_multiple_owners == 0)
clusterManagerLogOk("[OK] No multiple owners found.\n");
}
return result;
}
@ -5404,7 +5826,7 @@ static int clusterManagerCommandCall(int argc, char **argv) {
if (status != REDIS_OK || reply == NULL )
printf("%s:%d: Failed!\n", n->ip, n->port);
else {
sds formatted_reply = cliFormatReplyTTY(reply, "");
sds formatted_reply = cliFormatReplyRaw(reply);
printf("%s:%d: %s\n", n->ip, n->port, (char *) formatted_reply);
sdsfree(formatted_reply);
}
@ -6781,6 +7203,8 @@ int main(int argc, char **argv) {
argc -= firstarg;
argv += firstarg;
parseEnv();
/* Cluster Manager mode */
if (CLUSTER_MANAGER_MODE()) {
clusterManagerCommandProc *proc = validateClusterManagerCommand();

View File

@ -695,7 +695,7 @@ sds sdscatfmt(sds s, char const *fmt, ...) {
* s = sdstrim(s,"Aa. :");
* printf("%s\n", s);
*
* Output will be just "Hello World".
* Output will be just "HelloWorld".
*/
sds sdstrim(sds s, const char *cset) {
char *start, *end, *sp, *ep;

View File

@ -2607,17 +2607,13 @@ int processCommand(client *c) {
}
/* Handle the maxmemory directive.
*
* First we try to free some memory if possible (if there are volatile
* keys in the dataset). If there are not the only thing we can do
* is returning an error.
*
* Note that we do not want to reclaim memory if we are here re-entering
* the event loop since there is a busy Lua script running in timeout
* condition, to avoid mixing the propagation of scripts with the propagation
* of DELs due to eviction. */
* condition, to avoid mixing the propagation of scripts with the
* propagation of DELs due to eviction. */
if (server.maxmemory && !server.lua_timedout) {
int out_of_memory = freeMemoryIfNeeded() == C_ERR;
int out_of_memory = freeMemoryIfNeededAndSafe() == C_ERR;
/* freeMemoryIfNeeded may flush slave output buffers. This may result
* into a slave, that may be the active client, to be freed. */
if (server.current_client == NULL) return C_ERR;
@ -3247,11 +3243,11 @@ sds genRedisInfoString(char *section) {
"allocator_frag_ratio:%.2f\r\n"
"allocator_frag_bytes:%zu\r\n"
"allocator_rss_ratio:%.2f\r\n"
"allocator_rss_bytes:%zu\r\n"
"allocator_rss_bytes:%zd\r\n"
"rss_overhead_ratio:%.2f\r\n"
"rss_overhead_bytes:%zu\r\n"
"rss_overhead_bytes:%zd\r\n"
"mem_fragmentation_ratio:%.2f\r\n"
"mem_fragmentation_bytes:%zu\r\n"
"mem_fragmentation_bytes:%zd\r\n"
"mem_not_counted_for_evict:%zu\r\n"
"mem_replication_backlog:%zu\r\n"
"mem_clients_slaves:%zu\r\n"

View File

@ -654,6 +654,9 @@ typedef struct multiCmd {
typedef struct multiState {
multiCmd *commands; /* Array of MULTI commands */
int count; /* Total number of MULTI commands */
int cmd_flags; /* The accumulated command flags OR-ed together.
So if at least a command has a given flag, it
will be set in this field. */
int minreplicas; /* MINREPLICAS for synchronous replication */
time_t minreplicas_timeout; /* MINREPLICAS timeout as unixtime. */
} multiState;
@ -864,11 +867,11 @@ struct redisMemOverhead {
float dataset_perc;
float peak_perc;
float total_frag;
size_t total_frag_bytes;
ssize_t total_frag_bytes;
float allocator_frag;
size_t allocator_frag_bytes;
ssize_t allocator_frag_bytes;
float allocator_rss;
size_t allocator_rss_bytes;
ssize_t allocator_rss_bytes;
float rss_extra;
size_t rss_extra_bytes;
size_t num_dbs;
@ -1699,6 +1702,7 @@ int zslLexValueLteMax(sds value, zlexrangespec *spec);
int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *level);
size_t freeMemoryGetNotCountedMemory();
int freeMemoryIfNeeded(void);
int freeMemoryIfNeededAndSafe(void);
int processCommand(client *c);
void setupSignalHandlers(void);
struct redisCommand *lookupCommand(sds name);

View File

@ -48,7 +48,7 @@
int stringmatchlen(const char *pattern, int patternLen,
const char *string, int stringLen, int nocase)
{
while(patternLen) {
while(patternLen && stringLen) {
switch(pattern[0]) {
case '*':
while (pattern[1] == '*') {
@ -171,6 +171,22 @@ int stringmatch(const char *pattern, const char *string, int nocase) {
return stringmatchlen(pattern,strlen(pattern),string,strlen(string),nocase);
}
/* Fuzz stringmatchlen() trying to crash it with bad input. */
int stringmatchlen_fuzz_test(void) {
char str[32];
char pat[32];
int cycles = 10000000;
int total_matches = 0;
while(cycles--) {
int strlen = rand() % sizeof(str);
int patlen = rand() % sizeof(pat);
for (int j = 0; j < strlen; j++) str[j] = rand() % 128;
for (int j = 0; j < patlen; j++) pat[j] = rand() % 128;
total_matches += stringmatchlen(pat, patlen, str, strlen, 0);
}
return total_matches;
}
/* Convert a string representing an amount of memory into the number of
* bytes, so for instance memtoll("1Gb") will return 1073741824 that is
* (1024*1024*1024).

View File

@ -40,6 +40,7 @@
int stringmatchlen(const char *p, int plen, const char *s, int slen, int nocase);
int stringmatch(const char *p, const char *s, int nocase);
int stringmatchlen_fuzz_test(void);
long long memtoll(const char *p, int *err);
uint32_t digits10(uint64_t v);
uint32_t sdigits10(int64_t v);

View File

@ -275,7 +275,6 @@ start_server {tags {"repl"}} {
start_server {} {
test "Master stream is correctly processed while the replica has a script in -BUSY state" {
set slave [srv 0 client]
puts [srv 0 port]
$slave config set lua-time-limit 500
$slave slaveof $master_host $master_port