Extend XINFO STREAM output

Introducing XINFO STREAM <key> FULL
This commit is contained in:
Guy Benoish 2020-04-22 16:05:57 +03:00
parent 3e738c8a6c
commit 1e2aee3919
2 changed files with 226 additions and 34 deletions

View File

@ -2475,16 +2475,200 @@ void xtrimCommand(client *c) {
addReplyLongLong(c,deleted);
}
/* Helper function for xinfoCommand.
* Handles the variants of XINFO STREAM */
void xinfoReplyWithStreamInfo(client *c, stream *s) {
int full = 1;
long long count = 0;
robj **optv = c->argv + 3; /* Options start after XINFO STREAM <key> */
int optc = c->argc - 3;
/* Parse options. */
if (optc == 0) {
full = 0;
} else {
/* Valid options are [FULL] or [FULL COUNT <count>] */
if (optc != 1 && optc != 3) {
addReplySubcommandSyntaxError(c);
return;
}
/* First option must be "FULL" */
if (strcasecmp(optv[0]->ptr,"full")) {
addReplySubcommandSyntaxError(c);
return;
}
if (optc == 3) {
/* First option must be "FULL" */
if (strcasecmp(optv[1]->ptr,"count")) {
addReplySubcommandSyntaxError(c);
return;
}
if (getLongLongFromObjectOrReply(c,optv[2],&count,NULL) == C_ERR)
return;
if (count < 0) count = 0;
}
}
addReplyMapLen(c,full ? 6 : 7);
addReplyBulkCString(c,"length");
addReplyLongLong(c,s->length);
addReplyBulkCString(c,"radix-tree-keys");
addReplyLongLong(c,raxSize(s->rax));
addReplyBulkCString(c,"radix-tree-nodes");
addReplyLongLong(c,s->rax->numnodes);
addReplyBulkCString(c,"last-generated-id");
addReplyStreamID(c,&s->last_id);
if (!full) {
/* XINFO STREAM <key> */
addReplyBulkCString(c,"groups");
addReplyLongLong(c,s->cgroups ? raxSize(s->cgroups) : 0);
/* To emit the first/last entry we use streamReplyWithRange(). */
int emitted;
streamID start, end;
start.ms = start.seq = 0;
end.ms = end.seq = UINT64_MAX;
addReplyBulkCString(c,"first-entry");
emitted = streamReplyWithRange(c,s,&start,&end,1,0,NULL,NULL,
STREAM_RWR_RAWENTRIES,NULL);
if (!emitted) addReplyNull(c);
addReplyBulkCString(c,"last-entry");
emitted = streamReplyWithRange(c,s,&start,&end,1,1,NULL,NULL,
STREAM_RWR_RAWENTRIES,NULL);
if (!emitted) addReplyNull(c);
} else {
/* XINFO STREAM <key> FULL [COUNT <count>] */
/* Stream entries */
addReplyBulkCString(c,"entries");
streamReplyWithRange(c,s,NULL,NULL,count,0,NULL,NULL,0,NULL);
/* Consumer groups */
addReplyBulkCString(c,"groups");
if (s->cgroups == NULL) {
addReplyArrayLen(c,0);
} else {
addReplyArrayLen(c,raxSize(s->cgroups));
raxIterator ri_cgroups;
raxStart(&ri_cgroups,s->cgroups);
raxSeek(&ri_cgroups,"^",NULL,0);
while(raxNext(&ri_cgroups)) {
streamCG *cg = ri_cgroups.data;
addReplyMapLen(c,5);
/* Name */
addReplyBulkCString(c,"name");
addReplyBulkCBuffer(c,ri_cgroups.key,ri_cgroups.key_len);
/* Last delivered ID */
addReplyBulkCString(c,"last-delivered-id");
addReplyStreamID(c,&cg->last_id);
/* Group PEL count */
addReplyBulkCString(c,"pel-count");
addReplyLongLong(c,raxSize(cg->pel));
/* Group PEL */
addReplyBulkCString(c,"pending");
long long arraylen_cg_pel = 0;
void *arrayptr_cg_pel = addReplyDeferredLen(c);
raxIterator ri_cg_pel;
raxStart(&ri_cg_pel,cg->pel);
raxSeek(&ri_cg_pel,"^",NULL,0);
while(raxNext(&ri_cg_pel) && (!count || arraylen_cg_pel < count)) {
streamNACK *nack = ri_cg_pel.data;
addReplyArrayLen(c,4);
/* Entry ID. */
streamID id;
streamDecodeID(ri_cg_pel.key,&id);
addReplyStreamID(c,&id);
/* Consumer name. */
addReplyBulkCBuffer(c,nack->consumer->name,
sdslen(nack->consumer->name));
/* Last delivery. */
addReplyLongLong(c,nack->delivery_time);
/* Number of deliveries. */
addReplyLongLong(c,nack->delivery_count);
arraylen_cg_pel++;
}
setDeferredArrayLen(c,arrayptr_cg_pel,arraylen_cg_pel);
raxStop(&ri_cg_pel);
/* Consumers */
addReplyBulkCString(c,"consumers");
addReplyArrayLen(c,raxSize(cg->consumers));
raxIterator ri_consumers;
raxStart(&ri_consumers,cg->consumers);
raxSeek(&ri_consumers,"^",NULL,0);
while(raxNext(&ri_consumers)) {
streamConsumer *consumer = ri_consumers.data;
addReplyMapLen(c,4);
/* Consumer name */
addReplyBulkCString(c,"name");
addReplyBulkCBuffer(c,consumer->name,sdslen(consumer->name));
/* Seen-time */
addReplyBulkCString(c,"seen-time");
addReplyLongLong(c,consumer->seen_time);
/* Consumer PEL count */
addReplyBulkCString(c,"pel-count");
addReplyLongLong(c,raxSize(consumer->pel));
/* Consumer PEL */
addReplyBulkCString(c,"pending");
long long arraylen_cpel = 0;
void *arrayptr_cpel = addReplyDeferredLen(c);
raxIterator ri_cpel;
raxStart(&ri_cpel,consumer->pel);
raxSeek(&ri_cpel,"^",NULL,0);
while(raxNext(&ri_cpel) && (!count || arraylen_cpel < count)) {
streamNACK *nack = ri_cpel.data;
addReplyArrayLen(c,3);
/* Entry ID. */
streamID id;
streamDecodeID(ri_cpel.key,&id);
addReplyStreamID(c,&id);
/* Last delivery. */
addReplyLongLong(c,nack->delivery_time);
/* Number of deliveries. */
addReplyLongLong(c,nack->delivery_count);
arraylen_cpel++;
}
setDeferredArrayLen(c,arrayptr_cpel,arraylen_cpel);
raxStop(&ri_cpel);
}
raxStop(&ri_consumers);
}
raxStop(&ri_cgroups);
}
}
}
/* XINFO CONSUMERS <key> <group>
* XINFO GROUPS <key>
* XINFO STREAM <key>
* XINFO STREAM <key> [FULL [COUNT <count>]]
* XINFO HELP. */
void xinfoCommand(client *c) {
const char *help[] = {
"CONSUMERS <key> <groupname> -- Show consumer groups of group <groupname>.",
"GROUPS <key> -- Show the stream consumer groups.",
"STREAM <key> -- Show information about the stream.",
"HELP -- Print this help.",
"CONSUMERS <key> <groupname> -- Show consumer groups of group <groupname>.",
"GROUPS <key> -- Show the stream consumer groups.",
"STREAM <key> [FULL [COUNT <count>]] -- Show information about the stream.",
"HELP -- Print this help.",
NULL
};
stream *s = NULL;
@ -2564,36 +2748,10 @@ NULL
addReplyStreamID(c,&cg->last_id);
}
raxStop(&ri);
} else if (!strcasecmp(opt,"STREAM") && c->argc == 3) {
/* XINFO STREAM <key> (or the alias XINFO <key>). */
addReplyMapLen(c,7);
addReplyBulkCString(c,"length");
addReplyLongLong(c,s->length);
addReplyBulkCString(c,"radix-tree-keys");
addReplyLongLong(c,raxSize(s->rax));
addReplyBulkCString(c,"radix-tree-nodes");
addReplyLongLong(c,s->rax->numnodes);
addReplyBulkCString(c,"groups");
addReplyLongLong(c,s->cgroups ? raxSize(s->cgroups) : 0);
addReplyBulkCString(c,"last-generated-id");
addReplyStreamID(c,&s->last_id);
/* To emit the first/last entry we us the streamReplyWithRange()
* API. */
int count;
streamID start, end;
start.ms = start.seq = 0;
end.ms = end.seq = UINT64_MAX;
addReplyBulkCString(c,"first-entry");
count = streamReplyWithRange(c,s,&start,&end,1,0,NULL,NULL,
STREAM_RWR_RAWENTRIES,NULL);
if (!count) addReplyNull(c);
addReplyBulkCString(c,"last-entry");
count = streamReplyWithRange(c,s,&start,&end,1,1,NULL,NULL,
STREAM_RWR_RAWENTRIES,NULL);
if (!count) addReplyNull(c);
} else if (!strcasecmp(opt,"STREAM")) {
/* XINFO STREAM <key> [FULL [COUNT <count>]]. */
xinfoReplyWithStreamInfo(c,s);
} else {
addReplySubcommandSyntaxError(c);
}
}

