optimize(remove) usage of client's pending_querybuf (#10413)

To remove `pending_querybuf`, the key point is reusing `querybuf`, it means master client's `querybuf` is not only used to parse command, but also proxy to sub-replicas.

1. add a new variable `repl_applied` for master client to record how many data applied (propagated via `replicationFeedStreamFromMasterStream()`) but not trimmed in `querybuf`.

2. don't sdsrange `querybuf` in `commandProcessed()`, we trim it to `repl_applied` after the whole replication pipeline processed to avoid fragmented `sdsrange`. And here are some scenarios we cannot trim to `qb_pos`:
    * we don't receive complete command from master
    * master client blocked because of client pause
    * IO threads operate read, master client flagged with CLIENT_PENDING_COMMAND

    In these scenarios, `qb_pos` points to the part of the current command or the beginning of next command, and the current command is not applied yet, so the `repl_applied` is not equal to `qb_pos`.

Some other notes:
* Do not do big arg optimization on master client, since we can only sdsrange `querybuf` after data sent to replicas.
* Set `qb_pos` and `repl_applied` to 0 when `freeClient` in `replicationCacheMaster`.
* Rewrite `processPendingCommandsAndResetClient` to `processPendingCommandAndInputBuffer`, let `processInputBuffer` to be called successively after `processCommandAndResetClient`.
This commit is contained in:
zhaozhao.zz 2022-03-25 10:45:40 +08:00 committed by GitHub
parent 1a57af629c
commit 78bef6e1fe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 58 additions and 61 deletions

View File

