Merge branch 'unstable' of https://github.com/antirez/redis into unstable

This commit is contained in:
杨东衡 2018-12-04 23:56:11 +08:00
commit 423a030ee3
21 changed files with 607 additions and 117 deletions

View File

@ -21,6 +21,11 @@ NODEPS:=clean distclean
# Default settings
STD=-std=c99 -pedantic -DREDIS_STATIC=''
ifneq (,$(findstring clang,$(CC)))
ifneq (,$(findstring FreeBSD,$(uname_S)))
STD+=-Wno-c11-extensions
endif
endif
WARN=-Wall -W -Wno-missing-field-initializers
OPT=$(OPTIMIZATION)
@ -97,10 +102,20 @@ else
ifeq ($(uname_S),OpenBSD)
# OpenBSD
FINAL_LIBS+= -lpthread
ifeq ($(USE_BACKTRACE),yes)
FINAL_CFLAGS+= -DUSE_BACKTRACE -I/usr/local/include
FINAL_LDFLAGS+= -L/usr/local/lib
FINAL_LIBS+= -lexecinfo
endif
else
ifeq ($(uname_S),FreeBSD)
# FreeBSD
FINAL_LIBS+= -lpthread
FINAL_LIBS+= -lpthread -lexecinfo
else
ifeq ($(uname_S),DragonFly)
# FreeBSD
FINAL_LIBS+= -lpthread -lexecinfo
else
# All the other OSes (notably Linux)
FINAL_LDFLAGS+= -rdynamic
@ -110,6 +125,7 @@ endif
endif
endif
endif
endif
# Include paths to dependencies
FINAL_CFLAGS+= -I../deps/hiredis -I../deps/linenoise -I../deps/lua/src

View File

