Implement clusterbus message extensions and cluster hostname support (#9530)

Implement the ability for cluster nodes to advertise their location with extension messages.
This commit is contained in:
Madelyn Olson 2022-01-02 19:48:29 -08:00 committed by GitHub
parent 9f8885760b
commit 5460c10047
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 609 additions and 50 deletions

View File

@ -1632,6 +1632,32 @@ lua-time-limit 5000
# PubSub message by default. (client-query-buffer-limit default value is 1gb)
#
# cluster-link-sendbuf-limit 0
# Clusters can configure their announced hostname using this config. This is a common use case for
# applications that need to use TLS Server Name Indication (SNI) or deal with DNS based
# routing. By default this value is only shown as additional metadata in the CLUSTER SLOTS
# command, but can be changed using 'cluster-preferred-endpoint-type' config. This value is
# communicated along the clusterbus to all nodes, setting it to an empty string will remove
# the hostname and also propagate the removal.
#
# cluster-announce-hostname ""
# Clusters can advertise how clients should connect to them using either their IP address,
# a user defined hostname, or by declaring they have no endpoint. Which endpoint is
# shown as the preferred endpoint is set by using the cluster-preferred-endpoint-type
# config with values 'ip', 'hostname', or 'unknown-endpoint'. This value controls
# the endpoint returned for MOVED/ASK redirections as well as the first field of CLUSTER SLOTS.
# If the preferred endpoint type is set to hostname, but no announced hostname is set, a '?'
# will be returned instead.
#
# When a cluster advertises itself as having an unknown endpoint, it's indicating that
# the server doesn't know how clients can reach the cluster. This can happen in certain
# networking situations where there are multiple possible routes to the node, and the
# server doesn't know which one the client took. In this case, the server is expecting
# the client to reach out on the same endpoint it used for making the last request, but use
# the port provided in the response.
#
# cluster-preferred-endpoint-type ip
# In order to setup your cluster make sure to read the documentation
# available at https://redis.io web site.

View File

@ -215,6 +215,9 @@ int clusterLoadConfig(char *filename) {
n = createClusterNode(argv[0],0);
clusterAddNode(n);
}
/* Format for the node address information:
* ip:port[@cport][,hostname] */
/* Address and port */
if ((p = strrchr(argv[1],':')) == NULL) {
sdsfreesplitres(argv,argc);
@ -234,6 +237,18 @@ int clusterLoadConfig(char *filename) {
* base port. */
n->cport = busp ? atoi(busp) : n->port + CLUSTER_PORT_INCR;
/* Hostname is an optional argument that defines the endpoint
* that can be reported to clients instead of IP. */
char *hostname = strchr(p, ',');
if (hostname) {
*hostname = '\0';
hostname++;
zfree(n->hostname);
n->hostname = zstrdup(hostname);
} else {
n->hostname = NULL;
}
/* The plaintext port for client in a TLS cluster (n->pport) is not
* stored in nodes.conf. It is received later over the bus protocol. */
@ -553,6 +568,31 @@ void clusterUpdateMyselfIp(void) {
}
}
/* Replace the hostname stored on `node` with a private copy of `new`.
 *
 * `new` may be NULL, which clears the stored hostname. When the stored and
 * the requested value already agree (both NULL, or equal strings) the node
 * is left untouched and nothing is allocated or freed. */
static void updateAnnouncedHostname(clusterNode *node, char *new) {
    char *current = node->hostname;

    if (current == NULL) {
        /* Nothing stored and nothing requested: nothing to do. */
        if (new == NULL) return;
    } else if (new != NULL && strcmp(new, current) == 0) {
        /* Same string as before, keep the existing copy. */
        return;
    }

    /* Drop the previous value (if any) before installing the new one. */
    if (current != NULL) zfree(current);
    node->hostname = (new != NULL) ? zstrdup(new) : NULL;
}
/* Update my hostname based on server configuration values */
void clusterUpdateMyselfHostname(void) {
if (!myself) return;
updateAnnouncedHostname(myself, server.cluster_announce_hostname);
}
void clusterInit(void) {
int saveconf = 0;
@ -646,6 +686,7 @@ void clusterInit(void) {
resetManualFailover();
clusterUpdateMyselfFlags();
clusterUpdateMyselfIp();
clusterUpdateMyselfHostname();
}
/* Reset a node performing a soft or hard reset:
@ -918,6 +959,7 @@ clusterNode *createClusterNode(char *nodename, int flags) {
node->link = NULL;
node->inbound_link = NULL;
memset(node->ip,0,sizeof(node->ip));
node->hostname = NULL;
node->port = 0;
node->cport = 0;
node->pport = 0;
@ -1083,6 +1125,7 @@ void freeClusterNode(clusterNode *n) {
nodename = sdsnewlen(n->name, CLUSTER_NAMELEN);
serverAssert(dictDelete(server.cluster->nodes,nodename) == DICT_OK);
sdsfree(nodename);
zfree(n->hostname);
/* Release links and associated data structures. */
if (n->link) freeClusterLink(n->link);
@ -1871,6 +1914,93 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc
}
}
/* Cluster ping extensions.
*
* The ping/pong/meet messages support arbitrary extensions to add additional
* metadata to the messages that are sent between the various nodes in the
* cluster. The extensions take the form:
* [ Header length + type (8 bytes) ]
* [ Extension information (Arbitrary length, but must be 8 byte padded) ]
*/
/* Decode the `length` field of a ping extension, which is stored in network
 * byte order, and return it in host byte order. */
static uint32_t getPingExtLength(clusterMsgPingExt *ext) {
    uint32_t wire_length = ext->length;
    return ntohl(wire_length);
}
/* Return the address right after the `count` gossip entries of a
 * ping/pong/meet message, which is where the first extension (if any)
 * starts. The returned address is not valid when the message carries no
 * extensions. */
static clusterMsgPingExt *getInitialPingExt(clusterMsg *hdr, uint16_t count) {
    return (clusterMsgPingExt *) &hdr->data.ping.gossip[count];
}
/* Skip over `ext` using its own encoded length and return the address that
 * follows it. The returned address is not valid when `ext` was the last
 * extension of the message. */
static clusterMsgPingExt *getNextPingExt(clusterMsgPingExt *ext) {
    char *base = (char *) ext;
    return (clusterMsgPingExt *) (base + getPingExtLength(ext));
}
/* Returns the exact number of bytes needed to send the hostname extension
 * for the local node: the extension header plus the hostname including its
 * NUL terminator, rounded up to a multiple of 8 bytes.
 *
 * Returns 0 when the local node has no hostname, since in that case the
 * extension is not sent at all. */
int getHostnamePingExtSize(void) {
    /* If hostname is not set, we don't send this extension */
    if (!myself->hostname) return 0;

    /* +1 accounts for the terminating NUL that travels with the string. */
    size_t payload_len = EIGHT_BYTE_ALIGN(strlen(myself->hostname) + 1);
    return (int) (sizeof(clusterMsgPingExt) + payload_len);
}
/* Write the hostname ping extension at the position pointed to by *cursor.
 *
 * On return *cursor points just past the written extension. The return
 * value is the number of bytes the extension occupies (header + padded
 * payload), or 0 when the local node has no hostname and nothing was
 * written.
 *
 * The caller must have reserved at least getHostnamePingExtSize() bytes
 * at *cursor. Padding bytes after the NUL terminator are not touched. */
int writeHostnamePingExt(clusterMsgPingExt **cursor) {
    /* If hostname is not set, we don't send this extension */
    if (!myself->hostname) return 0;

    clusterMsgPingExt *ext_hdr = *cursor;
    clusterMsgPingExtHostname *ext = &ext_hdr->ext[0].hostname;
    size_t hostname_len = strlen(myself->hostname);
    size_t padded_len = EIGHT_BYTE_ALIGN(hostname_len + 1);
    uint32_t extension_size = getHostnamePingExtSize();

    /* Copy the hostname together with its NUL terminator so the receiver
     * always finds a terminated string, regardless of what the buffer
     * contained before. */
    memcpy(ext->hostname, myself->hostname, hostname_len + 1);

    /* Both header fields travel in network byte order; the receiving side
     * decodes them with ntohs()/ntohl(). */
    ext_hdr->type = htons(CLUSTERMSG_EXT_TYPE_HOSTNAME);
    ext_hdr->length = htonl(extension_size);

    /* Move the write cursor past the padded payload. */
    *cursor = (clusterMsgPingExt *) (ext->hostname + padded_len);
    return extension_size;
}
/* Apply the extensions carried by a ping/pong/meet message to the sender
 * node.
 *
 * The extension lengths are validated when the packet is received, but only
 * for packets that have CLUSTERMSG_FLAG0_EXT_DATA set. For that reason the
 * extension counter of the header is honoured only when the flag is set;
 * otherwise the message is treated as carrying no extensions, so that we
 * never walk past the end of the packet.
 *
 * If the message carries no (valid) hostname extension the sender is
 * assumed to have no announced hostname and any stored one is removed. */
void clusterProcessPingExtensions(clusterMsg *hdr, clusterLink *link) {
    clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender);
    char *ext_hostname = NULL;
    uint16_t extensions = 0;

    if (hdr->mflags[0] & CLUSTERMSG_FLAG0_EXT_DATA) {
        extensions = ntohs(hdr->extensions);
    }

    /* Loop through all the extensions and process them */
    clusterMsgPingExt *ext = getInitialPingExt(hdr, ntohs(hdr->count));
    while (extensions--) {
        uint16_t type = ntohs(ext->type);
        if (type == CLUSTERMSG_EXT_TYPE_HOSTNAME) {
            clusterMsgPingExtHostname *hostname_ext = (clusterMsgPingExtHostname *) &(ext->ext[0].hostname);
            uint32_t extlen = getPingExtLength(ext);
            size_t payload_len = 0;
            if (extlen > sizeof(clusterMsgPingExt)) {
                payload_len = extlen - sizeof(clusterMsgPingExt);
            }
            /* The hostname is later handled as a C string, so only accept
             * it if the terminator lies inside the extension payload. */
            if (payload_len > 0 && memchr(hostname_ext->hostname, '\0', payload_len) != NULL) {
                ext_hostname = hostname_ext->hostname;
            } else {
                serverLog(LL_WARNING, "Received hostname extension that is not null terminated, ignoring it");
            }
        } else {
            /* Unknown type, we will ignore it but log what happened. */
            serverLog(LL_WARNING, "Received unknown extension type %d", type);
        }

        /* The length was validated when the packet was received. */
        ext = getNextPingExt(ext);
    }

    /* If the node did not send us a hostname extension, assume
     * they don't have an announced hostname. Otherwise, we'll
     * set it now. */
    updateAnnouncedHostname(sender, ext_hostname);
}
static clusterNode *getNodeFromLinkAndMsg(clusterLink *link, clusterMsg *hdr) {
clusterNode *sender;
if (link->node && !nodeInHandshake(link->node)) {
@ -1920,52 +2050,78 @@ int clusterProcessPacket(clusterLink *link) {
return 1;
}
if (type == server.cluster_drop_packet_filter) {
serverLog(LL_WARNING, "Dropping packet that matches debug drop filter");
return 1;
}
uint16_t flags = ntohs(hdr->flags);
uint16_t extensions = ntohs(hdr->extensions);
uint64_t senderCurrentEpoch = 0, senderConfigEpoch = 0;
uint32_t explen; /* expected length of this packet */
clusterNode *sender;
if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
type == CLUSTERMSG_TYPE_MEET)
{
uint16_t count = ntohs(hdr->count);
uint32_t explen; /* expected length of this packet */
explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
explen += (sizeof(clusterMsgDataGossip)*count);
if (totlen != explen) return 1;
/* If there is extension data, which doesn't have a fixed length,
* loop through them and validate the length of it now. */
if (hdr->mflags[0] & CLUSTERMSG_FLAG0_EXT_DATA) {
clusterMsgPingExt *ext = getInitialPingExt(hdr, count);
while (extensions--) {
uint16_t extlen = getPingExtLength(ext);
if (extlen % 8 != 0) {
serverLog(LL_WARNING, "Received a %s packet without proper padding (%d bytes)",
clusterGetMessageTypeString(type), (int) extlen);
return 1;
}
if ((totlen - explen) < extlen) {
serverLog(LL_WARNING, "Received invalid %s packet with extension data that exceeds "
"total packet length (%lld)", clusterGetMessageTypeString(type),
(unsigned long long) totlen);
return 1;
}
explen += extlen;
ext = getNextPingExt(ext);
}
}
} else if (type == CLUSTERMSG_TYPE_FAIL) {
uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
explen += sizeof(clusterMsgDataFail);
if (totlen != explen) return 1;
} else if (type == CLUSTERMSG_TYPE_PUBLISH || type == CLUSTERMSG_TYPE_PUBLISHSHARD) {
uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
explen += sizeof(clusterMsgDataPublish) -
8 +
ntohl(hdr->data.publish.msg.channel_len) +
ntohl(hdr->data.publish.msg.message_len);
if (totlen != explen) return 1;
} else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST ||
type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK ||
type == CLUSTERMSG_TYPE_MFSTART)
{
uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
if (totlen != explen) return 1;
explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
} else if (type == CLUSTERMSG_TYPE_UPDATE) {
uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
explen += sizeof(clusterMsgDataUpdate);
if (totlen != explen) return 1;
} else if (type == CLUSTERMSG_TYPE_MODULE) {
uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
explen += sizeof(clusterMsgModule) -
3 + ntohl(hdr->data.module.msg.len);
if (totlen != explen) return 1;
} else {
/* We don't know this type of packet, so we assume it's well formed. */
explen = totlen;
}
if (totlen != explen) {
serverLog(LL_WARNING, "Received invalid %s packet of length %lld but expected length %lld",
clusterGetMessageTypeString(type), (unsigned long long) totlen, (unsigned long long) explen);
return 1;
}
sender = getNodeFromLinkAndMsg(link, hdr);
/* Update the last time we saw any data from this node. We
@ -2272,7 +2428,10 @@ int clusterProcessPacket(clusterLink *link) {
}
/* Get info from the gossip section */
if (sender) clusterProcessGossipSection(hdr,link);
if (sender) {
clusterProcessGossipSection(hdr,link);
clusterProcessPingExtensions(hdr,link);
}
} else if (type == CLUSTERMSG_TYPE_FAIL) {
clusterNode *failing;
@ -2695,7 +2854,7 @@ void clusterSendPing(clusterLink *link, int type) {
clusterMsg *hdr;
int gossipcount = 0; /* Number of gossip sections added so far. */
int wanted; /* Number of gossip sections we want to append if possible. */
int totlen; /* Total packet length. */
int estlen; /* Upper bound on estimated packet length */
/* freshnodes is the max number of nodes we can hope to append at all:
* nodes available minus two (ourself and the node we are sending the
* message to). However practically there may be less valid nodes since
@ -2736,15 +2895,17 @@ void clusterSendPing(clusterLink *link, int type) {
* faster to propagate to go from PFAIL to FAIL state. */
int pfail_wanted = server.cluster->stats_pfail_nodes;
/* Compute the maximum totlen to allocate our buffer. We'll fix the totlen
/* Compute the maximum estlen to allocate our buffer. We'll fix the estlen
* later according to the number of gossip sections we really were able
* to put inside the packet. */
totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
totlen += (sizeof(clusterMsgDataGossip)*(wanted+pfail_wanted));
estlen = sizeof(clusterMsg) - sizeof(union clusterMsgData);
estlen += (sizeof(clusterMsgDataGossip)*(wanted + pfail_wanted));
estlen += sizeof(clusterMsgPingExt) + getHostnamePingExtSize();
/* Note: clusterBuildMessageHdr() expects the buffer to be always at least
* sizeof(clusterMsg) or more. */
if (totlen < (int)sizeof(clusterMsg)) totlen = sizeof(clusterMsg);
buf = zcalloc(totlen);
if (estlen < (int)sizeof(clusterMsg)) estlen = sizeof(clusterMsg);
buf = zcalloc(estlen);
hdr = (clusterMsg*) buf;
/* Populate the header. */
@ -2808,11 +2969,23 @@ void clusterSendPing(clusterLink *link, int type) {
dictReleaseIterator(di);
}
/* Ready to send... fix the totlen field and queue the message in the
* output buffer. */
totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
int totlen = 0;
int extensions = 0;
/* Set the initial extension position */
clusterMsgPingExt *cursor = getInitialPingExt(hdr, gossipcount);
/* Add in the extensions */
if (myself->hostname) {
hdr->mflags[0] |= CLUSTERMSG_FLAG0_EXT_DATA;
totlen += writeHostnamePingExt(&cursor);
extensions++;
}
/* Compute the actual total length and send! */
totlen += sizeof(clusterMsg)-sizeof(union clusterMsgData);
totlen += (sizeof(clusterMsgDataGossip)*gossipcount);
hdr->count = htons(gossipcount);
hdr->extensions = htons(extensions);
hdr->totlen = htonl(totlen);
clusterSendMessage(link,buf,totlen);
zfree(buf);
@ -3786,6 +3959,7 @@ void clusterCron(void) {
iteration++; /* Number of times this function was called so far. */
updateAnnouncedHostname(myself, server.cluster_announce_hostname);
/* The handshake timeout is the time after which a handshake node that was
* not turned into a normal node is removed from the nodes. Usually it is
* just the NODE_TIMEOUT value, but when NODE_TIMEOUT is too small we use
@ -4404,10 +4578,18 @@ sds clusterGenNodeDescription(clusterNode *node, int use_pport) {
/* Node coordinates */
ci = sdscatlen(sdsempty(),node->name,CLUSTER_NAMELEN);
ci = sdscatfmt(ci," %s:%i@%i ",
node->ip,
port,
node->cport);
if (node->hostname) {
ci = sdscatfmt(ci," %s:%i@%i,%s ",
node->ip,
port,
node->cport,
node->hostname);
} else {
ci = sdscatfmt(ci," %s:%i@%i ",
node->ip,
port,
node->cport);
}
/* Flags */
ci = representClusterNodeFlags(ci, node->flags);
@ -4619,6 +4801,15 @@ void addReplyClusterLinksDescription(client *c) {
* CLUSTER command
* -------------------------------------------------------------------------- */
/* Return the string that represents node `n` according to the configured
 * preferred endpoint type:
 *   - IP:               the node IP.
 *   - hostname:         the node hostname, or "?" when it has none.
 *   - unknown endpoint: the empty string.
 * Any other configured value yields "unknown". The returned pointer is
 * either owned by the node or a string literal; the caller must not free
 * it. */
const char *getPreferredEndpoint(clusterNode *n) {
    int endpoint_type = server.cluster_preferred_endpoint_type;

    if (endpoint_type == CLUSTER_ENDPOINT_TYPE_IP) {
        return n->ip;
    }
    if (endpoint_type == CLUSTER_ENDPOINT_TYPE_HOSTNAME) {
        if (n->hostname) return n->hostname;
        return "?";
    }
    if (endpoint_type == CLUSTER_ENDPOINT_TYPE_UNKNOWN_ENDPOINT) {
        return "";
    }
    return "unknown";
}
const char *clusterGetMessageTypeString(int type) {
switch(type) {
case CLUSTERMSG_TYPE_PING: return "ping";
@ -4702,31 +4893,56 @@ void clusterUpdateSlots(client *c, unsigned char *slots, int del) {
}
}
void addNodeReplyForClusterSlot(client *c, clusterNode *node, int start_slot, int end_slot) {
int i, nested_elements = 3; /* slots (2) + master addr (1) */
void *nested_replylen = addReplyDeferredLen(c);
addReplyLongLong(c, start_slot);
addReplyLongLong(c, end_slot);
addReplyArrayLen(c, 3);
addReplyBulkCString(c, node->ip);
/* Append to the reply of client `c` the four-element description of `node`:
 *   1. preferred endpoint (IP, hostname or "?", or a null reply),
 *   2. client port,
 *   3. node name,
 *   4. a map with the networking information that is known for the node
 *      but is not the preferred endpoint.
 * Panics if the configured preferred endpoint type is not recognized. */
void addNodeToNodeReply(client *c, clusterNode *node) {
    int endpoint_type = server.cluster_preferred_endpoint_type;

    addReplyArrayLen(c, 4);

    /* 1. Preferred endpoint. */
    switch (endpoint_type) {
    case CLUSTER_ENDPOINT_TYPE_IP:
        addReplyBulkCString(c, node->ip);
        break;
    case CLUSTER_ENDPOINT_TYPE_HOSTNAME:
        addReplyBulkCString(c, node->hostname ? node->hostname : "?");
        break;
    case CLUSTER_ENDPOINT_TYPE_UNKNOWN_ENDPOINT:
        addReplyNull(c);
        break;
    default:
        serverPanic("Unrecognized preferred endpoint type");
    }

    /* 2. Port. Report non-TLS ports to non-TLS client in TLS cluster if
     * available. */
    int use_pport = (server.tls_cluster &&
                     c->conn && connGetType(c->conn) != CONN_TYPE_TLS);
    int port = node->port;
    if (use_pport && node->pport) port = node->pport;
    addReplyLongLong(c, port);

    /* 3. Node name. */
    addReplyBulkCBuffer(c, node->name, CLUSTER_NAMELEN);

    /* 4. Additional endpoint information; the number of entries is only
     * known once they have been emitted, hence the deferred length. */
    void *deflen = addReplyDeferredLen(c);
    int pairs = 0;
    if (endpoint_type != CLUSTER_ENDPOINT_TYPE_IP) {
        addReplyBulkCString(c, "ip");
        addReplyBulkCString(c, node->ip);
        pairs++;
    }
    if (node->hostname && endpoint_type != CLUSTER_ENDPOINT_TYPE_HOSTNAME) {
        addReplyBulkCString(c, "hostname");
        addReplyBulkCString(c, node->hostname);
        pairs++;
    }
    setDeferredMapLen(c, deflen, pairs);
}
void addNodeReplyForClusterSlot(client *c, clusterNode *node, int start_slot, int end_slot) {
int i, nested_elements = 3; /* slots (2) + master addr (1) */
void *nested_replylen = addReplyDeferredLen(c);
addReplyLongLong(c, start_slot);
addReplyLongLong(c, end_slot);
addNodeToNodeReply(c, node);
/* Remaining nodes in reply are replicas for slot range */
for (i = 0; i < node->numslaves; i++) {
/* This loop is copy/pasted from clusterGenNodeDescription()
* with modifications for per-slot node aggregation. */
if (!isReplicaAvailable(node->slaves[i])) continue;
addReplyArrayLen(c, 3);
addReplyBulkCString(c, node->slaves[i]->ip);
/* Report slave's non-TLS port to non-TLS client in TLS cluster */
addReplyLongLong(c, (use_pport && node->slaves[i]->pport ?
node->slaves[i]->pport :
node->slaves[i]->port));
addReplyBulkCBuffer(c, node->slaves[i]->name, CLUSTER_NAMELEN);
addNodeToNodeReply(c, node->slaves[i]);
nested_elements++;
}
setDeferredArrayLen(c, nested_replylen, nested_elements);
@ -4864,7 +5080,7 @@ NULL
/* Report plaintext ports, only if cluster is TLS but client is known to
* be non-TLS). */
int use_pport = (server.tls_cluster &&
c->conn && connGetType(c->conn) != CONN_TYPE_TLS);
c->conn && connGetType(c->conn) != CONN_TYPE_TLS);
sds nodes = clusterGenNodesDescription(0, use_pport);
addReplyVerbatim(c,nodes,sdslen(nodes),"txt");
sdsfree(nodes);
@ -6391,12 +6607,12 @@ void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_co
/* Redirect to IP:port. Include plaintext port if cluster is TLS but
* client is non-TLS. */
int use_pport = (server.tls_cluster &&
c->conn && connGetType(c->conn) != CONN_TYPE_TLS);
c->conn && connGetType(c->conn) != CONN_TYPE_TLS);
int port = use_pport && n->pport ? n->pport : n->port;
addReplyErrorSds(c,sdscatprintf(sdsempty(),
"-%s %d %s:%d",
(error_code == CLUSTER_REDIR_ASK) ? "ASK" : "MOVED",
hashslot, n->ip, port));
hashslot, getPreferredEndpoint(n), port));
} else {
serverPanic("getNodeByQuery() unknown error.");
}

View File

@ -135,6 +135,7 @@ typedef struct clusterNode {
mstime_t orphaned_time; /* Starting time of orphaned master condition */
long long repl_offset; /* Last known repl offset for this node. */
char ip[NET_IP_STR_LEN]; /* Latest known IP address of this node */
char *hostname; /* The known hostname for this node */
int port; /* Latest known clients port (TLS or plain). */
int pport; /* Latest known clients plaintext port. Only used
if the main clients port is for TLS. */
@ -245,11 +246,38 @@ typedef struct {
unsigned char bulk_data[3]; /* 3 bytes just as placeholder. */
} clusterMsgModule;
/* The cluster supports optional extension messages that can be sent
 * along with ping/pong/meet messages to give additional info in a
 * consistent manner.
 *
 * The values of this enum are the ones stored in the `type` field of
 * clusterMsgPingExt. */
typedef enum {
CLUSTERMSG_EXT_TYPE_HOSTNAME, /* Payload is a clusterMsgPingExtHostname. */
} clusterMsgPingtypes;
/* Helper macro that rounds `size` up to the next multiple of eight, used to
 * make sure extensions are eight byte aligned. A value that is already a
 * multiple of eight is returned unchanged. */
#define EIGHT_BYTE_ALIGN(size) ((((size) + 7) / 8) * 8)
/* Payload of the hostname extension. The one-byte array is only a
 * placeholder: the sender writes the whole NUL terminated hostname starting
 * at `hostname`, padded so that the payload is a multiple of eight bytes. */
typedef struct {
char hostname[1]; /* The announced hostname, ends with \0. */
} clusterMsgPingExtHostname;
/* Header that precedes every ping/pong/meet extension, followed by the
 * extension specific payload. `length` and `type` are transmitted in
 * network byte order. */
typedef struct {
uint32_t length; /* Total length of this extension message (including this header) */
uint16_t type; /* Type of this extension message (see clusterMsgPingtypes) */
uint16_t unused; /* 16 bits of padding to make this structure 8 byte aligned. */
union {
clusterMsgPingExtHostname hostname;
} ext[]; /* Actual extension information, formatted so that the data is 8
* byte aligned, regardless of its content. */
} clusterMsgPingExt;
union clusterMsgData {
/* PING, MEET and PONG */
struct {
/* Array of N clusterMsgDataGossip structures */
clusterMsgDataGossip gossip[1];
/* Extension data that can optionally be sent for ping/meet/pong
* messages. We can't explicitly define them here though, since
* the gossip array isn't the real length of the gossip data. */
} ping;
/* FAIL */
@ -292,7 +320,8 @@ typedef struct {
unsigned char myslots[CLUSTER_SLOTS/8];
char slaveof[CLUSTER_NAMELEN];
char myip[NET_IP_STR_LEN]; /* Sender IP, if not all zeroed. */
char notused1[32]; /* 32 bytes reserved for future usage. */
uint16_t extensions; /* Number of extensions sent along with this packet. */
char notused1[16]; /* 16 bytes reserved for future usage. */
uint16_t pport; /* Sender TCP plaintext port, if base port is TLS */
uint16_t cport; /* Sender TCP cluster bus port */
uint16_t flags; /* Sender node flags */
@ -308,6 +337,7 @@ typedef struct {
#define CLUSTERMSG_FLAG0_PAUSED (1<<0) /* Master paused for manual failover. */
#define CLUSTERMSG_FLAG0_FORCEACK (1<<1) /* Give ACK to AUTH_REQUEST even if
master is up. */
#define CLUSTERMSG_FLAG0_EXT_DATA (1<<2) /* Message contains extension data */
/* ---------------------- API exported outside cluster.c -------------------- */
void clusterInit(void);
@ -334,5 +364,6 @@ void clusterUpdateMyselfFlags(void);
void clusterUpdateMyselfIp(void);
void slotToChannelAdd(sds channel);
void slotToChannelDel(sds channel);
void clusterUpdateMyselfHostname(void);
#endif /* __CLUSTER_H */

View File

@ -141,6 +141,13 @@ configEnum protected_action_enum[] = {
{NULL, 0}
};
/* Accepted values of the 'cluster-preferred-endpoint-type' config and the
 * CLUSTER_ENDPOINT_TYPE_* constant each one maps to. The {NULL, 0} entry
 * terminates the table. */
configEnum cluster_preferred_endpoint_type_enum[] = {
{"ip", CLUSTER_ENDPOINT_TYPE_IP},
{"hostname", CLUSTER_ENDPOINT_TYPE_HOSTNAME},
{"unknown-endpoint", CLUSTER_ENDPOINT_TYPE_UNKNOWN_ENDPOINT},
{NULL, 0}
};
/* Output buffer limits presets. */
clientBufferLimitsConfig clientBufferLimitsDefaults[CLIENT_TYPE_OBUF_COUNT] = {
{0, 0, 0}, /* normal */
@ -2150,6 +2157,30 @@ static int isValidAOFfilename(char *val, const char **err) {
return 1;
}
/* Validate a value for the announced hostname.
 *
 * The value is accepted when it is shorter than NET_HOST_STR_LEN and made
 * only of ASCII letters, digits, '-' and '.'. The empty string is accepted.
 *
 * Returns 1 when the value is valid. Returns 0 otherwise, with *err pointing
 * to a static description of the problem. */
static int isValidAnnouncedHostname(char *val, const char **err) {
    if (strlen(val) >= NET_HOST_STR_LEN) {
        *err = "Hostnames must be less than "
            STRINGIFY(NET_HOST_STR_LEN) " characters";
        return 0;
    }

    /* We just validate the character set to make sure that everything
     * is parsed and handled correctly. */
    for (const char *p = val; *p != '\0'; p++) {
        char c = *p;
        int is_upper = (c >= 'A' && c <= 'Z');
        int is_lower = (c >= 'a' && c <= 'z');
        int is_digit = (c >= '0' && c <= '9');

        if (is_upper || is_lower || is_digit || c == '-' || c == '.') continue;

        *err = "Hostnames may only contain alphanumeric characters, "
            "hyphens or dots";
        return 0;
    }
    return 1;
}
/* Validate specified string is a valid proc-title-template */
static int isValidProcTitleTemplate(char *val, const char **err) {
if (!validateProcTitleTemplate(val)) {
@ -2305,6 +2336,12 @@ static int updateClusterIp(const char **err) {
return 1;
}
/* Apply callback of the 'cluster-announce-hostname' config: pushes the newly
 * configured hostname to the local cluster node. Always returns 1 and never
 * sets *err. */
int updateClusterHostname(const char **err) {
    clusterUpdateMyselfHostname();
    UNUSED(err);
    return 1;
}
#ifdef USE_OPENSSL
static int applyTlsCfg(const char **err) {
UNUSED(err);
@ -2652,6 +2689,7 @@ standardConfig configs[] = {
createStringConfig("masteruser", NULL, MODIFIABLE_CONFIG | SENSITIVE_CONFIG, EMPTY_STRING_IS_NULL, server.masteruser, NULL, NULL, NULL),
createStringConfig("cluster-announce-ip", NULL, MODIFIABLE_CONFIG, EMPTY_STRING_IS_NULL, server.cluster_announce_ip, NULL, NULL, updateClusterIp),
createStringConfig("cluster-config-file", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.cluster_configfile, "nodes.conf", NULL, NULL),
createStringConfig("cluster-announce-hostname", NULL, MODIFIABLE_CONFIG, EMPTY_STRING_IS_NULL, server.cluster_announce_hostname, NULL, isValidAnnouncedHostname, updateClusterHostname),
createStringConfig("syslog-ident", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.syslog_ident, "redis", NULL, NULL),
createStringConfig("dbfilename", NULL, MODIFIABLE_CONFIG | PROTECTED_CONFIG, ALLOW_EMPTY_STRING, server.rdb_filename, "dump.rdb", isValidDBfilename, NULL),
createStringConfig("appendfilename", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.aof_filename, "appendonly.aof", isValidAOFfilename, NULL),
@ -2681,6 +2719,7 @@ standardConfig configs[] = {
createEnumConfig("enable-protected-configs", NULL, IMMUTABLE_CONFIG, protected_action_enum, server.enable_protected_configs, PROTECTED_ACTION_ALLOWED_NO, NULL, NULL),
createEnumConfig("enable-debug-command", NULL, IMMUTABLE_CONFIG, protected_action_enum, server.enable_debug_cmd, PROTECTED_ACTION_ALLOWED_NO, NULL, NULL),
createEnumConfig("enable-module-command", NULL, IMMUTABLE_CONFIG, protected_action_enum, server.enable_module_cmd, PROTECTED_ACTION_ALLOWED_NO, NULL, NULL),
createEnumConfig("cluster-preferred-endpoint-type", NULL, MODIFIABLE_CONFIG, cluster_preferred_endpoint_type_enum, server.cluster_preferred_endpoint_type, CLUSTER_ENDPOINT_TYPE_IP, NULL, NULL),
/* Integer configs */
createIntConfig("databases", NULL, IMMUTABLE_CONFIG, 1, INT_MAX, server.dbnum, 16, INTEGER_CONFIG, NULL, NULL),

View File

@ -425,6 +425,8 @@ void debugCommand(client *c) {
#endif
"OBJECT <key>",
" Show low level info about `key` and associated value.",
"DROP-CLUSTER-PACKET-FILTER <packet-type>",
" Drop all packets that match the filtered type. Set to -1 allow all packets.",
"OOM",
" Crash the server simulating an out-of-memory error.",
"PANIC",
@ -575,6 +577,12 @@ NULL
server.dirty = 0; /* Prevent AOF / replication */
serverLog(LL_WARNING,"Append Only File loaded by DEBUG LOADAOF");
addReply(c,shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr,"drop-cluster-packet-filter") && c->argc == 3) {
long packet_type;
if (getLongFromObjectOrReply(c, c->argv[2], &packet_type, NULL) != C_OK)
return;
server.cluster_drop_packet_filter = packet_type;
addReply(c,shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
dictEntry *de;
robj *val;

View File

@ -2293,6 +2293,7 @@ void initServer(void) {
server.blocked_last_cron = 0;
server.blocking_op_nesting = 0;
server.thp_enabled = 0;
server.cluster_drop_packet_filter = -1;
resetReplicationBuffer();
if ((server.tls_port || server.tls_replication || server.tls_cluster)

View File

@ -527,6 +527,13 @@ typedef struct {
mstime_t end;
} pause_event;
/* Ways that a cluster's endpoint can be described. These are the values the
 * 'cluster-preferred-endpoint-type' config can take. */
typedef enum {
CLUSTER_ENDPOINT_TYPE_IP = 0, /* Show IP address */
CLUSTER_ENDPOINT_TYPE_HOSTNAME, /* Show hostname */
CLUSTER_ENDPOINT_TYPE_UNKNOWN_ENDPOINT /* Show NULL or empty */
} cluster_endpoint_type;
/* RDB active child save type. */
#define RDB_CHILD_TYPE_NONE 0
#define RDB_CHILD_TYPE_DISK 1 /* RDB is written to disk. */
@ -1771,6 +1778,8 @@ struct redisServer {
int cluster_slave_no_failover; /* Prevent slave from starting a failover
if the master is in failure state. */
char *cluster_announce_ip; /* IP address to announce on cluster bus. */
char *cluster_announce_hostname; /* Hostname to announce on cluster bus. */
int cluster_preferred_endpoint_type; /* Preferred endpoint type to show to clients (a cluster_endpoint_type value). */
int cluster_announce_port; /* base port to announce on cluster bus. */
int cluster_announce_tls_port; /* TLS port to announce on cluster bus. */
int cluster_announce_bus_port; /* bus port to announce on cluster bus. */
@ -1782,6 +1791,8 @@ struct redisServer {
is down? */
int cluster_config_file_lock_fd; /* cluster config fd, will be flock */
unsigned long long cluster_link_sendbuf_limit_bytes; /* Memory usage limit on individual link send buffers*/
int cluster_drop_packet_filter; /* Debug config that allows tactically
* dropping packets of a specific type */
/* Scripting */
client *script_caller; /* The client running script right now, or NULL */
mstime_t script_time_limit; /* Script timeout in milliseconds */
@ -3334,4 +3345,7 @@ int isTlsConfigured(void);
int iAmMaster(void);
#define STRINGIFY_(x) #x
#define STRINGIFY(x) STRINGIFY_(x)
#endif

View File

@ -142,7 +142,8 @@ proc cluster_allocate_with_continuous_slots {n} {
}
}
# Create a cluster composed of the specified number of masters and slaves with continuous slots.
# Create a cluster composed of the specified number of masters and slaves,
# but with a continuous slot range.
proc cluster_create_with_continuous_slots {masters slaves} {
cluster_allocate_with_continuous_slots $masters
if {$slaves} {

View File

@ -0,0 +1,221 @@
source "../tests/includes/init-tests.tcl"
# Check if cluster's view of hostnames is consistent.
#
# Asks every master and replica for CLUSTER SLOTS and returns 1 only when the
# hostname value of every node entry in every reply matches the glob pattern
# match_string; returns 0 on the first mismatch.
proc are_hostnames_propagated {match_string} {
    set total_nodes [expr {$::cluster_master_nodes + $::cluster_replica_nodes}]
    for {set idx 0} {$idx < $total_nodes} {incr idx} {
        foreach slot_range [R $idx cluster slots] {
            # Node entries start at index 2 of each CLUSTER SLOTS element.
            foreach node_entry [lrange $slot_range 2 end] {
                # Field 3 is the metadata list; its second element is the
                # hostname value (empty when there is no metadata).
                set hostname [lindex $node_entry 3 1]
                if {![string match $match_string $hostname]} {
                    return 0
                }
            }
        }
    }
    return 1
}
# Isolate a node from the cluster and give it a new nodeid.
#
# id: index of the instance to isolate.
#
# The instance is hard reset, and every other instance is then told to forget
# the node id it had before the reset.
proc isolate_node {id} {
    # Remember the current id: it is what the other nodes must forget.
    set node_id [R $id CLUSTER MYID]

    # Reset the instance named by id rather than a fixed instance number, so
    # the node being reset is always the one whose id was read above.
    R $id CLUSTER RESET HARD

    # NOTE(review): 20 is presumably the number of instances started by the
    # test harness - confirm before running with a different instance count.
    for {set j 0} {$j < 20} {incr j} {
        if {$j eq $id} {
            continue
        }
        R $j CLUSTER FORGET $node_id
    }
}
# Pick a single field out of a CLUSTER SLOTS reply.
#
# slot_output: the full reply (a list of slot range entries)
# shard_id:    index of the slot range entry
# node_id:     index inside that entry (callers in this file use 2)
# attrib_id:   index of the field inside the selected node entry
#
# Out of range indexes yield an empty string, as with nested lindex calls.
proc get_slot_field {slot_output shard_id node_id attrib_id} {
    return [lindex $slot_output $shard_id $node_id $attrib_id]
}
# Build the cluster under test: 3 masters and 3 replicas, allocated with a
# continuous slot range.
test "Create a 6 nodes cluster" {
cluster_create_with_continuous_slots 3 3
}
# Sanity check before any hostname is configured.
test "Cluster should start ok" {
# The cluster must report the "ok" state.
assert_cluster_state ok
# NOTE(review): presumably blocks until all nodes agree on the cluster
# view - confirm against the helper's definition.
wait_for_cluster_propagation
}
test "Set cluster hostnames and verify they are propagated" {
    # Give every master and replica its own announced hostname.
    set total_nodes [expr {$::cluster_master_nodes + $::cluster_replica_nodes}]
    for {set idx 0} {$idx < $total_nodes} {incr idx} {
        R $idx config set cluster-announce-hostname "host-$idx.com"
    }

    # Poll until every node reports a matching hostname for all nodes.
    wait_for_condition 50 100 {
        [are_hostnames_propagated "host-*.com"] eq 1
    } else {
        fail "cluster hostnames were not propagated"
    }

    # Now that everything is propagated, assert everyone agrees
    wait_for_cluster_propagation
}
test "Update hostnames and make sure they are all eventually propagated" {
    # Replace the hostname announced by every master and replica.
    set total_nodes [expr {$::cluster_master_nodes + $::cluster_replica_nodes}]
    for {set idx 0} {$idx < $total_nodes} {incr idx} {
        R $idx config set cluster-announce-hostname "host-updated-$idx.com"
    }

    # Poll until only the updated hostnames are visible on every node.
    wait_for_condition 50 100 {
        [are_hostnames_propagated "host-updated-*.com"] eq 1
    } else {
        fail "cluster hostnames were not propagated"
    }

    # Now that everything is propagated, assert everyone agrees
    wait_for_cluster_propagation
}
test "Remove hostnames and make sure they are all eventually propagated" {
    # An empty announced hostname removes the hostname from the node.
    set total_nodes [expr {$::cluster_master_nodes + $::cluster_replica_nodes}]
    for {set idx 0} {$idx < $total_nodes} {incr idx} {
        R $idx config set cluster-announce-hostname ""
    }

    # Poll until no node reports a hostname for any node.
    wait_for_condition 50 100 {
        [are_hostnames_propagated ""] eq 1
    } else {
        fail "cluster hostnames were not propagated"
    }

    # Now that everything is propagated, assert everyone agrees
    wait_for_cluster_propagation
}
# Walk node 0 through the three cluster-preferred-endpoint-type settings
# ('ip', 'hostname', 'unknown-endpoint') and check, for each one, the
# CLUSTER SLOTS output and the endpoint used in MOVED redirects.
test "Verify cluster-preferred-endpoint-type behavior for redirects and info" {
# Nodes 0 and 2 announce a hostname, node 1 announces none.
R 0 config set cluster-announce-hostname "me.com"
R 1 config set cluster-announce-hostname ""
R 2 config set cluster-announce-hostname "them.com"
wait_for_cluster_propagation
# Verify default behavior: the hostname is only reported as metadata
# (field 3 of the node entry), not as the preferred endpoint (field 0).
set slot_result [R 0 cluster slots]
# NOTE(review): these two checks read the second list element of field 0,
# which is empty for any single-word endpoint - confirm this is the intended
# assertion.
assert_equal "" [lindex [get_slot_field $slot_result 0 2 0] 1]
assert_equal "" [lindex [get_slot_field $slot_result 2 2 0] 1]
assert_equal "hostname" [lindex [get_slot_field $slot_result 0 2 3] 0]
assert_equal "me.com" [lindex [get_slot_field $slot_result 0 2 3] 1]
assert_equal "hostname" [lindex [get_slot_field $slot_result 2 2 3] 0]
assert_equal "them.com" [lindex [get_slot_field $slot_result 2 2 3] 1]
# Redirect will use the IP address
catch {R 0 set foo foo} redir_err
assert_match "MOVED * 127.0.0.1:*" $redir_err
# Verify prefer hostname behavior: field 0 now carries the hostname.
R 0 config set cluster-preferred-endpoint-type hostname
set slot_result [R 0 cluster slots]
assert_equal "me.com" [get_slot_field $slot_result 0 2 0]
assert_equal "them.com" [get_slot_field $slot_result 2 2 0]
# Redirect should use hostname
catch {R 0 set foo foo} redir_err
assert_match "MOVED * them.com:*" $redir_err
# Redirect to a node without an announced hostname returns ?
# NOTE(review): relies on key "barfoo" hashing to a slot owned by the node
# that has no hostname - confirm.
catch {R 0 set barfoo bar} redir_err
assert_match "MOVED * ?:*" $redir_err
# Verify unknown-endpoint behavior
R 0 config set cluster-preferred-endpoint-type unknown-endpoint
# With an unknown endpoint the IP is reported as metadata for every node.
set slot_result [R 0 cluster slots]
assert_equal "ip" [lindex [get_slot_field $slot_result 0 2 3] 0]
assert_equal "127.0.0.1" [lindex [get_slot_field $slot_result 0 2 3] 1]
assert_equal "ip" [lindex [get_slot_field $slot_result 2 2 3] 0]
assert_equal "127.0.0.1" [lindex [get_slot_field $slot_result 2 2 3] 1]
assert_equal "ip" [lindex [get_slot_field $slot_result 1 2 3] 0]
assert_equal "127.0.0.1" [lindex [get_slot_field $slot_result 1 2 3] 1]
# Not required by the protocol, but IP comes before hostname
assert_equal "hostname" [lindex [get_slot_field $slot_result 0 2 3] 2]
assert_equal "me.com" [lindex [get_slot_field $slot_result 0 2 3] 3]
assert_equal "hostname" [lindex [get_slot_field $slot_result 2 2 3] 2]
assert_equal "them.com" [lindex [get_slot_field $slot_result 2 2 3] 3]
# This node doesn't have a hostname, so its metadata holds only the ip pair
assert_equal 2 [llength [get_slot_field $slot_result 1 2 3]]
# Redirect should use empty string as the endpoint
catch {R 0 set foo foo} redir_err
assert_match "MOVED * :*" $redir_err
# Put node 0 back on the 'ip' endpoint type for the tests that follow.
R 0 config set cluster-preferred-endpoint-type ip
}
# Checks what a freshly isolated node (node 6) reports in CLUSTER SLOTS while
# it can only complete the handshake with some of the primaries, and again
# once it can talk to all of them.
test "Verify the nodes configured with prefer hostname only show hostname for new nodes" {
    # Have everyone forget node 6 and isolate it from the cluster.
    isolate_node 6

    # Set hostnames for the primaries, now that the node is isolated
    R 0 config set cluster-announce-hostname "shard-1.com"
    R 1 config set cluster-announce-hostname "shard-2.com"
    R 2 config set cluster-announce-hostname "shard-3.com"

    # Prevent Node 0 and Node 6 from properly meeting,
    # they'll hang in the handshake phase. This allows us to
    # test the case where we "know" about it but haven't
    # successfully retrieved information about it yet.
    R 0 DEBUG DROP-CLUSTER-PACKET-FILTER 0
    R 6 DEBUG DROP-CLUSTER-PACKET-FILTER 0

    # Have a replica meet the isolated node
    R 3 cluster meet 127.0.0.1 [get_instance_attrib redis 6 port]

    # Now, we wait until the two nodes that aren't filtering packets
    # to accept our isolated nodes connections. At this point they will
    # start showing up in cluster slots.
    wait_for_condition 50 100 {
        [llength [R 6 CLUSTER SLOTS]] eq 2
    } else {
        fail "Node did not learn about the 2 shards it can talk to"
    }
    set slot_result [R 6 CLUSTER SLOTS]
    assert_equal [lindex [get_slot_field $slot_result 0 2 3] 1] "shard-2.com"
    assert_equal [lindex [get_slot_field $slot_result 1 2 3] 1] "shard-3.com"

    # Also make sure we know about the isolated primary, we
    # just can't reach it.
    set primary_id [R 0 CLUSTER MYID]
    assert_match "*$primary_id*" [R 6 CLUSTER NODES]

    # Stop dropping cluster packets, and make sure everything
    # stabilizes
    R 0 DEBUG DROP-CLUSTER-PACKET-FILTER -1
    R 6 DEBUG DROP-CLUSTER-PACKET-FILTER -1

    # This wait is for all three shards, so the failure message says so
    # instead of repeating the message of the two-shard wait above.
    wait_for_condition 50 100 {
        [llength [R 6 CLUSTER SLOTS]] eq 3
    } else {
        fail "Node did not learn about all 3 shards"
    }
    set slot_result [R 6 CLUSTER SLOTS]
    assert_equal [lindex [get_slot_field $slot_result 0 2 3] 1] "shard-1.com"
    assert_equal [lindex [get_slot_field $slot_result 1 2 3] 1] "shard-2.com"
    assert_equal [lindex [get_slot_field $slot_result 2 2 3] 1] "shard-3.com"
}
# The announced hostname must still be reported after the instance restarts.
test "Test restart will keep hostname information" {
# Set a new hostname, reboot and make sure it sticks
R 0 config set cluster-announce-hostname "restart-1.com"
restart_instance redis 0
# Read node 0's own view right after the restart and check the hostname
# reported in the metadata of the first slot range entry.
set slot_result [R 0 CLUSTER SLOTS]
assert_equal [lindex [get_slot_field $slot_result 0 2 3] 1] "restart-1.com"
# As a sanity check, make sure everyone eventually agrees
wait_for_cluster_propagation
}
# Checks the validation applied to cluster-announce-hostname by CONFIG SET.
test "Test hostname validation" {
# A 256 character hostname is rejected as too long.
catch {R 0 config set cluster-announce-hostname [string repeat x 256]} err
assert_match "*Hostnames must be less than 256 characters*" $err
# A character outside of alphanumerics, hyphens and dots is rejected.
catch {R 0 config set cluster-announce-hostname "?.com"} err
assert_match "*Hostnames may only contain alphanumeric characters, hyphens or dots*" $err
# Note this isn't a valid hostname, but it passes our internal validation
R 0 config set cluster-announce-hostname "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-."
}

View File

@ -42,6 +42,8 @@ test "Cluster nodes hard reset" {
R $id config set loading-process-events-interval-bytes 2097152
R $id config set key-load-delay 0
R $id config set repl-diskless-load disabled
R $id config set cluster-announce-hostname ""
R $id DEBUG DROP-CLUSTER-PACKET-FILTER -1
R $id config rewrite
}
}