Introduce a new smgr bulk loading facility.

The new facility makes it easier to optimize bulk loading, as the
logic for buffering, WAL-logging, and syncing the relation only needs
to be implemented once. It's also less error-prone: We have had a
number of bugs in how a relation is fsync'd - or not - at the end of a
bulk loading operation. By centralizing that logic to one place, we
only need to write it correctly once.

The new facility is faster for small relations: Instead of of calling
smgrimmedsync(), we register the fsync to happen at next checkpoint,
which avoids the fsync latency. That can make a big difference if you
are e.g. restoring a schema-only dump with lots of relations.

It is also slightly more efficient with large relations, as the WAL
logging is performed multiple pages at a time. That avoids some WAL
header overhead. The sorted GiST index build did that already, this
moves the buffering to the new facility.

The changes to pageinspect GiST test needs an explanation: Before this
patch, the sorted GiST index build set the LSN on every page to the
special GistBuildLSN value, not the LSN of the WAL record, even though
they were WAL-logged. There was no particular need for it, it just
happened naturally when we wrote out the pages before WAL-logging
them. Now we WAL-log the pages first, like in B-tree build, so the
pages are stamped with the record's real LSN. When the build is not
WAL-logged, we still use GistBuildLSN. To make the test output
predictable, use an unlogged index.

Reviewed-by: Andres Freund
Discussion: https://www.postgresql.org/message-id/30e8f366-58b3-b239-c521-422122dd5150%40iki.fi
This commit is contained in:
Heikki Linnakangas 2024-02-23 16:10:51 +02:00
parent e612384fc7
commit 8af2565248
17 changed files with 551 additions and 354 deletions

View File

@ -1,13 +1,6 @@
-- The gist_page_opaque_info() function prints the page's LSN. Normally,
-- that's constant 1 (GistBuildLSN) on every page of a freshly built GiST
-- index. But with wal_level=minimal, the whole relation is dumped to WAL at
-- the end of the transaction if it's smaller than wal_skip_threshold, which
-- updates the LSNs. Wrap the tests on gist_page_opaque_info() in the
-- same transaction with the CREATE INDEX so that we see the LSNs before
-- they are possibly overwritten at end of transaction.
BEGIN;
-- Create a test table and GiST index.
CREATE TABLE test_gist AS SELECT point(i,i) p, i::text t FROM
-- The gist_page_opaque_info() function prints the page's LSN.
-- Use an unlogged index, so that the LSN is predictable.
CREATE UNLOGGED TABLE test_gist AS SELECT point(i,i) p, i::text t FROM
generate_series(1,1000) i;
CREATE INDEX test_gist_idx ON test_gist USING gist (p);
-- Page 0 is the root, the rest are leaf pages
@ -29,7 +22,6 @@ SELECT * FROM gist_page_opaque_info(get_raw_page('test_gist_idx', 2));
0/1 | 0/0 | 1 | {leaf}
(1 row)
COMMIT;
SELECT * FROM gist_page_items(get_raw_page('test_gist_idx', 0), 'test_gist_idx');
itemoffset | ctid | itemlen | dead | keys
------------+-----------+---------+------+-------------------------------

View File

@ -1,14 +1,6 @@
-- The gist_page_opaque_info() function prints the page's LSN. Normally,
-- that's constant 1 (GistBuildLSN) on every page of a freshly built GiST
-- index. But with wal_level=minimal, the whole relation is dumped to WAL at
-- the end of the transaction if it's smaller than wal_skip_threshold, which
-- updates the LSNs. Wrap the tests on gist_page_opaque_info() in the
-- same transaction with the CREATE INDEX so that we see the LSNs before
-- they are possibly overwritten at end of transaction.
BEGIN;
-- Create a test table and GiST index.
CREATE TABLE test_gist AS SELECT point(i,i) p, i::text t FROM
-- The gist_page_opaque_info() function prints the page's LSN.
-- Use an unlogged index, so that the LSN is predictable.
CREATE UNLOGGED TABLE test_gist AS SELECT point(i,i) p, i::text t FROM
generate_series(1,1000) i;
CREATE INDEX test_gist_idx ON test_gist USING gist (p);
@ -17,8 +9,6 @@ SELECT * FROM gist_page_opaque_info(get_raw_page('test_gist_idx', 0));
SELECT * FROM gist_page_opaque_info(get_raw_page('test_gist_idx', 1));
SELECT * FROM gist_page_opaque_info(get_raw_page('test_gist_idx', 2));
COMMIT;
SELECT * FROM gist_page_items(get_raw_page('test_gist_idx', 0), 'test_gist_idx');
SELECT * FROM gist_page_items(get_raw_page('test_gist_idx', 1), 'test_gist_idx') LIMIT 5;

View File

