diff --git a/src/Makefile b/src/Makefile index f4d726e769..93b2d17a82 100644 --- a/src/Makefile +++ b/src/Makefile @@ -4,7 +4,7 @@ # # Copyright (c) 1994, Regents of the University of California # -# $PostgreSQL: pgsql/src/Makefile,v 1.49 2010/01/15 17:01:06 heikki Exp $ +# $PostgreSQL: pgsql/src/Makefile,v 1.50 2010/01/20 09:16:23 heikki Exp $ # #------------------------------------------------------------------------- @@ -21,7 +21,7 @@ all install installdirs uninstall distprep: $(MAKE) -C backend/snowball $@ $(MAKE) -C include $@ $(MAKE) -C interfaces $@ - $(MAKE) -C backend/replication/walreceiver $@ + $(MAKE) -C backend/replication/libpqwalreceiver $@ $(MAKE) -C bin $@ $(MAKE) -C pl $@ $(MAKE) -C makefiles $@ @@ -52,7 +52,7 @@ clean: $(MAKE) -C backend/snowball $@ $(MAKE) -C include $@ $(MAKE) -C interfaces $@ - $(MAKE) -C backend/replication/walreceiver $@ + $(MAKE) -C backend/replication/libpqwalreceiver $@ $(MAKE) -C bin $@ $(MAKE) -C pl $@ $(MAKE) -C makefiles $@ @@ -67,7 +67,7 @@ distclean maintainer-clean: $(MAKE) -C backend/snowball $@ $(MAKE) -C include $@ $(MAKE) -C interfaces $@ - $(MAKE) -C backend/replication/walreceiver $@ + $(MAKE) -C backend/replication/libpqwalreceiver $@ $(MAKE) -C bin $@ $(MAKE) -C pl $@ $(MAKE) -C makefiles $@ @@ -82,7 +82,7 @@ coverage: $(MAKE) -C backend/utils/mb/conversion_procs $@ $(MAKE) -C backend/snowball $@ $(MAKE) -C interfaces $@ - $(MAKE) -C backend/replication/walreceiver $@ + $(MAKE) -C backend/replication/libpqwalreceiver $@ $(MAKE) -C bin $@ $(MAKE) -C pl $@ diff --git a/src/backend/bootstrap/bootstrap.c b/src/backend/bootstrap/bootstrap.c index 84dd6638ef..3965896608 100644 --- a/src/backend/bootstrap/bootstrap.c +++ b/src/backend/bootstrap/bootstrap.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/bootstrap/bootstrap.c,v 1.256 2010/01/15 09:19:00 heikki Exp $ + * $PostgreSQL: 
pgsql/src/backend/bootstrap/bootstrap.c,v 1.257 2010/01/20 09:16:23 heikki Exp $ * *------------------------------------------------------------------------- */ @@ -425,20 +425,7 @@ AuxiliaryProcessMain(int argc, char *argv[]) case WalReceiverProcess: /* don't set signals, walreceiver has its own agenda */ - { - PGFunction WalReceiverMain; - - /* - * Walreceiver is not linked directly into the server - * binary because we would then need to link the server - * with libpq. It's compiled as a dynamically loaded module - * to avoid that. - */ - WalReceiverMain = load_external_function("walreceiver", - "WalReceiverMain", - true, NULL); - WalReceiverMain(NULL); - } + WalReceiverMain(); proc_exit(1); /* should never return */ default: diff --git a/src/backend/replication/Makefile b/src/backend/replication/Makefile index 7903c1ac5e..64a966b1cc 100644 --- a/src/backend/replication/Makefile +++ b/src/backend/replication/Makefile @@ -4,7 +4,7 @@ # Makefile for src/backend/replication # # IDENTIFICATION -# $PostgreSQL: pgsql/src/backend/replication/Makefile,v 1.1 2010/01/15 09:19:03 heikki Exp $ +# $PostgreSQL: pgsql/src/backend/replication/Makefile,v 1.2 2010/01/20 09:16:24 heikki Exp $ # #------------------------------------------------------------------------- @@ -12,6 +12,6 @@ subdir = src/backend/replication top_builddir = ../../.. 
include $(top_builddir)/src/Makefile.global -OBJS = walsender.o walreceiverfuncs.o +OBJS = walsender.o walreceiverfuncs.o walreceiver.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/replication/README b/src/backend/replication/README index 0f40dc79e9..8b15dea58e 100644 --- a/src/backend/replication/README +++ b/src/backend/replication/README @@ -1,4 +1,36 @@ -$PostgreSQL: pgsql/src/backend/replication/README,v 1.1 2010/01/15 09:19:03 heikki Exp $ +$PostgreSQL: pgsql/src/backend/replication/README,v 1.2 2010/01/20 09:16:24 heikki Exp $ + +Walreceiver - libpqwalreceiver API +---------------------------------- + +The transport-specific part of walreceiver, responsible for connecting to +the primary server and receiving WAL files, is loaded dynamically to avoid +having to link the main server binary with libpq. The dynamically loaded +module is in the libpqwalreceiver subdirectory. + +The dynamically loaded module implements three functions: + + +bool walrcv_connect(char *conninfo, XLogRecPtr startpoint) + +Establishes a connection to the primary and starts streaming from 'startpoint'. +Returns true on success. + + +bool walrcv_receive(int timeout, XLogRecPtr *recptr, char **buffer, int *len) + +Retrieves any WAL record available through the connection, blocking for +a maximum of 'timeout' ms. + + +void walrcv_disconnect(void); + +Disconnects from the primary. + + +This API should be considered internal at the moment, but we could open it +up for 3rd party replacements of libpqwalreceiver in the future, allowing +pluggable methods for receiving WAL. 
Walreceiver IPC --------------- diff --git a/src/backend/replication/walreceiver/Makefile b/src/backend/replication/libpqwalreceiver/Makefile similarity index 60% rename from src/backend/replication/walreceiver/Makefile rename to src/backend/replication/libpqwalreceiver/Makefile index 3376ba6ec8..df28b90c4c 100644 --- a/src/backend/replication/walreceiver/Makefile +++ b/src/backend/replication/libpqwalreceiver/Makefile @@ -1,24 +1,22 @@ #------------------------------------------------------------------------- # # Makefile-- -# Makefile for src/backend/replication/walreceiver +# Makefile for src/backend/replication/libpqwalreceiver # # IDENTIFICATION -# $PostgreSQL: pgsql/src/backend/replication/walreceiver/Makefile,v 1.4 2010/01/15 21:06:26 tgl Exp $ +# $PostgreSQL: pgsql/src/backend/replication/libpqwalreceiver/Makefile,v 1.1 2010/01/20 09:16:24 heikki Exp $ # #------------------------------------------------------------------------- -subdir = src/backend/replication/walreceiver +subdir = src/backend/replication/libpqwalreceiver top_builddir = ../../../.. include $(top_builddir)/src/Makefile.global -override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS) - -OBJS = walreceiver.o +override CPPFLAGS := -I$(srcdir) -I$(libpq_srcdir) $(CPPFLAGS) +OBJS = libpqwalreceiver.o SHLIB_LINK = $(libpq) - -NAME := walreceiver +NAME = libpqwalreceiver all: submake-libpq all-shared-lib diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c new file mode 100644 index 0000000000..54b86fd135 --- /dev/null +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -0,0 +1,317 @@ +/*------------------------------------------------------------------------- + * + * libpqwalreceiver.c + * + * This file contains the libpq-specific parts of walreceiver. It's + * loaded as a dynamic module to avoid linking the main server binary with + * libpq. 
+ * + * Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group + * + * + * IDENTIFICATION + * $PostgreSQL: pgsql/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c,v 1.1 2010/01/20 09:16:24 heikki Exp $ + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include <unistd.h> +#include <sys/time.h> + +#include "libpq-fe.h" +#include "access/xlog.h" +#include "miscadmin.h" +#include "replication/walreceiver.h" +#include "utils/builtins.h" + +#ifdef HAVE_POLL_H +#include <poll.h> +#endif +#ifdef HAVE_SYS_POLL_H +#include <sys/poll.h> +#endif +#ifdef HAVE_SYS_SELECT_H +#include <sys/select.h> +#endif + +PG_MODULE_MAGIC; + +void _PG_init(void); + +/* Current connection to the primary, if any */ +static PGconn *streamConn = NULL; +static bool justconnected = false; + +/* Buffer for currently read records */ +static char *recvBuf = NULL; + +/* Prototypes for interface functions */ +static bool libpqrcv_connect(char *conninfo, XLogRecPtr startpoint); +static bool libpqrcv_receive(int timeout, XLogRecPtr *recptr, char **buffer, + int *len); +static void libpqrcv_disconnect(void); + +/* Prototypes for private functions */ +static bool libpq_select(int timeout_ms); + +/* + * Module load callback + */ +void +_PG_init(void) +{ + /* Tell walreceiver how to reach us; refuse to overwrite hooks that are already set */ + if (walrcv_connect != NULL || walrcv_receive != NULL || walrcv_disconnect != NULL) + elog(ERROR, "libpqwalreceiver already loaded"); + walrcv_connect = libpqrcv_connect; + walrcv_receive = libpqrcv_receive; + walrcv_disconnect = libpqrcv_disconnect; +} + +/* + * Establish the connection to the primary server for XLOG streaming + */ +static bool +libpqrcv_connect(char *conninfo, XLogRecPtr startpoint) +{ + char conninfo_repl[MAXCONNINFO + 18]; /* conninfo plus the 17-character " replication=true" suffix */ + char *primary_sysid; + char standby_sysid[32]; + TimeLineID primary_tli; + TimeLineID standby_tli; + PGresult *res; + char cmd[64]; + + Assert(startpoint.xlogid != 0 || startpoint.xrecoff != 0); + + /* Connect */ + snprintf(conninfo_repl, 
sizeof(conninfo_repl), "%s replication=true", conninfo); + + streamConn = PQconnectdb(conninfo_repl); + if (PQstatus(streamConn) != CONNECTION_OK) + ereport(ERROR, + (errmsg("could not connect to the primary server : %s", + PQerrorMessage(streamConn)))); + + /* + * Get the system identifier and timeline ID as a DataRow message + * from the primary server. + */ + res = PQexec(streamConn, "IDENTIFY_SYSTEM"); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + PQclear(res); + ereport(ERROR, + (errmsg("could not receive the SYSID and timeline ID from " + "the primary server: %s", + PQerrorMessage(streamConn)))); + } + if (PQnfields(res) != 2 || PQntuples(res) != 1) + { + int ntuples = PQntuples(res); + int nfields = PQnfields(res); + PQclear(res); + ereport(ERROR, + (errmsg("invalid response from primary server"), + errdetail("expected 1 tuple with 2 fields, got %d tuples with %d fields", + ntuples, nfields))); + } + primary_sysid = PQgetvalue(res, 0, 0); + primary_tli = pg_atoi(PQgetvalue(res, 0, 1), 4, 0); + + /* + * Confirm that the system identifier of the primary is the same + * as ours. + */ + snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT, + GetSystemIdentifier()); + if (strcmp(primary_sysid, standby_sysid) != 0) + { + PQclear(res); + ereport(ERROR, + (errmsg("system differs between the primary and standby"), + errdetail("the primary SYSID is %s, standby SYSID is %s", + primary_sysid, standby_sysid))); + } + + /* + * Confirm that the current timeline of the primary is the same + * as the recovery target timeline. 
+ */ + standby_tli = GetRecoveryTargetTLI(); + PQclear(res); + if (primary_tli != standby_tli) + ereport(ERROR, + (errmsg("timeline %u of the primary does not match recovery target timeline %u", + primary_tli, standby_tli))); + ThisTimeLineID = primary_tli; + + /* Start streaming from the point requested by startup process */ + snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X", + startpoint.xlogid, startpoint.xrecoff); + res = PQexec(streamConn, cmd); + if (PQresultStatus(res) != PGRES_COPY_OUT) + ereport(ERROR, + (errmsg("could not start XLOG streaming: %s", + PQerrorMessage(streamConn)))); + PQclear(res); + + justconnected = true; + + return true; +} + +/* + * Wait until we can read WAL stream, or timeout. + * + * Returns true if data has become available for reading, false if timed out + * or interrupted by signal. + * + * This is based on pqSocketCheck. + */ +static bool +libpq_select(int timeout_ms) +{ + int ret; + + Assert(streamConn != NULL); + if (PQsocket(streamConn) < 0) + ereport(ERROR, + (errcode_for_socket_access(), + errmsg("socket not open"))); + + /* We use poll(2) if available, otherwise select(2) */ + { +#ifdef HAVE_POLL + struct pollfd input_fd; + + input_fd.fd = PQsocket(streamConn); + input_fd.events = POLLIN | POLLERR; + input_fd.revents = 0; + + ret = poll(&input_fd, 1, timeout_ms); +#else /* !HAVE_POLL */ + + fd_set input_mask; + struct timeval timeout; + struct timeval *ptr_timeout; + + FD_ZERO(&input_mask); + FD_SET(PQsocket(streamConn), &input_mask); + + if (timeout_ms < 0) + ptr_timeout = NULL; + else + { + timeout.tv_sec = timeout_ms / 1000; + timeout.tv_usec = (timeout_ms % 1000) * 1000; + ptr_timeout = &timeout; + } + + ret = select(PQsocket(streamConn) + 1, &input_mask, + NULL, NULL, ptr_timeout); +#endif /* HAVE_POLL */ + } + + if (ret == 0 || (ret < 0 && errno == EINTR)) + return false; + if (ret < 0) + ereport(ERROR, + (errcode_for_socket_access(), + errmsg("select() failed: %m"))); + return true; +} + +/* + * Disconnect 
connection to primary, if any. + */ +static void +libpqrcv_disconnect(void) +{ + PQfinish(streamConn); + streamConn = NULL; + justconnected = false; +} + +/* + * Receive any WAL records available from XLOG stream, blocking for + * maximum of 'timeout' ms. + * + * Returns: + * + * True if data was received. *recptr, *buffer and *len are set to + * the WAL location of the received data, buffer holding it, and length, + * respectively. + * + * False if no data was available within timeout, or wait was interrupted + * by signal. + * + * The buffer returned is only valid until the next call of this function or + * libpqrcv_connect/disconnect. + * + * ereports on error. + */ +static bool +libpqrcv_receive(int timeout, XLogRecPtr *recptr, char **buffer, int *len) +{ + int rawlen; + + if (recvBuf != NULL) + PQfreemem(recvBuf); + recvBuf = NULL; + + /* + * If the caller requested to block, wait for data to arrive. But if + * this is the first call after connecting, don't wait, because + * there might already be some data in libpq buffer that we haven't + * returned to caller. 
+ */ + if (timeout > 0 && !justconnected) + { + if (!libpq_select(timeout)) + return false; + + if (PQconsumeInput(streamConn) == 0) + ereport(ERROR, + (errmsg("could not read xlog records: %s", + PQerrorMessage(streamConn)))); + } + justconnected = false; + + /* Receive CopyData message */ + rawlen = PQgetCopyData(streamConn, &recvBuf, 1); + if (rawlen == 0) /* no records available yet, then return */ + return false; + if (rawlen == -1) /* end-of-streaming or error */ + { + PGresult *res; + + res = PQgetResult(streamConn); + if (PQresultStatus(res) == PGRES_COMMAND_OK) + { + PQclear(res); + ereport(ERROR, + (errmsg("replication terminated by primary server"))); + } + PQclear(res); + ereport(ERROR, + (errmsg("could not read xlog records: %s", + PQerrorMessage(streamConn)))); + } + if (rawlen < -1) + ereport(ERROR, + (errmsg("could not read xlog records: %s", + PQerrorMessage(streamConn)))); + + if (rawlen < sizeof(XLogRecPtr)) + ereport(ERROR, + (errmsg("invalid WAL message received from primary"))); + + /* Return received WAL records to caller */ + *recptr = *((XLogRecPtr *) recvBuf); + *buffer = recvBuf + sizeof(XLogRecPtr); + *len = rawlen - sizeof(XLogRecPtr); + + return true; +} diff --git a/src/backend/replication/walreceiver/walreceiver.c b/src/backend/replication/walreceiver.c similarity index 66% rename from src/backend/replication/walreceiver/walreceiver.c rename to src/backend/replication/walreceiver.c index 65b1dfe1e6..f805e673e1 100644 --- a/src/backend/replication/walreceiver/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -21,24 +21,24 @@ * of the connection and a FATAL error are treated not as a crash but as * normal operation. * - * Walreceiver is a postmaster child process like others, but it's compiled - * as a dynamic module to avoid linking libpq with the main server binary. + * This file contains the server-facing parts of walreceiver. The libpq- + * specific parts are in the libpqwalreceiver module. 
It's loaded + * dynamically to avoid linking the server with libpq. * * Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/replication/walreceiver/walreceiver.c,v 1.2 2010/01/16 01:55:28 momjian Exp $ + * $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.1 2010/01/20 09:16:24 heikki Exp $ * *------------------------------------------------------------------------- */ #include "postgres.h" +#include <signal.h> #include <unistd.h> -#include <sys/time.h> #include "access/xlog_internal.h" -#include "libpq-fe.h" #include "libpq/pqsignal.h" #include "miscadmin.h" #include "replication/walreceiver.h" @@ -50,23 +50,10 @@ #include "utils/ps_status.h" #include "utils/resowner.h" -#ifdef HAVE_POLL_H -#include <poll.h> -#endif -#ifdef HAVE_SYS_POLL_H -#include <sys/poll.h> -#endif -#ifdef HAVE_SYS_SELECT_H -#include <sys/select.h> -#endif - -PG_MODULE_MAGIC; - -PG_FUNCTION_INFO_V1(WalReceiverMain); -Datum WalReceiverMain(PG_FUNCTION_ARGS); - -/* streamConn is a PGconn object of a connection to walsender from walreceiver */ -static PGconn *streamConn = NULL; +/* libpqwalreceiver hooks to these when loaded */ +walrcv_connect_type walrcv_connect = NULL; +walrcv_receive_type walrcv_receive = NULL; +walrcv_disconnect_type walrcv_disconnect = NULL; #define NAPTIME_PER_CYCLE 100 /* max sleep time between cycles (100ms) */ @@ -79,16 +66,16 @@ static uint32 recvId = 0; static uint32 recvSeg = 0; static uint32 recvOff = 0; -/* Buffer for currently read records */ -static char *recvBuf = NULL; - -/* Flags set by interrupt handlers of walreceiver for later service in the main loop */ +/* + * Flags set by interrupt handlers of walreceiver for later service in the + * main loop. 
+ */ static volatile sig_atomic_t got_SIGHUP = false; static volatile sig_atomic_t got_SIGTERM = false; static void ProcessWalRcvInterrupts(void); -static void EnableImmediateExit(void); -static void DisableImmediateExit(void); +static void EnableWalRcvImmediateExit(void); +static void DisableWalRcvImmediateExit(void); /* * About SIGTERM handling: @@ -128,14 +115,14 @@ ProcessWalRcvInterrupts(void) } static void -EnableImmediateExit() +EnableWalRcvImmediateExit() { WalRcvImmediateInterruptOK = true; ProcessWalRcvInterrupts(); } static void -DisableImmediateExit() +DisableWalRcvImmediateExit() { WalRcvImmediateInterruptOK = false; ProcessWalRcvInterrupts(); @@ -147,12 +134,8 @@ static void WalRcvShutdownHandler(SIGNAL_ARGS); static void WalRcvQuickDieHandler(SIGNAL_ARGS); /* Prototypes for private functions */ -static void WalRcvLoop(void); static void InitWalRcv(void); -static void WalRcvConnect(void); -static bool WalRcvWait(int timeout_ms); static void WalRcvKill(int code, Datum arg); -static void XLogRecv(void); static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr); static void XLogWalRcvFlush(void); @@ -167,11 +150,21 @@ static struct } LogstreamResult; /* Main entry point for walreceiver process */ -Datum -WalReceiverMain(PG_FUNCTION_ARGS) +void +WalReceiverMain(void) { sigjmp_buf local_sigjmp_buf; MemoryContext walrcv_context; + char conninfo[MAXCONNINFO]; + XLogRecPtr startpoint; + /* use volatile pointer to prevent code rearrangement */ + volatile WalRcvData *walrcv = WalRcv; + + /* Load the libpq-specific functions */ + load_file("libpqwalreceiver", false); + if (walrcv_connect == NULL || walrcv_receive == NULL || + walrcv_disconnect == NULL) + elog(ERROR, "libpqwalreceiver didn't initialize correctly"); /* Mark walreceiver in progress */ InitWalRcv(); @@ -236,7 +229,7 @@ WalReceiverMain(PG_FUNCTION_ARGS) error_context_stack = NULL; /* Reset WalRcvImmediateInterruptOK */ - DisableImmediateExit(); + DisableWalRcvImmediateExit(); /* Prevent 
interrupts while cleaning up */ HOLD_INTERRUPTS(); @@ -244,12 +237,10 @@ WalReceiverMain(PG_FUNCTION_ARGS) /* Report the error to the server log */ EmitErrorReport(); - /* Free the data structure related to a connection */ - PQfinish(streamConn); - streamConn = NULL; - if (recvBuf != NULL) - PQfreemem(recvBuf); - recvBuf = NULL; + /* Disconnect any previous connection. */ + EnableWalRcvImmediateExit(); + walrcv_disconnect(); + DisableWalRcvImmediateExit(); /* * Now return to normal top-level context and clear ErrorContext for @@ -278,22 +269,24 @@ WalReceiverMain(PG_FUNCTION_ARGS) /* Unblock signals (they were blocked when the postmaster forked us) */ PG_SETMASK(&UnBlockSig); + /* Fetch connection information from shared memory */ + SpinLockAcquire(&walrcv->mutex); + strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO); + startpoint = walrcv->receivedUpto; + SpinLockRelease(&walrcv->mutex); + /* Establish the connection to the primary for XLOG streaming */ - WalRcvConnect(); + EnableWalRcvImmediateExit(); + walrcv_connect(conninfo, startpoint); + DisableWalRcvImmediateExit(); - /* Main loop of walreceiver */ - WalRcvLoop(); - - PG_RETURN_VOID(); /* WalRcvLoop() never returns, but keep compiler quiet */ -} - -/* Main loop of walreceiver process */ -static void -WalRcvLoop(void) -{ /* Loop until end-of-streaming or error */ for (;;) { + XLogRecPtr recptr; + char *buf; + int len; + /* * Emergency bailout if postmaster has died. This is to avoid the * necessity for manual cleanup of all postmaster children. @@ -319,14 +312,20 @@ WalRcvLoop(void) } /* Wait a while for data to arrive */ - if (WalRcvWait(NAPTIME_PER_CYCLE)) + if (walrcv_receive(NAPTIME_PER_CYCLE, &recptr, &buf, &len)) { - /* data has arrived. 
Process it */ - if (PQconsumeInput(streamConn) == 0) - ereport(ERROR, - (errmsg("could not read xlog records: %s", - PQerrorMessage(streamConn)))); - XLogRecv(); + /* Write received WAL records to disk */ + XLogWalRcvWrite(buf, len, recptr); + + /* Receive any more WAL records we can without sleeping */ + while(walrcv_receive(0, &recptr, &buf, &len)) + XLogWalRcvWrite(buf, len, recptr); + + /* + * Now that we've written some records, flush them to disk and + * let the startup process know about them. + */ + XLogWalRcvFlush(); } } } @@ -362,178 +361,6 @@ InitWalRcv(void) on_shmem_exit(WalRcvKill, 0); } -/* - * Establish the connection to the primary server for XLOG streaming - */ -static void -WalRcvConnect(void) -{ - char conninfo[MAXCONNINFO + 14]; - char *primary_sysid; - char standby_sysid[32]; - TimeLineID primary_tli; - TimeLineID standby_tli; - PGresult *res; - XLogRecPtr recptr; - char cmd[64]; - /* use volatile pointer to prevent code rearrangement */ - volatile WalRcvData *walrcv = WalRcv; - - /* - * Set up a connection for XLOG streaming - */ - SpinLockAcquire(&walrcv->mutex); - snprintf(conninfo, sizeof(conninfo), "%s replication=true", walrcv->conninfo); - recptr = walrcv->receivedUpto; - SpinLockRelease(&walrcv->mutex); - - /* initialize local XLOG pointers */ - LogstreamResult.Write = LogstreamResult.Flush = recptr; - - Assert(recptr.xlogid != 0 || recptr.xrecoff != 0); - - EnableImmediateExit(); - streamConn = PQconnectdb(conninfo); - DisableImmediateExit(); - if (PQstatus(streamConn) != CONNECTION_OK) - ereport(ERROR, - (errmsg("could not connect to the primary server : %s", - PQerrorMessage(streamConn)))); - - /* - * Get the system identifier and timeline ID as a DataRow message - * from the primary server. 
- */ - EnableImmediateExit(); - res = PQexec(streamConn, "IDENTIFY_SYSTEM"); - DisableImmediateExit(); - if (PQresultStatus(res) != PGRES_TUPLES_OK) - { - PQclear(res); - ereport(ERROR, - (errmsg("could not receive the SYSID and timeline ID from " - "the primary server: %s", - PQerrorMessage(streamConn)))); - } - if (PQnfields(res) != 2 || PQntuples(res) != 1) - { - int ntuples = PQntuples(res); - int nfields = PQnfields(res); - PQclear(res); - ereport(ERROR, - (errmsg("invalid response from primary server"), - errdetail("expected 1 tuple with 2 fields, got %d tuples with %d fields", - ntuples, nfields))); - } - primary_sysid = PQgetvalue(res, 0, 0); - primary_tli = pg_atoi(PQgetvalue(res, 0, 1), 4, 0); - - /* - * Confirm that the system identifier of the primary is the same - * as ours. - */ - snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT, - GetSystemIdentifier()); - if (strcmp(primary_sysid, standby_sysid) != 0) - { - PQclear(res); - ereport(ERROR, - (errmsg("system differs between the primary and standby"), - errdetail("the primary SYSID is %s, standby SYSID is %s", - primary_sysid, standby_sysid))); - } - - /* - * Confirm that the current timeline of the primary is the same - * as the recovery target timeline. 
- */ - standby_tli = GetRecoveryTargetTLI(); - PQclear(res); - if (primary_tli != standby_tli) - ereport(ERROR, - (errmsg("timeline %u of the primary does not match recovery target timeline %u", - primary_tli, standby_tli))); - ThisTimeLineID = primary_tli; - - /* Start streaming from the point requested by startup process */ - snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X", recptr.xlogid, recptr.xrecoff); - EnableImmediateExit(); - res = PQexec(streamConn, cmd); - DisableImmediateExit(); - if (PQresultStatus(res) != PGRES_COPY_OUT) - ereport(ERROR, - (errmsg("could not start XLOG streaming: %s", - PQerrorMessage(streamConn)))); - PQclear(res); - - /* - * Process the outstanding messages before beginning to wait for - * new message to arrive. - */ - XLogRecv(); -} - -/* - * Wait until we can read WAL stream, or timeout. - * - * Returns true if data has become available for reading, false if timed out - * or interrupted by signal. - * - * This is based on pqSocketCheck. - */ -static bool -WalRcvWait(int timeout_ms) -{ - int ret; - - Assert(streamConn != NULL); - if (PQsocket(streamConn) < 0) - ereport(ERROR, - (errcode_for_socket_access(), - errmsg("socket not open"))); - - /* We use poll(2) if available, otherwise select(2) */ - { -#ifdef HAVE_POLL - struct pollfd input_fd; - - input_fd.fd = PQsocket(streamConn); - input_fd.events = POLLIN | POLLERR; - input_fd.revents = 0; - - ret = poll(&input_fd, 1, timeout_ms); -#else /* !HAVE_POLL */ - - fd_set input_mask; - struct timeval timeout; - struct timeval *ptr_timeout; - - FD_ZERO(&input_mask); - FD_SET(PQsocket(streamConn), &input_mask); - - if (timeout_ms < 0) - ptr_timeout = NULL; - else - { - timeout.tv_sec = timeout_ms / 1000; - timeout.tv_usec = (timeout_ms % 1000) * 1000; - ptr_timeout = &timeout; - } - - ret = select(PQsocket(streamConn) + 1, &input_mask, - NULL, NULL, ptr_timeout); -#endif /* HAVE_POLL */ - } - - if (ret == 0 || (ret < 0 && errno == EINTR)) - return false; - if (ret < 0) - 
ereport(ERROR, - (errcode_for_socket_access(), - errmsg("select() failed: %m"))); - return true; -} - /* * Clear our pid from shared memory at exit. */ @@ -555,7 +382,7 @@ WalRcvKill(int code, Datum arg) walrcv->pid = 0; SpinLockRelease(&walrcv->mutex); - PQfinish(streamConn); + walrcv_disconnect(); /* If requested to stop, tell postmaster to not restart us. */ if (stopped) @@ -612,64 +439,6 @@ WalRcvQuickDieHandler(SIGNAL_ARGS) exit(2); } -/* - * Receive any WAL records available without blocking from XLOG stream and - * write it to the disk. - */ -static void -XLogRecv(void) -{ - XLogRecPtr *recptr; - int len; - - for (;;) - { - /* Receive CopyData message */ - len = PQgetCopyData(streamConn, &recvBuf, 1); - if (len == 0) /* no records available yet, then return */ - break; - if (len == -1) /* end-of-streaming or error */ - { - PGresult *res; - - res = PQgetResult(streamConn); - if (PQresultStatus(res) == PGRES_COMMAND_OK) - { - PQclear(res); - ereport(ERROR, - (errmsg("replication terminated by primary server"))); - } - PQclear(res); - ereport(ERROR, - (errmsg("could not read xlog records: %s", - PQerrorMessage(streamConn)))); - } - if (len < -1) - ereport(ERROR, - (errmsg("could not read xlog records: %s", - PQerrorMessage(streamConn)))); - - if (len < sizeof(XLogRecPtr)) - ereport(ERROR, - (errmsg("invalid WAL message received from primary"))); - - /* Write received WAL records to disk */ - recptr = (XLogRecPtr *) recvBuf; - XLogWalRcvWrite(recvBuf + sizeof(XLogRecPtr), - len - sizeof(XLogRecPtr), *recptr); - - if (recvBuf != NULL) - PQfreemem(recvBuf); - recvBuf = NULL; - } - - /* - * Now that we've written some records, flush them to disk and let the - * startup process know about them. - */ - XLogWalRcvFlush(); -} - /* * Write XLOG data to disk. 
*/ diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c index 4342e252d6..c1d7b55887 100644 --- a/src/backend/replication/walreceiverfuncs.c +++ b/src/backend/replication/walreceiverfuncs.c @@ -4,13 +4,13 @@ * * This file contains functions used by the startup process to communicate * with the walreceiver process. Functions implementing walreceiver itself - * are in src/backend/replication/walreceiver subdirectory. + * are in walreceiver.c. * * Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/replication/walreceiverfuncs.c,v 1.1 2010/01/15 09:19:03 heikki Exp $ + * $PostgreSQL: pgsql/src/backend/replication/walreceiverfuncs.c,v 1.2 2010/01/20 09:16:24 heikki Exp $ * *------------------------------------------------------------------------- */ diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index f848a9e509..57de368d41 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -5,13 +5,14 @@ * * Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group * - * $PostgreSQL: pgsql/src/include/replication/walreceiver.h,v 1.2 2010/01/16 00:04:41 tgl Exp $ + * $PostgreSQL: pgsql/src/include/replication/walreceiver.h,v 1.3 2010/01/20 09:16:24 heikki Exp $ * *------------------------------------------------------------------------- */ #ifndef _WALRECEIVER_H #define _WALRECEIVER_H +#include "access/xlogdefs.h" #include "storage/spin.h" /* @@ -60,6 +61,17 @@ typedef struct extern PGDLLIMPORT WalRcvData *WalRcv; +/* libpqwalreceiver hooks */ +typedef bool (*walrcv_connect_type) (char *conninfo, XLogRecPtr startpoint); +extern PGDLLIMPORT walrcv_connect_type walrcv_connect; + +typedef bool (*walrcv_receive_type) (int timeout, XLogRecPtr *recptr, char **buffer, int *len); +extern PGDLLIMPORT walrcv_receive_type walrcv_receive; + +typedef void 
(*walrcv_disconnect_type) (void); +extern PGDLLIMPORT walrcv_disconnect_type walrcv_disconnect; + +extern void WalReceiverMain(void); extern Size WalRcvShmemSize(void); extern void WalRcvShmemInit(void); extern bool WalRcvInProgress(void);