@ -141,12 +141,7 @@ void processUnblockedClients(void) {
* the code is conceptually more correct this way. */
if (!(c->flags & CLIENT_BLOCKED)) {
/* If we have a queued command, execute it now. */
if (processPendingCommandsAndResetClient(c) == C_OK) {
/* Now process client if it has more data in it's buffer. */
if (c->querybuf && sdslen(c->querybuf) > 0) {
if (processInputBuffer(c) == C_ERR) c = NULL;
}
} else {
if (processPendingCommandAndInputBuffer(c) == C_ERR) {
c = NULL;
}
}

View File

@ -147,7 +147,6 @@ client *createClient(connection *conn) {
c->ref_block_pos = 0;
c->qb_pos = 0;
c->querybuf = sdsempty();
c->pending_querybuf = sdsempty();
c->querybuf_peak = 0;
c->reqtype = 0;
c->argc = 0;
@ -167,6 +166,7 @@ client *createClient(connection *conn) {
c->repl_start_cmd_stream_on_ack = 0;
c->reploff = 0;
c->read_reploff = 0;
c->repl_applied = 0;
c->repl_ack_off = 0;
c->repl_ack_time = 0;
c->repl_last_partial_write = 0;
@ -1568,7 +1568,6 @@ void freeClient(client *c) {
/* Free the query buffer */
sdsfree(c->querybuf);
sdsfree(c->pending_querybuf);
c->querybuf = NULL;
/* Deallocate structures used to block on blocking ops. */
@ -2296,8 +2295,12 @@ int processMultibulkBuffer(client *c) {
}
c->qb_pos = newline-c->querybuf+2;
if (ll >= PROTO_MBULK_BIG_ARG) {
/* If we are going to read a large object from network
if (!(c->flags & CLIENT_MASTER) && ll >= PROTO_MBULK_BIG_ARG) {
/* When the client is not a master client (because master
* client's querybuf can only be trimmed after data applied
* and sent to replicas).
*
* If we are going to read a large object from network
* try to make it likely that it will start at c->querybuf
* boundary so that we can optimize object creation
* avoiding a large copy of data.
@ -2328,10 +2331,11 @@ int processMultibulkBuffer(client *c) {
c->argv = zrealloc(c->argv, sizeof(robj*)*c->argv_len);
}
/* Optimization: if the buffer contains JUST our bulk element
/* Optimization: if a non-master client's buffer contains JUST our bulk element
* instead of creating a new object by *copying* the sds we
* just use the current sds string. */
if (c->qb_pos == 0 &&
if (!(c->flags & CLIENT_MASTER) &&
c->qb_pos == 0 &&
c->bulklen >= PROTO_MBULK_BIG_ARG &&
sdslen(c->querybuf) == (size_t)(c->bulklen+2))
{
@ -2392,8 +2396,8 @@ void commandProcessed(client *c) {
if (c->flags & CLIENT_MASTER) {
long long applied = c->reploff - prev_offset;
if (applied) {
replicationFeedStreamFromMasterStream(c->pending_querybuf,applied);
sdsrange(c->pending_querybuf,applied,-1);
replicationFeedStreamFromMasterStream(c->querybuf+c->repl_applied,applied);
c->repl_applied += applied;
}
}
}
@ -2436,13 +2440,22 @@ int processCommandAndResetClient(client *c) {
/* This function will execute any fully parsed commands pending on
* the client. Returns C_ERR if the client is no longer valid after executing
* the command, and C_OK for all other cases. */
int processPendingCommandsAndResetClient(client *c) {
int processPendingCommandAndInputBuffer(client *c) {
if (c->flags & CLIENT_PENDING_COMMAND) {
c->flags &= ~CLIENT_PENDING_COMMAND;
if (processCommandAndResetClient(c) == C_ERR) {
return C_ERR;
}
}
/* Now process client if it has more data in it's buffer.
*
* Note: when a master client steps into this function,
* it can always satisfy this condition, because its querbuf
* contains data not applied. */
if (c->querybuf && sdslen(c->querybuf) > 0) {
return processInputBuffer(c);
}
return C_OK;
}
@ -2514,8 +2527,26 @@ int processInputBuffer(client *c) {
}
}
/* Trim to pos */
if (c->qb_pos) {
if (c->flags & CLIENT_MASTER) {
/* If the client is a master, trim the querybuf to repl_applied,
* since master client is very special, its querybuf not only
* used to parse command, but also proxy to sub-replicas.
*
* Here are some scenarios we cannot trim to qb_pos:
* 1. we don't receive complete command from master
* 2. master client blocked cause of client pause
* 3. io threads operate read, master client flagged with CLIENT_PENDING_COMMAND
*
* In these scenarios, qb_pos points to the part of the current command
* or the beginning of next command, and the current command is not applied yet,
* so the repl_applied is not equal to qb_pos. */
if (c->repl_applied) {
sdsrange(c->querybuf,c->repl_applied,-1);
c->qb_pos -= c->repl_applied;
c->repl_applied = 0;
}
} else if (c->qb_pos) {
/* Trim to pos */
sdsrange(c->querybuf,c->qb_pos,-1);
c->qb_pos = 0;
}
@ -2551,16 +2582,22 @@ void readQueryFromClient(connection *conn) {
if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
&& c->bulklen >= PROTO_MBULK_BIG_ARG)
{
ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf);
ssize_t remaining = (size_t)(c->bulklen+2)-(sdslen(c->querybuf)-c->qb_pos);
big_arg = 1;
/* Note that the 'remaining' variable may be zero in some edge case,
* for example once we resume a blocked client after CLIENT PAUSE. */
if (remaining > 0) readlen = remaining;
/* Master client needs expand the readlen when meet BIG_ARG(see #9100),
* but doesn't need align to the next arg, we can read more data. */
if (c->flags & CLIENT_MASTER && readlen < PROTO_IOBUF_LEN)
readlen = PROTO_IOBUF_LEN;
}
qblen = sdslen(c->querybuf);
if (big_arg || sdsalloc(c->querybuf) < PROTO_IOBUF_LEN) {
if (!(c->flags & CLIENT_MASTER) && // master client's querybuf can grow greedy.
(big_arg || sdsalloc(c->querybuf) < PROTO_IOBUF_LEN)) {
/* When reading a BIG_ARG we won't be reading more than that one arg
* into the query buffer, so we don't need to pre-allocate more than we
* need, so using the non-greedy growing. For an initial allocation of
@ -2590,12 +2627,6 @@ void readQueryFromClient(connection *conn) {
}
freeClientAsync(c);
goto done;
} else if (c->flags & CLIENT_MASTER) {
/* Append the query buffer to the pending (not applied) buffer
* of the master. We'll use this buffer later in order to have a
* copy of the string applied by the last command executed. */
c->pending_querybuf = sdscatlen(c->pending_querybuf,
c->querybuf+qblen,nread);
}
sdsIncrLen(c->querybuf,nread);
@ -4288,14 +4319,7 @@ int handleClientsWithPendingReadsUsingThreads(void) {
/* Once io-threads are idle we can update the client in the mem usage */
updateClientMemUsage(c);
if (processPendingCommandsAndResetClient(c) == C_ERR) {
/* If the client is no longer valid, we avoid
* processing the client later. So we just go
* to the next. */
continue;
}
if (processInputBuffer(c) == C_ERR) {
if (processPendingCommandAndInputBuffer(c) == C_ERR) {
/* If the client is no longer valid, we avoid
* processing the client later. So we just go
* to the next. */

View File

@ -3197,7 +3197,8 @@ void replicationCacheMaster(client *c) {
* offsets, including pending transactions, already populated arguments,
* pending outputs to the master. */
sdsclear(server.master->querybuf);
sdsclear(server.master->pending_querybuf);
server.master->qb_pos = 0;
server.master->repl_applied = 0;
server.master->read_reploff = server.master->reploff;
if (c->flags & CLIENT_MULTI) discardTransaction(c);
listEmpty(c->reply);

View File

@ -710,23 +710,6 @@ int clientsCronResizeQueryBuffer(client *c) {
* which ever is bigger. */
if (c->bulklen != -1 && (size_t)c->bulklen > c->querybuf_peak)
c->querybuf_peak = c->bulklen;
/* Clients representing masters also use a "pending query buffer" that
* is the yet not applied part of the stream we are reading. Such buffer
* also needs resizing from time to time, otherwise after a very large
* transfer (a huge value or a big MIGRATE operation) it will keep using
* a lot of memory. */
if (c->flags & CLIENT_MASTER) {
/* There are two conditions to resize the pending query buffer:
* 1) Pending Query buffer is > LIMIT_PENDING_QUERYBUF.
* 2) Used length is smaller than pending_querybuf_size/2 */
size_t pending_querybuf_size = sdsAllocSize(c->pending_querybuf);
if(pending_querybuf_size > LIMIT_PENDING_QUERYBUF &&
sdslen(c->pending_querybuf) < (pending_querybuf_size/2))
{
c->pending_querybuf = sdsRemoveFreeSpace(c->pending_querybuf);
}
}
return 0;
}
@ -6418,7 +6401,6 @@ void dismissClientMemory(client *c) {
/* Dismiss client query buffer and static reply buffer. */
dismissMemory(c->buf, c->buf_usable_size);
dismissSds(c->querybuf);
dismissSds(c->pending_querybuf);
/* Dismiss argv array only if we estimate it contains a big buffer. */
if (c->argc && c->argv_len_sum/c->argc >= server.page_size) {
for (int i = 0; i < c->argc; i++) {

View File

@ -166,8 +166,6 @@ typedef long long ustime_t; /* microsecond time type. */
#define PROTO_REPLY_MIN_BYTES (1024) /* the lower limit on reply buffer size */
#define REDIS_AUTOSYNC_BYTES (1024*1024*4) /* Sync file every 4MB. */
#define LIMIT_PENDING_QUERYBUF (4*1024*1024) /* 4mb */
#define REPLY_BUFFER_DEFAULT_PEAK_RESET_TIME 5000 /* 5 seconds */
/* When configuring the server eventloop, we setup it so that the total number
@ -1082,10 +1080,6 @@ typedef struct client {
robj *name; /* As set by CLIENT SETNAME. */
sds querybuf; /* Buffer we use to accumulate client queries. */
size_t qb_pos; /* The position we have read in querybuf. */
sds pending_querybuf; /* If this client is flagged as master, this buffer
represents the yet not applied portion of the
replication stream that we are receiving from
the master. */
size_t querybuf_peak; /* Recent (100ms or more) peak of querybuf size. */
int argc; /* Num of arguments of current command. */
robj **argv; /* Arguments of current command. */
@ -1122,6 +1116,7 @@ typedef struct client {
sds replpreamble; /* Replication DB preamble. */
long long read_reploff; /* Read replication offset if this is a master. */
long long reploff; /* Applied replication offset if this is a master. */
long long repl_applied; /* Applied replication data count in querybuf, if this is a replica. */
long long repl_ack_off; /* Replication ack offset, if this is a slave. */
long long repl_ack_time;/* Replication ack time, if this is a slave. */
long long repl_last_partial_write; /* The last time the server did a partial write from the RDB child pipe to this replica */
@ -2848,7 +2843,7 @@ int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *lev
size_t freeMemoryGetNotCountedMemory();
int overMaxmemoryAfterAlloc(size_t moremem);
int processCommand(client *c);
int processPendingCommandsAndResetClient(client *c);
int processPendingCommandAndInputBuffer(client *c);
void setupSignalHandlers(void);
void removeSignalHandlers(void);
int createSocketAcceptHandler(socketFds *sfd, aeFileProc *accept_handler);

View File

@ -122,7 +122,7 @@ proc wait_replica_online r {
wait_for_condition 50 100 {
[string match "*slave0:*,state=online*" [$r info replication]]
} else {
fail "replica didn't sync in time"
fail "replica didn't online in time"
}
}
@ -130,7 +130,7 @@ proc wait_for_ofs_sync {r1 r2} {
wait_for_condition 50 100 {
[status $r1 master_repl_offset] eq [status $r2 master_repl_offset]
} else {
fail "replica didn't sync in time"
fail "replica offset didn't match in time"
}
}