I have written some patches to the postgres lock manager which allow the use of long-term cooperative locks managed by user applications.

Submitted by: Massimo Dal Zotto <dz@cs.unitn.it>
Committed by Marc G. Fournier on 1996-10-11 03:22:59 +00:00
parent 97906ac697
commit 2663dfd94e
2 changed files with 311 additions and 3 deletions

File: src/backend/storage/lmgr/lock.c

@ -7,7 +7,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.2 1996/07/30 07:47:33 scrappy Exp $
* $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.3 1996/10/11 03:22:56 scrappy Exp $
*
* NOTES
* Outside modules can create a lock table and acquire/release
@ -366,6 +366,48 @@ LockTabRename(LockTableId tableId)
* Side Effects: The lock is always acquired. No way to abort
* a lock acquisition other than aborting the transaction.
* Lock is recorded in the lkchain.
#ifdef USER_LOCKS
* Note on User Locks:
* User locks are handled totally on the application side as
* long term cooperative locks which extend beyond the normal
* transaction boundaries. Their purpose is to indicate to an
* application that someone is `working' on an item. So it is
* possible to put a user lock on a tuple's oid, retrieve the
* tuple, work on it for an hour and then update it and remove
* the lock. While the lock is active other clients can still
* read and write the tuple but they can be aware that it has
* been locked at the application level by someone.
* User locks use lock tags made of a uint16 and a uint32, for
* example 0 and a tuple oid, or any other arbitrary pair of
* numbers following a convention established by the application.
* In this sense tags don't refer to tuples or database entities.
* User locks and normal locks are completely orthogonal and
* they don't interfere with each other, so it is possible
* to acquire a normal lock on a user-locked tuple or user-lock
* a tuple for which a normal write lock already exists.
* User locks are always non-blocking, therefore they are never
* acquired if already held by another process. They must be
* released explicitly by the application, but they are also
* released automatically when a backend terminates.
* They are indicated by a dummy tableId 0 which doesn't have
* any table allocated but uses the normal lock table, and are
* distinguished from normal locks by the following differences:
*
*                                   normal lock     user lock
*
* tableId                           1               0
* tag.relId                         rel oid         0
* tag.ItemPointerData.ip_blkid      block id        lock id2
* tag.ItemPointerData.ip_posid      tuple offset    lock id1
* xid.pid                           0               backend pid
* xid.xid                           current xid     0
* persistence                       transaction     user or backend
*
* The lockt parameter can take the same values as for normal
* locks, although probably only WRITE_LOCK has any practical use.
*
* DZ - 4 Oct 1996
#endif
*/
bool
LockAcquire(LockTableId tableId, LOCKTAG *lockName, LOCKT lockt)
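
For illustration only, not part of this commit: a minimal sketch of how backend-side code might build a user-lock tag following the table above and call LockAcquire()/LockRelease() with the dummy tableId 0. The LOCKTAG field names and the LockAcquire()/LockRelease() signatures are taken from this diff; the helper names, the use of WRITE_LOCK, and the includes are assumptions about the surrounding backend code.

/*
 * Hypothetical sketch (not in the patch): map an application-chosen
 * (id1, id2) pair onto the user-lock tag layout documented above.
 */
#include <string.h>
#include "postgres.h"           /* Oid, uint16, uint32, bool (assumed) */
#include "storage/lock.h"       /* LOCKTAG, LOCKT, LockAcquire, LockRelease */

static void
set_user_lock_tag(LOCKTAG *tag, uint16 id1, uint32 id2)
{
    memset(tag, 0, sizeof(LOCKTAG));             /* clear padding bytes */
    tag->relId = 0;                              /* user locks use relId 0 */
    tag->tupleId.ip_posid = id1;                 /* lock id1 */
    tag->tupleId.ip_blkid.bi_hi = id2 >> 16;     /* lock id2, high 16 bits */
    tag->tupleId.ip_blkid.bi_lo = id2 & 0xffff;  /* lock id2, low 16 bits */
}

/* Try to user-lock a tuple oid; fails immediately if someone else holds it. */
static bool
try_user_lock_oid(Oid oid)
{
    LOCKTAG     tag;

    set_user_lock_tag(&tag, 0, (uint32) oid);
    return LockAcquire(0, &tag, WRITE_LOCK);     /* tableId 0 = user lock */
}

/* Release it later, possibly from a different transaction of this backend. */
static void
release_user_lock_oid(Oid oid)
{
    LOCKTAG     tag;

    set_user_lock_tag(&tag, 0, (uint32) oid);
    (void) LockRelease(0, &tag, WRITE_LOCK);
}
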
@ -379,6 +421,22 @@ LockAcquire(LockTableId tableId, LOCKTAG *lockName, LOCKT lockt)
int status;
TransactionId myXid;
#ifdef USER_LOCKS
int is_user_lock;
is_user_lock = (tableId == 0);
if (is_user_lock) {
tableId = 1;
#ifdef USER_LOCKS_DEBUG
elog(NOTICE,"LockAcquire: user lock tag [%u,%u] %d",
lockName->tupleId.ip_posid,
((lockName->tupleId.ip_blkid.bi_hi<<16)+
lockName->tupleId.ip_blkid.bi_lo),
lockt);
#endif
}
#endif
Assert (tableId < NumTables);
ltable = AllTables[tableId];
if (!ltable)
@ -445,6 +503,17 @@ LockAcquire(LockTableId tableId, LOCKTAG *lockName, LOCKT lockt)
item.tag.pid = MyPid;
#endif
#ifdef USER_LOCKS
if (is_user_lock) {
item.tag.pid = getpid();
item.tag.xid = myXid = 0;
#ifdef USER_LOCKS_DEBUG
elog(NOTICE,"LockAcquire: user lock xid [%d,%d,%d]",
item.tag.lock, item.tag.pid, item.tag.xid);
#endif
}
#endif
result = (XIDLookupEnt *)hash_search(xidTable, (Pointer)&item, HASH_ENTER, &found);
if (!result)
{
@ -494,6 +563,25 @@ LockAcquire(LockTableId tableId, LOCKTAG *lockName, LOCKT lockt)
}
else if (status == STATUS_FOUND)
{
#ifdef USER_LOCKS
/*
* User locks are non-blocking. If we can't acquire the lock,
* remove the xid entry and return FALSE without waiting.
*/
if (is_user_lock) {
if (!result->nHolding) {
SHMQueueDelete(&result->queue);
hash_search(xidTable, (Pointer)&item, HASH_REMOVE, &found);
}
lock->nHolding--;
lock->holders[lockt]--;
SpinRelease(masterLock);
#ifdef USER_LOCKS_DEBUG
elog(NOTICE,"LockAcquire: user lock failed");
#endif
return(FALSE);
}
#endif
status = WaitOnLock(ltable, tableId, lock, lockt);
XID_PRINT("Someone granted me the lock", result);
}
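
Because this branch returns FALSE instead of sleeping on the lock, the caller decides what a failed acquisition means. A hedged usage sketch, reusing the hypothetical set_user_lock_tag() helper from the earlier sketch; tuple_oid stands for whatever oid the application is editing:

LOCKTAG     tag;

set_user_lock_tag(&tag, 0, (uint32) tuple_oid);   /* hypothetical helper */
if (!LockAcquire(0, &tag, WRITE_LOCK))
{
    /* Not granted and never blocks: someone else is working on the item. */
    elog(NOTICE, "oid %u is already user-locked by another backend", tuple_oid);
    return;
}
/* ... long-running application work on the tuple, across transactions ... */
(void) LockRelease(0, &tag, WRITE_LOCK);
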
@ -690,6 +778,22 @@ LockRelease(LockTableId tableId, LOCKTAG *lockName, LOCKT lockt)
HTAB *xidTable;
bool wakeupNeeded = true;
#ifdef USER_LOCKS
int is_user_lock;
is_user_lock = (tableId == 0);
if (is_user_lock) {
tableId = 1;
#ifdef USER_LOCKS_DEBUG
elog(NOTICE,"LockRelease: user lock tag [%u,%u] %d",
lockName->tupleId.ip_posid,
((lockName->tupleId.ip_blkid.bi_hi<<16)+
lockName->tupleId.ip_blkid.bi_lo),
lockt);
#endif
}
#endif
Assert (tableId < NumTables);
ltable = AllTables[tableId];
if (!ltable) {
@ -713,6 +817,18 @@ LockRelease(LockTableId tableId, LOCKTAG *lockName, LOCKT lockt)
lock = (LOCK *)
hash_search(ltable->lockHash,(Pointer)lockName,HASH_FIND_SAVE,&found);
#ifdef USER_LOCKS
/*
* If the entry is not found, hash_search returns TRUE
* instead of NULL, so we must check for it explicitly.
*/
if ((is_user_lock) && (lock == (LOCK *)TRUE)) {
SpinRelease(masterLock);
elog(NOTICE,"LockRelease: there are no locks with this tag");
return(FALSE);
}
#endif
/* let the caller print its own error message, too.
* Do not elog(WARN).
*/
@ -732,6 +848,14 @@ LockRelease(LockTableId tableId, LOCKTAG *lockName, LOCKT lockt)
Assert(lock->nHolding > 0);
#ifdef USER_LOCKS
/*
* If this is a user lock, it can be removed only after
* checking that it was acquired by the current process,
* so this code is skipped and executed later.
*/
if (!is_user_lock) {
#endif
/*
* fix the general lock stats
*/
@ -758,6 +882,9 @@ LockRelease(LockTableId tableId, LOCKTAG *lockName, LOCKT lockt)
Assert(lock && found);
wakeupNeeded = false;
}
#ifdef USER_LOCKS
}
#endif
/* ------------------
* Zero out all of the tag bytes (this clears the padding bytes for long
@ -772,6 +899,17 @@ LockRelease(LockTableId tableId, LOCKTAG *lockName, LOCKT lockt)
item.tag.pid = MyPid;
#endif
#ifdef USER_LOCKS
if (is_user_lock) {
item.tag.pid = getpid();
item.tag.xid = 0;
#ifdef USER_LOCKS_DEBUG
elog(NOTICE,"LockRelease: user lock xid [%d,%d,%d]",
item.tag.lock, item.tag.pid, item.tag.xid);
#endif
}
#endif
if (! ( result = (XIDLookupEnt *) hash_search(xidTable,
(Pointer)&item,
HASH_FIND_SAVE,
@ -779,7 +917,15 @@ LockRelease(LockTableId tableId, LOCKTAG *lockName, LOCKT lockt)
|| !found)
{
SpinRelease(masterLock);
#ifdef USER_LOCKS
if ((is_user_lock) && (result)) {
elog(NOTICE,"LockRelease: you don't have a lock on this tag");
} else {
elog(NOTICE,"LockRelease: find xid, table corrupted");
}
#else
elog(NOTICE,"LockReplace: xid table corrupted");
#endif
return(FALSE);
}
/*
@ -797,6 +943,14 @@ LockRelease(LockTableId tableId, LOCKTAG *lockName, LOCKT lockt)
*/
if (! result->nHolding)
{
#ifdef USER_LOCKS
if (result->queue.prev == INVALID_OFFSET) {
elog(NOTICE,"LockRelease: xid.prev == INVALID_OFFSET");
}
if (result->queue.next == INVALID_OFFSET) {
elog(NOTICE,"LockRelease: xid.next == INVALID_OFFSET");
}
#endif
if (result->queue.next != INVALID_OFFSET)
SHMQueueDelete(&result->queue);
if (! (result = (XIDLookupEnt *)
@ -804,11 +958,50 @@ LockRelease(LockTableId tableId, LOCKTAG *lockName, LOCKT lockt)
! found)
{
SpinRelease(masterLock);
#ifdef USER_LOCKS
elog(NOTICE,"LockRelease: remove xid, table corrupted");
#else
elog(NOTICE,"LockReplace: xid table corrupted");
#endif
return(FALSE);
}
}
#ifdef USER_LOCKS
/*
* If this is a user lock, remove it now, after the
* corresponding xid entry has been found and deleted.
*/
if (is_user_lock) {
/*
* fix the general lock stats
*/
lock->nHolding--;
lock->holders[lockt]--;
lock->nActive--;
lock->activeHolders[lockt]--;
Assert(lock->nActive >= 0);
if (! lock->nHolding)
{
/* ------------------
* if there's no one waiting in the queue,
* we just released the last lock.
* Delete it from the lock table.
* ------------------
*/
Assert( ltable->lockHash->hash == tag_hash);
lock = (LOCK *) hash_search(ltable->lockHash,
(Pointer) &(lock->tag),
HASH_REMOVE,
&found);
Assert(lock && found);
wakeupNeeded = false;
}
}
#endif
/* --------------------------
* If there are still active locks of the type I just released, no one
* should be woken up. Whoever is asleep will still conflict
@ -848,6 +1041,18 @@ GrantLock(LOCK *lock, LOCKT lockt)
lock->mask |= BITS_ON[lockt];
}
#ifdef USER_LOCKS
/*
* LockReleaseAll -- Release all locks in a process lock queue.
*
* Note: This code is a little complicated by the presence in the
* same queue of user locks which can't be removed from the
* normal lock queue at the end of a transaction. They must,
* however, be removed when the backend exits.
* A dummy tableId 0 is used to indicate that we are releasing
* the user locks, by the code added to ProcKill().
*/
#endif
bool
LockReleaseAll(LockTableId tableId, SHM_QUEUE *lockQueue)
{
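
The note above says that user locks and normal locks share the same per-process queue, so LockReleaseAll() has to tell them apart entry by entry. A hedged sketch of the pid/xid test the loop below applies, written as a hypothetical helper for clarity (the actual code inlines the comparisons):

/*
 * Hypothetical helper, not in the patch: classify a queue entry using the
 * xid.pid / xid.xid convention from the LockAcquire() comment
 * (user lock: pid = backend pid, xid = 0; normal lock: pid = 0, xid != 0).
 */
static bool
xid_entry_is_user_lock(XIDLookupEnt *entry)
{
    return (entry->tag.pid != 0) && (entry->tag.xid == 0);
}
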
@ -862,6 +1067,19 @@ LockReleaseAll(LockTableId tableId, SHM_QUEUE *lockQueue)
LOCK *lock;
bool found;
#ifdef USER_LOCKS
int is_user_lock_table, my_pid, count, nskip;
is_user_lock_table = (tableId == 0);
my_pid = getpid();
#ifdef USER_LOCKS_DEBUG
elog(NOTICE,"LockReleaseAll: tableId=%d, pid=%d", tableId, my_pid);
#endif
if (is_user_lock_table) {
tableId = 1;
}
#endif
Assert (tableId < NumTables);
ltable = AllTables[tableId];
if (!ltable)
@ -873,11 +1091,18 @@ LockReleaseAll(LockTableId tableId, SHM_QUEUE *lockQueue)
if (SHMQueueEmpty(lockQueue))
return TRUE;
#ifdef USER_LOCKS
SpinAcquire(masterLock);
#endif
SHMQueueFirst(lockQueue,(Pointer*)&xidLook,&xidLook->queue);
XID_PRINT("LockReleaseAll:", xidLook);
#ifndef USER_LOCKS
SpinAcquire(masterLock);
#else
count = nskip = 0;
#endif
for (;;)
{
/* ---------------------------
@ -891,6 +1116,65 @@ LockReleaseAll(LockTableId tableId, SHM_QUEUE *lockQueue)
LOCK_PRINT("ReleaseAll",(&lock->tag),0);
#ifdef USER_LOCKS
/*
* Sometimes the queue appears to be messed up.
*/
if (count++ > 2000) {
elog(NOTICE,"LockReleaseAll: xid loop detected, giving up");
nskip = 0;
break;
}
if (is_user_lock_table) {
if ((xidLook->tag.pid == 0) || (xidLook->tag.xid != 0)) {
#ifdef USER_LOCKS_DEBUG
elog(NOTICE,"LockReleaseAll: skip normal lock [%d,%d,%d]",
xidLook->tag.lock,xidLook->tag.pid,xidLook->tag.xid);
#endif
nskip++;
goto next_item;
}
if (xidLook->tag.pid != my_pid) {
/* This should never happen */
#ifdef USER_LOCKS_DEBUG
elog(NOTICE,
"LockReleaseAll: skip other pid [%u,%u] [%d,%d,%d]",
lock->tag.tupleId.ip_posid,
((lock->tag.tupleId.ip_blkid.bi_hi<<16)+
lock->tag.tupleId.ip_blkid.bi_lo),
xidLook->tag.lock,xidLook->tag.pid,xidLook->tag.xid);
#endif
nskip++;
goto next_item;
}
#ifdef USER_LOCKS_DEBUG
elog(NOTICE,
"LockReleaseAll: release user lock [%u,%u] [%d,%d,%d]",
lock->tag.tupleId.ip_posid,
((lock->tag.tupleId.ip_blkid.bi_hi<<16)+
lock->tag.tupleId.ip_blkid.bi_lo),
xidLook->tag.lock,xidLook->tag.pid,xidLook->tag.xid);
#endif
} else {
if ((xidLook->tag.pid != 0) || (xidLook->tag.xid == 0)) {
#ifdef USER_LOCKS_DEBUG
elog(NOTICE,
"LockReleaseAll: skip user lock [%u,%u] [%d,%d,%d]",
lock->tag.tupleId.ip_posid,
((lock->tag.tupleId.ip_blkid.bi_hi<<16)+
lock->tag.tupleId.ip_blkid.bi_lo),
xidLook->tag.lock,xidLook->tag.pid,xidLook->tag.xid);
#endif
nskip++;
goto next_item;
}
#ifdef USER_LOCKS_DEBUG
elog(NOTICE,"LockReleaseAll: release normal lock [%d,%d,%d]",
xidLook->tag.lock,xidLook->tag.pid,xidLook->tag.xid);
#endif
}
#endif
/* ------------------
* fix the general lock stats
* ------------------
@ -921,11 +1205,18 @@ LockReleaseAll(LockTableId tableId, SHM_QUEUE *lockQueue)
* always remove the xidLookup entry, we're done with it now
* ----------------
*/
#ifdef USER_LOCKS
SHMQueueDelete(&xidLook->queue);
#endif
if ((! hash_search(ltable->xidHash, (Pointer)xidLook, HASH_REMOVE, &found))
|| !found)
{
SpinRelease(masterLock);
#ifdef USER_LOCKS
elog(NOTICE,"LockReleaseAll: xid table corrupted");
#else
elog(NOTICE,"LockReplace: xid table corrupted");
#endif
return(FALSE);
}
@ -943,7 +1234,11 @@ LockReleaseAll(LockTableId tableId, SHM_QUEUE *lockQueue)
if ((! lock) || (!found))
{
SpinRelease(masterLock);
#ifdef USER_LOCKS
elog(NOTICE,"LockReleaseAll: cannot remove lock from HTAB");
#else
elog(NOTICE,"LockReplace: cannot remove lock from HTAB");
#endif
return(FALSE);
}
}
@ -959,12 +1254,21 @@ LockReleaseAll(LockTableId tableId, SHM_QUEUE *lockQueue)
(void) ProcLockWakeup(waitQueue, (char *) ltable, (char *) lock);
}
#ifdef USER_LOCKS
next_item:
#endif
if (done)
break;
SHMQueueFirst(&xidLook->queue,(Pointer*)&tmp,&tmp->queue);
xidLook = tmp;
}
SpinRelease(masterLock);
#ifdef USER_LOCKS
/*
* Reinitialize the queue only if nothing has been left in it.
*/
if (nskip == 0)
#endif
SHMQueueInit(lockQueue);
return TRUE;
}

File: src/backend/storage/lmgr/proc.c

@ -7,7 +7,7 @@
*
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.5 1996/08/01 05:11:33 scrappy Exp $
* $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.6 1996/10/11 03:22:59 scrappy Exp $
*
*-------------------------------------------------------------------------
*/
@ -46,7 +46,7 @@
* This is so that we can support more backends. (system-wide semaphore
* sets run out pretty fast.) -ay 4/95
*
* $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.5 1996/08/01 05:11:33 scrappy Exp $
* $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.6 1996/10/11 03:22:59 scrappy Exp $
*/
#include <sys/time.h>
#ifndef WIN32
@ -361,6 +361,10 @@ ProcKill(int exitStatus, int pid)
ProcReleaseSpins(proc);
LockReleaseAll(1,&proc->lockQueue);
#ifdef USER_LOCKS
LockReleaseAll(0,&proc->lockQueue);
#endif
/* ----------------
* get off the wait queue
* ----------------