Allow synced slots to have their own inactive_since.

This commit does two things:
1) Maintains inactive_since for synced slots whenever the slot is released,
just as for any other regular slot.

2) Ensures the value is set to the current timestamp during the promotion
of a standby, so that the time can be interpreted correctly after promotion.
We don't want the slots to appear inactive for a long time after promotion if
they haven't been synchronized recently. This also avoids invalidating such
slots immediately after promotion should a future feature invalidate slots
based on their inactivity time. Whoever acquires the slot, i.e. makes the
slot active, will reset it to NULL.

Author: Bharath Rupireddy
Reviewed-by: Bertrand Drouvot, Amit Kapila, Shveta Malik, Masahiko Sawada
Discussion: https://postgr.es/m/CAA4eK1KrPGwfZV9LYGidjxHeW+rxJ=E2ThjXvwRGLO=iLNuo=Q@mail.gmail.com
Discussion: https://postgr.es/m/CALj2ACW4aUe-_uFQOjdWCEN-xXoLGhmvRFnL8SNw_TZ5nJe+aw@mail.gmail.com
Discussion: https://postgr.es/m/CA+Tgmob_Ta-t2ty8QrKHBGnNLrf4ZYcwhGHGFsuUoFrAEDw4sA@mail.gmail.com
This commit is contained in:
Amit Kapila 2024-04-05 09:48:49 +05:30
parent f98dbdeb51
commit 6f132ed693
6 changed files with 160 additions and 39 deletions

View File

@ -2530,6 +2530,13 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
<para>
The time since the slot has become inactive.
<literal>NULL</literal> if the slot is currently being used.
Note that for slots on the standby that are being synced from a
primary server (whose <structfield>synced</structfield> field is
<literal>true</literal>), the
<structfield>inactive_since</structfield> indicates the time of the
last synchronization (see
<xref linkend="logicaldecoding-replication-slots-synchronization"/>).
</para></entry>
</row>

View File

@ -150,6 +150,7 @@ typedef struct RemoteSlot
} RemoteSlot;
static void slotsync_failure_callback(int code, Datum arg);
static void update_synced_slots_inactive_since(void);
/*
* If necessary, update the local synced slot's metadata based on the data
@ -584,6 +585,11 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
* overwriting 'invalidated' flag to remote_slot's value. See
* InvalidatePossiblyObsoleteSlot() where it invalidates slot directly
* if the slot is not acquired by other processes.
*
* XXX: If it ever turns out that slot acquire/release is costly for
* cases when none of the slot properties is changed then we can do a
* pre-check to ensure that at least one of the slot properties is
* changed before acquiring the slot.
*/
ReplicationSlotAcquire(remote_slot->name, true);
@ -1355,6 +1361,54 @@ ReplSlotSyncWorkerMain(char *startup_data, size_t startup_data_len)
Assert(false);
}
/*
 * Stamp every synced slot with a fresh inactive_since value.
 *
 * At present this is invoked only from the paths that shut down the slot
 * sync machinery.
 */