@ -43,7 +43,8 @@
#include "miscadmin.h"
#include "optimizer/optimizer.h"
#include "storage/bufmgr.h"
#include "storage/smgr.h"
#include "storage/bulk_write.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/tuplesort.h"
@ -106,11 +107,8 @@ typedef struct
Tuplesortstate *sortstate; /* state data for tuplesort.c */
BlockNumber pages_allocated;
BlockNumber pages_written;
int ready_num_pages;
BlockNumber ready_blknos[XLR_MAX_BLOCK_ID];
Page ready_pages[XLR_MAX_BLOCK_ID];
BulkWriteState *bulkstate;
} GISTBuildState;
#define GIST_SORTED_BUILD_PAGE_NUM 4
@ -142,7 +140,6 @@ static void gist_indexsortbuild_levelstate_add(GISTBuildState *state,
IndexTuple itup);
static void gist_indexsortbuild_levelstate_flush(GISTBuildState *state,
GistSortedBuildLevelState *levelstate);
static void gist_indexsortbuild_flush_ready_pages(GISTBuildState *state);
static void gistInitBuffering(GISTBuildState *buildstate);
static int calculatePagesPerBuffer(GISTBuildState *buildstate, int levelStep);
@ -405,27 +402,18 @@ gist_indexsortbuild(GISTBuildState *state)
{
IndexTuple itup;
GistSortedBuildLevelState *levelstate;
Page page;
BulkWriteBuffer rootbuf;
state->pages_allocated = 0;
state->pages_written = 0;
state->ready_num_pages = 0;
/* Reserve block 0 for the root page */
state->pages_allocated = 1;
/*
* Write an empty page as a placeholder for the root page. It will be
* replaced with the real root page at the end.
*/
page = palloc_aligned(BLCKSZ, PG_IO_ALIGN_SIZE, MCXT_ALLOC_ZERO);
smgrextend(RelationGetSmgr(state->indexrel), MAIN_FORKNUM, GIST_ROOT_BLKNO,
page, true);
state->pages_allocated++;
state->pages_written++;
state->bulkstate = smgr_bulk_start_rel(state->indexrel, MAIN_FORKNUM);
/* Allocate a temporary buffer for the first leaf page batch. */
levelstate = palloc0(sizeof(GistSortedBuildLevelState));
levelstate->pages[0] = page;
levelstate->pages[0] = palloc(BLCKSZ);
levelstate->parent = NULL;
gistinitpage(page, F_LEAF);
gistinitpage(levelstate->pages[0], F_LEAF);
/*
* Fill index pages with tuples in the sorted order.
@ -455,31 +443,15 @@ gist_indexsortbuild(GISTBuildState *state)
levelstate = parent;
}
gist_indexsortbuild_flush_ready_pages(state);
/* Write out the root */
PageSetLSN(levelstate->pages[0], GistBuildLSN);
PageSetChecksumInplace(levelstate->pages[0], GIST_ROOT_BLKNO);
smgrwrite(RelationGetSmgr(state->indexrel), MAIN_FORKNUM, GIST_ROOT_BLKNO,
levelstate->pages[0], true);
if (RelationNeedsWAL(state->indexrel))
log_newpage(&state->indexrel->rd_locator, MAIN_FORKNUM, GIST_ROOT_BLKNO,
levelstate->pages[0], true);
rootbuf = smgr_bulk_get_buf(state->bulkstate);
memcpy(rootbuf, levelstate->pages[0], BLCKSZ);
smgr_bulk_write(state->bulkstate, GIST_ROOT_BLKNO, rootbuf, true);
pfree(levelstate->pages[0]);
pfree(levelstate);
/*
* When we WAL-logged index pages, we must nonetheless fsync index files.
* Since we're building outside shared buffers, a CHECKPOINT occurring
* during the build has no way to flush the previously written data to
* disk (indeed it won't know the index even exists). A crash later on
* would replay WAL from the checkpoint, therefore it wouldn't replay our
* earlier WAL entries. If we do not fsync those pages here, they might
* still not be on disk when the crash occurs.
*/
if (RelationNeedsWAL(state->indexrel))
smgrimmedsync(RelationGetSmgr(state->indexrel), MAIN_FORKNUM);
smgr_bulk_finish(state->bulkstate);
}
/*
@ -509,8 +481,7 @@ gist_indexsortbuild_levelstate_add(GISTBuildState *state,
levelstate->current_page++;
if (levelstate->pages[levelstate->current_page] == NULL)
levelstate->pages[levelstate->current_page] =
palloc_aligned(BLCKSZ, PG_IO_ALIGN_SIZE, 0);
levelstate->pages[levelstate->current_page] = palloc0(BLCKSZ);
newPage = levelstate->pages[levelstate->current_page];
gistinitpage(newPage, old_page_flags);
@ -573,6 +544,7 @@ gist_indexsortbuild_levelstate_flush(GISTBuildState *state,
for (; dist != NULL; dist = dist->next)
{
char *data;
BulkWriteBuffer buf;
Page target;
/* check once per page */
@ -580,7 +552,8 @@ gist_indexsortbuild_levelstate_flush(GISTBuildState *state,
/* Create page and copy data */
data = (char *) (dist->list);
target = palloc_aligned(BLCKSZ, PG_IO_ALIGN_SIZE, MCXT_ALLOC_ZERO);
buf = smgr_bulk_get_buf(state->bulkstate);
target = (Page) buf;
gistinitpage(target, isleaf ? F_LEAF : 0);
for (int i = 0; i < dist->block.num; i++)
{
@ -593,20 +566,6 @@ gist_indexsortbuild_levelstate_flush(GISTBuildState *state,
}
union_tuple = dist->itup;
if (state->ready_num_pages == XLR_MAX_BLOCK_ID)
gist_indexsortbuild_flush_ready_pages(state);
/*
* The page is now complete. Assign a block number to it, and add it
* to the list of finished pages. (We don't write it out immediately,
* because we want to WAL-log the pages in batches.)
*/
blkno = state->pages_allocated++;
state->ready_blknos[state->ready_num_pages] = blkno;
state->ready_pages[state->ready_num_pages] = target;
state->ready_num_pages++;
ItemPointerSetBlockNumber(&(union_tuple->t_tid), blkno);
/*
* Set the right link to point to the previous page. This is just for
* debugging purposes: GiST only follows the right link if a page is
@ -621,6 +580,15 @@ gist_indexsortbuild_levelstate_flush(GISTBuildState *state,
*/
if (levelstate->last_blkno)
GistPageGetOpaque(target)->rightlink = levelstate->last_blkno;
/*
* The page is now complete. Assign a block number to it, and pass it
* to the bulk writer.
*/
blkno = state->pages_allocated++;
PageSetLSN(target, GistBuildLSN);
smgr_bulk_write(state->bulkstate, blkno, buf, true);
ItemPointerSetBlockNumber(&(union_tuple->t_tid), blkno);
levelstate->last_blkno = blkno;
/*
@ -631,7 +599,7 @@ gist_indexsortbuild_levelstate_flush(GISTBuildState *state,
if (parent == NULL)
{
parent = palloc0(sizeof(GistSortedBuildLevelState));
parent->pages[0] = (Page) palloc_aligned(BLCKSZ, PG_IO_ALIGN_SIZE, 0);
parent->pages[0] = palloc(BLCKSZ);
parent->parent = NULL;
gistinitpage(parent->pages[0], 0);
@ -641,39 +609,6 @@ gist_indexsortbuild_levelstate_flush(GISTBuildState *state,
}
}
static void
gist_indexsortbuild_flush_ready_pages(GISTBuildState *state)
{
if (state->ready_num_pages == 0)
return;
for (int i = 0; i < state->ready_num_pages; i++)
{
Page page = state->ready_pages[i];
BlockNumber blkno = state->ready_blknos[i];
/* Currently, the blocks must be buffered in order. */
if (blkno != state->pages_written)
elog(ERROR, "unexpected block number to flush GiST sorting build");
PageSetLSN(page, GistBuildLSN);
PageSetChecksumInplace(page, blkno);
smgrextend(RelationGetSmgr(state->indexrel), MAIN_FORKNUM, blkno, page,
true);
state->pages_written++;
}
if (RelationNeedsWAL(state->indexrel))
log_newpages(&state->indexrel->rd_locator, MAIN_FORKNUM, state->ready_num_pages,
state->ready_blknos, state->ready_pages, true);
for (int i = 0; i < state->ready_num_pages; i++)
pfree(state->ready_pages[i]);
state->ready_num_pages = 0;
}
/*-------------------------------------------------------------------------
* Routines for non-sorted build

View File

@ -87,8 +87,8 @@
* is optimized for bulk inserting a lot of tuples, knowing that we have
* exclusive access to the heap. raw_heap_insert builds new pages in
* local storage. When a page is full, or at the end of the process,
* we insert it to WAL as a single record and then write it to disk
* directly through smgr. Note, however, that any data sent to the new
* we insert it to WAL as a single record and then write it to disk with
* the bulk smgr writer. Note, however, that any data sent to the new
* heap's TOAST table will go through the normal bufmgr.
*
*
@ -119,9 +119,9 @@
#include "replication/logical.h"
#include "replication/slot.h"
#include "storage/bufmgr.h"
#include "storage/bulk_write.h"
#include "storage/fd.h"
#include "storage/procarray.h"
#include "storage/smgr.h"
#include "utils/memutils.h"
#include "utils/rel.h"
@ -133,9 +133,9 @@ typedef struct RewriteStateData
{
Relation rs_old_rel; /* source heap */
Relation rs_new_rel; /* destination heap */
Page rs_buffer; /* page currently being built */
BulkWriteState *rs_bulkstate; /* writer for the destination */
BulkWriteBuffer rs_buffer; /* page currently being built */
BlockNumber rs_blockno; /* block where page will go */
bool rs_buffer_valid; /* T if any tuples in buffer */
bool rs_logical_rewrite; /* do we need to do logical rewriting */
TransactionId rs_oldest_xmin; /* oldest xmin used by caller to determine
* tuple visibility */
@ -255,14 +255,14 @@ begin_heap_rewrite(Relation old_heap, Relation new_heap, TransactionId oldest_xm
state->rs_old_rel = old_heap;
state->rs_new_rel = new_heap;
state->rs_buffer = (Page) palloc_aligned(BLCKSZ, PG_IO_ALIGN_SIZE, 0);
state->rs_buffer = NULL;
/* new_heap needn't be empty, just locked */
state->rs_blockno = RelationGetNumberOfBlocks(new_heap);
state->rs_buffer_valid = false;
state->rs_oldest_xmin = oldest_xmin;
state->rs_freeze_xid = freeze_xid;
state->rs_cutoff_multi = cutoff_multi;
state->rs_cxt = rw_cxt;
state->rs_bulkstate = smgr_bulk_start_rel(new_heap, MAIN_FORKNUM);
/* Initialize hash tables used to track update chains */
hash_ctl.keysize = sizeof(TidHashKey);
@ -314,30 +314,13 @@ end_heap_rewrite(RewriteState state)
}
/* Write the last page, if any */
if (state->rs_buffer_valid)
if (state->rs_buffer)
{
if (RelationNeedsWAL(state->rs_new_rel))
log_newpage(&state->rs_new_rel->rd_locator,
MAIN_FORKNUM,
state->rs_blockno,
state->rs_buffer,
true);
PageSetChecksumInplace(state->rs_buffer, state->rs_blockno);
smgrextend(RelationGetSmgr(state->rs_new_rel), MAIN_FORKNUM,
state->rs_blockno, state->rs_buffer, true);
smgr_bulk_write(state->rs_bulkstate, state->rs_blockno, state->rs_buffer, true);
state->rs_buffer = NULL;
}
/*
* When we WAL-logged rel pages, we must nonetheless fsync them. The
* reason is the same as in storage.c's RelationCopyStorage(): we're
* writing data that's not in shared buffers, and so a CHECKPOINT
* occurring during the rewriteheap operation won't have fsync'd data we
* wrote before the checkpoint.
*/
if (RelationNeedsWAL(state->rs_new_rel))
smgrimmedsync(RelationGetSmgr(state->rs_new_rel), MAIN_FORKNUM);
smgr_bulk_finish(state->rs_bulkstate);
logical_end_heap_rewrite(state);
@ -611,7 +594,7 @@ rewrite_heap_dead_tuple(RewriteState state, HeapTuple old_tuple)
static void
raw_heap_insert(RewriteState state, HeapTuple tup)
{
Page page = state->rs_buffer;
Page page;
Size pageFreeSpace,
saveFreeSpace;
Size len;
@ -664,7 +647,8 @@ raw_heap_insert(RewriteState state, HeapTuple tup)
HEAP_DEFAULT_FILLFACTOR);
/* Now we can check to see if there's enough free space already. */
if (state->rs_buffer_valid)
page = (Page) state->rs_buffer;
if (page)
{
pageFreeSpace = PageGetHeapFreeSpace(page);
@ -675,35 +659,19 @@ raw_heap_insert(RewriteState state, HeapTuple tup)
* contains a tuple. Hence, unlike RelationGetBufferForTuple(),
* enforce saveFreeSpace unconditionally.
*/
/* XLOG stuff */
if (RelationNeedsWAL(state->rs_new_rel))
log_newpage(&state->rs_new_rel->rd_locator,
MAIN_FORKNUM,
state->rs_blockno,
page,
true);
/*
* Now write the page. We say skipFsync = true because there's no
* need for smgr to schedule an fsync for this write; we'll do it
* ourselves in end_heap_rewrite.
*/
PageSetChecksumInplace(page, state->rs_blockno);
smgrextend(RelationGetSmgr(state->rs_new_rel), MAIN_FORKNUM,
state->rs_blockno, page, true);
smgr_bulk_write(state->rs_bulkstate, state->rs_blockno, state->rs_buffer, true);
state->rs_buffer = NULL;
page = NULL;
state->rs_blockno++;
state->rs_buffer_valid = false;
}
}
if (!state->rs_buffer_valid)
if (!page)
{
/* Initialize a new empty page */
state->rs_buffer = smgr_bulk_get_buf(state->rs_bulkstate);
page = (Page) state->rs_buffer;
PageInit(page, BLCKSZ, 0);
state->rs_buffer_valid = true;
}
/* And now we can insert the tuple into the page */

