postgresql/src/backend/access/heap/heapam_handler.c

535 lines
15 KiB
C

/*-------------------------------------------------------------------------
*
* heapam_handler.c
* heap table access method code
*
* Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
* src/backend/access/heap/heapam_handler.c
*
*
* NOTES
* This file wires up the lower level heapam.c et al. routines with the
* tableam abstraction.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/heapam.h"
#include "access/tableam.h"
#include "access/xact.h"
#include "storage/bufmgr.h"
#include "storage/lmgr.h"
#include "utils/builtins.h"
/*
 * Forward declaration of the heap AM's callback table; the initialized
 * definition appears later in this file.
 */
static const TableAmRoutine heapam_methods;
/* ------------------------------------------------------------------------
* Slot related callbacks for heap AM
* ------------------------------------------------------------------------
*/
/*
 * heapam_slot_callbacks
 *		Report which slot implementation the heap AM works with.
 *
 * The 'relation' argument is not consulted: the same buffer-heap-tuple slot
 * operations are returned for every relation.
 */
static const TupleTableSlotOps *
heapam_slot_callbacks(Relation relation)
{
	const TupleTableSlotOps *slot_ops = &TTSOpsBufferHeapTuple;

	return slot_ops;
}
/* ------------------------------------------------------------------------
* Index Scan Callbacks for heap AM
* ------------------------------------------------------------------------
*/
/*
 * heapam_index_fetch_begin
 *		Allocate the per-scan state used to fetch heap tuples for an index
 *		scan on 'rel'.
 *
 * The state starts out without a current buffer.  The returned pointer is
 * the embedded base struct; the other index-fetch callbacks in this file
 * cast it back to IndexFetchHeapData.
 */
static IndexFetchTableData *
heapam_index_fetch_begin(Relation rel)
{
	IndexFetchHeapData *fetch;

	fetch = palloc0(sizeof(IndexFetchHeapData));

	/* no heap page is pinned yet */
	fetch->xs_cbuf = InvalidBuffer;
	fetch->xs_base.rel = rel;

	return &fetch->xs_base;
}
/*
 * heapam_index_fetch_reset
 *		Drop the buffer currently held by the index-fetch state, if any.
 *
 * The state itself stays allocated and can be reused afterwards.
 */
static void
heapam_index_fetch_reset(IndexFetchTableData *scan)
{
	IndexFetchHeapData *fetch = (IndexFetchHeapData *) scan;

	/* nothing held, nothing to do */
	if (!BufferIsValid(fetch->xs_cbuf))
		return;

	ReleaseBuffer(fetch->xs_cbuf);
	fetch->xs_cbuf = InvalidBuffer;
}
/*
 * heapam_index_fetch_end
 *		Tear down index-fetch state created by heapam_index_fetch_begin().
 *
 * Any buffer still held is released first, then the state is freed.
 */
static void
heapam_index_fetch_end(IndexFetchTableData *scan)
{
	IndexFetchHeapData *fetch = (IndexFetchHeapData *) scan;

	/* let go of the current buffer before freeing the struct that tracks it */
	heapam_index_fetch_reset(scan);

	pfree(fetch);
}
/*
 * heapam_index_fetch_tuple
 *		Fetch into 'slot' the heap tuple that index entry 'tid' points at.
 *
 * 'slot' must be a buffer-heap-tuple slot.
 *
 * *call_again is both input and output.  On entry, false means this is the
 * first call for 'tid', so the right heap page is read (and possibly pruned);
 * true means we are continuing within the HOT chain on the page already held.
 * On exit it is true only if a tuple was returned and the snapshot is
 * non-MVCC, i.e. another chain member might still be visible.
 *
 * 'all_dead' is handed through to heap_hot_search_buffer().
 *
 * Returns true if a tuple was found and stored in 'slot', false otherwise.
 */
