Add stream consumer group lag tracking and reporting (#9127)

Adds the ability to track the lag of a consumer group (CG), that is, the number
of entries yet to be delivered from the stream.

The proposed constant-time solution is in the spirit of "best-effort."

Partially addresses #8737.

## Description of approach

We add a new "entries_added" property to the stream. This starts at 0 for a new
stream and is incremented by 1 with every `XADD`.  It is essentially an all-time
counter of the entries added to the stream.

Given the stream's length and this counter value, we can trivially derive the
logical read counter of the first entry's ID, if and only if the stream is contiguous.
A fragmented stream contains one or more tombstones generated by `XDEL`s.
The new "xdel_max_id" stream property tracks the latest tombstone.
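Under the contiguity assumption above, the first entry's logical read counter falls out of simple arithmetic. A standalone sketch, where the struct and function names are illustrative simplifications and not the actual Redis types:

```c
#include <stdint.h>

/* Simplified standalone model: just the two stream properties used here. */
typedef struct {
    int64_t length;        /* current number of entries in the stream */
    int64_t entries_added; /* all-time count of XADDed entries */
} stream_props;

/* Logical read counter of the stream's first entry, i.e. how many entries
 * a reader has consumed once the first remaining entry has been delivered.
 * Only meaningful when the stream is contiguous (no tombstones). */
static int64_t first_entry_read_counter(const stream_props *s) {
    return s->entries_added - s->length + 1;
}
```

For example, a stream that ever received 10 entries but currently holds 3 must have had 7 entries removed from its head, so delivering the current first entry brings a reader's counter to 8.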

The CG also tracks the logical read counter of its last delivered ID as an
"entries_read" counter and increments it independently when delivering new
messages, unless this read counter is invalid (-1 denotes an invalid offset).
When the CG's counter is available, the reported lag is the difference between
the added and read counters.

Lastly, this also adds a "first_id" field to the stream structure, making lookups
of the first entry's ID cheaper in most cases.
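Putting the pieces together, the lag report reduces to a subtraction plus two validity checks. A hypothetical standalone model (names and types are illustrative; the actual logic lives in `streamReplyWithCGLag` in the diff below):

```c
#include <stdint.h>

#define INVALID_ENTRIES_READ -1  /* mirrors the -1 "invalid offset" above */

typedef struct {
    int64_t entries_added;   /* stream's all-time XADD counter */
    int fragmented_ahead;    /* 1 if a tombstone lies past the CG's last ID */
} model_stream;

typedef struct {
    int64_t entries_read;    /* CG's logical read counter, or -1 */
} model_cg;

/* Returns the lag, or -1 when it can't be tracked (reported as a null). */
static int64_t cg_lag(const model_stream *s, const model_cg *cg) {
    if (s->entries_added == 0) return 0; /* empty, never-used stream */
    if (cg->entries_read == INVALID_ENTRIES_READ || s->fragmented_ahead)
        return -1;
    return s->entries_added - cg->entries_read;
}
```

The two `-1` branches correspond to the limitations described below: an unobtainable read counter, and fragmentation between the group's last delivered ID and the tip of the stream.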

## Limitations

There are two cases in which the mechanism isn't able to track the lag.
In these cases, `XINFO` replies with `null` in the "lag" field.

The first case is when a CG is created with an arbitrary last delivered ID that
is neither "0-0" nor the ID of the stream's first or last entry. In this case,
it is impossible to obtain a valid read counter (short of an O(N) operation).
The second case is when there are one or more tombstones fragmenting
the stream's entries range.

In both cases, given enough time and assuming that the consumers are
active (reading and acking) and advancing, the CG should be able to
catch up with the tip of the stream and report zero lag.
Once that's achieved, lag tracking would resume as normal (until the
next tombstone is set).

## API changes

* `XGROUP CREATE` gains the optional named argument `[ENTRIESREAD entries-read]`
  for explicitly specifying the new CG's read counter.
* `XGROUP SETID` gains the optional named argument `[ENTRIESREAD entries-read]`
  for specifying the CG's read counter.
* `XINFO` reports the maximal tombstone ID, the recorded first entry ID, and total
  number of entries added to the stream.
* `XINFO` reports the current lag and logical read counter of CGs.
* `XSETID` is an internal command used in replication/AOF. It gains the optional
  named arguments `[ENTRIESADDED entries-added] [MAXDELETEDID max-deleted-entry-id]`
  for propagating the stream's all-time entries counter and maximal tombstone ID.

## The generic unsolved problem

The current stream implementation doesn't provide an efficient way to obtain the
approximate/exact size of a range of entries. While it would have been nice to have
that ability (#5813) in general, and specifically in the context of CGs, the risks
and complexities involved in such an implementation are in all likelihood prohibitive.

## A refactoring note

The `streamGetEdgeID` function has been refactored to accommodate both the existing
seek of any entry and the seeking of non-deleted entries (via the new `skip_tombstones`
argument). Furthermore, this refactoring migrated the seek logic to use the
`streamIterator` (rather than the `raxIterator`), which was, in turn, extended with the
`skip_tombstones` Boolean struct field to control whether tombstones are emitted.
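The semantics of that flag can be roughly illustrated as follows. This is a generic standalone sketch of the skip-on-iteration pattern, not the actual `streamIterator` code:

```c
#include <stddef.h>

#define ITEM_FLAG_DELETED (1<<0)

typedef struct {
    const int *flags;      /* per-entry flags (deleted or not) */
    size_t len;            /* number of entries */
    size_t pos;            /* how many entries were scanned so far */
    int rev;               /* non-zero: iterate end to start */
    int skip_tombstones;   /* non-zero: don't emit deleted entries */
} model_iter;

/* Returns the index of the next emitted entry, or -1 when exhausted. */
static long iter_next(model_iter *it) {
    while (it->pos < it->len) {
        size_t i = it->rev ? it->len - 1 - it->pos : it->pos;
        it->pos++;
        if (it->skip_tombstones && (it->flags[i] & ITEM_FLAG_DELETED))
            continue; /* tombstone filtered out, keep scanning */
        return (long)i;
    }
    return -1;
}
```

With the flag cleared the iterator emits every entry, tombstones included, which is what the edge-seeking code needs when it must find the raw first/last ID rather than the first non-deleted one.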

Co-authored-by: Guy Benoish <guy.benoish@redislabs.com>
Co-authored-by: Oran Agra <oran@redislabs.com>
Itamar Haber 2022-02-23 22:34:58 +02:00 committed by GitHub
parent b857928ba7
commit c81c7f51c3
17 changed files with 786 additions and 118 deletions


@ -2035,10 +2035,14 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) {
/* Append XSETID after XADD, make sure lastid is correct,
* in case of XDEL lastid. */
if (!rioWriteBulkCount(r,'*',3) ||
if (!rioWriteBulkCount(r,'*',7) ||
!rioWriteBulkString(r,"XSETID",6) ||
!rioWriteBulkObject(r,key) ||
!rioWriteBulkStreamID(r,&s->last_id))
!rioWriteBulkStreamID(r,&s->last_id) ||
!rioWriteBulkString(r,"ENTRIESADDED",12) ||
!rioWriteBulkLongLong(r,s->entries_added) ||
!rioWriteBulkString(r,"MAXDELETEDID",12) ||
!rioWriteBulkStreamID(r,&s->max_deleted_entry_id))
{
streamIteratorStop(&si);
return 0;
@ -2053,12 +2057,14 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) {
while(raxNext(&ri)) {
streamCG *group = ri.data;
/* Emit the XGROUP CREATE in order to create the group. */
if (!rioWriteBulkCount(r,'*',5) ||
if (!rioWriteBulkCount(r,'*',7) ||
!rioWriteBulkString(r,"XGROUP",6) ||
!rioWriteBulkString(r,"CREATE",6) ||
!rioWriteBulkObject(r,key) ||
!rioWriteBulkString(r,(char*)ri.key,ri.key_len) ||
!rioWriteBulkStreamID(r,&group->last_id))
!rioWriteBulkStreamID(r,&group->last_id) ||
!rioWriteBulkString(r,"ENTRIESREAD",11) ||
!rioWriteBulkLongLong(r,group->entries_read))
{
raxStop(&ri);
streamIteratorStop(&si);


@ -5701,7 +5701,7 @@ int verifyDumpPayload(unsigned char *p, size_t len, uint16_t *rdbver_ptr) {
if (len < 10) return C_ERR;
footer = p+(len-10);
/* Verify RDB version */
/* Set and verify RDB version. */
rdbver = (footer[1] << 8) | footer[0];
if (rdbver_ptr) {
*rdbver_ptr = rdbver;


@ -5998,7 +5998,10 @@ struct redisCommandArg XDEL_Args[] = {
/********** XGROUP CREATE ********************/
/* XGROUP CREATE history */
#define XGROUP_CREATE_History NULL
commandHistory XGROUP_CREATE_History[] = {
{"7.0.0","Added the `entries_read` named argument."},
{0}
};
/* XGROUP CREATE tips */
#define XGROUP_CREATE_tips NULL
@ -6016,6 +6019,7 @@ struct redisCommandArg XGROUP_CREATE_Args[] = {
{"groupname",ARG_TYPE_STRING,-1,NULL,NULL,NULL,CMD_ARG_NONE},
{"id",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_NONE,.subargs=XGROUP_CREATE_id_Subargs},
{"mkstream",ARG_TYPE_PURE_TOKEN,-1,"MKSTREAM",NULL,NULL,CMD_ARG_OPTIONAL},
{"entries_read",ARG_TYPE_INTEGER,-1,"ENTRIESREAD",NULL,NULL,CMD_ARG_OPTIONAL},
{0}
};
@ -6077,7 +6081,10 @@ struct redisCommandArg XGROUP_DESTROY_Args[] = {
/********** XGROUP SETID ********************/
/* XGROUP SETID history */
#define XGROUP_SETID_History NULL
commandHistory XGROUP_SETID_History[] = {
{"7.0.0","Added the optional `entries_read` argument."},
{0}
};
/* XGROUP SETID tips */
#define XGROUP_SETID_tips NULL
@ -6094,6 +6101,7 @@ struct redisCommandArg XGROUP_SETID_Args[] = {
{"key",ARG_TYPE_KEY,0,NULL,NULL,NULL,CMD_ARG_NONE},
{"groupname",ARG_TYPE_STRING,-1,NULL,NULL,NULL,CMD_ARG_NONE},
{"id",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_NONE,.subargs=XGROUP_SETID_id_Subargs},
{"entries_read",ARG_TYPE_INTEGER,-1,"ENTRIESREAD",NULL,NULL,CMD_ARG_OPTIONAL},
{0}
};
@ -6104,7 +6112,7 @@ struct redisCommand XGROUP_Subcommands[] = {
{"delconsumer","Delete a consumer from a consumer group.","O(1)","5.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_STREAM,XGROUP_DELCONSUMER_History,XGROUP_DELCONSUMER_tips,xgroupCommand,5,CMD_WRITE,ACL_CATEGORY_STREAM,{{NULL,CMD_KEY_RW|CMD_KEY_DELETE,KSPEC_BS_INDEX,.bs.index={2},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=XGROUP_DELCONSUMER_Args},
{"destroy","Destroy a consumer group.","O(N) where N is the number of entries in the group's pending entries list (PEL).","5.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_STREAM,XGROUP_DESTROY_History,XGROUP_DESTROY_tips,xgroupCommand,4,CMD_WRITE,ACL_CATEGORY_STREAM,{{NULL,CMD_KEY_RW|CMD_KEY_DELETE,KSPEC_BS_INDEX,.bs.index={2},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=XGROUP_DESTROY_Args},
{"help","Show helpful text about the different subcommands","O(1)","5.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_STREAM,XGROUP_HELP_History,XGROUP_HELP_tips,xgroupCommand,2,CMD_LOADING|CMD_STALE,ACL_CATEGORY_STREAM},
{"setid","Set a consumer group to an arbitrary last delivered ID value.","O(1)","5.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_STREAM,XGROUP_SETID_History,XGROUP_SETID_tips,xgroupCommand,5,CMD_WRITE,ACL_CATEGORY_STREAM,{{NULL,CMD_KEY_RW|CMD_KEY_UPDATE,KSPEC_BS_INDEX,.bs.index={2},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=XGROUP_SETID_Args},
{"setid","Set a consumer group to an arbitrary last delivered ID value.","O(1)","5.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_STREAM,XGROUP_SETID_History,XGROUP_SETID_tips,xgroupCommand,-5,CMD_WRITE,ACL_CATEGORY_STREAM,{{NULL,CMD_KEY_RW|CMD_KEY_UPDATE,KSPEC_BS_INDEX,.bs.index={2},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=XGROUP_SETID_Args},
{0}
};
@ -6137,7 +6145,10 @@ struct redisCommandArg XINFO_CONSUMERS_Args[] = {
/********** XINFO GROUPS ********************/
/* XINFO GROUPS history */
#define XINFO_GROUPS_History NULL
commandHistory XINFO_GROUPS_History[] = {
{"7.0.0","Added the `entries-read` and `lag` fields"},
{0}
};
/* XINFO GROUPS tips */
#define XINFO_GROUPS_tips NULL
@ -6159,7 +6170,10 @@ struct redisCommandArg XINFO_GROUPS_Args[] = {
/********** XINFO STREAM ********************/
/* XINFO STREAM history */
#define XINFO_STREAM_History NULL
commandHistory XINFO_STREAM_History[] = {
{"7.0.0","Added the `max-deleted-entry-id`, `entries-added`, `recorded-first-entry-id`, `entries-read` and `lag` fields"},
{0}
};
/* XINFO STREAM tips */
#define XINFO_STREAM_tips NULL
@ -6338,7 +6352,10 @@ struct redisCommandArg XREVRANGE_Args[] = {
/********** XSETID ********************/
/* XSETID history */
#define XSETID_History NULL
commandHistory XSETID_History[] = {
{"7.0.0","Added the `entries_added` and `max_deleted_entry_id` arguments."},
{0}
};
/* XSETID tips */
#define XSETID_tips NULL
@ -6347,6 +6364,8 @@ struct redisCommandArg XREVRANGE_Args[] = {
struct redisCommandArg XSETID_Args[] = {
{"key",ARG_TYPE_KEY,0,NULL,NULL,NULL,CMD_ARG_NONE},
{"last-id",ARG_TYPE_STRING,-1,NULL,NULL,NULL,CMD_ARG_NONE},
{"entries_added",ARG_TYPE_INTEGER,-1,"ENTRIESADDED",NULL,NULL,CMD_ARG_OPTIONAL},
{"max_deleted_entry_id",ARG_TYPE_STRING,-1,"MAXDELETEDID",NULL,NULL,CMD_ARG_OPTIONAL},
{0}
};
@ -7057,7 +7076,7 @@ struct redisCommand redisCommandTable[] = {
{"xread","Return never seen elements in multiple streams, with IDs greater than the ones reported by the caller for each stream. Can block.","For each stream mentioned: O(N) with N being the number of elements being returned, it means that XREAD-ing with a fixed COUNT is O(1). Note that when the BLOCK option is used, XADD will pay O(M) time in order to serve the M clients blocked on the stream getting new data.","5.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_STREAM,XREAD_History,XREAD_tips,xreadCommand,-4,CMD_BLOCKING|CMD_READONLY|CMD_BLOCKING,ACL_CATEGORY_STREAM,{{NULL,CMD_KEY_RO|CMD_KEY_ACCESS,KSPEC_BS_KEYWORD,.bs.keyword={"STREAMS",1},KSPEC_FK_RANGE,.fk.range={-1,1,2}}},xreadGetKeys,.args=XREAD_Args},
{"xreadgroup","Return new entries from a stream using a consumer group, or access the history of the pending entries for a given consumer. Can block.","For each stream mentioned: O(M) with M being the number of elements returned. If M is constant (e.g. always asking for the first 10 elements with COUNT), you can consider it O(1). On the other side when XREADGROUP blocks, XADD will pay the O(N) time in order to serve the N clients blocked on the stream getting new data.","5.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_STREAM,XREADGROUP_History,XREADGROUP_tips,xreadCommand,-7,CMD_BLOCKING|CMD_WRITE,ACL_CATEGORY_STREAM,{{NULL,CMD_KEY_RO|CMD_KEY_ACCESS,KSPEC_BS_KEYWORD,.bs.keyword={"STREAMS",4},KSPEC_FK_RANGE,.fk.range={-1,1,2}}},xreadGetKeys,.args=XREADGROUP_Args},
{"xrevrange","Return a range of elements in a stream, with IDs matching the specified IDs interval, in reverse order (from greater to smaller IDs) compared to XRANGE","O(N) with N being the number of elements returned. If N is constant (e.g. always asking for the first 10 elements with COUNT), you can consider it O(1).","5.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_STREAM,XREVRANGE_History,XREVRANGE_tips,xrevrangeCommand,-4,CMD_READONLY,ACL_CATEGORY_STREAM,{{NULL,CMD_KEY_RO|CMD_KEY_ACCESS,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=XREVRANGE_Args},
{"xsetid","An internal command for replicating stream values","O(1)","5.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_STREAM,XSETID_History,XSETID_tips,xsetidCommand,3,CMD_WRITE|CMD_DENYOOM|CMD_FAST,ACL_CATEGORY_STREAM,{{NULL,CMD_KEY_RW|CMD_KEY_UPDATE,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=XSETID_Args},
{"xsetid","An internal command for replicating stream values","O(1)","5.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_STREAM,XSETID_History,XSETID_tips,xsetidCommand,-3,CMD_WRITE|CMD_DENYOOM|CMD_FAST,ACL_CATEGORY_STREAM,{{NULL,CMD_KEY_RW|CMD_KEY_UPDATE,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=XSETID_Args},
{"xtrim","Trims the stream to (approximately if '~' is passed) a certain size","O(N), with N being the number of evicted entries. Constant times are very small however, since entries are organized in macro nodes containing multiple entries that can be released with a single deallocation.","5.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_STREAM,XTRIM_History,XTRIM_tips,xtrimCommand,-4,CMD_WRITE,ACL_CATEGORY_STREAM,{{NULL,CMD_KEY_RW|CMD_KEY_DELETE,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=XTRIM_Args},
/* string */
{"append","Append a value to a key","O(1). The amortized time complexity is O(1) assuming the appended value is small and the already present value is of any size, since the dynamic string library used by Redis will double the free space available on every reallocation.","2.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_STRING,APPEND_History,APPEND_tips,appendCommand,3,CMD_WRITE|CMD_DENYOOM|CMD_FAST,ACL_CATEGORY_STRING,{{NULL,CMD_KEY_RW|CMD_KEY_INSERT,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=APPEND_Args},


@ -7,6 +7,12 @@
"arity": -5,
"container": "XGROUP",
"function": "xgroupCommand",
"history": [
[
"7.0.0",
"Added the `entries_read` named argument."
]
],
"command_flags": [
"WRITE",
"DENYOOM"
@ -64,6 +70,12 @@
"name": "mkstream",
"type": "pure-token",
"optional": true
},
{
"token": "ENTRIESREAD",
"name": "entries_read",
"type": "integer",
"optional": true
}
]
}


@ -4,9 +4,15 @@
"complexity": "O(1)",
"group": "stream",
"since": "5.0.0",
"arity": 5,
"arity": -5,
"container": "XGROUP",
"function": "xgroupCommand",
"history": [
[
"7.0.0",
"Added the optional `entries_read` argument."
]
],
"command_flags": [
"WRITE"
],
@ -57,6 +63,12 @@
"token": "$"
}
]
},
{
"name": "entries_read",
"token": "ENTRIESREAD",
"type": "integer",
"optional": true
}
]
}


@ -6,6 +6,12 @@
"since": "5.0.0",
"arity": 3,
"container": "XINFO",
"history": [
[
"7.0.0",
"Added the `entries-read` and `lag` fields"
]
],
"function": "xinfoCommand",
"command_flags": [
"READONLY"


@ -6,6 +6,12 @@
"since": "5.0.0",
"arity": -3,
"container": "XINFO",
"history": [
[
"7.0.0",
"Added the `max-deleted-entry-id`, `entries-added`, `recorded-first-entry-id`, `entries-read` and `lag` fields"
]
],
"function": "xinfoCommand",
"command_flags": [
"READONLY"


@ -4,8 +4,14 @@
"complexity": "O(1)",
"group": "stream",
"since": "5.0.0",
"arity": 3,
"arity": -3,
"function": "xsetidCommand",
"history": [
[
"7.0.0",
"Added the `entries_added` and `max_deleted_entry_id` arguments."
]
],
"command_flags": [
"WRITE",
"DENYOOM",
@ -43,6 +49,18 @@
{
"name": "last-id",
"type": "string"
},
{
"name": "entries_added",
"token": "ENTRIESADDED",
"type": "integer",
"optional": true
},
{
"name": "max_deleted_entry_id",
"token": "MAXDELETEDID",
"type": "string",
"optional": true
}
]
}


@ -692,7 +692,7 @@ int rdbSaveObjectType(rio *rdb, robj *o) {
else
serverPanic("Unknown hash encoding");
case OBJ_STREAM:
return rdbSaveType(rdb,RDB_TYPE_STREAM_LISTPACKS);
return rdbSaveType(rdb,RDB_TYPE_STREAM_LISTPACKS_2);
case OBJ_MODULE:
return rdbSaveType(rdb,RDB_TYPE_MODULE_2);
default:
@ -986,6 +986,19 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid) {
nwritten += n;
if ((n = rdbSaveLen(rdb,s->last_id.seq)) == -1) return -1;
nwritten += n;
/* Save the first entry ID. */
if ((n = rdbSaveLen(rdb,s->first_id.ms)) == -1) return -1;
nwritten += n;
if ((n = rdbSaveLen(rdb,s->first_id.seq)) == -1) return -1;
nwritten += n;
/* Save the maximal tombstone ID. */
if ((n = rdbSaveLen(rdb,s->max_deleted_entry_id.ms)) == -1) return -1;
nwritten += n;
if ((n = rdbSaveLen(rdb,s->max_deleted_entry_id.seq)) == -1) return -1;
nwritten += n;
/* Save the offset. */
if ((n = rdbSaveLen(rdb,s->entries_added)) == -1) return -1;
nwritten += n;
/* The consumer groups and their clients are part of the stream
* type, so serialize every consumer group. */
@ -1020,6 +1033,13 @@ ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key, int dbid) {
return -1;
}
nwritten += n;
/* Save the group's logical reads counter. */
if ((n = rdbSaveLen(rdb,cg->entries_read)) == -1) {
raxStop(&ri);
return -1;
}
nwritten += n;
/* Save the global PEL. */
if ((n = rdbSaveStreamPEL(rdb,cg->pel,1)) == -1) {
@ -2321,7 +2341,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error) {
rdbReportCorruptRDB("Unknown RDB encoding type %d",rdbtype);
break;
}
} else if (rdbtype == RDB_TYPE_STREAM_LISTPACKS) {
} else if (rdbtype == RDB_TYPE_STREAM_LISTPACKS || rdbtype == RDB_TYPE_STREAM_LISTPACKS_2) {
o = createStreamObject();
stream *s = o->ptr;
uint64_t listpacks = rdbLoadLen(rdb,NULL);
@ -2397,6 +2417,30 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error) {
/* Load the last entry ID. */
s->last_id.ms = rdbLoadLen(rdb,NULL);
s->last_id.seq = rdbLoadLen(rdb,NULL);
if (rdbtype == RDB_TYPE_STREAM_LISTPACKS_2) {
/* Load the first entry ID. */
s->first_id.ms = rdbLoadLen(rdb,NULL);
s->first_id.seq = rdbLoadLen(rdb,NULL);
/* Load the maximal deleted entry ID. */
s->max_deleted_entry_id.ms = rdbLoadLen(rdb,NULL);
s->max_deleted_entry_id.seq = rdbLoadLen(rdb,NULL);
/* Load the offset. */
s->entries_added = rdbLoadLen(rdb,NULL);
} else {
/* During migration the offset can be initialized to the stream's
* length. At this point, we also don't care about tombstones
* because CG offsets will be later initialized as well. */
s->max_deleted_entry_id.ms = 0;
s->max_deleted_entry_id.seq = 0;
s->entries_added = s->length;
/* Since the rax is already loaded, we can find the first entry's
* ID. */
streamGetEdgeID(s,1,1,&s->first_id);
}
if (rioGetReadError(rdb)) {
rdbReportReadError("Stream object metadata loading failed.");
@ -2432,8 +2476,22 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error) {
decrRefCount(o);
return NULL;
}
/* Load group offset. */
uint64_t cg_offset;
if (rdbtype == RDB_TYPE_STREAM_LISTPACKS_2) {
cg_offset = rdbLoadLen(rdb,NULL);
if (rioGetReadError(rdb)) {
rdbReportReadError("Stream cgroup offset loading failed.");
sdsfree(cgname);
decrRefCount(o);
return NULL;
}
} else {
cg_offset = streamEstimateDistanceFromFirstEverEntry(s,&cg_id);
}
streamCG *cgroup = streamCreateCG(s,cgname,sdslen(cgname),&cg_id);
streamCG *cgroup = streamCreateCG(s,cgname,sdslen(cgname),&cg_id,cg_offset);
if (cgroup == NULL) {
rdbReportCorruptRDB("Duplicated consumer group name %s",
cgname);


@ -94,10 +94,11 @@
#define RDB_TYPE_HASH_LISTPACK 16
#define RDB_TYPE_ZSET_LISTPACK 17
#define RDB_TYPE_LIST_QUICKLIST_2 18
#define RDB_TYPE_STREAM_LISTPACKS_2 19
/* NOTE: WHEN ADDING NEW RDB TYPE, UPDATE rdbIsObjectType() BELOW */
/* Test if a type is an object type. */
#define rdbIsObjectType(t) ((t >= 0 && t <= 7) || (t >= 9 && t <= 18))
#define rdbIsObjectType(t) ((t >= 0 && t <= 7) || (t >= 9 && t <= 19))
/* Special RDB opcodes (saved/loaded with rdbSaveType/rdbLoadType). */
#define RDB_OPCODE_FUNCTION 246 /* engine data */


@ -1767,6 +1767,7 @@ void createSharedObjects(void) {
shared.retrycount = createStringObject("RETRYCOUNT",10);
shared.force = createStringObject("FORCE",5);
shared.justid = createStringObject("JUSTID",6);
shared.entriesread = createStringObject("ENTRIESREAD",11);
shared.lastid = createStringObject("LASTID",6);
shared.default_username = createStringObject("default",7);
shared.ping = createStringObject("ping",4);


@ -1223,7 +1223,7 @@ struct sharedObjectsStruct {
*rpop, *lpop, *lpush, *rpoplpush, *lmove, *blmove, *zpopmin, *zpopmax,
*emptyscan, *multi, *exec, *left, *right, *hset, *srem, *xgroup, *xclaim,
*script, *replconf, *eval, *persist, *set, *pexpireat, *pexpire,
*time, *pxat, *absttl, *retrycount, *force, *justid,
*time, *pxat, *absttl, *retrycount, *force, *justid, *entriesread,
*lastid, *ping, *setid, *keepttl, *load, *createconsumer,
*getack, *special_asterick, *special_equals, *default_username, *redacted,
*ssubscribebulk,*sunsubscribebulk,


@ -15,8 +15,11 @@ typedef struct streamID {
typedef struct stream {
rax *rax; /* The radix tree holding the stream. */
uint64_t length; /* Number of elements inside this stream. */
uint64_t length; /* Current number of elements inside this stream. */
streamID last_id; /* Zero if there are yet no items. */
streamID first_id; /* The first non-tombstone entry, zero if empty. */
streamID max_deleted_entry_id; /* The maximal ID that was deleted. */
uint64_t entries_added; /* All time count of elements added. */
rax *cgroups; /* Consumer groups dictionary: name -> streamCG */
} stream;
@ -34,6 +37,7 @@ typedef struct streamIterator {
unsigned char *master_fields_ptr; /* Master field to emit next. */
int entry_flags; /* Flags of entry we are emitting. */
int rev; /* True if iterating end to start (reverse). */
int skip_tombstones; /* True if not emitting tombstone entries. */
uint64_t start_key[2]; /* Start key as 128 bit big endian. */
uint64_t end_key[2]; /* End key as 128 bit big endian. */
raxIterator ri; /* Rax iterator. */
@ -52,6 +56,11 @@ typedef struct streamCG {
streamID last_id; /* Last delivered (not acknowledged) ID for this
group. Consumers that will just ask for more
messages will served with IDs > than this. */
long long entries_read; /* In a perfect world (CG starts at 0-0, no dels, no
XGROUP SETID, ...), this is the total number of
group reads. In the real world, the reasoning behind
this value is detailed at the top comment of
streamEstimateDistanceFromFirstEverEntry(). */
rax *pel; /* Pending entries list. This is a radix tree that
has every message delivered to consumers (without
the NOACK option) that was yet not acknowledged
@ -105,6 +114,8 @@ struct client;
#define SCC_NO_NOTIFY (1<<0) /* Do not notify key space if consumer created */
#define SCC_NO_DIRTIFY (1<<1) /* Do not dirty++ if consumer created */
#define SCG_INVALID_ENTRIES_READ -1
stream *streamNew(void);
void freeStream(stream *s);
unsigned long streamLength(const robj *subject);
@ -117,7 +128,7 @@ void streamIteratorStop(streamIterator *si);
streamCG *streamLookupCG(stream *s, sds groupname);
streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags);
streamConsumer *streamCreateConsumer(streamCG *cg, sds name, robj *key, int dbid, int flags);
streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id);
streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id, long long entries_read);
streamNACK *streamCreateNACK(streamConsumer *consumer);
void streamDecodeID(void *buf, streamID *id);
int streamCompareID(streamID *a, streamID *b);
@ -131,6 +142,8 @@ int streamParseID(const robj *o, streamID *id);
robj *createObjectFromStreamID(streamID *id);
int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_id, streamID *use_id, int seq_given);
int streamDeleteItem(stream *s, streamID *id);
void streamGetEdgeID(stream *s, int first, int skip_tombstones, streamID *edge_id);
long long streamEstimateDistanceFromFirstEverEntry(stream *s, streamID *id);
int64_t streamTrimByLength(stream *s, long long maxlen, int approx);
int64_t streamTrimByID(stream *s, streamID minid, int approx);


@ -68,8 +68,13 @@ stream *streamNew(void) {
stream *s = zmalloc(sizeof(*s));
s->rax = raxNew();
s->length = 0;
s->first_id.ms = 0;
s->first_id.seq = 0;
s->last_id.ms = 0;
s->last_id.seq = 0;
s->max_deleted_entry_id.seq = 0;
s->max_deleted_entry_id.ms = 0;
s->entries_added = 0;
s->cgroups = NULL; /* Created on demand to save memory when not used. */
return s;
}
@ -184,7 +189,10 @@ robj *streamDup(robj *o) {
new_lp, NULL);
}
new_s->length = s->length;
new_s->first_id = s->first_id;
new_s->last_id = s->last_id;
new_s->max_deleted_entry_id = s->max_deleted_entry_id;
new_s->entries_added = s->entries_added;
raxStop(&ri);
if (s->cgroups == NULL) return sobj;
@ -196,7 +204,8 @@ robj *streamDup(robj *o) {
while (raxNext(&ri_cgroups)) {
streamCG *cg = ri_cgroups.data;
streamCG *new_cg = streamCreateCG(new_s, (char *)ri_cgroups.key,
ri_cgroups.key_len, &cg->last_id);
ri_cgroups.key_len, &cg->last_id,
cg->entries_read);
serverAssert(new_cg != NULL);
@ -378,37 +387,21 @@ int streamCompareID(streamID *a, streamID *b) {
return 0;
}
void streamGetEdgeID(stream *s, int first, streamID *edge_id)
/* Retrieves the ID of the stream edge entry. An edge is either the first or
* the last ID in the stream, and may be a tombstone. To filter out tombstones,
* set the 'skip_tombstones' argument to 1. */
void streamGetEdgeID(stream *s, int first, int skip_tombstones, streamID *edge_id)
{
raxIterator ri;
raxStart(&ri, s->rax);
int empty;
if (first) {
raxSeek(&ri, "^", NULL, 0);
empty = !raxNext(&ri);
} else {
raxSeek(&ri, "$", NULL, 0);
empty = !raxPrev(&ri);
streamIterator si;
int64_t numfields;
streamIteratorStart(&si,s,NULL,NULL,!first);
si.skip_tombstones = skip_tombstones;
int found = streamIteratorGetID(&si,edge_id,&numfields);
if (!found) {
streamID min_id = {0, 0}, max_id = {UINT64_MAX, UINT64_MAX};
*edge_id = first ? max_id : min_id;
}
if (empty) {
/* Stream is empty, mark edge ID as lowest/highest possible. */
edge_id->ms = first ? UINT64_MAX : 0;
edge_id->seq = first ? UINT64_MAX : 0;
raxStop(&ri);
return;
}
unsigned char *lp = ri.data;
/* Read the master ID from the radix tree key. */
streamID master_id;
streamDecodeID(ri.key, &master_id);
/* Construct edge ID. */
lpGetEdgeStreamID(lp, first, &master_id, edge_id);
raxStop(&ri);
}
/* Adds a new item into the stream 's' having the specified number of
@ -664,7 +657,9 @@ int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_
if (ri.data != lp)
raxInsert(s->rax,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL);
s->length++;
s->entries_added++;
s->last_id = id;
if (s->length == 1) s->first_id = id;
if (added_id) *added_id = id;
return C_OK;
}
@ -842,7 +837,7 @@ int64_t streamTrim(stream *s, streamAddTrimArgs *args) {
}
deleted += deleted_from_lp;
/* Now we the entries/deleted counters. */
/* Now we update the entries/deleted counters. */
p = lpFirst(lp);
lp = lpReplaceInteger(lp,&p,entries-deleted_from_lp);
p = lpNext(lp,p); /* Skip deleted field. */
@ -864,8 +859,16 @@ int64_t streamTrim(stream *s, streamAddTrimArgs *args) {
break; /* If we are here, there was enough to delete in the current
node, so no need to go to the next node. */
}
raxStop(&ri);
/* Update the stream's first ID after the trimming. */
if (s->length == 0) {
s->first_id.ms = 0;
s->first_id.seq = 0;
} else if (deleted) {
streamGetEdgeID(s,1,1,&s->first_id);
}
return deleted;
}
@ -1089,9 +1092,10 @@ void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamI
}
}
si->stream = s;
si->lp = NULL; /* There is no current listpack right now. */
si->lp = NULL; /* There is no current listpack right now. */
si->lp_ele = NULL; /* Current listpack cursor. */
si->rev = rev; /* Direction, if non-zero reversed, from end to start. */
si->rev = rev; /* Direction, if non-zero reversed, from end to start. */
si->skip_tombstones = 1; /* By default tombstones aren't emitted. */
}
/* Return 1 and store the current item ID at 'id' if there are still
@ -1189,10 +1193,10 @@ int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields) {
serverAssert(*numfields>=0);
/* If current >= start, and the entry is not marked as
* deleted, emit it. */
* deleted or tombstones are included, emit it. */
if (!si->rev) {
if (memcmp(buf,si->start_key,sizeof(streamID)) >= 0 &&
!(flags & STREAM_ITEM_FLAG_DELETED))
(!si->skip_tombstones || !(flags & STREAM_ITEM_FLAG_DELETED)))
{
if (memcmp(buf,si->end_key,sizeof(streamID)) > 0)
return 0; /* We are already out of range. */
@ -1203,7 +1207,7 @@ int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields) {
}
} else {
if (memcmp(buf,si->end_key,sizeof(streamID)) <= 0 &&
!(flags & STREAM_ITEM_FLAG_DELETED))
(!si->skip_tombstones || !(flags & STREAM_ITEM_FLAG_DELETED)))
{
if (memcmp(buf,si->start_key,sizeof(streamID)) < 0)
return 0; /* We are already out of range. */
@ -1270,7 +1274,7 @@ void streamIteratorRemoveEntry(streamIterator *si, streamID *current) {
int64_t aux;
/* We do not really delete the entry here. Instead we mark it as
* deleted flagging it, and also incrementing the count of the
* deleted by flagging it, and also incrementing the count of the
* deleted entries in the listpack header.
*
* We start flagging: */
@ -1314,7 +1318,7 @@ void streamIteratorRemoveEntry(streamIterator *si, streamID *current) {
streamIteratorStop(si);
streamIteratorStart(si,si->stream,&start,&end,si->rev);
/* TODO: perform a garbage collection here if the ration between
/* TODO: perform a garbage collection here if the ratio between
* deleted and valid goes over a certain limit. */
}
@ -1386,6 +1390,148 @@ robj *createObjectFromStreamID(streamID *id) {
id->ms,id->seq));
}
/* Returns non-zero if the ID is 0-0. */
int streamIDEqZero(streamID *id) {
return !(id->ms || id->seq);
}
/* A helper that returns non-zero if the range from 'start' to `end`
* contains a tombstone.
*
* NOTE: this assumes that the caller had verified that 'start' is less than
* 's->last_id'. */
int streamRangeHasTombstones(stream *s, streamID *start, streamID *end) {
streamID start_id, end_id;
if (!s->length || streamIDEqZero(&s->max_deleted_entry_id)) {
/* The stream is empty or has no tombstones. */
return 0;
}
if (streamCompareID(&s->first_id,&s->max_deleted_entry_id) > 0) {
/* The latest tombstone is before the first entry. */
return 0;
}
if (start) {
start_id = *start;
} else {
start_id.ms = 0;
start_id.seq = 0;
}
if (end) {
end_id = *end;
} else {
end_id.ms = UINT64_MAX;
end_id.seq = UINT64_MAX;
}
if (streamCompareID(&start_id,&s->max_deleted_entry_id) <= 0 &&
streamCompareID(&s->max_deleted_entry_id,&end_id) <= 0)
{
/* start_id <= max_deleted_entry_id <= end_id: The range does include a tombstone. */
return 1;
}
/* The range doesn't include a tombstone. */
return 0;
}
/* Replies with a consumer group's current lag, that is the number of messages
* in the stream that are yet to be delivered. In case that the lag isn't
* available due to fragmentation, the reply to the client is a null. */
void streamReplyWithCGLag(client *c, stream *s, streamCG *cg) {
int valid = 0;
long long lag = 0;
if (!s->entries_added) {
/* The lag of a newly-initialized stream is 0. */
lag = 0;
valid = 1;
} else if (cg->entries_read != SCG_INVALID_ENTRIES_READ && !streamRangeHasTombstones(s,&cg->last_id,NULL)) {
/* No fragmentation ahead means that the group's logical reads counter
* is valid for performing the lag calculation. */
lag = (long long)s->entries_added - cg->entries_read;
valid = 1;
} else {
/* Attempt to retrieve the group's last ID logical read counter. */
long long entries_read = streamEstimateDistanceFromFirstEverEntry(s,&cg->last_id);
if (entries_read != SCG_INVALID_ENTRIES_READ) {
/* A valid counter was obtained. */
lag = (long long)s->entries_added - entries_read;
valid = 1;
}
}
if (valid) {
addReplyLongLong(c,lag);
} else {
addReplyNull(c);
}
}
/* This function returns a value that is the ID's logical read counter, or its
* distance (the number of entries) from the first entry ever to have been added
* to the stream.
*
* A counter is returned only in one of the following cases:
* 1. The ID is the same as the stream's last ID. In this case, the returned
* value is the same as the stream's entries_added counter.
* 2. The ID equals that of the currently first entry in the stream, and the
* stream has no tombstones. The returned value, in this case, is the result
* of subtracting the stream's length from its entries_added, incremented by
* one.
* 3. The ID is less than the stream's current first entry's ID, and there are
* no tombstones. Here the estimated counter is the result of subtracting the
* stream's length from its entries_added.
* 4. The stream's entries_added is zero, meaning that no entries were ever
* added.
*
* The special return value of SCG_INVALID_ENTRIES_READ signals that the
* counter's value isn't obtainable. It is returned in these cases:
* 1. The provided ID, if it even exists, is somewhere between the stream's
* current first and last entries' IDs, or in the future.
* 2. The stream contains one or more tombstones. */
long long streamEstimateDistanceFromFirstEverEntry(stream *s, streamID *id) {
/* The counter of any ID in an empty, never-before-used stream is 0. */
if (!s->entries_added) {
return 0;
}
/* In an empty stream, if the ID is smaller than or equal to the last ID,
* the counter is simply the current entries_added value. */
if (!s->length && streamCompareID(id,&s->last_id) < 1) {
return s->entries_added;
}
int cmp_last = streamCompareID(id,&s->last_id);
if (cmp_last == 0) {
/* Return the exact counter of the last entry in the stream. */
return s->entries_added;
} else if (cmp_last > 0) {
/* The counter of a future ID is unknown. */
return SCG_INVALID_ENTRIES_READ;
}
int cmp_id_first = streamCompareID(id,&s->first_id);
int cmp_xdel_first = streamCompareID(&s->max_deleted_entry_id,&s->first_id);
if (streamIDEqZero(&s->max_deleted_entry_id) || cmp_xdel_first < 0) {
/* There's definitely no fragmentation ahead. */
if (cmp_id_first < 0) {
/* Return the estimated counter. */
return s->entries_added - s->length;
} else if (cmp_id_first == 0) {
/* Return the exact counter of the first entry in the stream. */
return s->entries_added - s->length + 1;
}
}
/* The ID is either before an XDEL that fragments the stream, or an
* arbitrary in-range ID. In either case, we can't make a prediction. */
return SCG_INVALID_ENTRIES_READ;
}
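The four obtainable cases listed in the function's comment are plain arithmetic on entries_added and length. The following sketch illustrates them with IDs flattened to integers (hypothetical names; the real function operates on streamID ms/seq pairs):

```c
#include <assert.h>

#define INVALID_CTR (-1LL) /* stands in for SCG_INVALID_ENTRIES_READ */

/* Hypothetical flattened stream: IDs are integers, 0 means the zero ID. */
typedef struct {
    long long entries_added, length;
    long long first_id, last_id, max_deleted_id;
} mini_stream;

static long long estimate_read_ctr(const mini_stream *s, long long id) {
    if (s->entries_added == 0)
        return 0; /* case 4: never-used stream */
    if (s->length == 0 && id <= s->last_id)
        return s->entries_added; /* emptied stream, ID not in the future */
    if (id == s->last_id)
        return s->entries_added; /* case 1: exact counter of the last entry */
    if (id > s->last_id)
        return INVALID_CTR; /* a future ID's counter is unknown */
    if (s->max_deleted_id == 0 || s->max_deleted_id < s->first_id) {
        /* No fragmentation between the first and last entries. */
        if (id < s->first_id)
            return s->entries_added - s->length; /* case 3: estimate */
        if (id == s->first_id)
            return s->entries_added - s->length + 1; /* case 2: exact */
    }
    return INVALID_CTR; /* fragmented, or an arbitrary in-range ID */
}
```

For example, a stream that has seen 10 XADDs and now holds 4 entries (7..10) assigns the first entry the counter 10 - 4 + 1 = 7, matching the all-time position it was added at.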
/* As a result of an explicit XCLAIM or XREADGROUP command, new entries
* are created in the pending list of the stream and consumers. We need
* to propagate these changes in the form of XCLAIM commands. */
@ -1425,19 +1571,22 @@ void streamPropagateXCLAIM(client *c, robj *key, streamCG *group, robj *groupnam
* that was consumed by XREADGROUP with the NOACK option: in that case we can't
* propagate the last ID just using the XCLAIM LASTID option, so we emit
*
* XGROUP SETID <key> <groupname> <id>
* XGROUP SETID <key> <groupname> <id> ENTRIESREAD <entries_read>
*/
void streamPropagateGroupID(client *c, robj *key, streamCG *group, robj *groupname) {
robj *argv[5];
robj *argv[7];
argv[0] = shared.xgroup;
argv[1] = shared.setid;
argv[2] = key;
argv[3] = groupname;
argv[4] = createObjectFromStreamID(&group->last_id);
argv[5] = shared.entriesread;
argv[6] = createStringObjectFromLongLong(group->entries_read);
alsoPropagate(c->db->id,argv,5,PROPAGATE_AOF|PROPAGATE_REPL);
alsoPropagate(c->db->id,argv,7,PROPAGATE_AOF|PROPAGATE_REPL);
decrRefCount(argv[4]);
decrRefCount(argv[6]);
}
/* We need this when we want to propagate creation of consumer that was created
@ -1476,6 +1625,10 @@ void streamPropagateConsumerCreation(client *c, robj *key, robj *groupname, sds
* function will not return it to the client.
* 3. An entry in the pending list will be created for every entry delivered
* for the first time to this consumer.
* 4. The group's read counter is incremented if it is already valid and there
* are no future tombstones; otherwise we attempt to obtain it anew for the
* last delivered ID, which may still leave it invalid
* (SCG_INVALID_ENTRIES_READ).
*
* The behavior may be modified passing non-zero flags:
*
@ -1532,6 +1685,15 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
while(streamIteratorGetID(&si,&id,&numfields)) {
/* Update the group last_id if needed. */
if (group && streamCompareID(&id,&group->last_id) > 0) {
if (group->entries_read != SCG_INVALID_ENTRIES_READ && !streamRangeHasTombstones(s,&id,NULL)) {
/* A valid counter and no future tombstones mean we can
* increment the read counter to keep tracking the group's
* progress. */
group->entries_read++;
} else if (s->entries_added) {
/* The group's counter may be invalid, so we try to obtain it. */
group->entries_read = streamEstimateDistanceFromFirstEverEntry(s,&id);
}
group->last_id = id;
/* Group last ID should be propagated only if NOACK was
* specified, otherwise the last id will be included
@ -1805,7 +1967,7 @@ void streamRewriteTrimArgument(client *c, stream *s, int trim_strategy, int idx)
arg = createStringObjectFromLongLong(s->length);
} else {
streamID first_id;
streamGetEdgeID(s, 1, &first_id);
streamGetEdgeID(s,1,0,&first_id);
arg = createObjectFromStreamID(&first_id);
}
@ -2298,10 +2460,10 @@ void streamFreeConsumer(streamConsumer *sc) {
}
/* Create a new consumer group in the context of the stream 's', having the
* specified name and last server ID. If a consumer group with the same name
* already existed NULL is returned, otherwise the pointer to the consumer
* group is returned. */
streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id) {
* specified name, last server ID and reads counter. If a consumer group with
* the same name already exists NULL is returned, otherwise the pointer to the
* consumer group is returned. */
streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id, long long entries_read) {
if (s->cgroups == NULL) s->cgroups = raxNew();
if (raxFind(s->cgroups,(unsigned char*)name,namelen) != raxNotFound)
return NULL;
@ -2310,6 +2472,7 @@ streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id) {
cg->pel = raxNew();
cg->consumers = raxNew();
cg->last_id = *id;
cg->entries_read = entries_read;
raxInsert(s->cgroups,(unsigned char*)name,namelen,cg,NULL);
return cg;
}
@ -2389,8 +2552,8 @@ void streamDelConsumer(streamCG *cg, streamConsumer *consumer) {
* Consumer groups commands
* ----------------------------------------------------------------------- */
/* XGROUP CREATE <key> <groupname> <id or $> [MKSTREAM]
* XGROUP SETID <key> <groupname> <id or $>
/* XGROUP CREATE <key> <groupname> <id or $> [MKSTREAM] [ENTRIESADDED count]
* XGROUP SETID <key> <groupname> <id or $> [ENTRIESADDED count]
* XGROUP DESTROY <key> <groupname>
* XGROUP CREATECONSUMER <key> <groupname> <consumer>
* XGROUP DELCONSUMER <key> <groupname> <consumername> */
@ -2400,21 +2563,33 @@ void xgroupCommand(client *c) {
streamCG *cg = NULL;
char *opt = c->argv[1]->ptr; /* Subcommand name. */
int mkstream = 0;
long long entries_read = SCG_INVALID_ENTRIES_READ;
robj *o;
/* CREATE has an MKSTREAM option that creates the stream if it
* does not exist. */
if (c->argc == 6 && !strcasecmp(opt,"CREATE")) {
if (strcasecmp(c->argv[5]->ptr,"MKSTREAM")) {
addReplySubcommandSyntaxError(c);
return;
}
mkstream = 1;
grpname = c->argv[3]->ptr;
}
/* Everything but the "HELP" option requires a key and group name. */
if (c->argc >= 4) {
/* Parse optional arguments for CREATE and SETID */
int i = 5;
int create_subcmd = !strcasecmp(opt,"CREATE");
int setid_subcmd = !strcasecmp(opt,"SETID");
while (i < c->argc) {
if (create_subcmd && !strcasecmp(c->argv[i]->ptr,"MKSTREAM")) {
mkstream = 1;
i++;
} else if ((create_subcmd || setid_subcmd) && !strcasecmp(c->argv[i]->ptr,"ENTRIESREAD") && i + 1 < c->argc) {
if (getLongLongFromObjectOrReply(c,c->argv[i+1],&entries_read,NULL) != C_OK)
return;
if (entries_read < 0 && entries_read != SCG_INVALID_ENTRIES_READ) {
addReplyError(c,"value for ENTRIESREAD must be positive or -1");
return;
}
i += 2;
} else {
addReplySubcommandSyntaxError(c);
return;
}
}
o = lookupKeyWrite(c->db,c->argv[2]);
if (o) {
if (checkType(c,o,OBJ_STREAM)) return;
@ -2454,18 +2629,20 @@ void xgroupCommand(client *c) {
" Create a new consumer group. Options are:",
" * MKSTREAM",
" Create the empty stream if it does not exist.",
" * ENTRIESREAD entries_read",
" Set the group's entries_read counter (internal use).",
"CREATECONSUMER <key> <groupname> <consumer>",
" Create a new consumer in the specified group.",
"DELCONSUMER <key> <groupname> <consumer>",
" Remove the specified consumer.",
"DESTROY <key> <groupname>",
" Remove the specified group.",
"SETID <key> <groupname> <id|$>",
" Set the current group ID.",
"SETID <key> <groupname> <id|$> [ENTRIESREAD entries_read]",
" Set the current group ID and entries_read counter.",
NULL
};
addReplyHelp(c, help);
} else if (!strcasecmp(opt,"CREATE") && (c->argc == 5 || c->argc == 6)) {
} else if (!strcasecmp(opt,"CREATE") && (c->argc >= 5 && c->argc <= 8)) {
streamID id;
if (!strcmp(c->argv[4]->ptr,"$")) {
if (s) {
@ -2487,7 +2664,7 @@ NULL
signalModifiedKey(c,c->db,c->argv[2]);
}
streamCG *cg = streamCreateCG(s,grpname,sdslen(grpname),&id);
streamCG *cg = streamCreateCG(s,grpname,sdslen(grpname),&id,entries_read);
if (cg) {
addReply(c,shared.ok);
server.dirty++;
@ -2496,7 +2673,7 @@ NULL
} else {
addReplyError(c,"-BUSYGROUP Consumer Group name already exists");
}
} else if (!strcasecmp(opt,"SETID") && c->argc == 5) {
} else if (!strcasecmp(opt,"SETID") && (c->argc == 5 || c->argc == 7)) {
streamID id;
if (!strcmp(c->argv[4]->ptr,"$")) {
id = s->last_id;
@ -2504,6 +2681,7 @@ NULL
return;
}
cg->last_id = id;
cg->entries_read = entries_read;
addReply(c,shared.ok);
server.dirty++;
notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-setid",c->argv[2],c->db->id);
@ -2542,16 +2720,46 @@ NULL
}
}
/* XSETID <stream> <id>
/* XSETID <stream> <id> [ENTRIESADDED entries_added] [MAXDELETEDID max_deleted_entry_id]
*
* Set the internal "last ID" of a stream. */
* Set the internal "last ID", "added entries" and "maximal deleted entry ID"
* of a stream. */
void xsetidCommand(client *c) {
streamID id, max_xdel_id = {0, 0};
long long entries_added = -1;
if (streamParseStrictIDOrReply(c,c->argv[2],&id,0,NULL) != C_OK)
return;
int i = 3;
while (i < c->argc) {
int moreargs = (c->argc-1) - i; /* Number of additional arguments. */
char *opt = c->argv[i]->ptr;
if (!strcasecmp(opt,"ENTRIESADDED") && moreargs) {
if (getLongLongFromObjectOrReply(c,c->argv[i+1],&entries_added,NULL) != C_OK) {
return;
} else if (entries_added < 0) {
addReplyError(c,"entries_added must be positive");
return;
}
i += 2;
} else if (!strcasecmp(opt,"MAXDELETEDID") && moreargs) {
if (streamParseStrictIDOrReply(c,c->argv[i+1],&max_xdel_id,0,NULL) != C_OK) {
return;
} else if (streamCompareID(&id,&max_xdel_id) < 0) {
addReplyError(c,"The ID specified in XSETID is smaller than the provided max_deleted_entry_id");
return;
}
i += 2;
} else {
addReplyErrorObject(c,shared.syntaxerr);
return;
}
}
robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr);
if (o == NULL || checkType(c,o,OBJ_STREAM)) return;
stream *s = o->ptr;
streamID id;
if (streamParseStrictIDOrReply(c,c->argv[2],&id,0,NULL) != C_OK) return;
/* If the stream has at least one item, we want to check that the user
* is setting a last ID that is equal or greater than the current top
@ -2561,12 +2769,22 @@ void xsetidCommand(client *c) {
streamLastValidID(s,&maxid);
if (streamCompareID(&id,&maxid) < 0) {
addReplyError(c,"The ID specified in XSETID is smaller than the "
"target stream top item");
addReplyError(c,"The ID specified in XSETID is smaller than the target stream top item");
return;
}
/* If an entries_added was provided, it can't be lower than the length. */
if (entries_added != -1 && s->length > (uint64_t)entries_added) {
addReplyError(c,"The entries_added specified in XSETID is smaller than the target stream length");
return;
}
}
s->last_id = id;
if (entries_added != -1)
s->entries_added = entries_added;
if (!streamIDEqZero(&max_xdel_id))
s->max_deleted_entry_id = max_xdel_id;
addReply(c,shared.ok);
server.dirty++;
notifyKeyspaceEvent(NOTIFY_STREAM,"xsetid",c->argv[1],c->db->id);
@ -3289,8 +3507,31 @@ void xdelCommand(client *c) {
/* Actually apply the command. */
int deleted = 0;
int first_entry = 0;
for (int j = 2; j < c->argc; j++) {
deleted += streamDeleteItem(s,&ids[j-2]);
streamID *id = &ids[j-2];
if (streamDeleteItem(s,id)) {
/* We want to know if the first entry in the stream was deleted
* so we can later set the new one. */
if (streamCompareID(id,&s->first_id) == 0) {
first_entry = 1;
}
/* Update the stream's maximal tombstone if needed. */
if (streamCompareID(id,&s->max_deleted_entry_id) > 0) {
s->max_deleted_entry_id = *id;
}
deleted++;
}
}
/* Update the stream's first ID. */
if (deleted) {
if (s->length == 0) {
s->first_id.ms = 0;
s->first_id.seq = 0;
} else if (first_entry) {
streamGetEdgeID(s,1,1,&s->first_id);
}
}
/* Propagate the write if needed. */
@ -3398,7 +3639,7 @@ void xinfoReplyWithStreamInfo(client *c, stream *s) {
}
}
addReplyMapLen(c,full ? 6 : 7);
addReplyMapLen(c,full ? 9 : 10);
addReplyBulkCString(c,"length");
addReplyLongLong(c,s->length);
addReplyBulkCString(c,"radix-tree-keys");
@ -3407,6 +3648,12 @@ void xinfoReplyWithStreamInfo(client *c, stream *s) {
addReplyLongLong(c,s->rax->numnodes);
addReplyBulkCString(c,"last-generated-id");
addReplyStreamID(c,&s->last_id);
addReplyBulkCString(c,"max-deleted-entry-id");
addReplyStreamID(c,&s->max_deleted_entry_id);
addReplyBulkCString(c,"entries-added");
addReplyLongLong(c,s->entries_added);
addReplyBulkCString(c,"recorded-first-entry-id");
addReplyStreamID(c,&s->first_id);
if (!full) {
/* XINFO STREAM <key> */
@ -3445,7 +3692,7 @@ void xinfoReplyWithStreamInfo(client *c, stream *s) {
raxSeek(&ri_cgroups,"^",NULL,0);
while(raxNext(&ri_cgroups)) {
streamCG *cg = ri_cgroups.data;
addReplyMapLen(c,5);
addReplyMapLen(c,7);
/* Name */
addReplyBulkCString(c,"name");
@ -3455,6 +3702,18 @@ void xinfoReplyWithStreamInfo(client *c, stream *s) {
addReplyBulkCString(c,"last-delivered-id");
addReplyStreamID(c,&cg->last_id);
/* Read counter of the last delivered ID */
addReplyBulkCString(c,"entries-read");
if (cg->entries_read != SCG_INVALID_ENTRIES_READ) {
addReplyLongLong(c,cg->entries_read);
} else {
addReplyNull(c);
}
/* Group lag */
addReplyBulkCString(c,"lag");
streamReplyWithCGLag(c,s,cg);
/* Group PEL count */
addReplyBulkCString(c,"pel-count");
addReplyLongLong(c,raxSize(cg->pel));
@ -3632,7 +3891,7 @@ NULL
raxSeek(&ri,"^",NULL,0);
while(raxNext(&ri)) {
streamCG *cg = ri.data;
addReplyMapLen(c,4);
addReplyMapLen(c,6);
addReplyBulkCString(c,"name");
addReplyBulkCBuffer(c,ri.key,ri.key_len);
addReplyBulkCString(c,"consumers");
@ -3641,6 +3900,14 @@ NULL
addReplyLongLong(c,raxSize(cg->pel));
addReplyBulkCString(c,"last-delivered-id");
addReplyStreamID(c,&cg->last_id);
addReplyBulkCString(c,"entries-read");
if (cg->entries_read != SCG_INVALID_ENTRIES_READ) {
addReplyLongLong(c,cg->entries_read);
} else {
addReplyNull(c);
}
addReplyBulkCString(c,"lag");
streamReplyWithCGLag(c,s,cg);
}
raxStop(&ri);
} else if (!strcasecmp(opt,"STREAM")) {


@ -193,9 +193,8 @@ test {corrupt payload: listpack invalid size header} {
test {corrupt payload: listpack too long entry len} {
start_server [list overrides [list loglevel verbose use-exit-on-panic yes crash-memcheck-enabled no] ] {
r config set sanitize-dump-payload no
r restore key 0 "\x0F\x01\x10\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x02\x40\x55\x55\x00\x00\x00\x0F\x00\x01\x01\x00\x01\x02\x01\x88\x31\x00\x00\x00\x00\x00\x00\x00\x09\x88\x32\x00\x00\x00\x00\x00\x00\x00\x09\x00\x01\x00\x01\x00\x01\x00\x01\x02\x02\x89\x31\x00\x00\x00\x00\x00\x00\x00\x09\x88\x61\x00\x00\x00\x00\x00\x00\x00\x09\x88\x32\x00\x00\x00\x00\x00\x00\x00\x09\x88\x62\x00\x00\x00\x00\x00\x00\x00\x09\x08\x01\xFF\x0A\x01\x00\x00\x09\x00\x40\x63\xC9\x37\x03\xA2\xE5\x68"
catch {
r xinfo stream key full
r restore key 0 "\x0F\x01\x10\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x02\x40\x55\x55\x00\x00\x00\x0F\x00\x01\x01\x00\x01\x02\x01\x88\x31\x00\x00\x00\x00\x00\x00\x00\x09\x88\x32\x00\x00\x00\x00\x00\x00\x00\x09\x00\x01\x00\x01\x00\x01\x00\x01\x02\x02\x89\x31\x00\x00\x00\x00\x00\x00\x00\x09\x88\x61\x00\x00\x00\x00\x00\x00\x00\x09\x88\x32\x00\x00\x00\x00\x00\x00\x00\x09\x88\x62\x00\x00\x00\x00\x00\x00\x00\x09\x08\x01\xFF\x0A\x01\x00\x00\x09\x00\x40\x63\xC9\x37\x03\xA2\xE5\x68"
} err
assert_equal [count_log_message 0 "crashed by signal"] 0
assert_equal [count_log_message 0 "ASSERTION FAILED"] 1
@ -205,9 +204,9 @@ test {corrupt payload: listpack too long entry len} {
test {corrupt payload: listpack very long entry len} {
start_server [list overrides [list loglevel verbose use-exit-on-panic yes crash-memcheck-enabled no] ] {
r config set sanitize-dump-payload no
r restore key 0 "\x0F\x01\x10\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x02\x40\x55\x55\x00\x00\x00\x0F\x00\x01\x01\x00\x01\x02\x01\x88\x31\x00\x00\x00\x00\x00\x00\x00\x09\x88\x32\x00\x00\x00\x00\x00\x00\x00\x09\x00\x01\x00\x01\x00\x01\x00\x01\x02\x02\x88\x31\x00\x00\x00\x00\x00\x00\x00\x09\x88\x61\x00\x00\x00\x00\x00\x00\x00\x09\x88\x32\x00\x00\x00\x00\x00\x00\x00\x09\x9C\x62\x00\x00\x00\x00\x00\x00\x00\x09\x08\x01\xFF\x0A\x01\x00\x00\x09\x00\x63\x6F\x42\x8E\x7C\xB5\xA2\x9D"
catch {
r xinfo stream key full
# This will catch migrated payloads from v6.2.x
r restore key 0 "\x0F\x01\x10\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x02\x40\x55\x55\x00\x00\x00\x0F\x00\x01\x01\x00\x01\x02\x01\x88\x31\x00\x00\x00\x00\x00\x00\x00\x09\x88\x32\x00\x00\x00\x00\x00\x00\x00\x09\x00\x01\x00\x01\x00\x01\x00\x01\x02\x02\x88\x31\x00\x00\x00\x00\x00\x00\x00\x09\x88\x61\x00\x00\x00\x00\x00\x00\x00\x09\x88\x32\x00\x00\x00\x00\x00\x00\x00\x09\x9C\x62\x00\x00\x00\x00\x00\x00\x00\x09\x08\x01\xFF\x0A\x01\x00\x00\x09\x00\x63\x6F\x42\x8E\x7C\xB5\xA2\x9D"
} err
assert_equal [count_log_message 0 "crashed by signal"] 0
assert_equal [count_log_message 0 "ASSERTION FAILED"] 1


@ -623,22 +623,30 @@ start_server {
r XDEL x 103
set reply [r XINFO STREAM x FULL]
assert_equal [llength $reply] 12
assert_equal [lindex $reply 1] 4 ;# stream length
assert_equal [lindex $reply 9] "{100-0 {a 1}} {101-0 {b 1}} {102-0 {c 1}} {104-0 {f 1}}" ;# entries
assert_equal [lindex $reply 11 0 1] "g1" ;# first group name
assert_equal [lindex $reply 11 0 7 0 0] "100-0" ;# first entry in group's PEL
assert_equal [lindex $reply 11 0 9 0 1] "Alice" ;# first consumer
assert_equal [lindex $reply 11 0 9 0 7 0 0] "100-0" ;# first entry in first consumer's PEL
assert_equal [lindex $reply 11 1 1] "g2" ;# second group name
assert_equal [lindex $reply 11 1 9 0 1] "Charlie" ;# first consumer
assert_equal [lindex $reply 11 1 9 0 7 0 0] "100-0" ;# first entry in first consumer's PEL
assert_equal [lindex $reply 11 1 9 0 7 1 0] "101-0" ;# second entry in first consumer's PEL
assert_equal [llength $reply] 18
assert_equal [dict get $reply length] 4
assert_equal [dict get $reply entries] "{100-0 {a 1}} {101-0 {b 1}} {102-0 {c 1}} {104-0 {f 1}}"
# First consumer group
set group [lindex [dict get $reply groups] 0]
assert_equal [dict get $group name] "g1"
assert_equal [lindex [dict get $group pending] 0 0] "100-0"
set consumer [lindex [dict get $group consumers] 0]
assert_equal [dict get $consumer name] "Alice"
assert_equal [lindex [dict get $consumer pending] 0 0] "100-0" ;# first entry in first consumer's PEL
# Second consumer group
set group [lindex [dict get $reply groups] 1]
assert_equal [dict get $group name] "g2"
set consumer [lindex [dict get $group consumers] 0]
assert_equal [dict get $consumer name] "Charlie"
assert_equal [lindex [dict get $consumer pending] 0 0] "100-0" ;# first entry in first consumer's PEL
assert_equal [lindex [dict get $consumer pending] 1 0] "101-0" ;# second entry in first consumer's PEL
set reply [r XINFO STREAM x FULL COUNT 1]
assert_equal [llength $reply] 12
assert_equal [lindex $reply 1] 4
assert_equal [lindex $reply 9] "{100-0 {a 1}}"
assert_equal [llength $reply] 18
assert_equal [dict get $reply length] 4
assert_equal [dict get $reply entries] "{100-0 {a 1}}"
}
test {XGROUP CREATECONSUMER: create consumer if does not exist} {
@ -702,7 +710,7 @@ start_server {
set grpinfo [r xinfo groups mystream]
r debug loadaof
assert {[r xinfo groups mystream] == $grpinfo}
assert_equal [r xinfo groups mystream] $grpinfo
set reply [r xinfo consumers mystream mygroup]
set consumer_info [lindex $reply 0]
assert_equal [lindex $consumer_info 1] "Alice" ;# consumer name
@ -741,6 +749,154 @@ start_server {
}
}
test {Consumer group read counter and lag in empty streams} {
r DEL x
r XGROUP CREATE x g1 0 MKSTREAM
set reply [r XINFO STREAM x FULL]
set group [lindex [dict get $reply groups] 0]
assert_equal [dict get $reply max-deleted-entry-id] "0-0"
assert_equal [dict get $reply entries-added] 0
assert_equal [dict get $group entries-read] {}
assert_equal [dict get $group lag] 0
r XADD x 1-0 data a
r XDEL x 1-0
set reply [r XINFO STREAM x FULL]
set group [lindex [dict get $reply groups] 0]
assert_equal [dict get $reply max-deleted-entry-id] "1-0"
assert_equal [dict get $reply entries-added] 1
assert_equal [dict get $group entries-read] {}
assert_equal [dict get $group lag] 0
}
test {Consumer group read counter and lag sanity} {
r DEL x
r XADD x 1-0 data a
r XADD x 2-0 data b
r XADD x 3-0 data c
r XADD x 4-0 data d
r XADD x 5-0 data e
r XGROUP CREATE x g1 0
set reply [r XINFO STREAM x FULL]
set group [lindex [dict get $reply groups] 0]
assert_equal [dict get $group entries-read] {}
assert_equal [dict get $group lag] 5
r XREADGROUP GROUP g1 c11 COUNT 1 STREAMS x >
set reply [r XINFO STREAM x FULL]
set group [lindex [dict get $reply groups] 0]
assert_equal [dict get $group entries-read] 1
assert_equal [dict get $group lag] 4
r XREADGROUP GROUP g1 c12 COUNT 10 STREAMS x >
set reply [r XINFO STREAM x FULL]
set group [lindex [dict get $reply groups] 0]
assert_equal [dict get $group entries-read] 5
assert_equal [dict get $group lag] 0
r XADD x 6-0 data f
set reply [r XINFO STREAM x FULL]
set group [lindex [dict get $reply groups] 0]
assert_equal [dict get $group entries-read] 5
assert_equal [dict get $group lag] 1
}
test {Consumer group lag with XDELs} {
r DEL x
r XADD x 1-0 data a
r XADD x 2-0 data b
r XADD x 3-0 data c
r XADD x 4-0 data d
r XADD x 5-0 data e
r XDEL x 3-0
r XGROUP CREATE x g1 0
r XGROUP CREATE x g2 0
set reply [r XINFO STREAM x FULL]
set group [lindex [dict get $reply groups] 0]
assert_equal [dict get $group entries-read] {}
assert_equal [dict get $group lag] {}
r XREADGROUP GROUP g1 c11 COUNT 1 STREAMS x >
set reply [r XINFO STREAM x FULL]
set group [lindex [dict get $reply groups] 0]
assert_equal [dict get $group entries-read] {}
assert_equal [dict get $group lag] {}
r XREADGROUP GROUP g1 c11 COUNT 1 STREAMS x >
set reply [r XINFO STREAM x FULL]
set group [lindex [dict get $reply groups] 0]
assert_equal [dict get $group entries-read] {}
assert_equal [dict get $group lag] {}
r XREADGROUP GROUP g1 c11 COUNT 1 STREAMS x >
set reply [r XINFO STREAM x FULL]
set group [lindex [dict get $reply groups] 0]
assert_equal [dict get $group entries-read] {}
assert_equal [dict get $group lag] {}
r XREADGROUP GROUP g1 c11 COUNT 1 STREAMS x >
set reply [r XINFO STREAM x FULL]
set group [lindex [dict get $reply groups] 0]
assert_equal [dict get $group entries-read] 5
assert_equal [dict get $group lag] 0
r XADD x 6-0 data f
set reply [r XINFO STREAM x FULL]
set group [lindex [dict get $reply groups] 0]
assert_equal [dict get $group entries-read] 5
assert_equal [dict get $group lag] 1
r XTRIM x MINID = 3-0
set reply [r XINFO STREAM x FULL]
set group [lindex [dict get $reply groups] 0]
assert_equal [dict get $group entries-read] 5
assert_equal [dict get $group lag] 1
set group [lindex [dict get $reply groups] 1]
assert_equal [dict get $group entries-read] {}
assert_equal [dict get $group lag] 3
r XTRIM x MINID = 5-0
set reply [r XINFO STREAM x FULL]
set group [lindex [dict get $reply groups] 0]
assert_equal [dict get $group entries-read] 5
assert_equal [dict get $group lag] 1
set group [lindex [dict get $reply groups] 1]
assert_equal [dict get $group entries-read] {}
assert_equal [dict get $group lag] 2
}
test {Loading from legacy (Redis <= v6.2.x, rdb_ver < 10) persistence} {
# The payload was DUMPed from a v5 instance after:
# XADD x 1-0 data a
# XADD x 2-0 data b
# XADD x 3-0 data c
# XADD x 4-0 data d
# XADD x 5-0 data e
# XADD x 6-0 data f
# XDEL x 3-0
# XGROUP CREATE x g1 0
# XGROUP CREATE x g2 0
# XREADGROUP GROUP g1 c11 COUNT 4 STREAMS x >
# XTRIM x MAXLEN = 2
r DEL x
r RESTORE x 0 "\x0F\x01\x10\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\xC3\x40\x4A\x40\x57\x16\x57\x00\x00\x00\x23\x00\x02\x01\x04\x01\x01\x01\x84\x64\x61\x74\x61\x05\x00\x01\x03\x01\x00\x20\x01\x03\x81\x61\x02\x04\x20\x0A\x00\x01\x40\x0A\x00\x62\x60\x0A\x00\x02\x40\x0A\x00\x63\x60\x0A\x40\x22\x01\x81\x64\x20\x0A\x40\x39\x20\x0A\x00\x65\x60\x0A\x00\x05\x40\x0A\x00\x66\x20\x0A\x00\xFF\x02\x06\x00\x02\x02\x67\x31\x05\x00\x04\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x3E\xF7\x83\x43\x7A\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x3E\xF7\x83\x43\x7A\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x3E\xF7\x83\x43\x7A\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x05\x00\x00\x00\x00\x00\x00\x00\x00\x3E\xF7\x83\x43\x7A\x01\x00\x00\x01\x01\x03\x63\x31\x31\x3E\xF7\x83\x43\x7A\x01\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x05\x00\x00\x00\x00\x00\x00\x00\x00\x02\x67\x32\x00\x00\x00\x00\x09\x00\x3D\x52\xEF\x68\x67\x52\x1D\xFA"
set reply [r XINFO STREAM x FULL]
assert_equal [dict get $reply max-deleted-entry-id] "0-0"
assert_equal [dict get $reply entries-added] 2
set group [lindex [dict get $reply groups] 0]
assert_equal [dict get $group entries-read] 1
assert_equal [dict get $group lag] 1
set group [lindex [dict get $reply groups] 1]
assert_equal [dict get $group entries-read] 0
assert_equal [dict get $group lag] 2
}
start_server {tags {"external:skip"}} {
set master [srv -1 client]
set master_host [srv -1 host]
@ -841,7 +997,7 @@ start_server {
waitForBgrewriteaof r
r debug loadaof
assert {[dict get [r xinfo stream mystream] length] == 0}
assert {[r xinfo groups mystream] == $grpinfo}
assert_equal [r xinfo groups mystream] $grpinfo
}
}
}


@ -760,7 +760,9 @@ start_server {tags {"stream xsetid"}} {
test {XSETID can set a specific ID} {
r XSETID mystream "200-0"
assert {[dict get [r xinfo stream mystream] last-generated-id] == "200-0"}
set reply [r XINFO stream mystream]
assert_equal [dict get $reply last-generated-id] "200-0"
assert_equal [dict get $reply entries-added] 1
}
test {XSETID cannot SETID with smaller ID} {
@ -774,6 +776,98 @@ start_server {tags {"stream xsetid"}} {
catch {r XSETID stream 1-1} err
set _ $err
} {ERR no such key}
test {XSETID cannot run with an offset but without a maximal tombstone} {
catch {r XSETID stream 1-1 0} err
set _ $err
} {ERR syntax error}
test {XSETID cannot run with a maximal tombstone but without an offset} {
catch {r XSETID stream 1-1 0-0} err
set _ $err
} {ERR syntax error}
test {XSETID errors on negative offset} {
catch {r XSETID stream 1-1 ENTRIESADDED -1 MAXDELETEDID 0-0} err
set _ $err
} {ERR*must be positive}
test {XSETID cannot set the maximal tombstone with larger ID} {
r DEL x
r XADD x 1-0 a b
catch {r XSETID x "1-0" ENTRIESADDED 1 MAXDELETEDID "2-0" } err
r XADD mystream MAXLEN 0 * a b
set err
} {ERR*smaller*}
test {XSETID cannot set the offset to less than the length} {
r DEL x
r XADD x 1-0 a b
catch {r XSETID x "1-0" ENTRIESADDED 0 MAXDELETEDID "0-0" } err
r XADD mystream MAXLEN 0 * a b
set err
} {ERR*smaller*}
}
start_server {tags {"stream offset"}} {
test {XADD advances the entries-added counter and sets the recorded-first-entry-id} {
r DEL x
r XADD x 1-0 data a
set reply [r XINFO STREAM x FULL]
assert_equal [dict get $reply entries-added] 1
assert_equal [dict get $reply recorded-first-entry-id] "1-0"
r XADD x 2-0 data a
set reply [r XINFO STREAM x FULL]
assert_equal [dict get $reply entries-added] 2
assert_equal [dict get $reply recorded-first-entry-id] "1-0"
}
test {XDEL/TRIM are reflected by recorded first entry} {
r DEL x
r XADD x 1-0 data a
r XADD x 2-0 data a
r XADD x 3-0 data a
r XADD x 4-0 data a
r XADD x 5-0 data a
set reply [r XINFO STREAM x FULL]
assert_equal [dict get $reply entries-added] 5
assert_equal [dict get $reply recorded-first-entry-id] "1-0"
r XDEL x 2-0
set reply [r XINFO STREAM x FULL]
assert_equal [dict get $reply recorded-first-entry-id] "1-0"
r XDEL x 1-0
set reply [r XINFO STREAM x FULL]
assert_equal [dict get $reply recorded-first-entry-id] "3-0"
r XTRIM x MAXLEN = 2
set reply [r XINFO STREAM x FULL]
assert_equal [dict get $reply recorded-first-entry-id] "4-0"
}
test {Maximum XDEL ID behaves correctly} {
r DEL x
r XADD x 1-0 data a
r XADD x 2-0 data b
r XADD x 3-0 data c
set reply [r XINFO STREAM x FULL]
assert_equal [dict get $reply max-deleted-entry-id] "0-0"
r XDEL x 2-0
set reply [r XINFO STREAM x FULL]
assert_equal [dict get $reply max-deleted-entry-id] "2-0"
r XDEL x 1-0
set reply [r XINFO STREAM x FULL]
assert_equal [dict get $reply max-deleted-entry-id] "2-0"
}
}
start_server {tags {"stream needs:debug"} overrides {appendonly yes aof-use-rdb-preamble no}} {
@ -796,7 +890,7 @@ start_server {tags {"stream needs:debug"} overrides {appendonly yes aof-use-rdb-
waitForBgrewriteaof r
r debug loadaof
assert {[dict get [r xinfo stream mystream] length] == 1}
assert {[dict get [r xinfo stream mystream] last-generated-id] == "2-2"}
assert_equal [dict get [r xinfo stream mystream] last-generated-id] "2-2"
}
}