View File

@ -294,6 +294,40 @@ start_server {
assert {[lindex $reply 0 3] == 2}
}
test {XINFO FULL output} {
r del x
r XADD x 100 a 1
r XADD x 101 b 1
r XADD x 102 c 1
r XADD x 103 e 1
r XADD x 104 f 1
r XGROUP CREATE x g1 0
r XGROUP CREATE x g2 0
r XREADGROUP GROUP g1 Alice COUNT 1 STREAMS x >
r XREADGROUP GROUP g1 Bob COUNT 1 STREAMS x >
r XREADGROUP GROUP g1 Bob NOACK COUNT 1 STREAMS x >
r XREADGROUP GROUP g2 Charlie COUNT 4 STREAMS x >
r XDEL x 103
set reply [r XINFO STREAM x FULL]
assert_equal [llength $reply] 12
assert_equal [lindex $reply 1] 4 ;# stream length
assert_equal [lindex $reply 9] "{100-0 {a 1}} {101-0 {b 1}} {102-0 {c 1}} {104-0 {f 1}}" ;# entries
assert_equal [lindex $reply 11 0 1] "g1" ;# first group name
assert_equal [lindex $reply 11 0 7 0 0] "100-0" ;# first entry in group's PEL
assert_equal [lindex $reply 11 0 9 0 1] "Alice" ;# first consumer
assert_equal [lindex $reply 11 0 9 0 7 0 0] "100-0" ;# first entry in first consumer's PEL
assert_equal [lindex $reply 11 1 1] "g2" ;# second group name
assert_equal [lindex $reply 11 1 9 0 1] "Charlie" ;# first consumer
assert_equal [lindex $reply 11 1 9 0 7 0 0] "100-0" ;# first entry in first consumer's PEL
assert_equal [lindex $reply 11 1 9 0 7 1 0] "101-0" ;# second entry in first consumer's PEL
set reply [r XINFO STREAM x FULL COUNT 1]
assert_equal [llength $reply] 12
assert_equal [lindex $reply 1] 4
assert_equal [lindex $reply 9] "{100-0 {a 1}}"
}
start_server {} {
set master [srv -1 client]
set master_host [srv -1 host]