static void
update_synced_slots_inactive_since(void)
{
	TimestampTz stamp = 0;
	int			slotno;

	/*
	 * Nothing to do unless a standby is being promoted.  On promotion
	 * without a restart, inactive_since must be refreshed so that it can be
	 * interpreted correctly afterwards: slots that have not been synchronized
	 * recently should not look as if they had been inactive for a long time.
	 * Whoever acquires the slot, i.e. makes the slot active, will reset it.
	 */
	if (!StandbyMode)
		return;

	/* By this point the slot sync worker must no longer be running */
	Assert(SlotSyncCtx->pid == InvalidPid);

	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);

	for (slotno = 0; slotno < max_replication_slots; slotno++)
	{
		ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno];

		/* Skip anything that is not a synchronized slot */
		if (!slot->in_use || !slot->data.synced)
			continue;

		Assert(SlotIsLogical(slot));

		/* Read the clock once so that all slots share one inactive_since */
		if (stamp == 0)
			stamp = GetCurrentTimestamp();

		SpinLockAcquire(&slot->mutex);
		slot->inactive_since = stamp;
		SpinLockRelease(&slot->mutex);
	}

	LWLockRelease(ReplicationSlotControlLock);
}
/*
* Shut down the slot sync worker.
*/
@ -1368,6 +1422,7 @@ ShutDownSlotSync(void)
if (SlotSyncCtx->pid == InvalidPid)
{
SpinLockRelease(&SlotSyncCtx->mutex);
update_synced_slots_inactive_since();
return;
}
SpinLockRelease(&SlotSyncCtx->mutex);
@ -1400,6 +1455,8 @@ ShutDownSlotSync(void)
}
SpinLockRelease(&SlotSyncCtx->mutex);
update_synced_slots_inactive_since();
}
/*

View File

@ -690,13 +690,10 @@ ReplicationSlotRelease(void)
}
/*
* Set the last inactive time after marking the slot inactive. We don't
* set it for the slots currently being synced from the primary to the
* standby because such slots are typically inactive as decoding is not
* allowed on those.
* Set the time since the slot has become inactive. We get the current
* time beforehand to avoid system call while holding the spinlock.
*/
if (!(RecoveryInProgress() && slot->data.synced))
now = GetCurrentTimestamp();
now = GetCurrentTimestamp();
if (slot->data.persistency == RS_PERSISTENT)
{
@ -2369,16 +2366,11 @@ RestoreSlotFromDisk(const char *name)
slot->active_pid = 0;
/*
* We set the last inactive time after loading the slot from the disk
* into memory. Whoever acquires the slot i.e. makes the slot active
* will reset it. We don't set it for the slots currently being synced
* from the primary to the standby because such slots are typically
* inactive as decoding is not allowed on those.
* Set the time since the slot has become inactive after loading the
* slot from the disk into memory. Whoever acquires the slot i.e.
* makes the slot active will reset it.
*/
if (!(RecoveryInProgress() && slot->data.synced))
slot->inactive_since = GetCurrentTimestamp();
else
slot->inactive_since = 0;
slot->inactive_since = GetCurrentTimestamp();
restored = true;
break;

View File

@ -3276,6 +3276,37 @@ sub create_logical_slot_on_standby
=pod

=item $node->validate_slot_inactive_since(slot_name, reference_time)

Validate inactive_since value of a given replication slot against the reference
time and return it.

The value is read only from rows of pg_replication_slots where inactive_since
IS NOT NULL.  Dies if no such value is found, or if the value is not later
than both the epoch and reference_time.

=cut

sub validate_slot_inactive_since
{
	my ($self, $slot_name, $reference_time) = @_;
	my $name = $self->name;

	my $inactive_since = $self->safe_psql('postgres',
		qq(SELECT inactive_since FROM pg_replication_slots
			WHERE slot_name = '$slot_name' AND inactive_since IS NOT NULL;)
	);

	# The query yields nothing when the slot does not exist or its
	# inactive_since is NULL.  Fail here with a clear message instead of
	# interpolating an empty string into the timestamptz casts below.
	if (!defined $inactive_since || $inactive_since eq '')
	{
		die
		  "no inactive_since found for slot $slot_name on node $name";
	}

	# Check that the inactive_since is sane
	is( $self->safe_psql(
			'postgres',
			qq[SELECT '$inactive_since'::timestamptz > to_timestamp(0) AND
				'$inactive_since'::timestamptz > '$reference_time'::timestamptz;]
		),
		't',
		"last inactive time for slot $slot_name is valid on node $name")
	  or die "could not validate captured inactive_since for slot $slot_name";

	return $inactive_since;
}
=pod
=item $node->advance_wal(num)
Advance WAL of node by given number of segments.

View File

@ -443,7 +443,7 @@ $primary4->safe_psql(
# Get inactive_since value after the slot's creation. Note that the slot is
# still inactive till it's used by the standby below.
my $inactive_since =
capture_and_validate_slot_inactive_since($primary4, $sb4_slot, $slot_creation_time);
$primary4->validate_slot_inactive_since($sb4_slot, $slot_creation_time);
$standby4->start;
@ -502,7 +502,7 @@ $publisher4->safe_psql('postgres',
# Get inactive_since value after the slot's creation. Note that the slot is
# still inactive till it's used by the subscriber below.
$inactive_since =
capture_and_validate_slot_inactive_since($publisher4, $lsub4_slot, $slot_creation_time);
$publisher4->validate_slot_inactive_since($lsub4_slot, $slot_creation_time);
$subscriber4->start;
$subscriber4->safe_psql('postgres',
@ -540,26 +540,4 @@ is( $publisher4->safe_psql(
$publisher4->stop;
$subscriber4->stop;
# Capture and validate inactive_since of a given slot.
#
# Reads inactive_since for $slot_name from pg_replication_slots on $node
# (only a non-NULL value is selected), checks with a second query that the
# value is later than the epoch and not earlier than $slot_creation_time, and
# returns the captured value.  A failed check is only reported through is();
# the captured value is returned regardless.
sub capture_and_validate_slot_inactive_since
{
my ($node, $slot_name, $slot_creation_time) = @_;
# Fetch the slot's current inactive_since as text.
my $inactive_since = $node->safe_psql('postgres',
qq(SELECT inactive_since FROM pg_replication_slots
WHERE slot_name = '$slot_name' AND inactive_since IS NOT NULL;)
);
# Check that the captured time is sane
is( $node->safe_psql(
'postgres',
qq[SELECT '$inactive_since'::timestamptz > to_timestamp(0) AND
'$inactive_since'::timestamptz >= '$slot_creation_time'::timestamptz;]
),
't',
"last inactive time for an active slot $slot_name is sane");
return $inactive_since;
}
done_testing();

View File

@ -35,6 +35,13 @@ my $subscriber1 = PostgreSQL::Test::Cluster->new('subscriber1');
$subscriber1->init;
$subscriber1->start;
# Capture the time before the logical failover slot is created on the
# primary. Note that this publisher is referred to as the primary later on.
my $slot_creation_time_on_primary = $publisher->safe_psql(
'postgres', qq[
SELECT current_timestamp;
]);
# Create a slot on the publisher with failover disabled
$publisher->safe_psql('postgres',
"SELECT 'init' FROM pg_create_logical_replication_slot('lsub1_slot', 'pgoutput', false, false, false);"
@ -174,6 +181,11 @@ $primary->poll_query_until(
"SELECT COUNT(*) FROM pg_catalog.pg_replication_slots WHERE slot_name = 'lsub1_slot' AND active = 'f'",
1);
# Capture the inactive_since of the slot from the primary. Note that the slot
# will be inactive since the corresponding subscription is disabled.
my $inactive_since_on_primary =
$primary->validate_slot_inactive_since('lsub1_slot', $slot_creation_time_on_primary);
# Wait for the standby to catch up so that the standby is not lagging behind
# the subscriber.
$primary->wait_for_replay_catchup($standby1);
@ -190,6 +202,18 @@ is( $standby1->safe_psql(
"t",
'logical slots have synced as true on standby');
# Capture the inactive_since of the synced slot on the standby
my $inactive_since_on_standby =
$standby1->validate_slot_inactive_since('lsub1_slot', $slot_creation_time_on_primary);
# Synced slot on the standby must get its own inactive_since
is( $standby1->safe_psql(
'postgres',
"SELECT '$inactive_since_on_primary'::timestamptz < '$inactive_since_on_standby'::timestamptz;"
),
"t",
'synchronized slot has got its own inactive_since');
##################################################
# Test that the synchronized slot will be dropped if the corresponding remote
# slot on the primary server has been dropped.
@ -237,6 +261,13 @@ is( $standby1->safe_psql(
$standby1->append_conf('postgresql.conf', 'max_slot_wal_keep_size = -1');
$standby1->reload;
# Capture the time before the logical failover slot is created on the primary.
# Note that the subscription creates the slot again on the primary.
$slot_creation_time_on_primary = $publisher->safe_psql(
'postgres', qq[
SELECT current_timestamp;
]);
# To ensure that restart_lsn has moved to a recent WAL position, we re-create
# the subscription and the logical slot.
$subscriber1->safe_psql(
@ -257,6 +288,11 @@ $primary->poll_query_until(
"SELECT COUNT(*) FROM pg_catalog.pg_replication_slots WHERE slot_name = 'lsub1_slot' AND active = 'f'",
1);
# Capture the inactive_since of the slot from the primary. Note that the slot
# will be inactive since the corresponding subscription is disabled.
$inactive_since_on_primary =
$primary->validate_slot_inactive_since('lsub1_slot', $slot_creation_time_on_primary);
# Wait for the standby to catch up so that the standby is not lagging behind
# the subscriber.
$primary->wait_for_replay_catchup($standby1);
@ -808,8 +844,28 @@ $primary->reload;
$standby1->start;
$primary->wait_for_replay_catchup($standby1);
# Capture the time before the standby is promoted
my $promotion_time_on_primary = $standby1->safe_psql(
'postgres', qq[
SELECT current_timestamp;
]);
$standby1->promote;
# Capture the inactive_since of the synced slot after the promotion.
# The expectation here is that the slot gets its inactive_since as part of the
# promotion. We do this check before the slot is enabled on the new primary
# below; otherwise the slot becomes active, which resets inactive_since to NULL.
my $inactive_since_on_new_primary =
$standby1->validate_slot_inactive_since('lsub1_slot', $promotion_time_on_primary);
is( $standby1->safe_psql(
'postgres',
"SELECT '$inactive_since_on_new_primary'::timestamptz > '$inactive_since_on_primary'::timestamptz"
),
"t",
'synchronized slot has got its own inactive_since on the new primary after promotion');
# Update subscription with the new primary's connection info
my $standby1_conninfo = $standby1->connstr . ' dbname=postgres';
$subscriber1->safe_psql('postgres',