Add modules API for streams (#8288)

APIs added for these stream operations: add, delete, iterate and
trim (by ID or maxlength). The functions are prefixed by RM_Stream.

* RM_StreamAdd
* RM_StreamDelete
* RM_StreamIteratorStart
* RM_StreamIteratorStop
* RM_StreamIteratorNextID
* RM_StreamIteratorNextField
* RM_StreamIteratorDelete
* RM_StreamTrimByLength
* RM_StreamTrimByID

The type RedisModuleStreamID is added and functions for converting
from and to RedisModuleString.

* RM_CreateStringFromStreamID
* RM_StringToStreamID

Whenever the stream functions return REDISMODULE_ERR, errno is set to
provide additional error information.

Refactoring: The zset iterator fields in the RedisModuleKey struct
are wrapped in a union, to allow the same space to be used for type-
specific info for streams and allow future use for other key types.
This commit is contained in:
Viktor Söderqvist 2021-01-28 15:19:43 +01:00 committed by GitHub
parent bb7cd97439
commit 4355145a62
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 1089 additions and 76 deletions

View File

@ -31,4 +31,5 @@ $TCLSH tests/test_helper.tcl \
--single unit/moduleapi/getkeys \
--single unit/moduleapi/test_lazyfree \
--single unit/moduleapi/defrag \
--single unit/moduleapi/stream \
"${@}"

View File

@ -177,15 +177,25 @@ struct RedisModuleKey {
void *iter; /* Iterator. */
int mode; /* Opening mode. */
/* Zset iterator. */
uint32_t ztype; /* REDISMODULE_ZSET_RANGE_* */
zrangespec zrs; /* Score range. */
zlexrangespec zlrs; /* Lex range. */
uint32_t zstart; /* Start pos for positional ranges. */
uint32_t zend; /* End pos for positional ranges. */
void *zcurrent; /* Zset iterator current node. */
int zer; /* Zset iterator end reached flag
(true if end was reached). */
union {
struct {
/* Zset iterator, use only if value->type == OBJ_ZSET */
uint32_t type; /* REDISMODULE_ZSET_RANGE_* */
zrangespec rs; /* Score range. */
zlexrangespec lrs; /* Lex range. */
uint32_t start; /* Start pos for positional ranges. */
uint32_t end; /* End pos for positional ranges. */
void *current; /* Zset iterator current node. */
int er; /* Zset iterator end reached flag
(true if end was reached). */
} zset;
struct {
/* Stream, use only if value->type == OBJ_STREAM */
streamID currentid; /* Current entry while iterating. */
int64_t numfieldsleft; /* Fields left to fetch for current entry. */
int signalready; /* Flag that signalKeyAsReady() is needed. */
} stream;
} u;
};
typedef struct RedisModuleKey RedisModuleKey;
@ -376,6 +386,7 @@ robj **moduleCreateArgvFromUserFormat(const char *cmdname, const char *fmt, int
void moduleReplicateMultiIfNeeded(RedisModuleCtx *ctx);
void RM_ZsetRangeStop(RedisModuleKey *kp);
static void zsetKeyReset(RedisModuleKey *key);
static void moduleInitKeyTypeSpecific(RedisModuleKey *key);
void RM_FreeDict(RedisModuleCtx *ctx, RedisModuleDict *d);
void RM_FreeServerInfo(RedisModuleCtx *ctx, RedisModuleServerInfoData *data);
@ -509,10 +520,14 @@ int moduleCreateEmptyKey(RedisModuleKey *key, int type) {
case REDISMODULE_KEYTYPE_HASH:
obj = createHashObject();
break;
case REDISMODULE_KEYTYPE_STREAM:
obj = createStreamObject();
break;
default: return REDISMODULE_ERR;
}
dbAdd(key->db,key->key,obj);
key->value = obj;
moduleInitKeyTypeSpecific(key);
return REDISMODULE_OK;
}
@ -1113,6 +1128,18 @@ RedisModuleString *RM_CreateStringFromString(RedisModuleCtx *ctx, const RedisMod
return o;
}
/* Creates a string from a stream ID. The returned string must be released with
* RedisModule_FreeString(), unless automatic memory is enabled.
*
* The passed context `ctx` may be NULL if necessary. See the
* RedisModule_CreateString() documentation for more info. */
RedisModuleString *RM_CreateStringFromStreamID(RedisModuleCtx *ctx, const RedisModuleStreamID *id) {
streamID streamid = {id->ms, id->seq};
RedisModuleString *o = createObjectFromStreamID(&streamid);
if (ctx != NULL) autoMemoryAdd(ctx, REDISMODULE_AM_STRING, o);
return o;
}
/* Free a module string object obtained with one of the Redis modules API calls
* that return new string objects.
*
@ -1270,6 +1297,30 @@ int RM_StringToLongDouble(const RedisModuleString *str, long double *ld) {
return retval ? REDISMODULE_OK : REDISMODULE_ERR;
}
/* Convert the string into a stream ID, storing it at `*id`.
* Returns REDISMODULE_OK on success and returns REDISMODULE_ERR if the string
* is not a valid string representation of a stream ID. The special IDs "+" and
* "-" are allowed.
*
* RedisModuleStreamID is a struct with two 64-bit fields, which is used in
* stream functions and defined as
*
* typedef struct RedisModuleStreamID {
* uint64_t ms;
* uint64_t seq;
* } RedisModuleStreamID;
*/
int RM_StringToStreamID(const RedisModuleString *str, RedisModuleStreamID *id) {
streamID streamid;
if (streamParseID(str, &streamid) == C_OK) {
id->ms = streamid.ms;
id->seq = streamid.seq;
return REDISMODULE_OK;
} else {
return REDISMODULE_ERR;
}
}
/* Compare two string objects, returning -1, 0 or 1 respectively if
* a < b, a == b, a > b. Strings are compared byte by byte as two
* binary blobs without any encoding care / collation attempt. */
@ -2072,7 +2123,15 @@ static void moduleInitKey(RedisModuleKey *kp, RedisModuleCtx *ctx, robj *keyname
kp->value = value;
kp->iter = NULL;
kp->mode = mode;
zsetKeyReset(kp);
if (kp->value) moduleInitKeyTypeSpecific(kp);
}
/* Initialize the type-specific part of the key. Only when key has a value. */
static void moduleInitKeyTypeSpecific(RedisModuleKey *key) {
switch (key->value->type) {
case OBJ_ZSET: zsetKeyReset(key); break;
case OBJ_STREAM: key->u.stream.signalready = 0; break;
}
}
/* Return an handle representing a Redis key, so that it is possible
@ -2115,8 +2174,13 @@ static void moduleCloseKey(RedisModuleKey *key) {
int signal = SHOULD_SIGNAL_MODIFIED_KEYS(key->ctx);
if ((key->mode & REDISMODULE_WRITE) && signal)
signalModifiedKey(key->ctx->client,key->db,key->key);
/* TODO: if (key->iter) RM_KeyIteratorStop(kp); */
if (key->iter) zfree(key->iter);
RM_ZsetRangeStop(key);
if (key && key->value && key->value->type == OBJ_STREAM &&
key->u.stream.signalready) {
/* One of more RM_StreamAdd() have been done. */
signalKeyAsReady(key->db, key->key, OBJ_STREAM);
}
decrRefCount(key->key);
}
@ -2545,16 +2609,17 @@ int RM_ZsetScore(RedisModuleKey *key, RedisModuleString *ele, double *score) {
* -------------------------------------------------------------------------- */
void zsetKeyReset(RedisModuleKey *key) {
key->ztype = REDISMODULE_ZSET_RANGE_NONE;
key->zcurrent = NULL;
key->zer = 1;
key->u.zset.type = REDISMODULE_ZSET_RANGE_NONE;
key->u.zset.current = NULL;
key->u.zset.er = 1;
}
/* Stop a sorted set iteration. */
void RM_ZsetRangeStop(RedisModuleKey *key) {
if (!key->value || key->value->type != OBJ_ZSET) return;
/* Free resources if needed. */
if (key->ztype == REDISMODULE_ZSET_RANGE_LEX)
zslFreeLexRange(&key->zlrs);
if (key->u.zset.type == REDISMODULE_ZSET_RANGE_LEX)
zslFreeLexRange(&key->u.zset.lrs);
/* Setup sensible values so that misused iteration API calls when an
* iterator is not active will result into something more sensible
* than crashing. */
@ -2563,7 +2628,7 @@ void RM_ZsetRangeStop(RedisModuleKey *key) {
/* Return the "End of range" flag value to signal the end of the iteration. */
int RM_ZsetRangeEndReached(RedisModuleKey *key) {
return key->zer;
return key->u.zset.er;
}
/* Helper function for RM_ZsetFirstInScoreRange() and RM_ZsetLastInScoreRange().
@ -2576,29 +2641,29 @@ int zsetInitScoreRange(RedisModuleKey *key, double min, double max, int minex, i
if (!key->value || key->value->type != OBJ_ZSET) return REDISMODULE_ERR;
RM_ZsetRangeStop(key);
key->ztype = REDISMODULE_ZSET_RANGE_SCORE;
key->zer = 0;
key->u.zset.type = REDISMODULE_ZSET_RANGE_SCORE;
key->u.zset.er = 0;
/* Setup the range structure used by the sorted set core implementation
* in order to seek at the specified element. */
zrangespec *zrs = &key->zrs;
zrangespec *zrs = &key->u.zset.rs;
zrs->min = min;
zrs->max = max;
zrs->minex = minex;
zrs->maxex = maxex;
if (key->value->encoding == OBJ_ENCODING_ZIPLIST) {
key->zcurrent = first ? zzlFirstInRange(key->value->ptr,zrs) :
zzlLastInRange(key->value->ptr,zrs);
key->u.zset.current = first ? zzlFirstInRange(key->value->ptr,zrs) :
zzlLastInRange(key->value->ptr,zrs);
} else if (key->value->encoding == OBJ_ENCODING_SKIPLIST) {
zset *zs = key->value->ptr;
zskiplist *zsl = zs->zsl;
key->zcurrent = first ? zslFirstInRange(zsl,zrs) :
zslLastInRange(zsl,zrs);
key->u.zset.current = first ? zslFirstInRange(zsl,zrs) :
zslLastInRange(zsl,zrs);
} else {
serverPanic("Unsupported zset encoding");
}
if (key->zcurrent == NULL) key->zer = 1;
if (key->u.zset.current == NULL) key->u.zset.er = 1;
return REDISMODULE_OK;
}
@ -2640,29 +2705,29 @@ int zsetInitLexRange(RedisModuleKey *key, RedisModuleString *min, RedisModuleStr
if (!key->value || key->value->type != OBJ_ZSET) return REDISMODULE_ERR;
RM_ZsetRangeStop(key);
key->zer = 0;
key->u.zset.er = 0;
/* Setup the range structure used by the sorted set core implementation
* in order to seek at the specified element. */
zlexrangespec *zlrs = &key->zlrs;
zlexrangespec *zlrs = &key->u.zset.lrs;
if (zslParseLexRange(min, max, zlrs) == C_ERR) return REDISMODULE_ERR;
/* Set the range type to lex only after successfully parsing the range,
* otherwise we don't want the zlexrangespec to be freed. */
key->ztype = REDISMODULE_ZSET_RANGE_LEX;
key->u.zset.type = REDISMODULE_ZSET_RANGE_LEX;
if (key->value->encoding == OBJ_ENCODING_ZIPLIST) {
key->zcurrent = first ? zzlFirstInLexRange(key->value->ptr,zlrs) :
zzlLastInLexRange(key->value->ptr,zlrs);
key->u.zset.current = first ? zzlFirstInLexRange(key->value->ptr,zlrs) :
zzlLastInLexRange(key->value->ptr,zlrs);
} else if (key->value->encoding == OBJ_ENCODING_SKIPLIST) {
zset *zs = key->value->ptr;
zskiplist *zsl = zs->zsl;
key->zcurrent = first ? zslFirstInLexRange(zsl,zlrs) :
zslLastInLexRange(zsl,zlrs);
key->u.zset.current = first ? zslFirstInLexRange(zsl,zlrs) :
zslLastInLexRange(zsl,zlrs);
} else {
serverPanic("Unsupported zset encoding");
}
if (key->zcurrent == NULL) key->zer = 1;
if (key->u.zset.current == NULL) key->u.zset.er = 1;
return REDISMODULE_OK;
}
@ -2695,10 +2760,11 @@ int RM_ZsetLastInLexRange(RedisModuleKey *key, RedisModuleString *min, RedisModu
RedisModuleString *RM_ZsetRangeCurrentElement(RedisModuleKey *key, double *score) {
RedisModuleString *str;
if (key->zcurrent == NULL) return NULL;
if (!key->value || key->value->type != OBJ_ZSET) return NULL;
if (key->u.zset.current == NULL) return NULL;
if (key->value->encoding == OBJ_ENCODING_ZIPLIST) {
unsigned char *eptr, *sptr;
eptr = key->zcurrent;
eptr = key->u.zset.current;
sds ele = ziplistGetObject(eptr);
if (score) {
sptr = ziplistNext(key->value->ptr,eptr);
@ -2706,7 +2772,7 @@ RedisModuleString *RM_ZsetRangeCurrentElement(RedisModuleKey *key, double *score
}
str = createObject(OBJ_STRING,ele);
} else if (key->value->encoding == OBJ_ENCODING_SKIPLIST) {
zskiplistNode *ln = key->zcurrent;
zskiplistNode *ln = key->u.zset.current;
if (score) *score = ln->score;
str = createStringObject(ln->ele,sdslen(ln->ele));
} else {
@ -2720,58 +2786,59 @@ RedisModuleString *RM_ZsetRangeCurrentElement(RedisModuleKey *key, double *score
* a next element, 0 if we are already at the latest element or the range
* does not include any item at all. */
int RM_ZsetRangeNext(RedisModuleKey *key) {
if (!key->ztype || !key->zcurrent) return 0; /* No active iterator. */
if (!key->value || key->value->type != OBJ_ZSET) return 0;
if (!key->u.zset.type || !key->u.zset.current) return 0; /* No active iterator. */
if (key->value->encoding == OBJ_ENCODING_ZIPLIST) {
unsigned char *zl = key->value->ptr;
unsigned char *eptr = key->zcurrent;
unsigned char *eptr = key->u.zset.current;
unsigned char *next;
next = ziplistNext(zl,eptr); /* Skip element. */
if (next) next = ziplistNext(zl,next); /* Skip score. */
if (next == NULL) {
key->zer = 1;
key->u.zset.er = 1;
return 0;
} else {
/* Are we still within the range? */
if (key->ztype == REDISMODULE_ZSET_RANGE_SCORE) {
if (key->u.zset.type == REDISMODULE_ZSET_RANGE_SCORE) {
/* Fetch the next element score for the
* range check. */
unsigned char *saved_next = next;
next = ziplistNext(zl,next); /* Skip next element. */
double score = zzlGetScore(next); /* Obtain the next score. */
if (!zslValueLteMax(score,&key->zrs)) {
key->zer = 1;
if (!zslValueLteMax(score,&key->u.zset.rs)) {
key->u.zset.er = 1;
return 0;
}
next = saved_next;
} else if (key->ztype == REDISMODULE_ZSET_RANGE_LEX) {
if (!zzlLexValueLteMax(next,&key->zlrs)) {
key->zer = 1;
} else if (key->u.zset.type == REDISMODULE_ZSET_RANGE_LEX) {
if (!zzlLexValueLteMax(next,&key->u.zset.lrs)) {
key->u.zset.er = 1;
return 0;
}
}
key->zcurrent = next;
key->u.zset.current = next;
return 1;
}
} else if (key->value->encoding == OBJ_ENCODING_SKIPLIST) {
zskiplistNode *ln = key->zcurrent, *next = ln->level[0].forward;
zskiplistNode *ln = key->u.zset.current, *next = ln->level[0].forward;
if (next == NULL) {
key->zer = 1;
key->u.zset.er = 1;
return 0;
} else {
/* Are we still within the range? */
if (key->ztype == REDISMODULE_ZSET_RANGE_SCORE &&
!zslValueLteMax(next->score,&key->zrs))
if (key->u.zset.type == REDISMODULE_ZSET_RANGE_SCORE &&
!zslValueLteMax(next->score,&key->u.zset.rs))
{
key->zer = 1;
key->u.zset.er = 1;
return 0;
} else if (key->ztype == REDISMODULE_ZSET_RANGE_LEX) {
if (!zslLexValueLteMax(next->ele,&key->zlrs)) {
key->zer = 1;
} else if (key->u.zset.type == REDISMODULE_ZSET_RANGE_LEX) {
if (!zslLexValueLteMax(next->ele,&key->u.zset.lrs)) {
key->u.zset.er = 1;
return 0;
}
}
key->zcurrent = next;
key->u.zset.current = next;
return 1;
}
} else {
@ -2783,58 +2850,59 @@ int RM_ZsetRangeNext(RedisModuleKey *key) {
* a previous element, 0 if we are already at the first element or the range
* does not include any item at all. */
int RM_ZsetRangePrev(RedisModuleKey *key) {
if (!key->ztype || !key->zcurrent) return 0; /* No active iterator. */
if (!key->value || key->value->type != OBJ_ZSET) return 0;
if (!key->u.zset.type || !key->u.zset.current) return 0; /* No active iterator. */
if (key->value->encoding == OBJ_ENCODING_ZIPLIST) {
unsigned char *zl = key->value->ptr;
unsigned char *eptr = key->zcurrent;
unsigned char *eptr = key->u.zset.current;
unsigned char *prev;
prev = ziplistPrev(zl,eptr); /* Go back to previous score. */
if (prev) prev = ziplistPrev(zl,prev); /* Back to previous ele. */
if (prev == NULL) {
key->zer = 1;
key->u.zset.er = 1;
return 0;
} else {
/* Are we still within the range? */
if (key->ztype == REDISMODULE_ZSET_RANGE_SCORE) {
if (key->u.zset.type == REDISMODULE_ZSET_RANGE_SCORE) {
/* Fetch the previous element score for the
* range check. */
unsigned char *saved_prev = prev;
prev = ziplistNext(zl,prev); /* Skip element to get the score.*/
double score = zzlGetScore(prev); /* Obtain the prev score. */
if (!zslValueGteMin(score,&key->zrs)) {
key->zer = 1;
if (!zslValueGteMin(score,&key->u.zset.rs)) {
key->u.zset.er = 1;
return 0;
}
prev = saved_prev;
} else if (key->ztype == REDISMODULE_ZSET_RANGE_LEX) {
if (!zzlLexValueGteMin(prev,&key->zlrs)) {
key->zer = 1;
} else if (key->u.zset.type == REDISMODULE_ZSET_RANGE_LEX) {
if (!zzlLexValueGteMin(prev,&key->u.zset.lrs)) {
key->u.zset.er = 1;
return 0;
}
}
key->zcurrent = prev;
key->u.zset.current = prev;
return 1;
}
} else if (key->value->encoding == OBJ_ENCODING_SKIPLIST) {
zskiplistNode *ln = key->zcurrent, *prev = ln->backward;
zskiplistNode *ln = key->u.zset.current, *prev = ln->backward;
if (prev == NULL) {
key->zer = 1;
key->u.zset.er = 1;
return 0;
} else {
/* Are we still within the range? */
if (key->ztype == REDISMODULE_ZSET_RANGE_SCORE &&
!zslValueGteMin(prev->score,&key->zrs))
if (key->u.zset.type == REDISMODULE_ZSET_RANGE_SCORE &&
!zslValueGteMin(prev->score,&key->u.zset.rs))
{
key->zer = 1;
key->u.zset.er = 1;
return 0;
} else if (key->ztype == REDISMODULE_ZSET_RANGE_LEX) {
if (!zslLexValueGteMin(prev->ele,&key->zlrs)) {
key->zer = 1;
} else if (key->u.zset.type == REDISMODULE_ZSET_RANGE_LEX) {
if (!zslLexValueGteMin(prev->ele,&key->u.zset.lrs)) {
key->u.zset.er = 1;
return 0;
}
}
key->zcurrent = prev;
key->u.zset.current = prev;
return 1;
}
} else {
@ -3049,6 +3117,455 @@ int RM_HashGet(RedisModuleKey *key, int flags, ...) {
return REDISMODULE_OK;
}
/* --------------------------------------------------------------------------
* Key API for the stream type.
* -------------------------------------------------------------------------- */
/* Adds an entry to a stream. Like XADD without trimming.
*
* - `key`: The key where the stream is (or will be) stored
* - `flags`: A bit field of
* - `REDISMODULE_STREAM_ADD_AUTOID`: Assign a stream ID automatically, like
* `*` in the XADD command.
* - `id`: If the `AUTOID` flag is set, this is where the assigned ID is
* returned. Can be NULL if `AUTOID` is set, if you don't care to receive the
* ID. If `AUTOID` is not set, this is the requested ID.
* - `argv`: A pointer to an array of size `numfields * 2` containing the
* fields and values.
* - `numfields`: The number of field-value pairs in `argv`.
*
* Returns REDISMODULE_OK if an entry has been added. On failure,
* REDISMODULE_ERR is returned and `errno` is set as follows:
*
* - EINVAL if called with invalid arguments
* - ENOTSUP if the key refers to a value of a type other than stream
* - EBADF if the key was not opened for writing
* - EDOM if the given ID was 0-0 or not greater than all other IDs in the
* stream (only if the AUTOID flag is unset)
* - EFBIG if the stream has reached the last possible ID
*/
int RM_StreamAdd(RedisModuleKey *key, int flags, RedisModuleStreamID *id, RedisModuleString **argv, long numfields) {
/* Validate args */
if (!key || (numfields != 0 && !argv) || /* invalid key or argv */
(flags & ~(REDISMODULE_STREAM_ADD_AUTOID)) || /* invalid flags */
(!(flags & REDISMODULE_STREAM_ADD_AUTOID) && !id)) { /* id required */
errno = EINVAL;
return REDISMODULE_ERR;
} else if (key->value && key->value->type != OBJ_STREAM) {
errno = ENOTSUP; /* wrong type */
return REDISMODULE_ERR;
} else if (!(key->mode & REDISMODULE_WRITE)) {
errno = EBADF; /* key not open for writing */
return REDISMODULE_ERR;
} else if (!(flags & REDISMODULE_STREAM_ADD_AUTOID) &&
id->ms == 0 && id->seq == 0) {
errno = EDOM; /* ID out of range */
return REDISMODULE_ERR;
}
/* Create key if necessery */
int created = 0;
if (key->value == NULL) {
moduleCreateEmptyKey(key, REDISMODULE_KEYTYPE_STREAM);
created = 1;
}
stream *s = key->value->ptr;
if (s->last_id.ms == UINT64_MAX && s->last_id.seq == UINT64_MAX) {
/* The stream has reached the last possible ID */
errno = EFBIG;
return REDISMODULE_ERR;
}
streamID added_id;
streamID use_id;
streamID *use_id_ptr = NULL;
if (!(flags & REDISMODULE_STREAM_ADD_AUTOID)) {
use_id.ms = id->ms;
use_id.seq = id->seq;
use_id_ptr = &use_id;
}
if (streamAppendItem(s, argv, numfields, &added_id, use_id_ptr) == C_ERR) {
/* ID not greater than all existing IDs in the stream */
errno = EDOM;
return REDISMODULE_ERR;
}
/* Postponed signalKeyAsReady(). Done implicitly by moduleCreateEmptyKey()
* so not needed if the stream has just been created. */
if (!created) key->u.stream.signalready = 1;
if (id != NULL) {
id->ms = added_id.ms;
id->seq = added_id.seq;
}
return REDISMODULE_OK;
}
/* Deletes an entry from a stream.
*
* - `key`: A key opened for writing, with no stream iterator started.
* - `id`: The stream ID of the entry to delete.
*
* Returns REDISMODULE_OK on success. On failure, REDISMODULE_ERR is returned
* and `errno` is set as follows:
*
* - EINVAL if called with invalid arguments
* - ENOTSUP if the key refers to a value of a type other than stream or if the
* key is empty
* - EBADF if the key was not opened for writing or if a stream iterator is
* associated with the key
* - ENOENT if no entry with the given stream ID exists
*
* See also RM_StreamIteratorDelete() for deleting the current entry while
* iterating using a stream iterator.
*/
int RM_StreamDelete(RedisModuleKey *key, RedisModuleStreamID *id) {
if (!key || !id) {
errno = EINVAL;
return REDISMODULE_ERR;
} else if (!key->value || key->value->type != OBJ_STREAM) {
errno = ENOTSUP; /* wrong type */
return REDISMODULE_ERR;
} else if (!(key->mode & REDISMODULE_WRITE) ||
key->iter != NULL) {
errno = EBADF; /* key not opened for writing or iterator started */
return REDISMODULE_ERR;
}
stream *s = key->value->ptr;
streamID streamid = {id->ms, id->seq};
if (streamDeleteItem(s, &streamid)) {
return REDISMODULE_OK;
} else {
errno = ENOENT; /* no entry with this id */
return REDISMODULE_ERR;
}
}
/* Sets up a stream iterator.
*
* - `key`: The stream key opened for reading using RedisModule_OpenKey().
* - `flags`:
* - `REDISMODULE_STREAM_ITERATOR_EXCLUSIVE`: Don't include `start` and `end`
* in the iterated range.
* - `REDISMODULE_STREAM_ITERATOR_REVERSE`: Iterate in reverse order, starting
* from the `end` of the range.
* - `start`: The lower bound of the range. Use NULL for the beginning of the
* stream.
* - `end`: The upper bound of the range. Use NULL for the end of the stream.
*
* Returns REDISMODULE_OK on success. On failure, REDISMODULE_ERR is returned
* and `errno` is set as follows:
*
* - EINVAL if called with invalid arguments
* - ENOTSUP if the key refers to a value of a type other than stream or if the
* key is empty
* - EBADF if the key was not opened for writing or if a stream iterator is
* already associated with the key
* - EDOM if `start` or `end` is outside the valid range
*
* Returns REDISMODULE_OK on success and REDISMODULE_ERR if the key doesn't
* refer to a stream or if invalid arguments were given.
*
* The stream IDs are retrieved using RedisModule_StreamIteratorNextID() and
* for each stream ID, the fields and values are retrieved using
* RedisModule_StreamIteratorNextField(). The iterator is freed by calling
* RedisModule_StreamIteratorStop().
*
* Example (error handling omitted):
*
* RedisModule_StreamIteratorStart(key, 0, startid_ptr, endid_ptr);
* RedisModuleStreamID id;
* long numfields;
* while (RedisModule_StreamIteratorNextID(key, &id, &numfields) ==
* REDISMODULE_OK) {
* RedisModuleString *field, *value;
* while (RedisModule_StreamIteratorNextField(key, &field, &value) ==
* REDISMODULE_OK) {
* //
* // ... Do stuff ...
* //
* RedisModule_Free(field);
* RedisModule_Free(value);
* }
* }
* RedisModule_StreamIteratorStop(key);
*/
int RM_StreamIteratorStart(RedisModuleKey *key, int flags, RedisModuleStreamID *start, RedisModuleStreamID *end) {
/* check args */
if (!key ||
(flags & ~(REDISMODULE_STREAM_ITERATOR_EXCLUSIVE |
REDISMODULE_STREAM_ITERATOR_REVERSE))) {
errno = EINVAL; /* key missing or invalid flags */
return REDISMODULE_ERR;
} else if (!key->value || key->value->type != OBJ_STREAM) {
errno = ENOTSUP;
return REDISMODULE_ERR; /* not a stream */
} else if (key->iter) {
errno = EBADF; /* iterator already started */
return REDISMODULE_ERR;
}
/* define range for streamIteratorStart() */
streamID lower, upper;
if (start) lower = (streamID){start->ms, start->seq};
if (end) upper = (streamID){end->ms, end->seq};
if (flags & REDISMODULE_STREAM_ITERATOR_EXCLUSIVE) {
if ((start && streamIncrID(&lower) != C_OK) ||
(end && streamDecrID(&upper) != C_OK)) {
errno = EDOM; /* end is 0-0 or start is MAX-MAX? */
return REDISMODULE_ERR;
}
}
/* create iterator */
stream *s = key->value->ptr;
int rev = flags & REDISMODULE_STREAM_ITERATOR_REVERSE;
streamIterator *si = zmalloc(sizeof(*si));
streamIteratorStart(si, s, start ? &lower : NULL, end ? &upper : NULL, rev);
key->iter = si;
key->u.stream.currentid.ms = 0; /* for RM_StreamIteratorDelete() */
key->u.stream.currentid.seq = 0;
key->u.stream.numfieldsleft = 0; /* for RM_StreamIteratorNextField() */
return REDISMODULE_OK;
}
/* Stops a stream iterator created using RedisModule_StreamIteratorStart() and
* reclaims its memory.
*
* Returns REDISMODULE_OK on success. On failure, REDISMODULE_ERR is returned
* and `errno` is set as follows:
*
* - EINVAL if called with a NULL key
* - ENOTSUP if the key refers to a value of a type other than stream or if the
* key is empty
* - EBADF if the key was not opened for writing or if no stream iterator is
* associated with the key
*/
int RM_StreamIteratorStop(RedisModuleKey *key) {
if (!key) {
errno = EINVAL;
return REDISMODULE_ERR;
} else if (!key->value || key->value->type != OBJ_STREAM) {
errno = ENOTSUP;
return REDISMODULE_ERR;
} else if (!key->iter) {
errno = EBADF;
return REDISMODULE_ERR;
}
zfree(key->iter);
key->iter = NULL;
return REDISMODULE_OK;
}
/* Finds the next stream entry and returns its stream ID and the number of
* fields.
*
* - `key`: Key for which a stream iterator has been started using
* RedisModule_StreamIteratorStart().
* - `id`: The stream ID returned. NULL if you don't care.
* - `numfields`: The number of fields in the found stream entry. NULL if you
* don't care.
*
* Returns REDISMODULE_OK and sets `*id` and `*numfields` if an entry was found.
* On failure, REDISMODULE_ERR is returned and `errno` is set as follows:
*
* - EINVAL if called with a NULL key
* - ENOTSUP if the key refers to a value of a type other than stream or if the
* key is empty
* - EBADF if no stream iterator is associated with the key
* - ENOENT if there are no more entries in the range of the iterator
*
* In practice, if RM_StreamIteratorNextID() is called after a successful call
* to RM_StreamIteratorStart() and with the same key, it is safe to assume that
* an REDISMODULE_ERR return value means that there are no more entries.
*
* Use RedisModule_StreamIteratorNextField() to retrieve the fields and values.
* See the example at RedisModule_StreamIteratorStart().
*/
int RM_StreamIteratorNextID(RedisModuleKey *key, RedisModuleStreamID *id, long *numfields) {
if (!key) {
errno = EINVAL;
return REDISMODULE_ERR;
} else if (!key->value || key->value->type != OBJ_STREAM) {
errno = ENOTSUP;
return REDISMODULE_ERR;
} else if (!key->iter) {
errno = EBADF;
return REDISMODULE_ERR;
}
streamIterator *si = key->iter;
int64_t *num_ptr = &key->u.stream.numfieldsleft;
streamID *streamid_ptr = &key->u.stream.currentid;
if (streamIteratorGetID(si, streamid_ptr, num_ptr)) {
if (id) {
id->ms = streamid_ptr->ms;
id->seq = streamid_ptr->seq;
}
if (numfields) *numfields = *num_ptr;
return REDISMODULE_OK;
} else {
/* No entry found. */
key->u.stream.currentid.ms = 0; /* for RM_StreamIteratorDelete() */
key->u.stream.currentid.seq = 0;
key->u.stream.numfieldsleft = 0; /* for RM_StreamIteratorNextField() */
errno = ENOENT;
return REDISMODULE_ERR;
}
}
/* Retrieves the next field of the current stream ID and its corresponding value
* in a stream iteration. This function should be called repeatedly after calling
* RedisModule_StreamIteratorNextID() to fetch each field-value pair.
*
* - `key`: Key where a stream iterator has been started.
* - `field_ptr`: This is where the field is returned.
* - `value_ptr`: This is where the value is returned.
*
* Returns REDISMODULE_OK and points `*field_ptr` and `*value_ptr` to freshly
* allocated RedisModuleString objects. The string objects are freed
* automatically when the callback finishes if automatic memory is enabled. On
* failure, REDISMODULE_ERR is returned and `errno` is set as follows:
*
* - EINVAL if called with a NULL key
* - ENOTSUP if the key refers to a value of a type other than stream or if the
* key is empty
* - EBADF if no stream iterator is associated with the key
* - ENOENT if there are no more fields in the current stream entry
*
* In practice, if RM_StreamIteratorNextField() is called after a successful
* call to RM_StreamIteratorNextID() and with the same key, it is safe to assume
* that an REDISMODULE_ERR return value means that there are no more fields.
*
* See the example at RedisModule_StreamIteratorStart().
*/
int RM_StreamIteratorNextField(RedisModuleKey *key, RedisModuleString **field_ptr, RedisModuleString **value_ptr) {
if (!key) {
errno = EINVAL;
return REDISMODULE_ERR;
} else if (!key->value || key->value->type != OBJ_STREAM) {
errno = ENOTSUP;
return REDISMODULE_ERR;
} else if (!key->iter) {
errno = EBADF;
return REDISMODULE_ERR;
} else if (key->u.stream.numfieldsleft <= 0) {
errno = ENOENT;
return REDISMODULE_ERR;
}
streamIterator *si = key->iter;
unsigned char *field, *value;
int64_t field_len, value_len;
streamIteratorGetField(si, &field, &value, &field_len, &value_len);
if (field_ptr) {
*field_ptr = createRawStringObject((char *)field, field_len);
autoMemoryAdd(key->ctx, REDISMODULE_AM_STRING, *field_ptr);
}
if (value_ptr) {
*value_ptr = createRawStringObject((char *)value, value_len);
autoMemoryAdd(key->ctx, REDISMODULE_AM_STRING, *value_ptr);
}
key->u.stream.numfieldsleft--;
return REDISMODULE_OK;
}
/* Deletes the current stream entry while iterating.
*
* This function can be called after RM_StreamIteratorNextID() or after any
* calls to RM_StreamIteratorNextField().
*
* Returns REDISMODULE_OK on success. On failure, REDISMODULE_ERR is returned
* and `errno` is set as follows:
*
* - EINVAL if key is NULL
* - ENOTSUP if the key is empty or is of another type than stream
* - EBADF if the key is not opened for writing, if no iterator has been started
* - ENOENT if the iterator has no current stream entry
*/
int RM_StreamIteratorDelete(RedisModuleKey *key) {
if (!key) {
errno = EINVAL;
return REDISMODULE_ERR;
} else if (!key->value || key->value->type != OBJ_STREAM) {
errno = ENOTSUP;
return REDISMODULE_ERR;
} else if (!(key->mode & REDISMODULE_WRITE) || !key->iter) {
errno = EBADF;
return REDISMODULE_ERR;
} else if (key->u.stream.currentid.ms == 0 &&
key->u.stream.currentid.seq == 0) {
errno = ENOENT;
return REDISMODULE_ERR;
}
streamIterator *si = key->iter;
streamIteratorRemoveEntry(si, &key->u.stream.currentid);
key->u.stream.currentid.ms = 0; /* Make sure repeated Delete() fails */
key->u.stream.currentid.seq = 0;
key->u.stream.numfieldsleft = 0; /* Make sure NextField() fails */
return REDISMODULE_OK;
}
/* Trim a stream by length, similar to XTRIM with MAXLEN.
*
* - `key`: Key opened for writing.
* - `flags`: A bitfield of
* - `REDISMODULE_STREAM_TRIM_APPROX`: Trim less if it improves performance,
* like XTRIM with `~`.
* - `length`: The number of stream entries to keep after trimming.
*
* Returns the number of entries deleted. On failure, a negative value is
* returned and `errno` is set as follows:
*
* - EINVAL if called with invalid arguments
* - ENOTSUP if the key is empty or of a type other than stream
* - EBADF if the key is not opened for writing
*/
long long RM_StreamTrimByLength(RedisModuleKey *key, int flags, long long length) {
if (!key || (flags & ~(REDISMODULE_STREAM_TRIM_APPROX)) || length < 0) {
errno = EINVAL;
return -1;
} else if (!key->value || key->value->type != OBJ_STREAM) {
errno = ENOTSUP;
return -1;
} else if (!(key->mode & REDISMODULE_WRITE)) {
errno = EBADF;
return -1;
}
int approx = flags & REDISMODULE_STREAM_TRIM_APPROX ? 1 : 0;
return streamTrimByLength((stream *)key->value->ptr, length, approx);
}
/* Trim a stream by ID, similar to XTRIM with MINID.
*
* - `key`: Key opened for writing.
* - `flags`: A bitfield of
* - `REDISMODULE_STREAM_TRIM_APPROX`: Trim less if it improves performance,
* like XTRIM with `~`.
* - `id`: The smallest stream ID to keep after trimming.
*
* Returns the number of entries deleted. On failure, a negative value is
* returned and `errno` is set as follows:
*
* - EINVAL if called with invalid arguments
* - ENOTSUP if the key is empty or of a type other than stream
* - EBADF if the key is not opened for writing
*/
long long RM_StreamTrimByID(RedisModuleKey *key, int flags, RedisModuleStreamID *id) {
if (!key || (flags & ~(REDISMODULE_STREAM_TRIM_APPROX)) || !id) {
errno = EINVAL;
return -1;
} else if (!key->value || key->value->type != OBJ_STREAM) {
errno = ENOTSUP;
return -1;
} else if (!(key->mode & REDISMODULE_WRITE)) {
errno = EBADF;
return -1;
}
int approx = flags & REDISMODULE_STREAM_TRIM_APPROX ? 1 : 0;
streamID minid = (streamID){id->ms, id->seq};
return streamTrimByID((stream *)key->value->ptr, minid, approx);
}
/* --------------------------------------------------------------------------
* Redis <-> Modules generic Call() API
* -------------------------------------------------------------------------- */
@ -8462,6 +8979,7 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(StringToLongLong);
REGISTER_API(StringToDouble);
REGISTER_API(StringToLongDouble);
REGISTER_API(StringToStreamID);
REGISTER_API(Call);
REGISTER_API(CallReplyProto);
REGISTER_API(FreeCallReply);
@ -8476,6 +8994,7 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(CreateStringFromDouble);
REGISTER_API(CreateStringFromLongDouble);
REGISTER_API(CreateStringFromString);
REGISTER_API(CreateStringFromStreamID);
REGISTER_API(CreateStringPrintf);
REGISTER_API(FreeString);
REGISTER_API(StringPtrLen);
@ -8507,6 +9026,15 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(ZsetRangeEndReached);
REGISTER_API(HashSet);
REGISTER_API(HashGet);
REGISTER_API(StreamAdd);
REGISTER_API(StreamDelete);
REGISTER_API(StreamIteratorStart);
REGISTER_API(StreamIteratorStop);
REGISTER_API(StreamIteratorNextID);
REGISTER_API(StreamIteratorNextField);
REGISTER_API(StreamIteratorDelete);
REGISTER_API(StreamTrimByLength);
REGISTER_API(StreamTrimByID);
REGISTER_API(IsKeysPositionRequest);
REGISTER_API(KeyAtPos);
REGISTER_API(GetClientId);

View File

@ -69,6 +69,20 @@
#define REDISMODULE_HASH_CFIELDS (1<<2)
#define REDISMODULE_HASH_EXISTS (1<<3)
/* StreamID type. */
typedef struct RedisModuleStreamID {
uint64_t ms;
uint64_t seq;
} RedisModuleStreamID;
/* StreamAdd() flags. */
#define REDISMODULE_STREAM_ADD_AUTOID (1<<0)
/* StreamIteratorStart() flags. */
#define REDISMODULE_STREAM_ITERATOR_EXCLUSIVE (1<<0)
#define REDISMODULE_STREAM_ITERATOR_REVERSE (1<<1)
/* StreamIteratorTrim*() flags. */
#define REDISMODULE_STREAM_TRIM_APPROX (1<<0)
/* Context Flags: Info about the current context returned by
* RM_GetContextFlags(). */
@ -578,6 +592,7 @@ REDISMODULE_API RedisModuleString * (*RedisModule_CreateStringFromLongLong)(Redi
REDISMODULE_API RedisModuleString * (*RedisModule_CreateStringFromDouble)(RedisModuleCtx *ctx, double d) REDISMODULE_ATTR;
REDISMODULE_API RedisModuleString * (*RedisModule_CreateStringFromLongDouble)(RedisModuleCtx *ctx, long double ld, int humanfriendly) REDISMODULE_ATTR;
REDISMODULE_API RedisModuleString * (*RedisModule_CreateStringFromString)(RedisModuleCtx *ctx, const RedisModuleString *str) REDISMODULE_ATTR;
REDISMODULE_API RedisModuleString * (*RedisModule_CreateStringFromStreamID)(RedisModuleCtx *ctx, const RedisModuleStreamID *id) REDISMODULE_ATTR;
REDISMODULE_API RedisModuleString * (*RedisModule_CreateStringPrintf)(RedisModuleCtx *ctx, const char *fmt, ...) REDISMODULE_ATTR_PRINTF(2,3) REDISMODULE_ATTR;
REDISMODULE_API void (*RedisModule_FreeString)(RedisModuleCtx *ctx, RedisModuleString *str) REDISMODULE_ATTR;
REDISMODULE_API const char * (*RedisModule_StringPtrLen)(const RedisModuleString *str, size_t *len) REDISMODULE_ATTR;
@ -599,6 +614,7 @@ REDISMODULE_API int (*RedisModule_ReplyWithCallReply)(RedisModuleCtx *ctx, Redis
REDISMODULE_API int (*RedisModule_StringToLongLong)(const RedisModuleString *str, long long *ll) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_StringToDouble)(const RedisModuleString *str, double *d) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_StringToLongDouble)(const RedisModuleString *str, long double *d) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_StringToStreamID)(const RedisModuleString *str, RedisModuleStreamID *id) REDISMODULE_ATTR;
REDISMODULE_API void (*RedisModule_AutoMemory)(RedisModuleCtx *ctx) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_Replicate)(RedisModuleCtx *ctx, const char *cmdname, const char *fmt, ...) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_ReplicateVerbatim)(RedisModuleCtx *ctx) REDISMODULE_ATTR;
@ -629,6 +645,15 @@ REDISMODULE_API int (*RedisModule_ZsetRangePrev)(RedisModuleKey *key) REDISMODUL
REDISMODULE_API int (*RedisModule_ZsetRangeEndReached)(RedisModuleKey *key) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_HashSet)(RedisModuleKey *key, int flags, ...) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_HashGet)(RedisModuleKey *key, int flags, ...) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_StreamAdd)(RedisModuleKey *key, int flags, RedisModuleStreamID *id, RedisModuleString **argv, int64_t numfields) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_StreamDelete)(RedisModuleKey *key, RedisModuleStreamID *id) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_StreamIteratorStart)(RedisModuleKey *key, int flags, RedisModuleStreamID *startid, RedisModuleStreamID *endid) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_StreamIteratorStop)(RedisModuleKey *key) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_StreamIteratorNextID)(RedisModuleKey *key, RedisModuleStreamID *id, long *numfields) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_StreamIteratorNextField)(RedisModuleKey *key, RedisModuleString **field_ptr, RedisModuleString **value_ptr) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_StreamIteratorDelete)(RedisModuleKey *key) REDISMODULE_ATTR;
REDISMODULE_API long long (*RedisModule_StreamTrimByLength)(RedisModuleKey *key, int flags, long long length) REDISMODULE_ATTR;
REDISMODULE_API long long (*RedisModule_StreamTrimByID)(RedisModuleKey *key, int flags, RedisModuleStreamID *id) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_IsKeysPositionRequest)(RedisModuleCtx *ctx) REDISMODULE_ATTR;
REDISMODULE_API void (*RedisModule_KeyAtPos)(RedisModuleCtx *ctx, int pos) REDISMODULE_ATTR;
REDISMODULE_API unsigned long long (*RedisModule_GetClientId)(RedisModuleCtx *ctx) REDISMODULE_ATTR;
@ -842,6 +867,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(StringToLongLong);
REDISMODULE_GET_API(StringToDouble);
REDISMODULE_GET_API(StringToLongDouble);
REDISMODULE_GET_API(StringToStreamID);
REDISMODULE_GET_API(Call);
REDISMODULE_GET_API(CallReplyProto);
REDISMODULE_GET_API(FreeCallReply);
@ -856,6 +882,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(CreateStringFromDouble);
REDISMODULE_GET_API(CreateStringFromLongDouble);
REDISMODULE_GET_API(CreateStringFromString);
REDISMODULE_GET_API(CreateStringFromStreamID);
REDISMODULE_GET_API(CreateStringPrintf);
REDISMODULE_GET_API(FreeString);
REDISMODULE_GET_API(StringPtrLen);
@ -887,6 +914,15 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(ZsetRangeEndReached);
REDISMODULE_GET_API(HashSet);
REDISMODULE_GET_API(HashGet);
REDISMODULE_GET_API(StreamAdd);
REDISMODULE_GET_API(StreamDelete);
REDISMODULE_GET_API(StreamIteratorStart);
REDISMODULE_GET_API(StreamIteratorStop);
REDISMODULE_GET_API(StreamIteratorNextID);
REDISMODULE_GET_API(StreamIteratorNextField);
REDISMODULE_GET_API(StreamIteratorDelete);
REDISMODULE_GET_API(StreamTrimByLength);
REDISMODULE_GET_API(StreamTrimByID);
REDISMODULE_GET_API(IsKeysPositionRequest);
REDISMODULE_GET_API(KeyAtPos);
REDISMODULE_GET_API(GetClientId);

View File

@ -108,6 +108,7 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end, int rev);
int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields);
void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen);
void streamIteratorRemoveEntry(streamIterator *si, streamID *current);
void streamIteratorStop(streamIterator *si);
streamCG *streamLookupCG(stream *s, sds groupname);
streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags, int *created);
@ -121,5 +122,11 @@ int streamDecrID(streamID *id);
void streamPropagateConsumerCreation(client *c, robj *key, robj *groupname, sds consumername);
robj *streamDup(robj *o);
int streamValidateListpackIntegrity(unsigned char *lp, size_t size, int deep);
int streamParseID(const robj *o, streamID *id);
robj *createObjectFromStreamID(streamID *id);
int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_id, streamID *use_id);
int streamDeleteItem(stream *s, streamID *id);
int64_t streamTrimByLength(stream *s, long long maxlen, int approx);
int64_t streamTrimByID(stream *s, streamID minid, int approx);
#endif

View File

@ -818,6 +818,28 @@ int64_t streamTrim(stream *s, streamAddTrimArgs *args) {
return deleted;
}
/* Trims a stream by length. Returns the number of deleted items. */
int64_t streamTrimByLength(stream *s, long long maxlen, int approx) {
streamAddTrimArgs args = {
.trim_strategy = TRIM_STRATEGY_MAXLEN,
.approx_trim = approx,
.limit = approx ? 100 * server.stream_node_max_entries : 0,
.maxlen = maxlen
};
return streamTrim(s, &args);
}
/* Trims a stream by minimum ID. Returns the number of deleted items. */
int64_t streamTrimByID(stream *s, streamID minid, int approx) {
streamAddTrimArgs args = {
.trim_strategy = TRIM_STRATEGY_MINID,
.approx_trim = approx,
.limit = approx ? 100 * server.stream_node_max_entries : 0,
.minid = minid
};
return streamTrim(s, &args);
}
/* Parse the arguements of XADD/XTRIM.
*
* See streamAddTrimArgs for more details about the arguments handled.
@ -1625,7 +1647,7 @@ robj *streamTypeLookupWriteOrCreate(client *c, robj *key, int no_create) {
* treated as an invalid ID.
*
* If 'c' is set to NULL, no reply is sent to the client. */
int streamGenericParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq, int strict) {
int streamGenericParseIDOrReply(client *c, const robj *o, streamID *id, uint64_t missing_seq, int strict) {
char buf[128];
if (sdslen(o->ptr) > sizeof(buf)-1) goto invalid;
memcpy(buf,o->ptr,sdslen(o->ptr)+1);
@ -1661,6 +1683,11 @@ invalid:
return C_ERR;
}
/* Wrapper for streamGenericParseIDOrReply() used by module API. */
int streamParseID(const robj *o, streamID *id) {
return streamGenericParseIDOrReply(NULL, o, id, 0, 0);
}
/* Wrapper for streamGenericParseIDOrReply() with 'strict' argument set to
* 0, to be used when - and + are acceptable IDs. */
int streamParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq) {

View File

@ -27,7 +27,8 @@ TEST_MODULES = \
getkeys.so \
test_lazyfree.so \
timer.so \
defragtest.so
defragtest.so \
stream.so
.PHONY: all

258
tests/modules/stream.c Normal file
View File

@ -0,0 +1,258 @@
#include "redismodule.h"
#include <string.h>
#include <strings.h>
#include <assert.h>
#include <unistd.h>
#include <errno.h>
/* Command which adds a stream entry with automatic ID, like XADD *.
*
* Syntax: STREAM.ADD key field1 value1 [ field2 value2 ... ]
*
* The response is the ID of the added stream entry or an error message.
*/
int stream_add(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
if (argc < 2 || argc % 2 != 0) {
RedisModule_WrongArity(ctx);
return REDISMODULE_OK;
}
RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_WRITE);
RedisModuleStreamID id;
if (RedisModule_StreamAdd(key, REDISMODULE_STREAM_ADD_AUTOID, &id,
&argv[2], (argc-2)/2) == REDISMODULE_OK) {
RedisModuleString *id_str = RedisModule_CreateStringFromStreamID(ctx, &id);
RedisModule_ReplyWithString(ctx, id_str);
RedisModule_FreeString(ctx, id_str);
} else {
RedisModule_ReplyWithError(ctx, "ERR StreamAdd failed");
}
RedisModule_CloseKey(key);
return REDISMODULE_OK;
}
/* Command which adds a stream entry N times.
*
* Syntax: STREAM.ADD key N field1 value1 [ field2 value2 ... ]
*
* Returns the number of successfully added entries.
*/
int stream_addn(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
if (argc < 3 || argc % 2 == 0) {
RedisModule_WrongArity(ctx);
return REDISMODULE_OK;
}
long long n, i;
if (RedisModule_StringToLongLong(argv[2], &n) == REDISMODULE_ERR) {
RedisModule_ReplyWithError(ctx, "N must be a number");
return REDISMODULE_OK;
}
RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_WRITE);
for (i = 0; i < n; i++) {
if (RedisModule_StreamAdd(key, REDISMODULE_STREAM_ADD_AUTOID, NULL,
&argv[3], (argc-3)/2) == REDISMODULE_ERR)
break;
}
RedisModule_ReplyWithLongLong(ctx, i);
RedisModule_CloseKey(key);
return REDISMODULE_OK;
}
/* STREAM.DELETE key stream-id */
int stream_delete(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
if (argc != 3) return RedisModule_WrongArity(ctx);
RedisModuleStreamID id;
if (RedisModule_StringToStreamID(argv[2], &id) != REDISMODULE_OK) {
return RedisModule_ReplyWithError(ctx, "Invalid stream ID");
}
RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_WRITE);
if (RedisModule_StreamDelete(key, &id) == REDISMODULE_OK) {
RedisModule_ReplyWithSimpleString(ctx, "OK");
} else {
RedisModule_ReplyWithError(ctx, "ERR StreamDelete failed");
}
RedisModule_CloseKey(key);
return REDISMODULE_OK;
}
/* STREAM.RANGE key start-id end-id
*
* Returns an array of stream items. Each item is an array on the form
* [stream-id, [field1, value1, field2, value2, ...]].
*
* A funny side-effect used for testing RM_StreamIteratorDelete() is that if any
* entry has a field named "selfdestruct", the stream entry is deleted. It is
* however included in the results of this command.
*/
int stream_range(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
if (argc != 4) {
RedisModule_WrongArity(ctx);
return REDISMODULE_OK;
}
RedisModuleStreamID startid, endid;
if (RedisModule_StringToStreamID(argv[2], &startid) != REDISMODULE_OK ||
RedisModule_StringToStreamID(argv[3], &endid) != REDISMODULE_OK) {
RedisModule_ReplyWithError(ctx, "Invalid stream ID");
return REDISMODULE_OK;
}
/* If startid > endid, we swap and set the reverse flag. */
int flags = 0;
if (startid.ms > endid.ms ||
(startid.ms == endid.ms && startid.seq > endid.seq)) {
RedisModuleStreamID tmp = startid;
startid = endid;
endid = tmp;
flags |= REDISMODULE_STREAM_ITERATOR_REVERSE;
}
/* Open key and start iterator. */
int openflags = REDISMODULE_READ | REDISMODULE_WRITE;
RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], openflags);
if (RedisModule_StreamIteratorStart(key, flags,
&startid, &endid) != REDISMODULE_OK) {
/* Key is not a stream, etc. */
RedisModule_ReplyWithError(ctx, "ERR StreamIteratorStart failed");
RedisModule_CloseKey(key);
return REDISMODULE_OK;
}
/* Check error handling: Delete current entry when no current entry. */
assert(RedisModule_StreamIteratorDelete(key) ==
REDISMODULE_ERR);
assert(errno == ENOENT);
/* Check error handling: Fetch fields when no current entry. */
assert(RedisModule_StreamIteratorNextField(key, NULL, NULL) ==
REDISMODULE_ERR);
assert(errno == ENOENT);
/* Return array. */
RedisModule_ReplyWithArray(ctx, REDISMODULE_POSTPONED_ARRAY_LEN);
RedisModule_AutoMemory(ctx);
RedisModuleStreamID id;
long numfields;
long len = 0;
while (RedisModule_StreamIteratorNextID(key, &id,
&numfields) == REDISMODULE_OK) {
RedisModule_ReplyWithArray(ctx, 2);
RedisModuleString *id_str = RedisModule_CreateStringFromStreamID(ctx, &id);
RedisModule_ReplyWithString(ctx, id_str);
RedisModule_ReplyWithArray(ctx, numfields * 2);
int delete = 0;
RedisModuleString *field, *value;
for (long i = 0; i < numfields; i++) {
assert(RedisModule_StreamIteratorNextField(key, &field, &value) ==
REDISMODULE_OK);
RedisModule_ReplyWithString(ctx, field);
RedisModule_ReplyWithString(ctx, value);
/* check if this is a "selfdestruct" field */
size_t field_len;
const char *field_str = RedisModule_StringPtrLen(field, &field_len);
if (!strncmp(field_str, "selfdestruct", field_len)) delete = 1;
}
if (delete) {
assert(RedisModule_StreamIteratorDelete(key) == REDISMODULE_OK);
}
/* check error handling: no more fields to fetch */
assert(RedisModule_StreamIteratorNextField(key, &field, &value) ==
REDISMODULE_ERR);
assert(errno == ENOENT);
len++;
}
RedisModule_ReplySetArrayLength(ctx, len);
RedisModule_StreamIteratorStop(key);
RedisModule_CloseKey(key);
return REDISMODULE_OK;
}
/*
* STREAM.TRIM key (MAXLEN (=|~) length | MINID (=|~) id)
*/
int stream_trim(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
if (argc != 5) {
RedisModule_WrongArity(ctx);
return REDISMODULE_OK;
}
/* Parse args */
int trim_by_id = 0; /* 0 = maxlen, 1 = minid */
long long maxlen;
RedisModuleStreamID minid;
size_t arg_len;
const char *arg = RedisModule_StringPtrLen(argv[2], &arg_len);
if (!strcasecmp(arg, "minid")) {
trim_by_id = 1;
if (RedisModule_StringToStreamID(argv[4], &minid) != REDISMODULE_OK) {
RedisModule_ReplyWithError(ctx, "ERR Invalid stream ID");
return REDISMODULE_OK;
}
} else if (!strcasecmp(arg, "maxlen")) {
if (RedisModule_StringToLongLong(argv[4], &maxlen) == REDISMODULE_ERR) {
RedisModule_ReplyWithError(ctx, "ERR Maxlen must be a number");
return REDISMODULE_OK;
}
} else {
RedisModule_ReplyWithError(ctx, "ERR Invalid arguments");
return REDISMODULE_OK;
}
/* Approx or exact */
int flags;
arg = RedisModule_StringPtrLen(argv[3], &arg_len);
if (arg_len == 1 && arg[0] == '~') {
flags = REDISMODULE_STREAM_TRIM_APPROX;
} else if (arg_len == 1 && arg[0] == '=') {
flags = 0;
} else {
RedisModule_ReplyWithError(ctx, "ERR Invalid approx-or-exact mark");
return REDISMODULE_OK;
}
/* Trim */
RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_WRITE);
long long trimmed;
if (trim_by_id) {
trimmed = RedisModule_StreamTrimByID(key, flags, &minid);
} else {
trimmed = RedisModule_StreamTrimByLength(key, flags, maxlen);
}
/* Return result */
if (trimmed < 0) {
RedisModule_ReplyWithError(ctx, "ERR Trimming failed");
} else {
RedisModule_ReplyWithLongLong(ctx, trimmed);
}
RedisModule_CloseKey(key);
return REDISMODULE_OK;
}
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc);
if (RedisModule_Init(ctx, "stream", 1, REDISMODULE_APIVER_1) == REDISMODULE_ERR)
return REDISMODULE_ERR;
if (RedisModule_CreateCommand(ctx, "stream.add", stream_add, "",
1, 1, 1) == REDISMODULE_ERR)
return REDISMODULE_ERR;
if (RedisModule_CreateCommand(ctx, "stream.addn", stream_addn, "",
1, 1, 1) == REDISMODULE_ERR)
return REDISMODULE_ERR;
if (RedisModule_CreateCommand(ctx, "stream.delete", stream_delete, "",
1, 1, 1) == REDISMODULE_ERR)
return REDISMODULE_ERR;
if (RedisModule_CreateCommand(ctx, "stream.range", stream_range, "",
1, 1, 1) == REDISMODULE_ERR)
return REDISMODULE_ERR;
if (RedisModule_CreateCommand(ctx, "stream.trim", stream_trim, "",
1, 1, 1) == REDISMODULE_ERR)
return REDISMODULE_ERR;
return REDISMODULE_OK;
}

