Sharded pubsub implementation (#8621)

This commit implements a sharded pubsub mechanism based on shard channels.

Co-authored-by: Harkrishn Patro <harkrisp@amazon.com>
Co-authored-by: Madelyn Olson <madelyneolson@gmail.com>
Harkrishn Patro 2022-01-03 01:54:47 +01:00 committed by GitHub
parent b8ba942ac2
commit 9f8885760b
28 changed files with 1343 additions and 106 deletions
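Before the per-file changes, a minimal end-to-end sketch of the new commands from a client's point of view. This is not part of the commit: it assumes the hiredis client library, the host, port, and channel name are illustrative, and in cluster mode both connections must target a node serving the channel's hash slot.

/* Sketch: subscribe on one connection, publish from another (hiredis). */
#include <stdio.h>
#include <hiredis/hiredis.h>

int main(void) {
    redisContext *sub = redisConnect("127.0.0.1", 7000);
    redisContext *pub = redisConnect("127.0.0.1", 7000);
    if (!sub || sub->err || !pub || pub->err) return 1;

    /* SSUBSCRIBE confirmation: ["ssubscribe", channel, subscription count]. */
    redisReply *reply = redisCommand(sub, "SSUBSCRIBE %s", "orders");
    freeReplyObject(reply);

    /* SPUBLISH replies with the number of receivers on this shard. */
    reply = redisCommand(pub, "SPUBLISH %s %s", "orders", "hello");
    if (reply) {
        printf("receivers: %lld\n", reply->integer);
        freeReplyObject(reply);
    }

    /* The subscriber then receives ["message", channel, payload];
     * see the Tcl tests at the end of this commit. */
    void *msg;
    if (redisGetReply(sub, &msg) == REDIS_OK) freeReplyObject(msg);

    redisFree(sub);
    redisFree(pub);
    return 0;
}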


@ -1613,6 +1613,15 @@ lua-time-limit 5000
#
# cluster-allow-reads-when-down no
# This option, when set to yes, allows nodes to serve pubsub shard traffic while
# the cluster is in a down state, as long as it believes it owns the slots.
#
# This is useful if the application would like to use the pubsub feature even when
# the cluster global stable state is not OK. If the application wants to make sure only
# one shard is serving a given channel, this feature should be kept as yes.
#
# cluster-allow-pubsubshard-when-down yes
# Cluster link send buffer limit is the limit on the memory usage of an individual
# cluster bus link's send buffer in bytes. Cluster links would be freed if they exceed
# this limit. This is to primarily prevent send buffers from growing unbounded on links
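The new option is registered as MODIFIABLE_CONFIG (see the config table change further down), so it can also be toggled at runtime with CONFIG SET. A hedged hiredis sketch, with the connection assumed to already exist:

/* Toggle the new option at runtime over an existing hiredis connection. */
static int setPubsubshardWhenDown(redisContext *ctx, const char *yesno) {
    redisReply *r = redisCommand(ctx,
        "CONFIG SET cluster-allow-pubsubshard-when-down %s", yesno);
    int ok = r && r->type == REDIS_REPLY_STATUS; /* expect +OK */
    if (r) freeReplyObject(r);
    return ok;
}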


@ -1307,8 +1307,11 @@ int ACLCheckCommandPerm(const user *u, struct redisCommand *cmd, robj **argv, in
}
/* Check if the user can execute commands explicitly touching the keys
* mentioned in the command arguments. Shard channels are treated as
* special keys so that client libraries can rely on the `COMMAND` command
* to discover the node to connect to; they don't need the ACL key check. */
if (!(u->flags & USER_FLAG_ALLKEYS) &&
!(cmd->flags & CMD_PUBSUB) &&
(cmd->getkeys_proc || cmd->key_specs_num))
{
getKeysResult result = GETKEYS_RESULT_INIT;
@ -1392,6 +1395,7 @@ void ACLKillPubsubClientsIfNeeded(user *u, list *upcoming) {
}
/* Check for channel violations. */
if (!kill) {
/* Check for global channels violation. */
dictIterator *di = dictGetIterator(c->pubsub_channels);
dictEntry *de;
while (!kill && ((de = dictNext(di)) != NULL)) {
@ -1400,6 +1404,16 @@ void ACLKillPubsubClientsIfNeeded(user *u, list *upcoming) {
ACL_DENIED_CHANNEL);
}
dictReleaseIterator(di);
/* Check for shard channels violation. */
di = dictGetIterator(c->pubsubshard_channels);
while (!kill && ((de = dictNext(di)) != NULL)) {
o = dictGetKey(de);
kill = (ACLCheckPubsubChannelPerm(o->ptr,upcoming,0) ==
ACL_DENIED_CHANNEL);
}
dictReleaseIterator(di);
}
/* Kill it. */
@ -1448,9 +1462,9 @@ int ACLCheckAllUserCommandPerm(const user *u, struct redisCommand *cmd, robj **a
int acl_retval = ACLCheckCommandPerm(u,cmd,argv,argc,idxptr);
if (acl_retval != ACL_OK)
return acl_retval;
if (cmd->proc == publishCommand || cmd->proc == spublishCommand)
acl_retval = ACLCheckPubsubPerm(u,argv,1,1,0,idxptr);
else if (cmd->proc == subscribeCommand || cmd->proc == ssubscribeCommand)
acl_retval = ACLCheckPubsubPerm(u,argv,1,argc-1,0,idxptr);
else if (cmd->proc == psubscribeCommand)
acl_retval = ACLCheckPubsubPerm(u,argv,1,argc-1,1,idxptr);


@ -57,6 +57,7 @@ void clusterUpdateState(void);
int clusterNodeGetSlotBit(clusterNode *n, int slot);
sds clusterGenNodesDescription(int filter, int use_pport);
clusterNode *clusterLookupNode(const char *name);
list *clusterGetNodesServingMySlots(clusterNode *node);
int clusterNodeAddSlave(clusterNode *master, clusterNode *slave);
int clusterAddSlot(clusterNode *n, int slot);
int clusterDelSlot(int slot);
@ -77,7 +78,9 @@ uint64_t clusterGetMaxEpoch(void);
int clusterBumpConfigEpochWithoutConsensus(void);
void moduleCallClusterReceivers(const char *sender_id, uint64_t module_id, uint8_t type, const unsigned char *payload, uint32_t len);
const char *clusterGetMessageTypeString(int type);
void removeChannelsInSlot(unsigned int slot);
unsigned int countKeysInSlot(unsigned int hashslot);
unsigned int countChannelsInSlot(unsigned int hashslot);
unsigned int delKeysInSlot(unsigned int hashslot);
/* Links to the next and previous entries for keys in the same slot are stored
@ -631,6 +634,9 @@ void clusterInit(void) {
/* Initialize data for the Slot to key API. */
slotToKeyInit(server.db);
/* The slots -> channels map is a radix tree. Initialize it here. */
server.cluster->slots_to_channels = raxNew();
/* Set myself->port/cport/pport to my listening ports, we'll just need to
* discover the IP address via MEET messages. */
deriveAnnouncedPorts(&myself->port, &myself->pport, &myself->cport);
@ -1146,6 +1152,17 @@ clusterNode *clusterLookupNode(const char *name) {
return dictGetVal(de);
}
/* Get all the nodes serving the same slots as myself. */
list *clusterGetNodesServingMySlots(clusterNode *node) {
list *nodes_for_slot = listCreate();
clusterNode *my_primary = nodeIsMaster(node) ? node : node->slaveof;
listAddNodeTail(nodes_for_slot, my_primary);
for (int i=0; i < my_primary->numslaves; i++) {
listAddNodeTail(nodes_for_slot, my_primary->slaves[i]);
}
return nodes_for_slot;
}
/* This is only used after the handshake. When we connect a given IP/PORT
* as a result of CLUSTER MEET we don't have the node name yet, so we
* pick a random one, and will fix it when we receive the PONG request using
@ -1921,7 +1938,7 @@ int clusterProcessPacket(clusterLink *link) {
explen += sizeof(clusterMsgDataFail);
if (totlen != explen) return 1;
} else if (type == CLUSTERMSG_TYPE_PUBLISH || type == CLUSTERMSG_TYPE_PUBLISHSHARD) {
uint32_t explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
explen += sizeof(clusterMsgDataPublish) -
@ -2278,7 +2295,7 @@ int clusterProcessPacket(clusterLink *link) {
"Ignoring FAIL message from unknown node %.40s about %.40s",
hdr->sender, hdr->data.fail.about.nodename);
}
} else if (type == CLUSTERMSG_TYPE_PUBLISH || type == CLUSTERMSG_TYPE_PUBLISHSHARD) {
if (!sender) return 1; /* We don't know that node. */
robj *channel, *message;
@ -2286,8 +2303,10 @@ int clusterProcessPacket(clusterLink *link) {
/* Don't bother creating useless objects if there are no
* Pub/Sub subscribers. */
if ((type == CLUSTERMSG_TYPE_PUBLISH
&& serverPubsubSubscriptionCount() > 0)
|| (type == CLUSTERMSG_TYPE_PUBLISHSHARD
&& serverPubsubShardSubscriptionCount() > 0))
{
channel_len = ntohl(hdr->data.publish.msg.channel_len);
message_len = ntohl(hdr->data.publish.msg.message_len);
@ -2296,7 +2315,11 @@ int clusterProcessPacket(clusterLink *link) {
message = createStringObject(
(char*)hdr->data.publish.msg.bulk_data+channel_len,
message_len);
if (type == CLUSTERMSG_TYPE_PUBLISHSHARD) {
pubsubPublishMessageShard(channel, message);
} else {
pubsubPublishMessage(channel,message);
}
decrRefCount(channel);
decrRefCount(message);
}
@ -2841,7 +2864,7 @@ void clusterBroadcastPong(int target) {
* the 'bulk_data', sanitizer generates an out-of-bounds error which is a false
* positive in this context. */
REDIS_NO_SANITIZE("bounds")
void clusterSendPublish(clusterLink *link, robj *channel, robj *message, uint16_t type) {
unsigned char *payload;
clusterMsg buf[1];
clusterMsg *hdr = (clusterMsg*) buf;
@ -2853,7 +2876,7 @@ void clusterSendPublish(clusterLink *link, robj *channel, robj *message) {
channel_len = sdslen(channel->ptr);
message_len = sdslen(message->ptr);
clusterBuildMessageHdr(hdr,type);
totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
totlen += sizeof(clusterMsgDataPublish) - 8 + channel_len + message_len;
@ -2976,7 +2999,28 @@ int clusterSendModuleMessageToTarget(const char *target, uint64_t module_id, uin
* messages to hosts without receivers for a given channel.
* -------------------------------------------------------------------------- */
void clusterPropagatePublish(robj *channel, robj *message) {
clusterSendPublish(NULL, channel, message, CLUSTERMSG_TYPE_PUBLISH);
}
/* -----------------------------------------------------------------------------
* CLUSTER Pub/Sub shard support
*
* Publish this message across the slot (primary/replica).
* -------------------------------------------------------------------------- */
void clusterPropagatePublishShard(robj *channel, robj *message) {
list *nodes_for_slot = clusterGetNodesServingMySlots(server.cluster->myself);
if (listLength(nodes_for_slot) != 0) {
listIter li;
listNode *ln;
listRewind(nodes_for_slot, &li);
while((ln = listNext(&li))) {
clusterNode *node = listNodeValue(ln);
if (node != myself) {
clusterSendPublish(node->link, channel, message, CLUSTERMSG_TYPE_PUBLISHSHARD);
}
}
}
listRelease(nodes_for_slot);
}
/* -----------------------------------------------------------------------------
@ -4075,6 +4119,14 @@ int clusterDelSlot(int slot) {
clusterNode *n = server.cluster->slots[slot];
if (!n) return C_ERR;
/* Cleanup the channels in master/replica as part of slot deletion. */
list *nodes_for_slot = clusterGetNodesServingMySlots(n);
listNode *ln = listSearchKey(nodes_for_slot, myself);
if (ln != NULL) {
removeChannelsInSlot(slot);
}
listRelease(nodes_for_slot);
serverAssert(clusterNodeClearSlotBit(n,slot) == 1);
server.cluster->slots[slot] = NULL;
return C_OK;
@ -4574,6 +4626,7 @@ const char *clusterGetMessageTypeString(int type) {
case CLUSTERMSG_TYPE_MEET: return "meet";
case CLUSTERMSG_TYPE_FAIL: return "fail";
case CLUSTERMSG_TYPE_PUBLISH: return "publish";
case CLUSTERMSG_TYPE_PUBLISHSHARD: return "publishshard";
case CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST: return "auth-req";
case CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK: return "auth-ack";
case CLUSTERMSG_TYPE_UPDATE: return "update";
@ -5362,6 +5415,30 @@ NULL
}
}
void removeChannelsInSlot(unsigned int slot) {
unsigned int channelcount = countChannelsInSlot(slot);
if (channelcount == 0) return;
/* Retrieve all the channels for the slot. */
robj **channels = zmalloc(sizeof(robj*)*channelcount);
raxIterator iter;
int j = 0;
unsigned char indexed[2];
indexed[0] = (slot >> 8) & 0xff;
indexed[1] = slot & 0xff;
raxStart(&iter,server.cluster->slots_to_channels);
raxSeek(&iter,">=",indexed,2);
while(raxNext(&iter)) {
if (iter.key[0] != indexed[0] || iter.key[1] != indexed[1]) break;
channels[j++] = createStringObject((char*)iter.key + 2, iter.key_len - 2);
}
raxStop(&iter);
pubsubUnsubscribeShardChannels(channels, channelcount);
zfree(channels);
}
/* -----------------------------------------------------------------------------
* DUMP, RESTORE and MIGRATE commands
* -------------------------------------------------------------------------- */
@ -6121,6 +6198,10 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
mc.cmd = cmd;
}
int is_pubsubshard = cmd->proc == ssubscribeCommand ||
cmd->proc == sunsubscribeCommand ||
cmd->proc == spublishCommand;
/* Check that all the keys are in the same hash slot, and obtain this
* slot and the node associated. */
for (i = 0; i < ms->count; i++) {
@ -6172,8 +6253,8 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
importing_slot = 1;
}
} else {
/* If it is not the first key/channel, make sure it is exactly
* the same key/channel as the first we saw. */
if (!equalStringObjects(firstkey,thiskey)) {
if (slot != thisslot) {
/* Error: multiple keys from different slots. */
@ -6183,15 +6264,20 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
return NULL;
} else {
/* Flag this request as one with multiple different
* keys/channels. */
multiple_keys = 1;
}
}
}
/* Migrating / Importing slot? Count keys we don't have.
* For a pubsubshard command there is no need to check whether
* the channel is present in the node during slot migration:
* the channel keeps being served from the source node until
* the migration completes with CLUSTER SETSLOT <slot>
* NODE <node-id>. */
int flags = LOOKUP_NOTOUCH | LOOKUP_NOSTATS | LOOKUP_NONOTIFY;
if ((migrating_slot || importing_slot) && !is_pubsubshard &&
lookupKeyReadWithFlags(&server.db[0], thiskey, flags) == NULL)
{
missing_keys++;
@ -6207,7 +6293,12 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
/* Cluster is globally down but we got keys? We only serve the request
* if it is a read command and when allow_reads_when_down is enabled. */
if (server.cluster->state != CLUSTER_OK) {
if (is_pubsubshard) {
if (!server.cluster_allow_pubsubshard_when_down) {
if (error_code) *error_code = CLUSTER_REDIR_DOWN_STATE;
return NULL;
}
} else if (!server.cluster_allow_reads_when_down) {
/* The cluster is configured to block commands when the
* cluster is down. */
if (error_code) *error_code = CLUSTER_REDIR_DOWN_STATE;
@ -6259,7 +6350,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in
* is serving, we can reply without redirection. */
int is_write_command = (c->cmd->flags & CMD_WRITE) ||
(c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_WRITE));
if (((c->flags & CLIENT_READONLY) || is_pubsubshard) &&
!is_write_command &&
nodeIsSlave(myself) &&
myself->slaveof == n)
@ -6482,3 +6573,51 @@ unsigned int delKeysInSlot(unsigned int hashslot) {
unsigned int countKeysInSlot(unsigned int hashslot) {
return (*server.db->slots_to_keys).by_slot[hashslot].count;
}
/* -----------------------------------------------------------------------------
* Operation(s) on channel rax tree.
* -------------------------------------------------------------------------- */
void slotToChannelUpdate(sds channel, int add) {
size_t keylen = sdslen(channel);
unsigned int hashslot = keyHashSlot(channel,keylen);
unsigned char buf[64];
unsigned char *indexed = buf;
if (keylen+2 > 64) indexed = zmalloc(keylen+2);
indexed[0] = (hashslot >> 8) & 0xff;
indexed[1] = hashslot & 0xff;
memcpy(indexed+2,channel,keylen);
if (add) {
raxInsert(server.cluster->slots_to_channels,indexed,keylen+2,NULL,NULL);
} else {
raxRemove(server.cluster->slots_to_channels,indexed,keylen+2,NULL);
}
if (indexed != buf) zfree(indexed);
}
void slotToChannelAdd(sds channel) {
slotToChannelUpdate(channel,1);
}
void slotToChannelDel(sds channel) {
slotToChannelUpdate(channel,0);
}
/* Get the count of the channels for a given slot. */
unsigned int countChannelsInSlot(unsigned int hashslot) {
raxIterator iter;
int j = 0;
unsigned char indexed[2];
indexed[0] = (hashslot >> 8) & 0xff;
indexed[1] = hashslot & 0xff;
raxStart(&iter,server.cluster->slots_to_channels);
raxSeek(&iter,">=",indexed,2);
while(raxNext(&iter)) {
if (iter.key[0] != indexed[0] || iter.key[1] != indexed[1]) break;
j++;
}
raxStop(&iter);
return j;
}
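For reference, a standalone sketch of the rax key layout shared by slotToChannelUpdate() and countChannelsInSlot() above: the key is the channel name prefixed with the slot number as two big-endian bytes, so all channels of one slot sort contiguously in the tree, which is what makes the ">=" seek-and-scan pattern work. The helper name is illustrative, not part of the commit.

#include <string.h>

/* Build the rax key for (slot, channel): 2-byte big-endian slot prefix
 * followed by the raw channel name. Returns the total key length. */
static size_t buildSlotChannelKey(unsigned char *buf, unsigned int slot,
                                  const char *channel, size_t channellen) {
    buf[0] = (slot >> 8) & 0xff; /* high byte of the 14-bit slot number */
    buf[1] = slot & 0xff;        /* low byte */
    memcpy(buf + 2, channel, channellen);
    return channellen + 2;
}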


@ -97,6 +97,7 @@ typedef struct clusterLink {
#define CLUSTERMSG_TYPE_MFSTART 8 /* Pause clients for manual failover */
#define CLUSTERMSG_TYPE_MODULE 9 /* Module cluster API message. */
#define CLUSTERMSG_TYPE_PUBLISHSHARD 10 /* Pub/Sub Publish shard propagation */
#define CLUSTERMSG_TYPE_COUNT 11 /* Total number of message types. */
/* Flags that a module can set in order to prevent certain Redis Cluster
* features to be enabled. Useful when implementing a different distributed
@ -173,6 +174,7 @@ typedef struct clusterState {
clusterNode *migrating_slots_to[CLUSTER_SLOTS];
clusterNode *importing_slots_from[CLUSTER_SLOTS];
clusterNode *slots[CLUSTER_SLOTS];
rax *slots_to_channels;
/* The following fields are used to take the slave state on elections. */
mstime_t failover_auth_time; /* Time of previous or next election. */
int failover_auth_count; /* Number of votes received so far. */
@ -320,6 +322,7 @@ int verifyClusterConfigWithData(void);
unsigned long getClusterConnectionsCount(void);
int clusterSendModuleMessageToTarget(const char *target, uint64_t module_id, uint8_t type, unsigned char *payload, uint32_t len);
void clusterPropagatePublish(robj *channel, robj *message);
void clusterPropagatePublishShard(robj *channel, robj *message);
unsigned int keyHashSlot(char *key, int keylen);
void slotToKeyAddEntry(dictEntry *entry, redisDb *db);
void slotToKeyDelEntry(dictEntry *entry, redisDb *db);
@ -329,5 +332,7 @@ void slotToKeyFlush(redisDb *db);
void slotToKeyDestroy(redisDb *db);
void clusterUpdateMyselfFlags(void);
void clusterUpdateMyselfIp(void);
void slotToChannelAdd(sds channel);
void slotToChannelDel(sds channel);
#endif /* __CLUSTER_H */


@ -2913,12 +2913,36 @@ struct redisCommandArg PUBSUB_NUMSUB_Args[] = {
{0}
};
/********** PUBSUB SHARDCHANNELS ********************/
/* PUBSUB SHARDCHANNELS history */
#define PUBSUB_SHARDCHANNELS_History NULL
/* PUBSUB SHARDCHANNELS hints */
#define PUBSUB_SHARDCHANNELS_Hints NULL
/* PUBSUB SHARDCHANNELS argument table */
struct redisCommandArg PUBSUB_SHARDCHANNELS_Args[] = {
{"pattern",ARG_TYPE_STRING,-1,NULL,NULL,NULL,CMD_ARG_OPTIONAL},
{0}
};
/********** PUBSUB SHARDNUMSUB ********************/
/* PUBSUB SHARDNUMSUB history */
#define PUBSUB_SHARDNUMSUB_History NULL
/* PUBSUB SHARDNUMSUB hints */
#define PUBSUB_SHARDNUMSUB_Hints NULL
/* PUBSUB command table */
struct redisCommand PUBSUB_Subcommands[] = {
{"channels","List active channels","O(N) where N is the number of active channels, and assuming constant time pattern matching (relatively short channels and patterns)","2.8.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_PUBSUB,PUBSUB_CHANNELS_History,PUBSUB_CHANNELS_Hints,pubsubCommand,-2,CMD_PUBSUB|CMD_LOADING|CMD_STALE,0,.args=PUBSUB_CHANNELS_Args},
{"help","Show helpful text about the different subcommands","O(1)","6.2.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_PUBSUB,PUBSUB_HELP_History,PUBSUB_HELP_Hints,pubsubCommand,2,CMD_LOADING|CMD_STALE,0},
{"numpat","Get the count of unique patterns pattern subscriptions","O(1)","2.8.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_PUBSUB,PUBSUB_NUMPAT_History,PUBSUB_NUMPAT_Hints,pubsubCommand,2,CMD_PUBSUB|CMD_LOADING|CMD_STALE,0},
{"numsub","Get the count of subscribers for channels","O(N) for the NUMSUB subcommand, where N is the number of requested channels","2.8.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_PUBSUB,PUBSUB_NUMSUB_History,PUBSUB_NUMSUB_Hints,pubsubCommand,-2,CMD_PUBSUB|CMD_LOADING|CMD_STALE,0,.args=PUBSUB_NUMSUB_Args},
{"shardchannels","List active shard channels","O(N) where N is the number of active shard channels, and assuming constant time pattern matching (relatively short channels).","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_PUBSUB,PUBSUB_SHARDCHANNELS_History,PUBSUB_SHARDCHANNELS_Hints,pubsubCommand,-2,CMD_PUBSUB|CMD_LOADING|CMD_STALE,0,.args=PUBSUB_SHARDCHANNELS_Args},
{"shardnumsub","Get the count of subscribers for shard channels","O(N) for the SHARDNUMSUB subcommand, where N is the number of requested channels","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_PUBSUB,PUBSUB_SHARDNUMSUB_History,PUBSUB_SHARDNUMSUB_Hints,pubsubCommand,-2,CMD_PUBSUB|CMD_LOADING|CMD_STALE,0},
{0}
};
@ -2944,6 +2968,35 @@ struct redisCommandArg PUNSUBSCRIBE_Args[] = {
{0}
};
/********** SPUBLISH ********************/
/* SPUBLISH history */
#define SPUBLISH_History NULL
/* SPUBLISH hints */
#define SPUBLISH_Hints NULL
/* SPUBLISH argument table */
struct redisCommandArg SPUBLISH_Args[] = {
{"channel",ARG_TYPE_STRING,-1,NULL,NULL,NULL,CMD_ARG_NONE},
{"message",ARG_TYPE_STRING,-1,NULL,NULL,NULL,CMD_ARG_NONE},
{0}
};
/********** SSUBSCRIBE ********************/
/* SSUBSCRIBE history */
#define SSUBSCRIBE_History NULL
/* SSUBSCRIBE hints */
#define SSUBSCRIBE_Hints NULL
/* SSUBSCRIBE argument table */
struct redisCommandArg SSUBSCRIBE_Args[] = {
{"channel",ARG_TYPE_STRING,-1,NULL,NULL,NULL,CMD_ARG_MULTIPLE},
{0}
};
/********** SUBSCRIBE ********************/
/* SUBSCRIBE history */
@ -2961,6 +3014,20 @@ struct redisCommandArg SUBSCRIBE_Args[] = {
{0}
};
/********** SUNSUBSCRIBE ********************/
/* SUNSUBSCRIBE history */
#define SUNSUBSCRIBE_History NULL
/* SUNSUBSCRIBE hints */
#define SUNSUBSCRIBE_Hints NULL
/* SUNSUBSCRIBE argument table */
struct redisCommandArg SUNSUBSCRIBE_Args[] = {
{"channel",ARG_TYPE_STRING,-1,NULL,NULL,NULL,CMD_ARG_OPTIONAL|CMD_ARG_MULTIPLE},
{0}
};
/********** UNSUBSCRIBE ********************/
/* UNSUBSCRIBE history */
@ -6511,7 +6578,10 @@ struct redisCommand redisCommandTable[] = {
{"publish","Post a message to a channel","O(N+M) where N is the number of clients subscribed to the receiving channel and M is the total number of subscribed patterns (by any client).","2.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_PUBSUB,PUBLISH_History,PUBLISH_Hints,publishCommand,3,CMD_PUBSUB|CMD_LOADING|CMD_STALE|CMD_FAST|CMD_MAY_REPLICATE|CMD_SENTINEL,0,.args=PUBLISH_Args},
{"pubsub","A container for Pub/Sun commands","Depends on subcommand.","2.8.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_PUBSUB,PUBSUB_History,PUBSUB_Hints,NULL,-2,0,0,.subcommands=PUBSUB_Subcommands},
{"punsubscribe","Stop listening for messages posted to channels matching the given patterns","O(N+M) where N is the number of patterns the client is already subscribed and M is the number of total patterns subscribed in the system (by any client).","2.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_PUBSUB,PUNSUBSCRIBE_History,PUNSUBSCRIBE_Hints,punsubscribeCommand,-1,CMD_PUBSUB|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,0,.args=PUNSUBSCRIBE_Args},
{"spublish","Post a message to a shard channel","O(N) where N is the number of clients subscribed to the receiving shard channel.","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_PUBSUB,SPUBLISH_History,SPUBLISH_Hints,spublishCommand,3,CMD_PUBSUB|CMD_LOADING|CMD_STALE|CMD_FAST|CMD_MAY_REPLICATE,0,{{CMD_KEY_SHARD_CHANNEL,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={0,1,0}}},.args=SPUBLISH_Args},
{"ssubscribe","Listen for messages published to the given shard channels","O(N) where N is the number of shard channels to subscribe to.","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_PUBSUB,SSUBSCRIBE_History,SSUBSCRIBE_Hints,ssubscribeCommand,-2,CMD_PUBSUB|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE,0,{{CMD_KEY_SHARD_CHANNEL,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={-1,1,0}}},.args=SSUBSCRIBE_Args},
{"subscribe","Listen for messages published to the given channels","O(N) where N is the number of channels to subscribe to.","2.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_PUBSUB,SUBSCRIBE_History,SUBSCRIBE_Hints,subscribeCommand,-2,CMD_PUBSUB|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,0,.args=SUBSCRIBE_Args},
{"sunsubscribe","Stop listening for messages posted to the given shard channels","O(N) where N is the number of clients already subscribed to a channel.","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_PUBSUB,SUNSUBSCRIBE_History,SUNSUBSCRIBE_Hints,sunsubscribeCommand,-1,CMD_PUBSUB|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE,0,{{CMD_KEY_SHARD_CHANNEL,KSPEC_BS_INDEX,.bs.index={1},KSPEC_FK_RANGE,.fk.range={-1,1,0}}},.args=SUNSUBSCRIBE_Args},
{"unsubscribe","Stop listening for messages posted to the given channels","O(N) where N is the number of clients already subscribed to a channel.","2.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_PUBSUB,UNSUBSCRIBE_History,UNSUBSCRIBE_Hints,unsubscribeCommand,-1,CMD_PUBSUB|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,0,.args=UNSUBSCRIBE_Args},
/* scripting */
{"eval","Execute a Lua script server side","Depends on the script that is executed.","2.6.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,EVAL_History,EVAL_Hints,evalCommand,-3,CMD_NOSCRIPT|CMD_SKIP_MONITOR|CMD_MAY_REPLICATE|CMD_NO_MANDATORY_KEYS,ACL_CATEGORY_SCRIPTING,{{CMD_KEY_WRITE|CMD_KEY_READ,KSPEC_BS_INDEX,.bs.index={2},KSPEC_FK_KEYNUM,.fk.keynum={0,1,1}}},evalGetKeys,.args=EVAL_Args},


@ -0,0 +1,23 @@
{
"SHARDCHANNELS": {
"summary": "List active shard channels",
"complexity": "O(N) where N is the number of active shard channels, and assuming constant time pattern matching (relatively short channels).",
"group": "pubsub",
"since": "7.0.0",
"arity": -2,
"container": "PUBSUB",
"function": "pubsubCommand",
"command_flags": [
"PUBSUB",
"LOADING",
"STALE"
],
"arguments": [
{
"name": "pattern",
"type": "string",
"optional": true
}
]
}
}


@ -0,0 +1,16 @@
{
"SHARDNUMSUB": {
"summary": "Get the count of subscribers for shard channels",
"complexity": "O(N) for the SHARDNUMSUB subcommand, where N is the number of requested channels",
"group": "pubsub",
"since": "7.0.0",
"arity": -2,
"container": "PUBSUB",
"function": "pubsubCommand",
"command_flags": [
"PUBSUB",
"LOADING",
"STALE"
]
}
}


@ -0,0 +1,46 @@
{
"SPUBLISH": {
"summary": "Post a message to a shard channel",
"complexity": "O(N) where N is the number of clients subscribed to the receiving shard channel.",
"group": "pubsub",
"since": "7.0.0",
"arity": 3,
"function": "spublishCommand",
"command_flags": [
"PUBSUB",
"LOADING",
"STALE",
"FAST",
"MAY_REPLICATE"
],
"arguments": [
{
"name": "channel",
"type": "string"
},
{
"name": "message",
"type": "string"
}
],
"key_specs": [
{
"flags": [
"SHARD_CHANNEL"
],
"begin_search": {
"index": {
"pos": 1
}
},
"find_keys": {
"range": {
"lastkey": 0,
"step": 1,
"limit": 0
}
}
}
]
}
}


@ -0,0 +1,42 @@
{
"SSUBSCRIBE": {
"summary": "Listen for messages published to the given shard channels",
"complexity": "O(N) where N is the number of shard channels to subscribe to.",
"group": "pubsub",
"since": "7.0.0",
"arity": -2,
"function": "ssubscribeCommand",
"command_flags": [
"PUBSUB",
"NOSCRIPT",
"LOADING",
"STALE"
],
"arguments": [
{
"name": "channel",
"type": "string",
"multiple": true
}
],
"key_specs": [
{
"flags": [
"SHARD_CHANNEL"
],
"begin_search": {
"index": {
"pos": 1
}
},
"find_keys": {
"range": {
"lastkey": -1,
"step": 1,
"limit": 0
}
}
}
]
}
}


@ -0,0 +1,43 @@
{
"SUNSUBSCRIBE": {
"summary": "Stop listening for messages posted to the given shard channels",
"complexity": "O(N) where N is the number of clients already subscribed to a channel.",
"group": "pubsub",
"since": "7.0.0",
"arity": -1,
"function": "sunsubscribeCommand",
"command_flags": [
"PUBSUB",
"NOSCRIPT",
"LOADING",
"STALE"
],
"arguments": [
{
"name": "channel",
"type": "string",
"optional": true,
"multiple": true
}
],
"key_specs": [
{
"flags": [
"SHARD_CHANNEL"
],
"begin_search": {
"index": {
"pos": 1
}
},
"find_keys": {
"range": {
"lastkey": -1,
"step": 1,
"limit": 0
}
}
}
]
}
}


@ -2636,6 +2636,7 @@ standardConfig configs[] = {
createBoolConfig("cluster-enabled", NULL, IMMUTABLE_CONFIG, server.cluster_enabled, 0, NULL, NULL),
createBoolConfig("appendonly", NULL, MODIFIABLE_CONFIG | DENY_LOADING_CONFIG, server.aof_enabled, 0, NULL, updateAppendonly),
createBoolConfig("cluster-allow-reads-when-down", NULL, MODIFIABLE_CONFIG, server.cluster_allow_reads_when_down, 0, NULL, NULL),
createBoolConfig("cluster-allow-pubsubshard-when-down", NULL, MODIFIABLE_CONFIG, server.cluster_allow_pubsubshard_when_down, 1, NULL, NULL),
createBoolConfig("crash-log-enabled", NULL, MODIFIABLE_CONFIG, server.crashlog_enabled, 1, NULL, updateSighandlerEnabled),
createBoolConfig("crash-memcheck-enabled", NULL, MODIFIABLE_CONFIG, server.memcheck_enabled, 1, NULL, NULL),
createBoolConfig("use-exit-on-panic", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, server.use_exit_on_panic, 0, NULL, NULL),


@ -790,6 +790,7 @@ int64_t commandKeySpecsFlagsFromString(const char *s) {
char *t = tokens[j];
if (!strcasecmp(t,"write")) flags |= CMD_KEY_WRITE;
else if (!strcasecmp(t,"read")) flags |= CMD_KEY_READ;
else if (!strcasecmp(t,"shard_channel")) flags |= CMD_KEY_SHARD_CHANNEL;
else if (!strcasecmp(t,"incomplete")) flags |= CMD_KEY_INCOMPLETE;
else break;
}


@ -190,6 +190,7 @@ client *createClient(connection *conn) {
c->watched_keys = listCreate();
c->pubsub_channels = dictCreate(&objectKeyPointerValueDictType);
c->pubsub_patterns = listCreate();
c->pubsubshard_channels = dictCreate(&objectKeyPointerValueDictType);
c->peerid = NULL;
c->sockname = NULL;
c->client_list_node = NULL;
@ -1424,9 +1425,11 @@ void freeClient(client *c) {
/* Unsubscribe from all the pubsub channels */
pubsubUnsubscribeAllChannels(c,0);
pubsubUnsubscribeShardAllChannels(c, 0);
pubsubUnsubscribeAllPatterns(c,0);
dictRelease(c->pubsub_channels);
listRelease(c->pubsub_patterns);
dictRelease(c->pubsubshard_channels);
/* Free data structures. */
listRelease(c->reply);
@ -2592,6 +2595,7 @@ void resetCommand(client *c) {
discardTransaction(c);
pubsubUnsubscribeAllChannels(c,0);
pubsubUnsubscribeShardAllChannels(c, 0);
pubsubUnsubscribeAllPatterns(c,0);
if (c->name) {


@ -30,8 +30,68 @@
#include "server.h"
#include "cluster.h"
/* Structure to hold the pubsub related metadata. Currently used
* for the pubsub and pubsubshard features. */
typedef struct pubsubtype {
int shard;
dict *(*clientPubSubChannels)(client*);
int (*subscriptionCount)(client*);
dict **serverPubSubChannels;
robj **subscribeMsg;
robj **unsubscribeMsg;
} pubsubtype;
/*
* Get client's global Pub/Sub channels subscription count.
*/
int clientSubscriptionsCount(client *c);
/*
* Get client's shard level Pub/Sub channels subscription count.
*/
int clientShardSubscriptionsCount(client *c);
/*
* Get client's global Pub/Sub channels dict.
*/
dict* getClientPubSubChannels(client *c);
/*
* Get client's shard level Pub/Sub channels dict.
*/
dict* getClientPubSubShardChannels(client *c);
/*
* Get list of channels client is subscribed to.
* If a pattern is provided, only the subset of channels
* matching the pattern is returned.
*/
void channelList(client *c, sds pat, dict* pubsub_channels);
/*
* Pub/Sub type for global channels.
*/
pubsubtype pubSubType = {
.shard = 0,
.clientPubSubChannels = getClientPubSubChannels,
.subscriptionCount = clientSubscriptionsCount,
.serverPubSubChannels = &server.pubsub_channels,
.subscribeMsg = &shared.subscribebulk,
.unsubscribeMsg = &shared.unsubscribebulk,
};
/*
* Pub/Sub type for shard level channels bound to a slot.
*/
pubsubtype pubSubShardType = {
.shard = 1,
.clientPubSubChannels = getClientPubSubShardChannels,
.subscriptionCount = clientShardSubscriptionsCount,
.serverPubSubChannels = &server.pubsubshard_channels,
.subscribeMsg = &shared.ssubscribebulk,
.unsubscribeMsg = &shared.sunsubscribebulk
};
/*-----------------------------------------------------------------------------
* Pubsub client replies API
*----------------------------------------------------------------------------*/
@ -66,31 +126,31 @@ void addReplyPubsubPatMessage(client *c, robj *pat, robj *channel, robj *msg) {
}
/* Send the pubsub subscription notification to the client. */
void addReplyPubsubSubscribed(client *c, robj *channel, pubsubtype type) {
if (c->resp == 2)
addReply(c,shared.mbulkhdr[3]);
else
addReplyPushLen(c,3);
addReply(c,*type.subscribeMsg);
addReplyBulk(c,channel);
addReplyLongLong(c,type.subscriptionCount(c));
}
/* Send the pubsub unsubscription notification to the client.
* Channel can be NULL: this is useful when the client sends a mass
* unsubscribe command but there are no channels to unsubscribe from: we
* still send a notification. */
void addReplyPubsubUnsubscribed(client *c, robj *channel, pubsubtype type) {
if (c->resp == 2)
addReply(c,shared.mbulkhdr[3]);
else
addReplyPushLen(c,3);
addReply(c, *type.unsubscribeMsg);
if (channel)
addReplyBulk(c,channel);
else
addReplyNull(c);
addReplyLongLong(c,type.subscriptionCount(c));
}
/* Send the pubsub pattern subscription notification to the client. */
@ -125,28 +185,57 @@ void addReplyPubsubPatUnsubscribed(client *c, robj *pattern) {
* Pubsub low level API
*----------------------------------------------------------------------------*/
/* Return the number of pubsub channels + patterns the server handles. */
int serverPubsubSubscriptionCount() {
return dictSize(server.pubsub_channels) + dictSize(server.pubsub_patterns);
}
/* Return the number of pubsub shard level channels the server handles. */
int serverPubsubShardSubscriptionCount() {
return dictSize(server.pubsubshard_channels);
}
/* Return the number of channels + patterns a client is subscribed to. */
int clientSubscriptionsCount(client *c) {
return dictSize(c->pubsub_channels) + listLength(c->pubsub_patterns);
}
/* Return the number of shard level channels a client is subscribed to. */
int clientShardSubscriptionsCount(client *c) {
return dictSize(c->pubsubshard_channels);
}
dict* getClientPubSubChannels(client *c) {
return c->pubsub_channels;
}
dict* getClientPubSubShardChannels(client *c) {
return c->pubsubshard_channels;
}
/* Return the number of pubsub + pubsub shard level channels
* a client is subscribed to. */
int clientTotalPubSubSubscriptionCount(client *c) {
return clientSubscriptionsCount(c) + clientShardSubscriptionsCount(c);
}
/* Subscribe a client to a channel. Returns 1 if the operation succeeded, or
* 0 if the client was already subscribed to that channel. */
int pubsubSubscribeChannel(client *c, robj *channel) {
int pubsubSubscribeChannel(client *c, robj *channel, pubsubtype type) {
dictEntry *de;
list *clients = NULL;
int retval = 0;
/* Add the channel to the client -> channels hash table */
if (dictAdd(type.clientPubSubChannels(c),channel,NULL) == DICT_OK) {
retval = 1;
incrRefCount(channel);
/* Add the client to the channel -> list of clients hash table */
de = dictFind(*type.serverPubSubChannels, channel);
if (de == NULL) {
clients = listCreate();
dictAdd(*type.serverPubSubChannels, channel, clients);
incrRefCount(channel);
} else {
clients = dictGetVal(de);
@ -154,13 +243,13 @@ int pubsubSubscribeChannel(client *c, robj *channel) {
listAddNodeTail(clients,c);
}
/* Notify the client */
addReplyPubsubSubscribed(c,channel,type);
return retval;
}
/* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or
* 0 if the client was not subscribed to the specified channel. */
int pubsubUnsubscribeChannel(client *c, robj *channel, int notify, pubsubtype type) {
dictEntry *de;
list *clients;
listNode *ln;
@ -169,10 +258,10 @@ int pubsubUnsubscribeChannel(client *c, robj *channel, int notify) {
/* Remove the channel from the client -> channels hash table */
incrRefCount(channel); /* channel may be just a pointer to the same object
we have in the hash tables. Protect it... */
if (dictDelete(type.clientPubSubChannels(c),channel) == DICT_OK) {
retval = 1;
/* Remove the client from the channel -> clients list hash table */
de = dictFind(*type.serverPubSubChannels, channel);
serverAssertWithInfo(c,NULL,de != NULL);
clients = dictGetVal(de);
ln = listSearchKey(clients,c);
@ -182,15 +271,53 @@ int pubsubUnsubscribeChannel(client *c, robj *channel, int notify) {
/* Free the list and the associated hash entry if this was the
* last subscribed client, so that abusing Redis PUBSUB by
* creating millions of channels does not leak memory. */
dictDelete(*type.serverPubSubChannels, channel);
/* As this channel isn't subscribed by anyone, it's safe
* to remove the channel from the slot. */
if (server.cluster_enabled && type.shard) {
slotToChannelDel(channel->ptr);
}
}
}
/* Notify the client */
if (notify) {
addReplyPubsubUnsubscribed(c,channel,type);
}
decrRefCount(channel); /* it is finally safe to release it */
return retval;
}
void pubsubShardUnsubscribeAllClients(robj *channel) {
int retval;
dictEntry *de = dictFind(server.pubsubshard_channels, channel);
serverAssertWithInfo(NULL,channel,de != NULL);
list *clients = dictGetVal(de);
if (listLength(clients) > 0) {
/* For each client subscribed to the channel, unsubscribe it. */
listIter li;
listNode *ln;
listRewind(clients, &li);
while ((ln = listNext(&li)) != NULL) {
client *c = listNodeValue(ln);
retval = dictDelete(c->pubsubshard_channels, channel);
serverAssertWithInfo(c,channel,retval == DICT_OK);
addReplyPubsubUnsubscribed(c, channel, pubSubShardType);
/* If the client has no other pubsub subscription,
* move out of pubsub mode. */
if (clientTotalPubSubSubscriptionCount(c) == 0) {
c->flags &= ~CLIENT_PUBSUB;
}
}
}
/* Delete the channel from server pubsubshard channels hash table. */
retval = dictDelete(server.pubsubshard_channels, channel);
/* Delete the channel from slots_to_channel mapping. */
slotToChannelDel(channel->ptr);
serverAssertWithInfo(NULL,channel,retval == DICT_OK);
decrRefCount(channel); /* it is finally safe to release it */
}
/* Subscribe a client to a pattern. Returns 1 if the operation succeeded, or 0 if the client was already subscribed to that pattern. */
int pubsubSubscribePattern(client *c, robj *pattern) {
dictEntry *de;
@ -250,24 +377,53 @@ int pubsubUnsubscribePattern(client *c, robj *pattern, int notify) {
/* Unsubscribe from all the channels. Return the number of channels the
* client was subscribed to. */
int pubsubUnsubscribeAllChannelsInternal(client *c, int notify, pubsubtype type) {
int count = 0;
if (dictSize(type.clientPubSubChannels(c)) > 0) {
dictIterator *di = dictGetSafeIterator(type.clientPubSubChannels(c));
dictEntry *de;
while((de = dictNext(di)) != NULL) {
robj *channel = dictGetKey(de);
count += pubsubUnsubscribeChannel(c,channel,notify,type);
}
dictReleaseIterator(di);
}
/* We were subscribed to nothing? Still reply to the client. */
if (notify && count == 0) {
addReplyPubsubUnsubscribed(c,NULL,type);
}
return count;
}
/*
* Unsubscribe a client from all global channels.
*/
int pubsubUnsubscribeAllChannels(client *c, int notify) {
int count = pubsubUnsubscribeAllChannelsInternal(c,notify,pubSubType);
return count;
}
/*
* Unsubscribe a client from all shard subscribed channels.
*/
int pubsubUnsubscribeShardAllChannels(client *c, int notify) {
int count = pubsubUnsubscribeAllChannelsInternal(c, notify, pubSubShardType);
return count;
}
/*
* Unsubscribe all the clients from the provided shard channel(s).
*/
void pubsubUnsubscribeShardChannels(robj **channels, unsigned int count) {
for (unsigned int j = 0; j < count; j++) {
/* Remove the channel from server and from the clients
* subscribed to it as well as notify them. */
pubsubShardUnsubscribeAllClients(channels[j]);
}
}
/* Unsubscribe from all the patterns. Return the number of patterns the
* client was subscribed from. */
int pubsubUnsubscribeAllPatterns(client *c, int notify) {
@ -285,8 +441,10 @@ int pubsubUnsubscribeAllPatterns(client *c, int notify) {
return count;
}
/*
* Publish a message to all the subscribers.
*/
int pubsubPublishMessageInternal(robj *channel, robj *message, pubsubtype type) {
int receivers = 0;
dictEntry *de;
dictIterator *di;
@ -294,7 +452,7 @@ int pubsubPublishMessage(robj *channel, robj *message) {
listIter li;
/* Send to clients listening for that channel */
de = dictFind(*type.serverPubSubChannels, channel);
if (de) {
list *list = dictGetVal(de);
listNode *ln;
@ -308,6 +466,12 @@ int pubsubPublishMessage(robj *channel, robj *message) {
receivers++;
}
}
if (type.shard) {
/* Shard pubsub ignores patterns. */
return receivers;
}
/* Send to clients listening to matching channels */
di = dictGetIterator(server.pubsub_patterns);
if (di) {
@ -334,6 +498,17 @@ int pubsubPublishMessage(robj *channel, robj *message) {
return receivers;
}
/* Publish a message to all the subscribers. */
int pubsubPublishMessage(robj *channel, robj *message) {
return pubsubPublishMessageInternal(channel,message,pubSubType);
}
/* Publish a shard message to all the subscribers. */
int pubsubPublishMessageShard(robj *channel, robj *message) {
return pubsubPublishMessageInternal(channel, message, pubSubShardType);
}
/*-----------------------------------------------------------------------------
* Pubsub commands implementation
*----------------------------------------------------------------------------*/
@ -352,13 +527,12 @@ void subscribeCommand(client *c) {
addReplyError(c, "SUBSCRIBE isn't allowed for a DENY BLOCKING client");
return;
}
for (j = 1; j < c->argc; j++)
pubsubSubscribeChannel(c,c->argv[j],pubSubType);
c->flags |= CLIENT_PUBSUB;
}
/* UNSUBSCRIBE [channel ...] */
void unsubscribeCommand(client *c) {
if (c->argc == 1) {
pubsubUnsubscribeAllChannels(c,1);
@ -366,9 +540,9 @@ void unsubscribeCommand(client *c) {
int j;
for (j = 1; j < c->argc; j++)
pubsubUnsubscribeChannel(c,c->argv[j],1,pubSubType);
}
if (clientTotalPubSubSubscriptionCount(c) == 0) c->flags &= ~CLIENT_PUBSUB;
}
/* PSUBSCRIBE pattern [pattern ...] */
@ -401,7 +575,7 @@ void punsubscribeCommand(client *c) {
for (j = 1; j < c->argc; j++)
pubsubUnsubscribePattern(c,c->argv[j],1);
}
if (clientTotalPubSubSubscriptionCount(c) == 0) c->flags &= ~CLIENT_PUBSUB;
}
/* PUBLISH <channel> <message> */
@ -429,7 +603,11 @@ void pubsubCommand(client *c) {
" Return number of subscriptions to patterns.",
"NUMSUB [<channel> ...]",
" Return the number of subscribers for the specified channels, excluding",
" pattern subscriptions(default: no channels).",
" pattern subscriptions(default: no channels)."
"SHARDCHANNELS [<pattern>]",
" Return the currently active shard level channels matching a <pattern> (default: '*').",
"SHARDNUMSUB [<channel> ...]",
" Return the number of subscribers for the specified shard level channel(s)",
NULL
};
addReplyHelp(c, help);
@ -438,25 +616,7 @@ NULL
{
/* PUBSUB CHANNELS [<pattern>] */
sds pat = (c->argc == 2) ? NULL : c->argv[2]->ptr;
channelList(c, pat, server.pubsub_channels);
} else if (!strcasecmp(c->argv[1]->ptr,"numsub") && c->argc >= 2) {
/* PUBSUB NUMSUB [Channel_1 ... Channel_N] */
int j;
@ -471,7 +631,93 @@ NULL
} else if (!strcasecmp(c->argv[1]->ptr,"numpat") && c->argc == 2) {
/* PUBSUB NUMPAT */
addReplyLongLong(c,dictSize(server.pubsub_patterns));
} else if (!strcasecmp(c->argv[1]->ptr,"shardchannels") &&
(c->argc == 2 || c->argc == 3))
{
/* PUBSUB SHARDCHANNELS [<pattern>] */
sds pat = (c->argc == 2) ? NULL : c->argv[2]->ptr;
channelList(c,pat,server.pubsubshard_channels);
} else if (!strcasecmp(c->argv[1]->ptr,"shardnumsub") && c->argc >= 2) {
/* PUBSUB SHARDNUMSUB [Channel_1 ... Channel_N] */
int j;
addReplyArrayLen(c, (c->argc-2)*2);
for (j = 2; j < c->argc; j++) {
list *l = dictFetchValue(server.pubsubshard_channels, c->argv[j]);
addReplyBulk(c,c->argv[j]);
addReplyLongLong(c,l ? listLength(l) : 0);
}
} else {
addReplySubcommandSyntaxError(c);
}
}
void channelList(client *c, sds pat, dict *pubsub_channels) {
dictIterator *di = dictGetIterator(pubsub_channels);
dictEntry *de;
long mblen = 0;
void *replylen;
replylen = addReplyDeferredLen(c);
while((de = dictNext(di)) != NULL) {
robj *cobj = dictGetKey(de);
sds channel = cobj->ptr;
if (!pat || stringmatchlen(pat, sdslen(pat),
channel, sdslen(channel),0))
{
addReplyBulk(c,cobj);
mblen++;
}
}
dictReleaseIterator(di);
setDeferredArrayLen(c,replylen,mblen);
}
/* SPUBLISH <channel> <message> */
void spublishCommand(client *c) {
int receivers = pubsubPublishMessageInternal(c->argv[1], c->argv[2], pubSubShardType);
if (server.cluster_enabled) {
clusterPropagatePublishShard(c->argv[1], c->argv[2]);
} else {
forceCommandPropagation(c,PROPAGATE_REPL);
}
addReplyLongLong(c,receivers);
}
/* SSUBSCRIBE channel [channel ...] */
void ssubscribeCommand(client *c) {
if (c->flags & CLIENT_DENY_BLOCKING) {
/* A client that has the CLIENT_DENY_BLOCKING flag on
* expects a reply per command and so cannot execute subscribe. */
addReplyError(c, "SSUBSCRIBE isn't allowed for a DENY BLOCKING client");
return;
}
for (int j = 1; j < c->argc; j++) {
/* A channel is only added to the slot map once it has at
* least one subscriber; if a subscriber already exists, the
* slot-to-channel mapping is already up to date. */
if (server.cluster_enabled &&
(dictFind(*pubSubShardType.serverPubSubChannels, c->argv[j]) == NULL)) {
slotToChannelAdd(c->argv[j]->ptr);
}
pubsubSubscribeChannel(c, c->argv[j], pubSubShardType);
}
c->flags |= CLIENT_PUBSUB;
}
/* SUNSUBSCRIBE [channel ...] */
void sunsubscribeCommand(client *c) {
if (c->argc == 1) {
pubsubUnsubscribeShardAllChannels(c, 1);
} else {
for (int j = 1; j < c->argc; j++) {
pubsubUnsubscribeChannel(c, c->argv[j], 1, pubSubShardType);
}
}
if (clientTotalPubSubSubscriptionCount(c) == 0) c->flags &= ~CLIENT_PUBSUB;
}


@ -1381,7 +1381,8 @@ static int cliSendCommand(int argc, char **argv, long repeat) {
if (!strcasecmp(command,"shutdown")) config.shutdown = 1;
if (!strcasecmp(command,"monitor")) config.monitor_mode = 1;
if (!strcasecmp(command,"subscribe") ||
!strcasecmp(command,"psubscribe")) config.pubsub_mode = 1;
!strcasecmp(command,"psubscribe") ||
!strcasecmp(command,"ssubscribe")) config.pubsub_mode = 1;
if (!strcasecmp(command,"sync") ||
!strcasecmp(command,"psync")) config.slave_mode = 1;


@ -1648,6 +1648,8 @@ void createSharedObjects(void) {
shared.pmessagebulk = createStringObject("$8\r\npmessage\r\n",14);
shared.subscribebulk = createStringObject("$9\r\nsubscribe\r\n",15);
shared.unsubscribebulk = createStringObject("$11\r\nunsubscribe\r\n",18);
shared.ssubscribebulk = createStringObject("$10\r\nssubscribe\r\n", 17);
shared.sunsubscribebulk = createStringObject("$12\r\nsunsubscribe\r\n", 19);
shared.psubscribebulk = createStringObject("$10\r\npsubscribe\r\n",17);
shared.punsubscribebulk = createStringObject("$12\r\npunsubscribe\r\n",19);
@ -2367,6 +2369,7 @@ void initServer(void) {
evictionPoolAlloc(); /* Initialize the LRU keys pool. */
server.pubsub_channels = dictCreate(&keylistDictType);
server.pubsub_patterns = dictCreate(&keylistDictType);
server.pubsubshard_channels = dictCreate(&keylistDictType);
server.cronloops = 0;
server.in_script = 0;
server.in_exec = 0;
@ -3499,14 +3502,16 @@ int processCommand(client *c) {
if ((c->flags & CLIENT_PUBSUB && c->resp == 2) &&
c->cmd->proc != pingCommand &&
c->cmd->proc != subscribeCommand &&
c->cmd->proc != ssubscribeCommand &&
c->cmd->proc != unsubscribeCommand &&
c->cmd->proc != sunsubscribeCommand &&
c->cmd->proc != psubscribeCommand &&
c->cmd->proc != punsubscribeCommand &&
c->cmd->proc != quitCommand &&
c->cmd->proc != resetCommand) {
rejectCommandFormat(c,
"Can't execute '%s': only (P)SUBSCRIBE / "
"(P)UNSUBSCRIBE / PING / QUIT / RESET are allowed in this context",
"Can't execute '%s': only (P|S)SUBSCRIBE / "
"(P|S)UNSUBSCRIBE / PING / QUIT / RESET are allowed in this context",
c->cmd->name);
return C_OK;
}
@ -4001,6 +4006,7 @@ void addReplyFlagsForKeyArgs(client *c, uint64_t flags) {
void *flaglen = addReplyDeferredLen(c);
flagcount += addReplyCommandFlag(c,flags,CMD_KEY_WRITE, "write");
flagcount += addReplyCommandFlag(c,flags,CMD_KEY_READ, "read");
flagcount += addReplyCommandFlag(c,flags,CMD_KEY_SHARD_CHANNEL, "shard_channel");
flagcount += addReplyCommandFlag(c,flags,CMD_KEY_INCOMPLETE, "incomplete");
setDeferredSetLen(c, flaglen, flagcount);
}


@ -233,9 +233,12 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
/* Key argument flags. Please check the command table defined in the server.c file
* for more information about the meaning of every flag. */
#define CMD_KEY_WRITE (1ULL<<0)
#define CMD_KEY_READ (1ULL<<1)
#define CMD_KEY_INCOMPLETE (1ULL<<2) /* meaning that the keyspec might not point out to all keys it should cover */
#define CMD_KEY_WRITE (1ULL<<0) /* "write" flag */
#define CMD_KEY_READ (1ULL<<1) /* "read" flag */
#define CMD_KEY_SHARD_CHANNEL (1ULL<<2) /* "shard_channel" flag */
#define CMD_KEY_INCOMPLETE (1ULL<<3) /* "incomplete" flag (meaning that
* the keyspec might not point out
* to all keys it should cover) */
/* AOF states */
#define AOF_OFF 0 /* AOF is off */
@ -1086,6 +1089,7 @@ typedef struct client {
list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */
dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */
list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */
dict *pubsubshard_channels; /* shard level channels a client is interested in (SSUBSCRIBE) */
sds peerid; /* Cached peer ID. */
sds sockname; /* Cached connection target address. */
listNode *client_list_node; /* list node in client list */
@ -1174,6 +1178,7 @@ struct sharedObjectsStruct {
*time, *pxat, *absttl, *retrycount, *force, *justid,
*lastid, *ping, *setid, *keepttl, *load, *createconsumer,
*getack, *special_asterick, *special_equals, *default_username, *redacted,
*ssubscribebulk,*sunsubscribebulk,
*select[PROTO_SHARED_SELECT_CMDS],
*integers[OBJ_SHARED_INTEGERS],
*mbulkhdr[OBJ_SHARED_BULKHDR_LEN], /* "*<value>\r\n" */
@ -1751,6 +1756,7 @@ struct redisServer {
dict *pubsub_patterns; /* A dict of pubsub_patterns */
int notify_keyspace_events; /* Events to propagate via Pub/Sub. This is an
xor of NOTIFY_... flags. */
dict *pubsubshard_channels; /* Map channels to list of subscribed clients */
/* Cluster */
int cluster_enabled; /* Is cluster enabled? */
int cluster_port; /* Set the cluster port for a node. */
@ -1821,6 +1827,8 @@ struct redisServer {
* failover then any replica can be used. */
int target_replica_port; /* Failover target port */
int failover_state; /* Failover state */
int cluster_allow_pubsubshard_when_down; /* Is pubsubshard allowed when the cluster
is down, doesn't affect pubsub global. */
};
#define MAX_KEYS_BUFFER 256
@ -2816,9 +2824,14 @@ robj *hashTypeDup(robj *o);
/* Pub / Sub */
int pubsubUnsubscribeAllChannels(client *c, int notify);
int pubsubUnsubscribeShardAllChannels(client *c, int notify);
void pubsubUnsubscribeShardChannels(robj **channels, unsigned int count);
int pubsubUnsubscribeAllPatterns(client *c, int notify);
int pubsubPublishMessage(robj *channel, robj *message);
int pubsubPublishMessageShard(robj *channel, robj *message);
void addReplyPubsubMessage(client *c, robj *channel, robj *msg);
int serverPubsubSubscriptionCount();
int serverPubsubShardSubscriptionCount();
/* Keyspace events notification */
void notifyKeyspaceEvent(int type, char *event, robj *key, int dbid);
@ -2902,6 +2915,7 @@ void freeReplicationBacklogRefMemAsync(list *blocks, rax *index);
/* API to get key arguments from commands */
int *getKeysPrepareResult(getKeysResult *result, int numkeys);
int getKeysFromCommand(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result);
int getChannelsFromCommand(struct redisCommand *cmd, int argc, getKeysResult *result);
void getKeysFreeResult(getKeysResult *result);
int sintercardGetKeys(struct redisCommand *cmd,robj **argv, int argc, getKeysResult *result);
int zunionInterDiffGetKeys(struct redisCommand *cmd,robj **argv, int argc, getKeysResult *result);
@ -3184,6 +3198,9 @@ void psubscribeCommand(client *c);
void punsubscribeCommand(client *c);
void publishCommand(client *c);
void pubsubCommand(client *c);
void spublishCommand(client *c);
void ssubscribeCommand(client *c);
void sunsubscribeCommand(client *c);
void watchCommand(client *c);
void unwatchCommand(client *c);
void clusterCommand(client *c);


@ -228,6 +228,12 @@ void trackingRememberKeys(client *c) {
getKeysFreeResult(&result);
return;
}
/* Shard channels are treated as special keys so that client
* libraries can rely on the `COMMAND` command to discover the
* node to connect to. These channels don't need to be tracked. */
if (c->cmd->flags & CMD_PUBSUB) {
return;
}
int *keys = result.keys;


@ -66,7 +66,7 @@ proc s {n field} {
get_info_field [R $n info] $field
}
# Assuming nodes are reset, this function performs slots allocation.
# Only the first 'n' nodes are used.
proc cluster_allocate_slots {n} {
set slot 16383
@ -129,6 +129,32 @@ proc create_cluster {masters slaves} {
set ::cluster_replica_nodes $slaves
}
proc cluster_allocate_with_continuous_slots {n} {
set slot 16383
set avg [expr ($slot+1) / $n]
while {$slot >= 0} {
set node [expr $slot/$avg >= $n ? $n-1 : $slot/$avg]
lappend slots_$node $slot
incr slot -1
}
for {set j 0} {$j < $n} {incr j} {
R $j cluster addslots {*}[set slots_${j}]
}
}
# Create a cluster composed of the specified number of masters and slaves with continuous slots.
proc cluster_create_with_continuous_slots {masters slaves} {
cluster_allocate_with_continuous_slots $masters
if {$slaves} {
cluster_allocate_slaves $masters $slaves
}
assert_cluster_state ok
set ::cluster_master_nodes $masters
set ::cluster_replica_nodes $slaves
}
# Set the cluster node-timeout to all the reachable nodes.
proc set_cluster_node_timeout {to} {
foreach_redis_id id {
@ -243,4 +269,4 @@ proc get_link_from_peer {this_instance_id peer_nodename} {
}
}
return {}
}


@ -2,27 +2,6 @@
source "../tests/includes/init-tests.tcl"
proc cluster_allocate_with_continuous_slots {n} {
set slot 16383
set avg [expr ($slot+1) / $n]
while {$slot >= 0} {
set node [expr $slot/$avg >= $n ? $n-1 : $slot/$avg]
lappend slots_$node $slot
incr slot -1
}
for {set j 0} {$j < $n} {incr j} {
R $j cluster addslots {*}[set slots_${j}]
}
}
proc cluster_create_with_continuous_slots {masters slaves} {
cluster_allocate_with_continuous_slots $masters
if {$slaves} {
cluster_allocate_slaves $masters $slaves
}
assert_cluster_state ok
}
test "Create a 2 nodes cluster" {
cluster_create_with_continuous_slots 2 2
}


@ -2,7 +2,7 @@
source "../tests/includes/init-tests.tcl"
proc cluster_allocate_with_continuous_slots_local {n} {
R 0 cluster ADDSLOTSRANGE 0 3276
R 1 cluster ADDSLOTSRANGE 3277 6552
R 2 cluster ADDSLOTSRANGE 6553 9828
@ -10,8 +10,8 @@ proc cluster_allocate_with_continuous_slots {n} {
R 4 cluster ADDSLOTSRANGE 13105 16383
}
proc cluster_create_with_continuous_slots_local {masters slaves} {
cluster_allocate_with_continuous_slots_local $masters
if {$slaves} {
cluster_allocate_slaves $masters $slaves
}
@ -20,7 +20,7 @@ proc cluster_create_with_continuous_slots {masters slaves} {
test "Create a 5 nodes cluster" {
cluster_create_with_continuous_slots_local 5 5
}
test "Cluster should start ok" {


@ -0,0 +1,171 @@
source "../tests/includes/init-tests.tcl"
test "Create a 3 nodes cluster" {
cluster_create_with_continuous_slots 3 3
}
test "Cluster is up" {
assert_cluster_state ok
}
set cluster [redis_cluster 127.0.0.1:[get_instance_attrib redis 0 port]]
test "Migrate a slot, verify client receives sunsubscribe on primary serving the slot." {
# Setup the to and from node
set channelname mychannel
set slot [$cluster cluster keyslot $channelname]
array set nodefrom [$cluster masternode_for_slot $slot]
array set nodeto [$cluster masternode_notfor_slot $slot]
set subscribeclient [redis_deferring_client_by_addr $nodefrom(host) $nodefrom(port)]
$subscribeclient deferred 1
$subscribeclient ssubscribe $channelname
$subscribeclient read
assert_equal {OK} [$nodefrom(link) cluster setslot $slot migrating $nodeto(id)]
assert_equal {OK} [$nodeto(link) cluster setslot $slot importing $nodefrom(id)]
# Verify subscribe is still valid, able to receive messages.
$nodefrom(link) spublish $channelname hello
assert_equal {message mychannel hello} [$subscribeclient read]
assert_equal {OK} [$nodefrom(link) cluster setslot $slot node $nodeto(id)]
set msg [$subscribeclient read]
assert {"sunsubscribe" eq [lindex $msg 0]}
assert {$channelname eq [lindex $msg 1]}
assert {"0" eq [lindex $msg 2]}
assert_equal {OK} [$nodeto(link) cluster setslot $slot node $nodeto(id)]
$subscribeclient close
}
test "Client subscribes to multiple channels, migrate a slot, verify client receives sunsubscribe on primary serving the slot." {
# Set up the source and destination nodes.
set channelname ch3
set anotherchannelname ch7
set slot [$cluster cluster keyslot $channelname]
array set nodefrom [$cluster masternode_for_slot $slot]
array set nodeto [$cluster masternode_notfor_slot $slot]
set subscribeclient [redis_deferring_client_by_addr $nodefrom(host) $nodefrom(port)]
$subscribeclient deferred 1
$subscribeclient ssubscribe $channelname
$subscribeclient read
$subscribeclient ssubscribe $anotherchannelname
$subscribeclient read
assert_equal {OK} [$nodefrom(link) cluster setslot $slot migrating $nodeto(id)]
assert_equal {OK} [$nodeto(link) cluster setslot $slot importing $nodefrom(id)]
# Verify the subscription is still valid and able to receive messages.
$nodefrom(link) spublish $channelname hello
assert_equal {message ch3 hello} [$subscribeclient read]
assert_equal {OK} [$nodefrom(link) cluster setslot $slot node $nodeto(id)]
# Verify the client receives a sunsubscribe message for the channel whose slot was migrated.
set msg [$subscribeclient read]
assert {"sunsubscribe" eq [lindex $msg 0]}
assert {$channelname eq [lindex $msg 1]}
assert {"1" eq [lindex $msg 2]}
assert_equal {OK} [$nodeto(link) cluster setslot $slot node $nodeto(id)]
$nodefrom(link) spublish $anotherchannelname hello
# Verify the client is still connected and receives messages from the other channel.
set msg [$subscribeclient read]
assert {"message" eq [lindex $msg 0]}
assert {$anotherchannelname eq [lindex $msg 1]}
assert {"hello" eq [lindex $msg 2]}
$subscribeclient close
}
test "Migrate a slot, verify client receives sunsubscribe on replica serving the slot." {
# Set up the source and destination nodes.
set channelname mychannel1
set slot [$cluster cluster keyslot $channelname]
array set nodefrom [$cluster masternode_for_slot $slot]
array set nodeto [$cluster masternode_notfor_slot $slot]
# Get the replica node serving the slot of mychannel1, to connect a client to it.
set replicanodeinfo [$cluster cluster replicas $nodefrom(id)]
set args [split $replicanodeinfo " "]
set addr [lindex [split [lindex $args 1] @] 0]
set replicahost [lindex [split $addr :] 0]
set replicaport [lindex [split $addr :] 1]
set subscribeclient [redis_deferring_client_by_addr $replicahost $replicaport]
$subscribeclient deferred 1
$subscribeclient ssubscribe $channelname
$subscribeclient read
assert_equal {OK} [$nodefrom(link) cluster setslot $slot migrating $nodeto(id)]
assert_equal {OK} [$nodeto(link) cluster setslot $slot importing $nodefrom(id)]
# Verify the subscription is still valid and able to receive messages.
$nodefrom(link) spublish $channelname hello
assert_equal {message mychannel1 hello} [$subscribeclient read]
assert_equal {OK} [$nodefrom(link) cluster setslot $slot node $nodeto(id)]
assert_equal {OK} [$nodeto(link) cluster setslot $slot node $nodeto(id)]
set msg [$subscribeclient read]
assert {"sunsubscribe" eq [lindex $msg 0]}
assert {$channelname eq [lindex $msg 1]}
assert {"0" eq [lindex $msg 2]}
$subscribeclient close
}
test "Delete a slot, verify sunsubscribe message" {
set channelname ch2
set slot [$cluster cluster keyslot $channelname]
array set primary_client [$cluster masternode_for_slot $slot]
set subscribeclient [redis_deferring_client_by_addr $primary_client(host) $primary_client(port)]
$subscribeclient deferred 1
$subscribeclient ssubscribe $channelname
$subscribeclient read
$primary_client(link) cluster DELSLOTS $slot
set msg [$subscribeclient read]
assert {"sunsubscribe" eq [lindex $msg 0]}
assert {$channelname eq [lindex $msg 1]}
assert {"0" eq [lindex $msg 2]}
$subscribeclient close
}
test "Reset cluster, verify sunsubscribe message" {
set channelname ch4
set slot [$cluster cluster keyslot $channelname]
array set primary_client [$cluster masternode_for_slot $slot]
set subscribeclient [redis_deferring_client_by_addr $primary_client(host) $primary_client(port)]
$subscribeclient deferred 1
$subscribeclient ssubscribe $channelname
$subscribeclient read
$cluster cluster reset HARD
set msg [$subscribeclient read]
assert {"sunsubscribe" eq [lindex $msg 0]}
assert {$channelname eq [lindex $msg 1]}
assert {"0" eq [lindex $msg 2]}
$cluster close
$subscribeclient close
}

View File

@ -0,0 +1,94 @@
# Test Pub/Sub shard message propagation in a cluster.
source "../tests/includes/init-tests.tcl"
test "Create a 3 nodes cluster" {
cluster_create_with_continuous_slots 3 3
}
set cluster [redis_cluster 127.0.0.1:[get_instance_attrib redis 0 port]]
test "Pub/Sub shard basics" {
set slot [$cluster cluster keyslot "channel.0"]
array set publishnode [$cluster masternode_for_slot $slot]
array set notshardnode [$cluster masternode_notfor_slot $slot]
set publishclient [redis_client_by_addr $publishnode(host) $publishnode(port)]
set subscribeclient [redis_deferring_client_by_addr $publishnode(host) $publishnode(port)]
set subscribeclient2 [redis_deferring_client_by_addr $publishnode(host) $publishnode(port)]
set anotherclient [redis_deferring_client_by_addr $notshardnode(host) $notshardnode(port)]
$subscribeclient ssubscribe channel.0
$subscribeclient read
$subscribeclient2 ssubscribe channel.0
$subscribeclient2 read
$anotherclient ssubscribe channel.0
catch {$anotherclient read} err
assert_match {MOVED *} $err
set data [randomValue]
$publishclient spublish channel.0 $data
set msg [$subscribeclient read]
assert_equal $data [lindex $msg 2]
set msg [$subscribeclient2 read]
assert_equal $data [lindex $msg 2]
$publishclient close
$subscribeclient close
$subscribeclient2 close
$anotherclient close
}
test "client can't subscribe to multiple shard channels across different slots in same call" {
catch {$cluster ssubscribe channel.0 channel.1} err
assert_match {CROSSSLOT Keys*} $err
}
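The CROSSSLOT error above comes from shard channels being hashed like keys; a minimal sketch, reusing the $cluster handle from the surrounding tests, that makes the slot mismatch explicit:

# Sketch (cluster test-harness context assumed, not part of the patch).
set slot0 [$cluster cluster keyslot channel.0]
set slot1 [$cluster cluster keyslot channel.1]
assert {$slot0 != $slot1} ;# different slots, hence the CROSSSLOT error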
test "client can subscribe to multiple shard channels across different slots in separate call" {
$cluster ssubscribe ch3
$cluster ssubscribe ch7
$cluster sunsubscribe ch3
$cluster sunsubscribe ch7
}
test "Verify Pub/Sub and Pub/Sub shard no overlap" {
set slot [$cluster cluster keyslot "channel.0"]
array set publishnode [$cluster masternode_for_slot $slot]
array set notshardnode [$cluster masternode_notfor_slot $slot]
set publishshardclient [redis_client_by_addr $publishnode(host) $publishnode(port)]
set publishclient [redis_deferring_client_by_addr $publishnode(host) $publishnode(port)]
set subscribeshardclient [redis_deferring_client_by_addr $publishnode(host) $publishnode(port)]
set subscribeclient [redis_deferring_client_by_addr $publishnode(host) $publishnode(port)]
$subscribeshardclient deferred 1
$subscribeshardclient ssubscribe channel.0
$subscribeshardclient read
$subscribeclient deferred 1
$subscribeclient subscribe channel.0
$subscribeclient read
set sharddata "testingpubsubdata"
$publishshardclient spublish channel.0 $sharddata
set data "somemoredata"
$publishclient publish channel.0 $data
set msg [$subscribeshardclient read]
assert_equal $sharddata [lindex $msg 2]
set msg [$subscribeclient read]
assert_equal $data [lindex $msg 2]
$cluster close
$publishclient close
$subscribeclient close
$subscribeshardclient close
}

View File

@ -677,9 +677,19 @@ proc redis_deferring_client {type id} {
return $client
}
proc redis_deferring_client_by_addr {host port} {
set client [redis $host $port 1 $::tls]
return $client
}
proc redis_client {type id} {
set port [get_instance_attrib $type $id port]
set host [get_instance_attrib $type $id host]
set client [redis $host $port 0 $::tls]
return $client
}
proc redis_client_by_addr {host port} {
set client [redis $host $port 0 $::tls]
return $client
}
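For reference, the shard pubsub tests above use these helpers to attach a client to a node chosen by slot ownership rather than by instance id; a short usage sketch, where the address is illustrative and not taken from the patch:

# Usage sketch: 127.0.0.1:30001 is a placeholder address.
set link [redis_client_by_addr 127.0.0.1 30001]           ;# immediate replies
set sub [redis_deferring_client_by_addr 127.0.0.1 30001]  ;# deferred replies
$sub ssubscribe somechannel
$sub read                                                 ;# consume the ssubscribe reply
$sub close
$link close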

View File

@ -13,6 +13,7 @@ package require Tcl 8.5
package provide redis_cluster 0.1
namespace eval redis_cluster {}
set ::redis_cluster::internal_id 0
set ::redis_cluster::id 0
array set ::redis_cluster::startup_nodes {}
array set ::redis_cluster::nodes {}
@ -32,7 +33,8 @@ set ::redis_cluster::plain_commands {
hget hmset hmget hincrby hincrbyfloat hdel hlen hkeys hvals
hgetall hexists hscan incrby decrby incrbyfloat getset move
expire expireat pexpire pexpireat type ttl pttl persist restore
dump bitcount bitpos pfadd pfcount
dump bitcount bitpos pfadd pfcount cluster ssubscribe spublish
sunsubscribe
}
# Create a cluster client. The nodes are given as a list of host:port. The TLS
@ -118,6 +120,7 @@ proc ::redis_cluster::__method__refresh_nodes_map {id} {
# Build this node description as an hash.
set node [dict create \
id $nodeid \
internal_id $id \
addr $addr \
host $host \
port $port \
@ -265,6 +268,7 @@ proc ::redis_cluster::get_keys_from_command {cmd argv} {
mget {return $argv}
eval {return [lrange $argv 2 1+[lindex $argv 1]]}
evalsha {return [lrange $argv 2 1+[lindex $argv 1]]}
spublish {return [list [lindex $argv 1]]}
}
# All the remaining commands are not handled.
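With spublish mapped to its channel argument, the cluster client routes it like a single-key command; a minimal usage sketch, assuming a $cluster handle as in the tests above (the channel name is illustrative):

# The channel acts as the routing key: the client hashes it to a slot and
# sends the command to the node serving that slot.
$cluster spublish orders hello ;# routed via [lindex $argv 1] = "orders"
$cluster ssubscribe orders     ;# ssubscribe/sunsubscribe route the same way
$cluster sunsubscribe orders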

View File

@ -62,6 +62,7 @@ set ::all_tests {
integration/redis-benchmark
integration/dismiss-mem
unit/pubsub
unit/pubsubshard
unit/slowlog
unit/scripting
unit/functions

View File

@ -87,6 +87,10 @@ start_server {tags {"acl external:skip"}} {
r PUBLISH foo bar
} {0}
test {By default users are able to publish to any shard channel} {
r SPUBLISH foo bar
} {0}
test {By default users are able to subscribe to any channel} {
set rd [redis_deferring_client]
$rd AUTH psuser pspass
@ -96,6 +100,15 @@ start_server {tags {"acl external:skip"}} {
$rd close
} {0}
test {By default users are able to subscribe to any shard channel} {
set rd [redis_deferring_client]
$rd AUTH psuser pspass
$rd read
$rd SSUBSCRIBE foo
assert_match {ssubscribe foo 1} [$rd read]
$rd close
} {0}
test {By default users are able to subscribe to any pattern} {
set rd [redis_deferring_client]
$rd AUTH psuser pspass
@ -113,6 +126,14 @@ start_server {tags {"acl external:skip"}} {
set e
} {*NOPERM*channel*}
test {It's possible to allow publishing to a subset of shard channels} {
r ACL setuser psuser resetchannels &foo:1 &bar:*
assert_equal {0} [r SPUBLISH foo:1 somemessage]
assert_equal {0} [r SPUBLISH bar:2 anothermessage]
catch {r SPUBLISH zap:3 nosuchmessage} e
set e
} {*NOPERM*channel*}
test {Validate subset of channels is prefixed with resetchannels flag} {
r ACL setuser hpuser on nopass resetchannels &foo +@all
@ -166,6 +187,19 @@ start_server {tags {"acl external:skip"}} {
set e
} {*NOPERM*channel*}
test {It's possible to allow subscribing to a subset of shard channels} {
set rd [redis_deferring_client]
$rd AUTH psuser pspass
$rd read
$rd SSUBSCRIBE foo:1
assert_match {ssubscribe foo:1 1} [$rd read]
$rd SSUBSCRIBE bar:2
assert_match {ssubscribe bar:2 2} [$rd read]
$rd SSUBSCRIBE zap:3
catch {$rd read} e
set e
} {*NOPERM*channel*}
test {It's possible to allow subscribing to a subset of channel patterns} {
set rd [redis_deferring_client]
$rd AUTH psuser pspass
@ -193,6 +227,20 @@ start_server {tags {"acl external:skip"}} {
$rd close
} {0}
test {Subscribers are killed when revoked of shard channel permission} {
set rd [redis_deferring_client]
r ACL setuser psuser resetchannels &foo:1
$rd AUTH psuser pspass
$rd read
$rd CLIENT SETNAME deathrow
$rd read
$rd SSUBSCRIBE foo:1
$rd read
r ACL setuser psuser resetchannels
assert_no_match {*deathrow*} [r CLIENT LIST]
$rd close
} {0}
test {Subscribers are killed when revoked of pattern permission} {
set rd [redis_deferring_client]
r ACL setuser psuser resetchannels &bar:*
@ -209,16 +257,18 @@ start_server {tags {"acl external:skip"}} {
test {Subscribers are pardoned if literal permissions are retained and/or gaining allchannels} {
set rd [redis_deferring_client]
r ACL setuser psuser resetchannels &foo:1 &bar:*
r ACL setuser psuser resetchannels &foo:1 &bar:* &orders
$rd AUTH psuser pspass
$rd read
$rd CLIENT SETNAME pardoned
$rd read
$rd SUBSCRIBE foo:1
$rd read
$rd SSUBSCRIBE orders
$rd read
$rd PSUBSCRIBE bar:*
$rd read
r ACL setuser psuser resetchannels &foo:1 &bar:* &baz:qaz &zoo:*
r ACL setuser psuser resetchannels &foo:1 &bar:* &orders &baz:qaz &zoo:*
assert_match {*pardoned*} [r CLIENT LIST]
r ACL setuser psuser allchannels
assert_match {*pardoned*} [r CLIENT LIST]

tests/unit/pubsubshard.tcl
View File

@ -0,0 +1,213 @@
start_server {tags {"pubsubshard external:skip"}} {
proc __consume_ssubscribe_messages {client type channels} {
set numsub -1
set counts {}
for {set i [llength $channels]} {$i > 0} {incr i -1} {
set msg [$client read]
assert_equal $type [lindex $msg 0]
# When receiving subscribe messages the channel names
# are ordered; when receiving unsubscribe messages
# they are unordered.
set idx [lsearch -exact $channels [lindex $msg 1]]
if {[string match "sunsubscribe" $type]} {
assert {$idx >= 0}
} else {
assert {$idx == 0}
}
set channels [lreplace $channels $idx $idx]
# aggregate the subscription count to return to the caller
lappend counts [lindex $msg 2]
}
# We should have received a message for every channel.
assert {[llength $channels] == 0}
return $counts
}
proc __consume_subscribe_messages {client type channels} {
set numsub -1
set counts {}
for {set i [llength $channels]} {$i > 0} {incr i -1} {
set msg [$client read]
assert_equal $type [lindex $msg 0]
# When receiving subscribe messages the channel names
# are ordered; when receiving unsubscribe messages
# they are unordered.
set idx [lsearch -exact $channels [lindex $msg 1]]
if {[string match "unsubscribe" $type]} {
assert {$idx >= 0}
} else {
assert {$idx == 0}
}
set channels [lreplace $channels $idx $idx]
# aggregate the subscription count to return to the caller
lappend counts [lindex $msg 2]
}
# We should have received a message for every channel.
assert {[llength $channels] == 0}
return $counts
}
proc ssubscribe {client channels} {
$client ssubscribe {*}$channels
__consume_ssubscribe_messages $client ssubscribe $channels
}
proc subscribe {client channels} {
$client subscribe {*}$channels
__consume_subscribe_messages $client subscribe $channels
}
proc sunsubscribe {client {channels {}}} {
$client sunsubscribe {*}$channels
__consume_ssubscribe_messages $client sunsubscribe $channels
}
proc unsubscribe {client {channels {}}} {
$client unsubscribe {*}$channels
__consume_subscribe_messages $client unsubscribe $channels
}
test "SPUBLISH/SSUBSCRIBE basics" {
set rd1 [redis_deferring_client]
# subscribe to two channels
assert_equal {1} [ssubscribe $rd1 {chan1}]
assert_equal {2} [ssubscribe $rd1 {chan2}]
assert_equal 1 [r SPUBLISH chan1 hello]
assert_equal 1 [r SPUBLISH chan2 world]
assert_equal {message chan1 hello} [$rd1 read]
assert_equal {message chan2 world} [$rd1 read]
# unsubscribe from one of the channels
sunsubscribe $rd1 {chan1}
assert_equal 0 [r SPUBLISH chan1 hello]
assert_equal 1 [r SPUBLISH chan2 world]
assert_equal {message chan2 world} [$rd1 read]
# unsubscribe from the remaining channel
sunsubscribe $rd1 {chan2}
assert_equal 0 [r SPUBLISH chan1 hello]
assert_equal 0 [r SPUBLISH chan2 world]
# clean up clients
$rd1 close
}
test "SPUBLISH/SSUBSCRIBE with two clients" {
set rd1 [redis_deferring_client]
set rd2 [redis_deferring_client]
assert_equal {1} [ssubscribe $rd1 {chan1}]
assert_equal {1} [ssubscribe $rd2 {chan1}]
assert_equal 2 [r SPUBLISH chan1 hello]
assert_equal {message chan1 hello} [$rd1 read]
assert_equal {message chan1 hello} [$rd2 read]
# clean up clients
$rd1 close
$rd2 close
}
test "PUBLISH/SUBSCRIBE after UNSUBSCRIBE without arguments" {
set rd1 [redis_deferring_client]
assert_equal {1} [ssubscribe $rd1 {chan1}]
assert_equal {2} [ssubscribe $rd1 {chan2}]
assert_equal {3} [ssubscribe $rd1 {chan3}]
sunsubscribe $rd1
assert_equal 0 [r SPUBLISH chan1 hello]
assert_equal 0 [r SPUBLISH chan2 hello]
assert_equal 0 [r SPUBLISH chan3 hello]
# clean up clients
$rd1 close
}
test "SUBSCRIBE to one channel more than once" {
set rd1 [redis_deferring_client]
assert_equal {1 1 1} [ssubscribe $rd1 {chan1 chan1 chan1}]
assert_equal 1 [r SPUBLISH chan1 hello]
assert_equal {message chan1 hello} [$rd1 read]
# clean up clients
$rd1 close
}
test "UNSUBSCRIBE from non-subscribed channels" {
set rd1 [redis_deferring_client]
assert_equal {0} [sunsubscribe $rd1 {foo}]
assert_equal {0} [sunsubscribe $rd1 {bar}]
assert_equal {0} [sunsubscribe $rd1 {quux}]
# clean up clients
$rd1 close
}
test "PUBSUB command basics" {
r pubsub shardnumsub abc def
} {abc 0 def 0}
test "SPUBLISH/SSUBSCRIBE with two clients" {
set rd1 [redis_deferring_client]
set rd2 [redis_deferring_client]
assert_equal {1} [ssubscribe $rd1 {chan1}]
assert_equal {1} [ssubscribe $rd2 {chan1}]
assert_equal 2 [r SPUBLISH chan1 hello]
assert_equal "chan1 2" [r pubsub shardnumsub chan1]
assert_equal "chan1" [r pubsub shardchannels]
# clean up clients
$rd1 close
$rd2 close
}
test "SPUBLISH/SSUBSCRIBE with PUBLISH/SUBSCRIBE" {
set rd1 [redis_deferring_client]
set rd2 [redis_deferring_client]
assert_equal {1} [ssubscribe $rd1 {chan1}]
assert_equal {1} [subscribe $rd2 {chan1}]
assert_equal 1 [r SPUBLISH chan1 hello]
assert_equal 1 [r publish chan1 hello]
assert_equal "chan1 1" [r pubsub shardnumsub chan1]
assert_equal "chan1 1" [r pubsub numsub chan1]
assert_equal "chan1" [r pubsub shardchannels]
assert_equal "chan1" [r pubsub channels]
}
}
start_server {tags {"pubsubshard external:skip"}} {
start_server {tags {"pubsubshard external:skip"}} {
set node_0 [srv 0 client]
set node_0_host [srv 0 host]
set node_0_port [srv 0 port]
set node_1 [srv -1 client]
set node_1_host [srv -1 host]
set node_1_port [srv -1 port]
test {setup replication for following tests} {
$node_1 replicaof $node_0_host $node_0_port
wait_for_sync $node_1
}
test {publish message to master and receive on replica} {
set rd0 [redis_deferring_client]    ;# master (srv 0)
set rd1 [redis_deferring_client -1] ;# replica (srv -1)
assert_equal {1} [ssubscribe $rd1 {chan1}]
$rd0 SPUBLISH chan1 hello
$rd0 read ;# consume the publish reply count
assert_equal {message chan1 hello} [$rd1 read]
$rd0 SPUBLISH chan1 world
$rd0 read
assert_equal {message chan1 world} [$rd1 read]
$rd0 close
$rd1 close
}
}
}