Serialized mode works!

This commit is contained in:
Vadim B. Mikheev 1998-12-16 11:53:55 +00:00
parent 54c3e65242
commit c13a64d7fb
9 changed files with 209 additions and 47 deletions

View File

@ -7,7 +7,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/transam/transam.c,v 1.20 1998/12/15 12:45:30 vadim Exp $
* $Header: /cvsroot/pgsql/src/backend/access/transam/transam.c,v 1.21 1998/12/16 11:53:44 vadim Exp $
*
* NOTES
* This file contains the high level access-method interface to the
@ -172,12 +172,8 @@ TransactionLogTest(TransactionId transactionId, /* transaction id to test */
if (!fail)
{
/* must not cache status of running xaction !!! */
if (xidstatus != XID_INPROGRESS)
{
TransactionIdStore(transactionId, &cachedTestXid);
cachedTestXidStatus = xidstatus;
}
TransactionIdStore(transactionId, &cachedTestXid);
cachedTestXidStatus = xidstatus;
return (bool)
(status == xidstatus);
}
@ -230,11 +226,8 @@ TransactionLogUpdate(TransactionId transactionId, /* trans id to update */
*
* What's the hell ?! Why != XID_COMMIT ?!
*/
if (status != XID_INPROGRESS)
{
TransactionIdStore(transactionId, &cachedTestXid);
cachedTestXidStatus = status;
}
TransactionIdStore(transactionId, &cachedTestXid);
cachedTestXidStatus = status;
}
@ -588,14 +581,11 @@ TransactionIdAbort(TransactionId transactionId)
TransactionLogUpdate(transactionId, XID_ABORT);
}
#ifdef NOT_USED
void
TransactionIdSetInProgress(TransactionId transactionId)
TransactionIdFlushCache()
{
if (AMI_OVERRIDE)
return;
TransactionLogUpdate(transactionId, XID_INPROGRESS);
TransactionIdStore(AmiTransactionId, &cachedTestXid);
cachedTestXidStatus = XID_COMMIT;
}
#endif

View File

