Cluster: hash slots tracking using a radix tree.

This commit is contained in:
antirez 2017-03-27 15:26:56 +02:00
parent 94751543b0
commit 1409c545da
9 changed files with 2114 additions and 64 deletions

View File

@ -139,7 +139,7 @@ endif
REDIS_SERVER_NAME=redis-server
REDIS_SENTINEL_NAME=redis-sentinel
REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o
REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o
REDIS_CLI_NAME=redis-cli
REDIS_CLI_OBJ=anet.o adlist.o redis-cli.o zmalloc.o release.o anet.o ae.o crc64.o
REDIS_BENCHMARK_NAME=redis-benchmark

View File

@ -476,8 +476,10 @@ void clusterInit(void) {
}
}
/* The slots -> keys map is a sorted set. Init it. */
server.cluster->slots_to_keys = zslCreate();
/* The slots -> keys map is a radix tree. Initialize it here. */
server.cluster->slots_to_keys = raxNew();
memset(server.cluster->slots_keys_count,0,
sizeof(server.cluster->slots_keys_count));
/* Set myself->port / cport to my listening ports, we'll just need to
* discover the IP address via MEET messages. */

View File

@ -116,7 +116,8 @@ typedef struct clusterState {
clusterNode *migrating_slots_to[CLUSTER_SLOTS];
clusterNode *importing_slots_from[CLUSTER_SLOTS];
clusterNode *slots[CLUSTER_SLOTS];
zskiplist *slots_to_keys;
uint64_t slots_keys_count[CLUSTER_SLOTS];
rax *slots_to_keys;
/* The following fields are used to take the slave state on elections. */
mstime_t failover_auth_time; /* Time of previous or next election. */
int failover_auth_count; /* Number of votes received so far. */

View File

@ -1301,90 +1301,85 @@ int *migrateGetKeys(struct redisCommand *cmd, robj **argv, int argc, int *numkey
/* Slot to Key API. This is used by Redis Cluster in order to obtain in
* a fast way a key that belongs to a specified hash slot. This is useful
* while rehashing the cluster. */
void slotToKeyAdd(robj *key) {
* while rehashing the cluster and in other conditions when we need to
* understand if we have keys for a given hash slot. */
void slotToKeyUpdateKey(robj *key, int add) {
unsigned int hashslot = keyHashSlot(key->ptr,sdslen(key->ptr));
unsigned char buf[64];
unsigned char *indexed = buf;
size_t keylen = sdslen(key->ptr);
sds sdskey = sdsdup(key->ptr);
zslInsert(server.cluster->slots_to_keys,hashslot,sdskey);
server.cluster->slots_keys_count[hashslot] += add ? 1 : -1;
if (keylen+2 > 64) indexed = zmalloc(keylen+2);
indexed[0] = (hashslot >> 8) & 0xff;
indexed[1] = hashslot & 0xff;
memcpy(indexed+2,key->ptr,keylen);
if (add) {
raxInsert(server.cluster->slots_to_keys,indexed,keylen+2,NULL);
} else {
raxRemove(server.cluster->slots_to_keys,indexed,keylen+2);
}
if (indexed != buf) zfree(indexed);
}
void slotToKeyAdd(robj *key) {
slotToKeyUpdateKey(key,1);
}
void slotToKeyDel(robj *key) {
unsigned int hashslot = keyHashSlot(key->ptr,sdslen(key->ptr));
zslDelete(server.cluster->slots_to_keys,hashslot,key->ptr,NULL);
slotToKeyUpdateKey(key,0);
}
void slotToKeyFlush(void) {
zslFree(server.cluster->slots_to_keys);
server.cluster->slots_to_keys = zslCreate();
raxFree(server.cluster->slots_to_keys);
server.cluster->slots_to_keys = raxNew();
memset(server.cluster->slots_keys_count,0,
sizeof(server.cluster->slots_keys_count));
}
/* Pupulate the specified array of objects with keys in the specified slot.
* New objects are returned to represent keys, it's up to the caller to
* decrement the reference count to release the keys names. */
unsigned int getKeysInSlot(unsigned int hashslot, robj **keys, unsigned int count) {
zskiplistNode *n;
zrangespec range;
raxIterator iter;
int j = 0;
unsigned char indexed[2];
range.min = range.max = hashslot;
range.minex = range.maxex = 0;
n = zslFirstInRange(server.cluster->slots_to_keys, &range);
while(n && n->score == hashslot && count--) {
keys[j++] = createStringObject(n->ele,sdslen(n->ele));
n = n->level[0].forward;
indexed[0] = (hashslot >> 8) & 0xff;
indexed[1] = hashslot & 0xff;
raxStart(&iter,server.cluster->slots_to_keys);
raxSeek(&iter,indexed,2,">=");
while(count-- && raxNext(&iter,NULL,0,NULL)) {
if (iter.key[0] != indexed[0] || iter.key[1] != indexed[1]) break;
keys[j++] = createStringObject((char*)iter.key+2,iter.key_len-2);
}
raxStop(&iter);
return j;
}
/* Remove all the keys in the specified hash slot.
* The number of removed items is returned. */
unsigned int delKeysInSlot(unsigned int hashslot) {
zskiplistNode *n;
zrangespec range;
raxIterator iter;
int j = 0;
unsigned char indexed[2];
range.min = range.max = hashslot;
range.minex = range.maxex = 0;
indexed[0] = (hashslot >> 8) & 0xff;
indexed[1] = hashslot & 0xff;
raxStart(&iter,server.cluster->slots_to_keys);
while(server.cluster->slots_keys_count[hashslot]) {
raxSeek(&iter,indexed,2,">=");
raxNext(&iter,NULL,0,NULL);
n = zslFirstInRange(server.cluster->slots_to_keys, &range);
while(n && n->score == hashslot) {
sds sdskey = n->ele;
robj *key = createStringObject(sdskey,sdslen(sdskey));
n = n->level[0].forward; /* Go to the next item before freeing it. */
robj *key = createStringObject((char*)iter.key+2,iter.key_len-2);
dbDelete(&server.db[0],key);
decrRefCount(key);
j++;
}
raxStop(&iter);
return j;
}
unsigned int countKeysInSlot(unsigned int hashslot) {
zskiplist *zsl = server.cluster->slots_to_keys;
zskiplistNode *zn;
zrangespec range;
int rank, count = 0;
range.min = range.max = hashslot;
range.minex = range.maxex = 0;
/* Find first element in range */
zn = zslFirstInRange(zsl, &range);
/* Use rank of first element, if any, to determine preliminary count */
if (zn != NULL) {
rank = zslGetRank(zsl, zn->score, zn->ele);
count = (zsl->length - (rank - 1));
/* Find last element in range */
zn = zslLastInRange(zsl, &range);
/* Use rank of last element, if any, to determine the actual count */
if (zn != NULL) {
rank = zslGetRank(zsl, zn->score, zn->ele);
count -= (zsl->length - rank);
}
}
return count;
return server.cluster->slots_keys_count[hashslot];
}

View File

@ -97,11 +97,14 @@ void emptyDbAsync(redisDb *db) {
/* Empty the slots-keys map of Redis CLuster by creating a new empty one
* and scheduiling the old for lazy freeing. */
void slotToKeyFlushAsync(void) {
zskiplist *oldsl = server.cluster->slots_to_keys;
server.cluster->slots_to_keys = zslCreate();
atomicIncr(lazyfree_objects,oldsl->length,
rax *old = server.cluster->slots_to_keys;
server.cluster->slots_to_keys = raxNew();
memset(server.cluster->slots_keys_count,0,
sizeof(server.cluster->slots_keys_count));
atomicIncr(lazyfree_objects,old->numele,
lazyfree_objects_mutex);
bioCreateBackgroundJob(BIO_LAZY_FREE,NULL,NULL,oldsl);
bioCreateBackgroundJob(BIO_LAZY_FREE,NULL,NULL,old);
}
/* Release objects from the lazyfree thread. It's just decrRefCount()
@ -125,8 +128,8 @@ void lazyfreeFreeDatabaseFromBioThread(dict *ht1, dict *ht2) {
/* Release the skiplist mapping Redis Cluster keys to slots in the
* lazyfree thread. */
void lazyfreeFreeSlotsMapFromBioThread(zskiplist *sl) {
size_t len = sl->length;
zslFree(sl);
void lazyfreeFreeSlotsMapFromBioThread(rax *rt) {
size_t len = rt->numele;
raxFree(rt);
atomicDecr(lazyfree_objects,len,lazyfree_objects_mutex);
}

1845
src/rax.c Normal file

File diff suppressed because it is too large Load Diff

158
src/rax.h Normal file
View File

@ -0,0 +1,158 @@
#ifndef RAX_H
#define RAX_H
#include <stdint.h>
/* Representation of a radix tree as implemented in this file, that contains
* the strings "foo", "foobar" and "footer" after the insertion of each
* word. When the node represents a key inside the radix tree, we write it
* between [], otherwise it is written between ().
*
* This is the vanilla representation:
*
* (f) ""
* \
* (o) "f"
* \
* (o) "fo"
* \
* [t b] "foo"
* / \
* "foot" (e) (a) "foob"
* / \
* "foote" (r) (r) "fooba"
* / \
* "footer" [] [] "foobar"
*
* However, this implementation implements a very common optimization where
* successive nodes having a single child are "compressed" into the node
* itself as a string of characters, each representing a next-level child,
* and only the link to the node representing the last character node is
* provided inside the representation. So the above representation is turend
* into:
*
* ["foo"] ""
* |
* [t b] "foo"
* / \
* "foot" ("er") ("ar") "foob"
* / \
* "footer" [] [] "foobar"
*
* However this optimization makes the implementation a bit more complex.
* For instance if a key "first" is added in the above radix tree, a
* "node splitting" operation is needed, since the "foo" prefix is no longer
* composed of nodes having a single child one after the other. This is the
* above tree and the resulting node splitting after this event happens:
*
*
* (f) ""
* /
* (i o) "f"
* / \
* "firs" ("rst") (o) "fo"
* / \
* "first" [] [t b] "foo"
* / \
* "foot" ("er") ("ar") "foob"
* / \
* "footer" [] [] "foobar"
*
* Similarly after deletion, if a new chain of nodes having a single child
* is created (the chain must also not include nodes that represent keys),
* it must be compressed back into a single node.
*
*/
#define RAX_NODE_MAX_SIZE ((1<<29)-1)
typedef struct raxNode {
uint32_t iskey:1; /* Does this node contain a key? */
uint32_t isnull:1; /* Associated value is NULL (don't store it). */
uint32_t iscompr:1; /* Node is compressed. */
uint32_t size:29; /* Number of children, or compressed string len. */
/* Data layout is as follows:
*
* If node is not compressed we have 'size' bytes, one for each children
* character, and 'size' raxNode pointers, point to each child node.
* Note how the character is not stored in the children but in the
* edge of the parents:
*
* [header strlen=0][abc][a-ptr][b-ptr][c-ptr](value-ptr?)
*
* if node is compressed (strlen != 0) the node has 1 children.
* In that case the 'size' bytes of the string stored immediately at
* the start of the data section, represent a sequence of successive
* nodes linked one after the other, for which only the last one in
* the sequence is actually represented as a node, and pointed to by
* the current compressed node.
*
* [header strlen=3][xyz][z-ptr](value-ptr?)
*
* Both compressed and not compressed nodes can represent a key
* with associated data in the radix tree at any level (not just terminal
* nodes).
*
* If the node has an associated key (iskey=1) and is not NULL
* (isnull=0), then after the raxNode pointers poiting to the
* childen, an additional value pointer is present (as you can see
* in the representation above as "value-ptr" field).
*/
unsigned char data[];
} raxNode;
typedef struct rax {
raxNode *head;
uint64_t numele;
uint64_t numnodes;
} rax;
/* Stack data structure used by raxLowWalk() in order to, optionally, return
* a list of parent nodes to the caller. The nodes do not have a "parent"
* field for space concerns, so we use the auxiliary stack when needed. */
#define RAX_STACK_STATIC_ITEMS 32
typedef struct raxStack {
void **stack; /* Points to static_items or an heap allocated array. */
size_t items, maxitems; /* Number of items contained and total space. */
/* Up to RAXSTACK_STACK_ITEMS items we avoid to allocate on the heap
* and use this static array of pointers instead. */
void *static_items[RAX_STACK_STATIC_ITEMS];
int oom; /* True if pushing into this stack failed for OOM at some point. */
} raxStack;
/* Radix tree iterator state is encapsulated into this data structure. */
#define RAX_ITER_STATIC_LEN 128
#define RAX_ITER_JUST_SEEKED (1<<0) /* Iterator was just seeked. Return current
element for the first iteration and
clear the flag. */
#define RAX_ITER_EOF (1<<1) /* End of iteration reached. */
#define RAX_ITER_SAFE (1<<2) /* Safe iterator, allows operations while
iterating. But it is slower. */
typedef struct raxIterator {
int flags;
rax *rt; /* Radix tree we are iterating. */
unsigned char *key; /* The current string. */
void *data; /* Data associated to this key. */
size_t key_len; /* Current key length. */
size_t key_max; /* Max key len the current key buffer can hold. */
unsigned char key_static_string[RAX_ITER_STATIC_LEN];
raxNode *node; /* Current node. Only for unsafe iteration. */
raxStack stack; /* Stack used for unsafe iteration. */
} raxIterator;
/* A special pointer returned for not found items. */
extern void *raxNotFound;
/* Exported API. */
rax *raxNew(void);
int raxInsert(rax *rax, unsigned char *s, size_t len, void *data);
int raxRemove(rax *rax, unsigned char *s, size_t len);
void *raxFind(rax *rax, unsigned char *s, size_t len);
void raxFree(rax *rax);
void raxStart(raxIterator *it, rax *rt);
int raxSeek(raxIterator *it, unsigned char *ele, size_t len, const char *op);
int raxNext(raxIterator *it, unsigned char *stop, size_t stoplen, char *op);
int raxPrev(raxIterator *it, unsigned char *stop, size_t stoplen, char *op);
void raxStop(raxIterator *it);
void raxShow(rax *rax);
#endif

44
src/rax_malloc.h Normal file
View File

@ -0,0 +1,44 @@
/* Rax -- A radix tree implementation.
*
* Copyright (c) 2017, Salvatore Sanfilippo <antirez at gmail dot com>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
/* Allocator selection.
*
* This file is used in order to change the Rax allocator at compile time.
* Just define the following defines to what you want to use. Also add
* the include of your alternate allocator if needed (not needed in order
* to use the default libc allocator). */
#ifndef RAX_ALLOC_H
#define RAX_ALLOC_H
#include "zmalloc.h"
#define rax_malloc zmalloc
#define rax_realloc zrealloc
#define rax_free zfree
#endif

View File

@ -63,7 +63,9 @@ typedef long long mstime_t; /* millisecond time type. */
#include "util.h" /* Misc functions useful in many places */
#include "latency.h" /* Latency monitor API */
#include "sparkline.h" /* ASCII graphs API */
#include "quicklist.h"
#include "quicklist.h" /* Lists are encoded as linked lists of
N-elements flat arrays */
#include "rax.h" /* Radix tree */
/* Following includes allow test functions to be called from Redis main() */
#include "zipmap.h"