Optimize listpack for stream usage to avoid repeated reallocs (#6281)

Avoid repeated reallocs growing the listpack while entries are being added.
This is done by pre-allocating the listpack to near maximum size, and using
malloc_size to check if it needs realloc or not.
When the listpack reaches the maximum number of entries, we shrink it to fit it's used size.

Co-authored-by: Viktor Söderqvist <viktor@zuiderkwast.se>
Co-authored-by: Oran Agra <oran@redislabs.com>
This commit is contained in:
yihuang 2021-02-16 22:17:38 +08:00 committed by GitHub
parent fd052d2a86
commit aab479f8cf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 44 additions and 8 deletions

View File

@ -219,9 +219,12 @@ int lpStringToInt64(const char *s, unsigned long slen, int64_t *value) {
}
/* Create a new, empty listpack.
* On success the new listpack is returned, otherwise an error is returned. */
unsigned char *lpNew(void) {
unsigned char *lp = lp_malloc(LP_HDR_SIZE+1);
* On success the new listpack is returned, otherwise an error is returned.
* Pre-allocate at least `capacity` bytes of memory,
* over-allocated memory can be shrinked by `lpShrinkToFit`.
* */
unsigned char *lpNew(size_t capacity) {
unsigned char *lp = lp_malloc(capacity > LP_HDR_SIZE+1 ? capacity : LP_HDR_SIZE+1);
if (lp == NULL) return NULL;
lpSetTotalBytes(lp,LP_HDR_SIZE+1);
lpSetNumElements(lp,0);
@ -234,6 +237,16 @@ void lpFree(unsigned char *lp) {
lp_free(lp);
}
/* Shrink the memory to fit. */
unsigned char* lpShrinkToFit(unsigned char *lp) {
size_t size = lpGetTotalBytes(lp);
if (size < lp_malloc_size(lp)) {
return lp_realloc(lp, size);
} else {
return lp;
}
}
/* Given an element 'ele' of size 'size', determine if the element can be
* represented inside the listpack encoded as integer, and returns
* LP_ENCODING_INT if so. Otherwise returns LP_ENCODING_STR if no integer
@ -702,7 +715,8 @@ unsigned char *lpInsert(unsigned char *lp, unsigned char *ele, uint32_t size, un
unsigned char *dst = lp + poff; /* May be updated after reallocation. */
/* Realloc before: we need more room. */
if (new_listpack_bytes > old_listpack_bytes) {
if (new_listpack_bytes > old_listpack_bytes &&
new_listpack_bytes > lp_malloc_size(lp)) {
if ((lp = lp_realloc(lp,new_listpack_bytes)) == NULL) return NULL;
dst = lp + poff;
}

View File

@ -35,6 +35,7 @@
#ifndef __LISTPACK_H
#define __LISTPACK_H
#include <stdlib.h>
#include <stdint.h>
#define LP_INTBUF_SIZE 21 /* 20 digits of -2^63 + 1 null term = 21. */
@ -44,8 +45,9 @@
#define LP_AFTER 1
#define LP_REPLACE 2
unsigned char *lpNew(void);
unsigned char *lpNew(size_t capacity);
void lpFree(unsigned char *lp);
unsigned char* lpShrinkToFit(unsigned char *lp);
unsigned char *lpInsert(unsigned char *lp, unsigned char *ele, uint32_t size, unsigned char *p, int where, unsigned char **newp);
unsigned char *lpAppend(unsigned char *lp, unsigned char *ele, uint32_t size);
unsigned char *lpDelete(unsigned char *lp, unsigned char *p, unsigned char **newp);

View File

@ -42,4 +42,5 @@
#define lp_malloc zmalloc
#define lp_realloc zrealloc
#define lp_free zfree
#define lp_malloc_size zmalloc_usable_size
#endif

View File

@ -43,6 +43,10 @@
* avoid malloc allocation.*/
#define STREAMID_STATIC_VECTOR_LEN 8
/* Max pre-allocation for listpack. This is done to avoid abuse of a user
* setting stream_node_max_bytes to a huge number. */
#define STREAM_LISTPACK_MAX_PRE_ALLOCATE 4096
void streamFreeCG(streamCG *cg);
void streamFreeNACK(streamNACK *na);
size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamConsumer *consumer);
@ -509,7 +513,13 @@ int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_
lp = NULL;
} else if (server.stream_node_max_entries) {
int64_t count = lpGetInteger(lpFirst(lp));
if (count >= server.stream_node_max_entries) lp = NULL;
if (count >= server.stream_node_max_entries) {
/* Shrink extra pre-allocated memory */
lp = lpShrinkToFit(lp);
if (ri.data != lp)
raxInsert(s->rax,ri.key,ri.key_len,lp,NULL);
lp = NULL;
}
}
}
@ -517,8 +527,17 @@ int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_
if (lp == NULL) {
master_id = id;
streamEncodeID(rax_key,&id);
/* Create the listpack having the master entry ID and fields. */
lp = lpNew();
/* Create the listpack having the master entry ID and fields.
* Pre-allocate some bytes when creating listpack to avoid realloc on
* every XADD. Since listpack.c uses malloc_size, it'll grow in steps,
* and won't realloc on every XADD.
* When listpack reaches max number of entries, we'll shrink the
* allocation to fit the data. */
size_t prealloc = STREAM_LISTPACK_MAX_PRE_ALLOCATE;
if (server.stream_node_max_bytes > 0 && server.stream_node_max_bytes < prealloc) {
prealloc = server.stream_node_max_bytes;
}
lp = lpNew(prealloc);
lp = lpAppendInteger(lp,1); /* One item, the one we are adding. */
lp = lpAppendInteger(lp,0); /* Zero deleted so far. */
lp = lpAppendInteger(lp,numfields);