Split XLogCtl->LogwrtResult into separate struct members

After this change we have XLogCtl->logWriteResult and ->logFlushResult.
There's no functional change, other than the fact that the assignment
from shared memory to local is no longer done via struct assignment, but
instead using a macro that copies each member separately.

The current representation is inconvenient going forward; notably, we
would like to add a new member "Copy" (to keep track of the last
position copied into WAL buffers), so the symmetry between the values in
shared memory vs. those in local would be lost.

This also gives us freedom to later change the concurrency model for the
values in shared memory: we can make them use atomics instead of relying
on the info_lck spinlock.

Reviewed-by: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com>
Discussion: https://postgr.es/m/202404031119.cd2kugjk2vho@alvherre.pgsql
This commit is contained in:
Alvaro Herrera 2024-04-03 19:55:11 +02:00
parent deb1486c7d
commit c9920a9068
No known key found for this signature in database
GPG Key ID: 1C20ACB9D5C564AE
1 changed files with 35 additions and 24 deletions

View File

@ -291,16 +291,15 @@ static bool doPageWrites;
*
* LogwrtRqst indicates a byte position that we need to write and/or fsync
* the log up to (all records before that point must be written or fsynced).
* LogwrtResult indicates the byte positions we have already written/fsynced.
* These structs are identical but are declared separately to indicate their
* slightly different functions.
* The positions already written/fsynced are maintained in logWriteResult
* and logFlushResult.
*
* To read XLogCtl->LogwrtResult, you must hold either info_lck or
* WALWriteLock. To update it, you need to hold both locks. The point of
* this arrangement is that the value can be examined by code that already
* holds WALWriteLock without needing to grab info_lck as well. In addition
* to the shared variable, each backend has a private copy of LogwrtResult,
* which is updated when convenient.
* To read XLogCtl->logWriteResult or ->logFlushResult, you must hold either
* info_lck or WALWriteLock. To update them, you need to hold both locks.
* The point of this arrangement is that the value can be examined by code
* that already holds WALWriteLock without needing to grab info_lck as well.
* In addition to the shared variable, each backend has a private copy of
* both in LogwrtResult, which is updated when convenient.
*
* The request bookkeeping is simpler: there is a shared XLogCtl->LogwrtRqst
* (protected by info_lck), but we don't need to cache any copies of it.
@ -478,7 +477,8 @@ typedef struct XLogCtlData
* Protected by info_lck and WALWriteLock (you must hold either lock to
* read it, but both to update)
*/
XLogwrtResult LogwrtResult;
XLogRecPtr logWriteResult; /* last byte + 1 written out */
XLogRecPtr logFlushResult; /* last byte + 1 flushed */
/*
* Latest initialized page in the cache (last byte position + 1).
@ -614,6 +614,15 @@ static int UsableBytesInSegment;
*/
static XLogwrtResult LogwrtResult = {0, 0};
/*
* Update local copy of shared XLogCtl->log{Write,Flush}Result
*/
#define RefreshXLogWriteResult(_target) \
do { \
_target.Write = XLogCtl->logWriteResult; \
_target.Flush = XLogCtl->logFlushResult; \
} while (0)
/*
* openLogFile is -1 or a kernel FD for an open log file segment.
* openLogSegNo identifies the segment, and openLogTLI the corresponding TLI.
@ -960,7 +969,7 @@ XLogInsertRecord(XLogRecData *rdata,
if (XLogCtl->LogwrtRqst.Write < EndPos)
XLogCtl->LogwrtRqst.Write = EndPos;
/* update local result copy while I have the chance */
LogwrtResult = XLogCtl->LogwrtResult;
RefreshXLogWriteResult(LogwrtResult);
SpinLockRelease(&XLogCtl->info_lck);
}
@ -1984,7 +1993,7 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
SpinLockAcquire(&XLogCtl->info_lck);
if (XLogCtl->LogwrtRqst.Write < OldPageRqstPtr)
XLogCtl->LogwrtRqst.Write = OldPageRqstPtr;
LogwrtResult = XLogCtl->LogwrtResult;
RefreshXLogWriteResult(LogwrtResult);
SpinLockRelease(&XLogCtl->info_lck);
/*
@ -2005,7 +2014,7 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
LogwrtResult = XLogCtl->LogwrtResult;
RefreshXLogWriteResult(LogwrtResult);
if (LogwrtResult.Write >= OldPageRqstPtr)
{
/* OK, someone wrote it already */
@ -2289,7 +2298,7 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
/*
* Update local LogwrtResult (caller probably did this already, but...)
*/
LogwrtResult = XLogCtl->LogwrtResult;
RefreshXLogWriteResult(LogwrtResult);
/*
* Since successive pages in the xlog cache are consecutively allocated,
@ -2549,7 +2558,8 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
*/
{
SpinLockAcquire(&XLogCtl->info_lck);
XLogCtl->LogwrtResult = LogwrtResult;
XLogCtl->logWriteResult = LogwrtResult.Write;
XLogCtl->logFlushResult = LogwrtResult.Flush;
if (XLogCtl->LogwrtRqst.Write < LogwrtResult.Write)
XLogCtl->LogwrtRqst.Write = LogwrtResult.Write;
if (XLogCtl->LogwrtRqst.Flush < LogwrtResult.Flush)
@ -2572,7 +2582,7 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN)
XLogRecPtr prevAsyncXactLSN;
SpinLockAcquire(&XLogCtl->info_lck);
LogwrtResult = XLogCtl->LogwrtResult;
RefreshXLogWriteResult(LogwrtResult);
sleeping = XLogCtl->WalWriterSleeping;
prevAsyncXactLSN = XLogCtl->asyncXactLSN;
if (XLogCtl->asyncXactLSN < asyncXactLSN)
@ -2784,7 +2794,7 @@ XLogFlush(XLogRecPtr record)
SpinLockAcquire(&XLogCtl->info_lck);
if (WriteRqstPtr < XLogCtl->LogwrtRqst.Write)
WriteRqstPtr = XLogCtl->LogwrtRqst.Write;
LogwrtResult = XLogCtl->LogwrtResult;
RefreshXLogWriteResult(LogwrtResult);
SpinLockRelease(&XLogCtl->info_lck);
/* done already? */
@ -2815,7 +2825,7 @@ XLogFlush(XLogRecPtr record)
}
/* Got the lock; recheck whether request is satisfied */
LogwrtResult = XLogCtl->LogwrtResult;
RefreshXLogWriteResult(LogwrtResult);
if (record <= LogwrtResult.Flush)
{
LWLockRelease(WALWriteLock);
@ -2939,7 +2949,7 @@ XLogBackgroundFlush(void)
/* read LogwrtResult and update local state */
SpinLockAcquire(&XLogCtl->info_lck);
LogwrtResult = XLogCtl->LogwrtResult;
RefreshXLogWriteResult(LogwrtResult);
WriteRqst = XLogCtl->LogwrtRqst;
SpinLockRelease(&XLogCtl->info_lck);
@ -3027,7 +3037,7 @@ XLogBackgroundFlush(void)
/* now wait for any in-progress insertions to finish and get write lock */
WaitXLogInsertionsToFinish(WriteRqst.Write);
LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
LogwrtResult = XLogCtl->LogwrtResult;
RefreshXLogWriteResult(LogwrtResult);
if (WriteRqst.Write > LogwrtResult.Write ||
WriteRqst.Flush > LogwrtResult.Flush)
{
@ -3116,7 +3126,7 @@ XLogNeedsFlush(XLogRecPtr record)
/* read LogwrtResult and update local state */
SpinLockAcquire(&XLogCtl->info_lck);
LogwrtResult = XLogCtl->LogwrtResult;
RefreshXLogWriteResult(LogwrtResult);
SpinLockRelease(&XLogCtl->info_lck);
/* check again */
@ -5953,7 +5963,8 @@ StartupXLOG(void)
LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
XLogCtl->LogwrtResult = LogwrtResult;
XLogCtl->logWriteResult = LogwrtResult.Write;
XLogCtl->logFlushResult = LogwrtResult.Flush;
XLogCtl->LogwrtRqst.Write = EndOfLog;
XLogCtl->LogwrtRqst.Flush = EndOfLog;
@ -6400,7 +6411,7 @@ GetFlushRecPtr(TimeLineID *insertTLI)
Assert(XLogCtl->SharedRecoveryState == RECOVERY_STATE_DONE);
SpinLockAcquire(&XLogCtl->info_lck);
LogwrtResult = XLogCtl->LogwrtResult;
RefreshXLogWriteResult(LogwrtResult);
SpinLockRelease(&XLogCtl->info_lck);
/*
@ -9316,7 +9327,7 @@ XLogRecPtr
GetXLogWriteRecPtr(void)
{
SpinLockAcquire(&XLogCtl->info_lck);
LogwrtResult = XLogCtl->LogwrtResult;
RefreshXLogWriteResult(LogwrtResult);
SpinLockRelease(&XLogCtl->info_lck);
return LogwrtResult.Write;