@ -7,7 +7,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/access/transam/xact.c,v 1.26 1998/12/15 12:45:35 vadim Exp $
* $Header: /cvsroot/pgsql/src/backend/access/transam/xact.c,v 1.27 1998/12/16 11:53:44 vadim Exp $
*
* NOTES
* Transaction aborts can now occur two ways:
@ -516,6 +516,9 @@ CommandCounterIncrement()
/* make cache changes visible to me */
AtCommit_Cache();
AtStart_Cache();
TransactionIdFlushCache();
}
void
@ -793,6 +796,9 @@ StartTransaction()
{
TransactionState s = CurrentTransactionState;
TransactionIdFlushCache();
FreeXactSnapshot();
/* ----------------
* Check the current transaction state. If the transaction system
* is switched off, or if we're already in a transaction, do nothing.

View File

@ -26,7 +26,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/executor/execMain.c,v 1.60 1998/12/15 12:46:04 vadim Exp $
* $Header: /cvsroot/pgsql/src/backend/executor/execMain.c,v 1.61 1998/12/16 11:53:45 vadim Exp $
*
*-------------------------------------------------------------------------
*/
@ -124,7 +124,7 @@ ExecutorStart(QueryDesc *queryDesc, EState *estate)
memset(estate->es_param_exec_vals, 0, queryDesc->plantree->nParamExec * sizeof(ParamExecData));
}
estate->es_snapshot = SnapshotNow;
estate->es_snapshot = QuerySnapshot;
result = InitPlan(queryDesc->operation,
queryDesc->parsetree,

View File

@ -7,7 +7,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/storage/ipc/shmem.c,v 1.32 1998/12/15 12:46:24 vadim Exp $
* $Header: /cvsroot/pgsql/src/backend/storage/ipc/shmem.c,v 1.33 1998/12/16 11:53:46 vadim Exp $
*
*-------------------------------------------------------------------------
*/
@ -633,27 +633,21 @@ TransactionIdIsInProgress(TransactionId xid)
/*
* GetSnapshotData -- returns information about running transactions.
*
* InvalidTransactionId is used as terminator in snapshot->xip array.
* If serialized is true then XID >= current xact ID will not be
* placed in array. Current xact ID are never placed there (just
* to reduce its length, xmin/xmax may be equal to cid).
* MyProc->xmin will be setted if equal to InvalidTransactionId.
*
* Yet another strange func for this place... - vadim 07/21/98
*/
Snapshot
GetSnapshotData(bool serialized)
GetSnapshotData(void)
{
Snapshot snapshot = (Snapshot) malloc(sizeof(SnapshotData));
ShmemIndexEnt *result;
PROC *proc;
TransactionId cid = GetCurrentTransactionId();
uint32 count = 0;
uint32 have = 31;
uint32 have = 32;
Assert(ShmemIndex);
snapshot->xip = (TransactionId *) malloc(32 * sizeof(TransactionId));
snapshot->xip = (TransactionId *) malloc(have * sizeof(TransactionId));
snapshot->xmax = cid;
snapshot->xmin = cid;
@ -667,15 +661,14 @@ GetSnapshotData(bool serialized)
if (MyProc->xmin == InvalidTransactionId)
MyProc->xmin = snapshot->xmin;
SpinRelease(ShmemIndexLock);
snapshot->xip[count] = InvalidTransactionId;
snapshot->xcnt = count;
return snapshot;
}
if (result->location == INVALID_OFFSET ||
strncmp(result->key, "PID ", 4) != 0)
continue;
proc = (PROC *) MAKE_PTR(result->location);
if (proc == MyProc || proc->xid < FirstTransactionId ||
(serialized && proc->xid >= cid))
if (proc == MyProc || proc->xid < FirstTransactionId)
continue;
if (proc->xid < snapshot->xmin)
snapshot->xmin = proc->xid;
@ -684,7 +677,7 @@ GetSnapshotData(bool serialized)
if (have == 0)
{
snapshot->xip = (TransactionId *) realloc(snapshot->xip,
(count + 33) * sizeof(TransactionId));
(count + 32) * sizeof(TransactionId));
have = 32;
}
snapshot->xip[count] = proc->xid;

View File