static bool
heapam_index_fetch_tuple(struct IndexFetchTableData *scan,
						 ItemPointer tid,
						 Snapshot snapshot,
						 TupleTableSlot *slot,
						 bool *call_again, bool *all_dead)
{
	IndexFetchHeapData *fetch = (IndexFetchHeapData *) scan;
	BufferHeapTupleTableSlot *bufslot = (BufferHeapTupleTableSlot *) slot;
	HeapTuple	heaptup = &bufslot->base.tupdata;
	bool		first_call = !*call_again;
	bool		found;

	Assert(TTS_IS_BUFFERTUPLE(slot));

	/* Mid-HOT-chain calls already have the right buffer; skip the switch. */
	if (first_call)
	{
		Buffer		old_buf = fetch->xs_cbuf;

		/* Move to the page holding 'tid' unless we are already on it */
		fetch->xs_cbuf = ReleaseAndReadBuffer(old_buf,
											  fetch->xs_base.rel,
											  ItemPointerGetBlockNumber(tid));

		/* Pruning is only attempted when we actually changed pages */
		if (fetch->xs_cbuf != old_buf)
			heap_page_prune_opt(fetch->xs_base.rel, fetch->xs_cbuf);
	}

	/* Visibility checks require a share lock on the buffer contents */
	LockBuffer(fetch->xs_cbuf, BUFFER_LOCK_SHARE);
	found = heap_hot_search_buffer(tid,
								   fetch->xs_base.rel,
								   fetch->xs_cbuf,
								   snapshot,
								   heaptup,
								   all_dead,
								   first_call);
	heaptup->t_self = *tid;
	LockBuffer(fetch->xs_cbuf, BUFFER_LOCK_UNLOCK);

	if (!found)
	{
		/* End of the HOT chain: nothing more to return for this tid. */
		*call_again = false;
		return found;
	}

	/*
	 * With an MVCC snapshot at most one HOT chain member can be visible, so
	 * a follow-up call is only worthwhile for non-MVCC snapshots.
	 */
	*call_again = !IsMVCCSnapshot(snapshot);

	slot->tts_tableOid = RelationGetRelid(scan->rel);
	ExecStoreBufferHeapTuple(heaptup, slot, fetch->xs_cbuf);

	return found;
}
/* ------------------------------------------------------------------------
* Callbacks for non-modifying operations on individual tuples for heap AM
* ------------------------------------------------------------------------
*/
/*
 * heapam_tuple_satisfies_snapshot
 *		Test whether the tuple held in 'slot' is visible under 'snapshot'.
 *
 * 'slot' must be a buffer-heap-tuple slot with a valid buffer.  The caller
 * is expected to hold a pin on that buffer but not its content lock; the
 * share lock is taken and dropped here around the visibility check.
 * 'rel' is not used.
 */
static bool
heapam_tuple_satisfies_snapshot(Relation rel, TupleTableSlot *slot,
								Snapshot snapshot)
{
	BufferHeapTupleTableSlot *bufslot = (BufferHeapTupleTableSlot *) slot;
	bool		visible;

	Assert(TTS_IS_BUFFERTUPLE(slot));
	Assert(BufferIsValid(bufslot->buffer));

	/* HeapTupleSatisfiesVisibility needs the buffer pinned and locked */
	LockBuffer(bufslot->buffer, BUFFER_LOCK_SHARE);
	visible = HeapTupleSatisfiesVisibility(bufslot->base.tuple,
										   snapshot,
										   bufslot->buffer);
	LockBuffer(bufslot->buffer, BUFFER_LOCK_UNLOCK);

	return visible;
}
/* ----------------------------------------------------------------------------
* Functions for manipulations of physical tuples for heap AM.
* ----------------------------------------------------------------------------
*/
/*
 * heapam_tuple_insert
 *		Insert the tuple contained in 'slot' into 'relation'.
 *
 * The slot's table OID and TID are updated to describe the newly inserted
 * tuple.  'cid', 'options' and 'bistate' are passed to heap_insert()
 * unchanged.
 */
