Add XLogCtl->logInsertResult

This tracks the position of WAL that's been fully copied into WAL
buffers by all processes emitting WAL.  (For some reason we call that
"WAL insertion").  This is updated using atomic monotonic advance during
WaitXLogInsertionsToFinish, which is not when the insertions actually
occur, but it's the only place where we know that all the insertions
have completed.

This value is useful in WALReadFromBuffers, which can verify that
callers don't try to read past what has been inserted.  (However, more
infrastructure is needed in order to actually use WAL after the flush
point, since it could be lost.)
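
For illustration, a hypothetical caller could use this check by serving as
much as possible from WAL buffers and falling back to the segment files for
the rest; the fallback via WALRead() and WALReadRaiseError() is assumed for
this sketch, not part of this commit:

    /* Hypothetical caller: serve as much as possible from WAL buffers. */
    Size        nread = WALReadFromBuffers(dstbuf, startptr, count, tli);

    if (nread < count)
    {
        WALReadError errinfo;

        /* The remainder is no longer in buffers; read it from WAL files. */
        if (!WALRead(state, dstbuf + nread, startptr + nread,
                     count - nread, tli, &errinfo))
            WALReadRaiseError(&errinfo);
    }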

The value is also useful in WaitXLogInsertionsToFinish() itself, since
we can now exit quickly when all WAL has already been inserted, without
even having to take any locks.
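
Condensed, the pattern this adds to WaitXLogInsertionsToFinish() boils down
to the following (distilled from the diff below, not the complete function):

    /* Fast path: everything up to 'upto' is already in buffers; no locks. */
    inserted = pg_atomic_read_membarrier_u64(&XLogCtl->logInsertResult);
    if (upto <= inserted)
        return inserted;

    /* ... slow path: wait for in-progress insertions, compute finishedUpto ... */

    /* Publish the new limit; the shared value never moves backward. */
    finishedUpto = pg_atomic_monotonic_advance_u64(&XLogCtl->logInsertResult,
                                                   finishedUpto);
    return finishedUpto;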
Alvaro Herrera 2024-04-07 14:06:30 +02:00
parent 29f6a959cf
commit f3ff7bf83b
2 changed files with 75 additions and 1 deletion

src/backend/access/transam/xlog.c

@@ -469,6 +469,7 @@ typedef struct XLogCtlData
XLogRecPtr lastSegSwitchLSN;
/* These are accessed using atomics -- info_lck not needed */
pg_atomic_uint64 logInsertResult; /* last byte + 1 inserted to buffers */
pg_atomic_uint64 logWriteResult; /* last byte + 1 written out */
pg_atomic_uint64 logFlushResult; /* last byte + 1 flushed */
@@ -1499,6 +1500,7 @@ static XLogRecPtr
WaitXLogInsertionsToFinish(XLogRecPtr upto)
{
uint64 bytepos;
XLogRecPtr inserted;
XLogRecPtr reservedUpto;
XLogRecPtr finishedUpto;
XLogCtlInsert *Insert = &XLogCtl->Insert;
@@ -1507,6 +1509,14 @@ WaitXLogInsertionsToFinish(XLogRecPtr upto)
if (MyProc == NULL)
elog(PANIC, "cannot wait without a PGPROC structure");

/*
* Check if there's any work to do. Use a barrier to ensure we get the
* freshest value.
*/
inserted = pg_atomic_read_membarrier_u64(&XLogCtl->logInsertResult);
if (upto <= inserted)
return inserted;

/* Read the current insert position */
SpinLockAcquire(&Insert->insertpos_lck);
bytepos = Insert->CurrBytePos;
@@ -1586,6 +1596,15 @@ WaitXLogInsertionsToFinish(XLogRecPtr upto)
if (insertingat != InvalidXLogRecPtr && insertingat < finishedUpto)
finishedUpto = insertingat;
}

/*
* Advance the limit we know to have been inserted and return the freshest
* value we know of, which might be beyond what we requested if somebody
* is concurrently doing this with an 'upto' pointer ahead of us.
*/
finishedUpto = pg_atomic_monotonic_advance_u64(&XLogCtl->logInsertResult,
finishedUpto);

return finishedUpto;
}
@@ -1727,13 +1746,24 @@ WALReadFromBuffers(char *dstbuf, XLogRecPtr startptr, Size count,
{
char *pdst = dstbuf;
XLogRecPtr recptr = startptr;
XLogRecPtr inserted;
Size nbytes = count;

if (RecoveryInProgress() || tli != GetWALInsertionTimeLine())
return 0;

Assert(!XLogRecPtrIsInvalid(startptr));
Assert(startptr + count <= LogwrtResult.Write);

/*
* Caller should ensure that the requested data has been inserted into WAL
* buffers before we try to read it.
*/
inserted = pg_atomic_read_u64(&XLogCtl->logInsertResult);
if (startptr + count > inserted)
ereport(ERROR,
errmsg("cannot read past end of generated WAL: requested %X/%X, current position %X/%X",
LSN_FORMAT_ARGS(startptr + count),
LSN_FORMAT_ARGS(inserted)));

/*
* Loop through the buffers without a lock. For each buffer, atomically
@@ -2571,13 +2601,19 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
{
XLogRecPtr Flush;
XLogRecPtr Write;
XLogRecPtr Insert;
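
/*
 * Read the three results in reverse order of advancement (Flush lags
 * Write, which lags Insert), with read barriers in between, so that
 * concurrent updates cannot break the orderings asserted below.
 */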
Flush = pg_atomic_read_u64(&XLogCtl->logFlushResult);
pg_read_barrier();
Write = pg_atomic_read_u64(&XLogCtl->logWriteResult);
pg_read_barrier();
Insert = pg_atomic_read_u64(&XLogCtl->logInsertResult);

/* WAL written to disk is always ahead of WAL flushed */
Assert(Write >= Flush);

/* WAL inserted to buffers is always ahead of WAL written */
Assert(Insert >= Write);
}
#endif
}
@@ -4951,6 +4987,7 @@ XLOGShmemInit(void)
SpinLockInit(&XLogCtl->Insert.insertpos_lck);
SpinLockInit(&XLogCtl->info_lck);
pg_atomic_init_u64(&XLogCtl->logInsertResult, InvalidXLogRecPtr);
pg_atomic_init_u64(&XLogCtl->logWriteResult, InvalidXLogRecPtr);
pg_atomic_init_u64(&XLogCtl->logFlushResult, InvalidXLogRecPtr);
pg_atomic_init_u64(&XLogCtl->unloggedLSN, InvalidXLogRecPtr);
@@ -5979,6 +6016,7 @@ StartupXLOG(void)
* because no other process can be reading or writing WAL yet.
*/
LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
pg_atomic_write_u64(&XLogCtl->logInsertResult, EndOfLog);
pg_atomic_write_u64(&XLogCtl->logWriteResult, EndOfLog);
pg_atomic_write_u64(&XLogCtl->logFlushResult, EndOfLog);
XLogCtl->LogwrtRqst.Write = EndOfLog;

src/include/port/atomics.h

@@ -570,6 +570,42 @@ pg_atomic_sub_fetch_u64(volatile pg_atomic_uint64 *ptr, int64 sub_)
return pg_atomic_sub_fetch_u64_impl(ptr, sub_);
}

/*
* Monotonically advance the given variable using only atomic operations until
* it's at least the target value. Returns the latest value observed, which
* may or may not be the target value.
*
* Full barrier semantics (even when value is unchanged).
*/
static inline uint64
pg_atomic_monotonic_advance_u64(volatile pg_atomic_uint64 *ptr, uint64 target_)
{
uint64 currval;

#ifndef PG_HAVE_ATOMIC_U64_SIMULATION
AssertPointerAlignment(ptr, 8);
#endif

currval = pg_atomic_read_u64_impl(ptr);
if (currval >= target_)
{
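/* Preserve the documented full-barrier semantics even when not advancing. */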
pg_memory_barrier();
return currval;
}

#ifndef PG_HAVE_ATOMIC_U64_SIMULATION
AssertPointerAlignment(&currval, 8);
#endif

while (currval < target_)
{
if (pg_atomic_compare_exchange_u64_impl(ptr, &currval, target_))
break;
}

return Max(target_, currval);
}

#undef INSIDE_ATOMICS_H
#endif /* ATOMICS_H */
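
To see the new primitive's semantics in isolation, here is a minimal
stand-alone analogue using C11 atomics (names and test values are invented
for illustration; C11's default seq_cst ordering stands in for the
full-barrier guarantee documented above):

    #include <stdatomic.h>
    #include <stdint.h>
    #include <stdio.h>

    /* Advance *ptr to at least 'target'; return the freshest value observed. */
    static uint64_t
    monotonic_advance(_Atomic uint64_t *ptr, uint64_t target)
    {
        uint64_t    cur = atomic_load(ptr);

        while (cur < target)
        {
            /* On failure, 'cur' is refreshed with the value currently stored. */
            if (atomic_compare_exchange_weak(ptr, &cur, target))
                break;
        }
        return cur > target ? cur : target;
    }

    int
    main(void)
    {
        static _Atomic uint64_t mark;

        atomic_store(&mark, 100);
        printf("%llu\n", (unsigned long long) monotonic_advance(&mark, 150)); /* 150 */
        printf("%llu\n", (unsigned long long) monotonic_advance(&mark, 120)); /* still 150 */
        return 0;
    }

As in the PostgreSQL version, a failed compare-and-exchange reloads the
expected value, so the loop exits as soon as the stored value is at least
the target.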