optimize poll_events() to spread the work over the threads more evenly (#12975)

* optimize poll_events() to spread the work over the threads more evenly

* fixed typos, code cleanup

* better error handling

* prevent crash in case callbacks manipulate the sockets arrays - added warnings
This commit is contained in:
Costa Tsaousis 2022-05-21 09:49:56 +03:00 committed by GitHub
parent bf39581189
commit 619f9964a6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 238 additions and 160 deletions

View File

@ -1386,175 +1386,142 @@ static void poll_events_cleanup(void *data) {
freez(p->inf);
}
static void poll_events_process(POLLJOB *p, POLLINFO *pi, struct pollfd *pf, short int revents, time_t now) {
short int events = pf->events;
int fd = pf->fd;
pf->revents = 0;
size_t i = pi->slot;
static int poll_process_error(POLLINFO *pi, struct pollfd *pf, short int revents) {
error("POLLFD: LISTENER: received %s %s %s on socket at slot %zu (fd %d) client '%s' port '%s' expecting %s %s %s, having %s %s %s"
, revents & POLLERR ? "POLLERR" : ""
, revents & POLLHUP ? "POLLHUP" : ""
, revents & POLLNVAL ? "POLLNVAL" : ""
, pi->slot
, pi->fd
, pi->client_ip ? pi->client_ip : "<undefined-ip>"
, pi->client_port ? pi->client_port : "<undefined-port>"
, pf->events & POLLIN ? "POLLIN" : "", pf->events & POLLOUT ? "POLLOUT" : "", pf->events & POLLPRI ? "POLLPRI" : ""
, revents & POLLIN ? "POLLIN" : "", revents & POLLOUT ? "POLLOUT" : "", revents & POLLPRI ? "POLLPRI" : ""
);
if(unlikely(fd == -1)) {
debug(D_POLLFD, "POLLFD: LISTENER: ignoring slot %zu, it does not have an fd", i);
return;
}
pf->events = 0;
poll_close_fd(pi);
return 1;
}
debug(D_POLLFD, "POLLFD: LISTENER: processing events for slot %zu (events = %d, revents = %d)", i, events, revents);
static inline int poll_process_send(POLLJOB *p, POLLINFO *pi, struct pollfd *pf, time_t now) {
pi->last_sent_t = now;
pi->send_count++;
if(revents & POLLIN || revents & POLLPRI) {
// receiving data
debug(D_POLLFD, "POLLFD: LISTENER: sending data to socket on slot %zu (fd %d)", pi->slot, pf->fd);
pi->last_received_t = now;
pi->recv_count++;
pf->events = 0;
if(likely(pi->flags & POLLINFO_FLAG_CLIENT_SOCKET)) {
// read data from client TCP socket
debug(D_POLLFD, "POLLFD: LISTENER: reading data from TCP client slot %zu (fd %d)", i, fd);
// remember the slot, in case we need to close it later
// the callback may manipulate the socket list and our pf and pi pointers may be invalid after that call
size_t slot = pi->slot;
pf->events = 0;
if (pi->rcv_callback(pi, &pf->events) == -1) {
poll_close_fd(&p->inf[i]);
return;
}
pf = &p->fds[i];
pi = &p->inf[i];
if (unlikely(pi->snd_callback(pi, &pf->events) == -1))
poll_close_fd(&p->inf[slot]);
#ifdef NETDATA_INTERNAL_CHECKS
// this is common - it is used for web server file copies
if(unlikely(!(pf->events & (POLLIN|POLLOUT)))) {
error("POLLFD: LISTENER: after reading, client slot %zu (fd %d) from %s port %s was left without expecting input or output. ", i, fd, pi->client_ip?pi->client_ip:"<undefined-ip>", pi->client_port?pi->client_port:"<undefined-port>");
//poll_close_fd(pi);
//return;
}
#endif
// IMPORTANT:
// pf and pi may be invalid below this point, they may have been reallocated.
return 1;
}
static inline int poll_process_tcp_read(POLLJOB *p, POLLINFO *pi, struct pollfd *pf, time_t now) {
pi->last_received_t = now;
pi->recv_count++;
debug(D_POLLFD, "POLLFD: LISTENER: reading data from TCP client slot %zu (fd %d)", pi->slot, pf->fd);
pf->events = 0;
// remember the slot, in case we need to close it later
// the callback may manipulate the socket list and our pf and pi pointers may be invalid after that call
size_t slot = pi->slot;
if (pi->rcv_callback(pi, &pf->events) == -1)
poll_close_fd(&p->inf[slot]);
// IMPORTANT:
// pf and pi may be invalid below this point, they may have been reallocated.
return 1;
}
static inline int poll_process_udp_read(POLLINFO *pi, struct pollfd *pf, time_t now __maybe_unused) {
pi->last_received_t = now;
pi->recv_count++;
debug(D_POLLFD, "POLLFD: LISTENER: reading data from UDP slot %zu (fd %d)", pi->slot, pf->fd);
// TODO: access_list is not applied to UDP
// but checking the access list on every UDP packet will destroy
// performance, especially for statsd.
pf->events = 0;
if(pi->rcv_callback(pi, &pf->events) == -1)
return 0;
// IMPORTANT:
// pf and pi may be invalid below this point, they may have been reallocated.
return 1;
}
static int poll_process_new_tcp_connection(POLLJOB *p, POLLINFO *pi, struct pollfd *pf, time_t now) {
pi->last_received_t = now;
pi->recv_count++;
debug(D_POLLFD, "POLLFD: LISTENER: accepting connections from slot %zu (fd %d)", pi->slot, pf->fd);
char client_ip[INET6_ADDRSTRLEN] = "";
char client_port[NI_MAXSERV] = "";
char client_host[NI_MAXHOST] = "";
debug(D_POLLFD, "POLLFD: LISTENER: calling accept4() slot %zu (fd %d)", pi->slot, pf->fd);
int nfd = accept_socket(
pf->fd,SOCK_NONBLOCK,
client_ip, INET6_ADDRSTRLEN, client_port,NI_MAXSERV, client_host, NI_MAXHOST,
p->access_list, p->allow_dns
);
if (unlikely(nfd < 0)) {
// accept failed
debug(D_POLLFD, "POLLFD: LISTENER: accept4() slot %zu (fd %d) failed.", pi->slot, pf->fd);
if(unlikely(errno == EMFILE)) {
error("POLLFD: LISTENER: too many open files - sleeping for 1ms - used by this thread %zu, max for this thread %zu", p->used, p->limit);
usleep(1000); // 1ms
}
else if(likely(pi->flags & POLLINFO_FLAG_SERVER_SOCKET)) {
// new connection
// debug(D_POLLFD, "POLLFD: LISTENER: accepting connections from slot %zu (fd %d)", i, fd);
else if(unlikely(errno != EWOULDBLOCK && errno != EAGAIN))
error("POLLFD: LISTENER: accept() failed.");
switch(pi->socktype) {
case SOCK_STREAM: {
// a TCP socket
// we accept the connection
}
else {
// accept ok
int nfd;
do {
char client_ip[INET6_ADDRSTRLEN];
char client_port[NI_MAXSERV];
char client_host[NI_MAXHOST];
client_host[0] = 0;
client_ip[0] = 0;
client_port[0] = 0;
poll_add_fd(p
, nfd
, SOCK_STREAM
, pi->port_acl
, POLLINFO_FLAG_CLIENT_SOCKET
, client_ip
, client_port
, client_host
, p->add_callback
, p->del_callback
, p->rcv_callback
, p->snd_callback
, NULL
);
debug(D_POLLFD, "POLLFD: LISTENER: calling accept4() slot %zu (fd %d)", i, fd);
nfd = accept_socket(fd, SOCK_NONBLOCK, client_ip, INET6_ADDRSTRLEN, client_port, NI_MAXSERV,
client_host, NI_MAXHOST, p->access_list, p->allow_dns);
if (unlikely(nfd < 0)) {
// accept failed
// IMPORTANT:
// pf and pi may be invalid below this point, they may have been reallocated.
debug(D_POLLFD, "POLLFD: LISTENER: accept4() slot %zu (fd %d) failed.", i, fd);
if(unlikely(errno == EMFILE)) {
error("POLLFD: LISTENER: too many open files - sleeping for 1ms - used by this thread %zu, max for this thread %zu", p->used, p->limit);
usleep(1000); // 10ms
}
else if(unlikely(errno != EWOULDBLOCK && errno != EAGAIN))
error("POLLFD: LISTENER: accept() failed.");
break;
}
else {
// accept ok
// info("POLLFD: LISTENER: client '[%s]:%s' connected to '%s' on fd %d", client_ip, client_port, sockets->fds_names[i], nfd);
poll_add_fd(p
, nfd
, SOCK_STREAM
, pi->port_acl
, POLLINFO_FLAG_CLIENT_SOCKET
, client_ip
, client_port
, client_host
, p->add_callback
, p->del_callback
, p->rcv_callback
, p->snd_callback
, NULL
);
// it may have reallocated them, so refresh our pointers
pf = &p->fds[i];
pi = &p->inf[i];
}
} while (nfd >= 0 && (!p->limit || p->used < p->limit));
break;
}
case SOCK_DGRAM: {
// a UDP socket
// we read data from the server socket
debug(D_POLLFD, "POLLFD: LISTENER: reading data from UDP slot %zu (fd %d)", i, fd);
// TODO: access_list is not applied to UDP
// but checking the access list on every UDP packet will destroy
// performance, especially for statsd.
pf->events = 0;
pi->rcv_callback(pi, &pf->events);
break;
}
default: {
error("POLLFD: LISTENER: Unknown socktype %d on slot %zu", pi->socktype, pi->slot);
break;
}
}
}
return 1;
}
if(unlikely(revents & POLLOUT)) {
// sending data
debug(D_POLLFD, "POLLFD: LISTENER: sending data to socket on slot %zu (fd %d)", i, fd);
pi->last_sent_t = now;
pi->send_count++;
pf->events = 0;
if (pi->snd_callback(pi, &pf->events) == -1) {
poll_close_fd(&p->inf[i]);
return;
}
pf = &p->fds[i];
pi = &p->inf[i];
#ifdef NETDATA_INTERNAL_CHECKS
// this is common - it is used for streaming
if(unlikely(pi->flags & POLLINFO_FLAG_CLIENT_SOCKET && !(pf->events & (POLLIN|POLLOUT)))) {
error("POLLFD: LISTENER: after sending, client slot %zu (fd %d) from %s port %s was left without expecting input or output. ", i, fd, pi->client_ip?pi->client_ip:"<undefined-ip>", pi->client_port?pi->client_port:"<undefined-port>");
//poll_close_fd(pi);
//return;
}
#endif
}
if(unlikely(revents & POLLERR)) {
error("POLLFD: LISTENER: processing POLLERR events for slot %zu fd %d (events = %d, revents = %d)", i, events, revents, fd);
pf->events = 0;
poll_close_fd(pi);
return;
}
if(unlikely(revents & POLLHUP)) {
error("POLLFD: LISTENER: processing POLLHUP events for slot %zu fd %d (events = %d, revents = %d)", i, events, revents, fd);
pf->events = 0;
poll_close_fd(pi);
return;
}
if(unlikely(revents & POLLNVAL)) {
error("POLLFD: LISTENER: processing POLLNVAL events for slot %zu fd %d (events = %d, revents = %d)", i, events, revents, fd);
pf->events = 0;
poll_close_fd(pi);
return;
}
return 0;
}
void poll_events(LISTEN_SOCKETS *sockets
@ -1687,18 +1654,129 @@ void poll_events(LISTEN_SOCKETS *sockets
debug(D_POLLFD, "POLLFD: LISTENER: poll() timeout.");
}
else {
POLLINFO *pi;
struct pollfd *pf;
size_t idx, processed = 0;
short int revents;
// keep fast lookup arrays per function
// to avoid looping through the entire list every time
size_t sends[p.max + 1], sends_max = 0;
size_t reads[p.max + 1], reads_max = 0;
size_t conns[p.max + 1], conns_max = 0;
size_t udprd[p.max + 1], udprd_max = 0;
for (i = 0; i <= p.max; i++) {
struct pollfd *pf = &p.fds[i];
short int revents = pf->revents;
if (unlikely(revents))
poll_events_process(&p, &p.inf[i], pf, revents, now);
pi = &p.inf[i];
pf = &p.fds[i];
revents = pf->revents;
if(unlikely(revents == 0 || pf->fd == -1))
continue;
if (unlikely(revents & (POLLERR|POLLHUP|POLLNVAL))) {
// something is wrong to one of our sockets
pf->revents = 0;
processed += poll_process_error(pi, pf, revents);
}
else if (likely(revents & POLLOUT)) {
// a client is ready to receive data
sends[sends_max++] = i;
}
else if (likely(revents & (POLLIN|POLLPRI))) {
if (pi->flags & POLLINFO_FLAG_CLIENT_SOCKET) {
// a client sent data to us
reads[reads_max++] = i;
}
else if (pi->flags & POLLINFO_FLAG_SERVER_SOCKET) {
// something is coming to our server sockets
if(pi->socktype == SOCK_DGRAM) {
// UDP receive, directly on our listening socket
udprd[udprd_max++] = i;
}
else if(pi->socktype == SOCK_STREAM) {
// new TCP connection
conns[conns_max++] = i;
}
else
error("POLLFD: LISTENER: server slot %zu (fd %d) connection from %s port %s using unhandled socket type %d."
, i
, pi->fd
, pi->client_ip ? pi->client_ip : "<undefined-ip>"
, pi->client_port ? pi->client_port : "<undefined-port>"
, pi->socktype
);
}
else
error("POLLFD: LISTENER: client slot %zu (fd %d) data from %s port %s using flags %08X is neither client nor server."
, i
, pi->fd
, pi->client_ip ? pi->client_ip : "<undefined-ip>"
, pi->client_port ? pi->client_port : "<undefined-port>"
, pi->flags
);
}
else
error("POLLFD: LISTENER: socket slot %zu (fd %d) client %s port %s unhandled event id %d."
, i
, pi->fd
, pi->client_ip ? pi->client_ip : "<undefined-ip>"
, pi->client_port ? pi->client_port : "<undefined-port>"
, revents
);
}
// process sends
for (idx = 0; idx < sends_max; idx++) {
i = sends[idx];
pi = &p.inf[i];
pf = &p.fds[i];
pf->revents = 0;
processed += poll_process_send(&p, pi, pf, now);
}
// process UDP reads
for (idx = 0; idx < udprd_max; idx++) {
i = udprd[idx];
pi = &p.inf[i];
pf = &p.fds[i];
pf->revents = 0;
processed += poll_process_udp_read(pi, pf, now);
}
// process TCP reads
for (idx = 0; idx < reads_max; idx++) {
i = reads[idx];
pi = &p.inf[i];
pf = &p.fds[i];
pf->revents = 0;
processed += poll_process_tcp_read(&p, pi, pf, now);
}
if(!processed && (!p.limit || p.used < p.limit)) {
// nothing processed above (rcv, snd) and we have room for another TCP connection
// so, accept one TCP connection
for (idx = 0; idx < conns_max; idx++) {
i = conns[idx];
pi = &p.inf[i];
pf = &p.fds[i];
pf->revents = 0;
if (poll_process_new_tcp_connection(&p, pi, pf, now))
break;
}
}
}
if(unlikely(p.checks_every > 0 && now - last_check > p.checks_every)) {
last_check = now;
// security checks
// cleanup old sockets
for(i = 0; i <= p.max; i++) {
POLLINFO *pi = &p.inf[i];