Support invalidating replication slots due to horizon and wal_level

Needed for logical decoding on a standby. Slots need to be invalidated because
of the horizon if rows required for logical decoding are removed. If the
primary's wal_level is lowered from 'logical', logical slots on the standby
need to be invalidated.

The new invalidation methods will be used in a subsequent commit.

Logical slots that have been invalidated can be identified via the new
pg_replication_slots.conflicting column.

See 6af1793954 for an overall design of logical decoding on a standby.

Bumps catversion for the addition of the new pg_replication_slots column.

Author: "Drouvot, Bertrand" <bertranddrouvot.pg@gmail.com>
Author: Andres Freund <andres@anarazel.de>
Author: Amit Khandekar <amitdkhan.pg@gmail.com> (in an older version)
Reviewed-by: "Drouvot, Bertrand" <bertranddrouvot.pg@gmail.com>
Reviewed-by: Andres Freund <andres@anarazel.de>
Reviewed-by: Robert Haas <robertmhaas@gmail.com>
Reviewed-by: Fabrízio de Royes Mello <fabriziomello@gmail.com>
Reviewed-by: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Reviewed-by: Melanie Plageman <melanieplageman@gmail.com>
Reviewed-by: Alvaro Herrera <alvherre@alvh.no-ip.org>
Discussion: https://postgr.es/m/20230407075009.igg7be27ha2htkbt@awork3.anarazel.de
This commit is contained in:
Andres Freund 2023-04-07 22:40:27 -07:00
parent 2ed16aacf1
commit be87200efd
10 changed files with 176 additions and 37 deletions

View File

@ -2517,6 +2517,16 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
false for physical slots.
</para></entry>
</row>
<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>conflicting</structfield> <type>bool</type>
</para>
<para>
True if this logical slot conflicted with recovery (and so is now
invalidated). Always NULL for physical slots.
</para></entry>
</row>
</tbody>
</tgroup>
</table>

View File