static void
heapam_tuple_insert(Relation relation, TupleTableSlot *slot, CommandId cid,
					int options, BulkInsertState bistate)
{
	bool		must_free = true;
	HeapTuple	htup;

	htup = ExecFetchSlotHeapTuple(slot, true, &must_free);

	/* Stamp both the slot and the tuple with the target table's OID */
	slot->tts_tableOid = RelationGetRelid(relation);
	htup->t_tableOid = slot->tts_tableOid;

	/* Do the insert, then report the assigned TID back through the slot */
	heap_insert(relation, htup, cid, options, bistate);
	ItemPointerCopy(&htup->t_self, &slot->tts_tid);

	/* Release the tuple only if the fetch handed us a private copy */
	if (must_free)
		pfree(htup);
}
/*
 * heapam_tuple_insert_speculative
 *		Speculatively insert the tuple contained in 'slot' into 'relation'.
 *
 * Works like heapam_tuple_insert(), except that 'specToken' is recorded in
 * the tuple header and HEAP_INSERT_SPECULATIVE is added to 'options' before
 * calling heap_insert().  The slot's table OID and TID are updated.
 */
static void
heapam_tuple_insert_speculative(Relation relation, TupleTableSlot *slot, CommandId cid,
								int options, BulkInsertState bistate, uint32 specToken)
{
	bool		must_free = true;
	HeapTuple	htup;

	htup = ExecFetchSlotHeapTuple(slot, true, &must_free);

	/* Stamp both the slot and the tuple with the target table's OID */
	slot->tts_tableOid = RelationGetRelid(relation);
	htup->t_tableOid = slot->tts_tableOid;

	/* Mark the tuple as a speculative insertion carrying our token */
	HeapTupleHeaderSetSpeculativeToken(htup->t_data, specToken);

	/* Do the insert, then report the assigned TID back through the slot */
	heap_insert(relation, htup, cid, options | HEAP_INSERT_SPECULATIVE, bistate);
	ItemPointerCopy(&htup->t_self, &slot->tts_tid);

	/* Release the tuple only if the fetch handed us a private copy */
	if (must_free)
		pfree(htup);
}
/*
 * heapam_tuple_complete_speculative
 *		Complete a speculative insertion for the tuple identified by
 *		slot->tts_tid.
 *
 * If 'succeeded' is true, the speculative insertion is confirmed with
 * heap_finish_speculative(); if false, the speculatively inserted tuple is
 * backed out with heap_abort_speculative().  (The two branches used to be
 * swapped, so a successful insertion was aborted and a failed one was kept.)
 *
 * 'spekToken' is not used here.
 *
 * NOTE(review): callers must pass true on success.  Verify that no caller
 * passes an inverted flag to compensate for the former behavior.
 */
static void
heapam_tuple_complete_speculative(Relation relation, TupleTableSlot *slot, uint32 spekToken,
								  bool succeeded)
{
	bool		shouldFree = true;
	HeapTuple	tuple = ExecFetchSlotHeapTuple(slot, true, &shouldFree);

	/* adjust the tuple's state accordingly */
	if (succeeded)
		heap_finish_speculative(relation, &slot->tts_tid);
	else
		heap_abort_speculative(relation, &slot->tts_tid);

	/* Release the tuple only if the fetch handed us a private copy */
	if (shouldFree)
		pfree(tuple);
}
/*
 * heapam_tuple_delete
 *		Delete the tuple at 'tid' from 'relation'.
 *
 * Thin wrapper around heap_delete(); its result is returned unchanged.
 * 'snapshot' is accepted as part of the callback signature but not used.
 */
static TM_Result
heapam_tuple_delete(Relation relation, ItemPointer tid, CommandId cid,
					Snapshot snapshot, Snapshot crosscheck, bool wait,
					TM_FailureData *tmfd, bool changingPart)
{
	TM_Result	outcome;

	/*
	 * Index tuples are currently removed by vacuum.  If the storage ever
	 * cleaned up dead tuples by itself, this would be the place to trigger
	 * deletion of the corresponding index tuples as well.
	 */
	outcome = heap_delete(relation, tid, cid, crosscheck, wait, tmfd,
						  changingPart);

	return outcome;
}
/*
 * heapam_tuple_update
 *		Replace the tuple at 'otid' in 'relation' with the tuple in 'slot'.
 *
 * The slot's table OID and TID are updated; the TID is copied from the
 * tuple's t_self after heap_update() returns, whatever the result.
 * *update_indexes is set to true only when the update succeeded (TM_Ok) and
 * the new tuple is not heap-only, i.e. it was not a HOT update.
 * 'snapshot' is accepted as part of the callback signature but not used.
 *
 * Returns the result of heap_update().
 */
