Merge pull request #1 from antirez/unstable

update local data
This commit is contained in:
Dongheng Yang 2018-10-28 15:54:23 +08:00 committed by GitHub
commit 42b02b760d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
31 changed files with 893 additions and 234 deletions

View File

@ -216,7 +216,7 @@ Inside the root are the following important directories:
* `src`: contains the Redis implementation, written in C.
* `tests`: contains the unit tests, implemented in Tcl.
* `deps`: contains libraries Redis uses. Everything needed to compile Redis is inside this directory; your system just needs to provide `libc`, a POSIX compatible interface and a C compiler. Notably `deps` contains a copy of `jemalloc`, which is the default allocator of Redis under Linux. Note that under `deps` there are also things which started with the Redis project, but for which the main repository is not `anitrez/redis`. An exception to this rule is `deps/geohash-int` which is the low level geocoding library used by Redis: it originated from a different project, but at this point it diverged so much that it is developed as a separated entity directly inside the Redis repository.
* `deps`: contains libraries Redis uses. Everything needed to compile Redis is inside this directory; your system just needs to provide `libc`, a POSIX compatible interface and a C compiler. Notably `deps` contains a copy of `jemalloc`, which is the default allocator of Redis under Linux. Note that under `deps` there are also things which started with the Redis project, but for which the main repository is not `antirez/redis`. An exception to this rule is `deps/geohash-int` which is the low level geocoding library used by Redis: it originated from a different project, but at this point it diverged so much that it is developed as a separated entity directly inside the Redis repository.
There are a few more directories but they are not very important for our goals
here. We'll focus mostly on `src`, where the Redis implementation is contained,
@ -227,7 +227,7 @@ of complexity incrementally.
Note: lately Redis was refactored quite a bit. Function names and file
names have been changed, so you may find that this documentation reflects the
`unstable` branch more closely. For instance in Redis 3.0 the `server.c`
and `server.h` files were named to `redis.c` and `redis.h`. However the overall
and `server.h` files were named `redis.c` and `redis.h`. However the overall
structure is the same. Keep in mind that all the new developments and pull
requests should be performed against the `unstable` branch.

View File

@ -625,7 +625,7 @@ replica-priority 100
# memory to never hit a real out-of-memory condition before the master hits
# the configured maxmemory setting.
#
# replica-ingore-maxmemory yes
# replica-ignore-maxmemory yes
############################# LAZY FREEING ####################################

View File

@ -41,6 +41,10 @@ endif
# To get ARM stack traces if Redis crashes we need a special C flag.
ifneq (,$(filter aarch64 armv,$(uname_M)))
CFLAGS+=-funwind-tables
else
ifneq (,$(findstring armv,$(uname_M)))
CFLAGS+=-funwind-tables
endif
endif
# Backwards compatibility for selecting an allocator

View File

