Provide API for streaming relation data.

Introduce an abstraction allowing relation data to be accessed as a
stream of buffers, with an implementation that is more efficient than
the equivalent sequence of ReadBuffer() calls.

Client code supplies a callback that can say which block number it wants
next, and then consumes individual buffers one at a time from the
stream.  This division puts read_stream.c in control of how far ahead it
can see and allows it to read clusters of neighboring blocks with
StartReadBuffers().  It also issues POSIX_FADV_WILLNEED advice ahead of
time when random access is detected.
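
For illustration, a minimal sketch of how a caller might drive this API (the
ExampleScanState struct and example_* function names are hypothetical, not
part of this commit; they only show the callback/consumer division described
above):

#include "postgres.h"

#include "storage/bufmgr.h"
#include "storage/read_stream.h"
#include "utils/rel.h"

/* Hypothetical private state for the callback. */
typedef struct ExampleScanState
{
	BlockNumber	next_block;		/* next block number to hand out */
	BlockNumber	nblocks;		/* total number of blocks to scan */
} ExampleScanState;

/* Callback: return block numbers 0..nblocks-1, then signal end-of-stream. */
static BlockNumber
example_scan_next_block(ReadStream *stream,
						void *callback_private_data,
						void *per_buffer_data)
{
	ExampleScanState *state = callback_private_data;

	if (state->next_block >= state->nblocks)
		return InvalidBlockNumber;
	return state->next_block++;
}

static void
example_scan_relation(Relation rel)
{
	ExampleScanState state;
	ReadStream *stream;
	Buffer		buffer;

	state.next_block = 0;
	state.nblocks = RelationGetNumberOfBlocks(rel);

	/* READ_STREAM_FULL: we expect to read the whole relation. */
	stream = read_stream_begin_relation(READ_STREAM_FULL,
										NULL,	/* default strategy */
										rel,
										MAIN_FORKNUM,
										example_scan_next_block,
										&state,
										0);		/* no per-buffer data */

	/* Consume pinned buffers in the order the callback chose the blocks. */
	while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
	{
		/* ... examine the page held in buffer ... */
		ReleaseBuffer(buffer);
	}

	read_stream_end(stream);
}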

Other variants of I/O stream will be proposed in future work (for
example to support recovery, whose LsnReadQueue device in
xlogprefetcher.c is a distant cousin of this code and should eventually
be replaced by this), but this basic API is sufficient for many common
executor usage patterns involving predictable access to a single fork of
a single relation.

Several patches using this API are proposed separately.

This stream concept is loosely based on ideas from Andres Freund on how
we should pave the way for later work on asynchronous I/O.

Author: Thomas Munro <thomas.munro@gmail.com>
Author: Heikki Linnakangas <hlinnaka@iki.fi> (contributions)
Author: Melanie Plageman <melanieplageman@gmail.com> (contributions)
Suggested-by: Andres Freund <andres@anarazel.de>
Reviewed-by: Heikki Linnakangas <hlinnaka@iki.fi>
Reviewed-by: Melanie Plageman <melanieplageman@gmail.com>
Reviewed-by: Nazir Bilal Yavuz <byavuz81@gmail.com>
Reviewed-by: Andres Freund <andres@anarazel.de>
Tested-by: Tomas Vondra <tomas.vondra@enterprisedb.com>
Discussion: https://postgr.es/m/CA+hUKGJkOiOCa+mag4BF+zHo7qo=o9CFheB8=g6uT5TUm2gkvA@mail.gmail.com
Thomas Munro 2024-04-03 00:17:06 +13:00
parent 210622c60e
commit b5a9b18cd0
7 changed files with 898 additions and 1 deletion

src/backend/storage/Makefile

@@ -8,6 +8,6 @@ subdir = src/backend/storage
top_builddir = ../../..
include $(top_builddir)/src/Makefile.global
-SUBDIRS = buffer file freespace ipc large_object lmgr page smgr sync
+SUBDIRS = aio buffer file freespace ipc large_object lmgr page smgr sync
include $(top_srcdir)/src/backend/common.mk

src/backend/storage/aio/Makefile

@@ -0,0 +1,14 @@
#
# Makefile for storage/aio
#
# src/backend/storage/aio/Makefile
#
subdir = src/backend/storage/aio
top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
OBJS = \
read_stream.o
include $(top_srcdir)/src/backend/common.mk

src/backend/storage/aio/meson.build

@@ -0,0 +1,5 @@
# Copyright (c) 2024, PostgreSQL Global Development Group
backend_sources += files(
'read_stream.c',
)

src/backend/storage/aio/read_stream.c

@@ -0,0 +1,812 @@
/*-------------------------------------------------------------------------
*
* read_stream.c
* Mechanism for accessing buffered relation data with look-ahead
*
* Code that needs to access relation data typically pins blocks one at a
* time, often in a predictable order that might be sequential or data-driven.
* Calling the simple ReadBuffer() function for each block is inefficient,
* because blocks that are not yet in the buffer pool require I/O operations
* that are small and might stall waiting for storage. This mechanism looks
* into the future and calls StartReadBuffers() and WaitReadBuffers() to read
* neighboring blocks together and ahead of time, with an adaptive look-ahead
* distance.
*
* A user-provided callback generates a stream of block numbers that is used
* to form reads of up to io_combine_limit, by attempting to merge them with a
* pending read. When that isn't possible, the existing pending read is sent
* to StartReadBuffers() so that a new one can begin to form.
*
* The algorithm for controlling the look-ahead distance tries to classify the
* stream into three ideal behaviors:
*
* A) No I/O is necessary, because the requested blocks are fully cached
* already. There is no benefit to looking ahead more than one block, so
* distance is 1. This is the default initial assumption.
*
* B) I/O is necessary, but fadvise is undesirable because the access is
* sequential, or impossible because direct I/O is enabled or the system
* doesn't support advice. There is no benefit in looking ahead more than
* io_combine_limit, because in this case the only goal is larger read system
* calls. Looking further ahead would pin many buffers and perform
* speculative work looking ahead for no benefit.
*
* C) I/O is necessary, it appears random, and this system supports fadvise.
* We'll look further ahead in order to reach the configured level of I/O
* concurrency.
*
* The distance increases rapidly and decays slowly, so that it moves towards
* those levels as different I/O patterns are discovered. For example, a
* sequential scan of fully cached data doesn't bother looking ahead, but a
* sequential scan that hits a region of uncached blocks will start issuing
* increasingly wide read calls until it plateaus at io_combine_limit.
*
* The main data structure is a circular queue of buffers of size
* max_pinned_buffers plus some extra space for technical reasons, ready to be
* returned by read_stream_next_buffer(). Each buffer also has an optional
* variable sized object that is passed from the callback to the consumer of
* buffers.
*
* Parallel to the queue of buffers, there is a circular queue of in-progress
* I/Os that have been started with StartReadBuffers(), and for which
* WaitReadBuffers() must be called before returning the buffer.
*
* For example, if the callback returned block numbers 10, 42, 43, 44, 60 in
* successive calls, then these data structures might appear as follows:
*
* buffers buf/data ios
*
* +----+ +-----+ +--------+
* | | | | +----+ 42..44 | <- oldest_io_index
* +----+ +-----+ | +--------+
* oldest_buffer_index -> | 10 | | ? | | +--+ 60..60 |
* +----+ +-----+ | | +--------+
* | 42 | | ? |<-+ | | | <- next_io_index
* +----+ +-----+ | +--------+
* | 43 | | ? | | | |
* +----+ +-----+ | +--------+
* | 44 | | ? | | | |
* +----+ +-----+ | +--------+
* | 60 | | ? |<---+
* +----+ +-----+
* next_buffer_index -> | | | |
* +----+ +-----+
*
* In the example, 5 buffers are pinned, and the next buffer to be streamed to
* the client is block 10. Block 10 was a hit and has no associated I/O, but
* the range 42..44 requires an I/O wait before its buffers are returned, as
* does block 60.
*
*
* Portions Copyright (c) 2024, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* src/backend/storage/aio/read_stream.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "catalog/pg_tablespace.h"
#include "miscadmin.h"
#include "storage/fd.h"
#include "storage/smgr.h"
#include "storage/read_stream.h"
#include "utils/memdebug.h"
#include "utils/rel.h"
#include "utils/spccache.h"
typedef struct InProgressIO
{
int16 buffer_index;
ReadBuffersOperation op;
} InProgressIO;
/*
* State for managing a stream of reads.
*/
struct ReadStream
{
int16 max_ios;
int16 ios_in_progress;
int16 queue_size;
int16 max_pinned_buffers;
int16 pinned_buffers;
int16 distance;
bool advice_enabled;
/*
* Small buffer of block numbers, useful for 'ungetting' to resolve flow
* control problems when I/Os are split. Also useful for batch-loading
* block numbers in the fast path.
*/
BlockNumber blocknums[16];
int16 blocknums_count;
int16 blocknums_next;
/*
* The callback that will tell us which block numbers to read, and an
* opaque pointer that will be passed to it for its own purposes.
*/
ReadStreamBlockNumberCB callback;
void *callback_private_data;
/* Next expected block, for detecting sequential access. */
BlockNumber seq_blocknum;
/* The read operation we are currently preparing. */
BlockNumber pending_read_blocknum;
int16 pending_read_nblocks;
/* Space for buffers and optional per-buffer private data. */
size_t per_buffer_data_size;
void *per_buffer_data;
/* Read operations that have been started but not waited for yet. */
InProgressIO *ios;
int16 oldest_io_index;
int16 next_io_index;
bool fast_path;
/* Circular queue of buffers. */
int16 oldest_buffer_index; /* Next pinned buffer to return */
int16 next_buffer_index; /* Index of next buffer to pin */
Buffer buffers[FLEXIBLE_ARRAY_MEMBER];
};
/*
* Return a pointer to the per-buffer data by index.
*/
static inline void *
get_per_buffer_data(ReadStream *stream, int16 buffer_index)
{
return (char *) stream->per_buffer_data +
stream->per_buffer_data_size * buffer_index;
}
/*
* Ask the callback which block it would like us to read next, with a small
* buffer in front to allow read_stream_unget_block() to work and to allow the
* fast path to work in batches.
*/
static inline BlockNumber
read_stream_get_block(ReadStream *stream, void *per_buffer_data)
{
if (stream->blocknums_next < stream->blocknums_count)
return stream->blocknums[stream->blocknums_next++];
/*
* We only bother to fetch one at a time here (but see the fast path which
* uses more).
*/
return stream->callback(stream,
stream->callback_private_data,
per_buffer_data);
}
/*
* In order to deal with short reads in StartReadBuffers(), we sometimes need
* to defer handling of a block until later.
*/
static inline void
read_stream_unget_block(ReadStream *stream, BlockNumber blocknum)
{
if (stream->blocknums_next == stream->blocknums_count)
{
/* Never initialized or entirely consumed. Re-initialize. */
stream->blocknums[0] = blocknum;
stream->blocknums_count = 1;
stream->blocknums_next = 0;
}
else
{
/* Must be the last value returned from the blocknums array. */
Assert(stream->blocknums_next > 0);
stream->blocknums_next--;
Assert(stream->blocknums[stream->blocknums_next] == blocknum);
}
}
#ifndef READ_STREAM_DISABLE_FAST_PATH
static void
read_stream_fill_blocknums(ReadStream *stream)
{
BlockNumber blocknum;
int i = 0;
do
{
blocknum = stream->callback(stream,
stream->callback_private_data,
NULL);
stream->blocknums[i++] = blocknum;
} while (i < lengthof(stream->blocknums) &&
blocknum != InvalidBlockNumber);
stream->blocknums_count = i;
stream->blocknums_next = 0;
}
#endif
static void
read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
{
bool need_wait;
int nblocks;
int flags;
int16 io_index;
int16 overflow;
int16 buffer_index;
/* This should only be called with a pending read. */
Assert(stream->pending_read_nblocks > 0);
Assert(stream->pending_read_nblocks <= io_combine_limit);
/* We had better not exceed the pin limit by starting this read. */
Assert(stream->pinned_buffers + stream->pending_read_nblocks <=
stream->max_pinned_buffers);
/* We had better not be overwriting an existing pinned buffer. */
if (stream->pinned_buffers > 0)
Assert(stream->next_buffer_index != stream->oldest_buffer_index);
else
Assert(stream->next_buffer_index == stream->oldest_buffer_index);
/*
* If advice hasn't been suppressed, this system supports it, and this
* isn't a strictly sequential pattern, then we'll issue advice.
*/
if (!suppress_advice &&
stream->advice_enabled &&
stream->pending_read_blocknum != stream->seq_blocknum)
flags = READ_BUFFERS_ISSUE_ADVICE;
else
flags = 0;
/* We say how many blocks we want to read, but it may be smaller on return. */
buffer_index = stream->next_buffer_index;
io_index = stream->next_io_index;
nblocks = stream->pending_read_nblocks;
need_wait = StartReadBuffers(&stream->ios[io_index].op,
&stream->buffers[buffer_index],
stream->pending_read_blocknum,
&nblocks,
flags);
stream->pinned_buffers += nblocks;
/* Remember whether we need to wait before returning this buffer. */
if (!need_wait)
{
/* Look-ahead distance decays, no I/O necessary (behavior A). */
if (stream->distance > 1)
stream->distance--;
}
else
{
/*
* Remember to call WaitReadBuffers() before returning the head buffer.
* Look-ahead distance will be adjusted after waiting.
*/
stream->ios[io_index].buffer_index = buffer_index;
if (++stream->next_io_index == stream->max_ios)
stream->next_io_index = 0;
Assert(stream->ios_in_progress < stream->max_ios);
stream->ios_in_progress++;
stream->seq_blocknum = stream->pending_read_blocknum + nblocks;
}
/*
* We gave a contiguous range of buffer space to StartReadBuffers(), but
* we want it to wrap around at queue_size. Slide overflowing buffers to
* the front of the array.
*/
overflow = (buffer_index + nblocks) - stream->queue_size;
if (overflow > 0)
memmove(&stream->buffers[0],
&stream->buffers[stream->queue_size],
sizeof(stream->buffers[0]) * overflow);
/* Compute location of start of next read, without using % operator. */
buffer_index += nblocks;
if (buffer_index >= stream->queue_size)
buffer_index -= stream->queue_size;
Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
stream->next_buffer_index = buffer_index;
/* Adjust the pending read to cover the remaining portion, if any. */
stream->pending_read_blocknum += nblocks;
stream->pending_read_nblocks -= nblocks;
}
static void
read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
{
while (stream->ios_in_progress < stream->max_ios &&
stream->pinned_buffers + stream->pending_read_nblocks < stream->distance)
{
BlockNumber blocknum;
int16 buffer_index;
void *per_buffer_data;
if (stream->pending_read_nblocks == io_combine_limit)
{
read_stream_start_pending_read(stream, suppress_advice);
suppress_advice = false;
continue;
}
/*
* See which block the callback wants next in the stream. We need to
* compute the index of the Nth block of the pending read including
* wrap-around, but we don't want to use the expensive % operator.
*/
buffer_index = stream->next_buffer_index + stream->pending_read_nblocks;
if (buffer_index >= stream->queue_size)
buffer_index -= stream->queue_size;
Assert(buffer_index >= 0 && buffer_index < stream->queue_size);
per_buffer_data = get_per_buffer_data(stream, buffer_index);
blocknum = read_stream_get_block(stream, per_buffer_data);
if (blocknum == InvalidBlockNumber)
{
/* End of stream. */
stream->distance = 0;
break;
}
/* Can we merge it with the pending read? */
if (stream->pending_read_nblocks > 0 &&
stream->pending_read_blocknum + stream->pending_read_nblocks == blocknum)
{
stream->pending_read_nblocks++;
continue;
}
/* We have to start the pending read before we can build another. */
if (stream->pending_read_nblocks > 0)
{
read_stream_start_pending_read(stream, suppress_advice);
suppress_advice = false;
if (stream->ios_in_progress == stream->max_ios)
{
/* And we've hit the limit. Rewind, and stop here. */
read_stream_unget_block(stream, blocknum);
return;
}
}
/* This is the start of a new pending read. */
stream->pending_read_blocknum = blocknum;
stream->pending_read_nblocks = 1;
}
/*
* We don't start the pending read just because we've hit the distance
* limit, preferring to give it another chance to grow to full
* io_combine_limit size once more buffers have been consumed. However,
* if we've already reached io_combine_limit, or we've reached the
* distance limit and there isn't anything pinned yet, or the callback has
* signaled end-of-stream, we start the read immediately.
*/
if (stream->pending_read_nblocks > 0 &&
(stream->pending_read_nblocks == io_combine_limit ||
(stream->pending_read_nblocks == stream->distance &&
stream->pinned_buffers == 0) ||
stream->distance == 0) &&
stream->ios_in_progress < stream->max_ios)
read_stream_start_pending_read(stream, suppress_advice);
}
/*
* Create a new read stream object that can be used to perform the equivalent
* of a series of ReadBuffer() calls for one fork of one relation.
* Internally, it generates larger vectored reads where possible by looking
* ahead. The callback should return block numbers or InvalidBlockNumber to
* signal end-of-stream, and if per_buffer_data_size is non-zero, it may also
* write extra data for each block into the space provided to it. It will
* also receive callback_private_data for its own purposes.
*/
ReadStream *
read_stream_begin_relation(int flags,
BufferAccessStrategy strategy,
Relation rel,
ForkNumber forknum,
ReadStreamBlockNumberCB callback,
void *callback_private_data,
size_t per_buffer_data_size)
{
ReadStream *stream;
size_t size;
int16 queue_size;
int16 max_ios;
uint32 max_pinned_buffers;
Oid tablespace_id;
SMgrRelation smgr;
smgr = RelationGetSmgr(rel);
/*
* Decide how many I/Os we will allow to run at the same time. That
* currently means advice to the kernel to tell it that we will soon read.
* This number also affects how far we look ahead for opportunities to
* start more I/Os.
*/
tablespace_id = smgr->smgr_rlocator.locator.spcOid;
if (!OidIsValid(MyDatabaseId) ||
IsCatalogRelation(rel) ||
IsCatalogRelationOid(smgr->smgr_rlocator.locator.relNumber))
{
/*
* Avoid circularity while trying to look up tablespace settings or
* before spccache.c is ready.
*/
max_ios = effective_io_concurrency;
}
else if (flags & READ_STREAM_MAINTENANCE)
max_ios = get_tablespace_maintenance_io_concurrency(tablespace_id);
else
max_ios = get_tablespace_io_concurrency(tablespace_id);
max_ios = Min(max_ios, PG_INT16_MAX);
/*
* Choose the maximum number of buffers we're prepared to pin. We try to
* pin fewer if we can, though. We clamp it to at least io_combine_limit
* so that we can have a chance to build up a full io_combine_limit sized
* read, even when max_ios is zero. Be careful not to allow int16 to
* overflow (even though that's not possible with the current GUC range
* limits), allowing also for the spare entry and the overflow space.
*/
max_pinned_buffers = Max(max_ios * 4, io_combine_limit);
max_pinned_buffers = Min(max_pinned_buffers,
PG_INT16_MAX - io_combine_limit - 1);
/* Don't allow this backend to pin more than its share of buffers. */
if (SmgrIsTemp(smgr))
LimitAdditionalLocalPins(&max_pinned_buffers);
else
LimitAdditionalPins(&max_pinned_buffers);
Assert(max_pinned_buffers > 0);
/*
* We need one extra entry for buffers and per-buffer data, because users
* of per-buffer data have access to the object until the next call to
* read_stream_next_buffer(), so we need a gap between the head and tail
* of the queue so that we don't clobber it.
*/
queue_size = max_pinned_buffers + 1;
/*
* Allocate the object, the buffers, the ios and per_buffer_data space in
* one big chunk. Though we have queue_size buffers, we want to be able
* to assume that all the buffers for a single read are contiguous (i.e.
* don't wrap around halfway through), so we allow temporary overflows of
* up to the maximum possible read size by allocating an extra
* io_combine_limit - 1 elements.
*/
size = offsetof(ReadStream, buffers);
size += sizeof(Buffer) * (queue_size + io_combine_limit - 1);
size += sizeof(InProgressIO) * Max(1, max_ios);
size += per_buffer_data_size * queue_size;
size += MAXIMUM_ALIGNOF * 2;
stream = (ReadStream *) palloc(size);
memset(stream, 0, offsetof(ReadStream, buffers));
stream->ios = (InProgressIO *)
MAXALIGN(&stream->buffers[queue_size + io_combine_limit - 1]);
if (per_buffer_data_size > 0)
stream->per_buffer_data = (void *)
MAXALIGN(&stream->ios[Max(1, max_ios)]);
#ifdef USE_PREFETCH
/*
* This system supports prefetching advice. We can use it as long as
* direct I/O isn't enabled, the caller hasn't promised sequential access
* (overriding our detection heuristics), and max_ios hasn't been set to
* zero.
*/
if ((io_direct_flags & IO_DIRECT_DATA) == 0 &&
(flags & READ_STREAM_SEQUENTIAL) == 0 &&
max_ios > 0)
stream->advice_enabled = true;
#endif
/*
* For now, max_ios = 0 is interpreted as max_ios = 1 with advice disabled
* above. If we had real asynchronous I/O we might need a slightly
* different definition.
*/
if (max_ios == 0)
max_ios = 1;
stream->max_ios = max_ios;
stream->per_buffer_data_size = per_buffer_data_size;
stream->max_pinned_buffers = max_pinned_buffers;
stream->queue_size = queue_size;
stream->callback = callback;
stream->callback_private_data = callback_private_data;
/*
* Skip the initial ramp-up phase if the caller says we're going to be
* reading the whole relation. This way we start out assuming we'll be
* doing full io_combine_limit sized reads (behavior B).
*/
if (flags & READ_STREAM_FULL)
stream->distance = Min(max_pinned_buffers, io_combine_limit);
else
stream->distance = 1;
/*
* Since we always access the same relation, we can initialize
* parts of the ReadBuffersOperation objects and leave them that way, to
* avoid wasting CPU cycles writing to them for each read.
*/
for (int i = 0; i < max_ios; ++i)
{
stream->ios[i].op.rel = rel;
stream->ios[i].op.smgr = RelationGetSmgr(rel);
stream->ios[i].op.smgr_persistence = 0;
stream->ios[i].op.forknum = forknum;
stream->ios[i].op.strategy = strategy;
}
return stream;
}
/*
* Pull one pinned buffer out of a stream. Each call returns successive
* blocks in the order specified by the callback. If per_buffer_data_size was
* set to a non-zero size, *per_buffer_data receives a pointer to the extra
* per-buffer data that the callback had a chance to populate, which remains
* valid until the next call to read_stream_next_buffer(). When the stream
* runs out of data, InvalidBuffer is returned. The caller may decide to end
* the stream early at any time by calling read_stream_end().
*/
Buffer
read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
{
Buffer buffer;
int16 oldest_buffer_index;
#ifndef READ_STREAM_DISABLE_FAST_PATH
/*
* A fast path for all-cached scans (behavior A). This is the same as the
* usual algorithm, but it is specialized for no I/O and no per-buffer
* data, so we can skip the queue management code, stay in the same buffer
* slot and use singular StartReadBuffer().
*/
if (likely(stream->fast_path))
{
BlockNumber next_blocknum;
bool need_wait;
/* Fast path assumptions. */
Assert(stream->ios_in_progress == 0);
Assert(stream->pinned_buffers == 1);
Assert(stream->distance == 1);
Assert(stream->pending_read_nblocks == 1);
Assert(stream->per_buffer_data_size == 0);
/* We're going to return the buffer we pinned last time. */
oldest_buffer_index = stream->oldest_buffer_index;
Assert((oldest_buffer_index + 1) % stream->queue_size ==
stream->next_buffer_index);
buffer = stream->buffers[oldest_buffer_index];
Assert(buffer != InvalidBuffer);
/*
* Pin a buffer for the next call. Same buffer entry, and arbitrary
* I/O entry (they're all free).
*/
need_wait = StartReadBuffer(&stream->ios[0].op,
&stream->buffers[oldest_buffer_index],
stream->pending_read_blocknum,
stream->advice_enabled ?
READ_BUFFERS_ISSUE_ADVICE : 0);
/* Choose the block the next call will pin. */
if (unlikely(stream->blocknums_next == stream->blocknums_count))
read_stream_fill_blocknums(stream);
next_blocknum = stream->blocknums[stream->blocknums_next++];
/*
* Fast return if the next call doesn't require I/O for the buffer we
* just pinned, and we have a block number to give it as a pending
* read.
*/
if (likely(!need_wait && next_blocknum != InvalidBlockNumber))
{
stream->pending_read_blocknum = next_blocknum;
return buffer;
}
/*
* For anything more complex, set up some more state and take the slow
* path next time.
*/
stream->fast_path = false;
if (need_wait)
{
/* Next call must wait for I/O for the newly pinned buffer. */
stream->oldest_io_index = 0;
stream->next_io_index = stream->max_ios > 1 ? 1 : 0;
stream->ios_in_progress = 1;
stream->ios[0].buffer_index = oldest_buffer_index;
stream->seq_blocknum = next_blocknum + 1;
}
if (next_blocknum == InvalidBlockNumber)
{
/* Next call hits end of stream and can't pin anything more. */
stream->distance = 0;
stream->pending_read_nblocks = 0;
}
else
{
/* Set up the pending read. */
stream->pending_read_blocknum = next_blocknum;
}
return buffer;
}
#endif
if (unlikely(stream->pinned_buffers == 0))
{
Assert(stream->oldest_buffer_index == stream->next_buffer_index);
/* End of stream reached? */
if (stream->distance == 0)
return InvalidBuffer;
/*
* The usual order of operations is that we look ahead at the bottom
* of this function after potentially finishing an I/O and making
* space for more, but if we're just starting up we'll need to crank
* the handle to get started.
*/
read_stream_look_ahead(stream, true);
/* End of stream reached? */
if (stream->pinned_buffers == 0)
{
Assert(stream->distance == 0);
return InvalidBuffer;
}
}
/* Grab the oldest pinned buffer and associated per-buffer data. */
Assert(stream->pinned_buffers > 0);
oldest_buffer_index = stream->oldest_buffer_index;
Assert(oldest_buffer_index >= 0 &&
oldest_buffer_index < stream->queue_size);
buffer = stream->buffers[oldest_buffer_index];
if (per_buffer_data)
*per_buffer_data = get_per_buffer_data(stream, oldest_buffer_index);
Assert(BufferIsValid(buffer));
/* Do we have to wait for an associated I/O first? */
if (stream->ios_in_progress > 0 &&
stream->ios[stream->oldest_io_index].buffer_index == oldest_buffer_index)
{
int16 io_index = stream->oldest_io_index;
int16 distance;
/* Sanity check that we still agree on the buffers. */
Assert(stream->ios[io_index].op.buffers ==
&stream->buffers[oldest_buffer_index]);
WaitReadBuffers(&stream->ios[io_index].op);
Assert(stream->ios_in_progress > 0);
stream->ios_in_progress--;
if (++stream->oldest_io_index == stream->max_ios)
stream->oldest_io_index = 0;
if (stream->ios[io_index].op.flags & READ_BUFFERS_ISSUE_ADVICE)
{
/* Distance ramps up fast (behavior C). */
distance = stream->distance * 2;
distance = Min(distance, stream->max_pinned_buffers);
stream->distance = distance;
}
else
{
/* No advice; move towards io_combine_limit (behavior B). */
if (stream->distance > io_combine_limit)
{
stream->distance--;
}
else
{
distance = stream->distance * 2;
distance = Min(distance, io_combine_limit);
distance = Min(distance, stream->max_pinned_buffers);
stream->distance = distance;
}
}
}
#ifdef CLOBBER_FREED_MEMORY
/* Clobber old buffer and per-buffer data for debugging purposes. */
stream->buffers[oldest_buffer_index] = InvalidBuffer;
/*
* The caller will get access to the per-buffer data, until the next call.
* We wipe the one before, which is never occupied because queue_size
* allowed one extra element. This will hopefully trip up client code
* that is holding a dangling pointer to it.
*/
if (stream->per_buffer_data)
wipe_mem(get_per_buffer_data(stream,
oldest_buffer_index == 0 ?
stream->queue_size - 1 :
oldest_buffer_index - 1),
stream->per_buffer_data_size);
#endif
/* Pin transferred to caller. */
Assert(stream->pinned_buffers > 0);
stream->pinned_buffers--;
/* Advance oldest buffer, with wrap-around. */
stream->oldest_buffer_index++;
if (stream->oldest_buffer_index == stream->queue_size)
stream->oldest_buffer_index = 0;
/* Prepare for the next call. */
read_stream_look_ahead(stream, false);
#ifndef READ_STREAM_DISABLE_FAST_PATH
/* See if we can take the fast path for all-cached scans next time. */
if (stream->ios_in_progress == 0 &&
stream->pinned_buffers == 1 &&
stream->distance == 1 &&
stream->pending_read_nblocks == 1 &&
stream->per_buffer_data_size == 0)
{
stream->fast_path = true;
}
else
{
stream->fast_path = false;
}
#endif
return buffer;
}
/*
* Reset a read stream by releasing any queued up buffers, allowing the stream
* to be used again for different blocks. This can be used to clear an
* end-of-stream condition and start again, or to throw away blocks that were
* speculatively read and read some different blocks instead.
*/
void
read_stream_reset(ReadStream *stream)
{
Buffer buffer;
/* Stop looking ahead. */
stream->distance = 0;
/* Unpin anything that wasn't consumed. */
while ((buffer = read_stream_next_buffer(stream, NULL)) != InvalidBuffer)
ReleaseBuffer(buffer);
Assert(stream->pinned_buffers == 0);
Assert(stream->ios_in_progress == 0);
/* Start off assuming data is cached. */
stream->distance = 1;
}
/*
* Release and free a read stream.
*/
void
read_stream_end(ReadStream *stream)
{
read_stream_reset(stream);
pfree(stream);
}
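
For illustration, a rough worked example of the look-ahead distance heuristic
above, assuming io_combine_limit is 16 blocks and max_pinned_buffers works out
to 64:

A) Fully cached: StartReadBuffers() keeps finding the blocks already in the
   buffer pool, so the distance decays back toward 1 and the stream settles
   into the single-buffer fast path.

B) Misses without advice: each WaitReadBuffers() doubles the distance,
   1 -> 2 -> 4 -> 8 -> 16, capped at io_combine_limit, so read system calls
   plateau at 16 blocks.

C) Random misses with advice: each wait doubles the distance up to
   max_pinned_buffers, 1 -> 2 -> 4 -> ... -> 64, so several fadvised reads can
   be in flight at once.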

src/backend/storage/meson.build

@@ -1,5 +1,6 @@
# Copyright (c) 2022-2024, PostgreSQL Global Development Group
+subdir('aio')
subdir('buffer')
subdir('file')
subdir('freespace')

src/include/storage/read_stream.h

@@ -0,0 +1,63 @@
/*-------------------------------------------------------------------------
*
* read_stream.h
* Mechanism for accessing buffered relation data with look-ahead
*
*
* Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* src/include/storage/read_stream.h
*
*-------------------------------------------------------------------------
*/
#ifndef READ_STREAM_H
#define READ_STREAM_H
#include "storage/bufmgr.h"
/* Default tuning, reasonable for many users. */
#define READ_STREAM_DEFAULT 0x00
/*
* I/O streams that are performing maintenance work on behalf of potentially
* many users, and thus should be governed by maintenance_io_concurrency
* instead of effective_io_concurrency. For example, VACUUM or CREATE INDEX.
*/
#define READ_STREAM_MAINTENANCE 0x01
/*
* We usually avoid issuing prefetch advice automatically when sequential
* access is detected, but this flag explicitly disables it, for cases that
* might not be correctly detected. Explicit advice is known to perform worse
* than letting the kernel (at least Linux) detect sequential access.
*/
#define READ_STREAM_SEQUENTIAL 0x02
/*
* We usually ramp up from smaller reads to larger ones, to support users who
* don't know if it's worth reading lots of buffers yet. This flag disables
* that, declaring ahead of time that we'll be reading all available buffers.
*/
#define READ_STREAM_FULL 0x04
struct ReadStream;
typedef struct ReadStream ReadStream;
/* Callback that returns the next block number to read. */
typedef BlockNumber (*ReadStreamBlockNumberCB) (ReadStream *stream,
void *callback_private_data,
void *per_buffer_data);
extern ReadStream *read_stream_begin_relation(int flags,
BufferAccessStrategy strategy,
Relation rel,
ForkNumber forknum,
ReadStreamBlockNumberCB callback,
void *callback_private_data,
size_t per_buffer_data_size);
extern Buffer read_stream_next_buffer(ReadStream *stream, void **per_buffer_private);
extern void read_stream_reset(ReadStream *stream);
extern void read_stream_end(ReadStream *stream);
#endif /* READ_STREAM_H */
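
For illustration, a hypothetical sketch of the per_buffer_data mechanism
declared above: the callback fills in a small record for each block it
requests, and the consumer reads it back with the corresponding buffer.  The
ExamplePerBuffer struct and example_* functions are invented for this sketch;
per_buffer_data_size would be passed as sizeof(ExamplePerBuffer) to
read_stream_begin_relation().

#include "postgres.h"

#include "storage/bufmgr.h"
#include "storage/read_stream.h"

/* Hypothetical record attached to each requested block. */
typedef struct ExamplePerBuffer
{
	BlockNumber	blocknum;	/* block this slot was filled for */
	bool		recheck;	/* example flag computed by the callback */
} ExamplePerBuffer;

/* Callback: hand out blocks 0..999 and stash a record beside each one. */
static BlockNumber
example_next_block(ReadStream *stream,
				   void *callback_private_data,
				   void *per_buffer_data)
{
	BlockNumber *next_block = callback_private_data;
	ExamplePerBuffer *extra = per_buffer_data;

	if (*next_block >= 1000)
		return InvalidBlockNumber;	/* end of stream */

	extra->blocknum = *next_block;	/* travels alongside the pinned buffer */
	extra->recheck = (*next_block % 2) == 0;
	return (*next_block)++;
}

/* Consumer: *per_buffer_data stays valid until the next call. */
static void
example_consume(ReadStream *stream)
{
	void	   *extra_p;
	Buffer		buffer;

	while ((buffer = read_stream_next_buffer(stream, &extra_p)) != InvalidBuffer)
	{
		ExamplePerBuffer *extra = (ExamplePerBuffer *) extra_p;

		/* ... use extra->blocknum and extra->recheck to process the page ... */
		ReleaseBuffer(buffer);
	}
}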

src/tools/pgindent/typedefs.list

@@ -1215,6 +1215,7 @@ InjectionPointCacheEntry
InjectionPointEntry
InjectionPointSharedState
InlineCodeBlock
+InProgressIO
InsertStmt
Instrumentation
Int128AggState
@@ -2295,6 +2296,7 @@ ReadExtraTocPtrType
ReadFunc
ReadLocalXLogPageNoWaitPrivate
ReadReplicationSlotCmd
+ReadStream
ReassignOwnedStmt
RecheckForeignScan_function
RecordCacheArrayEntry