@ -7,7 +7,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lmgr.c,v 1.20 1998/12/15 12:46:30 vadim Exp $
* $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lmgr.c,v 1.21 1998/12/16 11:53:48 vadim Exp $
*
*-------------------------------------------------------------------------
*/
@ -325,6 +325,7 @@ XactLockTableWait(TransactionId xid)
LockAcquire(LockTableId, &tag, ShareLock);
TransactionIdFlushCache();
/*
* Transaction was committed/aborted/crashed -
* we have to update pg_log if transaction is still

View File

@ -7,7 +7,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/tcop/postgres.c,v 1.94 1998/10/16 06:05:13 momjian Exp $
* $Header: /cvsroot/pgsql/src/backend/tcop/postgres.c,v 1.95 1998/12/16 11:53:52 vadim Exp $
*
* NOTES
* this is the "main" module of the postgres backend and
@ -786,9 +786,10 @@ pg_exec_query_dest(char *query_string, /* string to execute */
}
#endif
/* ----------------
SetQuerySnapshot();
/*
* execute the plan
*
*/
if (ShowExecutorStats)
ResetUsage();
@ -1519,7 +1520,7 @@ PostgresMain(int argc, char *argv[], int real_argc, char *real_argv[])
if (!IsUnderPostmaster)
{
puts("\nPOSTGRES backend interactive interface ");
puts("$Revision: 1.94 $ $Date: 1998/10/16 06:05:13 $\n");
puts("$Revision: 1.95 $ $Date: 1998/12/16 11:53:52 $\n");
}
/* ----------------

View File

@ -7,7 +7,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/utils/time/tqual.c,v 1.21 1998/12/15 12:46:40 vadim Exp $
* $Header: /cvsroot/pgsql/src/backend/utils/time/tqual.c,v 1.22 1998/12/16 11:53:55 vadim Exp $
*
*-------------------------------------------------------------------------
*/
@ -29,6 +29,9 @@ extern bool PostgresIsInitialized;
SnapshotData SnapshotDirtyData;
Snapshot SnapshotDirty = &SnapshotDirtyData;
Snapshot QuerySnapshot = NULL;
static Snapshot SerializedSnapshot = NULL;
/*
* XXX Transaction system override hacks start here
*/
@ -436,3 +439,159 @@ HeapTupleSatisfiesDirty(HeapTupleHeader tuple)
return false; /* updated by other */
}
bool
HeapTupleSatisfiesSnapshot(HeapTupleHeader tuple, Snapshot snapshot)
{
if (AMI_OVERRIDE)
return true;
if (!(tuple->t_infomask & HEAP_XMIN_COMMITTED))
{
if (tuple->t_infomask & HEAP_XMIN_INVALID) /* xid invalid or
* aborted */
return false;
if (TransactionIdIsCurrentTransactionId(tuple->t_xmin))
{
if (CommandIdGEScanCommandId(tuple->t_cmin))
return false; /* inserted after scan started */
if (tuple->t_infomask & HEAP_XMAX_INVALID) /* xid invalid */
return true;
Assert(TransactionIdIsCurrentTransactionId(tuple->t_xmax));
if (tuple->t_infomask & HEAP_MARKED_FOR_UPDATE)
return true;
if (CommandIdGEScanCommandId(tuple->t_cmax))
return true; /* deleted after scan started */
else
return false; /* deleted before scan started */
}
/*
* this call is VERY expensive - requires a log table lookup.
*/
if (!TransactionIdDidCommit(tuple->t_xmin))
{
if (TransactionIdDidAbort(tuple->t_xmin))
tuple->t_infomask |= HEAP_XMIN_INVALID; /* aborted */
return false;
}
tuple->t_infomask |= HEAP_XMIN_COMMITTED;
}
/*
* By here, the inserting transaction has committed -
* have to check when...
*/
if (tuple->t_xmin >= snapshot->xmax)
return false;
if (tuple->t_xmin >= snapshot->xmin)
{
uint32 i;
for (i = 0; i < snapshot->xcnt; i++)
{
if (tuple->t_xmin == snapshot->xip[i])
return false;
}
}
if (tuple->t_infomask & HEAP_XMAX_INVALID) /* xid invalid or aborted */
return true;
if (tuple->t_infomask & HEAP_MARKED_FOR_UPDATE)
return true;
if (!(tuple->t_infomask & HEAP_XMAX_COMMITTED))
{
if (TransactionIdIsCurrentTransactionId(tuple->t_xmax))
{
if (CommandIdGEScanCommandId(tuple->t_cmax))
return true; /* deleted after scan started */
else
return false; /* deleted before scan started */
}
if (!TransactionIdDidCommit(tuple->t_xmax))
{
if (TransactionIdDidAbort(tuple->t_xmax))
tuple->t_infomask |= HEAP_XMAX_INVALID; /* aborted */
return true;
}
/* xmax transaction committed */
tuple->t_infomask |= HEAP_XMAX_COMMITTED;
}
if (tuple->t_xmax >= snapshot->xmax)
return true;
if (tuple->t_xmax >= snapshot->xmin)
{
uint32 i;
for (i = 0; i < snapshot->xcnt; i++)
{
if (tuple->t_xmax == snapshot->xip[i])
return true;
}
}
return false;
}
void
SetQuerySnapshot(void)
{
/* 1st call in xaction */
if (SerializedSnapshot == NULL)
{
SerializedSnapshot = GetSnapshotData();
QuerySnapshot = SerializedSnapshot;
Assert(QuerySnapshot != NULL);
return;
}
if (QuerySnapshot != SerializedSnapshot)
{
free(QuerySnapshot->xip);
free(QuerySnapshot);
}
if (XactIsoLevel == XACT_SERIALIZED)
QuerySnapshot = SerializedSnapshot;
else
QuerySnapshot = GetSnapshotData();
Assert(QuerySnapshot != NULL);
}
void
FreeXactSnapshot(void)
{
if (QuerySnapshot != NULL && QuerySnapshot != SerializedSnapshot)
{
free(QuerySnapshot->xip);
free(QuerySnapshot);
}
QuerySnapshot = NULL;
if (SerializedSnapshot != NULL)
{
free(SerializedSnapshot->xip);
free(SerializedSnapshot);
}
SerializedSnapshot = NULL;
}