@ -684,7 +684,7 @@ void loadServerConfigFromString(char *config) {
goto loaderr;
}
} else if ((!strcasecmp(argv[0],"cluster-slave-no-failover") ||
!strcasecmp(argv[0],"cluster-replica-no-failiver")) &&
!strcasecmp(argv[0],"cluster-replica-no-failover")) &&
argc == 2)
{
server.cluster_slave_no_failover = yesnotoi(argv[1]);

View File

@ -62,7 +62,9 @@
#endif
/* Test for backtrace() */
#if defined(__APPLE__) || (defined(__linux__) && defined(__GLIBC__))
#if defined(__APPLE__) || (defined(__linux__) && defined(__GLIBC__)) || \
defined(__FreeBSD__) || (defined(__OpenBSD__) && defined(USE_BACKTRACE))\
|| defined(__DragonFly__)
#define HAVE_BACKTRACE 1
#endif

View File

@ -37,7 +37,11 @@
#ifdef HAVE_BACKTRACE
#include <execinfo.h>
#ifndef __OpenBSD__
#include <ucontext.h>
#else
typedef ucontext_t sigcontext_t;
#endif
#include <fcntl.h>
#include "bio.h"
#include <unistd.h>
@ -729,6 +733,22 @@ static void *getMcontextEip(ucontext_t *uc) {
#elif defined(__aarch64__) /* Linux AArch64 */
return (void*) uc->uc_mcontext.pc;
#endif
#elif defined(__FreeBSD__)
/* FreeBSD */
#if defined(__i386__)
return (void*) uc->uc_mcontext.mc_eip;
#elif defined(__x86_64__)
return (void*) uc->uc_mcontext.mc_rip;
#endif
#elif defined(__OpenBSD__)
/* OpenBSD */
#if defined(__i386__)
return (void*) uc->sc_eip;
#elif defined(__x86_64__)
return (void*) uc->sc_rip;
#endif
#elif defined(__DragonFly__)
return (void*) uc->uc_mcontext.mc_rip;
#else
return NULL;
#endif
@ -870,6 +890,145 @@ void logRegisters(ucontext_t *uc) {
);
logStackContent((void**)uc->uc_mcontext.gregs[15]);
#endif
#elif defined(__FreeBSD__)
#if defined(__x86_64__)
serverLog(LL_WARNING,
"\n"
"RAX:%016lx RBX:%016lx\nRCX:%016lx RDX:%016lx\n"
"RDI:%016lx RSI:%016lx\nRBP:%016lx RSP:%016lx\n"
"R8 :%016lx R9 :%016lx\nR10:%016lx R11:%016lx\n"
"R12:%016lx R13:%016lx\nR14:%016lx R15:%016lx\n"
"RIP:%016lx EFL:%016lx\nCSGSFS:%016lx",
(unsigned long) uc->uc_mcontext.mc_rax,
(unsigned long) uc->uc_mcontext.mc_rbx,
(unsigned long) uc->uc_mcontext.mc_rcx,
(unsigned long) uc->uc_mcontext.mc_rdx,
(unsigned long) uc->uc_mcontext.mc_rdi,
(unsigned long) uc->uc_mcontext.mc_rsi,
(unsigned long) uc->uc_mcontext.mc_rbp,
(unsigned long) uc->uc_mcontext.mc_rsp,
(unsigned long) uc->uc_mcontext.mc_r8,
(unsigned long) uc->uc_mcontext.mc_r9,
(unsigned long) uc->uc_mcontext.mc_r10,
(unsigned long) uc->uc_mcontext.mc_r11,
(unsigned long) uc->uc_mcontext.mc_r12,
(unsigned long) uc->uc_mcontext.mc_r13,
(unsigned long) uc->uc_mcontext.mc_r14,
(unsigned long) uc->uc_mcontext.mc_r15,
(unsigned long) uc->uc_mcontext.mc_rip,
(unsigned long) uc->uc_mcontext.mc_rflags,
(unsigned long) uc->uc_mcontext.mc_cs
);
logStackContent((void**)uc->uc_mcontext.mc_rsp);
#elif defined(__i386__)
serverLog(LL_WARNING,
"\n"
"EAX:%08lx EBX:%08lx ECX:%08lx EDX:%08lx\n"
"EDI:%08lx ESI:%08lx EBP:%08lx ESP:%08lx\n"
"SS :%08lx EFL:%08lx EIP:%08lx CS:%08lx\n"
"DS :%08lx ES :%08lx FS :%08lx GS:%08lx",
(unsigned long) uc->uc_mcontext.mc_eax,
(unsigned long) uc->uc_mcontext.mc_ebx,
(unsigned long) uc->uc_mcontext.mc_ebx,
(unsigned long) uc->uc_mcontext.mc_edx,
(unsigned long) uc->uc_mcontext.mc_edi,
(unsigned long) uc->uc_mcontext.mc_esi,
(unsigned long) uc->uc_mcontext.mc_ebp,
(unsigned long) uc->uc_mcontext.mc_esp,
(unsigned long) uc->uc_mcontext.mc_ss,
(unsigned long) uc->uc_mcontext.mc_eflags,
(unsigned long) uc->uc_mcontext.mc_eip,
(unsigned long) uc->uc_mcontext.mc_cs,
(unsigned long) uc->uc_mcontext.mc_es,
(unsigned long) uc->uc_mcontext.mc_fs,
(unsigned long) uc->uc_mcontext.mc_gs
);
logStackContent((void**)uc->uc_mcontext.mc_esp);
#endif
#elif defined(__OpenBSD__)
#if defined(__x86_64__)
serverLog(LL_WARNING,
"\n"
"RAX:%016lx RBX:%016lx\nRCX:%016lx RDX:%016lx\n"
"RDI:%016lx RSI:%016lx\nRBP:%016lx RSP:%016lx\n"
"R8 :%016lx R9 :%016lx\nR10:%016lx R11:%016lx\n"
"R12:%016lx R13:%016lx\nR14:%016lx R15:%016lx\n"
"RIP:%016lx EFL:%016lx\nCSGSFS:%016lx",
(unsigned long) uc->sc_rax,
(unsigned long) uc->sc_rbx,
(unsigned long) uc->sc_rcx,
(unsigned long) uc->sc_rdx,
(unsigned long) uc->sc_rdi,
(unsigned long) uc->sc_rsi,
(unsigned long) uc->sc_rbp,
(unsigned long) uc->sc_rsp,
(unsigned long) uc->sc_r8,
(unsigned long) uc->sc_r9,
(unsigned long) uc->sc_r10,
(unsigned long) uc->sc_r11,
(unsigned long) uc->sc_r12,
(unsigned long) uc->sc_r13,
(unsigned long) uc->sc_r14,
(unsigned long) uc->sc_r15,
(unsigned long) uc->sc_rip,
(unsigned long) uc->sc_rflags,
(unsigned long) uc->sc_cs
);
logStackContent((void**)uc->sc_rsp);
#elif defined(__i386__)
serverLog(LL_WARNING,
"\n"
"EAX:%08lx EBX:%08lx ECX:%08lx EDX:%08lx\n"
"EDI:%08lx ESI:%08lx EBP:%08lx ESP:%08lx\n"
"SS :%08lx EFL:%08lx EIP:%08lx CS:%08lx\n"
"DS :%08lx ES :%08lx FS :%08lx GS:%08lx",
(unsigned long) uc->sc_eax,
(unsigned long) uc->sc_ebx,
(unsigned long) uc->sc_ebx,
(unsigned long) uc->sc_edx,
(unsigned long) uc->sc_edi,
(unsigned long) uc->sc_esi,
(unsigned long) uc->sc_ebp,
(unsigned long) uc->sc_esp,
(unsigned long) uc->sc_ss,
(unsigned long) uc->sc_eflags,
(unsigned long) uc->sc_eip,
(unsigned long) uc->sc_cs,
(unsigned long) uc->sc_es,
(unsigned long) uc->sc_fs,
(unsigned long) uc->sc_gs
);
logStackContent((void**)uc->sc_esp);
#endif
#elif defined(__DragonFly__)
serverLog(LL_WARNING,
"\n"
"RAX:%016lx RBX:%016lx\nRCX:%016lx RDX:%016lx\n"
"RDI:%016lx RSI:%016lx\nRBP:%016lx RSP:%016lx\n"
"R8 :%016lx R9 :%016lx\nR10:%016lx R11:%016lx\n"
"R12:%016lx R13:%016lx\nR14:%016lx R15:%016lx\n"
"RIP:%016lx EFL:%016lx\nCSGSFS:%016lx",
(unsigned long) uc->uc_mcontext.mc_rax,
(unsigned long) uc->uc_mcontext.mc_rbx,
(unsigned long) uc->uc_mcontext.mc_rcx,
(unsigned long) uc->uc_mcontext.mc_rdx,
(unsigned long) uc->uc_mcontext.mc_rdi,
(unsigned long) uc->uc_mcontext.mc_rsi,
(unsigned long) uc->uc_mcontext.mc_rbp,
(unsigned long) uc->uc_mcontext.mc_rsp,
(unsigned long) uc->uc_mcontext.mc_r8,
(unsigned long) uc->uc_mcontext.mc_r9,
(unsigned long) uc->uc_mcontext.mc_r10,
(unsigned long) uc->uc_mcontext.mc_r11,
(unsigned long) uc->uc_mcontext.mc_r12,
(unsigned long) uc->uc_mcontext.mc_r13,
(unsigned long) uc->uc_mcontext.mc_r14,
(unsigned long) uc->uc_mcontext.mc_r15,
(unsigned long) uc->uc_mcontext.mc_rip,
(unsigned long) uc->uc_mcontext.mc_rflags,
(unsigned long) uc->uc_mcontext.mc_cs
);
logStackContent((void**)uc->uc_mcontext.mc_rsp);
#else
serverLog(LL_WARNING,
" Dumping of registers not supported for this OS/arch");
@ -1189,6 +1348,8 @@ void serverLogHexDump(int level, char *descr, void *value, size_t len) {
void watchdogSignalHandler(int sig, siginfo_t *info, void *secret) {
#ifdef HAVE_BACKTRACE
ucontext_t *uc = (ucontext_t*) secret;
#else
(void)secret;
#endif
UNUSED(info);
UNUSED(sig);

View File

@ -562,11 +562,21 @@ sds latencyCommandGenSparkeline(char *event, struct latencyTimeSeries *ts) {
*
* LATENCY HISTORY: return time-latency samples for the specified event.
* LATENCY LATEST: return the latest latency for all the events classes.
* LATENCY DOCTOR: returns an human readable analysis of instance latency.
* LATENCY DOCTOR: returns a human readable analysis of instance latency.
* LATENCY GRAPH: provide an ASCII graph of the latency of the specified event.
* LATENCY RESET: reset data of a specified event or all the data if no event provided.
*/
void latencyCommand(client *c) {
const char *help[] = {
"DOCTOR -- Returns a human readable latency analysis report.",
"GRAPH <event> -- Returns an ASCII latency graph for the event class.",
"HISTORY <event> -- Returns time-latency samples for the event class.",
"LATEST -- Returns the latest latency samples for all events.",
"RESET [event ...] -- Resets latency data of one or more event classes.",
" (default: reset all data for all event classes)",
"HELP -- Prints this help.",
NULL
};
struct latencyTimeSeries *ts;
if (!strcasecmp(c->argv[1]->ptr,"history") && c->argc == 3) {
@ -611,8 +621,10 @@ void latencyCommand(client *c) {
resets += latencyResetEvent(c->argv[j]->ptr);
addReplyLongLong(c,resets);
}
} else if (!strcasecmp(c->argv[1]->ptr,"help") && c->argc >= 2) {
addReplyHelp(c, help);
} else {
addReply(c,shared.syntaxerr);
addReplySubcommandSyntaxError(c);
}
return;

View File

@ -2109,6 +2109,7 @@ int checkClientOutputBufferLimits(client *c) {
* called from contexts where the client can't be freed safely, i.e. from the
* lower level functions pushing data inside the client output buffers. */
void asyncCloseClientOnOutputBufferLimitReached(client *c) {
if (c->fd == -1) return; /* It is unsafe to free fake clients. */
serverAssert(c->reply_bytes < SIZE_MAX-(1024*64));
if (c->reply_bytes == 0 || c->flags & CLIENT_CLOSE_ASAP) return;
if (checkClientOutputBufferLimits(c)) {

View File

@ -1285,9 +1285,18 @@ NULL
*
* Usage: MEMORY usage <key> */
void memoryCommand(client *c) {
robj *o;
if (!strcasecmp(c->argv[1]->ptr,"usage") && c->argc >= 3) {
if (!strcasecmp(c->argv[1]->ptr,"help") && c->argc == 2) {
const char *help[] = {
"DOCTOR - Return memory problems reports.",
"MALLOC-STATS -- Return internal statistics report from the memory allocator.",
"PURGE -- Attempt to purge dirty pages for reclamation by the allocator.",
"STATS -- Return information about the memory usage of the server.",
"USAGE <key> [SAMPLES <count>] -- Return memory in bytes used by <key> and its value. Nested values are sampled up to <count> times (default: 5).",
NULL
};
addReplyHelp(c, help);
} else if (!strcasecmp(c->argv[1]->ptr,"usage") && c->argc >= 3) {
dictEntry *de;
long long samples = OBJ_COMPUTE_SIZE_DEF_SAMPLES;
for (int j = 3; j < c->argc; j++) {
if (!strcasecmp(c->argv[j]->ptr,"samples") &&
@ -1306,10 +1315,12 @@ void memoryCommand(client *c) {
return;
}
}
if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.nullbulk))
== NULL) return;
size_t usage = objectComputeSize(o,samples);
usage += sdsAllocSize(c->argv[2]->ptr);
if ((de = dictFind(c->db->dict,c->argv[2]->ptr)) == NULL) {
addReply(c, shared.nullbulk);
return;
}
size_t usage = objectComputeSize(dictGetVal(de),samples);
usage += sdsAllocSize(dictGetKey(de));
usage += sizeof(dictEntry);
addReplyLongLong(c,usage);
} else if (!strcasecmp(c->argv[1]->ptr,"stats") && c->argc == 2) {
@ -1434,19 +1445,7 @@ void memoryCommand(client *c) {
addReply(c, shared.ok);
/* Nothing to do for other allocators. */
#endif
} else if (!strcasecmp(c->argv[1]->ptr,"help") && c->argc == 2) {
addReplyMultiBulkLen(c,5);
addReplyBulkCString(c,
"MEMORY DOCTOR - Outputs memory problems report");
addReplyBulkCString(c,
"MEMORY USAGE <key> [SAMPLES <count>] - Estimate memory usage of key");
addReplyBulkCString(c,
"MEMORY STATS - Show memory usage details");
addReplyBulkCString(c,
"MEMORY PURGE - Ask the allocator to release memory");
addReplyBulkCString(c,
"MEMORY MALLOC-STATS - Show allocator internal stats");
} else {
addReplyError(c,"Syntax error. Try MEMORY HELP");
addReplyErrorFormat(c, "Unknown subcommand or wrong number of arguments for '%s'. Try MEMORY HELP", (char*)c->argv[1]->ptr);
}
}

View File

@ -1645,6 +1645,9 @@ robj *rdbLoadObject(int rdbtype, rio *rdb) {
* node: the entries inside the listpack itself are delta-encoded
* relatively to this ID. */
sds nodekey = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL);
if (nodekey == NULL) {
rdbExitReportCorruptRDB("Stream master ID loading failed: invalid encoding or I/O error.");
}
if (sdslen(nodekey) != sizeof(streamID)) {
rdbExitReportCorruptRDB("Stream node key entry is not the "
"size of a stream ID");

View File

@ -2726,8 +2726,7 @@ static int clusterManagerSetSlot(clusterManagerNode *node1,
if (err != NULL) {
*err = zmalloc((reply->len + 1) * sizeof(char));
strcpy(*err, reply->str);
CLUSTER_MANAGER_PRINT_REPLY_ERROR(node1, err);
}
} else CLUSTER_MANAGER_PRINT_REPLY_ERROR(node1, reply->str);
goto cleanup;
}
cleanup:
@ -2830,7 +2829,7 @@ static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source,
if (err != NULL) {
*err = zmalloc((reply->len + 1) * sizeof(char));
strcpy(*err, reply->str);
CLUSTER_MANAGER_PRINT_REPLY_ERROR(source, err);
CLUSTER_MANAGER_PRINT_REPLY_ERROR(source, *err);
}
goto next;
}
@ -2848,14 +2847,21 @@ static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source,
if (migrate_reply == NULL) goto next;
if (migrate_reply->type == REDIS_REPLY_ERROR) {
if (do_fix && strstr(migrate_reply->str, "BUSYKEY")) {
/* If the key already exists, try to migrate keys
* adding REPLACE option.
* If the key's slot is not served, try to assign slot
* to the target node. */
int is_busy = (strstr(migrate_reply->str, "BUSYKEY") != NULL);
if (strstr(migrate_reply->str, "slot not served") != NULL)
clusterManagerSetSlot(source, target, slot, "node", NULL);
clusterManagerLogWarn("*** Target key exists. "
"Replacing it for FIX.\n");
freeReplyObject(migrate_reply);
/* Try to migrate keys adding REPLACE option. */
migrate_reply = clusterManagerMigrateKeysInReply(source,
target,
reply,
1, timeout,
is_busy,
timeout,
NULL);
success = (migrate_reply != NULL &&
migrate_reply->type != REDIS_REPLY_ERROR);
@ -2941,7 +2947,7 @@ static int clusterManagerMoveSlot(clusterManagerNode *source,
if (err != NULL) {
*err = zmalloc((r->len + 1) * sizeof(char));
strcpy(*err, r->str);
CLUSTER_MANAGER_PRINT_REPLY_ERROR(n, err);
CLUSTER_MANAGER_PRINT_REPLY_ERROR(n, *err);
}
}
freeReplyObject(r);
@ -3301,8 +3307,8 @@ static sds clusterManagerGetConfigSignature(clusterManagerNode *node) {
nodename = token;
tot_size = (p - token);
name_len = tot_size++; // Make room for ':' in tot_size
} else if (i == 8) break;
i++;
}
if (++i == 8) break;
}
if (i != 8) continue;
if (nodename == NULL) continue;
@ -3341,7 +3347,7 @@ static sds clusterManagerGetConfigSignature(clusterManagerNode *node) {
char *sp = cfg + name_len;
*(sp++) = ':';
for (i = 0; i < c; i++) {
if (i > 0) *(sp++) = '|';
if (i > 0) *(sp++) = ',';
int slen = strlen(slots[i]);
memcpy(sp, slots[i], slen);
sp += slen;
@ -3499,6 +3505,34 @@ static clusterManagerNode *clusterManagerNodeWithLeastReplicas() {
return node;
}
/* This function returns a random master node, return NULL if none */
static clusterManagerNode *clusterManagerNodeMasterRandom() {
int master_count = 0;
int idx;
listIter li;
listNode *ln;
listRewind(cluster_manager.nodes, &li);
while ((ln = listNext(&li)) != NULL) {
clusterManagerNode *n = ln->value;
if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue;
master_count++;
}
srand(time(NULL));
idx = rand() % master_count;
listRewind(cluster_manager.nodes, &li);
while ((ln = listNext(&li)) != NULL) {
clusterManagerNode *n = ln->value;
if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue;
if (!idx--) {
return n;
}
}
/* Can not be reached */
return NULL;
}
static int clusterManagerFixSlotsCoverage(char *all_slots) {
int i, fixed = 0;
list *none = NULL, *single = NULL, *multi = NULL;
@ -3571,22 +3605,25 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
"across the cluster:\n");
clusterManagerPrintSlotsList(none);
if (confirmWithYes("Fix these slots by covering with a random node?")){
srand(time(NULL));
listIter li;
listNode *ln;
listRewind(none, &li);
while ((ln = listNext(&li)) != NULL) {
sds slot = ln->value;
long idx = (long) (rand() % listLength(cluster_manager.nodes));
listNode *node_n = listIndex(cluster_manager.nodes, idx);
assert(node_n != NULL);
clusterManagerNode *n = node_n->value;
clusterManagerNode *n = clusterManagerNodeMasterRandom();
clusterManagerLogInfo(">>> Covering slot %s with %s:%d\n",
slot, n->ip, n->port);
/* Ensure the slot is not already assigned. */
redisReply *r = CLUSTER_MANAGER_COMMAND(n,
"CLUSTER DELSLOTS %s", slot);
if (r) freeReplyObject(r);
r = CLUSTER_MANAGER_COMMAND(n,
"CLUSTER ADDSLOTS %s", slot);
if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1;
if (r) freeReplyObject(r);
r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER BUMPEPOCH");
if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1;
if (r) freeReplyObject(r);
if (fixed < 0) goto cleanup;
/* Since CLUSTER ADDSLOTS succeeded, we also update the slot
* info into the node struct, in order to keep it synced */
@ -3614,10 +3651,17 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
clusterManagerNode *n = fn->value;
clusterManagerLogInfo(">>> Covering slot %s with %s:%d\n",
slot, n->ip, n->port);
/* Ensure the slot is not already assigned. */
redisReply *r = CLUSTER_MANAGER_COMMAND(n,
"CLUSTER DELSLOTS %s", slot);
if (r) freeReplyObject(r);
r = CLUSTER_MANAGER_COMMAND(n,
"CLUSTER ADDSLOTS %s", slot);
if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1;
if (r) freeReplyObject(r);
r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER BUMPEPOCH");
if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1;
if (r) freeReplyObject(r);
if (fixed < 0) goto cleanup;
/* Since CLUSTER ADDSLOTS succeeded, we also update the slot
* info into the node struct, in order to keep it synced */
@ -3651,7 +3695,11 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
clusterManagerLogInfo(">>> Covering slot %s moving keys "
"to %s:%d\n", slot,
target->ip, target->port);
/* Ensure the slot is not already assigned. */
redisReply *r = CLUSTER_MANAGER_COMMAND(target,
"CLUSTER DELSLOTS %s", slot);
if (r) freeReplyObject(r);
r = CLUSTER_MANAGER_COMMAND(target,
"CLUSTER ADDSLOTS %s", slot);
if (!clusterManagerCheckRedisReply(target, r, NULL)) fixed = -1;
if (r) freeReplyObject(r);
@ -3660,6 +3708,9 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
"CLUSTER SETSLOT %s %s", slot, "STABLE");
if (!clusterManagerCheckRedisReply(target, r, NULL)) fixed = -1;
if (r) freeReplyObject(r);
r = CLUSTER_MANAGER_COMMAND(target, "CLUSTER BUMPEPOCH");
if (!clusterManagerCheckRedisReply(target, r, NULL)) fixed = -1;
if (r) freeReplyObject(r);
if (fixed < 0) goto cleanup;
/* Since CLUSTER ADDSLOTS succeeded, we also update the slot
* info into the node struct, in order to keep it synced */
@ -3670,14 +3721,22 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
while ((nln = listNext(&nli)) != NULL) {
clusterManagerNode *src = nln->value;
if (src == target) continue;
/* Assign the slot to target node in the source node. */
redisReply *r = CLUSTER_MANAGER_COMMAND(src,
"CLUSTER SETSLOT %s %s %s", slot,
"NODE", target->name);
if (!clusterManagerCheckRedisReply(src, r, NULL))
fixed = -1;
if (r) freeReplyObject(r);
if (fixed < 0) goto cleanup;
/* Set the source node in 'importing' state
* (even if we will actually migrate keys away)
* in order to avoid receiving redirections
* for MIGRATE. */
redisReply *r = CLUSTER_MANAGER_COMMAND(src,
r = CLUSTER_MANAGER_COMMAND(src,
"CLUSTER SETSLOT %s %s %s", slot,
"IMPORTING", target->name);
if (!clusterManagerCheckRedisReply(target, r, NULL))
if (!clusterManagerCheckRedisReply(src, r, NULL))
fixed = -1;
if (r) freeReplyObject(r);
if (fixed < 0) goto cleanup;
@ -3687,6 +3746,13 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
fixed = -1;
goto cleanup;
}
r = CLUSTER_MANAGER_COMMAND(src,
"CLUSTER SETSLOT %s %s", slot,
"STABLE");
if (!clusterManagerCheckRedisReply(src, r, NULL))
fixed = -1;
if (r) freeReplyObject(r);
if (fixed < 0) goto cleanup;
}
fixed++;
}
@ -3720,11 +3786,22 @@ static int clusterManagerFixOpenSlot(int slot) {
while ((ln = listNext(&li)) != NULL) {
clusterManagerNode *n = ln->value;
if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue;
if (n->slots[slot]) {
if (owner == NULL) owner = n;
listAddNodeTail(owners, n);
if (n->slots[slot]) listAddNodeTail(owners, n);
else {
redisReply *r = CLUSTER_MANAGER_COMMAND(n,
"CLUSTER COUNTKEYSINSLOT %d", slot);
success = clusterManagerCheckRedisReply(n, r, NULL);
if (success && r->integer > 0) {
clusterManagerLogWarn("*** Found keys about slot %d "
"in non-owner node %s:%d!\n", slot,
n->ip, n->port);
listAddNodeTail(owners, n);
}
if (r) freeReplyObject(r);
if (!success) goto cleanup;
}
}
if (listLength(owners) == 1) owner = listFirst(owners)->value;
listRewind(cluster_manager.nodes, &li);
while ((ln = listNext(&li)) != NULL) {
clusterManagerNode *n = ln->value;
@ -3735,7 +3812,7 @@ static int clusterManagerFixOpenSlot(int slot) {
sds migrating_slot = n->migrating[i];
if (atoi(migrating_slot) == slot) {
char *sep = (listLength(migrating) == 0 ? "" : ",");
migrating_str = sdscatfmt(migrating_str, "%s%S:%u",
migrating_str = sdscatfmt(migrating_str, "%s%s:%u",
sep, n->ip, n->port);
listAddNodeTail(migrating, n);
is_migrating = 1;
@ -3748,7 +3825,7 @@ static int clusterManagerFixOpenSlot(int slot) {
sds importing_slot = n->importing[i];
if (atoi(importing_slot) == slot) {
char *sep = (listLength(importing) == 0 ? "" : ",");
importing_str = sdscatfmt(importing_str, "%s%S:%u",
importing_str = sdscatfmt(importing_str, "%s%s:%u",
sep, n->ip, n->port);
listAddNodeTail(importing, n);
is_importing = 1;
@ -3767,15 +3844,20 @@ static int clusterManagerFixOpenSlot(int slot) {
clusterManagerLogWarn("*** Found keys about slot %d "
"in node %s:%d!\n", slot, n->ip,
n->port);
char *sep = (listLength(importing) == 0 ? "" : ",");
importing_str = sdscatfmt(importing_str, "%s%S:%u",
sep, n->ip, n->port);
listAddNodeTail(importing, n);
}
if (r) freeReplyObject(r);
if (!success) goto cleanup;
}
}
printf("Set as migrating in: %s\n", migrating_str);
printf("Set as importing in: %s\n", importing_str);
/* If there is no slot owner, set as owner the slot with the biggest
if (sdslen(migrating_str) > 0)
printf("Set as migrating in: %s\n", migrating_str);
if (sdslen(importing_str) > 0)
printf("Set as importing in: %s\n", importing_str);
/* If there is no slot owner, set as owner the node with the biggest
* number of keys, among the set of migrating / importing nodes. */
if (owner == NULL) {
clusterManagerLogInfo(">>> Nobody claims ownership, "
@ -3799,6 +3881,15 @@ static int clusterManagerFixOpenSlot(int slot) {
success = clusterManagerCheckRedisReply(owner, reply, NULL);
if (reply) freeReplyObject(reply);
if (!success) goto cleanup;
/* Ensure that the slot is unassigned before assigning it to the
* owner. */
reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER DELSLOTS %d", slot);
success = clusterManagerCheckRedisReply(owner, reply, NULL);
/* Ignore "already unassigned" error. */
if (!success && reply && reply->type == REDIS_REPLY_ERROR &&
strstr(reply->str, "already unassigned") != NULL) success = 1;
if (reply) freeReplyObject(reply);
if (!success) goto cleanup;
reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER ADDSLOTS %d", slot);
success = clusterManagerCheckRedisReply(owner, reply, NULL);
if (reply) freeReplyObject(reply);
@ -3827,32 +3918,43 @@ static int clusterManagerFixOpenSlot(int slot) {
* in migrating state, since migrating is a valid state only for
* slot owners. */
if (listLength(owners) > 1) {
owner = clusterManagerGetNodeWithMostKeysInSlot(owners, slot, NULL);
/* Owner cannot be NULL at this point, since if there are more owners,
* the owner has been set in the previous condition (owner == NULL). */
assert(owner != NULL);
listRewind(owners, &li);
redisReply *reply = NULL;
while ((ln = listNext(&li)) != NULL) {
clusterManagerNode *n = ln->value;
if (n == owner) continue;
reply = CLUSTER_MANAGER_COMMAND(n, "CLUSTER DELSLOT %d", slot);
reply = CLUSTER_MANAGER_COMMAND(n, "CLUSTER DELSLOTS %d", slot);
success = clusterManagerCheckRedisReply(n, reply, NULL);
/* Ignore "already unassigned" error. */
if (!success && reply && reply->type == REDIS_REPLY_ERROR &&
strstr(reply->str, "already unassigned") != NULL) success = 1;
if (reply) freeReplyObject(reply);
if (!success) goto cleanup;
n->slots[slot] = 0;
/* Assign the slot to the owner in the node 'n' configuration.' */
success = clusterManagerSetSlot(n, owner, slot, "node", NULL);
if (!success) goto cleanup;
success = clusterManagerSetSlot(n, owner, slot, "importing", NULL);
if (!success) goto cleanup;
clusterManagerRemoveNodeFromList(importing, n); //Avoid duplicates
/* Avoid duplicates. */
clusterManagerRemoveNodeFromList(importing, n);
listAddNodeTail(importing, n);
/* Ensure that the node is not in the migrating list. */
clusterManagerRemoveNodeFromList(migrating, n);
}
reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER BUMPEPOCH");
success = clusterManagerCheckRedisReply(owner, reply, NULL);
if (reply) freeReplyObject(reply);
if (!success) goto cleanup;
}
int move_opts = CLUSTER_MANAGER_OPT_VERBOSE;
/* Case 1: The slot is in migrating state in one slot, and in
* importing state in 1 slot. That's trivial to address. */
/* Case 1: The slot is in migrating state in one node, and in
* importing state in 1 node. That's trivial to address. */
if (listLength(migrating) == 1 && listLength(importing) == 1) {
clusterManagerNode *src = listFirst(migrating)->value;
clusterManagerNode *dst = listFirst(importing)->value;
clusterManagerLogInfo(">>> Case 1: Moving slot %d from "
"%s:%d to %s:%d\n", slot,
src->ip, src->port, dst->ip, dst->port);
success = clusterManagerMoveSlot(src, dst, slot, move_opts, NULL);
}
/* Case 2: There are multiple nodes that claim the slot as importing,
@ -3860,7 +3962,7 @@ static int clusterManagerFixOpenSlot(int slot) {
* the slot. In this case we just move all the keys to the owner
* according to the configuration. */
else if (listLength(migrating) == 0 && listLength(importing) > 0) {
clusterManagerLogInfo(">>> Moving all the %d slot keys to its "
clusterManagerLogInfo(">>> Case 2: Moving all the %d slot keys to its "
"owner %s:%d\n", slot, owner->ip, owner->port);
move_opts |= CLUSTER_MANAGER_OPT_COLD;
listRewind(importing, &li);
@ -3878,25 +3980,43 @@ static int clusterManagerFixOpenSlot(int slot) {
if (r) freeReplyObject(r);
if (!success) goto cleanup;
}
/* Since the slot has been moved in "cold" mode, ensure that all the
* other nodes update their own configuration about the slot itself. */
listRewind(cluster_manager.nodes, &li);
while ((ln = listNext(&li)) != NULL) {
clusterManagerNode *n = ln->value;
if (n == owner) continue;
if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue;
redisReply *r = CLUSTER_MANAGER_COMMAND(n,
"CLUSTER SETSLOT %d %s %s", slot, "NODE", owner->name);
success = clusterManagerCheckRedisReply(n, r, NULL);
if (r) freeReplyObject(r);
if (!success) goto cleanup;
}
} else {
int try_to_close_slot = (listLength(importing) == 0 &&
listLength(migrating) == 1);
if (try_to_close_slot) {
clusterManagerNode *n = listFirst(migrating)->value;
redisReply *r = CLUSTER_MANAGER_COMMAND(n,
"CLUSTER GETKEYSINSLOT %d %d", slot, 10);
success = clusterManagerCheckRedisReply(n, r, NULL);
if (r) {
if (success) try_to_close_slot = (r->elements == 0);
freeReplyObject(r);
if (!owner || owner != n) {
redisReply *r = CLUSTER_MANAGER_COMMAND(n,
"CLUSTER GETKEYSINSLOT %d %d", slot, 10);
success = clusterManagerCheckRedisReply(n, r, NULL);
if (r) {
if (success) try_to_close_slot = (r->elements == 0);
freeReplyObject(r);
}
if (!success) goto cleanup;
}
if (!success) goto cleanup;
}
/* Case 3: There are no slots claiming to be in importing state, but
* there is a migrating node that actually don't have any key. We
* can just close the slot, probably a reshard interrupted in the middle. */
* there is a migrating node that actually don't have any key or is the
* slot owner. We can just close the slot, probably a reshard interrupted
* in the middle. */
if (try_to_close_slot) {
clusterManagerNode *n = listFirst(migrating)->value;
clusterManagerLogInfo(">>> Case 3: Closing slot %d on %s:%d\n",
slot, n->ip, n->port);
redisReply *r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER SETSLOT %d %s",
slot, "STABLE");
success = clusterManagerCheckRedisReply(n, r, NULL);
@ -4025,6 +4145,7 @@ static int clusterManagerCheckCluster(int quiet) {
result = 0;
if (do_fix/* && result*/) {
dictType dtype = clusterManagerDictType;
dtype.keyDestructor = dictSdsDestructor;
dtype.valDestructor = dictListDestructor;
clusterManagerUncoveredSlots = dictCreate(&dtype, NULL);
int fixed = clusterManagerFixSlotsCoverage(slots);
@ -4061,7 +4182,7 @@ static clusterManagerNode *clusterNodeForResharding(char *id,
static list *clusterManagerComputeReshardTable(list *sources, int numslots) {
list *moved = listCreate();
int src_count = listLength(sources), i = 0, tot_slots = 0, j;
clusterManagerNode **sorted = zmalloc(src_count * sizeof(**sorted));
clusterManagerNode **sorted = zmalloc(src_count * sizeof(*sorted));
listIter li;
listNode *ln;
listRewind(sources, &li);
@ -5100,10 +5221,13 @@ static int clusterManagerCommandSetTimeout(int argc, char **argv) {
n->port);
ok_count++;
continue;
reply_err:
reply_err:;
int need_free = 0;
if (err == NULL) err = "";
else need_free = 1;
clusterManagerLogErr("ERR setting node-timeot for %s:%d: %s\n", n->ip,
n->port, err);
if (need_free) zfree(err);
err_count++;
}
clusterManagerLogInfo(">>> New node timeout set. %d OK, %d ERR.\n",

View File

@ -1245,6 +1245,18 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
if (eof_reached) {
int aof_is_enabled = server.aof_state != AOF_OFF;
/* Ensure background save doesn't overwrite synced data */
if (server.rdb_child_pid != -1) {
serverLog(LL_NOTICE,
"Replica is about to load the RDB file received from the "
"master, but there is a pending RDB child running. "
"Killing process %ld and removing its temp file to avoid "
"any race",
(long) server.rdb_child_pid);
kill(server.rdb_child_pid,SIGUSR1);
rdbRemoveTempFile(server.rdb_child_pid);
}
if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) {
serverLog(LL_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> REPLICA synchronization: %s", strerror(errno));
cancelReplicationHandshake();

View File

@ -452,13 +452,15 @@ struct redisCommand sentinelcmds[] = {
{"info",sentinelInfoCommand,-1,"",0,NULL,0,0,0,0,0},
{"role",sentinelRoleCommand,1,"l",0,NULL,0,0,0,0,0},
{"client",clientCommand,-2,"rs",0,NULL,0,0,0,0,0},
{"shutdown",shutdownCommand,-1,"",0,NULL,0,0,0,0,0}
{"shutdown",shutdownCommand,-1,"",0,NULL,0,0,0,0,0},
{"auth",authCommand,2,"sltF",0,NULL,0,0,0,0,0}
};
/* This function overwrites a few normal Redis config default with Sentinel
* specific defaults. */
void initSentinelConfig(void) {
server.port = REDIS_SENTINEL_PORT;
server.protected_mode = 0; /* Sentinel must be exposed. */
}
/* Perform the Sentinel mode initialization. */
@ -1941,12 +1943,25 @@ werr:
/* Send the AUTH command with the specified master password if needed.
* Note that for slaves the password set for the master is used.
*
* In case this Sentinel requires a password as well, via the "requirepass"
* configuration directive, we assume we should use the local password in
* order to authenticate when connecting with the other Sentinels as well.
* So basically all the Sentinels share the same password and use it to
* authenticate reciprocally.
*
* We don't check at all if the command was successfully transmitted
* to the instance as if it fails Sentinel will detect the instance down,
* will disconnect and reconnect the link and so forth. */
void sentinelSendAuthIfNeeded(sentinelRedisInstance *ri, redisAsyncContext *c) {
char *auth_pass = (ri->flags & SRI_MASTER) ? ri->auth_pass :
ri->master->auth_pass;
char *auth_pass = NULL;
if (ri->flags & SRI_MASTER) {
auth_pass = ri->auth_pass;
} else if (ri->flags & SRI_SLAVE) {
auth_pass = ri->master->auth_pass;
} else if (ri->flags & SRI_SENTINEL) {
if (server.requirepass) auth_pass = server.requirepass;
}
if (auth_pass) {
if (redisAsyncCommand(c, sentinelDiscardReplyCallback, ri, "%s %s",

View File

@ -1526,10 +1526,10 @@ void initServerConfig(void) {
server.runid[CONFIG_RUN_ID_SIZE] = '\0';
changeReplicationId();
clearReplicationId2();
server.timezone = timezone; /* Initialized by tzset(). */
server.timezone = getTimeZone(); /* Initialized by tzset(). */
server.configfile = NULL;
server.executable = NULL;
server.config_hz = CONFIG_DEFAULT_HZ;
server.hz = server.config_hz = CONFIG_DEFAULT_HZ;
server.dynamic_hz = CONFIG_DEFAULT_DYNAMIC_HZ;
server.arch_bits = (sizeof(long) == 8) ? 64 : 32;
server.port = CONFIG_DEFAULT_SERVER_PORT;
@ -1958,9 +1958,13 @@ int listenToPort(int port, int *fds, int *count) {
}
if (fds[*count] == ANET_ERR) {
serverLog(LL_WARNING,
"Creating Server TCP listening socket %s:%d: %s",
"Could not create server TCP listening socket %s:%d: %s",
server.bindaddr[j] ? server.bindaddr[j] : "*",
port, server.neterr);
if (errno == ENOPROTOOPT || errno == EPROTONOSUPPORT ||
errno == ESOCKTNOSUPPORT || errno == EPFNOSUPPORT ||
errno == EAFNOSUPPORT || errno == EADDRNOTAVAIL)
continue;
return C_ERR;
}
anetNonBlock(NULL,fds[*count]);

View File

@ -39,7 +39,7 @@
#include <errno.h> /* errno program_invocation_name program_invocation_short_name */
#if !defined(HAVE_SETPROCTITLE)
#if (defined __NetBSD__ || defined __FreeBSD__ || defined __OpenBSD__)
#if (defined __NetBSD__ || defined __FreeBSD__ || defined __OpenBSD__ || defined __DragonFly__)
#define HAVE_SETPROCTITLE 1
#else
#define HAVE_SETPROCTITLE 0

View File

@ -37,7 +37,7 @@
* mark the entry as deleted, or having the same field as the "master"
* entry at the start of the listpack> */
#define STREAM_ITEM_FLAG_NONE 0 /* No special flags. */
#define STREAM_ITEM_FLAG_DELETED (1<<0) /* Entry is delted. Skip it. */
#define STREAM_ITEM_FLAG_DELETED (1<<0) /* Entry is deleted. Skip it. */
#define STREAM_ITEM_FLAG_SAMEFIELDS (1<<1) /* Same fields as master entry. */
void streamFreeCG(streamCG *cg);
@ -165,7 +165,7 @@ int streamCompareID(streamID *a, streamID *b) {
* Returns the new entry ID populating the 'added_id' structure.
*
* If 'use_id' is not NULL, the ID is not auto-generated by the function,
* but instead the passed ID is uesd to add the new entry. In this case
* but instead the passed ID is used to add the new entry. In this case
* adding the entry may fail as specified later in this comment.
*
* The function returns C_OK if the item was added, this is always true
@ -223,13 +223,13 @@ int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_
*
* count and deleted just represent respectively the total number of
* entries inside the listpack that are valid, and marked as deleted
* (delted flag in the entry flags set). So the total number of items
* (deleted flag in the entry flags set). So the total number of items
* actually inside the listpack (both deleted and not) is count+deleted.
*
* The real entries will be encoded with an ID that is just the
* millisecond and sequence difference compared to the key stored at
* the radix tree node containing the listpack (delta encoding), and
* if the fields of the entry are the same as the master enty fields, the
* if the fields of the entry are the same as the master entry fields, the
* entry flags will specify this fact and the entry fields and number
* of fields will be omitted (see later in the code of this function).
*
@ -486,7 +486,7 @@ int64_t streamTrimByLength(stream *s, size_t maxlen, int approx) {
* }
* streamIteratorStop(&myiterator); */
void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end, int rev) {
/* Intialize the iterator and translates the iteration start/stop
/* Initialize the iterator and translates the iteration start/stop
* elements into a 128 big big-endian number. */
if (start) {
streamEncodeID(si->start_key,start);
@ -564,7 +564,7 @@ int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields) {
si->lp_ele = lpLast(si->lp);
}
} else if (si->rev) {
/* If we are itereating in the reverse order, and this is not
/* If we are iterating in the reverse order, and this is not
* the first entry emitted for this listpack, then we already
* emitted the current entry, and have to go back to the previous
* one. */
@ -751,7 +751,7 @@ void streamIteratorRemoveEntry(streamIterator *si, streamID *current) {
}
/* Stop the stream iterator. The only cleanup we need is to free the rax
* itereator, since the stream iterator itself is supposed to be stack
* iterator, since the stream iterator itself is supposed to be stack
* allocated. */
void streamIteratorStop(streamIterator *si) {
raxStop(&si->ri);
@ -847,11 +847,15 @@ void streamPropagateGroupID(client *c, robj *key, streamCG *group, robj *groupna
decrRefCount(argv[4]);
}
/* Send the specified range to the client 'c'. The range the client will
* receive is between start and end inclusive, if 'count' is non zero, no more
* than 'count' elements are sent. The 'end' pointer can be NULL to mean that
* we want all the elements from 'start' till the end of the stream. If 'rev'
* is non zero, elements are produced in reversed order from end to start.
/* Send the stream items in the specified range to the client 'c'. The range
* the client will receive is between start and end inclusive, if 'count' is
* non zero, no more than 'count' elements are sent.
*
* The 'end' pointer can be NULL to mean that we want all the elements from
* 'start' till the end of the stream. If 'rev' is non zero, elements are
* produced in reversed order from end to start.
*
* The function returns the number of entries emitted.
*
* If group and consumer are not NULL, the function performs additional work:
* 1. It updates the last delivered ID in the group in case we are
@ -863,15 +867,15 @@ void streamPropagateGroupID(client *c, robj *key, streamCG *group, robj *groupna
*
* The behavior may be modified passing non-zero flags:
*
* STREAM_RWR_NOACK: Do not craete PEL entries, that is, the point "3" above
* STREAM_RWR_NOACK: Do not create PEL entries, that is, the point "3" above
* is not performed.
* STREAM_RWR_RAWENTRIES: Do not emit array boundaries, but just the entries,
* and return the number of entries emitted as usually.
* This is used when the function is just used in order
* to emit data and there is some higher level logic.
*
* The final argument 'spi' (stream propagatino info pointer) is a structure
* filled with information needed to propagte the command execution to AOF
* The final argument 'spi' (stream propagation info pointer) is a structure
* filled with information needed to propagate the command execution to AOF
* and slaves, in the case a consumer group was passed: we need to generate
* XCLAIM commands to create the pending list into AOF/slaves in that case.
*
@ -890,6 +894,7 @@ void streamPropagateGroupID(client *c, robj *key, streamCG *group, robj *groupna
#define STREAM_RWR_NOACK (1<<0) /* Do not create entries in the PEL. */
#define STREAM_RWR_RAWENTRIES (1<<1) /* Do not emit protocol for array
boundaries, just the entries. */
#define STREAM_RWR_HISTORY (1<<2) /* Only serve consumer local PEL. */
size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi) {
void *arraylen_ptr = NULL;
size_t arraylen = 0;
@ -898,15 +903,12 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
streamID id;
int propagate_last_id = 0;
/* If a group was passed, we check if the request is about messages
* never delivered so far (normally this happens when ">" ID is passed).
*
* If instead the client is asking for some history, we serve it
* using a different function, so that we return entries *solely*
* from its own PEL. This ensures each consumer will always and only
* see the history of messages delivered to it and not yet confirmed
/* If the client is asking for some history, we serve it using a
* different function, so that we return entries *solely* from its
* own PEL. This ensures each consumer will always and only see
* the history of messages delivered to it and not yet confirmed
* as delivered. */
if (group && streamCompareID(start,&group->last_id) <= 0) {
if (group && (flags & STREAM_RWR_HISTORY)) {
return streamReplyWithRangeFromConsumerPEL(c,s,start,end,count,
consumer);
}
@ -1023,7 +1025,7 @@ size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start
if (end && memcmp(ri.key,end,ri.key_len) > 0) break;
streamID thisid;
streamDecodeID(ri.key,&thisid);
if (streamReplyWithRange(c,s,&thisid,NULL,1,0,NULL,NULL,
if (streamReplyWithRange(c,s,&thisid,&thisid,1,0,NULL,NULL,
STREAM_RWR_RAWENTRIES,NULL) == 0)
{
/* Note that we may have a not acknowledged entry in the PEL
@ -1136,7 +1138,7 @@ invalid:
}
/* Wrapper for streamGenericParseIDOrReply() with 'strict' argument set to
* 0, to be used when - and + are accetable IDs. */
* 0, to be used when - and + are acceptable IDs. */
int streamParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq) {
return streamGenericParseIDOrReply(c,o,id,missing_seq,0);
}
@ -1470,8 +1472,10 @@ void xreadCommand(client *c) {
stream *s = o->ptr;
streamID *gt = ids+i; /* ID must be greater than this. */
int serve_synchronously = 0;
int serve_history = 0; /* True for XREADGROUP with ID != ">". */
/* Check if there are the conditions to serve the client synchronously. */
/* Check if there are the conditions to serve the client
* synchronously. */
if (groups) {
/* If the consumer is blocked on a group, we always serve it
* synchronously (serving its local history) if the ID specified
@ -1480,6 +1484,7 @@ void xreadCommand(client *c) {
gt->seq != UINT64_MAX)
{
serve_synchronously = 1;
serve_history = 1;
} else {
/* We also want to serve a consumer in a consumer group
* synchronously in case the group top item delivered is smaller
@ -1515,9 +1520,12 @@ void xreadCommand(client *c) {
if (groups) consumer = streamLookupConsumer(groups[i],
consumername->ptr,1);
streamPropInfo spi = {c->argv[i+streams_arg],groupname};
int flags = 0;
if (noack) flags |= STREAM_RWR_NOACK;
if (serve_history) flags |= STREAM_RWR_HISTORY;
streamReplyWithRange(c,s,&start,NULL,count,0,
groups ? groups[i] : NULL,
consumer, noack, &spi);
consumer, flags, &spi);
if (groups) server.dirty++;
}
}
@ -2155,7 +2163,7 @@ void xclaimCommand(client *c) {
/* If we stopped because some IDs cannot be parsed, perhaps they
* are trailing options. */
time_t now = mstime();
mstime_t now = mstime();
streamID last_id = {0,0};
int propagate_last_id = 0;
for (; j < c->argc; j++) {
@ -2274,8 +2282,9 @@ void xclaimCommand(client *c) {
if (justid) {
addReplyStreamID(c,&id);
} else {
streamReplyWithRange(c,o->ptr,&id,NULL,1,0,NULL,NULL,
STREAM_RWR_RAWENTRIES,NULL);
size_t emitted = streamReplyWithRange(c,o->ptr,&id,&id,1,0,
NULL,NULL,STREAM_RWR_RAWENTRIES,NULL);
if (!emitted) addReply(c,shared.nullbulk);
}
arraylen++;

View File

@ -574,12 +574,12 @@ int zslParseLexRangeItem(robj *item, sds *dest, int *ex) {
switch(c[0]) {
case '+':
if (c[1] != '\0') return C_ERR;
*ex = 0;
*ex = 1;
*dest = shared.maxstring;
return C_OK;
case '-':
if (c[1] != '\0') return C_ERR;
*ex = 0;
*ex = 1;
*dest = shared.minstring;
return C_OK;
case '(':
@ -652,9 +652,8 @@ int zslIsInLexRange(zskiplist *zsl, zlexrangespec *range) {
zskiplistNode *x;
/* Test for ranges that will always be empty. */
if (sdscmplex(range->min,range->max) > 1 ||
(sdscmp(range->min,range->max) == 0 &&
(range->minex || range->maxex)))
int cmp = sdscmplex(range->min,range->max);
if (cmp > 0 || (cmp == 0 && (range->minex || range->maxex)))
return 0;
x = zsl->tail;
if (x == NULL || !zslLexValueGteMin(x->ele,range))
@ -927,9 +926,8 @@ int zzlIsInLexRange(unsigned char *zl, zlexrangespec *range) {
unsigned char *p;
/* Test for ranges that will always be empty. */
if (sdscmplex(range->min,range->max) > 1 ||
(sdscmp(range->min,range->max) == 0 &&
(range->minex || range->maxex)))
int cmp = sdscmplex(range->min,range->max);
if (cmp > 0 || (cmp == 0 && (range->minex || range->maxex)))
return 0;
p = ziplistIndex(zl,-2); /* Last element. */

View File

@ -39,6 +39,7 @@
#include <float.h>
#include <stdint.h>
#include <errno.h>
#include <time.h>
#include "util.h"
#include "sha1.h"
@ -605,7 +606,7 @@ void getRandomHexChars(char *p, size_t len) {
* already, this will be detected and handled correctly.
*
* The function does not try to normalize everything, but only the obvious
* case of one or more "../" appearning at the start of "filename"
* case of one or more "../" appearing at the start of "filename"
* relative path. */
sds getAbsolutePath(char *filename) {
char cwd[1024];
@ -652,6 +653,24 @@ sds getAbsolutePath(char *filename) {
return abspath;
}
/*
* Gets the proper timezone in a more portable fashion
* i.e timezone variables are linux specific.
*/
unsigned long getTimeZone(void) {
#ifdef __linux__
return timezone;
#else
struct timeval tv;
struct timezone tz;
gettimeofday(&tv, &tz);
return tz.tz_minuteswest * 60UL;
#endif
}
/* Return true if the specified path is just a file basename without any
* relative or absolute path. This function just checks that no / or \
* character exists inside the specified path, that's enough in the

View File

@ -50,6 +50,7 @@ int string2ld(const char *s, size_t slen, long double *dp);
int d2string(char *buf, size_t len, double value);
int ld2string(char *buf, size_t len, long double value, int humanfriendly);
sds getAbsolutePath(char *filename);
unsigned long getTimeZone(void);
int pathIsBaseName(char *path);
#ifdef REDIS_TEST

View File

@ -91,6 +91,14 @@ proc wait_for_sync r {
}
}
proc wait_for_ofs_sync {r1 r2} {
wait_for_condition 50 100 {
[status $r1 master_repl_offset] eq [status $r2 master_repl_offset]
} else {
fail "replica didn't sync in time"
}
}
# Random integer between 0 and max (excluded).
proc randomInt {max} {
expr {int(rand()*$max)}

View File

@ -90,6 +90,7 @@ start_server {tags {"defrag"}} {
test "Active defrag big keys" {
r flushdb
r config resetstat
r config set save "" ;# prevent bgsave from interfereing with save below
r config set activedefrag no
r config set active-defrag-max-scan-fields 1000
r config set active-defrag-threshold-lower 5

View File

@ -108,6 +108,93 @@ start_server {
assert {$c == 5}
}
test {XREADGROUP will not report data on empty history. Bug #5577} {
r del events
r xadd events * a 1
r xadd events * b 2
r xadd events * c 3
r xgroup create events mygroup 0
# Current local PEL should be empty
set res [r xpending events mygroup - + 10]
assert {[llength $res] == 0}
# So XREADGROUP should read an empty history as well
set res [r xreadgroup group mygroup myconsumer count 3 streams events 0]
assert {[llength [lindex $res 0 1]] == 0}
# We should fetch all the elements in the stream asking for >
set res [r xreadgroup group mygroup myconsumer count 3 streams events >]
assert {[llength [lindex $res 0 1]] == 3}
# Now the history is populated with three not acked entries
set res [r xreadgroup group mygroup myconsumer count 3 streams events 0]
assert {[llength [lindex $res 0 1]] == 3}
}
test {XREADGROUP history reporting of deleted entries. Bug #5570} {
r del mystream
r XGROUP CREATE mystream mygroup $ MKSTREAM
r XADD mystream 1 field1 A
r XREADGROUP GROUP mygroup myconsumer STREAMS mystream >
r XADD mystream MAXLEN 1 2 field1 B
r XREADGROUP GROUP mygroup myconsumer STREAMS mystream >
# Now we have two pending entries, however one should be deleted
# and one should be ok (we should only see "B")
set res [r XREADGROUP GROUP mygroup myconsumer STREAMS mystream 0-1]
assert {[lindex $res 0 1 0] == {1-0 {}}}
assert {[lindex $res 0 1 1] == {2-0 {field1 B}}}
}
test {XCLAIM can claim PEL items from another consumer} {
# Add 3 items into the stream, and create a consumer group
r del mystream
set id1 [r XADD mystream * a 1]
set id2 [r XADD mystream * b 2]
set id3 [r XADD mystream * c 3]
r XGROUP CREATE mystream mygroup 0
# Client 1 reads item 1 from the stream without acknowledgements.
# Client 2 then claims pending item 1 from the PEL of client 1
set reply [
r XREADGROUP GROUP mygroup client1 count 1 STREAMS mystream >
]
assert {[llength [lindex $reply 0 1 0 1]] == 2}
assert {[lindex $reply 0 1 0 1] eq {a 1}}
r debug sleep 0.2
set reply [
r XCLAIM mystream mygroup client2 10 $id1
]
assert {[llength [lindex $reply 0 1]] == 2}
assert {[lindex $reply 0 1] eq {a 1}}
# Client 1 reads another 2 items from stream
r XREADGROUP GROUP mygroup client1 count 2 STREAMS mystream >
r debug sleep 0.2
# Delete item 2 from the stream. Now client 1 has PEL that contains
# only item 3. Try to use client 2 to claim the deleted item 2
# from the PEL of client 1, this should return nil
r XDEL mystream $id2
set reply [
r XCLAIM mystream mygroup client2 10 $id2
]
assert {[llength $reply] == 1}
assert_equal "" [lindex $reply 0]
# Delete item 3 from the stream. Now client 1 has PEL that is empty.
# Try to use client 2 to claim the deleted item 3 from the PEL
# of client 1, this should return nil
r debug sleep 0.2
r XDEL mystream $id3
set reply [
r XCLAIM mystream mygroup client2 10 $id3
]
assert {[llength $reply] == 1}
assert_equal "" [lindex $reply 0]
}
start_server {} {
set master [srv -1 client]
set master_host [srv -1 host]
@ -144,6 +231,8 @@ start_server {
}
}
wait_for_ofs_sync $master $slave
# Turn slave into master
$slave slaveof no one

View File

@ -388,7 +388,7 @@ start_server {tags {"zset"}} {
0 omega}
}
test "ZRANGEBYLEX/ZREVRANGEBYLEX/ZCOUNT basics" {
test "ZRANGEBYLEX/ZREVRANGEBYLEX/ZLEXCOUNT basics" {
create_default_lex_zset
# inclusive range
@ -416,6 +416,22 @@ start_server {tags {"zset"}} {
assert_equal {} [r zrevrangebylex zset \[elez \[elex]
assert_equal {} [r zrevrangebylex zset (hill (omega]
}
test "ZLEXCOUNT advanced" {
create_default_lex_zset
assert_equal 9 [r zlexcount zset - +]
assert_equal 0 [r zlexcount zset + -]
assert_equal 0 [r zlexcount zset + \[c]
assert_equal 0 [r zlexcount zset \[c -]
assert_equal 8 [r zlexcount zset \[bar +]
assert_equal 5 [r zlexcount zset \[bar \[foo]
assert_equal 4 [r zlexcount zset \[bar (foo]
assert_equal 4 [r zlexcount zset (bar \[foo]
assert_equal 3 [r zlexcount zset (bar (foo]
assert_equal 5 [r zlexcount zset - (foo]
assert_equal 1 [r zlexcount zset (maxstring +]
}
test "ZRANGEBYSLEX with LIMIT" {
create_default_lex_zset