Add a slot synchronization function.

This commit introduces a new SQL function pg_sync_replication_slots()
which is used to synchronize the logical replication slots from the
primary server to the physical standby so that logical replication can be
resumed after a failover or planned switchover.

A new 'synced' flag is introduced in the pg_replication_slots view,
indicating whether the slot has been synchronized from the primary server. On
a standby, synced slots cannot be dropped or consumed, and any attempt to
perform logical decoding on them will result in an error.
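
For example (using a hypothetical synced slot named 'failover_slot'), on the
standby:

SELECT * FROM pg_logical_slot_get_changes('failover_slot', NULL, NULL);
ERROR: cannot use replication slot "failover_slot" for logical decoding
DETAIL: This slot is being synchronized from the primary server.
HINT: Specify another replication slot.

SELECT pg_drop_replication_slot('failover_slot');
ERROR: cannot drop replication slot "failover_slot"
DETAIL: This slot is being synced from the primary server.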

The logical replication slots on the primary can be synchronized to the
hot standby by using the 'failover' parameter of
pg_create_logical_replication_slot(), or by using the 'failover' option of
CREATE SUBSCRIPTION during slot creation, and then calling
pg_sync_replication_slots() on the standby. For the synchronization to work,
it is mandatory to have a physical replication slot between the primary
and the standby, i.e., 'primary_slot_name' should be configured on the
standby, and 'hot_standby_feedback' must be enabled on the standby. It is
also necessary to specify a valid 'dbname' in the 'primary_conninfo'.
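
A minimal sketch of the workflow, with hypothetical host, slot, and plugin
names:

-- On the primary: a physical slot for the standby and a failover-enabled
-- logical slot (the fifth argument of pg_create_logical_replication_slot()
-- is 'failover').
SELECT pg_create_physical_replication_slot('sb1_slot');
SELECT pg_create_logical_replication_slot('failover_slot', 'pgoutput',
                                          false, false, true);

-- In the standby's postgresql.conf:
--   primary_slot_name = 'sb1_slot'
--   hot_standby_feedback = on
--   primary_conninfo = 'host=primary user=repl dbname=postgres'

-- On the standby:
SELECT pg_sync_replication_slots();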

If a logical slot is invalidated on the primary, then that slot on the
standby is also invalidated.
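
For example, the propagated invalidation reason can be inspected on the
standby with a query like:

SELECT slot_name, conflict_reason FROM pg_replication_slots WHERE synced;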

If a logical slot on the primary is valid but is invalidated on the
standby, then that slot is dropped but will be recreated on the standby in
the next pg_sync_replication_slots() call, provided the slot still exists
on the primary server. It is okay to recreate such slots as long as they
are not consumable on the standby (which is currently the case). This
situation may occur due to the following reasons:
- The 'max_slot_wal_keep_size' on the standby is insufficient to retain
WAL records from the restart_lsn of the slot.
- 'primary_slot_name' is temporarily reset to null and the physical slot
is removed.

The slot synchronization status on the standby can be monitored using the
'synced' column of the pg_replication_slots view.
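
For instance, slots that are sync-ready (persistent with synced = true), and
hence usable after failover, can be listed with:

SELECT slot_name FROM pg_replication_slots WHERE synced AND NOT temporary;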

Functionality to automatically synchronize slots via a background worker,
and to allow logical walsenders to wait for the physical standby, will be
added in subsequent commits.

Author: Hou Zhijie, Shveta Malik, Ajin Cherian based on an earlier version by Peter Eisentraut
Reviewed-by: Masahiko Sawada, Bertrand Drouvot, Peter Smith, Dilip Kumar, Nisha Moond, Kuroda Hayato, Amit Kapila
Discussion: https://postgr.es/m/514f6f2f-6833-4539-39f1-96cd1e011f23@enterprisedb.com
Amit Kapila 2024-02-14 09:45:36 +05:30
parent 06bd311bce
commit ddd5f4f54a
26 changed files with 1522 additions and 37 deletions

View File

@ -64,6 +64,9 @@ DETAIL: Only roles with the REPLICATION attribute may use replication slots.
SELECT pg_drop_replication_slot('regression_slot');
ERROR: permission denied to use replication slots
DETAIL: Only roles with the REPLICATION attribute may use replication slots.
SELECT pg_sync_replication_slots();
ERROR: permission denied to use replication slots
DETAIL: Only roles with the REPLICATION attribute may use replication slots.
RESET ROLE;
-- replication users can drop superuser created slots
SET ROLE regress_lr_superuser;

View File

@ -425,6 +425,8 @@ SELECT 'init' FROM pg_create_logical_replication_slot('failover_default_slot', '
init
(1 row)
SELECT 'init' FROM pg_create_logical_replication_slot('failover_true_temp_slot', 'test_decoding', true, false, true);
ERROR: cannot enable failover for a temporary replication slot
SELECT 'init' FROM pg_create_physical_replication_slot('physical_slot');
?column?
----------

View File

@ -29,6 +29,7 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_d
INSERT INTO lr_test VALUES('lr_superuser_init');
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
SELECT pg_drop_replication_slot('regression_slot');
SELECT pg_sync_replication_slots();
RESET ROLE;
-- replication users can drop superuser created slots

View File

@ -181,6 +181,7 @@ SELECT pg_drop_replication_slot('copied_slot2_notemp');
SELECT 'init' FROM pg_create_logical_replication_slot('failover_true_slot', 'test_decoding', false, false, true);
SELECT 'init' FROM pg_create_logical_replication_slot('failover_false_slot', 'test_decoding', false, false, false);
SELECT 'init' FROM pg_create_logical_replication_slot('failover_default_slot', 'test_decoding', false, false);
SELECT 'init' FROM pg_create_logical_replication_slot('failover_true_temp_slot', 'test_decoding', true, false, true);
SELECT 'init' FROM pg_create_physical_replication_slot('physical_slot');
SELECT slot_name, slot_type, failover FROM pg_replication_slots;

View File

@ -4612,8 +4612,13 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
<varname>primary_conninfo</varname> string, or in a separate
<filename>~/.pgpass</filename> file on the standby server (use
<literal>replication</literal> as the database name).
Do not specify a database name in the
<varname>primary_conninfo</varname> string.
</para>
<para>
For replication slot synchronization (see
<xref linkend="logicaldecoding-replication-slots-synchronization"/>),
it is also necessary to specify a valid <literal>dbname</literal>
in the <varname>primary_conninfo</varname> string. This will only be
used for slot synchronization. It is ignored for streaming.
</para>
<para>
This parameter can only be set in the <filename>postgresql.conf</filename>

View File

@ -28075,7 +28075,7 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
</row>
<row>
<entry role="func_table_entry"><para role="func_signature">
<entry id="pg-create-logical-replication-slot" role="func_table_entry"><para role="func_signature">
<indexterm>
<primary>pg_create_logical_replication_slot</primary>
</indexterm>
@ -28444,6 +28444,39 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
record is flushed along with its transaction.
</para></entry>
</row>
<row>
<entry id="pg-sync-replication-slots" role="func_table_entry"><para role="func_signature">
<indexterm>
<primary>pg_sync_replication_slots</primary>
</indexterm>
<function>pg_sync_replication_slots</function> ()
<returnvalue>void</returnvalue>
</para>
<para>
Synchronize the logical failover replication slots from the primary
server to the standby server. This function can only be executed on the
standby server. Temporary synced slots, if any, cannot be used for
logical decoding and must be dropped after promotion. See
<xref linkend="logicaldecoding-replication-slots-synchronization"/> for details.
</para>
<caution>
<para>
If, after executing the function,
<link linkend="guc-hot-standby-feedback">
<varname>hot_standby_feedback</varname></link> is disabled on
the standby or the physical slot configured in
<link linkend="guc-primary-slot-name">
<varname>primary_slot_name</varname></link> is
removed, then it is possible that the necessary rows of the
synchronized slot will be removed by the VACUUM process on the primary
server, resulting in the synchronized slot becoming invalidated.
</para>
</caution>
</entry>
</row>
</tbody>
</tgroup>
</table>
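
As an illustration of the restriction above, calling the function anywhere
other than a standby is rejected:

SELECT pg_sync_replication_slots();
ERROR: replication slots can only be synchronized to a standby server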

View File

@ -358,6 +358,62 @@ postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU
So if a slot is no longer required it should be dropped.
</para>
</caution>
</sect2>
<sect2 id="logicaldecoding-replication-slots-synchronization">
<title>Replication Slot Synchronization</title>
<para>
The logical replication slots on the primary can be synchronized to
the hot standby by using the <literal>failover</literal> parameter of
<link linkend="pg-create-logical-replication-slot">
<function>pg_create_logical_replication_slot</function></link>, or by
using the <link linkend="sql-createsubscription-params-with-failover">
<literal>failover</literal></link> option of
<command>CREATE SUBSCRIPTION</command> during slot creation, and then calling
<link linkend="pg-sync-replication-slots">
<function>pg_sync_replication_slots</function></link>
on the standby. For the synchronization to work, it is mandatory to
have a physical replication slot between the primary and the standby aka
<link linkend="guc-primary-slot-name"><varname>primary_slot_name</varname></link>
should be configured on the standby, and
<link linkend="guc-hot-standby-feedback"><varname>hot_standby_feedback</varname></link>
must be enabled on the standby. It is also necessary to specify a valid
<literal>dbname</literal> in the
<link linkend="guc-primary-conninfo"><varname>primary_conninfo</varname></link>.
</para>
<para>
The ability to resume logical replication after failover depends upon the
<link linkend="view-pg-replication-slots">pg_replication_slots</link>.<structfield>synced</structfield>
value for the synchronized slots on the standby at the time of failover.
Only persistent slots that have attained synced state as true on the standby
before failover can be used for logical replication after failover.
Temporary synced slots cannot be used for logical decoding, therefore
logical replication for those slots cannot be resumed. For example, if the
synchronized slot could not become persistent on the standby due to a
disabled subscription, then the subscription cannot be resumed after
failover even when it is enabled.
</para>
<para>
To resume logical replication after failover from the synced logical
slots, the subscription's 'conninfo' must be altered to point to the
new primary server. This is done using
<link linkend="sql-altersubscription-params-connection"><command>ALTER SUBSCRIPTION ... CONNECTION</command></link>.
It is recommended that subscriptions are first disabled before promoting
the standby and are re-enabled after altering the connection string.
</para>
<caution>
<para>
There is a chance that the old primary is up again during the promotion
and if subscriptions are not disabled, the logical subscribers may
continue to receive data from the old primary server even after promotion
until the connection string is altered. This might result in data
inconsistency issues, preventing the logical subscribers from being
able to continue replication from the new primary server.
</para>
</caution>
</sect2>
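
The recommended sequence above, sketched with an illustrative subscription
name and connection string:

-- On the subscriber, before promoting the standby:
ALTER SUBSCRIPTION mysub DISABLE;
-- Promote the standby, then repoint and re-enable the subscription:
ALTER SUBSCRIPTION mysub CONNECTION 'host=new_primary dbname=postgres';
ALTER SUBSCRIPTION mysub ENABLE;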
<sect2 id="logicaldecoding-explanation-output-plugins">

View File

@ -2062,7 +2062,8 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
<term><literal>FAILOVER [ <replaceable class="parameter">boolean</replaceable> ]</literal></term>
<listitem>
<para>
If true, the slot is enabled to be synced to the standbys.
If true, the slot is enabled to be synced to the standbys
so that logical replication can be resumed after failover.
The default is false.
</para>
</listitem>
@ -2162,7 +2163,8 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
<term><literal>FAILOVER [ <replaceable class="parameter">boolean</replaceable> ]</literal></term>
<listitem>
<para>
If true, the slot is enabled to be synced to the standbys.
If true, the slot is enabled to be synced to the standbys
so that logical replication can be resumed after failover.
</para>
</listitem>
</varlistentry>
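
For example, over a replication connection (slot name hypothetical):

psql "dbname=postgres replication=database" \
  -c "CREATE_REPLICATION_SLOT failover_slot LOGICAL pgoutput (FAILOVER true);"
psql "dbname=postgres replication=database" \
  -c "ALTER_REPLICATION_SLOT failover_slot (FAILOVER false);"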

View File

@ -2561,10 +2561,26 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
<structfield>failover</structfield> <type>bool</type>
</para>
<para>
True if this is a logical slot enabled to be synced to the standbys.
Always false for physical slots.
True if this is a logical slot enabled to be synced to the standbys
so that logical replication can be resumed from the new primary
after failover. Always false for physical slots.
</para></entry>
</row>
<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>synced</structfield> <type>bool</type>
</para>
<para>
True if this is a logical slot that was synced from a primary server.
On a hot standby, the slots with the synced column marked as true can
neither be used for logical decoding nor dropped manually. The value
of this column has no meaning on the primary server; the column value on
the primary is default false for all slots but may (if leftover from a
promoted standby) also be true.
</para></entry>
</row>
</tbody>
</tgroup>
</table>

View File

@ -1024,7 +1024,8 @@ CREATE VIEW pg_replication_slots AS
L.safe_wal_size,
L.two_phase,
L.conflict_reason,
L.failover
L.failover,
L.synced
FROM pg_get_replication_slots() AS L
LEFT JOIN pg_database D ON (L.datoid = D.oid);

View File

@ -25,6 +25,7 @@ OBJS = \
proto.o \
relation.o \
reorderbuffer.o \
slotsync.o \
snapbuild.o \
tablesync.o \
worker.o

View File

@ -524,6 +524,18 @@ CreateDecodingContext(XLogRecPtr start_lsn,
errmsg("replication slot \"%s\" was not created in this database",
NameStr(slot->data.name))));
/*
* Do not allow consumption of a "synchronized" slot until the standby
* gets promoted.
*/
if (RecoveryInProgress() && slot->data.synced)
ereport(ERROR,
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("cannot use replication slot \"%s\" for logical decoding",
NameStr(slot->data.name)),
errdetail("This slot is being synchronized from the primary server."),
errhint("Specify another replication slot."));
/*
* Check if slot has been invalidated due to max_slot_wal_keep_size. Avoid
* "cannot get changes" wording in this errmsg because that'd be

View File

@ -11,6 +11,7 @@ backend_sources += files(
'proto.c',
'relation.c',
'reorderbuffer.c',
'slotsync.c',
'snapbuild.c',
'tablesync.c',
'worker.c',

View File

@ -0,0 +1,906 @@
/*-------------------------------------------------------------------------
* slotsync.c
* Functionality for synchronizing slots to a standby server from the
* primary server.
*
* Copyright (c) 2024, PostgreSQL Global Development Group
*
* IDENTIFICATION
* src/backend/replication/logical/slotsync.c
*
* This file contains the code for slot synchronization on a physical standby
* to fetch logical failover slots information from the primary server, create
* the slots on the standby and synchronize them. This is done by a call to SQL
* function pg_sync_replication_slots.
*
* If on physical standby, the WAL corresponding to the remote's restart_lsn
* is not available or the remote's catalog_xmin precedes the oldest xid for which
* it is guaranteed that rows wouldn't have been removed then we cannot create
* the local standby slot because that would mean moving the local slot
* backward and decoding won't be possible via such a slot. In this case, the
* slot will be marked as RS_TEMPORARY. Once the primary server catches up,
* the slot will be marked as RS_PERSISTENT (which means sync-ready) after
* which we can call pg_sync_replication_slots() periodically to perform
* syncs.
*
* Any standby synchronized slots will be dropped if they no longer need
* to be synchronized. See comment atop drop_local_obsolete_slots() for more
* details.
*---------------------------------------------------------------------------
*/
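/*
 * Illustrative lifecycle of a synced slot (editorial sketch, not part of
 * the patch):
 *
 *   remote failover slot --(first sync)--> local slot, RS_TEMPORARY
 *   RS_TEMPORARY slot --(remote restart_lsn and catalog_xmin catch up
 *   with the locally reserved position)--> RS_PERSISTENT ("sync-ready")
 *   sync-ready slot --(each later sync call)--> metadata updated in place
 */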
#include "postgres.h"
#include "access/xlog_internal.h"
#include "access/xlogrecovery.h"
#include "catalog/pg_database.h"
#include "commands/dbcommands.h"
#include "replication/logical.h"
#include "replication/slotsync.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/procarray.h"
#include "utils/builtins.h"
#include "utils/pg_lsn.h"
/* Struct for sharing information to control slot synchronization. */
typedef struct SlotSyncCtxStruct
{
/* prevents concurrent slot syncs to avoid slot overwrites */
bool syncing;
slock_t mutex;
} SlotSyncCtxStruct;
SlotSyncCtxStruct *SlotSyncCtx = NULL;
/*
* Flag to tell if we are syncing replication slots. Unlike the 'syncing' flag
* in SlotSyncCtxStruct, this flag is true only if the current process is
* performing slot synchronization.
*/
static bool syncing_slots = false;
/*
* Structure to hold information fetched from the primary server about a logical
* replication slot.
*/
typedef struct RemoteSlot
{
char *name;
char *plugin;
char *database;
bool two_phase;
bool failover;
XLogRecPtr restart_lsn;
XLogRecPtr confirmed_lsn;
TransactionId catalog_xmin;
/* RS_INVAL_NONE if valid, or the reason of invalidation */
ReplicationSlotInvalidationCause invalidated;
} RemoteSlot;
/*
* If necessary, update the local synced slot's metadata based on the data
* from the remote slot.
*
* If no update was needed (the data of the remote slot is the same as the
* local slot) return false, otherwise true.
*/
static bool
update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
{
ReplicationSlot *slot = MyReplicationSlot;
bool xmin_changed;
bool restart_lsn_changed;
NameData plugin_name;
Assert(slot->data.invalidated == RS_INVAL_NONE);
xmin_changed = (remote_slot->catalog_xmin != slot->data.catalog_xmin);
restart_lsn_changed = (remote_slot->restart_lsn != slot->data.restart_lsn);
if (!xmin_changed &&
!restart_lsn_changed &&
remote_dbid == slot->data.database &&
remote_slot->two_phase == slot->data.two_phase &&
remote_slot->failover == slot->data.failover &&
remote_slot->confirmed_lsn == slot->data.confirmed_flush &&
strcmp(remote_slot->plugin, NameStr(slot->data.plugin)) == 0)
return false;
/* Avoid expensive operations while holding a spinlock. */
namestrcpy(&plugin_name, remote_slot->plugin);
SpinLockAcquire(&slot->mutex);
slot->data.plugin = plugin_name;
slot->data.database = remote_dbid;
slot->data.two_phase = remote_slot->two_phase;
slot->data.failover = remote_slot->failover;
slot->data.restart_lsn = remote_slot->restart_lsn;
slot->data.confirmed_flush = remote_slot->confirmed_lsn;
slot->data.catalog_xmin = remote_slot->catalog_xmin;
slot->effective_catalog_xmin = remote_slot->catalog_xmin;
SpinLockRelease(&slot->mutex);
if (xmin_changed)
ReplicationSlotsComputeRequiredXmin(false);
if (restart_lsn_changed)
ReplicationSlotsComputeRequiredLSN();
return true;
}
/*
* Get the list of local logical slots that are synchronized from the
* primary server.
*/
static List *
get_local_synced_slots(void)
{
List *local_slots = NIL;
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
for (int i = 0; i < max_replication_slots; i++)
{
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
/* Check if it is a synchronized slot */
if (s->in_use && s->data.synced)
{
Assert(SlotIsLogical(s));
local_slots = lappend(local_slots, s);
}
}
LWLockRelease(ReplicationSlotControlLock);
return local_slots;
}
/*
* Helper function to check if local_slot is required to be retained.
*
* Return false either if local_slot does not exist in the remote_slots list
* or is invalidated while the corresponding remote slot is still valid,
* otherwise true.
*/
static bool
local_sync_slot_required(ReplicationSlot *local_slot, List *remote_slots)
{
bool remote_exists = false;
bool locally_invalidated = false;
foreach_ptr(RemoteSlot, remote_slot, remote_slots)
{
if (strcmp(remote_slot->name, NameStr(local_slot->data.name)) == 0)
{
remote_exists = true;
/*
* If remote slot is not invalidated but local slot is marked as
* invalidated, then set locally_invalidated flag.
*/
SpinLockAcquire(&local_slot->mutex);
locally_invalidated =
(remote_slot->invalidated == RS_INVAL_NONE) &&
(local_slot->data.invalidated != RS_INVAL_NONE);
SpinLockRelease(&local_slot->mutex);
break;
}
}
return (remote_exists && !locally_invalidated);
}
/*
* Drop local obsolete slots.
*
* Drop the local slots that no longer need to be synced i.e. these either do
* not exist on the primary or are no longer enabled for failover.
*
* Additionally, drop any slots that are valid on the primary but got
* invalidated on the standby. This situation may occur due to the following
* reasons:
* - The 'max_slot_wal_keep_size' on the standby is insufficient to retain WAL
* records from the restart_lsn of the slot.
* - 'primary_slot_name' is temporarily reset to null and the physical slot is
* removed.
* These dropped slots will get recreated in next sync-cycle and it is okay to
* drop and recreate such slots as long as these are not consumable on the
* standby (which is the case currently).
*
* Note: Change of 'wal_level' on the primary server to a level lower than
* logical may also result in slot invalidation and removal on the standby.
* This is because such 'wal_level' change is only possible if the logical
* slots are removed on the primary server, so it's expected to see the
* slots being invalidated and removed on the standby too (and re-created
* if they are re-created on the primary server).
*/
static void
drop_local_obsolete_slots(List *remote_slot_list)
{
List *local_slots = get_local_synced_slots();
foreach_ptr(ReplicationSlot, local_slot, local_slots)
{
/* Drop the local slot if it is not required to be retained. */
if (!local_sync_slot_required(local_slot, remote_slot_list))
{
bool synced_slot;
/*
* Use shared lock to prevent a conflict with
* ReplicationSlotsDropDBSlots(), trying to drop the same slot
* during a drop-database operation.
*/
LockSharedObject(DatabaseRelationId, local_slot->data.database,
0, AccessShareLock);
/*
* In the small window between getting the slot to drop and
* locking the database, there is a possibility of a parallel
* database drop by the startup process and the creation of a new
* slot by the user. This new user-created slot may end up using
* the same shared memory as that of 'local_slot'. Thus check if
* local_slot is still the synced one before performing actual
* drop.
*/
SpinLockAcquire(&local_slot->mutex);
synced_slot = local_slot->in_use && local_slot->data.synced;
SpinLockRelease(&local_slot->mutex);
if (synced_slot)
{
ReplicationSlotAcquire(NameStr(local_slot->data.name), true);
ReplicationSlotDropAcquired();
}
UnlockSharedObject(DatabaseRelationId, local_slot->data.database,
0, AccessShareLock);
ereport(LOG,
errmsg("dropped replication slot \"%s\" of dbid %d",
NameStr(local_slot->data.name),
local_slot->data.database));
}
}
}
/*
* Reserve WAL for the currently active local slot using the specified WAL
* location (restart_lsn).
*
* If the given WAL location has been removed, reserve WAL using the oldest
* existing WAL segment.
*/
static void
reserve_wal_for_local_slot(XLogRecPtr restart_lsn)
{
XLogSegNo oldest_segno;
XLogSegNo segno;
ReplicationSlot *slot = MyReplicationSlot;
Assert(slot != NULL);
Assert(XLogRecPtrIsInvalid(slot->data.restart_lsn));
while (true)
{
SpinLockAcquire(&slot->mutex);
slot->data.restart_lsn = restart_lsn;
SpinLockRelease(&slot->mutex);
/* Prevent WAL removal as fast as possible */
ReplicationSlotsComputeRequiredLSN();
XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size);
/*
* Find the oldest existing WAL segment file.
*
* Normally, we can determine it by using the last removed segment
* number. However, if no WAL segment files have been removed by a
* checkpoint since startup, we need to search for the oldest segment
* file from the current timeline existing in XLOGDIR.
*
* XXX: Currently, we are searching for the oldest segment in the
* current timeline, as there is less chance of the slot's restart_lsn
* being on some prior timeline, and even if that happens, in the
* worst case we will wait to sync until the slot's restart_lsn has moved
* to the current timeline.
*/
oldest_segno = XLogGetLastRemovedSegno() + 1;
if (oldest_segno == 1)
{
TimeLineID cur_timeline;
GetWalRcvFlushRecPtr(NULL, &cur_timeline);
oldest_segno = XLogGetOldestSegno(cur_timeline);
}
/*
* If all required WAL is still there, great, otherwise retry. The
* slot should prevent further removal of WAL, unless there's a
* concurrent ReplicationSlotsComputeRequiredLSN() after we've written
* the new restart_lsn above, so normally we should never need to loop
* more than twice.
*/
if (segno >= oldest_segno)
break;
/* Retry using the location of the oldest wal segment */
XLogSegNoOffsetToRecPtr(oldest_segno, 0, wal_segment_size, restart_lsn);
}
}
/*
* If the remote restart_lsn and catalog_xmin have caught up with the
* local ones, then update the LSNs and persist the local synced slot for
* future synchronization; otherwise, do nothing.
*/
static void
update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
{
ReplicationSlot *slot = MyReplicationSlot;
/*
* Check if the primary server has caught up. Refer to the comment atop
* the file for details on this check.
*/
if (remote_slot->restart_lsn < slot->data.restart_lsn ||
TransactionIdPrecedes(remote_slot->catalog_xmin,
slot->data.catalog_xmin))
{
/*
* The remote slot didn't catch up to locally reserved position.
*
* We do not drop the slot because the restart_lsn can be ahead of the
* current location when recreating the slot in the next cycle. It may
* take more time to create such a slot. Therefore, we keep this slot
* and attempt the synchronization in the next cycle.
*/
return;
}
/* First time slot update, the function must return true */
if (!update_local_synced_slot(remote_slot, remote_dbid))
elog(ERROR, "failed to update slot");
ReplicationSlotPersist();
ereport(LOG,
errmsg("newly created slot \"%s\" is sync-ready now",
remote_slot->name));
}
/*
* Synchronize a single slot to the given position.
*
* This creates a new slot if there is no existing one and updates the
* metadata of the slot as per the data received from the primary server.
*
* The slot is created as a temporary slot and stays in the same state until
* the remote_slot catches up with the locally reserved position and the local slot is
* updated. The slot is then persisted and is considered as sync-ready for
* periodic syncs.
*/
static void
synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
{
ReplicationSlot *slot;
XLogRecPtr latestFlushPtr;
/*
* Make sure that concerned WAL is received and flushed before syncing
* slot to target lsn received from the primary server.
*/
latestFlushPtr = GetStandbyFlushRecPtr(NULL);
if (remote_slot->confirmed_lsn > latestFlushPtr)
elog(ERROR,
"skipping slot synchronization as the received slot sync"
" LSN %X/%X for slot \"%s\" is ahead of the standby position %X/%X",
LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
remote_slot->name,
LSN_FORMAT_ARGS(latestFlushPtr));
/* Search for the named slot */
if ((slot = SearchNamedReplicationSlot(remote_slot->name, true)))
{
bool synced;
SpinLockAcquire(&slot->mutex);
synced = slot->data.synced;
SpinLockRelease(&slot->mutex);
/* User-created slot with the same name exists, raise ERROR. */
if (!synced)
ereport(ERROR,
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("exiting from slot synchronization because same"
" name slot \"%s\" already exists on the standby",
remote_slot->name));
/*
* The slot has been synchronized before.
*
* It is important to acquire the slot here before checking
* invalidation. If we don't acquire the slot first, there could be a
* race condition that the local slot could be invalidated just after
* checking the 'invalidated' flag here and we could end up
* overwriting 'invalidated' flag to remote_slot's value. See
* InvalidatePossiblyObsoleteSlot() where it invalidates slot directly
* if the slot is not acquired by other processes.
*/
ReplicationSlotAcquire(remote_slot->name, true);
Assert(slot == MyReplicationSlot);
/*
* Copy the invalidation cause from remote only if local slot is not
* invalidated locally, we don't want to overwrite existing one.
*/
if (slot->data.invalidated == RS_INVAL_NONE &&
remote_slot->invalidated != RS_INVAL_NONE)
{
SpinLockAcquire(&slot->mutex);
slot->data.invalidated = remote_slot->invalidated;
SpinLockRelease(&slot->mutex);
/* Make sure the invalidated state persists across server restart */
ReplicationSlotMarkDirty();
ReplicationSlotSave();
}
/* Skip the sync of an invalidated slot */
if (slot->data.invalidated != RS_INVAL_NONE)
{
ReplicationSlotRelease();
return;
}
/* Slot not ready yet, let's attempt to make it sync-ready now. */
if (slot->data.persistency == RS_TEMPORARY)
{
update_and_persist_local_synced_slot(remote_slot, remote_dbid);
}
/* Slot ready for sync, so sync it. */
else
{
/*
* Sanity check: As long as the invalidations are handled
* appropriately as above, this should never happen.
*/
if (remote_slot->restart_lsn < slot->data.restart_lsn)
elog(ERROR,
"cannot synchronize local slot \"%s\" LSN(%X/%X)"
" to remote slot's LSN(%X/%X) as synchronization"
" would move it backwards", remote_slot->name,
LSN_FORMAT_ARGS(slot->data.restart_lsn),
LSN_FORMAT_ARGS(remote_slot->restart_lsn));
/* Make sure the slot changes persist across server restart */
if (update_local_synced_slot(remote_slot, remote_dbid))
{
ReplicationSlotMarkDirty();
ReplicationSlotSave();
}
}
}
/* Otherwise create the slot first. */
else
{
NameData plugin_name;
TransactionId xmin_horizon = InvalidTransactionId;
/* Skip creating the local slot if remote_slot is invalidated already */
if (remote_slot->invalidated != RS_INVAL_NONE)
return;
/*
* We create temporary slots instead of ephemeral slots here because
* we want the slots to survive after releasing them. This is done to
* avoid dropping and re-creating the slots in each synchronization
* cycle if the restart_lsn or catalog_xmin of the remote slot has not
* caught up.
*/
ReplicationSlotCreate(remote_slot->name, true, RS_TEMPORARY,
remote_slot->two_phase,
remote_slot->failover,
true);
/* For shorter lines. */
slot = MyReplicationSlot;
/* Avoid expensive operations while holding a spinlock. */
namestrcpy(&plugin_name, remote_slot->plugin);
SpinLockAcquire(&slot->mutex);
slot->data.database = remote_dbid;
slot->data.plugin = plugin_name;
SpinLockRelease(&slot->mutex);
reserve_wal_for_local_slot(remote_slot->restart_lsn);
LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
xmin_horizon = GetOldestSafeDecodingTransactionId(true);
SpinLockAcquire(&slot->mutex);
slot->effective_catalog_xmin = xmin_horizon;
slot->data.catalog_xmin = xmin_horizon;
SpinLockRelease(&slot->mutex);
ReplicationSlotsComputeRequiredXmin(true);
LWLockRelease(ProcArrayLock);
update_and_persist_local_synced_slot(remote_slot, remote_dbid);
}
ReplicationSlotRelease();
}
/*
* Synchronize slots.
*
* Gets the failover logical slots info from the primary server and updates
* the slots locally. Creates the slots if not present on the standby.
*/
static void
synchronize_slots(WalReceiverConn *wrconn)
{
#define SLOTSYNC_COLUMN_COUNT 9
Oid slotRow[SLOTSYNC_COLUMN_COUNT] = {TEXTOID, TEXTOID, LSNOID,
LSNOID, XIDOID, BOOLOID, BOOLOID, TEXTOID, TEXTOID};
WalRcvExecResult *res;
TupleTableSlot *tupslot;
StringInfoData s;
List *remote_slot_list = NIL;
SpinLockAcquire(&SlotSyncCtx->mutex);
if (SlotSyncCtx->syncing)
{
SpinLockRelease(&SlotSyncCtx->mutex);
ereport(ERROR,
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("cannot synchronize replication slots concurrently"));
}
SlotSyncCtx->syncing = true;
SpinLockRelease(&SlotSyncCtx->mutex);
syncing_slots = true;
initStringInfo(&s);
/* Construct query to fetch slots with failover enabled. */
appendStringInfo(&s,
"SELECT slot_name, plugin, confirmed_flush_lsn,"
" restart_lsn, catalog_xmin, two_phase, failover,"
" database, conflict_reason"
" FROM pg_catalog.pg_replication_slots"
" WHERE failover and NOT temporary");
/* Execute the query */
res = walrcv_exec(wrconn, s.data, SLOTSYNC_COLUMN_COUNT, slotRow);
pfree(s.data);
if (res->status != WALRCV_OK_TUPLES)
ereport(ERROR,
errmsg("could not fetch failover logical slots info from the primary server: %s",
res->err));
/* Construct the remote_slot tuple and synchronize each slot locally */
tupslot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
while (tuplestore_gettupleslot(res->tuplestore, true, false, tupslot))
{
bool isnull;
RemoteSlot *remote_slot = palloc0(sizeof(RemoteSlot));
Datum d;
int col = 0;
remote_slot->name = TextDatumGetCString(slot_getattr(tupslot, ++col,
&isnull));
Assert(!isnull);
remote_slot->plugin = TextDatumGetCString(slot_getattr(tupslot, ++col,
&isnull));
Assert(!isnull);
/*
* It is possible to get null values for LSN and Xmin if slot is
* invalidated on the primary server, so handle accordingly.
*/
d = slot_getattr(tupslot, ++col, &isnull);
remote_slot->confirmed_lsn = isnull ? InvalidXLogRecPtr :
DatumGetLSN(d);
d = slot_getattr(tupslot, ++col, &isnull);
remote_slot->restart_lsn = isnull ? InvalidXLogRecPtr : DatumGetLSN(d);
d = slot_getattr(tupslot, ++col, &isnull);
remote_slot->catalog_xmin = isnull ? InvalidTransactionId :
DatumGetTransactionId(d);
remote_slot->two_phase = DatumGetBool(slot_getattr(tupslot, ++col,
&isnull));
Assert(!isnull);
remote_slot->failover = DatumGetBool(slot_getattr(tupslot, ++col,
&isnull));
Assert(!isnull);
remote_slot->database = TextDatumGetCString(slot_getattr(tupslot,
++col, &isnull));
Assert(!isnull);
d = slot_getattr(tupslot, ++col, &isnull);
remote_slot->invalidated = isnull ? RS_INVAL_NONE :
GetSlotInvalidationCause(TextDatumGetCString(d));
/* Sanity check */
Assert(col == SLOTSYNC_COLUMN_COUNT);
/*
* If restart_lsn, confirmed_lsn or catalog_xmin is invalid but the
* slot is valid, that means we have fetched the remote_slot in its
* RS_EPHEMERAL state. In such a case, don't sync it; we can always
* sync it in the next sync cycle when the remote_slot is persisted
* and has valid lsn(s) and xmin values.
*
* XXX: In future, if we plan to expose 'slot->data.persistency' in
* pg_replication_slots view, then we can avoid fetching RS_EPHEMERAL
* slots in the first place.
*/
if ((XLogRecPtrIsInvalid(remote_slot->restart_lsn) ||
XLogRecPtrIsInvalid(remote_slot->confirmed_lsn) ||
!TransactionIdIsValid(remote_slot->catalog_xmin)) &&
remote_slot->invalidated == RS_INVAL_NONE)
pfree(remote_slot);
else
/* Create list of remote slots */
remote_slot_list = lappend(remote_slot_list, remote_slot);
ExecClearTuple(tupslot);
}
/* Drop local slots that no longer need to be synced. */
drop_local_obsolete_slots(remote_slot_list);
/* Now sync the slots locally */
foreach_ptr(RemoteSlot, remote_slot, remote_slot_list)
{
Oid remote_dbid = get_database_oid(remote_slot->database, false);
/*
* Use shared lock to prevent a conflict with
* ReplicationSlotsDropDBSlots(), trying to drop the same slot during
* a drop-database operation.
*/
LockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock);
synchronize_one_slot(remote_slot, remote_dbid);
UnlockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock);
}
/* We are done, free remote_slot_list elements */
list_free_deep(remote_slot_list);
walrcv_clear_result(res);
SpinLockAcquire(&SlotSyncCtx->mutex);
SlotSyncCtx->syncing = false;
SpinLockRelease(&SlotSyncCtx->mutex);
syncing_slots = false;
}
/*
* Checks the remote server info.
*
* We ensure that the 'primary_slot_name' exists on the remote server and the
* remote server is not a standby node.
*/
static void
validate_remote_info(WalReceiverConn *wrconn)
{
#define PRIMARY_INFO_OUTPUT_COL_COUNT 2
WalRcvExecResult *res;
Oid slotRow[PRIMARY_INFO_OUTPUT_COL_COUNT] = {BOOLOID, BOOLOID};
StringInfoData cmd;
bool isnull;
TupleTableSlot *tupslot;
bool remote_in_recovery;
bool primary_slot_valid;
initStringInfo(&cmd);
appendStringInfo(&cmd,
"SELECT pg_is_in_recovery(), count(*) = 1"
" FROM pg_catalog.pg_replication_slots"
" WHERE slot_type='physical' AND slot_name=%s",
quote_literal_cstr(PrimarySlotName));
res = walrcv_exec(wrconn, cmd.data, PRIMARY_INFO_OUTPUT_COL_COUNT, slotRow);
pfree(cmd.data);
if (res->status != WALRCV_OK_TUPLES)
ereport(ERROR,
errmsg("could not fetch primary_slot_name \"%s\" info from the primary server: %s",
PrimarySlotName, res->err),
errhint("Check if \"primary_slot_name\" is configured correctly."));
tupslot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
if (!tuplestore_gettupleslot(res->tuplestore, true, false, tupslot))
elog(ERROR,
"failed to fetch tuple for the primary server slot specified by \"primary_slot_name\"");
remote_in_recovery = DatumGetBool(slot_getattr(tupslot, 1, &isnull));
Assert(!isnull);
if (remote_in_recovery)
ereport(ERROR,
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot synchronize replication slots from a standby server"));
primary_slot_valid = DatumGetBool(slot_getattr(tupslot, 2, &isnull));
Assert(!isnull);
if (!primary_slot_valid)
ereport(ERROR,
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("bad configuration for slot synchronization"),
/* translator: second %s is a GUC variable name */
errdetail("The replication slot \"%s\" specified by \"%s\" does not exist on the primary server.",
PrimarySlotName, "primary_slot_name"));
ExecClearTuple(tupslot);
walrcv_clear_result(res);
}
/*
* Check all necessary GUCs for slot synchronization are set
* appropriately, otherwise, raise ERROR.
*/
void
ValidateSlotSyncParams(void)
{
char *dbname;
/*
* A physical replication slot(primary_slot_name) is required on the
* primary to ensure that the rows needed by the standby are not removed
* after restarting, so that the synchronized slot on the standby will not
* be invalidated.
*/
if (PrimarySlotName == NULL || *PrimarySlotName == '\0')
ereport(ERROR,
/* translator: %s is a GUC variable name */
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("bad configuration for slot synchronization"),
errhint("\"%s\" must be defined.", "primary_slot_name"));
/*
* hot_standby_feedback must be enabled to cooperate with the physical
* replication slot, which allows informing the primary about the xmin and
* catalog_xmin values on the standby.
*/
if (!hot_standby_feedback)
ereport(ERROR,
/* translator: %s is a GUC variable name */
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("bad configuration for slot synchronization"),
errhint("\"%s\" must be enabled.", "hot_standby_feedback"));
/* Logical slot sync/creation requires wal_level >= logical. */
if (wal_level < WAL_LEVEL_LOGICAL)
ereport(ERROR,
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("bad configuration for slot synchronization"),
errhint("\"wal_level\" must be >= logical."));
/*
* The primary_conninfo is required to make connection to primary for
* getting slots information.
*/
if (PrimaryConnInfo == NULL || *PrimaryConnInfo == '\0')
ereport(ERROR,
/* translator: %s is a GUC variable name */
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("bad configuration for slot synchronization"),
errhint("\"%s\" must be defined.", "primary_conninfo"));
/*
* The slot synchronization needs a database connection for walrcv_exec to
* work.
*/
dbname = walrcv_get_dbname_from_conninfo(PrimaryConnInfo);
if (dbname == NULL)
ereport(ERROR,
/*
* translator: 'dbname' is a specific option; %s is a GUC variable
* name
*/
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("bad configuration for slot synchronization"),
errhint("'dbname' must be specified in \"%s\".", "primary_conninfo"));
}
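/*
 * For illustration only (editorial note, not part of the patch): a standby
 * passing all of the above checks would typically have settings like the
 * following in postgresql.conf (names and values hypothetical):
 *
 *   primary_slot_name = 'sb1_slot'    # physical slot created on the primary
 *   hot_standby_feedback = on
 *   wal_level = logical
 *   primary_conninfo = 'host=primary user=repl dbname=postgres'
 */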
/*
* Is the current process syncing replication slots?
*/
bool
IsSyncingReplicationSlots(void)
{
return syncing_slots;
}
/*
* Amount of shared memory required for slot synchronization.
*/
Size
SlotSyncShmemSize(void)
{
return sizeof(SlotSyncCtxStruct);
}
/*
* Allocate and initialize the shared memory of slot synchronization.
*/
void
SlotSyncShmemInit(void)
{
bool found;
SlotSyncCtx = (SlotSyncCtxStruct *)
ShmemInitStruct("Slot Sync Data", SlotSyncShmemSize(), &found);
if (!found)
{
SlotSyncCtx->syncing = false;
SpinLockInit(&SlotSyncCtx->mutex);
}
}
/*
* Error cleanup callback for slot synchronization.
*/
static void
slotsync_failure_callback(int code, Datum arg)
{
WalReceiverConn *wrconn = (WalReceiverConn *) DatumGetPointer(arg);
if (syncing_slots)
{
/*
* If syncing_slots is true, it indicates that the process errored out
* without resetting the flag. So, we need to clean up shared memory
* and reset the flag here.
*/
SpinLockAcquire(&SlotSyncCtx->mutex);
SlotSyncCtx->syncing = false;
SpinLockRelease(&SlotSyncCtx->mutex);
syncing_slots = false;
}
walrcv_disconnect(wrconn);
}
/*
* Synchronize the failover enabled replication slots using the specified
* primary server connection.
*/
void
SyncReplicationSlots(WalReceiverConn *wrconn)
{
PG_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
{
validate_remote_info(wrconn);
synchronize_slots(wrconn);
}
PG_END_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
}

View File

@ -46,6 +46,7 @@
#include "common/string.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "replication/slotsync.h"
#include "replication/slot.h"
#include "storage/fd.h"
#include "storage/ipc.h"
@ -90,7 +91,7 @@ typedef struct ReplicationSlotOnDisk
sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize
#define SLOT_MAGIC 0x1051CA1 /* format identifier */
#define SLOT_VERSION 4 /* version for new files */
#define SLOT_VERSION 5 /* version for new files */
/* Control array for replication slot management */
ReplicationSlotCtlData *ReplicationSlotCtl = NULL;
@ -103,7 +104,6 @@ int max_replication_slots = 10; /* the maximum number of replication
* slots */
static void ReplicationSlotShmemExit(int code, Datum arg);
static void ReplicationSlotDropAcquired(void);
static void ReplicationSlotDropPtr(ReplicationSlot *slot);
/* internal persistency functions */
@ -250,11 +250,12 @@ ReplicationSlotValidateName(const char *name, int elevel)
* user will only get commit prepared.
* failover: If enabled, allows the slot to be synced to standbys so
* that logical replication can be resumed after failover.
* synced: True if the slot is synchronized from the primary server.
*/
void
ReplicationSlotCreate(const char *name, bool db_specific,
ReplicationSlotPersistency persistency,
bool two_phase, bool failover)
bool two_phase, bool failover, bool synced)
{
ReplicationSlot *slot = NULL;
int i;
@ -263,6 +264,34 @@ ReplicationSlotCreate(const char *name, bool db_specific,
ReplicationSlotValidateName(name, ERROR);
if (failover)
{
/*
* Do not allow users to create the failover enabled slots on the
* standby as we do not support sync to the cascading standby.
*
* However, failover enabled slots can be created during slot
* synchronization because we need to retain the same values as the
* remote slot.
*/
if (RecoveryInProgress() && !IsSyncingReplicationSlots())
ereport(ERROR,
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot enable failover for a replication slot created on the standby"));
/*
* Do not allow users to create failover enabled temporary slots,
* because temporary slots will not be synced to the standby.
*
* However, failover enabled temporary slots can be created during
* slot synchronization. See the comments atop slotsync.c for details.
*/
if (persistency == RS_TEMPORARY && !IsSyncingReplicationSlots())
ereport(ERROR,
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot enable failover for a temporary replication slot"));
}
/*
* If some other backend ran this code concurrently with us, we'd likely
* both allocate the same slot, and that would be bad. We'd also be at
@ -315,6 +344,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
slot->data.two_phase = two_phase;
slot->data.two_phase_at = InvalidXLogRecPtr;
slot->data.failover = failover;
slot->data.synced = synced;
/* and then data only present in shared memory */
slot->just_dirtied = false;
@ -677,6 +707,16 @@ ReplicationSlotDrop(const char *name, bool nowait)
ReplicationSlotAcquire(name, nowait);
/*
* Do not allow users to drop the slots which are currently being synced
* from the primary to the standby.
*/
if (RecoveryInProgress() && MyReplicationSlot->data.synced)
ereport(ERROR,
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("cannot drop replication slot \"%s\"", name),
errdetail("This slot is being synced from the primary server."));
ReplicationSlotDropAcquired();
}
@ -696,6 +736,38 @@ ReplicationSlotAlter(const char *name, bool failover)
errmsg("cannot use %s with a physical replication slot",
"ALTER_REPLICATION_SLOT"));
if (RecoveryInProgress())
{
/*
* Do not allow users to alter the slots which are currently being
* synced from the primary to the standby.
*/
if (MyReplicationSlot->data.synced)
ereport(ERROR,
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("cannot alter replication slot \"%s\"", name),
errdetail("This slot is being synced from the primary server."));
/*
* Do not allow users to enable failover on the standby as we do not
* support sync to the cascading standby.
*/
if (failover)
ereport(ERROR,
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot enable failover for a replication slot"
" on the standby"));
}
/*
* Do not allow users to enable failover for temporary slots as we do not
* support syncing temporary slots to the standby.
*/
if (failover && MyReplicationSlot->data.persistency == RS_TEMPORARY)
ereport(ERROR,
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot enable failover for a temporary replication slot"));
if (MyReplicationSlot->data.failover != failover)
{
SpinLockAcquire(&MyReplicationSlot->mutex);
@ -712,7 +784,7 @@ ReplicationSlotAlter(const char *name, bool failover)
/*
* Permanently drop the currently acquired replication slot.
*/
static void
void
ReplicationSlotDropAcquired(void)
{
ReplicationSlot *slot = MyReplicationSlot;
@ -868,8 +940,8 @@ ReplicationSlotMarkDirty(void)
}
/*
* Convert a slot that's marked as RS_EPHEMERAL to a RS_PERSISTENT slot,
* guaranteeing it will be there after an eventual crash.
* Convert a slot that's marked as RS_EPHEMERAL or RS_TEMPORARY to a
* RS_PERSISTENT slot, guaranteeing it will be there after an eventual crash.
*/
void
ReplicationSlotPersist(void)
@ -2189,3 +2261,25 @@ RestoreSlotFromDisk(const char *name)
(errmsg("too many replication slots active before shutdown"),
errhint("Increase max_replication_slots and try again.")));
}
/*
* Maps the pg_replication_slots.conflict_reason text value to
* ReplicationSlotInvalidationCause enum value
*/
ReplicationSlotInvalidationCause
GetSlotInvalidationCause(char *conflict_reason)
{
Assert(conflict_reason);
if (strcmp(conflict_reason, SLOT_INVAL_WAL_REMOVED_TEXT) == 0)
return RS_INVAL_WAL_REMOVED;
else if (strcmp(conflict_reason, SLOT_INVAL_HORIZON_TEXT) == 0)
return RS_INVAL_HORIZON;
else if (strcmp(conflict_reason, SLOT_INVAL_WAL_LEVEL_TEXT) == 0)
return RS_INVAL_WAL_LEVEL;
else
Assert(0);
/* Keep compiler quiet */
return RS_INVAL_NONE;
}

View File

@ -21,7 +21,9 @@
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/slot.h"
#include "replication/slotsync.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/inval.h"
#include "utils/pg_lsn.h"
#include "utils/resowner.h"
@ -43,7 +45,7 @@ create_physical_replication_slot(char *name, bool immediately_reserve,
/* acquire replication slot, this will check for conflicting names */
ReplicationSlotCreate(name, false,
temporary ? RS_TEMPORARY : RS_PERSISTENT, false,
false);
false, false);
if (immediately_reserve)
{
@ -136,7 +138,7 @@ create_logical_replication_slot(char *name, char *plugin,
*/
ReplicationSlotCreate(name, true,
temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase,
failover);
failover, false);
/*
* Create logical decoding context to find start point or, if we don't
@ -237,7 +239,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
Datum
pg_get_replication_slots(PG_FUNCTION_ARGS)
{
#define PG_GET_REPLICATION_SLOTS_COLS 16
#define PG_GET_REPLICATION_SLOTS_COLS 17
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
XLogRecPtr currlsn;
int slotno;
@ -418,21 +420,23 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
break;
case RS_INVAL_WAL_REMOVED:
values[i++] = CStringGetTextDatum("wal_removed");
values[i++] = CStringGetTextDatum(SLOT_INVAL_WAL_REMOVED_TEXT);
break;
case RS_INVAL_HORIZON:
values[i++] = CStringGetTextDatum("rows_removed");
values[i++] = CStringGetTextDatum(SLOT_INVAL_HORIZON_TEXT);
break;
case RS_INVAL_WAL_LEVEL:
values[i++] = CStringGetTextDatum("wal_level_insufficient");
values[i++] = CStringGetTextDatum(SLOT_INVAL_WAL_LEVEL_TEXT);
break;
}
}
values[i++] = BoolGetDatum(slot_contents.data.failover);
values[i++] = BoolGetDatum(slot_contents.data.synced);
Assert(i == PG_GET_REPLICATION_SLOTS_COLS);
tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
@ -700,7 +704,6 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
XLogRecPtr src_restart_lsn;
bool src_islogical;
bool temporary;
bool failover;
char *plugin;
Datum values[2];
bool nulls[2];
@ -756,7 +759,6 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
src_islogical = SlotIsLogical(&first_slot_contents);
src_restart_lsn = first_slot_contents.data.restart_lsn;
temporary = (first_slot_contents.data.persistency == RS_TEMPORARY);
failover = first_slot_contents.data.failover;
plugin = logical_slot ? NameStr(first_slot_contents.data.plugin) : NULL;
/* Check type of replication slot */
@ -791,12 +793,20 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
* We must not try to read WAL, since we haven't reserved it yet --
* hence pass find_startpoint false. confirmed_flush will be set
* below, by copying from the source slot.
*
* To avoid potential issues with the slot synchronization where the
* restart_lsn of a replication slot can go backward, we set the
* failover option to false here. This situation occurs when a slot
* on the primary server is dropped and immediately replaced with a
* new slot of the same name, created by copying from another existing
* slot. However, the slot synchronization will only observe the
* restart_lsn of the same slot going backward.
*/
create_logical_replication_slot(NameStr(*dst_name),
plugin,
temporary,
false,
failover,
false,
src_restart_lsn,
false);
}
@ -943,3 +953,49 @@ pg_copy_physical_replication_slot_b(PG_FUNCTION_ARGS)
{
return copy_replication_slot(fcinfo, false);
}
/*
* Synchronize failover enabled replication slots to a standby server
* from the primary server.
*/
Datum
pg_sync_replication_slots(PG_FUNCTION_ARGS)
{
WalReceiverConn *wrconn;
char *err;
StringInfoData app_name;
CheckSlotPermissions();
if (!RecoveryInProgress())
ereport(ERROR,
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("replication slots can only be synchronized to a standby server"));
/* Load the libpq-specific functions */
load_file("libpqwalreceiver", false);
ValidateSlotSyncParams();
initStringInfo(&app_name);
if (cluster_name[0])
appendStringInfo(&app_name, "%s_slotsync", cluster_name);
else
appendStringInfoString(&app_name, "slotsync");
/* Connect to the primary server. */
wrconn = walrcv_connect(PrimaryConnInfo, false, false, false,
app_name.data, &err);
pfree(app_name.data);
if (!wrconn)
ereport(ERROR,
errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("could not connect to the primary server: %s", err));
SyncReplicationSlots(wrconn);
walrcv_disconnect(wrconn);
PG_RETURN_VOID();
}

View File

@ -72,6 +72,7 @@
#include "postmaster/interrupt.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/slotsync.h"
#include "replication/slot.h"
#include "replication/snapbuild.h"
#include "replication/syncrep.h"
@ -243,7 +244,6 @@ static void WalSndShutdown(void) pg_attribute_noreturn();
static void XLogSendPhysical(void);
static void XLogSendLogical(void);
static void WalSndDone(WalSndSendDataCallback send_data);
static XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *tli);
static void IdentifySystem(void);
static void UploadManifest(void);
static bool HandleUploadManifestPacket(StringInfo buf, off_t *offset,
@ -1224,7 +1224,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
{
ReplicationSlotCreate(cmd->slotname, false,
cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT,
false, false);
false, false, false);
if (reserve_wal)
{
@ -1255,7 +1255,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
*/
ReplicationSlotCreate(cmd->slotname, true,
cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL,
two_phase, failover);
two_phase, failover, false);
/*
* Do options check early so that we can bail before calling the
@ -3385,14 +3385,17 @@ WalSndDone(WalSndSendDataCallback send_data)
}
/*
* Returns the latest point in WAL that has been safely flushed to disk, and
* can be sent to the standby. This should only be called when in recovery,
* ie. we're streaming to a cascaded standby.
* Returns the latest point in WAL that has been safely flushed to disk.
* This should only be called when in recovery.
*
* This is called either by the cascading walsender to find the WAL position to
* be sent to a cascaded standby, or by the slot synchronization function to
* validate the remote slot's LSN before syncing it locally.
*
* As a side-effect, *tli is updated to the TLI of the last
* replayed WAL record.
*/
static XLogRecPtr
XLogRecPtr
GetStandbyFlushRecPtr(TimeLineID *tli)
{
XLogRecPtr replayPtr;
@ -3401,6 +3404,8 @@ GetStandbyFlushRecPtr(TimeLineID *tli)
TimeLineID receiveTLI;
XLogRecPtr result;
Assert(am_cascading_walsender || IsSyncingReplicationSlots());
/*
* We can safely send what's already been replayed. Also, if walreceiver
* is streaming WAL from the same timeline, we can send anything that it

View File

@ -36,6 +36,7 @@
#include "replication/logicallauncher.h"
#include "replication/origin.h"
#include "replication/slot.h"
#include "replication/slotsync.h"
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "storage/bufmgr.h"
@ -153,6 +154,7 @@ CalculateShmemSize(int *num_semaphores)
size = add_size(size, StatsShmemSize());
size = add_size(size, WaitEventExtensionShmemSize());
size = add_size(size, InjectionPointShmemSize());
size = add_size(size, SlotSyncShmemSize());
#ifdef EXEC_BACKEND
size = add_size(size, ShmemBackendArraySize());
#endif
@ -347,6 +349,7 @@ CreateOrAttachShmemStructs(void)
WalSummarizerShmemInit();
PgArchShmemInit();
ApplyLauncherShmemInit();
SlotSyncShmemInit();
/*
* Set up other modules that need some shared memory space

View File

@ -57,6 +57,6 @@
*/
/* yyyymmddN */
#define CATALOG_VERSION_NO 202401301
#define CATALOG_VERSION_NO 202402141
#endif

View File

@ -11127,9 +11127,9 @@
proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f',
proretset => 't', provolatile => 's', prorettype => 'record',
proargtypes => '',
proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,text,bool}',
proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflict_reason,failover}',
proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,text,bool,bool}',
proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflict_reason,failover,synced}',
prosrc => 'pg_get_replication_slots' },
{ oid => '3786', descr => 'set up a logical replication slot',
proname => 'pg_create_logical_replication_slot', provolatile => 'v',
@ -11212,6 +11212,10 @@
proname => 'pg_logical_emit_message', provolatile => 'v', proparallel => 'u',
prorettype => 'pg_lsn', proargtypes => 'bool text bytea bool',
prosrc => 'pg_logical_emit_message_bytea' },
{ oid => '9929', descr => 'sync replication slots from the primary to the standby',
proname => 'pg_sync_replication_slots', provolatile => 'v', proparallel => 'u',
prorettype => 'void', proargtypes => '',
prosrc => 'pg_sync_replication_slots' },
# event triggers
{ oid => '3566', descr => 'list objects dropped by the current command',

View File

@ -52,6 +52,14 @@ typedef enum ReplicationSlotInvalidationCause
RS_INVAL_WAL_LEVEL,
} ReplicationSlotInvalidationCause;
/*
* The possible values for 'conflict_reason' returned in
* pg_get_replication_slots.
*/
#define SLOT_INVAL_WAL_REMOVED_TEXT "wal_removed"
#define SLOT_INVAL_HORIZON_TEXT "rows_removed"
#define SLOT_INVAL_WAL_LEVEL_TEXT "wal_level_insufficient"
/*
* On-Disk data of a replication slot, preserved across restarts.
*/
@ -112,6 +120,11 @@ typedef struct ReplicationSlotPersistentData
/* plugin name */
NameData plugin;
/*
* Was this slot synchronized from the primary server?
*/
char synced;
/*
* Is this a failover slot (sync candidate for standbys)? Only relevant
* for logical slots on the primary server.
@ -224,9 +237,11 @@ extern void ReplicationSlotsShmemInit(void);
/* management of individual slots */
extern void ReplicationSlotCreate(const char *name, bool db_specific,
ReplicationSlotPersistency persistency,
bool two_phase, bool failover);
bool two_phase, bool failover,
bool synced);
extern void ReplicationSlotPersist(void);
extern void ReplicationSlotDrop(const char *name, bool nowait);
extern void ReplicationSlotDropAcquired(void);
extern void ReplicationSlotAlter(const char *name, bool failover);
extern void ReplicationSlotAcquire(const char *name, bool nowait);
@ -259,5 +274,7 @@ extern void CheckPointReplicationSlots(bool is_shutdown);
extern void CheckSlotRequirements(void);
extern void CheckSlotPermissions(void);
extern ReplicationSlotInvalidationCause
GetSlotInvalidationCause(char *conflict_reason);
#endif /* SLOT_H */

View File

@ -0,0 +1,23 @@
/*-------------------------------------------------------------------------
*
* slotsync.h
* Exports for slot synchronization.
*
* Portions Copyright (c) 2016-2024, PostgreSQL Global Development Group
*
* src/include/replication/slotsync.h
*
*-------------------------------------------------------------------------
*/
#ifndef SLOTSYNC_H
#define SLOTSYNC_H
#include "replication/walreceiver.h"
extern void ValidateSlotSyncParams(void);
extern bool IsSyncingReplicationSlots(void);
extern Size SlotSyncShmemSize(void);
extern void SlotSyncShmemInit(void);
extern void SyncReplicationSlots(WalReceiverConn *wrconn);
#endif /* SLOTSYNC_H */
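
ValidateSlotSyncParams() gates synchronization on the standby's configuration.
A sketch of settings that should satisfy those checks, expressed as SQL run on
the standby (the host, user, slot name, and database are placeholders; this is
an assumption based on the requirements this commit enforces, not a verbatim
excerpt):

-- All three settings are reloadable, so no restart is needed.
ALTER SYSTEM SET primary_slot_name = 'sb1_slot';
ALTER SYSTEM SET hot_standby_feedback = on;
-- A dbname is required in primary_conninfo for slot synchronization.
ALTER SYSTEM SET primary_conninfo = 'host=primary port=5432 user=repl dbname=postgres';
SELECT pg_reload_conf();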

View File

@ -12,6 +12,8 @@
#ifndef _WALSENDER_H
#define _WALSENDER_H
#include "access/xlogdefs.h"
/*
* What to do with a snapshot in create replication slot command.
*/
@ -37,6 +39,7 @@ extern void InitWalSender(void);
extern bool exec_replication_command(const char *cmd_string);
extern void WalSndErrorCleanup(void);
extern void WalSndResourceCleanup(bool isCommit);
extern XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *tli);
extern void WalSndSignals(void);
extern Size WalSndShmemSize(void);
extern void WalSndShmemInit(void);

View File

@ -97,4 +97,241 @@ my ($result, $stdout, $stderr) = $subscriber1->psql('postgres',
ok( $stderr =~ /ERROR: cannot set failover for enabled subscription/,
"altering failover is not allowed for enabled subscription");
##################################################
# Test that pg_sync_replication_slots() cannot be executed on a non-standby server.
##################################################
($result, $stdout, $stderr) =
$publisher->psql('postgres', "SELECT pg_sync_replication_slots();");
ok( $stderr =~
/ERROR: replication slots can only be synchronized to a standby server/,
"cannot sync slots on a non-standby server");
##################################################
# Test logical failover slots on the standby
# Configure standby1 to replicate and synchronize logical slots configured
# for failover on the primary
#
# failover slot lsub1_slot ->| ----> subscriber1 (connected via logical replication)
# failover slot lsub2_slot | inactive
# primary ---> |
# physical slot sb1_slot --->| ----> standby1 (connected via streaming replication)
# | lsub1_slot, lsub2_slot (synced_slot)
##################################################
my $primary = $publisher;
my $backup_name = 'backup';
$primary->backup($backup_name);
# Create a standby
my $standby1 = PostgreSQL::Test::Cluster->new('standby1');
$standby1->init_from_backup(
$primary, $backup_name,
has_streaming => 1,
has_restoring => 1);
my $connstr_1 = $primary->connstr;
$standby1->append_conf(
'postgresql.conf', qq(
hot_standby_feedback = on
primary_slot_name = 'sb1_slot'
primary_conninfo = '$connstr_1 dbname=postgres'
));
$primary->psql('postgres',
q{SELECT pg_create_logical_replication_slot('lsub2_slot', 'test_decoding', false, false, true);}
);
$primary->psql('postgres',
q{SELECT pg_create_physical_replication_slot('sb1_slot');});
# Start the standby so that slot syncing can begin
$standby1->start;
$primary->wait_for_catchup('regress_mysub1');
# Do not allow any further advancement of the restart_lsn for lsub1_slot.
$subscriber1->safe_psql('postgres',
"ALTER SUBSCRIPTION regress_mysub1 DISABLE");
# Wait for the replication slot to become inactive on the publisher
$primary->poll_query_until(
'postgres',
"SELECT COUNT(*) FROM pg_catalog.pg_replication_slots WHERE slot_name = 'lsub1_slot' AND active = 'f'",
1);
# Wait for the standby to catch up so that the standby is not lagging behind
# the subscriber.
$primary->wait_for_replay_catchup($standby1);
# Synchronize the primary server slots to the standby.
$standby1->safe_psql('postgres', "SELECT pg_sync_replication_slots();");
# Confirm that the logical failover slots are created on the standby and are
# flagged as 'synced'
is( $standby1->safe_psql(
'postgres',
q{SELECT count(*) = 2 FROM pg_replication_slots WHERE slot_name IN ('lsub1_slot', 'lsub2_slot') AND synced;}
),
"t",
'logical slots are marked as synced on the standby');
##################################################
# Test that the synchronized slot will be dropped if the corresponding remote
# slot on the primary server has been dropped.
##################################################
$primary->psql('postgres', "SELECT pg_drop_replication_slot('lsub2_slot');");
$standby1->safe_psql('postgres', "SELECT pg_sync_replication_slots();");
is( $standby1->safe_psql(
'postgres',
q{SELECT count(*) = 0 FROM pg_replication_slots WHERE slot_name = 'lsub2_slot';}
),
"t",
'synchronized slot has been dropped');
##################################################
# Test that if the synchronized slot is invalidated while the remote slot is
# still valid, the slot will be dropped and re-created on the standby by
# executing pg_sync_replication_slots() again.
##################################################
# Configure max_slot_wal_keep_size so that the synced slot can be
# invalidated due to WAL removal.
$standby1->append_conf('postgresql.conf', 'max_slot_wal_keep_size = 64kB');
$standby1->reload;
# Generate some activity and switch the WAL file on the primary
$primary->advance_wal(1);
$primary->psql('postgres', "CHECKPOINT");
$primary->wait_for_replay_catchup($standby1);
# Request a checkpoint on the standby to trigger removal of the WAL file(s)
$standby1->safe_psql('postgres', "CHECKPOINT");
# Check that the synced slot has been invalidated
is( $standby1->safe_psql(
'postgres',
q{SELECT conflict_reason = 'wal_removed' FROM pg_replication_slots WHERE slot_name = 'lsub1_slot';}
),
"t",
'synchronized slot has been invalidated');
# Reset max_slot_wal_keep_size to avoid further WAL removal
$standby1->append_conf('postgresql.conf', 'max_slot_wal_keep_size = -1');
$standby1->reload;
# Enable the subscription to let it catch up to the latest WAL position
$subscriber1->safe_psql('postgres',
"ALTER SUBSCRIPTION regress_mysub1 ENABLE");
$primary->wait_for_catchup('regress_mysub1');
# Do not allow any further advancement of the restart_lsn for lsub1_slot.
$subscriber1->safe_psql('postgres',
"ALTER SUBSCRIPTION regress_mysub1 DISABLE");
# Wait for the replication slot to become inactive on the publisher
$primary->poll_query_until(
'postgres',
"SELECT COUNT(*) FROM pg_catalog.pg_replication_slots WHERE slot_name = 'lsub1_slot' AND active = 'f'",
1);
# Wait for the standby to catch up so that the standby is not lagging behind
# the subscriber.
$primary->wait_for_replay_catchup($standby1);
my $log_offset = -s $standby1->logfile;
# Synchronize the primary server slots to the standby.
$standby1->safe_psql('postgres', "SELECT pg_sync_replication_slots();");
# Confirm that the invalidated slot has been dropped.
$standby1->wait_for_log(qr/dropped replication slot "lsub1_slot" of dbid [0-9]+/,
$log_offset);
# Confirm that the logical slot has been re-created on the standby and is
# flagged as 'synced'
is( $standby1->safe_psql(
'postgres',
q{SELECT conflict_reason IS NULL AND synced FROM pg_replication_slots WHERE slot_name = 'lsub1_slot';}
),
"t",
'logical slot is re-synced');
##################################################
# Test that a synchronized slot cannot be decoded, altered, or dropped by the
# user
##################################################
# Attempting to perform logical decoding on a synced slot should result in an error
($result, $stdout, $stderr) = $standby1->psql('postgres',
"select * from pg_logical_slot_get_changes('lsub1_slot', NULL, NULL);");
ok( $stderr =~
/ERROR: cannot use replication slot "lsub1_slot" for logical decoding/,
"logical decoding is not allowed on synced slot");
# Attempting to alter a synced slot should result in an error
($result, $stdout, $stderr) = $standby1->psql(
'postgres',
qq[ALTER_REPLICATION_SLOT lsub1_slot (failover);],
replication => 'database');
ok($stderr =~ /ERROR: cannot alter replication slot "lsub1_slot"/,
"synced slot on standby cannot be altered");
# Attempting to drop a synced slot should result in an error
($result, $stdout, $stderr) = $standby1->psql('postgres',
"SELECT pg_drop_replication_slot('lsub1_slot');");
ok($stderr =~ /ERROR: cannot drop replication slot "lsub1_slot"/,
"synced slot on standby cannot be dropped");
##################################################
# Test that we cannot synchronize slots if dbname is not specified in the
# primary_conninfo.
##################################################
$standby1->append_conf('postgresql.conf', "primary_conninfo = '$connstr_1'");
$standby1->reload;
($result, $stdout, $stderr) =
$standby1->psql('postgres', "SELECT pg_sync_replication_slots();");
ok( $stderr =~
/HINT: 'dbname' must be specified in "primary_conninfo"/,
"cannot sync slots if dbname is not specified in primary_conninfo");
##################################################
# Test that we cannot synchronize slots to a cascading standby server.
##################################################
# Create a cascading standby
$backup_name = 'backup2';
$standby1->backup($backup_name);
my $cascading_standby = PostgreSQL::Test::Cluster->new('cascading_standby');
$cascading_standby->init_from_backup(
$standby1, $backup_name,
has_streaming => 1,
has_restoring => 1);
my $cascading_connstr = $standby1->connstr;
$cascading_standby->append_conf(
'postgresql.conf', qq(
hot_standby_feedback = on
primary_slot_name = 'cascading_sb_slot'
primary_conninfo = '$cascading_connstr dbname=postgres'
));
$standby1->psql('postgres',
q{SELECT pg_create_physical_replication_slot('cascading_sb_slot');});
$cascading_standby->start;
($result, $stdout, $stderr) =
$cascading_standby->psql('postgres', "SELECT pg_sync_replication_slots();");
ok( $stderr =~
/ERROR: cannot synchronize replication slots from a standby server/,
"cannot sync slots to a cascading standby server");
done_testing();
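
Condensed from the invalidation scenario above, the round trip can also be
checked interactively on the standby; a sketch assuming a synced slot named
'lsub1_slot' (expected values shown as comments):

-- After WAL removal on the standby, the synced slot reports the conflict.
SELECT conflict_reason FROM pg_replication_slots
 WHERE slot_name = 'lsub1_slot';          -- wal_removed

-- Re-running synchronization drops and re-creates the slot, provided it
-- is still valid on the primary.
SELECT pg_sync_replication_slots();
SELECT synced, conflict_reason FROM pg_replication_slots
 WHERE slot_name = 'lsub1_slot';          -- t, NULL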

View File

@ -1474,8 +1474,9 @@ pg_replication_slots| SELECT l.slot_name,
l.safe_wal_size,
l.two_phase,
l.conflict_reason,
l.failover
FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflict_reason, failover)
l.failover,
l.synced
FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflict_reason, failover, synced)
LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
pg_roles| SELECT pg_authid.rolname,
pg_authid.rolsuper,

View File

@ -2325,6 +2325,7 @@ RelocationBufferInfo
RelptrFreePageBtree
RelptrFreePageManager
RelptrFreePageSpanLeader
RemoteSlot
RenameStmt
ReopenPtrType
ReorderBuffer
@ -2584,6 +2585,7 @@ SlabBlock
SlabContext
SlabSlot
SlotNumber
SlotSyncCtxStruct
SlruCtl
SlruCtlData
SlruErrorCause