refactor rewriteStreamObject code for adding missing streamIteratorStop call (#7829)

This commit adds streamIteratorStop call in rewriteStreamObject function in some of the return statement. Although currently this will not cause memory leak since stream id is only 16 bytes long.
This commit is contained in:
Wen Hui 2020-09-22 02:05:47 -04:00 committed by GitHub
parent 9216b96b41
commit 23b50bcccc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 36 additions and 18 deletions

View File

@ -1201,16 +1201,24 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) {
* the ID, the second is an array of field-value pairs. */
/* Emit the XADD <key> <id> ...fields... command. */
if (rioWriteBulkCount(r,'*',3+numfields*2) == 0) return 0;
if (rioWriteBulkString(r,"XADD",4) == 0) return 0;
if (rioWriteBulkObject(r,key) == 0) return 0;
if (rioWriteBulkStreamID(r,&id) == 0) return 0;
if (!rioWriteBulkCount(r,'*',3+numfields*2) ||
!rioWriteBulkString(r,"XADD",4) ||
!rioWriteBulkObject(r,key) ||
!rioWriteBulkStreamID(r,&id))
{
streamIteratorStop(&si);
return 0;
}
while(numfields--) {
unsigned char *field, *value;
int64_t field_len, value_len;
streamIteratorGetField(&si,&field,&value,&field_len,&value_len);
if (rioWriteBulkString(r,(char*)field,field_len) == 0) return 0;
if (rioWriteBulkString(r,(char*)value,value_len) == 0) return 0;
if (!rioWriteBulkString(r,(char*)field,field_len) ||
!rioWriteBulkString(r,(char*)value,value_len))
{
streamIteratorStop(&si);
return 0;
}
}
}
} else {
@ -1218,22 +1226,30 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) {
* the key we are serializing is an empty string, which is possible
* for the Stream type. */
id.ms = 0; id.seq = 1;
if (rioWriteBulkCount(r,'*',7) == 0) return 0;
if (rioWriteBulkString(r,"XADD",4) == 0) return 0;
if (rioWriteBulkObject(r,key) == 0) return 0;
if (rioWriteBulkString(r,"MAXLEN",6) == 0) return 0;
if (rioWriteBulkString(r,"0",1) == 0) return 0;
if (rioWriteBulkStreamID(r,&id) == 0) return 0;
if (rioWriteBulkString(r,"x",1) == 0) return 0;
if (rioWriteBulkString(r,"y",1) == 0) return 0;
if (!rioWriteBulkCount(r,'*',7) ||
!rioWriteBulkString(r,"XADD",4) ||
!rioWriteBulkObject(r,key) ||
!rioWriteBulkString(r,"MAXLEN",6) ||
!rioWriteBulkString(r,"0",1) ||
!rioWriteBulkStreamID(r,&id) ||
!rioWriteBulkString(r,"x",1) ||
!rioWriteBulkString(r,"y",1))
{
streamIteratorStop(&si);
return 0;
}
}
/* Append XSETID after XADD, make sure lastid is correct,
* in case of XDEL lastid. */
if (rioWriteBulkCount(r,'*',3) == 0) return 0;
if (rioWriteBulkString(r,"XSETID",6) == 0) return 0;
if (rioWriteBulkObject(r,key) == 0) return 0;
if (rioWriteBulkStreamID(r,&s->last_id) == 0) return 0;
if (!rioWriteBulkCount(r,'*',3) ||
!rioWriteBulkString(r,"XSETID",6) ||
!rioWriteBulkObject(r,key) ||
!rioWriteBulkStreamID(r,&s->last_id))
{
streamIteratorStop(&si);
return 0;
}
/* Create all the stream consumer groups. */
@ -1252,6 +1268,7 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) {
!rioWriteBulkStreamID(r,&group->last_id))
{
raxStop(&ri);
streamIteratorStop(&si);
return 0;
}
@ -1277,6 +1294,7 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) {
raxStop(&ri_pel);
raxStop(&ri_cons);
raxStop(&ri);
streamIteratorStop(&si);
return 0;
}
}