static TM_Result
heapam_tuple_update(Relation relation, ItemPointer otid, TupleTableSlot *slot,
					CommandId cid, Snapshot snapshot, Snapshot crosscheck,
					bool wait, TM_FailureData *tmfd,
					LockTupleMode *lockmode, bool *update_indexes)
{
	bool		must_free = true;
	HeapTuple	newtup;
	TM_Result	outcome;

	newtup = ExecFetchSlotHeapTuple(slot, true, &must_free);

	/* Stamp both the slot and the tuple with the target table's OID */
	slot->tts_tableOid = RelationGetRelid(relation);
	newtup->t_tableOid = slot->tts_tableOid;

	outcome = heap_update(relation, otid, newtup, cid, crosscheck, wait,
						  tmfd, lockmode);

	/* heap_update leaves the new tuple's location in t_self */
	ItemPointerCopy(&newtup->t_self, &slot->tts_tid);

	/*
	 * New index entries are needed only for a successful, non-HOT update;
	 * a HOT update must not get new index entries.
	 */
	if (outcome == TM_Ok && !HeapTupleIsHeapOnly(newtup))
		*update_indexes = true;
	else
		*update_indexes = false;

	/* Release the tuple only if the fetch handed us a private copy */
	if (must_free)
		pfree(newtup);

	return outcome;
}
/*
 * heapam_tuple_lock
 * Lock the tuple at *tid in 'relation' and store it in 'slot'.
 *
 * The lock itself is taken by heap_lock_tuple(), using 'cid', 'mode' and
 * 'wait_policy'.  'flags' controls two things:
 *
 * - TUPLE_LOCK_FLAG_LOCK_UPDATE_IN_PROGRESS is passed to heap_lock_tuple()
 *   as its follow_updates argument.
 * - TUPLE_LOCK_FLAG_FIND_LAST_VERSION: if heap_lock_tuple() reports
 *   TM_Updated, walk the update chain with a dirty snapshot to a newer
 *   version, advancing *tid, and retry the lock there.
 *
 * tmfd->traversed starts out false and is set to true once the update chain
 * has been followed.
 *
 * While chasing the chain this function returns early, without storing
 * anything in 'slot', with:
 * - TM_Deleted if the row turns out to have been deleted (or the chain
 *   leads to an unrelated tuple),
 * - TM_WouldBlock if wait_policy is LockWaitSkip and a concurrent updater
 *   could not be waited for without blocking,
 * - TM_Invisible if the newer version was created by the current
 *   transaction with a cmin >= cid.
 * Otherwise the result of heap_lock_tuple() is returned and the tuple is
 * stored in 'slot', transferring the buffer pin to the slot.
 *
 * 'slot' must be a buffer-heap-tuple slot.  'snapshot' is accepted as part
 * of the callback signature but not used.
 */
