Replication backlog and replicas use one global shared replication buffer (#9166)

## Background
For redis master, one replica uses one copy of replication buffer, that is a big waste of memory,
more replicas more waste, and allocate/free memory for every reply list also cost much.
If we set client-output-buffer-limit small and write traffic is heavy, master may disconnect with
replicas and can't finish synchronization with replica. If we set  client-output-buffer-limit big,
master may be OOM when there are many replicas that separately keep much memory.
Because replication buffers of different replica client are the same, one simple idea is that
all replicas only use one replication buffer, that will effectively save memory.

Since replication backlog content is the same as replicas' output buffer, now we
can discard replication backlog memory and use global shared replication buffer
to implement replication backlog mechanism.

## Implementation
I create one global "replication buffer" which contains content of replication stream.
The structure of "replication buffer" is similar to the reply list that exists in every client.
But the node of list is `replBufBlock`, which has `id, repl_offset, refcount` fields.
```c
/* Replication buffer blocks is the list of replBufBlock.
 *
 * +--------------+       +--------------+       +--------------+
 * | refcount = 1 |  ...  | refcount = 0 |  ...  | refcount = 2 |
 * +--------------+       +--------------+       +--------------+
 *      |                                            /       \
 *      |                                           /         \
 *      |                                          /           \
 *  Repl Backlog                               Replia_A      Replia_B
 * 
 * Each replica or replication backlog increments only the refcount of the
 * 'ref_repl_buf_node' which it points to. So when replica walks to the next
 * node, it should first increase the next node's refcount, and when we trim
 * the replication buffer nodes, we remove node always from the head node which
 * refcount is 0. If the refcount of the head node is not 0, we must stop
 * trimming and never iterate the next node. */

/* Similar with 'clientReplyBlock', it is used for shared buffers between
 * all replica clients and replication backlog. */
typedef struct replBufBlock {
    int refcount;           /* Number of replicas or repl backlog using. */
    long long id;           /* The unique incremental number. */
    long long repl_offset;  /* Start replication offset of the block. */
    size_t size, used;
    char buf[];
} replBufBlock;
```
So now when we feed replication stream into replication backlog and all replicas, we only need
to feed stream into replication buffer `feedReplicationBuffer`. In this function, we set some fields of
replication backlog and replicas to references of the global replication buffer blocks. And we also
need to check replicas' output buffer limit to free if exceeding `client-output-buffer-limit`, and trim
replication backlog if exceeding `repl-backlog-size`.

When sending reply to replicas, we also need to iterate replication buffer blocks and send its
content, when totally sending one block for replica, we decrease current node count and
increase the next current node count, and then free the block which reference is 0 from the
head of replication buffer blocks.

Since now we use linked list to manage replication backlog, it may cost much time for iterating
all linked list nodes to find corresponding replication buffer node. So we create a rax tree to
store some nodes  for index, but to avoid rax tree occupying too much memory, i record
one per 64 nodes for index.

Currently, to make partial resynchronization as possible as much, we always let replication
backlog as the last reference of replication buffer blocks, backlog size may exceeds our setting
if slow replicas that reference vast replication buffer blocks, and this method doesn't increase
memory usage since they share replication buffer. To avoid freezing server for freeing unreferenced
replication buffer blocks when we need to trim backlog for exceeding backlog size setting,
we trim backlog incrementally (free 64 blocks per call now), and make it faster in
`beforeSleep` (free 640 blocks).

### Other changes
- `mem_total_replication_buffers`: we add this field in INFO command, it means the total
  memory of replication buffers used.
- `mem_clients_slaves`:  now even replica is slow to replicate, and its output buffer memory
  is not 0, but it still may be 0, since replication backlog and replicas share one global replication
  buffer, only if replication buffer memory is more than the repl backlog setting size, we consider
  the excess as replicas' memory. Otherwise, we think replication buffer memory is the consumption
  of repl backlog.
- Key eviction
  Since all replicas and replication backlog share global replication buffer, we think only the
  part of exceeding backlog size the extra separate consumption of replicas.
  Because we trim backlog incrementally in the background, backlog size may exceeds our
  setting if slow replicas that reference vast replication buffer blocks disconnect.
  To avoid massive eviction loop, we don't count the delayed freed replication backlog into
  used memory even if there are no replicas, i.e. we also regard this memory as replicas's memory.
- `client-output-buffer-limit` check for replica clients
  It doesn't make sense to set the replica clients output buffer limit lower than the repl-backlog-size
  config (partial sync will succeed and then replica will get disconnected). Such a configuration is
  ignored (the size of repl-backlog-size will be used). This doesn't have memory consumption
  implications since the replica client will share the backlog buffers memory.
- Drop replication backlog after loading data if needed
  We always create replication backlog if server is a master, we need it because we put DELs in
  it when loading expired keys in RDB, but if RDB doesn't have replication info or there is no rdb,
  it is not possible to support partial resynchronization, to avoid extra memory of replication backlog,
  we drop it.
- Multi IO threads
 Since all replicas and replication backlog use global replication buffer,  if I/O threads are enabled,
  to guarantee data accessing thread safe, we must let main thread handle sending the output buffer
  to all replicas. But before, other IO threads could handle sending output buffer of all replicas.

## Other optimizations
This solution resolve some other problem:
- When replicas disconnect with master since of out of output buffer limit, releasing the output
  buffer of replicas may freeze server if we set big `client-output-buffer-limit` for replicas, but now,
  it doesn't cause freezing.
- This implementation may mitigate reply list copy cost time(also freezes server) when one replication
  has huge reply buffer and another replica can copy buffer for full synchronization. now, we just copy
  reference info, it is very light.
- If we set replication backlog size big, it also may cost much time to copy replication backlog into
  replica's output buffer. But this commit eliminates this problem.
- Resizing replication backlog size doesn't empty current replication backlog content.
This commit is contained in:
Wang Yuan 2021-10-25 14:24:31 +08:00 committed by GitHub
parent 6b297cd646
commit c1718f9d86
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 946 additions and 308 deletions

View File

@ -1836,6 +1836,13 @@ activerehashing yes
# Instead there is a default limit for pubsub and replica clients, since
# subscribers and replicas receive data in a push fashion.
#
# Note that it doesn't make sense to set the replica clients output buffer
# limit lower than the repl-backlog-size config (partial sync will succeed
# and then replica will get disconnected).
# Such a configuration is ignored (the size of repl-backlog-size will be used).
# This doesn't have memory consumption implications since the replica client
# will share the backlog buffers memory.
#
# Both the hard or the soft limit can be disabled by setting them to zero.
client-output-buffer-limit normal 0 0 0
client-output-buffer-limit replica 256mb 64mb 60

View File

@ -2399,10 +2399,10 @@ static int updateJemallocBgThread(int val, int prev, const char **err) {
}
static int updateReplBacklogSize(long long val, long long prev, const char **err) {
/* resizeReplicationBacklog sets server.cfg_repl_backlog_size, and relies on
/* resizeReplicationBacklog sets server.repl_backlog_size, and relies on
* being able to tell when the size changes, so restore prev before calling it. */
UNUSED(err);
server.cfg_repl_backlog_size = prev;
server.repl_backlog_size = prev;
resizeReplicationBacklog(val);
return 1;
}
@ -2684,7 +2684,7 @@ standardConfig configs[] = {
createLongLongConfig("latency-monitor-threshold", NULL, MODIFIABLE_CONFIG, 0, LLONG_MAX, server.latency_monitor_threshold, 0, INTEGER_CONFIG, NULL, NULL),
createLongLongConfig("proto-max-bulk-len", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, 1024*1024, LONG_MAX, server.proto_max_bulk_len, 512ll*1024*1024, MEMORY_CONFIG, NULL, NULL), /* Bulk request max size */
createLongLongConfig("stream-node-max-entries", NULL, MODIFIABLE_CONFIG, 0, LLONG_MAX, server.stream_node_max_entries, 100, INTEGER_CONFIG, NULL, NULL),
createLongLongConfig("repl-backlog-size", NULL, MODIFIABLE_CONFIG, 1, LLONG_MAX, server.cfg_repl_backlog_size, 1024*1024, MEMORY_CONFIG, NULL, updateReplBacklogSize), /* Default: 1mb */
createLongLongConfig("repl-backlog-size", NULL, MODIFIABLE_CONFIG, 1, LLONG_MAX, server.repl_backlog_size, 1024*1024, MEMORY_CONFIG, NULL, updateReplBacklogSize), /* Default: 1mb */
/* Unsigned Long Long configs */
createULongLongConfig("maxmemory", NULL, MODIFIABLE_CONFIG, 0, ULLONG_MAX, server.maxmemory, 0, MEMORY_CONFIG, NULL, updateMaxmemory),

View File

@ -325,22 +325,44 @@ unsigned long LFUDecrAndReturn(robj *o) {
}
/* We don't want to count AOF buffers and slaves output buffers as
* used memory: the eviction should use mostly data size. This function
* returns the sum of AOF and slaves buffer. */
* used memory: the eviction should use mostly data size, because
* it can cause feedback-loop when we push DELs into them, putting
* more and more DELs will make them bigger, if we count them, we
* need to evict more keys, and then generate more DELs, maybe cause
* massive eviction loop, even all keys are evicted.
*
* This function returns the sum of AOF and replication buffer. */
size_t freeMemoryGetNotCountedMemory(void) {
size_t overhead = 0;
int slaves = listLength(server.slaves);
if (slaves) {
listIter li;
listNode *ln;
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = listNodeValue(ln);
overhead += getClientOutputBufferMemoryUsage(slave);
/* Since all replicas and replication backlog share global replication
* buffer, we think only the part of exceeding backlog size is the extra
* separate consumption of replicas.
*
* Note that although the backlog is also initially incrementally grown
* (pushing DELs consumes memory), it'll eventually stop growing and
* remain constant in size, so even if its creation will cause some
* eviction, it's capped, and also here to stay (no resonance effect)
*
* Note that, because we trim backlog incrementally in the background,
* backlog size may exceeds our setting if slow replicas that reference
* vast replication buffer blocks disconnect. To avoid massive eviction
* loop, we don't count the delayed freed replication backlog into used
* memory even if there are no replicas, i.e. we still regard this memory
* as replicas'. */
if ((long long)server.repl_buffer_mem > server.repl_backlog_size) {
/* We use list structure to manage replication buffer blocks, so backlog
* also occupies some extra memory, we can't know exact blocks numbers,
* we only get approximate size according to per block size. */
size_t extra_approx_size =
(server.repl_backlog_size/PROTO_REPLY_CHUNK_BYTES + 1) *
(sizeof(replBufBlock)+sizeof(listNode));
size_t counted_mem = server.repl_backlog_size + extra_approx_size;
if (server.repl_buffer_mem > counted_mem) {
overhead += (server.repl_buffer_mem - counted_mem);
}
}
if (server.aof_state != AOF_OFF) {
overhead += sdsAllocSize(server.aof_buf)+aofRewriteBufferMemoryUsage();
}

View File

@ -46,6 +46,18 @@ void lazyFreeLuaScripts(void *args[]) {
atomicIncr(lazyfreed_objects,len);
}
/* Release replication backlog referencing memory. */
void lazyFreeReplicationBacklogRefMem(void *args[]) {
list *blocks = args[0];
rax *index = args[1];
long long len = listLength(blocks);
len += raxSize(index);
listRelease(blocks);
raxFree(index);
atomicDecr(lazyfree_objects,len);
atomicIncr(lazyfreed_objects,len);
}
/* Return the number of currently pending objects to free. */
size_t lazyfreeGetPendingObjectsCount(void) {
size_t aux;
@ -180,3 +192,16 @@ void freeLuaScriptsAsync(dict *lua_scripts) {
dictRelease(lua_scripts);
}
}
/* Free replication backlog referencing buffer blocks and rax index. */
void freeReplicationBacklogRefMemAsync(list *blocks, rax *index) {
if (listLength(blocks) > LAZYFREE_THRESHOLD ||
raxSize(index) > LAZYFREE_THRESHOLD)
{
atomicIncr(lazyfree_objects,listLength(blocks)+raxSize(index));
bioCreateLazyFreeJob(lazyFreeReplicationBacklogRefMem,2,blocks,index);
} else {
listRelease(blocks);
raxFree(index);
}
}

View File

@ -276,7 +276,7 @@ void execCommand(client *c) {
* backlog with the final EXEC. */
if (server.repl_backlog && was_master && !is_master) {
char *execcmd = "*1\r\n$4\r\nEXEC\r\n";
feedReplicationBacklog(execcmd,strlen(execcmd));
feedReplicationBuffer(execcmd,strlen(execcmd));
}
afterPropagateExec();
}

View File

@ -140,6 +140,8 @@ client *createClient(connection *conn) {
c->name = NULL;
c->bufpos = 0;
c->buf_usable_size = zmalloc_usable_size(c)-offsetof(client,buf);
c->ref_repl_buf_node = NULL;
c->ref_block_pos = 0;
c->qb_pos = 0;
c->querybuf = sdsempty();
c->pending_querybuf = sdsempty();
@ -467,7 +469,7 @@ void afterErrorReply(client *c, const char *s, size_t len) {
"to its %s: '%.*s' after processing the command "
"'%s'", from, to, (int)len, s, cmdname);
if (ctype == CLIENT_TYPE_MASTER && server.repl_backlog &&
server.repl_backlog_histlen > 0)
server.repl_backlog->histlen > 0)
{
showLatestBacklog();
}
@ -985,30 +987,37 @@ void AddReplyFromClient(client *dst, client *src) {
closeClientOnOutputBufferLimitReached(dst, 1);
}
/* Copy 'src' client output buffers into 'dst' client output buffers.
* The function takes care of freeing the old output buffers of the
* destination client. */
void copyClientOutputBuffer(client *dst, client *src) {
listEmpty(dst->reply);
dst->sentlen = 0;
dst->bufpos = 0;
dst->reply_bytes = 0;
/* Logically copy 'src' replica client buffers info to 'dst' replica.
* Basically increase referenced buffer block node reference count. */
void copyReplicaOutputBuffer(client *dst, client *src) {
serverAssert(src->bufpos == 0 && listLength(src->reply) == 0);
/* First copy src static buffer into dst (either static buffer or reply
* list, maybe clients have different 'usable_buffer_size'). */
_addReplyToBufferOrList(dst,src->buf,src->bufpos);
/* Copy src reply list into the dest. */
list* reply = listDup(src->reply);
listJoin(dst->reply,reply);
dst->reply_bytes += src->reply_bytes;
listRelease(reply);
if (src->ref_repl_buf_node == NULL) return;
dst->ref_repl_buf_node = src->ref_repl_buf_node;
dst->ref_block_pos = src->ref_block_pos;
((replBufBlock *)listNodeValue(dst->ref_repl_buf_node))->refcount++;
}
/* Return true if the specified client has pending reply buffers to write to
* the socket. */
int clientHasPendingReplies(client *c) {
return c->bufpos || listLength(c->reply);
if (getClientType(c) == CLIENT_TYPE_SLAVE) {
/* Replicas use global shared replication buffer instead of
* private output buffer. */
serverAssert(c->bufpos == 0 && listLength(c->reply) == 0);
if (c->ref_repl_buf_node == NULL) return 0;
/* If the last replication buffer block content is totally sent,
* we have nothing to send. */
listNode *ln = listLast(server.repl_buffer_blocks);
replBufBlock *tail = listNodeValue(ln);
if (ln == c->ref_repl_buf_node &&
c->ref_block_pos == tail->used) return 0;
return 1;
} else {
return c->bufpos || listLength(c->reply);
}
}
void clientAcceptHandler(connection *conn) {
@ -1395,6 +1404,7 @@ void freeClient(client *c) {
/* Free data structures. */
listRelease(c->reply);
freeReplicaReferencedReplBuffer(c);
freeClientArgv(c);
freeClientOriginalArgv(c);
@ -1542,6 +1552,77 @@ client *lookupClientByID(uint64_t id) {
return (c == raxNotFound) ? NULL : c;
}
/* This function does actual writing output buffers to different types of
* clients, it is called by writeToClient.
* If we write successfully, it return C_OK, otherwise, C_ERR is returned,
* And 'nwritten' is a output parameter, it means how many bytes server write
* to client. */
int _writeToClient(client *c, ssize_t *nwritten) {
*nwritten = 0;
if (getClientType(c) == CLIENT_TYPE_SLAVE) {
serverAssert(c->bufpos == 0 && listLength(c->reply) == 0);
replBufBlock *o = listNodeValue(c->ref_repl_buf_node);
serverAssert(o->used >= c->ref_block_pos);
/* Send current block if it is not fully sent. */
if (o->used > c->ref_block_pos) {
*nwritten = connWrite(c->conn, o->buf+c->ref_block_pos,
o->used-c->ref_block_pos);
if (*nwritten <= 0) return C_ERR;
c->ref_block_pos += *nwritten;
}
/* If we fully sent the object on head, go to the next one. */
listNode *next = listNextNode(c->ref_repl_buf_node);
if (next && c->ref_block_pos == o->used) {
o->refcount--;
((replBufBlock *)(listNodeValue(next)))->refcount++;
c->ref_repl_buf_node = next;
c->ref_block_pos = 0;
incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL);
}
return C_OK;
}
if (c->bufpos > 0) {
*nwritten = connWrite(c->conn,c->buf+c->sentlen,c->bufpos-c->sentlen);
if (*nwritten <= 0) return C_ERR;
c->sentlen += *nwritten;
/* If the buffer was sent, set bufpos to zero to continue with
* the remainder of the reply. */
if ((int)c->sentlen == c->bufpos) {
c->bufpos = 0;
c->sentlen = 0;
}
} else {
clientReplyBlock *o = listNodeValue(listFirst(c->reply));
size_t objlen = o->used;
if (objlen == 0) {
c->reply_bytes -= o->size;
listDelNode(c->reply,listFirst(c->reply));
return C_OK;
}
*nwritten = connWrite(c->conn, o->buf + c->sentlen, objlen - c->sentlen);
if (*nwritten <= 0) return C_ERR;
c->sentlen += *nwritten;
/* If we fully sent the object on head go to the next one */
if (c->sentlen == objlen) {
c->reply_bytes -= o->size;
listDelNode(c->reply,listFirst(c->reply));
c->sentlen = 0;
/* If there are no longer objects in the list, we expect
* the count of reply bytes to be exactly zero. */
if (listLength(c->reply) == 0)
serverAssert(c->reply_bytes == 0);
}
}
return C_OK;
}
/* Write data in output buffers to client. Return C_OK if the client
* is still valid after the call, C_ERR if it was freed because of some
* error. If handler_installed is set, it will attempt to clear the
@ -1555,48 +1636,11 @@ int writeToClient(client *c, int handler_installed) {
atomicIncr(server.stat_total_writes_processed, 1);
ssize_t nwritten = 0, totwritten = 0;
size_t objlen;
clientReplyBlock *o;
while(clientHasPendingReplies(c)) {
if (c->bufpos > 0) {
nwritten = connWrite(c->conn,c->buf+c->sentlen,c->bufpos-c->sentlen);
if (nwritten <= 0) break;
c->sentlen += nwritten;
totwritten += nwritten;
/* If the buffer was sent, set bufpos to zero to continue with
* the remainder of the reply. */
if ((int)c->sentlen == c->bufpos) {
c->bufpos = 0;
c->sentlen = 0;
}
} else {
o = listNodeValue(listFirst(c->reply));
objlen = o->used;
if (objlen == 0) {
c->reply_bytes -= o->size;
listDelNode(c->reply,listFirst(c->reply));
continue;
}
nwritten = connWrite(c->conn, o->buf + c->sentlen, objlen - c->sentlen);
if (nwritten <= 0) break;
c->sentlen += nwritten;
totwritten += nwritten;
/* If we fully sent the object on head go to the next one */
if (c->sentlen == objlen) {
c->reply_bytes -= o->size;
listDelNode(c->reply,listFirst(c->reply));
c->sentlen = 0;
/* If there are no longer objects in the list, we expect
* the count of reply bytes to be exactly zero. */
if (listLength(c->reply) == 0)
serverAssert(c->reply_bytes == 0);
}
}
int ret = _writeToClient(c, &nwritten);
if (ret == C_ERR) break;
totwritten += nwritten;
/* Note that we avoid to send more than NET_MAX_WRITES_PER_EVENT
* bytes, in a single threaded server it's a good idea to serve
* other clients as well, even if a very large request comes from
@ -2077,8 +2121,7 @@ void commandProcessed(client *c) {
if (c->flags & CLIENT_MASTER) {
long long applied = c->reploff - prev_offset;
if (applied) {
replicationFeedSlavesFromMasterStream(server.slaves,
c->pending_querybuf, applied);
replicationFeedStreamFromMasterStream(c->pending_querybuf,applied);
sdsrange(c->pending_querybuf,applied,-1);
}
}
@ -2399,6 +2442,13 @@ sds catClientInfoString(sds s, client *client) {
/* Compute the total memory consumed by this client. */
size_t obufmem, total_mem = getClientMemoryUsage(client, &obufmem);
size_t used_blocks_of_repl_buf = 0;
if (client->ref_repl_buf_node) {
replBufBlock *last = listNodeValue(listLast(server.repl_buffer_blocks));
replBufBlock *cur = listNodeValue(client->ref_repl_buf_node);
used_blocks_of_repl_buf = last->id - cur->id + 1;
}
sds cmdname = client->lastcmd ? getFullCommandName(client->lastcmd) : NULL;
sds ret = sdscatfmt(s,
"id=%U addr=%s laddr=%s %s name=%s age=%I idle=%I flags=%s db=%i sub=%i psub=%i multi=%i qbuf=%U qbuf-free=%U argv-mem=%U multi-mem=%U obl=%U oll=%U omem=%U tot-mem=%U events=%s cmd=%s user=%s redir=%I resp=%i",
@ -2419,7 +2469,7 @@ sds catClientInfoString(sds s, client *client) {
(unsigned long long) client->argv_len_sum,
(unsigned long long) client->mstate.argv_len_sums,
(unsigned long long) client->bufpos,
(unsigned long long) listLength(client->reply),
(unsigned long long) listLength(client->reply) + used_blocks_of_repl_buf,
(unsigned long long) obufmem, /* should not include client->buf since we want to see 0 for static clients. */
(unsigned long long) total_mem,
events,
@ -3247,8 +3297,21 @@ void rewriteClientCommandArgument(client *c, int i, robj *newval) {
* the caller wishes. The main usage of this function currently is
* enforcing the client output length limits. */
size_t getClientOutputBufferMemoryUsage(client *c) {
size_t list_item_size = sizeof(listNode) + sizeof(clientReplyBlock);
return c->reply_bytes + (list_item_size*listLength(c->reply));
if (getClientType(c) == CLIENT_TYPE_SLAVE) {
size_t repl_buf_size = 0;
size_t repl_node_num = 0;
size_t repl_node_size = sizeof(listNode) + sizeof(replBufBlock);
if (c->ref_repl_buf_node) {
replBufBlock *last = listNodeValue(listLast(server.repl_buffer_blocks));
replBufBlock *cur = listNodeValue(c->ref_repl_buf_node);
repl_buf_size = last->repl_offset + last->size - cur->repl_offset;
repl_node_num = last->id - cur->id + 1;
}
return repl_buf_size + (repl_node_size*repl_node_num);
} else {
size_t list_item_size = sizeof(listNode) + sizeof(clientReplyBlock);
return c->reply_bytes + (list_item_size*listLength(c->reply));
}
}
/* Returns the total client's memory usage.
@ -3332,8 +3395,18 @@ int checkClientOutputBufferLimits(client *c) {
* like normal clients. */
if (class == CLIENT_TYPE_MASTER) class = CLIENT_TYPE_NORMAL;
/* Note that it doesn't make sense to set the replica clients output buffer
* limit lower than the repl-backlog-size config (partial sync will succeed
* and then replica will get disconnected).
* Such a configuration is ignored (the size of repl-backlog-size will be used).
* This doesn't have memory consumption implications since the replica client
* will share the backlog buffers memory. */
size_t hard_limit_bytes = server.client_obuf_limits[class].hard_limit_bytes;
if (class == CLIENT_TYPE_SLAVE && hard_limit_bytes &&
(long long)hard_limit_bytes < server.repl_backlog_size)
hard_limit_bytes = server.repl_backlog_size;
if (server.client_obuf_limits[class].hard_limit_bytes &&
used_mem >= server.client_obuf_limits[class].hard_limit_bytes)
used_mem >= hard_limit_bytes)
hard = 1;
if (server.client_obuf_limits[class].soft_limit_bytes &&
used_mem >= server.client_obuf_limits[class].soft_limit_bytes)
@ -3375,7 +3448,10 @@ int checkClientOutputBufferLimits(client *c) {
int closeClientOnOutputBufferLimitReached(client *c, int async) {
if (!c->conn) return 0; /* It is unsafe to free fake clients. */
serverAssert(c->reply_bytes < SIZE_MAX-(1024*64));
if (c->reply_bytes == 0 || c->flags & CLIENT_CLOSE_ASAP) return 0;
/* Note that c->reply_bytes is irrelevant for replica clients
* (they use the global repl buffers). */
if ((c->reply_bytes == 0 && getClientType(c) != CLIENT_TYPE_SLAVE) ||
c->flags & CLIENT_CLOSE_ASAP) return 0;
if (checkClientOutputBufferLimits(c)) {
sds client = catClientInfoString(sdsempty(),c);
@ -3740,6 +3816,15 @@ int handleClientsWithPendingWritesUsingThreads(void) {
continue;
}
/* Since all replicas and replication backlog use global replication
* buffer, to guarantee data accessing thread safe, we must put all
* replicas client into io_threads_list[0] i.e. main thread handles
* sending the output buffer of all replicas. */
if (getClientType(c) == CLIENT_TYPE_SLAVE) {
listAddNodeTail(io_threads_list[0],c);
continue;
}
int target_id = item_id % server.io_threads_num;
listAddNodeTail(io_threads_list[target_id],c);
item_id++;

View File

@ -1172,20 +1172,34 @@ struct redisMemOverhead *getMemoryOverheadData(void) {
mem_total += server.initial_memory_usage;
mem = 0;
if (server.repl_backlog)
mem += zmalloc_size(server.repl_backlog);
mh->repl_backlog = mem;
mem_total += mem;
/* Replication backlog and replicas share one global replication buffer,
* only if replication buffer memory is more than the repl backlog setting,
* we consider the excess as replicas' memory. Otherwise, replication buffer
* memory is the consumption of repl backlog. */
if (listLength(server.slaves) &&
(long long)server.repl_buffer_mem > server.repl_backlog_size)
{
mh->clients_slaves = server.repl_buffer_mem - server.repl_backlog_size;
mh->repl_backlog = server.repl_backlog_size;
} else {
mh->clients_slaves = 0;
mh->repl_backlog = server.repl_buffer_mem;
}
if (server.repl_backlog) {
/* The approximate memory of rax tree for indexed blocks. */
mh->repl_backlog +=
server.repl_backlog->blocks_index->numnodes * sizeof(raxNode) +
raxSize(server.repl_backlog->blocks_index) * sizeof(void*);
}
mem_total += mh->repl_backlog;
mem_total += mh->clients_slaves;
/* Computing the memory used by the clients would be O(N) if done
* here online. We use our values computed incrementally by
* updateClientMemUsage(). */
mh->clients_slaves = server.stat_clients_type_memory[CLIENT_TYPE_SLAVE];
mh->clients_normal = server.stat_clients_type_memory[CLIENT_TYPE_MASTER]+
server.stat_clients_type_memory[CLIENT_TYPE_PUBSUB]+
server.stat_clients_type_memory[CLIENT_TYPE_NORMAL];
mem_total += mh->clients_slaves;
mem_total += mh->clients_normal;
mem = 0;
@ -1312,7 +1326,7 @@ sds getMemoryDoctorReport(void) {
}
/* Slaves using more than 10 MB each? */
if (numslaves > 0 && mh->clients_slaves / numslaves > (1024*1024*10)) {
if (numslaves > 0 && mh->clients_slaves > (1024*1024*10)) {
big_slave_buf = 1;
num_reports++;
}

View File

@ -33,6 +33,7 @@
#include "cluster.h"
#include "bio.h"
#include <memory.h>
#include <sys/time.h>
#include <unistd.h>
#include <fcntl.h>
@ -109,95 +110,59 @@ int bg_unlink(const char *filename) {
void createReplicationBacklog(void) {
serverAssert(server.repl_backlog == NULL);
server.repl_backlog = zmalloc(server.cfg_repl_backlog_size);
server.repl_backlog_size = zmalloc_usable_size(server.repl_backlog);
server.repl_backlog_histlen = 0;
server.repl_backlog_idx = 0;
server.repl_backlog = zmalloc(sizeof(replBacklog));
server.repl_backlog->ref_repl_buf_node = NULL;
server.repl_backlog->unindexed_count = 0;
server.repl_backlog->blocks_index = raxNew();
server.repl_backlog->histlen = 0;
/* We don't have any data inside our buffer, but virtually the first
* byte we have is the next byte that will be generated for the
* replication stream. */
server.repl_backlog_off = server.master_repl_offset+1;
server.repl_backlog->offset = server.master_repl_offset+1;
}
/* This function is called when the user modifies the replication backlog
* size at runtime. It is up to the function to both update the
* server.cfg_repl_backlog_size and to resize the buffer and setup it so that
* server.repl_backlog_size and to resize the buffer and setup it so that
* it contains the same data as the previous one (possibly less data, but
* the most recent bytes, or the same data and more free space in case the
* buffer is enlarged). */
void resizeReplicationBacklog(long long newsize) {
if (newsize < CONFIG_REPL_BACKLOG_MIN_SIZE)
newsize = CONFIG_REPL_BACKLOG_MIN_SIZE;
if (server.cfg_repl_backlog_size == newsize) return;
if (server.repl_backlog_size == newsize) return;
server.cfg_repl_backlog_size = newsize;
if (server.repl_backlog != NULL) {
/* What we actually do is to flush the old buffer and realloc a new
* empty one. It will refill with new data incrementally.
* The reason is that copying a few gigabytes adds latency and even
* worse often we need to alloc additional space before freeing the
* old buffer. */
zfree(server.repl_backlog);
server.repl_backlog = zmalloc(server.cfg_repl_backlog_size);
server.repl_backlog_size = zmalloc_usable_size(server.repl_backlog);
server.repl_backlog_histlen = 0;
server.repl_backlog_idx = 0;
/* Next byte we have is... the next since the buffer is empty. */
server.repl_backlog_off = server.master_repl_offset+1;
}
server.repl_backlog_size = newsize;
if (server.repl_backlog)
incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL);
}
void freeReplicationBacklog(void) {
serverAssert(listLength(server.slaves) == 0);
if (server.repl_backlog == NULL) return;
/* Decrease the start buffer node reference count. */
if (server.repl_backlog->ref_repl_buf_node) {
replBufBlock *o = listNodeValue(
server.repl_backlog->ref_repl_buf_node);
serverAssert(o->refcount == 1); /* Last reference. */
o->refcount--;
}
/* Replication buffer blocks are completely released when we free the
* backlog, since the backlog is released only when there are no replicas
* and the backlog keeps the last reference of all blocks. */
freeReplicationBacklogRefMemAsync(server.repl_buffer_blocks,
server.repl_backlog->blocks_index);
resetReplicationBuffer();
zfree(server.repl_backlog);
server.repl_backlog = NULL;
}
/* Add data to the replication backlog.
* This function also increments the global replication offset stored at
* server.master_repl_offset, because there is no case where we want to feed
* the backlog without incrementing the offset. */
void feedReplicationBacklog(void *ptr, size_t len) {
unsigned char *p = ptr;
server.master_repl_offset += len;
/* This is a circular buffer, so write as much data we can at every
* iteration and rewind the "idx" index if we reach the limit. */
while(len) {
size_t thislen = server.repl_backlog_size - server.repl_backlog_idx;
if (thislen > len) thislen = len;
memcpy(server.repl_backlog+server.repl_backlog_idx,p,thislen);
server.repl_backlog_idx += thislen;
if (server.repl_backlog_idx == server.repl_backlog_size)
server.repl_backlog_idx = 0;
len -= thislen;
p += thislen;
server.repl_backlog_histlen += thislen;
}
if (server.repl_backlog_histlen > server.repl_backlog_size)
server.repl_backlog_histlen = server.repl_backlog_size;
/* Set the offset of the first byte we have in the backlog. */
server.repl_backlog_off = server.master_repl_offset -
server.repl_backlog_histlen + 1;
}
/* Wrapper for feedReplicationBacklog() that takes Redis string objects
* as input. */
void feedReplicationBacklogWithObject(robj *o) {
char llstr[LONG_STR_SIZE];
void *p;
size_t len;
if (o->encoding == OBJ_ENCODING_INT) {
len = ll2string(llstr,sizeof(llstr),(long)o->ptr);
p = llstr;
} else {
len = sdslen(o->ptr);
p = o->ptr;
}
feedReplicationBacklog(p,len);
void resetReplicationBuffer(void) {
server.repl_buffer_mem = 0;
server.repl_buffer_blocks = listCreate();
listSetFreeMethod(server.repl_buffer_blocks, (void (*)(void*))zfree);
}
int canFeedReplicaReplBuffer(client *replica) {
@ -210,14 +175,231 @@ int canFeedReplicaReplBuffer(client *replica) {
return 1;
}
/* Propagate write commands to slaves, and populate the replication backlog
* as well. This function is used if the instance is a master: we use
* the commands received by our clients in order to create the replication
* stream. Instead if the instance is a slave and has sub-slaves attached,
* we use replicationFeedSlavesFromMasterStream() */
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
listNode *ln;
/* Similar with 'prepareClientToWrite', note that we must call this function
* before feeding replication stream into global replication buffer, since
* clientHasPendingReplies in prepareClientToWrite will access the global
* replication buffer to make judgements. */
int prepareReplicasToWrite(void) {
listIter li;
listNode *ln;
int prepared = 0;
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
if (!canFeedReplicaReplBuffer(slave)) continue;
if (prepareClientToWrite(slave) == C_ERR) continue;
prepared++;
}
return prepared;
}
/* Wrapper for feedReplicationBuffer() that takes Redis string objects
* as input. */
void feedReplicationBufferWithObject(robj *o) {
char llstr[LONG_STR_SIZE];
void *p;
size_t len;
if (o->encoding == OBJ_ENCODING_INT) {
len = ll2string(llstr,sizeof(llstr),(long)o->ptr);
p = llstr;
} else {
len = sdslen(o->ptr);
p = o->ptr;
}
feedReplicationBuffer(p,len);
}
/* Generally, we only have one replication buffer block to trim when replication
* backlog size exceeds our setting and no replica reference it. But if replica
* clients disconnect, we need to free many replication buffer blocks that are
* referenced. It would cost much time if there are a lots blocks to free, that
* will freeze server, so we trim replication backlog incrementally. */
void incrementalTrimReplicationBacklog(size_t max_blocks) {
serverAssert(server.repl_backlog != NULL);
size_t trimmed_blocks = 0, trimmed_bytes = 0;
while (server.repl_backlog->histlen > server.repl_backlog_size &&
trimmed_blocks < max_blocks)
{
/* We never trim backlog to less than one block. */
if (listLength(server.repl_buffer_blocks) <= 1) break;
/* Replicas increment the refcount of the first replication buffer block
* they refer to, in that case, we don't trim the backlog even if
* backlog_histlen exceeds backlog_size. This implicitly makes backlog
* bigger than our setting, but makes the master accept partial resync as
* much as possible. So that backlog must be the last reference of
* replication buffer blocks. */
listNode *first = listFirst(server.repl_buffer_blocks);
serverAssert(first == server.repl_backlog->ref_repl_buf_node);
replBufBlock *fo = listNodeValue(first);
if (fo->refcount != 1) break;
/* We don't try trim backlog if backlog valid size will be lessen than
* setting backlog size once we release the first repl buffer block. */
if (server.repl_backlog->histlen - (long long)fo->size <=
server.repl_backlog_size) break;
/* Decr refcount and release the first block later. */
fo->refcount--;
trimmed_bytes += fo->size;
trimmed_blocks++;
/* Go to use next replication buffer block node. */
listNode *next = listNextNode(first);
server.repl_backlog->ref_repl_buf_node = next;
serverAssert(server.repl_backlog->ref_repl_buf_node != NULL);
/* Incr reference count to keep the new head node. */
((replBufBlock *)listNodeValue(next))->refcount++;
/* Remove the node in recorded blocks. */
uint64_t encoded_offset = htonu64(fo->repl_offset);
raxRemove(server.repl_backlog->blocks_index,
(unsigned char*)&encoded_offset, sizeof(uint64_t), NULL);
/* Delete the first node from global replication buffer. */
serverAssert(fo->refcount == 0 && fo->used == fo->size);
server.repl_buffer_mem -= (fo->size +
sizeof(listNode) + sizeof(replBufBlock));
listDelNode(server.repl_buffer_blocks, first);
}
server.repl_backlog->histlen -= trimmed_bytes;
/* Set the offset of the first byte we have in the backlog. */
server.repl_backlog->offset = server.master_repl_offset -
server.repl_backlog->histlen + 1;
}
/* Free replication buffer blocks that are referenced by this client. */
void freeReplicaReferencedReplBuffer(client *replica) {
if (replica->ref_repl_buf_node != NULL) {
/* Decrease the start buffer node reference count. */
replBufBlock *o = listNodeValue(replica->ref_repl_buf_node);
serverAssert(o->refcount > 0);
o->refcount--;
incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL);
}
replica->ref_repl_buf_node = NULL;
replica->ref_block_pos = 0;
}
/* Append bytes into the global replication buffer list, replication backlog and
* all replica clients use replication buffers collectively, this function replace
* 'addReply*', 'feedReplicationBacklog' for replicas and replication backlog,
* First we add buffer into global replication buffer block list, and then
* update replica / replication-backlog referenced node and block position. */
void feedReplicationBuffer(char *s, size_t len) {
static long long repl_block_id = 0;
if (server.repl_backlog == NULL) return;
server.master_repl_offset += len;
server.repl_backlog->histlen += len;
/* Install write handler for all replicas. */
prepareReplicasToWrite();
size_t start_pos = 0; /* The position of referenced blok to start sending. */
listNode *start_node = NULL; /* Replica/backlog starts referenced node. */
int add_new_block = 0; /* Create new block if current block is total used. */
listNode *ln = listLast(server.repl_buffer_blocks);
replBufBlock *tail = ln ? listNodeValue(ln) : NULL;
/* Append to tail string when possible. */
if (tail && tail->size > tail->used) {
start_node = listLast(server.repl_buffer_blocks);
start_pos = tail->used;
/* Copy the part we can fit into the tail, and leave the rest for a
* new node */
size_t avail = tail->size - tail->used;
size_t copy = (avail >= len) ? len : avail;
memcpy(tail->buf + tail->used, s, copy);
tail->used += copy;
s += copy;
len -= copy;
}
if (len) {
/* Create a new node, make sure it is allocated to at
* least PROTO_REPLY_CHUNK_BYTES */
size_t usable_size;
size_t size = (len < PROTO_REPLY_CHUNK_BYTES) ? PROTO_REPLY_CHUNK_BYTES : len;
tail = zmalloc_usable(size + sizeof(replBufBlock), &usable_size);
/* Take over the allocation's internal fragmentation */
tail->size = usable_size - sizeof(replBufBlock);
tail->used = len;
tail->refcount = 0;
tail->repl_offset = server.master_repl_offset - tail->used + 1;
tail->id = repl_block_id++;
memcpy(tail->buf, s, len);
listAddNodeTail(server.repl_buffer_blocks, tail);
/* We also count the list node memory into replication buffer memory. */
server.repl_buffer_mem += (usable_size + sizeof(listNode));
add_new_block = 1;
if (start_node == NULL) {
start_node = listLast(server.repl_buffer_blocks);
start_pos = 0;
}
}
/* For output buffer of replicas. */
listIter li;
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
if (!canFeedReplicaReplBuffer(slave)) continue;
/* Update shared replication buffer start position. */
if (slave->ref_repl_buf_node == NULL) {
slave->ref_repl_buf_node = start_node;
slave->ref_block_pos = start_pos;
/* Only increase the start block reference count. */
((replBufBlock *)listNodeValue(start_node))->refcount++;
}
/* Check output buffer limit only when add new block. */
if (add_new_block) closeClientOnOutputBufferLimitReached(slave, 1);
}
/* For replication backlog */
if (server.repl_backlog->ref_repl_buf_node == NULL) {
server.repl_backlog->ref_repl_buf_node = start_node;
/* Only increase the start block reference count. */
((replBufBlock *)listNodeValue(start_node))->refcount++;
/* Replication buffer must be empty before adding replication stream
* into replication backlog. */
serverAssert(add_new_block == 1 && start_pos == 0);
}
if (add_new_block) {
/* To make search offset from replication buffer blocks quickly
* when replicas ask partial resynchronization, we create one index
* block every REPL_BACKLOG_INDEX_PER_BLOCKS blocks. */
server.repl_backlog->unindexed_count++;
if (server.repl_backlog->unindexed_count >= REPL_BACKLOG_INDEX_PER_BLOCKS) {
uint64_t encoded_offset = htonu64(tail->repl_offset);
raxInsert(server.repl_backlog->blocks_index,
(unsigned char*)&encoded_offset, sizeof(uint64_t),
listLast(server.repl_buffer_blocks), NULL);
server.repl_backlog->unindexed_count = 0;
}
}
/* Try to trim replication backlog since replication backlog may exceed
* our setting when we add replication stream. Note that it is important to
* try to trim at least one node since in the common case this is where one
* new backlog node is added and one should be removed. See also comments
* in freeMemoryGetNotCountedMemory for details. */
incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL);
}
/* Propagate write commands to replication stream.
*
* This function is used if the instance is a master: we use the commands
* received by our clients in order to create the replication stream.
* Instead if the instance is a replica and has sub-replicas attached, we use
* replicationFeedStreamFromMasterStream() */
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
int j, len;
char llstr[LONG_STR_SIZE];
@ -252,68 +434,36 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
dictid_len, llstr));
}
/* Add the SELECT command into the backlog. */
if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd);
/* Send it to slaves. */
listRewind(slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
if (!canFeedReplicaReplBuffer(slave)) continue;
addReply(slave,selectcmd);
}
feedReplicationBufferWithObject(selectcmd);
if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS)
decrRefCount(selectcmd);
}
server.slaveseldb = dictid;
/* Write the command to the replication backlog if any. */
if (server.repl_backlog) {
char aux[LONG_STR_SIZE+3];
/* Write the command to the replication buffer if any. */
char aux[LONG_STR_SIZE+3];
/* Add the multi bulk reply length. */
aux[0] = '*';
len = ll2string(aux+1,sizeof(aux)-1,argc);
/* Add the multi bulk reply length. */
aux[0] = '*';
len = ll2string(aux+1,sizeof(aux)-1,argc);
aux[len+1] = '\r';
aux[len+2] = '\n';
feedReplicationBuffer(aux,len+3);
for (j = 0; j < argc; j++) {
long objlen = stringObjectLen(argv[j]);
/* We need to feed the buffer with the object as a bulk reply
* not just as a plain string, so create the $..CRLF payload len
* and add the final CRLF */
aux[0] = '$';
len = ll2string(aux+1,sizeof(aux)-1,objlen);
aux[len+1] = '\r';
aux[len+2] = '\n';
feedReplicationBacklog(aux,len+3);
for (j = 0; j < argc; j++) {
long objlen = stringObjectLen(argv[j]);
/* We need to feed the buffer with the object as a bulk reply
* not just as a plain string, so create the $..CRLF payload len
* and add the final CRLF */
aux[0] = '$';
len = ll2string(aux+1,sizeof(aux)-1,objlen);
aux[len+1] = '\r';
aux[len+2] = '\n';
feedReplicationBacklog(aux,len+3);
feedReplicationBacklogWithObject(argv[j]);
feedReplicationBacklog(aux+len+1,2);
}
}
/* Write the command to every slave. */
listRewind(slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
if (!canFeedReplicaReplBuffer(slave)) continue;
/* Feed slaves that are waiting for the initial SYNC (so these commands
* are queued in the output buffer until the initial SYNC completes),
* or are already in sync with the master. */
/* Add the multi bulk length. */
addReplyArrayLen(slave,argc);
/* Finally any additional argument that was not stored inside the
* static buffer if any (from j to argc). */
for (j = 0; j < argc; j++)
addReplyBulk(slave,argv[j]);
feedReplicationBuffer(aux,len+3);
feedReplicationBufferWithObject(argv[j]);
feedReplicationBuffer(aux+len+1,2);
}
}
@ -323,26 +473,24 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
* guess what kind of bug it could be. */
void showLatestBacklog(void) {
if (server.repl_backlog == NULL) return;
if (listLength(server.repl_buffer_blocks) == 0) return;
long long dumplen = 256;
if (server.repl_backlog_histlen < dumplen)
dumplen = server.repl_backlog_histlen;
size_t dumplen = 256;
if (server.repl_backlog->histlen < (long long)dumplen)
dumplen = server.repl_backlog->histlen;
/* Identify the first byte to dump. */
long long idx =
(server.repl_backlog_idx + (server.repl_backlog_size - dumplen)) %
server.repl_backlog_size;
/* Scan the circular buffer to collect 'dumplen' bytes. */
sds dump = sdsempty();
listNode *node = listLast(server.repl_buffer_blocks);
while(dumplen) {
long long thislen =
((server.repl_backlog_size - idx) < dumplen) ?
(server.repl_backlog_size - idx) : dumplen;
dump = sdscatrepr(dump,server.repl_backlog+idx,thislen);
if (node == NULL) break;
replBufBlock *o = listNodeValue(node);
size_t thislen = o->used >= dumplen ? dumplen : o->used;
sds head = sdscatrepr(sdsempty(), o->buf+o->used-thislen, thislen);
sds tmp = sdscatsds(head, dump);
sdsfree(dump);
dump = tmp;
dumplen -= thislen;
idx = 0;
node = listPrevNode(node);
}
/* Finally log such bytes: this is vital debugging info to
@ -354,10 +502,7 @@ void showLatestBacklog(void) {
/* This function is used in order to proxy what we receive from our master
* to our sub-slaves. */
#include <ctype.h>
void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t buflen) {
listNode *ln;
listIter li;
void replicationFeedStreamFromMasterStream(char *buf, size_t buflen) {
/* Debugging: this is handy to see the stream sent from master
* to slaves. Disabled with if(0). */
if (0) {
@ -368,14 +513,9 @@ void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t bufle
printf("\n");
}
if (server.repl_backlog) feedReplicationBacklog(buf,buflen);
listRewind(slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
if (!canFeedReplicaReplBuffer(slave)) continue;
addReplyProto(slave,buf,buflen);
}
/* There must be replication backlog if having attached slaves. */
if (listLength(server.slaves)) serverAssert(server.repl_backlog != NULL);
if (server.repl_backlog) feedReplicationBuffer(buf,buflen);
}
void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc) {
@ -422,11 +562,11 @@ void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv,
/* Feed the slave 'c' with the replication backlog starting from the
* specified 'offset' up to the end of the backlog. */
long long addReplyReplicationBacklog(client *c, long long offset) {
long long j, skip, len;
long long skip;
serverLog(LL_DEBUG, "[PSYNC] Replica request offset: %lld", offset);
if (server.repl_backlog_histlen == 0) {
if (server.repl_backlog->histlen == 0) {
serverLog(LL_DEBUG, "[PSYNC] Backlog history len is zero");
return 0;
}
@ -434,41 +574,58 @@ long long addReplyReplicationBacklog(client *c, long long offset) {
serverLog(LL_DEBUG, "[PSYNC] Backlog size: %lld",
server.repl_backlog_size);
serverLog(LL_DEBUG, "[PSYNC] First byte: %lld",
server.repl_backlog_off);
server.repl_backlog->offset);
serverLog(LL_DEBUG, "[PSYNC] History len: %lld",
server.repl_backlog_histlen);
serverLog(LL_DEBUG, "[PSYNC] Current index: %lld",
server.repl_backlog_idx);
server.repl_backlog->histlen);
/* Compute the amount of bytes we need to discard. */
skip = offset - server.repl_backlog_off;
skip = offset - server.repl_backlog->offset;
serverLog(LL_DEBUG, "[PSYNC] Skipping: %lld", skip);
/* Point j to the oldest byte, that is actually our
* server.repl_backlog_off byte. */
j = (server.repl_backlog_idx +
(server.repl_backlog_size-server.repl_backlog_histlen)) %
server.repl_backlog_size;
serverLog(LL_DEBUG, "[PSYNC] Index of first byte: %lld", j);
/* Discard the amount of data to seek to the specified 'offset'. */
j = (j + skip) % server.repl_backlog_size;
/* Feed slave with data. Since it is a circular buffer we have to
* split the reply in two parts if we are cross-boundary. */
len = server.repl_backlog_histlen - skip;
serverLog(LL_DEBUG, "[PSYNC] Reply total length: %lld", len);
while(len) {
long long thislen =
((server.repl_backlog_size - j) < len) ?
(server.repl_backlog_size - j) : len;
serverLog(LL_DEBUG, "[PSYNC] addReply() length: %lld", thislen);
addReplyProto(c,server.repl_backlog + j, thislen);
len -= thislen;
j = 0;
/* Iterate recorded blocks, quickly search the approximate node. */
listNode *node = NULL;
if (raxSize(server.repl_backlog->blocks_index) > 0) {
uint64_t encoded_offset = htonu64(offset);
raxIterator ri;
raxStart(&ri, server.repl_backlog->blocks_index);
raxSeek(&ri, ">", (unsigned char*)&encoded_offset, sizeof(uint64_t));
if (raxEOF(&ri)) {
/* No found, so search from the last recorded node. */
raxSeek(&ri, "$", NULL, 0);
raxPrev(&ri);
node = (listNode *)ri.data;
} else {
raxPrev(&ri); /* Skip the sought node. */
/* We should search from the prev node since the offset of current
* sought node exceeds searching offset. */
if (raxPrev(&ri))
node = (listNode *)ri.data;
else
node = server.repl_backlog->ref_repl_buf_node;
}
raxStop(&ri);
} else {
/* No recorded blocks, just from the start node to search. */
node = server.repl_backlog->ref_repl_buf_node;
}
return server.repl_backlog_histlen - skip;
/* Search the exact node. */
while (node != NULL) {
replBufBlock *o = listNodeValue(node);
if (o->repl_offset + (long long)o->used >= offset) break;
node = listNextNode(node);
}
serverAssert(node != NULL);
/* Install a writer handler first.*/
prepareClientToWrite(c);
/* Setting output buffer of the replica. */
replBufBlock *o = listNodeValue(node);
o->refcount++;
c->ref_repl_buf_node = node;
c->ref_block_pos = offset - o->repl_offset;
return server.repl_backlog->histlen - skip;
}
/* Return the offset to provide as reply to the PSYNC command received
@ -569,8 +726,8 @@ int masterTryPartialResynchronization(client *c) {
/* We still have the data our slave is asking for? */
if (!server.repl_backlog ||
psync_offset < server.repl_backlog_off ||
psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))
psync_offset < server.repl_backlog->offset ||
psync_offset > (server.repl_backlog->offset + server.repl_backlog->histlen))
{
serverLog(LL_NOTICE,
"Unable to partial resync with replica %s for lack of backlog (Replica request was: %lld).", replicationGetSlaveName(c), psync_offset);
@ -853,7 +1010,8 @@ void syncCommand(client *c) {
/* Perfect, the server is already registering differences for
* another slave. Set the right state, and copy the buffer.
* We don't copy buffer if clients don't want. */
if (!(c->flags & CLIENT_REPL_RDBONLY)) copyClientOutputBuffer(c,slave);
if (!(c->flags & CLIENT_REPL_RDBONLY))
copyReplicaOutputBuffer(c,slave);
replicationSetupSlaveForFullResync(c,slave->psync_initial_offset);
serverLog(LL_NOTICE,"Waiting for end of BGSAVE for SYNC");
} else {
@ -3488,6 +3646,16 @@ void replicationCron(void) {
* with any persistence. */
removeRDBUsedToSyncReplicas();
/* Sanity check replication buffer, the first block of replication buffer blocks
* must be referenced by someone, since it will be freed when not referenced,
* otherwise, server will OOM. also, its refcount must not be more than
* replicas number + 1(replication backlog). */
if (listLength(server.repl_buffer_blocks) > 0) {
replBufBlock *o = listNodeValue(listFirst(server.repl_buffer_blocks));
serverAssert(o->refcount > 0 &&
o->refcount <= (int)listLength(server.slaves)+1);
}
/* Refresh the number of slaves with lag <= min-slaves-max-lag. */
refreshGoodSlavesCount();
replication_cron_loops++; /* Incremented with frequency 1 HZ. */

View File

@ -3450,6 +3450,11 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
/* Close clients that need to be closed asynchronous */
freeClientsInAsyncFreeQueue();
/* Incrementally trim replication backlog, 10 times the normal speed is
* to free replication backlog as much as possible. */
if (server.repl_backlog)
incrementalTrimReplicationBacklog(10*REPL_BACKLOG_TRIM_BLOCKS_PER_CALL);
/* Disconnect some clients if they are consuming too much memory. */
evictClients();
@ -3717,9 +3722,6 @@ void initServerConfig(void) {
/* Replication partial resync backlog */
server.repl_backlog = NULL;
server.repl_backlog_histlen = 0;
server.repl_backlog_idx = 0;
server.repl_backlog_off = 0;
server.repl_no_slaves_since = time(NULL);
/* Failover related */
@ -4171,6 +4173,7 @@ void initServer(void) {
server.blocked_last_cron = 0;
server.blocking_op_nesting = 0;
server.thp_enabled = 0;
resetReplicationBuffer();
if ((server.tls_port || server.tls_replication || server.tls_cluster)
&& tlsConfigure(&server.tls_ctx_config) == C_ERR) {
@ -6330,6 +6333,7 @@ sds genRedisInfoString(const char *section) {
"mem_fragmentation_bytes:%zd\r\n"
"mem_not_counted_for_evict:%zu\r\n"
"mem_replication_backlog:%zu\r\n"
"mem_total_replication_buffers:%zu\r\n"
"mem_clients_slaves:%zu\r\n"
"mem_clients_normal:%zu\r\n"
"mem_aof_buffer:%zu\r\n"
@ -6374,6 +6378,7 @@ sds genRedisInfoString(const char *section) {
mh->total_frag_bytes,
freeMemoryGetNotCountedMemory(),
mh->repl_backlog,
server.repl_buffer_mem,
mh->clients_slaves,
mh->clients_normal,
mh->aof_buffer,
@ -6762,8 +6767,8 @@ sds genRedisInfoString(const char *section) {
server.second_replid_offset,
server.repl_backlog != NULL,
server.repl_backlog_size,
server.repl_backlog_off,
server.repl_backlog_histlen);
server.repl_backlog ? server.repl_backlog->offset : 0,
server.repl_backlog ? server.repl_backlog->histlen : 0);
}
/* CPU */
@ -7515,15 +7520,19 @@ void dismissMemoryInChild(void) {
/* Currently we use zmadvise_dontneed only when we use jemalloc with Linux.
* so we avoid these pointless loops when they're not going to do anything. */
#if defined(USE_JEMALLOC) && defined(__linux__)
listIter li;
listNode *ln;
/* Dismiss replication backlog. */
if (server.repl_backlog != NULL) {
dismissMemory(server.repl_backlog, server.repl_backlog_size);
/* Dismiss replication buffer. We don't need to separately dismiss replication
* backlog and replica' output buffer, because they just reference the global
* replication buffer but don't cost real memory. */
listRewind(server.repl_buffer_blocks, &li);
while((ln = listNext(&li))) {
replBufBlock *o = listNodeValue(ln);
dismissMemory(o, o->size);
}
/* Dismiss all clients memory. */
listIter li;
listNode *ln;
listRewind(server.clients, &li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
@ -7592,15 +7601,34 @@ void loadDataFromDisk(void) {
server.second_replid_offset = rsi.repl_offset+1;
/* Rebase master_repl_offset from rsi.repl_offset. */
server.master_repl_offset += rsi.repl_offset;
server.repl_backlog_off = server.master_repl_offset -
server.repl_backlog_histlen + 1;
serverAssert(server.repl_backlog);
server.repl_backlog->offset = server.master_repl_offset -
server.repl_backlog->histlen + 1;
server.repl_no_slaves_since = time(NULL);
/* Rebase replication buffer blocks' offset since the previous
* setting offset starts from 0. */
listIter li;
listNode *ln;
listRewind(server.repl_buffer_blocks, &li);
while ((ln = listNext(&li))) {
replBufBlock *o = listNodeValue(ln);
o->repl_offset += rsi.repl_offset;
}
}
}
} else if (errno != ENOENT) {
serverLog(LL_WARNING,"Fatal error loading the DB: %s. Exiting.",strerror(errno));
exit(1);
}
/* We always create replication backlog if server is a master, we need
* it because we put DELs in it when loading expired keys in RDB, but
* if RDB doesn't have replication info or there is no rdb, it is not
* possible to support partial resynchronization, to avoid extra memory
* of replication backlog, we drop it. */
if (server.master_repl_offset == 0 && server.repl_backlog)
freeReplicationBacklog();
}
}

View File

@ -377,6 +377,13 @@ typedef enum {
/* Synchronous read timeout - slave side */
#define CONFIG_REPL_SYNCIO_TIMEOUT 5
/* The default number of replication backlog blocks to trim per call. */
#define REPL_BACKLOG_TRIM_BLOCKS_PER_CALL 64
/* In order to quickly find the requested offset for PSYNC requests,
* we index some nodes in the replication buffer linked list into a rax. */
#define REPL_BACKLOG_INDEX_PER_BLOCKS 64
/* List related stuff */
#define LIST_HEAD 0
#define LIST_TAIL 1
@ -767,6 +774,33 @@ typedef struct clientReplyBlock {
char buf[];
} clientReplyBlock;
/* Replication buffer blocks is the list of replBufBlock.
*
* +--------------+ +--------------+ +--------------+
* | refcount = 1 | ... | refcount = 0 | ... | refcount = 2 |
* +--------------+ +--------------+ +--------------+
* | / \
* | / \
* | / \
* Repl Backlog Replia_A Replia_B
*
* Each replica or replication backlog increments only the refcount of the
* 'ref_repl_buf_node' which it points to. So when replica walks to the next
* node, it should first increase the next node's refcount, and when we trim
* the replication buffer nodes, we remove node always from the head node which
* refcount is 0. If the refcount of the head node is not 0, we must stop
* trimming and never iterate the next node. */
/* Similar with 'clientReplyBlock', it is used for shared buffers between
* all replica clients and replication backlog. */
typedef struct replBufBlock {
int refcount; /* Number of replicas or repl backlog using. */
long long id; /* The unique incremental number. */
long long repl_offset; /* Start replication offset of the block. */
size_t size, used;
char buf[];
} replBufBlock;
/* Redis database representation. There are multiple databases identified
* by integers from 0 (the default database) up to the max configured
* database. The database number is the 'id' field in the structure. */
@ -929,6 +963,24 @@ typedef struct {
need more reserved IDs use UINT64_MAX-1,
-2, ... and so forth. */
/* Replication backlog is not separate memory, it just is one consumer of
* the global replication buffer. This structure records the reference of
* replication buffers. Since the replication buffer block list may be very long,
* it would cost much time to search replication offset on partial resync, so
* we use one rax tree to index some blocks every REPL_BACKLOG_INDEX_PER_BLOCKS
* to make searching offset from replication buffer blocks list faster. */
typedef struct replBacklog {
listNode *ref_repl_buf_node; /* Referenced node of replication buffer blocks,
* see the definition of replBufBlock. */
size_t unindexed_count; /* The count from last creating index block. */
rax *blocks_index; /* The index of reocrded blocks of replication
* buffer for quickly searching replication
* offset on partial resynchronization. */
long long histlen; /* Backlog actual data length */
long long offset; /* Replication "master offset" of first
* byte in the replication backlog buffer.*/
} replBacklog;
typedef struct {
list *clients;
size_t mem_usage_sum;
@ -1029,6 +1081,11 @@ typedef struct client {
listNode *mem_usage_bucket_node;
clientMemUsageBucket *mem_usage_bucket;
listNode *ref_repl_buf_node; /* Referenced node of replication buffer blocks,
* see the definition of replBufBlock. */
size_t ref_block_pos; /* Access position of referenced buffer block,
* i.e. the next offset to send. */
/* Response buffer */
int bufpos;
size_t buf_usable_size; /* Usable size of buffer. */
@ -1528,14 +1585,8 @@ struct redisServer {
long long second_replid_offset; /* Accept offsets up to this for replid2. */
int slaveseldb; /* Last SELECTed DB in replication output */
int repl_ping_slave_period; /* Master pings the slave every N seconds */
char *repl_backlog; /* Replication backlog for partial syncs */
replBacklog *repl_backlog; /* Replication backlog for partial syncs */
long long repl_backlog_size; /* Backlog circular buffer size */
long long cfg_repl_backlog_size;/* Backlog circular buffer size in config */
long long repl_backlog_histlen; /* Backlog actual data length */
long long repl_backlog_idx; /* Backlog circular buffer current offset,
that is the next byte will'll write to.*/
long long repl_backlog_off; /* Replication "master offset" of first
byte in the replication backlog buffer.*/
time_t repl_backlog_time_limit; /* Time without slaves after the backlog
gets released. */
time_t repl_no_slaves_since; /* We have no slaves since that time.
@ -1547,6 +1598,9 @@ struct redisServer {
int repl_diskless_load; /* Slave parse RDB directly from the socket.
* see REPL_DISKLESS_LOAD_* enum */
int repl_diskless_sync_delay; /* Delay to start a diskless repl BGSAVE. */
size_t repl_buffer_mem; /* The memory of replication buffer. */
list *repl_buffer_blocks; /* Replication buffers blocks list
* (serving replica clients and repl backlog) */
/* Replication (slave) */
char *masteruser; /* AUTH with this user and masterauth with master */
sds masterauth; /* AUTH with this password with master */
@ -2031,6 +2085,7 @@ void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask);
void acceptTLSHandler(aeEventLoop *el, int fd, void *privdata, int mask);
void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask);
void readQueryFromClient(connection *conn);
int prepareClientToWrite(client *c);
void addReplyNull(client *c);
void addReplyNullArray(client *c);
void addReplyBool(client *c, int b);
@ -2063,8 +2118,8 @@ void addReplyPushLen(client *c, long length);
void addReplyHelp(client *c, const char **help);
void addReplySubcommandSyntaxError(client *c);
void addReplyLoadedModules(client *c);
void copyReplicaOutputBuffer(client *dst, client *src);
void addListRangeReply(client *c, robj *o, long start, long end, int reverse);
void copyClientOutputBuffer(client *dst, client *src);
size_t sdsZmallocSize(sds s);
size_t getStringObjectSdsUsedMemory(robj *o);
void freeClientReplyValue(void *o);
@ -2238,7 +2293,10 @@ ssize_t syncReadLine(int fd, char *ptr, ssize_t size, long long timeout);
/* Replication */
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc);
void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t buflen);
void replicationFeedStreamFromMasterStream(char *buf, size_t buflen);
void resetReplicationBuffer(void);
void feedReplicationBuffer(char *buf, size_t len);
void freeReplicaReferencedReplBuffer(client *replica);
void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc);
void updateSlavesWaitingBgsave(int bgsaveerr, int type);
void replicationCron(void);
@ -2264,8 +2322,11 @@ int replicationSetupSlaveForFullResync(client *slave, long long offset);
void changeReplicationId(void);
void clearReplicationId2(void);
void createReplicationBacklog(void);
void freeReplicationBacklog(void);
void replicationCacheMasterUsingMyself(void);
void feedReplicationBacklog(void *ptr, size_t len);
void incrementalTrimReplicationBacklog(size_t blocks);
int canFeedReplicaReplBuffer(client *replica);
void showLatestBacklog(void);
void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
void rdbPipeWriteHandlerConnRemoved(struct connection *conn);
@ -2613,7 +2674,7 @@ size_t lazyfreeGetPendingObjectsCount(void);
size_t lazyfreeGetFreedObjectsCount(void);
void lazyfreeResetStats(void);
void freeObjAsync(robj *key, robj *obj, int dbid);
void freeReplicationBacklogRefMemAsync(list *blocks, rax *index);
/* API to get key arguments from commands */
int *getKeysPrepareResult(getKeysResult *result, int numkeys);

View File

@ -118,7 +118,8 @@ start_server {} {
$master config rewrite
$master debug set-active-expire 0
for {set j 0} {$j < 1024} {incr j} {
# Make sure replication backlog is full and will be trimmed.
for {set j 0} {$j < 2048} {incr j} {
$master select [expr $j%16]
$master set $j somevalue px 10
}
@ -149,7 +150,7 @@ start_server {} {
assert {[status $master repl_backlog_first_byte_offset] > [status $master second_repl_offset]}
assert {[status $master sync_partial_ok] == 0}
assert {[status $master sync_full] == 1}
assert {[status $master rdb_last_load_keys_expired] == 1024}
assert {[status $master rdb_last_load_keys_expired] == 2048}
assert {[status $replica sync_full] == 1}
set digest [$master debug digest]

View File

@ -0,0 +1,218 @@
# This test group aims to test that all replicas share one global replication buffer,
# two replicas don't make replication buffer size double, and when there is no replica,
# replica buffer will shrink.
start_server {tags {"repl external:skip"}} {
start_server {} {
start_server {} {
start_server {} {
set replica1 [srv -3 client]
set replica2 [srv -2 client]
set replica3 [srv -1 client]
set master [srv 0 client]
set master_host [srv 0 host]
set master_port [srv 0 port]
$master config set save ""
$master config set repl-backlog-size 16384
$master config set client-output-buffer-limit "replica 0 0 0"
# Make sure replica3 is synchronized with master
$replica3 replicaof $master_host $master_port
wait_for_sync $replica3
# Generating RDB will take some 100 seconds
$master config set rdb-key-save-delay 1000000
populate 100 "" 16
# Make sure replica1 and replica2 are waiting bgsave
$replica1 replicaof $master_host $master_port
$replica2 replicaof $master_host $master_port
wait_for_condition 50 100 {
([s rdb_bgsave_in_progress] == 1) &&
[lindex [$replica1 role] 3] eq {sync} &&
[lindex [$replica2 role] 3] eq {sync}
} else {
fail "fail to sync with replicas"
}
test {All replicas share one global replication buffer} {
set before_used [s used_memory]
populate 1024 "" 1024 ; # Write extra 1M data
# New data uses 1M memory, but all replicas use only one
# replication buffer, so all replicas output memory is not
# more than double of replication buffer.
set repl_buf_mem [s mem_total_replication_buffers]
set extra_mem [expr {[s used_memory]-$before_used-1024*1024}]
assert {$extra_mem < 2*$repl_buf_mem}
# Kill replica1, replication_buffer will not become smaller
catch {$replica1 shutdown nosave}
wait_for_condition 50 100 {
[s connected_slaves] eq {2}
} else {
fail "replica doesn't disconnect with master"
}
assert_equal $repl_buf_mem [s mem_total_replication_buffers]
}
test {Replication buffer will become smaller when no replica uses} {
# Make sure replica3 catch up with the master
wait_for_ofs_sync $master $replica3
set repl_buf_mem [s mem_total_replication_buffers]
# Kill replica2, replication_buffer will become smaller
catch {$replica2 shutdown nosave}
wait_for_condition 50 100 {
[s connected_slaves] eq {1}
} else {
fail "replica2 doesn't disconnect with master"
}
assert {[expr $repl_buf_mem - 1024*1024] > [s mem_total_replication_buffers]}
}
}
}
}
}
# This test group aims to test replication backlog size can outgrow the backlog
# limit config if there is a slow replica which keep massive replication buffers,
# and replicas could use this replication buffer (beyond backlog config) for
# partial re-synchronization. Of course, replication backlog memory also can
# become smaller when master disconnects with slow replicas since output buffer
# limit is reached.
start_server {tags {"repl external:skip"}} {
start_server {} {
start_server {} {
set replica1 [srv -2 client]
set replica1_pid [s -2 process_id]
set replica2 [srv -1 client]
set replica2_pid [s -1 process_id]
set master [srv 0 client]
set master_host [srv 0 host]
set master_port [srv 0 port]
$master config set save ""
$master config set repl-backlog-size 16384
$master config set client-output-buffer-limit "replica 0 0 0"
$replica1 replicaof $master_host $master_port
wait_for_sync $replica1
test {Replication backlog size can outgrow the backlog limit config} {
# Generating RDB will take 1000 seconds
$master config set rdb-key-save-delay 1000000
populate 1000 master 10000
$replica2 replicaof $master_host $master_port
# Make sure replica2 is waiting bgsave
wait_for_condition 5000 100 {
([s rdb_bgsave_in_progress] == 1) &&
[lindex [$replica2 role] 3] eq {sync}
} else {
fail "fail to sync with replicas"
}
# Replication actual backlog grow more than backlog setting since
# the slow replica2 kept replication buffer.
populate 10000 master 10000
assert {[s repl_backlog_histlen] > [expr 10000*10000]}
}
# Wait replica1 catch up with the master
wait_for_condition 1000 100 {
[s -2 master_repl_offset] eq [s master_repl_offset]
} else {
fail "Replica offset didn't catch up with the master after too long time"
}
test {Replica could use replication buffer (beyond backlog config) for partial resynchronization} {
# replica1 disconnects with master
$replica1 replicaof [srv -1 host] [srv -1 port]
# Write a mass of data that exceeds repl-backlog-size
populate 10000 master 10000
# replica1 reconnects with master
$replica1 replicaof $master_host $master_port
wait_for_condition 1000 100 {
[s -2 master_repl_offset] eq [s master_repl_offset]
} else {
fail "Replica offset didn't catch up with the master after too long time"
}
# replica2 still waits for bgsave ending
assert {[s rdb_bgsave_in_progress] eq {1} && [lindex [$replica2 role] 3] eq {sync}}
# master accepted replica1 partial resync
assert_equal [s sync_partial_ok] {1}
assert_equal [$master debug digest] [$replica1 debug digest]
}
test {Replication backlog memory will become smaller if disconnecting with replica} {
assert {[s repl_backlog_histlen] > [expr 2*10000*10000]}
assert_equal [s connected_slaves] {2}
exec kill -SIGSTOP $replica2_pid
r config set client-output-buffer-limit "replica 128k 0 0"
# trigger output buffer limit check
r set key [string repeat A [expr 64*1024]]
# master will close replica2's connection since replica2's output
# buffer limit is reached, so there only is replica1.
wait_for_condition 100 100 {
[s connected_slaves] eq {1}
} else {
fail "master didn't disconnect with replica2"
}
# Since we trim replication backlog inrementally, replication backlog
# memory may take time to be reclaimed.
wait_for_condition 1000 100 {
[s repl_backlog_histlen] < [expr 10000*10000]
} else {
fail "Replication backlog memory is not smaller"
}
exec kill -SIGCONT $replica2_pid
}
}
}
}
test {Partial resynchronization is successful even client-output-buffer-limit is less than repl-backlog-size} {
start_server {tags {"repl external:skip"}} {
start_server {} {
r config set save ""
r config set repl-backlog-size 100mb
r config set client-output-buffer-limit "replica 512k 0 0"
set replica [srv -1 client]
$replica replicaof [srv 0 host] [srv 0 port]
wait_for_sync $replica
set big_str [string repeat A [expr 10*1024*1024]] ;# 10mb big string
r multi
r client kill type replica
r set key $big_str
r set key $big_str
r debug sleep 2 ;# wait for replica reconnecting
r exec
# When replica reconnects with master, master accepts partial resync,
# and don't close replica client even client output buffer limit is
# reached.
r set key $big_str ;# trigger output buffer limit check
wait_for_ofs_sync r $replica
# master accepted replica partial resync
assert_equal [s sync_full] {1}
assert_equal [s sync_partial_ok] {1}
r multi
r set key $big_str
r set key $big_str
r exec
# replica's reply buffer size is more than client-output-buffer-limit but
# doesn't exceed repl-backlog-size, we don't close replica client.
wait_for_condition 1000 100 {
[s -1 master_repl_offset] eq [s master_repl_offset]
} else {
fail "Replica offset didn't catch up with the master after too long time"
}
assert_equal [s sync_full] {1}
assert_equal [s sync_partial_ok] {1}
}
}
}

View File

@ -527,8 +527,11 @@ test {diskless loading short read} {
$master multi
$master client kill type replica
$master set asdf asdf
# the side effect of resizing the backlog is that it is flushed (16k is the min size)
$master config set repl-backlog-size [expr {16384 + $i}]
# fill replication backlog with new content
$master config set repl-backlog-size 16384
for {set keyid 0} {$keyid < 10} {incr keyid} {
$master set "$keyid string_$keyid" [string repeat A 16384]
}
$master exec
}
# wait for loading to stop (fail)

View File

@ -43,6 +43,7 @@ set ::all_tests {
integration/replication-3
integration/replication-4
integration/replication-psync
integration/replication-buffer
integration/aof
integration/rdb
integration/corrupt-dump

View File

@ -355,7 +355,7 @@ proc test_slave_buffers {test_name cmd_count payload_len limit_memory pipeline}
$rd_master setrange key:0 0 [string repeat A $payload_len]
}
for {set k 0} {$k < $cmd_count} {incr k} {
#$rd_master read
$rd_master read
}
} else {
for {set k 0} {$k < $cmd_count} {incr k} {
@ -382,12 +382,14 @@ proc test_slave_buffers {test_name cmd_count payload_len limit_memory pipeline}
assert {$delta < $delta_max && $delta > -$delta_max}
$master client kill type slave
set killed_used [s -1 used_memory]
set info_str [$master info memory]
set killed_used [getInfoProperty $info_str used_memory]
set killed_mem_not_counted_for_evict [getInfoProperty $info_str mem_not_counted_for_evict]
set killed_slave_buf [s -1 mem_clients_slaves]
set killed_mem_not_counted_for_evict [s -1 mem_not_counted_for_evict]
# we need to exclude replies buffer and query buffer of slave from used memory after kill slave
set killed_used_no_repl [expr {$killed_used - $killed_mem_not_counted_for_evict - [slave_query_buffer $master]}]
set delta_no_repl [expr {$killed_used_no_repl - $used_no_repl}]
assert {[$master dbsize] == 100}
assert {$killed_slave_buf == 0}
assert {$delta_no_repl > -$delta_max && $delta_no_repl < $delta_max}

View File

@ -107,8 +107,11 @@ tags "modules" {
$master multi
$master client kill type replica
$master set asdf asdf
# the side effect of resizing the backlog is that it is flushed (16k is the min size)
$master config set repl-backlog-size [expr {16384 + $i}]
# fill replication backlog with new content
$master config set repl-backlog-size 16384
for {set keyid 0} {$keyid < 10} {incr keyid} {
$master set "$keyid string_$keyid" [string repeat A 16384]
}
$master exec
}
# wait for loading to stop (fail)