Change FLUSHALL/FLUSHDB SYNC to run as blocking ASYNC (#13167)

# Overview
Users utilize the `FLUSHDB SYNC` and `FLUSHALL SYNC` commands for a variety of 
reasons. The main issue with these commands is that if the database is 
substantial in size, the server will be unresponsive for an extended period. 
Besides freezing application traffic, this may also lead some clients to make 
incorrect judgments about the server's availability. For instance, a watchdog 
may erroneously decide to terminate the process, with potentially adverse 
outcomes. While a `FLUSH* ASYNC` can address these issues, it might not be used 
for two reasons: firstly, it's not the default, and secondly, in some cases the 
client issuing the flush wants to wait for its completion before repopulating 
the database.

Between triggering FLUSH* asynchronously in the background with no indication 
of completion, and running it synchronously in the foreground on the main 
thread, there is a third, more appealing option. We can block the client that 
requested the flush, execute the flush command in the background, and once 
done, unblock the client and return a completion notification. This approach 
keeps the server responsive to other clients, and the blocked client receives 
the expected response only after the flush operation has actually been carried 
out.

# Implementation details
Instead of defining yet another flavor of the flush command, we can modify
`FLUSHALL SYNC` and `FLUSHDB SYNC` to always run in this new mode.

## Extending BIO Threads capabilities
Today, jobs carried out by BIO threads have no way to indicate completion to 
the main thread. We can add this infrastructure with an additional dummy job, 
coined a completion-job, that the BIO thread eventually writes to a 
response-queue. The main thread consumes items from the response-queue and 
calls the callback function provided with each completion-job.
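
To make the queue-plus-pipe handover concrete, here is a minimal, self-contained
sketch of the pattern, assuming a single worker thread and deliberately
simplified bookkeeping; all names (`submit`, `worker`, `wake_pipe`, `flush_done`)
are hypothetical stand-ins for the `bio.c` machinery shown in the diff below.

```c
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

typedef void comp_fn(uint64_t user_data);

typedef struct job {
    int is_completion;  /* 0 = regular background work, 1 = completion request */
    comp_fn *fn;        /* callback (completion jobs only) */
    uint64_t arg;       /* user data handed back to the callback */
    struct job *next;
} job;

/* FIFO job queue consumed by the worker thread */
static job *q_head, *q_tail;
static pthread_mutex_t q_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t q_cond = PTHREAD_COND_INITIALIZER;

/* Completion responses handed back to the main thread */
static job *rsp_head;
static pthread_mutex_t rsp_lock = PTHREAD_MUTEX_INITIALIZER;
static int wake_pipe[2]; /* worker writes one byte here to wake the main thread */

static void submit(int is_completion, comp_fn *fn, uint64_t arg) {
    job *j = calloc(1, sizeof(*j));
    j->is_completion = is_completion;
    j->fn = fn;
    j->arg = arg;
    pthread_mutex_lock(&q_lock);
    if (q_tail) q_tail->next = j; else q_head = j;
    q_tail = j;
    pthread_cond_signal(&q_cond);
    pthread_mutex_unlock(&q_lock);
}

static void *worker(void *unused) {
    (void)unused;
    for (;;) {
        pthread_mutex_lock(&q_lock);
        while (!q_head) pthread_cond_wait(&q_cond, &q_lock);
        job *j = q_head;
        q_head = j->next;
        if (!q_head) q_tail = NULL;
        pthread_mutex_unlock(&q_lock);
        if (j->is_completion) {
            /* FIFO guarantees every job submitted earlier is already done,
             * so hand the callback to the main thread and wake it up. */
            pthread_mutex_lock(&rsp_lock);
            j->next = rsp_head;
            rsp_head = j;
            pthread_mutex_unlock(&rsp_lock);
            if (write(wake_pipe[1], "A", 1) != 1) { /* best effort */ }
        } else {
            printf("worker: background job %llu done\n", (unsigned long long)j->arg);
            free(j);
        }
    }
    return NULL;
}

static void flush_done(uint64_t client_id) {
    printf("main: flush done, unblock client %llu and reply OK\n",
           (unsigned long long)client_id);
}

int main(void) {
    pthread_t tid;
    if (pipe(wake_pipe) == -1) return 1;
    pthread_create(&tid, NULL, worker, NULL);
    submit(0, NULL, 1);        /* e.g. lazy-free of database 1 */
    submit(0, NULL, 2);        /* e.g. lazy-free of database 2 */
    submit(1, flush_done, 42); /* completion request for client id 42 */
    char buf[1];
    if (read(wake_pipe[0], buf, 1) != 1) return 1; /* stand-in for the event loop */
    pthread_mutex_lock(&rsp_lock);
    job *r = rsp_head;
    rsp_head = NULL;
    pthread_mutex_unlock(&rsp_lock);
    while (r) {
        job *next = r->next;
        r->fn(r->arg); /* notify: all preceding jobs completed */
        free(r);
        r = next;
    }
    return 0;
}
```

Because the work jobs and the completion request travel through the same FIFO
queue, no extra synchronization is needed to know that all the work submitted
before the completion request has finished by the time the callback fires.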

## FLUSH* SYNC to run as blocking ASYNC
The `FLUSH* SYNC` commands will be modified to create one or more async jobs to
flush the DB(s), and afterward to push an additional completion-job request.
Because the completion-job request is sent only at the end, the main thread is
called back only after all the preceding jobs have completed their work in the
background. During that time the client that issued the command is suspended
and marked as `BLOCKED_LAZYFREE`, while any other client can communicate with
the server without interruption.
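
Continuing the sketch above (same hypothetical names; the real flow lives in
`flushCommandCommon()` and `flushallSyncBgDone()` in the `src/db.c` diff below),
the command side then reduces to:

```c
/* Sketch only: in real Redis the equivalents are blockClient(c, BLOCKED_LAZYFREE),
 * emptyData(..., EMPTYDB_ASYNC, ...) and
 * bioCreateCompRq(BIO_WORKER_LAZY_FREE, flushallSyncBgDone, c->id). */
static void block_client(uint64_t client_id) { /* hypothetical stub */
    printf("main: client %llu blocked, other clients keep being served\n",
           (unsigned long long)client_id);
}

static void flush_sync_as_blocking_async(uint64_t client_id, uint64_t dbnum) {
    block_client(client_id);          /* suspend only the requesting client */
    submit(0, NULL, dbnum);           /* async job: free the DB in the background */
    submit(1, flush_done, client_id); /* pushed last, so its callback fires only
                                       * after every preceding job has finished */
}
```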
Commit 4df037962d by Moti Cohen, 2024-04-02 15:09:52 +03:00 (parent ce47834309).
7 changed files with 352 additions and 48 deletions

src/bio.c

@ -1,16 +1,16 @@
/* Background I/O service for Redis.
*
* This file implements operations that we need to perform in the background.
* Currently there is only a single operation, that is a background close(2)
* system call. This is needed as when the process is the last owner of a
* reference to a file closing it means unlinking it, and the deletion of the
* file is slow, blocking the server.
* Currently there are 3 operations:
* 1) a background close(2) system call. This is needed because when the process
* is the last owner of a reference to a file, closing it means unlinking it,
* and the deletion of the file is slow, blocking the server.
* 2) AOF fsync
* 3) lazyfree of memory
*
* In the future we'll either continue implementing new things we need or
* we'll switch to libeio. However there are probably long term uses for this
* file as we may want to put here Redis specific background tasks (for instance
* it is not impossible that we'll need a non blocking FLUSHDB/FLUSHALL
* implementation).
* file as we may want to put here Redis specific background tasks.
*
* DESIGN
* ------
@ -26,8 +26,13 @@
* least-recently-inserted to the most-recently-inserted (older jobs processed
* first).
*
* Currently there is no way for the creator of the job to be notified about
* the completion of the operation, this will only be added when/if needed.
* To let the creator of a job be notified about the completion of the
* operation, it needs to submit an additional dummy job, coined a
* completion job request, that the background thread will eventually write
* back into a completion job response queue. This notification layout
* simplifies flows that might submit more than one job, such as FLUSHALL,
* which submits multiple jobs for a single command. It is also correct
* because jobs are processed in FIFO fashion.
*
* ----------------------------------------------------------------------------
*
@ -38,9 +43,9 @@
* (RSALv2) or the Server Side Public License v1 (SSPLv1).
*/
#include "server.h"
#include "bio.h"
#include <fcntl.h>
static char* bio_worker_title[] = {
"bio_close_file",
@ -55,6 +60,9 @@ static unsigned int bio_job_to_worker[] = {
[BIO_AOF_FSYNC] = 1,
[BIO_CLOSE_AOF] = 1,
[BIO_LAZY_FREE] = 2,
[BIO_COMP_RQ_CLOSE_FILE] = 0,
[BIO_COMP_RQ_AOF_FSYNC] = 1,
[BIO_COMP_RQ_LAZY_FREE] = 2
};
static pthread_t bio_threads[BIO_WORKER_NUM];
@ -63,6 +71,18 @@ static pthread_cond_t bio_newjob_cond[BIO_WORKER_NUM];
static list *bio_jobs[BIO_WORKER_NUM];
static unsigned long bio_jobs_counter[BIO_NUM_OPS] = {0};
/* The bio_comp_list is used to hold completion job responses and to hand them
* over to the main thread, which invokes each callback as notification of job
* completion. The main thread is signaled to read the list by a write to a pipe */
static list *bio_comp_list;
static pthread_mutex_t bio_mutex_comp;
static int job_comp_pipe[2]; /* Pipe used to awake the event loop */
typedef struct bio_comp_item {
comp_fn *func; /* callback invoked once the completion job is processed */
uint64_t arg; /* user data to be passed to the function */
} bio_comp_item;
/* This structure represents a background Job. It is only used locally to this
* file as the API does not expose the internals at all. */
typedef union bio_job {
@ -86,9 +106,15 @@ typedef union bio_job {
lazy_free_fn *free_fn; /* Function that will free the provided arguments */
void *free_args[]; /* List of arguments to be passed to the free function */
} free_args;
struct {
int type; /* header */
comp_fn *fn; /* callback handed over to the main thread as job-completion notification */
uint64_t arg; /* callback arguments */
} comp_rq;
} bio_job;
void *bioProcessBackgroundJobs(void *arg);
void bioPipeReadJobCompList(aeEventLoop *el, int fd, void *privdata, int mask);
/* Make sure we have enough stack to perform all the things we do in the
* main thread. */
@ -108,6 +134,27 @@ void bioInit(void) {
bio_jobs[j] = listCreate();
}
/* init jobs comp responses */
bio_comp_list = listCreate();
pthread_mutex_init(&bio_mutex_comp, NULL);
/* Create a pipe for background thread to be able to wake up the redis main thread.
* Make the pipe non blocking. This is just a best-effort mechanism
* and we do not want to block, neither in the read nor in the write half.
* Enable close-on-exec flag on pipes in case of the fork-exec system calls in
* sentinels or redis servers. */
if (anetPipe(job_comp_pipe, O_CLOEXEC|O_NONBLOCK, O_CLOEXEC|O_NONBLOCK) == -1) {
serverLog(LL_WARNING,
"Can't create the pipe for bio thread: %s", strerror(errno));
exit(1);
}
/* Register a readable event for the pipe used to awake the event loop on job completion */
if (aeCreateFileEvent(server.el, job_comp_pipe[0], AE_READABLE,
bioPipeReadJobCompList, NULL) == AE_ERR) {
serverPanic("Error registering the readable event for the bio pipe.");
}
/* Set the stack size as by default it may be small in some system */
pthread_attr_init(&attr);
pthread_attr_getstacksize(&attr,&stacksize);
@ -153,6 +200,28 @@ void bioCreateLazyFreeJob(lazy_free_fn free_fn, int arg_count, ...) {
bioSubmitJob(BIO_LAZY_FREE, job);
}
void bioCreateCompRq(bio_worker_t assigned_worker, comp_fn *func, uint64_t user_data) {
int type;
switch (assigned_worker) {
case BIO_WORKER_CLOSE_FILE:
type = BIO_COMP_RQ_CLOSE_FILE;
break;
case BIO_WORKER_AOF_FSYNC:
type = BIO_COMP_RQ_AOF_FSYNC;
break;
case BIO_WORKER_LAZY_FREE:
type = BIO_COMP_RQ_LAZY_FREE;
break;
default:
serverPanic("Invalid worker type in bioCreateCompRq().");
}
bio_job *job = zmalloc(sizeof(*job));
job->comp_rq.fn = func;
job->comp_rq.arg = user_data;
bioSubmitJob(type, job);
}
void bioCreateCloseJob(int fd, int need_fsync, int need_reclaim_cache) {
bio_job *job = zmalloc(sizeof(*job));
job->fd_args.fd = fd;
@ -264,6 +333,21 @@ void *bioProcessBackgroundJobs(void *arg) {
close(job->fd_args.fd);
} else if (job_type == BIO_LAZY_FREE) {
job->free_args.free_fn(job->free_args.free_args);
} else if ((job_type == BIO_COMP_RQ_CLOSE_FILE) ||
(job_type == BIO_COMP_RQ_AOF_FSYNC) ||
(job_type == BIO_COMP_RQ_LAZY_FREE)) {
bio_comp_item *comp_rsp = zmalloc(sizeof(bio_comp_item));
comp_rsp->func = job->comp_rq.fn;
comp_rsp->arg = job->comp_rq.arg;
/* just write it to completion job responses */
pthread_mutex_lock(&bio_mutex_comp);
listAddNodeTail(bio_comp_list, comp_rsp);
pthread_mutex_unlock(&bio_mutex_comp);
if (write(job_comp_pipe[1],"A",1) != 1) {
/* Pipe is non-blocking, write() may fail if it's full. */
}
} else {
serverPanic("Wrong job type in bioProcessBackgroundJobs().");
}
@ -322,3 +406,34 @@ void bioKillThreads(void) {
}
}
}
void bioPipeReadJobCompList(aeEventLoop *el, int fd, void *privdata, int mask) {
UNUSED(el);
UNUSED(mask);
UNUSED(privdata);
char buf[128];
list *tmp_list = NULL;
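/* Drain the pipe: the content doesn't matter, only the fact that we were signaled. */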
while (read(fd, buf, sizeof(buf)) == sizeof(buf));
/* Take ownership of the completion job responses accumulated so far */
pthread_mutex_lock(&bio_mutex_comp);
if (listLength(bio_comp_list)) {
tmp_list = bio_comp_list;
bio_comp_list = listCreate();
}
pthread_mutex_unlock(&bio_mutex_comp);
if (!tmp_list) return;
/* callback to all job completions */
while (listLength(tmp_list)) {
listNode *ln = listFirst(tmp_list);
bio_comp_item *rsp = ln->value;
listDelNode(tmp_list, ln);
rsp->func(rsp->arg);
zfree(rsp);
}
listRelease(tmp_list);
}

src/bio.h

@ -10,6 +10,26 @@
#define __BIO_H
typedef void lazy_free_fn(void *args[]);
typedef void comp_fn(uint64_t user_data);
typedef enum bio_worker_t {
BIO_WORKER_CLOSE_FILE = 0,
BIO_WORKER_AOF_FSYNC,
BIO_WORKER_LAZY_FREE,
BIO_WORKER_NUM
} bio_worker_t;
/* Background job opcodes */
typedef enum bio_job_type_t {
BIO_CLOSE_FILE = 0, /* Deferred close(2) syscall. */
BIO_AOF_FSYNC, /* Deferred AOF fsync. */
BIO_LAZY_FREE, /* Deferred objects freeing. */
BIO_CLOSE_AOF,
BIO_COMP_RQ_CLOSE_FILE, /* Job completion request, registered on close-file worker's queue */
BIO_COMP_RQ_AOF_FSYNC, /* Job completion request, registered on aof-fsync worker's queue */
BIO_COMP_RQ_LAZY_FREE, /* Job completion request, registered on lazy-free worker's queue */
BIO_NUM_OPS
} bio_job_type_t;
/* Exported API */
void bioInit(void);
@ -20,14 +40,7 @@ void bioCreateCloseJob(int fd, int need_fsync, int need_reclaim_cache);
void bioCreateCloseAofJob(int fd, long long offset, int need_reclaim_cache);
void bioCreateFsyncJob(int fd, long long offset, int need_reclaim_cache);
void bioCreateLazyFreeJob(lazy_free_fn free_fn, int arg_count, ...);
void bioCreateCompRq(bio_worker_t assigned_worker, comp_fn *func, uint64_t user_data);
/* Background job opcodes */
enum {
BIO_CLOSE_FILE = 0, /* Deferred close(2) syscall. */
BIO_AOF_FSYNC, /* Deferred AOF fsync. */
BIO_LAZY_FREE, /* Deferred objects freeing. */
BIO_CLOSE_AOF, /* Deferred close for AOF files. */
BIO_NUM_OPS
};
#endif

src/blocked.c

@ -68,6 +68,7 @@ void blockClient(client *c, int btype) {
/* Master client should never be blocked unless pause or module */
serverAssert(!(c->flags & CLIENT_MASTER &&
btype != BLOCKED_MODULE &&
btype != BLOCKED_LAZYFREE &&
btype != BLOCKED_POSTPONE));
c->flags |= CLIENT_BLOCKED;
@ -175,6 +176,8 @@ void unblockClient(client *c, int queue_for_reprocessing) {
c->postponed_list_node = NULL;
} else if (c->bstate.btype == BLOCKED_SHUTDOWN) {
/* No special cleanup. */
} else if (c->bstate.btype == BLOCKED_LAZYFREE) {
/* No special cleanup. */
} else {
serverPanic("Unknown btype in unblockClient().");
}
@ -206,7 +209,9 @@ void unblockClient(client *c, int queue_for_reprocessing) {
* send it a reply of some kind. After this function is called,
* unblockClient() will be called with the same client as argument. */
void replyToBlockedClientTimedOut(client *c) {
if (c->bstate.btype == BLOCKED_LIST ||
if (c->bstate.btype == BLOCKED_LAZYFREE) {
addReply(c, shared.ok); /* No reason for lazy-free to fail */
} else if (c->bstate.btype == BLOCKED_LIST ||
c->bstate.btype == BLOCKED_ZSET ||
c->bstate.btype == BLOCKED_STREAM) {
addReplyNullArray(c);
@ -263,9 +268,16 @@ void disconnectAllBlockedClients(void) {
if (c->bstate.btype == BLOCKED_POSTPONE)
continue;
unblockClientOnError(c,
"-UNBLOCKED force unblock from blocking operation, "
"instance state changed (master -> replica?)");
if (c->bstate.btype == BLOCKED_LAZYFREE) {
addReply(c, shared.ok); /* No reason for lazy-free to fail */
c->flags &= ~CLIENT_PENDING_COMMAND;
unblockClient(c, 1);
} else {
unblockClientOnError(c,
"-UNBLOCKED force unblock from blocking operation, "
"instance state changed (master -> replica?)");
}
c->flags |= CLIENT_CLOSE_AFTER_REPLY;
}
}

src/db.c

@ -15,6 +15,7 @@
#include <signal.h>
#include <ctype.h>
#include "bio.h"
/*-----------------------------------------------------------------------------
* C-level DB API
@ -667,50 +668,110 @@ void flushAllDataAndResetRDB(int flags) {
/* jemalloc 5 doesn't release pages back to the OS when there's no traffic.
* for large databases, flushdb blocks for long anyway, so a bit more won't
* harm and this way the flush and purge will be synchronous. */
if (!(flags & EMPTYDB_ASYNC))
if (!(flags & EMPTYDB_ASYNC)) {
/* Only clear the current thread cache.
* Ignore the return call since this will fail if the tcache is disabled. */
je_mallctl("thread.tcache.flush", NULL, NULL, NULL, 0);
jemalloc_purge();
}
#endif
}
/* FLUSHDB [ASYNC]
*
* Flushes the currently SELECTed Redis DB. */
void flushdbCommand(client *c) {
/* Called when an optimized FLUSHALL/FLUSHDB SYNC has finished running on the lazyfree thread */
void flushallSyncBgDone(uint64_t client_id) {
client *c = lookupClientByID(client_id);
/* Verify that client still exists */
if (!c) return;
/* Update current_client (Called functions might rely on it) */
client *old_client = server.current_client;
server.current_client = c;
/* Don't update blocked_us since command was processed in bg by lazy_free thread */
updateStatsOnUnblock(c, 0 /*blocked_us*/, elapsedUs(c->bstate.lazyfreeStartTime), 0);
/* lazyfree bg job always succeeds */
addReply(c, shared.ok);
/* mark client as unblocked */
unblockClient(c, 1);
/* FLUSH command is finished. resetClient() and update replication offset. */
commandProcessed(c);
/* On flush completion, update the client's memory */
updateClientMemUsageAndBucket(c);
/* restore current_client */
server.current_client = old_client;
}
void flushCommandCommon(client *c, int isFlushAll) {
int blocking_async = 0; /* FLUSHALL/FLUSHDB SYNC optimized to run as blocking ASYNC */
int flags;
if (getFlushCommandFlags(c,&flags) == C_ERR) return;
/* flushdb should not flush the functions */
server.dirty += emptyData(c->db->id,flags | EMPTYDB_NOFUNCTIONS,NULL);
/* Without the forceCommandPropagation, when DB was already empty,
* FLUSHDB will not be replicated nor put into the AOF. */
/* in case of SYNC, check if we can optimize and run it in bg as blocking ASYNC */
if ((!(flags & EMPTYDB_ASYNC)) && (!(c->flags & CLIENT_AVOID_BLOCKING_ASYNC_FLUSH))) {
/* Run as ASYNC */
flags |= EMPTYDB_ASYNC;
blocking_async = 1;
}
if (isFlushAll)
flushAllDataAndResetRDB(flags | EMPTYDB_NOFUNCTIONS);
else
server.dirty += emptyData(c->db->id,flags | EMPTYDB_NOFUNCTIONS,NULL);
/* Without the forceCommandPropagation, when the DB(s) were already empty,
* FLUSHALL/FLUSHDB will not be replicated nor put into the AOF. */
forceCommandPropagation(c, PROPAGATE_REPL | PROPAGATE_AOF);
addReply(c,shared.ok);
/* If blocking ASYNC, block the client and add a completion job request to the
* BIO lazyfree worker's queue. Its callback will run, and reply with OK, only
* after all preceding lazyfree jobs in the queue have been processed */
if (blocking_async) {
/* measure bg job till completion as elapsed time of flush command */
elapsedStart(&c->bstate.lazyfreeStartTime);
c->bstate.timeout = 0;
blockClient(c,BLOCKED_LAZYFREE);
bioCreateCompRq(BIO_WORKER_LAZY_FREE, flushallSyncBgDone, c->id);
} else {
addReply(c, shared.ok);
}
#if defined(USE_JEMALLOC)
/* jemalloc 5 doesn't release pages back to the OS when there's no traffic.
* for large databases, flushdb blocks for long anyway, so a bit more won't
* harm and this way the flush and purge will be synchronous. */
if (!(flags & EMPTYDB_ASYNC))
* harm and this way the flush and purge will be synchronous.
*
* Take care to purge only for the FLUSHDB sync flow. The FLUSHALL sync flow
* already purged in flushAllDataAndResetRDB. The async flow will purge later on */
if ((!isFlushAll) && (!(flags & EMPTYDB_ASYNC))) {
/* Only clear the current thread cache.
* Ignore the return call since this will fail if the tcache is disabled. */
je_mallctl("thread.tcache.flush", NULL, NULL, NULL, 0);
jemalloc_purge();
}
#endif
}
/* FLUSHALL [ASYNC]
/* FLUSHALL [SYNC|ASYNC]
*
* Flushes the whole server data set. */
void flushallCommand(client *c) {
int flags;
if (getFlushCommandFlags(c,&flags) == C_ERR) return;
/* flushall should not flush the functions */
flushAllDataAndResetRDB(flags | EMPTYDB_NOFUNCTIONS);
flushCommandCommon(c, 1);
}
/* Without the forceCommandPropagation, when DBs were already empty,
* FLUSHALL will not be replicated nor put into the AOF. */
forceCommandPropagation(c, PROPAGATE_REPL | PROPAGATE_AOF);
addReply(c,shared.ok);
/* FLUSHDB [SYNC|ASYNC]
*
* Flushes the currently SELECTed Redis DB. */
void flushdbCommand(client *c) {
flushCommandCommon(c, 0);
}
/* This command implements DEL and UNLINK. */

src/lazyfree.c

@ -28,6 +28,14 @@ void lazyfreeFreeDatabase(void *args[]) {
kvstoreRelease(da2);
atomicDecr(lazyfree_objects,numkeys);
atomicIncr(lazyfreed_objects,numkeys);
#if defined(USE_JEMALLOC)
/* Only clear the current thread cache.
* Ignore the return call since this will fail if the tcache is disabled. */
je_mallctl("thread.tcache.flush", NULL, NULL, NULL, 0);
jemalloc_purge();
#endif
}
/* Release the key tracking table. */

src/server.h

@ -382,6 +382,9 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
#define CLIENT_MODULE_PREVENT_REPL_PROP (1ULL<<49) /* Module client do not want to propagate to replica */
#define CLIENT_REPROCESSING_COMMAND (1ULL<<50) /* The client is re-processing the command. */
/* Any of these flags prevents optimizing FLUSH SYNC to run in the bg as a blocking-client ASYNC */
#define CLIENT_AVOID_BLOCKING_ASYNC_FLUSH (CLIENT_DENY_BLOCKING|CLIENT_MULTI|CLIENT_LUA_DEBUG|CLIENT_LUA_DEBUG_SYNC|CLIENT_MODULE)
/* Client block type (btype field in client structure)
* if CLIENT_BLOCKED flag is set. */
typedef enum blocking_type {
@ -394,6 +397,7 @@ typedef enum blocking_type {
BLOCKED_ZSET, /* BZPOP et al. */
BLOCKED_POSTPONE, /* Blocked by processCommand, re-try processing later. */
BLOCKED_SHUTDOWN, /* SHUTDOWN. */
BLOCKED_LAZYFREE, /* LAZYFREE */
BLOCKED_NUM, /* Number of blocked states. */
BLOCKED_END /* End of enumeration */
} blocking_type;
@ -654,7 +658,7 @@ typedef enum {
#define serverAssert(_e) (likely(_e)?(void)0 : (_serverAssert(#_e,__FILE__,__LINE__),redis_unreachable()))
#define serverPanic(...) _serverPanic(__FILE__,__LINE__,__VA_ARGS__),redis_unreachable()
/* The following macros provide assertions that are only executed during test builds and should be used to add
* assertions that are too computationally expensive or dangerous to run during normal operations. */
#ifdef DEBUG_ASSERTIONS
#define debugServerAssertWithInfo(...) serverAssertWithInfo(__VA_ARGS__)
@ -1027,6 +1031,9 @@ typedef struct blockingState {
void *async_rm_call_handle; /* RedisModuleAsyncRMCallPromise structure.
which is opaque for the Redis core, only
handled in module.c. */
/* BLOCKED_LAZYFREE */
monotime lazyfreeStartTime;
} blockingState;
/* The following structure represents a node in the server.ready_keys list,
@ -1305,7 +1312,7 @@ struct sharedObjectsStruct {
*busykeyerr, *oomerr, *plus, *messagebulk, *pmessagebulk, *subscribebulk,
*unsubscribebulk, *psubscribebulk, *punsubscribebulk, *del, *unlink,
*rpop, *lpop, *lpush, *rpoplpush, *lmove, *blmove, *zpopmin, *zpopmax,
*emptyscan, *multi, *exec, *left, *right, *hset, *srem, *xgroup, *xclaim,
*script, *replconf, *eval, *persist, *set, *pexpireat, *pexpire,
*time, *pxat, *absttl, *retrycount, *force, *justid, *entriesread,
*lastid, *ping, *setid, *keepttl, *load, *createconsumer,
@ -1709,7 +1716,7 @@ struct redisServer {
long long el_cmd_cnt_max;
/* The sum of active-expire, active-defrag and all other tasks done by cron and beforeSleep,
but excluding read, write and AOF, which are counted by other sets of metrics. */
monotime el_cron_duration;
durationStats duration_stats[EL_DURATION_TYPE_NUM];
/* Configuration */
@ -3023,6 +3030,7 @@ size_t freeMemoryGetNotCountedMemory(void);
int overMaxmemoryAfterAlloc(size_t moremem);
uint64_t getCommandFlags(client *c);
int processCommand(client *c);
void commandProcessed(client *c);
int processPendingCommandAndInputBuffer(client *c);
int processCommandAndResetClient(client *c);
void setupSignalHandlers(void);

tests/unit/lazyfree.tcl

@ -87,4 +87,91 @@ start_server {tags {"lazyfree"}} {
}
assert_equal [s lazyfreed_objects] 0
} {} {needs:config-resetstat}
test "FLUSHALL SYNC optimized to run in bg as blocking FLUSHALL ASYNC" {
set num_keys 1000
r config resetstat
# Verify at start there are no lazyfree pending objects
assert_equal [s lazyfree_pending_objects] 0
# Fill up DB with items
populate $num_keys
# Run FLUSHALL SYNC command, optimized as blocking ASYNC
r flushall
# Verify all keys counted as lazyfreed
assert_equal [s lazyfreed_objects] $num_keys
}
test "Run consecutive blocking FLUSHALL ASYNC successfully" {
r config resetstat
set rd [redis_deferring_client]
# Fill up DB with items
r set x 1
r set y 2
$rd write "FLUSHALL\r\nFLUSHALL\r\nFLUSHDB\r\n"
$rd flush
assert_equal [$rd read] {OK}
assert_equal [$rd read] {OK}
assert_equal [$rd read] {OK}
assert_equal [s lazyfreed_objects] 2
$rd close
}
test "FLUSHALL SYNC in MULTI not optimized to run as blocking FLUSHALL ASYNC" {
r config resetstat
# Fill up DB with items
r set x 11
r set y 22
# FLUSHALL SYNC in multi
r multi
r flushall
r exec
# Verify flushall not run as lazyfree
assert_equal [s lazyfree_pending_objects] 0
assert_equal [s lazyfreed_objects] 0
}
test "Client closed in the middle of blocking FLUSHALL ASYNC" {
set num_keys 100000
r config resetstat
# Fill up DB with items
populate $num_keys
# Close the client in the middle of an ongoing blocking FLUSHALL ASYNC
set rd [redis_deferring_client]
$rd flushall
$rd close
# Wait to verify all keys counted as lazyfreed
wait_for_condition 50 100 {
[s lazyfreed_objects] == $num_keys
} else {
fail "Unexpected number of lazyfreed_objects: [s lazyfreed_objects]"
}
}
test "Pending commands in querybuf processed once unblocking FLUSHALL ASYNC" {
r config resetstat
set rd [redis_deferring_client]
# Fill up DB with items
r set x 1
r set y 2
$rd write "FLUSHALL\r\nPING\r\n"
$rd flush
assert_equal [$rd read] {OK}
assert_equal [$rd read] {PONG}
assert_equal [s lazyfreed_objects] 2
$rd close
}
}