Adjust the number of io-threads at runtime to fully utilize multiple threads

This commit is contained in:
Lipeng Zhu 2024-01-26 04:22:42 -05:00
parent 85a834bfa2
commit 14304c58c7
3 changed files with 68 additions and 42 deletions

View File

@ -4247,7 +4247,8 @@ void *IOThreadMain(void *myid) {
/* Initialize the data structures needed for threaded I/O. */
void initThreadedIO(void) {
server.io_threads_active = 0; /* We start with threads not active. */
/* We start from main thread, no additional io-threads created yet. */
server.io_threads_active_num = 1;
/* Indicate that io-threads are currently idle */
io_threads_op = IO_THREADS_OP_IDLE;
@ -4298,44 +4299,70 @@ void killIOThreads(void) {
}
}
void startThreadedIO(void) {
serverAssert(server.io_threads_active == 0);
for (int j = 1; j < server.io_threads_num; j++)
pthread_mutex_unlock(&io_threads_mutex[j]);
server.io_threads_active = 1;
/* This function will active additional num I/O threads. */
static inline void activateThreadedIO(int num) {
serverAssert(num > 0);
serverAssert(server.io_threads_active_num + num <= server.io_threads_num);
for (int i = 0; i < num; i++)
pthread_mutex_unlock(&io_threads_mutex[server.io_threads_active_num + i]);
server.io_threads_active_num += num;
}
void stopThreadedIO(void) {
/* This function will inactive additional num I/O threads. */
static inline void deactivateThreadedIO(int num) {
serverAssert(num > 0);
serverAssert(server.io_threads_active_num - num >= 1);
/* We may have still clients with pending reads when this function
* is called: handle them before stopping the threads. */
handleClientsWithPendingReadsUsingThreads();
serverAssert(server.io_threads_active == 1);
for (int j = 1; j < server.io_threads_num; j++)
pthread_mutex_lock(&io_threads_mutex[j]);
server.io_threads_active = 0;
for (int i = 0; i < num; i++)
pthread_mutex_lock(&io_threads_mutex[server.io_threads_active_num -1 - i]);
server.io_threads_active_num -= num;
}
/* This function checks if there are not enough pending clients to justify
* taking the I/O threads active: in that case I/O threads are stopped if
* currently active. We track the pending writes as a measure of clients
* we need to handle in parallel, however the I/O threading is disabled
* globally for reads as well if we have too little pending clients.
/* This function will dynamically adjust the I/O threads number according to
* real workloads in runtime, currently we track the latest x rounds of
* pending writes as a measure of clients we need to handle in parallel,
* however the I/O threading is disabled globally for reads as well if
* we have too little pending clients.
*
* The function returns 0 if the I/O threading should be used because there
* are enough active threads, otherwise 1 is returned and the I/O threads
* could be possibly stopped (if already active) as a side effect. */
int stopThreadedIOIfNeeded(void) {
int pending = listLength(server.clients_pending_write);
/* Return ASAP if IO threads are disabled (single threaded mode). */
#define IO_THREAD_BATCH_NUM 2
int adjustThreadedIOIfNeeded(void) {
/* Return ASAP if I/O threads are disabled (single threaded mode). */
if (server.io_threads_num == 1) return 1;
if (pending < (server.io_threads_num*2)) {
if (server.io_threads_active) stopThreadedIO();
return 1;
int pending = listLength(server.clients_pending_write);
server.clients_pending_write_avg_num = (int)(server.clients_pending_write_avg_num * 0.8 +
pending * 0.2);
/* I/O threads number except the main thread. */
int ioThreadsNow = server.io_threads_active_num - 1;
int ioThreadsReq = server.clients_pending_write_avg_num / IO_THREAD_BATCH_NUM;
int ioThreadsCap = server.io_threads_num - server.io_threads_active_num;
if (ioThreadsReq < ioThreadsNow) {
/* The requested I/O threads is less than existing active I/O threads,
* we need to deactivate some I/O threads to make CPU more efficiency. */
if (server.io_threads_active_num > 1) {
if (ioThreadsNow > ioThreadsReq) {
int stopIOThreadsNum = ioThreadsNow - ioThreadsReq;
deactivateThreadedIO(stopIOThreadsNum);
}
if (server.io_threads_active_num == 1) return 1;
}
} else if (ioThreadsReq > ioThreadsNow) {
/* We need more I/O threads to increase the parallelism. */
if (server.io_threads_active_num < server.io_threads_num) {
int startIOThreadsNum = min(ioThreadsReq - ioThreadsNow, ioThreadsCap);
activateThreadedIO(startIOThreadsNum);
}
} else {
return 0;
if (server.io_threads_active_num == 1) return 1;
}
return 0;
}
/* This function achieves thread safety using a fan-out -> fan-in paradigm:
@ -4350,13 +4377,10 @@ int handleClientsWithPendingWritesUsingThreads(void) {
/* If I/O threads are disabled or we have few clients to serve, don't
* use I/O threads, but the boring synchronous code. */
if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
if (server.io_threads_num == 1 || adjustThreadedIOIfNeeded()) {
return handleClientsWithPendingWrites();
}
/* Start threads if needed. */
if (!server.io_threads_active) startThreadedIO();
/* Distribute the clients across N different lists. */
listIter li;
listNode *ln;
@ -4382,7 +4406,7 @@ int handleClientsWithPendingWritesUsingThreads(void) {
continue;
}
int target_id = item_id % server.io_threads_num;
int target_id = item_id % server.io_threads_active_num;
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
}
@ -4390,7 +4414,7 @@ int handleClientsWithPendingWritesUsingThreads(void) {
/* Give the start condition to the waiting threads, by setting the
* start condition atomic var. */
io_threads_op = IO_THREADS_OP_WRITE;
for (int j = 1; j < server.io_threads_num; j++) {
for (int j = 1; j < server.io_threads_active_num; j++) {
int count = listLength(io_threads_list[j]);
setIOPendingCount(j, count);
}
@ -4406,7 +4430,7 @@ int handleClientsWithPendingWritesUsingThreads(void) {
/* Wait for all the other threads to end their work. */
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
for (int j = 1; j < server.io_threads_active_num; j++)
pending += getIOPendingCount(j);
if (pending == 0) break;
}
@ -4443,7 +4467,7 @@ int handleClientsWithPendingWritesUsingThreads(void) {
* As a side effect of calling this function the client is put in the
* pending read clients and flagged as such. */
int postponeClientRead(client *c) {
if (server.io_threads_active &&
if (server.io_threads_active_num > 1 &&
server.io_threads_do_reads &&
!ProcessingEventsWhileBlocked &&
!(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_BLOCKED)) &&
@ -4470,7 +4494,7 @@ int postponeClientRead(client *c) {
* it can safely perform post-processing and return to normal synchronous
* work. */
int handleClientsWithPendingReadsUsingThreads(void) {
if (!server.io_threads_active || !server.io_threads_do_reads) return 0;
if (server.io_threads_active_num == 1 || !server.io_threads_do_reads) return 0;
int processed = listLength(server.clients_pending_read);
if (processed == 0) return 0;
@ -4481,7 +4505,7 @@ int handleClientsWithPendingReadsUsingThreads(void) {
int item_id = 0;
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
int target_id = item_id % server.io_threads_num;
int target_id = item_id % server.io_threads_active_num;
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
}
@ -4489,7 +4513,7 @@ int handleClientsWithPendingReadsUsingThreads(void) {
/* Give the start condition to the waiting threads, by setting the
* start condition atomic var. */
io_threads_op = IO_THREADS_OP_READ;
for (int j = 1; j < server.io_threads_num; j++) {
for (int j = 1; j < server.io_threads_active_num; j++) {
int count = listLength(io_threads_list[j]);
setIOPendingCount(j, count);
}
@ -4505,7 +4529,7 @@ int handleClientsWithPendingReadsUsingThreads(void) {
/* Wait for all the other threads to end their work. */
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
for (int j = 1; j < server.io_threads_active_num; j++)
pending += getIOPendingCount(j);
if (pending == 0) break;
}

View File

@ -1592,8 +1592,8 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
migrateCloseTimedoutSockets();
}
/* Stop the I/O threads if we don't have enough pending work. */
stopThreadedIOIfNeeded();
/* Adjust the I/O threads according to the realtime workloads. */
adjustThreadedIOIfNeeded();
/* Resize tracking keys table if needed. This is also done at every
* command execution, but we want to be sure that if the last command
@ -2721,6 +2721,7 @@ void initServer(void) {
server.slaves = listCreate();
server.monitors = listCreate();
server.clients_pending_write = listCreate();
server.clients_pending_write_avg_num = 0;
server.clients_pending_read = listCreate();
server.clients_timeout_table = raxNew();
server.replication_allowed = 1;
@ -5644,7 +5645,7 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) {
"lru_clock:%u\r\n", server.lruclock,
"executable:%s\r\n", server.executable ? server.executable : "",
"config_file:%s\r\n", server.configfile ? server.configfile : "",
"io_threads_active:%i\r\n", server.io_threads_active));
"io_threads_active_num:%i\r\n", server.io_threads_active_num));
/* Conditional properties */
if (isShutdownInitiated()) {

View File

@ -1623,6 +1623,7 @@ struct redisServer {
list *clients; /* List of active clients */
list *clients_to_close; /* Clients to close asynchronously */
list *clients_pending_write; /* There is to write or install handler. */
int clients_pending_write_avg_num; /* State of last x rounds pending write requests. */
list *clients_pending_read; /* Client has pending read socket buffers. */
list *slaves, *monitors; /* List of slaves and MONITORs */
client *current_client; /* The client that triggered the command execution (External or AOF). */
@ -1648,9 +1649,9 @@ struct redisServer {
dict *migrate_cached_sockets;/* MIGRATE cached sockets */
redisAtomic uint64_t next_client_id; /* Next client unique ID. Incremental. */
int protected_mode; /* Don't accept external connections. */
int io_threads_num; /* Number of IO threads to use. */
int io_threads_num; /* Maximum number of IO threads to use. */
int io_threads_do_reads; /* Read and parse from IO threads? */
int io_threads_active; /* Is IO threads currently active? */
int io_threads_active_num; /* Number of active IO threads to use. */
long long events_processed_while_blocked; /* processEventsWhileBlocked() */
int enable_protected_configs; /* Enable the modification of protected configs, see PROTECTED_ACTION_ALLOWED_* */
int enable_debug_cmd; /* Enable DEBUG commands, see PROTECTED_ACTION_ALLOWED_* */
@ -2683,7 +2684,7 @@ void blockingOperationEnds(void);
int handleClientsWithPendingWrites(void);
int handleClientsWithPendingWritesUsingThreads(void);
int handleClientsWithPendingReadsUsingThreads(void);
int stopThreadedIOIfNeeded(void);
int adjustThreadedIOIfNeeded(void);
int clientHasPendingReplies(client *c);
int updateClientMemUsageAndBucket(client *c);
void removeClientFromMemUsageBucket(client *c, int allow_eviction);