redis/src/db.c

2714 lines
98 KiB
C

/*
* Copyright (c) 2009-Present, Redis Ltd.
* All rights reserved.
*
* Licensed under your choice of the Redis Source Available License 2.0
* (RSALv2) or the Server Side Public License v1 (SSPLv1).
*/
#include "server.h"
#include "cluster.h"
#include "atomicvar.h"
#include "latency.h"
#include "script.h"
#include "functions.h"
#include <signal.h>
#include <ctype.h>
#include "bio.h"
/*-----------------------------------------------------------------------------
* C-level DB API
*----------------------------------------------------------------------------*/
/* Flags for expireIfNeeded */
#define EXPIRE_FORCE_DELETE_EXPIRED 1
#define EXPIRE_AVOID_DELETE_EXPIRED 2
/* Return values for expireIfNeeded */
typedef enum {
KEY_VALID = 0, /* Could be volatile and not yet expired, non-volatile, or even non-existing key. */
KEY_EXPIRED, /* Logically expired but not yet deleted. */
KEY_DELETED /* The key was deleted now. */
} keyStatus;
keyStatus expireIfNeeded(redisDb *db, robj *key, int flags);
int keyIsExpired(redisDb *db, robj *key);
static void dbSetValue(redisDb *db, robj *key, robj *val, int overwrite, dictEntry *de);
/* Update LFU when an object is accessed.
* Firstly, decrement the counter if the decrement time is reached.
* Then logarithmically increment the counter, and update the access time. */
void updateLFU(robj *val) {
unsigned long counter = LFUDecrAndReturn(val);
counter = LFULogIncr(counter);
val->lru = (LFUGetTimeInMinutes()<<8) | counter;
}
/* Lookup a key for read or write operations, or return NULL if the key is not
* found in the specified DB. This function implements the functionality of
* lookupKeyRead(), lookupKeyWrite() and their ...WithFlags() variants.
*
* Side-effects of calling this function:
*
* 1. A key gets expired if it reached it's TTL.
* 2. The key's last access time is updated.
* 3. The global keys hits/misses stats are updated (reported in INFO).
* 4. If keyspace notifications are enabled, a "keymiss" notification is fired.
*
* Flags change the behavior of this command:
*
* LOOKUP_NONE (or zero): No special flags are passed.
* LOOKUP_NOTOUCH: Don't alter the last access time of the key.
* LOOKUP_NONOTIFY: Don't trigger keyspace event on key miss.
* LOOKUP_NOSTATS: Don't increment key hits/misses counters.
* LOOKUP_WRITE: Prepare the key for writing (delete expired keys even on
* replicas, use separate keyspace stats and events (TODO)).
* LOOKUP_NOEXPIRE: Perform expiration check, but avoid deleting the key,
* so that we don't have to propagate the deletion.
*
* Note: this function also returns NULL if the key is logically expired but
* still existing, in case this is a replica and the LOOKUP_WRITE is not set.
* Even if the key expiry is master-driven, we can correctly report a key is
* expired on replicas even if the master is lagging expiring our key via DELs
* in the replication link. */
robj *lookupKey(redisDb *db, robj *key, int flags) {
dictEntry *de = dbFind(db, key->ptr);
robj *val = NULL;
if (de) {
val = dictGetVal(de);
/* Forcing deletion of expired keys on a replica makes the replica
* inconsistent with the master. We forbid it on readonly replicas, but
* we have to allow it on writable replicas to make write commands
* behave consistently.
*
* It's possible that the WRITE flag is set even during a readonly
* command, since the command may trigger events that cause modules to
* perform additional writes. */
int is_ro_replica = server.masterhost && server.repl_slave_ro;
int expire_flags = 0;
if (flags & LOOKUP_WRITE && !is_ro_replica)
expire_flags |= EXPIRE_FORCE_DELETE_EXPIRED;
if (flags & LOOKUP_NOEXPIRE)
expire_flags |= EXPIRE_AVOID_DELETE_EXPIRED;
if (expireIfNeeded(db, key, expire_flags) != KEY_VALID) {
/* The key is no longer valid. */
val = NULL;
}
}
if (val) {
/* Update the access time for the ageing algorithm.
* Don't do it if we have a saving child, as this will trigger
* a copy on write madness. */
if (server.current_client && server.current_client->flags & CLIENT_NO_TOUCH &&
server.current_client->cmd->proc != touchCommand)
flags |= LOOKUP_NOTOUCH;
if (!hasActiveChildProcess() && !(flags & LOOKUP_NOTOUCH)){
if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
updateLFU(val);
} else {
val->lru = LRU_CLOCK();
}
}
if (!(flags & (LOOKUP_NOSTATS | LOOKUP_WRITE)))
server.stat_keyspace_hits++;
/* TODO: Use separate hits stats for WRITE */
} else {
if (!(flags & (LOOKUP_NONOTIFY | LOOKUP_WRITE)))
notifyKeyspaceEvent(NOTIFY_KEY_MISS, "keymiss", key, db->id);
if (!(flags & (LOOKUP_NOSTATS | LOOKUP_WRITE)))
server.stat_keyspace_misses++;
/* TODO: Use separate misses stats and notify event for WRITE */
}
return val;
}
/* Lookup a key for read operations, or return NULL if the key is not found
* in the specified DB.
*
* This API should not be used when we write to the key after obtaining
* the object linked to the key, but only for read only operations.
*
* This function is equivalent to lookupKey(). The point of using this function
* rather than lookupKey() directly is to indicate that the purpose is to read
* the key. */
robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) {
serverAssert(!(flags & LOOKUP_WRITE));
return lookupKey(db, key, flags);
}
/* Like lookupKeyReadWithFlags(), but does not use any flag, which is the
* common case. */
robj *lookupKeyRead(redisDb *db, robj *key) {
return lookupKeyReadWithFlags(db,key,LOOKUP_NONE);
}
/* Lookup a key for write operations, and as a side effect, if needed, expires
* the key if its TTL is reached. It's equivalent to lookupKey() with the
* LOOKUP_WRITE flag added.
*
* Returns the linked value object if the key exists or NULL if the key
* does not exist in the specified DB. */
robj *lookupKeyWriteWithFlags(redisDb *db, robj *key, int flags) {
return lookupKey(db, key, flags | LOOKUP_WRITE);
}
robj *lookupKeyWrite(redisDb *db, robj *key) {
return lookupKeyWriteWithFlags(db, key, LOOKUP_NONE);
}
robj *lookupKeyReadOrReply(client *c, robj *key, robj *reply) {
robj *o = lookupKeyRead(c->db, key);
if (!o) addReplyOrErrorObject(c, reply);
return o;
}
robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply) {
robj *o = lookupKeyWrite(c->db, key);
if (!o) addReplyOrErrorObject(c, reply);
return o;
}
/* Add the key to the DB. It's up to the caller to increment the reference
* counter of the value if needed.
*
* If the update_if_existing argument is false, the program is aborted
* if the key already exists, otherwise, it can fall back to dbOverwrite. */
static void dbAddInternal(redisDb *db, robj *key, robj *val, int update_if_existing) {
dictEntry *existing;
int slot = getKeySlot(key->ptr);
dictEntry *de = kvstoreDictAddRaw(db->keys, slot, key->ptr, &existing);
if (update_if_existing && existing) {
dbSetValue(db, key, val, 1, existing);
return;
}
serverAssertWithInfo(NULL, key, de != NULL);
kvstoreDictSetKey(db->keys, slot, de, sdsdup(key->ptr));
initObjectLRUOrLFU(val);
kvstoreDictSetVal(db->keys, slot, de, val);
signalKeyAsReady(db, key, val->type);
notifyKeyspaceEvent(NOTIFY_NEW,"new",key,db->id);
}
void dbAdd(redisDb *db, robj *key, robj *val) {
dbAddInternal(db, key, val, 0);
}
/* Returns key's hash slot when cluster mode is enabled, or 0 when disabled.
* The only difference between this function and getKeySlot, is that it's not using cached key slot from the current_client
* and always calculates CRC hash.
* This is useful when slot needs to be calculated for a key that user didn't request for, such as in case of eviction. */
int calculateKeySlot(sds key) {
return server.cluster_enabled ? keyHashSlot(key, (int) sdslen(key)) : 0;
}
/* Return slot-specific dictionary for key based on key's hash slot when cluster mode is enabled, else 0.*/
int getKeySlot(sds key) {
/* This is performance optimization that uses pre-set slot id from the current command,
* in order to avoid calculation of the key hash.
* This optimization is only used when current_client flag `CLIENT_EXECUTING_COMMAND` is set.
* It only gets set during the execution of command under `call` method. Other flows requesting
* the key slot would fallback to calculateKeySlot.
*/
if (server.current_client && server.current_client->slot >= 0 && server.current_client->flags & CLIENT_EXECUTING_COMMAND) {
debugServerAssertWithInfo(server.current_client, NULL, calculateKeySlot(key)==server.current_client->slot);
return server.current_client->slot;
}
return calculateKeySlot(key);
}
/* This is a special version of dbAdd() that is used only when loading
* keys from the RDB file: the key is passed as an SDS string that is
* retained by the function (and not freed by the caller).
*
* Moreover this function will not abort if the key is already busy, to
* give more control to the caller, nor will signal the key as ready
* since it is not useful in this context.
*
* The function returns 1 if the key was added to the database, taking
* ownership of the SDS string, otherwise 0 is returned, and is up to the
* caller to free the SDS string. */
int dbAddRDBLoad(redisDb *db, sds key, robj *val) {
int slot = getKeySlot(key);
dictEntry *de = kvstoreDictAddRaw(db->keys, slot, key, NULL);
if (de == NULL) return 0;
initObjectLRUOrLFU(val);
kvstoreDictSetVal(db->keys, slot, de, val);
return 1;
}
/* Overwrite an existing key with a new value. Incrementing the reference
* count of the new value is up to the caller.
* This function does not modify the expire time of the existing key.
*
* The 'overwrite' flag is an indication whether this is done as part of a
* complete replacement of their key, which can be thought as a deletion and
* replacement (in which case we need to emit deletion signals), or just an
* update of a value of an existing key (when false).
*
* The dictEntry input is optional, can be used if we already have one.
*
* The program is aborted if the key was not already present. */
static void dbSetValue(redisDb *db, robj *key, robj *val, int overwrite, dictEntry *de) {
int slot = getKeySlot(key->ptr);
if (!de) de = kvstoreDictFind(db->keys, slot, key->ptr);
serverAssertWithInfo(NULL,key,de != NULL);
robj *old = dictGetVal(de);
val->lru = old->lru;
if (overwrite) {
/* RM_StringDMA may call dbUnshareStringValue which may free val, so we
* need to incr to retain old */
incrRefCount(old);
/* Although the key is not really deleted from the database, we regard
* overwrite as two steps of unlink+add, so we still need to call the unlink
* callback of the module. */
moduleNotifyKeyUnlink(key,old,db->id,DB_FLAG_KEY_OVERWRITE);
/* We want to try to unblock any module clients or clients using a blocking XREADGROUP */
signalDeletedKeyAsReady(db,key,old->type);
decrRefCount(old);
/* Because of RM_StringDMA, old may be changed, so we need get old again */
old = dictGetVal(de);
}
kvstoreDictSetVal(db->keys, slot, de, val);
if (server.lazyfree_lazy_server_del) {
freeObjAsync(key,old,db->id);
} else {
decrRefCount(old);
}
}
/* Replace an existing key with a new value, we just replace value and don't
* emit any events */
void dbReplaceValue(redisDb *db, robj *key, robj *val) {
dbSetValue(db, key, val, 0, NULL);
}
/* High level Set operation. This function can be used in order to set
* a key, whatever it was existing or not, to a new object.
*
* 1) The ref count of the value object is incremented.
* 2) clients WATCHing for the destination key notified.
* 3) The expire time of the key is reset (the key is made persistent),
* unless 'SETKEY_KEEPTTL' is enabled in flags.
* 4) The key lookup can take place outside this interface outcome will be
* delivered with 'SETKEY_ALREADY_EXIST' or 'SETKEY_DOESNT_EXIST'
*
* All the new keys in the database should be created via this interface.
* The client 'c' argument may be set to NULL if the operation is performed
* in a context where there is no clear client performing the operation. */
void setKey(client *c, redisDb *db, robj *key, robj *val, int flags) {
int keyfound = 0;
if (flags & SETKEY_ALREADY_EXIST)
keyfound = 1;
else if (flags & SETKEY_ADD_OR_UPDATE)
keyfound = -1;
else if (!(flags & SETKEY_DOESNT_EXIST))
keyfound = (lookupKeyWrite(db,key) != NULL);
if (!keyfound) {
dbAdd(db,key,val);
} else if (keyfound<0) {
dbAddInternal(db,key,val,1);
} else {
dbSetValue(db,key,val,1,NULL);
}
incrRefCount(val);
if (!(flags & SETKEY_KEEPTTL)) removeExpire(db,key);
if (!(flags & SETKEY_NO_SIGNAL)) signalModifiedKey(c,db,key);
}
/* Return a random key, in form of a Redis object.
* If there are no keys, NULL is returned.
*
* The function makes sure to return keys not already expired. */
robj *dbRandomKey(redisDb *db) {
dictEntry *de;
int maxtries = 100;
int allvolatile = kvstoreSize(db->keys) == kvstoreSize(db->expires);
while(1) {
sds key;
robj *keyobj;
int randomSlot = kvstoreGetFairRandomDictIndex(db->keys);
de = kvstoreDictGetFairRandomKey(db->keys, randomSlot);
if (de == NULL) return NULL;
key = dictGetKey(de);
keyobj = createStringObject(key,sdslen(key));
if (dbFindExpires(db, key)) {
if (allvolatile && server.masterhost && --maxtries == 0) {
/* If the DB is composed only of keys with an expire set,
* it could happen that all the keys are already logically
* expired in the slave, so the function cannot stop because
* expireIfNeeded() is false, nor it can stop because
* dictGetFairRandomKey() returns NULL (there are keys to return).
* To prevent the infinite loop we do some tries, but if there
* are the conditions for an infinite loop, eventually we
* return a key name that may be already expired. */
return keyobj;
}
if (expireIfNeeded(db,keyobj,0) != KEY_VALID) {
decrRefCount(keyobj);
continue; /* search for another key. This expired. */
}
}
return keyobj;
}
}
/* Helper for sync and async delete. */
int dbGenericDelete(redisDb *db, robj *key, int async, int flags) {
dictEntry **plink;
int table;
int slot = getKeySlot(key->ptr);
dictEntry *de = kvstoreDictTwoPhaseUnlinkFind(db->keys, slot, key->ptr, &plink, &table);
if (de) {
robj *val = dictGetVal(de);
/* RM_StringDMA may call dbUnshareStringValue which may free val, so we
* need to incr to retain val */
incrRefCount(val);
/* Tells the module that the key has been unlinked from the database. */
moduleNotifyKeyUnlink(key,val,db->id,flags);
/* We want to try to unblock any module clients or clients using a blocking XREADGROUP */
signalDeletedKeyAsReady(db,key,val->type);
/* We should call decr before freeObjAsync. If not, the refcount may be
* greater than 1, so freeObjAsync doesn't work */
decrRefCount(val);
if (async) {
/* Because of dbUnshareStringValue, the val in de may change. */
freeObjAsync(key, dictGetVal(de), db->id);
kvstoreDictSetVal(db->keys, slot, de, NULL);
}
/* Deleting an entry from the expires dict will not free the sds of
* the key, because it is shared with the main dictionary. */
kvstoreDictDelete(db->expires, slot, key->ptr);
kvstoreDictTwoPhaseUnlinkFree(db->keys, slot, de, plink, table);
return 1;
} else {
return 0;
}
}
/* Delete a key, value, and associated expiration entry if any, from the DB */
int dbSyncDelete(redisDb *db, robj *key) {
return dbGenericDelete(db, key, 0, DB_FLAG_KEY_DELETED);
}
/* Delete a key, value, and associated expiration entry if any, from the DB. If
* the value consists of many allocations, it may be freed asynchronously. */
int dbAsyncDelete(redisDb *db, robj *key) {
return dbGenericDelete(db, key, 1, DB_FLAG_KEY_DELETED);
}
/* This is a wrapper whose behavior depends on the Redis lazy free
* configuration. Deletes the key synchronously or asynchronously. */
int dbDelete(redisDb *db, robj *key) {
return dbGenericDelete(db, key, server.lazyfree_lazy_server_del, DB_FLAG_KEY_DELETED);
}
/* Prepare the string object stored at 'key' to be modified destructively
* to implement commands like SETBIT or APPEND.
*
* An object is usually ready to be modified unless one of the two conditions
* are true:
*
* 1) The object 'o' is shared (refcount > 1), we don't want to affect
* other users.
* 2) The object encoding is not "RAW".
*
* If the object is found in one of the above conditions (or both) by the
* function, an unshared / not-encoded copy of the string object is stored
* at 'key' in the specified 'db'. Otherwise the object 'o' itself is
* returned.
*
* USAGE:
*
* The object 'o' is what the caller already obtained by looking up 'key'
* in 'db', the usage pattern looks like this:
*
* o = lookupKeyWrite(db,key);
* if (checkType(c,o,OBJ_STRING)) return;
* o = dbUnshareStringValue(db,key,o);
*
* At this point the caller is ready to modify the object, for example
* using an sdscat() call to append some data, or anything else.
*/
robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o) {
serverAssert(o->type == OBJ_STRING);
if (o->refcount != 1 || o->encoding != OBJ_ENCODING_RAW) {
robj *decoded = getDecodedObject(o);
o = createRawStringObject(decoded->ptr, sdslen(decoded->ptr));
decrRefCount(decoded);
dbReplaceValue(db,key,o);
}
return o;
}
/* Remove all keys from the database(s) structure. The dbarray argument
* may not be the server main DBs (could be a temporary DB).
*
* The dbnum can be -1 if all the DBs should be emptied, or the specified
* DB index if we want to empty only a single database.
* The function returns the number of keys removed from the database(s). */
long long emptyDbStructure(redisDb *dbarray, int dbnum, int async,
void(callback)(dict*))
{
long long removed = 0;
int startdb, enddb;
if (dbnum == -1) {
startdb = 0;
enddb = server.dbnum-1;
} else {
startdb = enddb = dbnum;
}
for (int j = startdb; j <= enddb; j++) {
removed += kvstoreSize(dbarray[j].keys);
if (async) {
emptyDbAsync(&dbarray[j]);
} else {
kvstoreEmpty(dbarray[j].keys, callback);
kvstoreEmpty(dbarray[j].expires, callback);
}
/* Because all keys of database are removed, reset average ttl. */
dbarray[j].avg_ttl = 0;
dbarray[j].expires_cursor = 0;
}
return removed;
}
/* Remove all data (keys and functions) from all the databases in a
* Redis server. If callback is given the function is called from
* time to time to signal that work is in progress.
*
* The dbnum can be -1 if all the DBs should be flushed, or the specified
* DB number if we want to flush only a single Redis database number.
*
* Flags are be EMPTYDB_NO_FLAGS if no special flags are specified or
* EMPTYDB_ASYNC if we want the memory to be freed in a different thread
* and the function to return ASAP. EMPTYDB_NOFUNCTIONS can also be set
* to specify that we do not want to delete the functions.
*
* On success the function returns the number of keys removed from the
* database(s). Otherwise -1 is returned in the specific case the
* DB number is out of range, and errno is set to EINVAL. */
long long emptyData(int dbnum, int flags, void(callback)(dict*)) {
int async = (flags & EMPTYDB_ASYNC);
int with_functions = !(flags & EMPTYDB_NOFUNCTIONS);
RedisModuleFlushInfoV1 fi = {REDISMODULE_FLUSHINFO_VERSION,!async,dbnum};
long long removed = 0;
if (dbnum < -1 || dbnum >= server.dbnum) {
errno = EINVAL;
return -1;
}
/* Fire the flushdb modules event. */
moduleFireServerEvent(REDISMODULE_EVENT_FLUSHDB,
REDISMODULE_SUBEVENT_FLUSHDB_START,
&fi);
/* Make sure the WATCHed keys are affected by the FLUSH* commands.
* Note that we need to call the function while the keys are still
* there. */
signalFlushedDb(dbnum, async);
/* Empty redis database structure. */
removed = emptyDbStructure(server.db, dbnum, async, callback);
if (dbnum == -1) flushSlaveKeysWithExpireList();
if (with_functions) {
serverAssert(dbnum == -1);
functionsLibCtxClearCurrent(async);
}
/* Also fire the end event. Note that this event will fire almost
* immediately after the start event if the flush is asynchronous. */
moduleFireServerEvent(REDISMODULE_EVENT_FLUSHDB,
REDISMODULE_SUBEVENT_FLUSHDB_END,
&fi);
return removed;
}
/* Initialize temporary db on replica for use during diskless replication. */
redisDb *initTempDb(void) {
int slot_count_bits = 0;
int flags = KVSTORE_ALLOCATE_DICTS_ON_DEMAND;
if (server.cluster_enabled) {
slot_count_bits = CLUSTER_SLOT_MASK_BITS;
flags |= KVSTORE_FREE_EMPTY_DICTS;
}
redisDb *tempDb = zcalloc(sizeof(redisDb)*server.dbnum);
for (int i=0; i<server.dbnum; i++) {
tempDb[i].id = i;
tempDb[i].keys = kvstoreCreate(&dbDictType, slot_count_bits, flags);
tempDb[i].expires = kvstoreCreate(&dbExpiresDictType, slot_count_bits, flags);
}
return tempDb;
}
/* Discard tempDb, this can be slow (similar to FLUSHALL), but it's always async. */
void discardTempDb(redisDb *tempDb, void(callback)(dict*)) {
int async = 1;
/* Release temp DBs. */
emptyDbStructure(tempDb, -1, async, callback);
for (int i=0; i<server.dbnum; i++) {
kvstoreRelease(tempDb[i].keys);
kvstoreRelease(tempDb[i].expires);
}
zfree(tempDb);
}
int selectDb(client *c, int id) {
if (id < 0 || id >= server.dbnum)
return C_ERR;
c->db = &server.db[id];
return C_OK;
}
long long dbTotalServerKeyCount(void) {
long long total = 0;
int j;
for (j = 0; j < server.dbnum; j++) {
total += kvstoreSize(server.db[j].keys);
}
return total;
}
/*-----------------------------------------------------------------------------
* Hooks for key space changes.
*
* Every time a key in the database is modified the function
* signalModifiedKey() is called.
*
* Every time a DB is flushed the function signalFlushDb() is called.
*----------------------------------------------------------------------------*/
/* Note that the 'c' argument may be NULL if the key was modified out of
* a context of a client. */
void signalModifiedKey(client *c, redisDb *db, robj *key) {
touchWatchedKey(db,key);
trackingInvalidateKey(c,key,1);
}
void signalFlushedDb(int dbid, int async) {
int startdb, enddb;
if (dbid == -1) {
startdb = 0;
enddb = server.dbnum-1;
} else {
startdb = enddb = dbid;
}
for (int j = startdb; j <= enddb; j++) {
scanDatabaseForDeletedKeys(&server.db[j], NULL);
touchAllWatchedKeysInDb(&server.db[j], NULL);
}
trackingInvalidateKeysOnFlush(async);
/* Changes in this method may take place in swapMainDbWithTempDb as well,
* where we execute similar calls, but with subtle differences as it's
* not simply flushing db. */
}
/*-----------------------------------------------------------------------------
* Type agnostic commands operating on the key space
*----------------------------------------------------------------------------*/
/* Return the set of flags to use for the emptyData() call for FLUSHALL
* and FLUSHDB commands.
*
* sync: flushes the database in an sync manner.
* async: flushes the database in an async manner.
* no option: determine sync or async according to the value of lazyfree-lazy-user-flush.
*
* On success C_OK is returned and the flags are stored in *flags, otherwise
* C_ERR is returned and the function sends an error to the client. */
int getFlushCommandFlags(client *c, int *flags) {
/* Parse the optional ASYNC option. */
if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"sync")) {
*flags = EMPTYDB_NO_FLAGS;
} else if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"async")) {
*flags = EMPTYDB_ASYNC;
} else if (c->argc == 1) {
*flags = server.lazyfree_lazy_user_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS;
} else {
addReplyErrorObject(c,shared.syntaxerr);
return C_ERR;
}
return C_OK;
}
/* Flushes the whole server data set. */
void flushAllDataAndResetRDB(int flags) {
server.dirty += emptyData(-1,flags,NULL);
if (server.child_type == CHILD_TYPE_RDB) killRDBChild();
if (server.saveparamslen > 0) {
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
rdbSave(SLAVE_REQ_NONE,server.rdb_filename,rsiptr,RDBFLAGS_NONE);
}
#if defined(USE_JEMALLOC)
/* jemalloc 5 doesn't release pages back to the OS when there's no traffic.
* for large databases, flushdb blocks for long anyway, so a bit more won't
* harm and this way the flush and purge will be synchronous. */
if (!(flags & EMPTYDB_ASYNC)) {
/* Only clear the current thread cache.
* Ignore the return call since this will fail if the tcache is disabled. */
je_mallctl("thread.tcache.flush", NULL, NULL, NULL, 0);
jemalloc_purge();
}
#endif
}
/* Optimized FLUSHALL\FLUSHDB SYNC command finished to run by lazyfree thread */
void flushallSyncBgDone(uint64_t client_id) {
client *c = lookupClientByID(client_id);
/* Verify that client still exists */
if (!c) return;
/* Update current_client (Called functions might rely on it) */
client *old_client = server.current_client;
server.current_client = c;
/* Don't update blocked_us since command was processed in bg by lazy_free thread */
updateStatsOnUnblock(c, 0 /*blocked_us*/, elapsedUs(c->bstate.lazyfreeStartTime), 0);
/* lazyfree bg job always succeed */
addReply(c, shared.ok);
/* mark client as unblocked */
unblockClient(c, 1);
/* FLUSH command is finished. resetClient() and update replication offset. */
commandProcessed(c);
/* On flush completion, update the client's memory */
updateClientMemUsageAndBucket(c);
/* restore current_client */
server.current_client = old_client;
}
void flushCommandCommon(client *c, int isFlushAll) {
int blocking_async = 0; /* FLUSHALL\FLUSHDB SYNC opt to run as blocking ASYNC */
int flags;
if (getFlushCommandFlags(c,&flags) == C_ERR) return;
/* in case of SYNC, check if we can optimize and run it in bg as blocking ASYNC */
if ((!(flags & EMPTYDB_ASYNC)) && (!(c->flags & CLIENT_AVOID_BLOCKING_ASYNC_FLUSH))) {
/* Run as ASYNC */
flags |= EMPTYDB_ASYNC;
blocking_async = 1;
}
if (isFlushAll)
flushAllDataAndResetRDB(flags | EMPTYDB_NOFUNCTIONS);
else
server.dirty += emptyData(c->db->id,flags | EMPTYDB_NOFUNCTIONS,NULL);
/* Without the forceCommandPropagation, when DB(s) was already empty,
* FLUSHALL\FLUSHDB will not be replicated nor put into the AOF. */
forceCommandPropagation(c, PROPAGATE_REPL | PROPAGATE_AOF);
/* if blocking ASYNC, block client and add completion job request to BIO lazyfree
* worker's queue. To be called and reply with OK only after all preceding pending
* lazyfree jobs in queue were processed */
if (blocking_async) {
/* measure bg job till completion as elapsed time of flush command */
elapsedStart(&c->bstate.lazyfreeStartTime);
c->bstate.timeout = 0;
blockClient(c,BLOCKED_LAZYFREE);
bioCreateCompRq(BIO_WORKER_LAZY_FREE, flushallSyncBgDone, c->id);
} else {
addReply(c, shared.ok);
}
#if defined(USE_JEMALLOC)
/* jemalloc 5 doesn't release pages back to the OS when there's no traffic.
* for large databases, flushdb blocks for long anyway, so a bit more won't
* harm and this way the flush and purge will be synchronous.
*
* Take care purge only FLUSHDB for sync flow. FLUSHALL sync flow already
* applied at flushAllDataAndResetRDB. Async flow will apply only later on */
if ((!isFlushAll) && (!(flags & EMPTYDB_ASYNC))) {
/* Only clear the current thread cache.
* Ignore the return call since this will fail if the tcache is disabled. */
je_mallctl("thread.tcache.flush", NULL, NULL, NULL, 0);
jemalloc_purge();
}
#endif
}
/* FLUSHALL [SYNC|ASYNC]
*
* Flushes the whole server data set. */
void flushallCommand(client *c) {
flushCommandCommon(c, 1);
}
/* FLUSHDB [SYNC|ASYNC]
*
* Flushes the currently SELECTed Redis DB. */
void flushdbCommand(client *c) {
flushCommandCommon(c, 0);
}
/* This command implements DEL and UNLINK. */
void delGenericCommand(client *c, int lazy) {
int numdel = 0, j;
for (j = 1; j < c->argc; j++) {
if (expireIfNeeded(c->db,c->argv[j],0) == KEY_DELETED)
continue;
int deleted = lazy ? dbAsyncDelete(c->db,c->argv[j]) :
dbSyncDelete(c->db,c->argv[j]);
if (deleted) {
signalModifiedKey(c,c->db,c->argv[j]);
notifyKeyspaceEvent(NOTIFY_GENERIC,
"del",c->argv[j],c->db->id);
server.dirty++;
numdel++;
}
}
addReplyLongLong(c,numdel);
}
void delCommand(client *c) {
delGenericCommand(c,server.lazyfree_lazy_user_del);
}
void unlinkCommand(client *c) {
delGenericCommand(c,1);
}
/* EXISTS key1 key2 ... key_N.
* Return value is the number of keys existing. */
void existsCommand(client *c) {
long long count = 0;
int j;
for (j = 1; j < c->argc; j++) {
if (lookupKeyReadWithFlags(c->db,c->argv[j],LOOKUP_NOTOUCH)) count++;
}
addReplyLongLong(c,count);
}
void selectCommand(client *c) {
int id;
if (getIntFromObjectOrReply(c, c->argv[1], &id, NULL) != C_OK)
return;
if (server.cluster_enabled && id != 0) {
addReplyError(c,"SELECT is not allowed in cluster mode");
return;
}
if (selectDb(c,id) == C_ERR) {
addReplyError(c,"DB index is out of range");
} else {
addReply(c,shared.ok);
}
}
void randomkeyCommand(client *c) {
robj *key;
if ((key = dbRandomKey(c->db)) == NULL) {
addReplyNull(c);
return;
}
addReplyBulk(c,key);
decrRefCount(key);
}
void keysCommand(client *c) {
dictEntry *de;
sds pattern = c->argv[1]->ptr;
int plen = sdslen(pattern), allkeys, pslot = -1;
unsigned long numkeys = 0;
void *replylen = addReplyDeferredLen(c);
allkeys = (pattern[0] == '*' && plen == 1);
if (server.cluster_enabled && !allkeys) {
pslot = patternHashSlot(pattern, plen);
}
kvstoreDictIterator *kvs_di = NULL;
kvstoreIterator *kvs_it = NULL;
if (pslot != -1) {
if (!kvstoreDictSize(c->db->keys, pslot)) {
/* Requested slot is empty */
setDeferredArrayLen(c,replylen,0);
return;
}
kvs_di = kvstoreGetDictSafeIterator(c->db->keys, pslot);
} else {
kvs_it = kvstoreIteratorInit(c->db->keys);
}
robj keyobj;
while ((de = kvs_di ? kvstoreDictIteratorNext(kvs_di) : kvstoreIteratorNext(kvs_it)) != NULL) {
sds key = dictGetKey(de);
if (allkeys || stringmatchlen(pattern,plen,key,sdslen(key),0)) {
initStaticStringObject(keyobj, key);
if (!keyIsExpired(c->db, &keyobj)) {
addReplyBulkCBuffer(c, key, sdslen(key));
numkeys++;
}
}
if (c->flags & CLIENT_CLOSE_ASAP)
break;
}
if (kvs_di)
kvstoreReleaseDictIterator(kvs_di);
if (kvs_it)
kvstoreIteratorRelease(kvs_it);
setDeferredArrayLen(c,replylen,numkeys);
}
/* Data used by the dict scan callback. */
typedef struct {
list *keys; /* elements that collect from dict */
robj *o; /* o must be a hash/set/zset object, NULL means current db */
long long type; /* the particular type when scan the db */
sds pattern; /* pattern string, NULL means no pattern */
long sampled; /* cumulative number of keys sampled */
int no_values; /* set to 1 means to return keys only */
} scanData;
/* Helper function to compare key type in scan commands */
int objectTypeCompare(robj *o, long long target) {
if (o->type != OBJ_MODULE) {
if (o->type != target)
return 0;
else
return 1;
}
/* module type compare */
long long mt = (long long)REDISMODULE_TYPE_SIGN(((moduleValue *)o->ptr)->type->id);
if (target != -mt)
return 0;
else
return 1;
}
/* This callback is used by scanGenericCommand in order to collect elements
* returned by the dictionary iterator into a list. */
void scanCallback(void *privdata, const dictEntry *de) {
scanData *data = (scanData *)privdata;
list *keys = data->keys;
robj *o = data->o;
sds val = NULL;
sds key = NULL;
data->sampled++;
/* o and typename can not have values at the same time. */
serverAssert(!((data->type != LLONG_MAX) && o));
/* Filter an element if it isn't the type we want. */
/* TODO: uncomment in redis 8.0
if (!o && data->type != LLONG_MAX) {
robj *rval = dictGetVal(de);
if (!objectTypeCompare(rval, data->type)) return;
}*/
/* Filter element if it does not match the pattern. */
sds keysds = dictGetKey(de);
if (data->pattern) {
if (!stringmatchlen(data->pattern, sdslen(data->pattern), keysds, sdslen(keysds), 0)) {
return;
}
}
if (o == NULL) {
key = keysds;
} else if (o->type == OBJ_SET) {
key = keysds;
} else if (o->type == OBJ_HASH) {
key = keysds;
val = dictGetVal(de);
} else if (o->type == OBJ_ZSET) {
char buf[MAX_LONG_DOUBLE_CHARS];
int len = ld2string(buf, sizeof(buf), *(double *)dictGetVal(de), LD_STR_AUTO);
key = sdsdup(keysds);
val = sdsnewlen(buf, len);
} else {
serverPanic("Type not handled in SCAN callback.");
}
listAddNodeTail(keys, key);
if (val && !data->no_values) listAddNodeTail(keys, val);
}
/* Try to parse a SCAN cursor stored at object 'o':
* if the cursor is valid, store it as unsigned integer into *cursor and
* returns C_OK. Otherwise return C_ERR and send an error to the
* client. */
int parseScanCursorOrReply(client *c, robj *o, unsigned long long *cursor) {
if (!string2ull(o->ptr, cursor)) {
addReplyError(c, "invalid cursor");
return C_ERR;
}
return C_OK;
}
char *obj_type_name[OBJ_TYPE_MAX] = {
"string",
"list",
"set",
"zset",
"hash",
NULL, /* module type is special */
"stream"
};
/* Helper function to get type from a string in scan commands */
long long getObjectTypeByName(char *name) {
for (long long i = 0; i < OBJ_TYPE_MAX; i++) {
if (obj_type_name[i] && !strcasecmp(name, obj_type_name[i])) {
return i;
}
}
moduleType *mt = moduleTypeLookupModuleByNameIgnoreCase(name);
if (mt != NULL) return -(REDISMODULE_TYPE_SIGN(mt->id));
return LLONG_MAX;
}
char *getObjectTypeName(robj *o) {
if (o == NULL) {
return "none";
}
serverAssert(o->type >= 0 && o->type < OBJ_TYPE_MAX);
if (o->type == OBJ_MODULE) {
moduleValue *mv = o->ptr;
return mv->type->name;
} else {
return obj_type_name[o->type];
}
}
/* This command implements SCAN, HSCAN and SSCAN commands.
* If object 'o' is passed, then it must be a Hash, Set or Zset object, otherwise
* if 'o' is NULL the command will operate on the dictionary associated with
* the current database.
*
* When 'o' is not NULL the function assumes that the first argument in
* the client arguments vector is a key so it skips it before iterating
* in order to parse options.
*
* In the case of a Hash object the function returns both the field and value
* of every element on the Hash. */
void scanGenericCommand(client *c, robj *o, unsigned long long cursor) {
int i, j;
listNode *node;
long count = 10;
sds pat = NULL;
sds typename = NULL;
long long type = LLONG_MAX;
int patlen = 0, use_pattern = 0, no_values = 0;
dict *ht;
/* Object must be NULL (to iterate keys names), or the type of the object
* must be Set, Sorted Set, or Hash. */
serverAssert(o == NULL || o->type == OBJ_SET || o->type == OBJ_HASH ||
o->type == OBJ_ZSET);
/* Set i to the first option argument. The previous one is the cursor. */
i = (o == NULL) ? 2 : 3; /* Skip the key argument if needed. */
/* Step 1: Parse options. */
while (i < c->argc) {
j = c->argc - i;
if (!strcasecmp(c->argv[i]->ptr, "count") && j >= 2) {
if (getLongFromObjectOrReply(c, c->argv[i+1], &count, NULL)
!= C_OK)
{
return;
}
if (count < 1) {
addReplyErrorObject(c,shared.syntaxerr);
return;
}
i += 2;
} else if (!strcasecmp(c->argv[i]->ptr, "match") && j >= 2) {
pat = c->argv[i+1]->ptr;
patlen = sdslen(pat);
/* The pattern always matches if it is exactly "*", so it is
* equivalent to disabling it. */
use_pattern = !(patlen == 1 && pat[0] == '*');
i += 2;
} else if (!strcasecmp(c->argv[i]->ptr, "type") && o == NULL && j >= 2) {
/* SCAN for a particular type only applies to the db dict */
typename = c->argv[i+1]->ptr;
type = getObjectTypeByName(typename);
if (type == LLONG_MAX) {
/* TODO: uncomment in redis 8.0
addReplyErrorFormat(c, "unknown type name '%s'", typename);
return; */
}
i+= 2;
} else if (!strcasecmp(c->argv[i]->ptr, "novalues")) {
if (!o || o->type != OBJ_HASH) {
addReplyError(c, "NOVALUES option can only be used in HSCAN");
return;
}
no_values = 1;
i++;
} else {
addReplyErrorObject(c,shared.syntaxerr);
return;
}
}
/* Step 2: Iterate the collection.
*
* Note that if the object is encoded with a listpack, intset, or any other
* representation that is not a hash table, we are sure that it is also
* composed of a small number of elements. So to avoid taking state we
* just return everything inside the object in a single call, setting the
* cursor to zero to signal the end of the iteration. */
/* Handle the case of a hash table. */
ht = NULL;
if (o == NULL) {
ht = NULL;
} else if (o->type == OBJ_SET && o->encoding == OBJ_ENCODING_HT) {
ht = o->ptr;
} else if (o->type == OBJ_HASH && o->encoding == OBJ_ENCODING_HT) {
ht = o->ptr;
} else if (o->type == OBJ_ZSET && o->encoding == OBJ_ENCODING_SKIPLIST) {
zset *zs = o->ptr;
ht = zs->dict;
}
list *keys = listCreate();
/* Set a free callback for the contents of the collected keys list.
* For the main keyspace dict, and when we scan a key that's dict encoded
* (we have 'ht'), we don't need to define free method because the strings
* in the list are just a shallow copy from the pointer in the dictEntry.
* When scanning a key with other encodings (e.g. listpack), we need to
* free the temporary strings we add to that list.
* The exception to the above is ZSET, where we do allocate temporary
* strings even when scanning a dict. */
if (o && (!ht || o->type == OBJ_ZSET)) {
listSetFreeMethod(keys, (void (*)(void*))sdsfree);
}
/* For main dictionary scan or data structure using hashtable. */
if (!o || ht) {
/* We set the max number of iterations to ten times the specified
* COUNT, so if the hash table is in a pathological state (very
* sparsely populated) we avoid to block too much time at the cost
* of returning no or very few elements. */
long maxiterations = count*10;
/* We pass scanData which have three pointers to the callback:
* 1. data.keys: the list to which it will add new elements;
* 2. data.o: the object containing the dictionary so that
* it is possible to fetch more data in a type-dependent way;
* 3. data.type: the specified type scan in the db, LLONG_MAX means
* type matching is no needed;
* 4. data.pattern: the pattern string;
* 5. data.sampled: the maxiteration limit is there in case we're
* working on an empty dict, one with a lot of empty buckets, and
* for the buckets are not empty, we need to limit the spampled number
* to prevent a long hang time caused by filtering too many keys;
* 6. data.no_values: to control whether values will be returned or
* only keys are returned. */
scanData data = {
.keys = keys,
.o = o,
.type = type,
.pattern = use_pattern ? pat : NULL,
.sampled = 0,
.no_values = no_values,
};
/* A pattern may restrict all matching keys to one cluster slot. */
int onlydidx = -1;
if (o == NULL && use_pattern && server.cluster_enabled) {
onlydidx = patternHashSlot(pat, patlen);
}
do {
/* In cluster mode there is a separate dictionary for each slot.
* If cursor is empty, we should try exploring next non-empty slot. */
if (o == NULL) {
cursor = kvstoreScan(c->db->keys, cursor, onlydidx, scanCallback, NULL, &data);
} else {
cursor = dictScan(ht, cursor, scanCallback, &data);
}
} while (cursor && maxiterations-- && data.sampled < count);
} else if (o->type == OBJ_SET) {
char *str;
char buf[LONG_STR_SIZE];
size_t len;
int64_t llele;
setTypeIterator *si = setTypeInitIterator(o);
while (setTypeNext(si, &str, &len, &llele) != -1) {
if (str == NULL) {
len = ll2string(buf, sizeof(buf), llele);
}
char *key = str ? str : buf;
if (use_pattern && !stringmatchlen(pat, sdslen(pat), key, len, 0)) {
continue;
}
listAddNodeTail(keys, sdsnewlen(key, len));
}
setTypeReleaseIterator(si);
cursor = 0;
} else if ((o->type == OBJ_HASH || o->type == OBJ_ZSET) &&
o->encoding == OBJ_ENCODING_LISTPACK)
{
unsigned char *p = lpFirst(o->ptr);
unsigned char *str;
int64_t len;
unsigned char intbuf[LP_INTBUF_SIZE];
while(p) {
str = lpGet(p, &len, intbuf);
/* point to the value */
p = lpNext(o->ptr, p);
if (use_pattern && !stringmatchlen(pat, sdslen(pat), (char *)str, len, 0)) {
/* jump to the next key/val pair */
p = lpNext(o->ptr, p);
continue;
}
/* add key object */
listAddNodeTail(keys, sdsnewlen(str, len));
/* add value object */
if (!no_values) {
str = lpGet(p, &len, intbuf);
listAddNodeTail(keys, sdsnewlen(str, len));
}
p = lpNext(o->ptr, p);
}
cursor = 0;
} else {
serverPanic("Not handled encoding in SCAN.");
}
/* Step 3: Filter the expired keys */
if (o == NULL && listLength(keys)) {
robj kobj;
listIter li;
listNode *ln;
listRewind(keys, &li);
while ((ln = listNext(&li))) {
sds key = listNodeValue(ln);
initStaticStringObject(kobj, key);
/* Filter an element if it isn't the type we want. */
/* TODO: remove this in redis 8.0 */
if (typename) {
robj* typecheck = lookupKeyReadWithFlags(c->db, &kobj, LOOKUP_NOTOUCH|LOOKUP_NONOTIFY);
if (!typecheck || !objectTypeCompare(typecheck, type)) {
listDelNode(keys, ln);
}
continue;
}
if (expireIfNeeded(c->db, &kobj, 0) != KEY_VALID) {
listDelNode(keys, ln);
}
}
}
/* Step 4: Reply to the client. */
addReplyArrayLen(c, 2);
addReplyBulkLongLong(c,cursor);
addReplyArrayLen(c, listLength(keys));
while ((node = listFirst(keys)) != NULL) {
sds key = listNodeValue(node);
addReplyBulkCBuffer(c, key, sdslen(key));
listDelNode(keys, node);
}
listRelease(keys);
}
/* The SCAN command completely relies on scanGenericCommand. */
void scanCommand(client *c) {
unsigned long long cursor;
if (parseScanCursorOrReply(c,c->argv[1],&cursor) == C_ERR) return;
scanGenericCommand(c,NULL,cursor);
}
void dbsizeCommand(client *c) {
addReplyLongLong(c,kvstoreSize(c->db->keys));
}
void lastsaveCommand(client *c) {
addReplyLongLong(c,server.lastsave);
}
void typeCommand(client *c) {
robj *o;
o = lookupKeyReadWithFlags(c->db,c->argv[1],LOOKUP_NOTOUCH);
addReplyStatus(c, getObjectTypeName(o));
}
void shutdownCommand(client *c) {
int flags = SHUTDOWN_NOFLAGS;
int abort = 0;
for (int i = 1; i < c->argc; i++) {
if (!strcasecmp(c->argv[i]->ptr,"nosave")) {
flags |= SHUTDOWN_NOSAVE;
} else if (!strcasecmp(c->argv[i]->ptr,"save")) {
flags |= SHUTDOWN_SAVE;
} else if (!strcasecmp(c->argv[i]->ptr, "now")) {
flags |= SHUTDOWN_NOW;
} else if (!strcasecmp(c->argv[i]->ptr, "force")) {
flags |= SHUTDOWN_FORCE;
} else if (!strcasecmp(c->argv[i]->ptr, "abort")) {
abort = 1;
} else {
addReplyErrorObject(c,shared.syntaxerr);
return;
}
}
if ((abort && flags != SHUTDOWN_NOFLAGS) ||
(flags & SHUTDOWN_NOSAVE && flags & SHUTDOWN_SAVE))
{
/* Illegal combo. */
addReplyErrorObject(c,shared.syntaxerr);
return;
}
if (abort) {
if (abortShutdown() == C_OK)
addReply(c, shared.ok);
else
addReplyError(c, "No shutdown in progress.");
return;
}
if (!(flags & SHUTDOWN_NOW) && c->flags & CLIENT_DENY_BLOCKING) {
addReplyError(c, "SHUTDOWN without NOW or ABORT isn't allowed for DENY BLOCKING client");
return;
}
if (!(flags & SHUTDOWN_NOSAVE) && isInsideYieldingLongCommand()) {
/* Script timed out. Shutdown allowed only with the NOSAVE flag. See
* also processCommand where these errors are returned. */
if (server.busy_module_yield_flags && server.busy_module_yield_reply) {
addReplyErrorFormat(c, "-BUSY %s", server.busy_module_yield_reply);
} else if (server.busy_module_yield_flags) {
addReplyErrorObject(c, shared.slowmoduleerr);
} else if (scriptIsEval()) {
addReplyErrorObject(c, shared.slowevalerr);
} else {
addReplyErrorObject(c, shared.slowscripterr);
}
return;
}
blockClientShutdown(c);
if (prepareForShutdown(flags) == C_OK) exit(0);
/* If we're here, then shutdown is ongoing (the client is still blocked) or
* failed (the client has received an error). */
}
void renameGenericCommand(client *c, int nx) {
robj *o;
long long expire;
int samekey = 0;
/* When source and dest key is the same, no operation is performed,
* if the key exists, however we still return an error on unexisting key. */
if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) samekey = 1;
if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr)) == NULL)
return;
if (samekey) {
addReply(c,nx ? shared.czero : shared.ok);
return;
}
incrRefCount(o);
expire = getExpire(c->db,c->argv[1]);
if (lookupKeyWrite(c->db,c->argv[2]) != NULL) {
if (nx) {
decrRefCount(o);
addReply(c,shared.czero);
return;
}
/* Overwrite: delete the old key before creating the new one
* with the same name. */
dbDelete(c->db,c->argv[2]);
}
dbAdd(c->db,c->argv[2],o);
if (expire != -1) setExpire(c,c->db,c->argv[2],expire);
dbDelete(c->db,c->argv[1]);
signalModifiedKey(c,c->db,c->argv[1]);
signalModifiedKey(c,c->db,c->argv[2]);
notifyKeyspaceEvent(NOTIFY_GENERIC,"rename_from",
c->argv[1],c->db->id);
notifyKeyspaceEvent(NOTIFY_GENERIC,"rename_to",
c->argv[2],c->db->id);
server.dirty++;
addReply(c,nx ? shared.cone : shared.ok);
}
void renameCommand(client *c) {
renameGenericCommand(c,0);
}
void renamenxCommand(client *c) {
renameGenericCommand(c,1);
}
void moveCommand(client *c) {
robj *o;
redisDb *src, *dst;
int srcid, dbid;
long long expire;
if (server.cluster_enabled) {
addReplyError(c,"MOVE is not allowed in cluster mode");
return;
}
/* Obtain source and target DB pointers */
src = c->db;
srcid = c->db->id;
if (getIntFromObjectOrReply(c, c->argv[2], &dbid, NULL) != C_OK)
return;
if (selectDb(c,dbid) == C_ERR) {
addReplyError(c,"DB index is out of range");
return;
}
dst = c->db;
selectDb(c,srcid); /* Back to the source DB */
/* If the user is moving using as target the same
* DB as the source DB it is probably an error. */
if (src == dst) {
addReplyErrorObject(c,shared.sameobjecterr);
return;
}
/* Check if the element exists and get a reference */
o = lookupKeyWrite(c->db,c->argv[1]);
if (!o) {
addReply(c,shared.czero);
return;
}
expire = getExpire(c->db,c->argv[1]);
/* Return zero if the key already exists in the target DB */
if (lookupKeyWrite(dst,c->argv[1]) != NULL) {
addReply(c,shared.czero);
return;
}
dbAdd(dst,c->argv[1],o);
if (expire != -1) setExpire(c,dst,c->argv[1],expire);
incrRefCount(o);
/* OK! key moved, free the entry in the source DB */
dbDelete(src,c->argv[1]);
signalModifiedKey(c,src,c->argv[1]);
signalModifiedKey(c,dst,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_GENERIC,
"move_from",c->argv[1],src->id);
notifyKeyspaceEvent(NOTIFY_GENERIC,
"move_to",c->argv[1],dst->id);
server.dirty++;
addReply(c,shared.cone);
}
void copyCommand(client *c) {
robj *o;
redisDb *src, *dst;
int srcid, dbid;
long long expire;
int j, replace = 0, delete = 0;
/* Obtain source and target DB pointers
* Default target DB is the same as the source DB
* Parse the REPLACE option and targetDB option. */
src = c->db;
dst = c->db;
srcid = c->db->id;
dbid = c->db->id;
for (j = 3; j < c->argc; j++) {
int additional = c->argc - j - 1;
if (!strcasecmp(c->argv[j]->ptr,"replace")) {
replace = 1;
} else if (!strcasecmp(c->argv[j]->ptr, "db") && additional >= 1) {
if (getIntFromObjectOrReply(c, c->argv[j+1], &dbid, NULL) != C_OK)
return;
if (selectDb(c, dbid) == C_ERR) {
addReplyError(c,"DB index is out of range");
return;
}
dst = c->db;
selectDb(c,srcid); /* Back to the source DB */
j++; /* Consume additional arg. */
} else {
addReplyErrorObject(c,shared.syntaxerr);
return;
}
}
if ((server.cluster_enabled == 1) && (srcid != 0 || dbid != 0)) {
addReplyError(c,"Copying to another database is not allowed in cluster mode");
return;
}
/* If the user select the same DB as
* the source DB and using newkey as the same key
* it is probably an error. */
robj *key = c->argv[1];
robj *newkey = c->argv[2];
if (src == dst && (sdscmp(key->ptr, newkey->ptr) == 0)) {
addReplyErrorObject(c,shared.sameobjecterr);
return;
}
/* Check if the element exists and get a reference */
o = lookupKeyRead(c->db, key);
if (!o) {
addReply(c,shared.czero);
return;
}
expire = getExpire(c->db,key);
/* Return zero if the key already exists in the target DB.
* If REPLACE option is selected, delete newkey from targetDB. */
if (lookupKeyWrite(dst,newkey) != NULL) {
if (replace) {
delete = 1;
} else {
addReply(c,shared.czero);
return;
}
}
/* Duplicate object according to object's type. */
robj *newobj;
switch(o->type) {
case OBJ_STRING: newobj = dupStringObject(o); break;
case OBJ_LIST: newobj = listTypeDup(o); break;
case OBJ_SET: newobj = setTypeDup(o); break;
case OBJ_ZSET: newobj = zsetDup(o); break;
case OBJ_HASH: newobj = hashTypeDup(o); break;
case OBJ_STREAM: newobj = streamDup(o); break;
case OBJ_MODULE:
newobj = moduleTypeDupOrReply(c, key, newkey, dst->id, o);
if (!newobj) return;
break;
default:
addReplyError(c, "unknown type object");
return;
}
if (delete) {
dbDelete(dst,newkey);
}
dbAdd(dst,newkey,newobj);
if (expire != -1) setExpire(c, dst, newkey, expire);
/* OK! key copied */
signalModifiedKey(c,dst,c->argv[2]);
notifyKeyspaceEvent(NOTIFY_GENERIC,"copy_to",c->argv[2],dst->id);
server.dirty++;
addReply(c,shared.cone);
}
/* Helper function for dbSwapDatabases(): scans the list of keys that have
* one or more blocked clients for B[LR]POP or other blocking commands
* and signal the keys as ready if they are of the right type. See the comment
* where the function is used for more info. */
void scanDatabaseForReadyKeys(redisDb *db) {
dictEntry *de;
dictIterator *di = dictGetSafeIterator(db->blocking_keys);
while((de = dictNext(di)) != NULL) {
robj *key = dictGetKey(de);
dictEntry *kde = dbFind(db, key->ptr);
if (kde) {
robj *value = dictGetVal(kde);
signalKeyAsReady(db, key, value->type);
}
}
dictReleaseIterator(di);
}
/* Since we are unblocking XREADGROUP clients in the event the
* key was deleted/overwritten we must do the same in case the
* database was flushed/swapped. */
void scanDatabaseForDeletedKeys(redisDb *emptied, redisDb *replaced_with) {
dictEntry *de;
dictIterator *di = dictGetSafeIterator(emptied->blocking_keys);
while((de = dictNext(di)) != NULL) {
robj *key = dictGetKey(de);
int existed = 0, exists = 0;
int original_type = -1, curr_type = -1;
dictEntry *kde = dbFind(emptied, key->ptr);
if (kde) {
robj *value = dictGetVal(kde);
original_type = value->type;
existed = 1;
}
if (replaced_with) {
kde = dbFind(replaced_with, key->ptr);
if (kde) {
robj *value = dictGetVal(kde);
curr_type = value->type;
exists = 1;
}
}
/* We want to try to unblock any client using a blocking XREADGROUP */
if ((existed && !exists) || original_type != curr_type)
signalDeletedKeyAsReady(emptied, key, original_type);
}
dictReleaseIterator(di);
}
/* Swap two databases at runtime so that all clients will magically see
* the new database even if already connected. Note that the client
* structure c->db points to a given DB, so we need to be smarter and
* swap the underlying referenced structures, otherwise we would need
* to fix all the references to the Redis DB structure.
*
* Returns C_ERR if at least one of the DB ids are out of range, otherwise
* C_OK is returned. */
int dbSwapDatabases(int id1, int id2) {
if (id1 < 0 || id1 >= server.dbnum ||
id2 < 0 || id2 >= server.dbnum) return C_ERR;
if (id1 == id2) return C_OK;
redisDb aux = server.db[id1];
redisDb *db1 = &server.db[id1], *db2 = &server.db[id2];
/* Swapdb should make transaction fail if there is any
* client watching keys */
touchAllWatchedKeysInDb(db1, db2);
touchAllWatchedKeysInDb(db2, db1);
/* Try to unblock any XREADGROUP clients if the key no longer exists. */
scanDatabaseForDeletedKeys(db1, db2);
scanDatabaseForDeletedKeys(db2, db1);
/* Swap hash tables. Note that we don't swap blocking_keys,
* ready_keys and watched_keys, since we want clients to
* remain in the same DB they were. */
db1->keys = db2->keys;
db1->expires = db2->expires;
db1->avg_ttl = db2->avg_ttl;
db1->expires_cursor = db2->expires_cursor;
db2->keys = aux.keys;
db2->expires = aux.expires;
db2->avg_ttl = aux.avg_ttl;
db2->expires_cursor = aux.expires_cursor;
/* Now we need to handle clients blocked on lists: as an effect
* of swapping the two DBs, a client that was waiting for list
* X in a given DB, may now actually be unblocked if X happens
* to exist in the new version of the DB, after the swap.
*
* However normally we only do this check for efficiency reasons
* in dbAdd() when a list is created. So here we need to rescan
* the list of clients blocked on lists and signal lists as ready
* if needed. */
scanDatabaseForReadyKeys(db1);
scanDatabaseForReadyKeys(db2);
return C_OK;
}
/* Logically, this discards (flushes) the old main database, and apply the newly loaded
* database (temp) as the main (active) database, the actual freeing of old database
* (which will now be placed in the temp one) is done later. */
void swapMainDbWithTempDb(redisDb *tempDb) {
for (int i=0; i<server.dbnum; i++) {
redisDb aux = server.db[i];
redisDb *activedb = &server.db[i], *newdb = &tempDb[i];
/* Swapping databases should make transaction fail if there is any
* client watching keys. */
touchAllWatchedKeysInDb(activedb, newdb);
/* Try to unblock any XREADGROUP clients if the key no longer exists. */
scanDatabaseForDeletedKeys(activedb, newdb);
/* Swap hash tables. Note that we don't swap blocking_keys,
* ready_keys and watched_keys, since clients
* remain in the same DB they were. */
activedb->keys = newdb->keys;
activedb->expires = newdb->expires;
activedb->avg_ttl = newdb->avg_ttl;
activedb->expires_cursor = newdb->expires_cursor;
newdb->keys = aux.keys;
newdb->expires = aux.expires;
newdb->avg_ttl = aux.avg_ttl;
newdb->expires_cursor = aux.expires_cursor;
/* Now we need to handle clients blocked on lists: as an effect
* of swapping the two DBs, a client that was waiting for list
* X in a given DB, may now actually be unblocked if X happens
* to exist in the new version of the DB, after the swap.
*
* However normally we only do this check for efficiency reasons
* in dbAdd() when a list is created. So here we need to rescan
* the list of clients blocked on lists and signal lists as ready
* if needed. */
scanDatabaseForReadyKeys(activedb);
}
trackingInvalidateKeysOnFlush(1);
flushSlaveKeysWithExpireList();
}
/* SWAPDB db1 db2 */
void swapdbCommand(client *c) {
int id1, id2;
/* Not allowed in cluster mode: we have just DB 0 there. */
if (server.cluster_enabled) {
addReplyError(c,"SWAPDB is not allowed in cluster mode");
return;
}
/* Get the two DBs indexes. */
if (getIntFromObjectOrReply(c, c->argv[1], &id1,
"invalid first DB index") != C_OK)
return;
if (getIntFromObjectOrReply(c, c->argv[2], &id2,
"invalid second DB index") != C_OK)
return;
/* Swap... */
if (dbSwapDatabases(id1,id2) == C_ERR) {
addReplyError(c,"DB index is out of range");
return;
} else {
RedisModuleSwapDbInfo si = {REDISMODULE_SWAPDBINFO_VERSION,id1,id2};
moduleFireServerEvent(REDISMODULE_EVENT_SWAPDB,0,&si);
server.dirty++;
addReply(c,shared.ok);
}
}
/*-----------------------------------------------------------------------------
* Expires API
*----------------------------------------------------------------------------*/
int removeExpire(redisDb *db, robj *key) {
return kvstoreDictDelete(db->expires, getKeySlot(key->ptr), key->ptr) == DICT_OK;
}
/* Set an expire to the specified key. If the expire is set in the context
* of an user calling a command 'c' is the client, otherwise 'c' is set
* to NULL. The 'when' parameter is the absolute unix time in milliseconds
* after which the key will no longer be considered valid. */
void setExpire(client *c, redisDb *db, robj *key, long long when) {
dictEntry *kde, *de, *existing;
/* Reuse the sds from the main dict in the expire dict */
int slot = getKeySlot(key->ptr);
kde = kvstoreDictFind(db->keys, slot, key->ptr);
serverAssertWithInfo(NULL,key,kde != NULL);
de = kvstoreDictAddRaw(db->expires, slot, dictGetKey(kde), &existing);
if (existing) {
dictSetSignedIntegerVal(existing, when);
} else {
dictSetSignedIntegerVal(de, when);
}
int writable_slave = server.masterhost && server.repl_slave_ro == 0;
if (c && writable_slave && !(c->flags & CLIENT_MASTER))
rememberSlaveKeyWithExpire(db,key);
}
/* Return the expire time of the specified key, or -1 if no expire
* is associated with this key (i.e. the key is non volatile) */
long long getExpire(redisDb *db, robj *key) {
dictEntry *de;
if ((de = dbFindExpires(db, key->ptr)) == NULL)
return -1;
return dictGetSignedIntegerVal(de);
}
/* Delete the specified expired key and propagate expire. */
void deleteExpiredKeyAndPropagate(redisDb *db, robj *keyobj) {
mstime_t expire_latency;
latencyStartMonitor(expire_latency);
dbGenericDelete(db,keyobj,server.lazyfree_lazy_expire,DB_FLAG_KEY_EXPIRED);
latencyEndMonitor(expire_latency);
latencyAddSampleIfNeeded("expire-del",expire_latency);
notifyKeyspaceEvent(NOTIFY_EXPIRED,"expired",keyobj,db->id);
signalModifiedKey(NULL, db, keyobj);
propagateDeletion(db,keyobj,server.lazyfree_lazy_expire);
server.stat_expiredkeys++;
}
/* Propagate an implicit key deletion into replicas and the AOF file.
* When a key was deleted in the master by eviction, expiration or a similar
* mechanism a DEL/UNLINK operation for this key is sent
* to all the replicas and the AOF file if enabled.
*
* This way the key deletion is centralized in one place, and since both
* AOF and the replication link guarantee operation ordering, everything
* will be consistent even if we allow write operations against deleted
* keys.
*
* This function may be called from:
* 1. Within call(): Example: Lazy-expire on key access.
* In this case the caller doesn't have to do anything
* because call() handles server.also_propagate(); or
* 2. Outside of call(): Example: Active-expire, eviction, slot ownership changed.
* In this the caller must remember to call
* postExecutionUnitOperations, preferably just after a
* single deletion batch, so that DEL/UNLINK will NOT be wrapped
* in MULTI/EXEC */
void propagateDeletion(redisDb *db, robj *key, int lazy) {
robj *argv[2];
argv[0] = lazy ? shared.unlink : shared.del;
argv[1] = key;
incrRefCount(argv[0]);
incrRefCount(argv[1]);
/* If the master decided to delete a key we must propagate it to replicas no matter what.
* Even if module executed a command without asking for propagation. */
int prev_replication_allowed = server.replication_allowed;
server.replication_allowed = 1;
alsoPropagate(db->id,argv,2,PROPAGATE_AOF|PROPAGATE_REPL);
server.replication_allowed = prev_replication_allowed;
decrRefCount(argv[0]);
decrRefCount(argv[1]);
}
/* Check if the key is expired. */
int keyIsExpired(redisDb *db, robj *key) {
/* Don't expire anything while loading. It will be done later. */
if (server.loading) return 0;
mstime_t when = getExpire(db,key);
mstime_t now;
if (when < 0) return 0; /* No expire for this key */
now = commandTimeSnapshot();
/* The key expired if the current (virtual or real) time is greater
* than the expire time of the key. */
return now > when;
}
/* This function is called when we are going to perform some operation
* in a given key, but such key may be already logically expired even if
* it still exists in the database. The main way this function is called
* is via lookupKey*() family of functions.
*
* The behavior of the function depends on the replication role of the
* instance, because by default replicas do not delete expired keys. They
* wait for DELs from the master for consistency matters. However even
* replicas will try to have a coherent return value for the function,
* so that read commands executed in the replica side will be able to
* behave like if the key is expired even if still present (because the
* master has yet to propagate the DEL).
*
* In masters as a side effect of finding a key which is expired, such
* key will be evicted from the database. Also this may trigger the
* propagation of a DEL/UNLINK command in AOF / replication stream.
*
* On replicas, this function does not delete expired keys by default, but
* it still returns KEY_EXPIRED if the key is logically expired. To force deletion
* of logically expired keys even on replicas, use the EXPIRE_FORCE_DELETE_EXPIRED
* flag. Note though that if the current client is executing
* replicated commands from the master, keys are never considered expired.
*
* On the other hand, if you just want expiration check, but need to avoid
* the actual key deletion and propagation of the deletion, use the
* EXPIRE_AVOID_DELETE_EXPIRED flag.
*
* The return value of the function is KEY_VALID if the key is still valid.
* The function returns KEY_EXPIRED if the key is expired BUT not deleted,
* or returns KEY_DELETED if the key is expired and deleted. */
keyStatus expireIfNeeded(redisDb *db, robj *key, int flags) {
if (server.lazy_expire_disabled) return KEY_VALID;
if (!keyIsExpired(db,key)) return KEY_VALID;
/* If we are running in the context of a replica, instead of
* evicting the expired key from the database, we return ASAP:
* the replica key expiration is controlled by the master that will
* send us synthesized DEL operations for expired keys. The
* exception is when write operations are performed on writable
* replicas.
*
* Still we try to return the right information to the caller,
* that is, KEY_VALID if we think the key should still be valid,
* KEY_EXPIRED if we think the key is expired but don't want to delete it at this time.
*
* When replicating commands from the master, keys are never considered
* expired. */
if (server.masterhost != NULL) {
if (server.current_client && (server.current_client->flags & CLIENT_MASTER)) return KEY_VALID;
if (!(flags & EXPIRE_FORCE_DELETE_EXPIRED)) return KEY_EXPIRED;
}
/* In some cases we're explicitly instructed to return an indication of a
* missing key without actually deleting it, even on masters. */
if (flags & EXPIRE_AVOID_DELETE_EXPIRED)
return KEY_EXPIRED;
/* If 'expire' action is paused, for whatever reason, then don't expire any key.
* Typically, at the end of the pause we will properly expire the key OR we
* will have failed over and the new primary will send us the expire. */
if (isPausedActionsWithUpdate(PAUSE_ACTION_EXPIRE)) return KEY_EXPIRED;
/* The key needs to be converted from static to heap before deleted */
int static_key = key->refcount == OBJ_STATIC_REFCOUNT;
if (static_key) {
key = createStringObject(key->ptr, sdslen(key->ptr));
}
/* Delete the key */
deleteExpiredKeyAndPropagate(db,key);
if (static_key) {
decrRefCount(key);
}
return KEY_DELETED;
}
/* CB passed to kvstoreExpand.
* The purpose is to skip expansion of unused dicts in cluster mode (all
* dicts not mapped to *my* slots) */
static int dbExpandSkipSlot(int slot) {
return !clusterNodeCoversSlot(getMyClusterNode(), slot);
}
/*
* This functions increases size of the main/expires db to match desired number.
* In cluster mode resizes all individual dictionaries for slots that this node owns.
*
* Based on the parameter `try_expand`, appropriate dict expand API is invoked.
* if try_expand is set to 1, `dictTryExpand` is used else `dictExpand`.
* The return code is either `DICT_OK`/`DICT_ERR` for both the API(s).
* `DICT_OK` response is for successful expansion. However ,`DICT_ERR` response signifies failure in allocation in
* `dictTryExpand` call and in case of `dictExpand` call it signifies no expansion was performed.
*/
static int dbExpandGeneric(kvstore *kvs, uint64_t db_size, int try_expand) {
int ret;
if (server.cluster_enabled) {
/* We don't know exact number of keys that would fall into each slot, but we can
* approximate it, assuming even distribution, divide it by the number of slots. */
int slots = getMyShardSlotCount();
if (slots == 0) return C_OK;
db_size = db_size / slots;
ret = kvstoreExpand(kvs, db_size, try_expand, dbExpandSkipSlot);
} else {
ret = kvstoreExpand(kvs, db_size, try_expand, NULL);
}
return ret? C_OK : C_ERR;
}
int dbExpand(redisDb *db, uint64_t db_size, int try_expand) {
return dbExpandGeneric(db->keys, db_size, try_expand);
}
int dbExpandExpires(redisDb *db, uint64_t db_size, int try_expand) {
return dbExpandGeneric(db->expires, db_size, try_expand);
}
static dictEntry *dbFindGeneric(kvstore *kvs, void *key) {
return kvstoreDictFind(kvs, getKeySlot(key), key);
}
dictEntry *dbFind(redisDb *db, void *key) {
return dbFindGeneric(db->keys, key);
}
dictEntry *dbFindExpires(redisDb *db, void *key) {
return dbFindGeneric(db->expires, key);
}
unsigned long long dbSize(redisDb *db) {
return kvstoreSize(db->keys);
}
unsigned long long dbScan(redisDb *db, unsigned long long cursor, dictScanFunction *scan_cb, void *privdata) {
return kvstoreScan(db->keys, cursor, -1, scan_cb, NULL, privdata);
}
/* -----------------------------------------------------------------------------
* API to get key arguments from commands
* ---------------------------------------------------------------------------*/
/* Prepare the getKeysResult struct to hold numkeys, either by using the
* pre-allocated keysbuf or by allocating a new array on the heap.
*
* This function must be called at least once before starting to populate
* the result, and can be called repeatedly to enlarge the result array.
*/
keyReference *getKeysPrepareResult(getKeysResult *result, int numkeys) {
/* GETKEYS_RESULT_INIT initializes keys to NULL, point it to the pre-allocated stack
* buffer here. */
if (!result->keys) {
serverAssert(!result->numkeys);
result->keys = result->keysbuf;
}
/* Resize if necessary */
if (numkeys > result->size) {
if (result->keys != result->keysbuf) {
/* We're not using a static buffer, just (re)alloc */
result->keys = zrealloc(result->keys, numkeys * sizeof(keyReference));
} else {
/* We are using a static buffer, copy its contents */
result->keys = zmalloc(numkeys * sizeof(keyReference));
if (result->numkeys)
memcpy(result->keys, result->keysbuf, result->numkeys * sizeof(keyReference));
}
result->size = numkeys;
}
return result->keys;
}
/* Returns a bitmask with all the flags found in any of the key specs of the command.
* The 'inv' argument means we'll return a mask with all flags that are missing in at least one spec. */
int64_t getAllKeySpecsFlags(struct redisCommand *cmd, int inv) {
int64_t flags = 0;
for (int j = 0; j < cmd->key_specs_num; j++) {
keySpec *spec = cmd->key_specs + j;
flags |= inv? ~spec->flags : spec->flags;
}
return flags;
}
/* Fetch the keys based of the provided key specs. Returns the number of keys found, or -1 on error.
* There are several flags that can be used to modify how this function finds keys in a command.
*
* GET_KEYSPEC_INCLUDE_NOT_KEYS: Return 'fake' keys as if they were keys.
* GET_KEYSPEC_RETURN_PARTIAL: Skips invalid and incomplete keyspecs but returns the keys
* found in other valid keyspecs.
*/
int getKeysUsingKeySpecs(struct redisCommand *cmd, robj **argv, int argc, int search_flags, getKeysResult *result) {
int j, i, last, first, step;
keyReference *keys;
serverAssert(result->numkeys == 0); /* caller should initialize or reset it */
for (j = 0; j < cmd->key_specs_num; j++) {
keySpec *spec = cmd->key_specs + j;
serverAssert(spec->begin_search_type != KSPEC_BS_INVALID);
/* Skip specs that represent 'fake' keys */
if ((spec->flags & CMD_KEY_NOT_KEY) && !(search_flags & GET_KEYSPEC_INCLUDE_NOT_KEYS)) {
continue;
}
first = 0;
if (spec->begin_search_type == KSPEC_BS_INDEX) {
first = spec->bs.index.pos;
} else if (spec->begin_search_type == KSPEC_BS_KEYWORD) {
int start_index = spec->bs.keyword.startfrom > 0 ? spec->bs.keyword.startfrom : argc+spec->bs.keyword.startfrom;
int end_index = spec->bs.keyword.startfrom > 0 ? argc-1: 1;
for (i = start_index; i != end_index; i = start_index <= end_index ? i + 1 : i - 1) {
if (i >= argc || i < 1)
break;
if (!strcasecmp((char*)argv[i]->ptr,spec->bs.keyword.keyword)) {
first = i+1;
break;
}
}
/* keyword not found */
if (!first) {
continue;
}
} else {
/* unknown spec */
goto invalid_spec;
}
if (spec->find_keys_type == KSPEC_FK_RANGE) {
step = spec->fk.range.keystep;
if (spec->fk.range.lastkey >= 0) {
last = first + spec->fk.range.lastkey;
} else {
if (!spec->fk.range.limit) {
last = argc + spec->fk.range.lastkey;
} else {
serverAssert(spec->fk.range.lastkey == -1);
last = first + ((argc-first)/spec->fk.range.limit + spec->fk.range.lastkey);
}
}
} else if (spec->find_keys_type == KSPEC_FK_KEYNUM) {
step = spec->fk.keynum.keystep;
long long numkeys;
if (spec->fk.keynum.keynumidx >= argc)
goto invalid_spec;
sds keynum_str = argv[first + spec->fk.keynum.keynumidx]->ptr;
if (!string2ll(keynum_str,sdslen(keynum_str),&numkeys) || numkeys < 0) {
/* Unable to parse the numkeys argument or it was invalid */
goto invalid_spec;
}
first += spec->fk.keynum.firstkey;
last = first + (int)numkeys-1;
} else {
/* unknown spec */
goto invalid_spec;
}
int count = ((last - first)+1);
keys = getKeysPrepareResult(result, result->numkeys + count);
/* First or last is out of bounds, which indicates a syntax error */
if (last >= argc || last < first || first >= argc) {
goto invalid_spec;
}
for (i = first; i <= last; i += step) {
if (i >= argc || i < first) {
/* Modules commands, and standard commands with a not fixed number
* of arguments (negative arity parameter) do not have dispatch
* time arity checks, so we need to handle the case where the user
* passed an invalid number of arguments here. In this case we
* return no keys and expect the command implementation to report
* an arity or syntax error. */
if (cmd->flags & CMD_MODULE || cmd->arity < 0) {
continue;
} else {
serverPanic("Redis built-in command declared keys positions not matching the arity requirements.");
}
}
keys[result->numkeys].pos = i;
keys[result->numkeys].flags = spec->flags;
result->numkeys++;
}
/* Handle incomplete specs (only after we added the current spec
* to `keys`, just in case GET_KEYSPEC_RETURN_PARTIAL was given) */
if (spec->flags & CMD_KEY_INCOMPLETE) {
goto invalid_spec;
}
/* Done with this spec */
continue;
invalid_spec:
if (search_flags & GET_KEYSPEC_RETURN_PARTIAL) {
continue;
} else {
result->numkeys = 0;
return -1;
}
}
return result->numkeys;
}
/* Return all the arguments that are keys in the command passed via argc / argv.
* This function will eventually replace getKeysFromCommand.
*
* The command returns the positions of all the key arguments inside the array,
* so the actual return value is a heap allocated array of integers. The
* length of the array is returned by reference into *numkeys.
*
* Along with the position, this command also returns the flags that are
* associated with how Redis will access the key.
*
* 'cmd' must be point to the corresponding entry into the redisCommand
* table, according to the command name in argv[0]. */
int getKeysFromCommandWithSpecs(struct redisCommand *cmd, robj **argv, int argc, int search_flags, getKeysResult *result) {
/* The command has at least one key-spec not marked as NOT_KEY */
int has_keyspec = (getAllKeySpecsFlags(cmd, 1) & CMD_KEY_NOT_KEY);
/* The command has at least one key-spec marked as VARIABLE_FLAGS */
int has_varflags = (getAllKeySpecsFlags(cmd, 0) & CMD_KEY_VARIABLE_FLAGS);
/* We prefer key-specs if there are any, and their flags are reliable. */
if (has_keyspec && !has_varflags) {
int ret = getKeysUsingKeySpecs(cmd,argv,argc,search_flags,result);
if (ret >= 0)
return ret;
/* If the specs returned with an error (probably an INVALID or INCOMPLETE spec),
* fallback to the callback method. */
}
/* Resort to getkeys callback methods. */
if (cmd->flags & CMD_MODULE_GETKEYS)
return moduleGetCommandKeysViaAPI(cmd,argv,argc,result);
/* We use native getkeys as a last resort, since not all these native getkeys provide
* flags properly (only the ones that correspond to INVALID, INCOMPLETE or VARIABLE_FLAGS do.*/
if (cmd->getkeys_proc)
return cmd->getkeys_proc(cmd,argv,argc,result);
return 0;
}
/* This function returns a sanity check if the command may have keys. */
int doesCommandHaveKeys(struct redisCommand *cmd) {
return cmd->getkeys_proc || /* has getkeys_proc (non modules) */
(cmd->flags & CMD_MODULE_GETKEYS) || /* module with GETKEYS */
(getAllKeySpecsFlags(cmd, 1) & CMD_KEY_NOT_KEY); /* has at least one key-spec not marked as NOT_KEY */
}
/* A simplified channel spec table that contains all of the redis commands
* and which channels they have and how they are accessed. */
typedef struct ChannelSpecs {
redisCommandProc *proc; /* Command procedure to match against */
uint64_t flags; /* CMD_CHANNEL_* flags for this command */
int start; /* The initial position of the first channel */
int count; /* The number of channels, or -1 if all remaining
* arguments are channels. */
} ChannelSpecs;
ChannelSpecs commands_with_channels[] = {
{subscribeCommand, CMD_CHANNEL_SUBSCRIBE, 1, -1},
{ssubscribeCommand, CMD_CHANNEL_SUBSCRIBE, 1, -1},
{unsubscribeCommand, CMD_CHANNEL_UNSUBSCRIBE, 1, -1},
{sunsubscribeCommand, CMD_CHANNEL_UNSUBSCRIBE, 1, -1},
{psubscribeCommand, CMD_CHANNEL_PATTERN | CMD_CHANNEL_SUBSCRIBE, 1, -1},
{punsubscribeCommand, CMD_CHANNEL_PATTERN | CMD_CHANNEL_UNSUBSCRIBE, 1, -1},
{publishCommand, CMD_CHANNEL_PUBLISH, 1, 1},
{spublishCommand, CMD_CHANNEL_PUBLISH, 1, 1},
{NULL,0} /* Terminator. */
};
/* Returns 1 if the command may access any channels matched by the flags
* argument. */
int doesCommandHaveChannelsWithFlags(struct redisCommand *cmd, int flags) {
/* If a module declares get channels, we are just going to assume
* has channels. This API is allowed to return false positives. */
if (cmd->flags & CMD_MODULE_GETCHANNELS) {
return 1;
}
for (ChannelSpecs *spec = commands_with_channels; spec->proc != NULL; spec += 1) {
if (cmd->proc == spec->proc) {
return !!(spec->flags & flags);
}
}
return 0;
}
/* Return all the arguments that are channels in the command passed via argc / argv.
* This function behaves similar to getKeysFromCommandWithSpecs, but with channels
* instead of keys.
*
* The command returns the positions of all the channel arguments inside the array,
* so the actual return value is a heap allocated array of integers. The
* length of the array is returned by reference into *numkeys.
*
* Along with the position, this command also returns the flags that are
* associated with how Redis will access the channel.
*
* 'cmd' must be point to the corresponding entry into the redisCommand
* table, according to the command name in argv[0]. */
int getChannelsFromCommand(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
keyReference *keys;
/* If a module declares get channels, use that. */
if (cmd->flags & CMD_MODULE_GETCHANNELS) {
return moduleGetCommandChannelsViaAPI(cmd, argv, argc, result);
}
/* Otherwise check the channel spec table */
for (ChannelSpecs *spec = commands_with_channels; spec != NULL; spec += 1) {
if (cmd->proc == spec->proc) {
int start = spec->start;
int stop = (spec->count == -1) ? argc : start + spec->count;
if (stop > argc) stop = argc;
int count = 0;
keys = getKeysPrepareResult(result, stop - start);
for (int i = start; i < stop; i++ ) {
keys[count].pos = i;
keys[count++].flags = spec->flags;
}
result->numkeys = count;
return count;
}
}
return 0;
}
/* The base case is to use the keys position as given in the command table
* (firstkey, lastkey, step).
* This function works only on command with the legacy_range_key_spec,
* all other commands should be handled by getkeys_proc.
*
* If the commands keyspec is incomplete, no keys will be returned, and the provided
* keys function should be called instead.
*
* NOTE: This function does not guarantee populating the flags for
* the keys, in order to get flags you should use getKeysUsingKeySpecs. */
int getKeysUsingLegacyRangeSpec(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
int j, i = 0, last, first, step;
keyReference *keys;
UNUSED(argv);
if (cmd->legacy_range_key_spec.begin_search_type == KSPEC_BS_INVALID) {
result->numkeys = 0;
return 0;
}
first = cmd->legacy_range_key_spec.bs.index.pos;
last = cmd->legacy_range_key_spec.fk.range.lastkey;
if (last >= 0)
last += first;
step = cmd->legacy_range_key_spec.fk.range.keystep;
if (last < 0) last = argc+last;
int count = ((last - first)+1);
keys = getKeysPrepareResult(result, count);
for (j = first; j <= last; j += step) {
if (j >= argc || j < first) {
/* Modules commands, and standard commands with a not fixed number
* of arguments (negative arity parameter) do not have dispatch
* time arity checks, so we need to handle the case where the user
* passed an invalid number of arguments here. In this case we
* return no keys and expect the command implementation to report
* an arity or syntax error. */
if (cmd->flags & CMD_MODULE || cmd->arity < 0) {
result->numkeys = 0;
return 0;
} else {
serverPanic("Redis built-in command declared keys positions not matching the arity requirements.");
}
}
keys[i].pos = j;
/* Flags are omitted from legacy key specs */
keys[i++].flags = 0;
}
result->numkeys = i;
return i;
}
/* Return all the arguments that are keys in the command passed via argc / argv.
*
* The command returns the positions of all the key arguments inside the array,
* so the actual return value is a heap allocated array of integers. The
* length of the array is returned by reference into *numkeys.
*
* 'cmd' must be point to the corresponding entry into the redisCommand
* table, according to the command name in argv[0].
*
* This function uses the command table if a command-specific helper function
* is not required, otherwise it calls the command-specific function. */
int getKeysFromCommand(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
if (cmd->flags & CMD_MODULE_GETKEYS) {
return moduleGetCommandKeysViaAPI(cmd,argv,argc,result);
} else if (cmd->getkeys_proc) {
return cmd->getkeys_proc(cmd,argv,argc,result);
} else {
return getKeysUsingLegacyRangeSpec(cmd,argv,argc,result);
}
}
/* Free the result of getKeysFromCommand. */
void getKeysFreeResult(getKeysResult *result) {
if (result && result->keys != result->keysbuf)
zfree(result->keys);
}
/* Helper function to extract keys from following commands:
* COMMAND [destkey] <num-keys> <key> [...] <key> [...] ... <options>
*
* eg:
* ZUNION <num-keys> <key> <key> ... <key> <options>
* ZUNIONSTORE <destkey> <num-keys> <key> <key> ... <key> <options>
*
* 'storeKeyOfs': destkey index, 0 means destkey not exists.
* 'keyCountOfs': num-keys index.
* 'firstKeyOfs': firstkey index.
* 'keyStep': the interval of each key, usually this value is 1.
*
* The commands using this function have a fully defined keyspec, so returning flags isn't needed. */
int genericGetKeys(int storeKeyOfs, int keyCountOfs, int firstKeyOfs, int keyStep,
robj **argv, int argc, getKeysResult *result) {
int i, num;
keyReference *keys;
num = atoi(argv[keyCountOfs]->ptr);
/* Sanity check. Don't return any key if the command is going to
* reply with syntax error. (no input keys). */
if (num < 1 || num > (argc - firstKeyOfs)/keyStep) {
result->numkeys = 0;
return 0;
}
int numkeys = storeKeyOfs ? num + 1 : num;
keys = getKeysPrepareResult(result, numkeys);
result->numkeys = numkeys;
/* Add all key positions for argv[firstKeyOfs...n] to keys[] */
for (i = 0; i < num; i++) {
keys[i].pos = firstKeyOfs+(i*keyStep);
keys[i].flags = 0;
}
if (storeKeyOfs) {
keys[num].pos = storeKeyOfs;
keys[num].flags = 0;
}
return result->numkeys;
}
int sintercardGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
UNUSED(cmd);
return genericGetKeys(0, 1, 2, 1, argv, argc, result);
}
int zunionInterDiffStoreGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
UNUSED(cmd);
return genericGetKeys(1, 2, 3, 1, argv, argc, result);
}
int zunionInterDiffGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
UNUSED(cmd);
return genericGetKeys(0, 1, 2, 1, argv, argc, result);
}
int evalGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
UNUSED(cmd);
return genericGetKeys(0, 2, 3, 1, argv, argc, result);
}
int functionGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
UNUSED(cmd);
return genericGetKeys(0, 2, 3, 1, argv, argc, result);
}
int lmpopGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
UNUSED(cmd);
return genericGetKeys(0, 1, 2, 1, argv, argc, result);
}
int blmpopGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
UNUSED(cmd);
return genericGetKeys(0, 2, 3, 1, argv, argc, result);
}
int zmpopGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
UNUSED(cmd);
return genericGetKeys(0, 1, 2, 1, argv, argc, result);
}
int bzmpopGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
UNUSED(cmd);
return genericGetKeys(0, 2, 3, 1, argv, argc, result);
}
/* Helper function to extract keys from the SORT RO command.
*
* SORT <sort-key>
*
* The second argument of SORT is always a key, however an arbitrary number of
* keys may be accessed while doing the sort (the BY and GET args), so the
* key-spec declares incomplete keys which is why we have to provide a concrete
* implementation to fetch the keys.
*
* This command declares incomplete keys, so the flags are correctly set for this function */
int sortROGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
keyReference *keys;
UNUSED(cmd);
UNUSED(argv);
UNUSED(argc);
keys = getKeysPrepareResult(result, 1);
keys[0].pos = 1; /* <sort-key> is always present. */
keys[0].flags = CMD_KEY_RO | CMD_KEY_ACCESS;
result->numkeys = 1;
return result->numkeys;
}
/* Helper function to extract keys from the SORT command.
*
* SORT <sort-key> ... STORE <store-key> ...
*
* The first argument of SORT is always a key, however a list of options
* follow in SQL-alike style. Here we parse just the minimum in order to
* correctly identify keys in the "STORE" option.
*
* This command declares incomplete keys, so the flags are correctly set for this function */
int sortGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
int i, j, num, found_store = 0;
keyReference *keys;
UNUSED(cmd);
num = 0;
keys = getKeysPrepareResult(result, 2); /* Alloc 2 places for the worst case. */
keys[num].pos = 1; /* <sort-key> is always present. */
keys[num++].flags = CMD_KEY_RO | CMD_KEY_ACCESS;
/* Search for STORE option. By default we consider options to don't
* have arguments, so if we find an unknown option name we scan the
* next. However there are options with 1 or 2 arguments, so we
* provide a list here in order to skip the right number of args. */
struct {
char *name;
int skip;
} skiplist[] = {
{"limit", 2},
{"get", 1},
{"by", 1},
{NULL, 0} /* End of elements. */
};
for (i = 2; i < argc; i++) {
for (j = 0; skiplist[j].name != NULL; j++) {
if (!strcasecmp(argv[i]->ptr,skiplist[j].name)) {
i += skiplist[j].skip;
break;
} else if (!strcasecmp(argv[i]->ptr,"store") && i+1 < argc) {
/* Note: we don't increment "num" here and continue the loop
* to be sure to process the *last* "STORE" option if multiple
* ones are provided. This is same behavior as SORT. */
found_store = 1;
keys[num].pos = i+1; /* <store-key> */
keys[num].flags = CMD_KEY_OW | CMD_KEY_UPDATE;
break;
}
}
}
result->numkeys = num + found_store;
return result->numkeys;
}
/* This command declares incomplete keys, so the flags are correctly set for this function */
int migrateGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
int i, j, num, first;
keyReference *keys;
UNUSED(cmd);
/* Assume the obvious form. */
first = 3;
num = 1;
/* But check for the extended one with the KEYS option. */
struct {
char* name;
int skip;
} skip_keywords[] = {
{"copy", 0},
{"replace", 0},
{"auth", 1},
{"auth2", 2},
{NULL, 0}
};
if (argc > 6) {
for (i = 6; i < argc; i++) {
if (!strcasecmp(argv[i]->ptr, "keys")) {
if (sdslen(argv[3]->ptr) > 0) {
/* This is a syntax error. So ignore the keys and leave
* the syntax error to be handled by migrateCommand. */
num = 0;
} else {
first = i + 1;
num = argc - first;
}
break;
}
for (j = 0; skip_keywords[j].name != NULL; j++) {
if (!strcasecmp(argv[i]->ptr, skip_keywords[j].name)) {
i += skip_keywords[j].skip;
break;
}
}
}
}
keys = getKeysPrepareResult(result, num);
for (i = 0; i < num; i++) {
keys[i].pos = first+i;
keys[i].flags = CMD_KEY_RW | CMD_KEY_ACCESS | CMD_KEY_DELETE;
}
result->numkeys = num;
return num;
}
/* Helper function to extract keys from following commands:
* GEORADIUS key x y radius unit [WITHDIST] [WITHHASH] [WITHCOORD] [ASC|DESC]
* [COUNT count] [STORE key|STOREDIST key]
* GEORADIUSBYMEMBER key member radius unit ... options ...
*
* This command has a fully defined keyspec, so returning flags isn't needed. */
int georadiusGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
int i, num;
keyReference *keys;
UNUSED(cmd);
/* Check for the presence of the stored key in the command */
int stored_key = -1;
for (i = 5; i < argc; i++) {
char *arg = argv[i]->ptr;
/* For the case when user specifies both "store" and "storedist" options, the
* second key specified would override the first key. This behavior is kept
* the same as in georadiusCommand method.
*/
if ((!strcasecmp(arg, "store") || !strcasecmp(arg, "storedist")) && ((i+1) < argc)) {
stored_key = i+1;
i++;
}
}
num = 1 + (stored_key == -1 ? 0 : 1);
/* Keys in the command come from two places:
* argv[1] = key,
* argv[5...n] = stored key if present
*/
keys = getKeysPrepareResult(result, num);
/* Add all key positions to keys[] */
keys[0].pos = 1;
keys[0].flags = 0;
if(num > 1) {
keys[1].pos = stored_key;
keys[1].flags = 0;
}
result->numkeys = num;
return num;
}
/* XREAD [BLOCK <milliseconds>] [COUNT <count>] [GROUP <groupname> <ttl>]
* STREAMS key_1 key_2 ... key_N ID_1 ID_2 ... ID_N
*
* This command has a fully defined keyspec, so returning flags isn't needed. */
int xreadGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
int i, num = 0;
keyReference *keys;
UNUSED(cmd);
/* We need to parse the options of the command in order to seek the first
* "STREAMS" string which is actually the option. This is needed because
* "STREAMS" could also be the name of the consumer group and even the
* name of the stream key. */
int streams_pos = -1;
for (i = 1; i < argc; i++) {
char *arg = argv[i]->ptr;
if (!strcasecmp(arg, "block")) {
i++; /* Skip option argument. */
} else if (!strcasecmp(arg, "count")) {
i++; /* Skip option argument. */
} else if (!strcasecmp(arg, "group")) {
i += 2; /* Skip option argument. */
} else if (!strcasecmp(arg, "noack")) {
/* Nothing to do. */
} else if (!strcasecmp(arg, "streams")) {
streams_pos = i;
break;
} else {
break; /* Syntax error. */
}
}
if (streams_pos != -1) num = argc - streams_pos - 1;
/* Syntax error. */
if (streams_pos == -1 || num == 0 || num % 2 != 0) {
result->numkeys = 0;
return 0;
}
num /= 2; /* We have half the keys as there are arguments because
there are also the IDs, one per key. */
keys = getKeysPrepareResult(result, num);
for (i = streams_pos+1; i < argc-num; i++) {
keys[i-streams_pos-1].pos = i;
keys[i-streams_pos-1].flags = 0;
}
result->numkeys = num;
return num;
}
/* Helper function to extract keys from the SET command, which may have
* a read flag if the GET argument is passed in. */
int setGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
keyReference *keys;
UNUSED(cmd);
keys = getKeysPrepareResult(result, 1);
keys[0].pos = 1; /* We always know the position */
result->numkeys = 1;
for (int i = 3; i < argc; i++) {
char *arg = argv[i]->ptr;
if ((arg[0] == 'g' || arg[0] == 'G') &&
(arg[1] == 'e' || arg[1] == 'E') &&
(arg[2] == 't' || arg[2] == 'T') && arg[3] == '\0')
{
keys[0].flags = CMD_KEY_RW | CMD_KEY_ACCESS | CMD_KEY_UPDATE;
return 1;
}
}
keys[0].flags = CMD_KEY_OW | CMD_KEY_UPDATE;
return 1;
}
/* Helper function to extract keys from the BITFIELD command, which may be
* read-only if the BITFIELD GET subcommand is used. */
int bitfieldGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
keyReference *keys;
int readonly = 1;
UNUSED(cmd);
keys = getKeysPrepareResult(result, 1);
keys[0].pos = 1; /* We always know the position */
result->numkeys = 1;
for (int i = 2; i < argc; i++) {
int remargs = argc - i - 1; /* Remaining args other than current. */
char *arg = argv[i]->ptr;
if (!strcasecmp(arg, "get") && remargs >= 2) {
i += 2;
} else if ((!strcasecmp(arg, "set") || !strcasecmp(arg, "incrby")) && remargs >= 3) {
readonly = 0;
i += 3;
break;
} else if (!strcasecmp(arg, "overflow") && remargs >= 1) {
i += 1;
} else {
readonly = 0; /* Syntax error. safer to assume non-RO. */
break;
}
}
if (readonly) {
keys[0].flags = CMD_KEY_RO | CMD_KEY_ACCESS;
} else {
keys[0].flags = CMD_KEY_RW | CMD_KEY_ACCESS | CMD_KEY_UPDATE;
}
return 1;
}