@ -6809,7 +6809,9 @@ CreateCheckPoint(int flags)
*/
XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
KeepLogSeg(recptr, &_logSegNo);
if (InvalidateObsoleteReplicationSlots(_logSegNo))
if (InvalidateObsoleteReplicationSlots(RS_INVAL_WAL_REMOVED,
_logSegNo, InvalidOid,
InvalidTransactionId))
{
/*
* Some slots have been invalidated; recalculate the old-segment
@ -7253,7 +7255,9 @@ CreateRestartPoint(int flags)
replayPtr = GetXLogReplayRecPtr(&replayTLI);
endptr = (receivePtr < replayPtr) ? replayPtr : receivePtr;
KeepLogSeg(endptr, &_logSegNo);
if (InvalidateObsoleteReplicationSlots(_logSegNo))
if (InvalidateObsoleteReplicationSlots(RS_INVAL_WAL_REMOVED,
_logSegNo, InvalidOid,
InvalidTransactionId))
{
/*
* Some slots have been invalidated; recalculate the old-segment

View File

@ -1000,7 +1000,8 @@ CREATE VIEW pg_replication_slots AS
L.confirmed_flush_lsn,
L.wal_status,
L.safe_wal_size,
L.two_phase
L.two_phase,
L.conflicting
FROM pg_get_replication_slots() AS L
LEFT JOIN pg_database D ON (L.datoid = D.oid);

View File

@ -531,6 +531,13 @@ CreateDecodingContext(XLogRecPtr start_lsn,
NameStr(MyReplicationSlot->data.name)),
errdetail("This slot has been invalidated because it exceeded the maximum reserved size.")));
if (MyReplicationSlot->data.invalidated != RS_INVAL_NONE)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("can no longer get changes from replication slot \"%s\"",
NameStr(MyReplicationSlot->data.name)),
errdetail("This slot has been invalidated because it was conflicting with recovery.")));
Assert(MyReplicationSlot->data.invalidated == RS_INVAL_NONE);
Assert(MyReplicationSlot->data.restart_lsn != InvalidXLogRecPtr);

View File

@ -1241,8 +1241,58 @@ ReplicationSlotReserveWal(void)
}
/*
* Helper for InvalidateObsoleteReplicationSlots -- acquires the given slot
* and mark it invalid, if necessary and possible.
* Report that replication slot needs to be invalidated
*/
static void
ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
bool terminating,
int pid,
NameData slotname,
XLogRecPtr restart_lsn,
XLogRecPtr oldestLSN,
TransactionId snapshotConflictHorizon)
{
StringInfoData err_detail;
bool hint = false;
initStringInfo(&err_detail);
switch (cause)
{
case RS_INVAL_WAL_REMOVED:
hint = true;
appendStringInfo(&err_detail, _("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes."),
LSN_FORMAT_ARGS(restart_lsn),
(unsigned long long) (oldestLSN - restart_lsn));
break;
case RS_INVAL_HORIZON:
appendStringInfo(&err_detail, _("The slot conflicted with xid horizon %u."),
snapshotConflictHorizon);
break;
case RS_INVAL_WAL_LEVEL:
appendStringInfo(&err_detail, _("Logical decoding on standby requires wal_level to be at least logical on the primary server"));
break;
case RS_INVAL_NONE:
pg_unreachable();
}
ereport(LOG,
terminating ?
errmsg("terminating process %d to release replication slot \"%s\"",
pid, NameStr(slotname)) :
errmsg("invalidating obsolete replication slot \"%s\"",
NameStr(slotname)),
errdetail_internal("%s", err_detail.data),
hint ? errhint("You might need to increase max_slot_wal_keep_size.") : 0);
pfree(err_detail.data);
}
/*
* Helper for InvalidateObsoleteReplicationSlots
*
* Acquires the given slot and mark it invalid, if necessary and possible.
*
* Returns whether ReplicationSlotControlLock was released in the interim (and
* in that case we're not holding the lock at return, otherwise we are).
@ -1253,7 +1303,10 @@ ReplicationSlotReserveWal(void)
* for syscalls, so caller must restart if we return true.
*/
static bool
InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
ReplicationSlot *s,
XLogRecPtr oldestLSN,
Oid dboid, TransactionId snapshotConflictHorizon,
bool *invalidated)
{
int last_signaled_pid = 0;
@ -1264,6 +1317,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
XLogRecPtr restart_lsn;
NameData slotname;
int active_pid = 0;
ReplicationSlotInvalidationCause conflict = RS_INVAL_NONE;
Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));
@ -1286,10 +1340,44 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
restart_lsn = s->data.restart_lsn;
/*
* If the slot is already invalid or is fresh enough, we don't need to
* do anything.
* If the slot is already invalid or is a non conflicting slot, we
* don't need to do anything.
*/
if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)
if (s->data.invalidated == RS_INVAL_NONE)
{
switch (cause)
{
case RS_INVAL_WAL_REMOVED:
if (s->data.restart_lsn != InvalidXLogRecPtr &&
s->data.restart_lsn < oldestLSN)
conflict = cause;
break;
case RS_INVAL_HORIZON:
if (!SlotIsLogical(s))
break;
/* invalid DB oid signals a shared relation */
if (dboid != InvalidOid && dboid != s->data.database)
break;
if (TransactionIdIsValid(s->effective_xmin) &&
TransactionIdPrecedesOrEquals(s->effective_xmin,
snapshotConflictHorizon))
conflict = cause;
else if (TransactionIdIsValid(s->effective_catalog_xmin) &&
TransactionIdPrecedesOrEquals(s->effective_catalog_xmin,
snapshotConflictHorizon))
conflict = cause;
break;
case RS_INVAL_WAL_LEVEL:
if (SlotIsLogical(s))
conflict = cause;
break;
case RS_INVAL_NONE:
pg_unreachable();
}
}
/* if there's no conflict, we're done */
if (conflict == RS_INVAL_NONE)
{
SpinLockRelease(&s->mutex);
if (released_lock)
@ -1309,13 +1397,14 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
{
MyReplicationSlot = s;
s->active_pid = MyProcPid;
s->data.invalidated = RS_INVAL_WAL_REMOVED;
s->data.invalidated = conflict;
/*
* XXX: We should consider not overwriting restart_lsn and instead
* just rely on .invalidated.
*/
s->data.restart_lsn = InvalidXLogRecPtr;
if (conflict == RS_INVAL_WAL_REMOVED)
s->data.restart_lsn = InvalidXLogRecPtr;
/* Let caller know */
*invalidated = true;
@ -1349,13 +1438,9 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
*/
if (last_signaled_pid != active_pid)
{
ereport(LOG,
errmsg("terminating process %d to release replication slot \"%s\"",
active_pid, NameStr(slotname)),
errdetail("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
LSN_FORMAT_ARGS(restart_lsn),
(unsigned long long) (oldestLSN - restart_lsn)),
errhint("You might need to increase max_slot_wal_keep_size."));
ReportSlotInvalidation(conflict, true, active_pid,
slotname, restart_lsn,
oldestLSN, snapshotConflictHorizon);
(void) kill(active_pid, SIGTERM);
last_signaled_pid = active_pid;
@ -1390,14 +1475,11 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
ReplicationSlotMarkDirty();
ReplicationSlotSave();
ReplicationSlotRelease();
pgstat_drop_replslot(s);
ereport(LOG,
errmsg("invalidating obsolete replication slot \"%s\"",
NameStr(slotname)),
errdetail("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
LSN_FORMAT_ARGS(restart_lsn),
(unsigned long long) (oldestLSN - restart_lsn)),
errhint("You might need to increase max_slot_wal_keep_size."));
ReportSlotInvalidation(conflict, false, active_pid,
slotname, restart_lsn,
oldestLSN, snapshotConflictHorizon);
/* done with this slot for now */
break;
@ -1410,19 +1492,34 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
}
/*
* Mark any slot that points to an LSN older than the given segment
* as invalid; it requires WAL that's about to be removed.
* Invalidate slots that require resources about to be removed.
*
* Returns true when any slot have got invalidated.
*
* Whether a slot needs to be invalidated depends on the cause. A slot is
* removed if it:
* - RS_INVAL_WAL_REMOVED: requires a LSN older than the given segment
* - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given
* db; dboid may be InvalidOid for shared relations
* - RS_INVAL_WAL_LEVEL: is logical
*
* NB - this runs as part of checkpoint, so avoid raising errors if possible.
*/
bool
InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno)
InvalidateObsoleteReplicationSlots(ReplicationSlotInvalidationCause cause,
XLogSegNo oldestSegno, Oid dboid,
TransactionId snapshotConflictHorizon)
{
XLogRecPtr oldestLSN;
bool invalidated = false;
Assert(cause != RS_INVAL_HORIZON || TransactionIdIsValid(snapshotConflictHorizon));
Assert(cause != RS_INVAL_WAL_REMOVED || oldestSegno > 0);
Assert(cause != RS_INVAL_NONE);
if (max_replication_slots == 0)
return invalidated;
XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
restart:
@ -1434,7 +1531,9 @@ restart:
if (!s->in_use)
continue;
if (InvalidatePossiblyObsoleteSlot(s, oldestLSN, &invalidated))
if (InvalidatePossiblyObsoleteSlot(cause, s, oldestLSN, dboid,
snapshotConflictHorizon,
&invalidated))
{
/* if the lock was released, start from scratch */
goto restart;

View File

@ -232,7 +232,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
Datum
pg_get_replication_slots(PG_FUNCTION_ARGS)
{
#define PG_GET_REPLICATION_SLOTS_COLS 14
#define PG_GET_REPLICATION_SLOTS_COLS 15
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
XLogRecPtr currlsn;
int slotno;
@ -402,6 +402,16 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
values[i++] = BoolGetDatum(slot_contents.data.two_phase);
if (slot_contents.data.database == InvalidOid)
nulls[i++] = true;
else
{
if (slot_contents.data.invalidated != RS_INVAL_NONE)
values[i++] = BoolGetDatum(true);
else
values[i++] = BoolGetDatum(false);
}
Assert(i == PG_GET_REPLICATION_SLOTS_COLS);
tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,

View File

@ -57,6 +57,6 @@
*/
/* yyyymmddN */
#define CATALOG_VERSION_NO 202304072
#define CATALOG_VERSION_NO 202304073
#endif

View File

@ -11077,9 +11077,9 @@
proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f',
proretset => 't', provolatile => 's', prorettype => 'record',
proargtypes => '',
proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool}',
proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase}',
proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,bool}',
proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflicting}',
prosrc => 'pg_get_replication_slots' },
{ oid => '3786', descr => 'set up a logical replication slot',
proname => 'pg_create_logical_replication_slot', provolatile => 'v',

View File

@ -46,6 +46,10 @@ typedef enum ReplicationSlotInvalidationCause
RS_INVAL_NONE,
/* required WAL has been removed */
RS_INVAL_WAL_REMOVED,
/* required rows have been removed */
RS_INVAL_HORIZON,
/* wal_level insufficient for slot */
RS_INVAL_WAL_LEVEL,
} ReplicationSlotInvalidationCause;
/*
@ -226,7 +230,10 @@ extern void ReplicationSlotsComputeRequiredLSN(void);
extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);
extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
extern void ReplicationSlotsDropDBSlots(Oid dboid);
extern bool InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno);
extern bool InvalidateObsoleteReplicationSlots(ReplicationSlotInvalidationCause cause,
XLogSegNo oldestSegno,
Oid dboid,
TransactionId snapshotConflictHorizon);
extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_lock);
extern int ReplicationSlotIndex(ReplicationSlot *slot);
extern bool ReplicationSlotName(int index, Name name);

View File

@ -1472,8 +1472,9 @@ pg_replication_slots| SELECT l.slot_name,
l.confirmed_flush_lsn,
l.wal_status,
l.safe_wal_size,
l.two_phase
FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase)
l.two_phase,
l.conflicting
FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflicting)
LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
pg_roles| SELECT pg_authid.rolname,
pg_authid.rolsuper,