View File

@ -6,7 +6,7 @@
*
* Copyright (c) 1994, Regents of the University of California
*
* $Id: transam.h,v 1.16 1998/09/01 04:34:31 momjian Exp $
* $Id: transam.h,v 1.17 1998/12/16 11:52:10 vadim Exp $
*
* NOTES
* Transaction System Version 101 now support proper oid
@ -145,6 +145,7 @@ extern bool TransactionIdDidCommit(TransactionId transactionId);
extern bool TransactionIdDidAbort(TransactionId transactionId);
extern void TransactionIdCommit(TransactionId transactionId);
extern void TransactionIdAbort(TransactionId transactionId);
extern void TransactionIdFlushCache(void);
/* in transam/transsup.c */
extern void AmiTransactionOverride(bool flag);

View File

@ -7,7 +7,7 @@
*
* Copyright (c) 1994, Regents of the University of California
*
* $Id: tqual.h,v 1.16 1998/12/15 12:47:01 vadim Exp $
* $Id: tqual.h,v 1.17 1998/12/16 11:52:11 vadim Exp $
*
*-------------------------------------------------------------------------
*/
@ -19,7 +19,8 @@
typedef struct SnapshotData
{
TransactionId xmin; /* XID < xmin are visible to me */
TransactionId xmax; /* XID > xmax are invisible to me */
TransactionId xmax; /* XID >= xmax are invisible to me */
uint32 xcnt; /* # of xact below */
TransactionId *xip; /* array of xacts in progress */
} SnapshotData;
@ -27,7 +28,9 @@ typedef SnapshotData *Snapshot;
#define SnapshotNow ((Snapshot) 0x0)
#define SnapshotSelf ((Snapshot) 0x1)
extern Snapshot SnapshotDirty;
extern Snapshot QuerySnapshot;
#define IsSnapshotNow(snapshot) ((Snapshot) snapshot == SnapshotNow)
#define IsSnapshotSelf(snapshot) ((Snapshot) snapshot == SnapshotSelf)
@ -55,7 +58,11 @@ extern CommandId HeapSpecialCommandId;
((IsSnapshotDirty(snapshot)) ? \
HeapTupleSatisfiesDirty((tuple)->t_data) \
: \
HeapTupleSatisfiesNow((tuple)->t_data) \
((IsSnapshotNow(snapshot)) ? \
HeapTupleSatisfiesNow((tuple)->t_data) \
: \
HeapTupleSatisfiesSnapshot((tuple)->t_data, snapshot) \
) \
) \
) \
)
@ -87,9 +94,13 @@ extern CommandId HeapSpecialCommandId;
extern bool HeapTupleSatisfiesItself(HeapTupleHeader tuple);
extern bool HeapTupleSatisfiesNow(HeapTupleHeader tuple);
extern bool HeapTupleSatisfiesDirty(HeapTupleHeader tuple);
extern bool HeapTupleSatisfiesSnapshot(HeapTupleHeader tuple, Snapshot snapshot);
extern int HeapTupleSatisfiesUpdate(HeapTuple tuple);
extern void setheapoverride(bool on);
extern Snapshot GetSnapshotData(bool serialized);
extern Snapshot GetSnapshotData(void);
extern void SetQuerySnapshot(void);
extern void FreeXactSnapshot(void);
#endif /* TQUAL_H */