Morph pg_replication_slots.min_safe_lsn to safe_wal_size

The previous definition of the column was almost universally disliked,
so provide this updated definition which is more useful for monitoring
purposes: a large positive value is good, while zero or a negative value
means danger.  This should be operationally more convenient.

Backpatch to 13, where the new column to pg_replication_slots (and the
feature it represents) were added.

Author: Kyotaro Horiguchi <horikyota.ntt@gmail.com>
Author: Álvaro Herrera <alvherre@alvh.no-ip.org>
Reported-by: Fujii Masao <masao.fujii@oss.nttdata.com>
Discussion: https://postgr.es/m/9ddfbf8c-2f67-904d-44ed-cf8bc5916228@oss.nttdata.com
This commit is contained in:
Alvaro Herrera 2020-07-07 13:08:00 -04:00
parent 6a5c750f3f
commit a8aaa0c786
No known key found for this signature in database
GPG Key ID: 1C20ACB9D5C564AE
9 changed files with 62 additions and 36 deletions

View File

@ -11275,10 +11275,13 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>min_safe_lsn</structfield> <type>pg_lsn</type>
<structfield>safe_wal_size</structfield> <type>int8</type>
</para>
<para>
The minimum LSN currently available for walsenders.
The number of bytes that can be written to WAL such that this slot
is not in danger of getting in state "lost". It is NULL for lost
slots, as well as if <varname>max_slot_wal_keep_size</varname>
is <literal>-1</literal>.
</para></entry>
</row>
</tbody>

View File

@ -764,8 +764,7 @@ static ControlFileData *ControlFile = NULL;
* Convert values of GUCs measured in megabytes to equiv. segment count.
* Rounds down.
*/
#define ConvertToXSegs(x, segsize) \
((x) / ((segsize) / (1024 * 1024)))
#define ConvertToXSegs(x, segsize) XLogMBVarToSegs((x), (segsize))
/* The number of bytes in a WAL segment usable for WAL data. */
static int UsableBytesInSegment;
@ -9513,8 +9512,7 @@ GetWALAvailability(XLogRecPtr targetLSN)
XLogSegNo targetSeg; /* segid of targetLSN */
XLogSegNo oldestSeg; /* actual oldest segid */
XLogSegNo oldestSegMaxWalSize; /* oldest segid kept by max_wal_size */
XLogSegNo oldestSlotSeg = InvalidXLogRecPtr; /* oldest segid kept by
* slot */
XLogSegNo oldestSlotSeg; /* oldest segid kept by slot */
uint64 keepSegs;
/*

View File

@ -879,7 +879,7 @@ CREATE VIEW pg_replication_slots AS
L.restart_lsn,
L.confirmed_flush_lsn,
L.wal_status,
L.min_safe_lsn
L.safe_wal_size
FROM pg_get_replication_slots() AS L
LEFT JOIN pg_database D ON (L.datoid = D.oid);

View File

@ -242,6 +242,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
Tuplestorestate *tupstore;
MemoryContext per_query_ctx;
MemoryContext oldcontext;
XLogRecPtr currlsn;
int slotno;
/* check to see if caller supports us returning a tuplestore */
@ -274,6 +275,8 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
MemoryContextSwitchTo(oldcontext);
currlsn = GetXLogWriteRecPtr();
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
for (slotno = 0; slotno < max_replication_slots; slotno++)
{
@ -282,7 +285,6 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
Datum values[PG_GET_REPLICATION_SLOTS_COLS];
bool nulls[PG_GET_REPLICATION_SLOTS_COLS];
WALAvailability walstate;
XLogSegNo last_removed_seg;
int i;
if (!slot->in_use)
@ -380,6 +382,8 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
* we looked. If checkpointer signalled the process to
* termination, then it's definitely lost; but if a process is
* still alive, then "unreserved" seems more appropriate.
*
* If we do change it, save the state for safe_wal_size below.
*/
if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
{
@ -387,10 +391,12 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
SpinLockAcquire(&slot->mutex);
pid = slot->active_pid;
slot_contents.data.restart_lsn = slot->data.restart_lsn;
SpinLockRelease(&slot->mutex);
if (pid != 0)
{
values[i++] = CStringGetTextDatum("unreserved");
walstate = WALAVAIL_UNRESERVED;
break;
}
}
@ -398,18 +404,32 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
break;
}
if (max_slot_wal_keep_size_mb >= 0 &&
(walstate == WALAVAIL_RESERVED || walstate == WALAVAIL_EXTENDED) &&
((last_removed_seg = XLogGetLastRemovedSegno()) != 0))
{
XLogRecPtr min_safe_lsn;
XLogSegNoOffsetToRecPtr(last_removed_seg + 1, 0,
wal_segment_size, min_safe_lsn);
values[i++] = Int64GetDatum(min_safe_lsn);
}
else
/*
* safe_wal_size is only computed for slots that have not been lost,
* and only if there's a configured maximum size.
*/
if (walstate == WALAVAIL_REMOVED || max_slot_wal_keep_size_mb < 0)
nulls[i++] = true;
else
{
XLogSegNo targetSeg;
XLogSegNo keepSegs;
XLogSegNo failSeg;
XLogRecPtr failLSN;
XLByteToSeg(slot_contents.data.restart_lsn, targetSeg, wal_segment_size);
/* determine how many segments slots can be kept by slots ... */
keepSegs = XLogMBVarToSegs(max_slot_wal_keep_size_mb, wal_segment_size);
/* ... and override by wal_keep_segments as needed */
keepSegs = Max(keepSegs, wal_keep_segments);
/* if currpos reaches failLSN, we lose our segment */
failSeg = targetSeg + keepSegs + 1;
XLogSegNoOffsetToRecPtr(failSeg, 0, wal_segment_size, failLSN);
values[i++] = Int64GetDatum(failLSN - currlsn);
}
Assert(i == PG_GET_REPLICATION_SLOTS_COLS);