@ -677,6 +677,7 @@ int loadAppendOnlyFile(char *filename) {
int old_aof_state = server.aof_state;
long loops = 0;
off_t valid_up_to = 0; /* Offset of latest well-formed command loaded. */
off_t valid_before_multi = 0; /* Offset before MULTI command loaded. */
if (fp == NULL) {
serverLog(LL_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
@ -777,16 +778,28 @@ int loadAppendOnlyFile(char *filename) {
/* Command lookup */
cmd = lookupCommand(argv[0]->ptr);
if (!cmd) {
serverLog(LL_WARNING,"Unknown command '%s' reading the append only file", (char*)argv[0]->ptr);
serverLog(LL_WARNING,
"Unknown command '%s' reading the append only file",
(char*)argv[0]->ptr);
exit(1);
}
if (cmd == server.multiCommand) valid_before_multi = valid_up_to;
/* Run the command in the context of a fake client */
fakeClient->cmd = cmd;
cmd->proc(fakeClient);
if (fakeClient->flags & CLIENT_MULTI &&
fakeClient->cmd->proc != execCommand)
{
queueMultiCommand(fakeClient);
} else {
cmd->proc(fakeClient);
}
/* The fake client should not have a reply */
serverAssert(fakeClient->bufpos == 0 && listLength(fakeClient->reply) == 0);
serverAssert(fakeClient->bufpos == 0 &&
listLength(fakeClient->reply) == 0);
/* The fake client should never get blocked */
serverAssert((fakeClient->flags & CLIENT_BLOCKED) == 0);
@ -801,7 +814,12 @@ int loadAppendOnlyFile(char *filename) {
* If the client is in the middle of a MULTI/EXEC, handle it as it was
* a short read, even if technically the protocol is correct: we want
* to remove the unprocessed tail and continue. */
if (fakeClient->flags & CLIENT_MULTI) goto uxeof;
if (fakeClient->flags & CLIENT_MULTI) {
serverLog(LL_WARNING,
"Revert incomplete MULTI/EXEC transaction in AOF file");
valid_up_to = valid_before_multi;
goto uxeof;
}
loaded_ok: /* DB loaded, cleanup and return C_OK to the caller. */
fclose(fp);
@ -1121,25 +1139,47 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) {
streamID id;
int64_t numfields;
/* Reconstruct the stream data using XADD commands. */
while(streamIteratorGetID(&si,&id,&numfields)) {
/* Emit a two elements array for each item. The first is
* the ID, the second is an array of field-value pairs. */
if (s->length) {
/* Reconstruct the stream data using XADD commands. */
while(streamIteratorGetID(&si,&id,&numfields)) {
/* Emit a two elements array for each item. The first is
* the ID, the second is an array of field-value pairs. */
/* Emit the XADD <key> <id> ...fields... command. */
if (rioWriteBulkCount(r,'*',3+numfields*2) == 0) return 0;
/* Emit the XADD <key> <id> ...fields... command. */
if (rioWriteBulkCount(r,'*',3+numfields*2) == 0) return 0;
if (rioWriteBulkString(r,"XADD",4) == 0) return 0;
if (rioWriteBulkObject(r,key) == 0) return 0;
if (rioWriteBulkStreamID(r,&id) == 0) return 0;
while(numfields--) {
unsigned char *field, *value;
int64_t field_len, value_len;
streamIteratorGetField(&si,&field,&value,&field_len,&value_len);
if (rioWriteBulkString(r,(char*)field,field_len) == 0) return 0;
if (rioWriteBulkString(r,(char*)value,value_len) == 0) return 0;
}
}
} else {
/* Use the XADD MAXLEN 0 trick to generate an empty stream if
* the key we are serializing is an empty string, which is possible
* for the Stream type. */
if (rioWriteBulkCount(r,'*',7) == 0) return 0;
if (rioWriteBulkString(r,"XADD",4) == 0) return 0;
if (rioWriteBulkObject(r,key) == 0) return 0;
if (rioWriteBulkStreamID(r,&id) == 0) return 0;
while(numfields--) {
unsigned char *field, *value;
int64_t field_len, value_len;
streamIteratorGetField(&si,&field,&value,&field_len,&value_len);
if (rioWriteBulkString(r,(char*)field,field_len) == 0) return 0;
if (rioWriteBulkString(r,(char*)value,value_len) == 0) return 0;
}
if (rioWriteBulkString(r,"MAXLEN",6) == 0) return 0;
if (rioWriteBulkString(r,"0",1) == 0) return 0;
if (rioWriteBulkStreamID(r,&s->last_id) == 0) return 0;
if (rioWriteBulkString(r,"x",1) == 0) return 0;
if (rioWriteBulkString(r,"y",1) == 0) return 0;
}
/* Append XSETID after XADD, make sure lastid is correct,
* in case of XDEL lastid. */
if (rioWriteBulkCount(r,'*',3) == 0) return 0;
if (rioWriteBulkString(r,"XSETID",6) == 0) return 0;
if (rioWriteBulkObject(r,key) == 0) return 0;
if (rioWriteBulkStreamID(r,&s->last_id) == 0) return 0;
/* Create all the stream consumer groups. */
if (s->cgroups) {
raxIterator ri;

View File

@ -1,7 +1,7 @@
/* This file implements atomic counters using __atomic or __sync macros if
* available, otherwise synchronizing different threads using a mutex.
*
* The exported interaface is composed of three macros:
* The exported interface is composed of three macros:
*
* atomicIncr(var,count) -- Increment the atomic counter
* atomicGetIncr(var,oldvalue_var,count) -- Get and increment the atomic counter

View File

@ -17,7 +17,7 @@
*
* The design is trivial, we have a structure representing a job to perform
* and a different thread and job queue for every job type.
* Every thread wait for new jobs in its queue, and process every job
* Every thread waits for new jobs in its queue, and process every job
* sequentially.
*
* Jobs of the same type are guaranteed to be processed from the least

View File

@ -5164,10 +5164,10 @@ try_again:
serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,dbid));
}
int expired = 0; /* Number of keys that we'll find already expired.
Note that serializing large keys may take some time
so certain keys that were found non expired by the
lookupKey() function, may be expired later. */
int non_expired = 0; /* Number of keys that we'll find non expired.
Note that serializing large keys may take some time
so certain keys that were found non expired by the
lookupKey() function, may be expired later. */
/* Create RESTORE payload and generate the protocol to call the command. */
for (j = 0; j < num_keys; j++) {
@ -5177,11 +5177,16 @@ try_again:
if (expireat != -1) {
ttl = expireat-mstime();
if (ttl < 0) {
expired++;
continue;
}
if (ttl < 1) ttl = 1;
}
/* Relocate valid (non expired) keys into the array in successive
* positions to remove holes created by the keys that were present
* in the first lookup but are now expired after the second lookup. */
kv[non_expired++] = kv[j];
serverAssertWithInfo(c,NULL,
rioWriteBulkCount(&cmd,'*',replace ? 5 : 4));
@ -5209,6 +5214,9 @@ try_again:
serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"REPLACE",7));
}
/* Fix the actual number of keys we are migrating. */
num_keys = non_expired;
/* Transfer the query to the other node in 64K chunks. */
errno = 0;
{
@ -5250,7 +5258,7 @@ try_again:
* command name itself. */
if (!copy) newargv = zmalloc(sizeof(robj*)*(num_keys+1));
for (j = 0; j < num_keys-expired; j++) {
for (j = 0; j < num_keys; j++) {
if (syncReadLine(cs->fd, buf2, sizeof(buf2), timeout) <= 0) {
socket_error = 1;
break;

View File

@ -120,7 +120,7 @@ const char *configEnumGetName(configEnum *ce, int val) {
return NULL;
}
/* Wrapper for configEnumGetName() returning "unknown" insetad of NULL if
/* Wrapper for configEnumGetName() returning "unknown" instead of NULL if
* there is no match. */
const char *configEnumGetNameOrUnknown(configEnum *ce, int val) {
const char *name = configEnumGetName(ce,val);

View File

@ -38,6 +38,8 @@
* C-level DB API
*----------------------------------------------------------------------------*/
int keyIsExpired(redisDb *db, robj *key);
/* Update LFU when an object is accessed.
* Firstly, decrement the counter if the decrement time is reached.
* Then logarithmically increment the counter, and update the access time. */
@ -102,7 +104,10 @@ robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) {
/* Key expired. If we are in the context of a master, expireIfNeeded()
* returns 0 only when the key does not exist at all, so it's safe
* to return NULL ASAP. */
if (server.masterhost == NULL) return NULL;
if (server.masterhost == NULL) {
server.stat_keyspace_misses++;
return NULL;
}
/* However if we are in the context of a slave, expireIfNeeded() will
* not really try to expire the key, it only returns information
@ -121,6 +126,7 @@ robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) {
server.current_client->cmd &&
server.current_client->cmd->flags & CMD_READONLY)
{
server.stat_keyspace_misses++;
return NULL;
}
}
@ -206,7 +212,7 @@ void dbOverwrite(redisDb *db, robj *key, robj *val) {
* 2) clients WATCHing for the destination key notified.
* 3) The expire time of the key is reset (the key is made persistent).
*
* All the new keys in the database should be craeted via this interface. */
* All the new keys in the database should be creted via this interface. */
void setKey(redisDb *db, robj *key, robj *val) {
if (lookupKeyWrite(db,key) == NULL) {
dbAdd(db,key,val);
@ -543,7 +549,7 @@ void keysCommand(client *c) {
if (allkeys || stringmatchlen(pattern,plen,key,sdslen(key),0)) {
keyobj = createStringObject(key,sdslen(key));
if (expireIfNeeded(c->db,keyobj) == 0) {
if (!keyIsExpired(c->db,keyobj)) {
addReplyBulk(c,keyobj);
numkeys++;
}
@ -1120,6 +1126,25 @@ void propagateExpire(redisDb *db, robj *key, int lazy) {
decrRefCount(argv[1]);
}
/* Check if the key is expired. */
int keyIsExpired(redisDb *db, robj *key) {
mstime_t when = getExpire(db,key);
if (when < 0) return 0; /* No expire for this key */
/* Don't expire anything while loading. It will be done later. */
if (server.loading) return 0;
/* If we are in the context of a Lua script, we pretend that time is
* blocked to when the Lua script started. This way a key can expire
* only the first time it is accessed and not in the middle of the
* script execution, making propagation to slaves / AOF consistent.
* See issue #1525 on Github for more information. */
mstime_t now = server.lua_caller ? server.lua_time_start : mstime();
return now > when;
}
/* This function is called when we are going to perform some operation
* in a given key, but such key may be already logically expired even if
* it still exists in the database. The main way this function is called
@ -1140,32 +1165,17 @@ void propagateExpire(redisDb *db, robj *key, int lazy) {
* The return value of the function is 0 if the key is still valid,
* otherwise the function returns 1 if the key is expired. */
int expireIfNeeded(redisDb *db, robj *key) {
mstime_t when = getExpire(db,key);
mstime_t now;
if (!keyIsExpired(db,key)) return 0;
if (when < 0) return 0; /* No expire for this key */
/* Don't expire anything while loading. It will be done later. */
if (server.loading) return 0;
/* If we are in the context of a Lua script, we pretend that time is
* blocked to when the Lua script started. This way a key can expire
* only the first time it is accessed and not in the middle of the
* script execution, making propagation to slaves / AOF consistent.
* See issue #1525 on Github for more information. */
now = server.lua_caller ? server.lua_time_start : mstime();
/* If we are running in the context of a slave, return ASAP:
/* If we are running in the context of a slave, instead of
* evicting the expired key from the database, we return ASAP:
* the slave key expiration is controlled by the master that will
* send us synthesized DEL operations for expired keys.
*
* Still we try to return the right information to the caller,
* that is, 0 if we think the key should be still valid, 1 if
* we think the key is expired at this time. */
if (server.masterhost != NULL) return now > when;
/* Return when this key has not expired */
if (now <= when) return 0;
if (server.masterhost != NULL) return 1;
/* Delete the key */
server.stat_expiredkeys++;

View File

@ -345,7 +345,10 @@ NULL
return;
}
emptyDb(-1,EMPTYDB_NO_FLAGS,NULL);
if (rdbLoad(server.rdb_filename,NULL) != C_OK) {
protectClient(c);
int ret = rdbLoad(server.rdb_filename,NULL);
unprotectClient(c);
if (ret != C_OK) {
addReplyError(c,"Error trying to load the RDB dump");
return;
}
@ -354,7 +357,10 @@ NULL
} else if (!strcasecmp(c->argv[1]->ptr,"loadaof")) {
if (server.aof_state != AOF_OFF) flushAppendOnlyFile(1);
emptyDb(-1,EMPTYDB_NO_FLAGS,NULL);
if (loadAppendOnlyFile(server.aof_filename) != C_OK) {
protectClient(c);
int ret = loadAppendOnlyFile(server.aof_filename);
unprotectClient(c);
if (ret != C_OK) {
addReply(c,shared.err);
return;
}

View File

@ -364,7 +364,7 @@ size_t freeMemoryGetNotCountedMemory(void) {
}
}
if (server.aof_state != AOF_OFF) {
overhead += sdslen(server.aof_buf)+aofRewriteBufferSize();
overhead += sdsalloc(server.aof_buf)+aofRewriteBufferSize();
}
return overhead;
}

View File

@ -98,6 +98,11 @@ struct commandHelp {
"Get the current connection name",
9,
"2.6.9" },
{ "CLIENT ID",
"-",
"Returns the client ID for the current connection",
9,
"5.0.0" },
{ "CLIENT KILL",
"[ip:port] [ID client-id] [TYPE normal|master|slave|pubsub] [ADDR ip:port] [SKIPME yes/no]",
"Kill the connection of a client",
@ -123,6 +128,11 @@ struct commandHelp {
"Set the current connection name",
9,
"2.6.9" },
{ "CLIENT UNBLOCK",
"client-id [TIMEOUT|ERROR]",
"Unblock a client blocked in a blocking command from a different connection",
9,
"5.0.0" },
{ "CLUSTER ADDSLOTS",
"slot [slot ...]",
"Assign new hash slots to receiving node",
@ -145,7 +155,7 @@ struct commandHelp {
"3.0.0" },
{ "CLUSTER FAILOVER",
"[FORCE|TAKEOVER]",
"Forces a slave to perform a manual failover of its master.",
"Forces a replica to perform a manual failover of its master.",
12,
"3.0.0" },
{ "CLUSTER FORGET",
@ -178,9 +188,14 @@ struct commandHelp {
"Get Cluster config for the node",
12,
"3.0.0" },
{ "CLUSTER REPLICAS",
"node-id",
"List replica nodes of the specified master node",
12,
"5.0.0" },
{ "CLUSTER REPLICATE",
"node-id",
"Reconfigure a node as a slave of the specified master node",
"Reconfigure a node as a replica of the specified master node",
12,
"3.0.0" },
{ "CLUSTER RESET",
@ -205,7 +220,7 @@ struct commandHelp {
"3.0.0" },
{ "CLUSTER SLAVES",
"node-id",
"List slave nodes of the specified master node",
"List replica nodes of the specified master node",
12,
"3.0.0" },
{ "CLUSTER SLOTS",
@ -690,12 +705,12 @@ struct commandHelp {
"1.0.0" },
{ "READONLY",
"-",
"Enables read queries for a connection to a cluster slave node",
"Enables read queries for a connection to a cluster replica node",
12,
"3.0.0" },
{ "READWRITE",
"-",
"Disables read queries for a connection to a cluster slave node",
"Disables read queries for a connection to a cluster replica node",
12,
"3.0.0" },
{ "RENAME",
@ -708,6 +723,11 @@ struct commandHelp {
"Rename a key, only if the new key does not exist",
0,
"1.0.0" },
{ "REPLICAOF",
"host port",
"Make the server a replica of another instance, or promote it as master.",
9,
"5.0.0" },
{ "RESTORE",
"key ttl serialized-value [REPLACE]",
"Create a key using the provided serialized value, previously obtained using DUMP.",
@ -845,7 +865,7 @@ struct commandHelp {
"1.0.0" },
{ "SLAVEOF",
"host port",
"Make the server a slave of another instance, or promote it as master",
"Make the server a replica of another instance, or promote it as master. Deprecated starting with Redis 5. Use REPLICAOF instead.",
9,
"1.0.0" },
{ "SLOWLOG",
@ -954,7 +974,7 @@ struct commandHelp {
7,
"2.2.0" },
{ "WAIT",
"numslaves timeout",
"numreplicas timeout",
"Wait for the synchronous replication of all the write commands sent in the context of the current connection",
0,
"3.0.0" },
@ -963,11 +983,36 @@ struct commandHelp {
"Watch the given keys to determine execution of the MULTI/EXEC block",
7,
"2.2.0" },
{ "XACK",
"key group ID [ID ...]",
"Marks a pending message as correctly processed, effectively removing it from the pending entries list of the consumer group. Return value of the command is the number of messages successfully acknowledged, that is, the IDs we were actually able to resolve in the PEL.",
14,
"5.0.0" },
{ "XADD",
"key ID field string [field string ...]",
"Appends a new entry to a stream",
14,
"5.0.0" },
{ "XCLAIM",
"key group consumer min-idle-time ID [ID ...] [IDLE ms] [TIME ms-unix-time] [RETRYCOUNT count] [force] [justid]",
"Changes (or acquires) ownership of a message in a consumer group, as if the message was delivered to the specified consumer.",
14,
"5.0.0" },
{ "XDEL",
"key ID [ID ...]",
"Removes the specified entries from the stream. Returns the number of items actually deleted, that may be different from the number of IDs passed in case certain IDs do not exist.",
14,
"5.0.0" },
{ "XGROUP",
"[CREATE key groupname id-or-$] [SETID key id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]",
"Create, destroy, and manage consumer groups.",
14,
"5.0.0" },
{ "XINFO",
"[CONSUMERS key groupname] [GROUPS key] [STREAM key] [HELP]",
"Get information on streams and consumer groups",
14,
"5.0.0" },
{ "XLEN",
"key",
"Return the number of entires in a stream",
@ -998,6 +1043,11 @@ struct commandHelp {
"Return a range of elements in a stream, with IDs matching the specified IDs interval, in reverse order (from greater to smaller IDs) compared to XRANGE",
14,
"5.0.0" },
{ "XTRIM",
"key MAXLEN [~] count",
"Trims the stream to (approximately if '~' is passed) a certain size",
14,
"5.0.0" },
{ "ZADD",
"key [NX|XX] [CH] [INCR] score member [score member ...]",
"Add one or more members to a sorted set, or update its score if it already exists",

View File

@ -123,7 +123,7 @@ static uint8_t intsetSearch(intset *is, int64_t value, uint32_t *pos) {
} else {
/* Check for the case where we know we cannot find the value,
* but do know the insert position. */
if (value > _intsetGet(is,intrev32ifbe(is->length)-1)) {
if (value > _intsetGet(is,max)) {
if (pos) *pos = intrev32ifbe(is->length);
return 0;
} else if (value < _intsetGet(is,0)) {

View File

@ -274,7 +274,7 @@ void lolwut5Command(client *c) {
lwCanvas *canvas = lwDrawSchotter(cols,squares_per_row,squares_per_col);
sds rendered = lwRenderCanvas(canvas);
rendered = sdscat(rendered,
"\nGeorg nees - schotter, plotter on paper, 1968. Redis ver. ");
"\nGeorg Nees - schotter, plotter on paper, 1968. Redis ver. ");
rendered = sdscat(rendered,REDIS_VERSION);
rendered = sdscatlen(rendered,"\n",1);
addReplyBulkSds(c,rendered);

View File

@ -4221,6 +4221,7 @@ typedef struct RedisModuleTimer {
RedisModule *module; /* Module reference. */
RedisModuleTimerProc callback; /* The callback to invoke on expire. */
void *data; /* Private data for the callback. */
int dbid; /* Database number selected by the original client. */
} RedisModuleTimer;
/* This is the timer handler that is called by the main event loop. We schedule
@ -4247,7 +4248,7 @@ int moduleTimerHandler(struct aeEventLoop *eventLoop, long long id, void *client
ctx.module = timer->module;
ctx.client = moduleFreeContextReusedClient;
selectDb(ctx.client, 0);
selectDb(ctx.client, timer->dbid);
timer->callback(&ctx,timer->data);
moduleFreeContext(&ctx);
raxRemove(Timers,(unsigned char*)ri.key,ri.key_len,NULL);
@ -4272,6 +4273,7 @@ RedisModuleTimerID RM_CreateTimer(RedisModuleCtx *ctx, mstime_t period, RedisMod
timer->module = ctx->module;
timer->callback = callback;
timer->data = data;
timer->dbid = ctx->client->db->id;
uint64_t expiretime = ustime()+period*1000;
uint64_t key;

View File

@ -158,7 +158,7 @@ void execCommand(client *c) {
must_propagate = 1;
}
call(c,CMD_CALL_FULL);
call(c,server.loading ? CMD_CALL_NONE : CMD_CALL_FULL);
/* Commands may alter argc/argv, restore mstate. */
c->mstate.commands[j].argc = c->argc;

View File

@ -160,6 +160,32 @@ client *createClient(int fd) {
return c;
}
/* This funciton puts the client in the queue of clients that should write
* their output buffers to the socket. Note that it does not *yet* install
* the write handler, to start clients are put in a queue of clients that need
* to write, so we try to do that before returning in the event loop (see the
* handleClientsWithPendingWrites() function).
* If we fail and there is more data to write, compared to what the socket
* buffers can hold, then we'll really install the handler. */
void clientInstallWriteHandler(client *c) {
/* Schedule the client to write the output buffers to the socket only
* if not already done and, for slaves, if the slave can actually receive
* writes at this stage. */
if (!(c->flags & CLIENT_PENDING_WRITE) &&
(c->replstate == REPL_STATE_NONE ||
(c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))
{
/* Here instead of installing the write handler, we just flag the
* client and put it into a list of clients that have something
* to write to the socket. This way before re-entering the event
* loop, we can try to directly write to the client sockets avoiding
* a system call. We'll only really install the write handler if
* we'll not be able to write the whole reply at once. */
c->flags |= CLIENT_PENDING_WRITE;
listAddNodeHead(server.clients_pending_write,c);
}
}
/* This function is called every time we are going to transmit new data
* to the client. The behavior is the following:
*
@ -197,24 +223,9 @@ int prepareClientToWrite(client *c) {
if (c->fd <= 0) return C_ERR; /* Fake client for AOF loading. */
/* Schedule the client to write the output buffers to the socket only
* if not already done (there were no pending writes already and the client
* was yet not flagged), and, for slaves, if the slave can actually
* receive writes at this stage. */
if (!clientHasPendingReplies(c) &&
!(c->flags & CLIENT_PENDING_WRITE) &&
(c->replstate == REPL_STATE_NONE ||
(c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))
{
/* Here instead of installing the write handler, we just flag the
* client and put it into a list of clients that have something
* to write to the socket. This way before re-entering the event
* loop, we can try to directly write to the client sockets avoiding
* a system call. We'll only really install the write handler if
* we'll not be able to write the whole reply at once. */
c->flags |= CLIENT_PENDING_WRITE;
listAddNodeHead(server.clients_pending_write,c);
}
/* Schedule the client to write the output buffers to the socket, unless
* it should already be setup to do so (it has already pending data). */
if (!clientHasPendingReplies(c)) clientInstallWriteHandler(c);
/* Authorize the caller to queue in the output buffer of this client. */
return C_OK;
@ -818,6 +829,13 @@ void unlinkClient(client *c) {
void freeClient(client *c) {
listNode *ln;
/* If a client is protected, yet we need to free it right now, make sure
* to at least use asynchronous freeing. */
if (c->flags & CLIENT_PROTECTED) {
freeClientAsync(c);
return;
}
/* If it is our master that's beging disconnected we should make sure
* to cache the state to try a partial resynchronization later.
*
@ -1054,6 +1072,10 @@ int handleClientsWithPendingWrites(void) {
c->flags &= ~CLIENT_PENDING_WRITE;
listDelNode(server.clients_pending_write,ln);
/* If a client is protected, don't do anything,
* that may trigger write error or recreate handler. */
if (c->flags & CLIENT_PROTECTED) continue;
/* Try to write buffers to the client socket. */
if (writeToClient(c->fd,c,0) == C_ERR) continue;
@ -1105,6 +1127,34 @@ void resetClient(client *c) {
}
}
/* This funciton is used when we want to re-enter the event loop but there
* is the risk that the client we are dealing with will be freed in some
* way. This happens for instance in:
*
* * DEBUG RELOAD and similar.
* * When a Lua script is in -BUSY state.
*
* So the function will protect the client by doing two things:
*
* 1) It removes the file events. This way it is not possible that an
* error is signaled on the socket, freeing the client.
* 2) Moreover it makes sure that if the client is freed in a different code
* path, it is not really released, but only marked for later release. */
void protectClient(client *c) {
c->flags |= CLIENT_PROTECTED;
aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
}
/* This will undo the client protection done by protectClient() */
void unprotectClient(client *c) {
if (c->flags & CLIENT_PROTECTED) {
c->flags &= ~CLIENT_PROTECTED;
aeCreateFileEvent(server.el,c->fd,AE_READABLE,readQueryFromClient,c);
if (clientHasPendingReplies(c)) clientInstallWriteHandler(c);
}
}
/* Like processMultibulkBuffer(), but for the inline protocol instead of RESP,
* this function consumes the client query buffer and creates a command ready
* to be executed inside the client structure. Returns C_OK if the command

View File

@ -185,7 +185,7 @@ robj *createStringObjectFromLongDouble(long double value, int humanfriendly) {
/* Duplicate a string object, with the guarantee that the returned object
* has the same encoding as the original one.
*
* This function also guarantees that duplicating a small integere object
* This function also guarantees that duplicating a small integer object
* (or a string object that contains a representation of a small integer)
* will always result in a fresh object that is unshared (refcount == 1).
*
@ -1011,7 +1011,7 @@ struct redisMemOverhead *getMemoryOverheadData(void) {
mem = 0;
if (server.aof_state != AOF_OFF) {
mem += sdslen(server.aof_buf);
mem += sdsalloc(server.aof_buf);
mem += aofRewriteBufferSize();
}
mh->aof_buffer = mem;

View File

@ -40,7 +40,7 @@
* container: 2 bits, NONE=1, ZIPLIST=2.
* recompress: 1 bit, bool, true if node is temporarry decompressed for usage.
* attempted_compress: 1 bit, boolean, used for verifying during testing.
* extra: 12 bits, free for future use; pads out the remainder of 32 bits */
* extra: 10 bits, free for future use; pads out the remainder of 32 bits */
typedef struct quicklistNode {
struct quicklistNode *prev;
struct quicklistNode *next;

267
src/rax.c
View File

@ -1,6 +1,6 @@
/* Rax -- A radix tree implementation.
*
* Copyright (c) 2017, Salvatore Sanfilippo <antirez at gmail dot com>
* Copyright (c) 2017-2018, Salvatore Sanfilippo <antirez at gmail dot com>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@ -51,14 +51,18 @@ void *raxNotFound = (void*)"rax-not-found-pointer";
void raxDebugShowNode(const char *msg, raxNode *n);
/* Turn debugging messages on/off. */
#if 0
/* Turn debugging messages on/off by compiling with RAX_DEBUG_MSG macro on.
* When RAX_DEBUG_MSG is defined by default Rax operations will emit a lot
* of debugging info to the standard output, however you can still turn
* debugging on/off in order to enable it only when you suspect there is an
* operation causing a bug using the function raxSetDebugMsg(). */
#ifdef RAX_DEBUG_MSG
#define debugf(...) \
do { \
if (raxDebugMsg) { \
printf("%s:%s:%d:\t", __FILE__, __FUNCTION__, __LINE__); \
printf(__VA_ARGS__); \
fflush(stdout); \
} while (0);
}
#define debugnode(msg,n) raxDebugShowNode(msg,n)
#else
@ -66,6 +70,16 @@ void raxDebugShowNode(const char *msg, raxNode *n);
#define debugnode(msg,n)
#endif
/* By default log debug info if RAX_DEBUG_MSG is defined. */
static int raxDebugMsg = 1;
/* When debug messages are enabled, turn them on/off dynamically. By
* default they are enabled. Set the state to 0 to disable, and 1 to
* re-enable. */
void raxSetDebugMsg(int onoff) {
raxDebugMsg = onoff;
}
/* ------------------------- raxStack functions --------------------------
* The raxStack is a simple stack of pointers that is capable of switching
* from using a stack-allocated array to dynamic heap once a given number of
@ -134,12 +148,43 @@ static inline void raxStackFree(raxStack *ts) {
* Radix tree implementation
* --------------------------------------------------------------------------*/
/* Return the padding needed in the characters section of a node having size
* 'nodesize'. The padding is needed to store the child pointers to aligned
* addresses. Note that we add 4 to the node size because the node has a four
* bytes header. */
#define raxPadding(nodesize) ((sizeof(void*)-((nodesize+4) % sizeof(void*))) & (sizeof(void*)-1))
/* Return the pointer to the last child pointer in a node. For the compressed
* nodes this is the only child pointer. */
#define raxNodeLastChildPtr(n) ((raxNode**) ( \
((char*)(n)) + \
raxNodeCurrentLength(n) - \
sizeof(raxNode*) - \
(((n)->iskey && !(n)->isnull) ? sizeof(void*) : 0) \
))
/* Return the pointer to the first child pointer. */
#define raxNodeFirstChildPtr(n) ((raxNode**) ( \
(n)->data + \
(n)->size + \
raxPadding((n)->size)))
/* Return the current total size of the node. Note that the second line
* computes the padding after the string of characters, needed in order to
* save pointers to aligned addresses. */
#define raxNodeCurrentLength(n) ( \
sizeof(raxNode)+(n)->size+ \
raxPadding((n)->size)+ \
((n)->iscompr ? sizeof(raxNode*) : sizeof(raxNode*)*(n)->size)+ \
(((n)->iskey && !(n)->isnull)*sizeof(void*)) \
)
/* Allocate a new non compressed node with the specified number of children.
* If datafiled is true, the allocation is made large enough to hold the
* associated data pointer.
* Returns the new node pointer. On out of memory NULL is returned. */
raxNode *raxNewNode(size_t children, int datafield) {
size_t nodesize = sizeof(raxNode)+children+
size_t nodesize = sizeof(raxNode)+children+raxPadding(children)+
sizeof(raxNode*)*children;
if (datafield) nodesize += sizeof(void*);
raxNode *node = rax_malloc(nodesize);
@ -167,13 +212,6 @@ rax *raxNew(void) {
}
}
/* Return the current total size of the node. */
#define raxNodeCurrentLength(n) ( \
sizeof(raxNode)+(n)->size+ \
((n)->iscompr ? sizeof(raxNode*) : sizeof(raxNode*)*(n)->size)+ \
(((n)->iskey && !(n)->isnull)*sizeof(void*)) \
)
/* realloc the node to make room for auxiliary data in order
* to store an item in that node. On out of memory NULL is returned. */
raxNode *raxReallocForData(raxNode *n, void *data) {
@ -216,18 +254,17 @@ void *raxGetData(raxNode *n) {
raxNode *raxAddChild(raxNode *n, unsigned char c, raxNode **childptr, raxNode ***parentlink) {
assert(n->iscompr == 0);
size_t curlen = sizeof(raxNode)+
n->size+
sizeof(raxNode*)*n->size;
size_t newlen;
size_t curlen = raxNodeCurrentLength(n);
n->size++;
size_t newlen = raxNodeCurrentLength(n);
n->size--; /* For now restore the orignal size. We'll update it only on
success at the end. */
/* Alloc the new child we will link to 'n'. */
raxNode *child = raxNewNode(0,0);
if (child == NULL) return NULL;
/* Make space in the original node. */
if (n->iskey) curlen += sizeof(void*);
newlen = curlen+sizeof(raxNode*)+1; /* Add 1 char and 1 pointer. */
raxNode *newn = rax_realloc(n,newlen);
if (newn == NULL) {
rax_free(child);
@ -235,14 +272,34 @@ raxNode *raxAddChild(raxNode *n, unsigned char c, raxNode **childptr, raxNode **
}
n = newn;
/* After the reallocation, we have 5/9 (depending on the system
* pointer size) bytes at the end, that is, the additional char
* in the 'data' section, plus one pointer to the new child:
/* After the reallocation, we have up to 8/16 (depending on the system
* pointer size, and the required node padding) bytes at the end, that is,
* the additional char in the 'data' section, plus one pointer to the new
* child, plus the padding needed in order to store addresses into aligned
* locations.
*
* [numc][abx][ap][bp][xp]|auxp|.....
* So if we start with the following node, having "abde" edges.
*
* Note:
* - We assume 4 bytes pointer for simplicity.
* - Each space below corresponds to one byte
*
* [HDR*][abde][Aptr][Bptr][Dptr][Eptr]|AUXP|
*
* After the reallocation we need: 1 byte for the new edge character
* plus 4 bytes for a new child pointer (assuming 32 bit machine).
* However after adding 1 byte to the edge char, the header + the edge
* characters are no longer aligned, so we also need 3 bytes of padding.
* In total the reallocation will add 1+4+3 bytes = 8 bytes:
*
* (Blank bytes are represented by ".")
*
* [HDR*][abde][Aptr][Bptr][Dptr][Eptr]|AUXP|[....][....]
*
* Let's find where to insert the new child in order to make sure
* it is inserted in-place lexicographically. */
* it is inserted in-place lexicographically. Assuming we are adding
* a child "c" in our case pos will be = 2 after the end of the following
* loop. */
int pos;
for (pos = 0; pos < n->size; pos++) {
if (n->data[pos] > c) break;
@ -252,55 +309,81 @@ raxNode *raxAddChild(raxNode *n, unsigned char c, raxNode **childptr, raxNode **
* so that we can mess with the other data without overwriting it.
* We will obtain something like that:
*
* [numc][abx][ap][bp][xp].....|auxp| */
unsigned char *src;
* [HDR*][abde][Aptr][Bptr][Dptr][Eptr][....][....]|AUXP|
*/
unsigned char *src, *dst;
if (n->iskey && !n->isnull) {
src = n->data+n->size+sizeof(raxNode*)*n->size;
memmove(src+1+sizeof(raxNode*),src,sizeof(void*));
src = ((unsigned char*)n+curlen-sizeof(void*));
dst = ((unsigned char*)n+newlen-sizeof(void*));
memmove(dst,src,sizeof(void*));
}
/* Now imagine we are adding a node with edge 'c'. The insertion
* point is between 'b' and 'x', so the 'pos' variable value is
* To start, move all the child pointers after the insertion point
* of 1+sizeof(pointer) bytes on the right, to obtain:
/* Compute the "shift", that is, how many bytes we need to move the
* pointers section forward because of the addition of the new child
* byte in the string section. Note that if we had no padding, that
* would be always "1", since we are adding a single byte in the string
* section of the node (where now there is "abde" basically).
*
* [numc][abx][ap][bp].....[xp]|auxp| */
src = n->data+n->size+sizeof(raxNode*)*pos;
memmove(src+1+sizeof(raxNode*),src,sizeof(raxNode*)*(n->size-pos));
* However we have padding, so it could be zero, or up to 8.
*
* Another way to think at the shift is, how many bytes we need to
* move child pointers forward *other than* the obvious sizeof(void*)
* needed for the additional pointer itself. */
size_t shift = newlen - curlen - sizeof(void*);
/* We said we are adding a node with edge 'c'. The insertion
* point is between 'b' and 'd', so the 'pos' variable value is
* the index of the first child pointer that we need to move forward
* to make space for our new pointer.
*
* To start, move all the child pointers after the insertion point
* of shift+sizeof(pointer) bytes on the right, to obtain:
*
* [HDR*][abde][Aptr][Bptr][....][....][Dptr][Eptr]|AUXP|
*/
src = n->data+n->size+
raxPadding(n->size)+
sizeof(raxNode*)*pos;
memmove(src+shift+sizeof(raxNode*),src,sizeof(raxNode*)*(n->size-pos));
/* Move the pointers to the left of the insertion position as well. Often
* we don't need to do anything if there was already some padding to use. In
* that case the final destination of the pointers will be the same, however
* in our example there was no pre-existing padding, so we added one byte
* plus thre bytes of padding. After the next memmove() things will look
* like thata:
*
* [HDR*][abde][....][Aptr][Bptr][....][Dptr][Eptr]|AUXP|
*/
if (shift) {
src = (unsigned char*) raxNodeFirstChildPtr(n);
memmove(src+shift,src,sizeof(raxNode*)*pos);
}
/* Now make the space for the additional char in the data section,
* but also move the pointers before the insertion point in the right
* by 1 byte, in order to obtain the following:
* but also move the pointers before the insertion point to the right
* by shift bytes, in order to obtain the following:
*
* [numc][ab.x][ap][bp]....[xp]|auxp| */
* [HDR*][ab.d][e...][Aptr][Bptr][....][Dptr][Eptr]|AUXP|
*/
src = n->data+pos;
memmove(src+1,src,n->size-pos+sizeof(raxNode*)*pos);
memmove(src+1,src,n->size-pos);
/* We can now set the character and its child node pointer to get:
*
* [numc][abcx][ap][bp][cp]....|auxp|
* [numc][abcx][ap][bp][cp][xp]|auxp| */
* [HDR*][abcd][e...][Aptr][Bptr][....][Dptr][Eptr]|AUXP|
* [HDR*][abcd][e...][Aptr][Bptr][Cptr][Dptr][Eptr]|AUXP|
*/
n->data[pos] = c;
n->size++;
raxNode **childfield = (raxNode**)(n->data+n->size+sizeof(raxNode*)*pos);
src = (unsigned char*) raxNodeFirstChildPtr(n);
raxNode **childfield = (raxNode**)(src+sizeof(raxNode*)*pos);
memcpy(childfield,&child,sizeof(child));
*childptr = child;
*parentlink = childfield;
return n;
}
/* Return the pointer to the last child pointer in a node. For the compressed
* nodes this is the only child pointer. */
#define raxNodeLastChildPtr(n) ((raxNode**) ( \
((char*)(n)) + \
raxNodeCurrentLength(n) - \
sizeof(raxNode*) - \
(((n)->iskey && !(n)->isnull) ? sizeof(void*) : 0) \
))
/* Return the pointer to the first child pointer. */
#define raxNodeFirstChildPtr(n) ((raxNode**)((n)->data+(n)->size))
/* Turn the node 'n', that must be a node without any children, into a
* compressed node representing a set of nodes linked one after the other
* and having exactly one child each. The node can be a key or not: this
@ -321,7 +404,7 @@ raxNode *raxCompressNode(raxNode *n, unsigned char *s, size_t len, raxNode **chi
if (*child == NULL) return NULL;
/* Make space in the parent node. */
newsize = sizeof(raxNode)+len+sizeof(raxNode*);
newsize = sizeof(raxNode)+len+raxPadding(len)+sizeof(raxNode*);
if (n->iskey) {
data = raxGetData(n); /* To restore it later. */
if (!n->isnull) newsize += sizeof(void*);
@ -619,13 +702,14 @@ int raxGenericInsert(rax *rax, unsigned char *s, size_t len, void *data, void **
raxNode *postfix = NULL;
if (trimmedlen) {
nodesize = sizeof(raxNode)+trimmedlen+sizeof(raxNode*);
nodesize = sizeof(raxNode)+trimmedlen+raxPadding(trimmedlen)+
sizeof(raxNode*);
if (h->iskey && !h->isnull) nodesize += sizeof(void*);
trimmed = rax_malloc(nodesize);
}
if (postfixlen) {
nodesize = sizeof(raxNode)+postfixlen+
nodesize = sizeof(raxNode)+postfixlen+raxPadding(postfixlen)+
sizeof(raxNode*);
postfix = rax_malloc(nodesize);
}
@ -701,11 +785,12 @@ int raxGenericInsert(rax *rax, unsigned char *s, size_t len, void *data, void **
/* Allocate postfix & trimmed nodes ASAP to fail for OOM gracefully. */
size_t postfixlen = h->size - j;
size_t nodesize = sizeof(raxNode)+postfixlen+sizeof(raxNode*);
size_t nodesize = sizeof(raxNode)+postfixlen+raxPadding(postfixlen)+
sizeof(raxNode*);
if (data != NULL) nodesize += sizeof(void*);
raxNode *postfix = rax_malloc(nodesize);
nodesize = sizeof(raxNode)+j+sizeof(raxNode*);
nodesize = sizeof(raxNode)+j+raxPadding(j)+sizeof(raxNode*);
if (h->iskey && !h->isnull) nodesize += sizeof(void*);
raxNode *trimmed = rax_malloc(nodesize);
@ -875,7 +960,7 @@ raxNode *raxRemoveChild(raxNode *parent, raxNode *child) {
return parent;
}
/* Otherwise we need to scan for the children pointer and memmove()
/* Otherwise we need to scan for the child pointer and memmove()
* accordingly.
*
* 1. To start we seek the first element in both the children
@ -900,13 +985,21 @@ raxNode *raxRemoveChild(raxNode *parent, raxNode *child) {
debugf("raxRemoveChild tail len: %d\n", taillen);
memmove(e,e+1,taillen);
/* Since we have one data byte less, also child pointers start one byte
* before now. */
memmove(((char*)cp)-1,cp,(parent->size-taillen-1)*sizeof(raxNode**));
/* Compute the shift, that is the amount of bytes we should move our
* child pointers to the left, since the removal of one edge character
* and the corresponding padding change, may change the layout.
* We just check if in the old version of the node there was at the
* end just a single byte and all padding: in that case removing one char
* will remove a whole sizeof(void*) word. */
size_t shift = ((parent->size+4) % sizeof(void*)) == 1 ? sizeof(void*) : 0;
/* Move the remaining "tail" pointer at the right position as well. */
/* Move the children pointers before the deletion point. */
if (shift)
memmove(((char*)cp)-shift,cp,(parent->size-taillen-1)*sizeof(raxNode**));
/* Move the remaining "tail" pointers at the right position as well. */
size_t valuelen = (parent->iskey && !parent->isnull) ? sizeof(void*) : 0;
memmove(((char*)c)-1,c+1,taillen*sizeof(raxNode**)+valuelen);
memmove(((char*)c)-shift,c+1,taillen*sizeof(raxNode**)+valuelen);
/* 4. Update size. */
parent->size--;
@ -1072,7 +1165,7 @@ int raxRemove(rax *rax, unsigned char *s, size_t len, void **old) {
if (nodes > 1) {
/* If we can compress, create the new node and populate it. */
size_t nodesize =
sizeof(raxNode)+comprsize+sizeof(raxNode*);
sizeof(raxNode)+comprsize+raxPadding(comprsize)+sizeof(raxNode*);
raxNode *new = rax_malloc(nodesize);
/* An out of memory here just means we cannot optimize this
* node, but the tree is left in a consistent state. */
@ -1313,7 +1406,7 @@ int raxIteratorNextStep(raxIterator *it, int noup) {
}
}
/* Seek the grestest key in the subtree at the current node. Return 0 on
/* Seek the greatest key in the subtree at the current node. Return 0 on
* out of memory, otherwise 1. This is an helper function for different
* iteration functions below. */
int raxSeekGreatest(raxIterator *it) {
@ -1793,6 +1886,7 @@ void raxShow(rax *rax) {
/* Used by debugnode() macro to show info about a given node. */
void raxDebugShowNode(const char *msg, raxNode *n) {
if (raxDebugMsg == 0) return;
printf("%s: %p [%.*s] key:%d size:%d children:",
msg, (void*)n, (int)n->size, (char*)n->data, n->iskey, n->size);
int numcld = n->iscompr ? 1 : n->size;
@ -1807,4 +1901,43 @@ void raxDebugShowNode(const char *msg, raxNode *n) {
fflush(stdout);
}
/* Touch all the nodes of a tree returning a check sum. This is useful
* in order to make Valgrind detect if there is something wrong while
* reading the data structure.
*
* This function was used in order to identify Rax bugs after a big refactoring
* using this technique:
*
* 1. The rax-test is executed using Valgrind, adding a printf() so that for
* the fuzz tester we see what iteration in the loop we are in.
* 2. After every modification of the radix tree made by the fuzz tester
* in rax-test.c, we add a call to raxTouch().
* 3. Now as soon as an operation will corrupt the tree, raxTouch() will
* detect it (via Valgrind) immediately. We can add more calls to narrow
* the state.
* 4. At this point a good idea is to enable Rax debugging messages immediately
* before the moment the tree is corrupted, to see what happens.
*/
unsigned long raxTouch(raxNode *n) {
debugf("Touching %p\n", (void*)n);
unsigned long sum = 0;
if (n->iskey) {
sum += (unsigned long)raxGetData(n);
}
int numchildren = n->iscompr ? 1 : n->size;
raxNode **cp = raxNodeFirstChildPtr(n);
int count = 0;
for (int i = 0; i < numchildren; i++) {
if (numchildren > 1) {
sum += (long)n->data[i];
}
raxNode *child;
memcpy(&child,cp,sizeof(child));
if (child == (void*)0x65d1760) count++;
if (count > 1) exit(1);
sum += raxTouch(child);
cp++;
}
return sum;
}

View File

@ -1,3 +1,33 @@
/* Rax -- A radix tree implementation.
*
* Copyright (c) 2017-2018, Salvatore Sanfilippo <antirez at gmail dot com>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef RAX_H
#define RAX_H
@ -77,16 +107,16 @@ typedef struct raxNode {
* Note how the character is not stored in the children but in the
* edge of the parents:
*
* [header strlen=0][abc][a-ptr][b-ptr][c-ptr](value-ptr?)
* [header iscompr=0][abc][a-ptr][b-ptr][c-ptr](value-ptr?)
*
* if node is compressed (strlen != 0) the node has 1 children.
* if node is compressed (iscompr bit is 1) the node has 1 children.
* In that case the 'size' bytes of the string stored immediately at
* the start of the data section, represent a sequence of successive
* nodes linked one after the other, for which only the last one in
* the sequence is actually represented as a node, and pointed to by
* the current compressed node.
*
* [header strlen=3][xyz][z-ptr](value-ptr?)
* [header iscompr=1][xyz][z-ptr](value-ptr?)
*
* Both compressed and not compressed nodes can represent a key
* with associated data in the radix tree at any level (not just terminal
@ -176,6 +206,8 @@ void raxStop(raxIterator *it);
int raxEOF(raxIterator *it);
void raxShow(rax *rax);
uint64_t raxSize(rax *rax);
unsigned long raxTouch(raxNode *n);
void raxSetDebugMsg(int onoff);
/* Internal API. May be used by the node callback in order to access rax nodes
* in a low level way, so this function is exported as well. */

View File

@ -2641,7 +2641,7 @@ void replicationCron(void) {
* be the same as our repl-id.
* 3. We, yet as master, receive some updates, that will not
* increment the master_repl_offset.
* 4. Later we are turned into a slave, connecto to the new
* 4. Later we are turned into a slave, connect to the new
* master that will accept our PSYNC request by second
* replication ID, but there will be data inconsistency
* because we received writes. */

View File

@ -1242,7 +1242,7 @@ void luaMaskCountHook(lua_State *lua, lua_Debug *ar) {
* we need to mask the client executing the script from the event loop.
* If we don't do that the client may disconnect and could no longer be
* here when the EVAL command will return. */
aeDeleteFileEvent(server.el, server.lua_caller->fd, AE_READABLE);
protectClient(server.lua_caller);
}
if (server.lua_timedout) processEventsWhileBlocked();
if (server.lua_kill) {
@ -1370,10 +1370,9 @@ void evalGenericCommand(client *c, int evalsha) {
if (delhook) lua_sethook(lua,NULL,0,0); /* Disable hook */
if (server.lua_timedout) {
server.lua_timedout = 0;
/* Restore the readable handler that was unregistered when the
* script timeout was detected. */
aeCreateFileEvent(server.el,c->fd,AE_READABLE,
readQueryFromClient,c);
/* Restore the client that was protected when the script timeout
* was detected. */
unprotectClient(c);
if (server.masterhost && server.master)
queueClientForReprocessing(server.master);
}

View File

@ -307,19 +307,20 @@ struct redisCommand redisCommandTable[] = {
{"pfcount",pfcountCommand,-2,"r",0,NULL,1,-1,1,0,0},
{"pfmerge",pfmergeCommand,-2,"wm",0,NULL,1,-1,1,0,0},
{"pfdebug",pfdebugCommand,-3,"w",0,NULL,0,0,0,0,0},
{"xadd",xaddCommand,-5,"wmF",0,NULL,1,1,1,0,0},
{"xadd",xaddCommand,-5,"wmFR",0,NULL,1,1,1,0,0},
{"xrange",xrangeCommand,-4,"r",0,NULL,1,1,1,0,0},
{"xrevrange",xrevrangeCommand,-4,"r",0,NULL,1,1,1,0,0},
{"xlen",xlenCommand,2,"rF",0,NULL,1,1,1,0,0},
{"xread",xreadCommand,-4,"rs",0,xreadGetKeys,1,1,1,0,0},
{"xreadgroup",xreadCommand,-7,"ws",0,xreadGetKeys,1,1,1,0,0},
{"xgroup",xgroupCommand,-2,"wm",0,NULL,2,2,1,0,0},
{"xsetid",xsetidCommand,3,"wmF",0,NULL,1,1,1,0,0},
{"xack",xackCommand,-4,"wF",0,NULL,1,1,1,0,0},
{"xpending",xpendingCommand,-3,"r",0,NULL,1,1,1,0,0},
{"xclaim",xclaimCommand,-6,"wF",0,NULL,1,1,1,0,0},
{"xinfo",xinfoCommand,-2,"r",0,NULL,2,2,1,0,0},
{"xpending",xpendingCommand,-3,"rR",0,NULL,1,1,1,0,0},
{"xclaim",xclaimCommand,-6,"wRF",0,NULL,1,1,1,0,0},
{"xinfo",xinfoCommand,-2,"rR",0,NULL,2,2,1,0,0},
{"xdel",xdelCommand,-3,"wF",0,NULL,1,1,1,0,0},
{"xtrim",xtrimCommand,-2,"wF",0,NULL,1,1,1,0,0},
{"xtrim",xtrimCommand,-2,"wFR",0,NULL,1,1,1,0,0},
{"post",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0},
{"host:",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0},
{"latency",latencyCommand,-2,"aslt",0,NULL,0,0,0,0,0},
@ -1702,6 +1703,7 @@ void initServerConfig(void) {
server.expireCommand = lookupCommandByCString("expire");
server.pexpireCommand = lookupCommandByCString("pexpire");
server.xclaimCommand = lookupCommandByCString("xclaim");
server.xgroupCommand = lookupCommandByCString("xgroup");
/* Slow log */
server.slowlog_log_slower_than = CONFIG_DEFAULT_SLOWLOG_LOG_SLOWER_THAN;
@ -2617,8 +2619,11 @@ int processCommand(client *c) {
if (server.current_client == NULL) return C_ERR;
/* It was impossible to free enough memory, and the command the client
* is trying to execute is denied during OOM conditions? Error. */
if ((c->cmd->flags & CMD_DENYOOM) && out_of_memory) {
* is trying to execute is denied during OOM conditions or the client
* is in MULTI/EXEC context? Error. */
if (out_of_memory &&
(c->cmd->flags & CMD_DENYOOM ||
(c->flags & CLIENT_MULTI && c->cmd->proc != execCommand))) {
flagTransaction(c);
addReply(c, shared.oomerr);
return C_OK;

View File

@ -256,6 +256,7 @@ typedef long long mstime_t; /* millisecond time type. */
#define CLIENT_LUA_DEBUG (1<<25) /* Run EVAL in debug mode. */
#define CLIENT_LUA_DEBUG_SYNC (1<<26) /* EVAL debugging without fork() */
#define CLIENT_MODULE (1<<27) /* Non connected client used by some module. */
#define CLIENT_PROTECTED (1<<28) /* Client should not be freed for now. */
/* Client block type (btype field in client structure)
* if CLIENT_BLOCKED flag is set. */
@ -989,7 +990,8 @@ struct redisServer {
struct redisCommand *delCommand, *multiCommand, *lpushCommand,
*lpopCommand, *rpopCommand, *zpopminCommand,
*zpopmaxCommand, *sremCommand, *execCommand,
*expireCommand, *pexpireCommand, *xclaimCommand;
*expireCommand, *pexpireCommand, *xclaimCommand,
*xgroupCommand;
/* Fields used only for stats */
time_t stat_starttime; /* Server start time */
long long stat_numcommands; /* Number of processed commands */
@ -1473,6 +1475,8 @@ int clientHasPendingReplies(client *c);
void unlinkClient(client *c);
int writeToClient(int fd, client *c, int handler_installed);
void linkClient(client *c);
void protectClient(client *c);
void unprotectClient(client *c);
#ifdef __GNUC__
void addReplyErrorFormat(client *c, const char *fmt, ...)
@ -2107,6 +2111,7 @@ void xrevrangeCommand(client *c);
void xlenCommand(client *c);
void xreadCommand(client *c);
void xgroupCommand(client *c);
void xsetidCommand(client *c);
void xackCommand(client *c);
void xpendingCommand(client *c);
void xclaimCommand(client *c);

View File

@ -777,8 +777,8 @@ int streamDeleteItem(stream *s, streamID *id) {
* in the standard <ms>-<seq> format, using the simple string protocol
* of REPL. */
void addReplyStreamID(client *c, streamID *id) {
sds replyid = sdscatfmt(sdsempty(),"+%U-%U\r\n",id->ms,id->seq);
addReplySds(c,replyid);
sds replyid = sdscatfmt(sdsempty(),"%U-%U",id->ms,id->seq);
addReplyBulkSds(c,replyid);
}
/* Similar to the above function, but just creates an object, usually useful
@ -791,18 +791,18 @@ robj *createObjectFromStreamID(streamID *id) {
/* As a result of an explicit XCLAIM or XREADGROUP command, new entries
* are created in the pending list of the stream and consumers. We need
* to propagate this changes in the form of XCLAIM commands. */
void streamPropagateXCLAIM(client *c, robj *key, robj *group, robj *id, streamNACK *nack) {
void streamPropagateXCLAIM(client *c, robj *key, streamCG *group, robj *groupname, robj *id, streamNACK *nack) {
/* We need to generate an XCLAIM that will work in a idempotent fashion:
*
* XCLAIM <key> <group> <consumer> 0 <id> TIME <milliseconds-unix-time>
* RETRYCOUNT <count> FORCE JUSTID.
* RETRYCOUNT <count> FORCE JUSTID LASTID <id>.
*
* Note that JUSTID is useful in order to avoid that XCLAIM will do
* useless work in the slave side, trying to fetch the stream item. */
robj *argv[12];
robj *argv[14];
argv[0] = createStringObject("XCLAIM",6);
argv[1] = key;
argv[2] = group;
argv[2] = groupname;
argv[3] = createStringObject(nack->consumer->name,sdslen(nack->consumer->name));
argv[4] = createStringObjectFromLongLong(0);
argv[5] = id;
@ -812,7 +812,9 @@ void streamPropagateXCLAIM(client *c, robj *key, robj *group, robj *id, streamNA
argv[9] = createStringObjectFromLongLong(nack->delivery_count);
argv[10] = createStringObject("FORCE",5);
argv[11] = createStringObject("JUSTID",6);
propagate(server.xclaimCommand,c->db->id,argv,12,PROPAGATE_AOF|PROPAGATE_REPL);
argv[12] = createStringObject("LASTID",6);
argv[13] = createObjectFromStreamID(&group->last_id);
propagate(server.xclaimCommand,c->db->id,argv,14,PROPAGATE_AOF|PROPAGATE_REPL);
decrRefCount(argv[0]);
decrRefCount(argv[3]);
decrRefCount(argv[4]);
@ -822,11 +824,32 @@ void streamPropagateXCLAIM(client *c, robj *key, robj *group, robj *id, streamNA
decrRefCount(argv[9]);
decrRefCount(argv[10]);
decrRefCount(argv[11]);
decrRefCount(argv[12]);
decrRefCount(argv[13]);
}
/* We need this when we want to propoagate the new last-id of a consumer group
* that was consumed by XREADGROUP with the NOACK option: in that case we can't
* propagate the last ID just using the XCLAIM LASTID option, so we emit
*
* XGROUP SETID <key> <groupname> <id>
*/
void streamPropagateGroupID(client *c, robj *key, streamCG *group, robj *groupname) {
robj *argv[5];
argv[0] = createStringObject("XGROUP",6);
argv[1] = createStringObject("SETID",5);
argv[2] = key;
argv[3] = groupname;
argv[4] = createObjectFromStreamID(&group->last_id);
propagate(server.xgroupCommand,c->db->id,argv,5,PROPAGATE_AOF|PROPAGATE_REPL);
decrRefCount(argv[0]);
decrRefCount(argv[1]);
decrRefCount(argv[4]);
}
/* Send the specified range to the client 'c'. The range the client will
* receive is between start and end inclusive, if 'count' is non zero, no more
* than 'count' elemnets are sent. The 'end' pointer can be NULL to mean that
* than 'count' elements are sent. The 'end' pointer can be NULL to mean that
* we want all the elements from 'start' till the end of the stream. If 'rev'
* is non zero, elements are produced in reversed order from end to start.
*
@ -873,6 +896,7 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
streamIterator si;
int64_t numfields;
streamID id;
int propagate_last_id = 0;
/* If a group was passed, we check if the request is about messages
* never delivered so far (normally this happens when ">" ID is passed).
@ -892,8 +916,10 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
streamIteratorStart(&si,s,start,end,rev);
while(streamIteratorGetID(&si,&id,&numfields)) {
/* Update the group last_id if needed. */
if (group && streamCompareID(&id,&group->last_id) > 0)
if (group && streamCompareID(&id,&group->last_id) > 0) {
group->last_id = id;
propagate_last_id = 1;
}
/* Emit a two elements array for each item. The first is
* the ID, the second is an array of field-value pairs. */
@ -953,9 +979,12 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
/* Propagate as XCLAIM. */
if (spi) {
robj *idarg = createObjectFromStreamID(&id);
streamPropagateXCLAIM(c,spi->keyname,spi->groupname,idarg,nack);
streamPropagateXCLAIM(c,spi->keyname,group,spi->groupname,idarg,nack);
decrRefCount(idarg);
}
} else {
if (propagate_last_id)
streamPropagateGroupID(c,spi->keyname,group,spi->groupname);
}
arraylen++;
@ -1119,8 +1148,20 @@ int streamParseStrictIDOrReply(client *c, robj *o, streamID *id, uint64_t missin
return streamGenericParseIDOrReply(c,o,id,missing_seq,1);
}
/* We propagate MAXLEN ~ <count> as MAXLEN = <resulting-len-of-stream>
* otherwise trimming is no longer determinsitic on replicas / AOF. */
void streamRewriteApproxMaxlen(client *c, stream *s, int maxlen_arg_idx) {
robj *maxlen_obj = createStringObjectFromLongLong(s->length);
robj *equal_obj = createStringObject("=",1);
/* XADD key [MAXLEN <count>] <ID or *> [field value] [field value] ... */
rewriteClientCommandArgument(c,maxlen_arg_idx,maxlen_obj);
rewriteClientCommandArgument(c,maxlen_arg_idx-1,equal_obj);
decrRefCount(equal_obj);
decrRefCount(maxlen_obj);
}
/* XADD key [MAXLEN [~|=] <count>] <ID or *> [field value] [field value] ... */
void xaddCommand(client *c) {
streamID id;
int id_given = 0; /* Was an ID different than "*" specified? */
@ -1140,11 +1181,14 @@ void xaddCommand(client *c) {
* creation. */
break;
} else if (!strcasecmp(opt,"maxlen") && moreargs) {
approx_maxlen = 0;
char *next = c->argv[i+1]->ptr;
/* Check for the form MAXLEN ~ <count>. */
if (moreargs >= 2 && next[0] == '~' && next[1] == '\0') {
approx_maxlen = 1;
i++;
} else if (moreargs >= 2 && next[0] == '=' && next[1] == '\0') {
i++;
}
if (getLongLongFromObjectOrReply(c,c->argv[i+1],&maxlen,NULL)
!= C_OK) return;
@ -1191,18 +1235,12 @@ void xaddCommand(client *c) {
notifyKeyspaceEvent(NOTIFY_STREAM,"xadd",c->argv[1],c->db->id);
server.dirty++;
/* Remove older elements if MAXLEN was specified. */
if (maxlen >= 0) {
if (!streamTrimByLength(s,maxlen,approx_maxlen)) {
/* If no trimming was performed, for instance because approximated
* trimming length was specified, rewrite the MAXLEN argument
* as zero, so that the command is propagated without trimming. */
robj *zeroobj = createStringObjectFromLongLong(0);
rewriteClientCommandArgument(c,maxlen_arg_idx,zeroobj);
decrRefCount(zeroobj);
} else {
/* Notify xtrim event if needed. */
if (streamTrimByLength(s,maxlen,approx_maxlen)) {
notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id);
}
if (approx_maxlen) streamRewriteApproxMaxlen(c,s,maxlen_arg_idx);
}
/* Let's rewrite the ID argument with the one actually generated for
@ -1222,7 +1260,7 @@ void xrangeGenericCommand(client *c, int rev) {
robj *o;
stream *s;
streamID startid, endid;
long long count = 0;
long long count = -1;
robj *startarg = rev ? c->argv[3] : c->argv[2];
robj *endarg = rev ? c->argv[2] : c->argv[3];
@ -1249,7 +1287,13 @@ void xrangeGenericCommand(client *c, int rev) {
if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL
|| checkType(c,o,OBJ_STREAM)) return;
s = o->ptr;
streamReplyWithRange(c,s,&startid,&endid,count,rev,NULL,NULL,0,NULL);
if (count == 0) {
addReply(c,shared.nullmultibulk);
} else {
if (count == -1) count = 0;
streamReplyWithRange(c,s,&startid,&endid,count,rev,NULL,NULL,0,NULL);
}
}
/* XRANGE key start end [COUNT <n>] */
@ -1645,13 +1689,14 @@ uint64_t streamDelConsumer(streamCG *cg, sds name) {
* Consumer groups commands
* ----------------------------------------------------------------------- */
/* XGROUP CREATE <key> <groupname> <id or $>
/* XGROUP CREATE <key> <groupname> <id or $> [MKSTREAM]
* XGROUP SETID <key> <groupname> <id or $>
* XGROUP DESTROY <key> <groupname>
* XGROUP DELCONSUMER <key> <groupname> <consumername> */
void xgroupCommand(client *c) {
const char *help[] = {
"CREATE <key> <groupname> <id or $> -- Create a new consumer group.",
"CREATE <key> <groupname> <id or $> [opt] -- Create a new consumer group.",
" option MKSTREAM: create the empty stream if it does not exist.",
"SETID <key> <groupname> <id or $> -- Set the current group ID.",
"DESTROY <key> <groupname> -- Remove the specified group.",
"DELCONSUMER <key> <groupname> <consumer> -- Remove the specified consumer.",
@ -1662,13 +1707,39 @@ NULL
sds grpname = NULL;
streamCG *cg = NULL;
char *opt = c->argv[1]->ptr; /* Subcommand name. */
int mkstream = 0;
robj *o;
/* Lookup the key now, this is common for all the subcommands but HELP. */
if (c->argc >= 4) {
robj *o = lookupKeyWriteOrReply(c,c->argv[2],shared.nokeyerr);
if (o == NULL || checkType(c,o,OBJ_STREAM)) return;
s = o->ptr;
/* CREATE has an MKSTREAM option that creates the stream if it
* does not exist. */
if (c->argc == 6 && !strcasecmp(opt,"CREATE")) {
if (strcasecmp(c->argv[5]->ptr,"MKSTREAM")) {
addReplySubcommandSyntaxError(c);
return;
}
mkstream = 1;
grpname = c->argv[3]->ptr;
}
/* Everything but the "HELP" option requires a key and group name. */
if (c->argc >= 4) {
o = lookupKeyWrite(c->db,c->argv[2]);
if (o) s = o->ptr;
grpname = c->argv[3]->ptr;
}
/* Check for missing key/group. */
if (c->argc >= 4 && !mkstream) {
/* At this point key must exist, or there is an error. */
if (o == NULL) {
addReplyError(c,
"The XGROUP subcommand requires the key to exist. "
"Note that for CREATE you may want to use the MKSTREAM "
"option to create an empty stream automatically.");
return;
}
if (checkType(c,o,OBJ_STREAM)) return;
/* Certain subcommands require the group to exist. */
if ((cg = streamLookupCG(s,grpname)) == NULL &&
@ -1683,13 +1754,26 @@ NULL
}
/* Dispatch the different subcommands. */
if (!strcasecmp(opt,"CREATE") && c->argc == 5) {
if (!strcasecmp(opt,"CREATE") && (c->argc == 5 || c->argc == 6)) {
streamID id;
if (!strcmp(c->argv[4]->ptr,"$")) {
id = s->last_id;
if (s) {
id = s->last_id;
} else {
id.ms = 0;
id.seq = 0;
}
} else if (streamParseStrictIDOrReply(c,c->argv[4],&id,0) != C_OK) {
return;
}
/* Handle the MKSTREAM option now that the command can no longer fail. */
if (s == NULL && mkstream) {
o = createStreamObject();
dbAdd(c->db,c->argv[2],o);
s = o->ptr;
}
streamCG *cg = streamCreateCG(s,grpname,sdslen(grpname),&id);
if (cg) {
addReply(c,shared.ok);
@ -1737,6 +1821,40 @@ NULL
}
}
/* XSETID <stream> <groupname> <id>
*
* Set the internal "last ID" of a stream. */
void xsetidCommand(client *c) {
robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr);
if (o == NULL || checkType(c,o,OBJ_STREAM)) return;
stream *s = o->ptr;
streamID id;
if (streamParseStrictIDOrReply(c,c->argv[2],&id,0) != C_OK) return;
/* If the stream has at least one item, we want to check that the user
* is setting a last ID that is equal or greater than the current top
* item, otherwise the fundamental ID monotonicity assumption is violated. */
if (s->length > 0) {
streamID maxid;
streamIterator si;
streamIteratorStart(&si,s,NULL,NULL,1);
int64_t numfields;
streamIteratorGetID(&si,&maxid,&numfields);
streamIteratorStop(&si);
if (streamCompareID(&id,&maxid) < 0) {
addReplyError(c,"The ID specified in XSETID is smaller than the "
"target stream top item");
return;
}
}
s->last_id = id;
addReply(c,shared.ok);
server.dirty++;
notifyKeyspaceEvent(NOTIFY_STREAM,"xsetid",c->argv[1],c->db->id);
}
/* XACK <key> <group> <id> <id> ... <id>
*
* Acknowledge a message as processed. In practical terms we just check the
@ -1782,7 +1900,7 @@ void xackCommand(client *c) {
addReplyLongLong(c,acknowledged);
}
/* XPENDING <key> <group> [<start> <stop> <count>] [<consumer>]
/* XPENDING <key> <group> [<start> <stop> <count> [<consumer>]]
*
* If start and stop are omitted, the command just outputs information about
* the amount of pending messages for the key/group pair, together with
@ -1811,6 +1929,7 @@ void xpendingCommand(client *c) {
if (c->argc >= 6) {
if (getLongLongFromObjectOrReply(c,c->argv[5],&count,NULL) == C_ERR)
return;
if (count < 0) count = 0;
if (streamParseIDOrReply(c,c->argv[3],&startid,0) == C_ERR)
return;
if (streamParseIDOrReply(c,c->argv[4],&endid,UINT64_MAX) == C_ERR)
@ -1984,6 +2103,14 @@ void xpendingCommand(client *c) {
* Return just an array of IDs of messages successfully claimed,
* without returning the actual message.
*
* 6. LASTID <id>:
* Update the consumer group last ID with the specified ID if the
* current last ID is smaller than the provided one.
* This is used for replication / AOF, so that when we read from a
* consumer group, the XCLAIM that gets propagated to give ownership
* to the consumer, is also used in order to update the group current
* ID.
*
* The command returns an array of messages that the user
* successfully claimed, so that the caller is able to understand
* what messages it is now in charge of. */
@ -2029,6 +2156,8 @@ void xclaimCommand(client *c) {
/* If we stopped because some IDs cannot be parsed, perhaps they
* are trailing options. */
time_t now = mstime();
streamID last_id = {0,0};
int propagate_last_id = 0;
for (; j < c->argc; j++) {
int moreargs = (c->argc-1) - j; /* Number of additional arguments. */
char *opt = c->argv[j]->ptr;
@ -2052,12 +2181,20 @@ void xclaimCommand(client *c) {
if (getLongLongFromObjectOrReply(c,c->argv[j],&retrycount,
"Invalid RETRYCOUNT option argument for XCLAIM")
!= C_OK) return;
} else if (!strcasecmp(opt,"LASTID") && moreargs) {
j++;
if (streamParseStrictIDOrReply(c,c->argv[j],&last_id,0) != C_OK) return;
} else {
addReplyErrorFormat(c,"Unrecognized XCLAIM option '%s'",opt);
return;
}
}
if (streamCompareID(&last_id,&group->last_id) > 0) {
group->last_id = last_id;
propagate_last_id = 1;
}
if (deliverytime != -1) {
/* If a delivery time was passed, either with IDLE or TIME, we
* do some sanity check on it, and set the deliverytime to now
@ -2081,7 +2218,8 @@ void xclaimCommand(client *c) {
for (int j = 5; j <= last_id_arg; j++) {
streamID id;
unsigned char buf[sizeof(streamID)];
if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK) return;
if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK)
serverPanic("StreamID invalid after check. Should not be possible.");
streamEncodeID(buf,&id);
/* Lookup the ID in the group PEL. */
@ -2111,8 +2249,12 @@ void xclaimCommand(client *c) {
if (nack != raxNotFound) {
/* We need to check if the minimum idle time requested
* by the caller is satisfied by this entry. */
if (minidle) {
* by the caller is satisfied by this entry.
*
* Note that the nack could be created by FORCE, in this
* case there was no pre-existing entry and minidle should
* be ignored, but in that case nick->consumer is NULL. */
if (nack->consumer && minidle) {
mstime_t this_idle = now - nack->delivery_time;
if (this_idle < minidle) continue;
}
@ -2138,10 +2280,15 @@ void xclaimCommand(client *c) {
arraylen++;
/* Propagate this change. */
streamPropagateXCLAIM(c,c->argv[1],c->argv[3],c->argv[j],nack);
streamPropagateXCLAIM(c,c->argv[1],group,c->argv[2],c->argv[j],nack);
propagate_last_id = 0; /* Will be propagated by XCLAIM itself. */
server.dirty++;
}
}
if (propagate_last_id) {
streamPropagateGroupID(c,c->argv[1],group,c->argv[2]);
server.dirty++;
}
setDeferredMultiBulkLength(c,arraylenptr,arraylen);
preventCommandPropagation(c);
}
@ -2187,7 +2334,7 @@ void xdelCommand(client *c) {
*
* List of options:
*
* MAXLEN [~] <count> -- Trim so that the stream will be capped at
* MAXLEN [~|=] <count> -- Trim so that the stream will be capped at
* the specified length. Use ~ before the
* count in order to demand approximated trimming
* (like XADD MAXLEN option).
@ -2206,9 +2353,10 @@ void xtrimCommand(client *c) {
/* Argument parsing. */
int trim_strategy = TRIM_STRATEGY_NONE;
long long maxlen = 0; /* 0 means no maximum length. */
long long maxlen = -1; /* If left to -1 no trimming is performed. */
int approx_maxlen = 0; /* If 1 only delete whole radix tree nodes, so
the maxium length is not applied verbatim. */
int maxlen_arg_idx = 0; /* Index of the count in MAXLEN, for rewriting. */
/* Parse options. */
int i = 2; /* Start of options. */
@ -2216,16 +2364,25 @@ void xtrimCommand(client *c) {
int moreargs = (c->argc-1) - i; /* Number of additional arguments. */
char *opt = c->argv[i]->ptr;
if (!strcasecmp(opt,"maxlen") && moreargs) {
approx_maxlen = 0;
trim_strategy = TRIM_STRATEGY_MAXLEN;
char *next = c->argv[i+1]->ptr;
/* Check for the form MAXLEN ~ <count>. */
if (moreargs >= 2 && next[0] == '~' && next[1] == '\0') {
approx_maxlen = 1;
i++;
} else if (moreargs >= 2 && next[0] == '=' && next[1] == '\0') {
i++;
}
if (getLongLongFromObjectOrReply(c,c->argv[i+1],&maxlen,NULL)
!= C_OK) return;
if (maxlen < 0) {
addReplyError(c,"The MAXLEN argument must be >= 0.");
return;
}
i++;
maxlen_arg_idx = i;
} else {
addReply(c,shared.syntaxerr);
return;
@ -2246,6 +2403,7 @@ void xtrimCommand(client *c) {
signalModifiedKey(c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id);
server.dirty += deleted;
if (approx_maxlen) streamRewriteApproxMaxlen(c,s,maxlen_arg_idx);
}
addReplyLongLong(c,deleted);
}
@ -2307,11 +2465,11 @@ NULL
if (idle < 0) idle = 0;
addReplyMultiBulkLen(c,6);
addReplyStatus(c,"name");
addReplyBulkCString(c,"name");
addReplyBulkCBuffer(c,consumer->name,sdslen(consumer->name));
addReplyStatus(c,"pending");
addReplyBulkCString(c,"pending");
addReplyLongLong(c,raxSize(consumer->pel));
addReplyStatus(c,"idle");
addReplyBulkCString(c,"idle");
addReplyLongLong(c,idle);
}
raxStop(&ri);
@ -2329,28 +2487,28 @@ NULL
while(raxNext(&ri)) {
streamCG *cg = ri.data;
addReplyMultiBulkLen(c,8);
addReplyStatus(c,"name");
addReplyBulkCString(c,"name");
addReplyBulkCBuffer(c,ri.key,ri.key_len);
addReplyStatus(c,"consumers");
addReplyBulkCString(c,"consumers");
addReplyLongLong(c,raxSize(cg->consumers));
addReplyStatus(c,"pending");
addReplyBulkCString(c,"pending");
addReplyLongLong(c,raxSize(cg->pel));
addReplyStatus(c,"last-delivered-id");
addReplyBulkCString(c,"last-delivered-id");
addReplyStreamID(c,&cg->last_id);
}
raxStop(&ri);
} else if (!strcasecmp(opt,"STREAM") && c->argc == 3) {
/* XINFO STREAM <key> (or the alias XINFO <key>). */
addReplyMultiBulkLen(c,14);
addReplyStatus(c,"length");
addReplyBulkCString(c,"length");
addReplyLongLong(c,s->length);
addReplyStatus(c,"radix-tree-keys");
addReplyBulkCString(c,"radix-tree-keys");
addReplyLongLong(c,raxSize(s->rax));
addReplyStatus(c,"radix-tree-nodes");
addReplyBulkCString(c,"radix-tree-nodes");
addReplyLongLong(c,s->rax->numnodes);
addReplyStatus(c,"groups");
addReplyBulkCString(c,"groups");
addReplyLongLong(c,s->cgroups ? raxSize(s->cgroups) : 0);
addReplyStatus(c,"last-generated-id");
addReplyBulkCString(c,"last-generated-id");
addReplyStreamID(c,&s->last_id);
/* To emit the first/last entry we us the streamReplyWithRange()
@ -2359,11 +2517,11 @@ NULL
streamID start, end;
start.ms = start.seq = 0;
end.ms = end.seq = UINT64_MAX;
addReplyStatus(c,"first-entry");
addReplyBulkCString(c,"first-entry");
count = streamReplyWithRange(c,s,&start,&end,1,0,NULL,NULL,
STREAM_RWR_RAWENTRIES,NULL);
if (!count) addReply(c,shared.nullbulk);
addReplyStatus(c,"last-entry");
addReplyBulkCString(c,"last-entry");
count = streamReplyWithRange(c,s,&start,&end,1,1,NULL,NULL,
STREAM_RWR_RAWENTRIES,NULL);
if (!count) addReply(c,shared.nullbulk);

View File

@ -301,24 +301,22 @@ void mgetCommand(client *c) {
}
void msetGenericCommand(client *c, int nx) {
int j, busykeys = 0;
int j;
if ((c->argc % 2) == 0) {
addReplyError(c,"wrong number of arguments for MSET");
return;
}
/* Handle the NX flag. The MSETNX semantic is to return zero and don't
* set nothing at all if at least one already key exists. */
* set anything if at least one key alerady exists. */
if (nx) {
for (j = 1; j < c->argc; j += 2) {
if (lookupKeyWrite(c->db,c->argv[j]) != NULL) {
busykeys++;
addReply(c, shared.czero);
return;
}
}
if (busykeys) {
addReply(c, shared.czero);
return;
}
}
for (j = 1; j < c->argc; j += 2) {

View File

@ -33,7 +33,7 @@ start_server {tags {"dump"}} {
set now [clock milliseconds]
r restore foo [expr $now+3000] $encoded absttl
set ttl [r pttl foo]
assert {$ttl >= 2990 && $ttl <= 3000}
assert {$ttl >= 2900 && $ttl <= 3100}
r get foo
} {bar}

View File

@ -9,6 +9,17 @@ start_server {
set err
} {BUSYGROUP*}
test {XGROUP CREATE: automatic stream creation fails without MKSTREAM} {
r DEL mystream
catch {r XGROUP CREATE mystream mygroup $} err
set err
} {ERR*}
test {XGROUP CREATE: automatic stream creation works with MKSTREAM} {
r DEL mystream
r XGROUP CREATE mystream mygroup $ MKSTREAM
} {OK}
test {XREADGROUP will return only new elements} {
r XADD mystream * a 1
r XADD mystream * b 2
@ -96,4 +107,53 @@ start_server {
set c [llength [lindex [r xreadgroup group g1 c2 streams events >] 0 1]]
assert {$c == 5}
}
start_server {} {
set master [srv -1 client]
set master_host [srv -1 host]
set master_port [srv -1 port]
set slave [srv 0 client]
foreach noack {0 1} {
test "Consumer group last ID propagation to slave (NOACK=$noack)" {
$slave slaveof $master_host $master_port
wait_for_condition 50 100 {
[s 0 master_link_status] eq {up}
} else {
fail "Replication not started."
}
$master del stream
$master xadd stream * a 1
$master xadd stream * a 2
$master xadd stream * a 3
$master xgroup create stream mygroup 0
# Consume the first two items on the master
for {set j 0} {$j < 2} {incr j} {
if {$noack} {
set item [$master xreadgroup group mygroup \
myconsumer COUNT 1 NOACK STREAMS stream >]
} else {
set item [$master xreadgroup group mygroup \
myconsumer COUNT 1 STREAMS stream >]
}
set id [lindex $item 0 1 0 0]
if {$noack == 0} {
assert {[$master xack stream mygroup $id] eq "1"}
}
}
# Turn slave into master
$slave slaveof no one
set item [$slave xreadgroup group mygroup myconsumer \
COUNT 1 STREAMS stream >]
# The consumed enty should be the third
set myentry [lindex $item 0 1 0 1]
assert {$myentry eq {a 3}}
}
}
}
}

View File

@ -317,3 +317,97 @@ start_server {
assert_equal [r xrevrange teststream2 1234567891245 -] {{1234567891240-0 {key1 value2}} {1234567891230-0 {key1 value1}}}
}
}
start_server {tags {"stream"} overrides {appendonly yes}} {
test {XADD with MAXLEN > xlen can propagate correctly} {
for {set j 0} {$j < 100} {incr j} {
r XADD mystream * xitem v
}
r XADD mystream MAXLEN 200 * xitem v
incr j
assert {[r xlen mystream] == $j}
r debug loadaof
r XADD mystream * xitem v
incr j
assert {[r xlen mystream] == $j}
}
}
start_server {tags {"stream"} overrides {appendonly yes}} {
test {XADD with ~ MAXLEN can propagate correctly} {
for {set j 0} {$j < 100} {incr j} {
r XADD mystream * xitem v
}
r XADD mystream MAXLEN ~ $j * xitem v
incr j
assert {[r xlen mystream] == $j}
r config set stream-node-max-entries 1
r debug loadaof
r XADD mystream * xitem v
incr j
assert {[r xlen mystream] == $j}
}
}
start_server {tags {"stream"} overrides {appendonly yes stream-node-max-entries 10}} {
test {XTRIM with ~ MAXLEN can propagate correctly} {
for {set j 0} {$j < 100} {incr j} {
r XADD mystream * xitem v
}
r XTRIM mystream MAXLEN ~ 85
assert {[r xlen mystream] == 89}
r config set stream-node-max-entries 1
r debug loadaof
r XADD mystream * xitem v
incr j
assert {[r xlen mystream] == 90}
}
}
start_server {tags {"xsetid"}} {
test {XADD can CREATE an empty stream} {
r XADD mystream MAXLEN 0 * a b
assert {[dict get [r xinfo stream mystream] length] == 0}
}
test {XSETID can set a specific ID} {
r XSETID mystream "200-0"
assert {[dict get [r xinfo stream mystream] last-generated-id] == "200-0"}
}
test {XSETID cannot SETID with smaller ID} {
r XADD mystream * a b
catch {r XSETID mystream "1-1"} err
r XADD mystream MAXLEN 0 * a b
set err
} {ERR*smaller*}
test {XSETID cannot SETID on non-existent key} {
catch {r XSETID stream 1-1} err
set _ $err
} {ERR no such key}
}
start_server {tags {"stream"} overrides {appendonly yes aof-use-rdb-preamble no}} {
test {Empty stream can be rewrite into AOF correctly} {
r XADD mystream MAXLEN 0 * a b
assert {[dict get [r xinfo stream mystream] length] == 0}
r bgrewriteaof
waitForBgrewriteaof r
r debug loadaof
assert {[dict get [r xinfo stream mystream] length] == 0}
}
test {Stream can be rewrite into AOF correctly after XDEL lastid} {
r XSETID mystream 0-0
r XADD mystream 1-1 a b
r XADD mystream 2-2 a b
assert {[dict get [r xinfo stream mystream] length] == 2}
r XDEL mystream 2-2
r bgrewriteaof
waitForBgrewriteaof r
r debug loadaof
assert {[dict get [r xinfo stream mystream] length] == 1}
assert {[dict get [r xinfo stream mystream] last-generated-id] == "2-2"}
}
}

View File

@ -1,12 +1,17 @@
#!/usr/bin/env tclsh
if {[llength $::argv] != 2} {
puts "Usage: $::argv0 <branch> <version>"
if {[llength $::argv] != 2 && [llength $::argv] != 3} {
puts "Usage: $::argv0 <branch> <version> \[<num-commits>\]"
exit 1
}
set branch [lindex $::argv 0]
set ver [lindex $::argv 1]
if {[llength $::argv] == 3} {
set count [lindex ::$argv 2]
} else {
set count 100
}
set template {
================================================================================
@ -21,7 +26,7 @@ append template "\n\n"
set date [clock format [clock seconds]]
set template [string map [list %ver% $ver %date% $date] $template]
append template [exec git log $branch~100..$branch "--format=format:%an in commit %h:%n %s" --shortstat]
append template [exec git log $branch~$count..$branch "--format=format:%an in commit %h:%n %s" --shortstat]
#Older, more verbose version.
#