/*------------------------------------------------------------------------- * * walsender.c * * The WAL sender process (walsender) is new as of Postgres 9.0. It takes * care of sending XLOG from the primary server to a single recipient. * (Note that there can be more than one walsender process concurrently.) * It is started by the postmaster when the walreceiver of a standby server * connects to the primary server and requests XLOG streaming replication. * * A walsender is similar to a regular backend, ie. there is a one-to-one * relationship between a connection and a walsender process, but instead * of processing SQL queries, it understands a small set of special * replication-mode commands. The START_REPLICATION command begins streaming * WAL to the client. While streaming, the walsender keeps reading XLOG * records from the disk and sends them to the standby server over the * COPY protocol, until either side ends the replication by exiting COPY * mode (or until the connection is closed). * * Normal termination is by SIGTERM, which instructs the walsender to * close the connection and exit(0) at the next convenient moment. Emergency * termination is by SIGQUIT; like any backend, the walsender will simply * abort and exit on SIGQUIT. A close of the connection and a FATAL error * are treated as not a crash but approximately normal termination; * the walsender will exit quickly without sending any more XLOG records. * * If the server is shut down, checkpointer sends us * PROCSIG_WALSND_INIT_STOPPING after all regular backends have exited. If * the backend is idle or runs an SQL query this causes the backend to * shutdown, if logical replication is in progress all existing WAL records * are processed followed by a shutdown. Otherwise this causes the walsender * to switch to the "stopping" state. In this state, the walsender will reject * any further replication commands. The checkpointer begins the shutdown * checkpoint once all walsenders are confirmed as stopping. When the shutdown * checkpoint finishes, the postmaster sends us SIGUSR2. This instructs * walsender to send any outstanding WAL, including the shutdown checkpoint * record, wait for it to be replicated to the standby, and then exit. * * * Portions Copyright (c) 2010-2020, PostgreSQL Global Development Group * * IDENTIFICATION * src/backend/replication/walsender.c * *------------------------------------------------------------------------- */ #include "postgres.h" #include #include #include "access/printtup.h" #include "access/timeline.h" #include "access/transam.h" #include "access/xact.h" #include "access/xlog_internal.h" #include "access/xlogutils.h" #include "catalog/pg_authid.h" #include "catalog/pg_type.h" #include "commands/dbcommands.h" #include "commands/defrem.h" #include "funcapi.h" #include "libpq/libpq.h" #include "libpq/pqformat.h" #include "miscadmin.h" #include "nodes/replnodes.h" #include "pgstat.h" #include "postmaster/interrupt.h" #include "replication/basebackup.h" #include "replication/decode.h" #include "replication/logical.h" #include "replication/logicalfuncs.h" #include "replication/slot.h" #include "replication/snapbuild.h" #include "replication/syncrep.h" #include "replication/walreceiver.h" #include "replication/walsender.h" #include "replication/walsender_private.h" #include "storage/condition_variable.h" #include "storage/fd.h" #include "storage/ipc.h" #include "storage/pmsignal.h" #include "storage/proc.h" #include "storage/procarray.h" #include "tcop/dest.h" #include "tcop/tcopprot.h" #include "utils/builtins.h" #include "utils/guc.h" #include "utils/memutils.h" #include "utils/pg_lsn.h" #include "utils/portal.h" #include "utils/ps_status.h" #include "utils/timeout.h" #include "utils/timestamp.h" /* * Maximum data payload in a WAL data message. Must be >= XLOG_BLCKSZ. * * We don't have a good idea of what a good value would be; there's some * overhead per message in both walsender and walreceiver, but on the other * hand sending large batches makes walsender less responsive to signals * because signals are checked only between messages. 128kB (with * default 8k blocks) seems like a reasonable guess for now. */ #define MAX_SEND_SIZE (XLOG_BLCKSZ * 16) /* Array of WalSnds in shared memory */ WalSndCtlData *WalSndCtl = NULL; /* My slot in the shared memory array */ WalSnd *MyWalSnd = NULL; /* Global state */ bool am_walsender = false; /* Am I a walsender process? */ bool am_cascading_walsender = false; /* Am I cascading WAL to another * standby? */ bool am_db_walsender = false; /* Connected to a database? */ /* User-settable parameters for walsender */ int max_wal_senders = 0; /* the maximum number of concurrent * walsenders */ int wal_sender_timeout = 60 * 1000; /* maximum time to send one WAL * data message */ bool log_replication_commands = false; /* * State for WalSndWakeupRequest */ bool wake_wal_senders = false; static WALOpenSegment *sendSeg = NULL; static WALSegmentContext *sendCxt = NULL; /* * These variables keep track of the state of the timeline we're currently * sending. sendTimeLine identifies the timeline. If sendTimeLineIsHistoric, * the timeline is not the latest timeline on this server, and the server's * history forked off from that timeline at sendTimeLineValidUpto. */ static TimeLineID sendTimeLine = 0; static TimeLineID sendTimeLineNextTLI = 0; static bool sendTimeLineIsHistoric = false; static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr; /* * How far have we sent WAL already? This is also advertised in * MyWalSnd->sentPtr. (Actually, this is the next WAL location to send.) */ static XLogRecPtr sentPtr = 0; /* Buffers for constructing outgoing messages and processing reply messages. */ static StringInfoData output_message; static StringInfoData reply_message; static StringInfoData tmpbuf; /* Timestamp of last ProcessRepliesIfAny(). */ static TimestampTz last_processing = 0; /* * Timestamp of last ProcessRepliesIfAny() that saw a reply from the * standby. Set to 0 if wal_sender_timeout doesn't need to be active. */ static TimestampTz last_reply_timestamp = 0; /* Have we sent a heartbeat message asking for reply, since last reply? */ static bool waiting_for_ping_response = false; /* * While streaming WAL in Copy mode, streamingDoneSending is set to true * after we have sent CopyDone. We should not send any more CopyData messages * after that. streamingDoneReceiving is set to true when we receive CopyDone * from the other end. When both become true, it's time to exit Copy mode. */ static bool streamingDoneSending; static bool streamingDoneReceiving; /* Are we there yet? */ static bool WalSndCaughtUp = false; /* Flags set by signal handlers for later service in main loop */ static volatile sig_atomic_t got_SIGUSR2 = false; static volatile sig_atomic_t got_STOPPING = false; /* * This is set while we are streaming. When not set * PROCSIG_WALSND_INIT_STOPPING signal will be handled like SIGTERM. When set, * the main loop is responsible for checking got_STOPPING and terminating when * it's set (after streaming any remaining WAL). */ static volatile sig_atomic_t replication_active = false; static LogicalDecodingContext *logical_decoding_ctx = NULL; /* A sample associating a WAL location with the time it was written. */ typedef struct { XLogRecPtr lsn; TimestampTz time; } WalTimeSample; /* The size of our buffer of time samples. */ #define LAG_TRACKER_BUFFER_SIZE 8192 /* A mechanism for tracking replication lag. */ typedef struct { XLogRecPtr last_lsn; WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]; int write_head; int read_heads[NUM_SYNC_REP_WAIT_MODE]; WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE]; } LagTracker; static LagTracker *lag_tracker; /* Signal handlers */ static void WalSndLastCycleHandler(SIGNAL_ARGS); /* Prototypes for private functions */ typedef void (*WalSndSendDataCallback) (void); static void WalSndLoop(WalSndSendDataCallback send_data); static void InitWalSenderSlot(void); static void WalSndKill(int code, Datum arg); static void WalSndShutdown(void) pg_attribute_noreturn(); static void XLogSendPhysical(void); static void XLogSendLogical(void); static void WalSndDone(WalSndSendDataCallback send_data); static XLogRecPtr GetStandbyFlushRecPtr(void); static void IdentifySystem(void); static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd); static void DropReplicationSlot(DropReplicationSlotCmd *cmd); static void StartReplication(StartReplicationCmd *cmd); static void StartLogicalReplication(StartReplicationCmd *cmd); static void ProcessStandbyMessage(void); static void ProcessStandbyReplyMessage(void); static void ProcessStandbyHSFeedbackMessage(void); static void ProcessRepliesIfAny(void); static void WalSndKeepalive(bool requestReply); static void WalSndKeepaliveIfNecessary(void); static void WalSndCheckTimeOut(void); static long WalSndComputeSleeptime(TimestampTz now); static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write); static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write); static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid); static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc); static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time); static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now); static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch); static int WalSndSegmentOpen(XLogSegNo nextSegNo, WALSegmentContext *segcxt, TimeLineID *tli_p); static void UpdateSpillStats(LogicalDecodingContext *ctx); /* Initialize walsender process before entering the main command loop */ void InitWalSender(void) { am_cascading_walsender = RecoveryInProgress(); /* Create a per-walsender data structure in shared memory */ InitWalSenderSlot(); /* * We don't currently need any ResourceOwner in a walsender process, but * if we did, we could call CreateAuxProcessResourceOwner here. */ /* * Let postmaster know that we're a WAL sender. Once we've declared us as * a WAL sender process, postmaster will let us outlive the bgwriter and * kill us last in the shutdown sequence, so we get a chance to stream all * remaining WAL at shutdown, including the shutdown checkpoint. Note that * there's no going back, and we mustn't write any WAL records after this. */ MarkPostmasterChildWalSender(); SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE); /* Initialize empty timestamp buffer for lag tracking. */ lag_tracker = MemoryContextAllocZero(TopMemoryContext, sizeof(LagTracker)); /* Make sure we can remember the current read position in XLOG. */ sendSeg = (WALOpenSegment *) MemoryContextAlloc(TopMemoryContext, sizeof(WALOpenSegment)); sendCxt = (WALSegmentContext *) MemoryContextAlloc(TopMemoryContext, sizeof(WALSegmentContext)); WALOpenSegmentInit(sendSeg, sendCxt, wal_segment_size, NULL); } /* * Clean up after an error. * * WAL sender processes don't use transactions like regular backends do. * This function does any cleanup required after an error in a WAL sender * process, similar to what transaction abort does in a regular backend. */ void WalSndErrorCleanup(void) { LWLockReleaseAll(); ConditionVariableCancelSleep(); pgstat_report_wait_end(); if (sendSeg->ws_file >= 0) { close(sendSeg->ws_file); sendSeg->ws_file = -1; } if (MyReplicationSlot != NULL) ReplicationSlotRelease(); ReplicationSlotCleanup(); replication_active = false; if (got_STOPPING || got_SIGUSR2) proc_exit(0); /* Revert back to startup state */ WalSndSetState(WALSNDSTATE_STARTUP); } /* * Handle a client's connection abort in an orderly manner. */ static void WalSndShutdown(void) { /* * Reset whereToSendOutput to prevent ereport from attempting to send any * more messages to the standby. */ if (whereToSendOutput == DestRemote) whereToSendOutput = DestNone; proc_exit(0); abort(); /* keep the compiler quiet */ } /* * Handle the IDENTIFY_SYSTEM command. */ static void IdentifySystem(void) { char sysid[32]; char xloc[MAXFNAMELEN]; XLogRecPtr logptr; char *dbname = NULL; DestReceiver *dest; TupOutputState *tstate; TupleDesc tupdesc; Datum values[4]; bool nulls[4]; /* * Reply with a result set with one row, four columns. First col is system * ID, second is timeline ID, third is current xlog location and the * fourth contains the database name if we are connected to one. */ snprintf(sysid, sizeof(sysid), UINT64_FORMAT, GetSystemIdentifier()); am_cascading_walsender = RecoveryInProgress(); if (am_cascading_walsender) { /* this also updates ThisTimeLineID */ logptr = GetStandbyFlushRecPtr(); } else logptr = GetFlushRecPtr(); snprintf(xloc, sizeof(xloc), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr); if (MyDatabaseId != InvalidOid) { MemoryContext cur = CurrentMemoryContext; /* syscache access needs a transaction env. */ StartTransactionCommand(); /* make dbname live outside TX context */ MemoryContextSwitchTo(cur); dbname = get_database_name(MyDatabaseId); CommitTransactionCommand(); /* CommitTransactionCommand switches to TopMemoryContext */ MemoryContextSwitchTo(cur); } dest = CreateDestReceiver(DestRemoteSimple); MemSet(nulls, false, sizeof(nulls)); /* need a tuple descriptor representing four columns */ tupdesc = CreateTemplateTupleDesc(4); TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid", TEXTOID, -1, 0); TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline", INT4OID, -1, 0); TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos", TEXTOID, -1, 0); TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname", TEXTOID, -1, 0); /* prepare for projection of tuples */ tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual); /* column 1: system identifier */ values[0] = CStringGetTextDatum(sysid); /* column 2: timeline */ values[1] = Int32GetDatum(ThisTimeLineID); /* column 3: wal location */ values[2] = CStringGetTextDatum(xloc); /* column 4: database name, or NULL if none */ if (dbname) values[3] = CStringGetTextDatum(dbname); else nulls[3] = true; /* send it to dest */ do_tup_output(tstate, values, nulls); end_tup_output(tstate); } /* * Handle TIMELINE_HISTORY command. */ static void SendTimeLineHistory(TimeLineHistoryCmd *cmd) { StringInfoData buf; char histfname[MAXFNAMELEN]; char path[MAXPGPATH]; int fd; off_t histfilelen; off_t bytesleft; Size len; /* * Reply with a result set with one row, and two columns. The first col is * the name of the history file, 2nd is the contents. */ TLHistoryFileName(histfname, cmd->timeline); TLHistoryFilePath(path, cmd->timeline); /* Send a RowDescription message */ pq_beginmessage(&buf, 'T'); pq_sendint16(&buf, 2); /* 2 fields */ /* first field */ pq_sendstring(&buf, "filename"); /* col name */ pq_sendint32(&buf, 0); /* table oid */ pq_sendint16(&buf, 0); /* attnum */ pq_sendint32(&buf, TEXTOID); /* type oid */ pq_sendint16(&buf, -1); /* typlen */ pq_sendint32(&buf, 0); /* typmod */ pq_sendint16(&buf, 0); /* format code */ /* second field */ pq_sendstring(&buf, "content"); /* col name */ pq_sendint32(&buf, 0); /* table oid */ pq_sendint16(&buf, 0); /* attnum */ pq_sendint32(&buf, BYTEAOID); /* type oid */ pq_sendint16(&buf, -1); /* typlen */ pq_sendint32(&buf, 0); /* typmod */ pq_sendint16(&buf, 0); /* format code */ pq_endmessage(&buf); /* Send a DataRow message */ pq_beginmessage(&buf, 'D'); pq_sendint16(&buf, 2); /* # of columns */ len = strlen(histfname); pq_sendint32(&buf, len); /* col1 len */ pq_sendbytes(&buf, histfname, len); fd = OpenTransientFile(path, O_RDONLY | PG_BINARY); if (fd < 0) ereport(ERROR, (errcode_for_file_access(), errmsg("could not open file \"%s\": %m", path))); /* Determine file length and send it to client */ histfilelen = lseek(fd, 0, SEEK_END); if (histfilelen < 0) ereport(ERROR, (errcode_for_file_access(), errmsg("could not seek to end of file \"%s\": %m", path))); if (lseek(fd, 0, SEEK_SET) != 0) ereport(ERROR, (errcode_for_file_access(), errmsg("could not seek to beginning of file \"%s\": %m", path))); pq_sendint32(&buf, histfilelen); /* col2 len */ bytesleft = histfilelen; while (bytesleft > 0) { PGAlignedBlock rbuf; int nread; pgstat_report_wait_start(WAIT_EVENT_WALSENDER_TIMELINE_HISTORY_READ); nread = read(fd, rbuf.data, sizeof(rbuf)); pgstat_report_wait_end(); if (nread < 0) ereport(ERROR, (errcode_for_file_access(), errmsg("could not read file \"%s\": %m", path))); else if (nread == 0) ereport(ERROR, (errcode(ERRCODE_DATA_CORRUPTED), errmsg("could not read file \"%s\": read %d of %zu", path, nread, (Size) bytesleft))); pq_sendbytes(&buf, rbuf.data, nread); bytesleft -= nread; } if (CloseTransientFile(fd) != 0) ereport(ERROR, (errcode_for_file_access(), errmsg("could not close file \"%s\": %m", path))); pq_endmessage(&buf); } /* * Handle START_REPLICATION command. * * At the moment, this never returns, but an ereport(ERROR) will take us back * to the main loop. */ static void StartReplication(StartReplicationCmd *cmd) { StringInfoData buf; XLogRecPtr FlushPtr; if (ThisTimeLineID == 0) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION"))); /* * We assume here that we're logging enough information in the WAL for * log-shipping, since this is checked in PostmasterMain(). * * NOTE: wal_level can only change at shutdown, so in most cases it is * difficult for there to be WAL data that we can still see that was * written at wal_level='minimal'. */ if (cmd->slotname) { ReplicationSlotAcquire(cmd->slotname, true); if (SlotIsLogical(MyReplicationSlot)) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), (errmsg("cannot use a logical replication slot for physical replication")))); } /* * Select the timeline. If it was given explicitly by the client, use * that. Otherwise use the timeline of the last replayed record, which is * kept in ThisTimeLineID. */ if (am_cascading_walsender) { /* this also updates ThisTimeLineID */ FlushPtr = GetStandbyFlushRecPtr(); } else FlushPtr = GetFlushRecPtr(); if (cmd->timeline != 0) { XLogRecPtr switchpoint; sendTimeLine = cmd->timeline; if (sendTimeLine == ThisTimeLineID) { sendTimeLineIsHistoric = false; sendTimeLineValidUpto = InvalidXLogRecPtr; } else { List *timeLineHistory; sendTimeLineIsHistoric = true; /* * Check that the timeline the client requested exists, and the * requested start location is on that timeline. */ timeLineHistory = readTimeLineHistory(ThisTimeLineID); switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory, &sendTimeLineNextTLI); list_free_deep(timeLineHistory); /* * Found the requested timeline in the history. Check that * requested startpoint is on that timeline in our history. * * This is quite loose on purpose. We only check that we didn't * fork off the requested timeline before the switchpoint. We * don't check that we switched *to* it before the requested * starting point. This is because the client can legitimately * request to start replication from the beginning of the WAL * segment that contains switchpoint, but on the new timeline, so * that it doesn't end up with a partial segment. If you ask for * too old a starting point, you'll get an error later when we * fail to find the requested WAL segment in pg_wal. * * XXX: we could be more strict here and only allow a startpoint * that's older than the switchpoint, if it's still in the same * WAL segment. */ if (!XLogRecPtrIsInvalid(switchpoint) && switchpoint < cmd->startpoint) { ereport(ERROR, (errmsg("requested starting point %X/%X on timeline %u is not in this server's history", (uint32) (cmd->startpoint >> 32), (uint32) (cmd->startpoint), cmd->timeline), errdetail("This server's history forked from timeline %u at %X/%X.", cmd->timeline, (uint32) (switchpoint >> 32), (uint32) (switchpoint)))); } sendTimeLineValidUpto = switchpoint; } } else { sendTimeLine = ThisTimeLineID; sendTimeLineValidUpto = InvalidXLogRecPtr; sendTimeLineIsHistoric = false; } streamingDoneSending = streamingDoneReceiving = false; /* If there is nothing to stream, don't even enter COPY mode */ if (!sendTimeLineIsHistoric || cmd->startpoint < sendTimeLineValidUpto) { /* * When we first start replication the standby will be behind the * primary. For some applications, for example synchronous * replication, it is important to have a clear state for this initial * catchup mode, so we can trigger actions when we change streaming * state later. We may stay in this state for a long time, which is * exactly why we want to be able to monitor whether or not we are * still here. */ WalSndSetState(WALSNDSTATE_CATCHUP); /* Send a CopyBothResponse message, and start streaming */ pq_beginmessage(&buf, 'W'); pq_sendbyte(&buf, 0); pq_sendint16(&buf, 0); pq_endmessage(&buf); pq_flush(); /* * Don't allow a request to stream from a future point in WAL that * hasn't been flushed to disk in this server yet. */ if (FlushPtr < cmd->startpoint) { ereport(ERROR, (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X", (uint32) (cmd->startpoint >> 32), (uint32) (cmd->startpoint), (uint32) (FlushPtr >> 32), (uint32) (FlushPtr)))); } /* Start streaming from the requested point */ sentPtr = cmd->startpoint; /* Initialize shared memory status, too */ SpinLockAcquire(&MyWalSnd->mutex); MyWalSnd->sentPtr = sentPtr; SpinLockRelease(&MyWalSnd->mutex); SyncRepInitConfig(); /* Main loop of walsender */ replication_active = true; WalSndLoop(XLogSendPhysical); replication_active = false; if (got_STOPPING) proc_exit(0); WalSndSetState(WALSNDSTATE_STARTUP); Assert(streamingDoneSending && streamingDoneReceiving); } if (cmd->slotname) ReplicationSlotRelease(); /* * Copy is finished now. Send a single-row result set indicating the next * timeline. */ if (sendTimeLineIsHistoric) { char startpos_str[8 + 1 + 8 + 1]; DestReceiver *dest; TupOutputState *tstate; TupleDesc tupdesc; Datum values[2]; bool nulls[2]; snprintf(startpos_str, sizeof(startpos_str), "%X/%X", (uint32) (sendTimeLineValidUpto >> 32), (uint32) sendTimeLineValidUpto); dest = CreateDestReceiver(DestRemoteSimple); MemSet(nulls, false, sizeof(nulls)); /* * Need a tuple descriptor representing two columns. int8 may seem * like a surprising data type for this, but in theory int4 would not * be wide enough for this, as TimeLineID is unsigned. */ tupdesc = CreateTemplateTupleDesc(2); TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli", INT8OID, -1, 0); TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos", TEXTOID, -1, 0); /* prepare for projection of tuple */ tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual); values[0] = Int64GetDatum((int64) sendTimeLineNextTLI); values[1] = CStringGetTextDatum(startpos_str); /* send it to dest */ do_tup_output(tstate, values, nulls); end_tup_output(tstate); } /* Send CommandComplete message */ pq_puttextmessage('C', "START_STREAMING"); } /* * read_page callback for logical decoding contexts, as a walsender process. * * Inside the walsender we can do better than logical_read_local_xlog_page, * which has to do a plain sleep/busy loop, because the walsender's latch gets * set every time WAL is flushed. */ static int logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page) { XLogRecPtr flushptr; int count; WALReadError errinfo; XLogSegNo segno; XLogReadDetermineTimeline(state, targetPagePtr, reqLen); sendTimeLineIsHistoric = (state->currTLI != ThisTimeLineID); sendTimeLine = state->currTLI; sendTimeLineValidUpto = state->currTLIValidUntil; sendTimeLineNextTLI = state->nextTLI; /* make sure we have enough WAL available */ flushptr = WalSndWaitForWal(targetPagePtr + reqLen); /* fail if not (implies we are going to shut down) */ if (flushptr < targetPagePtr + reqLen) return -1; if (targetPagePtr + XLOG_BLCKSZ <= flushptr) count = XLOG_BLCKSZ; /* more than one block available */ else count = flushptr - targetPagePtr; /* part of the page available */ /* now actually read the data, we know it's there */ if (!WALRead(cur_page, targetPagePtr, XLOG_BLCKSZ, sendSeg->ws_tli, /* Pass the current TLI because only * WalSndSegmentOpen controls whether new * TLI is needed. */ sendSeg, sendCxt, WalSndSegmentOpen, &errinfo)) WALReadRaiseError(&errinfo); /* * After reading into the buffer, check that what we read was valid. We do * this after reading, because even though the segment was present when we * opened it, it might get recycled or removed while we read it. The * read() succeeds in that case, but the data we tried to read might * already have been overwritten with new WAL records. */ XLByteToSeg(targetPagePtr, segno, sendCxt->ws_segsize); CheckXLogRemoved(segno, sendSeg->ws_tli); return count; } /* * Process extra options given to CREATE_REPLICATION_SLOT. */ static void parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action) { ListCell *lc; bool snapshot_action_given = false; bool reserve_wal_given = false; /* Parse options */ foreach(lc, cmd->options) { DefElem *defel = (DefElem *) lfirst(lc); if (strcmp(defel->defname, "export_snapshot") == 0) { if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); snapshot_action_given = true; *snapshot_action = defGetBoolean(defel) ? CRS_EXPORT_SNAPSHOT : CRS_NOEXPORT_SNAPSHOT; } else if (strcmp(defel->defname, "use_snapshot") == 0) { if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); snapshot_action_given = true; *snapshot_action = CRS_USE_SNAPSHOT; } else if (strcmp(defel->defname, "reserve_wal") == 0) { if (reserve_wal_given || cmd->kind != REPLICATION_KIND_PHYSICAL) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); reserve_wal_given = true; *reserve_wal = true; } else elog(ERROR, "unrecognized option: %s", defel->defname); } } /* * Create a new replication slot. */ static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd) { const char *snapshot_name = NULL; char xloc[MAXFNAMELEN]; char *slot_name; bool reserve_wal = false; CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT; DestReceiver *dest; TupOutputState *tstate; TupleDesc tupdesc; Datum values[4]; bool nulls[4]; Assert(!MyReplicationSlot); parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action); /* setup state for XLogRead */ sendTimeLineIsHistoric = false; sendTimeLine = ThisTimeLineID; if (cmd->kind == REPLICATION_KIND_PHYSICAL) { ReplicationSlotCreate(cmd->slotname, false, cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT); } else { CheckLogicalDecodingRequirements(); /* * Initially create persistent slot as ephemeral - that allows us to * nicely handle errors during initialization because it'll get * dropped if this transaction fails. We'll make it persistent at the * end. Temporary slots can be created as temporary from beginning as * they get dropped on error as well. */ ReplicationSlotCreate(cmd->slotname, true, cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL); } if (cmd->kind == REPLICATION_KIND_LOGICAL) { LogicalDecodingContext *ctx; bool need_full_snapshot = false; /* * Do options check early so that we can bail before calling the * DecodingContextFindStartpoint which can take long time. */ if (snapshot_action == CRS_EXPORT_SNAPSHOT) { if (IsTransactionBlock()) ereport(ERROR, /*- translator: %s is a CREATE_REPLICATION_SLOT statement */ (errmsg("%s must not be called inside a transaction", "CREATE_REPLICATION_SLOT ... EXPORT_SNAPSHOT"))); need_full_snapshot = true; } else if (snapshot_action == CRS_USE_SNAPSHOT) { if (!IsTransactionBlock()) ereport(ERROR, /*- translator: %s is a CREATE_REPLICATION_SLOT statement */ (errmsg("%s must be called inside a transaction", "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT"))); if (XactIsoLevel != XACT_REPEATABLE_READ) ereport(ERROR, /*- translator: %s is a CREATE_REPLICATION_SLOT statement */ (errmsg("%s must be called in REPEATABLE READ isolation mode transaction", "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT"))); if (FirstSnapshotSet) ereport(ERROR, /*- translator: %s is a CREATE_REPLICATION_SLOT statement */ (errmsg("%s must be called before any query", "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT"))); if (IsSubTransaction()) ereport(ERROR, /*- translator: %s is a CREATE_REPLICATION_SLOT statement */ (errmsg("%s must not be called in a subtransaction", "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT"))); need_full_snapshot = true; } ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot, InvalidXLogRecPtr, logical_read_xlog_page, WalSndPrepareWrite, WalSndWriteData, WalSndUpdateProgress); /* * Signal that we don't need the timeout mechanism. We're just * creating the replication slot and don't yet accept feedback * messages or send keepalives. As we possibly need to wait for * further WAL the walsender would otherwise possibly be killed too * soon. */ last_reply_timestamp = 0; /* build initial snapshot, might take a while */ DecodingContextFindStartpoint(ctx); /* * Export or use the snapshot if we've been asked to do so. * * NB. We will convert the snapbuild.c kind of snapshot to normal * snapshot when doing this. */ if (snapshot_action == CRS_EXPORT_SNAPSHOT) { snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder); } else if (snapshot_action == CRS_USE_SNAPSHOT) { Snapshot snap; snap = SnapBuildInitialSnapshot(ctx->snapshot_builder); RestoreTransactionSnapshot(snap, MyProc); } /* don't need the decoding context anymore */ FreeDecodingContext(ctx); if (!cmd->temporary) ReplicationSlotPersist(); } else if (cmd->kind == REPLICATION_KIND_PHYSICAL && reserve_wal) { ReplicationSlotReserveWal(); ReplicationSlotMarkDirty(); /* Write this slot to disk if it's a permanent one. */ if (!cmd->temporary) ReplicationSlotSave(); } snprintf(xloc, sizeof(xloc), "%X/%X", (uint32) (MyReplicationSlot->data.confirmed_flush >> 32), (uint32) MyReplicationSlot->data.confirmed_flush); dest = CreateDestReceiver(DestRemoteSimple); MemSet(nulls, false, sizeof(nulls)); /*---------- * Need a tuple descriptor representing four columns: * - first field: the slot name * - second field: LSN at which we became consistent * - third field: exported snapshot's name * - fourth field: output plugin *---------- */ tupdesc = CreateTemplateTupleDesc(4); TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name", TEXTOID, -1, 0); TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point", TEXTOID, -1, 0); TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name", TEXTOID, -1, 0); TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin", TEXTOID, -1, 0); /* prepare for projection of tuples */ tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual); /* slot_name */ slot_name = NameStr(MyReplicationSlot->data.name); values[0] = CStringGetTextDatum(slot_name); /* consistent wal location */ values[1] = CStringGetTextDatum(xloc); /* snapshot name, or NULL if none */ if (snapshot_name != NULL) values[2] = CStringGetTextDatum(snapshot_name); else nulls[2] = true; /* plugin, or NULL if none */ if (cmd->plugin != NULL) values[3] = CStringGetTextDatum(cmd->plugin); else nulls[3] = true; /* send it to dest */ do_tup_output(tstate, values, nulls); end_tup_output(tstate); ReplicationSlotRelease(); } /* * Get rid of a replication slot that is no longer wanted. */ static void DropReplicationSlot(DropReplicationSlotCmd *cmd) { ReplicationSlotDrop(cmd->slotname, !cmd->wait); EndCommand("DROP_REPLICATION_SLOT", DestRemote); } /* * Load previously initiated logical slot and prepare for sending data (via * WalSndLoop). */ static void StartLogicalReplication(StartReplicationCmd *cmd) { StringInfoData buf; /* make sure that our requirements are still fulfilled */ CheckLogicalDecodingRequirements(); Assert(!MyReplicationSlot); ReplicationSlotAcquire(cmd->slotname, true); /* * Force a disconnect, so that the decoding code doesn't need to care * about an eventual switch from running in recovery, to running in a * normal environment. Client code is expected to handle reconnects. */ if (am_cascading_walsender && !RecoveryInProgress()) { ereport(LOG, (errmsg("terminating walsender process after promotion"))); got_STOPPING = true; } /* * Create our decoding context, making it start at the previously ack'ed * position. * * Do this before sending a CopyBothResponse message, so that any errors * are reported early. */ logical_decoding_ctx = CreateDecodingContext(cmd->startpoint, cmd->options, false, logical_read_xlog_page, WalSndPrepareWrite, WalSndWriteData, WalSndUpdateProgress); WalSndSetState(WALSNDSTATE_CATCHUP); /* Send a CopyBothResponse message, and start streaming */ pq_beginmessage(&buf, 'W'); pq_sendbyte(&buf, 0); pq_sendint16(&buf, 0); pq_endmessage(&buf); pq_flush(); /* Start reading WAL from the oldest required WAL. */ XLogBeginRead(logical_decoding_ctx->reader, MyReplicationSlot->data.restart_lsn); /* * Report the location after which we'll send out further commits as the * current sentPtr. */ sentPtr = MyReplicationSlot->data.confirmed_flush; /* Also update the sent position status in shared memory */ SpinLockAcquire(&MyWalSnd->mutex); MyWalSnd->sentPtr = MyReplicationSlot->data.restart_lsn; SpinLockRelease(&MyWalSnd->mutex); replication_active = true; SyncRepInitConfig(); /* Main loop of walsender */ WalSndLoop(XLogSendLogical); FreeDecodingContext(logical_decoding_ctx); ReplicationSlotRelease(); replication_active = false; if (got_STOPPING) proc_exit(0); WalSndSetState(WALSNDSTATE_STARTUP); /* Get out of COPY mode (CommandComplete). */ EndCommand("COPY 0", DestRemote); } /* * LogicalDecodingContext 'prepare_write' callback. * * Prepare a write into a StringInfo. * * Don't do anything lasting in here, it's quite possible that nothing will be done * with the data. */ static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write) { /* can't have sync rep confused by sending the same LSN several times */ if (!last_write) lsn = InvalidXLogRecPtr; resetStringInfo(ctx->out); pq_sendbyte(ctx->out, 'w'); pq_sendint64(ctx->out, lsn); /* dataStart */ pq_sendint64(ctx->out, lsn); /* walEnd */ /* * Fill out the sendtime later, just as it's done in XLogSendPhysical, but * reserve space here. */ pq_sendint64(ctx->out, 0); /* sendtime */ } /* * LogicalDecodingContext 'write' callback. * * Actually write out data previously prepared by WalSndPrepareWrite out to * the network. Take as long as needed, but process replies from the other * side and check timeouts during that. */ static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write) { TimestampTz now; /* * Fill the send timestamp last, so that it is taken as late as possible. * This is somewhat ugly, but the protocol is set as it's already used for * several releases by streaming physical replication. */ resetStringInfo(&tmpbuf); now = GetCurrentTimestamp(); pq_sendint64(&tmpbuf, now); memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)], tmpbuf.data, sizeof(int64)); /* output previously gathered data in a CopyData packet */ pq_putmessage_noblock('d', ctx->out->data, ctx->out->len); CHECK_FOR_INTERRUPTS(); /* Try to flush pending output to the client */ if (pq_flush_if_writable() != 0) WalSndShutdown(); /* Try taking fast path unless we get too close to walsender timeout. */ if (now < TimestampTzPlusMilliseconds(last_reply_timestamp, wal_sender_timeout / 2) && !pq_is_send_pending()) { return; } /* If we have pending write here, go to slow path */ for (;;) { int wakeEvents; long sleeptime; /* Check for input from the client */ ProcessRepliesIfAny(); /* die if timeout was reached */ WalSndCheckTimeOut(); /* Send keepalive if the time has come */ WalSndKeepaliveIfNecessary(); if (!pq_is_send_pending()) break; sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp()); wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH | WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE | WL_TIMEOUT; /* Sleep until something happens or we time out */ (void) WaitLatchOrSocket(MyLatch, wakeEvents, MyProcPort->sock, sleeptime, WAIT_EVENT_WAL_SENDER_WRITE_DATA); /* Clear any already-pending wakeups */ ResetLatch(MyLatch); CHECK_FOR_INTERRUPTS(); /* Process any requests or signals received recently */ if (ConfigReloadPending) { ConfigReloadPending = false; ProcessConfigFile(PGC_SIGHUP); SyncRepInitConfig(); } /* Try to flush pending output to the client */ if (pq_flush_if_writable() != 0) WalSndShutdown(); } /* reactivate latch so WalSndLoop knows to continue */ SetLatch(MyLatch); } /* * LogicalDecodingContext 'update_progress' callback. * * Write the current position to the lag tracker (see XLogSendPhysical), * and update the spill statistics. */ static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid) { static TimestampTz sendTime = 0; TimestampTz now = GetCurrentTimestamp(); /* * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to * avoid flooding the lag tracker when we commit frequently. */ #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000 if (!TimestampDifferenceExceeds(sendTime, now, WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS)) return; LagTrackerWrite(lsn, now); sendTime = now; /* * Update statistics about transactions that spilled to disk. */ UpdateSpillStats(ctx); } /* * Wait till WAL < loc is flushed to disk so it can be safely sent to client. * * Returns end LSN of flushed WAL. Normally this will be >= loc, but * if we detect a shutdown request (either from postmaster or client) * we will return early, so caller must always check. */ static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc) { int wakeEvents; static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr; /* * Fast path to avoid acquiring the spinlock in case we already know we * have enough WAL available. This is particularly interesting if we're * far behind. */ if (RecentFlushPtr != InvalidXLogRecPtr && loc <= RecentFlushPtr) return RecentFlushPtr; /* Get a more recent flush pointer. */ if (!RecoveryInProgress()) RecentFlushPtr = GetFlushRecPtr(); else RecentFlushPtr = GetXLogReplayRecPtr(NULL); for (;;) { long sleeptime; /* Clear any already-pending wakeups */ ResetLatch(MyLatch); CHECK_FOR_INTERRUPTS(); /* Process any requests or signals received recently */ if (ConfigReloadPending) { ConfigReloadPending = false; ProcessConfigFile(PGC_SIGHUP); SyncRepInitConfig(); } /* Check for input from the client */ ProcessRepliesIfAny(); /* * If we're shutting down, trigger pending WAL to be written out, * otherwise we'd possibly end up waiting for WAL that never gets * written, because walwriter has shut down already. */ if (got_STOPPING) XLogBackgroundFlush(); /* Update our idea of the currently flushed position. */ if (!RecoveryInProgress()) RecentFlushPtr = GetFlushRecPtr(); else RecentFlushPtr = GetXLogReplayRecPtr(NULL); /* * If postmaster asked us to stop, don't wait anymore. * * It's important to do this check after the recomputation of * RecentFlushPtr, so we can send all remaining data before shutting * down. */ if (got_STOPPING) break; /* * We only send regular messages to the client for full decoded * transactions, but a synchronous replication and walsender shutdown * possibly are waiting for a later location. So we send pings * containing the flush location every now and then. */ if (MyWalSnd->flush < sentPtr && MyWalSnd->write < sentPtr && !waiting_for_ping_response) { WalSndKeepalive(false); waiting_for_ping_response = true; } /* check whether we're done */ if (loc <= RecentFlushPtr) break; /* Waiting for new WAL. Since we need to wait, we're now caught up. */ WalSndCaughtUp = true; /* * Try to flush any pending output to the client. */ if (pq_flush_if_writable() != 0) WalSndShutdown(); /* * If we have received CopyDone from the client, sent CopyDone * ourselves, and the output buffer is empty, it's time to exit * streaming, so fail the current WAL fetch request. */ if (streamingDoneReceiving && streamingDoneSending && !pq_is_send_pending()) break; /* die if timeout was reached */ WalSndCheckTimeOut(); /* Send keepalive if the time has come */ WalSndKeepaliveIfNecessary(); /* * Sleep until something happens or we time out. Also wait for the * socket becoming writable, if there's still pending output. * Otherwise we might sit on sendable output data while waiting for * new WAL to be generated. (But if we have nothing to send, we don't * want to wake on socket-writable.) */ sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp()); wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE | WL_TIMEOUT; if (pq_is_send_pending()) wakeEvents |= WL_SOCKET_WRITEABLE; (void) WaitLatchOrSocket(MyLatch, wakeEvents, MyProcPort->sock, sleeptime, WAIT_EVENT_WAL_SENDER_WAIT_WAL); } /* reactivate latch so WalSndLoop knows to continue */ SetLatch(MyLatch); return RecentFlushPtr; } /* * Execute an incoming replication command. * * Returns true if the cmd_string was recognized as WalSender command, false * if not. */ bool exec_replication_command(const char *cmd_string) { int parse_rc; Node *cmd_node; MemoryContext cmd_context; MemoryContext old_context; /* * If WAL sender has been told that shutdown is getting close, switch its * status accordingly to handle the next replication commands correctly. */ if (got_STOPPING) WalSndSetState(WALSNDSTATE_STOPPING); /* * Throw error if in stopping mode. We need prevent commands that could * generate WAL while the shutdown checkpoint is being written. To be * safe, we just prohibit all new commands. */ if (MyWalSnd->state == WALSNDSTATE_STOPPING) ereport(ERROR, (errmsg("cannot execute new commands while WAL sender is in stopping mode"))); /* * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next * command arrives. Clean up the old stuff if there's anything. */ SnapBuildClearExportedSnapshot(); CHECK_FOR_INTERRUPTS(); cmd_context = AllocSetContextCreate(CurrentMemoryContext, "Replication command context", ALLOCSET_DEFAULT_SIZES); old_context = MemoryContextSwitchTo(cmd_context); replication_scanner_init(cmd_string); parse_rc = replication_yyparse(); if (parse_rc != 0) ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), (errmsg_internal("replication command parser returned %d", parse_rc)))); cmd_node = replication_parse_result; /* * Log replication command if log_replication_commands is enabled. Even * when it's disabled, log the command with DEBUG1 level for backward * compatibility. Note that SQL commands are not logged here, and will be * logged later if log_statement is enabled. */ if (cmd_node->type != T_SQLCmd) ereport(log_replication_commands ? LOG : DEBUG1, (errmsg("received replication command: %s", cmd_string))); /* * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot. If it was * called outside of transaction the snapshot should be cleared here. */ if (!IsTransactionBlock()) SnapBuildClearExportedSnapshot(); /* * For aborted transactions, don't allow anything except pure SQL, the * exec_simple_query() will handle it correctly. */ if (IsAbortedTransactionBlockState() && !IsA(cmd_node, SQLCmd)) ereport(ERROR, (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION), errmsg("current transaction is aborted, " "commands ignored until end of transaction block"))); CHECK_FOR_INTERRUPTS(); /* * Allocate buffers that will be used for each outgoing and incoming * message. We do this just once per command to reduce palloc overhead. */ initStringInfo(&output_message); initStringInfo(&reply_message); initStringInfo(&tmpbuf); /* Report to pgstat that this process is running */ pgstat_report_activity(STATE_RUNNING, NULL); switch (cmd_node->type) { case T_IdentifySystemCmd: IdentifySystem(); break; case T_BaseBackupCmd: PreventInTransactionBlock(true, "BASE_BACKUP"); SendBaseBackup((BaseBackupCmd *) cmd_node); break; case T_CreateReplicationSlotCmd: CreateReplicationSlot((CreateReplicationSlotCmd *) cmd_node); break; case T_DropReplicationSlotCmd: DropReplicationSlot((DropReplicationSlotCmd *) cmd_node); break; case T_StartReplicationCmd: { StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node; PreventInTransactionBlock(true, "START_REPLICATION"); if (cmd->kind == REPLICATION_KIND_PHYSICAL) StartReplication(cmd); else StartLogicalReplication(cmd); break; } case T_TimeLineHistoryCmd: PreventInTransactionBlock(true, "TIMELINE_HISTORY"); SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node); break; case T_VariableShowStmt: { DestReceiver *dest = CreateDestReceiver(DestRemoteSimple); VariableShowStmt *n = (VariableShowStmt *) cmd_node; /* syscache access needs a transaction environment */ StartTransactionCommand(); GetPGVariable(n->name, dest); CommitTransactionCommand(); } break; case T_SQLCmd: if (MyDatabaseId == InvalidOid) ereport(ERROR, (errmsg("cannot execute SQL commands in WAL sender for physical replication"))); /* Report to pgstat that this process is now idle */ pgstat_report_activity(STATE_IDLE, NULL); /* Tell the caller that this wasn't a WalSender command. */ return false; default: elog(ERROR, "unrecognized replication command node tag: %u", cmd_node->type); } /* done */ MemoryContextSwitchTo(old_context); MemoryContextDelete(cmd_context); /* Send CommandComplete message */ EndCommand("SELECT", DestRemote); /* Report to pgstat that this process is now idle */ pgstat_report_activity(STATE_IDLE, NULL); return true; } /* * Process any incoming messages while streaming. Also checks if the remote * end has closed the connection. */ static void ProcessRepliesIfAny(void) { unsigned char firstchar; int r; bool received = false; last_processing = GetCurrentTimestamp(); for (;;) { pq_startmsgread(); r = pq_getbyte_if_available(&firstchar); if (r < 0) { /* unexpected error or EOF */ ereport(COMMERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg("unexpected EOF on standby connection"))); proc_exit(0); } if (r == 0) { /* no data available without blocking */ pq_endmsgread(); break; } /* Read the message contents */ resetStringInfo(&reply_message); if (pq_getmessage(&reply_message, 0)) { ereport(COMMERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg("unexpected EOF on standby connection"))); proc_exit(0); } /* * If we already received a CopyDone from the frontend, the frontend * should not send us anything until we've closed our end of the COPY. * XXX: In theory, the frontend could already send the next command * before receiving the CopyDone, but libpq doesn't currently allow * that. */ if (streamingDoneReceiving && firstchar != 'X') ereport(FATAL, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg("unexpected standby message type \"%c\", after receiving CopyDone", firstchar))); /* Handle the very limited subset of commands expected in this phase */ switch (firstchar) { /* * 'd' means a standby reply wrapped in a CopyData packet. */ case 'd': ProcessStandbyMessage(); received = true; break; /* * CopyDone means the standby requested to finish streaming. * Reply with CopyDone, if we had not sent that already. */ case 'c': if (!streamingDoneSending) { pq_putmessage_noblock('c', NULL, 0); streamingDoneSending = true; } streamingDoneReceiving = true; received = true; break; /* * 'X' means that the standby is closing down the socket. */ case 'X': proc_exit(0); default: ereport(FATAL, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg("invalid standby message type \"%c\"", firstchar))); } } /* * Save the last reply timestamp if we've received at least one reply. */ if (received) { last_reply_timestamp = last_processing; waiting_for_ping_response = false; } } /* * Process a status update message received from standby. */ static void ProcessStandbyMessage(void) { char msgtype; /* * Check message type from the first byte. */ msgtype = pq_getmsgbyte(&reply_message); switch (msgtype) { case 'r': ProcessStandbyReplyMessage(); break; case 'h': ProcessStandbyHSFeedbackMessage(); break; default: ereport(COMMERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg("unexpected message type \"%c\"", msgtype))); proc_exit(0); } } /* * Remember that a walreceiver just confirmed receipt of lsn `lsn`. */ static void PhysicalConfirmReceivedLocation(XLogRecPtr lsn) { bool changed = false; ReplicationSlot *slot = MyReplicationSlot; Assert(lsn != InvalidXLogRecPtr); SpinLockAcquire(&slot->mutex); if (slot->data.restart_lsn != lsn) { changed = true; slot->data.restart_lsn = lsn; } SpinLockRelease(&slot->mutex); if (changed) { ReplicationSlotMarkDirty(); ReplicationSlotsComputeRequiredLSN(); } /* * One could argue that the slot should be saved to disk now, but that'd * be energy wasted - the worst lost information can do here is give us * wrong information in a statistics view - we'll just potentially be more * conservative in removing files. */ } /* * Regular reply from standby advising of WAL locations on standby server. */ static void ProcessStandbyReplyMessage(void) { XLogRecPtr writePtr, flushPtr, applyPtr; bool replyRequested; TimeOffset writeLag, flushLag, applyLag; bool clearLagTimes; TimestampTz now; TimestampTz replyTime; static bool fullyAppliedLastTime = false; /* the caller already consumed the msgtype byte */ writePtr = pq_getmsgint64(&reply_message); flushPtr = pq_getmsgint64(&reply_message); applyPtr = pq_getmsgint64(&reply_message); replyTime = pq_getmsgint64(&reply_message); replyRequested = pq_getmsgbyte(&reply_message); if (log_min_messages <= DEBUG2) { char *replyTimeStr; /* Copy because timestamptz_to_str returns a static buffer */ replyTimeStr = pstrdup(timestamptz_to_str(replyTime)); elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s reply_time %s", (uint32) (writePtr >> 32), (uint32) writePtr, (uint32) (flushPtr >> 32), (uint32) flushPtr, (uint32) (applyPtr >> 32), (uint32) applyPtr, replyRequested ? " (reply requested)" : "", replyTimeStr); pfree(replyTimeStr); } /* See if we can compute the round-trip lag for these positions. */ now = GetCurrentTimestamp(); writeLag = LagTrackerRead(SYNC_REP_WAIT_WRITE, writePtr, now); flushLag = LagTrackerRead(SYNC_REP_WAIT_FLUSH, flushPtr, now); applyLag = LagTrackerRead(SYNC_REP_WAIT_APPLY, applyPtr, now); /* * If the standby reports that it has fully replayed the WAL in two * consecutive reply messages, then the second such message must result * from wal_receiver_status_interval expiring on the standby. This is a * convenient time to forget the lag times measured when it last * wrote/flushed/applied a WAL record, to avoid displaying stale lag data * until more WAL traffic arrives. */ clearLagTimes = false; if (applyPtr == sentPtr) { if (fullyAppliedLastTime) clearLagTimes = true; fullyAppliedLastTime = true; } else fullyAppliedLastTime = false; /* Send a reply if the standby requested one. */ if (replyRequested) WalSndKeepalive(false); /* * Update shared state for this WalSender process based on reply data from * standby. */ { WalSnd *walsnd = MyWalSnd; SpinLockAcquire(&walsnd->mutex); walsnd->write = writePtr; walsnd->flush = flushPtr; walsnd->apply = applyPtr; if (writeLag != -1 || clearLagTimes) walsnd->writeLag = writeLag; if (flushLag != -1 || clearLagTimes) walsnd->flushLag = flushLag; if (applyLag != -1 || clearLagTimes) walsnd->applyLag = applyLag; walsnd->replyTime = replyTime; SpinLockRelease(&walsnd->mutex); } if (!am_cascading_walsender) SyncRepReleaseWaiters(); /* * Advance our local xmin horizon when the client confirmed a flush. */ if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr) { if (SlotIsLogical(MyReplicationSlot)) LogicalConfirmReceivedLocation(flushPtr); else PhysicalConfirmReceivedLocation(flushPtr); } } /* compute new replication slot xmin horizon if needed */ static void PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin) { bool changed = false; ReplicationSlot *slot = MyReplicationSlot; SpinLockAcquire(&slot->mutex); MyPgXact->xmin = InvalidTransactionId; /* * For physical replication we don't need the interlock provided by xmin * and effective_xmin since the consequences of a missed increase are * limited to query cancellations, so set both at once. */ if (!TransactionIdIsNormal(slot->data.xmin) || !TransactionIdIsNormal(feedbackXmin) || TransactionIdPrecedes(slot->data.xmin, feedbackXmin)) { changed = true; slot->data.xmin = feedbackXmin; slot->effective_xmin = feedbackXmin; } if (!TransactionIdIsNormal(slot->data.catalog_xmin) || !TransactionIdIsNormal(feedbackCatalogXmin) || TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin)) { changed = true; slot->data.catalog_xmin = feedbackCatalogXmin; slot->effective_catalog_xmin = feedbackCatalogXmin; } SpinLockRelease(&slot->mutex); if (changed) { ReplicationSlotMarkDirty(); ReplicationSlotsComputeRequiredXmin(false); } } /* * Check that the provided xmin/epoch are sane, that is, not in the future * and not so far back as to be already wrapped around. * * Epoch of nextXid should be same as standby, or if the counter has * wrapped, then one greater than standby. * * This check doesn't care about whether clog exists for these xids * at all. */ static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch) { FullTransactionId nextFullXid; TransactionId nextXid; uint32 nextEpoch; nextFullXid = ReadNextFullTransactionId(); nextXid = XidFromFullTransactionId(nextFullXid); nextEpoch = EpochFromFullTransactionId(nextFullXid); if (xid <= nextXid) { if (epoch != nextEpoch) return false; } else { if (epoch + 1 != nextEpoch) return false; } if (!TransactionIdPrecedesOrEquals(xid, nextXid)) return false; /* epoch OK, but it's wrapped around */ return true; } /* * Hot Standby feedback */ static void ProcessStandbyHSFeedbackMessage(void) { TransactionId feedbackXmin; uint32 feedbackEpoch; TransactionId feedbackCatalogXmin; uint32 feedbackCatalogEpoch; TimestampTz replyTime; /* * Decipher the reply message. The caller already consumed the msgtype * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation * of this message. */ replyTime = pq_getmsgint64(&reply_message); feedbackXmin = pq_getmsgint(&reply_message, 4); feedbackEpoch = pq_getmsgint(&reply_message, 4); feedbackCatalogXmin = pq_getmsgint(&reply_message, 4); feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4); if (log_min_messages <= DEBUG2) { char *replyTimeStr; /* Copy because timestamptz_to_str returns a static buffer */ replyTimeStr = pstrdup(timestamptz_to_str(replyTime)); elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u reply_time %s", feedbackXmin, feedbackEpoch, feedbackCatalogXmin, feedbackCatalogEpoch, replyTimeStr); pfree(replyTimeStr); } /* * Update shared state for this WalSender process based on reply data from * standby. */ { WalSnd *walsnd = MyWalSnd; SpinLockAcquire(&walsnd->mutex); walsnd->replyTime = replyTime; SpinLockRelease(&walsnd->mutex); } /* * Unset WalSender's xmins if the feedback message values are invalid. * This happens when the downstream turned hot_standby_feedback off. */ if (!TransactionIdIsNormal(feedbackXmin) && !TransactionIdIsNormal(feedbackCatalogXmin)) { MyPgXact->xmin = InvalidTransactionId; if (MyReplicationSlot != NULL) PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin); return; } /* * Check that the provided xmin/epoch are sane, that is, not in the future * and not so far back as to be already wrapped around. Ignore if not. */ if (TransactionIdIsNormal(feedbackXmin) && !TransactionIdInRecentPast(feedbackXmin, feedbackEpoch)) return; if (TransactionIdIsNormal(feedbackCatalogXmin) && !TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch)) return; /* * Set the WalSender's xmin equal to the standby's requested xmin, so that * the xmin will be taken into account by GetOldestXmin. This will hold * back the removal of dead rows and thereby prevent the generation of * cleanup conflicts on the standby server. * * There is a small window for a race condition here: although we just * checked that feedbackXmin precedes nextXid, the nextXid could have * gotten advanced between our fetching it and applying the xmin below, * perhaps far enough to make feedbackXmin wrap around. In that case the * xmin we set here would be "in the future" and have no effect. No point * in worrying about this since it's too late to save the desired data * anyway. Assuming that the standby sends us an increasing sequence of * xmins, this could only happen during the first reply cycle, else our * own xmin would prevent nextXid from advancing so far. * * We don't bother taking the ProcArrayLock here. Setting the xmin field * is assumed atomic, and there's no real need to prevent a concurrent * GetOldestXmin. (If we're moving our xmin forward, this is obviously * safe, and if we're moving it backwards, well, the data is at risk * already since a VACUUM could have just finished calling GetOldestXmin.) * * If we're using a replication slot we reserve the xmin via that, * otherwise via the walsender's PGXACT entry. We can only track the * catalog xmin separately when using a slot, so we store the least of the * two provided when not using a slot. * * XXX: It might make sense to generalize the ephemeral slot concept and * always use the slot mechanism to handle the feedback xmin. */ if (MyReplicationSlot != NULL) /* XXX: persistency configurable? */ PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin); else { if (TransactionIdIsNormal(feedbackCatalogXmin) && TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin)) MyPgXact->xmin = feedbackCatalogXmin; else MyPgXact->xmin = feedbackXmin; } } /* * Compute how long send/receive loops should sleep. * * If wal_sender_timeout is enabled we want to wake up in time to send * keepalives and to abort the connection if wal_sender_timeout has been * reached. */ static long WalSndComputeSleeptime(TimestampTz now) { long sleeptime = 10000; /* 10 s */ if (wal_sender_timeout > 0 && last_reply_timestamp > 0) { TimestampTz wakeup_time; long sec_to_timeout; int microsec_to_timeout; /* * At the latest stop sleeping once wal_sender_timeout has been * reached. */ wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp, wal_sender_timeout); /* * If no ping has been sent yet, wakeup when it's time to do so. * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of * the timeout passed without a response. */ if (!waiting_for_ping_response) wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp, wal_sender_timeout / 2); /* Compute relative time until wakeup. */ TimestampDifference(now, wakeup_time, &sec_to_timeout, µsec_to_timeout); sleeptime = sec_to_timeout * 1000 + microsec_to_timeout / 1000; } return sleeptime; } /* * Check whether there have been responses by the client within * wal_sender_timeout and shutdown if not. Using last_processing as the * reference point avoids counting server-side stalls against the client. * However, a long server-side stall can make WalSndKeepaliveIfNecessary() * postdate last_processing by more than wal_sender_timeout. If that happens, * the client must reply almost immediately to avoid a timeout. This rarely * affects the default configuration, under which clients spontaneously send a * message every standby_message_timeout = wal_sender_timeout/6 = 10s. We * could eliminate that problem by recognizing timeout expiration at * wal_sender_timeout/2 after the keepalive. */ static void WalSndCheckTimeOut(void) { TimestampTz timeout; /* don't bail out if we're doing something that doesn't require timeouts */ if (last_reply_timestamp <= 0) return; timeout = TimestampTzPlusMilliseconds(last_reply_timestamp, wal_sender_timeout); if (wal_sender_timeout > 0 && last_processing >= timeout) { /* * Since typically expiration of replication timeout means * communication problem, we don't send the error message to the * standby. */ ereport(COMMERROR, (errmsg("terminating walsender process due to replication timeout"))); WalSndShutdown(); } } /* Main loop of walsender process that streams the WAL over Copy messages. */ static void WalSndLoop(WalSndSendDataCallback send_data) { /* * Initialize the last reply timestamp. That enables timeout processing * from hereon. */ last_reply_timestamp = GetCurrentTimestamp(); waiting_for_ping_response = false; /* * Loop until we reach the end of this timeline or the client requests to * stop streaming. */ for (;;) { /* Clear any already-pending wakeups */ ResetLatch(MyLatch); CHECK_FOR_INTERRUPTS(); /* Process any requests or signals received recently */ if (ConfigReloadPending) { ConfigReloadPending = false; ProcessConfigFile(PGC_SIGHUP); SyncRepInitConfig(); } /* Check for input from the client */ ProcessRepliesIfAny(); /* * If we have received CopyDone from the client, sent CopyDone * ourselves, and the output buffer is empty, it's time to exit * streaming. */ if (streamingDoneReceiving && streamingDoneSending && !pq_is_send_pending()) break; /* * If we don't have any pending data in the output buffer, try to send * some more. If there is some, we don't bother to call send_data * again until we've flushed it ... but we'd better assume we are not * caught up. */ if (!pq_is_send_pending()) send_data(); else WalSndCaughtUp = false; /* Try to flush pending output to the client */ if (pq_flush_if_writable() != 0) WalSndShutdown(); /* If nothing remains to be sent right now ... */ if (WalSndCaughtUp && !pq_is_send_pending()) { /* * If we're in catchup state, move to streaming. This is an * important state change for users to know about, since before * this point data loss might occur if the primary dies and we * need to failover to the standby. The state change is also * important for synchronous replication, since commits that * started to wait at that point might wait for some time. */ if (MyWalSnd->state == WALSNDSTATE_CATCHUP) { ereport(DEBUG1, (errmsg("\"%s\" has now caught up with upstream server", application_name))); WalSndSetState(WALSNDSTATE_STREAMING); } /* * When SIGUSR2 arrives, we send any outstanding logs up to the * shutdown checkpoint record (i.e., the latest record), wait for * them to be replicated to the standby, and exit. This may be a * normal termination at shutdown, or a promotion, the walsender * is not sure which. */ if (got_SIGUSR2) WalSndDone(send_data); } /* Check for replication timeout. */ WalSndCheckTimeOut(); /* Send keepalive if the time has come */ WalSndKeepaliveIfNecessary(); /* * We don't block if not caught up, unless there is unsent data * pending in which case we'd better block until the socket is * write-ready. This test is only needed for the case where the * send_data callback handled a subset of the available data but then * pq_flush_if_writable flushed it all --- we should immediately try * to send more. */ if ((WalSndCaughtUp && !streamingDoneSending) || pq_is_send_pending()) { long sleeptime; int wakeEvents; wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH | WL_TIMEOUT | WL_SOCKET_READABLE; /* * Use fresh timestamp, not last_processing, to reduce the chance * of reaching wal_sender_timeout before sending a keepalive. */ sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp()); if (pq_is_send_pending()) wakeEvents |= WL_SOCKET_WRITEABLE; /* Sleep until something happens or we time out */ (void) WaitLatchOrSocket(MyLatch, wakeEvents, MyProcPort->sock, sleeptime, WAIT_EVENT_WAL_SENDER_MAIN); } } } /* Initialize a per-walsender data structure for this walsender process */ static void InitWalSenderSlot(void) { int i; /* * WalSndCtl should be set up already (we inherit this by fork() or * EXEC_BACKEND mechanism from the postmaster). */ Assert(WalSndCtl != NULL); Assert(MyWalSnd == NULL); /* * Find a free walsender slot and reserve it. This must not fail due to * the prior check for free WAL senders in InitProcess(). */ for (i = 0; i < max_wal_senders; i++) { WalSnd *walsnd = &WalSndCtl->walsnds[i]; SpinLockAcquire(&walsnd->mutex); if (walsnd->pid != 0) { SpinLockRelease(&walsnd->mutex); continue; } else { /* * Found a free slot. Reserve it for us. */ walsnd->pid = MyProcPid; walsnd->sentPtr = InvalidXLogRecPtr; walsnd->write = InvalidXLogRecPtr; walsnd->flush = InvalidXLogRecPtr; walsnd->apply = InvalidXLogRecPtr; walsnd->writeLag = -1; walsnd->flushLag = -1; walsnd->applyLag = -1; walsnd->state = WALSNDSTATE_STARTUP; walsnd->latch = &MyProc->procLatch; walsnd->replyTime = 0; walsnd->spillTxns = 0; walsnd->spillCount = 0; walsnd->spillBytes = 0; SpinLockRelease(&walsnd->mutex); /* don't need the lock anymore */ MyWalSnd = (WalSnd *) walsnd; break; } } Assert(MyWalSnd != NULL); /* Arrange to clean up at walsender exit */ on_shmem_exit(WalSndKill, 0); } /* Destroy the per-walsender data structure for this walsender process */ static void WalSndKill(int code, Datum arg) { WalSnd *walsnd = MyWalSnd; Assert(walsnd != NULL); MyWalSnd = NULL; SpinLockAcquire(&walsnd->mutex); /* clear latch while holding the spinlock, so it can safely be read */ walsnd->latch = NULL; /* Mark WalSnd struct as no longer being in use. */ walsnd->pid = 0; SpinLockRelease(&walsnd->mutex); } /* walsender's openSegment callback for WALRead */ static int WalSndSegmentOpen(XLogSegNo nextSegNo, WALSegmentContext *segcxt, TimeLineID *tli_p) { char path[MAXPGPATH]; int fd; /*------- * When reading from a historic timeline, and there is a timeline switch * within this segment, read from the WAL segment belonging to the new * timeline. * * For example, imagine that this server is currently on timeline 5, and * we're streaming timeline 4. The switch from timeline 4 to 5 happened at * 0/13002088. In pg_wal, we have these files: * * ... * 000000040000000000000012 * 000000040000000000000013 * 000000050000000000000013 * 000000050000000000000014 * ... * * In this situation, when requested to send the WAL from segment 0x13, on * timeline 4, we read the WAL from file 000000050000000000000013. Archive * recovery prefers files from newer timelines, so if the segment was * restored from the archive on this server, the file belonging to the old * timeline, 000000040000000000000013, might not exist. Their contents are * equal up to the switchpoint, because at a timeline switch, the used * portion of the old segment is copied to the new file. ------- */ *tli_p = sendTimeLine; if (sendTimeLineIsHistoric) { XLogSegNo endSegNo; XLByteToSeg(sendTimeLineValidUpto, endSegNo, segcxt->ws_segsize); if (sendSeg->ws_segno == endSegNo) *tli_p = sendTimeLineNextTLI; } XLogFilePath(path, *tli_p, nextSegNo, segcxt->ws_segsize); fd = BasicOpenFile(path, O_RDONLY | PG_BINARY); if (fd >= 0) return fd; /* * If the file is not found, assume it's because the standby asked for a * too old WAL segment that has already been removed or recycled. */ if (errno == ENOENT) { char xlogfname[MAXFNAMELEN]; int save_errno = errno; XLogFileName(xlogfname, *tli_p, nextSegNo, wal_segment_size); errno = save_errno; ereport(ERROR, (errcode_for_file_access(), errmsg("requested WAL segment %s has already been removed", xlogfname))); } else ereport(ERROR, (errcode_for_file_access(), errmsg("could not open file \"%s\": %m", path))); return -1; /* keep compiler quiet */ } /* * Send out the WAL in its normal physical/stored form. * * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk, * but not yet sent to the client, and buffer it in the libpq output * buffer. * * If there is no unsent WAL remaining, WalSndCaughtUp is set to true, * otherwise WalSndCaughtUp is set to false. */ static void XLogSendPhysical(void) { XLogRecPtr SendRqstPtr; XLogRecPtr startptr; XLogRecPtr endptr; Size nbytes; XLogSegNo segno; WALReadError errinfo; /* If requested switch the WAL sender to the stopping state. */ if (got_STOPPING) WalSndSetState(WALSNDSTATE_STOPPING); if (streamingDoneSending) { WalSndCaughtUp = true; return; } /* Figure out how far we can safely send the WAL. */ if (sendTimeLineIsHistoric) { /* * Streaming an old timeline that's in this server's history, but is * not the one we're currently inserting or replaying. It can be * streamed up to the point where we switched off that timeline. */ SendRqstPtr = sendTimeLineValidUpto; } else if (am_cascading_walsender) { /* * Streaming the latest timeline on a standby. * * Attempt to send all WAL that has already been replayed, so that we * know it's valid. If we're receiving WAL through streaming * replication, it's also OK to send any WAL that has been received * but not replayed. * * The timeline we're recovering from can change, or we can be * promoted. In either case, the current timeline becomes historic. We * need to detect that so that we don't try to stream past the point * where we switched to another timeline. We check for promotion or * timeline switch after calculating FlushPtr, to avoid a race * condition: if the timeline becomes historic just after we checked * that it was still current, it's still be OK to stream it up to the * FlushPtr that was calculated before it became historic. */ bool becameHistoric = false; SendRqstPtr = GetStandbyFlushRecPtr(); if (!RecoveryInProgress()) { /* * We have been promoted. RecoveryInProgress() updated * ThisTimeLineID to the new current timeline. */ am_cascading_walsender = false; becameHistoric = true; } else { /* * Still a cascading standby. But is the timeline we're sending * still the one recovery is recovering from? ThisTimeLineID was * updated by the GetStandbyFlushRecPtr() call above. */ if (sendTimeLine != ThisTimeLineID) becameHistoric = true; } if (becameHistoric) { /* * The timeline we were sending has become historic. Read the * timeline history file of the new timeline to see where exactly * we forked off from the timeline we were sending. */ List *history; history = readTimeLineHistory(ThisTimeLineID); sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history, &sendTimeLineNextTLI); Assert(sendTimeLine < sendTimeLineNextTLI); list_free_deep(history); sendTimeLineIsHistoric = true; SendRqstPtr = sendTimeLineValidUpto; } } else { /* * Streaming the current timeline on a master. * * Attempt to send all data that's already been written out and * fsync'd to disk. We cannot go further than what's been written out * given the current implementation of XLogRead(). And in any case * it's unsafe to send WAL that is not securely down to disk on the * master: if the master subsequently crashes and restarts, standbys * must not have applied any WAL that got lost on the master. */ SendRqstPtr = GetFlushRecPtr(); } /* * Record the current system time as an approximation of the time at which * this WAL location was written for the purposes of lag tracking. * * In theory we could make XLogFlush() record a time in shmem whenever WAL * is flushed and we could get that time as well as the LSN when we call * GetFlushRecPtr() above (and likewise for the cascading standby * equivalent), but rather than putting any new code into the hot WAL path * it seems good enough to capture the time here. We should reach this * after XLogFlush() runs WalSndWakeupProcessRequests(), and although that * may take some time, we read the WAL flush pointer and take the time * very close to together here so that we'll get a later position if it is * still moving. * * Because LagTrackerWrite ignores samples when the LSN hasn't advanced, * this gives us a cheap approximation for the WAL flush time for this * LSN. * * Note that the LSN is not necessarily the LSN for the data contained in * the present message; it's the end of the WAL, which might be further * ahead. All the lag tracking machinery cares about is finding out when * that arbitrary LSN is eventually reported as written, flushed and * applied, so that it can measure the elapsed time. */ LagTrackerWrite(SendRqstPtr, GetCurrentTimestamp()); /* * If this is a historic timeline and we've reached the point where we * forked to the next timeline, stop streaming. * * Note: We might already have sent WAL > sendTimeLineValidUpto. The * startup process will normally replay all WAL that has been received * from the master, before promoting, but if the WAL streaming is * terminated at a WAL page boundary, the valid portion of the timeline * might end in the middle of a WAL record. We might've already sent the * first half of that partial WAL record to the cascading standby, so that * sentPtr > sendTimeLineValidUpto. That's OK; the cascading standby can't * replay the partial WAL record either, so it can still follow our * timeline switch. */ if (sendTimeLineIsHistoric && sendTimeLineValidUpto <= sentPtr) { /* close the current file. */ if (sendSeg->ws_file >= 0) close(sendSeg->ws_file); sendSeg->ws_file = -1; /* Send CopyDone */ pq_putmessage_noblock('c', NULL, 0); streamingDoneSending = true; WalSndCaughtUp = true; elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)", (uint32) (sendTimeLineValidUpto >> 32), (uint32) sendTimeLineValidUpto, (uint32) (sentPtr >> 32), (uint32) sentPtr); return; } /* Do we have any work to do? */ Assert(sentPtr <= SendRqstPtr); if (SendRqstPtr <= sentPtr) { WalSndCaughtUp = true; return; } /* * Figure out how much to send in one message. If there's no more than * MAX_SEND_SIZE bytes to send, send everything. Otherwise send * MAX_SEND_SIZE bytes, but round back to logfile or page boundary. * * The rounding is not only for performance reasons. Walreceiver relies on * the fact that we never split a WAL record across two messages. Since a * long WAL record is split at page boundary into continuation records, * page boundary is always a safe cut-off point. We also assume that * SendRqstPtr never points to the middle of a WAL record. */ startptr = sentPtr; endptr = startptr; endptr += MAX_SEND_SIZE; /* if we went beyond SendRqstPtr, back off */ if (SendRqstPtr <= endptr) { endptr = SendRqstPtr; if (sendTimeLineIsHistoric) WalSndCaughtUp = false; else WalSndCaughtUp = true; } else { /* round down to page boundary. */ endptr -= (endptr % XLOG_BLCKSZ); WalSndCaughtUp = false; } nbytes = endptr - startptr; Assert(nbytes <= MAX_SEND_SIZE); /* * OK to read and send the slice. */ resetStringInfo(&output_message); pq_sendbyte(&output_message, 'w'); pq_sendint64(&output_message, startptr); /* dataStart */ pq_sendint64(&output_message, SendRqstPtr); /* walEnd */ pq_sendint64(&output_message, 0); /* sendtime, filled in last */ /* * Read the log directly into the output buffer to avoid extra memcpy * calls. */ enlargeStringInfo(&output_message, nbytes); retry: if (!WALRead(&output_message.data[output_message.len], startptr, nbytes, sendSeg->ws_tli, /* Pass the current TLI because only * WalSndSegmentOpen controls whether new * TLI is needed. */ sendSeg, sendCxt, WalSndSegmentOpen, &errinfo)) WALReadRaiseError(&errinfo); /* See logical_read_xlog_page(). */ XLByteToSeg(startptr, segno, sendCxt->ws_segsize); CheckXLogRemoved(segno, sendSeg->ws_tli); /* * During recovery, the currently-open WAL file might be replaced with the * file of the same name retrieved from archive. So we always need to * check what we read was valid after reading into the buffer. If it's * invalid, we try to open and read the file again. */ if (am_cascading_walsender) { WalSnd *walsnd = MyWalSnd; bool reload; SpinLockAcquire(&walsnd->mutex); reload = walsnd->needreload; walsnd->needreload = false; SpinLockRelease(&walsnd->mutex); if (reload && sendSeg->ws_file >= 0) { close(sendSeg->ws_file); sendSeg->ws_file = -1; goto retry; } } output_message.len += nbytes; output_message.data[output_message.len] = '\0'; /* * Fill the send timestamp last, so that it is taken as late as possible. */ resetStringInfo(&tmpbuf); pq_sendint64(&tmpbuf, GetCurrentTimestamp()); memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)], tmpbuf.data, sizeof(int64)); pq_putmessage_noblock('d', output_message.data, output_message.len); sentPtr = endptr; /* Update shared memory status */ { WalSnd *walsnd = MyWalSnd; SpinLockAcquire(&walsnd->mutex); walsnd->sentPtr = sentPtr; SpinLockRelease(&walsnd->mutex); } /* Report progress of XLOG streaming in PS display */ if (update_process_title) { char activitymsg[50]; snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X", (uint32) (sentPtr >> 32), (uint32) sentPtr); set_ps_display(activitymsg, false); } } /* * Stream out logically decoded data. */ static void XLogSendLogical(void) { XLogRecord *record; char *errm; /* * We'll use the current flush point to determine whether we've caught up. * This variable is static in order to cache it across calls. Caching is * helpful because GetFlushRecPtr() needs to acquire a heavily-contended * spinlock. */ static XLogRecPtr flushPtr = InvalidXLogRecPtr; /* * Don't know whether we've caught up yet. We'll set WalSndCaughtUp to * true in WalSndWaitForWal, if we're actually waiting. We also set to * true if XLogReadRecord() had to stop reading but WalSndWaitForWal * didn't wait - i.e. when we're shutting down. */ WalSndCaughtUp = false; record = XLogReadRecord(logical_decoding_ctx->reader, &errm); /* xlog record was invalid */ if (errm != NULL) elog(ERROR, "%s", errm); if (record != NULL) { /* * Note the lack of any call to LagTrackerWrite() which is handled by * WalSndUpdateProgress which is called by output plugin through * logical decoding write api. */ LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader); sentPtr = logical_decoding_ctx->reader->EndRecPtr; } /* * If first time through in this session, initialize flushPtr. Otherwise, * we only need to update flushPtr if EndRecPtr is past it. */ if (flushPtr == InvalidXLogRecPtr) flushPtr = GetFlushRecPtr(); else if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr) flushPtr = GetFlushRecPtr(); /* If EndRecPtr is still past our flushPtr, it means we caught up. */ if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr) WalSndCaughtUp = true; /* * If we're caught up and have been requested to stop, have WalSndLoop() * terminate the connection in an orderly manner, after writing out all * the pending data. */ if (WalSndCaughtUp && got_STOPPING) got_SIGUSR2 = true; /* Update shared memory status */ { WalSnd *walsnd = MyWalSnd; SpinLockAcquire(&walsnd->mutex); walsnd->sentPtr = sentPtr; SpinLockRelease(&walsnd->mutex); } } /* * Shutdown if the sender is caught up. * * NB: This should only be called when the shutdown signal has been received * from postmaster. * * Note that if we determine that there's still more data to send, this * function will return control to the caller. */ static void WalSndDone(WalSndSendDataCallback send_data) { XLogRecPtr replicatedPtr; /* ... let's just be real sure we're caught up ... */ send_data(); /* * To figure out whether all WAL has successfully been replicated, check * flush location if valid, write otherwise. Tools like pg_receivewal will * usually (unless in synchronous mode) return an invalid flush location. */ replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ? MyWalSnd->write : MyWalSnd->flush; if (WalSndCaughtUp && sentPtr == replicatedPtr && !pq_is_send_pending()) { /* Inform the standby that XLOG streaming is done */ EndCommand("COPY 0", DestRemote); pq_flush(); proc_exit(0); } if (!waiting_for_ping_response) { WalSndKeepalive(true); waiting_for_ping_response = true; } } /* * Returns the latest point in WAL that has been safely flushed to disk, and * can be sent to the standby. This should only be called when in recovery, * ie. we're streaming to a cascaded standby. * * As a side-effect, ThisTimeLineID is updated to the TLI of the last * replayed WAL record. */ static XLogRecPtr GetStandbyFlushRecPtr(void) { XLogRecPtr replayPtr; TimeLineID replayTLI; XLogRecPtr receivePtr; TimeLineID receiveTLI; XLogRecPtr result; /* * We can safely send what's already been replayed. Also, if walreceiver * is streaming WAL from the same timeline, we can send anything that it * has streamed, but hasn't been replayed yet. */ receivePtr = GetWalRcvWriteRecPtr(NULL, &receiveTLI); replayPtr = GetXLogReplayRecPtr(&replayTLI); ThisTimeLineID = replayTLI; result = replayPtr; if (receiveTLI == ThisTimeLineID && receivePtr > replayPtr) result = receivePtr; return result; } /* * Request walsenders to reload the currently-open WAL file */ void WalSndRqstFileReload(void) { int i; for (i = 0; i < max_wal_senders; i++) { WalSnd *walsnd = &WalSndCtl->walsnds[i]; SpinLockAcquire(&walsnd->mutex); if (walsnd->pid == 0) { SpinLockRelease(&walsnd->mutex); continue; } walsnd->needreload = true; SpinLockRelease(&walsnd->mutex); } } /* * Handle PROCSIG_WALSND_INIT_STOPPING signal. */ void HandleWalSndInitStopping(void) { Assert(am_walsender); /* * If replication has not yet started, die like with SIGTERM. If * replication is active, only set a flag and wake up the main loop. It * will send any outstanding WAL, wait for it to be replicated to the * standby, and then exit gracefully. */ if (!replication_active) kill(MyProcPid, SIGTERM); else got_STOPPING = true; } /* * SIGUSR2: set flag to do a last cycle and shut down afterwards. The WAL * sender should already have been switched to WALSNDSTATE_STOPPING at * this point. */ static void WalSndLastCycleHandler(SIGNAL_ARGS) { int save_errno = errno; got_SIGUSR2 = true; SetLatch(MyLatch); errno = save_errno; } /* Set up signal handlers */ void WalSndSignals(void) { /* Set up signal handlers */ pqsignal(SIGHUP, SignalHandlerForConfigReload); pqsignal(SIGINT, StatementCancelHandler); /* query cancel */ pqsignal(SIGTERM, die); /* request shutdown */ pqsignal(SIGQUIT, quickdie); /* hard crash time */ InitializeTimeouts(); /* establishes SIGALRM handler */ pqsignal(SIGPIPE, SIG_IGN); pqsignal(SIGUSR1, procsignal_sigusr1_handler); pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and * shutdown */ /* Reset some signals that are accepted by postmaster but not here */ pqsignal(SIGCHLD, SIG_DFL); } /* Report shared-memory space needed by WalSndShmemInit */ Size WalSndShmemSize(void) { Size size = 0; size = offsetof(WalSndCtlData, walsnds); size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd))); return size; } /* Allocate and initialize walsender-related shared memory */ void WalSndShmemInit(void) { bool found; int i; WalSndCtl = (WalSndCtlData *) ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found); if (!found) { /* First time through, so initialize */ MemSet(WalSndCtl, 0, WalSndShmemSize()); for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++) SHMQueueInit(&(WalSndCtl->SyncRepQueue[i])); for (i = 0; i < max_wal_senders; i++) { WalSnd *walsnd = &WalSndCtl->walsnds[i]; SpinLockInit(&walsnd->mutex); } } } /* * Wake up all walsenders * * This will be called inside critical sections, so throwing an error is not * advisable. */ void WalSndWakeup(void) { int i; for (i = 0; i < max_wal_senders; i++) { Latch *latch; WalSnd *walsnd = &WalSndCtl->walsnds[i]; /* * Get latch pointer with spinlock held, for the unlikely case that * pointer reads aren't atomic (as they're 8 bytes). */ SpinLockAcquire(&walsnd->mutex); latch = walsnd->latch; SpinLockRelease(&walsnd->mutex); if (latch != NULL) SetLatch(latch); } } /* * Signal all walsenders to move to stopping state. * * This will trigger walsenders to move to a state where no further WAL can be * generated. See this file's header for details. */ void WalSndInitStopping(void) { int i; for (i = 0; i < max_wal_senders; i++) { WalSnd *walsnd = &WalSndCtl->walsnds[i]; pid_t pid; SpinLockAcquire(&walsnd->mutex); pid = walsnd->pid; SpinLockRelease(&walsnd->mutex); if (pid == 0) continue; SendProcSignal(pid, PROCSIG_WALSND_INIT_STOPPING, InvalidBackendId); } } /* * Wait that all the WAL senders have quit or reached the stopping state. This * is used by the checkpointer to control when the shutdown checkpoint can * safely be performed. */ void WalSndWaitStopping(void) { for (;;) { int i; bool all_stopped = true; for (i = 0; i < max_wal_senders; i++) { WalSnd *walsnd = &WalSndCtl->walsnds[i]; SpinLockAcquire(&walsnd->mutex); if (walsnd->pid == 0) { SpinLockRelease(&walsnd->mutex); continue; } if (walsnd->state != WALSNDSTATE_STOPPING) { all_stopped = false; SpinLockRelease(&walsnd->mutex); break; } SpinLockRelease(&walsnd->mutex); } /* safe to leave if confirmation is done for all WAL senders */ if (all_stopped) return; pg_usleep(10000L); /* wait for 10 msec */ } } /* Set state for current walsender (only called in walsender) */ void WalSndSetState(WalSndState state) { WalSnd *walsnd = MyWalSnd; Assert(am_walsender); if (walsnd->state == state) return; SpinLockAcquire(&walsnd->mutex); walsnd->state = state; SpinLockRelease(&walsnd->mutex); } /* * Return a string constant representing the state. This is used * in system views, and should *not* be translated. */ static const char * WalSndGetStateString(WalSndState state) { switch (state) { case WALSNDSTATE_STARTUP: return "startup"; case WALSNDSTATE_BACKUP: return "backup"; case WALSNDSTATE_CATCHUP: return "catchup"; case WALSNDSTATE_STREAMING: return "streaming"; case WALSNDSTATE_STOPPING: return "stopping"; } return "UNKNOWN"; } static Interval * offset_to_interval(TimeOffset offset) { Interval *result = palloc(sizeof(Interval)); result->month = 0; result->day = 0; result->time = offset; return result; } /* * Returns activity of walsenders, including pids and xlog locations sent to * standby servers. */ Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS) { #define PG_STAT_GET_WAL_SENDERS_COLS 15 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; TupleDesc tupdesc; Tuplestorestate *tupstore; MemoryContext per_query_ctx; MemoryContext oldcontext; List *sync_standbys; int i; /* check to see if caller supports us returning a tuplestore */ if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo)) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("set-valued function called in context that cannot accept a set"))); if (!(rsinfo->allowedModes & SFRM_Materialize)) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("materialize mode required, but it is not allowed in this context"))); /* Build a tuple descriptor for our result type */ if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) elog(ERROR, "return type must be a row type"); per_query_ctx = rsinfo->econtext->ecxt_per_query_memory; oldcontext = MemoryContextSwitchTo(per_query_ctx); tupstore = tuplestore_begin_heap(true, false, work_mem); rsinfo->returnMode = SFRM_Materialize; rsinfo->setResult = tupstore; rsinfo->setDesc = tupdesc; MemoryContextSwitchTo(oldcontext); /* * Get the currently active synchronous standbys. */ LWLockAcquire(SyncRepLock, LW_SHARED); sync_standbys = SyncRepGetSyncStandbys(NULL); LWLockRelease(SyncRepLock); for (i = 0; i < max_wal_senders; i++) { WalSnd *walsnd = &WalSndCtl->walsnds[i]; XLogRecPtr sentPtr; XLogRecPtr write; XLogRecPtr flush; XLogRecPtr apply; TimeOffset writeLag; TimeOffset flushLag; TimeOffset applyLag; int priority; int pid; WalSndState state; TimestampTz replyTime; int64 spillTxns; int64 spillCount; int64 spillBytes; Datum values[PG_STAT_GET_WAL_SENDERS_COLS]; bool nulls[PG_STAT_GET_WAL_SENDERS_COLS]; SpinLockAcquire(&walsnd->mutex); if (walsnd->pid == 0) { SpinLockRelease(&walsnd->mutex); continue; } pid = walsnd->pid; sentPtr = walsnd->sentPtr; state = walsnd->state; write = walsnd->write; flush = walsnd->flush; apply = walsnd->apply; writeLag = walsnd->writeLag; flushLag = walsnd->flushLag; applyLag = walsnd->applyLag; priority = walsnd->sync_standby_priority; replyTime = walsnd->replyTime; spillTxns = walsnd->spillTxns; spillCount = walsnd->spillCount; spillBytes = walsnd->spillBytes; SpinLockRelease(&walsnd->mutex); memset(nulls, 0, sizeof(nulls)); values[0] = Int32GetDatum(pid); if (!is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_ALL_STATS)) { /* * Only superusers and members of pg_read_all_stats can see * details. Other users only get the pid value to know it's a * walsender, but no details. */ MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1); } else { values[1] = CStringGetTextDatum(WalSndGetStateString(state)); if (XLogRecPtrIsInvalid(sentPtr)) nulls[2] = true; values[2] = LSNGetDatum(sentPtr); if (XLogRecPtrIsInvalid(write)) nulls[3] = true; values[3] = LSNGetDatum(write); if (XLogRecPtrIsInvalid(flush)) nulls[4] = true; values[4] = LSNGetDatum(flush); if (XLogRecPtrIsInvalid(apply)) nulls[5] = true; values[5] = LSNGetDatum(apply); /* * Treat a standby such as a pg_basebackup background process * which always returns an invalid flush location, as an * asynchronous standby. */ priority = XLogRecPtrIsInvalid(flush) ? 0 : priority; if (writeLag < 0) nulls[6] = true; else values[6] = IntervalPGetDatum(offset_to_interval(writeLag)); if (flushLag < 0) nulls[7] = true; else values[7] = IntervalPGetDatum(offset_to_interval(flushLag)); if (applyLag < 0) nulls[8] = true; else values[8] = IntervalPGetDatum(offset_to_interval(applyLag)); values[9] = Int32GetDatum(priority); /* * More easily understood version of standby state. This is purely * informational. * * In quorum-based sync replication, the role of each standby * listed in synchronous_standby_names can be changing very * frequently. Any standbys considered as "sync" at one moment can * be switched to "potential" ones at the next moment. So, it's * basically useless to report "sync" or "potential" as their sync * states. We report just "quorum" for them. */ if (priority == 0) values[10] = CStringGetTextDatum("async"); else if (list_member_int(sync_standbys, i)) values[10] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ? CStringGetTextDatum("sync") : CStringGetTextDatum("quorum"); else values[10] = CStringGetTextDatum("potential"); if (replyTime == 0) nulls[11] = true; else values[11] = TimestampTzGetDatum(replyTime); /* spill to disk */ values[12] = Int64GetDatum(spillTxns); values[13] = Int64GetDatum(spillCount); values[14] = Int64GetDatum(spillBytes); } tuplestore_putvalues(tupstore, tupdesc, values, nulls); } /* clean up and return the tuplestore */ tuplestore_donestoring(tupstore); return (Datum) 0; } /* * This function is used to send a keepalive message to standby. * If requestReply is set, sets a flag in the message requesting the standby * to send a message back to us, for heartbeat purposes. */ static void WalSndKeepalive(bool requestReply) { elog(DEBUG2, "sending replication keepalive"); /* construct the message... */ resetStringInfo(&output_message); pq_sendbyte(&output_message, 'k'); pq_sendint64(&output_message, sentPtr); pq_sendint64(&output_message, GetCurrentTimestamp()); pq_sendbyte(&output_message, requestReply ? 1 : 0); /* ... and send it wrapped in CopyData */ pq_putmessage_noblock('d', output_message.data, output_message.len); } /* * Send keepalive message if too much time has elapsed. */ static void WalSndKeepaliveIfNecessary(void) { TimestampTz ping_time; /* * Don't send keepalive messages if timeouts are globally disabled or * we're doing something not partaking in timeouts. */ if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0) return; if (waiting_for_ping_response) return; /* * If half of wal_sender_timeout has lapsed without receiving any reply * from the standby, send a keep-alive message to the standby requesting * an immediate reply. */ ping_time = TimestampTzPlusMilliseconds(last_reply_timestamp, wal_sender_timeout / 2); if (last_processing >= ping_time) { WalSndKeepalive(true); waiting_for_ping_response = true; /* Try to flush pending output to the client */ if (pq_flush_if_writable() != 0) WalSndShutdown(); } } /* * Record the end of the WAL and the time it was flushed locally, so that * LagTrackerRead can compute the elapsed time (lag) when this WAL location is * eventually reported to have been written, flushed and applied by the * standby in a reply message. */ static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time) { bool buffer_full; int new_write_head; int i; if (!am_walsender) return; /* * If the lsn hasn't advanced since last time, then do nothing. This way * we only record a new sample when new WAL has been written. */ if (lag_tracker->last_lsn == lsn) return; lag_tracker->last_lsn = lsn; /* * If advancing the write head of the circular buffer would crash into any * of the read heads, then the buffer is full. In other words, the * slowest reader (presumably apply) is the one that controls the release * of space. */ new_write_head = (lag_tracker->write_head + 1) % LAG_TRACKER_BUFFER_SIZE; buffer_full = false; for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i) { if (new_write_head == lag_tracker->read_heads[i]) buffer_full = true; } /* * If the buffer is full, for now we just rewind by one slot and overwrite * the last sample, as a simple (if somewhat uneven) way to lower the * sampling rate. There may be better adaptive compaction algorithms. */ if (buffer_full) { new_write_head = lag_tracker->write_head; if (lag_tracker->write_head > 0) lag_tracker->write_head--; else lag_tracker->write_head = LAG_TRACKER_BUFFER_SIZE - 1; } /* Store a sample at the current write head position. */ lag_tracker->buffer[lag_tracker->write_head].lsn = lsn; lag_tracker->buffer[lag_tracker->write_head].time = local_flush_time; lag_tracker->write_head = new_write_head; } /* * Find out how much time has elapsed between the moment WAL location 'lsn' * (or the highest known earlier LSN) was flushed locally and the time 'now'. * We have a separate read head for each of the reported LSN locations we * receive in replies from standby; 'head' controls which read head is * used. Whenever a read head crosses an LSN which was written into the * lag buffer with LagTrackerWrite, we can use the associated timestamp to * find out the time this LSN (or an earlier one) was flushed locally, and * therefore compute the lag. * * Return -1 if no new sample data is available, and otherwise the elapsed * time in microseconds. */ static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now) { TimestampTz time = 0; /* Read all unread samples up to this LSN or end of buffer. */ while (lag_tracker->read_heads[head] != lag_tracker->write_head && lag_tracker->buffer[lag_tracker->read_heads[head]].lsn <= lsn) { time = lag_tracker->buffer[lag_tracker->read_heads[head]].time; lag_tracker->last_read[head] = lag_tracker->buffer[lag_tracker->read_heads[head]]; lag_tracker->read_heads[head] = (lag_tracker->read_heads[head] + 1) % LAG_TRACKER_BUFFER_SIZE; } /* * If the lag tracker is empty, that means the standby has processed * everything we've ever sent so we should now clear 'last_read'. If we * didn't do that, we'd risk using a stale and irrelevant sample for * interpolation at the beginning of the next burst of WAL after a period * of idleness. */ if (lag_tracker->read_heads[head] == lag_tracker->write_head) lag_tracker->last_read[head].time = 0; if (time > now) { /* If the clock somehow went backwards, treat as not found. */ return -1; } else if (time == 0) { /* * We didn't cross a time. If there is a future sample that we * haven't reached yet, and we've already reached at least one sample, * let's interpolate the local flushed time. This is mainly useful * for reporting a completely stuck apply position as having * increasing lag, since otherwise we'd have to wait for it to * eventually start moving again and cross one of our samples before * we can show the lag increasing. */ if (lag_tracker->read_heads[head] == lag_tracker->write_head) { /* There are no future samples, so we can't interpolate. */ return -1; } else if (lag_tracker->last_read[head].time != 0) { /* We can interpolate between last_read and the next sample. */ double fraction; WalTimeSample prev = lag_tracker->last_read[head]; WalTimeSample next = lag_tracker->buffer[lag_tracker->read_heads[head]]; if (lsn < prev.lsn) { /* * Reported LSNs shouldn't normally go backwards, but it's * possible when there is a timeline change. Treat as not * found. */ return -1; } Assert(prev.lsn < next.lsn); if (prev.time > next.time) { /* If the clock somehow went backwards, treat as not found. */ return -1; } /* See how far we are between the previous and next samples. */ fraction = (double) (lsn - prev.lsn) / (double) (next.lsn - prev.lsn); /* Scale the local flush time proportionally. */ time = (TimestampTz) ((double) prev.time + (next.time - prev.time) * fraction); } else { /* * We have only a future sample, implying that we were entirely * caught up but and now there is a new burst of WAL and the * standby hasn't processed the first sample yet. Until the * standby reaches the future sample the best we can do is report * the hypothetical lag if that sample were to be replayed now. */ time = lag_tracker->buffer[lag_tracker->read_heads[head]].time; } } /* Return the elapsed time since local flush time in microseconds. */ Assert(time != 0); return now - time; } static void UpdateSpillStats(LogicalDecodingContext *ctx) { ReorderBuffer *rb = ctx->reorder; SpinLockAcquire(&MyWalSnd->mutex); MyWalSnd->spillTxns = rb->spillTxns; MyWalSnd->spillCount = rb->spillCount; MyWalSnd->spillBytes = rb->spillBytes; elog(DEBUG2, "UpdateSpillStats: updating stats %p %lld %lld %lld", rb, (long long) rb->spillTxns, (long long) rb->spillCount, (long long) rb->spillBytes); SpinLockRelease(&MyWalSnd->mutex); }