Refactor the per-slot dict-array db.c into a new kvstore data structure (#12822)

# Description
Gather most of the scattered `redisDb`-related code from the per-slot
dict PR (#11695) and turn it into a new data structure, `kvstore`, i.e.
a class that represents an array of dictionaries.

# Motivation
The main motivation is code cleanliness: the idea of using an array of
dictionaries is very well-suited to becoming a self-contained data
structure.
This allowed cleaning up some ugly code, among other things: loops that
run twice, once on the main dict and once on the expires dict, and
duplicate code for allocating and releasing this data structure.

# Notes
1. This PR reverts the part of https://github.com/redis/redis/pull/12848
where the `rehashing` list is global (handling rehashing `dict`s is now
the responsibility of `kvstore`, and should not be managed by the
server).
2. This PR also replaces the type of `server.pubsubshard_channels` from
`dict**` to `kvstore` (original PR:
https://github.com/redis/redis/pull/12804). After that was done,
`server.pubsub_channels` was also made a `kvstore` (with only one
`dict`, which seems odd) just to make the code cleaner by giving it
the same type as `server.pubsubshard_channels`; see
`pubsubtype.serverPubSubChannels`.
3. The keys and expires kvstores are currently configured to allocate
the individual dicts only when the first key is added (unlike before,
when they were allocated in advance), but they won't release them when
the last key is deleted.

Worth mentioning that, due to this change, the reply of `DEBUG HTSTATS`
changed in case no keys were ever added to the DB.

before:
```
127.0.0.1:6379> DEBUG htstats 9
[Dictionary HT]
Hash table 0 stats (main hash table):
No stats available for empty dictionaries
[Expires HT]
Hash table 0 stats (main hash table):
No stats available for empty dictionaries
```

after:
```
127.0.0.1:6379> DEBUG htstats 9
[Dictionary HT]
[Expires HT]
```
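
To make the new surface concrete, here is a minimal sketch of the `kvstore` API as exercised by the call sites in this diff. The header (`kvstore.h`) is not shown on this page, so the exact signatures are inferred from usage; treat this as illustrative rather than normative:

```c
/* Illustrative sketch only: signatures inferred from the call sites below. */
#include "kvstore.h"   /* new header introduced by this PR */
#include "sds.h"

void kvstore_example(dictType *dtype, int cluster_enabled) {
    /* In cluster mode the store splits into 2^14 dicts (one per hash slot);
     * otherwise it degenerates to a single dict (0 index bits). */
    int slot_count_bits = cluster_enabled ? 14 /* CLUSTER_SLOT_MASK_BITS */ : 0;
    kvstore *keys = kvstoreCreate(dtype, slot_count_bits,
                                  KVSTORE_ALLOCATE_DICTS_ON_DEMAND);

    /* Per-dict operations take the dict index (the hash slot) explicitly;
     * with the flag above, the dict itself is allocated on the first add. */
    dictEntry *existing;
    dictEntry *de = kvstoreDictAddRaw(keys, 0, sdsnew("mykey"), &existing);
    if (de) kvstoreDictSetVal(keys, 0, de, NULL /* value object */);

    /* Aggregate bookkeeping (total key count, non-empty dicts, the
     * per-dict Fenwick tree) lives inside the kvstore. */
    unsigned long long total = kvstoreSize(keys);          /* across all dicts */
    unsigned long long in_dict = kvstoreDictSize(keys, 0); /* a single dict */
    (void)total; (void)in_dict;

    kvstoreRelease(keys);
}
```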
guybe7 committed on 2024-02-05 22:21:35 +07:00 (via GitHub) · commit 8cd62f82ca · parent f20774eced
23 changed files with 1126 additions and 927 deletions

src/Makefile

@ -345,7 +345,7 @@ endif
REDIS_SERVER_NAME=redis-server$(PROG_SUFFIX)
REDIS_SENTINEL_NAME=redis-sentinel$(PROG_SUFFIX)
REDIS_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o
REDIS_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o
REDIS_CLI_NAME=redis-cli$(PROG_SUFFIX)
REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o zmalloc.o release.o ae.o redisassert.o crcspeed.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o
REDIS_BENCHMARK_NAME=redis-benchmark$(PROG_SUFFIX)

src/acl.c

@ -1903,12 +1903,6 @@ int ACLCheckAllPerm(client *c, int *idxptr) {
return ACLCheckAllUserCommandPerm(c->user, c->cmd, c->argv, c->argc, idxptr);
}
int totalSubscriptions(void) {
return dictSize(server.pubsub_patterns) +
dictSize(server.pubsub_channels) +
server.shard_channel_count;
}
/* If 'new' can access all channels 'original' could then return NULL;
Otherwise return a list of channels that the new user can access */
list *getUpcomingChannelList(user *new, user *original) {
@ -2017,7 +2011,7 @@ int ACLShouldKillPubsubClient(client *c, list *upcoming) {
* permissions specified via the upcoming argument, and kill them if so. */
void ACLKillPubsubClientsIfNeeded(user *new, user *original) {
/* Do nothing if there are no subscribers. */
if (totalSubscriptions() == 0)
if (pubsubTotalSubscriptions() == 0)
return;
list *channels = getUpcomingChannelList(new, original);
@ -2450,7 +2444,7 @@ sds ACLLoadFromFile(const char *filename) {
/* If there are some subscribers, we need to check if we need to drop some clients. */
rax *user_channels = NULL;
if (totalSubscriptions() > 0) {
if (pubsubTotalSubscriptions() > 0) {
user_channels = raxNew();
}

src/aof.c

@ -2244,7 +2244,7 @@ int rewriteAppendOnlyFileRio(rio *aof) {
int j;
long key_count = 0;
long long updated_time = 0;
dbIterator *dbit = NULL;
kvstoreIterator *kvs_it = NULL;
/* Record timestamp at the beginning of rewriting AOF. */
if (server.aof_timestamp_enabled) {
@ -2258,15 +2258,15 @@ int rewriteAppendOnlyFileRio(rio *aof) {
for (j = 0; j < server.dbnum; j++) {
char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
redisDb *db = server.db + j;
if (dbSize(db, DB_MAIN) == 0) continue;
if (kvstoreSize(db->keys) == 0) continue;
/* SELECT the new DB */
if (rioWrite(aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr;
if (rioWriteBulkLongLong(aof,j) == 0) goto werr;
dbit = dbIteratorInit(db, DB_MAIN);
kvs_it = kvstoreIteratorInit(db->keys);
/* Iterate this DB writing every entry */
while((de = dbIteratorNext(dbit)) != NULL) {
while((de = kvstoreIteratorNext(kvs_it)) != NULL) {
sds keystr;
robj key, *o;
long long expiretime;
@ -2331,12 +2331,12 @@ int rewriteAppendOnlyFileRio(rio *aof) {
if (server.rdb_key_save_delay)
debugDelay(server.rdb_key_save_delay);
}
dbReleaseIterator(dbit);
kvstoreIteratorRelease(kvs_it);
}
return C_OK;
werr:
if (dbit) dbReleaseIterator(dbit);
if (kvs_it) kvstoreIteratorRelease(kvs_it);
return C_ERR;
}
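
The replacement is mechanical across the codebase: every `dbIterator` loop becomes a `kvstoreIterator` loop. A condensed sketch of the pattern from the hunk above, assuming a populated `kvstore *kvs` (error handling elided):

```c
/* Condensed from the rewriteAppendOnlyFileRio hunk above. */
static void visit_all_entries(kvstore *kvs) {
    kvstoreIterator *kvs_it = kvstoreIteratorInit(kvs);
    dictEntry *de;
    while ((de = kvstoreIteratorNext(kvs_it)) != NULL) {
        sds key = dictGetKey(de);  /* the iterator walks each non-empty dict in turn */
        robj *val = dictGetVal(de);
        (void)key; (void)val;      /* ... serialize the entry ... */
    }
    kvstoreIteratorRelease(kvs_it);
}
```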

src/cluster.c

@ -817,7 +817,7 @@ static int shouldReturnTlsInfo(void) {
}
unsigned int countKeysInSlot(unsigned int slot) {
return dictSize(server.db->dict[slot]);
return kvstoreDictSize(server.db->keys, slot);
}
void clusterCommandHelp(client *c) {
@ -919,7 +919,7 @@ void clusterCommand(client *c) {
addReplyArrayLen(c,numkeys);
dictIterator *iter = NULL;
dictEntry *de = NULL;
iter = dictGetIterator(server.db->dict[slot]);
iter = kvstoreDictGetIterator(server.db->keys, slot);
for (unsigned int i = 0; i < numkeys; i++) {
de = dictNext(iter);
serverAssert(de != NULL);

src/cluster_legacy.c

@ -5104,7 +5104,7 @@ int verifyClusterConfigWithData(void) {
/* Make sure we only have keys in DB0. */
for (j = 1; j < server.dbnum; j++) {
if (dbSize(&server.db[j], DB_MAIN)) return C_ERR;
if (kvstoreSize(server.db[j].keys)) return C_ERR;
}
/* Check that all the slots we see populated memory have a corresponding
@ -5140,7 +5140,7 @@ int verifyClusterConfigWithData(void) {
/* Remove all the shard channel related information not owned by the current shard. */
static inline void removeAllNotOwnedShardChannelSubscriptions(void) {
if (!server.shard_channel_count) return;
if (!kvstoreSize(server.pubsubshard_channels)) return;
clusterNode *currmaster = clusterNodeIsMaster(myself) ? myself : myself->slaveof;
for (int j = 0; j < CLUSTER_SLOTS; j++) {
if (server.cluster->slots[j] != currmaster) {
@ -5734,16 +5734,17 @@ void removeChannelsInSlot(unsigned int slot) {
pubsubShardUnsubscribeAllChannelsInSlot(slot);
}
/* Remove all the keys in the specified hash slot.
* The number of removed items is returned. */
unsigned int delKeysInSlot(unsigned int hashslot) {
if (!kvstoreDictSize(server.db->keys, hashslot))
return 0;
unsigned int j = 0;
dictIterator *iter = NULL;
dictEntry *de = NULL;
iter = dictGetSafeIterator(server.db->dict[hashslot]);
iter = kvstoreDictGetSafeIterator(server.db->keys, hashslot);
while((de = dictNext(iter)) != NULL) {
enterExecutionUnit(1, 0);
sds sdskey = dictGetKey(de);
@ -5768,8 +5769,7 @@ unsigned int delKeysInSlot(unsigned int hashslot) {
/* Get the count of the channels for a given slot. */
unsigned int countChannelsInSlot(unsigned int hashslot) {
dict *d = server.pubsubshard_channels[hashslot];
return d ? dictSize(d) : 0;
return kvstoreDictSize(server.pubsubshard_channels, hashslot);
}
int clusterNodeIsMyself(clusterNode *n) {
@ -5939,7 +5939,7 @@ int clusterCommandSpecial(client *c) {
}
} else if (!strcasecmp(c->argv[1]->ptr,"flushslots") && c->argc == 2) {
/* CLUSTER FLUSHSLOTS */
if (dbSize(&server.db[0], DB_MAIN) != 0) {
if (kvstoreSize(server.db[0].keys) != 0) {
addReplyError(c,"DB must be empty to perform CLUSTER FLUSHSLOTS.");
return 1;
}
@ -6205,7 +6205,7 @@ int clusterCommandSpecial(client *c) {
* slots nor keys to accept to replicate some other node.
* Slaves can switch to another master without issues. */
if (clusterNodeIsMaster(myself) &&
(myself->numslots != 0 || dbSize(&server.db[0], DB_MAIN) != 0)) {
(myself->numslots != 0 || kvstoreSize(server.db[0].keys) != 0)) {
addReplyError(c,
"To set a master the node must be empty and "
"without assigned slots.");
@ -6339,7 +6339,7 @@ int clusterCommandSpecial(client *c) {
/* Slaves can be reset while containing data, but not master nodes
* that must be empty. */
if (clusterNodeIsMaster(myself) && dbSize(c->db, DB_MAIN) != 0) {
if (clusterNodeIsMaster(myself) && kvstoreSize(c->db->keys) != 0) {
addReplyError(c,"CLUSTER RESET can't be called with "
"master nodes containing keys");
return 1;
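
Slot-scoped code follows the same shape: instead of reaching into `server.db->dict[slot]` directly, it asks the kvstore for that dict's size or iterator. A sketch of the `delKeysInSlot` pattern above, with the actual deletion elided:

```c
/* Sketch of the per-slot iteration shape from delKeysInSlot above. */
static unsigned int visit_keys_in_slot(kvstore *keys, unsigned int slot) {
    unsigned int j = 0;
    if (!kvstoreDictSize(keys, slot)) return 0;  /* empty (or never-allocated) dict */
    dictIterator *iter = kvstoreDictGetSafeIterator(keys, slot);
    dictEntry *de;
    while ((de = dictNext(iter)) != NULL) {      /* from here on, the plain dict API */
        sds sdskey = dictGetKey(de);
        (void)sdskey;                            /* ... delete or inspect the key ... */
        j++;
    }
    dictReleaseIterator(iter);
    return j;
}
```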

src/db.c

@ -33,20 +33,10 @@
#include "latency.h"
#include "script.h"
#include "functions.h"
#include "cluster.h"
#include <signal.h>
#include <ctype.h>
/* Structure for DB iterator that allows iterating across multiple slot specific dictionaries in cluster mode. */
struct dbIterator {
redisDb *db;
int slot;
int next_slot;
dictIterator di;
dbKeyType keyType;
};
/*-----------------------------------------------------------------------------
* C-level DB API
*----------------------------------------------------------------------------*/
@ -57,79 +47,8 @@ struct dbIterator {
int expireIfNeeded(redisDb *db, robj *key, int flags);
int keyIsExpired(redisDb *db, robj *key);
void cumulativeKeyCountAdd(redisDb *db, int idx, long delta, dbKeyType keyType);
static void dbSetValue(redisDb *db, robj *key, robj *val, int overwrite, dictEntry *de);
dict *dbGetDictFromIterator(dbIterator *dbit) {
if (dbit->keyType == DB_MAIN)
return dbit->db->dict[dbit->slot];
else if (dbit->keyType == DB_EXPIRES)
return dbit->db->expires[dbit->slot];
else
serverPanic("Unknown keyType");
}
/* Returns next dictionary from the iterator, or NULL if iteration is complete. */
dict *dbIteratorNextDict(dbIterator *dbit) {
if (dbit->next_slot == -1) return NULL;
dbit->slot = dbit->next_slot;
dbit->next_slot = dbGetNextNonEmptySlot(dbit->db, dbit->slot, dbit->keyType);
return dbGetDictFromIterator(dbit);
}
int dbIteratorGetCurrentSlot(dbIterator *dbit) {
serverAssert(dbit->slot >= 0 && dbit->slot < CLUSTER_SLOTS);
return dbit->slot;
}
/* Returns next entry from the multi slot db. */
dictEntry *dbIteratorNext(dbIterator *dbit) {
dictEntry *de = dbit->di.d ? dictNext(&dbit->di) : NULL;
if (!de) { /* No current dict or reached the end of the dictionary. */
dict *d = dbIteratorNextDict(dbit);
if (!d) return NULL;
if (dbit->di.d) {
/* Before we move to the next dict, reset the iter of the previous dict. */
dictIterator *iter = &dbit->di;
dictResetIterator(iter);
}
dictInitSafeIterator(&dbit->di, d);
de = dictNext(&dbit->di);
}
return de;
}
/* Returns DB iterator that can be used to iterate through sub-dictionaries.
* Primary database contains only one dictionary when node runs without cluster mode,
* or 16k dictionaries (one per slot) when node runs with cluster mode enabled.
*
* The caller should free the resulting dbit with dbReleaseIterator. */
dbIterator *dbIteratorInit(redisDb *db, dbKeyType keyType) {
dbIterator *dbit = zmalloc(sizeof(*dbit));
dbit->db = db;
dbit->slot = -1;
dbit->keyType = keyType;
dbit->next_slot = findSlotByKeyIndex(dbit->db, 1, dbit->keyType); /* Finds first non-empty slot. */
dictInitSafeIterator(&dbit->di, NULL);
return dbit;
}
/* Free the dbit returned by dbIteratorInit. */
void dbReleaseIterator(dbIterator *dbit) {
dictIterator *iter = &dbit->di;
dictResetIterator(iter);
zfree(dbit);
}
/* Returns next non-empty slot strictly after given one, or -1 if provided slot is the last one. */
int dbGetNextNonEmptySlot(redisDb *db, int slot, dbKeyType keyType) {
unsigned long long next_key = cumulativeKeyCountRead(db, slot, keyType) + 1;
return next_key <= dbSize(db, keyType) ? findSlotByKeyIndex(db, next_key, keyType) : -1;
}
/* Update LFU when an object is accessed.
* Firstly, decrement the counter if the decrement time is reached.
* Then logarithmically increment the counter, and update the access time. */
@ -167,7 +86,7 @@ void updateLFU(robj *val) {
* expired on replicas even if the master is lagging expiring our key via DELs
* in the replication link. */
robj *lookupKey(redisDb *db, robj *key, int flags) {
dictEntry *de = dbFind(db, key->ptr, DB_MAIN);
dictEntry *de = dbFind(db, key->ptr);
robj *val = NULL;
if (de) {
val = dictGetVal(de);
@ -274,17 +193,15 @@ robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply) {
static void dbAddInternal(redisDb *db, robj *key, robj *val, int update_if_existing) {
dictEntry *existing;
int slot = getKeySlot(key->ptr);
dict *d = db->dict[slot];
dictEntry *de = dictAddRaw(d, key->ptr, &existing);
dictEntry *de = kvstoreDictAddRaw(db->keys, slot, key->ptr, &existing);
if (update_if_existing && existing) {
dbSetValue(db, key, val, 1, existing);
return;
}
serverAssertWithInfo(NULL, key, de != NULL);
dictSetKey(d, de, sdsdup(key->ptr));
kvstoreDictSetKey(db->keys, slot, de, sdsdup(key->ptr));
initObjectLRUOrLFU(val);
dictSetVal(d, de, val);
cumulativeKeyCountAdd(db, slot, 1, DB_MAIN);
kvstoreDictSetVal(db->keys, slot, de, val);
signalKeyAsReady(db, key, val->type);
notifyKeyspaceEvent(NOTIFY_NEW,"new",key,db->id);
}
@ -329,12 +246,10 @@ int getKeySlot(sds key) {
* caller to free the SDS string. */
int dbAddRDBLoad(redisDb *db, sds key, robj *val) {
int slot = getKeySlot(key);
dict *d = db->dict[slot];
dictEntry *de = dictAddRaw(d, key, NULL);
dictEntry *de = kvstoreDictAddRaw(db->keys, slot, key, NULL);
if (de == NULL) return 0;
initObjectLRUOrLFU(val);
dictSetVal(d, de, val);
cumulativeKeyCountAdd(db, slot, 1, DB_MAIN);
kvstoreDictSetVal(db->keys, slot, de, val);
return 1;
}
@ -351,7 +266,8 @@ int dbAddRDBLoad(redisDb *db, sds key, robj *val) {
*
* The program is aborted if the key was not already present. */
static void dbSetValue(redisDb *db, robj *key, robj *val, int overwrite, dictEntry *de) {
if (!de) de = dbFind(db, key->ptr, DB_MAIN);
int slot = getKeySlot(key->ptr);
if (!de) de = kvstoreDictFind(db->keys, slot, key->ptr);
serverAssertWithInfo(NULL,key,de != NULL);
robj *old = dictGetVal(de);
@ -371,14 +287,11 @@ static void dbSetValue(redisDb *db, robj *key, robj *val, int overwrite, dictEnt
/* Because of RM_StringDMA, old may be changed, so we need get old again */
old = dictGetVal(de);
}
dict *d = db->dict[getKeySlot(key->ptr)];
dictSetVal(d, de, val);
kvstoreDictSetVal(db->keys, slot, de, val);
if (server.lazyfree_lazy_server_del) {
freeObjAsync(key,old,db->id);
} else {
/* This is just decrRefCount(old); */
d->type->valDestructor(d, old);
decrRefCount(old);
}
}
@ -430,18 +343,18 @@ void setKey(client *c, redisDb *db, robj *key, robj *val, int flags) {
robj *dbRandomKey(redisDb *db) {
dictEntry *de;
int maxtries = 100;
int allvolatile = dbSize(db, DB_MAIN) == dbSize(db, DB_EXPIRES);
int allvolatile = kvstoreSize(db->keys) == kvstoreSize(db->expires);
while(1) {
sds key;
robj *keyobj;
int randomSlot = getFairRandomSlot(db, DB_MAIN);
de = dictGetFairRandomKey(db->dict[randomSlot]);
int randomSlot = kvstoreGetFairRandomDictIndex(db->keys);
de = kvstoreDictGetFairRandomKey(db->keys, randomSlot);
if (de == NULL) return NULL;
key = dictGetKey(de);
keyobj = createStringObject(key,sdslen(key));
if (dbFind(db, key, DB_EXPIRES)) {
if (dbFindExpires(db, key)) {
if (allvolatile && server.masterhost && --maxtries == 0) {
/* If the DB is composed only of keys with an expire set,
* it could happen that all the keys are already logically
@ -462,102 +375,12 @@ robj *dbRandomKey(redisDb *db) {
}
}
/* Updates binary index tree (also known as Fenwick tree), increasing key count for a given slot.
* You can read more about this data structure here https://en.wikipedia.org/wiki/Fenwick_tree
* Time complexity is O(log(CLUSTER_SLOTS)). */
void cumulativeKeyCountAdd(redisDb *db, int slot, long delta, dbKeyType keyType) {
db->sub_dict[keyType].key_count += delta;
dict *d = (keyType == DB_MAIN ? db->dict[slot] : db->expires[slot]);
if (dictSize(d) == 1)
db->sub_dict[keyType].non_empty_slots++;
if (dictSize(d) == 0)
db->sub_dict[keyType].non_empty_slots--;
/* BIT does not need to be calculated when the cluster is turned off. */
if (!server.cluster_enabled) return;
int idx = slot + 1; /* Unlike slots, BIT is 1-based, so we need to add 1. */
while (idx <= CLUSTER_SLOTS) {
if (delta < 0) {
serverAssert(db->sub_dict[keyType].slot_size_index[idx] >= (unsigned long long)labs(delta));
}
db->sub_dict[keyType].slot_size_index[idx] += delta;
idx += (idx & -idx);
}
}
/* Returns total (cumulative) number of keys up until given slot (inclusive).
* Time complexity is O(log(CLUSTER_SLOTS)). */
unsigned long long cumulativeKeyCountRead(redisDb *db, int slot, dbKeyType keyType) {
if (!server.cluster_enabled) {
serverAssert(slot == 0);
return dbSize(db, keyType);
}
int idx = slot + 1;
unsigned long long sum = 0;
while (idx > 0) {
sum += db->sub_dict[keyType].slot_size_index[idx];
idx -= (idx & -idx);
}
return sum;
}
/* Returns fair random slot, probability of each slot being returned is proportional to the number of elements that slot dictionary holds.
* This function guarantees that it returns a slot whose dict is non-empty, unless the entire db is empty.
* Time complexity of this function is O(log(CLUSTER_SLOTS)). */
int getFairRandomSlot(redisDb *db, dbKeyType keyType) {
unsigned long target = dbSize(db, keyType) ? (randomULong() % dbSize(db, keyType)) + 1 : 0;
int slot = findSlotByKeyIndex(db, target, keyType);
return slot;
}
/* Finds a slot containing target element in a key space ordered by slot id.
* Consider this example. Slots are represented by brackets and keys by dots:
* #0 #1 #2 #3 #4
* [..][....][...][.......][.]
* ^
* target
*
* In this case slot #3 contains key that we are trying to find.
*
* The return value is 0 based slot, and the range of the target is [1..dbSize], dbSize inclusive.
*
* To find the slot, we start with the root node of the binary index tree and search through its children
* from the highest index (2^14 in our case) to the lowest index. At each node, we check if the target
* value is greater than the node's value. If it is, we remove the node's value from the target and recursively
* search for the new target using the current node as the parent.
* Time complexity of this function is O(log(CLUSTER_SLOTS))
*/
int findSlotByKeyIndex(redisDb *db, unsigned long target, dbKeyType keyType) {
if (!server.cluster_enabled || dbSize(db, keyType) == 0) return 0;
serverAssert(target <= dbSize(db, keyType));
int result = 0, bit_mask = 1 << CLUSTER_SLOT_MASK_BITS;
for (int i = bit_mask; i != 0; i >>= 1) {
int current = result + i;
/* When the target index is greater than 'current' node value, we will update
* the target and search in the 'current' node tree. */
if (target > db->sub_dict[keyType].slot_size_index[current]) {
target -= db->sub_dict[keyType].slot_size_index[current];
result = current;
}
}
/* Adjust the result to get the correct slot:
* 1. result += 1;
* After the calculations, the index of target in slot_size_index should be the next one,
* so we should add 1.
* 2. result -= 1;
* Unlike BIT(slot_size_index is 1-based), slots are 0-based, so we need to subtract 1.
* As the addition and subtraction cancel each other out, we can simply return the result. */
return result;
}
/* Helper for sync and async delete. */
int dbGenericDelete(redisDb *db, robj *key, int async, int flags) {
dictEntry **plink;
int table;
int slot = getKeySlot(key->ptr);
dict *d = db->dict[slot];
dictEntry *de = dictTwoPhaseUnlinkFind(d,key->ptr,&plink,&table);
dictEntry *de = kvstoreDictTwoPhaseUnlinkFind(db->keys, slot, key->ptr, &plink, &table);
if (de) {
robj *val = dictGetVal(de);
/* RM_StringDMA may call dbUnshareStringValue which may free val, so we
@ -573,17 +396,13 @@ int dbGenericDelete(redisDb *db, robj *key, int async, int flags) {
if (async) {
/* Because of dbUnshareStringValue, the val in de may change. */
freeObjAsync(key, dictGetVal(de), db->id);
dictSetVal(d, de, NULL);
kvstoreDictSetVal(db->keys, slot, de, NULL);
}
/* Deleting an entry from the expires dict will not free the sds of
* the key, because it is shared with the main dictionary. */
if (dictSize(db->expires[slot]) > 0) {
if (dictDelete(db->expires[slot],key->ptr) == DICT_OK) {
cumulativeKeyCountAdd(db, slot, -1, DB_EXPIRES);
}
}
dictTwoPhaseUnlinkFree(d,de,plink,table);
cumulativeKeyCountAdd(db, slot, -1, DB_MAIN);
kvstoreDictDelete(db->expires, slot, key->ptr);
kvstoreDictTwoPhaseUnlinkFree(db->keys, slot, de, plink, table);
return 1;
} else {
return 0;
@ -665,40 +484,16 @@ long long emptyDbStructure(redisDb *dbarray, int dbnum, int async,
}
for (int j = startdb; j <= enddb; j++) {
removed += dbSize(&dbarray[j], DB_MAIN);
removed += kvstoreSize(dbarray[j].keys);
if (async) {
emptyDbAsync(&dbarray[j]);
} else {
dbDictMetadata *metadata;
for (int k = 0; k < dbarray[j].dict_count; k++) {
dictEmpty(dbarray[j].dict[k],callback);
metadata = (dbDictMetadata *)dictMetadata(dbarray[j].dict[k]);
if (metadata->rehashing_node) {
listDelNode(server.rehashing, metadata->rehashing_node);
metadata->rehashing_node = NULL;
}
dictEmpty(dbarray[j].expires[k],callback);
metadata = (dbDictMetadata *)dictMetadata(dbarray[j].expires[k]);
if (metadata->rehashing_node) {
listDelNode(server.rehashing, metadata->rehashing_node);
metadata->rehashing_node = NULL;
}
}
kvstoreEmpty(dbarray[j].keys, callback);
kvstoreEmpty(dbarray[j].expires, callback);
}
/* Because all keys of database are removed, reset average ttl. */
dbarray[j].avg_ttl = 0;
dbarray[j].expires_cursor = 0;
for (dbKeyType subdict = DB_MAIN; subdict <= DB_EXPIRES; subdict++) {
dbarray[j].sub_dict[subdict].non_empty_slots = 0;
dbarray[j].sub_dict[subdict].key_count = 0;
dbarray[j].sub_dict[subdict].resize_cursor = 0;
if (server.cluster_enabled) {
dbarray[j].sub_dict[subdict].bucket_count = 0;
unsigned long long *slot_size_index = dbarray[j].sub_dict[subdict].slot_size_index;
memset(slot_size_index, 0, sizeof(unsigned long long) * (CLUSTER_SLOTS + 1));
}
}
}
return removed;
@ -764,12 +559,9 @@ redisDb *initTempDb(void) {
redisDb *tempDb = zcalloc(sizeof(redisDb)*server.dbnum);
for (int i=0; i<server.dbnum; i++) {
tempDb[i].id = i;
tempDb[i].dict_count = (server.cluster_enabled) ? CLUSTER_SLOTS : 1;
tempDb[i].dict = dictCreateMultiple(&dbDictType, tempDb[i].dict_count);
tempDb[i].expires = dictCreateMultiple(&dbExpiresDictType, tempDb[i].dict_count);
for (dbKeyType subdict = DB_MAIN; subdict <= DB_EXPIRES; subdict++) {
tempDb[i].sub_dict[subdict].slot_size_index = server.cluster_enabled ? zcalloc(sizeof(unsigned long long) * (CLUSTER_SLOTS + 1)) : NULL;
}
int slotCountBits = server.cluster_enabled? CLUSTER_SLOT_MASK_BITS : 0;
tempDb[i].keys = kvstoreCreate(&dbDictType, slotCountBits, KVSTORE_ALLOCATE_DICTS_ON_DEMAND);
tempDb[i].expires = kvstoreCreate(&dbExpiresDictType, slotCountBits, KVSTORE_ALLOCATE_DICTS_ON_DEMAND);
}
return tempDb;
@ -782,17 +574,8 @@ void discardTempDb(redisDb *tempDb, void(callback)(dict*)) {
/* Release temp DBs. */
emptyDbStructure(tempDb, -1, async, callback);
for (int i=0; i<server.dbnum; i++) {
for (int j=0; j<tempDb[i].dict_count; j++) {
dictRelease(tempDb[i].dict[j]);
dictRelease(tempDb[i].expires[j]);
}
zfree(tempDb[i].dict);
zfree(tempDb[i].expires);
for (dbKeyType subdict = DB_MAIN; subdict <= DB_EXPIRES; subdict++) {
if (server.cluster_enabled) {
zfree(tempDb[i].sub_dict[subdict].slot_size_index);
}
}
kvstoreRelease(tempDb[i].keys);
kvstoreRelease(tempDb[i].expires);
}
zfree(tempDb);
@ -809,7 +592,7 @@ long long dbTotalServerKeyCount(void) {
long long total = 0;
int j;
for (j = 0; j < server.dbnum; j++) {
total += dbSize(&server.db[j], DB_MAIN);
total += kvstoreSize(server.db[j].keys);
}
return total;
}
@ -1011,21 +794,26 @@ void keysCommand(client *c) {
dictEntry *de;
sds pattern = c->argv[1]->ptr;
int plen = sdslen(pattern), allkeys, pslot = -1;
long numkeys = 0;
unsigned long numkeys = 0;
void *replylen = addReplyDeferredLen(c);
allkeys = (pattern[0] == '*' && plen == 1);
if (server.cluster_enabled && !allkeys) {
pslot = patternHashSlot(pattern, plen);
}
dictIterator *di = NULL;
dbIterator *dbit = NULL;
kvstoreIterator *kvs_it = NULL;
if (pslot != -1) {
di = dictGetSafeIterator(c->db->dict[pslot]);
if (!kvstoreDictSize(c->db->keys, pslot)) {
/* Requested slot is empty */
setDeferredArrayLen(c,replylen,0);
return;
}
di = kvstoreDictGetSafeIterator(c->db->keys, pslot);
} else {
dbit = dbIteratorInit(c->db, DB_MAIN);
kvs_it = kvstoreIteratorInit(c->db->keys);
}
robj keyobj;
while ((de = di ? dictNext(di) : dbIteratorNext(dbit)) != NULL) {
while ((de = di ? dictNext(di) : kvstoreIteratorNext(kvs_it)) != NULL) {
sds key = dictGetKey(de);
if (allkeys || stringmatchlen(pattern,plen,key,sdslen(key),0)) {
@ -1040,8 +828,8 @@ void keysCommand(client *c) {
}
if (di)
dictReleaseIterator(di);
if (dbit)
dbReleaseIterator(dbit);
if (kvs_it)
kvstoreIteratorRelease(kvs_it);
setDeferredArrayLen(c,replylen,numkeys);
}
@ -1312,15 +1100,15 @@ void scanGenericCommand(client *c, robj *o, unsigned long long cursor) {
};
/* A pattern may restrict all matching keys to one cluster slot. */
int onlyslot = -1;
int onlydidx = -1;
if (o == NULL && use_pattern && server.cluster_enabled) {
onlyslot = patternHashSlot(pat, patlen);
onlydidx = patternHashSlot(pat, patlen);
}
do {
/* In cluster mode there is a separate dictionary for each slot.
* If cursor is empty, we should try exploring next non-empty slot. */
if (o == NULL) {
cursor = dbScan(c->db, DB_MAIN, cursor, onlyslot, scanCallback, NULL, &data);
cursor = kvstoreScan(c->db->keys, cursor, onlydidx, scanCallback, NULL, &data);
} else {
cursor = dictScan(ht, cursor, scanCallback, &data);
}
@ -1412,20 +1200,6 @@ void scanGenericCommand(client *c, robj *o, unsigned long long cursor) {
listRelease(keys);
}
void addSlotIdToCursor(int slot, unsigned long long *cursor) {
if (!server.cluster_enabled) return;
/* Slot id can be -1 when iteration is over and there are no more slots to visit. */
if (slot < 0) return;
*cursor = (*cursor << CLUSTER_SLOT_MASK_BITS) | slot;
}
int getAndClearSlotIdFromCursor(unsigned long long *cursor) {
if (!server.cluster_enabled) return 0;
int slot = (int) (*cursor & CLUSTER_SLOT_MASK);
*cursor = *cursor >> CLUSTER_SLOT_MASK_BITS;
return slot;
}
/* The SCAN command completely relies on scanGenericCommand. */
void scanCommand(client *c) {
unsigned long long cursor;
@ -1434,113 +1208,7 @@ void scanCommand(client *c) {
}
void dbsizeCommand(client *c) {
redisDb *db = c->db;
unsigned long long int size = dbSize(db, DB_MAIN);
addReplyLongLong(c, size);
}
unsigned long long int dbSize(redisDb *db, dbKeyType keyType) {
return db->sub_dict[keyType].key_count;
}
int dbNonEmptySlots(redisDb *db, dbKeyType keyType) {
return db->sub_dict[keyType].non_empty_slots;
}
/* This method provides the cumulative sum of all the dictionary buckets
* across dictionaries in a database. */
unsigned long dbBuckets(redisDb *db, dbKeyType keyType) {
if (server.cluster_enabled) {
return db->sub_dict[keyType].bucket_count;
} else {
if (keyType == DB_MAIN)
return dictBuckets(db->dict[0]);
else if (keyType == DB_EXPIRES)
return dictBuckets(db->expires[0]);
else
serverPanic("Unknown keyType");
}
}
size_t dbMemUsage(redisDb *db, dbKeyType keyType) {
size_t mem = 0;
unsigned long long keys_count = dbSize(db, keyType);
mem += keys_count * dictEntryMemUsage() +
dbBuckets(db, keyType) * sizeof(dictEntry*) +
db->dict_count * (sizeof(dict) + dictMetadataSize(db->dict[0]));
if (keyType == DB_MAIN) {
mem+=keys_count * sizeof(robj);
}
return mem;
}
dictEntry *dbFind(redisDb *db, void *key, dbKeyType keyType){
int slot = getKeySlot(key);
if (keyType == DB_MAIN)
return dictFind(db->dict[slot], key);
else if (keyType == DB_EXPIRES)
return dictFind(db->expires[slot], key);
else
serverPanic("Unknown keyType");
}
/*
* This method is used to iterate over the elements of the entire database specifically across slots.
* It's a three pronged approach.
*
* 1. It uses the provided cursor `v` to retrieve the slot from it.
* 2. If the dictionary is in a valid state checked through the provided callback `dictScanValidFunction`,
* it performs a dictScan over the appropriate `keyType` dictionary of `db`.
* 3. If the slot is entirely scanned i.e. the cursor has reached 0, the next non empty slot is discovered.
* The slot information is embedded into the cursor and returned.
*
* To restrict the scan to a single cluster slot, pass a valid slot as
* 'onlyslot', otherwise pass -1.
*/
unsigned long long dbScan(redisDb *db, dbKeyType keyType, unsigned long long v,
int onlyslot, dictScanFunction *fn,
int (dictScanValidFunction)(dict *d), void *privdata) {
dict *d;
unsigned long long cursor = 0;
/* During main dictionary traversal in cluster mode, 48 lower bits in the cursor are used for positioning in the HT.
* Following 14 bits are used for the slot number, ranging from 0 to 2^14-1.
* Slot is always 0 at the start of iteration and can be incremented only in cluster mode. */
int slot = getAndClearSlotIdFromCursor(&v);
if (onlyslot >= 0) {
if (slot < onlyslot) {
/* Fast-forward to onlyslot. */
serverAssert(onlyslot < CLUSTER_SLOTS);
slot = onlyslot;
v = 0;
} else if (slot > onlyslot) {
/* The cursor is already past onlyslot. */
return 0;
}
}
if (keyType == DB_MAIN)
d = db->dict[slot];
else if (keyType == DB_EXPIRES)
d = db->expires[slot];
else
serverPanic("Unknown keyType");
int is_dict_valid = (dictScanValidFunction == NULL || dictScanValidFunction(d) == C_OK);
if (is_dict_valid) {
cursor = dictScan(d, v, fn, privdata);
} else {
serverLog(LL_DEBUG, "Slot [%d] not valid for scanning, skipping.", slot);
}
/* scanning done for the current dictionary or if the scanning wasn't possible, move to the next slot. */
if (cursor == 0 || !is_dict_valid) {
if (onlyslot >= 0)
return 0;
slot = dbGetNextNonEmptySlot(db, slot, keyType);
}
if (slot == -1) {
return 0;
}
addSlotIdToCursor(slot, &cursor);
return cursor;
addReplyLongLong(c,kvstoreSize(c->db->keys));
}
void lastsaveCommand(client *c) {
@ -1837,7 +1505,7 @@ void scanDatabaseForReadyKeys(redisDb *db) {
dictIterator *di = dictGetSafeIterator(db->blocking_keys);
while((de = dictNext(di)) != NULL) {
robj *key = dictGetKey(de);
dictEntry *kde = dbFind(db, key->ptr, DB_MAIN);
dictEntry *kde = dbFind(db, key->ptr);
if (kde) {
robj *value = dictGetVal(kde);
signalKeyAsReady(db, key, value->type);
@ -1857,7 +1525,7 @@ void scanDatabaseForDeletedKeys(redisDb *emptied, redisDb *replaced_with) {
int existed = 0, exists = 0;
int original_type = -1, curr_type = -1;
dictEntry *kde = dbFind(emptied, key->ptr, DB_MAIN);
dictEntry *kde = dbFind(emptied, key->ptr);
if (kde) {
robj *value = dictGetVal(kde);
original_type = value->type;
@ -1865,7 +1533,7 @@ void scanDatabaseForDeletedKeys(redisDb *emptied, redisDb *replaced_with) {
}
if (replaced_with) {
kde = dbFind(replaced_with, key->ptr, DB_MAIN);
kde = dbFind(replaced_with, key->ptr);
if (kde) {
robj *value = dictGetVal(kde);
curr_type = value->type;
@ -1906,31 +1574,15 @@ int dbSwapDatabases(int id1, int id2) {
/* Swap hash tables. Note that we don't swap blocking_keys,
* ready_keys and watched_keys, since we want clients to
* remain in the same DB they were. */
db1->dict = db2->dict;
db1->keys = db2->keys;
db1->expires = db2->expires;
db1->avg_ttl = db2->avg_ttl;
db1->expires_cursor = db2->expires_cursor;
db1->dict_count = db2->dict_count;
for (dbKeyType subdict = DB_MAIN; subdict <= DB_EXPIRES; subdict++) {
db1->sub_dict[subdict].key_count = db2->sub_dict[subdict].key_count;
db1->sub_dict[subdict].bucket_count = db2->sub_dict[subdict].bucket_count;
db1->sub_dict[subdict].non_empty_slots = db2->sub_dict[subdict].non_empty_slots;
db1->sub_dict[subdict].resize_cursor = db2->sub_dict[subdict].resize_cursor;
db1->sub_dict[subdict].slot_size_index = db2->sub_dict[subdict].slot_size_index;
}
db2->dict = aux.dict;
db2->keys = aux.keys;
db2->expires = aux.expires;
db2->avg_ttl = aux.avg_ttl;
db2->expires_cursor = aux.expires_cursor;
db2->dict_count = aux.dict_count;
for (dbKeyType subdict = DB_MAIN; subdict <= DB_EXPIRES; subdict++) {
db2->sub_dict[subdict].key_count = aux.sub_dict[subdict].key_count;
db2->sub_dict[subdict].bucket_count = aux.sub_dict[subdict].bucket_count;
db2->sub_dict[subdict].non_empty_slots = aux.sub_dict[subdict].non_empty_slots;
db2->sub_dict[subdict].resize_cursor = aux.sub_dict[subdict].resize_cursor;
db2->sub_dict[subdict].slot_size_index = aux.sub_dict[subdict].slot_size_index;
}
/* Now we need to handle clients blocked on lists: as an effect
* of swapping the two DBs, a client that was waiting for list
@ -1964,31 +1616,16 @@ void swapMainDbWithTempDb(redisDb *tempDb) {
/* Swap hash tables. Note that we don't swap blocking_keys,
* ready_keys and watched_keys, since clients
* remain in the same DB they were. */
activedb->dict = newdb->dict;
activedb->keys = newdb->keys;
activedb->expires = newdb->expires;
activedb->avg_ttl = newdb->avg_ttl;
activedb->expires_cursor = newdb->expires_cursor;
activedb->dict_count = newdb->dict_count;
for (dbKeyType subdict = DB_MAIN; subdict <= DB_EXPIRES; subdict++) {
activedb->sub_dict[subdict].key_count = newdb->sub_dict[subdict].key_count;
activedb->sub_dict[subdict].bucket_count = newdb->sub_dict[subdict].bucket_count;
activedb->sub_dict[subdict].non_empty_slots = newdb->sub_dict[subdict].non_empty_slots;
activedb->sub_dict[subdict].resize_cursor = newdb->sub_dict[subdict].resize_cursor;
activedb->sub_dict[subdict].slot_size_index = newdb->sub_dict[subdict].slot_size_index;
}
newdb->dict = aux.dict;
newdb->keys = aux.keys;
newdb->expires = aux.expires;
newdb->avg_ttl = aux.avg_ttl;
newdb->expires_cursor = aux.expires_cursor;
newdb->dict_count = aux.dict_count;
for (dbKeyType subdict = DB_MAIN; subdict <= DB_EXPIRES; subdict++) {
newdb->sub_dict[subdict].key_count = aux.sub_dict[subdict].key_count;
newdb->sub_dict[subdict].bucket_count = aux.sub_dict[subdict].bucket_count;
newdb->sub_dict[subdict].non_empty_slots = aux.sub_dict[subdict].non_empty_slots;
newdb->sub_dict[subdict].resize_cursor = aux.sub_dict[subdict].resize_cursor;
newdb->sub_dict[subdict].slot_size_index = aux.sub_dict[subdict].slot_size_index;
}
/* Now we need to handle clients blocked on lists: as an effect
* of swapping the two DBs, a client that was waiting for list
* X in a given DB, may now actually be unblocked if X happens
@ -2041,13 +1678,7 @@ void swapdbCommand(client *c) {
*----------------------------------------------------------------------------*/
int removeExpire(redisDb *db, robj *key) {
int slot = getKeySlot(key->ptr);
if (dictDelete(db->expires[slot],key->ptr) == DICT_OK) {
cumulativeKeyCountAdd(db, slot, -1, DB_EXPIRES);
return 1;
} else {
return 0;
}
return kvstoreDictDelete(db->expires, getKeySlot(key->ptr), key->ptr) == DICT_OK;
}
/* Set an expire to the specified key. If the expire is set in the context
@ -2058,15 +1689,14 @@ void setExpire(client *c, redisDb *db, robj *key, long long when) {
dictEntry *kde, *de, *existing;
/* Reuse the sds from the main dict in the expire dict */
kde = dbFind(db, key->ptr, DB_MAIN);
serverAssertWithInfo(NULL,key,kde != NULL);
int slot = getKeySlot(key->ptr);
de = dictAddRaw(db->expires[slot], dictGetKey(kde), &existing);
kde = kvstoreDictFind(db->keys, slot, key->ptr);
serverAssertWithInfo(NULL,key,kde != NULL);
de = kvstoreDictAddRaw(db->expires, slot, dictGetKey(kde), &existing);
if (existing) {
dictSetSignedIntegerVal(existing, when);
} else {
dictSetSignedIntegerVal(de, when);
cumulativeKeyCountAdd(db, slot, 1, DB_EXPIRES);
}
int writable_slave = server.masterhost && server.repl_slave_ro == 0;
@ -2079,9 +1709,8 @@ void setExpire(client *c, redisDb *db, robj *key, long long when) {
long long getExpire(redisDb *db, robj *key) {
dictEntry *de;
/* No expire? return ASAP */
if (dictSize(db->expires[getKeySlot(key->ptr)]) == 0 ||
(de = dbFind(db,key->ptr, DB_EXPIRES)) == NULL) return -1;
if ((de = dbFindExpires(db, key->ptr)) == NULL)
return -1;
return dictGetSignedIntegerVal(de);
}
@ -2228,6 +1857,13 @@ int expireIfNeeded(redisDb *db, robj *key, int flags) {
return 1;
}
/* CB passed to kvstoreExpand.
* The purpose is to skip expansion of unused dicts in cluster mode (all
* dicts not mapped to *my* slots) */
static int dbExpandSkipSlot(int slot) {
return !clusterNodeCoversSlot(getMyClusterNode(), slot);
}
/*
* This function increases the size of the main/expires db to match the desired number.
* In cluster mode resizes all individual dictionaries for slots that this node owns.
@ -2238,47 +1874,48 @@ int expireIfNeeded(redisDb *db, robj *key, int flags) {
* `DICT_OK` response is for successful expansion. However, `DICT_ERR` response signifies failure in allocation in
* `dictTryExpand` call and in case of `dictExpand` call it signifies no expansion was performed.
*/
int dbExpand(const redisDb *db, uint64_t db_size, dbKeyType keyType, int try_expand) {
dict *d;
static int dbExpandGeneric(kvstore *kvs, uint64_t db_size, int try_expand) {
int ret;
if (server.cluster_enabled) {
/* We don't know exact number of keys that would fall into each slot, but we can
* approximate it, assuming even distribution, divide it by the number of slots. */
int slots = getMyShardSlotCount();
if (slots == 0) return C_OK;
db_size = db_size / slots;
for (int i = 0; i < CLUSTER_SLOTS; i++) {
if (clusterNodeCoversSlot(getMyClusterNode(), i)) {
if (keyType == DB_MAIN) {
d = db->dict[i];
} else {
d = db->expires[i];
}
int result = try_expand ? dictTryExpand(d, db_size) : dictExpand(d, db_size);
if (try_expand && result == DICT_ERR) {
serverLog(LL_WARNING, "Dict expansion failed for db type: %s, slot: %d",
keyType == DB_MAIN ? "main" : "expires", i);
return C_ERR;
} else if (result == DICT_ERR) {
serverLog(LL_DEBUG, "Dict expansion skipped for db type: %s, slot: %d",
keyType == DB_MAIN ? "main" : "expires", i);
}
}
}
ret = kvstoreExpand(kvs, db_size, try_expand, dbExpandSkipSlot);
} else {
if (keyType == DB_MAIN) {
d = db->dict[0];
} else {
d = db->expires[0];
}
int result = try_expand ? dictTryExpand(d, db_size) : dictExpand(d, db_size);
if (try_expand && result == DICT_ERR) {
serverLog(LL_WARNING, "Dict expansion failed for db type: %s",
keyType == DB_MAIN ? "main" : "expires");
return C_ERR;
}
ret = kvstoreExpand(kvs, db_size, try_expand, NULL);
}
return C_OK;
return ret? C_OK : C_ERR;
}
int dbExpand(redisDb *db, uint64_t db_size, int try_expand) {
return dbExpandGeneric(db->keys, db_size, try_expand);
}
int dbExpandExpires(redisDb *db, uint64_t db_size, int try_expand) {
return dbExpandGeneric(db->expires, db_size, try_expand);
}
static dictEntry *dbFindGeneric(kvstore *kvs, void *key) {
return kvstoreDictFind(kvs, getKeySlot(key), key);
}
dictEntry *dbFind(redisDb *db, void *key) {
return dbFindGeneric(db->keys, key);
}
dictEntry *dbFindExpires(redisDb *db, void *key) {
return dbFindGeneric(db->expires, key);
}
unsigned long long dbSize(redisDb *db) {
return kvstoreSize(db->keys);
}
unsigned long long dbScan(redisDb *db, unsigned long long cursor, dictScanFunction *scan_cb, void *privdata) {
return kvstoreScan(db->keys, cursor, -1, scan_cb, NULL, privdata);
}
/* -----------------------------------------------------------------------------
@ -3020,42 +2657,3 @@ int bitfieldGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResu
}
return 1;
}
void dbGetStats(char *buf, size_t bufsize, redisDb *db, int full, dbKeyType keyType) {
size_t l;
char *orig_buf = buf;
size_t orig_bufsize = bufsize;
dictStats *mainHtStats = NULL;
dictStats *rehashHtStats = NULL;
dict *d;
dbIterator *dbit = dbIteratorInit(db, keyType);
while ((d = dbIteratorNextDict(dbit))) {
dictStats *stats = dictGetStatsHt(d, 0, full);
if (!mainHtStats) {
mainHtStats = stats;
} else {
dictCombineStats(stats, mainHtStats);
dictFreeStats(stats);
}
if (dictIsRehashing(d)) {
stats = dictGetStatsHt(d, 1, full);
if (!rehashHtStats) {
rehashHtStats = stats;
} else {
dictCombineStats(stats, rehashHtStats);
dictFreeStats(stats);
}
}
}
dbReleaseIterator(dbit);
l = dictGetStatsMsg(buf, bufsize, mainHtStats, full);
dictFreeStats(mainHtStats);
buf += l;
bufsize -= l;
if (rehashHtStats && bufsize > 0) {
dictGetStatsMsg(buf, bufsize, rehashHtStats, full);
dictFreeStats(rehashHtStats);
}
/* Make sure there is a NULL term at the end. */
if (orig_bufsize) orig_buf[orig_bufsize - 1] = '\0';
}
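
The SCAN cursor trick that moves out of db.c here (`addSlotIdToCursor` / `getAndClearSlotIdFromCursor`, re-added in kvstore.c further down) packs the dict index into the low bits of the cursor. A minimal standalone illustration, assuming the cluster-mode case of 14 dict-index bits:

```c
/* Minimal illustration of the cursor packing removed above and re-added in
 * kvstore.c below. Assumes 14 dict-index bits, i.e. 2^14 dicts. */
#include <assert.h>

#define DICT_INDEX_BITS 14
#define DICT_INDEX_MASK ((1ULL << DICT_INDEX_BITS) - 1)

int main(void) {
    unsigned long long dict_cursor = 0x2a; /* position inside one dict's hash table */
    int didx = 100;                        /* which dict (hash slot) is being scanned */

    /* Pack: dict-internal cursor in the high bits, dict index in the low bits. */
    unsigned long long cursor = (dict_cursor << DICT_INDEX_BITS) | (unsigned)didx;

    /* Unpack on the next SCAN call. */
    assert((int)(cursor & DICT_INDEX_MASK) == didx);
    assert((cursor >> DICT_INDEX_BITS) == dict_cursor);
    return 0;
}
```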

src/debug.c

@ -76,7 +76,6 @@ int bugReportStart(void);
void printCrashReport(void);
void bugReportEnd(int killViaSignal, int sig);
void logStackTrace(void *eip, int uplevel, int current_thread);
void dbGetStats(char *buf, size_t bufsize, redisDb *db, int full, dbKeyType keyType);
void sigalrmSignalHandler(int sig, siginfo_t *info, void *secret);
/* ================================= Debugging ============================== */
@ -290,15 +289,16 @@ void computeDatasetDigest(unsigned char *final) {
for (j = 0; j < server.dbnum; j++) {
redisDb *db = server.db+j;
if (dbSize(db, DB_MAIN) == 0) continue;
dbIterator *dbit = dbIteratorInit(db, DB_MAIN);
if (kvstoreSize(db->keys) == 0)
continue;
kvstoreIterator *kvs_it = kvstoreIteratorInit(db->keys);
/* hash the DB id, so the same dataset moved in a different DB will lead to a different digest */
aux = htonl(j);
mixDigest(final,&aux,sizeof(aux));
/* Iterate this DB writing every entry */
while((de = dbIteratorNext(dbit)) != NULL) {
while((de = kvstoreIteratorNext(kvs_it)) != NULL) {
sds key;
robj *keyobj, *o;
@ -315,7 +315,7 @@ void computeDatasetDigest(unsigned char *final) {
xorDigest(final,digest,20);
decrRefCount(keyobj);
}
dbReleaseIterator(dbit);
kvstoreIteratorRelease(kvs_it);
}
}
@ -606,7 +606,7 @@ NULL
robj *val;
char *strenc;
if ((de = dbFind(c->db, c->argv[2]->ptr, DB_MAIN)) == NULL) {
if ((de = dbFind(c->db, c->argv[2]->ptr)) == NULL) {
addReplyErrorObject(c,shared.nokeyerr);
return;
}
@ -658,7 +658,7 @@ NULL
robj *val;
sds key;
if ((de = dbFind(c->db, c->argv[2]->ptr, DB_MAIN)) == NULL) {
if ((de = dbFind(c->db, c->argv[2]->ptr)) == NULL) {
addReplyErrorObject(c,shared.nokeyerr);
return;
}
@ -719,7 +719,7 @@ NULL
return;
}
if (dbExpand(c->db, keys, DB_MAIN, 1) == C_ERR) {
if (dbExpand(c->db, keys, 1) == C_ERR) {
addReplyError(c, "OOM in dictTryExpand");
return;
}
@ -767,7 +767,7 @@ NULL
/* We don't use lookupKey because a debug command should
* work on logically expired keys */
dictEntry *de;
robj *o = ((de = dbFind(c->db, c->argv[j]->ptr, DB_MAIN)) == NULL) ? NULL : dictGetVal(de);
robj *o = ((de = dbFind(c->db, c->argv[j]->ptr)) == NULL) ? NULL : dictGetVal(de);
if (o) xorObjectDigest(c->db,c->argv[j],digest,o);
sds d = sdsempty();
@ -911,11 +911,11 @@ NULL
full = 1;
stats = sdscatprintf(stats,"[Dictionary HT]\n");
dbGetStats(buf, sizeof(buf), &server.db[dbid], full, DB_MAIN);
kvstoreGetStats(server.db[dbid].keys, buf, sizeof(buf), full);
stats = sdscat(stats,buf);
stats = sdscatprintf(stats,"[Expires HT]\n");
dbGetStats(buf, sizeof(buf), &server.db[dbid], full, DB_EXPIRES);
kvstoreGetStats(server.db[dbid].expires, buf, sizeof(buf), full);
stats = sdscat(stats,buf);
addReplyVerbatim(c,stats,sdslen(stats),"txt");
@ -2051,7 +2051,7 @@ void logCurrentClient(client *cc, const char *title) {
dictEntry *de;
key = getDecodedObject(cc->argv[1]);
de = dbFind(cc->db, key->ptr, DB_MAIN);
de = dbFind(cc->db, key->ptr);
if (de) {
val = dictGetVal(de);
serverLog(LL_WARNING,"key '%s' found in DB containing the following object:", (char*)key->ptr);

src/defrag.c

@ -684,21 +684,21 @@ void defragKey(defragCtx *ctx, dictEntry *de) {
/* Try to defrag the key name. */
newsds = activeDefragSds(keysds);
if (newsds) {
dictSetKey(db->dict[slot], de, newsds);
if (dbSize(db, DB_EXPIRES)) {
kvstoreDictSetKey(db->keys, slot, de, newsds);
if (kvstoreSize(db->expires)) {
/* We can't search in db->expires for that key after we've released
* the pointer it holds, since it won't be able to do the string
* compare, but we can find the entry using key hash and pointer. */
uint64_t hash = dictGetHash(db->dict[slot], newsds);
dictEntry *expire_de = dictFindEntryByPtrAndHash(db->expires[slot], keysds, hash);
if (expire_de) dictSetKey(db->expires[slot], expire_de, newsds);
uint64_t hash = kvstoreGetHash(db->keys, newsds);
dictEntry *expire_de = kvstoreDictFindEntryByPtrAndHash(db->expires, slot, keysds, hash);
if (expire_de) kvstoreDictSetKey(db->expires, slot, expire_de, newsds);
}
}
/* Try to defrag robj and / or string value. */
ob = dictGetVal(de);
if ((newob = activeDefragStringOb(ob))) {
dictSetVal(db->dict[slot], de, newob);
kvstoreDictSetVal(db->keys, slot, de, newob);
ob = newob;
}
@ -856,7 +856,7 @@ int defragLaterStep(redisDb *db, int slot, long long endtime) {
}
/* each time we enter this function we need to fetch the key from the dict again (if it still exists) */
dictEntry *de = dictFind(db->dict[slot], defrag_later_current_key);
dictEntry *de = kvstoreDictFind(db->keys, slot, defrag_later_current_key);
key_defragged = server.stat_active_defrag_hits;
do {
int quit = 0;
@ -1022,13 +1022,12 @@ void activeDefragCycle(void) {
db = &server.db[current_db];
cursor = 0;
expires_cursor = 0;
slot = findSlotByKeyIndex(db, 1, DB_MAIN);
slot = kvstoreFindDictIndexByKeyIndex(db->keys, 1);
defrag_later_item_in_progress = 0;
ctx.db = db;
ctx.slot = slot;
}
do {
dict *d = db->dict[slot];
/* before scanning the next bucket, see if we have big keys left from the previous bucket to scan */
if (defragLaterStep(db, slot, endtime)) {
quit = 1; /* time is up, we didn't finish all the work */
@ -1038,13 +1037,14 @@ void activeDefragCycle(void) {
if (!defrag_later_item_in_progress) {
/* Scan the keyspace dict unless we're scanning the expire dict. */
if (!expires_cursor)
cursor = dictScanDefrag(d, cursor, defragScanCallback,
&defragfns, &ctx);
cursor = kvstoreDictScanDefrag(db->keys, slot, cursor,
defragScanCallback,
&defragfns, &ctx);
/* When done scanning the keyspace dict, we scan the expire dict. */
if (!cursor)
expires_cursor = dictScanDefrag(db->expires[slot], expires_cursor,
scanCallbackCountScanned,
&defragfns, NULL);
expires_cursor = kvstoreDictScanDefrag(db->expires, slot, expires_cursor,
scanCallbackCountScanned,
&defragfns, NULL);
}
if (!(cursor || expires_cursor)) {
/* Move to the next slot only if regular and large item scanning has been completed. */
@ -1052,7 +1052,7 @@ void activeDefragCycle(void) {
defrag_later_item_in_progress = 1;
continue;
}
slot = dbGetNextNonEmptySlot(db, slot, DB_MAIN);
slot = kvstoreGetNextNonEmptyDictIndex(db->keys, slot);
defrag_later_item_in_progress = 0;
ctx.slot = slot;
}

src/dict.c

@ -194,16 +194,6 @@ dict *dictCreate(dictType *type)
return d;
}
/* Create an array of dictionaries */
dict **dictCreateMultiple(dictType *type, int count)
{
dict **d = zmalloc(sizeof(dict*) * count);
for (int i = 0; i < count; i++) {
d[i] = dictCreate(type);
}
return d;
}
/* Initialize the hash table */
int _dictInit(dict *d, dictType *type)
{

src/dict.h

@ -51,6 +51,7 @@ typedef struct dictEntry dictEntry; /* opaque */
typedef struct dict dict;
typedef struct dictType {
/* Callbacks */
uint64_t (*hashFunction)(const void *key);
void *(*keyDup)(dict *d, const void *key);
void *(*valDup)(dict *d, const void *obj);
@ -66,6 +67,10 @@ typedef struct dictType {
/* Allow a dict to carry extra caller-defined metadata. The
* extra memory is initialized to 0 when a dict is allocated. */
size_t (*dictMetadataBytes)(dict *d);
/* Data */
void *userdata;
/* Flags */
/* The 'no_value' flag, if set, indicates that values are not used, i.e. the
* dict is a set. When this flag is set, it's not possible to access the
@ -177,7 +182,6 @@ typedef enum {
/* API */
dict *dictCreate(dictType *type);
dict **dictCreateMultiple(dictType *type, int count);
int dictExpand(dict *d, unsigned long size);
int dictTryExpand(dict *d, unsigned long size);
int dictShrink(dict *d, unsigned long size);

src/evict.c

@ -143,11 +143,12 @@ void evictionPoolAlloc(void) {
* We insert keys on place in ascending order, so keys with the smaller
* idle time are on the left, and keys with the higher idle time on the
* right. */
int evictionPoolPopulate(int dbid, int slot, dict *sampledict, dict *keydict, struct evictionPoolEntry *pool) {
int evictionPoolPopulate(redisDb *db, kvstore *samplekvs, struct evictionPoolEntry *pool) {
int j, k, count;
dictEntry *samples[server.maxmemory_samples];
count = dictGetSomeKeys(sampledict,samples,server.maxmemory_samples);
int slot = kvstoreGetFairRandomDictIndex(samplekvs);
count = kvstoreDictGetSomeKeys(samplekvs,slot,samples,server.maxmemory_samples);
for (j = 0; j < count; j++) {
unsigned long long idle;
sds key;
@ -161,7 +162,8 @@ int evictionPoolPopulate(int dbid, int slot, dict *sampledict, dict *keydict, st
* dictionary (but the expires one) we need to lookup the key
* again in the key dictionary to obtain the value object. */
if (server.maxmemory_policy != MAXMEMORY_VOLATILE_TTL) {
if (sampledict != keydict) de = dictFind(keydict, key);
if (samplekvs != db->keys)
de = kvstoreDictFind(db->keys, slot, key);
o = dictGetVal(de);
}
@ -236,7 +238,7 @@ int evictionPoolPopulate(int dbid, int slot, dict *sampledict, dict *keydict, st
pool[k].key = pool[k].cached;
}
pool[k].idle = idle;
pool[k].dbid = dbid;
pool[k].dbid = db->id;
pool[k].slot = slot;
}
@ -578,16 +580,12 @@ int performEvictions(void) {
sds bestkey = NULL;
int bestdbid;
redisDb *db;
dict *dict;
dictEntry *de;
if (server.maxmemory_policy & (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU) ||
server.maxmemory_policy == MAXMEMORY_VOLATILE_TTL)
{
struct evictionPoolEntry *pool = EvictionPoolLRU;
dbKeyType keyType = (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS ?
DB_MAIN : DB_EXPIRES);
while (bestkey == NULL) {
unsigned long total_keys = 0;
@ -596,17 +594,21 @@ int performEvictions(void) {
* every DB. */
for (i = 0; i < server.dbnum; i++) {
db = server.db+i;
kvstore *kvs;
if (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) {
kvs = db->keys;
} else {
kvs = db->expires;
}
unsigned long sampled_keys = 0;
unsigned long current_db_keys = dbSize(db, keyType);
unsigned long current_db_keys = kvstoreSize(kvs);
if (current_db_keys == 0) continue;
total_keys += current_db_keys;
int l = dbNonEmptySlots(db, keyType);
int l = kvstoreNumNonEmptyDicts(kvs);
/* Do not exceed the number of non-empty slots when looping. */
while (l--) {
int slot = getFairRandomSlot(db, keyType);
dict = (keyType == DB_MAIN ? db->dict[slot] : db->expires[slot]);
sampled_keys += evictionPoolPopulate(i, slot, dict, db->dict[slot], pool);
sampled_keys += evictionPoolPopulate(db, kvs, pool);
/* We have sampled enough keys in the current db, exit the loop. */
if (sampled_keys >= (unsigned long) server.maxmemory_samples)
break;
@ -624,13 +626,13 @@ int performEvictions(void) {
if (pool[k].key == NULL) continue;
bestdbid = pool[k].dbid;
kvstore *kvs;
if (server.maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) {
de = dictFind(server.db[bestdbid].dict[pool[k].slot],
pool[k].key);
kvs = server.db[bestdbid].keys;
} else {
de = dictFind(server.db[bestdbid].expires[pool[k].slot],
pool[k].key);
kvs = server.db[bestdbid].expires;
}
de = kvstoreDictFind(kvs, pool[k].slot, pool[k].key);
/* Remove the entry from the pool. */
if (pool[k].key != pool[k].cached)
@ -660,10 +662,15 @@ int performEvictions(void) {
for (i = 0; i < server.dbnum; i++) {
j = (++next_db) % server.dbnum;
db = server.db+j;
dict = (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM) ?
db->dict[getFairRandomSlot(db, DB_MAIN)] : db->expires[getFairRandomSlot(db, DB_EXPIRES)];
if (dictSize(dict) != 0) {
de = dictGetRandomKey(dict);
kvstore *kvs;
if (server.maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM) {
kvs = db->keys;
} else {
kvs = db->expires;
}
int slot = kvstoreGetFairRandomDictIndex(kvs);
de = kvstoreDictGetRandomKey(kvs, slot);
if (de) {
bestkey = dictGetKey(de);
bestdbid = j;
break;
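
Both eviction paths now share one sampling idiom: pick a dict with probability proportional to the number of keys it holds (a query against the kvstore's binary indexed tree), then sample inside that dict. Roughly:

```c
/* Sketch of the sampling idiom used by performEvictions above. */
static dictEntry *sample_one_key(kvstore *kvs) {
    int slot = kvstoreGetFairRandomDictIndex(kvs); /* weighted by per-dict key count */
    return kvstoreDictGetRandomKey(kvs, slot);     /* NULL if the kvstore is empty */
}
```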

src/expire.c

@ -253,7 +253,8 @@ void activeExpireCycle(int type) {
* distribute the time evenly across DBs. */
current_db++;
if (dbSize(db, DB_EXPIRES)) dbs_performed++;
if (kvstoreSize(db->expires))
dbs_performed++;
/* Continue to expire if at the end of the cycle there are still
* a big percentage of keys to expire, compared to the number of keys
@ -264,7 +265,7 @@ void activeExpireCycle(int type) {
iteration++;
/* If there is nothing to expire try next DB ASAP. */
if ((num = dbSize(db, DB_EXPIRES)) == 0) {
if ((num = kvstoreSize(db->expires)) == 0) {
db->avg_ttl = 0;
break;
}
@ -294,7 +295,7 @@ void activeExpireCycle(int type) {
int origin_ttl_samples = data.ttl_samples;
while (data.sampled < num && checked_buckets < max_buckets) {
db->expires_cursor = dbScan(db, DB_EXPIRES, db->expires_cursor, -1, expireScanCallback, isExpiryDictValidForSamplingCb, &data);
db->expires_cursor = kvstoreScan(db->expires, db->expires_cursor, -1, expireScanCallback, isExpiryDictValidForSamplingCb, &data);
if (db->expires_cursor == 0) {
db_done = 1;
break;
@ -429,7 +430,7 @@ void expireSlaveKeys(void) {
while(dbids && dbid < server.dbnum) {
if ((dbids & 1) != 0) {
redisDb *db = server.db+dbid;
dictEntry *expire = dictFind(db->expires[getKeySlot(keyname)],keyname);
dictEntry *expire = dbFindExpires(db, keyname);
int expired = 0;
if (expire &&
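
`kvstoreScan` keeps `dictScan`'s resumable-cursor contract, with the dict index folded into the cursor (see kvstore.c below). `activeExpireCycle` above persists the cursor across cron cycles; a condensed sketch of the same loop driven to completion in one go:

```c
/* Condensed sketch of a complete kvstoreScan loop (activeExpireCycle above
 * instead stores the cursor in db->expires_cursor between cycles). */
static void scan_all(kvstore *kvs, dictScanFunction *cb, void *privdata) {
    unsigned long long cursor = 0;
    do {
        /* didx -1 = no per-dict restriction; NULL = no validity callback. */
        cursor = kvstoreScan(kvs, cursor, -1, cb, NULL, privdata);
    } while (cursor != 0);
}
```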

src/kvstore.c (new file)

@ -0,0 +1,749 @@
/*
* Index-based KV store implementation
* This file implements a KV store comprised of an array of dicts (see dict.c)
* The purpose of this KV store is to have easy access to all keys that belong
* in the same dict (i.e. are in the same dict-index)
*
* For example, when Redis is running in cluster mode, we use kvstore to save
* all keys that map to the same hash-slot in a separate dict within the kvstore
* struct.
* This enables us to easily access all keys that map to a specific hash-slot.
*
* Copyright (c) Redis contributors.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include "fmacros.h"
#include <string.h>
#include <stddef.h>
#include "zmalloc.h"
#include "kvstore.h"
#include "redisassert.h"
#include "monotonic.h"
#define UNUSED(V) ((void) V)
struct _kvstore {
int flags;
dictType dtype;
dict **dicts;
long long num_dicts;
long long num_dicts_bits;
list *rehashing; /* List of dictionaries in this kvstore that are currently rehashing. */
int resize_cursor; /* Cron job uses this cursor to gradually resize dictionaries (only used if num_dicts > 1). */
int allocated_dicts; /* The number of allocated dicts. */
int non_empty_dicts; /* The number of non-empty dicts. */
unsigned long long key_count; /* Total number of keys in this kvstore. */
unsigned long long bucket_count; /* Total number of buckets in this kvstore across dictionaries. */
unsigned long long *dict_size_index; /* Binary indexed tree (BIT) that describes cumulative key frequencies up until given dict-index. */
};
/* Structure for kvstore iterator that allows iterating across multiple dicts. */
struct _kvstoreIterator {
kvstore *kvs;
long long didx;
long long next_didx;
dictIterator di;
};
/* Dict metadata for the kvstore, used to record the dict's position in the rehashing list. */
typedef struct {
listNode *rehashing_node; /* list node in rehashing list */
} kvstoreDictMetadata;
/**********************************/
/*** Helpers **********************/
/**********************************/
static dict *kvstoreGetDict(kvstore *kvs, int didx) {
return kvs->dicts[didx];
}
/* Returns total (cumulative) number of keys up until given dict-index (inclusive).
* Time complexity is O(log(kvs->num_dicts)). */
static unsigned long long cumulativeKeyCountRead(kvstore *kvs, int didx) {
if (kvs->num_dicts == 1) {
assert(didx == 0);
return kvstoreSize(kvs);
}
int idx = didx + 1;
unsigned long long sum = 0;
while (idx > 0) {
sum += kvs->dict_size_index[idx];
idx -= (idx & -idx);
}
return sum;
}
static void addDictIndexToCursor(kvstore *kvs, int didx, unsigned long long *cursor) {
if (kvs->num_dicts == 1)
return;
/* didx can be -1 when iteration is over and there are no more dicts to visit. */
if (didx < 0)
return;
*cursor = (*cursor << kvs->num_dicts_bits) | didx;
}
static int getAndClearDictIndexFromCursor(kvstore *kvs, unsigned long long *cursor) {
if (kvs->num_dicts == 1)
return 0;
int didx = (int) (*cursor & (kvs->num_dicts-1));
*cursor = *cursor >> kvs->num_dicts_bits;
return didx;
}
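These two helpers implement the cursor trick that kvstoreScan relies on: the dict index travels in the low num_dicts_bits bits of the SCAN cursor, while the dict's own cursor occupies the upper bits. A minimal standalone sketch of the same packing arithmetic (hypothetical names, not the kvstore code itself):
```c
#include <assert.h>
#include <stdio.h>

int main(void) {
    const int bits = 4;                    /* 2^4 = 16 dicts */
    unsigned long long inner = 0x2A;       /* the dict's own scan cursor */
    int didx = 9;                          /* index of the dict being scanned */

    /* Pack: shift the per-dict cursor up, store didx in the low bits
     * (what addDictIndexToCursor does). */
    unsigned long long cursor = (inner << bits) | (unsigned long long)didx;

    /* Unpack: mask out the low bits, then shift them away
     * (what getAndClearDictIndexFromCursor does). */
    int got_didx = (int)(cursor & ((1ULL << bits) - 1));
    unsigned long long got_inner = cursor >> bits;

    assert(got_didx == didx && got_inner == inner);
    printf("cursor=%llu didx=%d inner=%llu\n", cursor, got_didx, got_inner);
    return 0;
}
```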
/* Updates binary index tree (also known as Fenwick tree), increasing key count for a given dict.
* You can read more about this data structure here https://en.wikipedia.org/wiki/Fenwick_tree
* Time complexity is O(log(kvs->num_dicts)). */
static void cumulativeKeyCountAdd(kvstore *kvs, int didx, long delta) {
kvs->key_count += delta;
dict *d = kvstoreGetDict(kvs, didx);
size_t dsize = dictSize(d);
int non_empty_dicts_delta = dsize == 1? 1 : dsize == 0? -1 : 0;
kvs->non_empty_dicts += non_empty_dicts_delta;
/* BIT does not need to be calculated when there's only one dict. */
if (kvs->num_dicts == 1)
return;
/* Update the BIT */
int idx = didx + 1; /* Unlike dict indices, BIT is 1-based, so we need to add 1. */
while (idx <= kvs->num_dicts) {
if (delta < 0) {
assert(kvs->dict_size_index[idx] >= (unsigned long long)labs(delta));
}
kvs->dict_size_index[idx] += delta;
idx += (idx & -idx);
}
}
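The `idx & -idx` walks above are the textbook Fenwick-tree operations: cumulativeKeyCountAdd climbs upward to touch every range covering the dict, while cumulativeKeyCountRead descends to sum the prefix. A standalone sketch of both walks over a 1-based tree (hypothetical names):
```c
#include <stdio.h>

#define NUM_DICTS 8  /* the tree is 1-based, so it has NUM_DICTS+1 slots */

/* Mirror of cumulativeKeyCountAdd's BIT update: add `delta` keys to the
 * 0-based dict `didx`. */
static void bit_add(unsigned long long *tree, int didx, long delta) {
    for (int idx = didx + 1; idx <= NUM_DICTS; idx += idx & -idx)
        tree[idx] += delta;
}

/* Mirror of cumulativeKeyCountRead: cumulative key count of dicts 0..didx. */
static unsigned long long bit_prefix(unsigned long long *tree, int didx) {
    unsigned long long sum = 0;
    for (int idx = didx + 1; idx > 0; idx -= idx & -idx)
        sum += tree[idx];
    return sum;
}

int main(void) {
    unsigned long long tree[NUM_DICTS + 1] = {0};
    bit_add(tree, 0, 2);  /* dict #0 now holds 2 keys */
    bit_add(tree, 3, 5);  /* dict #3 now holds 5 keys */
    printf("keys in dicts 0..3: %llu\n", bit_prefix(tree, 3)); /* prints 7 */
    return 0;
}
```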
static void createDictIfNeeded(kvstore *kvs, int didx) {
if (kvstoreGetDict(kvs, didx))
return;
kvs->dicts[didx] = dictCreate(&kvs->dtype);
kvs->allocated_dicts++;
}
static void freeDictIfNeeded(kvstore *kvs, int didx) {
if (!(kvs->flags & KVSTORE_FREE_EMPTY_DICTS) ||
!kvstoreGetDict(kvs, didx) ||
kvstoreDictSize(kvs, didx) != 0)
return;
dictRelease(kvs->dicts[didx]);
kvs->dicts[didx] = NULL;
kvs->allocated_dicts--;
}
/**********************************/
/*** dict callbacks ***************/
/**********************************/
/* Adds dictionary to the rehashing list, which allows us
* to quickly find rehash targets during incremental rehashing.
*
* If there are multiple dicts, updates the bucket count for the given dictionary:
* the bucket count is incremented by the new ht size during the rehashing phase.
* If there's only one dict, the bucket count can be read directly from that dict. */
static void kvstoreDictRehashingStarted(dict *d) {
kvstore *kvs = d->type->userdata;
kvstoreDictMetadata *metadata = (kvstoreDictMetadata *)dictMetadata(d);
listAddNodeTail(kvs->rehashing, d);
metadata->rehashing_node = listLast(kvs->rehashing);
if (kvs->num_dicts == 1)
return;
unsigned long long from, to;
dictRehashingInfo(d, &from, &to);
kvs->bucket_count += to; /* Started rehashing (Add the new ht size) */
}
/* Remove dictionary from the rehashing list.
*
* Updates the bucket count for the given dictionary: it removes
* the old ht size of the dictionary from the kvstore's total bucket sum. */
static void kvstoreDictRehashingCompleted(dict *d) {
kvstore *kvs = d->type->userdata;
kvstoreDictMetadata *metadata = (kvstoreDictMetadata *)dictMetadata(d);
if (metadata->rehashing_node) {
listDelNode(kvs->rehashing, metadata->rehashing_node);
metadata->rehashing_node = NULL;
}
if (kvs->num_dicts == 1)
return;
unsigned long long from, to;
dictRehashingInfo(d, &from, &to);
kvs->bucket_count -= from; /* Finished rehashing (Remove the old ht size) */
}
/* Returns the size of the kvstore dict metadata in bytes. */
static size_t kvstoreDictMetadataSize(dict *d) {
UNUSED(d);
return sizeof(kvstoreDictMetadata);
}
/**********************************/
/*** API **************************/
/**********************************/
/* Create an array of dictionaries
* num_dicts_bits is the log2 of the number of dictionaries needed (e.g. 0 for 1 dict,
* 3 for 8 dicts, etc.) */
kvstore *kvstoreCreate(dictType *type, int num_dicts_bits, int flags) {
/* We can't support more than 2^16 dicts because we want to save 48 bits
* for the dict cursor, see kvstoreScan */
assert(num_dicts_bits <= 16);
kvstore *kvs = zcalloc(sizeof(*kvs));
memcpy(&kvs->dtype, type, sizeof(kvs->dtype));
kvs->flags = flags;
/* kvstore must be the one to set these callbacks, so we make sure the
* caller didn't do it */
assert(!type->userdata);
assert(!type->dictMetadataBytes);
assert(!type->rehashingStarted);
assert(!type->rehashingCompleted);
kvs->dtype.userdata = kvs;
kvs->dtype.dictMetadataBytes = kvstoreDictMetadataSize;
kvs->dtype.rehashingStarted = kvstoreDictRehashingStarted;
kvs->dtype.rehashingCompleted = kvstoreDictRehashingCompleted;
kvs->num_dicts_bits = num_dicts_bits;
kvs->num_dicts = 1 << kvs->num_dicts_bits;
kvs->dicts = zcalloc(sizeof(dict*) * kvs->num_dicts);
if (!(kvs->flags & KVSTORE_ALLOCATE_DICTS_ON_DEMAND)) {
for (int i = 0; i < kvs->num_dicts; i++)
createDictIfNeeded(kvs, i);
}
kvs->rehashing = listCreate();
kvs->key_count = 0;
kvs->non_empty_dicts = 0;
kvs->resize_cursor = 0;
kvs->dict_size_index = kvs->num_dicts > 1? zcalloc(sizeof(unsigned long long) * (kvs->num_dicts + 1)) : NULL;
kvs->bucket_count = 0;
return kvs;
}
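For context, the initServer() hunk later in this commit instantiates the per-database kvstores exactly this way: one dict per hash slot in cluster mode (CLUSTER_SLOTS is 2^14 = 16384), a single dict otherwise, with dicts materialized only when their first key arrives. Condensed:
```c
/* Condensed from the initServer() hunk later in this commit. */
int slot_count_bits = server.cluster_enabled ? CLUSTER_SLOT_MASK_BITS : 0;
server.db[j].keys = kvstoreCreate(&dbDictType, slot_count_bits,
                                  KVSTORE_ALLOCATE_DICTS_ON_DEMAND);
server.db[j].expires = kvstoreCreate(&dbExpiresDictType, slot_count_bits,
                                     KVSTORE_ALLOCATE_DICTS_ON_DEMAND);
```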
void kvstoreEmpty(kvstore *kvs, void(callback)(dict*)) {
for (int didx = 0; didx < kvs->num_dicts; didx++) {
dict *d = kvstoreGetDict(kvs, didx);
if (!d)
continue;
kvstoreDictMetadata *metadata = (kvstoreDictMetadata *)dictMetadata(d);
if (metadata->rehashing_node)
metadata->rehashing_node = NULL;
dictEmpty(d, callback);
}
listEmpty(kvs->rehashing);
kvs->key_count = 0;
kvs->non_empty_dicts = 0;
kvs->resize_cursor = 0;
kvs->bucket_count = 0;
if (kvs->dict_size_index)
memset(kvs->dict_size_index, 0, sizeof(unsigned long long) * (kvs->num_dicts + 1));
}
void kvstoreRelease(kvstore *kvs) {
for (int didx = 0; didx < kvs->num_dicts; didx++) {
dict *d = kvstoreGetDict(kvs, didx);
if (!d)
continue;
kvstoreDictMetadata *metadata = (kvstoreDictMetadata *)dictMetadata(d);
if (metadata->rehashing_node)
metadata->rehashing_node = NULL;
dictRelease(d);
}
zfree(kvs->dicts);
listRelease(kvs->rehashing);
if (kvs->dict_size_index)
zfree(kvs->dict_size_index);
zfree(kvs);
}
unsigned long long int kvstoreSize(kvstore *kvs) {
if (kvs->num_dicts != 1) {
return kvs->key_count;
} else {
return kvs->dicts[0]? dictSize(kvs->dicts[0]) : 0;
}
}
/* This method provides the cumulative sum of all the dictionary buckets
* across dictionaries in the kvstore. */
unsigned long kvstoreBuckets(kvstore *kvs) {
if (kvs->num_dicts != 1) {
return kvs->bucket_count;
} else {
return kvs->dicts[0]? dictBuckets(kvs->dicts[0]) : 0;
}
}
size_t kvstoreMemUsage(kvstore *kvs) {
size_t mem = sizeof(*kvs);
unsigned long long keys_count = kvstoreSize(kvs);
mem += keys_count * dictEntryMemUsage() +
kvstoreBuckets(kvs) * sizeof(dictEntry*) +
kvs->allocated_dicts * (sizeof(dict) + kvstoreDictMetadataSize(NULL));
/* Values are dict* shared with kvs->dicts */
mem += listLength(kvs->rehashing) * sizeof(listNode);
if (kvs->dict_size_index)
mem += sizeof(unsigned long long) * (kvs->num_dicts + 1);
return mem;
}
/*
* This method is used to iterate over the elements of the entire kvstore, specifically across dicts.
* It's a three-pronged approach.
*
* 1. It uses the provided cursor `cursor` to retrieve the dict index from it.
* 2. If the dictionary is in a valid state (i.e. not skipped by the provided callback `skip_cb`),
* it performs a dictScan over the appropriate dict of the kvstore.
* 3. If the dict is entirely scanned, i.e. the cursor has reached 0, the next non-empty dict is discovered.
* The dict information is embedded into the cursor and returned.
*
* To restrict the scan to a single dict, pass a valid dict index as
* 'onlydidx', otherwise pass -1.
*/
unsigned long long kvstoreScan(kvstore *kvs, unsigned long long cursor,
int onlydidx, dictScanFunction *scan_cb,
kvstoreScanShouldSkipDict *skip_cb,
void *privdata)
{
unsigned long long _cursor = 0;
/* During dictionary traversal, 48 upper bits in the cursor are used for positioning in the HT.
* The following lower bits are used for the dict index number, ranging from 0 to 2^num_dicts_bits-1.
* Dict index is always 0 at the start of iteration and can be incremented only if there are
* multiple dicts. */
int didx = getAndClearDictIndexFromCursor(kvs, &cursor);
if (onlydidx >= 0) {
if (didx < onlydidx) {
/* Fast-forward to onlydidx. */
assert(onlydidx < kvs->num_dicts);
didx = onlydidx;
cursor = 0;
} else if (didx > onlydidx) {
/* The cursor is already past onlydidx. */
return 0;
}
}
dict *d = kvstoreGetDict(kvs, didx);
int skip = !d || (skip_cb && skip_cb(d));
if (!skip) {
_cursor = dictScan(d, cursor, scan_cb, privdata);
}
/* If the scan of the current dictionary is done, or scanning wasn't possible, move to the next dict index. */
if (_cursor == 0 || skip) {
if (onlydidx >= 0)
return 0;
didx = kvstoreGetNextNonEmptyDictIndex(kvs, didx);
}
if (didx == -1) {
return 0;
}
addDictIndexToCursor(kvs, didx, &_cursor);
return _cursor;
}
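A typical driver for kvstoreScan follows the usual SCAN contract: feed the returned cursor back in until it comes back as 0. A sketch (the callback signature is dictScanFunction from dict.h; countEntryCb and countAllEntries are hypothetical):
```c
#include "kvstore.h"

/* Hypothetical callback: invoked once per visited entry. */
static void countEntryCb(void *privdata, const dictEntry *de) {
    (void)de;
    (*(unsigned long long *)privdata)++;
}

/* Visit every entry in every dict of `kvs`, SCAN-style. */
static unsigned long long countAllEntries(kvstore *kvs) {
    unsigned long long n = 0, cursor = 0;
    do {
        cursor = kvstoreScan(kvs, cursor, -1 /* all dicts */,
                             countEntryCb, NULL /* no skip_cb */, &n);
    } while (cursor != 0);
    return n;
}
```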
/*
* This function increases the size of the kvstore's dicts to match the desired number.
* It resizes all individual dictionaries, unless skip_cb indicates otherwise.
*
* Based on the parameter `try_expand`, the appropriate dict expand API is invoked:
* if try_expand is set to 1, `dictTryExpand` is used, else `dictExpand`.
* Both APIs return either `DICT_OK` or `DICT_ERR`. `DICT_OK` signifies a successful
* expansion, while `DICT_ERR` signifies an allocation failure for `dictTryExpand`,
* and for `dictExpand` it signifies that no expansion was performed.
*/
int kvstoreExpand(kvstore *kvs, uint64_t newsize, int try_expand, kvstoreExpandShouldSkipDictIndex *skip_cb) {
for (int i = 0; i < kvs->num_dicts; i++) {
dict *d = kvstoreGetDict(kvs, i);
if (!d || (skip_cb && skip_cb(i)))
continue;
int result = try_expand ? dictTryExpand(d, newsize) : dictExpand(d, newsize);
if (try_expand && result == DICT_ERR)
return 0;
}
return 1;
}
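A short usage sketch: with try_expand set, a failed allocation in any dict makes the whole call report failure (return 0), which a caller may treat as "proceed without preallocation". Names here are hypothetical:
```c
/* Pre-size every dict of `kvs` for an expected per-dict load. */
static void presizeKvstore(kvstore *kvs) {
    if (!kvstoreExpand(kvs, 1024 /* entries per dict */, 1, NULL)) {
        /* dictTryExpand hit an allocation failure; carry on without
         * preallocation, the dicts will still grow incrementally. */
    }
}
```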
/* Returns a fair random dict index; the probability of each dict being returned is proportional to the number of elements that dictionary holds.
* This function guarantees that it returns a dict-index of a non-empty dict, unless the entire kvstore is empty.
* Time complexity of this function is O(log(kvs->num_dicts)). */
int kvstoreGetFairRandomDictIndex(kvstore *kvs) {
unsigned long target = kvstoreSize(kvs) ? (randomULong() % kvstoreSize(kvs)) + 1 : 0;
return kvstoreFindDictIndexByKeyIndex(kvs, target);
}
void kvstoreGetStats(kvstore *kvs, char *buf, size_t bufsize, int full) {
buf[0] = '\0';
size_t l;
char *orig_buf = buf;
size_t orig_bufsize = bufsize;
dictStats *mainHtStats = NULL;
dictStats *rehashHtStats = NULL;
dict *d;
kvstoreIterator *kvs_it = kvstoreIteratorInit(kvs);
while ((d = kvstoreIteratorNextDict(kvs_it))) {
dictStats *stats = dictGetStatsHt(d, 0, full);
if (!mainHtStats) {
mainHtStats = stats;
} else {
dictCombineStats(stats, mainHtStats);
dictFreeStats(stats);
}
if (dictIsRehashing(d)) {
stats = dictGetStatsHt(d, 1, full);
if (!rehashHtStats) {
rehashHtStats = stats;
} else {
dictCombineStats(stats, rehashHtStats);
dictFreeStats(stats);
}
}
}
kvstoreIteratorRelease(kvs_it);
if (mainHtStats && bufsize > 0) {
l = dictGetStatsMsg(buf, bufsize, mainHtStats, full);
dictFreeStats(mainHtStats);
buf += l;
bufsize -= l;
}
if (rehashHtStats && bufsize > 0) {
l = dictGetStatsMsg(buf, bufsize, rehashHtStats, full);
dictFreeStats(rehashHtStats);
buf += l;
bufsize -= l;
}
/* Make sure there is a NULL term at the end. */
if (orig_bufsize) orig_buf[orig_bufsize - 1] = '\0';
}
/* Finds a dict containing target element in a key space ordered by dict index.
* Consider this example. Dictionaries are represented by brackets and keys by dots:
* #0 #1 #2 #3 #4
* [..][....][...][.......][.]
* ^
* target
*
* In this case dict #3 contains key that we are trying to find.
*
* The return value is 0 based dict-index, and the range of the target is [1..kvstoreSize], kvstoreSize inclusive.
*
* To find the dict, we start with the root node of the binary index tree and search through its children
* from the highest index (2^num_dicts_bits in our case) to the lowest index. At each node, we check if the target
* value is greater than the node's value. If it is, we remove the node's value from the target and recursively
* search for the new target using the current node as the parent.
* Time complexity of this function is O(log(kvs->num_dicts))
*/
int kvstoreFindDictIndexByKeyIndex(kvstore *kvs, unsigned long target) {
if (kvs->num_dicts == 1 || kvstoreSize(kvs) == 0)
return 0;
assert(target <= kvstoreSize(kvs));
int result = 0, bit_mask = 1 << kvs->num_dicts_bits;
for (int i = bit_mask; i != 0; i >>= 1) {
int current = result + i;
/* When the target index is greater than the 'current' node's value, we update
* the target and search in the 'current' node's subtree. */
if (target > kvs->dict_size_index[current]) {
target -= kvs->dict_size_index[current];
result = current;
}
}
/* Adjust the result to get the correct dict:
* 1. result += 1;
* After the calculations, the index of target in dict_size_index should be the next one,
* so we should add 1.
* 2. result -= 1;
* Unlike BIT(dict_size_index is 1-based), dict indices are 0-based, so we need to subtract 1.
* As the addition and subtraction cancel each other out, we can simply return the result. */
return result;
}
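A standalone sketch of this top-down descent, reusing the bracket diagram from the comment above (dict sizes [2, 4, 3, 7, 1]; the 10th key lands in dict #3). Names are hypothetical; target must lie within [1..total]:
```c
#include <assert.h>
#include <stdio.h>

#define BITS 3
#define NUM_DICTS (1 << BITS)

static unsigned long long tree[NUM_DICTS + 1]; /* 1-based Fenwick tree */

static void bit_add(int didx, long delta) {
    for (int idx = didx + 1; idx <= NUM_DICTS; idx += idx & -idx)
        tree[idx] += delta;
}

/* Mirror of kvstoreFindDictIndexByKeyIndex's descent: 0-based index of the
 * dict holding the `target`-th key (1-based, target <= total key count). */
static int find_dict_by_rank(unsigned long target) {
    assert(target >= 1 && target <= tree[NUM_DICTS]); /* tree[N] holds the total */
    int result = 0;
    for (int i = NUM_DICTS; i != 0; i >>= 1) {
        int current = result + i;
        if (target > tree[current]) {
            target -= tree[current];
            result = current;
        }
    }
    return result; /* the +1/-1 adjustments cancel, as noted above */
}

int main(void) {
    long sizes[] = {2, 4, 3, 7, 1}; /* the diagram from the comment above */
    for (int d = 0; d < 5; d++) bit_add(d, sizes[d]);
    printf("10th key lives in dict #%d\n", find_dict_by_rank(10)); /* #3 */
    return 0;
}
```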
/* Returns next non-empty dict index strictly after given one, or -1 if provided didx is the last one. */
int kvstoreGetNextNonEmptyDictIndex(kvstore *kvs, int didx) {
unsigned long long next_key = cumulativeKeyCountRead(kvs, didx) + 1;
return next_key <= kvstoreSize(kvs) ? kvstoreFindDictIndexByKeyIndex(kvs, next_key) : -1;
}
int kvstoreNumNonEmptyDicts(kvstore *kvs) {
return kvs->non_empty_dicts;
}
int kvstoreNumDicts(kvstore *kvs) {
return kvs->num_dicts;
}
/* Returns kvstore iterator that can be used to iterate through sub-dictionaries.
*
* The caller should free the resulting kvs_it with kvstoreIteratorRelease. */
kvstoreIterator *kvstoreIteratorInit(kvstore *kvs) {
kvstoreIterator *kvs_it = zmalloc(sizeof(*kvs_it));
kvs_it->kvs = kvs;
kvs_it->didx = -1;
kvs_it->next_didx = kvstoreFindDictIndexByKeyIndex(kvs_it->kvs, 1); /* Finds first non-empty dict index. */
dictInitSafeIterator(&kvs_it->di, NULL);
return kvs_it;
}
/* Free the kvs_it returned by kvstoreIteratorInit. */
void kvstoreIteratorRelease(kvstoreIterator *kvs_it) {
dictIterator *iter = &kvs_it->di;
dictResetIterator(iter);
zfree(kvs_it);
}
/* Returns next dictionary from the iterator, or NULL if iteration is complete. */
dict *kvstoreIteratorNextDict(kvstoreIterator *kvs_it) {
if (kvs_it->next_didx == -1)
return NULL;
kvs_it->didx = kvs_it->next_didx;
kvs_it->next_didx = kvstoreGetNextNonEmptyDictIndex(kvs_it->kvs, kvs_it->didx);
return kvs_it->kvs->dicts[kvs_it->didx];
}
int kvstoreIteratorGetCurrentDictIndex(kvstoreIterator *kvs_it) {
assert(kvs_it->didx >= 0 && kvs_it->didx < kvs_it->kvs->num_dicts);
return kvs_it->didx;
}
/* Returns next entry. */
dictEntry *kvstoreIteratorNext(kvstoreIterator *kvs_it) {
dictEntry *de = kvs_it->di.d ? dictNext(&kvs_it->di) : NULL;
if (!de) { /* No current dict or reached the end of the dictionary. */
dict *d = kvstoreIteratorNextDict(kvs_it);
if (!d)
return NULL;
if (kvs_it->di.d) {
/* Before we move to the next dict, reset the iter of the previous dict. */
dictIterator *iter = &kvs_it->di;
dictResetIterator(iter);
}
dictInitSafeIterator(&kvs_it->di, d);
de = dictNext(&kvs_it->di);
}
return de;
}
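Usage mirrors the rdbSaveDb rewrite later in this commit: one flat loop over all entries, with the current dict-index (the hash slot, in cluster mode) available on demand. A sketch with hypothetical names:
```c
#include "kvstore.h"

/* Walk every entry across all dicts of `kvs` in dict-index order. */
static void walkKvstore(kvstore *kvs) {
    kvstoreIterator *kvs_it = kvstoreIteratorInit(kvs);
    dictEntry *de;
    while ((de = kvstoreIteratorNext(kvs_it)) != NULL) {
        int didx = kvstoreIteratorGetCurrentDictIndex(kvs_it);
        void *key = dictGetKey(de);
        (void)didx; (void)key; /* processEntry(didx, key) would go here */
    }
    kvstoreIteratorRelease(kvs_it);
}
```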
/* This method traverses through kvstore dictionaries and triggers a resize.
* It first tries to shrink if needed, and if not, it tries to expand. */
void kvstoreTryResizeDicts(kvstore *kvs, int limit) {
if (limit > kvs->num_dicts)
limit = kvs->num_dicts;
for (int i = 0; i < limit; i++) {
int didx = kvs->resize_cursor;
dict *d = kvstoreGetDict(kvs, didx);
/* The dict may not be allocated yet; advance the cursor regardless,
 * otherwise an unallocated dict would stall the cursor forever. */
if (d && dictShrinkIfNeeded(d) == DICT_ERR) {
    dictExpandIfNeeded(d);
}
kvs->resize_cursor = (didx + 1) % kvs->num_dicts;
}
}
/* Our hash table implementation performs rehashing incrementally while
* we write/read from the hash table. Still if the server is idle, the hash
* table will use two tables for a long time. So we try to use 1 millisecond
* of CPU time at every call of this function to perform some rehashing.
*
* The function returns the amount of microsecs spent if some rehashing was
* performed, otherwise 0 is returned. */
uint64_t kvstoreIncrementallyRehash(kvstore *kvs, uint64_t threshold_us) {
if (listLength(kvs->rehashing) == 0)
return 0;
/* Our goal is to rehash as many dictionaries as we can before reaching predefined threshold,
* after each dictionary completes rehashing, it removes itself from the list. */
listNode *node;
monotime timer;
uint64_t elapsed_us = UINT64_MAX;
elapsedStart(&timer);
while ((node = listFirst(kvs->rehashing))) {
elapsed_us = elapsedUs(timer);
if (elapsed_us >= threshold_us) {
break; /* Reached the time limit. */
}
dictRehashMicroseconds(listNodeValue(node), threshold_us - elapsed_us);
}
assert(elapsed_us != UINT64_MAX);
return elapsed_us;
}
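The databasesCron hunk later in this commit drives this with a shared budget: each call receives only the time that remains, so a keys kvstore that eats most of the millisecond leaves little for expires. A condensed sketch (keys_kvs/expires_kvs are hypothetical):
```c
uint64_t budget_us = 1000, elapsed_us = 0;
elapsed_us += kvstoreIncrementallyRehash(keys_kvs, budget_us - elapsed_us);
if (elapsed_us < budget_us)
    elapsed_us += kvstoreIncrementallyRehash(expires_kvs, budget_us - elapsed_us);
```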
unsigned long kvstoreDictSize(kvstore *kvs, int didx)
{
dict *d = kvstoreGetDict(kvs, didx);
if (!d)
return 0;
return dictSize(d);
}
dictIterator *kvstoreDictGetIterator(kvstore *kvs, int didx)
{
dict *d = kvstoreGetDict(kvs, didx);
return dictGetIterator(d);
}
dictIterator *kvstoreDictGetSafeIterator(kvstore *kvs, int didx)
{
dict *d = kvstoreGetDict(kvs, didx);
return dictGetSafeIterator(d);
}
dictEntry *kvstoreDictGetRandomKey(kvstore *kvs, int didx)
{
dict *d = kvstoreGetDict(kvs, didx);
if (!d)
return NULL;
return dictGetRandomKey(d);
}
dictEntry *kvstoreDictGetFairRandomKey(kvstore *kvs, int didx)
{
dict *d = kvstoreGetDict(kvs, didx);
if (!d)
return NULL;
return dictGetFairRandomKey(d);
}
dictEntry *kvstoreDictFindEntryByPtrAndHash(kvstore *kvs, int didx, const void *oldptr, uint64_t hash)
{
dict *d = kvstoreGetDict(kvs, didx);
if (!d)
return NULL;
return dictFindEntryByPtrAndHash(d, oldptr, hash);
}
unsigned int kvstoreDictGetSomeKeys(kvstore *kvs, int didx, dictEntry **des, unsigned int count)
{
dict *d = kvstoreGetDict(kvs, didx);
if (!d)
return 0;
return dictGetSomeKeys(d, des, count);
}
int kvstoreDictExpand(kvstore *kvs, int didx, unsigned long size)
{
dict *d = kvstoreGetDict(kvs, didx);
if (!d)
return DICT_ERR;
return dictExpand(d, size);
}
unsigned long kvstoreDictScanDefrag(kvstore *kvs, int didx, unsigned long v, dictScanFunction *fn, dictDefragFunctions *defragfns, void *privdata)
{
dict *d = kvstoreGetDict(kvs, didx);
if (!d)
return 0;
return dictScanDefrag(d, v, fn, defragfns, privdata);
}
uint64_t kvstoreGetHash(kvstore *kvs, const void *key)
{
return kvs->dtype.hashFunction(key);
}
void *kvstoreDictFetchValue(kvstore *kvs, int didx, const void *key)
{
dict *d = kvstoreGetDict(kvs, didx);
if (!d)
return NULL;
return dictFetchValue(d, key);
}
dictEntry *kvstoreDictFind(kvstore *kvs, int didx, void *key) {
dict *d = kvstoreGetDict(kvs, didx);
if (!d)
return NULL;
return dictFind(d, key);
}
dictEntry *kvstoreDictAddRaw(kvstore *kvs, int didx, void *key, dictEntry **existing) {
createDictIfNeeded(kvs, didx);
dict *d = kvstoreGetDict(kvs, didx);
dictEntry *ret = dictAddRaw(d, key, existing);
if (ret)
cumulativeKeyCountAdd(kvs, didx, 1);
return ret;
}
void kvstoreDictSetKey(kvstore *kvs, int didx, dictEntry* de, void *key) {
dict *d = kvstoreGetDict(kvs, didx);
dictSetKey(d, de, key);
}
void kvstoreDictSetVal(kvstore *kvs, int didx, dictEntry *de, void *val) {
dict *d = kvstoreGetDict(kvs, didx);
dictSetVal(d, de, val);
}
dictEntry *kvstoreDictTwoPhaseUnlinkFind(kvstore *kvs, int didx, const void *key, dictEntry ***plink, int *table_index) {
dict *d = kvstoreGetDict(kvs, didx);
if (!d)
return NULL;
return dictTwoPhaseUnlinkFind(d, key, plink, table_index);
}
void kvstoreDictTwoPhaseUnlinkFree(kvstore *kvs, int didx, dictEntry *he, dictEntry **plink, int table_index) {
dict *d = kvstoreGetDict(kvs, didx);
dictTwoPhaseUnlinkFree(d, he, plink, table_index);
cumulativeKeyCountAdd(kvs, didx, -1);
freeDictIfNeeded(kvs, didx);
}
int kvstoreDictDelete(kvstore *kvs, int didx, const void *key) {
dict *d = kvstoreGetDict(kvs, didx);
if (!d)
return DICT_ERR;
int ret = dictDelete(d, key);
if (ret == DICT_OK) {
cumulativeKeyCountAdd(kvs, didx, -1);
freeDictIfNeeded(kvs, didx);
}
return ret;
}

src/kvstore.h (new file, 65 lines)

@ -0,0 +1,65 @@
#ifndef DICTARRAY_H_
#define DICTARRAY_H_
#include "dict.h"
#include "adlist.h"
typedef struct _kvstore kvstore;
typedef struct _kvstoreIterator kvstoreIterator;
typedef int (kvstoreScanShouldSkipDict)(dict *d);
typedef int (kvstoreExpandShouldSkipDictIndex)(int didx);
#define KVSTORE_ALLOCATE_DICTS_ON_DEMAND (1<<0)
#define KVSTORE_FREE_EMPTY_DICTS (1<<1)
kvstore *kvstoreCreate(dictType *type, int num_dicts_bits, int flags);
void kvstoreEmpty(kvstore *kvs, void(callback)(dict*));
void kvstoreRelease(kvstore *kvs);
unsigned long long kvstoreSize(kvstore *kvs);
unsigned long kvstoreBuckets(kvstore *kvs);
size_t kvstoreMemUsage(kvstore *kvs);
unsigned long long kvstoreScan(kvstore *kvs, unsigned long long cursor,
int onlydidx, dictScanFunction *scan_cb,
kvstoreScanShouldSkipDict *skip_cb,
void *privdata);
int kvstoreExpand(kvstore *kvs, uint64_t newsize, int try_expand, kvstoreExpandShouldSkipDictIndex *skip_cb);
int kvstoreGetFairRandomDictIndex(kvstore *kvs);
void kvstoreGetStats(kvstore *kvs, char *buf, size_t bufsize, int full);
int kvstoreFindDictIndexByKeyIndex(kvstore *kvs, unsigned long target);
int kvstoreGetNextNonEmptyDictIndex(kvstore *kvs, int didx);
int kvstoreNumNonEmptyDicts(kvstore *kvs);
int kvstoreNumDicts(kvstore *kvs);
uint64_t kvstoreGetHash(kvstore *kvs, const void *key);
/* kvstore iterator specific functions */
kvstoreIterator *kvstoreIteratorInit(kvstore *kvs);
void kvstoreIteratorRelease(kvstoreIterator *kvs_it);
dict *kvstoreIteratorNextDict(kvstoreIterator *kvs_it);
int kvstoreIteratorGetCurrentDictIndex(kvstoreIterator *kvs_it);
dictEntry *kvstoreIteratorNext(kvstoreIterator *kvs_it);
/* Rehashing */
void kvstoreTryResizeDicts(kvstore *kvs, int limit);
uint64_t kvstoreIncrementallyRehash(kvstore *kvs, uint64_t threshold_us);
/* Specific dict access by dict-index */
unsigned long kvstoreDictSize(kvstore *kvs, int didx);
dictIterator *kvstoreDictGetIterator(kvstore *kvs, int didx);
dictIterator *kvstoreDictGetSafeIterator(kvstore *kvs, int didx);
dictEntry *kvstoreDictGetRandomKey(kvstore *kvs, int didx);
dictEntry *kvstoreDictGetFairRandomKey(kvstore *kvs, int didx);
dictEntry *kvstoreDictFindEntryByPtrAndHash(kvstore *kvs, int didx, const void *oldptr, uint64_t hash);
unsigned int kvstoreDictGetSomeKeys(kvstore *kvs, int didx, dictEntry **des, unsigned int count);
int kvstoreDictExpand(kvstore *kvs, int didx, unsigned long size);
unsigned long kvstoreDictScanDefrag(kvstore *kvs, int didx, unsigned long v, dictScanFunction *fn, dictDefragFunctions *defragfns, void *privdata);
void *kvstoreDictFetchValue(kvstore *kvs, int didx, const void *key);
dictEntry *kvstoreDictFind(kvstore *kvs, int didx, void *key);
dictEntry *kvstoreDictAddRaw(kvstore *kvs, int didx, void *key, dictEntry **existing);
void kvstoreDictSetKey(kvstore *kvs, int didx, dictEntry* de, void *key);
void kvstoreDictSetVal(kvstore *kvs, int didx, dictEntry *de, void *val);
dictEntry *kvstoreDictTwoPhaseUnlinkFind(kvstore *kvs, int didx, const void *key, dictEntry ***plink, int *table_index);
void kvstoreDictTwoPhaseUnlinkFree(kvstore *kvs, int didx, dictEntry *he, dictEntry **plink, int table_index);
int kvstoreDictDelete(kvstore *kvs, int didx, const void *key);
#endif /* DICTARRAY_H_ */
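To tie the API together, a hypothetical end-to-end sketch against this header. The dictType field names and callback signatures (hashFunction, keyCompare taking the owning dict, dictGenHashFunction) are assumptions based on dict.h at the time of this commit; everything named str* is made up for illustration:
```c
#include <stdio.h>
#include <string.h>
#include "kvstore.h"

/* Assumed dict.h signatures; treat as illustration, not gospel. */
static uint64_t strHash(const void *key) {
    return dictGenHashFunction(key, strlen(key));
}
static int strCompare(dict *d, const void *a, const void *b) {
    (void)d;
    return strcmp(a, b) == 0; /* non-zero means "keys match" */
}
static dictType strDictType = {
    .hashFunction = strHash,
    .keyCompare = strCompare,
};

int main(void) {
    /* 2 bits -> 4 dicts; allocate lazily, release a dict once it empties. */
    kvstore *kvs = kvstoreCreate(&strDictType, 2,
                                 KVSTORE_ALLOCATE_DICTS_ON_DEMAND |
                                 KVSTORE_FREE_EMPTY_DICTS);
    dictEntry *existing = NULL;
    /* The caller picks the dict-index; in the server it's the key's slot. */
    kvstoreDictAddRaw(kvs, 3, (void *)"mykey", &existing);
    printf("keys=%llu non-empty dicts=%d\n",
           kvstoreSize(kvs), kvstoreNumNonEmptyDicts(kvs));
    kvstoreDictDelete(kvs, 3, "mykey"); /* dict #3 is freed again */
    kvstoreRelease(kvs);
    return 0;
}
```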


@ -2,6 +2,7 @@
#include "bio.h"
#include "atomicvar.h"
#include "functions.h"
#include "cluster.h"
static redisAtomic size_t lazyfree_objects = 0;
static redisAtomic size_t lazyfreed_objects = 0;
@ -19,19 +20,14 @@ void lazyfreeFreeObject(void *args[]) {
* database which was substituted with a fresh one in the main thread
* when the database was logically deleted. */
void lazyfreeFreeDatabase(void *args[]) {
dict **ht1 = (dict **) args[0];
dict **ht2 = (dict **) args[1];
int *dictCount = (int *) args[2];
for (int i=0; i<*dictCount; i++) {
size_t numkeys = dictSize(ht1[i]);
dictRelease(ht1[i]);
dictRelease(ht2[i]);
atomicDecr(lazyfree_objects,numkeys);
atomicIncr(lazyfreed_objects,numkeys);
}
zfree(ht1);
zfree(ht2);
zfree(dictCount);
kvstore *da1 = args[0];
kvstore *da2 = args[1];
size_t numkeys = kvstoreSize(da1);
kvstoreRelease(da1);
kvstoreRelease(da2);
atomicDecr(lazyfree_objects,numkeys);
atomicIncr(lazyfreed_objects,numkeys);
}
/* Release the key tracking table. */
@ -179,28 +175,12 @@ void freeObjAsync(robj *key, robj *obj, int dbid) {
* create a new empty set of hash tables and scheduling the old ones for
* lazy freeing. */
void emptyDbAsync(redisDb *db) {
dbDictMetadata *metadata;
for (int i = 0; i < db->dict_count; i++) {
metadata = (dbDictMetadata *)dictMetadata(db->dict[i]);
if (metadata->rehashing_node) {
listDelNode(server.rehashing, metadata->rehashing_node);
metadata->rehashing_node = NULL;
}
metadata = (dbDictMetadata *)dictMetadata(db->expires[i]);
if (metadata->rehashing_node) {
listDelNode(server.rehashing, metadata->rehashing_node);
metadata->rehashing_node = NULL;
}
}
dict **oldDict = db->dict;
dict **oldExpires = db->expires;
atomicIncr(lazyfree_objects,dbSize(db, DB_MAIN));
db->dict = dictCreateMultiple(&dbDictType, db->dict_count);
db->expires = dictCreateMultiple(&dbExpiresDictType, db->dict_count);
int *count = zmalloc(sizeof(int));
*count = db->dict_count;
bioCreateLazyFreeJob(lazyfreeFreeDatabase, 3, oldDict, oldExpires, count);
int slotCountBits = server.cluster_enabled? CLUSTER_SLOT_MASK_BITS : 0;
kvstore *oldkeys = db->keys, *oldexpires = db->expires;
db->keys = kvstoreCreate(&dbDictType, slotCountBits, KVSTORE_ALLOCATE_DICTS_ON_DEMAND);
db->expires = kvstoreCreate(&dbExpiresDictType, slotCountBits, KVSTORE_ALLOCATE_DICTS_ON_DEMAND);
atomicIncr(lazyfree_objects, kvstoreSize(oldkeys));
bioCreateLazyFreeJob(lazyfreeFreeDatabase, 2, oldkeys, oldexpires);
}
/* Free the key tracking table.


@ -4295,7 +4295,7 @@ void RM_ResetDataset(int restart_aof, int async) {
/* Returns the number of keys in the current db. */
unsigned long long RM_DbSize(RedisModuleCtx *ctx) {
return dbSize(ctx->client->db, DB_MAIN);
return dbSize(ctx->client->db);
}
/* Returns a name of a random key, or NULL if current db is empty. */
@ -11058,7 +11058,7 @@ int RM_Scan(RedisModuleCtx *ctx, RedisModuleScanCursor *cursor, RedisModuleScanC
}
int ret = 1;
ScanCBData data = { ctx, privdata, fn };
cursor->cursor = dbScan(ctx->client->db, DB_MAIN, cursor->cursor, -1, moduleScanCallback, NULL, &data);
cursor->cursor = dbScan(ctx->client->db, cursor->cursor, moduleScanCallback, &data);
if (cursor->cursor == 0) {
cursor->done = 1;
ret = 0;


@ -394,7 +394,7 @@ void touchWatchedKey(redisDb *db, robj *key) {
/* The key was already expired when WATCH was called. */
if (db == wk->db &&
equalStringObjects(key, wk->key) &&
dictFind(db->dict[calculateKeySlot(key->ptr)], key->ptr) == NULL)
dbFind(db, key->ptr) == NULL)
{
/* Already expired key is deleted, so logically no change. Clear
* the flag. Deleted keys are not flagged as expired. */
@ -432,9 +432,9 @@ void touchAllWatchedKeysInDb(redisDb *emptied, redisDb *replaced_with) {
dictIterator *di = dictGetSafeIterator(emptied->watched_keys);
while((de = dictNext(di)) != NULL) {
robj *key = dictGetKey(de);
int exists_in_emptied = dictFind(emptied->dict[calculateKeySlot(key->ptr)], key->ptr) != NULL;
int exists_in_emptied = dbFind(emptied, key->ptr) != NULL;
if (exists_in_emptied ||
(replaced_with && dictFind(replaced_with->dict[calculateKeySlot(key->ptr)], key->ptr)))
(replaced_with && dbFind(replaced_with, key->ptr) != NULL))
{
list *clients = dictGetVal(de);
if (!clients) continue;
@ -442,7 +442,7 @@ void touchAllWatchedKeysInDb(redisDb *emptied, redisDb *replaced_with) {
while((ln = listNext(&li))) {
watchedKey *wk = redis_member2struct(watchedKey, node, ln);
if (wk->expired) {
if (!replaced_with || !dictFind(replaced_with->dict[calculateKeySlot(key->ptr)], key->ptr)) {
if (!replaced_with || !dbFind(replaced_with, key->ptr)) {
/* Expired key now deleted. No logical change. Clear the
* flag. Deleted keys are not flagged as expired. */
wk->expired = 0;


@ -1246,18 +1246,19 @@ struct redisMemOverhead *getMemoryOverheadData(void) {
for (j = 0; j < server.dbnum; j++) {
redisDb *db = server.db+j;
unsigned long long keyscount = dbSize(db, DB_MAIN);
unsigned long long keyscount = kvstoreSize(db->keys);
if (keyscount == 0) continue;
mh->total_keys += keyscount;
mh->db = zrealloc(mh->db,sizeof(mh->db[0])*(mh->num_dbs+1));
mh->db[mh->num_dbs].dbid = j;
mem = dbMemUsage(db, DB_MAIN);
mem = kvstoreMemUsage(db->keys) +
keyscount * sizeof(robj);
mh->db[mh->num_dbs].overhead_ht_main = mem;
mem_total+=mem;
mem = dbMemUsage(db, DB_EXPIRES);
mem = kvstoreMemUsage(db->expires);
mh->db[mh->num_dbs].overhead_ht_expires = mem;
mem_total+=mem;
@ -1544,7 +1545,7 @@ NULL
return;
}
}
if ((de = dbFind(c->db, c->argv[2]->ptr, DB_MAIN)) == NULL) {
if ((de = dbFind(c->db, c->argv[2]->ptr)) == NULL) {
addReplyNull(c);
return;
}


@ -36,7 +36,7 @@ typedef struct pubsubtype {
int shard;
dict *(*clientPubSubChannels)(client*);
int (*subscriptionCount)(client*);
dict **(*serverPubSubChannels)(unsigned int);
kvstore **serverPubSubChannels;
robj **subscribeMsg;
robj **unsubscribeMsg;
robj **messageBulk;
@ -62,22 +62,12 @@ dict* getClientPubSubChannels(client *c);
*/
dict* getClientPubSubShardChannels(client *c);
/*
* Get server's global Pub/Sub channels dict.
*/
dict **getServerPubSubChannels(unsigned int slot);
/*
* Get server's shard level Pub/Sub channels dict.
*/
dict **getServerPubSubShardChannels(unsigned int slot);
/*
* Get list of channels client is subscribed to.
* If a pattern is provided, the subset of channels is returned
* matching the pattern.
*/
void channelList(client *c, sds pat, dict** pubsub_channels, int is_sharded);
void channelList(client *c, sds pat, kvstore *pubsub_channels);
/*
* Pub/Sub type for global channels.
@ -86,7 +76,7 @@ pubsubtype pubSubType = {
.shard = 0,
.clientPubSubChannels = getClientPubSubChannels,
.subscriptionCount = clientSubscriptionsCount,
.serverPubSubChannels = getServerPubSubChannels,
.serverPubSubChannels = &server.pubsub_channels,
.subscribeMsg = &shared.subscribebulk,
.unsubscribeMsg = &shared.unsubscribebulk,
.messageBulk = &shared.messagebulk,
@ -99,7 +89,7 @@ pubsubtype pubSubShardType = {
.shard = 1,
.clientPubSubChannels = getClientPubSubShardChannels,
.subscriptionCount = clientShardSubscriptionsCount,
.serverPubSubChannels = getServerPubSubShardChannels,
.serverPubSubChannels = &server.pubsubshard_channels,
.subscribeMsg = &shared.ssubscribebulk,
.unsubscribeMsg = &shared.sunsubscribebulk,
.messageBulk = &shared.smessagebulk,
@ -218,15 +208,14 @@ void addReplyPubsubPatUnsubscribed(client *c, robj *pattern) {
/* Return the number of pubsub channels + patterns handled. */
int serverPubsubSubscriptionCount(void) {
return dictSize(server.pubsub_channels) + dictSize(server.pubsub_patterns);
return kvstoreSize(server.pubsub_channels) + dictSize(server.pubsub_patterns);
}
/* Return the number of pubsub shard level channels handled. */
int serverPubsubShardSubscriptionCount(void) {
return server.shard_channel_count;
return kvstoreSize(server.pubsubshard_channels);
}
/* Return the number of channels + patterns a client is subscribed to. */
int clientSubscriptionsCount(client *c) {
return dictSize(c->pubsub_channels) + dictSize(c->pubsub_patterns);
@ -245,16 +234,6 @@ dict* getClientPubSubShardChannels(client *c) {
return c->pubsubshard_channels;
}
dict **getServerPubSubChannels(unsigned int slot) {
UNUSED(slot);
return &server.pubsub_channels;
}
dict **getServerPubSubShardChannels(unsigned int slot) {
serverAssert(server.cluster_enabled || slot == 0);
return &server.pubsubshard_channels[slot];
}
/* Return the number of pubsub + pubsub shard level channels
* a client is subscribed to. */
int clientTotalPubSubSubscriptionCount(client *c) {
@ -278,8 +257,7 @@ void unmarkClientAsPubSub(client *c) {
/* Subscribe a client to a channel. Returns 1 if the operation succeeded, or
* 0 if the client was already subscribed to that channel. */
int pubsubSubscribeChannel(client *c, robj *channel, pubsubtype type) {
dict **d_ptr;
dictEntry *de;
dictEntry *de, *existing;
dict *clients = NULL;
int retval = 0;
unsigned int slot = 0;
@ -292,23 +270,17 @@ int pubsubSubscribeChannel(client *c, robj *channel, pubsubtype type) {
if (server.cluster_enabled && type.shard) {
slot = getKeySlot(channel->ptr);
}
d_ptr = type.serverPubSubChannels(slot);
if (*d_ptr == NULL) {
*d_ptr = dictCreate(&objToDictDictType);
de = NULL;
de = kvstoreDictAddRaw(*type.serverPubSubChannels, slot, channel, &existing);
if (existing) {
clients = dictGetVal(existing);
} else {
de = dictFind(*d_ptr, channel);
}
if (de == NULL) {
clients = dictCreate(&clientDictType);
dictAdd(*d_ptr, channel, clients);
kvstoreDictSetVal(*type.serverPubSubChannels, slot, de, clients);
incrRefCount(channel);
if (type.shard) {
server.shard_channel_count++;
}
} else {
clients = dictGetVal(de);
}
serverAssert(dictAdd(clients, c, NULL) != DICT_ERR);
}
/* Notify the client */
@ -319,7 +291,6 @@ int pubsubSubscribeChannel(client *c, robj *channel, pubsubtype type) {
/* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or
* 0 if the client was not subscribed to the specified channel. */
int pubsubUnsubscribeChannel(client *c, robj *channel, int notify, pubsubtype type) {
dict *d;
dictEntry *de;
dict *clients;
int retval = 0;
@ -334,9 +305,7 @@ int pubsubUnsubscribeChannel(client *c, robj *channel, int notify, pubsubtype ty
if (server.cluster_enabled && type.shard) {
slot = getKeySlot(channel->ptr);
}
d = *type.serverPubSubChannels(slot);
serverAssertWithInfo(c,NULL,d != NULL);
de = dictFind(d, channel);
de = kvstoreDictFind(*type.serverPubSubChannels, slot, channel);
serverAssertWithInfo(c,NULL,de != NULL);
clients = dictGetVal(de);
serverAssertWithInfo(c, NULL, dictDelete(clients, c) == DICT_OK);
@ -344,15 +313,7 @@ int pubsubUnsubscribeChannel(client *c, robj *channel, int notify, pubsubtype ty
/* Free the dict and the associated hash entry if this was
* the last client, so that it remains possible to abuse
* Redis PUBSUB by creating millions of channels. */
dictDelete(d, channel);
if (type.shard) {
if (dictSize(d) == 0) {
dictRelease(d);
dict **d_ptr = type.serverPubSubChannels(slot);
*d_ptr = NULL;
}
server.shard_channel_count--;
}
kvstoreDictDelete(*type.serverPubSubChannels, slot, channel);
}
}
/* Notify the client */
@ -365,11 +326,10 @@ int pubsubUnsubscribeChannel(client *c, robj *channel, int notify, pubsubtype ty
/* Unsubscribe all shard channels in a slot. */
void pubsubShardUnsubscribeAllChannelsInSlot(unsigned int slot) {
dict *d = server.pubsubshard_channels[slot];
if (!d) {
if (!kvstoreDictSize(server.pubsubshard_channels, slot))
return;
}
dictIterator *di = dictGetSafeIterator(d);
dictIterator *di = kvstoreDictGetSafeIterator(server.pubsubshard_channels, slot);
dictEntry *de;
while ((de = dictNext(di)) != NULL) {
robj *channel = dictGetKey(de);
@ -389,12 +349,9 @@ void pubsubShardUnsubscribeAllChannelsInSlot(unsigned int slot) {
}
}
dictReleaseIterator(iter);
server.shard_channel_count--;
dictDelete(d, channel);
kvstoreDictDelete(server.pubsubshard_channels, slot, channel);
}
dictReleaseIterator(di);
dictRelease(d);
server.pubsubshard_channels[slot] = NULL;
}
/* Subscribe a client to a pattern. Returns 1 if the operation succeeded, or 0 if the client was already subscribed to that pattern. */
@ -513,7 +470,6 @@ int pubsubUnsubscribeAllPatterns(client *c, int notify) {
*/
int pubsubPublishMessageInternal(robj *channel, robj *message, pubsubtype type) {
int receivers = 0;
dict *d;
dictEntry *de;
dictIterator *di;
unsigned int slot = 0;
@ -522,8 +478,7 @@ int pubsubPublishMessageInternal(robj *channel, robj *message, pubsubtype type)
if (server.cluster_enabled && type.shard) {
slot = keyHashSlot(channel->ptr, sdslen(channel->ptr));
}
d = *type.serverPubSubChannels(slot);
de = d ? dictFind(d, channel) : NULL;
de = kvstoreDictFind(*type.serverPubSubChannels, slot, channel);
if (de) {
dict *clients = dictGetVal(de);
dictEntry *entry;
@ -693,14 +648,14 @@ NULL
{
/* PUBSUB CHANNELS [<pattern>] */
sds pat = (c->argc == 2) ? NULL : c->argv[2]->ptr;
channelList(c, pat, &server.pubsub_channels, 0);
channelList(c, pat, server.pubsub_channels);
} else if (!strcasecmp(c->argv[1]->ptr,"numsub") && c->argc >= 2) {
/* PUBSUB NUMSUB [Channel_1 ... Channel_N] */
int j;
addReplyArrayLen(c,(c->argc-2)*2);
for (j = 2; j < c->argc; j++) {
dict *d = dictFetchValue(server.pubsub_channels, c->argv[j]);
dict *d = kvstoreDictFetchValue(server.pubsub_channels, 0, c->argv[j]);
addReplyBulk(c,c->argv[j]);
addReplyLongLong(c, d ? dictSize(d) : 0);
@ -713,35 +668,33 @@ NULL
{
/* PUBSUB SHARDCHANNELS */
sds pat = (c->argc == 2) ? NULL : c->argv[2]->ptr;
channelList(c,pat,server.pubsubshard_channels,server.cluster_enabled);
channelList(c,pat,server.pubsubshard_channels);
} else if (!strcasecmp(c->argv[1]->ptr,"shardnumsub") && c->argc >= 2) {
/* PUBSUB SHARDNUMSUB [ShardChannel_1 ... ShardChannel_N] */
int j;
addReplyArrayLen(c, (c->argc-2)*2);
for (j = 2; j < c->argc; j++) {
unsigned int slot = calculateKeySlot(c->argv[j]->ptr);
dict *d = server.pubsubshard_channels[slot];
dict *clients = d ? dictFetchValue(d, c->argv[j]) : NULL;
dict *clients = kvstoreDictFetchValue(server.pubsubshard_channels, slot, c->argv[j]);
addReplyBulk(c,c->argv[j]);
addReplyLongLong(c, d ? dictSize(clients) : 0);
addReplyLongLong(c, clients ? dictSize(clients) : 0);
}
} else {
addReplySubcommandSyntaxError(c);
}
}
void channelList(client *c, sds pat, dict **pubsub_channels, int is_sharded) {
void channelList(client *c, sds pat, kvstore *pubsub_channels) {
long mblen = 0;
void *replylen;
unsigned int slot_cnt = is_sharded ? CLUSTER_SLOTS : 1;
unsigned int slot_cnt = kvstoreNumDicts(pubsub_channels);
replylen = addReplyDeferredLen(c);
for (unsigned int i = 0; i < slot_cnt; i++) {
if (pubsub_channels[i] == NULL) {
if (!kvstoreDictSize(pubsub_channels, i))
continue;
}
dictIterator *di = dictGetIterator(pubsub_channels[i]);
dictIterator *di = kvstoreDictGetIterator(pubsub_channels, i);
dictEntry *de;
while((de = dictNext(di)) != NULL) {
robj *cobj = dictGetKey(de);
@ -805,3 +758,9 @@ size_t pubsubMemOverhead(client *c) {
mem += dictMemUsage(c->pubsubshard_channels);
return mem;
}
int pubsubTotalSubscriptions(void) {
return dictSize(server.pubsub_patterns) +
kvstoreSize(server.pubsub_channels) +
kvstoreSize(server.pubsubshard_channels);
}


@ -1301,12 +1301,12 @@ ssize_t rdbSaveDb(rio *rdb, int dbid, int rdbflags, long *key_counter) {
dictEntry *de;
ssize_t written = 0;
ssize_t res;
dbIterator *dbit = NULL;
kvstoreIterator *kvs_it = NULL;
static long long info_updated_time = 0;
char *pname = (rdbflags & RDBFLAGS_AOF_PREAMBLE) ? "AOF rewrite" : "RDB";
redisDb *db = server.db + dbid;
unsigned long long int db_size = dbSize(db, DB_MAIN);
unsigned long long int db_size = kvstoreSize(db->keys);
if (db_size == 0) return 0;
/* Write the SELECT DB opcode */
@ -1316,7 +1316,7 @@ ssize_t rdbSaveDb(rio *rdb, int dbid, int rdbflags, long *key_counter) {
written += res;
/* Write the RESIZE DB opcode. */
unsigned long long expires_size = dbSize(db, DB_EXPIRES);
unsigned long long expires_size = kvstoreSize(db->expires);
if ((res = rdbSaveType(rdb,RDB_OPCODE_RESIZEDB)) < 0) goto werr;
written += res;
if ((res = rdbSaveLen(rdb,db_size)) < 0) goto werr;
@ -1324,20 +1324,20 @@ ssize_t rdbSaveDb(rio *rdb, int dbid, int rdbflags, long *key_counter) {
if ((res = rdbSaveLen(rdb,expires_size)) < 0) goto werr;
written += res;
dbit = dbIteratorInit(db, DB_MAIN);
kvs_it = kvstoreIteratorInit(db->keys);
int last_slot = -1;
/* Iterate this DB writing every entry */
while ((de = dbIteratorNext(dbit)) != NULL) {
int curr_slot = dbIteratorGetCurrentSlot(dbit);
while ((de = kvstoreIteratorNext(kvs_it)) != NULL) {
int curr_slot = kvstoreIteratorGetCurrentDictIndex(kvs_it);
/* Save slot info. */
if (server.cluster_enabled && curr_slot != last_slot) {
if ((res = rdbSaveType(rdb, RDB_OPCODE_SLOT_INFO)) < 0) goto werr;
written += res;
if ((res = rdbSaveLen(rdb, curr_slot)) < 0) goto werr;
written += res;
if ((res = rdbSaveLen(rdb, dictSize(db->dict[curr_slot]))) < 0) goto werr;
if ((res = rdbSaveLen(rdb, kvstoreDictSize(db->keys, curr_slot))) < 0) goto werr;
written += res;
if ((res = rdbSaveLen(rdb, dictSize(db->expires[curr_slot]))) < 0) goto werr;
if ((res = rdbSaveLen(rdb, kvstoreDictSize(db->expires, curr_slot))) < 0) goto werr;
written += res;
last_slot = curr_slot;
}
@ -1368,11 +1368,11 @@ ssize_t rdbSaveDb(rio *rdb, int dbid, int rdbflags, long *key_counter) {
}
}
}
dbReleaseIterator(dbit);
kvstoreIteratorRelease(kvs_it);
return written;
werr:
if (dbit) dbReleaseIterator(dbit);
if (kvs_it) kvstoreIteratorRelease(kvs_it);
return -1;
}
@ -3027,7 +3027,6 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
return retval;
}
/* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned,
* otherwise C_ERR is returned.
* The rdb_loading_ctx argument holds objects to which the rdb will be loaded to,
@ -3131,8 +3130,8 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
continue; /* Ignore gracefully. */
}
/* In cluster mode we resize individual slot specific dictionaries based on the number of keys that slot holds. */
dictExpand(db->dict[slot_id], slot_size);
dictExpand(db->expires[slot_id], expires_slot_size);
kvstoreDictExpand(db->keys, slot_id, slot_size);
kvstoreDictExpand(db->expires, slot_id, expires_slot_size);
should_expand_db = 0;
continue; /* Read next opcode. */
} else if (type == RDB_OPCODE_AUX) {
@ -3266,8 +3265,8 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
/* If there is no slot info, it means that it's either not cluster mode or we are trying to load a legacy RDB file.
* In this case we want to estimate the number of keys per slot and resize accordingly. */
if (should_expand_db) {
dbExpand(db, db_size, DB_MAIN, 0);
dbExpand(db, expires_size, DB_EXPIRES, 0);
dbExpand(db, db_size, 0);
dbExpandExpires(db, expires_size, 0);
should_expand_db = 0;
}


@ -436,63 +436,6 @@ int dictResizeAllowed(size_t moreMem, double usedRatio) {
}
}
/* Adds dictionary to the rehashing list, which allows us
* to quickly find rehash targets during incremental rehashing.
*
* Updates the bucket count in cluster-mode for the given dictionary in a DB, bucket count
* incremented with the new ht size during the rehashing phase. In non-cluster mode,
* bucket count can be retrieved directly from single dict bucket. */
void dictRehashingStarted(dict *d, dbKeyType keyType) {
dbDictMetadata *metadata = (dbDictMetadata *)dictMetadata(d);
listAddNodeTail(server.rehashing, d);
metadata->rehashing_node = listLast(server.rehashing);
if (!server.cluster_enabled) return;
unsigned long long from, to;
dictRehashingInfo(d, &from, &to);
server.db[0].sub_dict[keyType].bucket_count += to; /* Started rehashing (Add the new ht size) */
}
/* Remove dictionary from the rehashing list.
*
* Updates the bucket count for the given dictionary in a DB. It removes
* the old ht size of the dictionary from the total sum of buckets for a DB. */
void dictRehashingCompleted(dict *d, dbKeyType keyType) {
dbDictMetadata *metadata = (dbDictMetadata *)dictMetadata(d);
if (metadata->rehashing_node) {
listDelNode(server.rehashing, metadata->rehashing_node);
metadata->rehashing_node = NULL;
}
if (!server.cluster_enabled) return;
unsigned long long from, to;
dictRehashingInfo(d, &from, &to);
server.db[0].sub_dict[keyType].bucket_count -= from; /* Finished rehashing (Remove the old ht size) */
}
void dbDictRehashingStarted(dict *d) {
dictRehashingStarted(d, DB_MAIN);
}
void dbDictRehashingCompleted(dict *d) {
dictRehashingCompleted(d, DB_MAIN);
}
void dbExpiresRehashingStarted(dict *d) {
dictRehashingStarted(d, DB_EXPIRES);
}
void dbExpiresRehashingCompleted(dict *d) {
dictRehashingCompleted(d, DB_EXPIRES);
}
/* Returns the size of the DB dict metadata in bytes. */
size_t dbDictMetadataSize(dict *d) {
UNUSED(d);
/* NOTICE: this also affects overhead_ht_main and overhead_ht_expires in getMemoryOverheadData. */
return sizeof(dbDictMetadata);
}
/* Generic hash table type where keys are Redis Objects, Values
* dummy pointers. */
dictType objectKeyPointerValueDictType = {
@ -524,6 +467,8 @@ dictType setDictType = {
NULL, /* val dup */
dictSdsKeyCompare, /* key compare */
dictSdsDestructor, /* key destructor */
NULL, /* val destructor */
NULL, /* allow to expand */
.no_value = 1, /* no values in this dict */
.keys_are_odd = 1 /* an SDS string is always an odd pointer */
};
@ -536,7 +481,7 @@ dictType zsetDictType = {
dictSdsKeyCompare, /* key compare */
NULL, /* Note: SDS string shared & freed by skiplist */
NULL, /* val destructor */
NULL /* allow to expand */
NULL, /* allow to expand */
};
/* Db->dict, keys are sds strings, vals are Redis objects. */
@ -548,9 +493,6 @@ dictType dbDictType = {
dictSdsDestructor, /* key destructor */
dictObjectDestructor, /* val destructor */
dictResizeAllowed, /* allow to resize */
dbDictRehashingStarted,
dbDictRehashingCompleted,
dbDictMetadataSize,
};
/* Db->expires */
@ -562,9 +504,6 @@ dictType dbExpiresDictType = {
NULL, /* key destructor */
NULL, /* val destructor */
dictResizeAllowed, /* allow to resize */
dbExpiresRehashingStarted,
dbExpiresRehashingCompleted,
dbDictMetadataSize,
};
/* Command table. sds string -> command struct pointer. */
@ -586,7 +525,7 @@ dictType hashDictType = {
dictSdsKeyCompare, /* key compare */
dictSdsDestructor, /* key destructor */
dictSdsDestructor, /* val destructor */
NULL /* allow to expand */
NULL, /* allow to expand */
};
/* Dict type without destructor */
@ -693,53 +632,6 @@ dictType clientDictType = {
.no_value = 1 /* no values in this dict */
};
/* In cluster-enabled setup, this method traverses through all main/expires dictionaries (CLUSTER_SLOTS)
* and triggers a resize if the percentage of used buckets in the HT reaches (100 / HASHTABLE_MIN_FILL)
* we shrink the hash table to save memory, or expand the hash when the percentage of used buckets reached
* 100.
*
* In non cluster-enabled setup, it resize main/expires dictionary based on the same condition described above. */
void tryResizeHashTables(int dbid) {
redisDb *db = &server.db[dbid];
int dicts_per_call = min(CRON_DICTS_PER_DB, db->dict_count);
for (dbKeyType subdict = DB_MAIN; subdict <= DB_EXPIRES; subdict++) {
for (int i = 0; i < dicts_per_call; i++) {
int slot = db->sub_dict[subdict].resize_cursor;
dict *d = (subdict == DB_MAIN ? db->dict[slot] : db->expires[slot]);
if (dictShrinkIfNeeded(d) == DICT_ERR) {
dictExpandIfNeeded(d);
}
db->sub_dict[subdict].resize_cursor = (slot + 1) % db->dict_count;
}
}
}
/* Our hash table implementation performs rehashing incrementally while
* we write/read from the hash table. Still if the server is idle, the hash
* table will use two tables for a long time. So we try to use 1 millisecond
* of CPU time at every call of this function to perform some rehashing.
*
* The function returns 1 if some rehashing was performed, otherwise 0
* is returned. */
int incrementallyRehash(void) {
if (listLength(server.rehashing) == 0) return 0;
serverLog(LL_DEBUG,"Rehashing list length: %lu", listLength(server.rehashing));
/* Our goal is to rehash as many dictionaries as we can before reaching predefined threshold,
* after each dictionary completes rehashing, it removes itself from the list. */
listNode *node;
monotime timer;
elapsedStart(&timer);
while ((node = listFirst(server.rehashing))) {
uint64_t elapsed_us = elapsedUs(timer);
if (elapsed_us >= INCREMENTAL_REHASHING_THRESHOLD_US) {
break; /* Reached the time limit. */
}
dictRehashMicroseconds(listNodeValue(node), INCREMENTAL_REHASHING_THRESHOLD_US - elapsed_us);
}
return 1;
}
/* This function is called once a background process of some kind terminates,
* as we want to avoid resizing the hash tables when there is a child in order
* to play well with copy-on-write (otherwise when a resize happens lots of
@ -1179,21 +1071,33 @@ void databasesCron(void) {
* DB we'll be able to start from the successive in the next
* cron loop iteration. */
static unsigned int resize_db = 0;
static unsigned int rehash_db = 0;
int dbs_per_call = CRON_DBS_PER_CALL;
int j;
/* Don't test more DBs than we have. */
if (dbs_per_call > server.dbnum) dbs_per_call = server.dbnum;
/* Resize */
for (j = 0; j < dbs_per_call; j++) {
tryResizeHashTables(resize_db % server.dbnum);
redisDb *db = &server.db[resize_db % server.dbnum];
kvstoreTryResizeDicts(db->keys, CRON_DICTS_PER_DB);
kvstoreTryResizeDicts(db->expires, CRON_DICTS_PER_DB);
resize_db++;
}
/* Rehash */
if (server.activerehashing) {
incrementallyRehash();
uint64_t elapsed_us = 0;
for (j = 0; j < dbs_per_call; j++) {
redisDb *db = &server.db[rehash_db % server.dbnum];
/* Hand each kvstore only the budget that remains. */
elapsed_us += kvstoreIncrementallyRehash(db->keys, INCREMENTAL_REHASHING_THRESHOLD_US - elapsed_us);
if (elapsed_us >= INCREMENTAL_REHASHING_THRESHOLD_US)
break;
elapsed_us += kvstoreIncrementallyRehash(db->expires, INCREMENTAL_REHASHING_THRESHOLD_US - elapsed_us);
if (elapsed_us >= INCREMENTAL_REHASHING_THRESHOLD_US)
break;
rehash_db++;
}
}
}
}
@ -1449,9 +1353,9 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
for (j = 0; j < server.dbnum; j++) {
long long size, used, vkeys;
size = dbBuckets(&server.db[j], DB_MAIN);
used = dbSize(&server.db[j], DB_MAIN);
vkeys = dbSize(&server.db[j], DB_EXPIRES);
size = kvstoreBuckets(server.db[j].keys);
used = kvstoreSize(server.db[j].keys);
vkeys = kvstoreSize(server.db[j].expires);
if (used || vkeys) {
serverLog(LL_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
}
@ -2669,17 +2573,6 @@ void makeThreadKillable(void) {
pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);
}
/* When adding fields, please check the initTempDb related logic. */
void initDbState(redisDb *db){
for (dbKeyType subdict = DB_MAIN; subdict <= DB_EXPIRES; subdict++) {
db->sub_dict[subdict].non_empty_slots = 0;
db->sub_dict[subdict].key_count = 0;
db->sub_dict[subdict].resize_cursor = 0;
db->sub_dict[subdict].slot_size_index = server.cluster_enabled ? zcalloc(sizeof(unsigned long long) * (CLUSTER_SLOTS + 1)) : NULL;
db->sub_dict[subdict].bucket_count = 0;
}
}
void initServer(void) {
int j;
@ -2755,10 +2648,10 @@ void initServer(void) {
server.db = zmalloc(sizeof(redisDb)*server.dbnum);
/* Create the Redis databases, and initialize other internal state. */
int slot_count = (server.cluster_enabled) ? CLUSTER_SLOTS : 1;
for (j = 0; j < server.dbnum; j++) {
server.db[j].dict = dictCreateMultiple(&dbDictType, slot_count);
server.db[j].expires = dictCreateMultiple(&dbExpiresDictType,slot_count);
int slot_count_bits = (server.cluster_enabled) ? CLUSTER_SLOT_MASK_BITS : 0;
for (j = 0; j < server.dbnum; j++) {
server.db[j].keys = kvstoreCreate(&dbDictType, slot_count_bits, KVSTORE_ALLOCATE_DICTS_ON_DEMAND);
server.db[j].expires = kvstoreCreate(&dbExpiresDictType, slot_count_bits, KVSTORE_ALLOCATE_DICTS_ON_DEMAND);
server.db[j].expires_cursor = 0;
server.db[j].blocking_keys = dictCreate(&keylistDictType);
server.db[j].blocking_keys_unblock_on_nokey = dictCreate(&objectKeyPointerValueDictType);
@ -2767,16 +2660,15 @@ void initServer(void) {
server.db[j].id = j;
server.db[j].avg_ttl = 0;
server.db[j].defrag_later = listCreate();
server.db[j].dict_count = slot_count;
initDbState(&server.db[j]);
listSetFreeMethod(server.db[j].defrag_later,(void (*)(void*))sdsfree);
}
server.rehashing = listCreate();
evictionPoolAlloc(); /* Initialize the LRU keys pool. */
server.pubsub_channels = dictCreate(&objToDictDictType);
/* Note that server.pubsub_channels was chosen to be a kvstore (with only one dict, which
* seems odd) just to make the code cleaner by making it the same type as server.pubsubshard_channels
* (which has to be kvstore), see pubsubtype.serverPubSubChannels */
server.pubsub_channels = kvstoreCreate(&objToDictDictType, 0, KVSTORE_ALLOCATE_DICTS_ON_DEMAND);
server.pubsub_patterns = dictCreate(&objToDictDictType);
server.pubsubshard_channels = zcalloc(sizeof(dict *) * slot_count);
server.shard_channel_count = 0;
server.pubsubshard_channels = kvstoreCreate(&objToDictDictType, slot_count_bits, KVSTORE_ALLOCATE_DICTS_ON_DEMAND | KVSTORE_FREE_EMPTY_DICTS);
server.pubsub_clients = 0;
server.cronloops = 0;
server.in_exec = 0;
@ -5906,9 +5798,9 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) {
"current_eviction_exceeded_time:%lld\r\n", current_eviction_exceeded_time / 1000,
"keyspace_hits:%lld\r\n", server.stat_keyspace_hits,
"keyspace_misses:%lld\r\n", server.stat_keyspace_misses,
"pubsub_channels:%ld\r\n", dictSize(server.pubsub_channels),
"pubsub_channels:%llu\r\n", kvstoreSize(server.pubsub_channels),
"pubsub_patterns:%lu\r\n", dictSize(server.pubsub_patterns),
"pubsubshard_channels:%llu\r\n", server.shard_channel_count,
"pubsubshard_channels:%llu\r\n", kvstoreSize(server.pubsubshard_channels),
"latest_fork_usec:%lld\r\n", server.stat_fork_time,
"total_forks:%lld\r\n", server.stat_total_forks,
"migrate_cached_sockets:%ld\r\n", dictSize(server.migrate_cached_sockets),
@ -6135,8 +6027,8 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) {
for (j = 0; j < server.dbnum; j++) {
long long keys, vkeys;
keys = dbSize(&server.db[j], DB_MAIN);
vkeys = dbSize(&server.db[j], DB_EXPIRES);
keys = kvstoreSize(server.db[j].keys);
vkeys = kvstoreSize(server.db[j].expires);
if (keys || vkeys) {
info = sdscatprintf(info,
"db%d:keys=%lld,expires=%lld,avg_ttl=%lld\r\n",


@ -67,6 +67,7 @@ typedef long long ustime_t; /* microsecond time type. */
#include "ae.h" /* Event driven programming library */
#include "sds.h" /* Dynamic safe strings */
#include "dict.h" /* Hash tables */
#include "kvstore.h" /* Slot-based hash table */
#include "adlist.h" /* Linked lists */
#include "zmalloc.h" /* total memory usage aware version of malloc/free */
#include "anet.h" /* Networking the easy way */
@ -970,31 +971,12 @@ typedef struct replBufBlock {
char buf[];
} replBufBlock;
/* When adding fields, please check the swap db related logic. */
typedef struct dbDictState {
int resize_cursor; /* Cron job uses this cursor to gradually resize all dictionaries. */
int non_empty_slots; /* The number of non-empty slots. */
unsigned long long key_count; /* Total number of keys in this DB. */
unsigned long long bucket_count; /* Total number of buckets in this DB across dictionaries (only used for cluster-enabled). */
unsigned long long *slot_size_index; /* Binary indexed tree (BIT) that describes cumulative key frequencies up until given slot. */
} dbDictState;
typedef enum dbKeyType {
DB_MAIN,
DB_EXPIRES
} dbKeyType;
/* Dict metadata for database, used for record the position in rehashing list. */
typedef struct dbDictMetadata {
listNode *rehashing_node; /* list node in rehashing list */
} dbDictMetadata;
/* Redis database representation. There are multiple databases identified
* by integers from 0 (the default database) up to the max configured
* database. The database number is the 'id' field in the structure. */
typedef struct redisDb {
dict **dict; /* The keyspace for this DB */
dict **expires; /* Timeout of keys with a timeout set */
kvstore *keys; /* The keyspace for this DB */
kvstore *expires; /* Timeout of keys with a timeout set */
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/
dict *blocking_keys_unblock_on_nokey; /* Keys with clients waiting for
* data, and should be unblocked if key is deleted (XREADGROUP).
@@ -1005,8 +987,6 @@ typedef struct redisDb {
long long avg_ttl; /* Average TTL, just for stats */
unsigned long expires_cursor; /* Cursor of the active expire cycle. */
list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */
int dict_count; /* Indicates total number of dictionaries owned by this DB, 1 dict per slot in cluster mode. */
dbDictState sub_dict[2]; /* Metadata for main and expires dictionaries */
} redisDb;
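All of the per-database bookkeeping deleted above (dbDictState, dbDictMetadata, the dict_count and sub_dict fields) is presumably absorbed by the kvstore itself. A hypothetical sketch of that internal state; field names are illustrative, not quoted from kvstore.c:

```
struct kvstore {
    dictType *dtype;
    dict **dicts;                        /* 2^num_dicts_bits dict pointers */
    int num_dicts_bits;
    int flags;                           /* KVSTORE_ALLOCATE_DICTS_ON_DEMAND, ... */
    int resize_cursor;                   /* cron cursor for gradual resizing */
    int non_empty_dicts;                 /* was dbDictState.non_empty_slots */
    unsigned long long key_count;        /* was dbDictState.key_count */
    unsigned long long bucket_count;     /* was dbDictState.bucket_count */
    unsigned long long *dict_size_index; /* BIT of cumulative key counts,
                                          * was dbDictState.slot_size_index */
    list *rehashing;                     /* replaces server.rehashing: dicts
                                          * currently rehashing, tracked via
                                          * the per-dict metadata node */
};
```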
/* forward declaration for functions ctx */
@@ -1574,7 +1554,6 @@ struct redisServer {
int hz; /* serverCron() calls frequency in hertz */
int in_fork_child; /* indication that this is a fork child */
redisDb *db;
list *rehashing; /* List of dictionaries in DBs that are currently rehashing. */
dict *commands; /* Command table */
dict *orig_commands; /* Command table before command renaming. */
aeEventLoop *el;
@@ -1994,12 +1973,11 @@ struct redisServer {
size_t blocking_op_nesting; /* Nesting level of blocking operation, used to reset blocked_last_cron. */
long long blocked_last_cron; /* Indicate the mstime of the last time we did cron jobs from a blocking operation */
/* Pubsub */
dict *pubsub_channels; /* Map channels to list of subscribed clients */
kvstore *pubsub_channels; /* Map channels to list of subscribed clients */
dict *pubsub_patterns; /* A dict of pubsub_patterns */
int notify_keyspace_events; /* Events to propagate via Pub/Sub. This is an
xor of NOTIFY_... flags. */
dict **pubsubshard_channels; /* Map shard channels in every slot to list of subscribed clients */
unsigned long long shard_channel_count;
kvstore *pubsubshard_channels; /* Map shard channels in every slot to list of subscribed clients */
unsigned int pubsub_clients; /* # of clients in Pub/Sub mode */
/* Cluster */
int cluster_enabled; /* Is cluster enabled? */
@@ -2445,20 +2423,6 @@ typedef struct {
unsigned char *lpi; /* listpack iterator */
} setTypeIterator;
typedef struct dbIterator dbIterator;
/* DB iterator specific functions */
dbIterator *dbIteratorInit(redisDb *db, dbKeyType keyType);
void dbReleaseIterator(dbIterator *dbit);
dict *dbIteratorNextDict(dbIterator *dbit);
dict *dbGetDictFromIterator(dbIterator *dbit);
int dbIteratorGetCurrentSlot(dbIterator *dbit);
dictEntry *dbIteratorNext(dbIterator *iter);
/* SCAN specific commands for easy cursor manipulation, shared between main code and modules. */
int getAndClearSlotIdFromCursor(unsigned long long *cursor);
void addSlotIdToCursor(int slot, unsigned long long *cursor);
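The deleted dbIterator API presumably maps one-to-one onto a kvstore-level iterator. A sketch of the replacement walk, with assumed function names (kvstoreIteratorInit/Next/Release do not appear in this excerpt):

```
kvstoreIterator *kvs_it = kvstoreIteratorInit(db->keys);
dictEntry *de;
while ((de = kvstoreIteratorNext(kvs_it)) != NULL) {
    sds key = dictGetKey(de);   /* visits each key across all slot dicts */
    (void)key;
}
kvstoreIteratorRelease(kvs_it);
```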
/* Structure to hold hash iteration abstraction. Note that iteration over
* hashes involves both fields and values. Because it is possible that
* not both are required, store pointers in the iterator to avoid
@@ -3143,21 +3107,16 @@ void dismissMemoryInChild(void);
#define RESTART_SERVER_GRACEFULLY (1<<0) /* Do proper shutdown. */
#define RESTART_SERVER_CONFIG_REWRITE (1<<1) /* CONFIG REWRITE before restart.*/
int restartServer(int flags, mstime_t delay);
unsigned long long int dbSize(redisDb *db, dbKeyType keyType);
int dbNonEmptySlots(redisDb *db, dbKeyType keyType);
int getKeySlot(sds key);
int calculateKeySlot(sds key);
unsigned long dbBuckets(redisDb *db, dbKeyType keyType);
size_t dbMemUsage(redisDb *db, dbKeyType keyType);
dictEntry *dbFind(redisDb *db, void *key, dbKeyType keyType);
unsigned long long dbScan(redisDb *db, dbKeyType keyType, unsigned long long cursor,
int onlyslot, dictScanFunction *fn,
int (dictScanValidFunction)(dict *d), void *privdata);
int dbExpand(const redisDb *db, uint64_t db_size, dbKeyType keyType, int try_expand);
unsigned long long cumulativeKeyCountRead(redisDb *db, int idx, dbKeyType keyType);
int getFairRandomSlot(redisDb *db, dbKeyType keyType);
int dbGetNextNonEmptySlot(redisDb *db, int slot, dbKeyType keyType);
int findSlotByKeyIndex(redisDb *db, unsigned long target, dbKeyType keyType);
/* kvstore wrappers */
int dbExpand(redisDb *db, uint64_t db_size, int try_expand);
int dbExpandExpires(redisDb *db, uint64_t db_size, int try_expand);
dictEntry *dbFind(redisDb *db, void *key);
dictEntry *dbFindExpires(redisDb *db, void *key);
unsigned long long dbSize(redisDb *db);
unsigned long long dbScan(redisDb *db, unsigned long long cursor, dictScanFunction *scan_cb, void *privdata);
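The replacement declarations are thin wrappers around the db's two kvstores. Plausible db.c bodies, assuming a kvstoreDictFind(kvs, didx, key) entry point (only kvstoreSize and getKeySlot appear in this diff):

```
unsigned long long dbSize(redisDb *db) {
    return kvstoreSize(db->keys);
}

dictEntry *dbFind(redisDb *db, void *key) {
    /* getKeySlot() picks the dict index: the key's cluster slot, or 0
     * when cluster mode is disabled. */
    return kvstoreDictFind(db->keys, getKeySlot(key), key);
}

dictEntry *dbFindExpires(redisDb *db, void *key) {
    return kvstoreDictFind(db->expires, getKeySlot(key), key);
}
```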
/* Set data type */
robj *setTypeCreate(sds value, size_t size_hint);
@@ -3214,6 +3173,7 @@ int serverPubsubSubscriptionCount(void);
int serverPubsubShardSubscriptionCount(void);
size_t pubsubMemOverhead(client *c);
void unmarkClientAsPubSub(client *c);
int pubsubTotalSubscriptions(void);
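pubsubTotalSubscriptions() is new in this header. One plausible definition, now that both channel maps are kvstores (the actual pubsub.c body is not part of this excerpt):

```
int pubsubTotalSubscriptions(void) {
    /* Patterns still live in a plain dict; both channel maps are kvstores. */
    return dictSize(server.pubsub_patterns) +
           kvstoreSize(server.pubsub_channels) +
           kvstoreSize(server.pubsubshard_channels);
}
```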
/* Keyspace events notification */
void notifyKeyspaceEvent(int type, char *event, robj *key, int dbid);
@@ -527,9 +527,9 @@ start_server {tags {"other external:skip"}} {
# Set a key to enable overhead display of db 0
r set a b
# The dict containing 128 keys must have expanded,
# its hash table itself takes a lot more than 200 bytes
# its hash table itself takes a lot more than 400 bytes
wait_for_condition 100 50 {
[get_overhead_hashtable_main] < 200
[get_overhead_hashtable_main] < 400
} else {
fail "dict did not resize in time"
}