static TM_Result
heapam_tuple_lock(Relation relation, ItemPointer tid, Snapshot snapshot,
TupleTableSlot *slot, CommandId cid, LockTupleMode mode,
LockWaitPolicy wait_policy, uint8 flags,
TM_FailureData *tmfd)
{
BufferHeapTupleTableSlot *bslot = (BufferHeapTupleTableSlot *) slot;
TM_Result result;
Buffer buffer;
/* work directly on the tuple header embedded in the slot */
HeapTuple tuple = &bslot->base.tupdata;
bool follow_updates;
follow_updates = (flags & TUPLE_LOCK_FLAG_LOCK_UPDATE_IN_PROGRESS) != 0;
tmfd->traversed = false;
Assert(TTS_IS_BUFFERTUPLE(slot));
/* re-entered via goto once a newer live version of the row has been found */
tuple_lock_retry:
tuple->t_self = *tid;
result = heap_lock_tuple(relation, tuple, cid, mode, wait_policy,
follow_updates, &buffer, tmfd);
if (result == TM_Updated &&
(flags & TUPLE_LOCK_FLAG_FIND_LAST_VERSION))
{
/*
* NOTE(review): the buffer is released before the Assert below looks at
* tuple->t_data; presumably t_data points into that buffer's page --
* confirm this ordering is intended.
*/
ReleaseBuffer(buffer);
/* Should not encounter speculative tuple on recheck */
Assert(!HeapTupleHeaderIsSpeculative(tuple->t_data));
/* a ctid different from the tuple's own TID means there is a successor */
if (!ItemPointerEquals(&tmfd->ctid, &tuple->t_self))
{
SnapshotData SnapshotDirty;
TransactionId priorXmax;
/* it was updated, so look at the updated version */
*tid = tmfd->ctid;
/* updated row should have xmin matching this xmax */
priorXmax = tmfd->xmax;
/* signal that a tuple later in the chain is getting locked */
tmfd->traversed = true;
/*
* fetch target tuple
*
* Loop here to deal with updated or busy tuples
*/
InitDirtySnapshot(SnapshotDirty);
for (;;)
{
if (ItemPointerIndicatesMovedPartitions(tid))
ereport(ERROR,
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
errmsg("tuple to be locked was already moved to another partition due to concurrent update")));
tuple->t_self = *tid;
if (heap_fetch(relation, &SnapshotDirty, tuple, &buffer, NULL))
{
/*
* If xmin isn't what we're expecting, the slot must have
* been recycled and reused for an unrelated tuple. This
* implies that the latest version of the row was deleted,
* so we need do nothing. (Should be safe to examine xmin
* without getting buffer's content lock. We assume
* reading a TransactionId to be atomic, and Xmin never
* changes in an existing tuple, except to invalid or
* frozen, and neither of those can match priorXmax.)
*/
if (!TransactionIdEquals(HeapTupleHeaderGetXmin(tuple->t_data),
priorXmax))
{
ReleaseBuffer(buffer);
return TM_Deleted;
}
/* otherwise xmin should not be dirty... */
if (TransactionIdIsValid(SnapshotDirty.xmin))
elog(ERROR, "t_xmin is uncommitted in tuple to be updated");
/*
* If tuple is being updated by other transaction then we
* have to wait for its commit/abort, or die trying.
*/
if (TransactionIdIsValid(SnapshotDirty.xmax))
{
ReleaseBuffer(buffer);
/* each wait policy either waits, returns, or raises an error */
switch (wait_policy)
{
case LockWaitBlock:
XactLockTableWait(SnapshotDirty.xmax,
relation, &tuple->t_self,
XLTW_FetchUpdated);
break;
case LockWaitSkip:
if (!ConditionalXactLockTableWait(SnapshotDirty.xmax))
/* skip instead of waiting */
return TM_WouldBlock;
break;
case LockWaitError:
if (!ConditionalXactLockTableWait(SnapshotDirty.xmax))
ereport(ERROR,
(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
errmsg("could not obtain lock on row in relation \"%s\"",
RelationGetRelationName(relation))));
break;
}
continue; /* loop back to repeat heap_fetch */
}
/*
* If tuple was inserted by our own transaction, we have
* to check cmin against cid: cmin >= current CID means
* our command cannot see the tuple, so we should ignore
* it. Otherwise heap_lock_tuple() will throw an error,
* and so would any later attempt to update or delete the
* tuple. (We need not check cmax because
* HeapTupleSatisfiesDirty will consider a tuple deleted
* by our transaction dead, regardless of cmax.) We just
* checked that priorXmax == xmin, so we can test that
* variable instead of doing HeapTupleHeaderGetXmin again.
*/
if (TransactionIdIsCurrentTransactionId(priorXmax) &&
HeapTupleHeaderGetCmin(tuple->t_data) >= cid)
{
ReleaseBuffer(buffer);
return TM_Invisible;
}
/*
* This is a live tuple, so try to lock it again.
*/
ReleaseBuffer(buffer);
goto tuple_lock_retry;
}
/*
* If the referenced slot was actually empty, the latest
* version of the row must have been deleted, so we need do
* nothing.
*/
if (tuple->t_data == NULL)
{
return TM_Deleted;
}
/*
* As above, if xmin isn't what we're expecting, do nothing.
*/
if (!TransactionIdEquals(HeapTupleHeaderGetXmin(tuple->t_data),
priorXmax))
{
if (BufferIsValid(buffer))
ReleaseBuffer(buffer);
return TM_Deleted;
}
/*
* If we get here, the tuple was found but failed
* SnapshotDirty. Assuming the xmin is either a committed xact
* or our own xact (as it certainly should be if we're trying
* to modify the tuple), this must mean that the row was
* updated or deleted by either a committed xact or our own
* xact. If it was deleted, we can ignore it; if it was
* updated then chain up to the next version and repeat the
* whole process.
*
* As above, it should be safe to examine xmax and t_ctid
* without the buffer content lock, because they can't be
* changing.
*/
if (ItemPointerEquals(&tuple->t_self, &tuple->t_data->t_ctid))
{
/* deleted, so forget about it */
if (BufferIsValid(buffer))
ReleaseBuffer(buffer);
return TM_Deleted;
}
/* updated, so look at the updated row */
*tid = tuple->t_data->t_ctid;
/* updated row should have xmin matching this xmax */
priorXmax = HeapTupleHeaderGetUpdateXid(tuple->t_data);
if (BufferIsValid(buffer))
ReleaseBuffer(buffer);
/* loop back to fetch next in chain */
}
}
else
{
/* tuple was deleted, so give up */
return TM_Deleted;
}
}
/* reached for every heap_lock_tuple() result that did not trigger chain-following */
slot->tts_tableOid = RelationGetRelid(relation);
tuple->t_tableOid = slot->tts_tableOid;
/* store in slot, transferring existing pin */
ExecStorePinnedBufferHeapTuple(tuple, slot, buffer);
return result;
}
/* ------------------------------------------------------------------------
* Definition of the heap table access method.
* ------------------------------------------------------------------------
*/
static const TableAmRoutine heapam_methods = {
.type = T_TableAmRoutine,
.slot_callbacks = heapam_slot_callbacks,
.scan_begin = heap_beginscan,
.scan_end = heap_endscan,
.scan_rescan = heap_rescan,
.scan_getnextslot = heap_getnextslot,
.parallelscan_estimate = table_block_parallelscan_estimate,
.parallelscan_initialize = table_block_parallelscan_initialize,
.parallelscan_reinitialize = table_block_parallelscan_reinitialize,
.index_fetch_begin = heapam_index_fetch_begin,
.index_fetch_reset = heapam_index_fetch_reset,
.index_fetch_end = heapam_index_fetch_end,
.index_fetch_tuple = heapam_index_fetch_tuple,
.tuple_insert = heapam_tuple_insert,
.tuple_insert_speculative = heapam_tuple_insert_speculative,
.tuple_complete_speculative = heapam_tuple_complete_speculative,
.tuple_delete = heapam_tuple_delete,
.tuple_update = heapam_tuple_update,
.tuple_lock = heapam_tuple_lock,
.tuple_satisfies_snapshot = heapam_tuple_satisfies_snapshot,
};
/*
 * GetHeapamTableAmRoutine
 *		Return a pointer to the heap AM's static callback table.
 *
 * The table has static storage duration, so the caller must not free it.
 */
const TableAmRoutine *
GetHeapamTableAmRoutine(void)
{
	const TableAmRoutine *routine = &heapam_methods;

	return routine;
}
Datum
heap_tableam_handler(PG_FUNCTION_ARGS)
{
PG_RETURN_POINTER(&heapam_methods);
}