Implement anetPipe() to combine creating pipe and setting flags (#9511)

Implement createPipe() to combine creating pipe and setting flags, also reduce
system calls by prioritizing pipe2() over pipe().

Without createPipe(), we have to call pipe() to create a pipe and then call some
functions (like anetCloexec() and anetNonBlock()) of anet.c to set flags respectively,
which leads to some extra system calls, now we can leverage pipe2() to combine
them and make the process of creating pipe more convergent in createPipe().

Co-authored-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
Co-authored-by: Oran Agra <oran@redislabs.com>
This commit is contained in:
Andy Pan 2021-10-06 21:08:13 +08:00 committed by GitHub
parent 123cc1a1bc
commit 2391aefd82
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 73 additions and 23 deletions

View File

@ -620,3 +620,59 @@ int anetFormatFdAddr(int fd, char *buf, size_t buf_len, int fd_to_str_type) {
anetFdToString(fd,ip,sizeof(ip),&port,fd_to_str_type);
return anetFormatAddr(buf, buf_len, ip, port);
}
/* Create a pipe buffer with given flags for read end and write end.
* Note that it supports the file flags defined by pipe2() and fcntl(F_SETFL),
* and one of the use cases is O_CLOEXEC|O_NONBLOCK. */
int anetPipe(int fds[2], int read_flags, int write_flags) {
int pipe_flags = 0;
#if defined(__linux__) || defined(__FreeBSD__)
/* When possible, try to leverage pipe2() to apply flags that are common to both ends.
* There is no harm to set O_CLOEXEC to prevent fd leaks. */
pipe_flags = O_CLOEXEC | (read_flags & write_flags);
if (pipe2(fds, pipe_flags)) {
/* Fail on real failures, and fallback to simple pipe if pipe2 is unsupported. */
if (errno != ENOSYS && errno != EINVAL)
return -1;
pipe_flags = 0;
} else {
/* If the flags on both ends are identical, no need to do anything else. */
if ((O_CLOEXEC | read_flags) == (O_CLOEXEC | write_flags))
return 0;
/* Clear the flags which have already been set using pipe2. */
read_flags &= ~pipe_flags;
write_flags &= ~pipe_flags;
}
#endif
/* When we reach here with pipe_flags of 0, it means pipe2 failed (or was not attempted),
* so we try to use pipe. Otherwise, we skip and proceed to set specific flags below. */
if (pipe_flags == 0 && pipe(fds))
return -1;
/* File descriptor flags.
* Currently, only one such flag is defined: FD_CLOEXEC, the close-on-exec flag. */
if (read_flags & O_CLOEXEC)
if (fcntl(fds[0], F_SETFD, FD_CLOEXEC))
goto error;
if (write_flags & O_CLOEXEC)
if (fcntl(fds[1], F_SETFD, FD_CLOEXEC))
goto error;
/* File status flags after clearing the file descriptor flag O_CLOEXEC. */
read_flags &= ~O_CLOEXEC;
if (read_flags)
if (fcntl(fds[0], F_SETFL, read_flags))
goto error;
write_flags &= ~O_CLOEXEC;
if (write_flags)
if (fcntl(fds[1], F_SETFL, write_flags))
goto error;
return 0;
error:
close(fds[0]);
close(fds[1]);
return -1;
}

View File

@ -72,5 +72,6 @@ int anetFdToString(int fd, char *ip, size_t ip_len, int *port, int fd_to_str_typ
int anetKeepAlive(char *err, int fd, int interval);
int anetFormatAddr(char *fmt, size_t fmt_len, char *ip, int port);
int anetFormatFdAddr(int fd, char *buf, size_t buf_len, int fd_to_str_type);
int anetPipe(int fds[2], int read_flags, int write_flags);
#endif

View File

@ -1644,12 +1644,9 @@ int aofCreatePipes(void) {
int fds[6] = {-1, -1, -1, -1, -1, -1};
int j;
if (pipe(fds) == -1) goto error; /* parent -> children data. */
if (pipe(fds+2) == -1) goto error; /* children -> parent ack. */
if (pipe(fds+4) == -1) goto error; /* parent -> children ack. */
/* Parent -> children data is non blocking. */
if (anetNonBlock(NULL,fds[0]) != ANET_OK) goto error;
if (anetNonBlock(NULL,fds[1]) != ANET_OK) goto error;
if (anetPipe(fds, O_NONBLOCK, O_NONBLOCK) == -1) goto error; /* parent -> children data, non blocking pipe */
if (anetPipe(fds+2, 0, 0) == -1) goto error; /* children -> parent ack. */
if (anetPipe(fds+4, 0, 0) == -1) goto error; /* parent -> children ack. */
if (aeCreateFileEvent(server.el, fds[2], AE_READABLE, aofChildPipeReadable, NULL) == AE_ERR) goto error;
server.aof_pipe_write_data_to_child = fds[1];

View File

@ -29,6 +29,7 @@
#include "server.h"
#include <unistd.h>
#include <fcntl.h>
typedef struct {
size_t keys;
@ -42,12 +43,10 @@ typedef struct {
* RDB / AOF saving process from the child to the parent (for instance
* the amount of copy on write memory used) */
void openChildInfoPipe(void) {
if (pipe(server.child_info_pipe) == -1) {
if (anetPipe(server.child_info_pipe, O_NONBLOCK, 0) == -1) {
/* On error our two file descriptors should be still set to -1,
* but we call anyway closeChildInfoPipe() since can't hurt. */
closeChildInfoPipe();
} else if (anetNonBlock(NULL,server.child_info_pipe[0]) != ANET_OK) {
closeChildInfoPipe();
} else {
server.child_info_nread = 0;
}

View File

@ -60,6 +60,7 @@
#include <dlfcn.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <fcntl.h>
/* --------------------------------------------------------------------------
* Private data structures used by the modules system. Those are data
@ -9380,21 +9381,18 @@ void moduleInitModulesSystem(void) {
server.module_client = NULL;
moduleRegisterCoreAPI();
if (pipe(server.module_blocked_pipe) == -1) {
/* Create a pipe for module threads to be able to wake up the redis main thread.
* Make the pipe non blocking. This is just a best effort aware mechanism
* and we do not want to block not in the read nor in the write half.
* Enable close-on-exec flag on pipes in case of the fork-exec system calls in
* sentinels or redis servers. */
if (anetPipe(server.module_blocked_pipe, O_CLOEXEC|O_NONBLOCK, O_CLOEXEC|O_NONBLOCK) == -1) {
serverLog(LL_WARNING,
"Can't create the pipe for module blocking commands: %s",
strerror(errno));
exit(1);
}
/* Make the pipe non blocking. This is just a best effort aware mechanism
* and we do not want to block not in the read nor in the write half. */
anetNonBlock(NULL,server.module_blocked_pipe[0]);
anetNonBlock(NULL,server.module_blocked_pipe[1]);
/* Enable close-on-exec flag on pipes in case of the fork-exec system calls in
* sentinels or redis servers. */
anetCloexec(server.module_blocked_pipe[0]);
anetCloexec(server.module_blocked_pipe[1]);
/* Create the timers radix tree. */
Timers = raxNew();

View File

@ -3052,14 +3052,13 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
* the parent, we can't let it write directly to the sockets, since in case
* of TLS we must let the parent handle a continuous TLS state when the
* child terminates and parent takes over. */
if (pipe(pipefds) == -1) return C_ERR;
if (anetPipe(pipefds, O_NONBLOCK, 0) == -1) return C_ERR;
server.rdb_pipe_read = pipefds[0]; /* read end */
rdb_pipe_write = pipefds[1]; /* write end */
anetNonBlock(NULL, server.rdb_pipe_read);
/* create another pipe that is used by the parent to signal to the child
* that it can exit. */
if (pipe(pipefds) == -1) {
if (anetPipe(pipefds, 0, 0) == -1) {
close(rdb_pipe_write);
close(server.rdb_pipe_read);
return C_ERR;

View File

@ -6268,7 +6268,7 @@ int linuxMadvFreeForkBugCheck(void) {
*(volatile char*)q = 0;
/* Create a pipe for the child to return the info to the parent. */
ret = pipe(pipefd);
ret = anetPipe(pipefd, 0, 0);
if (ret < 0) {
serverLog(LL_WARNING, "Failed to create pipe: %s", strerror(errno));
bug_found = -1;