Replica keep serving data during repl-diskless-load=swapdb for better availability (#9323)

For diskless replication in swapdb mode, considering we already spend replica memory
having a backup of current db to restore in case of failure, we can have the following benefits
by instead swapping database only in case we succeeded in transferring db from master:

- Avoid `LOADING` response during failed and successful synchronization for cases where the
  replica is already up and running with data.
- Faster total time of diskless replication, because now we're moving from Transfer + Flush + Load
  time to Transfer + Load only. Flushing the tempDb is done asynchronously after swapping.
- This could be implemented also for disk replication with similar benefits if consumers are willing
  to spend the extra memory usage.

General notes:
- The concept of `backupDb` becomes `tempDb` for clarity.
- Async loading mode will only kick in if the replica is syncing from a master that has the same
  repl-id the one it had before. i.e. the data it's getting belongs to a different time of the same timeline. 
- New property in INFO: `async_loading` to differentiate from the blocking loading
- Slot to Key mapping is now a field of `redisDb` as it's more natural to access it from both server.db
  and the tempDb that is passed around.
- Because this is affecting replicas only, we assume that if they are not readonly and write commands
  during replication, they are lost after SYNC same way as before, but we're still denying CONFIG SET
  here anyways to avoid complications.

Considerations for review:
- We have many cases where server.loading flag is used and even though I tried my best, there may
  be cases where async_loading should be checked as well and cases where it shouldn't (would require
  very good understanding of whole code)
- Several places that had different behavior depending on the loading flag where actually meant to just
  handle commands coming from the AOF client differently than ones coming from real clients, changed
  to check CLIENT_ID_AOF instead.

**Additional for Release Notes**
- Bugfix - server.dirty was not incremented for any kind of diskless replication, as effect it wouldn't
  contribute on triggering next database SAVE
- New flag for RM_GetContextFlags module API: REDISMODULE_CTX_FLAGS_ASYNC_LOADING
- Deprecated RedisModuleEvent_ReplBackup. Starting from Redis 7.0, we don't fire this event.
  Instead, we have the new RedisModuleEvent_ReplAsyncLoad holding 3 sub-events: STARTED,
  ABORTED and COMPLETED.
- New module flag REDISMODULE_OPTIONS_HANDLE_REPL_ASYNC_LOAD for RedisModule_SetModuleOptions
  to allow modules to declare they support the diskless replication with async loading (when absent, we fall
  back to disk-based loading).

Co-authored-by: Eduardo Semprebon <edus@saxobank.com>
Co-authored-by: Oran Agra <oran@redislabs.com>
This commit is contained in:
Eduardo Semprebon 2021-11-04 09:46:50 +01:00 committed by GitHub
parent 06dd202a05
commit 91d0c758e5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 766 additions and 315 deletions

View File

@ -605,9 +605,13 @@ repl-diskless-sync-delay 5
#
# "disabled" - Don't use diskless load (store the rdb file to the disk first)
# "on-empty-db" - Use diskless load only when it is completely safe.
# "swapdb" - Keep a copy of the current db contents in RAM while parsing
# the data directly from the socket. note that this requires
# sufficient memory, if you don't have it, you risk an OOM kill.
# "swapdb" - Keep current db contents in RAM while parsing the data directly
# from the socket. Replicas in this mode can keep serving current
# data set while replication is in progress, except for cases where
# they can't recognize master as having a data set from same
# replication history.
# Note that this requires sufficient memory, if you don't have it,
# you risk an OOM kill.
repl-diskless-load disabled
# Replicas send PINGs to server in a predefined interval. It's possible to

View File

