Adds isolated netstats for replication. (#10062)

The value of `server.stat_net_output_bytes`/`server.stat_net_input_bytes`
is actually the sum of the replication flow and the users' data flow.
This can cause confusion, for example:
"Why does my server report such a large output_bytes while I am doing nothing?"

After discussions and revisions, here is a summary of what this PR
brings (final version before merge):
- 2 server variables to count the network bytes transferred during replication,
     including both full sync and propagated bytes.
     - `server.stat_net_repl_output_bytes`/`server.stat_net_repl_input_bytes`
- 3 info fields to report the replication input and output bytes and the
     instantaneous value of the total replication bytes.
     - `total_net_repl_input_bytes` / `total_net_repl_output_bytes`
     - `instantaneous_repl_total_kbps`
- 1 new API `rioCheckType()` to check the type of a rio, so that we can use it
     to distinguish between diskless and disk-based replication.
- 2 new counting items to keep network statistics consistent between master
     and slave:
    - the RDB portion during diskless replication, in `rdbLoadProgressCallback()`
    - the first line of the full sync payload, in `readSyncBulkPayload()`

Co-authored-by: Oran Agra <oran@redislabs.com>
This commit is contained in:
DarrenJiang13 2022-05-31 13:07:33 +08:00 committed by GitHub
parent 4065b4f27e
commit bb1de082ea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 75 additions and 18 deletions

View File

@ -1953,7 +1953,13 @@ int writeToClient(client *c, int handler_installed) {
zmalloc_used_memory() < server.maxmemory) &&
!(c->flags & CLIENT_SLAVE)) break;
}
atomicIncr(server.stat_net_output_bytes, totwritten);
if (getClientType(c) == CLIENT_TYPE_SLAVE) {
atomicIncr(server.stat_net_repl_output_bytes, totwritten);
} else {
atomicIncr(server.stat_net_output_bytes, totwritten);
}
if (nwritten == -1) {
if (connGetState(c->conn) != CONN_STATE_CONNECTED) {
serverLog(LL_VERBOSE,
@ -2655,8 +2661,13 @@ void readQueryFromClient(connection *conn) {
if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
c->lastinteraction = server.unixtime;
if (c->flags & CLIENT_MASTER) c->read_reploff += nread;
atomicIncr(server.stat_net_input_bytes, nread);
if (c->flags & CLIENT_MASTER) {
c->read_reploff += nread;
atomicIncr(server.stat_net_repl_input_bytes, nread);
} else {
atomicIncr(server.stat_net_input_bytes, nread);
}
if (!(c->flags & CLIENT_MASTER) && sdslen(c->querybuf) > server.client_max_querybuf_len) {
sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();

View File

@ -2782,6 +2782,9 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
processEventsWhileBlocked();
processModuleLoadingProgressEvent(0);
}
if (server.repl_state == REPL_STATE_TRANSFER && rioCheckType(r) == RIO_TYPE_CONN) {
atomicIncr(server.stat_net_repl_input_bytes, len);
}
}
/* Save the given functions_ctx to the rdb.

View File

@ -1358,7 +1358,7 @@ void sendBulkToSlave(connection *conn) {
freeClient(slave);
return;
}
atomicIncr(server.stat_net_output_bytes, nwritten);
atomicIncr(server.stat_net_repl_output_bytes, nwritten);
sdsrange(slave->replpreamble,nwritten,-1);
if (sdslen(slave->replpreamble) == 0) {
sdsfree(slave->replpreamble);
@ -1387,7 +1387,7 @@ void sendBulkToSlave(connection *conn) {
return;
}
slave->repldboff += nwritten;
atomicIncr(server.stat_net_output_bytes, nwritten);
atomicIncr(server.stat_net_repl_output_bytes, nwritten);
if (slave->repldboff == slave->repldbsize) {
close(slave->repldbfd);
slave->repldbfd = -1;
@ -1419,7 +1419,7 @@ void rdbPipeWriteHandlerConnRemoved(struct connection *conn) {
void rdbPipeWriteHandler(struct connection *conn) {
serverAssert(server.rdb_pipe_bufflen>0);
client *slave = connGetPrivateData(conn);
int nwritten;
ssize_t nwritten;
if ((nwritten = connWrite(conn, server.rdb_pipe_buff + slave->repldboff,
server.rdb_pipe_bufflen - slave->repldboff)) == -1)
{
@ -1431,7 +1431,7 @@ void rdbPipeWriteHandler(struct connection *conn) {
return;
} else {
slave->repldboff += nwritten;
atomicIncr(server.stat_net_output_bytes, nwritten);
atomicIncr(server.stat_net_repl_output_bytes, nwritten);
if (slave->repldboff < server.rdb_pipe_bufflen) {
slave->repl_last_partial_write = server.unixtime;
return; /* more data to write.. */
@ -1491,7 +1491,7 @@ void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData,
int stillAlive = 0;
for (i=0; i < server.rdb_pipe_numconns; i++)
{
int nwritten;
ssize_t nwritten;
connection *conn = server.rdb_pipe_conns[i];
if (!conn)
continue;
@ -1511,7 +1511,7 @@ void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData,
/* Note: when use diskless replication, 'repldboff' is the offset
* of 'rdb_pipe_buff' sent rather than the offset of entire RDB. */
slave->repldboff = nwritten;
atomicIncr(server.stat_net_output_bytes, nwritten);
atomicIncr(server.stat_net_repl_output_bytes, nwritten);
}
/* If we were unable to write all the data to one of the replicas,
* setup write handler (and disable pipe read handler, below) */
@ -1817,11 +1817,16 @@ void readSyncBulkPayload(connection *conn) {
/* If repl_transfer_size == -1 we still have to read the bulk length
* from the master reply. */
if (server.repl_transfer_size == -1) {
if (connSyncReadLine(conn,buf,1024,server.repl_syncio_timeout*1000) == -1) {
nread = connSyncReadLine(conn,buf,1024,server.repl_syncio_timeout*1000);
if (nread == -1) {
serverLog(LL_WARNING,
"I/O error reading bulk count from MASTER: %s",
strerror(errno));
goto error;
} else {
/* nread here is returned by connSyncReadLine(), which calls syncReadLine() and
* converts "\r\n" to '\0', so 1 byte is lost. */
atomicIncr(server.stat_net_repl_input_bytes, nread+1);
}
if (buf[0] == '-') {
@ -1892,7 +1897,7 @@ void readSyncBulkPayload(connection *conn) {
cancelReplicationHandshake(1);
return;
}
atomicIncr(server.stat_net_input_bytes, nread);
atomicIncr(server.stat_net_repl_input_bytes, nread);
/* When a mark is used, we want to detect EOF asap in order to avoid
* writing the EOF mark into the file... */

View File

@ -438,6 +438,20 @@ void rioSetAutoSync(rio *r, off_t bytes) {
r->io.file.autosync = bytes;
}
/* Report which backend a rio stream is bound to.
 *
 * The backend is identified by comparing the stream's read callback against
 * the known readers. The result is one of the RIO_TYPE_* constants: a stream
 * whose read callback is not the file, buffer or connection reader is
 * reported as RIO_TYPE_FD. */
uint8_t rioCheckType(rio *r) {
    if (r->read == rioFileRead) return RIO_TYPE_FILE;
    if (r->read == rioBufferRead) return RIO_TYPE_BUFFER;
    if (r->read == rioConnRead) return RIO_TYPE_CONN;

    /* Remaining case: r->read == rioFdRead */
    return RIO_TYPE_FD;
}
/* --------------------------- Higher level interface --------------------------
*
* The following higher level functions use lower level rio.c functions to help

View File

@ -40,6 +40,11 @@
#define RIO_FLAG_READ_ERROR (1<<0)
#define RIO_FLAG_WRITE_ERROR (1<<1)
/* Backend type identifiers, as returned by rioCheckType(). Each value is a
 * distinct single bit. */
#define RIO_TYPE_FILE (1<<0)
#define RIO_TYPE_BUFFER (1<<1)
#define RIO_TYPE_CONN (1<<2)
#define RIO_TYPE_FD (1<<3)
struct _rio {
/* Backend functions.
* Since this functions do not tolerate short writes or reads the return
@ -174,5 +179,5 @@ int rioWriteBulkObject(rio *r, struct redisObject *obj);
void rioGenericUpdateChecksum(rio *r, const void *buf, size_t len);
void rioSetAutoSync(rio *r, off_t bytes);
uint8_t rioCheckType(rio *r);
#endif

View File

@ -1188,14 +1188,19 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
run_with_period(100) {
long long stat_net_input_bytes, stat_net_output_bytes;
long long stat_net_repl_input_bytes, stat_net_repl_output_bytes;
atomicGet(server.stat_net_input_bytes, stat_net_input_bytes);
atomicGet(server.stat_net_output_bytes, stat_net_output_bytes);
atomicGet(server.stat_net_repl_input_bytes, stat_net_repl_input_bytes);
atomicGet(server.stat_net_repl_output_bytes, stat_net_repl_output_bytes);
trackInstantaneousMetric(STATS_METRIC_COMMAND,server.stat_numcommands);
trackInstantaneousMetric(STATS_METRIC_NET_INPUT,
stat_net_input_bytes);
stat_net_input_bytes + stat_net_repl_input_bytes);
trackInstantaneousMetric(STATS_METRIC_NET_OUTPUT,
stat_net_output_bytes);
stat_net_output_bytes + stat_net_repl_output_bytes);
trackInstantaneousMetric(STATS_METRIC_NET_TOTAL_REPLICATION,
stat_net_repl_input_bytes + stat_net_repl_output_bytes);
}
/* We have just LRU_BITS bits per object for LRU information.
@ -2356,6 +2361,8 @@ void resetServerStats(void) {
server.stat_aofrw_consecutive_failures = 0;
atomicSet(server.stat_net_input_bytes, 0);
atomicSet(server.stat_net_output_bytes, 0);
atomicSet(server.stat_net_repl_input_bytes, 0);
atomicSet(server.stat_net_repl_output_bytes, 0);
server.stat_unexpected_error_replies = 0;
server.stat_total_error_replies = 0;
server.stat_dump_payload_sanitizations = 0;
@ -5576,6 +5583,7 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) {
if (all_sections || (dictFind(section_dict,"stats") != NULL)) {
long long stat_total_reads_processed, stat_total_writes_processed;
long long stat_net_input_bytes, stat_net_output_bytes;
long long stat_net_repl_input_bytes, stat_net_repl_output_bytes;
long long current_eviction_exceeded_time = server.stat_last_eviction_exceeded_time ?
(long long) elapsedUs(server.stat_last_eviction_exceeded_time): 0;
long long current_active_defrag_time = server.stat_last_active_defrag_time ?
@ -5584,6 +5592,8 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) {
atomicGet(server.stat_total_writes_processed, stat_total_writes_processed);
atomicGet(server.stat_net_input_bytes, stat_net_input_bytes);
atomicGet(server.stat_net_output_bytes, stat_net_output_bytes);
atomicGet(server.stat_net_repl_input_bytes, stat_net_repl_input_bytes);
atomicGet(server.stat_net_repl_output_bytes, stat_net_repl_output_bytes);
if (sections++) info = sdscat(info,"\r\n");
info = sdscatprintf(info,
@ -5593,8 +5603,11 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) {
"instantaneous_ops_per_sec:%lld\r\n"
"total_net_input_bytes:%lld\r\n"
"total_net_output_bytes:%lld\r\n"
"total_net_repl_input_bytes:%lld\r\n"
"total_net_repl_output_bytes:%lld\r\n"
"instantaneous_input_kbps:%.2f\r\n"
"instantaneous_output_kbps:%.2f\r\n"
"instantaneous_repl_total_kbps:%.2f\r\n"
"rejected_connections:%lld\r\n"
"sync_full:%lld\r\n"
"sync_partial_ok:%lld\r\n"
@ -5636,10 +5649,13 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) {
server.stat_numconnections,
server.stat_numcommands,
getInstantaneousMetric(STATS_METRIC_COMMAND),
stat_net_input_bytes,
stat_net_output_bytes,
stat_net_input_bytes + stat_net_repl_input_bytes,
stat_net_output_bytes + stat_net_repl_output_bytes,
stat_net_repl_input_bytes,
stat_net_repl_output_bytes,
(float)getInstantaneousMetric(STATS_METRIC_NET_INPUT)/1024,
(float)getInstantaneousMetric(STATS_METRIC_NET_OUTPUT)/1024,
(float)getInstantaneousMetric(STATS_METRIC_NET_TOTAL_REPLICATION)/1024,
server.stat_rejected_conn,
server.stat_sync_full,
server.stat_sync_partial_ok,

View File

@ -155,9 +155,10 @@ typedef long long ustime_t; /* microsecond time type. */
/* Instantaneous metrics tracking. */
#define STATS_METRIC_SAMPLES 16 /* Number of samples per metric. */
#define STATS_METRIC_COMMAND 0 /* Number of commands executed. */
#define STATS_METRIC_NET_INPUT 1 /* Bytes read to network .*/
#define STATS_METRIC_NET_INPUT 1 /* Bytes read to network. */
#define STATS_METRIC_NET_OUTPUT 2 /* Bytes written to network. */
#define STATS_METRIC_COUNT 3
#define STATS_METRIC_NET_TOTAL_REPLICATION 3 /* Bytes written and read to network during replication. */
#define STATS_METRIC_COUNT 4
/* Protocol and I/O related defines */
#define PROTO_IOBUF_LEN (1024*16) /* Generic I/O buffer size */
@ -1586,6 +1587,8 @@ struct redisServer {
struct malloc_stats cron_malloc_stats; /* sampled in serverCron(). */
redisAtomic long long stat_net_input_bytes; /* Bytes read from network. */
redisAtomic long long stat_net_output_bytes; /* Bytes written to network. */
redisAtomic long long stat_net_repl_input_bytes; /* Bytes read during replication, added to stat_net_input_bytes in 'info'. */
redisAtomic long long stat_net_repl_output_bytes; /* Bytes written during replication, added to stat_net_output_bytes in 'info'. */
size_t stat_current_cow_peak; /* Peak size of copy on write bytes. */
size_t stat_current_cow_bytes; /* Copy on write bytes while child is active. */
monotime stat_current_cow_updated; /* Last update time of stat_current_cow_bytes */