Streams: XTRIM command added.

This commit is contained in:
antirez 2018-04-19 16:25:29 +02:00
parent 19ae809458
commit e6b0e8d9ec
3 changed files with 70 additions and 0 deletions

View File

@ -314,6 +314,7 @@ struct redisCommand redisCommandTable[] = {
{"xclaim",xclaimCommand,-5,"wF",0,NULL,1,1,1,0,0},
{"xinfo",xinfoCommand,-2,"r",0,NULL,2,2,1,0,0},
{"xdel",xdelCommand,-2,"wF",0,NULL,1,1,1,0,0},
{"xtrim",xtrimCommand,-2,"wF",0,NULL,1,1,1,0,0},
{"post",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0},
{"host:",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0},
{"latency",latencyCommand,-2,"aslt",0,NULL,0,0,0,0,0}

View File

@ -2054,6 +2054,7 @@ void xpendingCommand(client *c);
void xclaimCommand(client *c);
void xinfoCommand(client *c);
void xdelCommand(client *c);
void xtrimCommand(client *c);
#if defined(__GNUC__)
void *calloc(size_t count, size_t size) __attribute__ ((deprecated));

View File

@ -2024,6 +2024,74 @@ void xdelCommand(client *c) {
server.dirty += deleted;
addReplyLongLong(c,deleted);
}
/* General form: XTRIM <key> [... options ...]
*
* List of options:
*
* MAXLEN [~] <count> -- Trim so that the stream will be capped at
* the specified length. Use ~ before the
* count in order to demand approximated trimming
* (like XADD MAXLEN option).
*/
#define TRIM_STRATEGY_NONE 0
#define TRIM_STRATEGY_MAXLEN 1
void xtrimCommand(client *c) {
robj *o;
/* If the key does not exist, we are ok returning zero, that is, the
* number of elements removed from the stream. */
if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL
|| checkType(c,o,OBJ_STREAM)) return;
stream *s = o->ptr;
/* Argument parsing. */
int trim_strategy = TRIM_STRATEGY_NONE;
long long maxlen = 0; /* 0 means no maximum length. */
int approx_maxlen = 0; /* If 1 only delete whole radix tree nodes, so
the maxium length is not applied verbatim. */
/* Parse options. */
int i = 2; /* Start of options. */
for (; i < c->argc; i++) {
int moreargs = (c->argc-1) - i; /* Number of additional arguments. */
char *opt = c->argv[i]->ptr;
if (!strcasecmp(opt,"maxlen") && moreargs) {
trim_strategy = TRIM_STRATEGY_MAXLEN;
char *next = c->argv[i+1]->ptr;
/* Check for the form MAXLEN ~ <count>. */
if (moreargs >= 2 && next[0] == '~' && next[1] == '\0') {
approx_maxlen = 1;
i++;
}
if (getLongLongFromObjectOrReply(c,c->argv[i+1],&maxlen,NULL)
!= C_OK) return;
i++;
} else {
addReply(c,shared.syntaxerr);
return;
}
}
/* Perform the trimming. */
int64_t deleted = 0;
if (trim_strategy == TRIM_STRATEGY_MAXLEN) {
deleted = streamTrimByLength(s,maxlen,approx_maxlen);
} else {
addReplyError(c,"XTRIM called without an option to trim the stream");
return;
}
/* Propagate the write if needed. */
if (deleted) {
signalModifiedKey(c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id);
server.dirty += deleted;
}
addReplyLongLong(c,deleted);
}
/* XINFO CONSUMERS key group
* XINFO GROUPS <key>
* XINFO STREAM <key>