@ -748,7 +748,7 @@ int loadAppendOnlyFile(char *filename) {
serverLog(LL_NOTICE,"Reading RDB preamble from AOF file...");
if (fseek(fp,0,SEEK_SET) == -1) goto readerr;
rioInitWithFile(&rdb,fp);
if (rdbLoadRio(&rdb,RDBFLAGS_AOF_PREAMBLE,NULL) != C_OK) {
if (rdbLoadRio(&rdb,RDBFLAGS_AOF_PREAMBLE,NULL,server.db) != C_OK) {
serverLog(LL_WARNING,"Error reading the RDB preamble of the AOF file, AOF loading aborted");
goto readerr;
} else {

View File

@ -593,8 +593,8 @@ void clusterInit(void) {
serverPanic("Unrecoverable error creating Redis Cluster socket accept handler.");
}
/* Reset data for the Slot to key API. */
slotToKeyFlush();
/* Initialize data for the Slot to key API. */
slotToKeyInit(server.db);
/* Set myself->port/cport/pport to my listening ports, we'll just need to
* discover the IP address via MEET messages. */
@ -4954,7 +4954,7 @@ NULL
unsigned int keys_in_slot = countKeysInSlot(slot);
unsigned int numkeys = maxkeys > keys_in_slot ? keys_in_slot : maxkeys;
addReplyArrayLen(c,numkeys);
dictEntry *de = server.cluster->slots_to_keys[slot].head;
dictEntry *de = (*server.db->slots_to_keys).by_slot[slot].head;
for (unsigned int j = 0; j < numkeys; j++) {
serverAssert(de != NULL);
sds sdskey = dictGetKey(de);
@ -6201,26 +6201,28 @@ int clusterRedirectBlockedClientIfNeeded(client *c) {
* while rehashing the cluster and in other conditions when we need to
* understand if we have keys for a given hash slot. */
void slotToKeyAddEntry(dictEntry *entry) {
void slotToKeyAddEntry(dictEntry *entry, redisDb *db) {
sds key = entry->key;
unsigned int hashslot = keyHashSlot(key, sdslen(key));
server.cluster->slots_to_keys[hashslot].count++;
slotToKeys *slot_to_keys = &(*db->slots_to_keys).by_slot[hashslot];
slot_to_keys->count++;
/* Insert entry before the first element in the list. */
dictEntry *first = server.cluster->slots_to_keys[hashslot].head;
dictEntry *first = slot_to_keys->head;
dictEntryNextInSlot(entry) = first;
if (first != NULL) {
serverAssert(dictEntryPrevInSlot(first) == NULL);
dictEntryPrevInSlot(first) = entry;
}
serverAssert(dictEntryPrevInSlot(entry) == NULL);
server.cluster->slots_to_keys[hashslot].head = entry;
slot_to_keys->head = entry;
}
void slotToKeyDelEntry(dictEntry *entry) {
void slotToKeyDelEntry(dictEntry *entry, redisDb *db) {
sds key = entry->key;
unsigned int hashslot = keyHashSlot(key, sdslen(key));
server.cluster->slots_to_keys[hashslot].count--;
slotToKeys *slot_to_keys = &(*db->slots_to_keys).by_slot[hashslot];
slot_to_keys->count--;
/* Connect previous and next entries to each other. */
dictEntry *next = dictEntryNextInSlot(entry);
@ -6232,14 +6234,14 @@ void slotToKeyDelEntry(dictEntry *entry) {
dictEntryNextInSlot(prev) = next;
} else {
/* The removed entry was the first in the list. */
serverAssert(server.cluster->slots_to_keys[hashslot].head == entry);
server.cluster->slots_to_keys[hashslot].head = next;
serverAssert(slot_to_keys->head == entry);
slot_to_keys->head = next;
}
}
/* Updates neighbour entries when an entry has been replaced (e.g. reallocated
* during active defrag). */
void slotToKeyReplaceEntry(dictEntry *entry) {
void slotToKeyReplaceEntry(dictEntry *entry, redisDb *db) {
dictEntry *next = dictEntryNextInSlot(entry);
dictEntry *prev = dictEntryPrevInSlot(entry);
if (next != NULL) {
@ -6251,33 +6253,33 @@ void slotToKeyReplaceEntry(dictEntry *entry) {
/* The replaced entry was the first in the list. */
sds key = entry->key;
unsigned int hashslot = keyHashSlot(key, sdslen(key));
server.cluster->slots_to_keys[hashslot].head = entry;
slotToKeys *slot_to_keys = &(*db->slots_to_keys).by_slot[hashslot];
slot_to_keys->head = entry;
}
}
/* Copies the slots-keys map to the specified backup structure. */
void slotToKeyCopyToBackup(clusterSlotsToKeysData *backup) {
memcpy(backup, server.cluster->slots_to_keys,
sizeof(server.cluster->slots_to_keys));
/* Initialize slots-keys map of given db. */
void slotToKeyInit(redisDb *db) {
db->slots_to_keys = zcalloc(sizeof(clusterSlotToKeyMapping));
}
/* Overwrites the slots-keys map by copying the provided backup structure. */
void slotToKeyRestoreBackup(clusterSlotsToKeysData *backup) {
memcpy(server.cluster->slots_to_keys, backup,
sizeof(server.cluster->slots_to_keys));
/* Empty slots-keys map of given db. */
void slotToKeyFlush(redisDb *db) {
memset(db->slots_to_keys, 0,
sizeof(clusterSlotToKeyMapping));
}
/* Empty the slots-keys map of Redis Cluster. */
void slotToKeyFlush(void) {
memset(&server.cluster->slots_to_keys, 0,
sizeof(server.cluster->slots_to_keys));
/* Free slots-keys map of given db. */
void slotToKeyDestroy(redisDb *db) {
zfree(db->slots_to_keys);
db->slots_to_keys = NULL;
}
/* Remove all the keys in the specified hash slot.
* The number of removed items is returned. */
unsigned int delKeysInSlot(unsigned int hashslot) {
unsigned int j = 0;
dictEntry *de = server.cluster->slots_to_keys[hashslot].head;
dictEntry *de = (*server.db->slots_to_keys).by_slot[hashslot].head;
while (de != NULL) {
sds sdskey = dictGetKey(de);
de = dictEntryNextInSlot(de);
@ -6290,5 +6292,5 @@ unsigned int delKeysInSlot(unsigned int hashslot) {
}
unsigned int countKeysInSlot(unsigned int hashslot) {
return server.cluster->slots_to_keys[hashslot].count;
return (*server.db->slots_to_keys).by_slot[hashslot].count;
}

View File

@ -141,14 +141,17 @@ typedef struct clusterNode {
list *fail_reports; /* List of nodes signaling this as failing */
} clusterNode;
/* State for the Slot to Key API, for a single slot. The keys in the same slot
* are linked together using dictEntry metadata. See also "Slot to Key API" in
* cluster.c. */
struct clusterSlotToKeys {
/* Slot to keys for a single slot. The keys in the same slot are linked together
* using dictEntry metadata. */
typedef struct slotToKeys {
uint64_t count; /* Number of keys in the slot. */
dictEntry *head; /* The first key-value entry in the slot. */
} slotToKeys;
/* Slot to keys mapping for all slots, opaque outside this file. */
struct clusterSlotToKeyMapping {
slotToKeys by_slot[CLUSTER_SLOTS];
};
typedef struct clusterSlotToKeys clusterSlotsToKeysData[CLUSTER_SLOTS];
/* Dict entry metadata for cluster mode, used for the Slot to Key API to form a
* linked list of the entries belonging to the same slot. */
@ -168,7 +171,6 @@ typedef struct clusterState {
clusterNode *migrating_slots_to[CLUSTER_SLOTS];
clusterNode *importing_slots_from[CLUSTER_SLOTS];
clusterNode *slots[CLUSTER_SLOTS];
clusterSlotsToKeysData slots_to_keys;
/* The following fields are used to take the slave state on elections. */
mstime_t failover_auth_time; /* Time of previous or next election. */
int failover_auth_count; /* Number of votes received so far. */
@ -315,11 +317,11 @@ unsigned long getClusterConnectionsCount(void);
int clusterSendModuleMessageToTarget(const char *target, uint64_t module_id, uint8_t type, unsigned char *payload, uint32_t len);
void clusterPropagatePublish(robj *channel, robj *message);
unsigned int keyHashSlot(char *key, int keylen);
void slotToKeyAddEntry(dictEntry *entry);
void slotToKeyDelEntry(dictEntry *entry);
void slotToKeyReplaceEntry(dictEntry *entry);
void slotToKeyCopyToBackup(clusterSlotsToKeysData *backup);
void slotToKeyRestoreBackup(clusterSlotsToKeysData *backup);
void slotToKeyFlush(void);
void slotToKeyAddEntry(dictEntry *entry, redisDb *db);
void slotToKeyDelEntry(dictEntry *entry, redisDb *db);
void slotToKeyReplaceEntry(dictEntry *entry, redisDb *db);
void slotToKeyInit(redisDb *db);
void slotToKeyFlush(redisDb *db);
void slotToKeyDestroy(redisDb *db);
#endif /* __CLUSTER_H */

145
src/db.c
View File

@ -35,12 +35,6 @@
#include <signal.h>
#include <ctype.h>
/* Database backup. */
struct dbBackup {
redisDb *dbarray;
clusterSlotsToKeysData slots_to_keys;
};
/*-----------------------------------------------------------------------------
* C-level DB API
*----------------------------------------------------------------------------*/
@ -187,7 +181,7 @@ void dbAdd(redisDb *db, robj *key, robj *val) {
serverAssertWithInfo(NULL, key, de != NULL);
dictSetVal(db->dict, de, val);
signalKeyAsReady(db, key, val->type);
if (server.cluster_enabled) slotToKeyAddEntry(de);
if (server.cluster_enabled) slotToKeyAddEntry(de, db);
}
/* This is a special version of dbAdd() that is used only when loading
@ -205,7 +199,7 @@ int dbAddRDBLoad(redisDb *db, sds key, robj *val) {
dictEntry *de = dictAddRaw(db->dict, key, NULL);
if (de == NULL) return 0;
dictSetVal(db->dict, de, val);
if (server.cluster_enabled) slotToKeyAddEntry(de);
if (server.cluster_enabled) slotToKeyAddEntry(de, db);
return 1;
}
@ -321,7 +315,7 @@ static int dbGenericDelete(redisDb *db, robj *key, int async) {
freeObjAsync(key, val, db->id);
dictSetVal(db->dict, de, NULL);
}
if (server.cluster_enabled) slotToKeyDelEntry(de);
if (server.cluster_enabled) slotToKeyDelEntry(de, db);
dictFreeUnlinkedEntry(db->dict,de);
return 1;
} else {
@ -385,7 +379,7 @@ robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o) {
}
/* Remove all keys from the database(s) structure. The dbarray argument
* may not be the server main DBs (could be a backup).
* may not be the server main DBs (could be a temporary DB).
*
* The dbnum can be -1 if all the DBs should be emptied, or the specified
* DB index if we want to empty only a single database.
@ -459,7 +453,7 @@ long long emptyDb(int dbnum, int flags, void(callback)(dict*)) {
/* Flush slots to keys map if enable cluster, we can flush entire
* slots to keys map whatever dbnum because only support one DB
* in cluster mode. */
if (server.cluster_enabled) slotToKeyFlush();
if (server.cluster_enabled) slotToKeyFlush(server.db);
if (dbnum == -1) flushSlaveKeysWithExpireList();
@ -472,77 +466,40 @@ long long emptyDb(int dbnum, int flags, void(callback)(dict*)) {
return removed;
}
/* Store a backup of the database for later use, and put an empty one
* instead of it. */
dbBackup *backupDb(void) {
dbBackup *backup = zmalloc(sizeof(dbBackup));
/* Backup main DBs. */
backup->dbarray = zmalloc(sizeof(redisDb)*server.dbnum);
/* Initialize temporary db on replica for use during diskless replication. */
redisDb *initTempDb(void) {
redisDb *tempDb = zcalloc(sizeof(redisDb)*server.dbnum);
for (int i=0; i<server.dbnum; i++) {
backup->dbarray[i] = server.db[i];
server.db[i].dict = dictCreate(&dbDictType);
server.db[i].expires = dictCreate(&dbExpiresDictType);
tempDb[i].dict = dictCreate(&dbDictType);
tempDb[i].expires = dictCreate(&dbExpiresDictType);
tempDb[i].slots_to_keys = NULL;
}
/* Backup cluster slots to keys map if enable cluster. */
if (server.cluster_enabled) {
slotToKeyCopyToBackup(&backup->slots_to_keys);
slotToKeyFlush();
/* Prepare temp slot to key map to be written during async diskless replication. */
slotToKeyInit(tempDb);
}
moduleFireServerEvent(REDISMODULE_EVENT_REPL_BACKUP,
REDISMODULE_SUBEVENT_REPL_BACKUP_CREATE,
NULL);
return backup;
return tempDb;
}
/* Discard a previously created backup, this can be slow (similar to FLUSHALL)
* Arguments are similar to the ones of emptyDb, see EMPTYDB_ flags. */
void discardDbBackup(dbBackup *backup, int flags, void(callback)(dict*)) {
int async = (flags & EMPTYDB_ASYNC);
/* Discard tempDb, this can be slow (similar to FLUSHALL), but it's always async. */
void discardTempDb(redisDb *tempDb, void(callback)(dict*)) {
int async = 1;
/* Release main DBs backup . */
emptyDbStructure(backup->dbarray, -1, async, callback);
/* Release temp DBs. */
emptyDbStructure(tempDb, -1, async, callback);
for (int i=0; i<server.dbnum; i++) {
dictRelease(backup->dbarray[i].dict);
dictRelease(backup->dbarray[i].expires);
dictRelease(tempDb[i].dict);
dictRelease(tempDb[i].expires);
}
/* Release backup. */
zfree(backup->dbarray);
zfree(backup);
moduleFireServerEvent(REDISMODULE_EVENT_REPL_BACKUP,
REDISMODULE_SUBEVENT_REPL_BACKUP_DISCARD,
NULL);
}
/* Restore the previously created backup (discarding what currently resides
* in the db).
* This function should be called after the current contents of the database
* was emptied with a previous call to emptyDb (possibly using the async mode). */
void restoreDbBackup(dbBackup *backup) {
/* Restore main DBs. */
for (int i=0; i<server.dbnum; i++) {
serverAssert(dictSize(server.db[i].dict) == 0);
serverAssert(dictSize(server.db[i].expires) == 0);
dictRelease(server.db[i].dict);
dictRelease(server.db[i].expires);
server.db[i] = backup->dbarray[i];
if (server.cluster_enabled) {
/* Release temp slot to key map. */
slotToKeyDestroy(tempDb);
}
/* Restore slots to keys map backup if enable cluster. */
if (server.cluster_enabled) slotToKeyRestoreBackup(&backup->slots_to_keys);
/* Release backup. */
zfree(backup->dbarray);
zfree(backup);
moduleFireServerEvent(REDISMODULE_EVENT_REPL_BACKUP,
REDISMODULE_SUBEVENT_REPL_BACKUP_RESTORE,
NULL);
zfree(tempDb);
}
int selectDb(client *c, int id) {
@ -591,6 +548,10 @@ void signalFlushedDb(int dbid, int async) {
}
trackingInvalidateKeysOnFlush(async);
/* Changes in this method may take place in swapMainDbWithTempDb as well,
* where we execute similar calls, but with subtle differences as it's
* not simply flushing db. */
}
/*-----------------------------------------------------------------------------
@ -1358,6 +1319,54 @@ int dbSwapDatabases(int id1, int id2) {
return C_OK;
}
/* Logically, this discards (flushes) the old main database, and apply the newly loaded
* database (temp) as the main (active) database, the actual freeing of old database
* (which will now be placed in the temp one) is done later. */
void swapMainDbWithTempDb(redisDb *tempDb) {
if (server.cluster_enabled) {
/* Swap slots_to_keys from tempdb just loaded with main db slots_to_keys. */
clusterSlotToKeyMapping *aux = server.db->slots_to_keys;
server.db->slots_to_keys = tempDb->slots_to_keys;
tempDb->slots_to_keys = aux;
}
for (int i=0; i<server.dbnum; i++) {
redisDb aux = server.db[i];
redisDb *activedb = &server.db[i], *newdb = &tempDb[i];
/* Swap hash tables. Note that we don't swap blocking_keys,
* ready_keys and watched_keys, since clients
* remain in the same DB they were. */
activedb->dict = newdb->dict;
activedb->expires = newdb->expires;
activedb->avg_ttl = newdb->avg_ttl;
activedb->expires_cursor = newdb->expires_cursor;
newdb->dict = aux.dict;
newdb->expires = aux.expires;
newdb->avg_ttl = aux.avg_ttl;
newdb->expires_cursor = aux.expires_cursor;
/* Now we need to handle clients blocked on lists: as an effect
* of swapping the two DBs, a client that was waiting for list
* X in a given DB, may now actually be unblocked if X happens
* to exist in the new version of the DB, after the swap.
*
* However normally we only do this check for efficiency reasons
* in dbAdd() when a list is created. So here we need to rescan
* the list of clients blocked on lists and signal lists as ready
* if needed.
*
* Also the swapdb should make transaction fail if there is any
* client watching keys. */
scanDatabaseForReadyLists(activedb);
touchAllWatchedKeysInDb(activedb, newdb);
}
trackingInvalidateKeysOnFlush(1);
flushSlaveKeysWithExpireList();
}
/* SWAPDB db1 db2 */
void swapdbCommand(client *c) {
int id1, id2;

View File

@ -903,7 +903,7 @@ void defragDictBucketCallback(dict *d, dictEntry **bucketref) {
*bucketref = newde;
if (server.cluster_enabled && d == server.db[0].dict) {
/* Cluster keyspace dict. Update slot-to-entries mapping. */
slotToKeyReplaceEntry(newde);
slotToKeyReplaceEntry(newde, server.db);
}
}
bucketref = &(*bucketref)->next;

View File

@ -1344,6 +1344,10 @@ int RM_BlockedClientMeasureTimeEnd(RedisModuleBlockedClient *bc) {
*
* REDISMODULE_OPTION_NO_IMPLICIT_SIGNAL_MODIFIED:
* See RM_SignalModifiedKey().
*
* REDISMODULE_OPTIONS_HANDLE_REPL_ASYNC_LOAD:
* Setting this flag indicates module awareness of diskless async replication (repl-diskless-load=swapdb)
* and that redis could be serving reads during replication instead of blocking with LOADING status.
*/
void RM_SetModuleOptions(RedisModuleCtx *ctx, int options) {
ctx->module->options = options;
@ -2714,7 +2718,9 @@ int RM_GetContextFlags(RedisModuleCtx *ctx) {
if (server.cluster_enabled)
flags |= REDISMODULE_CTX_FLAGS_CLUSTER;
if (server.loading)
if (server.async_loading)
flags |= REDISMODULE_CTX_FLAGS_ASYNC_LOADING;
else if (server.loading)
flags |= REDISMODULE_CTX_FLAGS_LOADING;
/* Maxmemory and eviction policy */
@ -5515,6 +5521,24 @@ int moduleAllDatatypesHandleErrors() {
return 1;
}
/* Returns 0 if module did not declare REDISMODULE_OPTIONS_HANDLE_REPL_ASYNC_LOAD, in which case
* diskless async loading should be avoided because module doesn't know there can be traffic during
* database full resynchronization. */
int moduleAllModulesHandleReplAsyncLoad() {
dictIterator *di = dictGetIterator(modules);
dictEntry *de;
while ((de = dictNext(di)) != NULL) {
struct RedisModule *module = dictGetVal(de);
if (!(module->options & REDISMODULE_OPTIONS_HANDLE_REPL_ASYNC_LOAD)) {
dictReleaseIterator(di);
return 0;
}
}
dictReleaseIterator(di);
return 1;
}
/* Returns true if any previous IO API failed.
* for `Load*` APIs the REDISMODULE_OPTIONS_HANDLE_IO_ERRORS flag must be set with
* RedisModule_SetModuleOptions first. */
@ -9165,8 +9189,12 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) {
* int32_t dbnum_second; // Swap Db second dbnum
*
* * RedisModuleEvent_ReplBackup
*
* WARNING: Replication Backup events are deprecated since Redis 7.0 and are never fired.
* See RedisModuleEvent_ReplAsyncLoad for understanding how Async Replication Loading events
* are now triggered when repl-diskless-load is set to swapdb.
*
* Called when diskless-repl-load config is set to swapdb,
* Called when repl-diskless-load config is set to swapdb,
* And redis needs to backup the the current database for the
* possibility to be restored later. A module with global data and
* maybe with aux_load and aux_save callbacks may need to use this
@ -9176,6 +9204,19 @@ void ModuleForkDoneHandler(int exitcode, int bysignal) {
* * `REDISMODULE_SUBEVENT_REPL_BACKUP_CREATE`
* * `REDISMODULE_SUBEVENT_REPL_BACKUP_RESTORE`
* * `REDISMODULE_SUBEVENT_REPL_BACKUP_DISCARD`
*
* * RedisModuleEvent_ReplAsyncLoad
*
* Called when repl-diskless-load config is set to swapdb and a replication with a master of same
* data set history (matching replication ID) occurs.
* In which case redis serves current data set while loading new database in memory from socket.
* Modules must have declared they support this mechanism in order to activate it, through
* REDISMODULE_OPTIONS_HANDLE_REPL_ASYNC_LOAD flag.
* The following sub events are available:
*
* * `REDISMODULE_SUBEVENT_REPL_ASYNC_LOAD_STARTED`
* * `REDISMODULE_SUBEVENT_REPL_ASYNC_LOAD_ABORTED`
* * `REDISMODULE_SUBEVENT_REPL_ASYNC_LOAD_COMPLETED`
*
* * RedisModuleEvent_ForkChild
*
@ -9255,8 +9296,8 @@ int RM_IsSubEventSupported(RedisModuleEvent event, int64_t subevent) {
return subevent < _REDISMODULE_SUBEVENT_LOADING_PROGRESS_NEXT;
case REDISMODULE_EVENT_SWAPDB:
return subevent < _REDISMODULE_SUBEVENT_SWAPDB_NEXT;
case REDISMODULE_EVENT_REPL_BACKUP:
return subevent < _REDISMODULE_SUBEVENT_REPL_BACKUP_NEXT;
case REDISMODULE_EVENT_REPL_ASYNC_LOAD:
return subevent < _REDISMODULE_SUBEVENT_REPL_ASYNC_LOAD_NEXT;
case REDISMODULE_EVENT_FORK_CHILD:
return subevent < _REDISMODULE_SUBEVENT_FORK_CHILD_NEXT;
default:
@ -9794,6 +9835,8 @@ sds genModulesInfoStringRenderModuleOptions(struct RedisModule *module) {
sds output = sdsnew("[");
if (module->options & REDISMODULE_OPTIONS_HANDLE_IO_ERRORS)
output = sdscat(output,"handle-io-errors|");
if (module->options & REDISMODULE_OPTIONS_HANDLE_REPL_ASYNC_LOAD)
output = sdscat(output,"handle-repl-async-load|");
output = sdstrim(output,"|");
output = sdscat(output,"]");
return output;

View File

@ -244,7 +244,11 @@ void execCommand(client *c) {
"This command is no longer allowed for the "
"following reason: %s", reason);
} else {
call(c,server.loading ? CMD_CALL_NONE : CMD_CALL_FULL);
if (c->id == CLIENT_ID_AOF)
call(c,CMD_CALL_NONE);
else
call(c,CMD_CALL_FULL);
serverAssert((c->flags & CLIENT_BLOCKED) == 0);
}
@ -398,7 +402,7 @@ void touchWatchedKey(redisDb *db, robj *key) {
/* Set CLIENT_DIRTY_CAS to all clients of DB when DB is dirty.
* It may happen in the following situations:
* FLUSHDB, FLUSHALL, SWAPDB
* FLUSHDB, FLUSHALL, SWAPDB, end of successful diskless replication.
*
* replaced_with: for SWAPDB, the WATCH should be invalidated if
* the key exists in either of them, and skipped only if it

View File

@ -48,6 +48,10 @@
/* This macro is called when RDB read failed (possibly a short read) */
#define rdbReportReadError(...) rdbReportError(0, __LINE__,__VA_ARGS__)
/* This macro tells if we are in the context of a RESTORE command, and not loading an RDB or AOF. */
#define isRestoreContext() \
(server.current_client == NULL || server.current_client->id == CLIENT_ID_AOF) ? 0 : 1
char* rdbFileBeingLoaded = NULL; /* used for rdb checking on read error */
extern int rdbCheckMode;
void rdbCheckError(const char *fmt, ...);
@ -68,7 +72,7 @@ void rdbReportError(int corruption_error, int linenum, char *reason, ...) {
vsnprintf(msg+len,sizeof(msg)-len,reason,ap);
va_end(ap);
if (!server.loading) {
if (isRestoreContext()) {
/* If we're in the context of a RESTORE command, just propagate the error. */
/* log in VERBOSE, and return (don't exit). */
serverLog(LL_VERBOSE, "%s", msg);
@ -381,7 +385,7 @@ void *rdbLoadLzfStringObject(rio *rdb, int flags, size_t *lenptr) {
if ((clen = rdbLoadLen(rdb,NULL)) == RDB_LENERR) return NULL;
if ((len = rdbLoadLen(rdb,NULL)) == RDB_LENERR) return NULL;
if ((c = ztrymalloc(clen)) == NULL) {
serverLog(server.loading? LL_WARNING: LL_VERBOSE, "rdbLoadLzfStringObject failed allocating %llu bytes", (unsigned long long)clen);
serverLog(isRestoreContext()? LL_VERBOSE: LL_WARNING, "rdbLoadLzfStringObject failed allocating %llu bytes", (unsigned long long)clen);
goto err;
}
@ -392,7 +396,7 @@ void *rdbLoadLzfStringObject(rio *rdb, int flags, size_t *lenptr) {
val = sdstrynewlen(SDS_NOINIT,len);
}
if (!val) {
serverLog(server.loading? LL_WARNING: LL_VERBOSE, "rdbLoadLzfStringObject failed allocating %llu bytes", (unsigned long long)len);
serverLog(isRestoreContext()? LL_VERBOSE: LL_WARNING, "rdbLoadLzfStringObject failed allocating %llu bytes", (unsigned long long)len);
goto err;
}
@ -525,7 +529,7 @@ void *rdbGenericLoadStringObject(rio *rdb, int flags, size_t *lenptr) {
if (plain || sds) {
void *buf = plain ? ztrymalloc(len) : sdstrynewlen(SDS_NOINIT,len);
if (!buf) {
serverLog(server.loading? LL_WARNING: LL_VERBOSE, "rdbGenericLoadStringObject failed allocating %llu bytes", len);
serverLog(isRestoreContext()? LL_VERBOSE: LL_WARNING, "rdbGenericLoadStringObject failed allocating %llu bytes", len);
return NULL;
}
if (lenptr) *lenptr = len;
@ -541,7 +545,7 @@ void *rdbGenericLoadStringObject(rio *rdb, int flags, size_t *lenptr) {
robj *o = encode ? tryCreateStringObject(SDS_NOINIT,len) :
tryCreateRawStringObject(SDS_NOINIT,len);
if (!o) {
serverLog(server.loading? LL_WARNING: LL_VERBOSE, "rdbGenericLoadStringObject failed allocating %llu bytes", len);
serverLog(isRestoreContext()? LL_VERBOSE: LL_WARNING, "rdbGenericLoadStringObject failed allocating %llu bytes", len);
return NULL;
}
if (len && rioRead(rdb,o->ptr,len) == 0) {
@ -2517,9 +2521,10 @@ emptykey:
/* Mark that we are loading in the global state and setup the fields
* needed to provide loading stats. */
void startLoading(size_t size, int rdbflags) {
void startLoading(size_t size, int rdbflags, int async) {
/* Load the DB */
server.loading = 1;
if (async == 1) server.async_loading = 1;
server.loading_start_time = time(NULL);
server.loading_loaded_bytes = 0;
server.loading_total_bytes = size;
@ -2547,7 +2552,7 @@ void startLoadingFile(FILE *fp, char* filename, int rdbflags) {
if (fstat(fileno(fp), &sb) == -1)
sb.st_size = 0;
rdbFileBeingLoaded = filename;
startLoading(sb.st_size, rdbflags);
startLoading(sb.st_size, rdbflags, 0);
}
/* Refresh the loading progress info */
@ -2560,6 +2565,7 @@ void loadingProgress(off_t pos) {
/* Loading finished */
void stopLoading(int success) {
server.loading = 0;
server.async_loading = 0;
blockingOperationEnds();
rdbFileBeingLoaded = NULL;
@ -2610,10 +2616,10 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
/* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned,
* otherwise C_ERR is returned and 'errno' is set accordingly. */
int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi, redisDb *dbarray) {
uint64_t dbid = 0;
int type, rdbver;
redisDb *db = server.db+0;
redisDb *db = dbarray+0;
char buf[1024];
int error;
long long empty_keys_skipped = 0;
@ -2685,7 +2691,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
"databases. Exiting\n", server.dbnum);
exit(1);
}
db = server.db+dbid;
db = dbarray+dbid;
continue; /* Read next opcode. */
} else if (type == RDB_OPCODE_RESIZEDB) {
/* RESIZEDB: Hint about the size of the keys in the currently
@ -2962,7 +2968,7 @@ int rdbLoad(char *filename, rdbSaveInfo *rsi, int rdbflags) {
if ((fp = fopen(filename,"r")) == NULL) return C_ERR;
startLoadingFile(fp, filename,rdbflags);
rioInitWithFile(&rdb,fp);
retval = rdbLoadRio(&rdb,rdbflags,rsi);
retval = rdbLoadRio(&rdb,rdbflags,rsi,server.db);
fclose(fp);
stopLoading(retval==C_OK);
return retval;

View File

@ -166,7 +166,7 @@ int rdbSaveBinaryDoubleValue(rio *rdb, double val);
int rdbLoadBinaryDoubleValue(rio *rdb, double *val);
int rdbSaveBinaryFloatValue(rio *rdb, float val);
int rdbLoadBinaryFloatValue(rio *rdb, float *val);
int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi);
int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi, redisDb *db);
int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi);
rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi);

View File

@ -150,11 +150,13 @@ typedef struct RedisModuleStreamID {
#define REDISMODULE_CTX_FLAGS_DENY_BLOCKING (1<<21)
/* The current client uses RESP3 protocol */
#define REDISMODULE_CTX_FLAGS_RESP3 (1<<22)
/* Redis is currently async loading database for diskless replication. */
#define REDISMODULE_CTX_FLAGS_ASYNC_LOADING (1<<23)
/* Next context flag, must be updated when adding new flags above!
This flag should not be used directly by the module.
* Use RedisModule_GetContextFlagsAll instead. */
#define _REDISMODULE_CTX_FLAGS_NEXT (1<<23)
#define _REDISMODULE_CTX_FLAGS_NEXT (1<<24)
/* Keyspace changes notification classes. Every class is associated with a
* character for configuration purposes.
@ -229,6 +231,10 @@ typedef uint64_t RedisModuleTimerID;
/* Declare that the module can handle errors with RedisModule_SetModuleOptions. */
#define REDISMODULE_OPTIONS_HANDLE_IO_ERRORS (1<<0)
/* Declare that the module can handle diskless async replication with RedisModule_SetModuleOptions. */
#define REDISMODULE_OPTIONS_HANDLE_REPL_ASYNC_LOAD (1<<1)
/* When set, Redis will not call RedisModule_SignalModifiedKey(), implicitly in
* RedisModule_CloseKey, and the module needs to do that when manually when keys
* are modified from the user's sperspective, to invalidate WATCH. */
@ -249,9 +255,10 @@ typedef uint64_t RedisModuleTimerID;
#define REDISMODULE_EVENT_MODULE_CHANGE 9
#define REDISMODULE_EVENT_LOADING_PROGRESS 10
#define REDISMODULE_EVENT_SWAPDB 11
#define REDISMODULE_EVENT_REPL_BACKUP 12
#define REDISMODULE_EVENT_REPL_BACKUP 12 /* Deprecated since Redis 7.0, not used anymore. */
#define REDISMODULE_EVENT_FORK_CHILD 13
#define _REDISMODULE_EVENT_NEXT 14 /* Next event flag, should be updated if a new event added. */
#define REDISMODULE_EVENT_REPL_ASYNC_LOAD 14
#define _REDISMODULE_EVENT_NEXT 15 /* Next event flag, should be updated if a new event added. */
typedef struct RedisModuleEvent {
uint64_t id; /* REDISMODULE_EVENT_... defines. */
@ -311,8 +318,14 @@ static const RedisModuleEvent
REDISMODULE_EVENT_SWAPDB,
1
},
/* Deprecated since Redis 7.0, not used anymore. */
__attribute__ ((deprecated))
RedisModuleEvent_ReplBackup = {
REDISMODULE_EVENT_REPL_BACKUP,
REDISMODULE_EVENT_REPL_BACKUP,
1
},
RedisModuleEvent_ReplAsyncLoad = {
REDISMODULE_EVENT_REPL_ASYNC_LOAD,
1
},
RedisModuleEvent_ForkChild = {
@ -363,11 +376,17 @@ static const RedisModuleEvent
#define REDISMODULE_SUBEVENT_LOADING_PROGRESS_AOF 1
#define _REDISMODULE_SUBEVENT_LOADING_PROGRESS_NEXT 2
/* Replication Backup events are deprecated since Redis 7.0 and are never fired. */
#define REDISMODULE_SUBEVENT_REPL_BACKUP_CREATE 0
#define REDISMODULE_SUBEVENT_REPL_BACKUP_RESTORE 1
#define REDISMODULE_SUBEVENT_REPL_BACKUP_DISCARD 2
#define _REDISMODULE_SUBEVENT_REPL_BACKUP_NEXT 3
#define REDISMODULE_SUBEVENT_REPL_ASYNC_LOAD_STARTED 0
#define REDISMODULE_SUBEVENT_REPL_ASYNC_LOAD_ABORTED 1
#define REDISMODULE_SUBEVENT_REPL_ASYNC_LOAD_COMPLETED 2
#define _REDISMODULE_SUBEVENT_REPL_ASYNC_LOAD_NEXT 3
#define REDISMODULE_SUBEVENT_FORK_CHILD_BORN 0
#define REDISMODULE_SUBEVENT_FORK_CHILD_DIED 1
#define _REDISMODULE_SUBEVENT_FORK_CHILD_NEXT 2

View File

@ -1623,7 +1623,8 @@ void replicationSendNewlineToMaster(void) {
}
/* Callback used by emptyDb() while flushing away old data to load
* the new dataset received by the master. */
* the new dataset received by the master and by discardTempDb()
* after loading succeeded or failed. */
void replicationEmptyDbCallback(dict *d) {
UNUSED(d);
if (server.repl_state == REPL_STATE_TRANSFER)
@ -1689,36 +1690,49 @@ static int useDisklessLoad() {
/* compute boolean decision to use diskless load */
int enabled = server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB ||
(server.repl_diskless_load == REPL_DISKLESS_LOAD_WHEN_DB_EMPTY && dbTotalServerKeyCount()==0);
/* Check all modules handle read errors, otherwise it's not safe to use diskless load. */
if (enabled && !moduleAllDatatypesHandleErrors()) {
serverLog(LL_WARNING,
"Skipping diskless-load because there are modules that don't handle read errors.");
enabled = 0;
if (enabled) {
/* Check all modules handle read errors, otherwise it's not safe to use diskless load. */
if (!moduleAllDatatypesHandleErrors()) {
serverLog(LL_WARNING,
"Skipping diskless-load because there are modules that don't handle read errors.");
enabled = 0;
}
/* Check all modules handle async replication, otherwise it's not safe to use diskless load. */
else if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB && !moduleAllModulesHandleReplAsyncLoad()) {
serverLog(LL_WARNING,
"Skipping diskless-load because there are modules that are not aware of async replication.");
enabled = 0;
}
}
return enabled;
}
/* Helper function for readSyncBulkPayload() to make backups of the current
* databases before socket-loading the new ones. The backups may be restored
* by disklessLoadRestoreBackup or freed by disklessLoadDiscardBackup later. */
dbBackup *disklessLoadMakeBackup(void) {
return backupDb();
/* Helper function for readSyncBulkPayload() to initialize tempDb
* before socket-loading the new db from master. The tempDb may be populated
* by swapMainDbWithTempDb or freed by disklessLoadDiscardTempDb later. */
redisDb *disklessLoadInitTempDb(void) {
return initTempDb();
}
/* Helper function for readSyncBulkPayload(): when replica-side diskless
* database loading is used, Redis makes a backup of the existing databases
* before loading the new ones from the socket.
*
* If the socket loading went wrong, we want to restore the old backups
* into the server databases. */
void disklessLoadRestoreBackup(dbBackup *backup) {
restoreDbBackup(backup);
/* Helper function for readSyncBulkPayload() to discard our tempDb
* when the loading succeeded or failed. */
void disklessLoadDiscardTempDb(redisDb *tempDb) {
discardTempDb(tempDb, replicationEmptyDbCallback);
}
/* Helper function for readSyncBulkPayload() to discard our old backups
* when the loading succeeded. */
void disklessLoadDiscardBackup(dbBackup *backup, int flag) {
discardDbBackup(backup, flag, replicationEmptyDbCallback);
/* If we know we got an entirely different data set from our master
* we have no way to incrementally feed our replicas after that.
* We want our replicas to resync with us as well, if we have any sub-replicas.
* This is useful on readSyncBulkPayload in places where we just finished transferring db. */
void replicationAttachToNewMaster() {
/* Replica starts to apply data from new master, we must discard the cached
* master structure. */
serverAssert(server.master == NULL);
replicationDiscardCachedMaster();
disconnectSlaves(); /* Force our replicas to resync with us as well. */
freeReplicationBacklog(); /* Don't allow our chained replicas to PSYNC. */
}
/* Asynchronously read the SYNC payload we receive from a master */
@ -1727,7 +1741,7 @@ void readSyncBulkPayload(connection *conn) {
char buf[PROTO_IOBUF_LEN];
ssize_t nread, readlen, nwritten;
int use_diskless_load = useDisklessLoad();
dbBackup *diskless_load_backup = NULL;
redisDb *diskless_load_tempDb = NULL;
int empty_db_flags = server.repl_slave_lazy_flush ? EMPTYDB_ASYNC :
EMPTYDB_NO_FLAGS;
off_t left;
@ -1895,58 +1909,61 @@ void readSyncBulkPayload(connection *conn) {
*
* 2. Or when we are done reading from the socket to the RDB file, in
* such case we want just to read the RDB file in memory. */
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Flushing old data");
/* We need to stop any AOF rewriting child before flushing and parsing
* the RDB, otherwise we'll create a copy-on-write disaster. */
if (server.aof_state != AOF_OFF) stopAppendOnly();
/* When diskless RDB loading is used by replicas, it may be configured
* in order to save the current DB instead of throwing it away,
* so that we can restore it in case of failed transfer. */
if (use_diskless_load &&
server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB)
{
/* Create a backup of server.db[] and initialize to empty
* dictionaries. */
diskless_load_backup = disklessLoadMakeBackup();
if (use_diskless_load && server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) {
/* Initialize empty tempDb dictionaries. */
diskless_load_tempDb = disklessLoadInitTempDb();
moduleFireServerEvent(REDISMODULE_EVENT_REPL_ASYNC_LOAD,
REDISMODULE_SUBEVENT_REPL_ASYNC_LOAD_STARTED,
NULL);
} else {
replicationAttachToNewMaster();
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Flushing old data");
emptyDb(-1,empty_db_flags,replicationEmptyDbCallback);
}
/* Replica starts to apply data from new master, we must discard the cached
* master structure. */
serverAssert(server.master == NULL);
replicationDiscardCachedMaster();
/* We want our slaves to resync with us as well, if we have any sub-slaves.
* The master already transferred us an entirely different data set and we
* have no way to incrementally feed our slaves after that. */
disconnectSlaves(); /* Force our slaves to resync with us as well. */
freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */
/* We call to emptyDb even in case of REPL_DISKLESS_LOAD_SWAPDB
* (Where disklessLoadMakeBackup left server.db empty) because we
* want to execute all the auxiliary logic of emptyDb (Namely,
* fire module events) */
emptyDb(-1,empty_db_flags,replicationEmptyDbCallback);
/* Before loading the DB into memory we need to delete the readable
* handler, otherwise it will get called recursively since
* rdbLoad() will call the event loop to process events from time to
* time for non blocking loading. */
connSetReadHandler(conn, NULL);
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Loading DB in memory");
rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
if (use_diskless_load) {
rio rdb;
redisDb *dbarray;
int asyncLoading = 0;
if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) {
/* Async loading means we continue serving read commands during full resync, and
* "swap" the new db with the old db only when loading is done.
* It is enabled only on SWAPDB diskless replication when master replication ID hasn't changed,
* because in that state the old content of the db represents a different point in time of the same
* data set we're currently receiving from the master. */
if (memcmp(server.replid, server.master_replid, CONFIG_RUN_ID_SIZE) == 0) {
asyncLoading = 1;
}
dbarray = diskless_load_tempDb;
} else {
dbarray = server.db;
}
rioInitWithConn(&rdb,conn,server.repl_transfer_size);
/* Put the socket in blocking mode to simplify RDB transfer.
* We'll restore it when the RDB is received. */
connBlock(conn);
connRecvTimeout(conn, server.repl_timeout*1000);
startLoading(server.repl_transfer_size, RDBFLAGS_REPLICATION);
startLoading(server.repl_transfer_size, RDBFLAGS_REPLICATION, asyncLoading);
if (rdbLoadRio(&rdb,RDBFLAGS_REPLICATION,&rsi) != C_OK) {
if (rdbLoadRio(&rdb,RDBFLAGS_REPLICATION,&rsi,dbarray) != C_OK) {
/* RDB loading failed. */
serverLog(LL_WARNING,
"Failed trying to load the MASTER synchronization DB "
@ -1955,13 +1972,17 @@ void readSyncBulkPayload(connection *conn) {
cancelReplicationHandshake(1);
rioFreeConn(&rdb, NULL);
/* Remove the half-loaded data in case we started with
* an empty replica. */
emptyDb(-1,empty_db_flags,replicationEmptyDbCallback);
if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) {
/* Restore the backed up databases. */
disklessLoadRestoreBackup(diskless_load_backup);
/* Discard potentially partially loaded tempDb. */
moduleFireServerEvent(REDISMODULE_EVENT_REPL_ASYNC_LOAD,
REDISMODULE_SUBEVENT_REPL_ASYNC_LOAD_ABORTED,
NULL);
disklessLoadDiscardTempDb(diskless_load_tempDb);
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Discarding temporary DB in background");
} else {
/* Remove the half-loaded data in case we started with an empty replica. */
emptyDb(-1,empty_db_flags,replicationEmptyDbCallback);
}
/* Note that there's no point in restarting the AOF on SYNC
@ -1972,12 +1993,26 @@ void readSyncBulkPayload(connection *conn) {
/* RDB loading succeeded if we reach this point. */
if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) {
/* Delete the backup databases we created before starting to load
* the new RDB. Now the RDB was loaded with success so the old
* data is useless. */
disklessLoadDiscardBackup(diskless_load_backup, empty_db_flags);
/* We will soon swap main db with tempDb and replicas will start
* to apply data from new master, we must discard the cached
* master structure and force resync of sub-replicas. */
replicationAttachToNewMaster();
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Swapping active DB with loaded DB");
swapMainDbWithTempDb(diskless_load_tempDb);
moduleFireServerEvent(REDISMODULE_EVENT_REPL_ASYNC_LOAD,
REDISMODULE_SUBEVENT_REPL_ASYNC_LOAD_COMPLETED,
NULL);
/* Delete the old db as it's useless now. */
disklessLoadDiscardTempDb(diskless_load_tempDb);
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Discarding old DB in background");
}
/* Inform about db change, as replication was diskless and didn't cause a save. */
server.dirty++;
/* Verify the end mark is correct. */
if (usemark) {
if (!rioRead(&rdb,buf,CONFIG_RUN_ID_SIZE) ||

View File

@ -882,7 +882,7 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) {
"Write commands not allowed after non deterministic commands. Call redis.replicate_commands() at the start of your script in order to switch to single commands replication mode.");
goto cleanup;
} else if (server.masterhost && server.repl_slave_ro &&
!server.loading &&
server.lua_caller->id != CLIENT_ID_AOF &&
!(server.lua_caller->flags & CLIENT_MASTER))
{
luaPushError(lua, shared.roslaveerr->ptr);
@ -905,11 +905,11 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) {
* could enlarge the memory usage are not allowed, but only if this is the
* first write in the context of this script, otherwise we can't stop
* in the middle. */
if (server.maxmemory && /* Maxmemory is actually enabled. */
!server.loading && /* Don't care about mem if loading. */
!server.masterhost && /* Slave must execute the script. */
server.lua_write_dirty == 0 && /* Script had no side effects so far. */
server.lua_oom && /* Detected OOM when script start. */
if (server.maxmemory && /* Maxmemory is actually enabled. */
server.lua_caller->id != CLIENT_ID_AOF && /* Don't care about mem if loading from AOF. */
!server.masterhost && /* Slave must execute the script. */
server.lua_write_dirty == 0 && /* Script had no side effects so far. */
server.lua_oom && /* Detected OOM when script start. */
(cmd->flags & CMD_DENYOOM))
{
luaPushError(lua, shared.oomerr->ptr);
@ -922,7 +922,7 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) {
/* If this is a Redis Cluster node, we need to make sure Lua is not
* trying to access non-local keys, with the exception of commands
* received from our master or when loading the AOF back in memory. */
if (server.cluster_enabled && !server.loading &&
if (server.cluster_enabled && server.lua_caller->id != CLIENT_ID_AOF &&
!(server.lua_caller->flags & CLIENT_MASTER))
{
int error_code;

View File

@ -3680,6 +3680,7 @@ void initServerConfig(void) {
server.skip_checksum_validation = 0;
server.saveparams = NULL;
server.loading = 0;
server.async_loading = 0;
server.loading_rdb_used_mem = 0;
server.aof_state = AOF_OFF;
server.aof_rewrite_base_size = 0;
@ -4252,6 +4253,7 @@ void initServer(void) {
server.db[j].id = j;
server.db[j].avg_ttl = 0;
server.db[j].defrag_later = listCreate();
server.db[j].slots_to_keys = NULL; /* Set by clusterInit later on if necessary. */
listSetFreeMethod(server.db[j].defrag_later,(void (*)(void*))sdsfree);
}
evictionPoolAlloc(); /* Initialize the LRU keys pool. */
@ -5432,7 +5434,7 @@ int processCommand(client *c) {
/* Loading DB? Return an error if the command has not the
* CMD_LOADING flag. */
if (server.loading && is_denyloading_command) {
if (server.loading && !server.async_loading && is_denyloading_command) {
rejectCommand(c, shared.loadingerr);
return C_OK;
}
@ -6420,6 +6422,7 @@ sds genRedisInfoString(const char *section) {
info = sdscatprintf(info,
"# Persistence\r\n"
"loading:%d\r\n"
"async_loading:%d\r\n"
"current_cow_peak:%zu\r\n"
"current_cow_size:%zu\r\n"
"current_cow_size_age:%lu\r\n"
@ -6445,7 +6448,8 @@ sds genRedisInfoString(const char *section) {
"aof_last_cow_size:%zu\r\n"
"module_fork_in_progress:%d\r\n"
"module_fork_last_cow_size:%zu\r\n",
(int)server.loading,
(int)(server.loading && !server.async_loading),
(int)server.async_loading,
server.stat_current_cow_peak,
server.stat_current_cow_bytes,
server.stat_current_cow_updated ? (unsigned long) elapsedMs(server.stat_current_cow_updated) / 1000 : 0,

View File

@ -803,6 +803,9 @@ typedef struct replBufBlock {
char buf[];
} replBufBlock;
/* Opaque type for the Slot to Key API. */
typedef struct clusterSlotToKeyMapping clusterSlotToKeyMapping;
/* Redis database representation. There are multiple databases identified
* by integers from 0 (the default database) up to the max configured
* database. The database number is the 'id' field in the structure. */
@ -816,13 +819,9 @@ typedef struct redisDb {
long long avg_ttl; /* Average TTL, just for stats */
unsigned long expires_cursor; /* Cursor of the active expire cycle. */
list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */
clusterSlotToKeyMapping *slots_to_keys; /* Array of slots to keys. Only used in cluster mode (db 0). */
} redisDb;
/* Declare database backup that include redis main DBs and slots to keys map.
* Definition is in db.c. We can't define it here since we define CLUSTER_SLOTS
* in cluster.h. */
typedef struct dbBackup dbBackup;
/* Client MULTI/EXEC state */
typedef struct multiCmd {
robj **argv;
@ -1394,6 +1393,7 @@ struct redisServer {
/* RDB / AOF loading information */
volatile sig_atomic_t loading; /* We are loading data from disk if true */
volatile sig_atomic_t async_loading; /* We are loading data without blocking the db being served */
off_t loading_total_bytes;
off_t loading_rdb_used_mem;
off_t loading_loaded_bytes;
@ -2035,6 +2035,7 @@ void ModuleForkDoneHandler(int exitcode, int bysignal);
int TerminateModuleForkChild(int child_pid, int wait);
ssize_t rdbSaveModulesAux(rio *rdb, int when);
int moduleAllDatatypesHandleErrors();
int moduleAllModulesHandleReplAsyncLoad();
sds modulesCollectInfo(sds info, const char *section, int for_crash_report, int sections);
void moduleFireServerEvent(uint64_t eid, int subid, void *data);
void processModuleLoadingProgressEvent(int is_aof);
@ -2338,7 +2339,7 @@ const char *getFailoverStateString();
/* Generic persistence functions */
void startLoadingFile(FILE* fp, char* filename, int rdbflags);
void startLoading(size_t size, int rdbflags);
void startLoading(size_t size, int rdbflags, int async);
void loadingProgress(off_t pos);
void stopLoading(int success);
void startSaving(int rdbflags);
@ -2663,9 +2664,8 @@ long long emptyDb(int dbnum, int flags, void(callback)(dict*));
long long emptyDbStructure(redisDb *dbarray, int dbnum, int async, void(callback)(dict*));
void flushAllDataAndResetRDB(int flags);
long long dbTotalServerKeyCount();
dbBackup *backupDb(void);
void restoreDbBackup(dbBackup *backup);
void discardDbBackup(dbBackup *backup, int flags, void(callback)(dict*));
redisDb *initTempDb(void);
void discardTempDb(redisDb *tempDb, void(callback)(dict*));
int selectDb(client *c, int id);
@ -3054,6 +3054,7 @@ void debugDelay(int usec);
void killIOThreads(void);
void killThreads(void);
void makeThreadKillable(void);
void swapMainDbWithTempDb(redisDb *tempDb);
/* Use macro for checking log level to avoid evaluating arguments in cases log
* should be ignored due to low level. */

View File

@ -1,4 +1,4 @@
# Check replica can restore database backup correctly if fail to diskless load.
# Check that replica keys and keys to slots map are right after failing to diskless load using SWAPDB.
source "../tests/includes/init-tests.tcl"
@ -14,7 +14,7 @@ test "Cluster is writable" {
cluster_write_test 0
}
test "Right to restore backups when fail to diskless load " {
test "Main db not affected when fail to diskless load" {
set master [Rn 0]
set replica [Rn 1]
set master_id 0
@ -63,9 +63,9 @@ test "Right to restore backups when fail to diskless load " {
restart_instance redis $replica_id
$replica READONLY
# Start full sync, wait till after db is flushed (backed up)
# Start full sync, wait till after db started loading in background
wait_for_condition 500 10 {
[s $replica_id loading] eq 1
[s $replica_id async_loading] eq 1
} else {
fail "Fail to full sync"
}
@ -75,7 +75,7 @@ test "Right to restore backups when fail to diskless load " {
# Start full sync, wait till the replica detects the disconnection
wait_for_condition 500 10 {
[s $replica_id loading] eq 0
[s $replica_id async_loading] eq 0
} else {
fail "Fail to full sync"
}

View File

@ -41,6 +41,7 @@ test "Cluster nodes hard reset" {
R $id config set cluster-slave-validity-factor 10
R $id config set loading-process-events-interval-bytes 2097152
R $id config set key-load-delay 0
R $id config set repl-diskless-load disabled
R $id config rewrite
}
}

View File

@ -383,72 +383,226 @@ start_server {tags {"repl external:skip"}} {
}
}
test {slave fails full sync and diskless load swapdb recovers it} {
start_server {tags {"repl"}} {
set slave [srv 0 client]
set slave_host [srv 0 host]
set slave_port [srv 0 port]
set slave_log [srv 0 stdout]
# Diskless load swapdb when NOT async_loading (different master replid)
foreach testType {Successful Aborted} {
start_server {tags {"repl external:skip"}} {
set replica [srv 0 client]
set replica_host [srv 0 host]
set replica_port [srv 0 port]
set replica_log [srv 0 stdout]
start_server {} {
set master [srv 0 client]
set master_host [srv 0 host]
set master_port [srv 0 port]
# Put different data sets on the master and slave
# we need to put large keys on the master since the slave replies to info only once in 2mb
$slave debug populate 2000 slave 10
$master debug populate 800 master 100000
$master config set rdbcompression no
# Set master and slave to use diskless replication
# Set master and replica to use diskless replication on swapdb mode
$master config set repl-diskless-sync yes
$master config set repl-diskless-sync-delay 0
$slave config set repl-diskless-load swapdb
$master config set save ""
$replica config set repl-diskless-load swapdb
$replica config set save ""
# Set master with a slow rdb generation, so that we can easily disconnect it mid sync
# 10ms per key, with 800 keys is 8 seconds
$master config set rdb-key-save-delay 10000
# Put different data sets on the master and replica
# We need to put large keys on the master since the replica replies to info only once in 2mb
$replica debug populate 200 slave 10
$master debug populate 1000 master 100000
$master config set rdbcompression no
# Start the replication process...
$slave slaveof $master_host $master_port
# Set a key value on replica to check status on failure and after swapping db
$replica set mykey myvalue
# wait for the slave to start reading the rdb
wait_for_condition 100 100 {
[s -1 loading] eq 1
} else {
fail "Replica didn't get into loading mode"
switch $testType {
"Aborted" {
# Set master with a slow rdb generation, so that we can easily intercept loading
# 10ms per key, with 1000 keys is 10 seconds
$master config set rdb-key-save-delay 10000
# Start the replication process
$replica replicaof $master_host $master_port
test {Diskless load swapdb (different replid): replica enter loading} {
# Wait for the replica to start reading the rdb
wait_for_condition 100 100 {
[s -1 loading] eq 1
} else {
fail "Replica didn't get into loading mode"
}
assert_equal [s -1 async_loading] 0
}
# Make sure that next sync will not start immediately so that we can catch the replica in between syncs
$master config set repl-diskless-sync-delay 5
# Kill the replica connection on the master
set killed [$master client kill type replica]
# Wait for loading to stop (fail)
wait_for_condition 100 100 {
[s -1 loading] eq 0
} else {
fail "Replica didn't disconnect"
}
test {Diskless load swapdb (different replid): old database is exposed after replication fails} {
# Ensure we see old values from replica
assert_equal [$replica get mykey] "myvalue"
# Make sure amount of replica keys didn't change
assert_equal [$replica dbsize] 201
}
# Speed up shutdown
$master config set rdb-key-save-delay 0
}
"Successful" {
# Start the replication process
$replica replicaof $master_host $master_port
# Let replica finish sync with master
wait_for_condition 100 100 {
[s -1 master_link_status] eq "up"
} else {
fail "Master <-> Replica didn't finish sync"
}
test {Diskless load swapdb (different replid): new database is exposed after swapping} {
# Ensure we don't see anymore the key that was stored only to replica and also that we don't get LOADING status
assert_equal [$replica GET mykey] ""
# Make sure amount of keys matches master
assert_equal [$replica dbsize] 1000
}
}
}
# make sure that next sync will not start immediately so that we can catch the slave in between syncs
$master config set repl-diskless-sync-delay 5
# for faster server shutdown, make rdb saving fast again (the fork is already uses the slow one)
$master config set rdb-key-save-delay 0
# waiting slave to do flushdb (key count drop)
wait_for_condition 50 100 {
2000 != [scan [regexp -inline {keys\=([\d]*)} [$slave info keyspace]] keys=%d]
} else {
fail "Replica didn't flush"
}
# make sure we're still loading
assert_equal [s -1 loading] 1
# kill the slave connection on the master
set killed [$master client kill type slave]
# wait for loading to stop (fail)
wait_for_condition 50 100 {
[s -1 loading] eq 0
} else {
fail "Replica didn't disconnect"
}
# make sure the original keys were restored
assert_equal [$slave dbsize] 2000
}
}
} {} {external:skip}
}
# Diskless load swapdb when async_loading (matching master replid)
foreach testType {Successful Aborted} {
start_server {tags {"repl external:skip"}} {
set replica [srv 0 client]
set replica_host [srv 0 host]
set replica_port [srv 0 port]
set replica_log [srv 0 stdout]
start_server {} {
set master [srv 0 client]
set master_host [srv 0 host]
set master_port [srv 0 port]
# Set master and replica to use diskless replication on swapdb mode
$master config set repl-diskless-sync yes
$master config set repl-diskless-sync-delay 0
$master config set save ""
$replica config set repl-diskless-load swapdb
$replica config set save ""
# Set replica writable so we can check that a key we manually added is served
# during replication and after failure, but disappears on success
$replica config set replica-read-only no
# Initial sync to have matching replids between master and replica
$replica replicaof $master_host $master_port
# Let replica finish initial sync with master
wait_for_condition 100 100 {
[s -1 master_link_status] eq "up"
} else {
fail "Master <-> Replica didn't finish sync"
}
# Put different data sets on the master and replica
# We need to put large keys on the master since the replica replies to info only once in 2mb
$replica debug populate 2000 slave 10
$master debug populate 1000 master 100000
$master config set rdbcompression no
# Set a key value on replica to check status during loading, on failure and after swapping db
$replica set mykey myvalue
# Force the replica to try another full sync (this time it will have matching master replid)
$master multi
$master client kill type replica
# Fill replication backlog with new content
$master config set repl-backlog-size 16384
for {set keyid 0} {$keyid < 10} {incr keyid} {
$master set "$keyid string_$keyid" [string repeat A 16384]
}
$master exec
switch $testType {
"Aborted" {
# Set master with a slow rdb generation, so that we can easily intercept loading
# 10ms per key, with 1000 keys is 10 seconds
$master config set rdb-key-save-delay 10000
test {Diskless load swapdb (async_loading): replica enter async_loading} {
# Wait for the replica to start reading the rdb
wait_for_condition 100 100 {
[s -1 async_loading] eq 1
} else {
fail "Replica didn't get into async_loading mode"
}
assert_equal [s -1 loading] 0
}
test {Diskless load swapdb (async_loading): old database is exposed while async replication is in progress} {
# Ensure we still see old values while async_loading is in progress and also not LOADING status
assert_equal [$replica get mykey] "myvalue"
# Make sure we're still async_loading to validate previous assertion
assert_equal [s -1 async_loading] 1
# Make sure amount of replica keys didn't change
assert_equal [$replica dbsize] 2001
}
# Make sure that next sync will not start immediately so that we can catch the replica in between syncs
$master config set repl-diskless-sync-delay 5
# Kill the replica connection on the master
set killed [$master client kill type replica]
# Wait for loading to stop (fail)
wait_for_condition 100 100 {
[s -1 async_loading] eq 0
} else {
fail "Replica didn't disconnect"
}
test {Diskless load swapdb (async_loading): old database is exposed after async replication fails} {
# Ensure we see old values from replica
assert_equal [$replica get mykey] "myvalue"
# Make sure amount of replica keys didn't change
assert_equal [$replica dbsize] 2001
}
# Speed up shutdown
$master config set rdb-key-save-delay 0
}
"Successful" {
# Let replica finish sync with master
wait_for_condition 100 100 {
[s -1 master_link_status] eq "up"
} else {
fail "Master <-> Replica didn't finish sync"
}
test {Diskless load swapdb (async_loading): new database is exposed after swapping} {
# Ensure we don't see anymore the key that was stored only to replica and also that we don't get LOADING status
assert_equal [$replica GET mykey] ""
# Make sure amount of keys matches master
assert_equal [$replica dbsize] 1010
}
}
}
}
}
}
test {diskless loading short read} {
start_server {tags {"repl"}} {

View File

@ -13,39 +13,47 @@ RedisModuleType *testrdb_type = NULL;
RedisModuleString *before_str = NULL;
RedisModuleString *after_str = NULL;
void replBackupCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data)
/* Global values used to keep aux from db being loaded (in case of async_loading) */
RedisModuleString *before_str_temp = NULL;
RedisModuleString *after_str_temp = NULL;
/* Indicates whether there is an async replication in progress.
* We control this value from RedisModuleEvent_ReplAsyncLoad events. */
int async_loading = 0;
void replAsyncLoadCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data)
{
REDISMODULE_NOT_USED(e);
REDISMODULE_NOT_USED(data);
static RedisModuleString *before_str_backup = NULL;
static RedisModuleString *after_str_backup = NULL;
switch (sub) {
case REDISMODULE_SUBEVENT_REPL_BACKUP_CREATE:
assert(before_str_backup == NULL);
assert(after_str_backup == NULL);
before_str_backup = before_str;
after_str_backup = after_str;
before_str = NULL;
after_str = NULL;
case REDISMODULE_SUBEVENT_REPL_ASYNC_LOAD_STARTED:
assert(async_loading == 0);
async_loading = 1;
break;
case REDISMODULE_SUBEVENT_REPL_BACKUP_RESTORE:
case REDISMODULE_SUBEVENT_REPL_ASYNC_LOAD_ABORTED:
/* Discard temp aux */
if (before_str_temp)
RedisModule_FreeString(ctx, before_str_temp);
if (after_str_temp)
RedisModule_FreeString(ctx, after_str_temp);
before_str_temp = NULL;
after_str_temp = NULL;
async_loading = 0;
break;
case REDISMODULE_SUBEVENT_REPL_ASYNC_LOAD_COMPLETED:
if (before_str)
RedisModule_FreeString(ctx, before_str);
if (after_str)
RedisModule_FreeString(ctx, after_str);
before_str = before_str_backup;
after_str = after_str_backup;
before_str_backup = NULL;
after_str_backup = NULL;
break;
case REDISMODULE_SUBEVENT_REPL_BACKUP_DISCARD:
if (before_str_backup)
RedisModule_FreeString(ctx, before_str_backup);
if (after_str_backup)
RedisModule_FreeString(ctx, after_str_backup);
before_str_backup = NULL;
after_str_backup = NULL;
before_str = before_str_temp;
after_str = after_str_temp;
before_str_temp = NULL;
after_str_temp = NULL;
async_loading = 0;
break;
default:
assert(0);
@ -105,24 +113,47 @@ int testrdb_aux_load(RedisModuleIO *rdb, int encver, int when) {
if (conf_aux_count==0) assert(0);
RedisModuleCtx *ctx = RedisModule_GetContextFromIO(rdb);
if (when == REDISMODULE_AUX_BEFORE_RDB) {
if (before_str)
RedisModule_FreeString(ctx, before_str);
before_str = NULL;
int count = RedisModule_LoadSigned(rdb);
if (RedisModule_IsIOError(rdb))
return REDISMODULE_ERR;
if (count)
before_str = RedisModule_LoadString(rdb);
if (async_loading == 0) {
if (before_str)
RedisModule_FreeString(ctx, before_str);
before_str = NULL;
int count = RedisModule_LoadSigned(rdb);
if (RedisModule_IsIOError(rdb))
return REDISMODULE_ERR;
if (count)
before_str = RedisModule_LoadString(rdb);
} else {
if (before_str_temp)
RedisModule_FreeString(ctx, before_str_temp);
before_str_temp = NULL;
int count = RedisModule_LoadSigned(rdb);
if (RedisModule_IsIOError(rdb))
return REDISMODULE_ERR;
if (count)
before_str_temp = RedisModule_LoadString(rdb);
}
} else {
if (after_str)
RedisModule_FreeString(ctx, after_str);
after_str = NULL;
int count = RedisModule_LoadSigned(rdb);
if (RedisModule_IsIOError(rdb))
return REDISMODULE_ERR;
if (count)
after_str = RedisModule_LoadString(rdb);
if (async_loading == 0) {
if (after_str)
RedisModule_FreeString(ctx, after_str);
after_str = NULL;
int count = RedisModule_LoadSigned(rdb);
if (RedisModule_IsIOError(rdb))
return REDISMODULE_ERR;
if (count)
after_str = RedisModule_LoadString(rdb);
} else {
if (after_str_temp)
RedisModule_FreeString(ctx, after_str_temp);
after_str_temp = NULL;
int count = RedisModule_LoadSigned(rdb);
if (RedisModule_IsIOError(rdb))
return REDISMODULE_ERR;
if (count)
after_str_temp = RedisModule_LoadString(rdb);
}
}
if (RedisModule_IsIOError(rdb))
return REDISMODULE_ERR;
return REDISMODULE_OK;
@ -162,6 +193,21 @@ int testrdb_get_before(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
return REDISMODULE_OK;
}
/* For purpose of testing module events, expose variable state during async_loading. */
int testrdb_async_loading_get_before(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
REDISMODULE_NOT_USED(argv);
if (argc != 1){
RedisModule_WrongArity(ctx);
return REDISMODULE_OK;
}
if (before_str_temp)
RedisModule_ReplyWithString(ctx, before_str_temp);
else
RedisModule_ReplyWithStringBuffer(ctx, "", 0);
return REDISMODULE_OK;
}
int testrdb_set_after(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
{
if (argc != 2){
@ -226,11 +272,10 @@ int testrdb_get_key(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc);
if (RedisModule_Init(ctx,"testrdb",1,REDISMODULE_APIVER_1) == REDISMODULE_ERR)
return REDISMODULE_ERR;
RedisModule_SetModuleOptions(ctx, REDISMODULE_OPTIONS_HANDLE_IO_ERRORS);
RedisModule_SetModuleOptions(ctx, REDISMODULE_OPTIONS_HANDLE_IO_ERRORS | REDISMODULE_OPTIONS_HANDLE_REPL_ASYNC_LOAD);
if (argc > 0)
RedisModule_StringToLongLong(argv[0], &conf_aux_count);
@ -274,6 +319,9 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
if (RedisModule_CreateCommand(ctx,"testrdb.get.before", testrdb_get_before,"",0,0,0) == REDISMODULE_ERR)
return REDISMODULE_ERR;
if (RedisModule_CreateCommand(ctx,"testrdb.async_loading.get.before", testrdb_async_loading_get_before,"",0,0,0) == REDISMODULE_ERR)
return REDISMODULE_ERR;
if (RedisModule_CreateCommand(ctx,"testrdb.set.after", testrdb_set_after,"deny-oom",0,0,0) == REDISMODULE_ERR)
return REDISMODULE_ERR;
@ -287,7 +335,7 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
return REDISMODULE_ERR;
RedisModule_SubscribeToServerEvent(ctx,
RedisModuleEvent_ReplBackup, replBackupCallback);
RedisModuleEvent_ReplAsyncLoad, replAsyncLoadCallback);
return REDISMODULE_OK;
}
@ -297,5 +345,9 @@ int RedisModule_OnUnload(RedisModuleCtx *ctx) {
RedisModule_FreeString(ctx, before_str);
if (after_str)
RedisModule_FreeString(ctx, after_str);
if (before_str_temp)
RedisModule_FreeString(ctx, before_str_temp);
if (after_str_temp)
RedisModule_FreeString(ctx, after_str_temp);
return REDISMODULE_OK;
}

View File

@ -131,5 +131,120 @@ tags "modules" {
}
}
}
# Module events for diskless load swapdb when async_loading (matching master replid)
foreach testType {Successful Aborted} {
start_server [list overrides [list loadmodule "$testmodule 2"] tags [list external:skip]] {
set replica [srv 0 client]
set replica_host [srv 0 host]
set replica_port [srv 0 port]
set replica_log [srv 0 stdout]
start_server [list overrides [list loadmodule "$testmodule 2"]] {
set master [srv 0 client]
set master_host [srv 0 host]
set master_port [srv 0 port]
set start [clock clicks -milliseconds]
# Set master and replica to use diskless replication on swapdb mode
$master config set repl-diskless-sync yes
$master config set repl-diskless-sync-delay 0
$master config set save ""
$replica config set repl-diskless-load swapdb
$replica config set save ""
# Initial sync to have matching replids between master and replica
$replica replicaof $master_host $master_port
# Let replica finish initial sync with master
wait_for_condition 100 100 {
[s -1 master_link_status] eq "up"
} else {
fail "Master <-> Replica didn't finish sync"
}
# Set global values on module so we can check if module event callbacks will pick it up correctly
$master testrdb.set.before value1_master
$replica testrdb.set.before value1_replica
# Put different data sets on the master and replica
# We need to put large keys on the master since the replica replies to info only once in 2mb
$replica debug populate 200 slave 10
$master debug populate 1000 master 100000
$master config set rdbcompression no
# Force the replica to try another full sync (this time it will have matching master replid)
$master multi
$master client kill type replica
# Fill replication backlog with new content
$master config set repl-backlog-size 16384
for {set keyid 0} {$keyid < 10} {incr keyid} {
$master set "$keyid string_$keyid" [string repeat A 16384]
}
$master exec
switch $testType {
"Aborted" {
# Set master with a slow rdb generation, so that we can easily intercept loading
# 10ms per key, with 1000 keys is 10 seconds
$master config set rdb-key-save-delay 10000
test {Diskless load swapdb RedisModuleEvent_ReplAsyncLoad handling: during loading, can keep module variable same as before} {
# Wait for the replica to start reading the rdb and module for acknowledgement
# We wanna abort only after the temp db was populated by REDISMODULE_AUX_BEFORE_RDB
wait_for_condition 100 100 {
[s -1 async_loading] eq 1 && [$replica testrdb.async_loading.get.before] eq "value1_master"
} else {
fail "Module didn't receive or react to REDISMODULE_SUBEVENT_REPL_ASYNC_LOAD_STARTED"
}
assert_equal [$replica dbsize] 200
assert_equal value1_replica [$replica testrdb.get.before]
}
# Make sure that next sync will not start immediately so that we can catch the replica in between syncs
$master config set repl-diskless-sync-delay 5
# Kill the replica connection on the master
set killed [$master client kill type replica]
test {Diskless load swapdb RedisModuleEvent_ReplAsyncLoad handling: when loading aborted, can keep module variable same as before} {
# Wait for loading to stop (fail) and module for acknowledgement
wait_for_condition 100 100 {
[s -1 async_loading] eq 0 && [$replica testrdb.async_loading.get.before] eq ""
} else {
fail "Module didn't receive or react to REDISMODULE_SUBEVENT_REPL_ASYNC_LOAD_ABORTED"
}
assert_equal [$replica dbsize] 200
assert_equal value1_replica [$replica testrdb.get.before]
}
# Speed up shutdown
$master config set rdb-key-save-delay 0
}
"Successful" {
# Let replica finish sync with master
wait_for_condition 100 100 {
[s -1 master_link_status] eq "up"
} else {
fail "Master <-> Replica didn't finish sync"
}
test {Diskless load swapdb RedisModuleEvent_ReplAsyncLoad handling: after db loaded, can set module variable with new value} {
assert_equal [$replica dbsize] 1010
assert_equal value1_master [$replica testrdb.get.before]
}
}
}
if {$::verbose} {
set end [clock clicks -milliseconds]
set duration [expr $end - $start]
puts "test took $duration ms"
}
}
}
}
}
}