View File

@ -29,11 +29,11 @@
#include "nodes/execnodes.h"
#include "pgstat.h"
#include "postmaster/autovacuum.h"
#include "storage/bulk_write.h"
#include "storage/condition_variable.h"
#include "storage/indexfsm.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/smgr.h"
#include "utils/builtins.h"
#include "utils/index_selfuncs.h"
#include "utils/memutils.h"
@ -154,32 +154,17 @@ void
btbuildempty(Relation index)
{
bool allequalimage = _bt_allequalimage(index, false);
Buffer metabuf;
Page metapage;
BulkWriteState *bulkstate;
BulkWriteBuffer metabuf;
/*
* Initialize the metapage.
*
* Regular index build bypasses the buffer manager and uses smgr functions
* directly, with an smgrimmedsync() call at the end. That makes sense
* when the index is large, but for an empty index, it's better to use the
* buffer cache to avoid the smgrimmedsync().
*/
metabuf = ReadBufferExtended(index, INIT_FORKNUM, P_NEW, RBM_NORMAL, NULL);
Assert(BufferGetBlockNumber(metabuf) == BTREE_METAPAGE);
_bt_lockbuf(index, metabuf, BT_WRITE);
bulkstate = smgr_bulk_start_rel(index, INIT_FORKNUM);
START_CRIT_SECTION();
/* Construct metapage. */
metabuf = smgr_bulk_get_buf(bulkstate);
_bt_initmetapage((Page) metabuf, P_NONE, 0, allequalimage);
smgr_bulk_write(bulkstate, BTREE_METAPAGE, metabuf, true);
metapage = BufferGetPage(metabuf);
_bt_initmetapage(metapage, P_NONE, 0, allequalimage);
MarkBufferDirty(metabuf);
log_newpage_buffer(metabuf, true);
END_CRIT_SECTION();
_bt_unlockbuf(index, metabuf);
ReleaseBuffer(metabuf);
smgr_bulk_finish(bulkstate);
}
/*

View File

@ -23,13 +23,8 @@
* many upper pages if the keys are reasonable-size) without risking a lot of
* cascading splits during early insertions.
*
* Formerly the index pages being built were kept in shared buffers, but
* that is of no value (since other backends have no interest in them yet)
* and it created locking problems for CHECKPOINT, because the upper-level
* pages were held exclusive-locked for long periods. Now we just build
* the pages in local memory and smgrwrite or smgrextend them as we finish
* them. They will need to be re-read into shared buffers on first use after
* the build finishes.
* We use the bulk smgr loading facility to bypass the buffer cache and
* WAL-log the pages efficiently.
*
* This code isn't concerned about the FSM at all. The caller is responsible
* for initializing that.
@ -57,7 +52,7 @@
#include "executor/instrument.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "storage/smgr.h"
#include "storage/bulk_write.h"
#include "tcop/tcopprot.h" /* pgrminclude ignore */
#include "utils/rel.h"
#include "utils/sortsupport.h"
@ -234,7 +229,7 @@ typedef struct BTBuildState
*/
typedef struct BTPageState
{
Page btps_page; /* workspace for page building */
BulkWriteBuffer btps_buf; /* workspace for page building */
BlockNumber btps_blkno; /* block # to write this page at */
IndexTuple btps_lowkey; /* page's strict lower bound pivot tuple */
OffsetNumber btps_lastoff; /* last item offset loaded */
@ -251,11 +246,9 @@ typedef struct BTWriteState
{
Relation heap;
Relation index;
BulkWriteState *bulkstate;
BTScanInsert inskey; /* generic insertion scankey */
bool btws_use_wal; /* dump pages to WAL? */
BlockNumber btws_pages_alloced; /* # pages allocated */
BlockNumber btws_pages_written; /* # pages written out */
Page btws_zeropage; /* workspace for filling zeroes */
} BTWriteState;
@ -267,7 +260,7 @@ static void _bt_spool(BTSpool *btspool, ItemPointer self,
static void _bt_leafbuild(BTSpool *btspool, BTSpool *btspool2);
static void _bt_build_callback(Relation index, ItemPointer tid, Datum *values,
bool *isnull, bool tupleIsAlive, void *state);
static Page _bt_blnewpage(uint32 level);
static BulkWriteBuffer _bt_blnewpage(BTWriteState *wstate, uint32 level);
static BTPageState *_bt_pagestate(BTWriteState *wstate, uint32 level);
static void _bt_slideleft(Page rightmostpage);
static void _bt_sortaddtup(Page page, Size itemsize,
@ -569,12 +562,9 @@ _bt_leafbuild(BTSpool *btspool, BTSpool *btspool2)
wstate.inskey = _bt_mkscankey(wstate.index, NULL);
/* _bt_mkscankey() won't set allequalimage without metapage */
wstate.inskey->allequalimage = _bt_allequalimage(wstate.index, true);
wstate.btws_use_wal = RelationNeedsWAL(wstate.index);
/* reserve the metapage */
wstate.btws_pages_alloced = BTREE_METAPAGE + 1;
wstate.btws_pages_written = 0;
wstate.btws_zeropage = NULL; /* until needed */
pgstat_progress_update_param(PROGRESS_CREATEIDX_SUBPHASE,
PROGRESS_BTREE_PHASE_LEAF_LOAD);
@ -613,13 +603,15 @@ _bt_build_callback(Relation index,
/*
* allocate workspace for a new, clean btree page, not linked to any siblings.
*/
static Page
_bt_blnewpage(uint32 level)
static BulkWriteBuffer
_bt_blnewpage(BTWriteState *wstate, uint32 level)
{
BulkWriteBuffer buf;
Page page;
BTPageOpaque opaque;
page = (Page) palloc_aligned(BLCKSZ, PG_IO_ALIGN_SIZE, 0);
buf = smgr_bulk_get_buf(wstate->bulkstate);
page = (Page) buf;
/* Zero the page and set up standard page header info */
_bt_pageinit(page, BLCKSZ);
@ -634,63 +626,17 @@ _bt_blnewpage(uint32 level)
/* Make the P_HIKEY line pointer appear allocated */
((PageHeader) page)->pd_lower += sizeof(ItemIdData);
return page;
return buf;
}
/*
* emit a completed btree page, and release the working storage.
*/
static void
_bt_blwritepage(BTWriteState *wstate, Page page, BlockNumber blkno)
_bt_blwritepage(BTWriteState *wstate, BulkWriteBuffer buf, BlockNumber blkno)
{
/* XLOG stuff */
if (wstate->btws_use_wal)
{
/* We use the XLOG_FPI record type for this */
log_newpage(&wstate->index->rd_locator, MAIN_FORKNUM, blkno, page, true);
}
/*
* If we have to write pages nonsequentially, fill in the space with
* zeroes until we come back and overwrite. This is not logically
* necessary on standard Unix filesystems (unwritten space will read as
* zeroes anyway), but it should help to avoid fragmentation. The dummy
* pages aren't WAL-logged though.
*/
while (blkno > wstate->btws_pages_written)
{
if (!wstate->btws_zeropage)
wstate->btws_zeropage = (Page) palloc_aligned(BLCKSZ,
PG_IO_ALIGN_SIZE,
MCXT_ALLOC_ZERO);
/* don't set checksum for all-zero page */
smgrextend(RelationGetSmgr(wstate->index), MAIN_FORKNUM,
wstate->btws_pages_written++,
wstate->btws_zeropage,
true);
}
PageSetChecksumInplace(page, blkno);
/*
* Now write the page. There's no need for smgr to schedule an fsync for
* this write; we'll do it ourselves before ending the build.
*/
if (blkno == wstate->btws_pages_written)
{
/* extending the file... */
smgrextend(RelationGetSmgr(wstate->index), MAIN_FORKNUM, blkno,
page, true);
wstate->btws_pages_written++;
}
else
{
/* overwriting a block we zero-filled before */
smgrwrite(RelationGetSmgr(wstate->index), MAIN_FORKNUM, blkno,
page, true);
}
pfree(page);
smgr_bulk_write(wstate->bulkstate, blkno, buf, true);
/* smgr_bulk_write took ownership of 'buf' */
}
/*
@ -703,7 +649,7 @@ _bt_pagestate(BTWriteState *wstate, uint32 level)
BTPageState *state = (BTPageState *) palloc0(sizeof(BTPageState));
/* create initial page for level */
state->btps_page = _bt_blnewpage(level);
state->btps_buf = _bt_blnewpage(wstate, level);
/* and assign it a page position */
state->btps_blkno = wstate->btws_pages_alloced++;
@ -839,6 +785,7 @@ static void
_bt_buildadd(BTWriteState *wstate, BTPageState *state, IndexTuple itup,
Size truncextra)
{
BulkWriteBuffer nbuf;
Page npage;
BlockNumber nblkno;
OffsetNumber last_off;
@ -853,7 +800,8 @@ _bt_buildadd(BTWriteState *wstate, BTPageState *state, IndexTuple itup,
*/
CHECK_FOR_INTERRUPTS();
npage = state->btps_page;
nbuf = state->btps_buf;
npage = (Page) nbuf;
nblkno = state->btps_blkno;
last_off = state->btps_lastoff;
last_truncextra = state->btps_lastextra;
@ -909,6 +857,7 @@ _bt_buildadd(BTWriteState *wstate, BTPageState *state, IndexTuple itup,
/*
* Finish off the page and write it out.
*/
BulkWriteBuffer obuf = nbuf;
Page opage = npage;
BlockNumber oblkno = nblkno;
ItemId ii;
@ -916,7 +865,8 @@ _bt_buildadd(BTWriteState *wstate, BTPageState *state, IndexTuple itup,
IndexTuple oitup;
/* Create new page of same level */
npage = _bt_blnewpage(state->btps_level);
nbuf = _bt_blnewpage(wstate, state->btps_level);
npage = (Page) nbuf;
/* and assign it a page position */
nblkno = wstate->btws_pages_alloced++;
@ -1028,10 +978,10 @@ _bt_buildadd(BTWriteState *wstate, BTPageState *state, IndexTuple itup,
}
/*
* Write out the old page. We never need to touch it again, so we can
* free the opage workspace too.
* Write out the old page. _bt_blwritepage takes ownership of the
* 'opage' buffer.
*/
_bt_blwritepage(wstate, opage, oblkno);
_bt_blwritepage(wstate, obuf, oblkno);
/*
* Reset last_off to point to new page
@ -1064,7 +1014,7 @@ _bt_buildadd(BTWriteState *wstate, BTPageState *state, IndexTuple itup,
_bt_sortaddtup(npage, itupsz, itup, last_off,
!isleaf && last_off == P_FIRSTKEY);
state->btps_page = npage;
state->btps_buf = nbuf;
state->btps_blkno = nblkno;
state->btps_lastoff = last_off;
}
@ -1116,7 +1066,7 @@ _bt_uppershutdown(BTWriteState *wstate, BTPageState *state)
BTPageState *s;
BlockNumber rootblkno = P_NONE;
uint32 rootlevel = 0;
Page metapage;
BulkWriteBuffer metabuf;
/*
* Each iteration of this loop completes one more level of the tree.
@ -1127,7 +1077,7 @@ _bt_uppershutdown(BTWriteState *wstate, BTPageState *state)
BTPageOpaque opaque;
blkno = s->btps_blkno;
opaque = BTPageGetOpaque(s->btps_page);
opaque = BTPageGetOpaque((Page) s->btps_buf);
/*
* We have to link the last page on this level to somewhere.
@ -1161,9 +1111,9 @@ _bt_uppershutdown(BTWriteState *wstate, BTPageState *state)
* This is the rightmost page, so the ItemId array needs to be slid
* back one slot. Then we can dump out the page.
*/
_bt_slideleft(s->btps_page);
_bt_blwritepage(wstate, s->btps_page, s->btps_blkno);
s->btps_page = NULL; /* writepage freed the workspace */
_bt_slideleft((Page) s->btps_buf);
_bt_blwritepage(wstate, s->btps_buf, s->btps_blkno);
s->btps_buf = NULL; /* writepage took ownership of the buffer */
}
/*
@ -1172,10 +1122,10 @@ _bt_uppershutdown(BTWriteState *wstate, BTPageState *state)
* set to point to "P_NONE"). This changes the index to the "valid" state
* by filling in a valid magic number in the metapage.
*/
metapage = (Page) palloc_aligned(BLCKSZ, PG_IO_ALIGN_SIZE, 0);
_bt_initmetapage(metapage, rootblkno, rootlevel,
metabuf = smgr_bulk_get_buf(wstate->bulkstate);
_bt_initmetapage((Page) metabuf, rootblkno, rootlevel,
wstate->inskey->allequalimage);
_bt_blwritepage(wstate, metapage, BTREE_METAPAGE);
_bt_blwritepage(wstate, metabuf, BTREE_METAPAGE);
}
/*
@ -1197,6 +1147,8 @@ _bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2)
int64 tuples_done = 0;
bool deduplicate;
wstate->bulkstate = smgr_bulk_start_rel(wstate->index, MAIN_FORKNUM);
deduplicate = wstate->inskey->allequalimage && !btspool->isunique &&
BTGetDeduplicateItems(wstate->index);
@ -1352,7 +1304,7 @@ _bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2)
*/
dstate->maxpostingsize = MAXALIGN_DOWN((BLCKSZ * 10 / 100)) -
sizeof(ItemIdData);
Assert(dstate->maxpostingsize <= BTMaxItemSize(state->btps_page) &&
Assert(dstate->maxpostingsize <= BTMaxItemSize((Page) state->btps_buf) &&
dstate->maxpostingsize <= INDEX_SIZE_MASK);
dstate->htids = palloc(dstate->maxpostingsize);
@ -1422,18 +1374,7 @@ _bt_load(BTWriteState *wstate, BTSpool *btspool, BTSpool *btspool2)
/* Close down final pages and write the metapage */
_bt_uppershutdown(wstate, state);
/*
* When we WAL-logged index pages, we must nonetheless fsync index files.
* Since we're building outside shared buffers, a CHECKPOINT occurring
* during the build has no way to flush the previously written data to
* disk (indeed it won't know the index even exists). A crash later on
* would replay WAL from the checkpoint, therefore it wouldn't replay our
* earlier WAL entries. If we do not fsync those pages here, they might
* still not be on disk when the crash occurs.
*/
if (wstate->btws_use_wal)
smgrimmedsync(RelationGetSmgr(wstate->index), MAIN_FORKNUM);
smgr_bulk_finish(wstate->bulkstate);
}
/*

View File

@ -25,7 +25,7 @@
#include "catalog/index.h"
#include "miscadmin.h"
#include "storage/bufmgr.h"
#include "storage/smgr.h"
#include "storage/bulk_write.h"
#include "utils/memutils.h"
#include "utils/rel.h"
@ -155,42 +155,27 @@ spgbuild(Relation heap, Relation index, IndexInfo *indexInfo)
void
spgbuildempty(Relation index)
{
Buffer metabuffer,
rootbuffer,
nullbuffer;
BulkWriteState *bulkstate;
BulkWriteBuffer buf;
/*
* Initialize the meta page and root pages
*/
metabuffer = ReadBufferExtended(index, INIT_FORKNUM, P_NEW, RBM_NORMAL, NULL);
LockBuffer(metabuffer, BUFFER_LOCK_EXCLUSIVE);
rootbuffer = ReadBufferExtended(index, INIT_FORKNUM, P_NEW, RBM_NORMAL, NULL);
LockBuffer(rootbuffer, BUFFER_LOCK_EXCLUSIVE);
nullbuffer = ReadBufferExtended(index, INIT_FORKNUM, P_NEW, RBM_NORMAL, NULL);
LockBuffer(nullbuffer, BUFFER_LOCK_EXCLUSIVE);
bulkstate = smgr_bulk_start_rel(index, INIT_FORKNUM);
Assert(BufferGetBlockNumber(metabuffer) == SPGIST_METAPAGE_BLKNO);
Assert(BufferGetBlockNumber(rootbuffer) == SPGIST_ROOT_BLKNO);
Assert(BufferGetBlockNumber(nullbuffer) == SPGIST_NULL_BLKNO);
/* Construct metapage. */
buf = smgr_bulk_get_buf(bulkstate);
SpGistInitMetapage((Page) buf);
smgr_bulk_write(bulkstate, SPGIST_METAPAGE_BLKNO, buf, true);
START_CRIT_SECTION();
/* Likewise for the root page. */
buf = smgr_bulk_get_buf(bulkstate);
SpGistInitPage((Page) buf, SPGIST_LEAF);
smgr_bulk_write(bulkstate, SPGIST_ROOT_BLKNO, buf, true);
SpGistInitMetapage(BufferGetPage(metabuffer));
MarkBufferDirty(metabuffer);
SpGistInitBuffer(rootbuffer, SPGIST_LEAF);
MarkBufferDirty(rootbuffer);
SpGistInitBuffer(nullbuffer, SPGIST_LEAF | SPGIST_NULLS);
MarkBufferDirty(nullbuffer);
/* Likewise for the null-tuples root page. */
buf = smgr_bulk_get_buf(bulkstate);
SpGistInitPage((Page) buf, SPGIST_LEAF | SPGIST_NULLS);
smgr_bulk_write(bulkstate, SPGIST_NULL_BLKNO, buf, true);
log_newpage_buffer(metabuffer, true);
log_newpage_buffer(rootbuffer, true);
log_newpage_buffer(nullbuffer, true);
END_CRIT_SECTION();
UnlockReleaseBuffer(metabuffer);
UnlockReleaseBuffer(rootbuffer);
UnlockReleaseBuffer(nullbuffer);
smgr_bulk_finish(bulkstate);
}
/*

View File

@ -28,6 +28,7 @@
#include "catalog/storage.h"
#include "catalog/storage_xlog.h"
#include "miscadmin.h"
#include "storage/bulk_write.h"
#include "storage/freespace.h"
#include "storage/smgr.h"
#include "utils/hsearch.h"
@ -451,14 +452,11 @@ void
RelationCopyStorage(SMgrRelation src, SMgrRelation dst,
ForkNumber forkNum, char relpersistence)
{
PGIOAlignedBlock buf;
Page page;
bool use_wal;
bool copying_initfork;
BlockNumber nblocks;
BlockNumber blkno;
page = (Page) buf.data;
BulkWriteState *bulkstate;
/*
* The init fork for an unlogged relation in many respects has to be
@ -477,16 +475,21 @@ RelationCopyStorage(SMgrRelation src, SMgrRelation dst,
use_wal = XLogIsNeeded() &&
(relpersistence == RELPERSISTENCE_PERMANENT || copying_initfork);
bulkstate = smgr_bulk_start_smgr(dst, forkNum, use_wal);
nblocks = smgrnblocks(src, forkNum);
for (blkno = 0; blkno < nblocks; blkno++)
{
BulkWriteBuffer buf;
/* If we got a cancel signal during the copy of the data, quit */
CHECK_FOR_INTERRUPTS();
smgrread(src, forkNum, blkno, buf.data);
buf = smgr_bulk_get_buf(bulkstate);
smgrread(src, forkNum, blkno, (Page) buf);
if (!PageIsVerifiedExtended(page, blkno,
if (!PageIsVerifiedExtended((Page) buf, blkno,
PIV_LOG_WARNING | PIV_REPORT_STAT))
{
/*
@ -507,34 +510,13 @@ RelationCopyStorage(SMgrRelation src, SMgrRelation dst,
}
/*
* WAL-log the copied page. Unfortunately we don't know what kind of a
* page this is, so we have to log the full page including any unused
* space.
* Queue the page for WAL-logging and writing out. Unfortunately we
* don't know what kind of a page this is, so we have to log the full
* page including any unused space.
*/
if (use_wal)
log_newpage(&dst->smgr_rlocator.locator, forkNum, blkno, page, false);
PageSetChecksumInplace(page, blkno);
/*
* Now write the page. We say skipFsync = true because there's no
* need for smgr to schedule an fsync for this write; we'll do it
* ourselves below.
*/
smgrextend(dst, forkNum, blkno, buf.data, true);
smgr_bulk_write(bulkstate, blkno, buf, false);
}
/*
* When we WAL-logged rel pages, we must nonetheless fsync them. The
* reason is that since we're copying outside shared buffers, a CHECKPOINT
* occurring during the copy has no way to flush the previously written
* data to disk (indeed it won't know the new rel even exists). A crash
* later on would replay WAL from the checkpoint, therefore it wouldn't
* replay our earlier WAL entries. If we do not fsync those pages here,
* they might still not be on disk when the crash occurs.
*/
if (use_wal || copying_initfork)
smgrimmedsync(dst, forkNum);
smgr_bulk_finish(bulkstate);
}
/*

View File

@ -13,6 +13,7 @@ top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
OBJS = \
bulk_write.o \
md.o \
smgr.o

View File

@ -0,0 +1,298 @@
/*-------------------------------------------------------------------------
*
* bulk_write.c
* Efficiently and reliably populate a new relation
*
* The assumption is that no other backends access the relation while we are
* loading it, so we can take some shortcuts. Do not mix operations through
* the regular buffer manager and the bulk loading interface!
*
* We bypass the buffer manager to avoid the locking overhead, and call
* smgrextend() directly. A downside is that the pages will need to be
* re-read into shared buffers on first use after the build finishes. That's
* usually a good tradeoff for large relations, and for small relations, the
* overhead isn't very significant compared to creating the relation in the
* first place.
*
* The pages are WAL-logged if needed. To save on WAL header overhead, we
* WAL-log several pages in one record.
*
* One tricky point is that because we bypass the buffer manager, we need to
* register the relation for fsyncing at the next checkpoint ourselves, and
* make sure that the relation is correctly fsync'd by us or the checkpointer
* even if a checkpoint happens concurrently.
*
*
* Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
* src/backend/storage/smgr/bulk_write.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/xloginsert.h"
#include "access/xlogrecord.h"
#include "storage/bufmgr.h"
#include "storage/bufpage.h"
#include "storage/bulk_write.h"
#include "storage/proc.h"
#include "storage/smgr.h"
#include "utils/rel.h"
#define MAX_PENDING_WRITES XLR_MAX_BLOCK_ID
static const PGIOAlignedBlock zero_buffer = {{0}}; /* worth BLCKSZ */
typedef struct PendingWrite
{
BulkWriteBuffer buf;
BlockNumber blkno;
bool page_std;
} PendingWrite;
/*
* Bulk writer state for one relation fork.
*/
typedef struct BulkWriteState
{
/* Information about the target relation we're writing */
SMgrRelation smgr;
ForkNumber forknum;
bool use_wal;
/* We keep several writes queued, and WAL-log them in batches */
int npending;
PendingWrite pending_writes[MAX_PENDING_WRITES];
/* Current size of the relation */
BlockNumber pages_written;
/* The RedoRecPtr at the time that the bulk operation started */
XLogRecPtr start_RedoRecPtr;
MemoryContext memcxt;
} BulkWriteState;
static void smgr_bulk_flush(BulkWriteState *bulkstate);
/*
* Start a bulk write operation on a relation fork.
*/
BulkWriteState *
smgr_bulk_start_rel(Relation rel, ForkNumber forknum)
{
return smgr_bulk_start_smgr(RelationGetSmgr(rel),
forknum,
RelationNeedsWAL(rel) || forknum == INIT_FORKNUM);
}
/*
* Start a bulk write operation on a relation fork.
*
* This is like smgr_bulk_start_rel, but can be used without a relcache entry.
*/
BulkWriteState *
smgr_bulk_start_smgr(SMgrRelation smgr, ForkNumber forknum, bool use_wal)
{
BulkWriteState *state;
state = palloc(sizeof(BulkWriteState));
state->smgr = smgr;
state->forknum = forknum;
state->use_wal = use_wal;
state->npending = 0;
state->pages_written = 0;
state->start_RedoRecPtr = GetRedoRecPtr();
/*
* Remember the memory context. We will use it to allocate all the
* buffers later.
*/
state->memcxt = CurrentMemoryContext;
return state;
}
/*
* Finish bulk write operation.
*
* This WAL-logs and flushes any remaining pending writes to disk, and fsyncs
* the relation if needed.
*/
void
smgr_bulk_finish(BulkWriteState *bulkstate)
{
/* WAL-log and flush any remaining pages */
smgr_bulk_flush(bulkstate);
/*
* When we wrote out the pages, we passed skipFsync=true to avoid the
* overhead of registering all the writes with the checkpointer. Register
* the whole relation now.
*
* There is one hole in that idea: If a checkpoint occurred while we were
* writing the pages, it already missed fsyncing the pages we had written
* before the checkpoint started. A crash later on would replay the WAL
* starting from the checkpoint, therefore it wouldn't replay our earlier
* WAL records. So if a checkpoint started after the bulk write, fsync
* the files now.
*/
if (!SmgrIsTemp(bulkstate->smgr))
{
/*
* Prevent a checkpoint from starting between the GetRedoRecPtr() and
* smgrregistersync() calls.
*/
Assert((MyProc->delayChkptFlags & DELAY_CHKPT_START) == 0);
MyProc->delayChkptFlags |= DELAY_CHKPT_START;
if (bulkstate->start_RedoRecPtr != GetRedoRecPtr())
{
/*
* A checkpoint occurred and it didn't know about our writes, so
* fsync() the relation ourselves.
*/
MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;
smgrimmedsync(bulkstate->smgr, bulkstate->forknum);
elog(DEBUG1, "flushed relation because a checkpoint occurred concurrently");
}
else
{
smgrregistersync(bulkstate->smgr, bulkstate->forknum);
MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;
}
}
}
static int
buffer_cmp(const void *a, const void *b)
{
const PendingWrite *bufa = (const PendingWrite *) a;
const PendingWrite *bufb = (const PendingWrite *) b;
/* We should not see duplicated writes for the same block */
Assert(bufa->blkno != bufb->blkno);
if (bufa->blkno > bufb->blkno)
return 1;
else
return -1;
}
/*
* Finish all the pending writes.
*/
static void
smgr_bulk_flush(BulkWriteState *bulkstate)
{
int npending = bulkstate->npending;
PendingWrite *pending_writes = bulkstate->pending_writes;
if (npending == 0)
return;
if (npending > 1)
qsort(pending_writes, npending, sizeof(PendingWrite), buffer_cmp);
if (bulkstate->use_wal)
{
BlockNumber blknos[MAX_PENDING_WRITES];
Page pages[MAX_PENDING_WRITES];
bool page_std = true;
for (int i = 0; i < npending; i++)
{
blknos[i] = pending_writes[i].blkno;
pages[i] = pending_writes[i].buf->data;
/*
* If any of the pages use !page_std, we log them all as such.
* That's a bit wasteful, but in practice, a mix of standard and
* non-standard page layout is rare. None of the built-in AMs do
* that.
*/
if (!pending_writes[i].page_std)
page_std = false;
}
log_newpages(&bulkstate->smgr->smgr_rlocator.locator, bulkstate->forknum,
npending, blknos, pages, page_std);
}
for (int i = 0; i < npending; i++)
{
BlockNumber blkno = pending_writes[i].blkno;
Page page = pending_writes[i].buf->data;
PageSetChecksumInplace(page, blkno);
if (blkno >= bulkstate->pages_written)
{
/*
* If we have to write pages nonsequentially, fill in the space
* with zeroes until we come back and overwrite. This is not
* logically necessary on standard Unix filesystems (unwritten
* space will read as zeroes anyway), but it should help to avoid
* fragmentation. The dummy pages aren't WAL-logged though.
*/
while (blkno > bulkstate->pages_written)
{
/* don't set checksum for all-zero page */
smgrextend(bulkstate->smgr, bulkstate->forknum,
bulkstate->pages_written++,
&zero_buffer,
true);
}
smgrextend(bulkstate->smgr, bulkstate->forknum, blkno, page, true);
bulkstate->pages_written = pending_writes[i].blkno + 1;
}
else
smgrwrite(bulkstate->smgr, bulkstate->forknum, blkno, page, true);
pfree(page);
}
bulkstate->npending = 0;
}
/*
* Queue write of 'buf'.
*
* NB: this takes ownership of 'buf'!
*
* You are only allowed to write a given block once as part of one bulk write
* operation.
*/
void
smgr_bulk_write(BulkWriteState *bulkstate, BlockNumber blocknum, BulkWriteBuffer buf, bool page_std)
{
PendingWrite *w;
w = &bulkstate->pending_writes[bulkstate->npending++];
w->buf = buf;
w->blkno = blocknum;
w->page_std = page_std;
if (bulkstate->npending == MAX_PENDING_WRITES)
smgr_bulk_flush(bulkstate);
}
/*
* Allocate a new buffer which can later be written with smgr_bulk_write().
*
* There is no function to free the buffer. When you pass it to
* smgr_bulk_write(), it takes ownership and frees it when it's no longer
* needed.
*
* This is currently implemented as a simple palloc, but could be implemented
* using a ring buffer or larger chunks in the future, so don't rely on it.
*/
BulkWriteBuffer
smgr_bulk_get_buf(BulkWriteState *bulkstate)
{
return MemoryContextAllocAligned(bulkstate->memcxt, BLCKSZ, PG_IO_ALIGN_SIZE, 0);
}

View File

@ -1236,6 +1236,49 @@ mdtruncate(SMgrRelation reln, ForkNumber forknum, BlockNumber nblocks)
}
}
/*
* mdregistersync() -- Mark whole relation as needing fsync
*/
void
mdregistersync(SMgrRelation reln, ForkNumber forknum)
{
int segno;
int min_inactive_seg;
/*
* NOTE: mdnblocks makes sure we have opened all active segments, so that
* the loop below will get them all!
*/
mdnblocks(reln, forknum);
min_inactive_seg = segno = reln->md_num_open_segs[forknum];
/*
* Temporarily open inactive segments, then close them after sync. There
* may be some inactive segments left opened after error, but that is
* harmless. We don't bother to clean them up and take a risk of further
* trouble. The next mdclose() will soon close them.
*/
while (_mdfd_openseg(reln, forknum, segno, 0) != NULL)
segno++;
while (segno > 0)
{
MdfdVec *v = &reln->md_seg_fds[forknum][segno - 1];
register_dirty_segment(reln, forknum, v);
/* Close inactive segments immediately */
if (segno > min_inactive_seg)
{
FileClose(v->mdfd_vfd);
_fdvec_resize(reln, forknum, segno - 1);
}
segno--;
}
}
/*
* mdimmedsync() -- Immediately sync a relation to stable storage.
*
@ -1255,7 +1298,7 @@ mdimmedsync(SMgrRelation reln, ForkNumber forknum)
/*
* NOTE: mdnblocks makes sure we have opened all active segments, so that
* fsync loop will get them all!
* the loop below will get them all!
*/
mdnblocks(reln, forknum);

View File

@ -1,6 +1,7 @@
# Copyright (c) 2022-2024, PostgreSQL Global Development Group
backend_sources += files(
'bulk_write.c',
'md.c',
'smgr.c',
)

View File

@ -102,6 +102,7 @@ typedef struct f_smgr
void (*smgr_truncate) (SMgrRelation reln, ForkNumber forknum,
BlockNumber nblocks);
void (*smgr_immedsync) (SMgrRelation reln, ForkNumber forknum);
void (*smgr_registersync) (SMgrRelation reln, ForkNumber forknum);
} f_smgr;
static const f_smgr smgrsw[] = {
@ -123,6 +124,7 @@ static const f_smgr smgrsw[] = {
.smgr_nblocks = mdnblocks,
.smgr_truncate = mdtruncate,
.smgr_immedsync = mdimmedsync,
.smgr_registersync = mdregistersync,
}
};
@ -616,6 +618,14 @@ smgrreadv(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum,
* on disk at return, only dumped out to the kernel. However,
* provisions will be made to fsync the write before the next checkpoint.
*
* NB: The mechanism to ensure fsync at next checkpoint assumes that there is
* something that prevents a concurrent checkpoint from "racing ahead" of the
* write. One way to prevent that is by holding a lock on the buffer; the
* buffer manager's writes are protected by that. The bulk writer facility
* in bulk_write.c checks the redo pointer and calls smgrimmedsync() if a
* checkpoint happened; that relies on the fact that no other backend can be
* concurrently modifying the page.
*
* skipFsync indicates that the caller will make other provisions to
* fsync the relation, so we needn't bother. Temporary relations also
* do not require fsync.
@ -733,6 +743,24 @@ smgrtruncate(SMgrRelation reln, ForkNumber *forknum, int nforks, BlockNumber *nb
}
}
/*
* smgrregistersync() -- Request a relation to be sync'd at next checkpoint
*
* This can be used after calling smgrwrite() or smgrextend() with skipFsync =
* true, to register the fsyncs that were skipped earlier.
*
* Note: be mindful that a checkpoint could already have happened between the
* smgrwrite or smgrextend calls and this! In that case, the checkpoint
* already missed fsyncing this relation, and you should use smgrimmedsync
* instead. Most callers should use the bulk loading facility in bulk_write.c
* which handles all that.
*/
void
smgrregistersync(SMgrRelation reln, ForkNumber forknum)
{
smgrsw[reln->smgr_which].smgr_registersync(reln, forknum);
}
/*
* smgrimmedsync() -- Force the specified relation to stable storage.
*
@ -755,6 +783,9 @@ smgrtruncate(SMgrRelation reln, ForkNumber *forknum, int nforks, BlockNumber *nb
* Note that you need to do FlushRelationBuffers() first if there is
* any possibility that there are dirty buffers for the relation;
* otherwise the sync is not very meaningful.
*
* Most callers should use the bulk loading facility in bulk_write.c
* instead of calling this directly.
*/
void
smgrimmedsync(SMgrRelation reln, ForkNumber forknum)

View File

@ -0,0 +1,40 @@
/*-------------------------------------------------------------------------
*
* bulk_write.h
* Efficiently and reliably populate a new relation
*
*
* Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* src/include/storage/bulk_write.h
*
*-------------------------------------------------------------------------
*/
#ifndef BULK_WRITE_H
#define BULK_WRITE_H
#include "storage/smgr.h"
#include "utils/rel.h"
typedef struct BulkWriteState BulkWriteState;
/*
* Temporary buffer to hold a page to until it's written out. Use
* smgr_bulk_get_buf() to reserve one of these. This is a separate typedef to
* distinguish it from other block-sized buffers passed around in the system.
*/
typedef PGIOAlignedBlock *BulkWriteBuffer;
/* forward declared from smgr.h */
struct SMgrRelationData;
extern BulkWriteState *smgr_bulk_start_rel(Relation rel, ForkNumber forknum);
extern BulkWriteState *smgr_bulk_start_smgr(struct SMgrRelationData *smgr, ForkNumber forknum, bool use_wal);
extern BulkWriteBuffer smgr_bulk_get_buf(BulkWriteState *bulkstate);
extern void smgr_bulk_write(BulkWriteState *bulkstate, BlockNumber blocknum, BulkWriteBuffer buf, bool page_std);
extern void smgr_bulk_finish(BulkWriteState *bulkstate);
#endif /* BULK_WRITE_H */

View File

@ -43,6 +43,7 @@ extern BlockNumber mdnblocks(SMgrRelation reln, ForkNumber forknum);
extern void mdtruncate(SMgrRelation reln, ForkNumber forknum,
BlockNumber nblocks);
extern void mdimmedsync(SMgrRelation reln, ForkNumber forknum);
extern void mdregistersync(SMgrRelation reln, ForkNumber forknum);
extern void ForgetDatabaseSyncRequests(Oid dbid);
extern void DropRelationFiles(RelFileLocator *delrels, int ndelrels, bool isRedo);

View File

@ -106,6 +106,7 @@ extern BlockNumber smgrnblocks_cached(SMgrRelation reln, ForkNumber forknum);
extern void smgrtruncate(SMgrRelation reln, ForkNumber *forknum,
int nforks, BlockNumber *nblocks);
extern void smgrimmedsync(SMgrRelation reln, ForkNumber forknum);
extern void smgrregistersync(SMgrRelation reln, ForkNumber forknum);
extern void AtEOXact_SMgr(void);
extern bool ProcessBarrierSmgrRelease(void);

View File

@ -333,6 +333,8 @@ BuildAccumulator
BuiltinScript
BulkInsertState
BulkInsertStateData
BulkWriteBuffer
BulkWriteState
CACHESIGN
CAC_state
CCFastEqualFN
@ -2018,6 +2020,7 @@ PendingFsyncEntry
PendingRelDelete
PendingRelSync
PendingUnlinkEntry
PendingWrite
PendingWriteback
PerLockTagEntry
PerlInterpreter