View File

@ -0,0 +1,155 @@
set testmodule [file normalize tests/modules/stream.so]
start_server {tags {"modules"}} {
r module load $testmodule
test {Module stream add and delete} {
r del mystream
# add to empty key
set streamid1 [r stream.add mystream item 1 value a]
# add to existing stream
set streamid2 [r stream.add mystream item 2 value b]
# check result
assert { [string match "*-*" $streamid1] }
set items [r XRANGE mystream - +]
assert_equal $items \
"{$streamid1 {item 1 value a}} {$streamid2 {item 2 value b}}"
# delete one of them and try deleting non-existing ID
assert_equal OK [r stream.delete mystream $streamid1]
assert_error "ERR StreamDelete*" {r stream.delete mystream 123-456}
assert_error "Invalid stream ID*" {r stream.delete mystream foo}
assert_equal "{$streamid2 {item 2 value b}}" [r XRANGE mystream - +]
# check error condition: wrong type
r del mystream
r set mystream mystring
assert_error "ERR StreamAdd*" {r stream.add mystream item 1 value a}
assert_error "ERR StreamDelete*" {r stream.delete mystream 123-456}
}
test {Module stream add unblocks blocking xread} {
r del mystream
# Blocking XREAD on an empty key
set rd1 [redis_deferring_client]
$rd1 XREAD BLOCK 3000 STREAMS mystream $
# wait until client is actually blocked
wait_for_condition 50 100 {
[s 0 blocked_clients] eq {1}
} else {
fail "Client is not blocked"
}
set id [r stream.add mystream field 1 value a]
assert_equal "{mystream {{$id {field 1 value a}}}}" [$rd1 read]
# Blocking XREAD on an existing stream
set rd2 [redis_deferring_client]
$rd2 XREAD BLOCK 3000 STREAMS mystream $
# wait until client is actually blocked
wait_for_condition 50 100 {
[s 0 blocked_clients] eq {1}
} else {
fail "Client is not blocked"
}
set id [r stream.add mystream field 2 value b]
assert_equal "{mystream {{$id {field 2 value b}}}}" [$rd2 read]
}
test {Module stream add benchmark (1M stream add)} {
set n 1000000
r del mystream
set result [r stream.addn mystream $n field value]
assert_equal $result $n
}
test {Module stream iterator} {
r del mystream
set streamid1 [r xadd mystream * item 1 value a]
set streamid2 [r xadd mystream * item 2 value b]
# range result
set result1 [r stream.range mystream "-" "+"]
set expect1 [r xrange mystream "-" "+"]
assert_equal $result1 $expect1
# reverse range
set result_rev [r stream.range mystream "+" "-"]
set expect_rev [r xrevrange mystream "+" "-"]
assert_equal $result_rev $expect_rev
# only one item: range with startid = endid
set result2 [r stream.range mystream "-" $streamid1]
assert_equal $result2 "{$streamid1 {item 1 value a}}"
assert_equal $result2 [list [list $streamid1 {item 1 value a}]]
# only one item: range with startid = endid
set result3 [r stream.range mystream $streamid2 $streamid2]
assert_equal $result3 "{$streamid2 {item 2 value b}}"
assert_equal $result3 [list [list $streamid2 {item 2 value b}]]
}
test {Module stream iterator delete} {
r del mystream
set id1 [r xadd mystream * normal item]
set id2 [r xadd mystream * selfdestruct yes]
set id3 [r xadd mystream * another item]
# stream.range deletes the "selfdestruct" item after returning it
assert_equal \
"{$id1 {normal item}} {$id2 {selfdestruct yes}} {$id3 {another item}}" \
[r stream.range mystream - +]
# now, the "selfdestruct" item is gone
assert_equal \
"{$id1 {normal item}} {$id3 {another item}}" \
[r stream.range mystream - +]
}
test {Module stream trim by length} {
r del mystream
# exact maxlen
r xadd mystream * item 1 value a
r xadd mystream * item 2 value b
r xadd mystream * item 3 value c
assert_equal 3 [r xlen mystream]
assert_equal 0 [r stream.trim mystream maxlen = 5]
assert_equal 3 [r xlen mystream]
assert_equal 2 [r stream.trim mystream maxlen = 1]
assert_equal 1 [r xlen mystream]
assert_equal 1 [r stream.trim mystream maxlen = 0]
# check that there is no limit for exact maxlen
r stream.addn mystream 20000 item x value y
assert_equal 20000 [r stream.trim mystream maxlen = 0]
# approx maxlen (100 items per node implies default limit 10K items)
r stream.addn mystream 20000 item x value y
assert_equal 20000 [r xlen mystream]
assert_equal 10000 [r stream.trim mystream maxlen ~ 2]
assert_equal 9900 [r stream.trim mystream maxlen ~ 2]
assert_equal 0 [r stream.trim mystream maxlen ~ 2]
assert_equal 100 [r xlen mystream]
assert_equal 100 [r stream.trim mystream maxlen ~ 0]
assert_equal 0 [r xlen mystream]
}
test {Module stream trim by ID} {
r del mystream
# exact minid
r xadd mystream * item 1 value a
r xadd mystream * item 2 value b
set minid [r xadd mystream * item 3 value c]
assert_equal 3 [r xlen mystream]
assert_equal 0 [r stream.trim mystream minid = -]
assert_equal 3 [r xlen mystream]
assert_equal 2 [r stream.trim mystream minid = $minid]
assert_equal 1 [r xlen mystream]
assert_equal 1 [r stream.trim mystream minid = +]
# check that there is no limit for exact minid
r stream.addn mystream 20000 item x value y
assert_equal 20000 [r stream.trim mystream minid = +]
# approx minid (100 items per node implies default limit 10K items)
r stream.addn mystream 19980 item x value y
set minid [r xadd mystream * item x value y]
r stream.addn mystream 19 item x value y
assert_equal 20000 [r xlen mystream]
assert_equal 10000 [r stream.trim mystream minid ~ $minid]
assert_equal 9900 [r stream.trim mystream minid ~ $minid]
assert_equal 0 [r stream.trim mystream minid ~ $minid]
assert_equal 100 [r xlen mystream]
assert_equal 100 [r stream.trim mystream minid ~ +]
assert_equal 0 [r xlen mystream]
}
}