diff --git a/src/Makefile b/src/Makefile index 912cbc19f..9edbb4581 100644 --- a/src/Makefile +++ b/src/Makefile @@ -21,6 +21,11 @@ NODEPS:=clean distclean # Default settings STD=-std=c99 -pedantic -DREDIS_STATIC='' +ifneq (,$(findstring clang,$(CC))) +ifneq (,$(findstring FreeBSD,$(uname_S))) + STD+=-Wno-c11-extensions +endif +endif WARN=-Wall -W -Wno-missing-field-initializers OPT=$(OPTIMIZATION) @@ -97,10 +102,20 @@ else ifeq ($(uname_S),OpenBSD) # OpenBSD FINAL_LIBS+= -lpthread + ifeq ($(USE_BACKTRACE),yes) + FINAL_CFLAGS+= -DUSE_BACKTRACE -I/usr/local/include + FINAL_LDFLAGS+= -L/usr/local/lib + FINAL_LIBS+= -lexecinfo + endif + else ifeq ($(uname_S),FreeBSD) # FreeBSD - FINAL_LIBS+= -lpthread + FINAL_LIBS+= -lpthread -lexecinfo +else +ifeq ($(uname_S),DragonFly) + # DragonFly + FINAL_LIBS+= -lpthread -lexecinfo else # All the other OSes (notably Linux) FINAL_LDFLAGS+= -rdynamic @@ -110,6 +125,7 @@ endif endif endif endif +endif # Include paths to dependencies FINAL_CFLAGS+= -I../deps/hiredis -I../deps/linenoise -I../deps/lua/src diff --git a/src/config.c b/src/config.c index d1bfaa3f7..8a2690a0c 100644 --- a/src/config.c +++ b/src/config.c @@ -684,7 +684,7 @@ void loadServerConfigFromString(char *config) { goto loaderr; } } else if ((!strcasecmp(argv[0],"cluster-slave-no-failover") || - !strcasecmp(argv[0],"cluster-replica-no-failiver")) && + !strcasecmp(argv[0],"cluster-replica-no-failover")) && argc == 2) { server.cluster_slave_no_failover = yesnotoi(argv[1]); diff --git a/src/config.h b/src/config.h index ee3ad508e..efa9d11f2 100644 --- a/src/config.h +++ b/src/config.h @@ -62,7 +62,9 @@ #endif /* Test for backtrace() */ -#if defined(__APPLE__) || (defined(__linux__) && defined(__GLIBC__)) +#if defined(__APPLE__) || (defined(__linux__) && defined(__GLIBC__)) || \ + defined(__FreeBSD__) || (defined(__OpenBSD__) && defined(USE_BACKTRACE))\ + || defined(__DragonFly__) #define HAVE_BACKTRACE 1 #endif diff --git a/src/debug.c b/src/debug.c index
8cc53d92f..3cb567520 100644 --- a/src/debug.c +++ b/src/debug.c @@ -37,7 +37,11 @@ #ifdef HAVE_BACKTRACE #include +#ifndef __OpenBSD__ #include +#else +typedef ucontext_t sigcontext_t; +#endif #include #include "bio.h" #include @@ -729,6 +733,22 @@ static void *getMcontextEip(ucontext_t *uc) { #elif defined(__aarch64__) /* Linux AArch64 */ return (void*) uc->uc_mcontext.pc; #endif +#elif defined(__FreeBSD__) + /* FreeBSD */ + #if defined(__i386__) + return (void*) uc->uc_mcontext.mc_eip; + #elif defined(__x86_64__) + return (void*) uc->uc_mcontext.mc_rip; + #endif +#elif defined(__OpenBSD__) + /* OpenBSD */ + #if defined(__i386__) + return (void*) uc->sc_eip; + #elif defined(__x86_64__) + return (void*) uc->sc_rip; + #endif +#elif defined(__DragonFly__) + return (void*) uc->uc_mcontext.mc_rip; #else return NULL; #endif @@ -870,6 +890,145 @@ void logRegisters(ucontext_t *uc) { ); logStackContent((void**)uc->uc_mcontext.gregs[15]); #endif +#elif defined(__FreeBSD__) + #if defined(__x86_64__) + serverLog(LL_WARNING, + "\n" + "RAX:%016lx RBX:%016lx\nRCX:%016lx RDX:%016lx\n" + "RDI:%016lx RSI:%016lx\nRBP:%016lx RSP:%016lx\n" + "R8 :%016lx R9 :%016lx\nR10:%016lx R11:%016lx\n" + "R12:%016lx R13:%016lx\nR14:%016lx R15:%016lx\n" + "RIP:%016lx EFL:%016lx\nCSGSFS:%016lx", + (unsigned long) uc->uc_mcontext.mc_rax, + (unsigned long) uc->uc_mcontext.mc_rbx, + (unsigned long) uc->uc_mcontext.mc_rcx, + (unsigned long) uc->uc_mcontext.mc_rdx, + (unsigned long) uc->uc_mcontext.mc_rdi, + (unsigned long) uc->uc_mcontext.mc_rsi, + (unsigned long) uc->uc_mcontext.mc_rbp, + (unsigned long) uc->uc_mcontext.mc_rsp, + (unsigned long) uc->uc_mcontext.mc_r8, + (unsigned long) uc->uc_mcontext.mc_r9, + (unsigned long) uc->uc_mcontext.mc_r10, + (unsigned long) uc->uc_mcontext.mc_r11, + (unsigned long) uc->uc_mcontext.mc_r12, + (unsigned long) uc->uc_mcontext.mc_r13, + (unsigned long) uc->uc_mcontext.mc_r14, + (unsigned long) uc->uc_mcontext.mc_r15, + (unsigned long) uc->uc_mcontext.mc_rip, + 
(unsigned long) uc->uc_mcontext.mc_rflags, + (unsigned long) uc->uc_mcontext.mc_cs + ); + logStackContent((void**)uc->uc_mcontext.mc_rsp); + #elif defined(__i386__) + serverLog(LL_WARNING, + "\n" + "EAX:%08lx EBX:%08lx ECX:%08lx EDX:%08lx\n" + "EDI:%08lx ESI:%08lx EBP:%08lx ESP:%08lx\n" + "SS :%08lx EFL:%08lx EIP:%08lx CS:%08lx\n" + "DS :%08lx ES :%08lx FS :%08lx GS:%08lx", + (unsigned long) uc->uc_mcontext.mc_eax, + (unsigned long) uc->uc_mcontext.mc_ebx, + (unsigned long) uc->uc_mcontext.mc_ecx, + (unsigned long) uc->uc_mcontext.mc_edx, + (unsigned long) uc->uc_mcontext.mc_edi, + (unsigned long) uc->uc_mcontext.mc_esi, + (unsigned long) uc->uc_mcontext.mc_ebp, + (unsigned long) uc->uc_mcontext.mc_esp, + (unsigned long) uc->uc_mcontext.mc_ss, + (unsigned long) uc->uc_mcontext.mc_eflags, + (unsigned long) uc->uc_mcontext.mc_eip, + (unsigned long) uc->uc_mcontext.mc_cs, (unsigned long) uc->uc_mcontext.mc_ds, + (unsigned long) uc->uc_mcontext.mc_es, + (unsigned long) uc->uc_mcontext.mc_fs, + (unsigned long) uc->uc_mcontext.mc_gs + ); + logStackContent((void**)uc->uc_mcontext.mc_esp); + #endif +#elif defined(__OpenBSD__) + #if defined(__x86_64__) + serverLog(LL_WARNING, + "\n" + "RAX:%016lx RBX:%016lx\nRCX:%016lx RDX:%016lx\n" + "RDI:%016lx RSI:%016lx\nRBP:%016lx RSP:%016lx\n" + "R8 :%016lx R9 :%016lx\nR10:%016lx R11:%016lx\n" + "R12:%016lx R13:%016lx\nR14:%016lx R15:%016lx\n" + "RIP:%016lx EFL:%016lx\nCSGSFS:%016lx", + (unsigned long) uc->sc_rax, + (unsigned long) uc->sc_rbx, + (unsigned long) uc->sc_rcx, + (unsigned long) uc->sc_rdx, + (unsigned long) uc->sc_rdi, + (unsigned long) uc->sc_rsi, + (unsigned long) uc->sc_rbp, + (unsigned long) uc->sc_rsp, + (unsigned long) uc->sc_r8, + (unsigned long) uc->sc_r9, + (unsigned long) uc->sc_r10, + (unsigned long) uc->sc_r11, + (unsigned long) uc->sc_r12, + (unsigned long) uc->sc_r13, + (unsigned long) uc->sc_r14, + (unsigned long) uc->sc_r15, + (unsigned long) uc->sc_rip, + (unsigned long) uc->sc_rflags, + (unsigned long) uc->sc_cs + ); +
logStackContent((void**)uc->sc_rsp); + #elif defined(__i386__) + serverLog(LL_WARNING, + "\n" + "EAX:%08lx EBX:%08lx ECX:%08lx EDX:%08lx\n" + "EDI:%08lx ESI:%08lx EBP:%08lx ESP:%08lx\n" + "SS :%08lx EFL:%08lx EIP:%08lx CS:%08lx\n" + "DS :%08lx ES :%08lx FS :%08lx GS:%08lx", + (unsigned long) uc->sc_eax, + (unsigned long) uc->sc_ebx, + (unsigned long) uc->sc_ecx, + (unsigned long) uc->sc_edx, + (unsigned long) uc->sc_edi, + (unsigned long) uc->sc_esi, + (unsigned long) uc->sc_ebp, + (unsigned long) uc->sc_esp, + (unsigned long) uc->sc_ss, + (unsigned long) uc->sc_eflags, + (unsigned long) uc->sc_eip, + (unsigned long) uc->sc_cs, (unsigned long) uc->sc_ds, + (unsigned long) uc->sc_es, + (unsigned long) uc->sc_fs, + (unsigned long) uc->sc_gs + ); + logStackContent((void**)uc->sc_esp); + #endif +#elif defined(__DragonFly__) + serverLog(LL_WARNING, + "\n" + "RAX:%016lx RBX:%016lx\nRCX:%016lx RDX:%016lx\n" + "RDI:%016lx RSI:%016lx\nRBP:%016lx RSP:%016lx\n" + "R8 :%016lx R9 :%016lx\nR10:%016lx R11:%016lx\n" + "R12:%016lx R13:%016lx\nR14:%016lx R15:%016lx\n" + "RIP:%016lx EFL:%016lx\nCSGSFS:%016lx", + (unsigned long) uc->uc_mcontext.mc_rax, + (unsigned long) uc->uc_mcontext.mc_rbx, + (unsigned long) uc->uc_mcontext.mc_rcx, + (unsigned long) uc->uc_mcontext.mc_rdx, + (unsigned long) uc->uc_mcontext.mc_rdi, + (unsigned long) uc->uc_mcontext.mc_rsi, + (unsigned long) uc->uc_mcontext.mc_rbp, + (unsigned long) uc->uc_mcontext.mc_rsp, + (unsigned long) uc->uc_mcontext.mc_r8, + (unsigned long) uc->uc_mcontext.mc_r9, + (unsigned long) uc->uc_mcontext.mc_r10, + (unsigned long) uc->uc_mcontext.mc_r11, + (unsigned long) uc->uc_mcontext.mc_r12, + (unsigned long) uc->uc_mcontext.mc_r13, + (unsigned long) uc->uc_mcontext.mc_r14, + (unsigned long) uc->uc_mcontext.mc_r15, + (unsigned long) uc->uc_mcontext.mc_rip, + (unsigned long) uc->uc_mcontext.mc_rflags, + (unsigned long) uc->uc_mcontext.mc_cs + ); + logStackContent((void**)uc->uc_mcontext.mc_rsp); #else serverLog(LL_WARNING, " Dumping of registers not supported
for this OS/arch"); @@ -1189,6 +1348,8 @@ void serverLogHexDump(int level, char *descr, void *value, size_t len) { void watchdogSignalHandler(int sig, siginfo_t *info, void *secret) { #ifdef HAVE_BACKTRACE ucontext_t *uc = (ucontext_t*) secret; +#else + (void)secret; #endif UNUSED(info); UNUSED(sig); diff --git a/src/latency.c b/src/latency.c index d89c48db1..97e6a702e 100644 --- a/src/latency.c +++ b/src/latency.c @@ -562,11 +562,21 @@ sds latencyCommandGenSparkeline(char *event, struct latencyTimeSeries *ts) { * * LATENCY HISTORY: return time-latency samples for the specified event. * LATENCY LATEST: return the latest latency for all the events classes. - * LATENCY DOCTOR: returns an human readable analysis of instance latency. + * LATENCY DOCTOR: returns a human readable analysis of instance latency. * LATENCY GRAPH: provide an ASCII graph of the latency of the specified event. * LATENCY RESET: reset data of a specified event or all the data if no event provided. */ void latencyCommand(client *c) { + const char *help[] = { +"DOCTOR -- Returns a human readable latency analysis report.", +"GRAPH -- Returns an ASCII latency graph for the event class.", +"HISTORY -- Returns time-latency samples for the event class.", +"LATEST -- Returns the latest latency samples for all events.", +"RESET [event ...] 
-- Resets latency data of one or more event classes.", +" (default: reset all data for all event classes)", +"HELP -- Prints this help.", +NULL + }; struct latencyTimeSeries *ts; if (!strcasecmp(c->argv[1]->ptr,"history") && c->argc == 3) { @@ -611,8 +621,10 @@ void latencyCommand(client *c) { resets += latencyResetEvent(c->argv[j]->ptr); addReplyLongLong(c,resets); } + } else if (!strcasecmp(c->argv[1]->ptr,"help") && c->argc >= 2) { + addReplyHelp(c, help); } else { - addReply(c,shared.syntaxerr); + addReplySubcommandSyntaxError(c); } return; diff --git a/src/networking.c b/src/networking.c index e255e64df..7d387dabc 100644 --- a/src/networking.c +++ b/src/networking.c @@ -2109,6 +2109,7 @@ int checkClientOutputBufferLimits(client *c) { * called from contexts where the client can't be freed safely, i.e. from the * lower level functions pushing data inside the client output buffers. */ void asyncCloseClientOnOutputBufferLimitReached(client *c) { + if (c->fd == -1) return; /* It is unsafe to free fake clients. */ serverAssert(c->reply_bytes < SIZE_MAX-(1024*64)); if (c->reply_bytes == 0 || c->flags & CLIENT_CLOSE_ASAP) return; if (checkClientOutputBufferLimits(c)) { diff --git a/src/object.c b/src/object.c index 2e2ebee21..48ffa42b9 100644 --- a/src/object.c +++ b/src/object.c @@ -1285,9 +1285,18 @@ NULL * * Usage: MEMORY usage */ void memoryCommand(client *c) { - robj *o; - - if (!strcasecmp(c->argv[1]->ptr,"usage") && c->argc >= 3) { + if (!strcasecmp(c->argv[1]->ptr,"help") && c->argc == 2) { + const char *help[] = { +"DOCTOR - Return memory problems reports.", +"MALLOC-STATS -- Return internal statistics report from the memory allocator.", +"PURGE -- Attempt to purge dirty pages for reclamation by the allocator.", +"STATS -- Return information about the memory usage of the server.", +"USAGE [SAMPLES ] -- Return memory in bytes used by and its value. 
Nested values are sampled up to times (default: 5).", +NULL + }; + addReplyHelp(c, help); + } else if (!strcasecmp(c->argv[1]->ptr,"usage") && c->argc >= 3) { + dictEntry *de; long long samples = OBJ_COMPUTE_SIZE_DEF_SAMPLES; for (int j = 3; j < c->argc; j++) { if (!strcasecmp(c->argv[j]->ptr,"samples") && @@ -1306,10 +1315,12 @@ void memoryCommand(client *c) { return; } } - if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.nullbulk)) - == NULL) return; - size_t usage = objectComputeSize(o,samples); - usage += sdsAllocSize(c->argv[2]->ptr); + if ((de = dictFind(c->db->dict,c->argv[2]->ptr)) == NULL) { + addReply(c, shared.nullbulk); + return; + } + size_t usage = objectComputeSize(dictGetVal(de),samples); + usage += sdsAllocSize(dictGetKey(de)); usage += sizeof(dictEntry); addReplyLongLong(c,usage); } else if (!strcasecmp(c->argv[1]->ptr,"stats") && c->argc == 2) { @@ -1434,19 +1445,7 @@ void memoryCommand(client *c) { addReply(c, shared.ok); /* Nothing to do for other allocators. */ #endif - } else if (!strcasecmp(c->argv[1]->ptr,"help") && c->argc == 2) { - addReplyMultiBulkLen(c,5); - addReplyBulkCString(c, -"MEMORY DOCTOR - Outputs memory problems report"); - addReplyBulkCString(c, -"MEMORY USAGE [SAMPLES ] - Estimate memory usage of key"); - addReplyBulkCString(c, -"MEMORY STATS - Show memory usage details"); - addReplyBulkCString(c, -"MEMORY PURGE - Ask the allocator to release memory"); - addReplyBulkCString(c, -"MEMORY MALLOC-STATS - Show allocator internal stats"); } else { - addReplyError(c,"Syntax error. Try MEMORY HELP"); + addReplyErrorFormat(c, "Unknown subcommand or wrong number of arguments for '%s'. Try MEMORY HELP", (char*)c->argv[1]->ptr); } } diff --git a/src/rdb.c b/src/rdb.c index 3e43cb4e4..47555101e 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -1645,6 +1645,9 @@ robj *rdbLoadObject(int rdbtype, rio *rdb) { * node: the entries inside the listpack itself are delta-encoded * relatively to this ID. 
*/ sds nodekey = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL); + if (nodekey == NULL) { + rdbExitReportCorruptRDB("Stream master ID loading failed: invalid encoding or I/O error."); + } if (sdslen(nodekey) != sizeof(streamID)) { rdbExitReportCorruptRDB("Stream node key entry is not the " "size of a stream ID"); diff --git a/src/redis-cli.c b/src/redis-cli.c index 57f812b90..f307d31cf 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -2726,8 +2726,7 @@ static int clusterManagerSetSlot(clusterManagerNode *node1, if (err != NULL) { *err = zmalloc((reply->len + 1) * sizeof(char)); strcpy(*err, reply->str); - CLUSTER_MANAGER_PRINT_REPLY_ERROR(node1, err); - } + } else CLUSTER_MANAGER_PRINT_REPLY_ERROR(node1, reply->str); goto cleanup; } cleanup: @@ -2830,7 +2829,7 @@ static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source, if (err != NULL) { *err = zmalloc((reply->len + 1) * sizeof(char)); strcpy(*err, reply->str); - CLUSTER_MANAGER_PRINT_REPLY_ERROR(source, err); + CLUSTER_MANAGER_PRINT_REPLY_ERROR(source, *err); } goto next; } @@ -2848,14 +2847,21 @@ static int clusterManagerMigrateKeysInSlot(clusterManagerNode *source, if (migrate_reply == NULL) goto next; if (migrate_reply->type == REDIS_REPLY_ERROR) { if (do_fix && strstr(migrate_reply->str, "BUSYKEY")) { + /* If the key already exists, try to migrate keys + * adding REPLACE option. + * If the key's slot is not served, try to assign slot + * to the target node. */ + int is_busy = (strstr(migrate_reply->str, "BUSYKEY") != NULL); + if (strstr(migrate_reply->str, "slot not served") != NULL) + clusterManagerSetSlot(source, target, slot, "node", NULL); clusterManagerLogWarn("*** Target key exists. " "Replacing it for FIX.\n"); freeReplyObject(migrate_reply); - /* Try to migrate keys adding REPLACE option. 
*/ migrate_reply = clusterManagerMigrateKeysInReply(source, target, reply, - 1, timeout, + is_busy, + timeout, NULL); success = (migrate_reply != NULL && migrate_reply->type != REDIS_REPLY_ERROR); @@ -2941,7 +2947,7 @@ static int clusterManagerMoveSlot(clusterManagerNode *source, if (err != NULL) { *err = zmalloc((r->len + 1) * sizeof(char)); strcpy(*err, r->str); - CLUSTER_MANAGER_PRINT_REPLY_ERROR(n, err); + CLUSTER_MANAGER_PRINT_REPLY_ERROR(n, *err); } } freeReplyObject(r); @@ -3301,8 +3307,8 @@ static sds clusterManagerGetConfigSignature(clusterManagerNode *node) { nodename = token; tot_size = (p - token); name_len = tot_size++; // Make room for ':' in tot_size - } else if (i == 8) break; - i++; + } + if (++i == 8) break; } if (i != 8) continue; if (nodename == NULL) continue; @@ -3341,7 +3347,7 @@ static sds clusterManagerGetConfigSignature(clusterManagerNode *node) { char *sp = cfg + name_len; *(sp++) = ':'; for (i = 0; i < c; i++) { - if (i > 0) *(sp++) = '|'; + if (i > 0) *(sp++) = ','; int slen = strlen(slots[i]); memcpy(sp, slots[i], slen); sp += slen; @@ -3499,6 +3505,34 @@ static clusterManagerNode *clusterManagerNodeWithLeastReplicas() { return node; } +/* This function returns a random master node, return NULL if none */ + +static clusterManagerNode *clusterManagerNodeMasterRandom() { + int master_count = 0; + int idx; + listIter li; + listNode *ln; + listRewind(cluster_manager.nodes, &li); + while ((ln = listNext(&li)) != NULL) { + clusterManagerNode *n = ln->value; + if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue; + master_count++; + } + if (master_count == 0) return NULL; /* No masters: avoid modulo by zero. */ + srand(time(NULL)); + idx = rand() % master_count; + listRewind(cluster_manager.nodes, &li); + while ((ln = listNext(&li)) != NULL) { + clusterManagerNode *n = ln->value; + if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue; + if (!idx--) { + return n; + } + } + /* Can not be reached */ + return NULL; +} + static int clusterManagerFixSlotsCoverage(char *all_slots) { int i, fixed = 0; list *none = NULL,
*single = NULL, *multi = NULL; @@ -3571,22 +3605,25 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { "across the cluster:\n"); clusterManagerPrintSlotsList(none); if (confirmWithYes("Fix these slots by covering with a random node?")){ - srand(time(NULL)); listIter li; listNode *ln; listRewind(none, &li); while ((ln = listNext(&li)) != NULL) { sds slot = ln->value; - long idx = (long) (rand() % listLength(cluster_manager.nodes)); - listNode *node_n = listIndex(cluster_manager.nodes, idx); - assert(node_n != NULL); - clusterManagerNode *n = node_n->value; + clusterManagerNode *n = clusterManagerNodeMasterRandom(); clusterManagerLogInfo(">>> Covering slot %s with %s:%d\n", slot, n->ip, n->port); + /* Ensure the slot is not already assigned. */ redisReply *r = CLUSTER_MANAGER_COMMAND(n, + "CLUSTER DELSLOTS %s", slot); + if (r) freeReplyObject(r); + r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER ADDSLOTS %s", slot); if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1; if (r) freeReplyObject(r); + r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER BUMPEPOCH"); + if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1; + if (r) freeReplyObject(r); if (fixed < 0) goto cleanup; /* Since CLUSTER ADDSLOTS succeeded, we also update the slot * info into the node struct, in order to keep it synced */ @@ -3614,10 +3651,17 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { clusterManagerNode *n = fn->value; clusterManagerLogInfo(">>> Covering slot %s with %s:%d\n", slot, n->ip, n->port); + /* Ensure the slot is not already assigned. 
*/ redisReply *r = CLUSTER_MANAGER_COMMAND(n, + "CLUSTER DELSLOTS %s", slot); + if (r) freeReplyObject(r); + r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER ADDSLOTS %s", slot); if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1; if (r) freeReplyObject(r); + r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER BUMPEPOCH"); + if (!clusterManagerCheckRedisReply(n, r, NULL)) fixed = -1; + if (r) freeReplyObject(r); if (fixed < 0) goto cleanup; /* Since CLUSTER ADDSLOTS succeeded, we also update the slot * info into the node struct, in order to keep it synced */ @@ -3651,7 +3695,11 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { clusterManagerLogInfo(">>> Covering slot %s moving keys " "to %s:%d\n", slot, target->ip, target->port); + /* Ensure the slot is not already assigned. */ redisReply *r = CLUSTER_MANAGER_COMMAND(target, + "CLUSTER DELSLOTS %s", slot); + if (r) freeReplyObject(r); + r = CLUSTER_MANAGER_COMMAND(target, "CLUSTER ADDSLOTS %s", slot); if (!clusterManagerCheckRedisReply(target, r, NULL)) fixed = -1; if (r) freeReplyObject(r); @@ -3660,6 +3708,9 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { "CLUSTER SETSLOT %s %s", slot, "STABLE"); if (!clusterManagerCheckRedisReply(target, r, NULL)) fixed = -1; if (r) freeReplyObject(r); + r = CLUSTER_MANAGER_COMMAND(target, "CLUSTER BUMPEPOCH"); + if (!clusterManagerCheckRedisReply(target, r, NULL)) fixed = -1; + if (r) freeReplyObject(r); if (fixed < 0) goto cleanup; /* Since CLUSTER ADDSLOTS succeeded, we also update the slot * info into the node struct, in order to keep it synced */ @@ -3670,14 +3721,22 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { while ((nln = listNext(&nli)) != NULL) { clusterManagerNode *src = nln->value; if (src == target) continue; + /* Assign the slot to target node in the source node. 
*/ + redisReply *r = CLUSTER_MANAGER_COMMAND(src, + "CLUSTER SETSLOT %s %s %s", slot, + "NODE", target->name); + if (!clusterManagerCheckRedisReply(src, r, NULL)) + fixed = -1; + if (r) freeReplyObject(r); + if (fixed < 0) goto cleanup; /* Set the source node in 'importing' state * (even if we will actually migrate keys away) * in order to avoid receiving redirections * for MIGRATE. */ - redisReply *r = CLUSTER_MANAGER_COMMAND(src, + r = CLUSTER_MANAGER_COMMAND(src, "CLUSTER SETSLOT %s %s %s", slot, "IMPORTING", target->name); - if (!clusterManagerCheckRedisReply(target, r, NULL)) + if (!clusterManagerCheckRedisReply(src, r, NULL)) fixed = -1; if (r) freeReplyObject(r); if (fixed < 0) goto cleanup; @@ -3687,6 +3746,13 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { fixed = -1; goto cleanup; } + r = CLUSTER_MANAGER_COMMAND(src, + "CLUSTER SETSLOT %s %s", slot, + "STABLE"); + if (!clusterManagerCheckRedisReply(src, r, NULL)) + fixed = -1; + if (r) freeReplyObject(r); + if (fixed < 0) goto cleanup; } fixed++; } @@ -3720,11 +3786,22 @@ static int clusterManagerFixOpenSlot(int slot) { while ((ln = listNext(&li)) != NULL) { clusterManagerNode *n = ln->value; if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue; - if (n->slots[slot]) { - if (owner == NULL) owner = n; - listAddNodeTail(owners, n); + if (n->slots[slot]) listAddNodeTail(owners, n); + else { + redisReply *r = CLUSTER_MANAGER_COMMAND(n, + "CLUSTER COUNTKEYSINSLOT %d", slot); + success = clusterManagerCheckRedisReply(n, r, NULL); + if (success && r->integer > 0) { + clusterManagerLogWarn("*** Found keys about slot %d " + "in non-owner node %s:%d!\n", slot, + n->ip, n->port); + listAddNodeTail(owners, n); + } + if (r) freeReplyObject(r); + if (!success) goto cleanup; } } + if (listLength(owners) == 1) owner = listFirst(owners)->value; listRewind(cluster_manager.nodes, &li); while ((ln = listNext(&li)) != NULL) { clusterManagerNode *n = ln->value; @@ -3735,7 +3812,7 @@ static int 
clusterManagerFixOpenSlot(int slot) { sds migrating_slot = n->migrating[i]; if (atoi(migrating_slot) == slot) { char *sep = (listLength(migrating) == 0 ? "" : ","); - migrating_str = sdscatfmt(migrating_str, "%s%S:%u", + migrating_str = sdscatfmt(migrating_str, "%s%s:%u", sep, n->ip, n->port); listAddNodeTail(migrating, n); is_migrating = 1; @@ -3748,7 +3825,7 @@ static int clusterManagerFixOpenSlot(int slot) { sds importing_slot = n->importing[i]; if (atoi(importing_slot) == slot) { char *sep = (listLength(importing) == 0 ? "" : ","); - importing_str = sdscatfmt(importing_str, "%s%S:%u", + importing_str = sdscatfmt(importing_str, "%s%s:%u", sep, n->ip, n->port); listAddNodeTail(importing, n); is_importing = 1; @@ -3767,15 +3844,20 @@ static int clusterManagerFixOpenSlot(int slot) { clusterManagerLogWarn("*** Found keys about slot %d " "in node %s:%d!\n", slot, n->ip, n->port); + char *sep = (listLength(importing) == 0 ? "" : ","); + importing_str = sdscatfmt(importing_str, "%s%s:%u", + sep, n->ip, n->port); listAddNodeTail(importing, n); } if (r) freeReplyObject(r); if (!success) goto cleanup; } } - printf("Set as migrating in: %s\n", migrating_str); - printf("Set as importing in: %s\n", importing_str); - /* If there is no slot owner, set as owner the slot with the biggest + if (sdslen(migrating_str) > 0) + printf("Set as migrating in: %s\n", migrating_str); + if (sdslen(importing_str) > 0) + printf("Set as importing in: %s\n", importing_str); + /* If there is no slot owner, set as owner the node with the biggest * number of keys, among the set of migrating / importing nodes. */ if (owner == NULL) { clusterManagerLogInfo(">>> Nobody claims ownership, " @@ -3799,6 +3881,15 @@ static int clusterManagerFixOpenSlot(int slot) { success = clusterManagerCheckRedisReply(owner, reply, NULL); if (reply) freeReplyObject(reply); if (!success) goto cleanup; + /* Ensure that the slot is unassigned before assigning it to the + * owner.
*/ + reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER DELSLOTS %d", slot); + success = clusterManagerCheckRedisReply(owner, reply, NULL); + /* Ignore "already unassigned" error. */ + if (!success && reply && reply->type == REDIS_REPLY_ERROR && + strstr(reply->str, "already unassigned") != NULL) success = 1; + if (reply) freeReplyObject(reply); + if (!success) goto cleanup; reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER ADDSLOTS %d", slot); success = clusterManagerCheckRedisReply(owner, reply, NULL); if (reply) freeReplyObject(reply); @@ -3827,32 +3918,43 @@ static int clusterManagerFixOpenSlot(int slot) { * in migrating state, since migrating is a valid state only for * slot owners. */ if (listLength(owners) > 1) { - owner = clusterManagerGetNodeWithMostKeysInSlot(owners, slot, NULL); + /* Owner cannot be NULL at this point, since if there are more owners, + * the owner has been set in the previous condition (owner == NULL). */ + assert(owner != NULL); listRewind(owners, &li); redisReply *reply = NULL; while ((ln = listNext(&li)) != NULL) { clusterManagerNode *n = ln->value; if (n == owner) continue; - reply = CLUSTER_MANAGER_COMMAND(n, "CLUSTER DELSLOT %d", slot); + reply = CLUSTER_MANAGER_COMMAND(n, "CLUSTER DELSLOTS %d", slot); success = clusterManagerCheckRedisReply(n, reply, NULL); + /* Ignore "already unassigned" error. */ + if (!success && reply && reply->type == REDIS_REPLY_ERROR && + strstr(reply->str, "already unassigned") != NULL) success = 1; if (reply) freeReplyObject(reply); if (!success) goto cleanup; + n->slots[slot] = 0; + /* Assign the slot to the owner in the node 'n' configuration.' */ + success = clusterManagerSetSlot(n, owner, slot, "node", NULL); + if (!success) goto cleanup; success = clusterManagerSetSlot(n, owner, slot, "importing", NULL); if (!success) goto cleanup; - clusterManagerRemoveNodeFromList(importing, n); //Avoid duplicates + /* Avoid duplicates. 
*/ + clusterManagerRemoveNodeFromList(importing, n); listAddNodeTail(importing, n); + /* Ensure that the node is not in the migrating list. */ + clusterManagerRemoveNodeFromList(migrating, n); } - reply = CLUSTER_MANAGER_COMMAND(owner, "CLUSTER BUMPEPOCH"); - success = clusterManagerCheckRedisReply(owner, reply, NULL); - if (reply) freeReplyObject(reply); - if (!success) goto cleanup; } int move_opts = CLUSTER_MANAGER_OPT_VERBOSE; - /* Case 1: The slot is in migrating state in one slot, and in - * importing state in 1 slot. That's trivial to address. */ + /* Case 1: The slot is in migrating state in one node, and in + * importing state in 1 node. That's trivial to address. */ if (listLength(migrating) == 1 && listLength(importing) == 1) { clusterManagerNode *src = listFirst(migrating)->value; clusterManagerNode *dst = listFirst(importing)->value; + clusterManagerLogInfo(">>> Case 1: Moving slot %d from " + "%s:%d to %s:%d\n", slot, + src->ip, src->port, dst->ip, dst->port); success = clusterManagerMoveSlot(src, dst, slot, move_opts, NULL); } /* Case 2: There are multiple nodes that claim the slot as importing, @@ -3860,7 +3962,7 @@ static int clusterManagerFixOpenSlot(int slot) { * the slot. In this case we just move all the keys to the owner * according to the configuration. */ else if (listLength(migrating) == 0 && listLength(importing) > 0) { - clusterManagerLogInfo(">>> Moving all the %d slot keys to its " + clusterManagerLogInfo(">>> Case 2: Moving all the %d slot keys to its " "owner %s:%d\n", slot, owner->ip, owner->port); move_opts |= CLUSTER_MANAGER_OPT_COLD; listRewind(importing, &li); @@ -3878,25 +3980,43 @@ static int clusterManagerFixOpenSlot(int slot) { if (r) freeReplyObject(r); if (!success) goto cleanup; } + /* Since the slot has been moved in "cold" mode, ensure that all the + * other nodes update their own configuration about the slot itself. 
*/ + listRewind(cluster_manager.nodes, &li); + while ((ln = listNext(&li)) != NULL) { + clusterManagerNode *n = ln->value; + if (n == owner) continue; + if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue; + redisReply *r = CLUSTER_MANAGER_COMMAND(n, + "CLUSTER SETSLOT %d %s %s", slot, "NODE", owner->name); + success = clusterManagerCheckRedisReply(n, r, NULL); + if (r) freeReplyObject(r); + if (!success) goto cleanup; + } } else { int try_to_close_slot = (listLength(importing) == 0 && listLength(migrating) == 1); if (try_to_close_slot) { clusterManagerNode *n = listFirst(migrating)->value; - redisReply *r = CLUSTER_MANAGER_COMMAND(n, - "CLUSTER GETKEYSINSLOT %d %d", slot, 10); - success = clusterManagerCheckRedisReply(n, r, NULL); - if (r) { - if (success) try_to_close_slot = (r->elements == 0); - freeReplyObject(r); + if (!owner || owner != n) { + redisReply *r = CLUSTER_MANAGER_COMMAND(n, + "CLUSTER GETKEYSINSLOT %d %d", slot, 10); + success = clusterManagerCheckRedisReply(n, r, NULL); + if (r) { + if (success) try_to_close_slot = (r->elements == 0); + freeReplyObject(r); + } + if (!success) goto cleanup; } - if (!success) goto cleanup; } /* Case 3: There are no slots claiming to be in importing state, but - * there is a migrating node that actually don't have any key. We - * can just close the slot, probably a reshard interrupted in the middle. */ + * there is a migrating node that actually don't have any key or is the + * slot owner. We can just close the slot, probably a reshard interrupted + * in the middle. 
*/ if (try_to_close_slot) { clusterManagerNode *n = listFirst(migrating)->value; + clusterManagerLogInfo(">>> Case 3: Closing slot %d on %s:%d\n", + slot, n->ip, n->port); redisReply *r = CLUSTER_MANAGER_COMMAND(n, "CLUSTER SETSLOT %d %s", slot, "STABLE"); success = clusterManagerCheckRedisReply(n, r, NULL); @@ -4025,6 +4145,7 @@ static int clusterManagerCheckCluster(int quiet) { result = 0; if (do_fix/* && result*/) { dictType dtype = clusterManagerDictType; + dtype.keyDestructor = dictSdsDestructor; dtype.valDestructor = dictListDestructor; clusterManagerUncoveredSlots = dictCreate(&dtype, NULL); int fixed = clusterManagerFixSlotsCoverage(slots); @@ -4061,7 +4182,7 @@ static clusterManagerNode *clusterNodeForResharding(char *id, static list *clusterManagerComputeReshardTable(list *sources, int numslots) { list *moved = listCreate(); int src_count = listLength(sources), i = 0, tot_slots = 0, j; - clusterManagerNode **sorted = zmalloc(src_count * sizeof(**sorted)); + clusterManagerNode **sorted = zmalloc(src_count * sizeof(*sorted)); listIter li; listNode *ln; listRewind(sources, &li); @@ -5100,10 +5221,13 @@ static int clusterManagerCommandSetTimeout(int argc, char **argv) { n->port); ok_count++; continue; -reply_err: +reply_err:; + int need_free = 0; if (err == NULL) err = ""; + else need_free = 1; clusterManagerLogErr("ERR setting node-timeot for %s:%d: %s\n", n->ip, n->port, err); + if (need_free) zfree(err); err_count++; } clusterManagerLogInfo(">>> New node timeout set. 
%d OK, %d ERR.\n", diff --git a/src/replication.c b/src/replication.c index 3b420895d..a3110661e 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1245,6 +1245,18 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { if (eof_reached) { int aof_is_enabled = server.aof_state != AOF_OFF; + /* Ensure background save doesn't overwrite synced data */ + if (server.rdb_child_pid != -1) { + serverLog(LL_NOTICE, + "Replica is about to load the RDB file received from the " + "master, but there is a pending RDB child running. " + "Killing process %ld and removing its temp file to avoid " + "any race", + (long) server.rdb_child_pid); + kill(server.rdb_child_pid,SIGUSR1); + rdbRemoveTempFile(server.rdb_child_pid); + } + if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) { serverLog(LL_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> REPLICA synchronization: %s", strerror(errno)); cancelReplicationHandshake(); diff --git a/src/sentinel.c b/src/sentinel.c index 7b703aa33..adff9d4fa 100644 --- a/src/sentinel.c +++ b/src/sentinel.c @@ -452,13 +452,15 @@ struct redisCommand sentinelcmds[] = { {"info",sentinelInfoCommand,-1,"",0,NULL,0,0,0,0,0}, {"role",sentinelRoleCommand,1,"l",0,NULL,0,0,0,0,0}, {"client",clientCommand,-2,"rs",0,NULL,0,0,0,0,0}, - {"shutdown",shutdownCommand,-1,"",0,NULL,0,0,0,0,0} + {"shutdown",shutdownCommand,-1,"",0,NULL,0,0,0,0,0}, + {"auth",authCommand,2,"sltF",0,NULL,0,0,0,0,0} }; /* This function overwrites a few normal Redis config default with Sentinel * specific defaults. */ void initSentinelConfig(void) { server.port = REDIS_SENTINEL_PORT; + server.protected_mode = 0; /* Sentinel must be exposed. */ } /* Perform the Sentinel mode initialization. */ @@ -1941,12 +1943,25 @@ werr: /* Send the AUTH command with the specified master password if needed. * Note that for slaves the password set for the master is used. 
* + * In case this Sentinel requires a password as well, via the "requirepass" + * configuration directive, we assume we should use the local password in + * order to authenticate when connecting with the other Sentinels as well. + * So basically all the Sentinels share the same password and use it to + * authenticate reciprocally. + * * We don't check at all if the command was successfully transmitted * to the instance as if it fails Sentinel will detect the instance down, * will disconnect and reconnect the link and so forth. */ void sentinelSendAuthIfNeeded(sentinelRedisInstance *ri, redisAsyncContext *c) { - char *auth_pass = (ri->flags & SRI_MASTER) ? ri->auth_pass : - ri->master->auth_pass; + char *auth_pass = NULL; + + if (ri->flags & SRI_MASTER) { + auth_pass = ri->auth_pass; + } else if (ri->flags & SRI_SLAVE) { + auth_pass = ri->master->auth_pass; + } else if (ri->flags & SRI_SENTINEL) { + if (server.requirepass) auth_pass = server.requirepass; + } if (auth_pass) { if (redisAsyncCommand(c, sentinelDiscardReplyCallback, ri, "%s %s", diff --git a/src/server.c b/src/server.c index d6c6dc3a8..cc335ebdc 100644 --- a/src/server.c +++ b/src/server.c @@ -1526,10 +1526,10 @@ void initServerConfig(void) { server.runid[CONFIG_RUN_ID_SIZE] = '\0'; changeReplicationId(); clearReplicationId2(); - server.timezone = timezone; /* Initialized by tzset(). */ + server.timezone = getTimeZone(); /* Initialized by tzset(). */ server.configfile = NULL; server.executable = NULL; - server.config_hz = CONFIG_DEFAULT_HZ; + server.hz = server.config_hz = CONFIG_DEFAULT_HZ; server.dynamic_hz = CONFIG_DEFAULT_DYNAMIC_HZ; server.arch_bits = (sizeof(long) == 8) ? 64 : 32; server.port = CONFIG_DEFAULT_SERVER_PORT; @@ -1958,9 +1958,13 @@ int listenToPort(int port, int *fds, int *count) { } if (fds[*count] == ANET_ERR) { serverLog(LL_WARNING, - "Creating Server TCP listening socket %s:%d: %s", + "Could not create server TCP listening socket %s:%d: %s", server.bindaddr[j] ? 
server.bindaddr[j] : "*", port, server.neterr); + if (errno == ENOPROTOOPT || errno == EPROTONOSUPPORT || + errno == ESOCKTNOSUPPORT || errno == EPFNOSUPPORT || + errno == EAFNOSUPPORT || errno == EADDRNOTAVAIL) + continue; return C_ERR; } anetNonBlock(NULL,fds[*count]); diff --git a/src/setproctitle.c b/src/setproctitle.c index 6563242de..5f91d7bfe 100644 --- a/src/setproctitle.c +++ b/src/setproctitle.c @@ -39,7 +39,7 @@ #include /* errno program_invocation_name program_invocation_short_name */ #if !defined(HAVE_SETPROCTITLE) -#if (defined __NetBSD__ || defined __FreeBSD__ || defined __OpenBSD__) +#if (defined __NetBSD__ || defined __FreeBSD__ || defined __OpenBSD__ || defined __DragonFly__) #define HAVE_SETPROCTITLE 1 #else #define HAVE_SETPROCTITLE 0 diff --git a/src/t_stream.c b/src/t_stream.c index e37b6582a..f51f6c46b 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -37,7 +37,7 @@ * mark the entry as deleted, or having the same field as the "master" * entry at the start of the listpack> */ #define STREAM_ITEM_FLAG_NONE 0 /* No special flags. */ -#define STREAM_ITEM_FLAG_DELETED (1<<0) /* Entry is delted. Skip it. */ +#define STREAM_ITEM_FLAG_DELETED (1<<0) /* Entry is deleted. Skip it. */ #define STREAM_ITEM_FLAG_SAMEFIELDS (1<<1) /* Same fields as master entry. */ void streamFreeCG(streamCG *cg); @@ -165,7 +165,7 @@ int streamCompareID(streamID *a, streamID *b) { * Returns the new entry ID populating the 'added_id' structure. * * If 'use_id' is not NULL, the ID is not auto-generated by the function, - * but instead the passed ID is uesd to add the new entry. In this case + * but instead the passed ID is used to add the new entry. In this case * adding the entry may fail as specified later in this comment. 
* * The function returns C_OK if the item was added, this is always true @@ -223,13 +223,13 @@ int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_ * * count and deleted just represent respectively the total number of * entries inside the listpack that are valid, and marked as deleted - * (delted flag in the entry flags set). So the total number of items + * (deleted flag in the entry flags set). So the total number of items * actually inside the listpack (both deleted and not) is count+deleted. * * The real entries will be encoded with an ID that is just the * millisecond and sequence difference compared to the key stored at * the radix tree node containing the listpack (delta encoding), and - * if the fields of the entry are the same as the master enty fields, the + * if the fields of the entry are the same as the master entry fields, the * entry flags will specify this fact and the entry fields and number * of fields will be omitted (see later in the code of this function). * @@ -486,7 +486,7 @@ int64_t streamTrimByLength(stream *s, size_t maxlen, int approx) { * } * streamIteratorStop(&myiterator); */ void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end, int rev) { - /* Intialize the iterator and translates the iteration start/stop + /* Initialize the iterator and translates the iteration start/stop * elements into a 128 big big-endian number. */ if (start) { streamEncodeID(si->start_key,start); @@ -564,7 +564,7 @@ int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields) { si->lp_ele = lpLast(si->lp); } } else if (si->rev) { - /* If we are itereating in the reverse order, and this is not + /* If we are iterating in the reverse order, and this is not * the first entry emitted for this listpack, then we already * emitted the current entry, and have to go back to the previous * one. 
*/ @@ -751,7 +751,7 @@ void streamIteratorRemoveEntry(streamIterator *si, streamID *current) { } /* Stop the stream iterator. The only cleanup we need is to free the rax - * itereator, since the stream iterator itself is supposed to be stack + * iterator, since the stream iterator itself is supposed to be stack * allocated. */ void streamIteratorStop(streamIterator *si) { raxStop(&si->ri); @@ -847,11 +847,15 @@ void streamPropagateGroupID(client *c, robj *key, streamCG *group, robj *groupna decrRefCount(argv[4]); } -/* Send the specified range to the client 'c'. The range the client will - * receive is between start and end inclusive, if 'count' is non zero, no more - * than 'count' elements are sent. The 'end' pointer can be NULL to mean that - * we want all the elements from 'start' till the end of the stream. If 'rev' - * is non zero, elements are produced in reversed order from end to start. +/* Send the stream items in the specified range to the client 'c'. The range + * the client will receive is between start and end inclusive, if 'count' is + * non zero, no more than 'count' elements are sent. + * + * The 'end' pointer can be NULL to mean that we want all the elements from + * 'start' till the end of the stream. If 'rev' is non zero, elements are + * produced in reversed order from end to start. + * + * The function returns the number of entries emitted. * * If group and consumer are not NULL, the function performs additional work: * 1. It updates the last delivered ID in the group in case we are @@ -863,15 +867,15 @@ void streamPropagateGroupID(client *c, robj *key, streamCG *group, robj *groupna * * The behavior may be modified passing non-zero flags: * - * STREAM_RWR_NOACK: Do not craete PEL entries, that is, the point "3" above + * STREAM_RWR_NOACK: Do not create PEL entries, that is, the point "3" above * is not performed. * STREAM_RWR_RAWENTRIES: Do not emit array boundaries, but just the entries, * and return the number of entries emitted as usually. 
* This is used when the function is just used in order * to emit data and there is some higher level logic. * - * The final argument 'spi' (stream propagatino info pointer) is a structure - * filled with information needed to propagte the command execution to AOF + * The final argument 'spi' (stream propagation info pointer) is a structure + * filled with information needed to propagate the command execution to AOF * and slaves, in the case a consumer group was passed: we need to generate * XCLAIM commands to create the pending list into AOF/slaves in that case. * @@ -890,6 +894,7 @@ void streamPropagateGroupID(client *c, robj *key, streamCG *group, robj *groupna #define STREAM_RWR_NOACK (1<<0) /* Do not create entries in the PEL. */ #define STREAM_RWR_RAWENTRIES (1<<1) /* Do not emit protocol for array boundaries, just the entries. */ +#define STREAM_RWR_HISTORY (1<<2) /* Only serve consumer local PEL. */ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi) { void *arraylen_ptr = NULL; size_t arraylen = 0; @@ -898,15 +903,12 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end streamID id; int propagate_last_id = 0; - /* If a group was passed, we check if the request is about messages - * never delivered so far (normally this happens when ">" ID is passed). - * - * If instead the client is asking for some history, we serve it - * using a different function, so that we return entries *solely* - * from its own PEL. This ensures each consumer will always and only - * see the history of messages delivered to it and not yet confirmed + /* If the client is asking for some history, we serve it using a + * different function, so that we return entries *solely* from its + * own PEL. This ensures each consumer will always and only see + * the history of messages delivered to it and not yet confirmed * as delivered. 
*/ - if (group && streamCompareID(start,&group->last_id) <= 0) { + if (group && (flags & STREAM_RWR_HISTORY)) { return streamReplyWithRangeFromConsumerPEL(c,s,start,end,count, consumer); } @@ -1023,7 +1025,7 @@ size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start if (end && memcmp(ri.key,end,ri.key_len) > 0) break; streamID thisid; streamDecodeID(ri.key,&thisid); - if (streamReplyWithRange(c,s,&thisid,NULL,1,0,NULL,NULL, + if (streamReplyWithRange(c,s,&thisid,&thisid,1,0,NULL,NULL, STREAM_RWR_RAWENTRIES,NULL) == 0) { /* Note that we may have a not acknowledged entry in the PEL @@ -1136,7 +1138,7 @@ invalid: } /* Wrapper for streamGenericParseIDOrReply() with 'strict' argument set to - * 0, to be used when - and + are accetable IDs. */ + * 0, to be used when - and + are acceptable IDs. */ int streamParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq) { return streamGenericParseIDOrReply(c,o,id,missing_seq,0); } @@ -1470,8 +1472,10 @@ void xreadCommand(client *c) { stream *s = o->ptr; streamID *gt = ids+i; /* ID must be greater than this. */ int serve_synchronously = 0; + int serve_history = 0; /* True for XREADGROUP with ID != ">". */ - /* Check if there are the conditions to serve the client synchronously. */ + /* Check if there are the conditions to serve the client + * synchronously. 
*/ if (groups) { /* If the consumer is blocked on a group, we always serve it * synchronously (serving its local history) if the ID specified @@ -1480,6 +1484,7 @@ void xreadCommand(client *c) { gt->seq != UINT64_MAX) { serve_synchronously = 1; + serve_history = 1; } else { /* We also want to serve a consumer in a consumer group * synchronously in case the group top item delivered is smaller @@ -1515,9 +1520,12 @@ void xreadCommand(client *c) { if (groups) consumer = streamLookupConsumer(groups[i], consumername->ptr,1); streamPropInfo spi = {c->argv[i+streams_arg],groupname}; + int flags = 0; + if (noack) flags |= STREAM_RWR_NOACK; + if (serve_history) flags |= STREAM_RWR_HISTORY; streamReplyWithRange(c,s,&start,NULL,count,0, groups ? groups[i] : NULL, - consumer, noack, &spi); + consumer, flags, &spi); if (groups) server.dirty++; } } @@ -2155,7 +2163,7 @@ void xclaimCommand(client *c) { /* If we stopped because some IDs cannot be parsed, perhaps they * are trailing options. */ - time_t now = mstime(); + mstime_t now = mstime(); streamID last_id = {0,0}; int propagate_last_id = 0; for (; j < c->argc; j++) { @@ -2274,8 +2282,9 @@ void xclaimCommand(client *c) { if (justid) { addReplyStreamID(c,&id); } else { - streamReplyWithRange(c,o->ptr,&id,NULL,1,0,NULL,NULL, - STREAM_RWR_RAWENTRIES,NULL); + size_t emitted = streamReplyWithRange(c,o->ptr,&id,&id,1,0, + NULL,NULL,STREAM_RWR_RAWENTRIES,NULL); + if (!emitted) addReply(c,shared.nullbulk); } arraylen++; diff --git a/src/t_zset.c b/src/t_zset.c index db381b592..84a61ca43 100644 --- a/src/t_zset.c +++ b/src/t_zset.c @@ -574,12 +574,12 @@ int zslParseLexRangeItem(robj *item, sds *dest, int *ex) { switch(c[0]) { case '+': if (c[1] != '\0') return C_ERR; - *ex = 0; + *ex = 1; *dest = shared.maxstring; return C_OK; case '-': if (c[1] != '\0') return C_ERR; - *ex = 0; + *ex = 1; *dest = shared.minstring; return C_OK; case '(': @@ -652,9 +652,8 @@ int zslIsInLexRange(zskiplist *zsl, zlexrangespec *range) { zskiplistNode *x; 
/* Test for ranges that will always be empty. */ - if (sdscmplex(range->min,range->max) > 1 || - (sdscmp(range->min,range->max) == 0 && - (range->minex || range->maxex))) + int cmp = sdscmplex(range->min,range->max); + if (cmp > 0 || (cmp == 0 && (range->minex || range->maxex))) return 0; x = zsl->tail; if (x == NULL || !zslLexValueGteMin(x->ele,range)) @@ -927,9 +926,8 @@ int zzlIsInLexRange(unsigned char *zl, zlexrangespec *range) { unsigned char *p; /* Test for ranges that will always be empty. */ - if (sdscmplex(range->min,range->max) > 1 || - (sdscmp(range->min,range->max) == 0 && - (range->minex || range->maxex))) + int cmp = sdscmplex(range->min,range->max); + if (cmp > 0 || (cmp == 0 && (range->minex || range->maxex))) return 0; p = ziplistIndex(zl,-2); /* Last element. */ diff --git a/src/util.c b/src/util.c index 3fa6c9244..430cbe61a 100644 --- a/src/util.c +++ b/src/util.c @@ -39,6 +39,7 @@ #include #include #include +#include #include "util.h" #include "sha1.h" @@ -605,7 +606,7 @@ void getRandomHexChars(char *p, size_t len) { * already, this will be detected and handled correctly. * * The function does not try to normalize everything, but only the obvious - * case of one or more "../" appearning at the start of "filename" + * case of one or more "../" appearing at the start of "filename" * relative path. */ sds getAbsolutePath(char *filename) { char cwd[1024]; @@ -652,6 +653,24 @@ sds getAbsolutePath(char *filename) { return abspath; } +/* + * Gets the proper timezone in a more portable fashion + * i.e timezone variables are linux specific. + */ + +unsigned long getTimeZone(void) { +#ifdef __linux__ + return timezone; +#else + struct timeval tv; + struct timezone tz; + + gettimeofday(&tv, &tz); + + return tz.tz_minuteswest * 60UL; +#endif +} + /* Return true if the specified path is just a file basename without any * relative or absolute path. 
This function just checks that no / or \ * character exists inside the specified path, that's enough in the diff --git a/src/util.h b/src/util.h index 91acde047..cc154d968 100644 --- a/src/util.h +++ b/src/util.h @@ -50,6 +50,7 @@ int string2ld(const char *s, size_t slen, long double *dp); int d2string(char *buf, size_t len, double value); int ld2string(char *buf, size_t len, long double value, int humanfriendly); sds getAbsolutePath(char *filename); +unsigned long getTimeZone(void); int pathIsBaseName(char *path); #ifdef REDIS_TEST diff --git a/tests/support/util.tcl b/tests/support/util.tcl index 181c865fc..74f491e48 100644 --- a/tests/support/util.tcl +++ b/tests/support/util.tcl @@ -91,6 +91,14 @@ proc wait_for_sync r { } } +proc wait_for_ofs_sync {r1 r2} { + wait_for_condition 50 100 { + [status $r1 master_repl_offset] eq [status $r2 master_repl_offset] + } else { + fail "replica didn't sync in time" + } +} + # Random integer between 0 and max (excluded). proc randomInt {max} { expr {int(rand()*$max)} diff --git a/tests/unit/memefficiency.tcl b/tests/unit/memefficiency.tcl index 8972d577a..d152e212c 100644 --- a/tests/unit/memefficiency.tcl +++ b/tests/unit/memefficiency.tcl @@ -90,6 +90,7 @@ start_server {tags {"defrag"}} { test "Active defrag big keys" { r flushdb r config resetstat + r config set save "" ;# prevent bgsave from interfering with save below r config set activedefrag no r config set active-defrag-max-scan-fields 1000 r config set active-defrag-threshold-lower 5 diff --git a/tests/unit/type/stream-cgroups.tcl b/tests/unit/type/stream-cgroups.tcl index d2e0d6539..13981cc22 100644 --- a/tests/unit/type/stream-cgroups.tcl +++ b/tests/unit/type/stream-cgroups.tcl @@ -108,6 +108,93 @@ start_server { assert {$c == 5} } + test {XREADGROUP will not report data on empty history. 
Bug #5577} { + r del events + r xadd events * a 1 + r xadd events * b 2 + r xadd events * c 3 + r xgroup create events mygroup 0 + + # Current local PEL should be empty + set res [r xpending events mygroup - + 10] + assert {[llength $res] == 0} + + # So XREADGROUP should read an empty history as well + set res [r xreadgroup group mygroup myconsumer count 3 streams events 0] + assert {[llength [lindex $res 0 1]] == 0} + + # We should fetch all the elements in the stream asking for > + set res [r xreadgroup group mygroup myconsumer count 3 streams events >] + assert {[llength [lindex $res 0 1]] == 3} + + # Now the history is populated with three not acked entries + set res [r xreadgroup group mygroup myconsumer count 3 streams events 0] + assert {[llength [lindex $res 0 1]] == 3} + } + + test {XREADGROUP history reporting of deleted entries. Bug #5570} { + r del mystream + r XGROUP CREATE mystream mygroup $ MKSTREAM + r XADD mystream 1 field1 A + r XREADGROUP GROUP mygroup myconsumer STREAMS mystream > + r XADD mystream MAXLEN 1 2 field1 B + r XREADGROUP GROUP mygroup myconsumer STREAMS mystream > + + # Now we have two pending entries, however one should be deleted + # and one should be ok (we should only see "B") + set res [r XREADGROUP GROUP mygroup myconsumer STREAMS mystream 0-1] + assert {[lindex $res 0 1 0] == {1-0 {}}} + assert {[lindex $res 0 1 1] == {2-0 {field1 B}}} + } + + test {XCLAIM can claim PEL items from another consumer} { + # Add 3 items into the stream, and create a consumer group + r del mystream + set id1 [r XADD mystream * a 1] + set id2 [r XADD mystream * b 2] + set id3 [r XADD mystream * c 3] + r XGROUP CREATE mystream mygroup 0 + + # Client 1 reads item 1 from the stream without acknowledgements. 
+ # Client 2 then claims pending item 1 from the PEL of client 1 + set reply [ + r XREADGROUP GROUP mygroup client1 count 1 STREAMS mystream > + ] + assert {[llength [lindex $reply 0 1 0 1]] == 2} + assert {[lindex $reply 0 1 0 1] eq {a 1}} + r debug sleep 0.2 + set reply [ + r XCLAIM mystream mygroup client2 10 $id1 + ] + assert {[llength [lindex $reply 0 1]] == 2} + assert {[lindex $reply 0 1] eq {a 1}} + + # Client 1 reads another 2 items from stream + r XREADGROUP GROUP mygroup client1 count 2 STREAMS mystream > + r debug sleep 0.2 + + # Delete item 2 from the stream. Now client 1 has PEL that contains + # only item 3. Try to use client 2 to claim the deleted item 2 + # from the PEL of client 1, this should return nil + r XDEL mystream $id2 + set reply [ + r XCLAIM mystream mygroup client2 10 $id2 + ] + assert {[llength $reply] == 1} + assert_equal "" [lindex $reply 0] + + # Delete item 3 from the stream. Now client 1 has PEL that is empty. + # Try to use client 2 to claim the deleted item 3 from the PEL + # of client 1, this should return nil + r debug sleep 0.2 + r XDEL mystream $id3 + set reply [ + r XCLAIM mystream mygroup client2 10 $id3 + ] + assert {[llength $reply] == 1} + assert_equal "" [lindex $reply 0] + } + start_server {} { set master [srv -1 client] set master_host [srv -1 host] @@ -144,6 +231,8 @@ start_server { } } + wait_for_ofs_sync $master $slave + # Turn slave into master $slave slaveof no one diff --git a/tests/unit/type/zset.tcl b/tests/unit/type/zset.tcl index cf54ae839..a8c817f6e 100644 --- a/tests/unit/type/zset.tcl +++ b/tests/unit/type/zset.tcl @@ -388,7 +388,7 @@ start_server {tags {"zset"}} { 0 omega} } - test "ZRANGEBYLEX/ZREVRANGEBYLEX/ZCOUNT basics" { + test "ZRANGEBYLEX/ZREVRANGEBYLEX/ZLEXCOUNT basics" { create_default_lex_zset # inclusive range @@ -416,6 +416,22 @@ start_server {tags {"zset"}} { assert_equal {} [r zrevrangebylex zset \[elez \[elex] assert_equal {} [r zrevrangebylex zset (hill (omega] } + + test "ZLEXCOUNT 
advanced" { + create_default_lex_zset + + assert_equal 9 [r zlexcount zset - +] + assert_equal 0 [r zlexcount zset + -] + assert_equal 0 [r zlexcount zset + \[c] + assert_equal 0 [r zlexcount zset \[c -] + assert_equal 8 [r zlexcount zset \[bar +] + assert_equal 5 [r zlexcount zset \[bar \[foo] + assert_equal 4 [r zlexcount zset \[bar (foo] + assert_equal 4 [r zlexcount zset (bar \[foo] + assert_equal 3 [r zlexcount zset (bar (foo] + assert_equal 5 [r zlexcount zset - (foo] + assert_equal 1 [r zlexcount zset (maxstring +] + } test "ZRANGEBYSLEX with LIMIT" { create_default_lex_zset