From bb1de082eac26d5242733eb0b40959bd9de2e15b Mon Sep 17 00:00:00 2001 From: DarrenJiang13 Date: Tue, 31 May 2022 13:07:33 +0800 Subject: [PATCH] Adds isolated netstats for replication. (#10062) The value of `server.stat_net_output_bytes`/`server.stat_net_input_bytes` is actually the sum of replication traffic and users' data traffic. This can cause confusion, such as: "Why does my server report such a large output_bytes while I am doing nothing?". After discussion and revision, here is what this PR changes (final version before merge): - 2 server variables to count the network bytes during replication, including full sync and propagated bytes. - `server.stat_net_repl_output_bytes`/`server.stat_net_repl_input_bytes` - 3 info fields to print the input and output of repl bytes and the instantaneous value of total repl bytes. - `total_net_repl_input_bytes` / `total_net_repl_output_bytes` - `instantaneous_repl_total_kbps` - 1 new API `rioCheckType()` to check the type of rio, so that we can distinguish between diskless and disk-based replication. - 2 new counting items to keep network statistics consistent between master and slave - rdb portion during diskless replication, in `rdbLoadProgressCallback()` - first line of the full sync payload,
in `readSyncBulkPayload()` Co-authored-by: Oran Agra --- src/networking.c | 17 ++++++++++++++--- src/rdb.c | 3 +++ src/replication.c | 21 +++++++++++++-------- src/rio.c | 14 ++++++++++++++ src/rio.h | 7 ++++++- src/server.c | 24 ++++++++++++++++++++---- src/server.h | 7 +++++-- 7 files changed, 75 insertions(+), 18 deletions(-) diff --git a/src/networking.c b/src/networking.c index 1260be067..c1d6aa4bd 100644 --- a/src/networking.c +++ b/src/networking.c @@ -1953,7 +1953,13 @@ int writeToClient(client *c, int handler_installed) { zmalloc_used_memory() < server.maxmemory) && !(c->flags & CLIENT_SLAVE)) break; } - atomicIncr(server.stat_net_output_bytes, totwritten); + + if (getClientType(c) == CLIENT_TYPE_SLAVE) { + atomicIncr(server.stat_net_repl_output_bytes, totwritten); + } else { + atomicIncr(server.stat_net_output_bytes, totwritten); + } + if (nwritten == -1) { if (connGetState(c->conn) != CONN_STATE_CONNECTED) { serverLog(LL_VERBOSE, @@ -2655,8 +2661,13 @@ void readQueryFromClient(connection *conn) { if (c->querybuf_peak < qblen) c->querybuf_peak = qblen; c->lastinteraction = server.unixtime; - if (c->flags & CLIENT_MASTER) c->read_reploff += nread; - atomicIncr(server.stat_net_input_bytes, nread); + if (c->flags & CLIENT_MASTER) { + c->read_reploff += nread; + atomicIncr(server.stat_net_repl_input_bytes, nread); + } else { + atomicIncr(server.stat_net_input_bytes, nread); + } + if (!(c->flags & CLIENT_MASTER) && sdslen(c->querybuf) > server.client_max_querybuf_len) { sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty(); diff --git a/src/rdb.c b/src/rdb.c index 62ec5bbb2..1faecd2f3 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -2782,6 +2782,9 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) { processEventsWhileBlocked(); processModuleLoadingProgressEvent(0); } + if (server.repl_state == REPL_STATE_TRANSFER && rioCheckType(r) == RIO_TYPE_CONN) { + atomicIncr(server.stat_net_repl_input_bytes, len); + } } /* Save the given 
functions_ctx to the rdb. diff --git a/src/replication.c b/src/replication.c index 87b5c138c..385efcf6c 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1358,7 +1358,7 @@ void sendBulkToSlave(connection *conn) { freeClient(slave); return; } - atomicIncr(server.stat_net_output_bytes, nwritten); + atomicIncr(server.stat_net_repl_output_bytes, nwritten); sdsrange(slave->replpreamble,nwritten,-1); if (sdslen(slave->replpreamble) == 0) { sdsfree(slave->replpreamble); @@ -1387,7 +1387,7 @@ void sendBulkToSlave(connection *conn) { return; } slave->repldboff += nwritten; - atomicIncr(server.stat_net_output_bytes, nwritten); + atomicIncr(server.stat_net_repl_output_bytes, nwritten); if (slave->repldboff == slave->repldbsize) { close(slave->repldbfd); slave->repldbfd = -1; @@ -1419,7 +1419,7 @@ void rdbPipeWriteHandlerConnRemoved(struct connection *conn) { void rdbPipeWriteHandler(struct connection *conn) { serverAssert(server.rdb_pipe_bufflen>0); client *slave = connGetPrivateData(conn); - int nwritten; + ssize_t nwritten; if ((nwritten = connWrite(conn, server.rdb_pipe_buff + slave->repldboff, server.rdb_pipe_bufflen - slave->repldboff)) == -1) { @@ -1431,7 +1431,7 @@ void rdbPipeWriteHandler(struct connection *conn) { return; } else { slave->repldboff += nwritten; - atomicIncr(server.stat_net_output_bytes, nwritten); + atomicIncr(server.stat_net_repl_output_bytes, nwritten); if (slave->repldboff < server.rdb_pipe_bufflen) { slave->repl_last_partial_write = server.unixtime; return; /* more data to write.. 
*/ @@ -1491,7 +1491,7 @@ void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, int stillAlive = 0; for (i=0; i < server.rdb_pipe_numconns; i++) { - int nwritten; + ssize_t nwritten; connection *conn = server.rdb_pipe_conns[i]; if (!conn) continue; @@ -1511,7 +1511,7 @@ void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, /* Note: when use diskless replication, 'repldboff' is the offset * of 'rdb_pipe_buff' sent rather than the offset of entire RDB. */ slave->repldboff = nwritten; - atomicIncr(server.stat_net_output_bytes, nwritten); + atomicIncr(server.stat_net_repl_output_bytes, nwritten); } /* If we were unable to write all the data to one of the replicas, * setup write handler (and disable pipe read handler, below) */ @@ -1817,11 +1817,16 @@ void readSyncBulkPayload(connection *conn) { /* If repl_transfer_size == -1 we still have to read the bulk length * from the master reply. */ if (server.repl_transfer_size == -1) { - if (connSyncReadLine(conn,buf,1024,server.repl_syncio_timeout*1000) == -1) { + nread = connSyncReadLine(conn,buf,1024,server.repl_syncio_timeout*1000); + if (nread == -1) { serverLog(LL_WARNING, "I/O error reading bulk count from MASTER: %s", strerror(errno)); goto error; + } else { + /* nread here is returned by connSyncReadLine(), which calls syncReadLine() and + * convert "\r\n" to '\0' so 1 byte is lost. */ + atomicIncr(server.stat_net_repl_input_bytes, nread+1); } if (buf[0] == '-') { @@ -1892,7 +1897,7 @@ void readSyncBulkPayload(connection *conn) { cancelReplicationHandshake(1); return; } - atomicIncr(server.stat_net_input_bytes, nread); + atomicIncr(server.stat_net_repl_input_bytes, nread); /* When a mark is used, we want to detect EOF asap in order to avoid * writing the EOF mark into the file... 
*/ diff --git a/src/rio.c b/src/rio.c index f99913152..bcda3767b 100644 --- a/src/rio.c +++ b/src/rio.c @@ -438,6 +438,20 @@ void rioSetAutoSync(rio *r, off_t bytes) { r->io.file.autosync = bytes; } +/* Check the type of rio. */ +uint8_t rioCheckType(rio *r) { + if (r->read == rioFileRead) { + return RIO_TYPE_FILE; + } else if (r->read == rioBufferRead) { + return RIO_TYPE_BUFFER; + } else if (r->read == rioConnRead) { + return RIO_TYPE_CONN; + } else { + /* r->read == rioFdRead */ + return RIO_TYPE_FD; + } +} + /* --------------------------- Higher level interface -------------------------- * * The following higher level functions use lower level rio.c functions to help diff --git a/src/rio.h b/src/rio.h index 9576335e8..51738366a 100644 --- a/src/rio.h +++ b/src/rio.h @@ -40,6 +40,11 @@ #define RIO_FLAG_READ_ERROR (1<<0) #define RIO_FLAG_WRITE_ERROR (1<<1) +#define RIO_TYPE_FILE (1<<0) +#define RIO_TYPE_BUFFER (1<<1) +#define RIO_TYPE_CONN (1<<2) +#define RIO_TYPE_FD (1<<3) + struct _rio { /* Backend functions. 
* Since this functions do not tolerate short writes or reads the return @@ -174,5 +179,5 @@ int rioWriteBulkObject(rio *r, struct redisObject *obj); void rioGenericUpdateChecksum(rio *r, const void *buf, size_t len); void rioSetAutoSync(rio *r, off_t bytes); - +uint8_t rioCheckType(rio *r); #endif diff --git a/src/server.c b/src/server.c index 75dbab0d2..231ec5648 100644 --- a/src/server.c +++ b/src/server.c @@ -1188,14 +1188,19 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { run_with_period(100) { long long stat_net_input_bytes, stat_net_output_bytes; + long long stat_net_repl_input_bytes, stat_net_repl_output_bytes; atomicGet(server.stat_net_input_bytes, stat_net_input_bytes); atomicGet(server.stat_net_output_bytes, stat_net_output_bytes); + atomicGet(server.stat_net_repl_input_bytes, stat_net_repl_input_bytes); + atomicGet(server.stat_net_repl_output_bytes, stat_net_repl_output_bytes); trackInstantaneousMetric(STATS_METRIC_COMMAND,server.stat_numcommands); trackInstantaneousMetric(STATS_METRIC_NET_INPUT, - stat_net_input_bytes); + stat_net_input_bytes + stat_net_repl_input_bytes); trackInstantaneousMetric(STATS_METRIC_NET_OUTPUT, - stat_net_output_bytes); + stat_net_output_bytes + stat_net_repl_output_bytes); + trackInstantaneousMetric(STATS_METRIC_NET_TOTAL_REPLICATION, + stat_net_repl_input_bytes + stat_net_repl_output_bytes); } /* We have just LRU_BITS bits per object for LRU information. 
@@ -2356,6 +2361,8 @@ void resetServerStats(void) { server.stat_aofrw_consecutive_failures = 0; atomicSet(server.stat_net_input_bytes, 0); atomicSet(server.stat_net_output_bytes, 0); + atomicSet(server.stat_net_repl_input_bytes, 0); + atomicSet(server.stat_net_repl_output_bytes, 0); server.stat_unexpected_error_replies = 0; server.stat_total_error_replies = 0; server.stat_dump_payload_sanitizations = 0; @@ -5576,6 +5583,7 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) { if (all_sections || (dictFind(section_dict,"stats") != NULL)) { long long stat_total_reads_processed, stat_total_writes_processed; long long stat_net_input_bytes, stat_net_output_bytes; + long long stat_net_repl_input_bytes, stat_net_repl_output_bytes; long long current_eviction_exceeded_time = server.stat_last_eviction_exceeded_time ? (long long) elapsedUs(server.stat_last_eviction_exceeded_time): 0; long long current_active_defrag_time = server.stat_last_active_defrag_time ? @@ -5584,6 +5592,8 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) { atomicGet(server.stat_total_writes_processed, stat_total_writes_processed); atomicGet(server.stat_net_input_bytes, stat_net_input_bytes); atomicGet(server.stat_net_output_bytes, stat_net_output_bytes); + atomicGet(server.stat_net_repl_input_bytes, stat_net_repl_input_bytes); + atomicGet(server.stat_net_repl_output_bytes, stat_net_repl_output_bytes); if (sections++) info = sdscat(info,"\r\n"); info = sdscatprintf(info, @@ -5593,8 +5603,11 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) { "instantaneous_ops_per_sec:%lld\r\n" "total_net_input_bytes:%lld\r\n" "total_net_output_bytes:%lld\r\n" + "total_net_repl_input_bytes:%lld\r\n" + "total_net_repl_output_bytes:%lld\r\n" "instantaneous_input_kbps:%.2f\r\n" "instantaneous_output_kbps:%.2f\r\n" + "instantaneous_repl_total_kbps:%.2f\r\n" "rejected_connections:%lld\r\n" "sync_full:%lld\r\n" "sync_partial_ok:%lld\r\n" @@ 
-5636,10 +5649,13 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) { server.stat_numconnections, server.stat_numcommands, getInstantaneousMetric(STATS_METRIC_COMMAND), - stat_net_input_bytes, - stat_net_output_bytes, + stat_net_input_bytes + stat_net_repl_input_bytes, + stat_net_output_bytes + stat_net_repl_output_bytes, + stat_net_repl_input_bytes, + stat_net_repl_output_bytes, (float)getInstantaneousMetric(STATS_METRIC_NET_INPUT)/1024, (float)getInstantaneousMetric(STATS_METRIC_NET_OUTPUT)/1024, + (float)getInstantaneousMetric(STATS_METRIC_NET_TOTAL_REPLICATION)/1024, server.stat_rejected_conn, server.stat_sync_full, server.stat_sync_partial_ok, diff --git a/src/server.h b/src/server.h index b34a84cd7..e76053513 100644 --- a/src/server.h +++ b/src/server.h @@ -155,9 +155,10 @@ typedef long long ustime_t; /* microsecond time type. */ /* Instantaneous metrics tracking. */ #define STATS_METRIC_SAMPLES 16 /* Number of samples per metric. */ #define STATS_METRIC_COMMAND 0 /* Number of commands executed. */ -#define STATS_METRIC_NET_INPUT 1 /* Bytes read to network .*/ +#define STATS_METRIC_NET_INPUT 1 /* Bytes read to network. */ #define STATS_METRIC_NET_OUTPUT 2 /* Bytes written to network. */ -#define STATS_METRIC_COUNT 3 +#define STATS_METRIC_NET_TOTAL_REPLICATION 3 /* Bytes written and read to network during replication. */ +#define STATS_METRIC_COUNT 4 /* Protocol and I/O related defines */ #define PROTO_IOBUF_LEN (1024*16) /* Generic I/O buffer size */ @@ -1586,6 +1587,8 @@ struct redisServer { struct malloc_stats cron_malloc_stats; /* sampled in serverCron(). */ redisAtomic long long stat_net_input_bytes; /* Bytes read from network. */ redisAtomic long long stat_net_output_bytes; /* Bytes written to network. */ + redisAtomic long long stat_net_repl_input_bytes; /* Bytes read during replication, added to stat_net_input_bytes in 'info'. 
*/ + redisAtomic long long stat_net_repl_output_bytes; /* Bytes written during replication, added to stat_net_output_bytes in 'info'. */ size_t stat_current_cow_peak; /* Peak size of copy on write bytes. */ size_t stat_current_cow_bytes; /* Copy on write bytes while child is active. */ monotime stat_current_cow_updated; /* Last update time of stat_current_cow_bytes */