View File

@ -121,6 +121,13 @@ typedef XLogLongPageHeaderData *XLogLongPageHeader;
#define XLByteToPrevSeg(xlrp, logSegNo, wal_segsz_bytes) \
logSegNo = ((xlrp) - 1) / (wal_segsz_bytes)
/*
* Convert values of GUCs measured in megabytes to equiv. segment count.
* Rounds down.
*/
#define XLogMBVarToSegs(mbvar, wal_segsz_bytes) \
((mbvar) / ((wal_segsz_bytes) / (1024 * 1024)))
/*
* Is an XLogRecPtr within a particular XLOG segment?
*

View File

@ -53,6 +53,6 @@
*/
/* yyyymmddN */
#define CATALOG_VERSION_NO 202007061
#define CATALOG_VERSION_NO 202007071
#endif

View File

@ -10075,9 +10075,9 @@
proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f',
proretset => 't', provolatile => 's', prorettype => 'record',
proargtypes => '',
proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,pg_lsn}',
proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8}',
proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o}',
proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,min_safe_lsn}',
proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size}',
prosrc => 'pg_get_replication_slots' },
{ oid => '3786', descr => 'set up a logical replication slot',
proname => 'pg_create_logical_replication_slot', provolatile => 'v',

View File

@ -28,7 +28,7 @@ $node_master->safe_psql('postgres',
# The slot state and remain should be null before the first connection
my $result = $node_master->safe_psql('postgres',
"SELECT restart_lsn IS NULL, wal_status is NULL, min_safe_lsn is NULL FROM pg_replication_slots WHERE slot_name = 'rep1'"
"SELECT restart_lsn IS NULL, wal_status is NULL, safe_wal_size is NULL FROM pg_replication_slots WHERE slot_name = 'rep1'"
);
is($result, "t|t|t", 'check the state of non-reserved slot is "unknown"');
@ -52,9 +52,9 @@ $node_master->wait_for_catchup($node_standby, 'replay', $start_lsn);
# Stop standby
$node_standby->stop;
# Preparation done, the slot is the state "normal" now
# Preparation done, the slot is the state "reserved" now
$result = $node_master->safe_psql('postgres',
"SELECT wal_status, min_safe_lsn is NULL FROM pg_replication_slots WHERE slot_name = 'rep1'"
"SELECT wal_status, safe_wal_size IS NULL FROM pg_replication_slots WHERE slot_name = 'rep1'"
);
is($result, "reserved|t", 'check the catching-up state');
@ -64,7 +64,7 @@ $node_master->safe_psql('postgres', "CHECKPOINT;");
# The slot is always "safe" when fitting max_wal_size
$result = $node_master->safe_psql('postgres',
"SELECT wal_status, min_safe_lsn is NULL FROM pg_replication_slots WHERE slot_name = 'rep1'"
"SELECT wal_status, safe_wal_size IS NULL FROM pg_replication_slots WHERE slot_name = 'rep1'"
);
is($result, "reserved|t",
'check that it is safe if WAL fits in max_wal_size');
@ -74,7 +74,7 @@ $node_master->safe_psql('postgres', "CHECKPOINT;");
# The slot is always "safe" when max_slot_wal_keep_size is not set
$result = $node_master->safe_psql('postgres',
"SELECT wal_status, min_safe_lsn is NULL FROM pg_replication_slots WHERE slot_name = 'rep1'"
"SELECT wal_status, safe_wal_size IS NULL FROM pg_replication_slots WHERE slot_name = 'rep1'"
);
is($result, "reserved|t", 'check that slot is working');
@ -94,9 +94,7 @@ max_slot_wal_keep_size = ${max_slot_wal_keep_size_mb}MB
));
$node_master->reload;
# The slot is in safe state. The distance from the min_safe_lsn should
# be as almost (max_slot_wal_keep_size - 1) times large as the segment
# size
# The slot is in safe state.
$result = $node_master->safe_psql('postgres',
"SELECT wal_status FROM pg_replication_slots WHERE slot_name = 'rep1'");
@ -110,7 +108,7 @@ $node_master->safe_psql('postgres', "CHECKPOINT;");
$result = $node_master->safe_psql('postgres',
"SELECT wal_status FROM pg_replication_slots WHERE slot_name = 'rep1'");
is($result, "reserved",
'check that min_safe_lsn gets close to the current LSN');
'check that safe_wal_size gets close to the current LSN');
# The standby can reconnect to master
$node_standby->start;
@ -152,9 +150,9 @@ $node_master->safe_psql('postgres', "CHECKPOINT;");
# Advance WAL again without checkpoint; remain goes to 0.
advance_wal($node_master, 1);
# Slot gets into 'unreserved' state
# Slot gets into 'unreserved' state and safe_wal_size is negative
$result = $node_master->safe_psql('postgres',
"SELECT wal_status, min_safe_lsn is NULL FROM pg_replication_slots WHERE slot_name = 'rep1'"
"SELECT wal_status, safe_wal_size <= 0 FROM pg_replication_slots WHERE slot_name = 'rep1'"
);
is($result, "unreserved|t",
'check that the slot state changes to "unreserved"');
@ -186,7 +184,7 @@ ok( find_in_log(
# This slot should be broken
$result = $node_master->safe_psql('postgres',
"SELECT slot_name, active, restart_lsn IS NULL, wal_status, min_safe_lsn FROM pg_replication_slots WHERE slot_name = 'rep1'"
"SELECT slot_name, active, restart_lsn IS NULL, wal_status, safe_wal_size FROM pg_replication_slots WHERE slot_name = 'rep1'"
);
is($result, "rep1|f|t|lost|",
'check that the slot became inactive and the state "lost" persists');

View File

@ -1464,8 +1464,8 @@ pg_replication_slots| SELECT l.slot_name,
l.restart_lsn,
l.confirmed_flush_lsn,
l.wal_status,
l.min_safe_lsn
FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, min_safe_lsn)
l.safe_wal_size
FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size)
LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
pg_roles| SELECT pg_authid.rolname,
pg_authid.rolsuper,