Mem efficiency, make full use of client struct memory for reply buffers (#8968)

When we allocate a client struct with 16k reply buffer, the allocator we may give us 20K,
This commit makes use of that extra space.
Additionally, it tries to store whatever it can from the reply into the static 'buf' before
allocating a new node for the reply list.
This commit is contained in:
Wang Yuan 2021-06-08 18:40:12 +08:00 committed by GitHub
parent 119121c739
commit c396fd91a0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 45 additions and 32 deletions

View File

@ -304,8 +304,8 @@ struct client {
redisDb *db;
int flags;
list *reply;
char buf[PROTO_REPLY_CHUNK_BYTES];
// ... many other fields ...
char buf[PROTO_REPLY_CHUNK_BYTES];
}
```
The client structure defines a *connected client*:

View File

@ -651,6 +651,7 @@ struct client *createAOFClient(void) {
c->original_argv = NULL;
c->argv_len_sum = 0;
c->bufpos = 0;
c->buf_usable_size = zmalloc_usable_size(c)-offsetof(client,buf);
/*
* The AOF client should never be blocked (unlike master

View File

@ -131,6 +131,7 @@ client *createClient(connection *conn) {
c->conn = conn;
c->name = NULL;
c->bufpos = 0;
c->buf_usable_size = zmalloc_usable_size(c)-offsetof(client,buf);
c->qb_pos = 0;
c->querybuf = sdsempty();
c->pending_querybuf = sdsempty();
@ -279,30 +280,23 @@ int prepareClientToWrite(client *c) {
* -------------------------------------------------------------------------- */
/* Attempts to add the reply to the static buffer in the client struct.
* Returns C_ERR if the buffer is full, or the reply list is not empty,
* in which case the reply must be added to the reply list. */
int _addReplyToBuffer(client *c, const char *s, size_t len) {
size_t available = sizeof(c->buf)-c->bufpos;
if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return C_OK;
* Returns the length of data that is added to the reply buffer. */
size_t _addReplyToBuffer(client *c, const char *s, size_t len) {
size_t available = c->buf_usable_size - c->bufpos;
/* If there already are entries in the reply list, we cannot
* add anything more to the static buffer. */
if (listLength(c->reply) > 0) return C_ERR;
if (listLength(c->reply) > 0) return 0;
/* Check that the buffer has enough space available for this string. */
if (len > available) return C_ERR;
memcpy(c->buf+c->bufpos,s,len);
c->bufpos+=len;
return C_OK;
size_t reply_len = len > available ? available : len;
memcpy(c->buf+c->bufpos,s,reply_len);
c->bufpos+=reply_len;
return reply_len;
}
/* Adds the reply to the reply linked list.
* Note: some edits to this function need to be relayed to AddReplyFromClient. */
void _addReplyProtoToList(client *c, const char *s, size_t len) {
if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return;
listNode *ln = listLast(c->reply);
clientReplyBlock *tail = ln? listNodeValue(ln): NULL;
@ -324,10 +318,11 @@ void _addReplyProtoToList(client *c, const char *s, size_t len) {
if (len) {
/* Create a new node, make sure it is allocated to at
* least PROTO_REPLY_CHUNK_BYTES */
size_t usable_size;
size_t size = len < PROTO_REPLY_CHUNK_BYTES? PROTO_REPLY_CHUNK_BYTES: len;
tail = zmalloc(size + sizeof(clientReplyBlock));
tail = zmalloc_usable(size + sizeof(clientReplyBlock), &usable_size);
/* take over the allocation's internal fragmentation */
tail->size = zmalloc_usable_size(tail) - sizeof(clientReplyBlock);
tail->size = usable_size - sizeof(clientReplyBlock);
tail->used = len;
memcpy(tail->buf, s, len);
listAddNodeTail(c->reply, tail);
@ -337,6 +332,13 @@ void _addReplyProtoToList(client *c, const char *s, size_t len) {
}
}
void _addReplyToBufferOrList(client *c, const char *s, size_t len) {
if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return;
size_t reply_len = _addReplyToBuffer(c,s,len);
if (len > reply_len) _addReplyProtoToList(c,s+reply_len,len-reply_len);
}
/* -----------------------------------------------------------------------------
* Higher level functions to queue data on the client output buffer.
* The following functions are the ones that commands implementations will call.
@ -347,16 +349,14 @@ void addReply(client *c, robj *obj) {
if (prepareClientToWrite(c) != C_OK) return;
if (sdsEncodedObject(obj)) {
if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
_addReplyProtoToList(c,obj->ptr,sdslen(obj->ptr));
_addReplyToBufferOrList(c,obj->ptr,sdslen(obj->ptr));
} else if (obj->encoding == OBJ_ENCODING_INT) {
/* For integer encoded strings we just convert it into a string
* using our optimized function, and attach the resulting string
* to the output buffer. */
char buf[32];
size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr);
if (_addReplyToBuffer(c,buf,len) != C_OK)
_addReplyProtoToList(c,buf,len);
_addReplyToBufferOrList(c,buf,len);
} else {
serverPanic("Wrong obj->encoding in addReply()");
}
@ -370,8 +370,7 @@ void addReplySds(client *c, sds s) {
sdsfree(s);
return;
}
if (_addReplyToBuffer(c,s,sdslen(s)) != C_OK)
_addReplyProtoToList(c,s,sdslen(s));
_addReplyToBufferOrList(c,s,sdslen(s));
sdsfree(s);
}
@ -385,8 +384,7 @@ void addReplySds(client *c, sds s) {
* in the list of objects. */
void addReplyProto(client *c, const char *s, size_t len) {
if (prepareClientToWrite(c) != C_OK) return;
if (_addReplyToBuffer(c,s,len) != C_OK)
_addReplyProtoToList(c,s,len);
_addReplyToBufferOrList(c,s,len);
}
/* Low level function called by the addReplyError...() functions.
@ -956,12 +954,20 @@ void AddReplyFromClient(client *dst, client *src) {
* The function takes care of freeing the old output buffers of the
* destination client. */
void copyClientOutputBuffer(client *dst, client *src) {
listRelease(dst->reply);
listEmpty(dst->reply);
dst->sentlen = 0;
dst->reply = listDup(src->reply);
memcpy(dst->buf,src->buf,src->bufpos);
dst->bufpos = src->bufpos;
dst->reply_bytes = src->reply_bytes;
dst->bufpos = 0;
dst->reply_bytes = 0;
/* First copy src static buffer into dst (either static buffer or reply
* list, maybe clients have different 'usable_buffer_size'). */
_addReplyToBufferOrList(dst,src->buf,src->bufpos);
/* Copy src reply list into the dest. */
list* reply = listDup(src->reply);
listJoin(dst->reply,reply);
dst->reply_bytes += src->reply_bytes;
listRelease(reply);
}
/* Return true if the specified client has pending reply buffers to write to

View File

@ -748,7 +748,7 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) {
/* Convert the result of the Redis command into a suitable Lua type.
* The first thing we need is to create a single string from the client
* output buffers. */
if (listLength(c->reply) == 0 && c->bufpos < PROTO_REPLY_CHUNK_BYTES) {
if (listLength(c->reply) == 0 && (size_t)c->bufpos < c->buf_usable_size) {
/* This is a fast path for the common case of a reply inside the
* client static buffer. Don't create an SDS string but just use
* the client buffer directly. */

View File

@ -969,6 +969,12 @@ typedef struct client {
int client_cron_last_memory_type;
/* Response buffer */
int bufpos;
size_t buf_usable_size; /* Usable size of buffer. */
/* Note that 'buf' must be the last field of client struct, because memory
* allocator may give us more memory than our apply for reducing fragments,
* but we want to make full use of given memory, i.e. we may access the
* memory after 'buf'. To avoid make others fields corrupt, 'buf' must be
* the last one. */
char buf[PROTO_REPLY_CHUNK_BYTES];
} client;