Merge branch 'unstable' of github.com:/antirez/redis into unstable

This commit is contained in:
antirez 2018-12-21 11:39:15 +01:00
commit e504583b78
1 changed files with 230 additions and 28 deletions

View File

@ -1933,7 +1933,8 @@ static dictType clusterManagerDictType = {
};
typedef int clusterManagerCommandProc(int argc, char **argv);
typedef int (*clusterManagerOnReplyError)(redisReply *reply, int bulk_idx);
typedef int (*clusterManagerOnReplyError)(redisReply *reply,
clusterManagerNode *n, int bulk_idx);
/* Cluster Manager helper functions */
@ -2196,7 +2197,7 @@ static int clusterManagerCheckRedisReply(clusterManagerNode *n,
return 1;
}
/* Execute MULTI command on a cluster node. */
/* Call MULTI command on a cluster node. */
static int clusterManagerStartTransaction(clusterManagerNode *node) {
redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "MULTI");
int success = clusterManagerCheckRedisReply(node, reply, NULL);
@ -2204,7 +2205,7 @@ static int clusterManagerStartTransaction(clusterManagerNode *node) {
return success;
}
/* Execute EXEC command on a cluster node. */
/* Call EXEC command on a cluster node. */
static int clusterManagerExecTransaction(clusterManagerNode *node,
clusterManagerOnReplyError onerror)
{
@ -2220,7 +2221,7 @@ static int clusterManagerExecTransaction(clusterManagerNode *node,
redisReply *r = reply->element[i];
char *err = NULL;
success = clusterManagerCheckRedisReply(node, r, &err);
if (!success && onerror) success = onerror(r, i);
if (!success && onerror) success = onerror(r, node, i);
if (err) {
if (!success)
CLUSTER_MANAGER_PRINT_REPLY_ERROR(node, err);
@ -2768,6 +2769,55 @@ cleanup:
return success;
}
/* Return the node that owns the specified slot from the point of view of
 * node *n, which is queried with the CLUSTER SLOTS command.
 * NULL is returned when the slot is unassigned, when the owner reported by
 * *n cannot be resolved to a node, or when the reply is an error.
 * The **err argument is forwarded to clusterManagerCheckRedisReply: use it
 * in order to check whether a NULL result is due to a reply error rather
 * than to an unassigned slot. */
static clusterManagerNode *clusterManagerGetSlotOwner(clusterManagerNode *n,
                                                      int slot, char **err)
{
    assert(slot >= 0 && slot < CLUSTER_MANAGER_SLOTS);
    clusterManagerNode *found = NULL;
    redisReply *reply = CLUSTER_MANAGER_COMMAND(n, "CLUSTER SLOTS");
    if (!clusterManagerCheckRedisReply(n, reply, err)) {
        if (reply) freeReplyObject(reply);
        return NULL;
    }
    assert(reply->type == REDIS_REPLY_ARRAY);
    /* Every entry is read as: [first slot, last slot, node info, ...].
     * Stop scanning as soon as an owner has been resolved. */
    size_t j = 0;
    while (found == NULL && j < reply->elements) {
        redisReply *r = reply->element[j++];
        assert(r->type == REDIS_REPLY_ARRAY && r->elements >= 3);
        int first = r->element[0]->integer;
        int last = r->element[1]->integer;
        if (slot < first || slot > last) continue;
        /* The node info is read as: [ip, port, (optional) name]. */
        redisReply *nr = r->element[2];
        assert(nr->type == REDIS_REPLY_ARRAY && nr->elements >= 2);
        char *node_name = (nr->elements >= 3 ? nr->element[2]->str : NULL);
        if (node_name != NULL) {
            found = clusterManagerNodeByName(node_name);
            continue;
        }
        /* No name available: look for a node having the same address
         * among the nodes in cluster_manager.nodes. */
        char *ip = nr->element[0]->str;
        assert(ip != NULL);
        int port = (int) nr->element[1]->integer;
        listIter li;
        listNode *ln;
        listRewind(cluster_manager.nodes, &li);
        while (found == NULL && (ln = listNext(&li)) != NULL) {
            clusterManagerNode *candidate = ln->value;
            if (strcmp(candidate->ip, ip) == 0 && port == candidate->port)
                found = candidate;
        }
    }
    if (reply) freeReplyObject(reply);
    return found;
}
/* Set slot status to "importing" or "migrating" */
static int clusterManagerSetSlot(clusterManagerNode *node1,
clusterManagerNode *node2,
@ -2808,8 +2858,19 @@ static int clusterManagerDelSlot(clusterManagerNode *node, int slot,
char *err = NULL;
int success = clusterManagerCheckRedisReply(node, reply, &err);
if (!success && reply && reply->type == REDIS_REPLY_ERROR &&
ignore_unassigned_err &&
strstr(reply->str, "already unassigned") != NULL) success = 1;
ignore_unassigned_err)
{
char *get_owner_err = NULL;
clusterManagerNode *assigned_to =
clusterManagerGetSlotOwner(node, slot, &get_owner_err);
if (!assigned_to) {
if (get_owner_err == NULL) success = 1;
else {
CLUSTER_MANAGER_PRINT_REPLY_ERROR(node, get_owner_err);
zfree(get_owner_err);
}
}
}
if (!success && err != NULL) {
CLUSTER_MANAGER_PRINT_REPLY_ERROR(node, err);
zfree(err);
@ -2845,12 +2906,16 @@ static int clusterManagerBumpEpoch(clusterManagerNode *node) {
return success;
}
static int clusterManagerIgnoreUnassignedErr(redisReply *reply, int bulk_idx) {
if (bulk_idx == 0 && reply) {
if (reply->type == REDIS_REPLY_ERROR)
return strstr(reply->str, "already unassigned") != NULL;
}
return 0;
/* Error callback handed to clusterManagerExecTransaction by
 * clusterManagerSetSlotOwner. Every error in the transaction is ignored,
 * except for the one at index 1 of the transaction replies (ADDSLOTS).
 * Return 1 if the error should be ignored, 0 otherwise. */
static int clusterManagerOnSetOwnerErr(redisReply *reply,
                                       clusterManagerNode *n, int bulk_idx)
{
    UNUSED(reply);
    UNUSED(n);
    /* bulk_idx == 1 is the ADDSLOTS reply: the only failure that must
     * be raised. */
    if (bulk_idx == 1) return 0;
    return 1;
}
static int clusterManagerSetSlotOwner(clusterManagerNode *owner,
@ -2865,8 +2930,71 @@ static int clusterManagerSetSlotOwner(clusterManagerNode *owner,
clusterManagerAddSlot(owner, slot);
if (do_clear) clusterManagerClearSlotStatus(owner, slot);
clusterManagerBumpEpoch(owner);
success = clusterManagerExecTransaction(owner,
clusterManagerIgnoreUnassignedErr);
success = clusterManagerExecTransaction(owner, clusterManagerOnSetOwnerErr);
return success;
}
/* Compare, between nodes *n1 and *n2, the values of the keys listed in
 * *keys_reply. The same DEBUG DIGEST-VALUE command (one argument per key)
 * is sent to both nodes and the returned hashes are compared pairwise.
 * A key is appended to the *diffs list when neither node returned the
 * all-zero hash for it and the two hashes differ. The pointers added to
 * *diffs are the key strings of *keys_reply itself: no copy is made here.
 * Return 1 on success, 0 if the reply of one of the nodes could not be
 * read or is an error. */
static int clusterManagerCompareKeysValues(clusterManagerNode *n1,
                                          clusterManagerNode *n2,
                                          redisReply *keys_reply,
                                          list *diffs)
{
    static const char *hash_zero = "0000000000000000000000000000000000000000";
    size_t k, argc = keys_reply->elements + 2;
    char **argv = zcalloc(argc * sizeof(char *));
    size_t *argv_len = zcalloc(argc * sizeof(size_t));
    void *raw1 = NULL, *raw2 = NULL;
    redisReply *r1 = NULL, *r2 = NULL;
    int success = 0;

    /* Build: DEBUG DIGEST-VALUE key1 key2 ... keyN */
    argv[0] = "DEBUG";
    argv_len[0] = 5;
    argv[1] = "DIGEST-VALUE";
    argv_len[1] = 12;
    for (k = 2; k < argc; k++) {
        redisReply *entry = keys_reply->element[k - 2];
        argv[k] = entry->str;
        argv_len[k] = entry->len;
    }

    /* Query the first node, then the second one, with the same command.
     * If a reply cannot be read, give up with success still set to 0. */
    redisAppendCommandArgv(n1->context, argc, (const char **) argv, argv_len);
    if (redisGetReply(n1->context, &raw1) != REDIS_OK) goto cleanup;
    r1 = (redisReply *) raw1;
    redisAppendCommandArgv(n2->context, argc, (const char **) argv, argv_len);
    if (redisGetReply(n2->context, &raw2) != REDIS_OK) goto cleanup;
    r2 = (redisReply *) raw2;

    /* Report the error of each node, if any, before giving up. */
    success = 1;
    if (r1->type == REDIS_REPLY_ERROR) {
        CLUSTER_MANAGER_PRINT_REPLY_ERROR(n1, r1->str);
        success = 0;
    }
    if (r2->type == REDIS_REPLY_ERROR) {
        CLUSTER_MANAGER_PRINT_REPLY_ERROR(n2, r2->str);
        success = 0;
    }
    if (!success) goto cleanup;

    assert(keys_reply->elements == r1->elements &&
           keys_reply->elements == r2->elements);
    for (k = 0; k < keys_reply->elements; k++) {
        char *key = keys_reply->element[k]->str;
        char *digest1 = r1->element[k]->str;
        char *digest2 = r2->element[k]->str;
        /* Ignore keys that don't exist in both nodes. */
        int missing = (strcmp(digest1, hash_zero) == 0 ||
                       strcmp(digest2, hash_zero) == 0);
        if (missing) continue;
        if (strcmp(digest1, digest2) != 0) listAddNodeTail(diffs, key);
    }

cleanup:
    if (r1) freeReplyObject(r1);
    if (r2) freeReplyObject(r2);
    zfree(argv);
    zfree(argv_len);
    return success;
}
@ -2950,8 +3078,10 @@ static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source,
char **err)
{
int success = 1;
int replace_existing_keys = (config.cluster_manager_command.flags &
(CLUSTER_MANAGER_CMD_FLAG_FIX | CLUSTER_MANAGER_CMD_FLAG_REPLACE));
int do_fix = config.cluster_manager_command.flags &
CLUSTER_MANAGER_CMD_FLAG_FIX;
int do_replace = config.cluster_manager_command.flags &
CLUSTER_MANAGER_CMD_FLAG_REPLACE;
while (1) {
char *dots = NULL;
redisReply *reply = NULL, *migrate_reply = NULL;
@ -2983,16 +3113,85 @@ static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source,
if (migrate_reply == NULL) goto next;
if (migrate_reply->type == REDIS_REPLY_ERROR) {
int is_busy = strstr(migrate_reply->str, "BUSYKEY") != NULL;
int not_served = strstr(migrate_reply->str, "slot not served") != NULL;
if (replace_existing_keys && (is_busy || not_served)) {
/* If the key already exists, try to migrate keys
* adding REPLACE option.
* If the key's slot is not served, try to assign slot
int not_served = 0;
if (!is_busy) {
/* Check if the slot is unassigned (not served) in the
* source node's configuration. */
char *get_owner_err = NULL;
clusterManagerNode *served_by =
clusterManagerGetSlotOwner(source, slot, &get_owner_err);
if (!served_by) {
if (get_owner_err == NULL) not_served = 1;
else {
CLUSTER_MANAGER_PRINT_REPLY_ERROR(source,
get_owner_err);
zfree(get_owner_err);
}
}
}
/* Try to handle errors. */
if (is_busy || not_served) {
/* If the key's slot is not served, try to assign slot
* to the target node. */
if (not_served)
if (do_fix && not_served) {
clusterManagerLogWarn("*** Slot was not served, setting "
"owner to node %s:%d.\n",
target->ip, target->port);
clusterManagerSetSlot(source, target, slot, "node", NULL);
clusterManagerLogWarn("*** Target key exists. "
"Replacing it for FIX.\n");
}
/* If the key already exists in the target node (BUSYKEY),
* check whether its value is the same in both nodes.
* In case of equal values, retry migration with the
* REPLACE option.
* In case of different values:
* - If the migration is requested by the fix command, stop
* and warn the user.
* - In other cases (i.e. reshard), proceed only if the user
* launched the command with the --cluster-replace option.*/
if (is_busy) {
clusterManagerLogWarn("\n*** Target key exists\n");
if (!do_replace) {
clusterManagerLogWarn("*** Checking key values on "
"both nodes...\n");
list *diffs = listCreate();
success = clusterManagerCompareKeysValues(source,
target, reply, diffs);
if (!success) {
clusterManagerLogErr("*** Value check failed!\n");
listRelease(diffs);
goto next;
}
if (listLength(diffs) > 0) {
success = 0;
clusterManagerLogErr(
"*** Found %d key(s) in both source node and "
"target node having different values.\n"
" Source node: %s:%d\n"
" Target node: %s:%d\n"
" Keys(s):\n",
listLength(diffs),
source->ip, source->port,
target->ip, target->port);
listIter dli;
listNode *dln;
listRewind(diffs, &dli);
while((dln = listNext(&dli)) != NULL) {
char *k = dln->value;
clusterManagerLogErr(" - %s\n", k);
}
clusterManagerLogErr("Please fix the above key(s) "
"manually and try again "
"or relaunch the command \n"
"with --cluster-replace "
"option to force key "
"overriding.\n");
listRelease(diffs);
goto next;
}
listRelease(diffs);
}
clusterManagerLogWarn("*** Replacing target keys...\n");
}
freeReplyObject(migrate_reply);
migrate_reply = clusterManagerMigrateKeysInReply(source,
target,
@ -4252,7 +4451,7 @@ static int clusterManagerCheckCluster(int quiet) {
n->port);
for (i = 0; i < n->migrating_count; i += 2) {
sds slot = n->migrating[i];
dictAdd(open_slots, slot, sdsdup(n->migrating[i + 1]));
dictReplace(open_slots, slot, sdsdup(n->migrating[i + 1]));
char *fmt = (i > 0 ? ",%S" : "%S");
errstr = sdscatfmt(errstr, fmt, slot);
}
@ -4270,7 +4469,7 @@ static int clusterManagerCheckCluster(int quiet) {
n->port);
for (i = 0; i < n->importing_count; i += 2) {
sds slot = n->importing[i];
dictAdd(open_slots, slot, sdsdup(n->importing[i + 1]));
dictReplace(open_slots, slot, sdsdup(n->importing[i + 1]));
char *fmt = (i > 0 ? ",%S" : "%S");
errstr = sdscatfmt(errstr, fmt, slot);
}
@ -4333,7 +4532,7 @@ static int clusterManagerCheckCluster(int quiet) {
/* Check whether there are multiple owners, even when slots are
* fully covered and there are no open slots. */
clusterManagerLogInfo(">>> Check for multiple slot owners...\n");
int slot = 0;
int slot = 0, slots_with_multiple_owners = 0;
for (; slot < CLUSTER_MANAGER_SLOTS; slot++) {
listIter li;
listNode *ln;
@ -4359,6 +4558,7 @@ static int clusterManagerCheckCluster(int quiet) {
clusterManagerNode *n = ln->value;
clusterManagerLogErr(" %s:%d\n", n->ip, n->port);
}
slots_with_multiple_owners++;
if (do_fix) {
result = clusterManagerFixMultipleSlotOwners(slot, owners);
if (!result) {
@ -4366,11 +4566,13 @@ static int clusterManagerCheckCluster(int quiet) {
"for slot %d\n", slot);
listRelease(owners);
break;
}
} else slots_with_multiple_owners--;
}
}
listRelease(owners);
}
if (slots_with_multiple_owners == 0)
clusterManagerLogOk("[OK] No multiple owners found.\n");
}
return result;
}