Introduce memory management on cluster link buffers (#9774)

Introduce memory management on cluster link buffers:
 * Introduce a new `cluster-link-sendbuf-limit` config that caps memory usage of cluster bus link send buffers.
 * Introduce a new `CLUSTER LINKS` command that displays current TCP links to/from peers (see the example session after this list).
 * Introduce a new `mem_cluster_links` field under `INFO` command output, which displays the overall memory usage by all current cluster links.
 * Introduce a new `total_cluster_links_buffer_limit_exceeded` field under `CLUSTER INFO` command output, which displays the accumulated count of cluster links freed due to `cluster-link-sendbuf-limit`.
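
For illustration, a hypothetical redis-cli session against a cluster node might look like the following. The field layout follows the reply built by addReplyClusterLinkDescription further down in this diff; the node ID, timestamp, and byte counts are made-up values:

127.0.0.1:7000> CLUSTER LINKS
1)  1) "direction"
    2) "to"
    3) "node"
    4) "9fd8800bcf3a30bf1fd5b99fa8884a062d7bbe40"
    5) "create-time"
    6) (integer) 1639724066909
    7) "events"
    8) "rw"
    9) "send-buffer-allocated"
   10) (integer) 4096
   11) "send-buffer-used"
   12) (integer) 0
2)  ... (one entry per link; each connected peer normally contributes one "to" and one "from" link)

127.0.0.1:7000> INFO memory
... mem_cluster_links:10784 ...

127.0.0.1:7000> CLUSTER INFO
... total_cluster_links_buffer_limit_exceeded:0 ...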
ny0312 2021-12-16 21:56:59 -08:00 committed by GitHub
parent 687210f155
commit 792afb4432
13 changed files with 421 additions and 32 deletions


@@ -1576,6 +1576,17 @@ lua-time-limit 5000
#
# cluster-allow-reads-when-down no
# Cluster link send buffer limit is the limit on the memory usage of an individual
# cluster bus link's send buffer in bytes. Cluster links are freed when they exceed
# this limit. This primarily prevents send buffers from growing unbounded on links
# toward slow peers (e.g. PubSub messages piling up).
# This limit is disabled by default. Enable it when the 'mem_cluster_links' INFO field
# and/or the 'send-buffer-allocated' entries in the 'CLUSTER LINKS' command output continuously increase.
# A minimum limit of 1gb is recommended so that a cluster link buffer can fit at least a
# single PubSub message by default. (client-query-buffer-limit default value is 1gb)
#
# cluster-link-sendbuf-limit 0
# In order to setup your cluster make sure to read the documentation
# available at https://redis.io web site.
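
As a usage sketch (the 1gb value is illustrative, echoing the recommended minimum above, not a shipped default), the limit can be set in redis.conf or changed at runtime, since it is registered as a modifiable memory config later in this diff:

# redis.conf
cluster-link-sendbuf-limit 1gb

# or at runtime
redis-cli CONFIG SET cluster-link-sendbuf-limit 1gb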


@@ -568,11 +568,15 @@ void clusterInit(void) {
server.cluster->failover_auth_epoch = 0;
server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;
server.cluster->lastVoteEpoch = 0;
/* Initialize stats */
for (int i = 0; i < CLUSTERMSG_TYPE_COUNT; i++) {
server.cluster->stats_bus_messages_sent[i] = 0;
server.cluster->stats_bus_messages_received[i] = 0;
}
server.cluster->stats_pfail_nodes = 0;
server.cluster->stat_cluster_links_buffer_limit_exceeded = 0;
memset(server.cluster->slots,0, sizeof(server.cluster->slots));
clusterCloseAllSlots();
@@ -711,8 +715,13 @@ clusterLink *createClusterLink(clusterNode *node) {
link->sndbuf = sdsempty();
link->rcvbuf = zmalloc(link->rcvbuf_alloc = RCVBUF_INIT_LEN);
link->rcvbuf_len = 0;
link->node = node;
link->conn = NULL;
link->node = node;
/* The related node can only be known at link creation time if this is an outbound link */
link->inbound = (node == NULL);
if (!link->inbound) {
node->link = link;
}
return link;
}
@@ -726,11 +735,33 @@ void freeClusterLink(clusterLink *link) {
}
sdsfree(link->sndbuf);
zfree(link->rcvbuf);
if (link->node)
link->node->link = NULL;
if (link->node) {
if (link->node->link == link) {
serverAssert(!link->inbound);
link->node->link = NULL;
} else if (link->node->inbound_link == link) {
serverAssert(link->inbound);
link->node->inbound_link = NULL;
}
}
zfree(link);
}
void setClusterNodeToInboundClusterLink(clusterNode *node, clusterLink *link) {
serverAssert(!link->node);
serverAssert(link->inbound);
if (node->inbound_link) {
/* A peer may disconnect and then reconnect with us, and it's not guaranteed that
* we would always process the disconnection of the existing inbound link before
* accepting a new inbound link from the same node. Therefore, it's possible to
* have more than one inbound link from the same node at the same time. */
serverLog(LL_DEBUG, "Replacing inbound link fd %d from node %s with fd %d",
node->inbound_link->conn->fd, node->name, link->conn->fd);
}
node->inbound_link = link;
link->node = node;
}
static void clusterConnAcceptHandler(connection *conn) {
clusterLink *link;
@@ -879,6 +910,7 @@ clusterNode *createClusterNode(char *nodename, int flags) {
node->data_received = 0;
node->fail_time = 0;
node->link = NULL;
node->inbound_link = NULL;
memset(node->ip,0,sizeof(node->ip));
node->port = 0;
node->cport = 0;
@@ -1046,8 +1078,9 @@ void freeClusterNode(clusterNode *n) {
serverAssert(dictDelete(server.cluster->nodes,nodename) == DICT_OK);
sdsfree(nodename);
/* Release link and associated data structures. */
/* Release links and associated data structures. */
if (n->link) freeClusterLink(n->link);
if (n->inbound_link) freeClusterLink(n->inbound_link);
listRelease(n->fail_reports);
zfree(n->slaves);
zfree(n);
@@ -1821,6 +1854,26 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc
}
}
static clusterNode *getNodeFromLinkAndMsg(clusterLink *link, clusterMsg *hdr) {
clusterNode *sender;
if (link->node && !nodeInHandshake(link->node)) {
/* If the link has an associated node, use it so that we don't have to look the
* node up every time. The exception is when the node is still in handshake: it
* still has a random name and is thus not truly "known". */
sender = link->node;
} else {
/* Otherwise, fetch sender based on the message */
sender = clusterLookupNode(hdr->sender);
/* We know the sender node but haven't associated it with the link yet. This
* must be an inbound link, because only inbound links lack an associated
* node at creation time. */
if (sender && !link->node) {
setClusterNodeToInboundClusterLink(sender, link);
}
}
return sender;
}
/* When this function is called, there is a packet to process starting
* at link->rcvbuf. Releasing the buffer is up to the caller, so this
* function should just handle the higher level stuff of processing the
@@ -1896,10 +1949,7 @@ int clusterProcessPacket(clusterLink *link) {
if (totlen != explen) return 1;
}
/* Check if the sender is a known node. Note that for incoming connections
* we don't store link->node information, but resolve the node by the
* ID in the header each time in the current implementation. */
sender = clusterLookupNode(hdr->sender);
sender = getNodeFromLinkAndMsg(link, hdr);
/* Update the last time we saw any data from this node. We
* use this in order to avoid detecting a timeout from a node that
@@ -2000,7 +2050,7 @@ int clusterProcessPacket(clusterLink *link) {
serverLog(LL_DEBUG,"%s packet received: %s",
clusterGetMessageTypeString(type),
link->node ? link->node->name : "NULL");
if (link->node) {
if (!link->inbound) {
if (nodeInHandshake(link->node)) {
/* If we already have this node, try to change the
* IP/port of the node with the new one. */
@@ -2070,7 +2120,7 @@ int clusterProcessPacket(clusterLink *link) {
}
/* Update our info about the node */
if (link->node && type == CLUSTERMSG_TYPE_PONG) {
if (!link->inbound && type == CLUSTERMSG_TYPE_PONG) {
link->node->pong_received = now;
link->node->ping_sent = 0;
@@ -2673,7 +2723,7 @@ void clusterSendPing(clusterLink *link, int type) {
hdr = (clusterMsg*) buf;
/* Populate the header. */
if (link->node && type == CLUSTERMSG_TYPE_PING)
if (!link->inbound && type == CLUSTERMSG_TYPE_PING)
link->node->ping_sent = mstime();
clusterBuildMessageHdr(hdr,type);
@@ -3588,7 +3638,7 @@ void clusterHandleManualFailover(void) {
/* Check if the node is disconnected and re-establish the connection.
* Also update a few stats while we are here, that can be used to make
* better decisions in other part of the code. */
int clusterNodeCronHandleReconnect(clusterNode *node, mstime_t handshake_timeout, mstime_t now) {
static int clusterNodeCronHandleReconnect(clusterNode *node, mstime_t handshake_timeout, mstime_t now) {
/* Not interested in reconnecting the link with myself or nodes
* for which we have no address. */
if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR)) return 1;
@@ -3622,20 +3672,57 @@ int clusterNodeCronHandleReconnect(clusterNode *node, mstime_t handshake_timeout
freeClusterLink(link);
return 0;
}
node->link = link;
}
return 0;
}
/* Resize the send buffer of a node if it is wasting
* enough space. */
int clusterNodeCronResizeBuffers(clusterNode *node) {
static void resizeClusterLinkBuffer(clusterLink *link) {
/* If unused space is a lot bigger than the used portion of the buffer then free up unused space.
* We use a factor of 4 because of the greediness of sdsMakeRoomFor (used by sdscatlen). */
if (node->link != NULL && sdsavail(node->link->sndbuf) / 4 > sdslen(node->link->sndbuf)) {
node->link->sndbuf = sdsRemoveFreeSpace(node->link->sndbuf);
if (link != NULL && sdsavail(link->sndbuf) / 4 > sdslen(link->sndbuf)) {
link->sndbuf = sdsRemoveFreeSpace(link->sndbuf);
}
return 0;
}
/* Resize the send buffer of a node if it is wasting
* enough space. */
static void clusterNodeCronResizeBuffers(clusterNode *node) {
resizeClusterLinkBuffer(node->link);
resizeClusterLinkBuffer(node->inbound_link);
}
static void freeClusterLinkOnBufferLimitReached(clusterLink *link) {
if (link == NULL || server.cluster_link_sendbuf_limit_bytes == 0) {
return;
}
unsigned long long mem_link = sdsalloc(link->sndbuf);
if (mem_link > server.cluster_link_sendbuf_limit_bytes) {
serverLog(LL_WARNING, "Freeing cluster link(%s node %s, used memory: %llu) due to "
"exceeding send buffer memory limit.", link->inbound ? "from" : "to",
link->node ? link->node->name : "", mem_link);
freeClusterLink(link);
server.cluster->stat_cluster_links_buffer_limit_exceeded++;
}
}
/* Free a node's links if their send buffers exceed the configured limit. */
static void clusterNodeCronFreeLinkOnBufferLimitReached(clusterNode *node) {
freeClusterLinkOnBufferLimitReached(node->link);
freeClusterLinkOnBufferLimitReached(node->inbound_link);
}
static size_t getClusterLinkMemUsage(clusterLink *link) {
if (link != NULL) {
return sizeof(clusterLink) + sdsalloc(link->sndbuf) + link->rcvbuf_alloc;
} else {
return 0;
}
}
/* Update memory usage statistics of all current cluster links */
static void clusterNodeCronUpdateClusterLinksMemUsage(clusterNode *node) {
server.stat_cluster_links_memory += getClusterLinkMemUsage(node->link);
server.stat_cluster_links_memory += getClusterLinkMemUsage(node->inbound_link);
}
/* This is executed 10 times every second */
@@ -3662,14 +3749,25 @@ void clusterCron(void) {
/* Clear so clusterNodeCronHandleReconnect can count the number of nodes in PFAIL. */
server.cluster->stats_pfail_nodes = 0;
/* Clear so clusterNodeCronUpdateClusterLinksMemUsage can count the current memory usage of all cluster links. */
server.stat_cluster_links_memory = 0;
/* Run through some of the operations we want to do on each cluster node. */
di = dictGetSafeIterator(server.cluster->nodes);
while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);
/* The protocol is that they return non-zero if the node was
* terminated. */
/* The sequence goes:
* 1. We try to shrink link buffers if possible.
* 2. We free the links whose buffers are still oversized after possible shrinking.
* 3. We update the latest memory usage of cluster links.
* 4. We immediately attempt reconnecting after freeing links.
*/
clusterNodeCronResizeBuffers(node);
clusterNodeCronFreeLinkOnBufferLimitReached(node);
clusterNodeCronUpdateClusterLinksMemUsage(node);
/* The protocol is that function(s) below return non-zero if the node was
* terminated.
*/
if(clusterNodeCronHandleReconnect(node, handshake_timeout, now)) continue;
if(clusterNodeCronResizeBuffers(node)) continue;
}
dictReleaseIterator(di);
@@ -4399,6 +4497,70 @@ sds clusterGenNodesDescription(int filter, int use_pport) {
return ci;
}
/* Add to the output buffer of the given client the description of the given cluster link.
* The description is a map with each entry being an attribute of the link. */
void addReplyClusterLinkDescription(client *c, clusterLink *link) {
addReplyMapLen(c, 6);
addReplyBulkCString(c, "direction");
addReplyBulkCString(c, link->inbound ? "from" : "to");
/* This function is only called for links that have been associated with
* nodes. The association is always bi-directional, so link->node should
* never be NULL here. */
serverAssert(link->node);
sds node_name = sdsnewlen(link->node->name, CLUSTER_NAMELEN);
addReplyBulkCString(c, "node");
addReplyBulkCString(c, node_name);
sdsfree(node_name);
addReplyBulkCString(c, "create-time");
addReplyLongLong(c, link->ctime);
char events[3], *p;
p = events;
if (link->conn) {
if (connHasReadHandler(link->conn)) *p++ = 'r';
if (connHasWriteHandler(link->conn)) *p++ = 'w';
}
*p = '\0';
addReplyBulkCString(c, "events");
addReplyBulkCString(c, events);
addReplyBulkCString(c, "send-buffer-allocated");
addReplyLongLong(c, sdsalloc(link->sndbuf));
addReplyBulkCString(c, "send-buffer-used");
addReplyLongLong(c, sdslen(link->sndbuf));
}
/* Add to the output buffer of the given client an array of cluster link descriptions,
* with each array entry being a description of a single current cluster link. */
void addReplyClusterLinksDescription(client *c) {
dictIterator *di;
dictEntry *de;
void *arraylen_ptr = NULL;
int num_links = 0;
arraylen_ptr = addReplyDeferredLen(c);
di = dictGetSafeIterator(server.cluster->nodes);
while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);
if (node->link) {
num_links++;
addReplyClusterLinkDescription(c, node->link);
}
if (node->inbound_link) {
num_links++;
addReplyClusterLinkDescription(c, node->inbound_link);
}
}
dictReleaseIterator(di);
setDeferredArrayLen(c, arraylen_ptr, num_links);
}
/* -----------------------------------------------------------------------------
* CLUSTER command
* -------------------------------------------------------------------------- */
@@ -4608,6 +4770,9 @@ void clusterCommand(client *c) {
"SLOTS",
" Return information about slots range mappings. Each range is made of:",
" start, end, master and replicas IP addresses, ports and ids",
"LINKS",
" Return information about all network links between this node and its peers.",
" Output format is an array where each array element is a map containing attributes of a link",
NULL
};
addReplyHelp(c, help);
@@ -4919,6 +5084,10 @@ NULL
info = sdscatprintf(info,
"cluster_stats_messages_received:%lld\r\n", tot_msg_received);
info = sdscatprintf(info,
"total_cluster_links_buffer_limit_exceeded:%llu\r\n",
server.cluster->stat_cluster_links_buffer_limit_exceeded);
/* Produce the reply protocol. */
addReplyVerbatim(c,info,sdslen(info),"txt");
sdsfree(info);
@@ -5182,6 +5351,9 @@ NULL
}
clusterReset(hard);
addReply(c,shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr,"links") && c->argc == 2) {
/* CLUSTER LINKS */
addReplyClusterLinksDescription(c);
} else {
addReplySubcommandSyntaxError(c);
return;


@@ -39,7 +39,8 @@ typedef struct clusterLink {
char *rcvbuf; /* Packet reception buffer */
size_t rcvbuf_len; /* Used size of rcvbuf */
size_t rcvbuf_alloc; /* Allocated size of rcvbuf */
struct clusterNode *node; /* Node related to this link if any, or NULL */
struct clusterNode *node; /* Node related to this link. Initialized to NULL when unknown */
int inbound; /* 1 if this link is an inbound link accepted from the related node */
} clusterLink;
/* Cluster node flags and macros. */
@@ -137,7 +138,8 @@ typedef struct clusterNode {
int pport; /* Latest known clients plaintext port. Only used
if the main clients port is for TLS. */
int cport; /* Latest known cluster port of this node. */
clusterLink *link; /* TCP/IP link with this node */
clusterLink *link; /* TCP/IP link established toward this node */
clusterLink *inbound_link; /* TCP/IP link accepted from this node */
list *fail_reports; /* List of nodes signaling this as failing */
} clusterNode;
@@ -192,11 +194,13 @@ typedef struct clusterState {
/* The following fields are used by masters to take state on elections. */
uint64_t lastVoteEpoch; /* Epoch of the last vote granted. */
int todo_before_sleep; /* Things to do in clusterBeforeSleep(). */
/* Stats */
/* Messages received and sent by type. */
long long stats_bus_messages_sent[CLUSTERMSG_TYPE_COUNT];
long long stats_bus_messages_received[CLUSTERMSG_TYPE_COUNT];
long long stats_pfail_nodes; /* Number of nodes in PFAIL status,
excluding nodes without address. */
unsigned long long stat_cluster_links_buffer_limit_exceeded; /* Total number of cluster links freed due to exceeding buffer limit */
} clusterState;
/* Redis cluster messages header */


@@ -399,6 +399,14 @@ struct redisCommandArg CLUSTER_KEYSLOT_Args[] = {
{0}
};
/********** CLUSTER LINKS ********************/
/* CLUSTER LINKS history */
#define CLUSTER_LINKS_History NULL
/* CLUSTER LINKS hints */
#define CLUSTER_LINKS_Hints NULL
/********** CLUSTER MEET ********************/
/* CLUSTER MEET history */
@@ -552,6 +560,7 @@ struct redisCommand CLUSTER_Subcommands[] = {
{"help","Show helpful text about the different subcommands","O(1)","5.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_CLUSTER,CLUSTER_HELP_History,CLUSTER_HELP_Hints,clusterCommand,2,CMD_LOADING|CMD_STALE,0},
{"info","Provides info about Redis Cluster node state","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_CLUSTER,CLUSTER_INFO_History,CLUSTER_INFO_Hints,clusterCommand,2,CMD_RANDOM|CMD_STALE,0},
{"keyslot","Returns the hash slot of the specified key","O(N) where N is the number of bytes in the key","3.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_CLUSTER,CLUSTER_KEYSLOT_History,CLUSTER_KEYSLOT_Hints,clusterCommand,3,CMD_RANDOM|CMD_STALE,0,.args=CLUSTER_KEYSLOT_Args},
{"links","Returns a list of all TCP links to and from peer nodes in cluster","O(N) where N is the total number of Cluster nodes","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_CLUSTER,CLUSTER_LINKS_History,CLUSTER_LINKS_Hints,clusterCommand,2,CMD_RANDOM|CMD_STALE,0},
{"meet","Force a node cluster to handshake with another node","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_CLUSTER,CLUSTER_MEET_History,CLUSTER_MEET_Hints,clusterCommand,-4,CMD_ADMIN|CMD_RANDOM|CMD_STALE,0,.args=CLUSTER_MEET_Args},
{"myid","Return the node id","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_CLUSTER,CLUSTER_MYID_History,CLUSTER_MYID_Hints,clusterCommand,2,CMD_RANDOM|CMD_STALE,0},
{"nodes","Get Cluster config for the node","O(N) where N is the total number of Cluster nodes","3.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_CLUSTER,CLUSTER_NODES_History,CLUSTER_NODES_Hints,clusterCommand,2,CMD_RANDOM|CMD_STALE,0},


@@ -0,0 +1,15 @@
{
"LINKS": {
"summary": "Returns a list of all TCP links to and from peer nodes in cluster",
"complexity": "O(N) where N is the total number of Cluster nodes",
"group": "cluster",
"since": "7.0.0",
"arity": 2,
"container": "CLUSTER",
"function": "clusterCommand",
"command_flags": [
"RANDOM",
"STALE"
]
}
}


@@ -2690,6 +2690,7 @@ standardConfig configs[] = {
/* Unsigned Long Long configs */
createULongLongConfig("maxmemory", NULL, MODIFIABLE_CONFIG, 0, ULLONG_MAX, server.maxmemory, 0, MEMORY_CONFIG, NULL, updateMaxmemory),
createULongLongConfig("cluster-link-sendbuf-limit", NULL, MODIFIABLE_CONFIG, 0, ULLONG_MAX, server.cluster_link_sendbuf_limit_bytes, 0, MEMORY_CONFIG, NULL, NULL),
/* Size_t configs */
createSizeTConfig("hash-max-listpack-entries", "hash-max-ziplist-entries", MODIFIABLE_CONFIG, 0, LONG_MAX, server.hash_max_listpack_entries, 512, INTEGER_CONFIG, NULL, NULL),


@@ -1196,6 +1196,9 @@ struct redisMemOverhead *getMemoryOverheadData(void) {
server.stat_clients_type_memory[CLIENT_TYPE_NORMAL];
mem_total += mh->clients_normal;
mh->cluster_links = server.stat_cluster_links_memory;
mem_total += mh->cluster_links;
mem = 0;
if (server.aof_state != AOF_OFF) {
mem += sdsZmallocSize(server.aof_buf);


@@ -2352,6 +2352,7 @@ void initServer(void) {
server.stat_module_progress = 0;
for (int j = 0; j < CLIENT_TYPE_COUNT; j++)
server.stat_clients_type_memory[j] = 0;
server.stat_cluster_links_memory = 0;
server.cron_malloc_stats.zmalloc_used = 0;
server.cron_malloc_stats.process_rss = 0;
server.cron_malloc_stats.allocator_allocated = 0;
@@ -4559,6 +4560,7 @@ sds genRedisInfoString(const char *section) {
"mem_total_replication_buffers:%zu\r\n"
"mem_clients_slaves:%zu\r\n"
"mem_clients_normal:%zu\r\n"
"mem_cluster_links:%zu\r\n"
"mem_aof_buffer:%zu\r\n"
"mem_allocator:%s\r\n"
"active_defrag_running:%d\r\n"
@@ -4611,6 +4613,7 @@ sds genRedisInfoString(const char *section) {
server.repl_buffer_mem,
mh->clients_slaves,
mh->clients_normal,
mh->cluster_links,
mh->aof_buffer,
ZMALLOC_LIB,
server.active_defrag_running,


@@ -1217,6 +1217,7 @@ struct redisMemOverhead {
size_t repl_backlog;
size_t clients_slaves;
size_t clients_normal;
size_t cluster_links;
size_t aof_buffer;
size_t lua_caches;
size_t functions_caches;
@@ -1462,6 +1463,7 @@ struct redisServer {
size_t stat_module_cow_bytes; /* Copy on write bytes during module fork. */
double stat_module_progress; /* Module save progress. */
redisAtomic size_t stat_clients_type_memory[CLIENT_TYPE_COUNT];/* Mem usage by type */
size_t stat_cluster_links_memory;/* Mem usage by cluster links */
long long stat_unexpected_error_replies; /* Number of unexpected (aof-loading, replica to master, etc.) error replies */
long long stat_total_error_replies; /* Total number of issued error replies ( command + rejected errors ) */
long long stat_dump_payload_sanitizations; /* Number deep dump payloads integrity validations. */
@@ -1734,6 +1736,7 @@ struct redisServer {
int cluster_allow_reads_when_down; /* Are reads allowed when the cluster
is down? */
int cluster_config_file_lock_fd; /* cluster config fd, will be flock */
unsigned long long cluster_link_sendbuf_limit_bytes; /* Memory usage limit on individual link send buffers */
/* Scripting */
client *script_caller; /* The client running script right now, or NULL */
mstime_t script_time_limit; /* Script timeout in milliseconds */


@@ -175,3 +175,72 @@ proc wait_for_cluster_propagation {} {
fail "cluster config did not reach a consistent state"
}
}
# Returns the parsed CLUSTER LINKS output of the instance identified
# by the given `id` as a list of dictionaries, each dictionary
# corresponding to a link.
proc get_cluster_links id {
set lines [R $id cluster links]
set links {}
foreach l $lines {
if {$l eq {}} continue
assert_equal [llength $l] 12
assert_equal [lindex $l 0] "direction"
set dir [lindex $l 1]
assert_equal [lindex $l 2] "node"
set node [lindex $l 3]
assert_equal [lindex $l 4] "create-time"
set create_time [lindex $l 5]
assert_equal [lindex $l 6] "events"
set events [lindex $l 7]
assert_equal [lindex $l 8] "send-buffer-allocated"
set send_buffer_allocated [lindex $l 9]
assert_equal [lindex $l 10] "send-buffer-used"
set send_buffer_used [lindex $l 11]
set link [dict create \
dir $dir \
node $node \
create_time $create_time \
events $events \
send_buffer_allocated $send_buffer_allocated \
send_buffer_used $send_buffer_used \
]
lappend links $link
}
return $links
}
proc get_links_with_peer {this_instance_id peer_nodename} {
set links [get_cluster_links $this_instance_id]
set links_with_peer {}
foreach l $links {
if {[dict get $l node] eq $peer_nodename} {
lappend links_with_peer $l
}
}
return $links_with_peer
}
# Return the entry in the CLUSTER LINKS output of the instance identified by `this_instance_id`
# that corresponds to the link established toward the peer identified by `peer_nodename`
proc get_link_to_peer {this_instance_id peer_nodename} {
set links_with_peer [get_links_with_peer $this_instance_id $peer_nodename]
foreach l $links_with_peer {
if {[dict get $l dir] eq "to"} {
return $l
}
}
return {}
}
# Return the entry in the CLUSTER LINKS output of the instance identified by `this_instance_id`
# that corresponds to the link accepted from the peer identified by `peer_nodename`
proc get_link_from_peer {this_instance_id peer_nodename} {
set links_with_peer [get_links_with_peer $this_instance_id $peer_nodename]
foreach l $links_with_peer {
if {[dict get $l dir] eq "from"} {
return $l
}
}
return {}
}


@@ -0,0 +1,99 @@
source "../tests/includes/init-tests.tcl"
test "Create a cluster with two single-node shards" {
create_cluster 2 0
}
test "Cluster should start ok" {
assert_cluster_state ok
}
test "Each node has two links with each peer" {
foreach_redis_id id {
# Get number of peers, excluding myself
set nodes [get_cluster_nodes $id]
set num_peers [expr [llength $nodes] - 1]
# Get number of links to peers
set links [get_cluster_links $id]
set num_links [llength $links]
# Two links per peer
assert {$num_peers*2 eq $num_links}
# For each peer there should be exactly one
# link "to" it and one link "from" it.
foreach n $nodes {
if {[has_flag $n myself]} continue
set peer [dict get $n id]
set to 0
set from 0
foreach l $links {
if {[dict get $l node] eq $peer} {
if {[dict get $l dir] eq "to"} {
incr to
} elseif {[dict get $l dir] eq "from"} {
incr from
}
}
}
assert {$to eq 1}
assert {$from eq 1}
}
}
}
set primary1_id 0
set primary2_id 1
set primary1 [Rn $primary1_id]
set primary2 [Rn $primary2_id]
test "Disconnect link when send buffer limit reached" {
# On primary1, set timeout to 1 hour so links won't get disconnected due to timeouts
set oldtimeout [lindex [$primary1 CONFIG get cluster-node-timeout] 1]
$primary1 CONFIG set cluster-node-timeout [expr 60*60*1000]
# Get primary1's links with primary2
set primary2_name [dict get [get_myself $primary2_id] id]
set orig_link_p1_to_p2 [get_link_to_peer $primary1_id $primary2_name]
set orig_link_p1_from_p2 [get_link_from_peer $primary1_id $primary2_name]
# On primary1, set cluster link send buffer limit to 32MB
set oldlimit [lindex [$primary1 CONFIG get cluster-link-sendbuf-limit] 1]
$primary1 CONFIG set cluster-link-sendbuf-limit [expr 32*1024*1024]
assert {[get_info_field [$primary1 cluster info] total_cluster_links_buffer_limit_exceeded] eq 0}
# To manufacture an ever-growing send buffer from primary1 to primary2,
# make primary2 unresponsive.
set primary2_pid [get_instance_attrib redis $primary2_id pid]
exec kill -SIGSTOP $primary2_pid
# On primary1, send a 10MB PubSub message. It will stay in the send buffer
# of the link from primary1 to primary2.
$primary1 publish channel [prepare_value [expr 10*1024*1024]]
# Check the same link has not been disconnected, but its send buffer has grown
set same_link_p1_to_p2 [get_link_to_peer $primary1_id $primary2_name]
assert {[dict get $same_link_p1_to_p2 create_time] eq [dict get $orig_link_p1_to_p2 create_time]}
assert {[dict get $same_link_p1_to_p2 send_buffer_allocated] > [dict get $orig_link_p1_to_p2 send_buffer_allocated]}
# On primary1, send another 30MB Pubsub message.
$primary1 publish channel [prepare_value [expr 30*1024*1024]]
# Link has exceeded buffer limit and been dropped and recreated
set new_link_p1_to_p2 [get_link_to_peer $primary1_id $primary2_name]
assert {[dict get $new_link_p1_to_p2 create_time] > [dict get $orig_link_p1_to_p2 create_time]}
assert {[get_info_field [$primary1 cluster info] total_cluster_links_buffer_limit_exceeded] eq 1}
# Link from primary2 should not be affected
set same_link_p1_from_p2 [get_link_from_peer $primary1_id $primary2_name]
assert {[dict get $same_link_p1_from_p2 create_time] eq [dict get $orig_link_p1_from_p2 create_time]}
# Revive primary2
exec kill -SIGCONT $primary2_pid
# Reset configs on primary1 so config changes don't leak out to other tests
$primary1 CONFIG set cluster-node-timeout $oldtimeout
$primary1 CONFIG set cluster-link-sendbuf-limit $oldlimit
}


@@ -978,3 +978,11 @@ proc read_big_bulk {code {compare no} {prefix ""}} {
r readraw 0
return $resp_len
}
proc prepare_value {size} {
set _v "c"
for {set i 1} {$i < $size} {incr i} {
append _v 0
}
return $_v
}


@@ -4,14 +4,6 @@ proc info_memory {r property} {
}
}
proc prepare_value {size} {
set _v "c"
for {set i 1} {$i < $size} {incr i} {
append _v 0
}
return $_v
}
start_server {tags {"wait external:skip"}} {
start_server {} {
set slave [srv 0 client]