Merge branch 'unstable' of https://github.com/antirez/redis into unstable
This commit is contained in:
commit
423a030ee3
18
src/Makefile
18
src/Makefile
|
@ -21,6 +21,11 @@ NODEPS:=clean distclean
|
|||
|
||||
# Default settings
|
||||
STD=-std=c99 -pedantic -DREDIS_STATIC=''
|
||||
ifneq (,$(findstring clang,$(CC)))
|
||||
ifneq (,$(findstring FreeBSD,$(uname_S)))
|
||||
STD+=-Wno-c11-extensions
|
||||
endif
|
||||
endif
|
||||
WARN=-Wall -W -Wno-missing-field-initializers
|
||||
OPT=$(OPTIMIZATION)
|
||||
|
||||
|
@ -97,10 +102,20 @@ else
|
|||
ifeq ($(uname_S),OpenBSD)
|
||||
# OpenBSD
|
||||
FINAL_LIBS+= -lpthread
|
||||
ifeq ($(USE_BACKTRACE),yes)
|
||||
FINAL_CFLAGS+= -DUSE_BACKTRACE -I/usr/local/include
|
||||
FINAL_LDFLAGS+= -L/usr/local/lib
|
||||
FINAL_LIBS+= -lexecinfo
|
||||
endif
|
||||
|
||||
else
|
||||
ifeq ($(uname_S),FreeBSD)
|
||||
# FreeBSD
|
||||
FINAL_LIBS+= -lpthread
|
||||
FINAL_LIBS+= -lpthread -lexecinfo
|
||||
else
|
||||
ifeq ($(uname_S),DragonFly)
|
||||
# FreeBSD
|
||||
FINAL_LIBS+= -lpthread -lexecinfo
|
||||
else
|
||||
# All the other OSes (notably Linux)
|
||||
FINAL_LDFLAGS+= -rdynamic
|
||||
|
@ -110,6 +125,7 @@ endif
|
|||
endif
|
||||
endif
|
||||
endif
|
||||
endif
|
||||
# Include paths to dependencies
|
||||
FINAL_CFLAGS+= -I../deps/hiredis -I../deps/linenoise -I../deps/lua/src
|
||||
|
||||
|
|
|
@ -684,7 +684,7 @@ void loadServerConfigFromString(char *config) {
|
|||
goto loaderr;
|
||||
}
|
||||
} else if ((!strcasecmp(argv[0],"cluster-slave-no-failover") ||
|
||||
!strcasecmp(argv[0],"cluster-replica-no-failiver")) &&
|
||||
!strcasecmp(argv[0],"cluster-replica-no-failover")) &&
|
||||
argc == 2)
|
||||
{
|
||||
server.cluster_slave_no_failover = yesnotoi(argv[1]);
|
||||
|
|
|
@ -62,7 +62,9 @@
|
|||
#endif
|
||||
|
||||
/* Test for backtrace() */
|
||||
#if defined(__APPLE__) || (defined(__linux__) && defined(__GLIBC__))
|
||||
#if defined(__APPLE__) || (defined(__linux__) && defined(__GLIBC__)) || \
|
||||
defined(__FreeBSD__) || (defined(__OpenBSD__) && defined(USE_BACKTRACE))\
|
||||
|| defined(__DragonFly__)
|
||||
#define HAVE_BACKTRACE 1
|
||||
#endif
|
||||
|
||||
|
|
161
src/debug.c
161
src/debug.c
|
@ -37,7 +37,11 @@
|
|||
|
||||
#ifdef HAVE_BACKTRACE
|
||||
#include <execinfo.h>
|
||||
#ifndef __OpenBSD__
|
||||
#include <ucontext.h>
|
||||
#else
|
||||
typedef ucontext_t sigcontext_t;
|
||||
#endif
|
||||
#include <fcntl.h>
|
||||
#include "bio.h"
|
||||
#include <unistd.h>
|
||||
|
@ -729,6 +733,22 @@ static void *getMcontextEip(ucontext_t *uc) {
|
|||
#elif defined(__aarch64__) /* Linux AArch64 */
|
||||
return (void*) uc->uc_mcontext.pc;
|
||||
#endif
|
||||
#elif defined(__FreeBSD__)
|
||||
/* FreeBSD */
|
||||
#if defined(__i386__)
|
||||
return (void*) uc->uc_mcontext.mc_eip;
|
||||
#elif defined(__x86_64__)
|
||||
return (void*) uc->uc_mcontext.mc_rip;
|
||||
#endif
|
||||
#elif defined(__OpenBSD__)
|
||||
/* OpenBSD */
|
||||
#if defined(__i386__)
|
||||
return (void*) uc->sc_eip;
|
||||
#elif defined(__x86_64__)
|
||||
return (void*) uc->sc_rip;
|
||||
#endif
|
||||
#elif defined(__DragonFly__)
|
||||
return (void*) uc->uc_mcontext.mc_rip;
|
||||
#else
|
||||
return NULL;
|
||||
#endif
|
||||
|
@ -870,6 +890,145 @@ void logRegisters(ucontext_t *uc) {
|
|||
);
|
||||
logStackContent((void**)uc->uc_mcontext.gregs[15]);
|
||||
#endif
|
||||
#elif defined(__FreeBSD__)
|
||||
#if defined(__x86_64__)
|
||||
serverLog(LL_WARNING,
|
||||
"\n"
|
||||
"RAX:%016lx RBX:%016lx\nRCX:%016lx RDX:%016lx\n"
|
||||
"RDI:%016lx RSI:%016lx\nRBP:%016lx RSP:%016lx\n"
|
||||
"R8 :%016lx R9 :%016lx\nR10:%016lx R11:%016lx\n"
|
||||
"R12:%016lx R13:%016lx\nR14:%016lx R15:%016lx\n"
|
||||
"RIP:%016lx EFL:%016lx\nCSGSFS:%016lx",
|
||||
(unsigned long) uc->uc_mcontext.mc_rax,
|
||||
(unsigned long) uc->uc_mcontext.mc_rbx,
|
||||
(unsigned long) uc->uc_mcontext.mc_rcx,
|
||||
(unsigned long) uc->uc_mcontext.mc_rdx,
|
||||
(unsigned long) uc->uc_mcontext.mc_rdi,
|
||||
(unsigned long) uc->uc_mcontext.mc_rsi,
|
||||
(unsigned long) uc->uc_mcontext.mc_rbp,
|
||||
(unsigned long) uc->uc_mcontext.mc_rsp,
|
||||
(unsigned long) uc->uc_mcontext.mc_r8,
|
||||
(unsigned long) uc->uc_mcontext.mc_r9,
|
||||
(unsigned long) uc->uc_mcontext.mc_r10,
|
||||
(unsigned long) uc->uc_mcontext.mc_r11,
|
||||
(unsigned long) uc->uc_mcontext.mc_r12,
|
||||
(unsigned long) uc->uc_mcontext.mc_r13,
|
||||
(unsigned long) uc->uc_mcontext.mc_r14,
|
||||
(unsigned long) uc->uc_mcontext.mc_r15,
|
||||
(unsigned long) uc->uc_mcontext.mc_rip,
|
||||
(unsigned long) uc->uc_mcontext.mc_rflags,
|
||||
(unsigned long) uc->uc_mcontext.mc_cs
|
||||
);
|
||||
logStackContent((void**)uc->uc_mcontext.mc_rsp);
|
||||
#elif defined(__i386__)
|
||||
serverLog(LL_WARNING,
|
||||
"\n"
|
||||
"EAX:%08lx EBX:%08lx ECX:%08lx EDX:%08lx\n"
|
||||
"EDI:%08lx ESI:%08lx EBP:%08lx ESP:%08lx\n"
|
||||
"SS :%08lx EFL:%08lx EIP:%08lx CS:%08lx\n"
|
||||
"DS :%08lx ES :%08lx FS :%08lx GS:%08lx",
|
||||
(unsigned long) uc->uc_mcontext.mc_eax,
|
||||
(unsigned long) uc->uc_mcontext.mc_ebx,
|
||||
(unsigned long) uc->uc_mcontext.mc_ebx,
|
||||
(unsigned long) uc->uc_mcontext.mc_edx,
|
||||
(unsigned long) uc->uc_mcontext.mc_edi,
|
||||
(unsigned long) uc->uc_mcontext.mc_esi,
|
||||
(unsigned long) uc->uc_mcontext.mc_ebp,
|
||||
(unsigned long) uc->uc_mcontext.mc_esp,
|
||||
(unsigned long) uc->uc_mcontext.mc_ss,
|
||||
(unsigned long) uc->uc_mcontext.mc_eflags,
|
||||
(unsigned long) uc->uc_mcontext.mc_eip,
|
||||
(unsigned long) uc->uc_mcontext.mc_cs,
|
||||
(unsigned long) uc->uc_mcontext.mc_es,
|
||||
(unsigned long) uc->uc_mcontext.mc_fs,
|
||||
(unsigned long) uc->uc_mcontext.mc_gs
|
||||
);
|
||||
logStackContent((void**)uc->uc_mcontext.mc_esp);
|
||||
#endif
|
||||
#elif defined(__OpenBSD__)
|
||||
#if defined(__x86_64__)
|
||||
serverLog(LL_WARNING,
|
||||
"\n"
|
||||
"RAX:%016lx RBX:%016lx\nRCX:%016lx RDX:%016lx\n"
|
||||
"RDI:%016lx RSI:%016lx\nRBP:%016lx RSP:%016lx\n"
|
||||
"R8 :%016lx R9 :%016lx\nR10:%016lx R11:%016lx\n"
|
||||
"R12:%016lx R13:%016lx\nR14:%016lx R15:%016lx\n"
|
||||
"RIP:%016lx EFL:%016lx\nCSGSFS:%016lx",
|
||||
(unsigned long) uc->sc_rax,
|
||||
(unsigned long) uc->sc_rbx,
|
||||
(unsigned long) uc->sc_rcx,
|
||||
(unsigned long) uc->sc_rdx,
|
||||
(unsigned long) uc->sc_rdi,
|
||||
(unsigned long) uc->sc_rsi,
|
||||
(unsigned long) uc->sc_rbp,
|
||||
(unsigned long) uc->sc_rsp,
|
||||
(unsigned long) uc->sc_r8,
|
||||
(unsigned long) uc->sc_r9,
|
||||
(unsigned long) uc->sc_r10,
|
||||
(unsigned long) uc->sc_r11,
|
||||
(unsigned long) uc->sc_r12,
|
||||
(unsigned long) uc->sc_r13,
|
||||
(unsigned long) uc->sc_r14,
|
||||
(unsigned long) uc->sc_r15,
|
||||
(unsigned long) uc->sc_rip,
|
||||
(unsigned long) uc->sc_rflags,
|
||||
(unsigned long) uc->sc_cs
|
||||
);
|
||||
logStackContent((void**)uc->sc_rsp);
|
||||
#elif defined(__i386__)
|
||||
serverLog(LL_WARNING,
|
||||
"\n"
|
||||
"EAX:%08lx EBX:%08lx ECX:%08lx EDX:%08lx\n"
|
||||
"EDI:%08lx ESI:%08lx EBP:%08lx ESP:%08lx\n"
|
||||
"SS :%08lx EFL:%08lx EIP:%08lx CS:%08lx\n"
|
||||
"DS :%08lx ES :%08lx FS :%08lx GS:%08lx",
|
||||
(unsigned long) uc->sc_eax,
|
||||
(unsigned long) uc->sc_ebx,
|
||||
(unsigned long) uc->sc_ebx,
|
||||
(unsigned long) uc->sc_edx,
|
||||
(unsigned long) uc->sc_edi,
|
||||
(unsigned long) uc->sc_esi,
|
||||
(unsigned long) uc->sc_ebp,
|
||||
(unsigned long) uc->sc_esp,
|
||||
(unsigned long) uc->sc_ss,
|
||||
(unsigned long) uc->sc_eflags,
|
||||
(unsigned long) uc->sc_eip,
|
||||
(unsigned long) uc->sc_cs,
|
||||
(unsigned long) uc->sc_es,
|
||||
(unsigned long) uc->sc_fs,
|
||||
(unsigned long) uc->sc_gs
|
||||
);
|
||||
logStackContent((void**)uc->sc_esp);
|
||||
#endif
|
||||
#elif defined(__DragonFly__)
|
||||
serverLog(LL_WARNING,
|
||||
"\n"
|
||||
"RAX:%016lx RBX:%016lx\nRCX:%016lx RDX:%016lx\n"
|
||||
"RDI:%016lx RSI:%016lx\nRBP:%016lx RSP:%016lx\n"
|
||||
"R8 :%016lx R9 :%016lx\nR10:%016lx R11:%016lx\n"
|
||||
"R12:%016lx R13:%016lx\nR14:%016lx R15:%016lx\n"
|
||||
"RIP:%016lx EFL:%016lx\nCSGSFS:%016lx",
|
||||
(unsigned long) uc->uc_mcontext.mc_rax,
|
||||
(unsigned long) uc->uc_mcontext.mc_rbx,
|
||||
(unsigned long) uc->uc_mcontext.mc_rcx,
|
||||
(unsigned long) uc->uc_mcontext.mc_rdx,
|
||||
(unsigned long) uc->uc_mcontext.mc_rdi,
|
||||
(unsigned long) uc->uc_mcontext.mc_rsi,
|
||||
(unsigned long) uc->uc_mcontext.mc_rbp,
|
||||
(unsigned long) uc->uc_mcontext.mc_rsp,
|
||||
(unsigned long) uc->uc_mcontext.mc_r8,
|
||||
(unsigned long) uc->uc_mcontext.mc_r9,
|
||||
(unsigned long) uc->uc_mcontext.mc_r10,
|
||||
(unsigned long) uc->uc_mcontext.mc_r11,
|
||||
(unsigned long) uc->uc_mcontext.mc_r12,
|
||||
(unsigned long) uc->uc_mcontext.mc_r13,
|
||||
(unsigned long) uc->uc_mcontext.mc_r14,
|
||||
(unsigned long) uc->uc_mcontext.mc_r15,
|
||||
(unsigned long) uc->uc_mcontext.mc_rip,
|
||||
(unsigned long) uc->uc_mcontext.mc_rflags,
|
||||
(unsigned long) uc->uc_mcontext.mc_cs
|
||||
);
|
||||
logStackContent((void**)uc->uc_mcontext.mc_rsp);
|
||||
#else
|
||||
serverLog(LL_WARNING,
|
||||
" Dumping of registers not supported for this OS/arch");
|
||||
|
@ -1189,6 +1348,8 @@ void serverLogHexDump(int level, char *descr, void *value, size_t len) {
|
|||
void watchdogSignalHandler(int sig, siginfo_t *info, void *secret) {
|
||||
#ifdef HAVE_BACKTRACE
|
||||
ucontext_t *uc = (ucontext_t*) secret;
|
||||
#else
|
||||
(void)secret;
|
||||
#endif
|
||||
UNUSED(info);
|
||||
UNUSED(sig);
|
||||
|
|
|
@ -562,11 +562,21 @@ sds latencyCommandGenSparkeline(char *event, struct latencyTimeSeries *ts) {
|
|||
*
|
||||
* LATENCY HISTORY: return time-latency samples for the specified event.
|
||||
* LATENCY LATEST: return the latest latency for all the events classes.
|
||||
* LATENCY DOCTOR: returns an human readable analysis of instance latency.
|
||||
* LATENCY DOCTOR: returns a human readable analysis of instance latency.
|
||||
* LATENCY GRAPH: provide an ASCII graph of the latency of the specified event.
|
||||
* LATENCY RESET: reset data of a specified event or all the data if no event provided.
|
||||
*/
|
||||
void latencyCommand(client *c) {
|
||||
const char *help[] = {
|
||||
"DOCTOR -- Returns a human readable latency analysis report.",
|
||||
"GRAPH <event> -- Returns an ASCII latency graph for the event class.",
|
||||
"HISTORY <event> -- Returns time-latency samples for the event class.",
|
||||
"LATEST -- Returns the latest latency samples for all events.",
|
||||
"RESET [event ...] -- Resets latency data of one or more event classes.",
|
||||
" (default: reset all data for all event classes)",
|
||||
"HELP -- Prints this help.",
|
||||
NULL
|
||||
};
|
||||
struct latencyTimeSeries *ts;
|
||||
|
||||
if (!strcasecmp(c->argv[1]->ptr,"history") && c->argc == 3) {
|
||||
|
@ -611,8 +621,10 @@ void latencyCommand(client *c) {
|
|||
resets += latencyResetEvent(c->argv[j]->ptr);
|
||||
addReplyLongLong(c,resets);
|
||||
}
|
||||
} else if (!strcasecmp(c->argv[1]->ptr,"help") && c->argc >= 2) {
|
||||
addReplyHelp(c, help);
|
||||
} else {
|
||||
addReply(c,shared.syntaxerr);
|
||||
addReplySubcommandSyntaxError(c);
|
||||
}
|
||||
return;
|
||||
|
||||
|
|
|
@ -2109,6 +2109,7 @@ int checkClientOutputBufferLimits(client *c) {
|
|||
* called from contexts where the client can't be freed safely, i.e. from the
|
||||
* lower level functions pushing data inside the client output buffers. */
|
||||
void asyncCloseClientOnOutputBufferLimitReached(client *c) {
|
||||
if (c->fd == -1) return; /* It is unsafe to free fake clients. */
|
||||
serverAssert(c->reply_bytes < SIZE_MAX-(1024*64));
|
||||
if (c->reply_bytes == 0 || c->flags & CLIENT_CLOSE_ASAP) return;
|
||||
if (checkClientOutputBufferLimits(c)) {
|
||||
|
|
39
src/object.c
39
src/object.c
|
@ -1285,9 +1285,18 @@ NULL
|
|||
*
|
||||
* Usage: MEMORY usage <key> */
|
||||
void memoryCommand(client *c) {
|
||||
robj *o;
|
||||
|
||||
if (!strcasecmp(c->argv[1]->ptr,"usage") && c->argc >= 3) {
|
||||
if (!strcasecmp(c->argv[1]->ptr,"help") && c->argc == 2) {
|
||||
const char *help[] = {
|
||||
"DOCTOR - Return memory problems reports.",
|
||||
"MALLOC-STATS -- Return internal statistics report from the memory allocator.",
|
||||
"PURGE -- Attempt to purge dirty pages for reclamation by the allocator.",
|
||||
"STATS -- Return information about the memory usage of the server.",
|
||||
"USAGE <key> [SAMPLES <count>] -- Return memory in bytes used by <key> and its value. Nested values are sampled up to <count> times (default: 5).",
|
||||
NULL
|
||||
};
|
||||
addReplyHelp(c, help);
|
||||
} else if (!strcasecmp(c->argv[1]->ptr,"usage") && c->argc >= 3) {
|
||||
dictEntry *de;
|
||||
long long samples = OBJ_COMPUTE_SIZE_DEF_SAMPLES;
|
||||
for (int j = 3; j < c->argc; j++) {
|
||||
if (!strcasecmp(c->argv[j]->ptr,"samples") &&
|
||||
|
@ -1306,10 +1315,12 @@ void memoryCommand(client *c) {
|
|||
return;
|
||||
}
|
||||
}
|
||||
if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.nullbulk))
|
||||
== NULL) return;
|
||||
size_t usage = objectComputeSize(o,samples);
|
||||
usage += sdsAllocSize(c->argv[2]->ptr);
|
||||
if ((de = dictFind(c->db->dict,c->argv[2]->ptr)) == NULL) {
|
||||
addReply(c, shared.nullbulk);
|
||||
return;
|
||||
}
|
||||
size_t usage = objectComputeSize(dictGetVal(de),samples);
|
||||
usage += sdsAllocSize(dictGetKey(de));
|
||||
usage += sizeof(dictEntry);
|
||||
addReplyLongLong(c,usage);
|
||||
} else if (!strcasecmp(c->argv[1]->ptr,"stats") && c->argc == 2) {
|
||||
|
@ -1434,19 +1445,7 @@ void memoryCommand(client *c) {
|
|||
addReply(c, shared.ok);
|
||||
/* Nothing to do for other allocators. */
|
||||
#endif
|
||||
} else if (!strcasecmp(c->argv[1]->ptr,"help") && c->argc == 2) {
|
||||
addReplyMultiBulkLen(c,5);
|
||||
addReplyBulkCString(c,
|
||||
"MEMORY DOCTOR - Outputs memory problems report");
|
||||
addReplyBulkCString(c,
|
||||
"MEMORY USAGE <key> [SAMPLES <count>] - Estimate memory usage of key");
|
||||
addReplyBulkCString(c,
|
||||
"MEMORY STATS - Show memory usage details");
|
||||
addReplyBulkCString(c,
|
||||
"MEMORY PURGE - Ask the allocator to release memory");
|
||||
addReplyBulkCString(c,
|
||||
"MEMORY MALLOC-STATS - Show allocator internal stats");
|
||||
} else {
|
||||
addReplyError(c,"Syntax error. Try MEMORY HELP");
|
||||
addReplyErrorFormat(c, "Unknown subcommand or wrong number of arguments for '%s'. Try MEMORY HELP", (char*)c->argv[1]->ptr);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1645,6 +1645,9 @@ robj *rdbLoadObject(int rdbtype, rio *rdb) {
|
|||
* node: the entries inside the listpack itself are delta-encoded
|
||||
* relatively to this ID. */
|
||||
sds nodekey = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL);
|
||||
if (nodekey == NULL) {
|
||||
rdbExitReportCorruptRDB("Stream master ID loading failed: invalid encoding or I/O error.");
|
||||
}
|
||||
if (sdslen(nodekey) != sizeof(streamID)) {
|
||||
rdbExitReportCorruptRDB("Stream node key entry is not the "
|
||||
"size of a stream ID");
|
||||
|
|
214
src/redis-cli.c
214
src/redis-cli.c
|
@ -2726,8 +2726,7 @@ static int clusterManagerSetSlot(clusterManagerNode *node1,
|
|||
if (err != NULL) {
|
||||
*err = zmalloc((reply->len + 1) * sizeof(char));
|
||||
strcpy(*err, reply->str);
|
||||
CLUSTER_MANAGER_PRINT_REPLY_ERROR(node1, err);
|
||||
}
|
||||
} else CLUSTER_MANAGER_PRINT_REPLY_ERROR(node1, reply->str);
|
||||
goto cleanup;
|
||||
}
|
||||
cleanup:
|
||||
|
@ -2830,7 +2829,7 @@ static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source,
|
|||
if (err != NULL) {
|
||||
*err = zmalloc((reply->len + 1) * sizeof(char));
|
||||
strcpy(*err, reply->str);
|
||||
CLUSTER_MANAGER_PRINT_REPLY_ERROR(source, err);
|
||||
CLUSTER_MANAGER_PRINT_REPLY_ERROR(source, *err);
|
||||
}
|
||||
goto next;
|
||||
}
|
||||
|
@ -2848,14 +2847,21 @@ static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source,
|
|||
if (migrate_reply == NULL) goto next;
|
||||
if (migrate_reply->type == REDIS_REPLY_ERROR) {
|
||||
if (do_fix && strstr(migrate_reply->str, "BUSYKEY")) {
|
||||
/* If the key already exists, try to migrate keys
|
||||
* adding REPLACE option.
|
||||
* If the key's slot is not served, try to assign slot
|
||||
* to the target node. */
|
||||
int is_busy = (strstr(migrate_reply->str, "BUSYKEY") != NULL);
|
||||
if (strstr(migrate_reply->str, "slot not served") != NULL)
|
||||
clusterManagerSetSlot(source, target, slot, "node", NULL);
|
||||
clusterManagerLogWarn("*** Target key exists. "
|
||||
"Replacing it for FIX.\n");
|
||||
freeReplyObject(migrate_reply);
|
||||
/* Try to migrate keys adding REPLACE option. */
|
||||
migrate_reply = clusterManagerMigrateKeysInReply(source,
|
||||
target,
|
||||
reply,
|
||||
1, timeout,
|
||||
is_busy,
|
||||
timeout,
|
||||
NULL);
|
||||
success = (migrate_reply != NULL &&
|
||||
migrate_reply->type != REDIS_REPLY_ERROR);
|
||||
|
@ -2941,7 +2947,7 @@ static int clusterManagerMoveSlot(clusterManagerNode *source,
|
|||
if (err != NULL) {
|
||||
*err = zmalloc((r->len + 1) * sizeof(char));
|
||||
strcpy(*err, r->str);
|
||||
CLUSTER_MANAGER_PRINT_REPLY_ERROR(n, err);
|
||||
CLUSTER_MANAGER_PRINT_REPLY_ERROR(n, *err);
|
||||
}
|
||||
}
|
||||
freeReplyObject(r);
|
||||
|
@ -3301,8 +3307,8 @@ static sds clusterManagerGetConfigSignature(clusterManagerNode *node) {
|
|||
nodename = token;
|
||||
tot_size = (p - token);
|
||||
name_len = tot_size++; // Make room for ':' in tot_size
|
||||
} else if (i == 8) break;
|
||||
i++;
|
||||
}
|
||||
if (++i == 8) break;
|
||||
}
|
||||
if (i != 8) continue;
|
||||
if (nodename == NULL) continue;
|
||||
|
@ -3341,7 +3347,7 @@ static sds clusterManagerGetConfigSignature(clusterManagerNode *node) {
|
|||
char *sp = cfg + name_len;
|
||||
*(sp++) = ':';
|
||||
for (i = 0; i < c; i++) {
|
||||
if (i > 0) *(sp++) = '|';
|
||||
if (i > 0) *(sp++) = ',';
|
||||
int slen = strlen(slots[i]);
|
||||
memcpy(sp, slots[i], slen);
|
||||
sp += slen;
|
||||
|
@ -3499,6 +3505,34 @@ static clusterManagerNode *clusterManagerNodeWithLeastReplicas() {
|
|||
return node;
|
||||
}
|
||||
|
||||
/* This function returns a random master node, return NULL if none */
|
||||
|
||||
static clusterManagerNode *clusterManagerNodeMasterRandom() {
|
||||
int master_count = 0;
|
||||
int idx;
|
||||
listIter li;
|
||||
listNode *ln;
|
||||
listRewind(cluster_manager.nodes, &li);
|
||||
while ((ln = listNext(&li)) != NULL) {
|
||||
clusterManagerNode *n = ln->value;
|
||||
if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue;
|
||||
master_count++;
|
||||
}
|
||||
|
||||
srand(time(NULL));
|
||||
idx = rand() % master_count;
|
||||
listRewind(cluster_manager.nodes, &li);
|
||||
while ((ln = listNext(&li)) != NULL) {
|
||||
clusterManagerNode *n = ln->value;
|
||||
if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue;
|
||||
if (!idx--) {
|
||||
return n;
|
||||
}
|
||||
}
|
||||
/* Can not be reached */
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static int clusterManagerFixSlotsCoverage(char *all_slots) {
|
||||
int i, fixed = 0;
|
||||
list *none = NULL, *single = NULL, *multi = NULL;
|
||||
|
@ -3571,22 +3605,25 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
|
|||
"across the cluster:\n");
|
||||
clusterManagerPrintSlotsList(none);
|
||||
if (confirmWithYes("Fix these slots by covering with a random node?")){
|
||||
srand(time(NULL));
|
||||
listIter li;
|
||||
listNode *ln;
|
||||
listRewind(none, &li);
|
||||
while ((ln = listNext(&li)) != NULL) {
|
||||
sds slot = ln->value;
|
||||
long idx = (long) (rand() % listLength(cluster_manager.nodes));
|
||||
listNode *node_n = listIndex(cluster_manager.nodes, idx);
|
||||
assert(node_n != NULL);
|
||||
clusterManagerNode *n = node_n->value;
|
||||
clusterManagerNode *n = clusterManagerNodeMasterRandom();
|
||||
clusterManagerLogInfo(">>> Covering slot %s with %s:%d\n",
|
||||
slot, n->ip, n->port);
|
||||
/* Ensure the slot is not already assigned. */
|
||||
redisReply *r = CLUSTER_MANAGER_COMMAND(n,
|
||||
"CLUSTER DELSLOTS %s", slot);
|
||||
if (r) freeReplyObject(r);
|
||||
r = CLUSTER_MANAGER_COMMAND(n,
|
||||
"CLUSTER ADDSLOTS %s", slot);
|
||||
if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1;
|
||||
if (r) freeReplyObject(r);
|
||||
r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER BUMPEPOCH");
|
||||
if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1;
|
||||
if (r) freeReplyObject(r);
|
||||
if (fixed < 0) goto cleanup;
|
||||
/* Since CLUSTER ADDSLOTS succeeded, we also update the slot
|
||||
* info into the node struct, in order to keep it synced */
|
||||
|
@ -3614,10 +3651,17 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
|
|||
clusterManagerNode *n = fn->value;
|
||||
clusterManagerLogInfo(">>> Covering slot %s with %s:%d\n",
|
||||
slot, n->ip, n->port);
|
||||
/* Ensure the slot is not already assigned. */
|
||||
redisReply *r = CLUSTER_MANAGER_COMMAND(n,
|
||||
"CLUSTER DELSLOTS %s", slot);
|
||||
if (r) freeReplyObject(r);
|
||||
r = CLUSTER_MANAGER_COMMAND(n,
|
||||
"CLUSTER ADDSLOTS %s", slot);
|
||||
if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1;
|
||||
if (r) freeReplyObject(r);
|
||||
r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER BUMPEPOCH");
|
||||
if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1;
|
||||
if (r) freeReplyObject(r);
|
||||
if (fixed < 0) goto cleanup;
|
||||
/* Since CLUSTER ADDSLOTS succeeded, we also update the slot
|
||||
* info into the node struct, in order to keep it synced */
|
||||
|
@ -3651,7 +3695,11 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
|
|||
clusterManagerLogInfo(">>> Covering slot %s moving keys "
|
||||
"to %s:%d\n", slot,
|
||||
target->ip, target->port);
|
||||
/* Ensure the slot is not already assigned. */
|
||||
redisReply *r = CLUSTER_MANAGER_COMMAND(target,
|
||||
"CLUSTER DELSLOTS %s", slot);
|
||||
if (r) freeReplyObject(r);
|
||||
r = CLUSTER_MANAGER_COMMAND(target,
|
||||
"CLUSTER ADDSLOTS %s", slot);
|
||||
if (!clusterManagerCheckRedisReply(target, r, NULL)) fixed = -1;
|
||||
if (r) freeReplyObject(r);
|
||||
|
@ -3660,6 +3708,9 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
|
|||
"CLUSTER SETSLOT %s %s", slot, "STABLE");
|
||||
if (!clusterManagerCheckRedisReply(target, r, NULL)) fixed = -1;
|
||||
if (r) freeReplyObject(r);
|
||||
r = CLUSTER_MANAGER_COMMAND(target, "CLUSTER BUMPEPOCH");
|
||||
if (!clusterManagerCheckRedisReply(target, r, NULL)) fixed = -1;
|
||||
if (r) freeReplyObject(r);
|
||||
if (fixed < 0) goto cleanup;
|
||||
/* Since CLUSTER ADDSLOTS succeeded, we also update the slot
|
||||
* info into the node struct, in order to keep it synced */
|
||||
|
@ -3670,14 +3721,22 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
|
|||
while ((nln = listNext(&nli)) != NULL) {
|
||||
clusterManagerNode *src = nln->value;
|
||||
if (src == target) continue;
|
||||
/* Assign the slot to target node in the source node. */
|
||||
redisReply *r = CLUSTER_MANAGER_COMMAND(src,
|
||||
"CLUSTER SETSLOT %s %s %s", slot,
|
||||
"NODE", target->name);
|
||||
if (!clusterManagerCheckRedisReply(src, r, NULL))
|
||||
fixed = -1;
|
||||
if (r) freeReplyObject(r);
|
||||
if (fixed < 0) goto cleanup;
|
||||
/* Set the source node in 'importing' state
|
||||
* (even if we will actually migrate keys away)
|
||||
* in order to avoid receiving redirections
|
||||
* for MIGRATE. */
|
||||
redisReply *r = CLUSTER_MANAGER_COMMAND(src,
|
||||
r = CLUSTER_MANAGER_COMMAND(src,
|
||||
"CLUSTER SETSLOT %s %s %s", slot,
|
||||
"IMPORTING", target->name);
|
||||
if (!clusterManagerCheckRedisReply(target, r, NULL))
|
||||
if (!clusterManagerCheckRedisReply(src, r, NULL))
|
||||
fixed = -1;
|
||||
if (r) freeReplyObject(r);
|
||||
if (fixed < 0) goto cleanup;
|
||||
|
@ -3687,6 +3746,13 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
|
|||
fixed = -1;
|
||||
goto cleanup;
|
||||
}
|
||||
r = CLUSTER_MANAGER_COMMAND(src,
|
||||
"CLUSTER SETSLOT %s %s", slot,
|
||||
"STABLE");
|
||||
if (!clusterManagerCheckRedisReply(src, r, NULL))
|
||||
fixed = -1;
|
||||
if (r) freeReplyObject(r);
|
||||
if (fixed < 0) goto cleanup;
|
||||
}
|
||||
fixed++;
|
||||
}
|
||||
|
@ -3720,11 +3786,22 @@ static int clusterManagerFixOpenSlot(int slot) {
|
|||
while ((ln = listNext(&li)) != NULL) {
|
||||
clusterManagerNode *n = ln->value;
|
||||
if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue;
|
||||
if (n->slots[slot]) {
|
||||
if (owner == NULL) owner = n;
|
||||
listAddNodeTail(owners, n);
|
||||
if (n->slots[slot]) listAddNodeTail(owners, n);
|
||||
else {
|
||||
redisReply *r = CLUSTER_MANAGER_COMMAND(n,
|
||||
"CLUSTER COUNTKEYSINSLOT %d", slot);
|
||||
success = clusterManagerCheckRedisReply(n, r, NULL);
|
||||
if (success && r->integer > 0) {
|
||||
clusterManagerLogWarn("*** Found keys about slot %d "
|
||||
"in non-owner node %s:%d!\n", slot,
|
||||
n->ip, n->port);
|
||||
listAddNodeTail(owners, n);
|
||||
}
|
||||
if (r) freeReplyObject(r);
|
||||
if (!success) goto cleanup;
|
||||
}
|
||||
}
|
||||
if (listLength(owners) == 1) owner = listFirst(owners)->value;
|
||||
listRewind(cluster_manager.nodes, &li);
|
||||
while ((ln = listNext(&li)) != NULL) {
|
||||
clusterManagerNode *n = ln->value;
|
||||
|
@ -3735,7 +3812,7 @@ static int clusterManagerFixOpenSlot(int slot) {
|
|||
sds migrating_slot = n->migrating[i];
|
||||
if (atoi(migrating_slot) == slot) {
|
||||
char *sep = (listLength(migrating) == 0 ? "" : ",");
|
||||
migrating_str = sdscatfmt(migrating_str, "%s%S:%u",
|
||||
migrating_str = sdscatfmt(migrating_str, "%s%s:%u",
|
||||
sep, n->ip, n->port);
|
||||
listAddNodeTail(migrating, n);
|
||||
is_migrating = 1;
|
||||
|
@ -3748,7 +3825,7 @@ static int clusterManagerFixOpenSlot(int slot) {
|
|||
sds importing_slot = n->importing[i];
|
||||
if (atoi(importing_slot) == slot) {
|
||||
char *sep = (listLength(importing) == 0 ? "" : ",");
|
||||
importing_str = sdscatfmt(importing_str, "%s%S:%u",
|
||||
importing_str = sdscatfmt(importing_str, "%s%s:%u",
|
||||
sep, n->ip, n->port);
|
||||
listAddNodeTail(importing, n);
|
||||
is_importing = 1;
|
||||
|
@ -3767,15 +3844,20 @@ static int clusterManagerFixOpenSlot(int slot) {
|
|||
clusterManagerLogWarn("*** Found keys about slot %d "
|
||||
"in node %s:%d!\n", slot, n->ip,
|
||||
n->port);
|
||||
char *sep = (listLength(importing) == 0 ? "" : ",");
|
||||
importing_str = sdscatfmt(importing_str, "%s%S:%u",
|
||||
sep, n->ip, n->port);
|
||||
listAddNodeTail(importing, n);
|
||||
}
|
||||
if (r) freeReplyObject(r);
|
||||
if (!success) goto cleanup;
|
||||
}
|
||||
}
|
||||
printf("Set as migrating in: %s\n", migrating_str);
|
||||
printf("Set as importing in: %s\n", importing_str);
|
||||
/* If there is no slot owner, set as owner the slot with the biggest
|
||||
if (sdslen(migrating_str) > 0)
|
||||
printf("Set as migrating in: %s\n", migrating_str);
|
||||
if (sdslen(importing_str) > 0)
|
||||
printf("Set as importing in: %s\n", importing_str);
|
||||
/* If there is no slot owner, set as owner the node with the biggest
|
||||
* number of keys, among the set of migrating / importing nodes. */
|
||||
if (owner == NULL) {
|
||||
clusterManagerLogInfo(">>> Nobody claims ownership, "
|
||||
|
@ -3799,6 +3881,15 @@ static int clusterManagerFixOpenSlot(int slot) {
|
|||
success = clusterManagerCheckRedisReply(owner, reply, NULL);
|
||||
if (reply) freeReplyObject(reply);
|
||||
if (!success) goto cleanup;
|
||||
/* Ensure that the slot is unassigned before assigning it to the
|
||||
* owner. */
|
||||
reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER DELSLOTS %d", slot);
|
||||
success = clusterManagerCheckRedisReply(owner, reply, NULL);
|
||||
/* Ignore "already unassigned" error. */
|
||||
if (!success && reply && reply->type == REDIS_REPLY_ERROR &&
|
||||
strstr(reply->str, "already unassigned") != NULL) success = 1;
|
||||
if (reply) freeReplyObject(reply);
|
||||
if (!success) goto cleanup;
|
||||
reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER ADDSLOTS %d", slot);
|
||||
success = clusterManagerCheckRedisReply(owner, reply, NULL);
|
||||
if (reply) freeReplyObject(reply);
|
||||
|
@ -3827,32 +3918,43 @@ static int clusterManagerFixOpenSlot(int slot) {
|
|||
* in migrating state, since migrating is a valid state only for
|
||||
* slot owners. */
|
||||
if (listLength(owners) > 1) {
|
||||
owner = clusterManagerGetNodeWithMostKeysInSlot(owners, slot, NULL);
|
||||
/* Owner cannot be NULL at this point, since if there are more owners,
|
||||
* the owner has been set in the previous condition (owner == NULL). */
|
||||
assert(owner != NULL);
|
||||
listRewind(owners, &li);
|
||||
redisReply *reply = NULL;
|
||||
while ((ln = listNext(&li)) != NULL) {
|
||||
clusterManagerNode *n = ln->value;
|
||||
if (n == owner) continue;
|
||||
reply = CLUSTER_MANAGER_COMMAND(n, "CLUSTER DELSLOT %d", slot);
|
||||
reply = CLUSTER_MANAGER_COMMAND(n, "CLUSTER DELSLOTS %d", slot);
|
||||
success = clusterManagerCheckRedisReply(n, reply, NULL);
|
||||
/* Ignore "already unassigned" error. */
|
||||
if (!success && reply && reply->type == REDIS_REPLY_ERROR &&
|
||||
strstr(reply->str, "already unassigned") != NULL) success = 1;
|
||||
if (reply) freeReplyObject(reply);
|
||||
if (!success) goto cleanup;
|
||||
n->slots[slot] = 0;
|
||||
/* Assign the slot to the owner in the node 'n' configuration.' */
|
||||
success = clusterManagerSetSlot(n, owner, slot, "node", NULL);
|
||||
if (!success) goto cleanup;
|
||||
success = clusterManagerSetSlot(n, owner, slot, "importing", NULL);
|
||||
if (!success) goto cleanup;
|
||||
clusterManagerRemoveNodeFromList(importing, n); //Avoid duplicates
|
||||
/* Avoid duplicates. */
|
||||
clusterManagerRemoveNodeFromList(importing, n);
|
||||
listAddNodeTail(importing, n);
|
||||
/* Ensure that the node is not in the migrating list. */
|
||||
clusterManagerRemoveNodeFromList(migrating, n);
|
||||
}
|
||||
reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER BUMPEPOCH");
|
||||
success = clusterManagerCheckRedisReply(owner, reply, NULL);
|
||||
if (reply) freeReplyObject(reply);
|
||||
if (!success) goto cleanup;
|
||||
}
|
||||
int move_opts = CLUSTER_MANAGER_OPT_VERBOSE;
|
||||
/* Case 1: The slot is in migrating state in one slot, and in
|
||||
* importing state in 1 slot. That's trivial to address. */
|
||||
/* Case 1: The slot is in migrating state in one node, and in
|
||||
* importing state in 1 node. That's trivial to address. */
|
||||
if (listLength(migrating) == 1 && listLength(importing) == 1) {
|
||||
clusterManagerNode *src = listFirst(migrating)->value;
|
||||
clusterManagerNode *dst = listFirst(importing)->value;
|
||||
clusterManagerLogInfo(">>> Case 1: Moving slot %d from "
|
||||
"%s:%d to %s:%d\n", slot,
|
||||
src->ip, src->port, dst->ip, dst->port);
|
||||
success = clusterManagerMoveSlot(src, dst, slot, move_opts, NULL);
|
||||
}
|
||||
/* Case 2: There are multiple nodes that claim the slot as importing,
|
||||
|
@ -3860,7 +3962,7 @@ static int clusterManagerFixOpenSlot(int slot) {
|
|||
* the slot. In this case we just move all the keys to the owner
|
||||
* according to the configuration. */
|
||||
else if (listLength(migrating) == 0 && listLength(importing) > 0) {
|
||||
clusterManagerLogInfo(">>> Moving all the %d slot keys to its "
|
||||
clusterManagerLogInfo(">>> Case 2: Moving all the %d slot keys to its "
|
||||
"owner %s:%d\n", slot, owner->ip, owner->port);
|
||||
move_opts |= CLUSTER_MANAGER_OPT_COLD;
|
||||
listRewind(importing, &li);
|
||||
|
@ -3878,25 +3980,43 @@ static int clusterManagerFixOpenSlot(int slot) {
|
|||
if (r) freeReplyObject(r);
|
||||
if (!success) goto cleanup;
|
||||
}
|
||||
/* Since the slot has been moved in "cold" mode, ensure that all the
|
||||
* other nodes update their own configuration about the slot itself. */
|
||||
listRewind(cluster_manager.nodes, &li);
|
||||
while ((ln = listNext(&li)) != NULL) {
|
||||
clusterManagerNode *n = ln->value;
|
||||
if (n == owner) continue;
|
||||
if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue;
|
||||
redisReply *r = CLUSTER_MANAGER_COMMAND(n,
|
||||
"CLUSTER SETSLOT %d %s %s", slot, "NODE", owner->name);
|
||||
success = clusterManagerCheckRedisReply(n, r, NULL);
|
||||
if (r) freeReplyObject(r);
|
||||
if (!success) goto cleanup;
|
||||
}
|
||||
} else {
|
||||
int try_to_close_slot = (listLength(importing) == 0 &&
|
||||
listLength(migrating) == 1);
|
||||
if (try_to_close_slot) {
|
||||
clusterManagerNode *n = listFirst(migrating)->value;
|
||||
redisReply *r = CLUSTER_MANAGER_COMMAND(n,
|
||||
"CLUSTER GETKEYSINSLOT %d %d", slot, 10);
|
||||
success = clusterManagerCheckRedisReply(n, r, NULL);
|
||||
if (r) {
|
||||
if (success) try_to_close_slot = (r->elements == 0);
|
||||
freeReplyObject(r);
|
||||
if (!owner || owner != n) {
|
||||
redisReply *r = CLUSTER_MANAGER_COMMAND(n,
|
||||
"CLUSTER GETKEYSINSLOT %d %d", slot, 10);
|
||||
success = clusterManagerCheckRedisReply(n, r, NULL);
|
||||
if (r) {
|
||||
if (success) try_to_close_slot = (r->elements == 0);
|
||||
freeReplyObject(r);
|
||||
}
|
||||
if (!success) goto cleanup;
|
||||
}
|
||||
if (!success) goto cleanup;
|
||||
}
|
||||
/* Case 3: There are no slots claiming to be in importing state, but
|
||||
* there is a migrating node that actually don't have any key. We
|
||||
* can just close the slot, probably a reshard interrupted in the middle. */
|
||||
* there is a migrating node that actually don't have any key or is the
|
||||
* slot owner. We can just close the slot, probably a reshard interrupted
|
||||
* in the middle. */
|
||||
if (try_to_close_slot) {
|
||||
clusterManagerNode *n = listFirst(migrating)->value;
|
||||
clusterManagerLogInfo(">>> Case 3: Closing slot %d on %s:%d\n",
|
||||
slot, n->ip, n->port);
|
||||
redisReply *r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER SETSLOT %d %s",
|
||||
slot, "STABLE");
|
||||
success = clusterManagerCheckRedisReply(n, r, NULL);
|
||||
|
@ -4025,6 +4145,7 @@ static int clusterManagerCheckCluster(int quiet) {
|
|||
result = 0;
|
||||
if (do_fix/* && result*/) {
|
||||
dictType dtype = clusterManagerDictType;
|
||||
dtype.keyDestructor = dictSdsDestructor;
|
||||
dtype.valDestructor = dictListDestructor;
|
||||
clusterManagerUncoveredSlots = dictCreate(&dtype, NULL);
|
||||
int fixed = clusterManagerFixSlotsCoverage(slots);
|
||||
|
@ -4061,7 +4182,7 @@ static clusterManagerNode *clusterNodeForResharding(char *id,
|
|||
static list *clusterManagerComputeReshardTable(list *sources, int numslots) {
|
||||
list *moved = listCreate();
|
||||
int src_count = listLength(sources), i = 0, tot_slots = 0, j;
|
||||
clusterManagerNode **sorted = zmalloc(src_count * sizeof(**sorted));
|
||||
clusterManagerNode **sorted = zmalloc(src_count * sizeof(*sorted));
|
||||
listIter li;
|
||||
listNode *ln;
|
||||
listRewind(sources, &li);
|
||||
|
@ -5100,10 +5221,13 @@ static int clusterManagerCommandSetTimeout(int argc, char **argv) {
|
|||
n->port);
|
||||
ok_count++;
|
||||
continue;
|
||||
reply_err:
|
||||
reply_err:;
|
||||
int need_free = 0;
|
||||
if (err == NULL) err = "";
|
||||
else need_free = 1;
|
||||
clusterManagerLogErr("ERR setting node-timeot for %s:%d: %s\n", n->ip,
|
||||
n->port, err);
|
||||
if (need_free) zfree(err);
|
||||
err_count++;
|
||||
}
|
||||
clusterManagerLogInfo(">>> New node timeout set. %d OK, %d ERR.\n",
|
||||
|
|
|
@ -1245,6 +1245,18 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
|
|||
if (eof_reached) {
|
||||
int aof_is_enabled = server.aof_state != AOF_OFF;
|
||||
|
||||
/* Ensure background save doesn't overwrite synced data */
|
||||
if (server.rdb_child_pid != -1) {
|
||||
serverLog(LL_NOTICE,
|
||||
"Replica is about to load the RDB file received from the "
|
||||
"master, but there is a pending RDB child running. "
|
||||
"Killing process %ld and removing its temp file to avoid "
|
||||
"any race",
|
||||
(long) server.rdb_child_pid);
|
||||
kill(server.rdb_child_pid,SIGUSR1);
|
||||
rdbRemoveTempFile(server.rdb_child_pid);
|
||||
}
|
||||
|
||||
if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) {
|
||||
serverLog(LL_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> REPLICA synchronization: %s", strerror(errno));
|
||||
cancelReplicationHandshake();
|
||||
|
|
|
@ -452,13 +452,15 @@ struct redisCommand sentinelcmds[] = {
|
|||
{"info",sentinelInfoCommand,-1,"",0,NULL,0,0,0,0,0},
|
||||
{"role",sentinelRoleCommand,1,"l",0,NULL,0,0,0,0,0},
|
||||
{"client",clientCommand,-2,"rs",0,NULL,0,0,0,0,0},
|
||||
{"shutdown",shutdownCommand,-1,"",0,NULL,0,0,0,0,0}
|
||||
{"shutdown",shutdownCommand,-1,"",0,NULL,0,0,0,0,0},
|
||||
{"auth",authCommand,2,"sltF",0,NULL,0,0,0,0,0}
|
||||
};
|
||||
|
||||
/* This function overwrites a few normal Redis config default with Sentinel
|
||||
* specific defaults. */
|
||||
void initSentinelConfig(void) {
|
||||
server.port = REDIS_SENTINEL_PORT;
|
||||
server.protected_mode = 0; /* Sentinel must be exposed. */
|
||||
}
|
||||
|
||||
/* Perform the Sentinel mode initialization. */
|
||||
|
@ -1941,12 +1943,25 @@ werr:
|
|||
/* Send the AUTH command with the specified master password if needed.
|
||||
* Note that for slaves the password set for the master is used.
|
||||
*
|
||||
* In case this Sentinel requires a password as well, via the "requirepass"
|
||||
* configuration directive, we assume we should use the local password in
|
||||
* order to authenticate when connecting with the other Sentinels as well.
|
||||
* So basically all the Sentinels share the same password and use it to
|
||||
* authenticate reciprocally.
|
||||
*
|
||||
* We don't check at all if the command was successfully transmitted
|
||||
* to the instance as if it fails Sentinel will detect the instance down,
|
||||
* will disconnect and reconnect the link and so forth. */
|
||||
void sentinelSendAuthIfNeeded(sentinelRedisInstance *ri, redisAsyncContext *c) {
|
||||
char *auth_pass = (ri->flags & SRI_MASTER) ? ri->auth_pass :
|
||||
ri->master->auth_pass;
|
||||
char *auth_pass = NULL;
|
||||
|
||||
if (ri->flags & SRI_MASTER) {
|
||||
auth_pass = ri->auth_pass;
|
||||
} else if (ri->flags & SRI_SLAVE) {
|
||||
auth_pass = ri->master->auth_pass;
|
||||
} else if (ri->flags & SRI_SENTINEL) {
|
||||
if (server.requirepass) auth_pass = server.requirepass;
|
||||
}
|
||||
|
||||
if (auth_pass) {
|
||||
if (redisAsyncCommand(c, sentinelDiscardReplyCallback, ri, "%s %s",
|
||||
|
|
10
src/server.c
10
src/server.c
|
@ -1526,10 +1526,10 @@ void initServerConfig(void) {
|
|||
server.runid[CONFIG_RUN_ID_SIZE] = '\0';
|
||||
changeReplicationId();
|
||||
clearReplicationId2();
|
||||
server.timezone = timezone; /* Initialized by tzset(). */
|
||||
server.timezone = getTimeZone(); /* Initialized by tzset(). */
|
||||
server.configfile = NULL;
|
||||
server.executable = NULL;
|
||||
server.config_hz = CONFIG_DEFAULT_HZ;
|
||||
server.hz = server.config_hz = CONFIG_DEFAULT_HZ;
|
||||
server.dynamic_hz = CONFIG_DEFAULT_DYNAMIC_HZ;
|
||||
server.arch_bits = (sizeof(long) == 8) ? 64 : 32;
|
||||
server.port = CONFIG_DEFAULT_SERVER_PORT;
|
||||
|
@ -1958,9 +1958,13 @@ int listenToPort(int port, int *fds, int *count) {
|
|||
}
|
||||
if (fds[*count] == ANET_ERR) {
|
||||
serverLog(LL_WARNING,
|
||||
"Creating Server TCP listening socket %s:%d: %s",
|
||||
"Could not create server TCP listening socket %s:%d: %s",
|
||||
server.bindaddr[j] ? server.bindaddr[j] : "*",
|
||||
port, server.neterr);
|
||||
if (errno == ENOPROTOOPT || errno == EPROTONOSUPPORT ||
|
||||
errno == ESOCKTNOSUPPORT || errno == EPFNOSUPPORT ||
|
||||
errno == EAFNOSUPPORT || errno == EADDRNOTAVAIL)
|
||||
continue;
|
||||
return C_ERR;
|
||||
}
|
||||
anetNonBlock(NULL,fds[*count]);
|
||||
|
|
|
@ -39,7 +39,7 @@
|
|||
#include <errno.h> /* errno program_invocation_name program_invocation_short_name */
|
||||
|
||||
#if !defined(HAVE_SETPROCTITLE)
|
||||
#if (defined __NetBSD__ || defined __FreeBSD__ || defined __OpenBSD__)
|
||||
#if (defined __NetBSD__ || defined __FreeBSD__ || defined __OpenBSD__ || defined __DragonFly__)
|
||||
#define HAVE_SETPROCTITLE 1
|
||||
#else
|
||||
#define HAVE_SETPROCTITLE 0
|
||||
|
|
|
@ -37,7 +37,7 @@
|
|||
* mark the entry as deleted, or having the same field as the "master"
|
||||
* entry at the start of the listpack> */
|
||||
#define STREAM_ITEM_FLAG_NONE 0 /* No special flags. */
|
||||
#define STREAM_ITEM_FLAG_DELETED (1<<0) /* Entry is delted. Skip it. */
|
||||
#define STREAM_ITEM_FLAG_DELETED (1<<0) /* Entry is deleted. Skip it. */
|
||||
#define STREAM_ITEM_FLAG_SAMEFIELDS (1<<1) /* Same fields as master entry. */
|
||||
|
||||
void streamFreeCG(streamCG *cg);
|
||||
|
@ -165,7 +165,7 @@ int streamCompareID(streamID *a, streamID *b) {
|
|||
* Returns the new entry ID populating the 'added_id' structure.
|
||||
*
|
||||
* If 'use_id' is not NULL, the ID is not auto-generated by the function,
|
||||
* but instead the passed ID is uesd to add the new entry. In this case
|
||||
* but instead the passed ID is used to add the new entry. In this case
|
||||
* adding the entry may fail as specified later in this comment.
|
||||
*
|
||||
* The function returns C_OK if the item was added, this is always true
|
||||
|
@ -223,13 +223,13 @@ int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_
|
|||
*
|
||||
* count and deleted just represent respectively the total number of
|
||||
* entries inside the listpack that are valid, and marked as deleted
|
||||
* (delted flag in the entry flags set). So the total number of items
|
||||
* (deleted flag in the entry flags set). So the total number of items
|
||||
* actually inside the listpack (both deleted and not) is count+deleted.
|
||||
*
|
||||
* The real entries will be encoded with an ID that is just the
|
||||
* millisecond and sequence difference compared to the key stored at
|
||||
* the radix tree node containing the listpack (delta encoding), and
|
||||
* if the fields of the entry are the same as the master enty fields, the
|
||||
* if the fields of the entry are the same as the master entry fields, the
|
||||
* entry flags will specify this fact and the entry fields and number
|
||||
* of fields will be omitted (see later in the code of this function).
|
||||
*
|
||||
|
@ -486,7 +486,7 @@ int64_t streamTrimByLength(stream *s, size_t maxlen, int approx) {
|
|||
* }
|
||||
* streamIteratorStop(&myiterator); */
|
||||
void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end, int rev) {
|
||||
/* Intialize the iterator and translates the iteration start/stop
|
||||
/* Initialize the iterator and translates the iteration start/stop
|
||||
* elements into a 128 big big-endian number. */
|
||||
if (start) {
|
||||
streamEncodeID(si->start_key,start);
|
||||
|
@ -564,7 +564,7 @@ int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields) {
|
|||
si->lp_ele = lpLast(si->lp);
|
||||
}
|
||||
} else if (si->rev) {
|
||||
/* If we are itereating in the reverse order, and this is not
|
||||
/* If we are iterating in the reverse order, and this is not
|
||||
* the first entry emitted for this listpack, then we already
|
||||
* emitted the current entry, and have to go back to the previous
|
||||
* one. */
|
||||
|
@ -751,7 +751,7 @@ void streamIteratorRemoveEntry(streamIterator *si, streamID *current) {
|
|||
}
|
||||
|
||||
/* Stop the stream iterator. The only cleanup we need is to free the rax
|
||||
* itereator, since the stream iterator itself is supposed to be stack
|
||||
* iterator, since the stream iterator itself is supposed to be stack
|
||||
* allocated. */
|
||||
void streamIteratorStop(streamIterator *si) {
|
||||
raxStop(&si->ri);
|
||||
|
@ -847,11 +847,15 @@ void streamPropagateGroupID(client *c, robj *key, streamCG *group, robj *groupna
|
|||
decrRefCount(argv[4]);
|
||||
}
|
||||
|
||||
/* Send the specified range to the client 'c'. The range the client will
|
||||
* receive is between start and end inclusive, if 'count' is non zero, no more
|
||||
* than 'count' elements are sent. The 'end' pointer can be NULL to mean that
|
||||
* we want all the elements from 'start' till the end of the stream. If 'rev'
|
||||
* is non zero, elements are produced in reversed order from end to start.
|
||||
/* Send the stream items in the specified range to the client 'c'. The range
|
||||
* the client will receive is between start and end inclusive, if 'count' is
|
||||
* non zero, no more than 'count' elements are sent.
|
||||
*
|
||||
* The 'end' pointer can be NULL to mean that we want all the elements from
|
||||
* 'start' till the end of the stream. If 'rev' is non zero, elements are
|
||||
* produced in reversed order from end to start.
|
||||
*
|
||||
* The function returns the number of entries emitted.
|
||||
*
|
||||
* If group and consumer are not NULL, the function performs additional work:
|
||||
* 1. It updates the last delivered ID in the group in case we are
|
||||
|
@ -863,15 +867,15 @@ void streamPropagateGroupID(client *c, robj *key, streamCG *group, robj *groupna
|
|||
*
|
||||
* The behavior may be modified passing non-zero flags:
|
||||
*
|
||||
* STREAM_RWR_NOACK: Do not craete PEL entries, that is, the point "3" above
|
||||
* STREAM_RWR_NOACK: Do not create PEL entries, that is, the point "3" above
|
||||
* is not performed.
|
||||
* STREAM_RWR_RAWENTRIES: Do not emit array boundaries, but just the entries,
|
||||
* and return the number of entries emitted as usually.
|
||||
* This is used when the function is just used in order
|
||||
* to emit data and there is some higher level logic.
|
||||
*
|
||||
* The final argument 'spi' (stream propagatino info pointer) is a structure
|
||||
* filled with information needed to propagte the command execution to AOF
|
||||
* The final argument 'spi' (stream propagation info pointer) is a structure
|
||||
* filled with information needed to propagate the command execution to AOF
|
||||
* and slaves, in the case a consumer group was passed: we need to generate
|
||||
* XCLAIM commands to create the pending list into AOF/slaves in that case.
|
||||
*
|
||||
|
@ -890,6 +894,7 @@ void streamPropagateGroupID(client *c, robj *key, streamCG *group, robj *groupna
|
|||
#define STREAM_RWR_NOACK (1<<0) /* Do not create entries in the PEL. */
|
||||
#define STREAM_RWR_RAWENTRIES (1<<1) /* Do not emit protocol for array
|
||||
boundaries, just the entries. */
|
||||
#define STREAM_RWR_HISTORY (1<<2) /* Only serve consumer local PEL. */
|
||||
size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi) {
|
||||
void *arraylen_ptr = NULL;
|
||||
size_t arraylen = 0;
|
||||
|
@ -898,15 +903,12 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
|
|||
streamID id;
|
||||
int propagate_last_id = 0;
|
||||
|
||||
/* If a group was passed, we check if the request is about messages
|
||||
* never delivered so far (normally this happens when ">" ID is passed).
|
||||
*
|
||||
* If instead the client is asking for some history, we serve it
|
||||
* using a different function, so that we return entries *solely*
|
||||
* from its own PEL. This ensures each consumer will always and only
|
||||
* see the history of messages delivered to it and not yet confirmed
|
||||
/* If the client is asking for some history, we serve it using a
|
||||
* different function, so that we return entries *solely* from its
|
||||
* own PEL. This ensures each consumer will always and only see
|
||||
* the history of messages delivered to it and not yet confirmed
|
||||
* as delivered. */
|
||||
if (group && streamCompareID(start,&group->last_id) <= 0) {
|
||||
if (group && (flags & STREAM_RWR_HISTORY)) {
|
||||
return streamReplyWithRangeFromConsumerPEL(c,s,start,end,count,
|
||||
consumer);
|
||||
}
|
||||
|
@ -1023,7 +1025,7 @@ size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start
|
|||
if (end && memcmp(ri.key,end,ri.key_len) > 0) break;
|
||||
streamID thisid;
|
||||
streamDecodeID(ri.key,&thisid);
|
||||
if (streamReplyWithRange(c,s,&thisid,NULL,1,0,NULL,NULL,
|
||||
if (streamReplyWithRange(c,s,&thisid,&thisid,1,0,NULL,NULL,
|
||||
STREAM_RWR_RAWENTRIES,NULL) == 0)
|
||||
{
|
||||
/* Note that we may have a not acknowledged entry in the PEL
|
||||
|
@ -1136,7 +1138,7 @@ invalid:
|
|||
}
|
||||
|
||||
/* Wrapper for streamGenericParseIDOrReply() with 'strict' argument set to
|
||||
* 0, to be used when - and + are accetable IDs. */
|
||||
* 0, to be used when - and + are acceptable IDs. */
|
||||
int streamParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq) {
|
||||
return streamGenericParseIDOrReply(c,o,id,missing_seq,0);
|
||||
}
|
||||
|
@ -1470,8 +1472,10 @@ void xreadCommand(client *c) {
|
|||
stream *s = o->ptr;
|
||||
streamID *gt = ids+i; /* ID must be greater than this. */
|
||||
int serve_synchronously = 0;
|
||||
int serve_history = 0; /* True for XREADGROUP with ID != ">". */
|
||||
|
||||
/* Check if there are the conditions to serve the client synchronously. */
|
||||
/* Check if there are the conditions to serve the client
|
||||
* synchronously. */
|
||||
if (groups) {
|
||||
/* If the consumer is blocked on a group, we always serve it
|
||||
* synchronously (serving its local history) if the ID specified
|
||||
|
@ -1480,6 +1484,7 @@ void xreadCommand(client *c) {
|
|||
gt->seq != UINT64_MAX)
|
||||
{
|
||||
serve_synchronously = 1;
|
||||
serve_history = 1;
|
||||
} else {
|
||||
/* We also want to serve a consumer in a consumer group
|
||||
* synchronously in case the group top item delivered is smaller
|
||||
|
@ -1515,9 +1520,12 @@ void xreadCommand(client *c) {
|
|||
if (groups) consumer = streamLookupConsumer(groups[i],
|
||||
consumername->ptr,1);
|
||||
streamPropInfo spi = {c->argv[i+streams_arg],groupname};
|
||||
int flags = 0;
|
||||
if (noack) flags |= STREAM_RWR_NOACK;
|
||||
if (serve_history) flags |= STREAM_RWR_HISTORY;
|
||||
streamReplyWithRange(c,s,&start,NULL,count,0,
|
||||
groups ? groups[i] : NULL,
|
||||
consumer, noack, &spi);
|
||||
consumer, flags, &spi);
|
||||
if (groups) server.dirty++;
|
||||
}
|
||||
}
|
||||
|
@ -2155,7 +2163,7 @@ void xclaimCommand(client *c) {
|
|||
|
||||
/* If we stopped because some IDs cannot be parsed, perhaps they
|
||||
* are trailing options. */
|
||||
time_t now = mstime();
|
||||
mstime_t now = mstime();
|
||||
streamID last_id = {0,0};
|
||||
int propagate_last_id = 0;
|
||||
for (; j < c->argc; j++) {
|
||||
|
@ -2274,8 +2282,9 @@ void xclaimCommand(client *c) {
|
|||
if (justid) {
|
||||
addReplyStreamID(c,&id);
|
||||
} else {
|
||||
streamReplyWithRange(c,o->ptr,&id,NULL,1,0,NULL,NULL,
|
||||
STREAM_RWR_RAWENTRIES,NULL);
|
||||
size_t emitted = streamReplyWithRange(c,o->ptr,&id,&id,1,0,
|
||||
NULL,NULL,STREAM_RWR_RAWENTRIES,NULL);
|
||||
if (!emitted) addReply(c,shared.nullbulk);
|
||||
}
|
||||
arraylen++;
|
||||
|
||||
|
|
14
src/t_zset.c
14
src/t_zset.c
|
@ -574,12 +574,12 @@ int zslParseLexRangeItem(robj *item, sds *dest, int *ex) {
|
|||
switch(c[0]) {
|
||||
case '+':
|
||||
if (c[1] != '\0') return C_ERR;
|
||||
*ex = 0;
|
||||
*ex = 1;
|
||||
*dest = shared.maxstring;
|
||||
return C_OK;
|
||||
case '-':
|
||||
if (c[1] != '\0') return C_ERR;
|
||||
*ex = 0;
|
||||
*ex = 1;
|
||||
*dest = shared.minstring;
|
||||
return C_OK;
|
||||
case '(':
|
||||
|
@ -652,9 +652,8 @@ int zslIsInLexRange(zskiplist *zsl, zlexrangespec *range) {
|
|||
zskiplistNode *x;
|
||||
|
||||
/* Test for ranges that will always be empty. */
|
||||
if (sdscmplex(range->min,range->max) > 1 ||
|
||||
(sdscmp(range->min,range->max) == 0 &&
|
||||
(range->minex || range->maxex)))
|
||||
int cmp = sdscmplex(range->min,range->max);
|
||||
if (cmp > 0 || (cmp == 0 && (range->minex || range->maxex)))
|
||||
return 0;
|
||||
x = zsl->tail;
|
||||
if (x == NULL || !zslLexValueGteMin(x->ele,range))
|
||||
|
@ -927,9 +926,8 @@ int zzlIsInLexRange(unsigned char *zl, zlexrangespec *range) {
|
|||
unsigned char *p;
|
||||
|
||||
/* Test for ranges that will always be empty. */
|
||||
if (sdscmplex(range->min,range->max) > 1 ||
|
||||
(sdscmp(range->min,range->max) == 0 &&
|
||||
(range->minex || range->maxex)))
|
||||
int cmp = sdscmplex(range->min,range->max);
|
||||
if (cmp > 0 || (cmp == 0 && (range->minex || range->maxex)))
|
||||
return 0;
|
||||
|
||||
p = ziplistIndex(zl,-2); /* Last element. */
|
||||
|
|
21
src/util.c
21
src/util.c
|
@ -39,6 +39,7 @@
|
|||
#include <float.h>
|
||||
#include <stdint.h>
|
||||
#include <errno.h>
|
||||
#include <time.h>
|
||||
|
||||
#include "util.h"
|
||||
#include "sha1.h"
|
||||
|
@ -605,7 +606,7 @@ void getRandomHexChars(char *p, size_t len) {
|
|||
* already, this will be detected and handled correctly.
|
||||
*
|
||||
* The function does not try to normalize everything, but only the obvious
|
||||
* case of one or more "../" appearning at the start of "filename"
|
||||
* case of one or more "../" appearing at the start of "filename"
|
||||
* relative path. */
|
||||
sds getAbsolutePath(char *filename) {
|
||||
char cwd[1024];
|
||||
|
@ -652,6 +653,24 @@ sds getAbsolutePath(char *filename) {
|
|||
return abspath;
|
||||
}
|
||||
|
||||
/*
|
||||
* Gets the proper timezone in a more portable fashion
|
||||
* i.e timezone variables are linux specific.
|
||||
*/
|
||||
|
||||
unsigned long getTimeZone(void) {
|
||||
#ifdef __linux__
|
||||
return timezone;
|
||||
#else
|
||||
struct timeval tv;
|
||||
struct timezone tz;
|
||||
|
||||
gettimeofday(&tv, &tz);
|
||||
|
||||
return tz.tz_minuteswest * 60UL;
|
||||
#endif
|
||||
}
|
||||
|
||||
/* Return true if the specified path is just a file basename without any
|
||||
* relative or absolute path. This function just checks that no / or \
|
||||
* character exists inside the specified path, that's enough in the
|
||||
|
|
|
@ -50,6 +50,7 @@ int string2ld(const char *s, size_t slen, long double *dp);
|
|||
int d2string(char *buf, size_t len, double value);
|
||||
int ld2string(char *buf, size_t len, long double value, int humanfriendly);
|
||||
sds getAbsolutePath(char *filename);
|
||||
unsigned long getTimeZone(void);
|
||||
int pathIsBaseName(char *path);
|
||||
|
||||
#ifdef REDIS_TEST
|
||||
|
|
|
@ -91,6 +91,14 @@ proc wait_for_sync r {
|
|||
}
|
||||
}
|
||||
|
||||
proc wait_for_ofs_sync {r1 r2} {
|
||||
wait_for_condition 50 100 {
|
||||
[status $r1 master_repl_offset] eq [status $r2 master_repl_offset]
|
||||
} else {
|
||||
fail "replica didn't sync in time"
|
||||
}
|
||||
}
|
||||
|
||||
# Random integer between 0 and max (excluded).
|
||||
proc randomInt {max} {
|
||||
expr {int(rand()*$max)}
|
||||
|
|
|
@ -90,6 +90,7 @@ start_server {tags {"defrag"}} {
|
|||
test "Active defrag big keys" {
|
||||
r flushdb
|
||||
r config resetstat
|
||||
r config set save "" ;# prevent bgsave from interfereing with save below
|
||||
r config set activedefrag no
|
||||
r config set active-defrag-max-scan-fields 1000
|
||||
r config set active-defrag-threshold-lower 5
|
||||
|
|
|
@ -108,6 +108,93 @@ start_server {
|
|||
assert {$c == 5}
|
||||
}
|
||||
|
||||
test {XREADGROUP will not report data on empty history. Bug #5577} {
|
||||
r del events
|
||||
r xadd events * a 1
|
||||
r xadd events * b 2
|
||||
r xadd events * c 3
|
||||
r xgroup create events mygroup 0
|
||||
|
||||
# Current local PEL should be empty
|
||||
set res [r xpending events mygroup - + 10]
|
||||
assert {[llength $res] == 0}
|
||||
|
||||
# So XREADGROUP should read an empty history as well
|
||||
set res [r xreadgroup group mygroup myconsumer count 3 streams events 0]
|
||||
assert {[llength [lindex $res 0 1]] == 0}
|
||||
|
||||
# We should fetch all the elements in the stream asking for >
|
||||
set res [r xreadgroup group mygroup myconsumer count 3 streams events >]
|
||||
assert {[llength [lindex $res 0 1]] == 3}
|
||||
|
||||
# Now the history is populated with three not acked entries
|
||||
set res [r xreadgroup group mygroup myconsumer count 3 streams events 0]
|
||||
assert {[llength [lindex $res 0 1]] == 3}
|
||||
}
|
||||
|
||||
test {XREADGROUP history reporting of deleted entries. Bug #5570} {
|
||||
r del mystream
|
||||
r XGROUP CREATE mystream mygroup $ MKSTREAM
|
||||
r XADD mystream 1 field1 A
|
||||
r XREADGROUP GROUP mygroup myconsumer STREAMS mystream >
|
||||
r XADD mystream MAXLEN 1 2 field1 B
|
||||
r XREADGROUP GROUP mygroup myconsumer STREAMS mystream >
|
||||
|
||||
# Now we have two pending entries, however one should be deleted
|
||||
# and one should be ok (we should only see "B")
|
||||
set res [r XREADGROUP GROUP mygroup myconsumer STREAMS mystream 0-1]
|
||||
assert {[lindex $res 0 1 0] == {1-0 {}}}
|
||||
assert {[lindex $res 0 1 1] == {2-0 {field1 B}}}
|
||||
}
|
||||
|
||||
test {XCLAIM can claim PEL items from another consumer} {
|
||||
# Add 3 items into the stream, and create a consumer group
|
||||
r del mystream
|
||||
set id1 [r XADD mystream * a 1]
|
||||
set id2 [r XADD mystream * b 2]
|
||||
set id3 [r XADD mystream * c 3]
|
||||
r XGROUP CREATE mystream mygroup 0
|
||||
|
||||
# Client 1 reads item 1 from the stream without acknowledgements.
|
||||
# Client 2 then claims pending item 1 from the PEL of client 1
|
||||
set reply [
|
||||
r XREADGROUP GROUP mygroup client1 count 1 STREAMS mystream >
|
||||
]
|
||||
assert {[llength [lindex $reply 0 1 0 1]] == 2}
|
||||
assert {[lindex $reply 0 1 0 1] eq {a 1}}
|
||||
r debug sleep 0.2
|
||||
set reply [
|
||||
r XCLAIM mystream mygroup client2 10 $id1
|
||||
]
|
||||
assert {[llength [lindex $reply 0 1]] == 2}
|
||||
assert {[lindex $reply 0 1] eq {a 1}}
|
||||
|
||||
# Client 1 reads another 2 items from stream
|
||||
r XREADGROUP GROUP mygroup client1 count 2 STREAMS mystream >
|
||||
r debug sleep 0.2
|
||||
|
||||
# Delete item 2 from the stream. Now client 1 has PEL that contains
|
||||
# only item 3. Try to use client 2 to claim the deleted item 2
|
||||
# from the PEL of client 1, this should return nil
|
||||
r XDEL mystream $id2
|
||||
set reply [
|
||||
r XCLAIM mystream mygroup client2 10 $id2
|
||||
]
|
||||
assert {[llength $reply] == 1}
|
||||
assert_equal "" [lindex $reply 0]
|
||||
|
||||
# Delete item 3 from the stream. Now client 1 has PEL that is empty.
|
||||
# Try to use client 2 to claim the deleted item 3 from the PEL
|
||||
# of client 1, this should return nil
|
||||
r debug sleep 0.2
|
||||
r XDEL mystream $id3
|
||||
set reply [
|
||||
r XCLAIM mystream mygroup client2 10 $id3
|
||||
]
|
||||
assert {[llength $reply] == 1}
|
||||
assert_equal "" [lindex $reply 0]
|
||||
}
|
||||
|
||||
start_server {} {
|
||||
set master [srv -1 client]
|
||||
set master_host [srv -1 host]
|
||||
|
@ -144,6 +231,8 @@ start_server {
|
|||
}
|
||||
}
|
||||
|
||||
wait_for_ofs_sync $master $slave
|
||||
|
||||
# Turn slave into master
|
||||
$slave slaveof no one
|
||||
|
||||
|
|
|
@ -388,7 +388,7 @@ start_server {tags {"zset"}} {
|
|||
0 omega}
|
||||
}
|
||||
|
||||
test "ZRANGEBYLEX/ZREVRANGEBYLEX/ZCOUNT basics" {
|
||||
test "ZRANGEBYLEX/ZREVRANGEBYLEX/ZLEXCOUNT basics" {
|
||||
create_default_lex_zset
|
||||
|
||||
# inclusive range
|
||||
|
@ -416,6 +416,22 @@ start_server {tags {"zset"}} {
|
|||
assert_equal {} [r zrevrangebylex zset \[elez \[elex]
|
||||
assert_equal {} [r zrevrangebylex zset (hill (omega]
|
||||
}
|
||||
|
||||
test "ZLEXCOUNT advanced" {
|
||||
create_default_lex_zset
|
||||
|
||||
assert_equal 9 [r zlexcount zset - +]
|
||||
assert_equal 0 [r zlexcount zset + -]
|
||||
assert_equal 0 [r zlexcount zset + \[c]
|
||||
assert_equal 0 [r zlexcount zset \[c -]
|
||||
assert_equal 8 [r zlexcount zset \[bar +]
|
||||
assert_equal 5 [r zlexcount zset \[bar \[foo]
|
||||
assert_equal 4 [r zlexcount zset \[bar (foo]
|
||||
assert_equal 4 [r zlexcount zset (bar \[foo]
|
||||
assert_equal 3 [r zlexcount zset (bar (foo]
|
||||
assert_equal 5 [r zlexcount zset - (foo]
|
||||
assert_equal 1 [r zlexcount zset (maxstring +]
|
||||
}
|
||||
|
||||
test "ZRANGEBYSLEX with LIMIT" {
|
||||
create_default_lex_zset
|
||||
|
|
Loading…
Reference in New Issue