Improve performance of subsystems on top of SLRU

More precisely, what we do here is make the SLRU cache sizes
configurable with new GUCs, so that sites with high concurrency and big
ranges of transactions in flight (resp. multixacts/subtransactions) can
benefit from bigger caches.  In order for this to work with good
performance, two additional changes are made:

1. the cache is divided into "banks" (to borrow terminology from CPU
   caches), and algorithms such as the search for a victim buffer to
   evict only affect one specific bank.  This forestalls the problem
   that linearly searching for a specific buffer across the whole cache
   takes too long: we only have to search the specific bank, whose size
   is small.  (A minimal sketch of the bank lookup follows this list.)
   This work is authored by Andrey Borodin.

2. the locking regime is changed so that each bank is protected by a
   separate LWLock, which allows for increased scalability.  This work
   is authored by Dilip Kumar.  (A part of this was previously committed
   as d172b717c6f4.)
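
To make the bank scheme concrete, here is a minimal sketch of how a
page number maps to its bank lock, modeled on the bank_mask and
bank_locks fields added in the slru.c hunks below.  The real
SimpleLruGetBankLock() is defined in a part of the patch not shown
here, so treat this as an illustration rather than the committed code:

/* Sketch only: map an SLRU page to the LWLock of its bank. */
static inline LWLock *
SimpleLruGetBankLock(SlruCtl ctl, int64 pageno)
{
	/*
	 * bank_mask is (nslots / SLRU_BANK_SIZE) - 1, so consecutive pages
	 * are spread round-robin across banks of SLRU_BANK_SIZE buffers.
	 */
	int			bankno = (int) (pageno & ctl->bank_mask);

	return &ctl->shared->bank_locks[bankno].lock;
}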

Special care is taken so that algorithms that can potentially traverse
more than one bank release one bank's lock before acquiring the next,
as sketched below.  This should happen rarely, but in particular
clog.c's group-commit feature needed code adjustments to cope with it.
I (Álvaro) also added lots of comments to make sure the design is
sound.
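
In outline, the multi-bank traversal pattern used by clog.c and
multixact.c below looks like the following sketch; ctl, npages and
pagenos are placeholder names for illustration, not identifiers from
the patch:

/* Sketch of the "hold at most one bank lock at a time" pattern. */
LWLock	   *prevlock = NULL;

for (int i = 0; i < npages; i++)
{
	LWLock	   *lock = SimpleLruGetBankLock(ctl, pagenos[i]);

	/* Crossing into a different bank: swap locks before touching the page. */
	if (lock != prevlock)
	{
		if (prevlock != NULL)
			LWLockRelease(prevlock);
		LWLockAcquire(lock, LW_EXCLUSIVE);
		prevlock = lock;
	}

	/* ... read or update the page while holding its bank lock ... */
}

if (prevlock != NULL)
	LWLockRelease(prevlock);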

The new GUCs match the names introduced by bcdfa5f2e2 in the
pg_stat_slru view.

The default values for these parameters are similar to the previous
sizes of each SLRU.  commit_ts, clog and subtrans accept the value 0,
which means to size the cache by dividing shared_buffers by 512 (so 2MB
for every 1GB of shared_buffers), with a cap of 8MB.  (A new slru.c
function SimpleLruAutotuneBuffers() was added to support this.)  The
cap was previously 1MB for clog, so for sites with more than 512MB of
shared_buffers the total memory used increases, which is likely a good
tradeoff.  However, other SLRUs (notably the multixact ones) retain
smaller sizes and don't support a configured value of 0.  These values
based on shared_buffers may need to be revisited, but that's an easy
change.
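
To make the scaling concrete, here is the arithmetic implied by
SimpleLruAutotuneBuffers(512, 1024) with the standard 8kB block size
(worked numbers, not figures taken from the patch):

  shared_buffers =  64MB ->   8192 blocks / 512 =   16 buffers (128kB, the floor)
  shared_buffers =   1GB -> 131072 blocks / 512 =  256 buffers (2MB)
  shared_buffers =   4GB -> 524288 blocks / 512 = 1024 buffers (8MB, the cap)
  shared_buffers =  16GB -> capped at 1024 buffers (still 8MB)

The result is also rounded down to a multiple of 16, the SLRU bank
size, so that every bank is fully populated.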

There was some resistance to adding these new GUCs: it would be better
to adjust to memory pressure automatically somehow, for example by
stealing memory from shared_buffers (where the caches can grow and
shrink naturally).  However, doing that seems to be a much larger
project and one which has made virtually no progress in several years,
and because this is such a pain point for so many users, here we take
the pragmatic approach.

Author: Andrey Borodin <x4mmm@yandex-team.ru>
Author: Dilip Kumar <dilipbalaut@gmail.com>
Reviewed-by: Amul Sul, Gilles Darold, Anastasia Lubennikova,
	Ivan Lazarev, Robert Haas, Thomas Munro, Tomas Vondra,
	Yura Sokolov, Васильев Дмитрий (Dmitry Vasiliev).
Discussion: https://postgr.es/m/2BEC2B3F-9B61-4C1D-9FB5-5FAB0F05EF86@yandex-team.ru
Discussion: https://postgr.es/m/CAFiTN-vzDvNz=ExGXz6gdyjtzGixKSqs0mKHMmaQ8sOSEFZ33A@mail.gmail.com
Alvaro Herrera 2024-02-28 17:05:31 +01:00
parent 1c1eec0f2d
commit 53c2a97a92
26 changed files with 1177 additions and 353 deletions


@ -2006,6 +2006,145 @@ include_dir 'conf.d'
</listitem>
</varlistentry>
<varlistentry id="guc-commit-timestamp-buffers" xreflabel="commit_timestamp_buffers">
<term><varname>commit_timestamp_buffers</varname> (<type>integer</type>)
<indexterm>
<primary><varname>commit_timestamp_buffers</varname> configuration parameter</primary>
</indexterm>
</term>
<listitem>
<para>
Specifies the amount of shared memory to use to cache the contents of
<literal>pg_commit_ts</literal> (see
<xref linkend="pgdata-contents-table"/>).
If this value is specified without units, it is taken as blocks,
that is <symbol>BLCKSZ</symbol> bytes, typically 8kB.
The default value is <literal>0</literal>, which requests
<varname>shared_buffers</varname>/512 up to 1024 blocks,
but not fewer than 16 blocks.
This parameter can only be set at server start.
</para>
</listitem>
</varlistentry>
<varlistentry id="guc-multixact-member-buffers" xreflabel="multixact_member_buffers">
<term><varname>multixact_member_buffers</varname> (<type>integer</type>)
<indexterm>
<primary><varname>multixact_member_buffers</varname> configuration parameter</primary>
</indexterm>
</term>
<listitem>
<para>
Specifies the amount of shared memory to use to cache the contents
of <literal>pg_multixact/members</literal> (see
<xref linkend="pgdata-contents-table"/>).
If this value is specified without units, it is taken as blocks,
that is <symbol>BLCKSZ</symbol> bytes, typically 8kB.
The default value is <literal>32</literal>.
This parameter can only be set at server start.
</para>
</listitem>
</varlistentry>
<varlistentry id="guc-multixact-offset-buffers" xreflabel="multixact_offset_buffers">
<term><varname>multixact_offset_buffers</varname> (<type>integer</type>)
<indexterm>
<primary><varname>multixact_offset_buffers</varname> configuration parameter</primary>
</indexterm>
</term>
<listitem>
<para>
Specifies the amount of shared memory to use to cache the contents
of <literal>pg_multixact/offsets</literal> (see
<xref linkend="pgdata-contents-table"/>).
If this value is specified without units, it is taken as blocks,
that is <symbol>BLCKSZ</symbol> bytes, typically 8kB.
The default value is <literal>16</literal>.
This parameter can only be set at server start.
</para>
</listitem>
</varlistentry>
<varlistentry id="guc-notify-buffers" xreflabel="notify_buffers">
<term><varname>notify_buffers</varname> (<type>integer</type>)
<indexterm>
<primary><varname>notify_buffers</varname> configuration parameter</primary>
</indexterm>
</term>
<listitem>
<para>
Specifies the amount of shared memory to use to cache the contents
of <literal>pg_notify</literal> (see
<xref linkend="pgdata-contents-table"/>).
If this value is specified without units, it is taken as blocks,
that is <symbol>BLCKSZ</symbol> bytes, typically 8kB.
The default value is <literal>16</literal>.
This parameter can only be set at server start.
</para>
</listitem>
</varlistentry>
<varlistentry id="guc-serializable-buffers" xreflabel="serializable_buffers">
<term><varname>serializable_buffers</varname> (<type>integer</type>)
<indexterm>
<primary><varname>serializable_buffers</varname> configuration parameter</primary>
</indexterm>
</term>
<listitem>
<para>
Specifies the amount of shared memory to use to cache the contents
of <literal>pg_serial</literal> (see
<xref linkend="pgdata-contents-table"/>).
If this value is specified without units, it is taken as blocks,
that is <symbol>BLCKSZ</symbol> bytes, typically 8kB.
The default value is <literal>32</literal>.
This parameter can only be set at server start.
</para>
</listitem>
</varlistentry>
<varlistentry id="guc-subtransaction-buffers" xreflabel="subtransaction_buffers">
<term><varname>subtransaction_buffers</varname> (<type>integer</type>)
<indexterm>
<primary><varname>subtransaction_buffers</varname> configuration parameter</primary>
</indexterm>
</term>
<listitem>
<para>
Specifies the amount of shared memory to use to cache the contents
of <literal>pg_subtrans</literal> (see
<xref linkend="pgdata-contents-table"/>).
If this value is specified without units, it is taken as blocks,
that is <symbol>BLCKSZ</symbol> bytes, typically 8kB.
The default value is <literal>0</literal>, which requests
<varname>shared_buffers</varname>/512 up to 1024 blocks,
but not fewer than 16 blocks.
This parameter can only be set at server start.
</para>
</listitem>
</varlistentry>
<varlistentry id="guc-transaction-buffers" xreflabel="transaction_buffers">
<term><varname>transaction_buffers</varname> (<type>integer</type>)
<indexterm>
<primary><varname>transaction_buffers</varname> configuration parameter</primary>
</indexterm>
</term>
<listitem>
<para>
Specifies the amount of shared memory to use to cache the contents
of <literal>pg_xact</literal> (see
<xref linkend="pgdata-contents-table"/>).
If this value is specified without units, it is taken as blocks,
that is <symbol>BLCKSZ</symbol> bytes, typically 8kB.
The default value is <literal>0</literal>, which requests
<varname>shared_buffers</varname>/512 up to 1024 blocks,
but not fewer than 16 blocks.
This parameter can only be set at server start.
</para>
</listitem>
</varlistentry>
<varlistentry id="guc-max-stack-depth" xreflabel="max_stack_depth">
<term><varname>max_stack_depth</varname> (<type>integer</type>)
<indexterm>
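
As a usage illustration (not part of the patch), the new parameters
could be raised in postgresql.conf along these lines on a busy system;
the values below are arbitrary examples, must be multiples of 16, and
take effect only at server start:

# Illustrative settings only -- tune to the workload.
transaction_buffers = 256          # pg_xact; 0 = autotune from shared_buffers
subtransaction_buffers = 256       # pg_subtrans; 0 = autotune
commit_timestamp_buffers = 128     # pg_commit_ts; 0 = autotune
multixact_offset_buffers = 32      # pg_multixact/offsets
multixact_member_buffers = 64      # pg_multixact/members
notify_buffers = 16                # pg_notify
serializable_buffers = 32          # pg_serial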


@ -4482,12 +4482,19 @@ description | Waiting for a newly initialized WAL file to reach durable storage
<para>
<productname>PostgreSQL</productname> accesses certain on-disk information
via <firstterm>SLRU</firstterm> (simple least-recently-used) caches.
via <literal>SLRU</literal> (<firstterm>simple least-recently-used</firstterm>)
caches.
The <structname>pg_stat_slru</structname> view will contain
one row for each tracked SLRU cache, showing statistics about access
to cached pages.
</para>
<para>
For each <literal>SLRU</literal> cache that's part of the core server,
there is a configuration parameter that controls its size, with the suffix
<literal>_buffers</literal> appended.
</para>
<table id="pg-stat-slru-view" xreflabel="pg_stat_slru">
<title><structname>pg_stat_slru</structname> View</title>
<tgroup cols="1">


@ -3,12 +3,13 @@
* clog.c
* PostgreSQL transaction-commit-log manager
*
* This module replaces the old "pg_log" access code, which treated pg_log
* essentially like a relation, in that it went through the regular buffer
* manager. The problem with that was that there wasn't any good way to
* recycle storage space for transactions so old that they'll never be
* looked up again. Now we use specialized access code so that the commit
* log can be broken into relatively small, independent segments.
* This module stores two bits per transaction regarding its commit/abort
* status; the status for four transactions fits in a byte.
*
* This would be a pretty simple abstraction on top of slru.c, except that
* for performance reasons we allow multiple transactions that are
* committing concurrently to form a queue, so that a single process can
* update the status for all of them within a single lock acquisition run.
*
* XLOG interactions: this module generates an XLOG record whenever a new
* CLOG page is initialized to zeroes. Other writes of CLOG come from
@ -43,6 +44,7 @@
#include "pgstat.h"
#include "storage/proc.h"
#include "storage/sync.h"
#include "utils/guc_hooks.h"
/*
* Defines for CLOG page sizes. A page is the same BLCKSZ as is used
@ -62,6 +64,15 @@
#define CLOG_XACTS_PER_PAGE (BLCKSZ * CLOG_XACTS_PER_BYTE)
#define CLOG_XACT_BITMASK ((1 << CLOG_BITS_PER_XACT) - 1)
/*
* Because space used in CLOG by each transaction is so small, we place a
* smaller limit on the number of CLOG buffers than SLRU allows. No other
* SLRU needs this.
*/
#define CLOG_MAX_ALLOWED_BUFFERS \
Min(SLRU_MAX_ALLOWED_BUFFERS, \
(((MaxTransactionId / 2) + (CLOG_XACTS_PER_PAGE - 1)) / CLOG_XACTS_PER_PAGE))
/*
* Although we return an int64 the actual value can't currently exceed
@ -284,15 +295,20 @@ TransactionIdSetPageStatus(TransactionId xid, int nsubxids,
XLogRecPtr lsn, int64 pageno,
bool all_xact_same_page)
{
LWLock *lock;
/* Can't use group update when PGPROC overflows. */
StaticAssertDecl(THRESHOLD_SUBTRANS_CLOG_OPT <= PGPROC_MAX_CACHED_SUBXIDS,
"group clog threshold less than PGPROC cached subxids");
/* Get the SLRU bank lock for the page we are going to access. */
lock = SimpleLruGetBankLock(XactCtl, pageno);
/*
* When there is contention on XactSLRULock, we try to group multiple
* updates; a single leader process will perform transaction status
* updates for multiple backends so that the number of times XactSLRULock
* needs to be acquired is reduced.
* When there is contention on the SLRU bank lock we need, we try to group
* multiple updates; a single leader process will perform transaction
* status updates for multiple backends so that the number of times the
* bank lock needs to be acquired is reduced.
*
* For this optimization to be safe, the XID and subxids in MyProc must be
* the same as the ones for which we're setting the status. Check that
@ -310,17 +326,17 @@ TransactionIdSetPageStatus(TransactionId xid, int nsubxids,
nsubxids * sizeof(TransactionId)) == 0))
{
/*
* If we can immediately acquire XactSLRULock, we update the status of
* our own XID and release the lock. If not, try use group XID
* update. If that doesn't work out, fall back to waiting for the
* lock to perform an update for this transaction only.
* If we can immediately acquire the lock, we update the status of our
* own XID and release the lock. If not, try to use group XID update. If
* that doesn't work out, fall back to waiting for the lock to perform
* an update for this transaction only.
*/
if (LWLockConditionalAcquire(XactSLRULock, LW_EXCLUSIVE))
if (LWLockConditionalAcquire(lock, LW_EXCLUSIVE))
{
/* Got the lock without waiting! Do the update. */
TransactionIdSetPageStatusInternal(xid, nsubxids, subxids, status,
lsn, pageno);
LWLockRelease(XactSLRULock);
LWLockRelease(lock);
return;
}
else if (TransactionGroupUpdateXidStatus(xid, status, lsn, pageno))
@ -333,10 +349,10 @@ TransactionIdSetPageStatus(TransactionId xid, int nsubxids,
}
/* Group update not applicable, or couldn't accept this page number. */
LWLockAcquire(XactSLRULock, LW_EXCLUSIVE);
LWLockAcquire(lock, LW_EXCLUSIVE);
TransactionIdSetPageStatusInternal(xid, nsubxids, subxids, status,
lsn, pageno);
LWLockRelease(XactSLRULock);
LWLockRelease(lock);
}
/*
@ -355,7 +371,8 @@ TransactionIdSetPageStatusInternal(TransactionId xid, int nsubxids,
Assert(status == TRANSACTION_STATUS_COMMITTED ||
status == TRANSACTION_STATUS_ABORTED ||
(status == TRANSACTION_STATUS_SUB_COMMITTED && !TransactionIdIsValid(xid)));
Assert(LWLockHeldByMeInMode(XactSLRULock, LW_EXCLUSIVE));
Assert(LWLockHeldByMeInMode(SimpleLruGetBankLock(XactCtl, pageno),
LW_EXCLUSIVE));
/*
* If we're doing an async commit (ie, lsn is valid), then we must wait
@ -406,14 +423,15 @@ TransactionIdSetPageStatusInternal(TransactionId xid, int nsubxids,
}
/*
* When we cannot immediately acquire XactSLRULock in exclusive mode at
* Subroutine for TransactionIdSetPageStatus, q.v.
*
* When we cannot immediately acquire the SLRU bank lock in exclusive mode at
* commit time, add ourselves to a list of processes that need their XIDs
* status update. The first process to add itself to the list will acquire
* XactSLRULock in exclusive mode and set transaction status as required
* on behalf of all group members. This avoids a great deal of contention
* around XactSLRULock when many processes are trying to commit at once,
* since the lock need not be repeatedly handed off from one committing
* process to the next.
* the lock in exclusive mode and set transaction status as required on behalf
* of all group members. This avoids a great deal of contention when many
* processes are trying to commit at once, since the lock need not be
* repeatedly handed off from one committing process to the next.
*
* Returns true when transaction status has been updated in clog; returns
* false if we decided against applying the optimization because the page
@ -425,16 +443,17 @@ TransactionGroupUpdateXidStatus(TransactionId xid, XidStatus status,
{
volatile PROC_HDR *procglobal = ProcGlobal;
PGPROC *proc = MyProc;
int pgprocno = MyProcNumber;
uint32 nextidx;
uint32 wakeidx;
int prevpageno;
LWLock *prevlock = NULL;
/* We should definitely have an XID whose status needs to be updated. */
Assert(TransactionIdIsValid(xid));
/*
* Add ourselves to the list of processes needing a group XID status
* update.
* Prepare to add ourselves to the list of processes needing a group XID
* status update.
*/
proc->clogGroupMember = true;
proc->clogGroupMemberXid = xid;
@ -442,6 +461,29 @@ TransactionGroupUpdateXidStatus(TransactionId xid, XidStatus status,
proc->clogGroupMemberPage = pageno;
proc->clogGroupMemberLsn = lsn;
/*
* We put ourselves in the queue by writing MyProcNumber to
* ProcGlobal->clogGroupFirst. However, if there's already a process
* listed there, we compare our pageno with that process's pageno; if it
* differs, we cannot participate in the group, so we return for the caller
* to update pg_xact in the normal way.
*
* If we're not the first process in the list, we must follow the leader.
* We do this by storing the data we want updated in our PGPROC entry
* where the leader can find it, then going to sleep.
*
* If no process is already in the list, we're the leader; our first step
* is to lock the SLRU bank to which our page belongs, then we close out
* the group by resetting the list pointer from ProcGlobal->clogGroupFirst
* (this lets other processes set up other groups later); finally we do
* the SLRU updates, release the SLRU bank lock, and wake up the sleeping
* processes.
*
* If another group starts to update a page in a different SLRU bank, they
* can proceed concurrently, since the bank lock they're going to use is
* different from ours. If another group starts to update a page in the
* same bank as ours, they wait until we release the lock.
*/
nextidx = pg_atomic_read_u32(&procglobal->clogGroupFirst);
while (true)
@ -453,10 +495,11 @@ TransactionGroupUpdateXidStatus(TransactionId xid, XidStatus status,
* There is a race condition here, which is that after doing the below
* check and before adding this proc's clog update to a group, the
* group leader might have already finished the group update for this
* page and becomes group leader of another group. This will lead to a
* situation where a single group can have different clog page
* updates. This isn't likely and will still work, just maybe a bit
* less efficiently.
* page and becomes group leader of another group, updating a
* different page. This will lead to a situation where a single group
* can have different clog page updates. This isn't likely and will
* still work, just less efficiently -- we handle this case by
* switching to a different bank lock in the loop below.
*/
if (nextidx != INVALID_PGPROCNO &&
GetPGProcByNumber(nextidx)->clogGroupMemberPage != proc->clogGroupMemberPage)
@ -474,7 +517,7 @@ TransactionGroupUpdateXidStatus(TransactionId xid, XidStatus status,
if (pg_atomic_compare_exchange_u32(&procglobal->clogGroupFirst,
&nextidx,
(uint32) pgprocno))
(uint32) MyProcNumber))
break;
}
@ -508,13 +551,21 @@ TransactionGroupUpdateXidStatus(TransactionId xid, XidStatus status,
return true;
}
/* We are the leader. Acquire the lock on behalf of everyone. */
LWLockAcquire(XactSLRULock, LW_EXCLUSIVE);
/*
* By here, we know we're the leader process. Acquire the SLRU bank lock
* that corresponds to the page we originally wanted to modify.
*/
prevpageno = proc->clogGroupMemberPage;
prevlock = SimpleLruGetBankLock(XactCtl, prevpageno);
LWLockAcquire(prevlock, LW_EXCLUSIVE);
/*
* Now that we've got the lock, clear the list of processes waiting for
* group XID status update, saving a pointer to the head of the list.
* Trying to pop elements one at a time could lead to an ABA problem.
* (Trying to pop elements one at a time could lead to an ABA problem.)
*
* At this point, any processes trying to do this would create a separate
* group.
*/
nextidx = pg_atomic_exchange_u32(&procglobal->clogGroupFirst,
INVALID_PGPROCNO);
@ -526,6 +577,31 @@ TransactionGroupUpdateXidStatus(TransactionId xid, XidStatus status,
while (nextidx != INVALID_PGPROCNO)
{
PGPROC *nextproc = &ProcGlobal->allProcs[nextidx];
int thispageno = nextproc->clogGroupMemberPage;
/*
* If the page to update belongs to a different bank than the previous
* one, exchange bank lock to the new one. This should be quite rare,
* as described above.
*
* (We could try to optimize this by waking up the processes for which
* we have already updated the status while we exchange the lock, but
* the code doesn't do that at present. I think it'd require
* additional bookkeeping, making the common path slower in order to
* improve an infrequent case.)
*/
if (thispageno != prevpageno)
{
LWLock *lock = SimpleLruGetBankLock(XactCtl, thispageno);
if (prevlock != lock)
{
LWLockRelease(prevlock);
LWLockAcquire(lock, LW_EXCLUSIVE);
}
prevlock = lock;
prevpageno = thispageno;
}
/*
* Transactions with more than THRESHOLD_SUBTRANS_CLOG_OPT sub-XIDs
@ -545,12 +621,17 @@ TransactionGroupUpdateXidStatus(TransactionId xid, XidStatus status,
}
/* We're done with the lock now. */
LWLockRelease(XactSLRULock);
if (prevlock != NULL)
LWLockRelease(prevlock);
/*
* Now that we've released the lock, go back and wake everybody up. We
* don't do this under the lock so as to keep lock hold times to a
* minimum.
*
* (Perhaps we could do this in two passes, the first setting
* clogGroupNext to invalid while saving the semaphores to an array, then
* a single write barrier, then another pass unlocking the semaphores.)
*/
while (wakeidx != INVALID_PGPROCNO)
{
@ -574,7 +655,7 @@ TransactionGroupUpdateXidStatus(TransactionId xid, XidStatus status,
/*
* Sets the commit status of a single transaction.
*
* Must be called with XactSLRULock held
* Caller must hold the corresponding SLRU bank lock, will be held at exit.
*/
static void
TransactionIdSetStatusBit(TransactionId xid, XidStatus status, XLogRecPtr lsn, int slotno)
@ -585,6 +666,11 @@ TransactionIdSetStatusBit(TransactionId xid, XidStatus status, XLogRecPtr lsn, i
char byteval;
char curval;
Assert(XactCtl->shared->page_number[slotno] == TransactionIdToPage(xid));
Assert(LWLockHeldByMeInMode(SimpleLruGetBankLock(XactCtl,
XactCtl->shared->page_number[slotno]),
LW_EXCLUSIVE));
byteptr = XactCtl->shared->page_buffer[slotno] + byteno;
curval = (*byteptr >> bshift) & CLOG_XACT_BITMASK;
@ -666,7 +752,7 @@ TransactionIdGetStatus(TransactionId xid, XLogRecPtr *lsn)
lsnindex = GetLSNIndex(slotno, xid);
*lsn = XactCtl->shared->group_lsn[lsnindex];
LWLockRelease(XactSLRULock);
LWLockRelease(SimpleLruGetBankLock(XactCtl, pageno));
return status;
}
@ -674,23 +760,18 @@ TransactionIdGetStatus(TransactionId xid, XLogRecPtr *lsn)
/*
* Number of shared CLOG buffers.
*
* On larger multi-processor systems, it is possible to have many CLOG page
* requests in flight at one time which could lead to disk access for CLOG
* page if the required page is not found in memory. Testing revealed that we
* can get the best performance by having 128 CLOG buffers, more than that it
* doesn't improve performance.
*
* Unconditionally keeping the number of CLOG buffers to 128 did not seem like
* a good idea, because it would increase the minimum amount of shared memory
* required to start, which could be a problem for people running very small
* configurations. The following formula seems to represent a reasonable
* compromise: people with very low values for shared_buffers will get fewer
* CLOG buffers as well, and everyone else will get 128.
* If asked to autotune, use 2MB for every 1GB of shared buffers, up to 8MB.
* Otherwise just cap the configured amount to be between 16 and the maximum
* allowed.
*/
Size
static int
CLOGShmemBuffers(void)
{
return Min(128, Max(4, NBuffers / 512));
/* auto-tune based on shared buffers */
if (transaction_buffers == 0)
return SimpleLruAutotuneBuffers(512, 1024);
return Min(Max(16, transaction_buffers), CLOG_MAX_ALLOWED_BUFFERS);
}
/*
@ -705,13 +786,43 @@ CLOGShmemSize(void)
void
CLOGShmemInit(void)
{
/* If auto-tuning is requested, now is the time to do it */
if (transaction_buffers == 0)
{
char buf[32];
snprintf(buf, sizeof(buf), "%d", CLOGShmemBuffers());
SetConfigOption("transaction_buffers", buf, PGC_POSTMASTER,
PGC_S_DYNAMIC_DEFAULT);
/*
* We prefer to report this value's source as PGC_S_DYNAMIC_DEFAULT.
* However, if the DBA explicitly set transaction_buffers = 0 in the
* config file, then PGC_S_DYNAMIC_DEFAULT will fail to override that
* and we must force the matter with PGC_S_OVERRIDE.
*/
if (transaction_buffers == 0) /* failed to apply it? */
SetConfigOption("transaction_buffers", buf, PGC_POSTMASTER,
PGC_S_OVERRIDE);
}
Assert(transaction_buffers != 0);
XactCtl->PagePrecedes = CLOGPagePrecedes;
SimpleLruInit(XactCtl, "transaction", CLOGShmemBuffers(), CLOG_LSNS_PER_PAGE,
XactSLRULock, "pg_xact", LWTRANCHE_XACT_BUFFER,
SYNC_HANDLER_CLOG, false);
"pg_xact", LWTRANCHE_XACT_BUFFER,
LWTRANCHE_XACT_SLRU, SYNC_HANDLER_CLOG, false);
SlruPagePrecedesUnitTests(XactCtl, CLOG_XACTS_PER_PAGE);
}
/*
* GUC check_hook for transaction_buffers
*/
bool
check_transaction_buffers(int *newval, void **extra, GucSource source)
{
return check_slru_buffers("transaction_buffers", newval);
}
/*
* This func must be called ONCE on system install. It creates
* the initial CLOG segment. (The CLOG directory is assumed to
@ -722,8 +833,9 @@ void
BootStrapCLOG(void)
{
int slotno;
LWLock *lock = SimpleLruGetBankLock(XactCtl, 0);
LWLockAcquire(XactSLRULock, LW_EXCLUSIVE);
LWLockAcquire(lock, LW_EXCLUSIVE);
/* Create and zero the first page of the commit log */
slotno = ZeroCLOGPage(0, false);
@ -732,7 +844,7 @@ BootStrapCLOG(void)
SimpleLruWritePage(XactCtl, slotno);
Assert(!XactCtl->shared->page_dirty[slotno]);
LWLockRelease(XactSLRULock);
LWLockRelease(lock);
}
/*
@ -781,8 +893,9 @@ TrimCLOG(void)
{
TransactionId xid = XidFromFullTransactionId(TransamVariables->nextXid);
int64 pageno = TransactionIdToPage(xid);
LWLock *lock = SimpleLruGetBankLock(XactCtl, pageno);
LWLockAcquire(XactSLRULock, LW_EXCLUSIVE);
LWLockAcquire(lock, LW_EXCLUSIVE);
/*
* Zero out the remainder of the current clog page. Under normal
@ -814,7 +927,7 @@ TrimCLOG(void)
XactCtl->shared->page_dirty[slotno] = true;
}
LWLockRelease(XactSLRULock);
LWLockRelease(lock);
}
/*
@ -846,6 +959,7 @@ void
ExtendCLOG(TransactionId newestXact)
{
int64 pageno;
LWLock *lock;
/*
* No work except at first XID of a page. But beware: just after
@ -856,13 +970,14 @@ ExtendCLOG(TransactionId newestXact)
return;
pageno = TransactionIdToPage(newestXact);
lock = SimpleLruGetBankLock(XactCtl, pageno);
LWLockAcquire(XactSLRULock, LW_EXCLUSIVE);
LWLockAcquire(lock, LW_EXCLUSIVE);
/* Zero the page and make an XLOG entry about it */
ZeroCLOGPage(pageno, true);
LWLockRelease(XactSLRULock);
LWLockRelease(lock);
}
@ -1000,16 +1115,18 @@ clog_redo(XLogReaderState *record)
{
int64 pageno;
int slotno;
LWLock *lock;
memcpy(&pageno, XLogRecGetData(record), sizeof(pageno));
LWLockAcquire(XactSLRULock, LW_EXCLUSIVE);
lock = SimpleLruGetBankLock(XactCtl, pageno);
LWLockAcquire(lock, LW_EXCLUSIVE);
slotno = ZeroCLOGPage(pageno, false);
SimpleLruWritePage(XactCtl, slotno);
Assert(!XactCtl->shared->page_dirty[slotno]);
LWLockRelease(XactSLRULock);
LWLockRelease(lock);
}
else if (info == CLOG_TRUNCATE)
{


@ -33,6 +33,7 @@
#include "pg_trace.h"
#include "storage/shmem.h"
#include "utils/builtins.h"
#include "utils/guc_hooks.h"
#include "utils/snapmgr.h"
#include "utils/timestamp.h"
@ -225,10 +226,11 @@ SetXidCommitTsInPage(TransactionId xid, int nsubxids,
TransactionId *subxids, TimestampTz ts,
RepOriginId nodeid, int64 pageno)
{
LWLock *lock = SimpleLruGetBankLock(CommitTsCtl, pageno);
int slotno;
int i;
LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
LWLockAcquire(lock, LW_EXCLUSIVE);
slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, xid);
@ -238,13 +240,13 @@ SetXidCommitTsInPage(TransactionId xid, int nsubxids,
CommitTsCtl->shared->page_dirty[slotno] = true;
LWLockRelease(CommitTsSLRULock);
LWLockRelease(lock);
}
/*
* Sets the commit timestamp of a single transaction.
*
* Must be called with CommitTsSLRULock held
* Caller must hold the correct SLRU bank lock, will be held at exit
*/
static void
TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
@ -345,7 +347,7 @@ TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
if (nodeid)
*nodeid = entry.nodeid;
LWLockRelease(CommitTsSLRULock);
LWLockRelease(SimpleLruGetBankLock(CommitTsCtl, pageno));
return *ts != 0;
}
@ -499,14 +501,18 @@ pg_xact_commit_timestamp_origin(PG_FUNCTION_ARGS)
/*
* Number of shared CommitTS buffers.
*
* We use a very similar logic as for the number of CLOG buffers (except we
* scale up twice as fast with shared buffers, and the maximum is twice as
* high); see comments in CLOGShmemBuffers.
* If asked to autotune, use 2MB for every 1GB of shared buffers, up to 8MB.
* Otherwise just cap the configured amount to be between 16 and the maximum
* allowed.
*/
Size
static int
CommitTsShmemBuffers(void)
{
return Min(256, Max(4, NBuffers / 256));
/* auto-tune based on shared buffers */
if (commit_timestamp_buffers == 0)
return SimpleLruAutotuneBuffers(512, 1024);
return Min(Max(16, commit_timestamp_buffers), SLRU_MAX_ALLOWED_BUFFERS);
}
/*
@ -528,10 +534,31 @@ CommitTsShmemInit(void)
{
bool found;
/* If auto-tuning is requested, now is the time to do it */
if (commit_timestamp_buffers == 0)
{
char buf[32];
snprintf(buf, sizeof(buf), "%d", CommitTsShmemBuffers());
SetConfigOption("commit_timestamp_buffers", buf, PGC_POSTMASTER,
PGC_S_DYNAMIC_DEFAULT);
/*
* We prefer to report this value's source as PGC_S_DYNAMIC_DEFAULT.
* However, if the DBA explicitly set commit_timestamp_buffers = 0 in
* the config file, then PGC_S_DYNAMIC_DEFAULT will fail to override
* that and we must force the matter with PGC_S_OVERRIDE.
*/
if (commit_timestamp_buffers == 0) /* failed to apply it? */
SetConfigOption("commit_timestamp_buffers", buf, PGC_POSTMASTER,
PGC_S_OVERRIDE);
}
Assert(commit_timestamp_buffers != 0);
CommitTsCtl->PagePrecedes = CommitTsPagePrecedes;
SimpleLruInit(CommitTsCtl, "commit_timestamp", CommitTsShmemBuffers(), 0,
CommitTsSLRULock, "pg_commit_ts",
LWTRANCHE_COMMITTS_BUFFER,
"pg_commit_ts", LWTRANCHE_COMMITTS_BUFFER,
LWTRANCHE_COMMITTS_SLRU,
SYNC_HANDLER_COMMIT_TS,
false);
SlruPagePrecedesUnitTests(CommitTsCtl, COMMIT_TS_XACTS_PER_PAGE);
@ -553,6 +580,15 @@ CommitTsShmemInit(void)
Assert(found);
}
/*
* GUC check_hook for commit_timestamp_buffers
*/
bool
check_commit_ts_buffers(int *newval, void **extra, GucSource source)
{
return check_slru_buffers("commit_timestamp_buffers", newval);
}
/*
* This function must be called ONCE on system install.
*
@ -715,13 +751,14 @@ ActivateCommitTs(void)
/* Create the current segment file, if necessary */
if (!SimpleLruDoesPhysicalPageExist(CommitTsCtl, pageno))
{
LWLock *lock = SimpleLruGetBankLock(CommitTsCtl, pageno);
int slotno;
LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
LWLockAcquire(lock, LW_EXCLUSIVE);
slotno = ZeroCommitTsPage(pageno, false);
SimpleLruWritePage(CommitTsCtl, slotno);
Assert(!CommitTsCtl->shared->page_dirty[slotno]);
LWLockRelease(CommitTsSLRULock);
LWLockRelease(lock);
}
/* Change the activation status in shared memory. */
@ -760,8 +797,6 @@ DeactivateCommitTs(void)
TransamVariables->oldestCommitTsXid = InvalidTransactionId;
TransamVariables->newestCommitTsXid = InvalidTransactionId;
LWLockRelease(CommitTsLock);
/*
* Remove *all* files. This is necessary so that there are no leftover
* files; in the case where this feature is later enabled after running
@ -769,10 +804,16 @@ DeactivateCommitTs(void)
* (We can probably tolerate out-of-sequence files, as they are going to
* be overwritten anyway when we wrap around, but it seems better to be
* tidy.)
*
* Note that we do this with CommitTsLock acquired in exclusive mode. This
* is very heavy-handed, but since this routine can only be called in the
* replica and should happen very rarely, we don't worry too much about
* it. Note also that no process should be consulting this SLRU if we
* have just deactivated it.
*/
LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
(void) SlruScanDirectory(CommitTsCtl, SlruScanDirCbDeleteAll, NULL);
LWLockRelease(CommitTsSLRULock);
LWLockRelease(CommitTsLock);
}
/*
@ -804,6 +845,7 @@ void
ExtendCommitTs(TransactionId newestXact)
{
int64 pageno;
LWLock *lock;
/*
* Nothing to do if module not enabled. Note we do an unlocked read of
@ -824,12 +866,14 @@ ExtendCommitTs(TransactionId newestXact)
pageno = TransactionIdToCTsPage(newestXact);
LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
lock = SimpleLruGetBankLock(CommitTsCtl, pageno);
LWLockAcquire(lock, LW_EXCLUSIVE);
/* Zero the page and make an XLOG entry about it */
ZeroCommitTsPage(pageno, !InRecovery);
LWLockRelease(CommitTsSLRULock);
LWLockRelease(lock);
}
/*
@ -983,16 +1027,18 @@ commit_ts_redo(XLogReaderState *record)
{
int64 pageno;
int slotno;
LWLock *lock;
memcpy(&pageno, XLogRecGetData(record), sizeof(pageno));
LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE);
lock = SimpleLruGetBankLock(CommitTsCtl, pageno);
LWLockAcquire(lock, LW_EXCLUSIVE);
slotno = ZeroCommitTsPage(pageno, false);
SimpleLruWritePage(CommitTsCtl, slotno);
Assert(!CommitTsCtl->shared->page_dirty[slotno]);
LWLockRelease(CommitTsSLRULock);
LWLockRelease(lock);
}
else if (info == COMMIT_TS_TRUNCATE)
{


@ -88,6 +88,7 @@
#include "storage/proc.h"
#include "storage/procarray.h"
#include "utils/builtins.h"
#include "utils/guc_hooks.h"
#include "utils/memutils.h"
#include "utils/snapmgr.h"
@ -192,10 +193,10 @@ static SlruCtlData MultiXactMemberCtlData;
/*
* MultiXact state shared across all backends. All this state is protected
* by MultiXactGenLock. (We also use MultiXactOffsetSLRULock and
* MultiXactMemberSLRULock to guard accesses to the two sets of SLRU
* buffers. For concurrency's sake, we avoid holding more than one of these
* locks at a time.)
* by MultiXactGenLock. (We also use the SLRU bank locks of MultiXactOffset
* and MultiXactMember to guard accesses to the two sets of SLRU buffers. For
* concurrency's sake, we avoid holding more than one of these locks at a
* time.)
*/
typedef struct MultiXactStateData
{
@ -870,12 +871,15 @@ RecordNewMultiXact(MultiXactId multi, MultiXactOffset offset,
int slotno;
MultiXactOffset *offptr;
int i;
LWLockAcquire(MultiXactOffsetSLRULock, LW_EXCLUSIVE);
LWLock *lock;
LWLock *prevlock = NULL;
pageno = MultiXactIdToOffsetPage(multi);
entryno = MultiXactIdToOffsetEntry(multi);
lock = SimpleLruGetBankLock(MultiXactOffsetCtl, pageno);
LWLockAcquire(lock, LW_EXCLUSIVE);
/*
* Note: we pass the MultiXactId to SimpleLruReadPage as the "transaction"
* to complain about if there's any I/O error. This is kinda bogus, but
@ -891,10 +895,8 @@ RecordNewMultiXact(MultiXactId multi, MultiXactOffset offset,
MultiXactOffsetCtl->shared->page_dirty[slotno] = true;
/* Exchange our lock */
LWLockRelease(MultiXactOffsetSLRULock);
LWLockAcquire(MultiXactMemberSLRULock, LW_EXCLUSIVE);
/* Release MultiXactOffset SLRU lock. */
LWLockRelease(lock);
prev_pageno = -1;
@ -916,6 +918,20 @@ RecordNewMultiXact(MultiXactId multi, MultiXactOffset offset,
if (pageno != prev_pageno)
{
/*
* The MultiXactMember SLRU page has changed, so check whether this new
* page falls into a different SLRU bank; if so, release the old bank's
* lock and acquire the lock on the new bank.
*/
lock = SimpleLruGetBankLock(MultiXactMemberCtl, pageno);
if (lock != prevlock)
{
if (prevlock != NULL)
LWLockRelease(prevlock);
LWLockAcquire(lock, LW_EXCLUSIVE);
prevlock = lock;
}
slotno = SimpleLruReadPage(MultiXactMemberCtl, pageno, true, multi);
prev_pageno = pageno;
}
@ -936,7 +952,8 @@ RecordNewMultiXact(MultiXactId multi, MultiXactOffset offset,
MultiXactMemberCtl->shared->page_dirty[slotno] = true;
}
LWLockRelease(MultiXactMemberSLRULock);
if (prevlock != NULL)
LWLockRelease(prevlock);
}
/*
@ -1239,6 +1256,8 @@ GetMultiXactIdMembers(MultiXactId multi, MultiXactMember **members,
MultiXactId tmpMXact;
MultiXactOffset nextOffset;
MultiXactMember *ptr;
LWLock *lock;
LWLock *prevlock = NULL;
debug_elog3(DEBUG2, "GetMembers: asked for %u", multi);
@ -1342,11 +1361,22 @@ GetMultiXactIdMembers(MultiXactId multi, MultiXactMember **members,
* time on every multixact creation.
*/
retry:
LWLockAcquire(MultiXactOffsetSLRULock, LW_EXCLUSIVE);
pageno = MultiXactIdToOffsetPage(multi);
entryno = MultiXactIdToOffsetEntry(multi);
/*
* If this page falls under a different bank, release the old bank's lock
* and acquire the lock of the new bank.
*/
lock = SimpleLruGetBankLock(MultiXactOffsetCtl, pageno);
if (lock != prevlock)
{
if (prevlock != NULL)
LWLockRelease(prevlock);
LWLockAcquire(lock, LW_EXCLUSIVE);
prevlock = lock;
}
slotno = SimpleLruReadPage(MultiXactOffsetCtl, pageno, true, multi);
offptr = (MultiXactOffset *) MultiXactOffsetCtl->shared->page_buffer[slotno];
offptr += entryno;
@ -1379,7 +1409,21 @@ retry:
entryno = MultiXactIdToOffsetEntry(tmpMXact);
if (pageno != prev_pageno)
{
/*
* Since we're going to access a different SLRU page, if this page
* falls under a different bank, release the old bank's lock and
* acquire the lock of the new bank.
*/
lock = SimpleLruGetBankLock(MultiXactOffsetCtl, pageno);
if (prevlock != lock)
{
LWLockRelease(prevlock);
LWLockAcquire(lock, LW_EXCLUSIVE);
prevlock = lock;
}
slotno = SimpleLruReadPage(MultiXactOffsetCtl, pageno, true, tmpMXact);
}
offptr = (MultiXactOffset *) MultiXactOffsetCtl->shared->page_buffer[slotno];
offptr += entryno;
@ -1388,7 +1432,8 @@ retry:
if (nextMXOffset == 0)
{
/* Corner case 2: next multixact is still being filled in */
LWLockRelease(MultiXactOffsetSLRULock);
LWLockRelease(prevlock);
prevlock = NULL;
CHECK_FOR_INTERRUPTS();
pg_usleep(1000L);
goto retry;
@ -1397,13 +1442,11 @@ retry:
length = nextMXOffset - offset;
}
LWLockRelease(MultiXactOffsetSLRULock);
LWLockRelease(prevlock);
prevlock = NULL;
ptr = (MultiXactMember *) palloc(length * sizeof(MultiXactMember));
/* Now get the members themselves. */
LWLockAcquire(MultiXactMemberSLRULock, LW_EXCLUSIVE);
truelength = 0;
prev_pageno = -1;
for (i = 0; i < length; i++, offset++)
@ -1419,6 +1462,20 @@ retry:
if (pageno != prev_pageno)
{
/*
* Since we're going to access a different SLRU page, if this page
* falls under a different bank, release the old bank's lock and
* acquire the lock of the new bank.
*/
lock = SimpleLruGetBankLock(MultiXactMemberCtl, pageno);
if (lock != prevlock)
{
if (prevlock)
LWLockRelease(prevlock);
LWLockAcquire(lock, LW_EXCLUSIVE);
prevlock = lock;
}
slotno = SimpleLruReadPage(MultiXactMemberCtl, pageno, true, multi);
prev_pageno = pageno;
}
@ -1442,7 +1499,8 @@ retry:
truelength++;
}
LWLockRelease(MultiXactMemberSLRULock);
if (prevlock)
LWLockRelease(prevlock);
/* A multixid with zero members should not happen */
Assert(truelength > 0);
@ -1834,8 +1892,8 @@ MultiXactShmemSize(void)
mul_size(sizeof(MultiXactId) * 2, MaxOldestSlot))
size = SHARED_MULTIXACT_STATE_SIZE;
size = add_size(size, SimpleLruShmemSize(NUM_MULTIXACTOFFSET_BUFFERS, 0));
size = add_size(size, SimpleLruShmemSize(NUM_MULTIXACTMEMBER_BUFFERS, 0));
size = add_size(size, SimpleLruShmemSize(multixact_offset_buffers, 0));
size = add_size(size, SimpleLruShmemSize(multixact_member_buffers, 0));
return size;
}
@ -1851,16 +1909,16 @@ MultiXactShmemInit(void)
MultiXactMemberCtl->PagePrecedes = MultiXactMemberPagePrecedes;
SimpleLruInit(MultiXactOffsetCtl,
"multixact_offset", NUM_MULTIXACTOFFSET_BUFFERS, 0,
MultiXactOffsetSLRULock, "pg_multixact/offsets",
LWTRANCHE_MULTIXACTOFFSET_BUFFER,
"multixact_offset", multixact_offset_buffers, 0,
"pg_multixact/offsets", LWTRANCHE_MULTIXACTOFFSET_BUFFER,
LWTRANCHE_MULTIXACTOFFSET_SLRU,
SYNC_HANDLER_MULTIXACT_OFFSET,
false);
SlruPagePrecedesUnitTests(MultiXactOffsetCtl, MULTIXACT_OFFSETS_PER_PAGE);
SimpleLruInit(MultiXactMemberCtl,
"multixact_member", NUM_MULTIXACTMEMBER_BUFFERS, 0,
MultiXactMemberSLRULock, "pg_multixact/members",
LWTRANCHE_MULTIXACTMEMBER_BUFFER,
"multixact_member", multixact_member_buffers, 0,
"pg_multixact/members", LWTRANCHE_MULTIXACTMEMBER_BUFFER,
LWTRANCHE_MULTIXACTMEMBER_SLRU,
SYNC_HANDLER_MULTIXACT_MEMBER,
false);
/* doesn't call SimpleLruTruncate() or meet criteria for unit tests */
@ -1887,6 +1945,24 @@ MultiXactShmemInit(void)
OldestVisibleMXactId = OldestMemberMXactId + MaxOldestSlot;
}
/*
* GUC check_hook for multixact_offset_buffers
*/
bool
check_multixact_offset_buffers(int *newval, void **extra, GucSource source)
{
return check_slru_buffers("multixact_offset_buffers", newval);
}
/*
* GUC check_hook for multixact_member_buffers
*/
bool
check_multixact_member_buffers(int *newval, void **extra, GucSource source)
{
return check_slru_buffers("multixact_member_buffers", newval);
}
/*
* This func must be called ONCE on system install. It creates the initial
* MultiXact segments. (The MultiXacts directories are assumed to have been
@ -1896,8 +1972,10 @@ void
BootStrapMultiXact(void)
{
int slotno;
LWLock *lock;
LWLockAcquire(MultiXactOffsetSLRULock, LW_EXCLUSIVE);
lock = SimpleLruGetBankLock(MultiXactOffsetCtl, 0);
LWLockAcquire(lock, LW_EXCLUSIVE);
/* Create and zero the first page of the offsets log */
slotno = ZeroMultiXactOffsetPage(0, false);
@ -1906,9 +1984,10 @@ BootStrapMultiXact(void)
SimpleLruWritePage(MultiXactOffsetCtl, slotno);
Assert(!MultiXactOffsetCtl->shared->page_dirty[slotno]);
LWLockRelease(MultiXactOffsetSLRULock);
LWLockRelease(lock);
LWLockAcquire(MultiXactMemberSLRULock, LW_EXCLUSIVE);
lock = SimpleLruGetBankLock(MultiXactMemberCtl, 0);
LWLockAcquire(lock, LW_EXCLUSIVE);
/* Create and zero the first page of the members log */
slotno = ZeroMultiXactMemberPage(0, false);
@ -1917,7 +1996,7 @@ BootStrapMultiXact(void)
SimpleLruWritePage(MultiXactMemberCtl, slotno);
Assert(!MultiXactMemberCtl->shared->page_dirty[slotno]);
LWLockRelease(MultiXactMemberSLRULock);
LWLockRelease(lock);
}
/*
@ -1977,10 +2056,12 @@ static void
MaybeExtendOffsetSlru(void)
{
int64 pageno;
LWLock *lock;
pageno = MultiXactIdToOffsetPage(MultiXactState->nextMXact);
lock = SimpleLruGetBankLock(MultiXactOffsetCtl, pageno);
LWLockAcquire(MultiXactOffsetSLRULock, LW_EXCLUSIVE);
LWLockAcquire(lock, LW_EXCLUSIVE);
if (!SimpleLruDoesPhysicalPageExist(MultiXactOffsetCtl, pageno))
{
@ -1995,7 +2076,7 @@ MaybeExtendOffsetSlru(void)
SimpleLruWritePage(MultiXactOffsetCtl, slotno);
}
LWLockRelease(MultiXactOffsetSLRULock);
LWLockRelease(lock);
}
/*
@ -2049,6 +2130,8 @@ TrimMultiXact(void)
oldestMXactDB = MultiXactState->oldestMultiXactDB;
LWLockRelease(MultiXactGenLock);
/* Clean up offsets state */
/*
* (Re-)Initialize our idea of the latest page number for offsets.
*/
@ -2056,9 +2139,6 @@ TrimMultiXact(void)
pg_atomic_write_u64(&MultiXactOffsetCtl->shared->latest_page_number,
pageno);
/* Clean up offsets state */
LWLockAcquire(MultiXactOffsetSLRULock, LW_EXCLUSIVE);
/*
* Zero out the remainder of the current offsets page. See notes in
* TrimCLOG() for background. Unlike CLOG, some WAL record covers every
@ -2072,7 +2152,9 @@ TrimMultiXact(void)
{
int slotno;
MultiXactOffset *offptr;
LWLock *lock = SimpleLruGetBankLock(MultiXactOffsetCtl, pageno);
LWLockAcquire(lock, LW_EXCLUSIVE);
slotno = SimpleLruReadPage(MultiXactOffsetCtl, pageno, true, nextMXact);
offptr = (MultiXactOffset *) MultiXactOffsetCtl->shared->page_buffer[slotno];
offptr += entryno;
@ -2080,10 +2162,9 @@ TrimMultiXact(void)
MemSet(offptr, 0, BLCKSZ - (entryno * sizeof(MultiXactOffset)));
MultiXactOffsetCtl->shared->page_dirty[slotno] = true;
LWLockRelease(lock);
}
LWLockRelease(MultiXactOffsetSLRULock);
/*
* And the same for members.
*
@ -2093,8 +2174,6 @@ TrimMultiXact(void)
pg_atomic_write_u64(&MultiXactMemberCtl->shared->latest_page_number,
pageno);
LWLockAcquire(MultiXactMemberSLRULock, LW_EXCLUSIVE);
/*
* Zero out the remainder of the current members page. See notes in
* TrimCLOG() for motivation.
@ -2105,7 +2184,9 @@ TrimMultiXact(void)
int slotno;
TransactionId *xidptr;
int memberoff;
LWLock *lock = SimpleLruGetBankLock(MultiXactMemberCtl, pageno);
LWLockAcquire(lock, LW_EXCLUSIVE);
memberoff = MXOffsetToMemberOffset(offset);
slotno = SimpleLruReadPage(MultiXactMemberCtl, pageno, true, offset);
xidptr = (TransactionId *)
@ -2120,10 +2201,9 @@ TrimMultiXact(void)
*/
MultiXactMemberCtl->shared->page_dirty[slotno] = true;
LWLockRelease(lock);
}
LWLockRelease(MultiXactMemberSLRULock);
/* signal that we're officially up */
LWLockAcquire(MultiXactGenLock, LW_EXCLUSIVE);
MultiXactState->finishedStartup = true;
@ -2411,6 +2491,7 @@ static void
ExtendMultiXactOffset(MultiXactId multi)
{
int64 pageno;
LWLock *lock;
/*
* No work except at first MultiXactId of a page. But beware: just after
@ -2421,13 +2502,14 @@ ExtendMultiXactOffset(MultiXactId multi)
return;
pageno = MultiXactIdToOffsetPage(multi);
lock = SimpleLruGetBankLock(MultiXactOffsetCtl, pageno);
LWLockAcquire(MultiXactOffsetSLRULock, LW_EXCLUSIVE);
LWLockAcquire(lock, LW_EXCLUSIVE);
/* Zero the page and make an XLOG entry about it */
ZeroMultiXactOffsetPage(pageno, true);
LWLockRelease(MultiXactOffsetSLRULock);
LWLockRelease(lock);
}
/*
@ -2460,15 +2542,17 @@ ExtendMultiXactMember(MultiXactOffset offset, int nmembers)
if (flagsoff == 0 && flagsbit == 0)
{
int64 pageno;
LWLock *lock;
pageno = MXOffsetToMemberPage(offset);
lock = SimpleLruGetBankLock(MultiXactMemberCtl, pageno);
LWLockAcquire(MultiXactMemberSLRULock, LW_EXCLUSIVE);
LWLockAcquire(lock, LW_EXCLUSIVE);
/* Zero the page and make an XLOG entry about it */
ZeroMultiXactMemberPage(pageno, true);
LWLockRelease(MultiXactMemberSLRULock);
LWLockRelease(lock);
}
/*
@ -2766,7 +2850,7 @@ find_multixact_start(MultiXactId multi, MultiXactOffset *result)
offptr = (MultiXactOffset *) MultiXactOffsetCtl->shared->page_buffer[slotno];
offptr += entryno;
offset = *offptr;
LWLockRelease(MultiXactOffsetSLRULock);
LWLockRelease(SimpleLruGetBankLock(MultiXactOffsetCtl, pageno));
*result = offset;
return true;
@ -3248,31 +3332,35 @@ multixact_redo(XLogReaderState *record)
{
int64 pageno;
int slotno;
LWLock *lock;
memcpy(&pageno, XLogRecGetData(record), sizeof(pageno));
LWLockAcquire(MultiXactOffsetSLRULock, LW_EXCLUSIVE);
lock = SimpleLruGetBankLock(MultiXactOffsetCtl, pageno);
LWLockAcquire(lock, LW_EXCLUSIVE);
slotno = ZeroMultiXactOffsetPage(pageno, false);
SimpleLruWritePage(MultiXactOffsetCtl, slotno);
Assert(!MultiXactOffsetCtl->shared->page_dirty[slotno]);
LWLockRelease(MultiXactOffsetSLRULock);
LWLockRelease(lock);
}
else if (info == XLOG_MULTIXACT_ZERO_MEM_PAGE)
{
int64 pageno;
int slotno;
LWLock *lock;
memcpy(&pageno, XLogRecGetData(record), sizeof(pageno));
LWLockAcquire(MultiXactMemberSLRULock, LW_EXCLUSIVE);
lock = SimpleLruGetBankLock(MultiXactMemberCtl, pageno);
LWLockAcquire(lock, LW_EXCLUSIVE);
slotno = ZeroMultiXactMemberPage(pageno, false);
SimpleLruWritePage(MultiXactMemberCtl, slotno);
Assert(!MultiXactMemberCtl->shared->page_dirty[slotno]);
LWLockRelease(MultiXactMemberSLRULock);
LWLockRelease(lock);
}
else if (info == XLOG_MULTIXACT_CREATE_ID)
{


@ -1,28 +1,38 @@
/*-------------------------------------------------------------------------
*
* slru.c
* Simple LRU buffering for transaction status logfiles
* Simple LRU buffering for wrap-around-able permanent metadata
*
* We use a simple least-recently-used scheme to manage a pool of page
* buffers. Under ordinary circumstances we expect that write
* traffic will occur mostly to the latest page (and to the just-prior
* page, soon after a page transition). Read traffic will probably touch
* a larger span of pages, but in any case a fairly small number of page
* buffers should be sufficient. So, we just search the buffers using plain
* linear search; there's no need for a hashtable or anything fancy.
* The management algorithm is straight LRU except that we will never swap
* out the latest page (since we know it's going to be hit again eventually).
* This module is used to maintain various pieces of transaction status
* indexed by TransactionId (such as commit status, parent transaction ID,
* commit timestamp), as well as storage for multixacts, serializable
* isolation locks and NOTIFY traffic. Extensions can define their own
* SLRUs, too.
*
* We use a control LWLock to protect the shared data structures, plus
* per-buffer LWLocks that synchronize I/O for each buffer. The control lock
* must be held to examine or modify any shared state. A process that is
* reading in or writing out a page buffer does not hold the control lock,
* only the per-buffer lock for the buffer it is working on. One exception
* is latest_page_number, which is read and written using atomic ops.
* Under ordinary circumstances we expect that write traffic will occur
* mostly to the latest page (and to the just-prior page, soon after a
* page transition). Read traffic will probably touch a larger span of
* pages, but a relatively small number of buffers should be sufficient.
*
* "Holding the control lock" means exclusive lock in all cases except for
* SimpleLruReadPage_ReadOnly(); see comments for SlruRecentlyUsed() for
* the implications of that.
* We use a simple least-recently-used scheme to manage a pool of shared
* page buffers, split in banks by the lowest bits of the page number, and
* the management algorithm only processes the bank to which the desired
* page belongs, so a linear search is sufficient; there's no need for a
* hashtable or anything fancy. The algorithm is straight LRU except that
* we will never swap out the latest page (since we know it's going to be
* hit again eventually).
*
* We use per-bank control LWLocks to protect the shared data structures,
* plus per-buffer LWLocks that synchronize I/O for each buffer. The
* bank's control lock must be held to examine or modify any of the bank's
* shared state. A process that is reading in or writing out a page
* buffer does not hold the control lock, only the per-buffer lock for the
* buffer it is working on. One exception is latest_page_number, which is
* read and written using atomic ops.
*
* "Holding the bank control lock" means exclusive lock in all cases
* except for SimpleLruReadPage_ReadOnly(); see comments for
* SlruRecentlyUsed() for the implications of that.
*
* When initiating I/O on a buffer, we acquire the per-buffer lock exclusively
* before releasing the control lock. The per-buffer lock is released after
@ -60,6 +70,7 @@
#include "pgstat.h"
#include "storage/fd.h"
#include "storage/shmem.h"
#include "utils/guc_hooks.h"
static inline int
SlruFileName(SlruCtl ctl, char *path, int64 segno)
@ -106,6 +117,23 @@ typedef struct SlruWriteAllData
typedef struct SlruWriteAllData *SlruWriteAll;
/*
* Bank size for the slot array. Pages are assigned a bank according to their
* page number, with each bank being this size. We want a power of 2 so that
* we can determine the bank number for a page with just bit shifting; we also
* want to keep the bank size small so that LRU victim search is fast. 16
* buffers per bank seems a good number.
*/
#define SLRU_BANK_BITSHIFT 4
#define SLRU_BANK_SIZE (1 << SLRU_BANK_BITSHIFT)
/*
* Macro to get the bank number to which the slot belongs.
*/
#define SlotGetBankNumber(slotno) ((slotno) >> SLRU_BANK_BITSHIFT)
/*
* Populate a file tag describing a segment file. We only use the segment
* number, since we can derive everything else we need by having separate
@ -118,34 +146,6 @@ typedef struct SlruWriteAllData *SlruWriteAll;
(a).segno = (xx_segno) \
)
/*
* Macro to mark a buffer slot "most recently used". Note multiple evaluation
* of arguments!
*
* The reason for the if-test is that there are often many consecutive
* accesses to the same page (particularly the latest page). By suppressing
* useless increments of cur_lru_count, we reduce the probability that old
* pages' counts will "wrap around" and make them appear recently used.
*
* We allow this code to be executed concurrently by multiple processes within
* SimpleLruReadPage_ReadOnly(). As long as int reads and writes are atomic,
* this should not cause any completely-bogus values to enter the computation.
* However, it is possible for either cur_lru_count or individual
* page_lru_count entries to be "reset" to lower values than they should have,
* in case a process is delayed while it executes this macro. With care in
* SlruSelectLRUPage(), this does little harm, and in any case the absolute
* worst possible consequence is a nonoptimal choice of page to evict. The
* gain from allowing concurrent reads of SLRU pages seems worth it.
*/
#define SlruRecentlyUsed(shared, slotno) \
do { \
int new_lru_count = (shared)->cur_lru_count; \
if (new_lru_count != (shared)->page_lru_count[slotno]) { \
(shared)->cur_lru_count = ++new_lru_count; \
(shared)->page_lru_count[slotno] = new_lru_count; \
} \
} while (0)
/* Saved info for SlruReportIOError */
typedef enum
{
@ -173,6 +173,7 @@ static int SlruSelectLRUPage(SlruCtl ctl, int64 pageno);
static bool SlruScanDirCbDeleteCutoff(SlruCtl ctl, char *filename,
int64 segpage, void *data);
static void SlruInternalDeleteSegment(SlruCtl ctl, int64 segno);
static inline void SlruRecentlyUsed(SlruShared shared, int slotno);
/*
@ -182,8 +183,12 @@ static void SlruInternalDeleteSegment(SlruCtl ctl, int64 segno);
Size
SimpleLruShmemSize(int nslots, int nlsns)
{
int nbanks = nslots / SLRU_BANK_SIZE;
Size sz;
Assert(nslots <= SLRU_MAX_ALLOWED_BUFFERS);
Assert(nslots % SLRU_BANK_SIZE == 0);
/* we assume nslots isn't so large as to risk overflow */
sz = MAXALIGN(sizeof(SlruSharedData));
sz += MAXALIGN(nslots * sizeof(char *)); /* page_buffer[] */
@ -192,6 +197,8 @@ SimpleLruShmemSize(int nslots, int nlsns)
sz += MAXALIGN(nslots * sizeof(int64)); /* page_number[] */
sz += MAXALIGN(nslots * sizeof(int)); /* page_lru_count[] */
sz += MAXALIGN(nslots * sizeof(LWLockPadded)); /* buffer_locks[] */
sz += MAXALIGN(nbanks * sizeof(LWLockPadded)); /* bank_locks[] */
sz += MAXALIGN(nbanks * sizeof(int)); /* bank_cur_lru_count[] */
if (nlsns > 0)
sz += MAXALIGN(nslots * nlsns * sizeof(XLogRecPtr)); /* group_lsn[] */
@ -199,6 +206,21 @@ SimpleLruShmemSize(int nslots, int nlsns)
return BUFFERALIGN(sz) + BLCKSZ * nslots;
}
/*
* Determine a number of SLRU buffers to use.
*
* We simply divide shared_buffers by the divisor given and cap
* that at the maximum given; but always at least SLRU_BANK_SIZE.
* Round down to the nearest multiple of SLRU_BANK_SIZE.
*/
int
SimpleLruAutotuneBuffers(int divisor, int max)
{
return Min(max - (max % SLRU_BANK_SIZE),
Max(SLRU_BANK_SIZE,
NBuffers / divisor - (NBuffers / divisor) % SLRU_BANK_SIZE));
}
/*
* Initialize, or attach to, a simple LRU cache in shared memory.
*
@ -208,16 +230,20 @@ SimpleLruShmemSize(int nslots, int nlsns)
* nlsns: number of LSN groups per page (set to zero if not relevant).
* ctllock: LWLock to use to control access to the shared control structure.
* subdir: PGDATA-relative subdirectory that will contain the files.
* tranche_id: LWLock tranche ID to use for the SLRU's per-buffer LWLocks.
* buffer_tranche_id: tranche ID to use for the SLRU's per-buffer LWLocks.
* bank_tranche_id: tranche ID to use for the bank LWLocks.
* sync_handler: which set of functions to use to handle sync requests
*/
void
SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns,
LWLock *ctllock, const char *subdir, int tranche_id,
const char *subdir, int buffer_tranche_id, int bank_tranche_id,
SyncRequestHandler sync_handler, bool long_segment_names)
{
SlruShared shared;
bool found;
int nbanks = nslots / SLRU_BANK_SIZE;
Assert(nslots <= SLRU_MAX_ALLOWED_BUFFERS);
shared = (SlruShared) ShmemInitStruct(name,
SimpleLruShmemSize(nslots, nlsns),
@ -233,12 +259,9 @@ SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns,
memset(shared, 0, sizeof(SlruSharedData));
shared->ControlLock = ctllock;
shared->num_slots = nslots;
shared->lsn_groups_per_page = nlsns;
shared->cur_lru_count = 0;
pg_atomic_init_u64(&shared->latest_page_number, 0);
shared->slru_stats_idx = pgstat_get_slru_index(name);
@ -259,6 +282,10 @@ SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns,
/* Initialize LWLocks */
shared->buffer_locks = (LWLockPadded *) (ptr + offset);
offset += MAXALIGN(nslots * sizeof(LWLockPadded));
shared->bank_locks = (LWLockPadded *) (ptr + offset);
offset += MAXALIGN(nbanks * sizeof(LWLockPadded));
shared->bank_cur_lru_count = (int *) (ptr + offset);
offset += MAXALIGN(nbanks * sizeof(int));
if (nlsns > 0)
{
@ -270,7 +297,7 @@ SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns,
for (int slotno = 0; slotno < nslots; slotno++)
{
LWLockInitialize(&shared->buffer_locks[slotno].lock,
tranche_id);
buffer_tranche_id);
shared->page_buffer[slotno] = ptr;
shared->page_status[slotno] = SLRU_PAGE_EMPTY;
@ -279,11 +306,21 @@ SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns,
ptr += BLCKSZ;
}
/* Initialize the slot banks. */
for (int bankno = 0; bankno < nbanks; bankno++)
{
LWLockInitialize(&shared->bank_locks[bankno].lock, bank_tranche_id);
shared->bank_cur_lru_count[bankno] = 0;
}
/* Should fit to estimated shmem size */
Assert(ptr - (char *) shared <= SimpleLruShmemSize(nslots, nlsns));
}
else
{
Assert(found);
Assert(shared->num_slots == nslots);
}
/*
* Initialize the unshared control struct, including directory path. We
@ -292,16 +329,33 @@ SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns,
ctl->shared = shared;
ctl->sync_handler = sync_handler;
ctl->long_segment_names = long_segment_names;
ctl->bank_mask = (nslots / SLRU_BANK_SIZE) - 1;
strlcpy(ctl->Dir, subdir, sizeof(ctl->Dir));
}
/*
* Helper function for GUC check_hooks to verify that the configured number
* of SLRU buffers is a multiple of SLRU_BANK_SIZE.
*/
bool
check_slru_buffers(const char *name, int *newval)
{
/* Valid values are multiples of SLRU_BANK_SIZE */
if (*newval % SLRU_BANK_SIZE == 0)
return true;
GUC_check_errdetail("\"%s\" must be a multiple of %d", name,
SLRU_BANK_SIZE);
return false;
}
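A standalone illustration (not part of this commit) of the multiple-of-bank-size rule this hook enforces, with a bank size of 16 assumed; note that 0 also passes, which matters for the auto-tuned GUCs.

#include <stdbool.h>
#include <stdio.h>

#define SLRU_BANK_SIZE	16		/* assumed bank size, in buffers */

/* Same rule as check_slru_buffers(), minus the GUC error reporting. */
static bool
slru_buffers_ok(int newval)
{
	return newval % SLRU_BANK_SIZE == 0;
}

int
main(void)
{
	int			candidates[] = {0, 16, 20, 32, 1000, 1024};

	for (int i = 0; i < 6; i++)
		printf("%4d -> %s\n", candidates[i],
			   slru_buffers_ok(candidates[i]) ? "ok" : "rejected");
	return 0;					/* 0, 16, 32 and 1024 pass; 20 and 1000 are rejected */
}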
/*
* Initialize (or reinitialize) a page to zeroes.
*
* The page is not actually written, just set up in shared memory.
* The slot number of the new page is returned.
*
* Control lock must be held at entry, and will be held at exit.
* Bank lock must be held at entry, and will be held at exit.
*/
int
SimpleLruZeroPage(SlruCtl ctl, int64 pageno)
@ -309,6 +363,8 @@ SimpleLruZeroPage(SlruCtl ctl, int64 pageno)
SlruShared shared = ctl->shared;
int slotno;
Assert(LWLockHeldByMeInMode(SimpleLruGetBankLock(ctl, pageno), LW_EXCLUSIVE));
/* Find a suitable buffer slot for the page */
slotno = SlruSelectLRUPage(ctl, pageno);
Assert(shared->page_status[slotno] == SLRU_PAGE_EMPTY ||
@ -369,18 +425,21 @@ SimpleLruZeroLSNs(SlruCtl ctl, int slotno)
* guarantee that new I/O hasn't been started before we return, though.
* In fact the slot might not even contain the same page anymore.)
*
* Control lock must be held at entry, and will be held at exit.
* Bank lock must be held at entry, and will be held at exit.
*/
static void
SimpleLruWaitIO(SlruCtl ctl, int slotno)
{
SlruShared shared = ctl->shared;
int bankno = SlotGetBankNumber(slotno);
Assert(shared->page_status[slotno] != SLRU_PAGE_EMPTY);
/* See notes at top of file */
LWLockRelease(shared->ControlLock);
LWLockRelease(&shared->bank_locks[bankno].lock);
LWLockAcquire(&shared->buffer_locks[slotno].lock, LW_SHARED);
LWLockRelease(&shared->buffer_locks[slotno].lock);
LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
LWLockAcquire(&shared->bank_locks[bankno].lock, LW_EXCLUSIVE);
/*
* If the slot is still in an io-in-progress state, then either someone
@ -423,7 +482,7 @@ SimpleLruWaitIO(SlruCtl ctl, int slotno)
* Return value is the shared-buffer slot number now holding the page.
* The buffer's LRU access info is updated.
*
* Control lock must be held at entry, and will be held at exit.
* The correct bank lock must be held at entry, and will be held at exit.
*/
int
SimpleLruReadPage(SlruCtl ctl, int64 pageno, bool write_ok,
@ -431,18 +490,21 @@ SimpleLruReadPage(SlruCtl ctl, int64 pageno, bool write_ok,
{
SlruShared shared = ctl->shared;
Assert(LWLockHeldByMeInMode(SimpleLruGetBankLock(ctl, pageno), LW_EXCLUSIVE));
/* Outer loop handles restart if we must wait for someone else's I/O */
for (;;)
{
int slotno;
int bankno;
bool ok;
/* See if page already is in memory; if not, pick victim slot */
slotno = SlruSelectLRUPage(ctl, pageno);
/* Did we find the page in memory? */
if (shared->page_number[slotno] == pageno &&
shared->page_status[slotno] != SLRU_PAGE_EMPTY)
if (shared->page_status[slotno] != SLRU_PAGE_EMPTY &&
shared->page_number[slotno] == pageno)
{
/*
* If page is still being read in, we must wait for I/O. Likewise
@ -477,9 +539,10 @@ SimpleLruReadPage(SlruCtl ctl, int64 pageno, bool write_ok,
/* Acquire per-buffer lock (cannot deadlock, see notes at top) */
LWLockAcquire(&shared->buffer_locks[slotno].lock, LW_EXCLUSIVE);
bankno = SlotGetBankNumber(slotno);
/* Release control lock while doing I/O */
LWLockRelease(shared->ControlLock);
/* Release bank lock while doing I/O */
LWLockRelease(&shared->bank_locks[bankno].lock);
/* Do the read */
ok = SlruPhysicalReadPage(ctl, pageno, slotno);
@ -487,8 +550,8 @@ SimpleLruReadPage(SlruCtl ctl, int64 pageno, bool write_ok,
/* Set the LSNs for this newly read-in page to zero */
SimpleLruZeroLSNs(ctl, slotno);
/* Re-acquire control lock and update page state */
LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
/* Re-acquire bank control lock and update page state */
LWLockAcquire(&shared->bank_locks[bankno].lock, LW_EXCLUSIVE);
Assert(shared->page_number[slotno] == pageno &&
shared->page_status[slotno] == SLRU_PAGE_READ_IN_PROGRESS &&
@ -522,22 +585,25 @@ SimpleLruReadPage(SlruCtl ctl, int64 pageno, bool write_ok,
* Return value is the shared-buffer slot number now holding the page.
* The buffer's LRU access info is updated.
*
* Control lock must NOT be held at entry, but will be held at exit.
* Bank control lock must NOT be held at entry, but will be held at exit.
* It is unspecified whether the lock will be shared or exclusive.
*/
int
SimpleLruReadPage_ReadOnly(SlruCtl ctl, int64 pageno, TransactionId xid)
{
SlruShared shared = ctl->shared;
int bankno = pageno & ctl->bank_mask;
int bankstart = bankno * SLRU_BANK_SIZE;
int bankend = bankstart + SLRU_BANK_SIZE;
/* Try to find the page while holding only shared lock */
LWLockAcquire(shared->ControlLock, LW_SHARED);
LWLockAcquire(&shared->bank_locks[bankno].lock, LW_SHARED);
/* See if page is already in a buffer */
for (int slotno = 0; slotno < shared->num_slots; slotno++)
for (int slotno = bankstart; slotno < bankend; slotno++)
{
if (shared->page_number[slotno] == pageno &&
shared->page_status[slotno] != SLRU_PAGE_EMPTY &&
if (shared->page_status[slotno] != SLRU_PAGE_EMPTY &&
shared->page_number[slotno] == pageno &&
shared->page_status[slotno] != SLRU_PAGE_READ_IN_PROGRESS)
{
/* See comments for SlruRecentlyUsed macro */
@ -551,8 +617,8 @@ SimpleLruReadPage_ReadOnly(SlruCtl ctl, int64 pageno, TransactionId xid)
}
/* No luck, so switch to normal exclusive lock and do regular read */
LWLockRelease(shared->ControlLock);
LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
LWLockRelease(&shared->bank_locks[bankno].lock);
LWLockAcquire(&shared->bank_locks[bankno].lock, LW_EXCLUSIVE);
return SimpleLruReadPage(ctl, pageno, true, xid);
}
@ -566,15 +632,19 @@ SimpleLruReadPage_ReadOnly(SlruCtl ctl, int64 pageno, TransactionId xid)
* the write). However, we *do* attempt a fresh write even if the page
* is already being written; this is for checkpoints.
*
* Control lock must be held at entry, and will be held at exit.
* Bank lock must be held at entry, and will be held at exit.
*/
static void
SlruInternalWritePage(SlruCtl ctl, int slotno, SlruWriteAll fdata)
{
SlruShared shared = ctl->shared;
int64 pageno = shared->page_number[slotno];
int bankno = SlotGetBankNumber(slotno);
bool ok;
Assert(shared->page_status[slotno] != SLRU_PAGE_EMPTY);
Assert(LWLockHeldByMeInMode(SimpleLruGetBankLock(ctl, pageno), LW_EXCLUSIVE));
/* If a write is in progress, wait for it to finish */
while (shared->page_status[slotno] == SLRU_PAGE_WRITE_IN_PROGRESS &&
shared->page_number[slotno] == pageno)
@ -601,8 +671,8 @@ SlruInternalWritePage(SlruCtl ctl, int slotno, SlruWriteAll fdata)
/* Acquire per-buffer lock (cannot deadlock, see notes at top) */
LWLockAcquire(&shared->buffer_locks[slotno].lock, LW_EXCLUSIVE);
/* Release control lock while doing I/O */
LWLockRelease(shared->ControlLock);
/* Release bank lock while doing I/O */
LWLockRelease(&shared->bank_locks[bankno].lock);
/* Do the write */
ok = SlruPhysicalWritePage(ctl, pageno, slotno, fdata);
@ -614,8 +684,8 @@ SlruInternalWritePage(SlruCtl ctl, int slotno, SlruWriteAll fdata)
CloseTransientFile(fdata->fd[i]);
}
/* Re-acquire control lock and update page state */
LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
/* Re-acquire bank lock and update page state */
LWLockAcquire(&shared->bank_locks[bankno].lock, LW_EXCLUSIVE);
Assert(shared->page_number[slotno] == pageno &&
shared->page_status[slotno] == SLRU_PAGE_WRITE_IN_PROGRESS);
@ -644,6 +714,8 @@ SlruInternalWritePage(SlruCtl ctl, int slotno, SlruWriteAll fdata)
void
SimpleLruWritePage(SlruCtl ctl, int slotno)
{
Assert(ctl->shared->page_status[slotno] != SLRU_PAGE_EMPTY);
SlruInternalWritePage(ctl, slotno, NULL);
}
@ -1028,17 +1100,53 @@ SlruReportIOError(SlruCtl ctl, int64 pageno, TransactionId xid)
}
/*
* Select the slot to re-use when we need a free slot.
* Mark a buffer slot "most recently used".
*/
static inline void
SlruRecentlyUsed(SlruShared shared, int slotno)
{
int bankno = SlotGetBankNumber(slotno);
int new_lru_count = shared->bank_cur_lru_count[bankno];
Assert(shared->page_status[slotno] != SLRU_PAGE_EMPTY);
/*
* The reason for the if-test is that there are often many consecutive
* accesses to the same page (particularly the latest page). By
* suppressing useless increments of bank_cur_lru_count, we reduce the
* probability that old pages' counts will "wrap around" and make them
* appear recently used.
*
* We allow this code to be executed concurrently by multiple processes
* within SimpleLruReadPage_ReadOnly(). As long as int reads and writes
* are atomic, this should not cause any completely-bogus values to enter
* the computation. However, it is possible for either bank_cur_lru_count
* or individual page_lru_count entries to be "reset" to lower values than
* they should have, in case a process is delayed while it executes this
* function. With care in SlruSelectLRUPage(), this does little harm, and
* in any case the absolute worst possible consequence is a nonoptimal
* choice of page to evict. The gain from allowing concurrent reads of
* SLRU pages seems worth it.
*/
if (new_lru_count != shared->page_lru_count[slotno])
{
shared->bank_cur_lru_count[bankno] = ++new_lru_count;
shared->page_lru_count[slotno] = new_lru_count;
}
}
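To make the counter scheme concrete, here is a throwaway simulation (not part of this commit) of the bank-local bookkeeping: each access stamps the slot with an incremented bank counter, and the eviction candidate is the slot with the largest counter delta. The bank size and access pattern are made up for the example, and the real SlruRecentlyUsed() above additionally suppresses redundant increments.

#include <stdio.h>

#define SLRU_BANK_SIZE	16		/* assumed bank size, in buffers */

static int	bank_cur_lru_count;					/* one counter per bank; a single bank here */
static int	page_lru_count[SLRU_BANK_SIZE];

static void
mark_recently_used(int slotno)
{
	/*
	 * Stamp the slot with an incremented bank counter.  (The real code
	 * skips the bump when the slot already carries the latest count, to
	 * slow counter wraparound.)
	 */
	page_lru_count[slotno] = ++bank_cur_lru_count;
}

static int
pick_victim(void)
{
	int			best_slot = 0;
	int			best_delta = -1;

	for (int slotno = 0; slotno < SLRU_BANK_SIZE; slotno++)
	{
		int			delta = bank_cur_lru_count - page_lru_count[slotno];

		if (delta > best_delta)
		{
			best_delta = delta;
			best_slot = slotno;
		}
	}
	return best_slot;			/* oldest slot = largest delta */
}

int
main(void)
{
	/* hypothetical access pattern within one bank */
	int			accesses[] = {3, 7, 3, 3, 1, 7, 5};

	for (int i = 0; i < 7; i++)
		mark_recently_used(accesses[i]);

	/* never-touched slots keep count 0 and therefore look oldest */
	printf("victim slot: %d\n", pick_victim());	/* prints 0 */
	return 0;
}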
/*
* Select the slot to re-use when we need a free slot for the given page.
*
* The target page number is passed because we need to consider the
* possibility that some other process reads in the target page while
* we are doing I/O to free a slot. Hence, check or recheck to see if
* any slot already holds the target page, and return that slot if so.
* Thus, the returned slot is *either* a slot already holding the pageno
* (could be any state except EMPTY), *or* a freeable slot (state EMPTY
* or CLEAN).
* The target page number is passed not only because we need to know the
* correct bank to use, but also because we need to consider the possibility
* that some other process reads in the target page while we are doing I/O to
* free a slot. Hence, check or recheck to see if any slot already holds the
* target page, and return that slot if so. Thus, the returned slot is
* *either* a slot already holding the pageno (could be any state except
* EMPTY), *or* a freeable slot (state EMPTY or CLEAN).
*
* Control lock must be held at entry, and will be held at exit.
* The correct bank lock must be held at entry, and will be held at exit.
*/
static int
SlruSelectLRUPage(SlruCtl ctl, int64 pageno)
@ -1055,12 +1163,17 @@ SlruSelectLRUPage(SlruCtl ctl, int64 pageno)
int bestinvalidslot = 0; /* keep compiler quiet */
int best_invalid_delta = -1;
int64 best_invalid_page_number = 0; /* keep compiler quiet */
int bankno = pageno & ctl->bank_mask;
int bankstart = bankno * SLRU_BANK_SIZE;
int bankend = bankstart + SLRU_BANK_SIZE;
Assert(LWLockHeldByMe(&shared->bank_locks[bankno].lock));
/* See if page already has a buffer assigned */
for (int slotno = 0; slotno < shared->num_slots; slotno++)
{
if (shared->page_number[slotno] == pageno &&
shared->page_status[slotno] != SLRU_PAGE_EMPTY)
if (shared->page_status[slotno] != SLRU_PAGE_EMPTY &&
shared->page_number[slotno] == pageno)
return slotno;
}
@ -1091,14 +1204,15 @@ SlruSelectLRUPage(SlruCtl ctl, int64 pageno)
* That gets us back on the path to having good data when there are
* multiple pages with the same lru_count.
*/
cur_count = (shared->cur_lru_count)++;
for (int slotno = 0; slotno < shared->num_slots; slotno++)
cur_count = (shared->bank_cur_lru_count[bankno])++;
for (int slotno = bankstart; slotno < bankend; slotno++)
{
int this_delta;
int64 this_page_number;
if (shared->page_status[slotno] == SLRU_PAGE_EMPTY)
return slotno;
this_delta = cur_count - shared->page_lru_count[slotno];
if (this_delta < 0)
{
@ -1193,6 +1307,7 @@ SimpleLruWriteAll(SlruCtl ctl, bool allow_redirtied)
SlruShared shared = ctl->shared;
SlruWriteAllData fdata;
int64 pageno = 0;
int prevbank = SlotGetBankNumber(0);
bool ok;
/* update the stats counter of flushes */
@ -1203,10 +1318,27 @@ SimpleLruWriteAll(SlruCtl ctl, bool allow_redirtied)
*/
fdata.num_files = 0;
LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
LWLockAcquire(&shared->bank_locks[prevbank].lock, LW_EXCLUSIVE);
for (int slotno = 0; slotno < shared->num_slots; slotno++)
{
int curbank = SlotGetBankNumber(slotno);
/*
* If the current bank lock is not the same as the previous bank lock,
* release the previous lock and acquire the new one.
*/
if (curbank != prevbank)
{
LWLockRelease(&shared->bank_locks[prevbank].lock);
LWLockAcquire(&shared->bank_locks[curbank].lock, LW_EXCLUSIVE);
prevbank = curbank;
}
/* Do nothing if slot is unused */
if (shared->page_status[slotno] == SLRU_PAGE_EMPTY)
continue;
SlruInternalWritePage(ctl, slotno, &fdata);
/*
@ -1220,7 +1352,7 @@ SimpleLruWriteAll(SlruCtl ctl, bool allow_redirtied)
!shared->page_dirty[slotno]));
}
LWLockRelease(shared->ControlLock);
LWLockRelease(&shared->bank_locks[prevbank].lock);
/*
* Now close any files that were open
@ -1259,6 +1391,7 @@ void
SimpleLruTruncate(SlruCtl ctl, int64 cutoffPage)
{
SlruShared shared = ctl->shared;
int prevbank;
/* update the stats counter of truncates */
pgstat_count_slru_truncate(shared->slru_stats_idx);
@ -1269,8 +1402,6 @@ SimpleLruTruncate(SlruCtl ctl, int64 cutoffPage)
* or just after a checkpoint, any dirty pages should have been flushed
* already ... we're just being extra careful here.)
*/
LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
restart:
/*
@ -1282,15 +1413,29 @@ restart:
if (ctl->PagePrecedes(pg_atomic_read_u64(&shared->latest_page_number),
cutoffPage))
{
LWLockRelease(shared->ControlLock);
ereport(LOG,
(errmsg("could not truncate directory \"%s\": apparent wraparound",
ctl->Dir)));
return;
}
prevbank = SlotGetBankNumber(0);
LWLockAcquire(&shared->bank_locks[prevbank].lock, LW_EXCLUSIVE);
for (int slotno = 0; slotno < shared->num_slots; slotno++)
{
int curbank = SlotGetBankNumber(slotno);
/*
* If the current bank lock is not the same as the previous bank lock,
* release the previous lock and acquire the new one.
*/
if (curbank != prevbank)
{
LWLockRelease(&shared->bank_locks[prevbank].lock);
LWLockAcquire(&shared->bank_locks[curbank].lock, LW_EXCLUSIVE);
prevbank = curbank;
}
if (shared->page_status[slotno] == SLRU_PAGE_EMPTY)
continue;
if (!ctl->PagePrecedes(shared->page_number[slotno], cutoffPage))
@ -1320,10 +1465,12 @@ restart:
SlruInternalWritePage(ctl, slotno, NULL);
else
SimpleLruWaitIO(ctl, slotno);
LWLockRelease(&shared->bank_locks[prevbank].lock);
goto restart;
}
LWLockRelease(shared->ControlLock);
LWLockRelease(&shared->bank_locks[prevbank].lock);
/* Now we can remove the old segment(s) */
(void) SlruScanDirectory(ctl, SlruScanDirCbDeleteCutoff, &cutoffPage);
@ -1362,19 +1509,33 @@ void
SlruDeleteSegment(SlruCtl ctl, int64 segno)
{
SlruShared shared = ctl->shared;
int prevbank = SlotGetBankNumber(0);
bool did_write;
/* Clean out any possibly existing references to the segment. */
LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE);
LWLockAcquire(&shared->bank_locks[prevbank].lock, LW_EXCLUSIVE);
restart:
did_write = false;
for (int slotno = 0; slotno < shared->num_slots; slotno++)
{
int pagesegno = shared->page_number[slotno] / SLRU_PAGES_PER_SEGMENT;
int pagesegno;
int curbank = SlotGetBankNumber(slotno);
/*
* If the current bank lock is not the same as the previous bank lock,
* release the previous lock and acquire the new one.
*/
if (curbank != prevbank)
{
LWLockRelease(&shared->bank_locks[prevbank].lock);
LWLockAcquire(&shared->bank_locks[curbank].lock, LW_EXCLUSIVE);
prevbank = curbank;
}
if (shared->page_status[slotno] == SLRU_PAGE_EMPTY)
continue;
pagesegno = shared->page_number[slotno] / SLRU_PAGES_PER_SEGMENT;
/* not the segment we're looking for */
if (pagesegno != segno)
continue;
@ -1405,7 +1566,7 @@ restart:
SlruInternalDeleteSegment(ctl, segno);
LWLockRelease(shared->ControlLock);
LWLockRelease(&shared->bank_locks[prevbank].lock);
}
/*

View File

@ -31,7 +31,9 @@
#include "access/slru.h"
#include "access/subtrans.h"
#include "access/transam.h"
#include "miscadmin.h"
#include "pg_trace.h"
#include "utils/guc_hooks.h"
#include "utils/snapmgr.h"
@ -85,12 +87,14 @@ SubTransSetParent(TransactionId xid, TransactionId parent)
int64 pageno = TransactionIdToPage(xid);
int entryno = TransactionIdToEntry(xid);
int slotno;
LWLock *lock;
TransactionId *ptr;
Assert(TransactionIdIsValid(parent));
Assert(TransactionIdFollows(xid, parent));
LWLockAcquire(SubtransSLRULock, LW_EXCLUSIVE);
lock = SimpleLruGetBankLock(SubTransCtl, pageno);
LWLockAcquire(lock, LW_EXCLUSIVE);
slotno = SimpleLruReadPage(SubTransCtl, pageno, true, xid);
ptr = (TransactionId *) SubTransCtl->shared->page_buffer[slotno];
@ -108,7 +112,7 @@ SubTransSetParent(TransactionId xid, TransactionId parent)
SubTransCtl->shared->page_dirty[slotno] = true;
}
LWLockRelease(SubtransSLRULock);
LWLockRelease(lock);
}
/*
@ -138,7 +142,7 @@ SubTransGetParent(TransactionId xid)
parent = *ptr;
LWLockRelease(SubtransSLRULock);
LWLockRelease(SimpleLruGetBankLock(SubTransCtl, pageno));
return parent;
}
@ -186,6 +190,22 @@ SubTransGetTopmostTransaction(TransactionId xid)
return previousXid;
}
/*
* Number of shared SUBTRANS buffers.
*
* If asked to autotune, use 2MB for every 1GB of shared buffers, up to 8MB.
* Otherwise just cap the configured amount to be between 16 and the maximum
* allowed.
*/
static int
SUBTRANSShmemBuffers(void)
{
/* auto-tune based on shared buffers */
if (subtransaction_buffers == 0)
return SimpleLruAutotuneBuffers(512, 1024);
return Min(Max(16, subtransaction_buffers), SLRU_MAX_ALLOWED_BUFFERS);
}
/*
* Initialization of shared memory for SUBTRANS
@ -193,20 +213,49 @@ SubTransGetTopmostTransaction(TransactionId xid)
Size
SUBTRANSShmemSize(void)
{
return SimpleLruShmemSize(NUM_SUBTRANS_BUFFERS, 0);
return SimpleLruShmemSize(SUBTRANSShmemBuffers(), 0);
}
void
SUBTRANSShmemInit(void)
{
/* If auto-tuning is requested, now is the time to do it */
if (subtransaction_buffers == 0)
{
char buf[32];
snprintf(buf, sizeof(buf), "%d", SUBTRANSShmemBuffers());
SetConfigOption("subtransaction_buffers", buf, PGC_POSTMASTER,
PGC_S_DYNAMIC_DEFAULT);
/*
* We prefer to report this value's source as PGC_S_DYNAMIC_DEFAULT.
* However, if the DBA explicitly set subtransaction_buffers = 0 in
* the config file, then PGC_S_DYNAMIC_DEFAULT will fail to override
* that and we must force the matter with PGC_S_OVERRIDE.
*/
if (subtransaction_buffers == 0) /* failed to apply it? */
SetConfigOption("subtransaction_buffers", buf, PGC_POSTMASTER,
PGC_S_OVERRIDE);
}
Assert(subtransaction_buffers != 0);
SubTransCtl->PagePrecedes = SubTransPagePrecedes;
SimpleLruInit(SubTransCtl, "subtransaction", NUM_SUBTRANS_BUFFERS, 0,
SubtransSLRULock, "pg_subtrans",
LWTRANCHE_SUBTRANS_BUFFER, SYNC_HANDLER_NONE,
false);
SimpleLruInit(SubTransCtl, "subtransaction", SUBTRANSShmemBuffers(), 0,
"pg_subtrans", LWTRANCHE_SUBTRANS_BUFFER,
LWTRANCHE_SUBTRANS_SLRU, SYNC_HANDLER_NONE, false);
SlruPagePrecedesUnitTests(SubTransCtl, SUBTRANS_XACTS_PER_PAGE);
}
/*
* GUC check_hook for subtransaction_buffers
*/
bool
check_subtrans_buffers(int *newval, void **extra, GucSource source)
{
return check_slru_buffers("subtransaction_buffers", newval);
}
/*
* This func must be called ONCE on system install. It creates
* the initial SUBTRANS segment. (The SUBTRANS directory is assumed to
@ -221,8 +270,9 @@ void
BootStrapSUBTRANS(void)
{
int slotno;
LWLock *lock = SimpleLruGetBankLock(SubTransCtl, 0);
LWLockAcquire(SubtransSLRULock, LW_EXCLUSIVE);
LWLockAcquire(lock, LW_EXCLUSIVE);
/* Create and zero the first page of the subtrans log */
slotno = ZeroSUBTRANSPage(0);
@ -231,7 +281,7 @@ BootStrapSUBTRANS(void)
SimpleLruWritePage(SubTransCtl, slotno);
Assert(!SubTransCtl->shared->page_dirty[slotno]);
LWLockRelease(SubtransSLRULock);
LWLockRelease(lock);
}
/*
@ -261,6 +311,8 @@ StartupSUBTRANS(TransactionId oldestActiveXID)
FullTransactionId nextXid;
int64 startPage;
int64 endPage;
LWLock *prevlock;
LWLock *lock;
/*
* Since we don't expect pg_subtrans to be valid across crashes, we
@ -268,23 +320,47 @@ StartupSUBTRANS(TransactionId oldestActiveXID)
* Whenever we advance into a new page, ExtendSUBTRANS will likewise zero
* the new page without regard to whatever was previously on disk.
*/
LWLockAcquire(SubtransSLRULock, LW_EXCLUSIVE);
startPage = TransactionIdToPage(oldestActiveXID);
nextXid = TransamVariables->nextXid;
endPage = TransactionIdToPage(XidFromFullTransactionId(nextXid));
prevlock = SimpleLruGetBankLock(SubTransCtl, startPage);
LWLockAcquire(prevlock, LW_EXCLUSIVE);
while (startPage != endPage)
{
lock = SimpleLruGetBankLock(SubTransCtl, startPage);
/*
* If this page lives in a different bank than the previous one, release
* the old bank's lock and acquire the lock of the new bank.
*/
if (prevlock != lock)
{
LWLockRelease(prevlock);
LWLockAcquire(lock, LW_EXCLUSIVE);
prevlock = lock;
}
(void) ZeroSUBTRANSPage(startPage);
startPage++;
/* must account for wraparound */
if (startPage > TransactionIdToPage(MaxTransactionId))
startPage = 0;
}
(void) ZeroSUBTRANSPage(startPage);
LWLockRelease(SubtransSLRULock);
lock = SimpleLruGetBankLock(SubTransCtl, startPage);
/*
* If the final page lives in a different bank than the previous one,
* release the old bank's lock and acquire the lock of the new bank.
*/
if (prevlock != lock)
{
LWLockRelease(prevlock);
LWLockAcquire(lock, LW_EXCLUSIVE);
}
(void) ZeroSUBTRANSPage(startPage);
LWLockRelease(lock);
}
/*
@ -318,6 +394,7 @@ void
ExtendSUBTRANS(TransactionId newestXact)
{
int64 pageno;
LWLock *lock;
/*
* No work except at first XID of a page. But beware: just after
@ -329,12 +406,13 @@ ExtendSUBTRANS(TransactionId newestXact)
pageno = TransactionIdToPage(newestXact);
LWLockAcquire(SubtransSLRULock, LW_EXCLUSIVE);
lock = SimpleLruGetBankLock(SubTransCtl, pageno);
LWLockAcquire(lock, LW_EXCLUSIVE);
/* Zero the page */
ZeroSUBTRANSPage(pageno);
LWLockRelease(SubtransSLRULock);
LWLockRelease(lock);
}

View File

@ -116,7 +116,7 @@
* frontend during startup.) The above design guarantees that notifies from
* other backends will never be missed by ignoring self-notifies.
*
* The amount of shared memory used for notify management (NUM_NOTIFY_BUFFERS)
* The amount of shared memory used for notify management (notify_buffers)
* can be varied without affecting anything but performance. The maximum
* amount of notification data that can be queued at one time is determined
* by max_notify_queue_pages GUC.
@ -148,6 +148,7 @@
#include "storage/sinval.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/guc_hooks.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"
#include "utils/snapmgr.h"
@ -234,7 +235,7 @@ typedef struct QueuePosition
*
* Resist the temptation to make this really large. While that would save
* work in some places, it would add cost in others. In particular, this
* should likely be less than NUM_NOTIFY_BUFFERS, to ensure that backends
* should likely be less than notify_buffers, to ensure that backends
* catch up before the pages they'll need to read fall out of SLRU cache.
*/
#define QUEUE_CLEANUP_DELAY 4
@ -266,9 +267,10 @@ typedef struct QueueBackendStatus
* both NotifyQueueLock and NotifyQueueTailLock in EXCLUSIVE mode, backends
* can change the tail pointers.
*
* NotifySLRULock is used as the control lock for the pg_notify SLRU buffers.
* The SLRU buffer pool is divided into banks, and the per-bank SLRU lock is
* used as the control lock for the pg_notify SLRU buffers.
* In order to avoid deadlocks, whenever we need multiple locks, we first get
* NotifyQueueTailLock, then NotifyQueueLock, and lastly NotifySLRULock.
* NotifyQueueTailLock, then NotifyQueueLock, and lastly the SLRU bank lock.
*
* Each backend uses the backend[] array entry with index equal to its
* BackendId (which can range from 1 to MaxBackends). We rely on this to make
@ -492,7 +494,7 @@ AsyncShmemSize(void)
size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus));
size = add_size(size, offsetof(AsyncQueueControl, backend));
size = add_size(size, SimpleLruShmemSize(NUM_NOTIFY_BUFFERS, 0));
size = add_size(size, SimpleLruShmemSize(notify_buffers, 0));
return size;
}
@ -541,8 +543,8 @@ AsyncShmemInit(void)
* names are used in order to avoid wraparound.
*/
NotifyCtl->PagePrecedes = asyncQueuePagePrecedes;
SimpleLruInit(NotifyCtl, "notify", NUM_NOTIFY_BUFFERS, 0,
NotifySLRULock, "pg_notify", LWTRANCHE_NOTIFY_BUFFER,
SimpleLruInit(NotifyCtl, "notify", notify_buffers, 0,
"pg_notify", LWTRANCHE_NOTIFY_BUFFER, LWTRANCHE_NOTIFY_SLRU,
SYNC_HANDLER_NONE, true);
if (!found)
@ -1356,7 +1358,7 @@ asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
* Eventually we will return NULL indicating all is done.
*
* We are holding NotifyQueueLock already from the caller and grab
* NotifySLRULock locally in this function.
* the page-specific SLRU bank lock locally in this function.
*/
static ListCell *
asyncQueueAddEntries(ListCell *nextNotify)
@ -1366,9 +1368,7 @@ asyncQueueAddEntries(ListCell *nextNotify)
int64 pageno;
int offset;
int slotno;
/* We hold both NotifyQueueLock and NotifySLRULock during this operation */
LWLockAcquire(NotifySLRULock, LW_EXCLUSIVE);
LWLock *prevlock;
/*
* We work with a local copy of QUEUE_HEAD, which we write back to shared
@ -1389,6 +1389,11 @@ asyncQueueAddEntries(ListCell *nextNotify)
* page should be initialized already, so just fetch it.
*/
pageno = QUEUE_POS_PAGE(queue_head);
prevlock = SimpleLruGetBankLock(NotifyCtl, pageno);
/* We hold both NotifyQueueLock and the SLRU bank lock during this operation */
LWLockAcquire(prevlock, LW_EXCLUSIVE);
if (QUEUE_POS_IS_ZERO(queue_head))
slotno = SimpleLruZeroPage(NotifyCtl, pageno);
else
@ -1434,6 +1439,17 @@ asyncQueueAddEntries(ListCell *nextNotify)
/* Advance queue_head appropriately, and detect if page is full */
if (asyncQueueAdvance(&(queue_head), qe.length))
{
LWLock *lock;
pageno = QUEUE_POS_PAGE(queue_head);
lock = SimpleLruGetBankLock(NotifyCtl, pageno);
if (lock != prevlock)
{
LWLockRelease(prevlock);
LWLockAcquire(lock, LW_EXCLUSIVE);
prevlock = lock;
}
/*
* Page is full, so we're done here, but first fill the next page
* with zeroes. The reason to do this is to ensure that slru.c's
@ -1460,7 +1476,7 @@ asyncQueueAddEntries(ListCell *nextNotify)
/* Success, so update the global QUEUE_HEAD */
QUEUE_HEAD = queue_head;
LWLockRelease(NotifySLRULock);
LWLockRelease(prevlock);
return nextNotify;
}
@ -1931,9 +1947,9 @@ asyncQueueReadAllNotifications(void)
/*
* We copy the data from SLRU into a local buffer, so as to avoid
* holding the NotifySLRULock while we are examining the entries
* and possibly transmitting them to our frontend. Copy only the
* part of the page we will actually inspect.
* holding the SLRU lock while we are examining the entries and
* possibly transmitting them to our frontend. Copy only the part
* of the page we will actually inspect.
*/
slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage,
InvalidTransactionId);
@ -1953,7 +1969,7 @@ asyncQueueReadAllNotifications(void)
NotifyCtl->shared->page_buffer[slotno] + curoffset,
copysize);
/* Release lock that we got from SimpleLruReadPage_ReadOnly() */
LWLockRelease(NotifySLRULock);
LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage));
/*
* Process messages up to the stop position, end of page, or an
@ -1994,7 +2010,7 @@ asyncQueueReadAllNotifications(void)
*
* The current page must have been fetched into page_buffer from shared
* memory. (We could access the page right in shared memory, but that
* would imply holding the NotifySLRULock throughout this routine.)
* would imply holding the SLRU bank lock throughout this routine.)
*
* We stop if we reach the "stop" position, or reach a notification from an
* uncommitted transaction, or reach the end of the page.
@ -2147,7 +2163,7 @@ asyncQueueAdvanceTail(void)
if (asyncQueuePagePrecedes(oldtailpage, boundary))
{
/*
* SimpleLruTruncate() will ask for NotifySLRULock but will also
* SimpleLruTruncate() will ask for SLRU bank locks but will also
* release them again.
*/
SimpleLruTruncate(NotifyCtl, newtailpage);
@ -2378,3 +2394,12 @@ ClearPendingActionsAndNotifies(void)
pendingActions = NULL;
pendingNotifies = NULL;
}
/*
* GUC check_hook for notify_buffers
*/
bool
check_notify_buffers(int *newval, void **extra, GucSource source)
{
return check_slru_buffers("notify_buffers", newval);
}

View File

@ -163,6 +163,13 @@ static const char *const BuiltinTrancheNames[] = {
[LWTRANCHE_LAUNCHER_HASH] = "LogicalRepLauncherHash",
[LWTRANCHE_DSM_REGISTRY_DSA] = "DSMRegistryDSA",
[LWTRANCHE_DSM_REGISTRY_HASH] = "DSMRegistryHash",
[LWTRANCHE_COMMITTS_SLRU] = "CommitTSSLRU",
[LWTRANCHE_MULTIXACTOFFSET_SLRU] = "MultixactOffsetSLRU",
[LWTRANCHE_MULTIXACTMEMBER_SLRU] = "MultixactMemberSLRU",
[LWTRANCHE_NOTIFY_SLRU] = "NotifySLRU",
[LWTRANCHE_SERIAL_SLRU] = "SerialSLRU",
[LWTRANCHE_SUBTRANS_SLRU] = "SubtransSLRU",
[LWTRANCHE_XACT_SLRU] = "XactSLRU",
};
StaticAssertDecl(lengthof(BuiltinTrancheNames) ==
@ -776,7 +783,7 @@ GetLWLockIdentifier(uint32 classId, uint16 eventId)
* in mode.
*
* This function will not block waiting for a lock to become free - that's the
* callers job.
* caller's job.
*
* Returns true if the lock isn't free and we need to wait.
*/

View File

@ -16,11 +16,11 @@ WALBufMappingLock 7
WALWriteLock 8
ControlFileLock 9
# 10 was CheckpointLock
XactSLRULock 11
SubtransSLRULock 12
# 11 was XactSLRULock
# 12 was SubtransSLRULock
MultiXactGenLock 13
MultiXactOffsetSLRULock 14
MultiXactMemberSLRULock 15
# 14 was MultiXactOffsetSLRULock
# 15 was MultiXactMemberSLRULock
RelCacheInitLock 16
CheckpointerCommLock 17
TwoPhaseStateLock 18
@ -31,19 +31,19 @@ AutovacuumLock 22
AutovacuumScheduleLock 23
SyncScanLock 24
RelationMappingLock 25
NotifySLRULock 26
#26 was NotifySLRULock
NotifyQueueLock 27
SerializableXactHashLock 28
SerializableFinishedListLock 29
SerializablePredicateListLock 30
SerialSLRULock 31
# 31 was SerialSLRULock
SyncRepLock 32
BackgroundWorkerLock 33
DynamicSharedMemoryControlLock 34
AutoFileLock 35
ReplicationSlotAllocationLock 36
ReplicationSlotControlLock 37
CommitTsSLRULock 38
#38 was CommitTsSLRULock
CommitTsLock 39
ReplicationOriginLock 40
MultiXactTruncationLock 41

View File

@ -213,6 +213,7 @@
#include "storage/predicate_internals.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "utils/guc_hooks.h"
#include "utils/rel.h"
#include "utils/snapmgr.h"
@ -813,9 +814,9 @@ SerialInit(void)
*/
SerialSlruCtl->PagePrecedes = SerialPagePrecedesLogically;
SimpleLruInit(SerialSlruCtl, "serializable",
NUM_SERIAL_BUFFERS, 0, SerialSLRULock, "pg_serial",
LWTRANCHE_SERIAL_BUFFER, SYNC_HANDLER_NONE,
false);
serializable_buffers, 0, "pg_serial",
LWTRANCHE_SERIAL_BUFFER, LWTRANCHE_SERIAL_SLRU,
SYNC_HANDLER_NONE, false);
#ifdef USE_ASSERT_CHECKING
SerialPagePrecedesLogicallyUnitTests();
#endif
@ -841,6 +842,15 @@ SerialInit(void)
}
}
/*
* GUC check_hook for serializable_buffers
*/
bool
check_serial_buffers(int *newval, void **extra, GucSource source)
{
return check_slru_buffers("serializable_buffers", newval);
}
/*
* Record a committed read write serializable xid and the minimum
* commitSeqNo of any transactions to which this xid had a rw-conflict out.
@ -854,15 +864,17 @@ SerialAdd(TransactionId xid, SerCommitSeqNo minConflictCommitSeqNo)
int slotno;
int64 firstZeroPage;
bool isNewPage;
LWLock *lock;
Assert(TransactionIdIsValid(xid));
targetPage = SerialPage(xid);
lock = SimpleLruGetBankLock(SerialSlruCtl, targetPage);
/*
* In this routine, we must hold both SerialControlLock and SerialSLRULock
* simultaneously while making the SLRU data catch up with the new state
* that we determine.
* In this routine, we must hold both SerialControlLock and the SLRU bank
* lock simultaneously while making the SLRU data catch up with the new
* state that we determine.
*/
LWLockAcquire(SerialControlLock, LW_EXCLUSIVE);
@ -898,7 +910,7 @@ SerialAdd(TransactionId xid, SerCommitSeqNo minConflictCommitSeqNo)
if (isNewPage)
serialControl->headPage = targetPage;
LWLockAcquire(SerialSLRULock, LW_EXCLUSIVE);
LWLockAcquire(lock, LW_EXCLUSIVE);
if (isNewPage)
{
@ -916,7 +928,7 @@ SerialAdd(TransactionId xid, SerCommitSeqNo minConflictCommitSeqNo)
SerialValue(slotno, xid) = minConflictCommitSeqNo;
SerialSlruCtl->shared->page_dirty[slotno] = true;
LWLockRelease(SerialSLRULock);
LWLockRelease(lock);
LWLockRelease(SerialControlLock);
}
@ -950,13 +962,13 @@ SerialGetMinConflictCommitSeqNo(TransactionId xid)
return 0;
/*
* The following function must be called without holding SerialSLRULock,
* The following function must be called without holding SLRU bank lock,
* but will return with that lock held, which must then be released.
*/
slotno = SimpleLruReadPage_ReadOnly(SerialSlruCtl,
SerialPage(xid), xid);
val = SerialValue(slotno, xid);
LWLockRelease(SerialSLRULock);
LWLockRelease(SimpleLruGetBankLock(SerialSlruCtl, SerialPage(xid)));
return val;
}
@ -1367,7 +1379,7 @@ PredicateLockShmemSize(void)
/* Shared memory structures for SLRU tracking of old committed xids. */
size = add_size(size, sizeof(SerialControlData));
size = add_size(size, SimpleLruShmemSize(NUM_SERIAL_BUFFERS, 0));
size = add_size(size, SimpleLruShmemSize(serializable_buffers, 0));
return size;
}

View File

@ -295,11 +295,7 @@ SInvalWrite "Waiting to add a message to the shared catalog invalidation queue."
WALBufMapping "Waiting to replace a page in WAL buffers."
WALWrite "Waiting for WAL buffers to be written to disk."
ControlFile "Waiting to read or update the <filename>pg_control</filename> file or create a new WAL file."
XactSLRU "Waiting to access the transaction status SLRU cache."
SubtransSLRU "Waiting to access the sub-transaction SLRU cache."
MultiXactGen "Waiting to read or update shared multixact state."
MultiXactOffsetSLRU "Waiting to access the multixact offset SLRU cache."
MultiXactMemberSLRU "Waiting to access the multixact member SLRU cache."
RelCacheInit "Waiting to read or update a <filename>pg_internal.init</filename> relation cache initialization file."
CheckpointerComm "Waiting to manage fsync requests."
TwoPhaseState "Waiting to read or update the state of prepared transactions."
@ -310,19 +306,16 @@ Autovacuum "Waiting to read or update the current state of autovacuum workers."
AutovacuumSchedule "Waiting to ensure that a table selected for autovacuum still needs vacuuming."
SyncScan "Waiting to select the starting location of a synchronized table scan."
RelationMapping "Waiting to read or update a <filename>pg_filenode.map</filename> file (used to track the filenode assignments of certain system catalogs)."
NotifySLRU "Waiting to access the <command>NOTIFY</command> message SLRU cache."
NotifyQueue "Waiting to read or update <command>NOTIFY</command> messages."
SerializableXactHash "Waiting to read or update information about serializable transactions."
SerializableFinishedList "Waiting to access the list of finished serializable transactions."
SerializablePredicateList "Waiting to access the list of predicate locks held by serializable transactions."
SerialSLRU "Waiting to access the serializable transaction conflict SLRU cache."
SyncRep "Waiting to read or update information about the state of synchronous replication."
BackgroundWorker "Waiting to read or update background worker state."
DynamicSharedMemoryControl "Waiting to read or update dynamic shared memory allocation information."
AutoFile "Waiting to update the <filename>postgresql.auto.conf</filename> file."
ReplicationSlotAllocation "Waiting to allocate or free a replication slot."
ReplicationSlotControl "Waiting to read or update replication slot state."
CommitTsSLRU "Waiting to access the commit timestamp SLRU cache."
CommitTs "Waiting to read or update the last value set for a transaction commit timestamp."
ReplicationOrigin "Waiting to create, drop or use a replication origin."
MultiXactTruncation "Waiting to read or truncate multixact information."
@ -375,6 +368,14 @@ LogicalRepLauncherDSA "Waiting to access logical replication launcher's dynamic
LogicalRepLauncherHash "Waiting to access logical replication launcher's shared hash table."
DSMRegistryDSA "Waiting to access dynamic shared memory registry's dynamic shared memory allocator."
DSMRegistryHash "Waiting to access dynamic shared memory registry's shared hash table."
CommitTsSLRU "Waiting to access the commit timestamp SLRU cache."
MultiXactOffsetSLRU "Waiting to access the multixact offset SLRU cache."
MultiXactMemberSLRU "Waiting to access the multixact member SLRU cache."
NotifySLRU "Waiting to access the <command>NOTIFY</command> message SLRU cache."
SerialSLRU "Waiting to access the serializable transaction conflict SLRU cache."
SubtransSLRU "Waiting to access the sub-transaction SLRU cache."
XactSLRU "Waiting to access the transaction status SLRU cache."
#
# Wait Events - Lock

View File

@ -157,3 +157,12 @@ int64 VacuumPageDirty = 0;
int VacuumCostBalance = 0; /* working state for vacuum */
bool VacuumCostActive = false;
/* configurable SLRU buffer sizes */
int commit_timestamp_buffers = 0;
int multixact_member_buffers = 32;
int multixact_offset_buffers = 16;
int notify_buffers = 16;
int serializable_buffers = 32;
int subtransaction_buffers = 0;
int transaction_buffers = 0;
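Since these GUCs are measured in buffers of BLCKSZ bytes (GUC_UNIT_BLOCKS in the definitions below), the non-zero defaults translate into small fixed pools, while 0 means the size is derived from shared_buffers at startup. A throwaway conversion sketch (not part of this commit), assuming the standard 8kB block size.

#include <stdio.h>

#define BLCKSZ	8192			/* assumed 8kB block size */

int
main(void)
{
	struct
	{
		const char *name;
		int			nbuffers;
	}			defaults[] = {
		{"commit_timestamp_buffers", 0},
		{"multixact_member_buffers", 32},
		{"multixact_offset_buffers", 16},
		{"notify_buffers", 16},
		{"serializable_buffers", 32},
		{"subtransaction_buffers", 0},
		{"transaction_buffers", 0},
	};

	for (int i = 0; i < 7; i++)
	{
		if (defaults[i].nbuffers == 0)
			printf("%-27s auto-tuned from shared_buffers at startup\n",
				   defaults[i].name);
		else
			printf("%-27s %2d buffers = %3d kB\n", defaults[i].name,
				   defaults[i].nbuffers, defaults[i].nbuffers * BLCKSZ / 1024);
	}
	return 0;					/* e.g. 32 buffers = 256 kB, 16 buffers = 128 kB */
}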

View File

@ -28,6 +28,7 @@
#include "access/commit_ts.h"
#include "access/gin.h"
#include "access/slru.h"
#include "access/toast_compression.h"
#include "access/twophase.h"
#include "access/xlog_internal.h"
@ -2283,6 +2284,83 @@ struct config_int ConfigureNamesInt[] =
NULL, NULL, NULL
},
{
{"commit_timestamp_buffers", PGC_POSTMASTER, RESOURCES_MEM,
gettext_noop("Sets the size of the dedicated buffer pool used for the commit timestamp cache."),
NULL,
GUC_UNIT_BLOCKS
},
&commit_timestamp_buffers,
0, 0, SLRU_MAX_ALLOWED_BUFFERS,
check_commit_ts_buffers, NULL, NULL
},
{
{"multixact_member_buffers", PGC_POSTMASTER, RESOURCES_MEM,
gettext_noop("Sets the size of the dedicated buffer pool used for the MultiXact member cache."),
NULL,
GUC_UNIT_BLOCKS
},
&multixact_member_buffers,
32, 16, SLRU_MAX_ALLOWED_BUFFERS,
check_multixact_member_buffers, NULL, NULL
},
{
{"multixact_offset_buffers", PGC_POSTMASTER, RESOURCES_MEM,
gettext_noop("Sets the size of the dedicated buffer pool used for the MultiXact offset cache."),
NULL,
GUC_UNIT_BLOCKS
},
&multixact_offset_buffers,
16, 16, SLRU_MAX_ALLOWED_BUFFERS,
check_multixact_offset_buffers, NULL, NULL
},
{
{"notify_buffers", PGC_POSTMASTER, RESOURCES_MEM,
gettext_noop("Sets the size of the dedicated buffer pool used for the LISTEN/NOTIFY message cache."),
NULL,
GUC_UNIT_BLOCKS
},
&notify_buffers,
16, 16, SLRU_MAX_ALLOWED_BUFFERS,
check_notify_buffers, NULL, NULL
},
{
{"serializable_buffers", PGC_POSTMASTER, RESOURCES_MEM,
gettext_noop("Sets the size of the dedicated buffer pool used for the serializable transaction cache."),
NULL,
GUC_UNIT_BLOCKS
},
&serializable_buffers,
32, 16, SLRU_MAX_ALLOWED_BUFFERS,
check_serial_buffers, NULL, NULL
},
{
{"subtransaction_buffers", PGC_POSTMASTER, RESOURCES_MEM,
gettext_noop("Sets the size of the dedicated buffer pool used for the sub-transaction cache."),
NULL,
GUC_UNIT_BLOCKS
},
&subtransaction_buffers,
0, 0, SLRU_MAX_ALLOWED_BUFFERS,
check_subtrans_buffers, NULL, NULL
},
{
{"transaction_buffers", PGC_POSTMASTER, RESOURCES_MEM,
gettext_noop("Sets the size of the dedicated buffer pool used for the transaction status cache."),
NULL,
GUC_UNIT_BLOCKS
},
&transaction_buffers,
0, 0, SLRU_MAX_ALLOWED_BUFFERS,
check_transaction_buffers, NULL, NULL
},
{
{"temp_buffers", PGC_USERSET, RESOURCES_MEM,
gettext_noop("Sets the maximum number of temporary buffers used by each session."),

View File

@ -50,6 +50,15 @@
#external_pid_file = '' # write an extra PID file
# (change requires restart)
# - SLRU Buffers (change requires restart) -
#commit_timestamp_buffers = 0 # memory for pg_commit_ts (0 = auto)
#multixact_offset_buffers = 16 # memory for pg_multixact/offsets
#multixact_member_buffers = 32 # memory for pg_multixact/members
#notify_buffers = 16 # memory for pg_notify
#serializable_buffers = 32 # memory for pg_serial
#subtransaction_buffers = 0 # memory for pg_subtrans (0 = auto)
#transaction_buffers = 0 # memory for pg_xact (0 = auto)
#------------------------------------------------------------------------------
# CONNECTIONS AND AUTHENTICATION

View File

@ -40,7 +40,6 @@ extern void TransactionIdSetTreeStatus(TransactionId xid, int nsubxids,
TransactionId *subxids, XidStatus status, XLogRecPtr lsn);
extern XidStatus TransactionIdGetStatus(TransactionId xid, XLogRecPtr *lsn);
extern Size CLOGShmemBuffers(void);
extern Size CLOGShmemSize(void);
extern void CLOGShmemInit(void);
extern void BootStrapCLOG(void);

View File

@ -27,7 +27,6 @@ extern bool TransactionIdGetCommitTsData(TransactionId xid,
extern TransactionId GetLatestCommitTsData(TimestampTz *ts,
RepOriginId *nodeid);
extern Size CommitTsShmemBuffers(void);
extern Size CommitTsShmemSize(void);
extern void CommitTsShmemInit(void);
extern void BootStrapCommitTs(void);

View File

@ -29,10 +29,6 @@
#define MaxMultiXactOffset ((MultiXactOffset) 0xFFFFFFFF)
/* Number of SLRU buffers to use for multixact */
#define NUM_MULTIXACTOFFSET_BUFFERS 8
#define NUM_MULTIXACTMEMBER_BUFFERS 16
/*
* Possible multixact lock modes ("status"). The first four modes are for
* tuple locks (FOR KEY SHARE, FOR SHARE, FOR NO KEY UPDATE, FOR UPDATE); the

View File

@ -17,6 +17,11 @@
#include "storage/lwlock.h"
#include "storage/sync.h"
/*
* To avoid overflowing internal arithmetic and the size_t data type, the
* number of buffers must not exceed this number.
*/
#define SLRU_MAX_ALLOWED_BUFFERS ((1024 * 1024 * 1024) / BLCKSZ)
/*
* Define SLRU segment size. A page is the same BLCKSZ as is used everywhere
@ -55,8 +60,6 @@ typedef enum
*/
typedef struct SlruSharedData
{
LWLock *ControlLock;
/* Number of buffers managed by this SLRU structure */
int num_slots;
@ -69,30 +72,41 @@ typedef struct SlruSharedData
bool *page_dirty;
int64 *page_number;
int *page_lru_count;
/* The buffer_locks array protects the I/O on each buffer slot */
LWLockPadded *buffer_locks;
/* Locks to protect access to the in-memory buffer slots within each SLRU bank. */
LWLockPadded *bank_locks;
/*----------
* A per-bank LRU counter is maintained because the victim buffer search is
* done within a single bank.  Keeping the counter per bank also avoids
* frequent cache-line invalidation, since the counter is updated on every
* page access.
*
* We mark a page "most recently used" by setting
* page_lru_count[slotno] = ++bank_cur_lru_count[bankno];
* The oldest page in the bank is therefore the one with the highest value
* of
* bank_cur_lru_count[bankno] - page_lru_count[slotno]
* The counts will eventually wrap around, but this calculation still
* works as long as no page's age exceeds INT_MAX counts.
*----------
*/
int *bank_cur_lru_count;
/*
* Optional array of WAL flush LSNs associated with entries in the SLRU
* pages. If not zero/NULL, we must flush WAL before writing pages (true
* for pg_xact, false for multixact, pg_subtrans, pg_notify). group_lsn[]
* has lsn_groups_per_page entries per buffer slot, each containing the
* for pg_xact, false for everything else). group_lsn[] has
* lsn_groups_per_page entries per buffer slot, each containing the
* highest LSN known for a contiguous group of SLRU entries on that slot's
* page.
*/
XLogRecPtr *group_lsn;
int lsn_groups_per_page;
/*----------
* We mark a page "most recently used" by setting
* page_lru_count[slotno] = ++cur_lru_count;
* The oldest page is therefore the one with the highest value of
* cur_lru_count - page_lru_count[slotno]
* The counts will eventually wrap around, but this calculation still
* works as long as no page's age exceeds INT_MAX counts.
*----------
*/
int cur_lru_count;
/*
* latest_page_number is the page number of the current end of the log;
* this is not critical data, since we use it only to avoid swapping out
@ -114,6 +128,19 @@ typedef struct SlruCtlData
{
SlruShared shared;
/*
* Bitmask to determine bank number from page number.
*/
bits16 bank_mask;
/*
* If true, use long segment filenames formed from lower 48 bits of the
* segment number, e.g. pg_xact/000000001234. Otherwise, use short
* filenames formed from lower 16 bits of the segment number e.g.
* pg_xact/1234.
*/
bool long_segment_names;
/*
* Which sync handler function to use when handing sync requests over to
* the checkpointer. SYNC_HANDLER_NONE to disable fsync (eg pg_notify).
@ -132,28 +159,36 @@ typedef struct SlruCtlData
*/
bool (*PagePrecedes) (int64, int64);
/*
* If true, use long segment filenames formed from lower 48 bits of the
* segment number, e.g. pg_xact/000000001234. Otherwise, use short
* filenames formed from lower 16 bits of the segment number e.g.
* pg_xact/1234.
*/
bool long_segment_names;
/*
* Dir is set during SimpleLruInit and does not change thereafter. Since
* it's always the same, it doesn't need to be in shared memory.
*/
char Dir[64];
} SlruCtlData;
typedef SlruCtlData *SlruCtl;
/*
* Get the SLRU bank lock for the given SlruCtl and page number.
*
* This lock must be held to access the SLRU buffer slots in the
* corresponding bank.
*/
static inline LWLock *
SimpleLruGetBankLock(SlruCtl ctl, int64 pageno)
{
int bankno;
bankno = pageno & ctl->bank_mask;
return &(ctl->shared->bank_locks[bankno].lock);
}
extern Size SimpleLruShmemSize(int nslots, int nlsns);
extern int SimpleLruAutotuneBuffers(int divisor, int max);
extern void SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns,
LWLock *ctllock, const char *subdir, int tranche_id,
SyncRequestHandler sync_handler,
const char *subdir, int buffer_tranche_id,
int bank_tranche_id, SyncRequestHandler sync_handler,
bool long_segment_names);
extern int SimpleLruZeroPage(SlruCtl ctl, int64 pageno);
extern int SimpleLruReadPage(SlruCtl ctl, int64 pageno, bool write_ok,
@ -182,5 +217,6 @@ extern bool SlruScanDirCbReportPresence(SlruCtl ctl, char *filename,
int64 segpage, void *data);
extern bool SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int64 segpage,
void *data);
extern bool check_slru_buffers(const char *name, int *newval);
#endif /* SLRU_H */

View File

@ -11,9 +11,6 @@
#ifndef SUBTRANS_H
#define SUBTRANS_H
/* Number of SLRU buffers to use for subtrans */
#define NUM_SUBTRANS_BUFFERS 32
extern void SubTransSetParent(TransactionId xid, TransactionId parent);
extern TransactionId SubTransGetParent(TransactionId xid);
extern TransactionId SubTransGetTopmostTransaction(TransactionId xid);

View File

@ -15,11 +15,6 @@
#include <signal.h>
/*
* The number of SLRU page buffers we use for the notification queue.
*/
#define NUM_NOTIFY_BUFFERS 8
extern PGDLLIMPORT bool Trace_notify;
extern PGDLLIMPORT int max_notify_queue_pages;
extern PGDLLIMPORT volatile sig_atomic_t notifyInterruptPending;

View File

@ -179,6 +179,14 @@ extern PGDLLIMPORT int MaxConnections;
extern PGDLLIMPORT int max_worker_processes;
extern PGDLLIMPORT int max_parallel_workers;
extern PGDLLIMPORT int commit_timestamp_buffers;
extern PGDLLIMPORT int multixact_member_buffers;
extern PGDLLIMPORT int multixact_offset_buffers;
extern PGDLLIMPORT int notify_buffers;
extern PGDLLIMPORT int serializable_buffers;
extern PGDLLIMPORT int subtransaction_buffers;
extern PGDLLIMPORT int transaction_buffers;
extern PGDLLIMPORT int MyProcPid;
extern PGDLLIMPORT pg_time_t MyStartTime;
extern PGDLLIMPORT TimestampTz MyStartTimestamp;

View File

@ -209,6 +209,13 @@ typedef enum BuiltinTrancheIds
LWTRANCHE_LAUNCHER_HASH,
LWTRANCHE_DSM_REGISTRY_DSA,
LWTRANCHE_DSM_REGISTRY_HASH,
LWTRANCHE_COMMITTS_SLRU,
LWTRANCHE_MULTIXACTMEMBER_SLRU,
LWTRANCHE_MULTIXACTOFFSET_SLRU,
LWTRANCHE_NOTIFY_SLRU,
LWTRANCHE_SERIAL_SLRU,
LWTRANCHE_SUBTRANS_SLRU,
LWTRANCHE_XACT_SLRU,
LWTRANCHE_FIRST_USER_DEFINED,
} BuiltinTrancheIds;

View File

@ -26,10 +26,6 @@ extern PGDLLIMPORT int max_predicate_locks_per_xact;
extern PGDLLIMPORT int max_predicate_locks_per_relation;
extern PGDLLIMPORT int max_predicate_locks_per_page;
/* Number of SLRU buffers to use for Serial SLRU */
#define NUM_SERIAL_BUFFERS 16
/*
* A handle used for sharing SERIALIZABLEXACT objects between the participants
* in a parallel query.

View File

@ -46,6 +46,8 @@ extern bool check_client_connection_check_interval(int *newval, void **extra,
extern bool check_client_encoding(char **newval, void **extra, GucSource source);
extern void assign_client_encoding(const char *newval, void *extra);
extern bool check_cluster_name(char **newval, void **extra, GucSource source);
extern bool check_commit_ts_buffers(int *newval, void **extra,
GucSource source);
extern const char *show_data_directory_mode(void);
extern bool check_datestyle(char **newval, void **extra, GucSource source);
extern void assign_datestyle(const char *newval, void *extra);
@ -91,6 +93,11 @@ extern bool check_max_worker_processes(int *newval, void **extra,
GucSource source);
extern bool check_max_stack_depth(int *newval, void **extra, GucSource source);
extern void assign_max_stack_depth(int newval, void *extra);
extern bool check_multixact_member_buffers(int *newval, void **extra,
GucSource source);
extern bool check_multixact_offset_buffers(int *newval, void **extra,
GucSource source);
extern bool check_notify_buffers(int *newval, void **extra, GucSource source);
extern bool check_primary_slot_name(char **newval, void **extra,
GucSource source);
extern bool check_random_seed(double *newval, void **extra, GucSource source);
@ -122,12 +129,15 @@ extern void assign_role(const char *newval, void *extra);
extern const char *show_role(void);
extern bool check_search_path(char **newval, void **extra, GucSource source);
extern void assign_search_path(const char *newval, void *extra);
extern bool check_serial_buffers(int *newval, void **extra, GucSource source);
extern bool check_session_authorization(char **newval, void **extra, GucSource source);
extern void assign_session_authorization(const char *newval, void *extra);
extern void assign_session_replication_role(int newval, void *extra);
extern void assign_stats_fetch_consistency(int newval, void *extra);
extern bool check_ssl(bool *newval, void **extra, GucSource source);
extern bool check_stage_log_stats(bool *newval, void **extra, GucSource source);
extern bool check_subtrans_buffers(int *newval, void **extra,
GucSource source);
extern bool check_synchronous_standby_names(char **newval, void **extra,
GucSource source);
extern void assign_synchronous_standby_names(const char *newval, void *extra);
@ -152,6 +162,7 @@ extern const char *show_timezone(void);
extern bool check_timezone_abbreviations(char **newval, void **extra,
GucSource source);
extern void assign_timezone_abbreviations(const char *newval, void *extra);
extern bool check_transaction_buffers(int *newval, void **extra, GucSource source);
extern bool check_transaction_deferrable(bool *newval, void **extra, GucSource source);
extern bool check_transaction_isolation(int *newval, void **extra, GucSource source);
extern bool check_transaction_read_only(bool *newval, void **extra, GucSource source);

View File

@ -40,10 +40,6 @@ PG_FUNCTION_INFO_V1(test_slru_delete_all);
/* Number of SLRU page slots */
#define NUM_TEST_BUFFERS 16
/* SLRU control lock */
LWLock TestSLRULock;
#define TestSLRULock (&TestSLRULock)
static SlruCtlData TestSlruCtlData;
#define TestSlruCtl (&TestSlruCtlData)
@ -63,9 +59,9 @@ test_slru_page_write(PG_FUNCTION_ARGS)
int64 pageno = PG_GETARG_INT64(0);
char *data = text_to_cstring(PG_GETARG_TEXT_PP(1));
int slotno;
LWLock *lock = SimpleLruGetBankLock(TestSlruCtl, pageno);
LWLockAcquire(TestSLRULock, LW_EXCLUSIVE);
LWLockAcquire(lock, LW_EXCLUSIVE);
slotno = SimpleLruZeroPage(TestSlruCtl, pageno);
/* these should match */
@ -80,7 +76,7 @@ test_slru_page_write(PG_FUNCTION_ARGS)
BLCKSZ - 1);
SimpleLruWritePage(TestSlruCtl, slotno);
LWLockRelease(TestSLRULock);
LWLockRelease(lock);
PG_RETURN_VOID();
}
@ -99,13 +95,14 @@ test_slru_page_read(PG_FUNCTION_ARGS)
bool write_ok = PG_GETARG_BOOL(1);
char *data = NULL;
int slotno;
LWLock *lock = SimpleLruGetBankLock(TestSlruCtl, pageno);
/* find page in buffers, reading it if necessary */
LWLockAcquire(TestSLRULock, LW_EXCLUSIVE);
LWLockAcquire(lock, LW_EXCLUSIVE);
slotno = SimpleLruReadPage(TestSlruCtl, pageno,
write_ok, InvalidTransactionId);
data = (char *) TestSlruCtl->shared->page_buffer[slotno];
LWLockRelease(TestSLRULock);
LWLockRelease(lock);
PG_RETURN_TEXT_P(cstring_to_text(data));
}
@ -116,14 +113,15 @@ test_slru_page_readonly(PG_FUNCTION_ARGS)
int64 pageno = PG_GETARG_INT64(0);
char *data = NULL;
int slotno;
LWLock *lock = SimpleLruGetBankLock(TestSlruCtl, pageno);
/* find page in buffers, reading it if necessary */
slotno = SimpleLruReadPage_ReadOnly(TestSlruCtl,
pageno,
InvalidTransactionId);
Assert(LWLockHeldByMe(TestSLRULock));
Assert(LWLockHeldByMe(lock));
data = (char *) TestSlruCtl->shared->page_buffer[slotno];
LWLockRelease(TestSLRULock);
LWLockRelease(lock);
PG_RETURN_TEXT_P(cstring_to_text(data));
}
@ -133,10 +131,11 @@ test_slru_page_exists(PG_FUNCTION_ARGS)
{
int64 pageno = PG_GETARG_INT64(0);
bool found;
LWLock *lock = SimpleLruGetBankLock(TestSlruCtl, pageno);
LWLockAcquire(TestSLRULock, LW_EXCLUSIVE);
LWLockAcquire(lock, LW_EXCLUSIVE);
found = SimpleLruDoesPhysicalPageExist(TestSlruCtl, pageno);
LWLockRelease(TestSLRULock);
LWLockRelease(lock);
PG_RETURN_BOOL(found);
}
@ -221,6 +220,7 @@ test_slru_shmem_startup(void)
const bool long_segment_names = true;
const char slru_dir_name[] = "pg_test_slru";
int test_tranche_id;
int test_buffer_tranche_id;
if (prev_shmem_startup_hook)
prev_shmem_startup_hook();
@ -234,12 +234,15 @@ test_slru_shmem_startup(void)
/* initialize the SLRU facility */
test_tranche_id = LWLockNewTrancheId();
LWLockRegisterTranche(test_tranche_id, "test_slru_tranche");
LWLockInitialize(TestSLRULock, test_tranche_id);
test_buffer_tranche_id = LWLockNewTrancheId();
LWLockRegisterTranche(test_tranche_id, "test_buffer_tranche");
TestSlruCtl->PagePrecedes = test_slru_page_precedes_logically;
SimpleLruInit(TestSlruCtl, "TestSLRU",
NUM_TEST_BUFFERS, 0, TestSLRULock, slru_dir_name,
test_tranche_id, SYNC_HANDLER_NONE, long_segment_names);
NUM_TEST_BUFFERS, 0, slru_dir_name,
test_buffer_tranche_id, test_tranche_id, SYNC_HANDLER_NONE,
long_segment_names);
}
void