Merge branch 'unstable' of https://github.com/antirez/redis into unstable
This commit is contained in:
commit
9f4b121512
|
@ -1248,7 +1248,7 @@ void configSetCommand(client *c) {
|
|||
if (server.maxmemory < zmalloc_used_memory()) {
|
||||
serverLog(LL_WARNING,"WARNING: the new maxmemory value set via CONFIG SET is smaller than the current memory usage. This will result in key eviction and/or the inability to accept new write commands depending on the maxmemory-policy.");
|
||||
}
|
||||
freeMemoryIfNeeded();
|
||||
freeMemoryIfNeededAndSafe();
|
||||
}
|
||||
} config_set_memory_field(
|
||||
"proto-max-bulk-len",server.proto_max_bulk_len) {
|
||||
|
|
2
src/db.c
2
src/db.c
|
@ -212,7 +212,7 @@ void dbOverwrite(redisDb *db, robj *key, robj *val) {
|
|||
* 2) clients WATCHing for the destination key notified.
|
||||
* 3) The expire time of the key is reset (the key is made persistent).
|
||||
*
|
||||
* All the new keys in the database should be creted via this interface. */
|
||||
* All the new keys in the database should be created via this interface. */
|
||||
void setKey(redisDb *db, robj *key, robj *val) {
|
||||
if (lookupKeyWrite(db,key) == NULL) {
|
||||
dbAdd(db,key,val);
|
||||
|
|
299
src/debug.c
299
src/debug.c
|
@ -74,7 +74,7 @@ void xorDigest(unsigned char *digest, void *ptr, size_t len) {
|
|||
digest[j] ^= hash[j];
|
||||
}
|
||||
|
||||
void xorObjectDigest(unsigned char *digest, robj *o) {
|
||||
void xorStringObjectDigest(unsigned char *digest, robj *o) {
|
||||
o = getDecodedObject(o);
|
||||
xorDigest(digest,o->ptr,sdslen(o->ptr));
|
||||
decrRefCount(o);
|
||||
|
@ -104,12 +104,151 @@ void mixDigest(unsigned char *digest, void *ptr, size_t len) {
|
|||
SHA1Final(digest,&ctx);
|
||||
}
|
||||
|
||||
void mixObjectDigest(unsigned char *digest, robj *o) {
|
||||
void mixStringObjectDigest(unsigned char *digest, robj *o) {
|
||||
o = getDecodedObject(o);
|
||||
mixDigest(digest,o->ptr,sdslen(o->ptr));
|
||||
decrRefCount(o);
|
||||
}
|
||||
|
||||
/* This function computes the digest of a data structure stored in the
|
||||
* object 'o'. It is the core of the DEBUG DIGEST command: when taking the
|
||||
* digest of a whole dataset, we take the digest of the key and the value
|
||||
* pair, and xor all those together.
|
||||
*
|
||||
* Note that this function does not reset the initial 'digest' passed, it
|
||||
* will continue mixing this object digest to anything that was already
|
||||
* present. */
|
||||
void xorObjectDigest(redisDb *db, robj *keyobj, unsigned char *digest, robj *o) {
|
||||
uint32_t aux = htonl(o->type);
|
||||
mixDigest(digest,&aux,sizeof(aux));
|
||||
long long expiretime = getExpire(db,keyobj);
|
||||
char buf[128];
|
||||
|
||||
/* Save the key and associated value */
|
||||
if (o->type == OBJ_STRING) {
|
||||
mixStringObjectDigest(digest,o);
|
||||
} else if (o->type == OBJ_LIST) {
|
||||
listTypeIterator *li = listTypeInitIterator(o,0,LIST_TAIL);
|
||||
listTypeEntry entry;
|
||||
while(listTypeNext(li,&entry)) {
|
||||
robj *eleobj = listTypeGet(&entry);
|
||||
mixStringObjectDigest(digest,eleobj);
|
||||
decrRefCount(eleobj);
|
||||
}
|
||||
listTypeReleaseIterator(li);
|
||||
} else if (o->type == OBJ_SET) {
|
||||
setTypeIterator *si = setTypeInitIterator(o);
|
||||
sds sdsele;
|
||||
while((sdsele = setTypeNextObject(si)) != NULL) {
|
||||
xorDigest(digest,sdsele,sdslen(sdsele));
|
||||
sdsfree(sdsele);
|
||||
}
|
||||
setTypeReleaseIterator(si);
|
||||
} else if (o->type == OBJ_ZSET) {
|
||||
unsigned char eledigest[20];
|
||||
|
||||
if (o->encoding == OBJ_ENCODING_ZIPLIST) {
|
||||
unsigned char *zl = o->ptr;
|
||||
unsigned char *eptr, *sptr;
|
||||
unsigned char *vstr;
|
||||
unsigned int vlen;
|
||||
long long vll;
|
||||
double score;
|
||||
|
||||
eptr = ziplistIndex(zl,0);
|
||||
serverAssert(eptr != NULL);
|
||||
sptr = ziplistNext(zl,eptr);
|
||||
serverAssert(sptr != NULL);
|
||||
|
||||
while (eptr != NULL) {
|
||||
serverAssert(ziplistGet(eptr,&vstr,&vlen,&vll));
|
||||
score = zzlGetScore(sptr);
|
||||
|
||||
memset(eledigest,0,20);
|
||||
if (vstr != NULL) {
|
||||
mixDigest(eledigest,vstr,vlen);
|
||||
} else {
|
||||
ll2string(buf,sizeof(buf),vll);
|
||||
mixDigest(eledigest,buf,strlen(buf));
|
||||
}
|
||||
|
||||
snprintf(buf,sizeof(buf),"%.17g",score);
|
||||
mixDigest(eledigest,buf,strlen(buf));
|
||||
xorDigest(digest,eledigest,20);
|
||||
zzlNext(zl,&eptr,&sptr);
|
||||
}
|
||||
} else if (o->encoding == OBJ_ENCODING_SKIPLIST) {
|
||||
zset *zs = o->ptr;
|
||||
dictIterator *di = dictGetIterator(zs->dict);
|
||||
dictEntry *de;
|
||||
|
||||
while((de = dictNext(di)) != NULL) {
|
||||
sds sdsele = dictGetKey(de);
|
||||
double *score = dictGetVal(de);
|
||||
|
||||
snprintf(buf,sizeof(buf),"%.17g",*score);
|
||||
memset(eledigest,0,20);
|
||||
mixDigest(eledigest,sdsele,sdslen(sdsele));
|
||||
mixDigest(eledigest,buf,strlen(buf));
|
||||
xorDigest(digest,eledigest,20);
|
||||
}
|
||||
dictReleaseIterator(di);
|
||||
} else {
|
||||
serverPanic("Unknown sorted set encoding");
|
||||
}
|
||||
} else if (o->type == OBJ_HASH) {
|
||||
hashTypeIterator *hi = hashTypeInitIterator(o);
|
||||
while (hashTypeNext(hi) != C_ERR) {
|
||||
unsigned char eledigest[20];
|
||||
sds sdsele;
|
||||
|
||||
memset(eledigest,0,20);
|
||||
sdsele = hashTypeCurrentObjectNewSds(hi,OBJ_HASH_KEY);
|
||||
mixDigest(eledigest,sdsele,sdslen(sdsele));
|
||||
sdsfree(sdsele);
|
||||
sdsele = hashTypeCurrentObjectNewSds(hi,OBJ_HASH_VALUE);
|
||||
mixDigest(eledigest,sdsele,sdslen(sdsele));
|
||||
sdsfree(sdsele);
|
||||
xorDigest(digest,eledigest,20);
|
||||
}
|
||||
hashTypeReleaseIterator(hi);
|
||||
} else if (o->type == OBJ_STREAM) {
|
||||
streamIterator si;
|
||||
streamIteratorStart(&si,o->ptr,NULL,NULL,0);
|
||||
streamID id;
|
||||
int64_t numfields;
|
||||
|
||||
while(streamIteratorGetID(&si,&id,&numfields)) {
|
||||
sds itemid = sdscatfmt(sdsempty(),"%U.%U",id.ms,id.seq);
|
||||
mixDigest(digest,itemid,sdslen(itemid));
|
||||
sdsfree(itemid);
|
||||
|
||||
while(numfields--) {
|
||||
unsigned char *field, *value;
|
||||
int64_t field_len, value_len;
|
||||
streamIteratorGetField(&si,&field,&value,
|
||||
&field_len,&value_len);
|
||||
mixDigest(digest,field,field_len);
|
||||
mixDigest(digest,value,value_len);
|
||||
}
|
||||
}
|
||||
streamIteratorStop(&si);
|
||||
} else if (o->type == OBJ_MODULE) {
|
||||
RedisModuleDigest md;
|
||||
moduleValue *mv = o->ptr;
|
||||
moduleType *mt = mv->type;
|
||||
moduleInitDigestContext(md);
|
||||
if (mt->digest) {
|
||||
mt->digest(&md,mv->value);
|
||||
xorDigest(digest,md.x,sizeof(md.x));
|
||||
}
|
||||
} else {
|
||||
serverPanic("Unknown object type");
|
||||
}
|
||||
/* If the key has an expire, add it to the mix */
|
||||
if (expiretime != -1) xorDigest(digest,"!!expire!!",10);
|
||||
}
|
||||
|
||||
/* Compute the dataset digest. Since keys, sets elements, hashes elements
|
||||
* are not ordered, we use a trick: every aggregate digest is the xor
|
||||
* of the digests of their elements. This way the order will not change
|
||||
|
@ -118,7 +257,6 @@ void mixObjectDigest(unsigned char *digest, robj *o) {
|
|||
* a different digest. */
|
||||
void computeDatasetDigest(unsigned char *final) {
|
||||
unsigned char digest[20];
|
||||
char buf[128];
|
||||
dictIterator *di = NULL;
|
||||
dictEntry *de;
|
||||
int j;
|
||||
|
@ -141,7 +279,6 @@ void computeDatasetDigest(unsigned char *final) {
|
|||
while((de = dictNext(di)) != NULL) {
|
||||
sds key;
|
||||
robj *keyobj, *o;
|
||||
long long expiretime;
|
||||
|
||||
memset(digest,0,20); /* This key-val digest */
|
||||
key = dictGetKey(de);
|
||||
|
@ -150,134 +287,8 @@ void computeDatasetDigest(unsigned char *final) {
|
|||
mixDigest(digest,key,sdslen(key));
|
||||
|
||||
o = dictGetVal(de);
|
||||
xorObjectDigest(db,keyobj,digest,o);
|
||||
|
||||
aux = htonl(o->type);
|
||||
mixDigest(digest,&aux,sizeof(aux));
|
||||
expiretime = getExpire(db,keyobj);
|
||||
|
||||
/* Save the key and associated value */
|
||||
if (o->type == OBJ_STRING) {
|
||||
mixObjectDigest(digest,o);
|
||||
} else if (o->type == OBJ_LIST) {
|
||||
listTypeIterator *li = listTypeInitIterator(o,0,LIST_TAIL);
|
||||
listTypeEntry entry;
|
||||
while(listTypeNext(li,&entry)) {
|
||||
robj *eleobj = listTypeGet(&entry);
|
||||
mixObjectDigest(digest,eleobj);
|
||||
decrRefCount(eleobj);
|
||||
}
|
||||
listTypeReleaseIterator(li);
|
||||
} else if (o->type == OBJ_SET) {
|
||||
setTypeIterator *si = setTypeInitIterator(o);
|
||||
sds sdsele;
|
||||
while((sdsele = setTypeNextObject(si)) != NULL) {
|
||||
xorDigest(digest,sdsele,sdslen(sdsele));
|
||||
sdsfree(sdsele);
|
||||
}
|
||||
setTypeReleaseIterator(si);
|
||||
} else if (o->type == OBJ_ZSET) {
|
||||
unsigned char eledigest[20];
|
||||
|
||||
if (o->encoding == OBJ_ENCODING_ZIPLIST) {
|
||||
unsigned char *zl = o->ptr;
|
||||
unsigned char *eptr, *sptr;
|
||||
unsigned char *vstr;
|
||||
unsigned int vlen;
|
||||
long long vll;
|
||||
double score;
|
||||
|
||||
eptr = ziplistIndex(zl,0);
|
||||
serverAssert(eptr != NULL);
|
||||
sptr = ziplistNext(zl,eptr);
|
||||
serverAssert(sptr != NULL);
|
||||
|
||||
while (eptr != NULL) {
|
||||
serverAssert(ziplistGet(eptr,&vstr,&vlen,&vll));
|
||||
score = zzlGetScore(sptr);
|
||||
|
||||
memset(eledigest,0,20);
|
||||
if (vstr != NULL) {
|
||||
mixDigest(eledigest,vstr,vlen);
|
||||
} else {
|
||||
ll2string(buf,sizeof(buf),vll);
|
||||
mixDigest(eledigest,buf,strlen(buf));
|
||||
}
|
||||
|
||||
snprintf(buf,sizeof(buf),"%.17g",score);
|
||||
mixDigest(eledigest,buf,strlen(buf));
|
||||
xorDigest(digest,eledigest,20);
|
||||
zzlNext(zl,&eptr,&sptr);
|
||||
}
|
||||
} else if (o->encoding == OBJ_ENCODING_SKIPLIST) {
|
||||
zset *zs = o->ptr;
|
||||
dictIterator *di = dictGetIterator(zs->dict);
|
||||
dictEntry *de;
|
||||
|
||||
while((de = dictNext(di)) != NULL) {
|
||||
sds sdsele = dictGetKey(de);
|
||||
double *score = dictGetVal(de);
|
||||
|
||||
snprintf(buf,sizeof(buf),"%.17g",*score);
|
||||
memset(eledigest,0,20);
|
||||
mixDigest(eledigest,sdsele,sdslen(sdsele));
|
||||
mixDigest(eledigest,buf,strlen(buf));
|
||||
xorDigest(digest,eledigest,20);
|
||||
}
|
||||
dictReleaseIterator(di);
|
||||
} else {
|
||||
serverPanic("Unknown sorted set encoding");
|
||||
}
|
||||
} else if (o->type == OBJ_HASH) {
|
||||
hashTypeIterator *hi = hashTypeInitIterator(o);
|
||||
while (hashTypeNext(hi) != C_ERR) {
|
||||
unsigned char eledigest[20];
|
||||
sds sdsele;
|
||||
|
||||
memset(eledigest,0,20);
|
||||
sdsele = hashTypeCurrentObjectNewSds(hi,OBJ_HASH_KEY);
|
||||
mixDigest(eledigest,sdsele,sdslen(sdsele));
|
||||
sdsfree(sdsele);
|
||||
sdsele = hashTypeCurrentObjectNewSds(hi,OBJ_HASH_VALUE);
|
||||
mixDigest(eledigest,sdsele,sdslen(sdsele));
|
||||
sdsfree(sdsele);
|
||||
xorDigest(digest,eledigest,20);
|
||||
}
|
||||
hashTypeReleaseIterator(hi);
|
||||
} else if (o->type == OBJ_STREAM) {
|
||||
streamIterator si;
|
||||
streamIteratorStart(&si,o->ptr,NULL,NULL,0);
|
||||
streamID id;
|
||||
int64_t numfields;
|
||||
|
||||
while(streamIteratorGetID(&si,&id,&numfields)) {
|
||||
sds itemid = sdscatfmt(sdsempty(),"%U.%U",id.ms,id.seq);
|
||||
mixDigest(digest,itemid,sdslen(itemid));
|
||||
sdsfree(itemid);
|
||||
|
||||
while(numfields--) {
|
||||
unsigned char *field, *value;
|
||||
int64_t field_len, value_len;
|
||||
streamIteratorGetField(&si,&field,&value,
|
||||
&field_len,&value_len);
|
||||
mixDigest(digest,field,field_len);
|
||||
mixDigest(digest,value,value_len);
|
||||
}
|
||||
}
|
||||
streamIteratorStop(&si);
|
||||
} else if (o->type == OBJ_MODULE) {
|
||||
RedisModuleDigest md;
|
||||
moduleValue *mv = o->ptr;
|
||||
moduleType *mt = mv->type;
|
||||
moduleInitDigestContext(md);
|
||||
if (mt->digest) {
|
||||
mt->digest(&md,mv->value);
|
||||
xorDigest(digest,md.x,sizeof(md.x));
|
||||
}
|
||||
} else {
|
||||
serverPanic("Unknown object type");
|
||||
}
|
||||
/* If the key has an expire, add it to the mix */
|
||||
if (expiretime != -1) xorDigest(digest,"!!expire!!",10);
|
||||
/* We can finally xor the key-val digest to the final digest */
|
||||
xorDigest(final,digest,20);
|
||||
decrRefCount(keyobj);
|
||||
|
@ -293,6 +304,7 @@ void debugCommand(client *c) {
|
|||
"CHANGE-REPL-ID -- Change the replication IDs of the instance. Dangerous, should be used only for testing the replication subsystem.",
|
||||
"CRASH-AND-RECOVER <milliseconds> -- Hard crash and restart after <milliseconds> delay.",
|
||||
"DIGEST -- Output a hex signature representing the current DB content.",
|
||||
"DIGEST-VALUE <key-1> ... <key-N>-- Output a hex signature of the values of all the specified keys.",
|
||||
"ERROR <string> -- Return a Redis protocol error with <string> as message. Useful for clients unit tests to simulate Redis errors.",
|
||||
"LOG <message> -- write message to the server log.",
|
||||
"HTSTATS <dbid> -- Return hash table statistics of the specified Redis database.",
|
||||
|
@ -310,6 +322,7 @@ void debugCommand(client *c) {
|
|||
"SLEEP <seconds> -- Stop the server for <seconds>. Decimals allowed.",
|
||||
"STRUCTSIZE -- Return the size of different Redis core C structures.",
|
||||
"ZIPLIST <key> -- Show low level info about the ziplist encoding.",
|
||||
"STRINGMATCH-TEST -- Run a fuzz tester against the stringmatchlen() function.",
|
||||
NULL
|
||||
};
|
||||
addReplyHelp(c, help);
|
||||
|
@ -336,7 +349,6 @@ NULL
|
|||
zfree(ptr);
|
||||
addReply(c,shared.ok);
|
||||
} else if (!strcasecmp(c->argv[1]->ptr,"assert")) {
|
||||
if (c->argc >= 3) c->argv[2] = tryObjectEncoding(c->argv[2]);
|
||||
serverAssertWithInfo(c,c->argv[0],1 == 2);
|
||||
} else if (!strcasecmp(c->argv[1]->ptr,"log") && c->argc == 3) {
|
||||
serverLog(LL_WARNING, "DEBUG LOG: %s", (char*)c->argv[2]->ptr);
|
||||
|
@ -495,15 +507,28 @@ NULL
|
|||
}
|
||||
addReply(c,shared.ok);
|
||||
} else if (!strcasecmp(c->argv[1]->ptr,"digest") && c->argc == 2) {
|
||||
/* DEBUG DIGEST (form without keys specified) */
|
||||
unsigned char digest[20];
|
||||
sds d = sdsempty();
|
||||
int j;
|
||||
|
||||
computeDatasetDigest(digest);
|
||||
for (j = 0; j < 20; j++)
|
||||
d = sdscatprintf(d, "%02x",digest[j]);
|
||||
for (int i = 0; i < 20; i++) d = sdscatprintf(d, "%02x",digest[i]);
|
||||
addReplyStatus(c,d);
|
||||
sdsfree(d);
|
||||
} else if (!strcasecmp(c->argv[1]->ptr,"digest-value") && c->argc >= 2) {
|
||||
/* DEBUG DIGEST-VALUE key key key ... key. */
|
||||
addReplyMultiBulkLen(c,c->argc-2);
|
||||
for (int j = 2; j < c->argc; j++) {
|
||||
unsigned char digest[20];
|
||||
memset(digest,0,20); /* Start with a clean result */
|
||||
robj *o = lookupKeyReadWithFlags(c->db,c->argv[j],LOOKUP_NOTOUCH);
|
||||
if (o) xorObjectDigest(c->db,c->argv[j],digest,o);
|
||||
|
||||
sds d = sdsempty();
|
||||
for (int i = 0; i < 20; i++) d = sdscatprintf(d, "%02x",digest[i]);
|
||||
addReplyStatus(c,d);
|
||||
sdsfree(d);
|
||||
}
|
||||
} else if (!strcasecmp(c->argv[1]->ptr,"sleep") && c->argc == 3) {
|
||||
double dtime = strtod(c->argv[2]->ptr,NULL);
|
||||
long long utime = dtime*1000000;
|
||||
|
@ -595,6 +620,10 @@ NULL
|
|||
changeReplicationId();
|
||||
clearReplicationId2();
|
||||
addReply(c,shared.ok);
|
||||
} else if (!strcasecmp(c->argv[1]->ptr,"stringmatch-test") && c->argc == 2)
|
||||
{
|
||||
stringmatchlen_fuzz_test();
|
||||
addReplyStatus(c,"Apparently Redis did not crash: test passed");
|
||||
} else {
|
||||
addReplySubcommandSyntaxError(c);
|
||||
return;
|
||||
|
|
15
src/evict.c
15
src/evict.c
|
@ -444,8 +444,8 @@ int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *lev
|
|||
* Otehrwise if we are over the memory limit, but not enough memory
|
||||
* was freed to return back under the limit, the function returns C_ERR. */
|
||||
int freeMemoryIfNeeded(void) {
|
||||
/* By default slaves should ignore maxmemory and just be masters excat
|
||||
* copies. */
|
||||
/* By default replicas should ignore maxmemory
|
||||
* and just be masters exact copies. */
|
||||
if (server.masterhost && server.repl_slave_ignore_maxmemory) return C_OK;
|
||||
|
||||
size_t mem_reported, mem_tofree, mem_freed;
|
||||
|
@ -622,3 +622,14 @@ cant_free:
|
|||
return C_ERR;
|
||||
}
|
||||
|
||||
/* This is a wrapper for freeMemoryIfNeeded() that only really calls the
|
||||
* function if right now there are the conditions to do so safely:
|
||||
*
|
||||
* - There must be no script in timeout condition.
|
||||
* - Nor we are loading data right now.
|
||||
*
|
||||
*/
|
||||
int freeMemoryIfNeededAndSafe(void) {
|
||||
if (server.lua_timedout || server.loading) return C_OK;
|
||||
return freeMemoryIfNeeded();
|
||||
}
|
||||
|
|
17
src/multi.c
17
src/multi.c
|
@ -35,6 +35,7 @@
|
|||
void initClientMultiState(client *c) {
|
||||
c->mstate.commands = NULL;
|
||||
c->mstate.count = 0;
|
||||
c->mstate.cmd_flags = 0;
|
||||
}
|
||||
|
||||
/* Release all the resources associated with MULTI/EXEC state */
|
||||
|
@ -67,6 +68,7 @@ void queueMultiCommand(client *c) {
|
|||
for (j = 0; j < c->argc; j++)
|
||||
incrRefCount(mc->argv[j]);
|
||||
c->mstate.count++;
|
||||
c->mstate.cmd_flags |= c->cmd->flags;
|
||||
}
|
||||
|
||||
void discardTransaction(client *c) {
|
||||
|
@ -137,6 +139,21 @@ void execCommand(client *c) {
|
|||
goto handle_monitor;
|
||||
}
|
||||
|
||||
/* If there are write commands inside the transaction, and this is a read
|
||||
* only slave, we want to send an error. This happens when the transaction
|
||||
* was initiated when the instance was a master or a writable replica and
|
||||
* then the configuration changed (for example instance was turned into
|
||||
* a replica). */
|
||||
if (!server.loading && server.masterhost && server.repl_slave_ro &&
|
||||
!(c->flags & CLIENT_MASTER) && c->mstate.cmd_flags & CMD_WRITE)
|
||||
{
|
||||
addReplyError(c,
|
||||
"Transaction contains write commands but instance "
|
||||
"is now a read-only replica. EXEC aborted.");
|
||||
discardTransaction(c);
|
||||
goto handle_monitor;
|
||||
}
|
||||
|
||||
/* Exec all the queued commands */
|
||||
unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */
|
||||
orig_argv = c->argv;
|
||||
|
|
|
@ -365,19 +365,13 @@ void addReplyErrorLength(client *c, const char *s, size_t len) {
|
|||
* Where the master must propagate the first change even if the second
|
||||
* will produce an error. However it is useful to log such events since
|
||||
* they are rare and may hint at errors in a script or a bug in Redis. */
|
||||
if (c->flags & (CLIENT_MASTER|CLIENT_SLAVE)) {
|
||||
if (c->flags & (CLIENT_MASTER|CLIENT_SLAVE) && !(c->flags & CLIENT_MONITOR)) {
|
||||
char* to = c->flags & CLIENT_MASTER? "master": "replica";
|
||||
char* from = c->flags & CLIENT_MASTER? "replica": "master";
|
||||
char *cmdname = c->lastcmd ? c->lastcmd->name : "<unknown>";
|
||||
serverLog(LL_WARNING,"== CRITICAL == This %s is sending an error "
|
||||
"to its %s: '%s' after processing the command "
|
||||
"'%s'", from, to, s, cmdname);
|
||||
/* Here we want to panic because when a master is sending an
|
||||
* error to some slave in the context of replication, this can
|
||||
* only create some kind of offset or data desynchronization. Better
|
||||
* to catch it ASAP and crash instead of continuing. */
|
||||
if (c->flags & CLIENT_SLAVE)
|
||||
serverPanic("Continuing is unsafe: replication protocol violation.");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1470,7 +1464,7 @@ void processInputBuffer(client *c) {
|
|||
}
|
||||
|
||||
/* Trim to pos */
|
||||
if (c->qb_pos) {
|
||||
if (server.current_client != NULL && c->qb_pos) {
|
||||
sdsrange(c->querybuf,c->qb_pos,-1);
|
||||
c->qb_pos = 0;
|
||||
}
|
||||
|
|
672
src/redis-cli.c
672
src/redis-cli.c
|
@ -67,6 +67,7 @@
|
|||
#define REDIS_CLI_HISTFILE_DEFAULT ".rediscli_history"
|
||||
#define REDIS_CLI_RCFILE_ENV "REDISCLI_RCFILE"
|
||||
#define REDIS_CLI_RCFILE_DEFAULT ".redisclirc"
|
||||
#define REDIS_CLI_AUTH_ENV "REDISCLI_AUTH"
|
||||
|
||||
#define CLUSTER_MANAGER_SLOTS 16384
|
||||
#define CLUSTER_MANAGER_MIGRATE_TIMEOUT 60000
|
||||
|
@ -116,6 +117,7 @@
|
|||
#define CLUSTER_MANAGER_CMD_FLAG_REPLACE 1 << 6
|
||||
#define CLUSTER_MANAGER_CMD_FLAG_COPY 1 << 7
|
||||
#define CLUSTER_MANAGER_CMD_FLAG_COLOR 1 << 8
|
||||
#define CLUSTER_MANAGER_CMD_FLAG_CHECK_OWNERS 1 << 9
|
||||
|
||||
#define CLUSTER_MANAGER_OPT_GETFRIENDS 1 << 0
|
||||
#define CLUSTER_MANAGER_OPT_COLD 1 << 1
|
||||
|
@ -1377,6 +1379,9 @@ static int parseOptions(int argc, char **argv) {
|
|||
} else if (!strcmp(argv[i],"--cluster-use-empty-masters")) {
|
||||
config.cluster_manager_command.flags |=
|
||||
CLUSTER_MANAGER_CMD_FLAG_EMPTYMASTER;
|
||||
} else if (!strcmp(argv[i],"--cluster-search-multiple-owners")) {
|
||||
config.cluster_manager_command.flags |=
|
||||
CLUSTER_MANAGER_CMD_FLAG_CHECK_OWNERS;
|
||||
} else if (!strcmp(argv[i],"-v") || !strcmp(argv[i], "--version")) {
|
||||
sds version = cliVersion();
|
||||
printf("redis-cli %s\n", version);
|
||||
|
@ -1419,6 +1424,14 @@ static int parseOptions(int argc, char **argv) {
|
|||
return i;
|
||||
}
|
||||
|
||||
static void parseEnv() {
|
||||
/* Set auth from env, but do not overwrite CLI arguments if passed */
|
||||
char *auth = getenv(REDIS_CLI_AUTH_ENV);
|
||||
if (auth != NULL && config.auth == NULL) {
|
||||
config.auth = auth;
|
||||
}
|
||||
}
|
||||
|
||||
static sds readArgFromStdin(void) {
|
||||
char buf[1024];
|
||||
sds arg = sdsempty();
|
||||
|
@ -1446,6 +1459,9 @@ static void usage(void) {
|
|||
" -p <port> Server port (default: 6379).\n"
|
||||
" -s <socket> Server socket (overrides hostname and port).\n"
|
||||
" -a <password> Password to use when connecting to the server.\n"
|
||||
" You can also use the " REDIS_CLI_AUTH_ENV " environment\n"
|
||||
" variable to pass this password more safely\n"
|
||||
" (if both are used, this argument takes predecence).\n"
|
||||
" -u <uri> Server URI.\n"
|
||||
" -r <repeat> Execute specified command N times.\n"
|
||||
" -i <interval> When -r is used, waits <interval> seconds per command.\n"
|
||||
|
@ -1834,7 +1850,7 @@ static int evalMode(int argc, char **argv) {
|
|||
if (eval_ldb) {
|
||||
if (!config.eval_ldb) {
|
||||
/* If the debugging session ended immediately, there was an
|
||||
* error compiling the script. Show it and don't enter
|
||||
* error compiling the script. Show it and they don't enter
|
||||
* the REPL at all. */
|
||||
printf("Eval debugging session can't start:\n");
|
||||
cliReadReply(0);
|
||||
|
@ -1917,6 +1933,8 @@ static dictType clusterManagerDictType = {
|
|||
};
|
||||
|
||||
typedef int clusterManagerCommandProc(int argc, char **argv);
|
||||
typedef int (*clusterManagerOnReplyError)(redisReply *reply,
|
||||
clusterManagerNode *n, int bulk_idx);
|
||||
|
||||
/* Cluster Manager helper functions */
|
||||
|
||||
|
@ -1978,14 +1996,17 @@ typedef struct clusterManagerCommandDef {
|
|||
clusterManagerCommandDef clusterManagerCommands[] = {
|
||||
{"create", clusterManagerCommandCreate, -2, "host1:port1 ... hostN:portN",
|
||||
"replicas <arg>"},
|
||||
{"check", clusterManagerCommandCheck, -1, "host:port", NULL},
|
||||
{"check", clusterManagerCommandCheck, -1, "host:port",
|
||||
"search-multiple-owners"},
|
||||
{"info", clusterManagerCommandInfo, -1, "host:port", NULL},
|
||||
{"fix", clusterManagerCommandFix, -1, "host:port", NULL},
|
||||
{"fix", clusterManagerCommandFix, -1, "host:port",
|
||||
"search-multiple-owners"},
|
||||
{"reshard", clusterManagerCommandReshard, -1, "host:port",
|
||||
"from <arg>,to <arg>,slots <arg>,yes,timeout <arg>,pipeline <arg>"},
|
||||
"from <arg>,to <arg>,slots <arg>,yes,timeout <arg>,pipeline <arg>,"
|
||||
"replace"},
|
||||
{"rebalance", clusterManagerCommandRebalance, -1, "host:port",
|
||||
"weight <node1=w1...nodeN=wN>,use-empty-masters,"
|
||||
"timeout <arg>,simulate,pipeline <arg>,threshold <arg>"},
|
||||
"timeout <arg>,simulate,pipeline <arg>,threshold <arg>,replace"},
|
||||
{"add-node", clusterManagerCommandAddNode, 2,
|
||||
"new_host:new_port existing_host:existing_port", "slave,master-id <arg>"},
|
||||
{"del-node", clusterManagerCommandDeleteNode, 2, "host:port node_id",NULL},
|
||||
|
@ -2176,6 +2197,44 @@ static int clusterManagerCheckRedisReply(clusterManagerNode *n,
|
|||
return 1;
|
||||
}
|
||||
|
||||
/* Call MULTI command on a cluster node. */
|
||||
static int clusterManagerStartTransaction(clusterManagerNode *node) {
|
||||
redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "MULTI");
|
||||
int success = clusterManagerCheckRedisReply(node, reply, NULL);
|
||||
if (reply) freeReplyObject(reply);
|
||||
return success;
|
||||
}
|
||||
|
||||
/* Call EXEC command on a cluster node. */
|
||||
static int clusterManagerExecTransaction(clusterManagerNode *node,
|
||||
clusterManagerOnReplyError onerror)
|
||||
{
|
||||
redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "EXEC");
|
||||
int success = clusterManagerCheckRedisReply(node, reply, NULL);
|
||||
if (success) {
|
||||
if (reply->type != REDIS_REPLY_ARRAY) {
|
||||
success = 0;
|
||||
goto cleanup;
|
||||
}
|
||||
size_t i;
|
||||
for (i = 0; i < reply->elements; i++) {
|
||||
redisReply *r = reply->element[i];
|
||||
char *err = NULL;
|
||||
success = clusterManagerCheckRedisReply(node, r, &err);
|
||||
if (!success && onerror) success = onerror(r, node, i);
|
||||
if (err) {
|
||||
if (!success)
|
||||
CLUSTER_MANAGER_PRINT_REPLY_ERROR(node, err);
|
||||
zfree(err);
|
||||
}
|
||||
if (!success) break;
|
||||
}
|
||||
}
|
||||
cleanup:
|
||||
if (reply) freeReplyObject(reply);
|
||||
return success;
|
||||
}
|
||||
|
||||
static int clusterManagerNodeConnect(clusterManagerNode *node) {
|
||||
if (node->context) redisFree(node->context);
|
||||
node->context = redisConnect(node->ip, node->port);
|
||||
|
@ -2710,6 +2769,55 @@ cleanup:
|
|||
return success;
|
||||
}
|
||||
|
||||
/* Get the node the slot is assigned to from the point of view of node *n.
|
||||
* If the slot is unassigned or if the reply is an error, return NULL.
|
||||
* Use the **err argument in order to check wether the slot is unassigned
|
||||
* or the reply resulted in an error. */
|
||||
static clusterManagerNode *clusterManagerGetSlotOwner(clusterManagerNode *n,
|
||||
int slot, char **err)
|
||||
{
|
||||
assert(slot >= 0 && slot < CLUSTER_MANAGER_SLOTS);
|
||||
clusterManagerNode *owner = NULL;
|
||||
redisReply *reply = CLUSTER_MANAGER_COMMAND(n, "CLUSTER SLOTS");
|
||||
if (clusterManagerCheckRedisReply(n, reply, err)) {
|
||||
assert(reply->type == REDIS_REPLY_ARRAY);
|
||||
size_t i;
|
||||
for (i = 0; i < reply->elements; i++) {
|
||||
redisReply *r = reply->element[i];
|
||||
assert(r->type == REDIS_REPLY_ARRAY && r->elements >= 3);
|
||||
int from, to;
|
||||
from = r->element[0]->integer;
|
||||
to = r->element[1]->integer;
|
||||
if (slot < from || slot > to) continue;
|
||||
redisReply *nr = r->element[2];
|
||||
assert(nr->type == REDIS_REPLY_ARRAY && nr->elements >= 2);
|
||||
char *name = NULL;
|
||||
if (nr->elements >= 3)
|
||||
name = nr->element[2]->str;
|
||||
if (name != NULL)
|
||||
owner = clusterManagerNodeByName(name);
|
||||
else {
|
||||
char *ip = nr->element[0]->str;
|
||||
assert(ip != NULL);
|
||||
int port = (int) nr->element[1]->integer;
|
||||
listIter li;
|
||||
listNode *ln;
|
||||
listRewind(cluster_manager.nodes, &li);
|
||||
while ((ln = listNext(&li)) != NULL) {
|
||||
clusterManagerNode *nd = ln->value;
|
||||
if (strcmp(nd->ip, ip) == 0 && port == nd->port) {
|
||||
owner = nd;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (owner) break;
|
||||
}
|
||||
}
|
||||
if (reply) freeReplyObject(reply);
|
||||
return owner;
|
||||
}
|
||||
|
||||
/* Set slot status to "importing" or "migrating" */
|
||||
static int clusterManagerSetSlot(clusterManagerNode *node1,
|
||||
clusterManagerNode *node2,
|
||||
|
@ -2734,6 +2842,162 @@ cleanup:
|
|||
return success;
|
||||
}
|
||||
|
||||
static int clusterManagerClearSlotStatus(clusterManagerNode *node, int slot) {
|
||||
redisReply *reply = CLUSTER_MANAGER_COMMAND(node,
|
||||
"CLUSTER SETSLOT %d %s", slot, "STABLE");
|
||||
int success = clusterManagerCheckRedisReply(node, reply, NULL);
|
||||
if (reply) freeReplyObject(reply);
|
||||
return success;
|
||||
}
|
||||
|
||||
static int clusterManagerDelSlot(clusterManagerNode *node, int slot,
|
||||
int ignore_unassigned_err)
|
||||
{
|
||||
redisReply *reply = CLUSTER_MANAGER_COMMAND(node,
|
||||
"CLUSTER DELSLOTS %d", slot);
|
||||
char *err = NULL;
|
||||
int success = clusterManagerCheckRedisReply(node, reply, &err);
|
||||
if (!success && reply && reply->type == REDIS_REPLY_ERROR &&
|
||||
ignore_unassigned_err)
|
||||
{
|
||||
char *get_owner_err = NULL;
|
||||
clusterManagerNode *assigned_to =
|
||||
clusterManagerGetSlotOwner(node, slot, &get_owner_err);
|
||||
if (!assigned_to) {
|
||||
if (get_owner_err == NULL) success = 1;
|
||||
else {
|
||||
CLUSTER_MANAGER_PRINT_REPLY_ERROR(node, get_owner_err);
|
||||
zfree(get_owner_err);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!success && err != NULL) {
|
||||
CLUSTER_MANAGER_PRINT_REPLY_ERROR(node, err);
|
||||
zfree(err);
|
||||
}
|
||||
if (reply) freeReplyObject(reply);
|
||||
return success;
|
||||
}
|
||||
|
||||
static int clusterManagerAddSlot(clusterManagerNode *node, int slot) {
|
||||
redisReply *reply = CLUSTER_MANAGER_COMMAND(node,
|
||||
"CLUSTER ADDSLOTS %d", slot);
|
||||
int success = clusterManagerCheckRedisReply(node, reply, NULL);
|
||||
if (reply) freeReplyObject(reply);
|
||||
return success;
|
||||
}
|
||||
|
||||
static signed int clusterManagerCountKeysInSlot(clusterManagerNode *node,
|
||||
int slot)
|
||||
{
|
||||
redisReply *reply = CLUSTER_MANAGER_COMMAND(node,
|
||||
"CLUSTER COUNTKEYSINSLOT %d", slot);
|
||||
int count = -1;
|
||||
int success = clusterManagerCheckRedisReply(node, reply, NULL);
|
||||
if (success && reply->type == REDIS_REPLY_INTEGER) count = reply->integer;
|
||||
if (reply) freeReplyObject(reply);
|
||||
return count;
|
||||
}
|
||||
|
||||
static int clusterManagerBumpEpoch(clusterManagerNode *node) {
|
||||
redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "CLUSTER BUMPEPOCH");
|
||||
int success = clusterManagerCheckRedisReply(node, reply, NULL);
|
||||
if (reply) freeReplyObject(reply);
|
||||
return success;
|
||||
}
|
||||
|
||||
/* Callback used by clusterManagerSetSlotOwner transaction. It should ignore
|
||||
* errors except for ADDSLOTS errors.
|
||||
* Return 1 if the error should be ignored. */
|
||||
static int clusterManagerOnSetOwnerErr(redisReply *reply,
|
||||
clusterManagerNode *n, int bulk_idx)
|
||||
{
|
||||
UNUSED(reply);
|
||||
UNUSED(n);
|
||||
/* Only raise error when ADDSLOTS fail (bulk_idx == 1). */
|
||||
return (bulk_idx != 1);
|
||||
}
|
||||
|
||||
static int clusterManagerSetSlotOwner(clusterManagerNode *owner,
|
||||
int slot,
|
||||
int do_clear)
|
||||
{
|
||||
int success = clusterManagerStartTransaction(owner);
|
||||
if (!success) return 0;
|
||||
/* Ensure the slot is not already assigned. */
|
||||
clusterManagerDelSlot(owner, slot, 1);
|
||||
/* Add the slot and bump epoch. */
|
||||
clusterManagerAddSlot(owner, slot);
|
||||
if (do_clear) clusterManagerClearSlotStatus(owner, slot);
|
||||
clusterManagerBumpEpoch(owner);
|
||||
success = clusterManagerExecTransaction(owner, clusterManagerOnSetOwnerErr);
|
||||
return success;
|
||||
}
|
||||
|
||||
/* Get the hash for the values of the specified keys in *keys_reply for the
|
||||
* specified nodes *n1 and *n2, by calling DEBUG DIGEST-VALUE redis command
|
||||
* on both nodes. Every key with same name on both nodes but having different
|
||||
* values will be added to the *diffs list. Return 0 in case of reply
|
||||
* error. */
|
||||
static int clusterManagerCompareKeysValues(clusterManagerNode *n1,
|
||||
clusterManagerNode *n2,
|
||||
redisReply *keys_reply,
|
||||
list *diffs)
|
||||
{
|
||||
size_t i, argc = keys_reply->elements + 2;
|
||||
static const char *hash_zero = "0000000000000000000000000000000000000000";
|
||||
char **argv = zcalloc(argc * sizeof(char *));
|
||||
size_t *argv_len = zcalloc(argc * sizeof(size_t));
|
||||
argv[0] = "DEBUG";
|
||||
argv_len[0] = 5;
|
||||
argv[1] = "DIGEST-VALUE";
|
||||
argv_len[1] = 12;
|
||||
for (i = 0; i < keys_reply->elements; i++) {
|
||||
redisReply *entry = keys_reply->element[i];
|
||||
int idx = i + 2;
|
||||
argv[idx] = entry->str;
|
||||
argv_len[idx] = entry->len;
|
||||
}
|
||||
int success = 0;
|
||||
void *_reply1 = NULL, *_reply2 = NULL;
|
||||
redisReply *r1 = NULL, *r2 = NULL;
|
||||
redisAppendCommandArgv(n1->context,argc, (const char**)argv,argv_len);
|
||||
success = (redisGetReply(n1->context, &_reply1) == REDIS_OK);
|
||||
if (!success) goto cleanup;
|
||||
r1 = (redisReply *) _reply1;
|
||||
redisAppendCommandArgv(n2->context,argc, (const char**)argv,argv_len);
|
||||
success = (redisGetReply(n2->context, &_reply2) == REDIS_OK);
|
||||
if (!success) goto cleanup;
|
||||
r2 = (redisReply *) _reply2;
|
||||
success = (r1->type != REDIS_REPLY_ERROR && r2->type != REDIS_REPLY_ERROR);
|
||||
if (r1->type == REDIS_REPLY_ERROR) {
|
||||
CLUSTER_MANAGER_PRINT_REPLY_ERROR(n1, r1->str);
|
||||
success = 0;
|
||||
}
|
||||
if (r2->type == REDIS_REPLY_ERROR) {
|
||||
CLUSTER_MANAGER_PRINT_REPLY_ERROR(n2, r2->str);
|
||||
success = 0;
|
||||
}
|
||||
if (!success) goto cleanup;
|
||||
assert(keys_reply->elements == r1->elements &&
|
||||
keys_reply->elements == r2->elements);
|
||||
for (i = 0; i < keys_reply->elements; i++) {
|
||||
char *key = keys_reply->element[i]->str;
|
||||
char *hash1 = r1->element[i]->str;
|
||||
char *hash2 = r2->element[i]->str;
|
||||
/* Ignore keys that don't exist in both nodes. */
|
||||
if (strcmp(hash1, hash_zero) == 0 || strcmp(hash2, hash_zero) == 0)
|
||||
continue;
|
||||
if (strcmp(hash1, hash2) != 0) listAddNodeTail(diffs, key);
|
||||
}
|
||||
cleanup:
|
||||
if (r1) freeReplyObject(r1);
|
||||
if (r2) freeReplyObject(r2);
|
||||
zfree(argv);
|
||||
zfree(argv_len);
|
||||
return success;
|
||||
}
|
||||
|
||||
/* Migrate keys taken from reply->elements. It returns the reply from the
|
||||
* MIGRATE command, or NULL if something goes wrong. If the argument 'dots'
|
||||
* is not NULL, a dot will be printed for every migrated key. */
|
||||
|
@ -2814,8 +3078,10 @@ static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source,
|
|||
char **err)
|
||||
{
|
||||
int success = 1;
|
||||
int do_fix = (config.cluster_manager_command.flags &
|
||||
CLUSTER_MANAGER_CMD_FLAG_FIX);
|
||||
int do_fix = config.cluster_manager_command.flags &
|
||||
CLUSTER_MANAGER_CMD_FLAG_FIX;
|
||||
int do_replace = config.cluster_manager_command.flags &
|
||||
CLUSTER_MANAGER_CMD_FLAG_REPLACE;
|
||||
while (1) {
|
||||
char *dots = NULL;
|
||||
redisReply *reply = NULL, *migrate_reply = NULL;
|
||||
|
@ -2846,16 +3112,86 @@ static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source,
|
|||
dots);
|
||||
if (migrate_reply == NULL) goto next;
|
||||
if (migrate_reply->type == REDIS_REPLY_ERROR) {
|
||||
if (do_fix && strstr(migrate_reply->str, "BUSYKEY")) {
|
||||
/* If the key already exists, try to migrate keys
|
||||
* adding REPLACE option.
|
||||
* If the key's slot is not served, try to assign slot
|
||||
int is_busy = strstr(migrate_reply->str, "BUSYKEY") != NULL;
|
||||
int not_served = 0;
|
||||
if (!is_busy) {
|
||||
/* Check if the slot is unassigned (not served) in the
|
||||
* source node's configuration. */
|
||||
char *get_owner_err = NULL;
|
||||
clusterManagerNode *served_by =
|
||||
clusterManagerGetSlotOwner(source, slot, &get_owner_err);
|
||||
if (!served_by) {
|
||||
if (get_owner_err == NULL) not_served = 1;
|
||||
else {
|
||||
CLUSTER_MANAGER_PRINT_REPLY_ERROR(source,
|
||||
get_owner_err);
|
||||
zfree(get_owner_err);
|
||||
}
|
||||
}
|
||||
}
|
||||
/* Try to handle errors. */
|
||||
if (is_busy || not_served) {
|
||||
/* If the key's slot is not served, try to assign slot
|
||||
* to the target node. */
|
||||
int is_busy = (strstr(migrate_reply->str, "BUSYKEY") != NULL);
|
||||
if (strstr(migrate_reply->str, "slot not served") != NULL)
|
||||
if (do_fix && not_served) {
|
||||
clusterManagerLogWarn("*** Slot was not served, setting "
|
||||
"owner to node %s:%d.\n",
|
||||
target->ip, target->port);
|
||||
clusterManagerSetSlot(source, target, slot, "node", NULL);
|
||||
clusterManagerLogWarn("*** Target key exists. "
|
||||
"Replacing it for FIX.\n");
|
||||
}
|
||||
/* If the key already exists in the target node (BUSYKEY),
|
||||
* check whether its value is the same in both nodes.
|
||||
* In case of equal values, retry migration with the
|
||||
* REPLACE option.
|
||||
* In case of different values:
|
||||
* - If the migration is requested by the fix command, stop
|
||||
* and warn the user.
|
||||
* - In other cases (ie. reshard), proceed only if the user
|
||||
* launched the command with the --cluster-replace option.*/
|
||||
if (is_busy) {
|
||||
clusterManagerLogWarn("\n*** Target key exists\n");
|
||||
if (!do_replace) {
|
||||
clusterManagerLogWarn("*** Checking key values on "
|
||||
"both nodes...\n");
|
||||
list *diffs = listCreate();
|
||||
success = clusterManagerCompareKeysValues(source,
|
||||
target, reply, diffs);
|
||||
if (!success) {
|
||||
clusterManagerLogErr("*** Value check failed!\n");
|
||||
listRelease(diffs);
|
||||
goto next;
|
||||
}
|
||||
if (listLength(diffs) > 0) {
|
||||
success = 0;
|
||||
clusterManagerLogErr(
|
||||
"*** Found %d key(s) in both source node and "
|
||||
"target node having different values.\n"
|
||||
" Source node: %s:%d\n"
|
||||
" Target node: %s:%d\n"
|
||||
" Keys(s):\n",
|
||||
listLength(diffs),
|
||||
source->ip, source->port,
|
||||
target->ip, target->port);
|
||||
listIter dli;
|
||||
listNode *dln;
|
||||
listRewind(diffs, &dli);
|
||||
while((dln = listNext(&dli)) != NULL) {
|
||||
char *k = dln->value;
|
||||
clusterManagerLogErr(" - %s\n", k);
|
||||
}
|
||||
clusterManagerLogErr("Please fix the above key(s) "
|
||||
"manually and try again "
|
||||
"or relaunch the command \n"
|
||||
"with --cluster-replace "
|
||||
"option to force key "
|
||||
"overriding.\n");
|
||||
listRelease(diffs);
|
||||
goto next;
|
||||
}
|
||||
listRelease(diffs);
|
||||
}
|
||||
clusterManagerLogWarn("*** Replacing target keys...\n");
|
||||
}
|
||||
freeReplyObject(migrate_reply);
|
||||
migrate_reply = clusterManagerMigrateKeysInReply(source,
|
||||
target,
|
||||
|
@ -3610,24 +3946,17 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
|
|||
listRewind(none, &li);
|
||||
while ((ln = listNext(&li)) != NULL) {
|
||||
sds slot = ln->value;
|
||||
int s = atoi(slot);
|
||||
clusterManagerNode *n = clusterManagerNodeMasterRandom();
|
||||
clusterManagerLogInfo(">>> Covering slot %s with %s:%d\n",
|
||||
slot, n->ip, n->port);
|
||||
/* Ensure the slot is not already assigned. */
|
||||
redisReply *r = CLUSTER_MANAGER_COMMAND(n,
|
||||
"CLUSTER DELSLOTS %s", slot);
|
||||
if (r) freeReplyObject(r);
|
||||
r = CLUSTER_MANAGER_COMMAND(n,
|
||||
"CLUSTER ADDSLOTS %s", slot);
|
||||
if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1;
|
||||
if (r) freeReplyObject(r);
|
||||
r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER BUMPEPOCH");
|
||||
if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1;
|
||||
if (r) freeReplyObject(r);
|
||||
if (fixed < 0) goto cleanup;
|
||||
if (!clusterManagerSetSlotOwner(n, s, 0)) {
|
||||
fixed = -1;
|
||||
goto cleanup;
|
||||
}
|
||||
/* Since CLUSTER ADDSLOTS succeeded, we also update the slot
|
||||
* info into the node struct, in order to keep it synced */
|
||||
n->slots[atoi(slot)] = 1;
|
||||
n->slots[s] = 1;
|
||||
fixed++;
|
||||
}
|
||||
}
|
||||
|
@ -3635,7 +3964,7 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
|
|||
|
||||
/* Handle case "2": keys only in one node. */
|
||||
if (listLength(single) > 0) {
|
||||
printf("The following uncovered slots have keys in just one node:\n");
|
||||
printf("The following uncovered slots have keys in just one node:\n");
|
||||
clusterManagerPrintSlotsList(single);
|
||||
if (confirmWithYes("Fix these slots by covering with those nodes?")){
|
||||
listIter li;
|
||||
|
@ -3643,6 +3972,7 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
|
|||
listRewind(single, &li);
|
||||
while ((ln = listNext(&li)) != NULL) {
|
||||
sds slot = ln->value;
|
||||
int s = atoi(slot);
|
||||
dictEntry *entry = dictFind(clusterManagerUncoveredSlots, slot);
|
||||
assert(entry != NULL);
|
||||
list *nodes = (list *) dictGetVal(entry);
|
||||
|
@ -3651,18 +3981,10 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
|
|||
clusterManagerNode *n = fn->value;
|
||||
clusterManagerLogInfo(">>> Covering slot %s with %s:%d\n",
|
||||
slot, n->ip, n->port);
|
||||
/* Ensure the slot is not already assigned. */
|
||||
redisReply *r = CLUSTER_MANAGER_COMMAND(n,
|
||||
"CLUSTER DELSLOTS %s", slot);
|
||||
if (r) freeReplyObject(r);
|
||||
r = CLUSTER_MANAGER_COMMAND(n,
|
||||
"CLUSTER ADDSLOTS %s", slot);
|
||||
if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1;
|
||||
if (r) freeReplyObject(r);
|
||||
r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER BUMPEPOCH");
|
||||
if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1;
|
||||
if (r) freeReplyObject(r);
|
||||
if (fixed < 0) goto cleanup;
|
||||
if (!clusterManagerSetSlotOwner(n, s, 0)) {
|
||||
fixed = -1;
|
||||
goto cleanup;
|
||||
}
|
||||
/* Since CLUSTER ADDSLOTS succeeded, we also update the slot
|
||||
* info into the node struct, in order to keep it synced */
|
||||
n->slots[atoi(slot)] = 1;
|
||||
|
@ -3695,23 +4017,10 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
|
|||
clusterManagerLogInfo(">>> Covering slot %s moving keys "
|
||||
"to %s:%d\n", slot,
|
||||
target->ip, target->port);
|
||||
/* Ensure the slot is not already assigned. */
|
||||
redisReply *r = CLUSTER_MANAGER_COMMAND(target,
|
||||
"CLUSTER DELSLOTS %s", slot);
|
||||
if (r) freeReplyObject(r);
|
||||
r = CLUSTER_MANAGER_COMMAND(target,
|
||||
"CLUSTER ADDSLOTS %s", slot);
|
||||
if (!clusterManagerCheckRedisReply(target, r, NULL)) fixed = -1;
|
||||
if (r) freeReplyObject(r);
|
||||
if (fixed < 0) goto cleanup;
|
||||
r = CLUSTER_MANAGER_COMMAND(target,
|
||||
"CLUSTER SETSLOT %s %s", slot, "STABLE");
|
||||
if (!clusterManagerCheckRedisReply(target, r, NULL)) fixed = -1;
|
||||
if (r) freeReplyObject(r);
|
||||
r = CLUSTER_MANAGER_COMMAND(target, "CLUSTER BUMPEPOCH");
|
||||
if (!clusterManagerCheckRedisReply(target, r, NULL)) fixed = -1;
|
||||
if (r) freeReplyObject(r);
|
||||
if (fixed < 0) goto cleanup;
|
||||
if (!clusterManagerSetSlotOwner(target, s, 1)) {
|
||||
fixed = -1;
|
||||
goto cleanup;
|
||||
}
|
||||
/* Since CLUSTER ADDSLOTS succeeded, we also update the slot
|
||||
* info into the node struct, in order to keep it synced */
|
||||
target->slots[atoi(slot)] = 1;
|
||||
|
@ -3722,23 +4031,15 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
|
|||
clusterManagerNode *src = nln->value;
|
||||
if (src == target) continue;
|
||||
/* Assign the slot to target node in the source node. */
|
||||
redisReply *r = CLUSTER_MANAGER_COMMAND(src,
|
||||
"CLUSTER SETSLOT %s %s %s", slot,
|
||||
"NODE", target->name);
|
||||
if (!clusterManagerCheckRedisReply(src, r, NULL))
|
||||
if (!clusterManagerSetSlot(src, target, s, "NODE", NULL))
|
||||
fixed = -1;
|
||||
if (r) freeReplyObject(r);
|
||||
if (fixed < 0) goto cleanup;
|
||||
/* Set the source node in 'importing' state
|
||||
* (even if we will actually migrate keys away)
|
||||
* in order to avoid receiving redirections
|
||||
* for MIGRATE. */
|
||||
r = CLUSTER_MANAGER_COMMAND(src,
|
||||
"CLUSTER SETSLOT %s %s %s", slot,
|
||||
"IMPORTING", target->name);
|
||||
if (!clusterManagerCheckRedisReply(src, r, NULL))
|
||||
fixed = -1;
|
||||
if (r) freeReplyObject(r);
|
||||
if (!clusterManagerSetSlot(src, target, s,
|
||||
"IMPORTING", NULL)) fixed = -1;
|
||||
if (fixed < 0) goto cleanup;
|
||||
int opts = CLUSTER_MANAGER_OPT_VERBOSE |
|
||||
CLUSTER_MANAGER_OPT_COLD;
|
||||
|
@ -3746,12 +4047,8 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
|
|||
fixed = -1;
|
||||
goto cleanup;
|
||||
}
|
||||
r = CLUSTER_MANAGER_COMMAND(src,
|
||||
"CLUSTER SETSLOT %s %s", slot,
|
||||
"STABLE");
|
||||
if (!clusterManagerCheckRedisReply(src, r, NULL))
|
||||
if (!clusterManagerClearSlotStatus(src, s))
|
||||
fixed = -1;
|
||||
if (r) freeReplyObject(r);
|
||||
if (fixed < 0) goto cleanup;
|
||||
}
|
||||
fixed++;
|
||||
|
@ -3875,24 +4172,9 @@ static int clusterManagerFixOpenSlot(int slot) {
|
|||
// Use ADDSLOTS to assign the slot.
|
||||
clusterManagerLogWarn("*** Configuring %s:%d as the slot owner\n",
|
||||
owner->ip, owner->port);
|
||||
redisReply *reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER "
|
||||
"SETSLOT %d %s",
|
||||
slot, "STABLE");
|
||||
success = clusterManagerCheckRedisReply(owner, reply, NULL);
|
||||
if (reply) freeReplyObject(reply);
|
||||
success = clusterManagerClearSlotStatus(owner, slot);
|
||||
if (!success) goto cleanup;
|
||||
/* Ensure that the slot is unassigned before assigning it to the
|
||||
* owner. */
|
||||
reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER DELSLOTS %d", slot);
|
||||
success = clusterManagerCheckRedisReply(owner, reply, NULL);
|
||||
/* Ignore "already unassigned" error. */
|
||||
if (!success && reply && reply->type == REDIS_REPLY_ERROR &&
|
||||
strstr(reply->str, "already unassigned") != NULL) success = 1;
|
||||
if (reply) freeReplyObject(reply);
|
||||
if (!success) goto cleanup;
|
||||
reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER ADDSLOTS %d", slot);
|
||||
success = clusterManagerCheckRedisReply(owner, reply, NULL);
|
||||
if (reply) freeReplyObject(reply);
|
||||
success = clusterManagerSetSlotOwner(owner, slot, 0);
|
||||
if (!success) goto cleanup;
|
||||
/* Since CLUSTER ADDSLOTS succeeded, we also update the slot
|
||||
* info into the node struct, in order to keep it synced */
|
||||
|
@ -3900,9 +4182,7 @@ static int clusterManagerFixOpenSlot(int slot) {
|
|||
/* Make sure this information will propagate. Not strictly needed
|
||||
* since there is no past owner, so all the other nodes will accept
|
||||
* whatever epoch this node will claim the slot with. */
|
||||
reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER BUMPEPOCH");
|
||||
success = clusterManagerCheckRedisReply(owner, reply, NULL);
|
||||
if (reply) freeReplyObject(reply);
|
||||
success = clusterManagerBumpEpoch(owner);
|
||||
if (!success) goto cleanup;
|
||||
/* Remove the owner from the list of migrating/importing
|
||||
* nodes. */
|
||||
|
@ -3922,16 +4202,10 @@ static int clusterManagerFixOpenSlot(int slot) {
|
|||
* the owner has been set in the previous condition (owner == NULL). */
|
||||
assert(owner != NULL);
|
||||
listRewind(owners, &li);
|
||||
redisReply *reply = NULL;
|
||||
while ((ln = listNext(&li)) != NULL) {
|
||||
clusterManagerNode *n = ln->value;
|
||||
if (n == owner) continue;
|
||||
reply = CLUSTER_MANAGER_COMMAND(n, "CLUSTER DELSLOTS %d", slot);
|
||||
success = clusterManagerCheckRedisReply(n, reply, NULL);
|
||||
/* Ignore "already unassigned" error. */
|
||||
if (!success && reply && reply->type == REDIS_REPLY_ERROR &&
|
||||
strstr(reply->str, "already unassigned") != NULL) success = 1;
|
||||
if (reply) freeReplyObject(reply);
|
||||
success = clusterManagerDelSlot(n, slot, 1);
|
||||
if (!success) goto cleanup;
|
||||
n->slots[slot] = 0;
|
||||
/* Assign the slot to the owner in the node 'n' configuration.' */
|
||||
|
@ -3955,6 +4229,7 @@ static int clusterManagerFixOpenSlot(int slot) {
|
|||
clusterManagerLogInfo(">>> Case 1: Moving slot %d from "
|
||||
"%s:%d to %s:%d\n", slot,
|
||||
src->ip, src->port, dst->ip, dst->port);
|
||||
move_opts |= CLUSTER_MANAGER_OPT_UPDATE;
|
||||
success = clusterManagerMoveSlot(src, dst, slot, move_opts, NULL);
|
||||
}
|
||||
/* Case 2: There are multiple nodes that claim the slot as importing,
|
||||
|
@ -3973,11 +4248,7 @@ static int clusterManagerFixOpenSlot(int slot) {
|
|||
if (!success) goto cleanup;
|
||||
clusterManagerLogInfo(">>> Setting %d as STABLE in "
|
||||
"%s:%d\n", slot, n->ip, n->port);
|
||||
|
||||
redisReply *r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER SETSLOT %d %s",
|
||||
slot, "STABLE");
|
||||
success = clusterManagerCheckRedisReply(n, r, NULL);
|
||||
if (r) freeReplyObject(r);
|
||||
success = clusterManagerClearSlotStatus(n, slot);
|
||||
if (!success) goto cleanup;
|
||||
}
|
||||
/* Since the slot has been moved in "cold" mode, ensure that all the
|
||||
|
@ -3987,12 +4258,76 @@ static int clusterManagerFixOpenSlot(int slot) {
|
|||
clusterManagerNode *n = ln->value;
|
||||
if (n == owner) continue;
|
||||
if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue;
|
||||
redisReply *r = CLUSTER_MANAGER_COMMAND(n,
|
||||
"CLUSTER SETSLOT %d %s %s", slot, "NODE", owner->name);
|
||||
success = clusterManagerCheckRedisReply(n, r, NULL);
|
||||
if (r) freeReplyObject(r);
|
||||
success = clusterManagerSetSlot(n, owner, slot, "NODE", NULL);
|
||||
if (!success) goto cleanup;
|
||||
}
|
||||
}
|
||||
/* Case 3: The slot is in migrating state in one node but multiple
|
||||
* other nodes claim to be in importing state and don't have any key in
|
||||
* the slot. We search for the importing node having the same ID as
|
||||
* the destination node of the migrating node.
|
||||
* In that case we move the slot from the migrating node to this node and
|
||||
* we close the importing states on all the other importing nodes.
|
||||
* If no importing node has the same ID as the destination node of the
|
||||
* migrating node, the slot's state is closed on both the migrating node
|
||||
* and the importing nodes. */
|
||||
else if (listLength(migrating) == 1 && listLength(importing) > 1) {
|
||||
int try_to_fix = 1;
|
||||
clusterManagerNode *src = listFirst(migrating)->value;
|
||||
clusterManagerNode *dst = NULL;
|
||||
sds target_id = NULL;
|
||||
for (int i = 0; i < src->migrating_count; i += 2) {
|
||||
sds migrating_slot = src->migrating[i];
|
||||
if (atoi(migrating_slot) == slot) {
|
||||
target_id = src->migrating[i + 1];
|
||||
break;
|
||||
}
|
||||
}
|
||||
assert(target_id != NULL);
|
||||
listIter li;
|
||||
listNode *ln;
|
||||
listRewind(importing, &li);
|
||||
while ((ln = listNext(&li)) != NULL) {
|
||||
clusterManagerNode *n = ln->value;
|
||||
int count = clusterManagerCountKeysInSlot(n, slot);
|
||||
if (count > 0) {
|
||||
try_to_fix = 0;
|
||||
break;
|
||||
}
|
||||
if (strcmp(n->name, target_id) == 0) dst = n;
|
||||
}
|
||||
if (!try_to_fix) goto unhandled_case;
|
||||
if (dst != NULL) {
|
||||
clusterManagerLogInfo(">>> Case 3: Moving slot %d from %s:%d to "
|
||||
"%s:%d and closing it on all the other "
|
||||
"importing nodes.\n",
|
||||
slot, src->ip, src->port,
|
||||
dst->ip, dst->port);
|
||||
/* Move the slot to the destination node. */
|
||||
success = clusterManagerMoveSlot(src, dst, slot, move_opts, NULL);
|
||||
if (!success) goto cleanup;
|
||||
/* Close slot on all the other importing nodes. */
|
||||
listRewind(importing, &li);
|
||||
while ((ln = listNext(&li)) != NULL) {
|
||||
clusterManagerNode *n = ln->value;
|
||||
if (dst == n) continue;
|
||||
success = clusterManagerClearSlotStatus(n, slot);
|
||||
if (!success) goto cleanup;
|
||||
}
|
||||
} else {
|
||||
clusterManagerLogInfo(">>> Case 3: Closing slot %d on both "
|
||||
"migrating and importing nodes.\n", slot);
|
||||
/* Close the slot on both the migrating node and the importing
|
||||
* nodes. */
|
||||
success = clusterManagerClearSlotStatus(src, slot);
|
||||
if (!success) goto cleanup;
|
||||
listRewind(importing, &li);
|
||||
while ((ln = listNext(&li)) != NULL) {
|
||||
clusterManagerNode *n = ln->value;
|
||||
success = clusterManagerClearSlotStatus(n, slot);
|
||||
if (!success) goto cleanup;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
int try_to_close_slot = (listLength(importing) == 0 &&
|
||||
listLength(migrating) == 1);
|
||||
|
@ -4009,13 +4344,13 @@ static int clusterManagerFixOpenSlot(int slot) {
|
|||
if (!success) goto cleanup;
|
||||
}
|
||||
}
|
||||
/* Case 3: There are no slots claiming to be in importing state, but
|
||||
* there is a migrating node that actually don't have any key or is the
|
||||
* slot owner. We can just close the slot, probably a reshard interrupted
|
||||
* in the middle. */
|
||||
/* Case 4: There are no slots claiming to be in importing state, but
|
||||
* there is a migrating node that actually don't have any key or is the
|
||||
* slot owner. We can just close the slot, probably a reshard
|
||||
* interrupted in the middle. */
|
||||
if (try_to_close_slot) {
|
||||
clusterManagerNode *n = listFirst(migrating)->value;
|
||||
clusterManagerLogInfo(">>> Case 3: Closing slot %d on %s:%d\n",
|
||||
clusterManagerLogInfo(">>> Case 4: Closing slot %d on %s:%d\n",
|
||||
slot, n->ip, n->port);
|
||||
redisReply *r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER SETSLOT %d %s",
|
||||
slot, "STABLE");
|
||||
|
@ -4023,6 +4358,7 @@ static int clusterManagerFixOpenSlot(int slot) {
|
|||
if (r) freeReplyObject(r);
|
||||
if (!success) goto cleanup;
|
||||
} else {
|
||||
unhandled_case:
|
||||
success = 0;
|
||||
clusterManagerLogErr("[ERR] Sorry, redis-cli can't fix this slot "
|
||||
"yet (work in progress). Slot is set as "
|
||||
|
@ -4040,17 +4376,55 @@ cleanup:
|
|||
return success;
|
||||
}
|
||||
|
||||
static int clusterManagerFixMultipleSlotOwners(int slot, list *owners) {
|
||||
clusterManagerLogInfo(">>> Fixing multiple owners for slot %d...\n", slot);
|
||||
int success = 0;
|
||||
assert(listLength(owners) > 1);
|
||||
clusterManagerNode *owner = clusterManagerGetNodeWithMostKeysInSlot(owners,
|
||||
slot,
|
||||
NULL);
|
||||
if (!owner) owner = listFirst(owners)->value;
|
||||
clusterManagerLogInfo(">>> Setting slot %d owner: %s:%d\n",
|
||||
slot, owner->ip, owner->port);
|
||||
/* Set the slot owner. */
|
||||
if (!clusterManagerSetSlotOwner(owner, slot, 0)) return 0;
|
||||
listIter li;
|
||||
listNode *ln;
|
||||
listRewind(cluster_manager.nodes, &li);
|
||||
/* Update configuration in all the other master nodes by assigning the slot
|
||||
* itself to the new owner, and by eventually migrating keys if the node
|
||||
* has keys for the slot. */
|
||||
while ((ln = listNext(&li)) != NULL) {
|
||||
clusterManagerNode *n = ln->value;
|
||||
if (n == owner) continue;
|
||||
if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue;
|
||||
int count = clusterManagerCountKeysInSlot(n, slot);
|
||||
success = (count >= 0);
|
||||
if (!success) break;
|
||||
clusterManagerDelSlot(n, slot, 1);
|
||||
if (!clusterManagerSetSlot(n, owner, slot, "node", NULL)) return 0;
|
||||
if (count > 0) {
|
||||
int opts = CLUSTER_MANAGER_OPT_VERBOSE |
|
||||
CLUSTER_MANAGER_OPT_COLD;
|
||||
success = clusterManagerMoveSlot(n, owner, slot, opts, NULL);
|
||||
if (!success) break;
|
||||
}
|
||||
}
|
||||
return success;
|
||||
}
|
||||
|
||||
static int clusterManagerCheckCluster(int quiet) {
|
||||
listNode *ln = listFirst(cluster_manager.nodes);
|
||||
if (!ln) return 0;
|
||||
int result = 1;
|
||||
int do_fix = config.cluster_manager_command.flags &
|
||||
CLUSTER_MANAGER_CMD_FLAG_FIX;
|
||||
clusterManagerNode *node = ln->value;
|
||||
clusterManagerLogInfo(">>> Performing Cluster Check (using node %s:%d)\n",
|
||||
node->ip, node->port);
|
||||
int result = 1, consistent = 0;
|
||||
int do_fix = config.cluster_manager_command.flags &
|
||||
CLUSTER_MANAGER_CMD_FLAG_FIX;
|
||||
if (!quiet) clusterManagerShowNodes();
|
||||
if (!clusterManagerIsConfigConsistent()) {
|
||||
consistent = clusterManagerIsConfigConsistent();
|
||||
if (!consistent) {
|
||||
sds err = sdsnew("[ERR] Nodes don't agree about configuration!");
|
||||
clusterManagerOnError(err);
|
||||
result = 0;
|
||||
|
@ -4058,7 +4432,7 @@ static int clusterManagerCheckCluster(int quiet) {
|
|||
clusterManagerLogOk("[OK] All nodes agree about slots "
|
||||
"configuration.\n");
|
||||
}
|
||||
// Check open slots
|
||||
/* Check open slots */
|
||||
clusterManagerLogInfo(">>> Check for open slots...\n");
|
||||
listIter li;
|
||||
listRewind(cluster_manager.nodes, &li);
|
||||
|
@ -4077,7 +4451,7 @@ static int clusterManagerCheckCluster(int quiet) {
|
|||
n->port);
|
||||
for (i = 0; i < n->migrating_count; i += 2) {
|
||||
sds slot = n->migrating[i];
|
||||
dictAdd(open_slots, slot, sdsdup(n->migrating[i + 1]));
|
||||
dictReplace(open_slots, slot, sdsdup(n->migrating[i + 1]));
|
||||
char *fmt = (i > 0 ? ",%S" : "%S");
|
||||
errstr = sdscatfmt(errstr, fmt, slot);
|
||||
}
|
||||
|
@ -4095,7 +4469,7 @@ static int clusterManagerCheckCluster(int quiet) {
|
|||
n->port);
|
||||
for (i = 0; i < n->importing_count; i += 2) {
|
||||
sds slot = n->importing[i];
|
||||
dictAdd(open_slots, slot, sdsdup(n->importing[i + 1]));
|
||||
dictReplace(open_slots, slot, sdsdup(n->importing[i + 1]));
|
||||
char *fmt = (i > 0 ? ",%S" : "%S");
|
||||
errstr = sdscatfmt(errstr, fmt, slot);
|
||||
}
|
||||
|
@ -4117,7 +4491,7 @@ static int clusterManagerCheckCluster(int quiet) {
|
|||
clusterManagerLogErr("%s.\n", (char *) errstr);
|
||||
sdsfree(errstr);
|
||||
if (do_fix) {
|
||||
// Fix open slots.
|
||||
/* Fix open slots. */
|
||||
dictReleaseIterator(iter);
|
||||
iter = dictGetIterator(open_slots);
|
||||
while ((entry = dictNext(iter)) != NULL) {
|
||||
|
@ -4152,6 +4526,54 @@ static int clusterManagerCheckCluster(int quiet) {
|
|||
if (fixed > 0) result = 1;
|
||||
}
|
||||
}
|
||||
int search_multiple_owners = config.cluster_manager_command.flags &
|
||||
CLUSTER_MANAGER_CMD_FLAG_CHECK_OWNERS;
|
||||
if (search_multiple_owners) {
|
||||
/* Check whether there are multiple owners, even when slots are
|
||||
* fully covered and there are no open slots. */
|
||||
clusterManagerLogInfo(">>> Check for multiple slot owners...\n");
|
||||
int slot = 0, slots_with_multiple_owners = 0;
|
||||
for (; slot < CLUSTER_MANAGER_SLOTS; slot++) {
|
||||
listIter li;
|
||||
listNode *ln;
|
||||
listRewind(cluster_manager.nodes, &li);
|
||||
list *owners = listCreate();
|
||||
while ((ln = listNext(&li)) != NULL) {
|
||||
clusterManagerNode *n = ln->value;
|
||||
if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue;
|
||||
if (n->slots[slot]) listAddNodeTail(owners, n);
|
||||
else {
|
||||
/* Nodes having keys for the slot will be considered
|
||||
* owners too. */
|
||||
int count = clusterManagerCountKeysInSlot(n, slot);
|
||||
if (count > 0) listAddNodeTail(owners, n);
|
||||
}
|
||||
}
|
||||
if (listLength(owners) > 1) {
|
||||
result = 0;
|
||||
clusterManagerLogErr("[WARNING] Slot %d has %d owners:\n",
|
||||
slot, listLength(owners));
|
||||
listRewind(owners, &li);
|
||||
while ((ln = listNext(&li)) != NULL) {
|
||||
clusterManagerNode *n = ln->value;
|
||||
clusterManagerLogErr(" %s:%d\n", n->ip, n->port);
|
||||
}
|
||||
slots_with_multiple_owners++;
|
||||
if (do_fix) {
|
||||
result = clusterManagerFixMultipleSlotOwners(slot, owners);
|
||||
if (!result) {
|
||||
clusterManagerLogErr("Failed to fix multiple owners "
|
||||
"for slot %d\n", slot);
|
||||
listRelease(owners);
|
||||
break;
|
||||
} else slots_with_multiple_owners--;
|
||||
}
|
||||
}
|
||||
listRelease(owners);
|
||||
}
|
||||
if (slots_with_multiple_owners == 0)
|
||||
clusterManagerLogOk("[OK] No multiple owners found.\n");
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
|
@ -5404,7 +5826,7 @@ static int clusterManagerCommandCall(int argc, char **argv) {
|
|||
if (status != REDIS_OK || reply == NULL )
|
||||
printf("%s:%d: Failed!\n", n->ip, n->port);
|
||||
else {
|
||||
sds formatted_reply = cliFormatReplyTTY(reply, "");
|
||||
sds formatted_reply = cliFormatReplyRaw(reply);
|
||||
printf("%s:%d: %s\n", n->ip, n->port, (char *) formatted_reply);
|
||||
sdsfree(formatted_reply);
|
||||
}
|
||||
|
@ -6781,6 +7203,8 @@ int main(int argc, char **argv) {
|
|||
argc -= firstarg;
|
||||
argv += firstarg;
|
||||
|
||||
parseEnv();
|
||||
|
||||
/* Cluster Manager mode */
|
||||
if (CLUSTER_MANAGER_MODE()) {
|
||||
clusterManagerCommandProc *proc = validateClusterManagerCommand();
|
||||
|
|
|
@ -695,7 +695,7 @@ sds sdscatfmt(sds s, char const *fmt, ...) {
|
|||
* s = sdstrim(s,"Aa. :");
|
||||
* printf("%s\n", s);
|
||||
*
|
||||
* Output will be just "Hello World".
|
||||
* Output will be just "HelloWorld".
|
||||
*/
|
||||
sds sdstrim(sds s, const char *cset) {
|
||||
char *start, *end, *sp, *ep;
|
||||
|
|
16
src/server.c
16
src/server.c
|
@ -2607,17 +2607,13 @@ int processCommand(client *c) {
|
|||
}
|
||||
|
||||
/* Handle the maxmemory directive.
|
||||
*
|
||||
* First we try to free some memory if possible (if there are volatile
|
||||
* keys in the dataset). If there are not the only thing we can do
|
||||
* is returning an error.
|
||||
*
|
||||
* Note that we do not want to reclaim memory if we are here re-entering
|
||||
* the event loop since there is a busy Lua script running in timeout
|
||||
* condition, to avoid mixing the propagation of scripts with the propagation
|
||||
* of DELs due to eviction. */
|
||||
* condition, to avoid mixing the propagation of scripts with the
|
||||
* propagation of DELs due to eviction. */
|
||||
if (server.maxmemory && !server.lua_timedout) {
|
||||
int out_of_memory = freeMemoryIfNeeded() == C_ERR;
|
||||
int out_of_memory = freeMemoryIfNeededAndSafe() == C_ERR;
|
||||
/* freeMemoryIfNeeded may flush slave output buffers. This may result
|
||||
* into a slave, that may be the active client, to be freed. */
|
||||
if (server.current_client == NULL) return C_ERR;
|
||||
|
@ -3247,11 +3243,11 @@ sds genRedisInfoString(char *section) {
|
|||
"allocator_frag_ratio:%.2f\r\n"
|
||||
"allocator_frag_bytes:%zu\r\n"
|
||||
"allocator_rss_ratio:%.2f\r\n"
|
||||
"allocator_rss_bytes:%zu\r\n"
|
||||
"allocator_rss_bytes:%zd\r\n"
|
||||
"rss_overhead_ratio:%.2f\r\n"
|
||||
"rss_overhead_bytes:%zu\r\n"
|
||||
"rss_overhead_bytes:%zd\r\n"
|
||||
"mem_fragmentation_ratio:%.2f\r\n"
|
||||
"mem_fragmentation_bytes:%zu\r\n"
|
||||
"mem_fragmentation_bytes:%zd\r\n"
|
||||
"mem_not_counted_for_evict:%zu\r\n"
|
||||
"mem_replication_backlog:%zu\r\n"
|
||||
"mem_clients_slaves:%zu\r\n"
|
||||
|
|
10
src/server.h
10
src/server.h
|
@ -654,6 +654,9 @@ typedef struct multiCmd {
|
|||
typedef struct multiState {
|
||||
multiCmd *commands; /* Array of MULTI commands */
|
||||
int count; /* Total number of MULTI commands */
|
||||
int cmd_flags; /* The accumulated command flags OR-ed together.
|
||||
So if at least a command has a given flag, it
|
||||
will be set in this field. */
|
||||
int minreplicas; /* MINREPLICAS for synchronous replication */
|
||||
time_t minreplicas_timeout; /* MINREPLICAS timeout as unixtime. */
|
||||
} multiState;
|
||||
|
@ -864,11 +867,11 @@ struct redisMemOverhead {
|
|||
float dataset_perc;
|
||||
float peak_perc;
|
||||
float total_frag;
|
||||
size_t total_frag_bytes;
|
||||
ssize_t total_frag_bytes;
|
||||
float allocator_frag;
|
||||
size_t allocator_frag_bytes;
|
||||
ssize_t allocator_frag_bytes;
|
||||
float allocator_rss;
|
||||
size_t allocator_rss_bytes;
|
||||
ssize_t allocator_rss_bytes;
|
||||
float rss_extra;
|
||||
size_t rss_extra_bytes;
|
||||
size_t num_dbs;
|
||||
|
@ -1699,6 +1702,7 @@ int zslLexValueLteMax(sds value, zlexrangespec *spec);
|
|||
int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *level);
|
||||
size_t freeMemoryGetNotCountedMemory();
|
||||
int freeMemoryIfNeeded(void);
|
||||
int freeMemoryIfNeededAndSafe(void);
|
||||
int processCommand(client *c);
|
||||
void setupSignalHandlers(void);
|
||||
struct redisCommand *lookupCommand(sds name);
|
||||
|
|
18
src/util.c
18
src/util.c
|
@ -48,7 +48,7 @@
|
|||
int stringmatchlen(const char *pattern, int patternLen,
|
||||
const char *string, int stringLen, int nocase)
|
||||
{
|
||||
while(patternLen) {
|
||||
while(patternLen && stringLen) {
|
||||
switch(pattern[0]) {
|
||||
case '*':
|
||||
while (pattern[1] == '*') {
|
||||
|
@ -171,6 +171,22 @@ int stringmatch(const char *pattern, const char *string, int nocase) {
|
|||
return stringmatchlen(pattern,strlen(pattern),string,strlen(string),nocase);
|
||||
}
|
||||
|
||||
/* Fuzz stringmatchlen() trying to crash it with bad input. */
|
||||
int stringmatchlen_fuzz_test(void) {
|
||||
char str[32];
|
||||
char pat[32];
|
||||
int cycles = 10000000;
|
||||
int total_matches = 0;
|
||||
while(cycles--) {
|
||||
int strlen = rand() % sizeof(str);
|
||||
int patlen = rand() % sizeof(pat);
|
||||
for (int j = 0; j < strlen; j++) str[j] = rand() % 128;
|
||||
for (int j = 0; j < patlen; j++) pat[j] = rand() % 128;
|
||||
total_matches += stringmatchlen(pat, patlen, str, strlen, 0);
|
||||
}
|
||||
return total_matches;
|
||||
}
|
||||
|
||||
/* Convert a string representing an amount of memory into the number of
|
||||
* bytes, so for instance memtoll("1Gb") will return 1073741824 that is
|
||||
* (1024*1024*1024).
|
||||
|
|
|
@ -40,6 +40,7 @@
|
|||
|
||||
int stringmatchlen(const char *p, int plen, const char *s, int slen, int nocase);
|
||||
int stringmatch(const char *p, const char *s, int nocase);
|
||||
int stringmatchlen_fuzz_test(void);
|
||||
long long memtoll(const char *p, int *err);
|
||||
uint32_t digits10(uint64_t v);
|
||||
uint32_t sdigits10(int64_t v);
|
||||
|
|
|
@ -275,7 +275,6 @@ start_server {tags {"repl"}} {
|
|||
start_server {} {
|
||||
test "Master stream is correctly processed while the replica has a script in -BUSY state" {
|
||||
set slave [srv 0 client]
|
||||
puts [srv 0 port]
|
||||
$slave config set lua-time-limit 500
|
||||
$slave slaveof $master_host $master_port
|
||||
|
||||
|
|
Loading…
Reference in New Issue