Cluster announce ip / port initial implementation.

This commit is contained in:
antirez 2016-01-21 16:57:35 +01:00
parent b0939303e6
commit 11436b1449
5 changed files with 127 additions and 42 deletions

View File

@ -670,6 +670,7 @@ clusterNode *createClusterNode(char *nodename, int flags) {
node->link = NULL; node->link = NULL;
memset(node->ip,0,sizeof(node->ip)); memset(node->ip,0,sizeof(node->ip));
node->port = 0; node->port = 0;
node->cport = 0;
node->fail_reports = listCreate(); node->fail_reports = listCreate();
node->voted_time = 0; node->voted_time = 0;
node->orphaned_time = 0; node->orphaned_time = 0;
@ -1212,7 +1213,7 @@ void clearNodeFailureIfNeeded(clusterNode *node) {
/* Return true if we already have a node in HANDSHAKE state matching the /* Return true if we already have a node in HANDSHAKE state matching the
* specified ip address and port number. This function is used in order to * specified ip address and port number. This function is used in order to
* avoid adding a new handshake node for the same address multiple times. */ * avoid adding a new handshake node for the same address multiple times. */
int clusterHandshakeInProgress(char *ip, int port) { int clusterHandshakeInProgress(char *ip, int port, int cport) {
dictIterator *di; dictIterator *di;
dictEntry *de; dictEntry *de;
@ -1221,7 +1222,9 @@ int clusterHandshakeInProgress(char *ip, int port) {
clusterNode *node = dictGetVal(de); clusterNode *node = dictGetVal(de);
if (!nodeInHandshake(node)) continue; if (!nodeInHandshake(node)) continue;
if (!strcasecmp(node->ip,ip) && node->port == port) break; if (!strcasecmp(node->ip,ip) &&
node->port == port &&
node->cport == cport) break;
} }
dictReleaseIterator(di); dictReleaseIterator(di);
return de != NULL; return de != NULL;
@ -1234,7 +1237,7 @@ int clusterHandshakeInProgress(char *ip, int port) {
* *
* EAGAIN - There is already an handshake in progress for this address. * EAGAIN - There is already an handshake in progress for this address.
* EINVAL - IP or port are not valid. */ * EINVAL - IP or port are not valid. */
int clusterStartHandshake(char *ip, int port) { int clusterStartHandshake(char *ip, int port, int cport) {
clusterNode *n; clusterNode *n;
char norm_ip[NET_IP_STR_LEN]; char norm_ip[NET_IP_STR_LEN];
struct sockaddr_storage sa; struct sockaddr_storage sa;
@ -1254,7 +1257,7 @@ int clusterStartHandshake(char *ip, int port) {
} }
/* Port sanity check */ /* Port sanity check */
if (port <= 0 || port > (65535-CLUSTER_PORT_INCR)) { if (port <= 0 || port > 65535 || cport <= 0 || cport > 65535) {
errno = EINVAL; errno = EINVAL;
return 0; return 0;
} }
@ -1271,7 +1274,7 @@ int clusterStartHandshake(char *ip, int port) {
(void*)&(((struct sockaddr_in6 *)&sa)->sin6_addr), (void*)&(((struct sockaddr_in6 *)&sa)->sin6_addr),
norm_ip,NET_IP_STR_LEN); norm_ip,NET_IP_STR_LEN);
if (clusterHandshakeInProgress(norm_ip,port)) { if (clusterHandshakeInProgress(norm_ip,port,cport)) {
errno = EAGAIN; errno = EAGAIN;
return 0; return 0;
} }
@ -1282,6 +1285,7 @@ int clusterStartHandshake(char *ip, int port) {
n = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_MEET); n = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_MEET);
memcpy(n->ip,norm_ip,sizeof(n->ip)); memcpy(n->ip,norm_ip,sizeof(n->ip));
n->port = port; n->port = port;
n->cport = cport;
clusterAddNode(n); clusterAddNode(n);
return 1; return 1;
} }
@ -1301,10 +1305,11 @@ void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
sds ci; sds ci;
ci = representClusterNodeFlags(sdsempty(), flags); ci = representClusterNodeFlags(sdsempty(), flags);
serverLog(LL_DEBUG,"GOSSIP %.40s %s:%d %s", serverLog(LL_DEBUG,"GOSSIP %.40s %s:%d:%d %s",
g->nodename, g->nodename,
g->ip, g->ip,
ntohs(g->port), ntohs(g->port),
ntohs(g->cport),
ci); ci);
sdsfree(ci); sdsfree(ci);
@ -1338,11 +1343,14 @@ void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
if (node->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL) && if (node->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL) &&
!(flags & CLUSTER_NODE_NOADDR) && !(flags & CLUSTER_NODE_NOADDR) &&
!(flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) && !(flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) &&
(strcasecmp(node->ip,g->ip) || node->port != ntohs(g->port))) (strcasecmp(node->ip,g->ip) ||
node->port != ntohs(g->port) ||
node->cport != ntohs(g->cport)))
{ {
if (node->link) freeClusterLink(node->link); if (node->link) freeClusterLink(node->link);
memcpy(node->ip,g->ip,NET_IP_STR_LEN); memcpy(node->ip,g->ip,NET_IP_STR_LEN);
node->port = ntohs(g->port); node->port = ntohs(g->port);
node->cport = ntohs(g->cport);
node->flags &= ~CLUSTER_NODE_NOADDR; node->flags &= ~CLUSTER_NODE_NOADDR;
} }
} else { } else {
@ -1356,7 +1364,7 @@ void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
!(flags & CLUSTER_NODE_NOADDR) && !(flags & CLUSTER_NODE_NOADDR) &&
!clusterBlacklistExists(g->nodename)) !clusterBlacklistExists(g->nodename))
{ {
clusterStartHandshake(g->ip,ntohs(g->port)); clusterStartHandshake(g->ip,ntohs(g->port),ntohs(g->cport));
} }
} }
@ -1365,23 +1373,36 @@ void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
} }
} }
/* IP -> string conversion. 'buf' is supposed to at least be 46 bytes. */ /* IP -> string conversion. 'buf' is supposed to at least be 46 bytes.
void nodeIp2String(char *buf, clusterLink *link) { * If 'announced_ip' length is non-zero, it is used instead of extracting
anetPeerToString(link->fd, buf, NET_IP_STR_LEN, NULL); * the IP from the socket peer address. */
void nodeIp2String(char *buf, clusterLink *link, char *announced_ip) {
if (announced_ip[0] != '\0') {
memcpy(buf,announced_ip,NET_IP_STR_LEN);
buf[NET_IP_STR_LEN-1] = '\0'; /* We are not sure the input is sane. */
} else {
anetPeerToString(link->fd, buf, NET_IP_STR_LEN, NULL);
}
} }
/* Update the node address to the IP address that can be extracted /* Update the node address to the IP address that can be extracted
* from link->fd, and at the specified port. * from link->fd, or if hdr->myip is non empty, to the address the node
* Also disconnect the node link so that we'll connect again to the new * is announcing us. The port is taken from the packet header as well.
* address. *
* If the address or port changed, disconnect the node link so that we'll
* connect again to the new address.
* *
* If the ip/port pair are already correct no operation is performed at * If the ip/port pair are already correct no operation is performed at
* all. * all.
* *
* The function returns 0 if the node address is still the same, * The function returns 0 if the node address is still the same,
* otherwise 1 is returned. */ * otherwise 1 is returned. */
int nodeUpdateAddressIfNeeded(clusterNode *node, clusterLink *link, int port) { int nodeUpdateAddressIfNeeded(clusterNode *node, clusterLink *link,
clusterMsg *hdr)
{
char ip[NET_IP_STR_LEN] = {0}; char ip[NET_IP_STR_LEN] = {0};
int port = ntohs(hdr->port);
int cport = ntohs(hdr->cport);
/* We don't proceed if the link is the same as the sender link, as this /* We don't proceed if the link is the same as the sender link, as this
* function is designed to see if the node link is consistent with the * function is designed to see if the node link is consistent with the
@ -1391,12 +1412,14 @@ int nodeUpdateAddressIfNeeded(clusterNode *node, clusterLink *link, int port) {
* it is safe to call during packet processing. */ * it is safe to call during packet processing. */
if (link == node->link) return 0; if (link == node->link) return 0;
nodeIp2String(ip,link); nodeIp2String(ip,link,hdr->myip);
if (node->port == port && strcmp(ip,node->ip) == 0) return 0; if (node->port == port && node->cport == cport &&
strcmp(ip,node->ip) == 0) return 0;
/* IP / port is different, update it. */ /* IP / port is different, update it. */
memcpy(node->ip,ip,sizeof(ip)); memcpy(node->ip,ip,sizeof(ip));
node->port = port; node->port = port;
node->cport = cport;
if (node->link) freeClusterLink(node->link); if (node->link) freeClusterLink(node->link);
node->flags &= ~CLUSTER_NODE_NOADDR; node->flags &= ~CLUSTER_NODE_NOADDR;
serverLog(LL_WARNING,"Address updated for node %.40s, now %s:%d", serverLog(LL_WARNING,"Address updated for node %.40s, now %s:%d",
@ -1647,7 +1670,10 @@ int clusterProcessPacket(clusterLink *link) {
if (type == CLUSTERMSG_TYPE_MEET || myself->ip[0] == '\0') { if (type == CLUSTERMSG_TYPE_MEET || myself->ip[0] == '\0') {
char ip[NET_IP_STR_LEN]; char ip[NET_IP_STR_LEN];
if (anetSockName(link->fd,ip,sizeof(ip),NULL) != -1 && if (server.cluster_announce_ip) {
strncpy(myself->ip,server.cluster_announce_ip,NET_IP_STR_LEN);
myself->ip[NET_IP_STR_LEN-1] = '\0';
} else if (anetSockName(link->fd,ip,sizeof(ip),NULL) != -1 &&
strcmp(ip,myself->ip)) strcmp(ip,myself->ip))
{ {
memcpy(myself->ip,ip,NET_IP_STR_LEN); memcpy(myself->ip,ip,NET_IP_STR_LEN);
@ -1665,8 +1691,9 @@ int clusterProcessPacket(clusterLink *link) {
clusterNode *node; clusterNode *node;
node = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE); node = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE);
nodeIp2String(node->ip,link); nodeIp2String(node->ip,link,hdr->myip);
node->port = ntohs(hdr->port); node->port = ntohs(hdr->port);
node->cport = ntohs(hdr->cport);
clusterAddNode(node); clusterAddNode(node);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG); clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
} }
@ -1696,7 +1723,7 @@ int clusterProcessPacket(clusterLink *link) {
serverLog(LL_VERBOSE, serverLog(LL_VERBOSE,
"Handshake: we already know node %.40s, " "Handshake: we already know node %.40s, "
"updating the address if needed.", sender->name); "updating the address if needed.", sender->name);
if (nodeUpdateAddressIfNeeded(sender,link,ntohs(hdr->port))) if (nodeUpdateAddressIfNeeded(sender,link,hdr))
{ {
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG| clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
CLUSTER_TODO_UPDATE_STATE); CLUSTER_TODO_UPDATE_STATE);
@ -1728,6 +1755,7 @@ int clusterProcessPacket(clusterLink *link) {
link->node->flags |= CLUSTER_NODE_NOADDR; link->node->flags |= CLUSTER_NODE_NOADDR;
link->node->ip[0] = '\0'; link->node->ip[0] = '\0';
link->node->port = 0; link->node->port = 0;
link->node->cport = 0;
freeClusterLink(link); freeClusterLink(link);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG); clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
return 0; return 0;
@ -1737,7 +1765,7 @@ int clusterProcessPacket(clusterLink *link) {
/* Update the node address if it changed. */ /* Update the node address if it changed. */
if (sender && type == CLUSTERMSG_TYPE_PING && if (sender && type == CLUSTERMSG_TYPE_PING &&
!nodeInHandshake(sender) && !nodeInHandshake(sender) &&
nodeUpdateAddressIfNeeded(sender,link,ntohs(hdr->port))) nodeUpdateAddressIfNeeded(sender,link,hdr))
{ {
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG| clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
CLUSTER_TODO_UPDATE_STATE); CLUSTER_TODO_UPDATE_STATE);
@ -2134,11 +2162,28 @@ void clusterBuildMessageHdr(clusterMsg *hdr, int type) {
hdr->type = htons(type); hdr->type = htons(type);
memcpy(hdr->sender,myself->name,CLUSTER_NAMELEN); memcpy(hdr->sender,myself->name,CLUSTER_NAMELEN);
/* If cluster-announce-ip option is enabled, force the receivers of our
* packets to use the specified address for this node. Otherwise if the
* first byte is zero, they'll do auto discovery. */
memset(hdr->myip,0,NET_IP_STR_LEN);
if (server.cluster_announce_ip) {
strncpy(hdr->myip,server.cluster_announce_ip,NET_IP_STR_LEN);
hdr->myip[NET_IP_STR_LEN-1] = '\0';
}
/* Handle cluster-announce-port as well. */
int announced_port = server.cluster_announce_port ?
server.cluster_announce_port : server.port;
int announced_cport = server.cluster_announce_bus_port ?
server.cluster_announce_bus_port :
(server.port + CLUSTER_PORT_INCR);
memcpy(hdr->myslots,master->slots,sizeof(hdr->myslots)); memcpy(hdr->myslots,master->slots,sizeof(hdr->myslots));
memset(hdr->slaveof,0,CLUSTER_NAMELEN); memset(hdr->slaveof,0,CLUSTER_NAMELEN);
if (myself->slaveof != NULL) if (myself->slaveof != NULL)
memcpy(hdr->slaveof,myself->slaveof->name, CLUSTER_NAMELEN); memcpy(hdr->slaveof,myself->slaveof->name, CLUSTER_NAMELEN);
hdr->port = htons(server.port); hdr->port = htons(announced_port);
hdr->cport = htons(announced_cport);
hdr->flags = htons(myself->flags); hdr->flags = htons(myself->flags);
hdr->state = server.cluster->state; hdr->state = server.cluster->state;
@ -2274,9 +2319,9 @@ void clusterSendPing(clusterLink *link, int type) {
gossip->pong_received = htonl(this->pong_received); gossip->pong_received = htonl(this->pong_received);
memcpy(gossip->ip,this->ip,sizeof(this->ip)); memcpy(gossip->ip,this->ip,sizeof(this->ip));
gossip->port = htons(this->port); gossip->port = htons(this->port);
gossip->cport = htons(this->cport);
gossip->flags = htons(this->flags); gossip->flags = htons(this->flags);
gossip->notused1 = 0; gossip->notused1 = 0;
gossip->notused2 = 0;
gossipcount++; gossipcount++;
} }
@ -3100,7 +3145,7 @@ void clusterCron(void) {
clusterLink *link; clusterLink *link;
fd = anetTcpNonBlockBindConnect(server.neterr, node->ip, fd = anetTcpNonBlockBindConnect(server.neterr, node->ip,
node->port+CLUSTER_PORT_INCR, NET_FIRST_BIND_ADDR); node->cport, NET_FIRST_BIND_ADDR);
if (fd == -1) { if (fd == -1) {
/* We got a synchronous error from connect before /* We got a synchronous error from connect before
* clusterSendPing() had a chance to be called. * clusterSendPing() had a chance to be called.
@ -3110,8 +3155,7 @@ void clusterCron(void) {
if (node->ping_sent == 0) node->ping_sent = mstime(); if (node->ping_sent == 0) node->ping_sent = mstime();
serverLog(LL_DEBUG, "Unable to connect to " serverLog(LL_DEBUG, "Unable to connect to "
"Cluster Node [%s]:%d -> %s", node->ip, "Cluster Node [%s]:%d -> %s", node->ip,
node->port+CLUSTER_PORT_INCR, node->cport, server.neterr);
server.neterr);
continue; continue;
} }
link = createClusterLink(node); link = createClusterLink(node);
@ -3142,7 +3186,7 @@ void clusterCron(void) {
node->flags &= ~CLUSTER_NODE_MEET; node->flags &= ~CLUSTER_NODE_MEET;
serverLog(LL_DEBUG,"Connecting with Node %.40s at %s:%d", serverLog(LL_DEBUG,"Connecting with Node %.40s at %s:%d",
node->name, node->ip, node->port+CLUSTER_PORT_INCR); node->name, node->ip, node->cport);
} }
} }
dictReleaseIterator(di); dictReleaseIterator(di);
@ -3845,16 +3889,27 @@ void clusterCommand(client *c) {
return; return;
} }
if (!strcasecmp(c->argv[1]->ptr,"meet") && c->argc == 4) { if (!strcasecmp(c->argv[1]->ptr,"meet") && (c->argc == 4 || c->argc == 5)) {
long long port; /* CLUSTER MEET <ip> <port> [cport] */
long long port, cport;
if (getLongLongFromObject(c->argv[3], &port) != C_OK) { if (getLongLongFromObject(c->argv[3], &port) != C_OK) {
addReplyErrorFormat(c,"Invalid TCP port specified: %s", addReplyErrorFormat(c,"Invalid TCP base port specified: %s",
(char*)c->argv[3]->ptr); (char*)c->argv[3]->ptr);
return; return;
} }
if (clusterStartHandshake(c->argv[2]->ptr,port) == 0 && if (c->argc == 5) {
if (getLongLongFromObject(c->argv[4], &cport) != C_OK) {
addReplyErrorFormat(c,"Invalid TCP bus port specified: %s",
(char*)c->argv[4]->ptr);
return;
}
} else {
cport = port + CLUSTER_PORT_INCR;
}
if (clusterStartHandshake(c->argv[2]->ptr,port,cport) == 0 &&
errno == EINVAL) errno == EINVAL)
{ {
addReplyErrorFormat(c,"Invalid node address specified: %s:%s", addReplyErrorFormat(c,"Invalid node address specified: %s:%s",

View File

@ -100,7 +100,8 @@ typedef struct clusterNode {
mstime_t orphaned_time; /* Starting time of orphaned master condition */ mstime_t orphaned_time; /* Starting time of orphaned master condition */
long long repl_offset; /* Last known repl offset for this node. */ long long repl_offset; /* Last known repl offset for this node. */
char ip[NET_IP_STR_LEN]; /* Latest known IP address of this node */ char ip[NET_IP_STR_LEN]; /* Latest known IP address of this node */
int port; /* Latest known port of this node */ int port; /* Latest known clients port of this node */
int cport; /* Latest known cluster port of this node. */
clusterLink *link; /* TCP/IP link with this node */ clusterLink *link; /* TCP/IP link with this node */
list *fail_reports; /* List of nodes signaling this as failing */ list *fail_reports; /* List of nodes signaling this as failing */
} clusterNode; } clusterNode;
@ -171,10 +172,10 @@ typedef struct {
uint32_t ping_sent; uint32_t ping_sent;
uint32_t pong_received; uint32_t pong_received;
char ip[NET_IP_STR_LEN]; /* IP address last time it was seen */ char ip[NET_IP_STR_LEN]; /* IP address last time it was seen */
uint16_t port; /* port last time it was seen */ uint16_t port; /* base port last time it was seen */
uint16_t cport; /* cluster port last time it was seen */
uint16_t flags; /* node->flags copy */ uint16_t flags; /* node->flags copy */
uint16_t notused1; /* Some room for future improvements. */ uint32_t notused1;
uint32_t notused2;
} clusterMsgDataGossip; } clusterMsgDataGossip;
typedef struct { typedef struct {
@ -225,7 +226,7 @@ typedef struct {
char sig[4]; /* Siganture "RCmb" (Redis Cluster message bus). */ char sig[4]; /* Siganture "RCmb" (Redis Cluster message bus). */
uint32_t totlen; /* Total length of this message */ uint32_t totlen; /* Total length of this message */
uint16_t ver; /* Protocol version, currently set to 0. */ uint16_t ver; /* Protocol version, currently set to 0. */
uint16_t notused0; /* 2 bytes not used. */ uint16_t port; /* TCP base port number. */
uint16_t type; /* Message type */ uint16_t type; /* Message type */
uint16_t count; /* Only used for some kind of messages. */ uint16_t count; /* Only used for some kind of messages. */
uint64_t currentEpoch; /* The epoch accordingly to the sending node. */ uint64_t currentEpoch; /* The epoch accordingly to the sending node. */
@ -239,8 +240,8 @@ typedef struct {
char slaveof[CLUSTER_NAMELEN]; char slaveof[CLUSTER_NAMELEN];
char myip[NET_IP_STR_LEN]; /* Sender IP, if not all zeroed. */ char myip[NET_IP_STR_LEN]; /* Sender IP, if not all zeroed. */
char notused1[34]; /* 34 bytes reserved for future usage. */ char notused1[34]; /* 34 bytes reserved for future usage. */
uint16_t port; /* Sender TCP base port */ uint16_t cport; /* Sender TCP cluster bus port */
uint16_t flags; /* Sender node flags */ uint16_t flags; /* Sender node flags */
unsigned char state; /* Cluster state from the POV of the sender */ unsigned char state; /* Cluster state from the POV of the sender */
unsigned char mflags[3]; /* Message flags: CLUSTERMSG_FLAG[012]_... */ unsigned char mflags[3]; /* Message flags: CLUSTERMSG_FLAG[012]_... */
union clusterMsgData data; union clusterMsgData data;

View File

@ -517,6 +517,22 @@ void loadServerConfigFromString(char *config) {
} else if (!strcasecmp(argv[0],"cluster-announce-ip") && argc == 2) { } else if (!strcasecmp(argv[0],"cluster-announce-ip") && argc == 2) {
zfree(server.cluster_announce_ip); zfree(server.cluster_announce_ip);
server.cluster_announce_ip = zstrdup(argv[1]); server.cluster_announce_ip = zstrdup(argv[1]);
} else if (!strcasecmp(argv[0],"cluster-announce-port") && argc == 2) {
server.cluster_announce_port = atoi(argv[1]);
if (server.cluster_announce_port < 0 ||
server.cluster_announce_port > 65535)
{
err = "Invalid port"; goto loaderr;
}
} else if (!strcasecmp(argv[0],"cluster-announce-bus-port") &&
argc == 2)
{
server.cluster_announce_bus_port = atoi(argv[1]);
if (server.cluster_announce_bus_port < 0 ||
server.cluster_announce_bus_port > 65535)
{
err = "Invalid port"; goto loaderr;
}
} else if (!strcasecmp(argv[0],"cluster-require-full-coverage") && } else if (!strcasecmp(argv[0],"cluster-require-full-coverage") &&
argc == 2) argc == 2)
{ {
@ -969,6 +985,10 @@ void configSetCommand(client *c) {
refreshGoodSlavesCount(); refreshGoodSlavesCount();
} config_set_numerical_field( } config_set_numerical_field(
"cluster-node-timeout",server.cluster_node_timeout,0,LLONG_MAX) { "cluster-node-timeout",server.cluster_node_timeout,0,LLONG_MAX) {
} config_set_numerical_field(
"cluster-announce-port",server.cluster_announce_port,0,65535) {
} config_set_numerical_field(
"cluster-announce-bus-port",server.cluster_announce_bus_port,0,65535) {
} config_set_numerical_field( } config_set_numerical_field(
"cluster-migration-barrier",server.cluster_migration_barrier,0,LLONG_MAX){ "cluster-migration-barrier",server.cluster_migration_barrier,0,LLONG_MAX){
} config_set_numerical_field( } config_set_numerical_field(
@ -1110,6 +1130,8 @@ void configGetCommand(client *c) {
config_get_numerical_field("slowlog-max-len", config_get_numerical_field("slowlog-max-len",
server.slowlog_max_len); server.slowlog_max_len);
config_get_numerical_field("port",server.port); config_get_numerical_field("port",server.port);
config_get_numerical_field("cluster-announce-port",server.cluster_announce_port);
config_get_numerical_field("cluster-announce-bus-port",server.cluster_announce_bus_port);
config_get_numerical_field("tcp-backlog",server.tcp_backlog); config_get_numerical_field("tcp-backlog",server.tcp_backlog);
config_get_numerical_field("databases",server.dbnum); config_get_numerical_field("databases",server.dbnum);
config_get_numerical_field("repl-ping-slave-period",server.repl_ping_slave_period); config_get_numerical_field("repl-ping-slave-period",server.repl_ping_slave_period);
@ -1799,6 +1821,8 @@ int rewriteConfig(char *path) {
rewriteConfigYesNoOption(state,"daemonize",server.daemonize,0); rewriteConfigYesNoOption(state,"daemonize",server.daemonize,0);
rewriteConfigStringOption(state,"pidfile",server.pidfile,CONFIG_DEFAULT_PID_FILE); rewriteConfigStringOption(state,"pidfile",server.pidfile,CONFIG_DEFAULT_PID_FILE);
rewriteConfigNumericalOption(state,"port",server.port,CONFIG_DEFAULT_SERVER_PORT); rewriteConfigNumericalOption(state,"port",server.port,CONFIG_DEFAULT_SERVER_PORT);
rewriteConfigNumericalOption(state,"cluster-announce-port",server.cluster_announce_port,CONFIG_DEFAULT_CLUSTER_ANNOUNCE_PORT);
rewriteConfigNumericalOption(state,"cluster-announce-bus-port",server.cluster_announce_bus_port,CONFIG_DEFAULT_CLUSTER_ANNOUNCE_BUS_PORT);
rewriteConfigNumericalOption(state,"tcp-backlog",server.tcp_backlog,CONFIG_DEFAULT_TCP_BACKLOG); rewriteConfigNumericalOption(state,"tcp-backlog",server.tcp_backlog,CONFIG_DEFAULT_TCP_BACKLOG);
rewriteConfigBindOption(state); rewriteConfigBindOption(state);
rewriteConfigStringOption(state,"unixsocket",server.unixsocket,NULL); rewriteConfigStringOption(state,"unixsocket",server.unixsocket,NULL);

View File

@ -1523,8 +1523,9 @@ void initServerConfig(void) {
server.cluster_slave_validity_factor = CLUSTER_DEFAULT_SLAVE_VALIDITY; server.cluster_slave_validity_factor = CLUSTER_DEFAULT_SLAVE_VALIDITY;
server.cluster_require_full_coverage = CLUSTER_DEFAULT_REQUIRE_FULL_COVERAGE; server.cluster_require_full_coverage = CLUSTER_DEFAULT_REQUIRE_FULL_COVERAGE;
server.cluster_configfile = zstrdup(CONFIG_DEFAULT_CLUSTER_CONFIG_FILE); server.cluster_configfile = zstrdup(CONFIG_DEFAULT_CLUSTER_CONFIG_FILE);
server.cluster_announce_ip = NULL; server.cluster_announce_ip = CONFIG_DEFAULT_CLUSTER_ANNOUNCE_IP;
server.cluster_announce_port = 0; server.cluster_announce_port = CONFIG_DEFAULT_CLUSTER_ANNOUNCE_PORT;
server.cluster_announce_bus_port = CONFIG_DEFAULT_CLUSTER_ANNOUNCE_BUS_PORT;
server.migrate_cached_sockets = dictCreate(&migrateCacheDictType,NULL); server.migrate_cached_sockets = dictCreate(&migrateCacheDictType,NULL);
server.next_client_id = 1; /* Client IDs, start from 1 .*/ server.next_client_id = 1; /* Client IDs, start from 1 .*/
server.loading_process_events_interval_bytes = (1024*1024*2); server.loading_process_events_interval_bytes = (1024*1024*2);

View File

@ -108,6 +108,9 @@ typedef long long mstime_t; /* millisecond time type. */
#define CONFIG_DEFAULT_PID_FILE "/var/run/redis.pid" #define CONFIG_DEFAULT_PID_FILE "/var/run/redis.pid"
#define CONFIG_DEFAULT_SYSLOG_IDENT "redis" #define CONFIG_DEFAULT_SYSLOG_IDENT "redis"
#define CONFIG_DEFAULT_CLUSTER_CONFIG_FILE "nodes.conf" #define CONFIG_DEFAULT_CLUSTER_CONFIG_FILE "nodes.conf"
#define CONFIG_DEFAULT_CLUSTER_ANNOUNCE_IP NULL /* Auto detect. */
#define CONFIG_DEFAULT_CLUSTER_ANNOUNCE_PORT 0 /* Use server.port */
#define CONFIG_DEFAULT_CLUSTER_ANNOUNCE_BUS_PORT 0 /* Use +10000 offset. */
#define CONFIG_DEFAULT_DAEMONIZE 0 #define CONFIG_DEFAULT_DAEMONIZE 0
#define CONFIG_DEFAULT_UNIX_SOCKET_PERM 0 #define CONFIG_DEFAULT_UNIX_SOCKET_PERM 0
#define CONFIG_DEFAULT_TCP_KEEPALIVE 0 #define CONFIG_DEFAULT_TCP_KEEPALIVE 0
@ -947,8 +950,9 @@ struct redisServer {
int cluster_slave_validity_factor; /* Slave max data age for failover. */ int cluster_slave_validity_factor; /* Slave max data age for failover. */
int cluster_require_full_coverage; /* If true, put the cluster down if int cluster_require_full_coverage; /* If true, put the cluster down if
there is at least an uncovered slot.*/ there is at least an uncovered slot.*/
char *cluster_announce_ip; /* cluster-announce-ip option. */ char *cluster_announce_ip; /* IP address to announce on cluster bus. */
int cluster_announce_port; /* cluster-announce-port option. */ int cluster_announce_port; /* base port to announce on cluster bus. */
int cluster_announce_bus_port; /* bus port to announce on cluster bus. */
/* Scripting */ /* Scripting */
lua_State *lua; /* The Lua interpreter. We use just one for all clients */ lua_State *lua; /* The Lua interpreter. We use just one for all clients */
client *lua_client; /* The "fake client" to query Redis from Lua */ client *lua_client; /* The "fake client" to query Redis from Lua */