/*-------------------------------------------------------------------------
*
* lock.c
- * simple lock acquisition
+ * POSTGRES low-level lock mechanism
*
* Portions Copyright (c) 1996-2000, PostgreSQL, Inc
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.73 2000/11/28 23:27:56 tgl Exp $
+ * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.74 2000/12/22 00:51:54 tgl Exp $
*
* NOTES
* Outside modules can create a lock table and acquire/release
* locks. A lock table is a shared memory hash table. When
- * a process tries to acquire a lock of a type that conflictRs
+ * a process tries to acquire a lock of a type that conflicts
* with existing locks, it is put to sleep using the routines
* in storage/lmgr/proc.c.
*
+ * For the most part, this code should be invoked via lmgr.c
+ * or another lock-management module, not directly.
+ *
* Interface:
*
* LockAcquire(), LockRelease(), LockMethodTableInit(),
* LockMethodTableRename(), LockReleaseAll,
* LockResolveConflicts(), GrantLock()
*
- * NOTE: This module is used to define new lock tables. The
- * multi-level lock table (multi.c) used by the heap
- * access methods calls these routines. See multi.c for
- * examples showing how to use this interface.
- *
*-------------------------------------------------------------------------
*/
#include <sys/types.h>
#include "utils/memutils.h"
#include "utils/ps_status.h"
-static int WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode);
+static int WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode,
+ LOCK *lock, HOLDER *holder);
+static void LockCountMyLocks(SHMEM_OFFSET lockOffset, PROC *proc,
+ int *myHolders);
+static int LockGetMyHoldLocks(SHMEM_OFFSET lockOffset, PROC *proc);
static char *lock_types[] =
{
inline static void
-XID_PRINT(const char * where, const XIDLookupEnt * xidentP)
+HOLDER_PRINT(const char * where, const HOLDER * holderP)
{
if (
- (((XIDENT_LOCKMETHOD(*xidentP) == DEFAULT_LOCKMETHOD && Trace_locks)
- || (XIDENT_LOCKMETHOD(*xidentP) == USER_LOCKMETHOD && Trace_userlocks))
- && (((LOCK *)MAKE_PTR(xidentP->tag.lock))->tag.relId >= Trace_lock_oidmin))
- || (Trace_lock_table && (((LOCK *)MAKE_PTR(xidentP->tag.lock))->tag.relId == Trace_lock_table))
+ (((HOLDER_LOCKMETHOD(*holderP) == DEFAULT_LOCKMETHOD && Trace_locks)
+ || (HOLDER_LOCKMETHOD(*holderP) == USER_LOCKMETHOD && Trace_userlocks))
+ && (((LOCK *)MAKE_PTR(holderP->tag.lock))->tag.relId >= Trace_lock_oidmin))
+ || (Trace_lock_table && (((LOCK *)MAKE_PTR(holderP->tag.lock))->tag.relId == Trace_lock_table))
)
elog(DEBUG,
- "%s: xid(%lx) lock(%lx) tbl(%d) pid(%d) xid(%u) hold(%d,%d,%d,%d,%d,%d,%d)=%d",
- where, MAKE_OFFSET(xidentP), xidentP->tag.lock, XIDENT_LOCKMETHOD(*(xidentP)),
- xidentP->tag.pid, xidentP->tag.xid,
- xidentP->holders[1], xidentP->holders[2], xidentP->holders[3], xidentP->holders[4],
- xidentP->holders[5], xidentP->holders[6], xidentP->holders[7], xidentP->nHolding);
+ "%s: holder(%lx) lock(%lx) tbl(%d) pid(%d) xid(%u) hold(%d,%d,%d,%d,%d,%d,%d)=%d",
+ where, MAKE_OFFSET(holderP), holderP->tag.lock,
+ HOLDER_LOCKMETHOD(*(holderP)),
+ holderP->tag.pid, holderP->tag.xid,
+ holderP->holders[1], holderP->holders[2], holderP->holders[3], holderP->holders[4],
+ holderP->holders[5], holderP->holders[6], holderP->holders[7], holderP->nHolding);
}
#else /* not LOCK_DEBUG */
#define LOCK_PRINT(where, lock, type)
-#define XID_PRINT(where, xidentP)
+#define HOLDER_PRINT(where, holderP)
#endif /* not LOCK_DEBUG */
static LOCKMASK BITS_ON[MAX_LOCKMODES];
/* -----------------
- * XXX Want to move this to this file
+ * Disable flag
* -----------------
*/
static bool LockingIsDisabled;
* -------------------
*/
void
-InitLocks()
+InitLocks(void)
{
int i;
int bit;
bit = 1;
- /* -------------------
- * remember 0th lockmode is invalid
- * -------------------
- */
for (i = 0; i < MAX_LOCKMODES; i++, bit <<= 1)
{
BITS_ON[i] = bit;
* ------------------
*/
void
-LockDisable(int status)
+LockDisable(bool status)
{
LockingIsDisabled = status;
}
+/* -----------------
+ * Boolean function to determine current locking status
+ * -----------------
+ */
+bool
+LockingDisabled(void)
+{
+ return LockingIsDisabled;
+}
+
/*
* LockMethodInit -- initialize the lock table's lock type
LockMethodTableInit(char *tabName,
LOCKMASK *conflictsP,
int *prioP,
- int numModes)
+ int numModes,
+ int maxBackends)
{
LOCKMETHODTABLE *lockMethodTable;
char *shmemName;
HASHCTL info;
int hash_flags;
bool found;
- int status = TRUE;
+ long init_table_size,
+ max_table_size;
if (numModes > MAX_LOCKMODES)
{
return INVALID_LOCKMETHOD;
}
+ /* Compute init/max size to request for lock hashtables */
+ max_table_size = NLOCKENTS(maxBackends);
+ init_table_size = max_table_size / 10;
+
/* Allocate a string for the shmem index table lookups. */
/* This is just temp space in this routine, so palloc is OK. */
shmemName = (char *) palloc(strlen(tabName) + 32);
ShmemInitStruct(shmemName, sizeof(LOCKMETHODCTL), &found);
if (!lockMethodTable->ctl)
- {
elog(FATAL, "LockMethodTableInit: couldn't initialize %s", tabName);
- status = FALSE;
- }
/* -------------------
* no zero-th table
}
/* --------------------
- * other modules refer to the lock table by a lockmethod
+ * other modules refer to the lock table by a lockmethod ID
* --------------------
*/
LockMethodTable[NumLockMethods] = lockMethodTable;
Assert(NumLockMethods <= MAX_LOCK_METHODS);
/* ----------------------
- * allocate a hash table for the lock tags. This is used
- * to find the different locks.
+ * allocate a hash table for LOCK structs. This is used
+ * to store per-locked-object information.
* ----------------------
*/
info.keysize = SHMEM_LOCKTAB_KEYSIZE;
hash_flags = (HASH_ELEM | HASH_FUNCTION);
sprintf(shmemName, "%s (lock hash)", tabName);
- lockMethodTable->lockHash = (HTAB *) ShmemInitHash(shmemName,
- INIT_TABLE_SIZE, MAX_TABLE_SIZE,
- &info, hash_flags);
+ lockMethodTable->lockHash = ShmemInitHash(shmemName,
+ init_table_size,
+ max_table_size,
+ &info,
+ hash_flags);
- Assert(lockMethodTable->lockHash->hash == tag_hash);
if (!lockMethodTable->lockHash)
- {
elog(FATAL, "LockMethodTableInit: couldn't initialize %s", tabName);
- status = FALSE;
- }
+ Assert(lockMethodTable->lockHash->hash == tag_hash);
/* -------------------------
- * allocate an xid table. When different transactions hold
- * the same lock, additional information must be saved (locks per tx).
+ * allocate a hash table for HOLDER structs. This is used
+ * to store per-lock-holder information.
* -------------------------
*/
- info.keysize = SHMEM_XIDTAB_KEYSIZE;
- info.datasize = SHMEM_XIDTAB_DATASIZE;
+ info.keysize = SHMEM_HOLDERTAB_KEYSIZE;
+ info.datasize = SHMEM_HOLDERTAB_DATASIZE;
info.hash = tag_hash;
hash_flags = (HASH_ELEM | HASH_FUNCTION);
- sprintf(shmemName, "%s (xid hash)", tabName);
- lockMethodTable->xidHash = (HTAB *) ShmemInitHash(shmemName,
- INIT_TABLE_SIZE, MAX_TABLE_SIZE,
- &info, hash_flags);
+ sprintf(shmemName, "%s (holder hash)", tabName);
+ lockMethodTable->holderHash = ShmemInitHash(shmemName,
+ init_table_size,
+ max_table_size,
+ &info,
+ hash_flags);
- if (!lockMethodTable->xidHash)
- {
+ if (!lockMethodTable->holderHash)
elog(FATAL, "LockMethodTableInit: couldn't initialize %s", tabName);
- status = FALSE;
- }
/* init ctl data structures */
LockMethodInit(lockMethodTable, conflictsP, prioP, numModes);
pfree(shmemName);
- if (status)
- return lockMethodTable->ctl->lockmethod;
- else
- return INVALID_LOCKMETHOD;
+ return lockMethodTable->ctl->lockmethod;
}
/*
- * LockMethodTableRename -- allocate another lockmethod to the same
+ * LockMethodTableRename -- allocate another lockmethod ID to the same
* lock table.
*
* NOTES: Both the lock module and the lock chain (lchain.c)
* the same to the lock table, but are handled differently
* by the lock chain manager. This function allows the
* client to use different lockmethods when acquiring/releasing
- * short term and long term locks.
+ * short term and long term locks, yet store them all in one hashtable.
*/
LOCKMETHOD
if (LockMethodTable[lockmethod] == INVALID_LOCKMETHOD)
return INVALID_LOCKMETHOD;
- /* other modules refer to the lock table by a lockmethod */
+ /* other modules refer to the lock table by a lockmethod ID */
newLockMethod = NumLockMethods;
NumLockMethods++;
* a lock acquisition other than aborting the transaction.
* Lock is recorded in the lkchain.
*
-#ifdef USER_LOCKS
*
* Note on User Locks:
*
* automatically when a backend terminates.
* They are indicated by a lockmethod 2 which is an alias for the
* normal lock table, and are distinguished from normal locks
- * for the following differences:
+ * by the following differences:
*
* normal lock user lock
*
* lockmethod 1 2
- * tag.relId rel oid 0
- * tag.ItemPointerData.ip_blkid block id lock id2
- * tag.ItemPointerData.ip_posid tuple offset lock id1
- * xid.pid 0 backend pid
+ * tag.dbId database oid database oid
+ * tag.relId rel oid or 0 0
+ * tag.objId block id lock id2
+ * or xact id
+ * tag.offnum 0 lock id1
+ * xid.pid backend pid backend pid
* xid.xid xid or 0 0
* persistence transaction user or backend
+ * or backend
*
* The lockmode parameter can have the same values for normal locks
* although probably only WRITE_LOCK can have some practical use.
*
* DZ - 22 Nov 1997
-#endif
*/
bool
-LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
+LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
+ TransactionId xid, LOCKMODE lockmode)
{
- XIDLookupEnt *xident,
- item;
- HTAB *xidTable;
+ HOLDER *holder;
+ HOLDERTAG holdertag;
+ HTAB *holderTable;
bool found;
- LOCK *lock = NULL;
+ LOCK *lock;
SPINLOCK masterLock;
LOCKMETHODTABLE *lockMethodTable;
int status;
- TransactionId xid;
+ int myHolders[MAX_LOCKMODES];
+ int i;
#ifdef LOCK_DEBUG
if (lockmethod == USER_LOCKMETHOD && Trace_userlocks)
- elog(DEBUG, "LockAcquire: user lock [%u] %s", locktag->objId.blkno, lock_types[lockmode]);
+ elog(DEBUG, "LockAcquire: user lock [%u] %s",
+ locktag->objId.blkno, lock_types[lockmode]);
#endif
/* ???????? This must be changed when short term locks will be used */
}
/* --------------------
- * if there was nothing else there, complete initialization
+ * if it's a new lock object, initialize it
* --------------------
*/
if (!found)
MemSet((char *) lock->holders, 0, sizeof(int) * MAX_LOCKMODES);
MemSet((char *) lock->activeHolders, 0, sizeof(int) * MAX_LOCKMODES);
ProcQueueInit(&(lock->waitProcs));
- Assert(lock->tag.objId.blkno == locktag->objId.blkno);
LOCK_PRINT("LockAcquire: new", lock, lockmode);
}
else
}
/* ------------------
- * add an element to the lock queue so that we can clear the
- * locks at end of transaction.
+ * Create the hash key for the holder table.
* ------------------
*/
- xidTable = lockMethodTable->xidHash;
-
- /* ------------------
- * Zero out all of the tag bytes (this clears the padding bytes for long
- * word alignment and ensures hashing consistency).
- * ------------------
- */
- MemSet(&item, 0, XID_TAGSIZE); /* must clear padding, needed */
- item.tag.lock = MAKE_OFFSET(lock);
-#ifdef USE_XIDTAG_LOCKMETHOD
- item.tag.lockmethod = lockmethod;
-#endif
-
-#ifdef USER_LOCKS
- if (lockmethod == USER_LOCKMETHOD)
- {
- item.tag.pid = MyProcPid;
- item.tag.xid = xid = 0;
- }
- else
- {
- xid = GetCurrentTransactionId();
- TransactionIdStore(xid, &item.tag.xid);
- }
-#else /* not USER_LOCKS */
- xid = GetCurrentTransactionId();
- TransactionIdStore(xid, &item.tag.xid);
-#endif /* not USER_LOCKS */
+ MemSet(&holdertag, 0, sizeof(HOLDERTAG)); /* must clear padding, needed */
+ holdertag.lock = MAKE_OFFSET(lock);
+ holdertag.pid = MyProcPid;
+ TransactionIdStore(xid, &holdertag.xid);
/*
- * Find or create an xid entry with this tag
+ * Find or create a holder entry with this tag
*/
- xident = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &item,
- HASH_ENTER, &found);
- if (!xident)
+ holderTable = lockMethodTable->holderHash;
+ holder = (HOLDER *) hash_search(holderTable, (Pointer) &holdertag,
+ HASH_ENTER, &found);
+ if (!holder)
{
SpinRelease(masterLock);
- elog(NOTICE, "LockAcquire: xid table corrupted");
+ elog(NOTICE, "LockAcquire: holder table corrupted");
return FALSE;
}
/*
- * If not found initialize the new entry
+ * If new, initialize the new entry
*/
if (!found)
{
- xident->nHolding = 0;
- MemSet((char *) xident->holders, 0, sizeof(int) * MAX_LOCKMODES);
- ProcAddLock(&xident->queue);
- XID_PRINT("LockAcquire: new", xident);
+ holder->nHolding = 0;
+ MemSet((char *) holder->holders, 0, sizeof(int) * MAX_LOCKMODES);
+ ProcAddLock(&holder->queue);
+ HOLDER_PRINT("LockAcquire: new", holder);
}
else
{
- int i;
+ HOLDER_PRINT("LockAcquire: found", holder);
+ Assert((holder->nHolding > 0) && (holder->holders[lockmode] >= 0));
+ Assert(holder->nHolding <= lock->nActive);
- XID_PRINT("LockAcquire: found", xident);
- Assert((xident->nHolding > 0) && (xident->holders[lockmode] >= 0));
- Assert(xident->nHolding <= lock->nActive);
+#ifdef CHECK_DEADLOCK_RISK
/*
* Issue warning if we already hold a lower-level lock on this
* object and do not hold a lock of the requested level or higher.
* a deadlock if another backend were following the same code path
* at about the same time).
*
+ * This is not enabled by default, because it may generate log entries
+ * about user-level coding practices that are in fact safe in context.
+ * It can be enabled to help find system-level problems.
+ *
* XXX Doing numeric comparison on the lockmodes is a hack;
* it'd be better to use a table. For now, though, this works.
*/
for (i = lockMethodTable->ctl->numLockModes; i > 0; i--)
{
- if (xident->holders[i] > 0)
+ if (holder->holders[i] > 0)
{
if (i >= (int) lockmode)
break; /* safe: we have a lock >= req level */
break;
}
}
+#endif /* CHECK_DEADLOCK_RISK */
}
/* ----------------
- * lock->nholding tells us how many processes have _tried_ to
- * acquire this lock, Regardless of whether they succeeded or
- * failed in doing so.
+ * lock->nHolding and lock->holders count the total number of holders
+ * either holding or waiting for the lock, so increment those immediately.
+ * The other counts don't increment till we get the lock.
* ----------------
*/
lock->nHolding++;
Assert((lock->nHolding > 0) && (lock->holders[lockmode] > 0));
/* --------------------
- * If I'm the only one holding a lock, then there
- * cannot be a conflict. The same is true if we already
- * hold this lock.
+ * If I'm the only one holding any lock on this object, then there
+ * cannot be a conflict. The same is true if I already hold this lock.
+ * --------------------
+ */
+ if (holder->nHolding == lock->nActive || holder->holders[lockmode] != 0)
+ {
+ GrantLock(lock, holder, lockmode);
+ HOLDER_PRINT("LockAcquire: owning", holder);
+ SpinRelease(masterLock);
+ return TRUE;
+ }
+
+ /* --------------------
+ * If this process (under any XID) is a holder of the lock,
+ * then there is no conflict, either.
* --------------------
*/
- if (xident->nHolding == lock->nActive || xident->holders[lockmode] != 0)
+ LockCountMyLocks(holder->tag.lock, MyProc, myHolders);
+ if (myHolders[lockmode] != 0)
{
- xident->holders[lockmode]++;
- xident->nHolding++;
- XID_PRINT("LockAcquire: owning", xident);
- Assert((xident->nHolding > 0) && (xident->holders[lockmode] > 0));
- GrantLock(lock, lockmode);
+ GrantLock(lock, holder, lockmode);
+ HOLDER_PRINT("LockAcquire: my other XID owning", holder);
SpinRelease(masterLock);
return TRUE;
}
*/
if (lockMethodTable->ctl->conflictTab[lockmode] & lock->waitMask)
{
- int i = 1;
-
/*
- * If I don't hold locks or my locks don't conflict with waiters
- * then force to sleep.
+ * If my process doesn't hold any locks that conflict with waiters
+ * then force to sleep, so that prior waiters get first chance.
*/
- if (xident->nHolding > 0)
+ for (i = 1; i <= lockMethodTable->ctl->numLockModes; i++)
{
- for (; i <= lockMethodTable->ctl->numLockModes; i++)
- {
- if (xident->holders[i] > 0 &&
- lockMethodTable->ctl->conflictTab[i] & lock->waitMask)
- break; /* conflict */
- }
+ if (myHolders[i] > 0 &&
+ lockMethodTable->ctl->conflictTab[i] & lock->waitMask)
+ break; /* yes, there is a conflict */
}
- if (xident->nHolding == 0 || i > lockMethodTable->ctl->numLockModes)
+ if (i > lockMethodTable->ctl->numLockModes)
{
- XID_PRINT("LockAcquire: higher priority proc waiting",
- xident);
+ HOLDER_PRINT("LockAcquire: another proc already waiting",
+ holder);
status = STATUS_FOUND;
}
else
- status = LockResolveConflicts(lockmethod, lock, lockmode, xid, xident);
+ status = LockResolveConflicts(lockmethod, lockmode,
+ lock, holder,
+ MyProc, myHolders);
}
else
- status = LockResolveConflicts(lockmethod, lock, lockmode, xid, xident);
+ status = LockResolveConflicts(lockmethod, lockmode,
+ lock, holder,
+ MyProc, myHolders);
if (status == STATUS_OK)
- GrantLock(lock, lockmode);
+ GrantLock(lock, holder, lockmode);
else if (status == STATUS_FOUND)
{
#ifdef USER_LOCKS
/*
* User locks are non blocking. If we can't acquire a lock we must
- * remove the xid entry and return FALSE without waiting.
+ * remove the holder entry and return FALSE without waiting.
*/
if (lockmethod == USER_LOCKMETHOD)
{
- if (!xident->nHolding)
+ if (holder->nHolding == 0)
{
- SHMQueueDelete(&xident->queue);
- xident = (XIDLookupEnt *) hash_search(xidTable,
- (Pointer) xident,
- HASH_REMOVE, &found);
- if (!xident || !found)
- elog(NOTICE, "LockAcquire: remove xid, table corrupted");
+ SHMQueueDelete(&holder->queue);
+ holder = (HOLDER *) hash_search(holderTable,
+ (Pointer) holder,
+ HASH_REMOVE, &found);
+ if (!holder || !found)
+ elog(NOTICE, "LockAcquire: remove holder, table corrupted");
}
else
- XID_PRINT("LockAcquire: NHOLDING", xident);
+ HOLDER_PRINT("LockAcquire: NHOLDING", holder);
lock->nHolding--;
lock->holders[lockmode]--;
LOCK_PRINT("LockAcquire: user lock failed", lock, lockmode);
#endif /* USER_LOCKS */
/*
- * Construct bitmask of locks we hold before going to sleep.
+ * Construct bitmask of locks this process holds on this object.
*/
- MyProc->holdLock = 0;
- if (xident->nHolding > 0)
{
- int i,
- tmpMask = 2;
+ int holdLock = 0;
+ int tmpMask;
- for (i = 1; i <= lockMethodTable->ctl->numLockModes;
+ for (i = 1, tmpMask = 2;
+ i <= lockMethodTable->ctl->numLockModes;
i++, tmpMask <<= 1)
{
- if (xident->holders[i] > 0)
- MyProc->holdLock |= tmpMask;
+ if (myHolders[i] > 0)
+ holdLock |= tmpMask;
}
- Assert(MyProc->holdLock != 0);
+ MyProc->holdLock = holdLock;
}
- status = WaitOnLock(lockmethod, lock, lockmode);
+ /*
+ * Sleep till someone wakes me up.
+ */
+ status = WaitOnLock(lockmethod, lockmode, lock, holder);
/*
- * Check the xid entry status, in case something in the ipc
+ * Check the holder entry status, in case something in the ipc
* communication doesn't work correctly.
*/
- if (!((xident->nHolding > 0) && (xident->holders[lockmode] > 0)))
+ if (!((holder->nHolding > 0) && (holder->holders[lockmode] > 0)))
{
- XID_PRINT("LockAcquire: INCONSISTENT", xident);
+ HOLDER_PRINT("LockAcquire: INCONSISTENT", holder);
LOCK_PRINT("LockAcquire: INCONSISTENT", lock, lockmode);
/* Should we retry ? */
SpinRelease(masterLock);
return FALSE;
}
- XID_PRINT("LockAcquire: granted", xident);
+ HOLDER_PRINT("LockAcquire: granted", holder);
LOCK_PRINT("LockAcquire: granted", lock, lockmode);
}
* determining whether or not any new lock acquired conflicts with
* the old ones.
*
+ * The caller can optionally pass the process's total holders counts, if
+ * known. If NULL is passed then these values will be computed internally.
* ----------------------------
*/
int
LockResolveConflicts(LOCKMETHOD lockmethod,
- LOCK *lock,
LOCKMODE lockmode,
- TransactionId xid,
- XIDLookupEnt *xidentP) /* xident ptr or NULL */
+ LOCK *lock,
+ HOLDER *holder,
+ PROC *proc,
+ int *myHolders) /* myHolders[] array or NULL */
{
- XIDLookupEnt *xident,
- item;
- int *myHolders;
- int numLockModes;
- HTAB *xidTable;
- bool found;
+ LOCKMETHODCTL *lockctl = LockMethodTable[lockmethod]->ctl;
+ int numLockModes = lockctl->numLockModes;
int bitmask;
int i,
tmpMask;
+ int localHolders[MAX_LOCKMODES];
- numLockModes = LockMethodTable[lockmethod]->ctl->numLockModes;
- xidTable = LockMethodTable[lockmethod]->xidHash;
-
- if (xidentP)
- {
-
- /*
- * A pointer to the xid entry was supplied from the caller.
- * Actually only LockAcquire can do it.
- */
- xident = xidentP;
- }
- else
- {
- /* ---------------------
- * read my own statistics from the xid table. If there
- * isn't an entry, then we'll just add one.
- *
- * Zero out the tag, this clears the padding bytes for long
- * word alignment and ensures hashing consistency.
- * ------------------
- */
- MemSet(&item, 0, XID_TAGSIZE);
- item.tag.lock = MAKE_OFFSET(lock);
-#ifdef USE_XIDTAG_LOCKMETHOD
- item.tag.lockmethod = lockmethod;
-#endif
-#ifdef USER_LOCKS
- if (lockmethod == USER_LOCKMETHOD)
- {
- item.tag.pid = MyProcPid;
- item.tag.xid = 0;
- }
- else
-#endif
- TransactionIdStore(xid, &item.tag.xid);
-
- /*
- * Find or create an xid entry with this tag
- */
- xident = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &item,
- HASH_ENTER, &found);
- if (!xident)
- {
- elog(NOTICE, "LockResolveConflicts: xid table corrupted");
- return STATUS_ERROR;
- }
-
- /*
- * If not found initialize the new entry. THIS SHOULD NEVER
- * HAPPEN, if we are trying to resolve a conflict we must already
- * have allocated an xid entry for this lock. dz 21-11-1997
- */
- if (!found)
- {
- /* ---------------
- * we're not holding any type of lock yet. Clear
- * the lock stats.
- * ---------------
- */
- MemSet(xident->holders, 0, numLockModes * sizeof(*(lock->holders)));
- xident->nHolding = 0;
- XID_PRINT("LockResolveConflicts: NOT FOUND", xident);
- }
- else
- XID_PRINT("LockResolveConflicts: found", xident);
- }
- Assert((xident->nHolding >= 0) && (xident->holders[lockmode] >= 0));
+ Assert((holder->nHolding >= 0) && (holder->holders[lockmode] >= 0));
/* ----------------------------
* first check for global conflicts: If no locks conflict
* compare tells if there is a conflict.
* ----------------------------
*/
- if (!(LockMethodTable[lockmethod]->ctl->conflictTab[lockmode] & lock->mask))
+ if (!(lockctl->conflictTab[lockmode] & lock->mask))
{
- xident->holders[lockmode]++;
- xident->nHolding++;
- XID_PRINT("LockResolveConflicts: no conflict", xident);
- Assert((xident->nHolding > 0) && (xident->holders[lockmode] > 0));
+ HOLDER_PRINT("LockResolveConflicts: no conflict", holder);
return STATUS_OK;
}
/* ------------------------
* Rats. Something conflicts. But it could still be my own
* lock. We have to construct a conflict mask
- * that does not reflect our own locks.
+ * that does not reflect our own locks. Locks held by the current
+ * process under another XID also count as "our own locks".
* ------------------------
*/
- myHolders = xident->holders;
+ if (myHolders == NULL)
+ {
+ /* Caller didn't do calculation of total holding for me */
+ LockCountMyLocks(holder->tag.lock, proc, localHolders);
+ myHolders = localHolders;
+ }
+
+ /* Compute mask of lock types held by other processes */
bitmask = 0;
tmpMask = 2;
for (i = 1; i <= numLockModes; i++, tmpMask <<= 1)
* conflict and I have to sleep.
* ------------------------
*/
- if (!(LockMethodTable[lockmethod]->ctl->conflictTab[lockmode] & bitmask))
+ if (!(lockctl->conflictTab[lockmode] & bitmask))
{
- /* no conflict. Get the lock and go on */
- xident->holders[lockmode]++;
- xident->nHolding++;
- XID_PRINT("LockResolveConflicts: resolved", xident);
- Assert((xident->nHolding > 0) && (xident->holders[lockmode] > 0));
+ /* no conflict. OK to get the lock */
+ HOLDER_PRINT("LockResolveConflicts: resolved", holder);
return STATUS_OK;
}
- XID_PRINT("LockResolveConflicts: conflicting", xident);
+ HOLDER_PRINT("LockResolveConflicts: conflicting", holder);
return STATUS_FOUND;
}
/*
- * GrantLock -- update the lock data structure to show
- * the new lock holder.
+ * LockCountMyLocks --- Count total number of locks held on a given lockable
+ * object by a given process (under any transaction ID).
+ *
+ * XXX This could be rather slow if the process holds a large number of locks.
+ * Perhaps it could be sped up if we kept yet a third hashtable of per-
+ * process lock information. However, for the normal case where a transaction
+ * doesn't hold a large number of locks, keeping such a table would probably
+ * be a net slowdown.
+ */
+static void
+LockCountMyLocks(SHMEM_OFFSET lockOffset, PROC *proc, int *myHolders)
+{
+ HOLDER *holder = NULL;
+ HOLDER *nextHolder = NULL;
+ SHM_QUEUE *lockQueue = &(proc->lockQueue);
+ SHMEM_OFFSET end = MAKE_OFFSET(lockQueue);
+ int i;
+
+ MemSet(myHolders, 0, MAX_LOCKMODES * sizeof(int));
+
+ if (SHMQueueEmpty(lockQueue))
+ return;
+
+ SHMQueueFirst(lockQueue, (Pointer *) &holder, &holder->queue);
+
+ do
+ {
+ /* ---------------------------
+ * XXX Here we assume the shared memory queue is circular and
+ * that we know its internal structure. Should have some sort of
+ * macros to allow one to walk it. mer 20 July 1991
+ * ---------------------------
+ */
+ if (holder->queue.next == end)
+ nextHolder = NULL;
+ else
+ SHMQueueFirst(&holder->queue,
+ (Pointer *) &nextHolder, &nextHolder->queue);
+
+ if (lockOffset == holder->tag.lock)
+ {
+ for (i = 1; i < MAX_LOCKMODES; i++)
+ {
+ myHolders[i] += holder->holders[i];
+ }
+ }
+
+ holder = nextHolder;
+ } while (holder);
+}
+
+/*
+ * LockGetMyHoldLocks -- compute bitmask of lock types held by a process
+ * for a given lockable object.
+ */
+static int
+LockGetMyHoldLocks(SHMEM_OFFSET lockOffset, PROC *proc)
+{
+ int myHolders[MAX_LOCKMODES];
+ int holdLock = 0;
+ int i,
+ tmpMask;
+
+ LockCountMyLocks(lockOffset, proc, myHolders);
+
+ for (i = 1, tmpMask = 2;
+ i < MAX_LOCKMODES;
+ i++, tmpMask <<= 1)
+ {
+ if (myHolders[i] > 0)
+ holdLock |= tmpMask;
+ }
+ return holdLock;
+}
+
+/*
+ * GrantLock -- update the lock and holder data structures to show
+ * the new lock has been granted.
*/
void
-GrantLock(LOCK *lock, LOCKMODE lockmode)
+GrantLock(LOCK *lock, HOLDER *holder, LOCKMODE lockmode)
{
lock->nActive++;
lock->activeHolders[lockmode]++;
LOCK_PRINT("GrantLock", lock, lockmode);
Assert((lock->nActive > 0) && (lock->activeHolders[lockmode] > 0));
Assert(lock->nActive <= lock->nHolding);
+ holder->holders[lockmode]++;
+ holder->nHolding++;
+ Assert((holder->nHolding > 0) && (holder->holders[lockmode] > 0));
}
+/*
+ * WaitOnLock -- wait to acquire a lock
+ *
+ * The locktable spinlock must be held at entry.
+ */
static int
-WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode)
+WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode,
+ LOCK *lock, HOLDER *holder)
{
- PROC_QUEUE *waitQueue = &(lock->waitProcs);
LOCKMETHODTABLE *lockMethodTable = LockMethodTable[lockmethod];
- char *new_status, *old_status;
+ char *new_status,
+ *old_status;
Assert(lockmethod < NumLockMethods);
LOCK_PRINT("WaitOnLock: sleeping on lock", lock, lockmode);
old_status = pstrdup(get_ps_display());
- new_status = (char *) palloc(strlen(get_ps_display()) + 10);
- strcpy(new_status, get_ps_display());
+ new_status = (char *) palloc(strlen(old_status) + 10);
+ strcpy(new_status, old_status);
strcat(new_status, " waiting");
set_ps_display(new_status);
- if (ProcSleep(waitQueue,
- lockMethodTable->ctl,
+ if (ProcSleep(lockMethodTable->ctl,
lockmode,
- lock) != NO_ERROR)
+ lock,
+ holder) != NO_ERROR)
{
/* -------------------
* We failed as a result of a deadlock, see HandleDeadLock().
* Decrement the lock nHolding and holders fields as
- * we are no longer waiting on this lock.
+ * we are no longer waiting on this lock. Removal of the holder and
+ * lock objects, if no longer needed, will happen in xact cleanup.
* -------------------
*/
lock->nHolding--;
* priority waiting process, that process is granted the lock
* and awoken. (We have to grant the lock here to avoid a
* race between the waking process and any new process to
- * come along and request the lock).
+ * come along and request the lock.)
*/
bool
-LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
+LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag,
+ TransactionId xid, LOCKMODE lockmode)
{
- LOCK *lock = NULL;
+ LOCK *lock;
SPINLOCK masterLock;
bool found;
LOCKMETHODTABLE *lockMethodTable;
- XIDLookupEnt *xident,
- item;
- HTAB *xidTable;
- TransactionId xid;
+ HOLDER *holder;
+ HOLDERTAG holdertag;
+ HTAB *holderTable;
bool wakeupNeeded = true;
#ifdef LOCK_DEBUG
Assert((lock->nActive > 0) && (lock->activeHolders[lockmode] >= 0));
Assert(lock->nActive <= lock->nHolding);
-
- /* ------------------
- * Zero out all of the tag bytes (this clears the padding bytes for long
- * word alignment and ensures hashing consistency).
- * ------------------
- */
- MemSet(&item, 0, XID_TAGSIZE);
- item.tag.lock = MAKE_OFFSET(lock);
-#ifdef USE_XIDTAG_LOCKMETHOD
- item.tag.lockmethod = lockmethod;
-#endif
-#ifdef USER_LOCKS
- if (lockmethod == USER_LOCKMETHOD)
- {
- item.tag.pid = MyProcPid;
- item.tag.xid = xid = 0;
- }
- else
- {
- xid = GetCurrentTransactionId();
- TransactionIdStore(xid, &item.tag.xid);
- }
-#else
- xid = GetCurrentTransactionId();
- TransactionIdStore(xid, &item.tag.xid);
-#endif
-
/*
- * Find an xid entry with this tag
+ * Find the holder entry for this holder.
*/
- xidTable = lockMethodTable->xidHash;
- xident = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &item,
- HASH_FIND_SAVE, &found);
- if (!xident || !found)
+ MemSet(&holdertag, 0, sizeof(HOLDERTAG)); /* must clear padding, needed */
+ holdertag.lock = MAKE_OFFSET(lock);
+ holdertag.pid = MyProcPid;
+ TransactionIdStore(xid, &holdertag.xid);
+
+ holderTable = lockMethodTable->holderHash;
+ holder = (HOLDER *) hash_search(holderTable, (Pointer) &holdertag,
+ HASH_FIND_SAVE, &found);
+ if (!holder || !found)
{
SpinRelease(masterLock);
#ifdef USER_LOCKS
elog(NOTICE, "LockRelease: no lock with this tag");
else
#endif
- elog(NOTICE, "LockRelease: xid table corrupted");
+ elog(NOTICE, "LockRelease: holder table corrupted");
return FALSE;
}
- XID_PRINT("LockRelease: found", xident);
- Assert(xident->tag.lock == MAKE_OFFSET(lock));
+ HOLDER_PRINT("LockRelease: found", holder);
+ Assert(holder->tag.lock == MAKE_OFFSET(lock));
/*
* Check that we are actually holding a lock of the type we want to
* release.
*/
- if (!(xident->holders[lockmode] > 0))
+ if (!(holder->holders[lockmode] > 0))
{
SpinRelease(masterLock);
- XID_PRINT("LockAcquire: WRONGTYPE", xident);
+ HOLDER_PRINT("LockRelease: WRONGTYPE", holder);
elog(NOTICE, "LockRelease: you don't own a lock of type %s",
lock_types[lockmode]);
- Assert(xident->holders[lockmode] >= 0);
+ Assert(holder->holders[lockmode] >= 0);
return FALSE;
}
- Assert(xident->nHolding > 0);
+ Assert(holder->nHolding > 0);
/*
* fix the general lock stats
lock->nActive--;
lock->activeHolders[lockmode]--;
+ if (!(lock->activeHolders[lockmode]))
+ {
+ /* change the conflict mask. No more of this lock type. */
+ lock->mask &= BITS_OFF[lockmode];
+ }
+
#ifdef NOT_USED
/* --------------------------
* If there are still active locks of the type I just released, no one
if (lockMethodTable->ctl->conflictTab[lockmode] & lock->waitMask)
wakeupNeeded = true;
- if (!(lock->activeHolders[lockmode]))
- {
- /* change the conflict mask. No more of this lock type. */
- lock->mask &= BITS_OFF[lockmode];
- }
-
LOCK_PRINT("LockRelease: updated", lock, lockmode);
Assert((lock->nHolding >= 0) && (lock->holders[lockmode] >= 0));
Assert((lock->nActive >= 0) && (lock->activeHolders[lockmode] >= 0));
{
/* ------------------
* if there's no one waiting in the queue,
- * we just released the last lock.
+ * we just released the last lock on this object.
* Delete it from the lock table.
* ------------------
*/
}
/*
- * now check to see if I have any private locks. If I do, decrement
- * the counts associated with them.
+ * Now fix the per-holder lock stats.
*/
- xident->holders[lockmode]--;
- xident->nHolding--;
- XID_PRINT("LockRelease: updated", xident);
- Assert((xident->nHolding >= 0) && (xident->holders[lockmode] >= 0));
+ holder->holders[lockmode]--;
+ holder->nHolding--;
+ HOLDER_PRINT("LockRelease: updated", holder);
+ Assert((holder->nHolding >= 0) && (holder->holders[lockmode] >= 0));
/*
- * If this was my last hold on this lock, delete my entry in the XID
+ * If this was my last hold on this lock, delete my entry in the holder
* table.
*/
- if (!xident->nHolding)
+ if (!holder->nHolding)
{
- if (xident->queue.prev == INVALID_OFFSET)
- elog(NOTICE, "LockRelease: xid.prev == INVALID_OFFSET");
- if (xident->queue.next == INVALID_OFFSET)
- elog(NOTICE, "LockRelease: xid.next == INVALID_OFFSET");
- if (xident->queue.next != INVALID_OFFSET)
- SHMQueueDelete(&xident->queue);
- XID_PRINT("LockRelease: deleting", xident);
- xident = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &xident,
- HASH_REMOVE_SAVED, &found);
- if (!xident || !found)
+ if (holder->queue.prev == INVALID_OFFSET)
+ elog(NOTICE, "LockRelease: holder.prev == INVALID_OFFSET");
+ if (holder->queue.next == INVALID_OFFSET)
+ elog(NOTICE, "LockRelease: holder.next == INVALID_OFFSET");
+ if (holder->queue.next != INVALID_OFFSET)
+ SHMQueueDelete(&holder->queue);
+ HOLDER_PRINT("LockRelease: deleting", holder);
+ holder = (HOLDER *) hash_search(holderTable, (Pointer) &holder,
+ HASH_REMOVE_SAVED, &found);
+ if (!holder || !found)
{
SpinRelease(masterLock);
- elog(NOTICE, "LockRelease: remove xid, table corrupted");
+ elog(NOTICE, "LockRelease: remove holder, table corrupted");
return FALSE;
}
}
if (wakeupNeeded)
- ProcLockWakeup(&(lock->waitProcs), lockmethod, lock);
+ ProcLockWakeup(lockmethod, lock);
#ifdef LOCK_DEBUG
else if (LOCK_DEBUG_ENABLED(lock))
elog(DEBUG, "LockRelease: no wakeup needed");
}
/*
- * LockReleaseAll -- Release all locks in a process lock queue.
+ * LockReleaseAll -- Release all locks in a process's lock queue.
+ *
+ * Well, not really *all* locks.
+ *
+ * If 'allxids' is TRUE, all locks of the specified lock method are
+ * released, regardless of transaction affiliation.
+ *
+ * If 'allxids' is FALSE, all locks of the specified lock method and
+ * specified XID are released.
*/
bool
-LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue)
+LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc,
+ bool allxids, TransactionId xid)
{
- PROC_QUEUE *waitQueue;
- int done;
- XIDLookupEnt *xidLook = NULL;
- XIDLookupEnt *tmp = NULL;
- XIDLookupEnt *xident;
+ HOLDER *holder = NULL;
+ HOLDER *nextHolder = NULL;
+ SHM_QUEUE *lockQueue = &(proc->lockQueue);
SHMEM_OFFSET end = MAKE_OFFSET(lockQueue);
SPINLOCK masterLock;
LOCKMETHODTABLE *lockMethodTable;
numLockModes;
LOCK *lock;
bool found;
- int xidtag_lockmethod,
- nleft;
+ int nleft;
#ifdef LOCK_DEBUG
if (lockmethod == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks)
- elog(DEBUG, "LockReleaseAll: lockmethod=%d, pid=%d", lockmethod, MyProcPid);
+ elog(DEBUG, "LockReleaseAll: lockmethod=%d, pid=%d",
+ lockmethod, MyProcPid);
#endif
- nleft = 0;
-
Assert(lockmethod < NumLockMethods);
lockMethodTable = LockMethodTable[lockmethod];
if (!lockMethodTable)
{
- elog(NOTICE, "LockAcquire: bad lockmethod %d", lockmethod);
+ elog(NOTICE, "LockReleaseAll: bad lockmethod %d", lockmethod);
return FALSE;
}
masterLock = lockMethodTable->ctl->masterLock;
SpinAcquire(masterLock);
- SHMQueueFirst(lockQueue, (Pointer *) &xidLook, &xidLook->queue);
- for (;;)
+ SHMQueueFirst(lockQueue, (Pointer *) &holder, &holder->queue);
+
+ nleft = 0;
+
+ do
{
bool wakeupNeeded = false;
* macros to allow one to walk it. mer 20 July 1991
* ---------------------------
*/
- done = (xidLook->queue.next == end);
- lock = (LOCK *) MAKE_PTR(xidLook->tag.lock);
+ if (holder->queue.next == end)
+ nextHolder = NULL;
+ else
+ SHMQueueFirst(&holder->queue,
+ (Pointer *) &nextHolder, &nextHolder->queue);
+
+ Assert(holder->tag.pid == proc->pid);
- xidtag_lockmethod = XIDENT_LOCKMETHOD(*xidLook);
- if (xidtag_lockmethod == lockmethod)
+ lock = (LOCK *) MAKE_PTR(holder->tag.lock);
+
+ /* Ignore items that are not of the lockmethod to be removed */
+ if (LOCK_LOCKMETHOD(*lock) != lockmethod)
{
- XID_PRINT("LockReleaseAll", xidLook);
- LOCK_PRINT("LockReleaseAll", lock, 0);
+ nleft++;
+ goto next_item;
}
-#ifdef USE_XIDTAG_LOCKMETHOD
- if (xidtag_lockmethod != LOCK_LOCKMETHOD(*lock))
- elog(NOTICE, "LockReleaseAll: xid/lock method mismatch: %d != %d",
- xidtag_lockmethod, lock->tag.lockmethod);
-#endif
- if (xidtag_lockmethod != lockmethod)
+ /* If not allxids, ignore items that are of the wrong xid */
+ if (!allxids && xid != holder->tag.xid)
{
nleft++;
goto next_item;
}
+ HOLDER_PRINT("LockReleaseAll", holder);
+ LOCK_PRINT("LockReleaseAll", lock, 0);
Assert(lock->nHolding > 0);
Assert(lock->nActive > 0);
Assert(lock->nActive <= lock->nHolding);
- Assert(xidLook->nHolding >= 0);
- Assert(xidLook->nHolding <= lock->nHolding);
-
-#ifdef USER_LOCKS
- if (lockmethod == USER_LOCKMETHOD)
- {
- if ((xidLook->tag.pid == 0) || (xidLook->tag.xid != 0))
- {
-#ifdef LOCK_DEBUG
- if (Trace_userlocks)
- elog(DEBUG, "LockReleaseAll: skiping normal lock [%ld,%d,%d]",
- xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
-#endif /* LOCK_DEBUG */
- nleft++;
- goto next_item;
- }
- if (xidLook->tag.pid != MyProcPid)
- {
- /* Should never happen */
- elog(NOTICE,
- "LockReleaseAll: INVALID PID: [%u] [%ld,%d,%d]",
- lock->tag.objId.blkno,
- xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
- nleft++;
- goto next_item;
- }
-#ifdef LOCK_DEBUG
- if (Trace_userlocks)
- elog(DEBUG, "LockReleaseAll: releasing user lock [%u] [%ld,%d,%d]",
- lock->tag.objId.blkno, xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
-#endif /* LOCK_DEBUG */
- }
- else
- {
- /*
- * Can't check xidLook->tag.xid, can be 0 also for normal locks
- */
- if (xidLook->tag.pid != 0)
- {
-#ifdef LOCK_DEBUG
- if (Trace_userlocks)
- elog(DEBUG, "LockReleaseAll: skiping user lock [%u] [%ld,%d,%d]",
- lock->tag.objId.blkno, xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
-#endif /* LOCK_DEBUG */
- nleft++;
- goto next_item;
- }
- }
-#endif /* USER_LOCKS */
+ Assert(holder->nHolding >= 0);
+ Assert(holder->nHolding <= lock->nHolding);
/* ------------------
* fix the general lock stats
* ------------------
*/
- if (lock->nHolding != xidLook->nHolding)
+ if (lock->nHolding != holder->nHolding)
{
for (i = 1; i <= numLockModes; i++)
{
- Assert(xidLook->holders[i] >= 0);
- lock->holders[i] -= xidLook->holders[i];
- lock->activeHolders[i] -= xidLook->holders[i];
+ Assert(holder->holders[i] >= 0);
+ lock->holders[i] -= holder->holders[i];
+ lock->activeHolders[i] -= holder->holders[i];
Assert((lock->holders[i] >= 0) \
&&(lock->activeHolders[i] >= 0));
if (!lock->activeHolders[i])
/*
* Read comments in LockRelease
*/
- if (!wakeupNeeded && xidLook->holders[i] > 0 &&
+ if (!wakeupNeeded && holder->holders[i] > 0 &&
lockMethodTable->ctl->conflictTab[i] & lock->waitMask)
wakeupNeeded = true;
}
- lock->nHolding -= xidLook->nHolding;
- lock->nActive -= xidLook->nHolding;
+ lock->nHolding -= holder->nHolding;
+ lock->nActive -= holder->nHolding;
Assert((lock->nHolding >= 0) && (lock->nActive >= 0));
Assert(lock->nActive <= lock->nHolding);
}
}
LOCK_PRINT("LockReleaseAll: updated", lock, 0);
+ HOLDER_PRINT("LockReleaseAll: deleting", holder);
+
/*
- * Remove the xid from the process lock queue
+ * Remove the holder entry from the process' lock queue
*/
- SHMQueueDelete(&xidLook->queue);
+ SHMQueueDelete(&holder->queue);
- /* ----------------
- * always remove the xidLookup entry, we're done with it now
- * ----------------
+ /*
+ * remove the holder entry from the hashtable
*/
-
- XID_PRINT("LockReleaseAll: deleting", xidLook);
- xident = (XIDLookupEnt *) hash_search(lockMethodTable->xidHash,
- (Pointer) xidLook,
- HASH_REMOVE,
- &found);
- if (!xident || !found)
+ holder = (HOLDER *) hash_search(lockMethodTable->holderHash,
+ (Pointer) holder,
+ HASH_REMOVE,
+ &found);
+ if (!holder || !found)
{
SpinRelease(masterLock);
- elog(NOTICE, "LockReleaseAll: xid table corrupted");
+ elog(NOTICE, "LockReleaseAll: holder table corrupted");
return FALSE;
}
}
}
else if (wakeupNeeded)
- {
- waitQueue = &(lock->waitProcs);
- ProcLockWakeup(waitQueue, lockmethod, lock);
- }
+ ProcLockWakeup(lockmethod, lock);
next_item:
- if (done)
- break;
- SHMQueueFirst(&xidLook->queue, (Pointer *) &tmp, &tmp->queue);
- xidLook = tmp;
- }
+ holder = nextHolder;
+ } while (holder);
/*
* Reinitialize the queue only if nothing has been left in.
SHMEM_LOCKTAB_KEYSIZE,
SHMEM_LOCKTAB_DATASIZE);
- /* xidHash table */
+ /* holderHash table */
size += hash_estimate_size(NLOCKENTS(maxBackends),
- SHMEM_XIDTAB_KEYSIZE,
- SHMEM_XIDTAB_DATASIZE);
+ SHMEM_HOLDERTAB_KEYSIZE,
+ SHMEM_HOLDERTAB_DATASIZE);
/*
* Since the lockHash entry count above is only an estimate, add 10%
return size;
}
-/* -----------------
- * Boolean function to determine current locking status
- * -----------------
- */
-bool
-LockingDisabled()
-{
- return LockingIsDisabled;
-}
-
/*
* DeadlockCheck -- Checks for deadlocks for a given process
*
* We have already locked the master lock before being called.
*/
bool
-DeadLockCheck(void *proc, LOCK *findlock)
+DeadLockCheck(PROC *thisProc, LOCK *findlock)
{
- XIDLookupEnt *xidLook = NULL;
- XIDLookupEnt *tmp = NULL;
- PROC *thisProc = (PROC *) proc,
- *waitProc;
+ HOLDER *holder = NULL;
+ HOLDER *nextHolder = NULL;
+ PROC *waitProc;
+ PROC_QUEUE *waitQueue;
SHM_QUEUE *lockQueue = &(thisProc->lockQueue);
SHMEM_OFFSET end = MAKE_OFFSET(lockQueue);
+ LOCKMETHODCTL *lockctl = LockMethodTable[DEFAULT_LOCKMETHOD]->ctl;
LOCK *lock;
- PROC_QUEUE *waitQueue;
int i,
j;
- bool first_run = (thisProc == MyProc),
- done;
+ bool first_run = (thisProc == MyProc);
static PROC *checked_procs[MAXBACKENDS];
static int nprocs;
/* initialize at start of recursion */
if (first_run)
{
- checked_procs[0] = MyProc;
+ checked_procs[0] = thisProc;
nprocs = 1;
}
if (SHMQueueEmpty(lockQueue))
return false;
- SHMQueueFirst(lockQueue, (Pointer *) &xidLook, &xidLook->queue);
-
- XID_PRINT("DeadLockCheck", xidLook);
+ SHMQueueFirst(lockQueue, (Pointer *) &holder, &holder->queue);
- for (;;)
+ do
{
- done = (xidLook->queue.next == end);
- lock = (LOCK *) MAKE_PTR(xidLook->tag.lock);
+ /* ---------------------------
+ * XXX Here we assume the shared memory queue is circular and
+ * that we know its internal structure. Should have some sort of
+ * macros to allow one to walk it. mer 20 July 1991
+ * ---------------------------
+ */
+ if (holder->queue.next == end)
+ nextHolder = NULL;
+ else
+ SHMQueueFirst(&holder->queue,
+ (Pointer *) &nextHolder, &nextHolder->queue);
- LOCK_PRINT("DeadLockCheck", lock, 0);
+ Assert(holder->tag.pid == thisProc->pid);
+
+ lock = (LOCK *) MAKE_PTR(holder->tag.lock);
- if (lock->tag.relId == 0) /* user' lock */
+ /* Ignore user locks */
+ if (lock->tag.lockmethod != DEFAULT_LOCKMETHOD)
goto nxtl;
+ HOLDER_PRINT("DeadLockCheck", holder);
+ LOCK_PRINT("DeadLockCheck", lock, 0);
+
/*
* waitLock is always in lockQueue of waiting proc, if !first_run
* then upper caller will handle waitProcs queue of waitLock.
*/
if (lock == findlock && !first_run)
{
- LOCKMETHODCTL *lockctl =
- LockMethodTable[DEFAULT_LOCKMETHOD]->ctl;
int lm;
- Assert(xidLook->nHolding > 0);
+ Assert(holder->nHolding > 0);
for (lm = 1; lm <= lockctl->numLockModes; lm++)
{
- if (xidLook->holders[lm] > 0 &&
+ if (holder->holders[lm] > 0 &&
lockctl->conflictTab[lm] & findlock->waitMask)
return true;
}
waitQueue = &(lock->waitProcs);
waitProc = (PROC *) MAKE_PTR(waitQueue->links.prev);
- for (i = 0; i < waitQueue->size; i++)
+ /*
+ * NOTE: loop must count down because we want to examine each item
+ * in the queue even if waitQueue->size decreases due to waking up
+ * some of the processes.
+ */
+ for (i = waitQueue->size; --i >= 0; )
{
+ Assert(waitProc->waitLock == lock);
if (waitProc == thisProc)
{
- Assert(waitProc->waitLock == lock);
+ /* This should only happen at first level */
Assert(waitProc == MyProc);
- waitProc = (PROC *) MAKE_PTR(waitProc->links.prev);
- continue;
+ goto nextWaitProc;
}
if (lock == findlock) /* first_run also true */
{
- LOCKMETHODCTL *lockctl =
- LockMethodTable[DEFAULT_LOCKMETHOD]->ctl;
-
/*
* If me blocked by his holdlock...
*/
- if (lockctl->conflictTab[MyProc->token] & waitProc->holdLock)
+ if (lockctl->conflictTab[MyProc->waitLockMode] & waitProc->holdLock)
{
/* and he blocked by me -> deadlock */
- if (lockctl->conflictTab[waitProc->token] & MyProc->holdLock)
+ if (lockctl->conflictTab[waitProc->waitLockMode] & MyProc->holdLock)
return true;
/* we shouldn't look at lockQueue of our blockers */
- waitProc = (PROC *) MAKE_PTR(waitProc->links.prev);
- continue;
+ goto nextWaitProc;
}
/*
* If he isn't blocked by me and we request
* non-conflicting lock modes - no deadlock here because
- * of he isn't blocked by me in any sence (explicitle or
+ * he isn't blocked by me in any sense (explicitly or
* implicitly). Note that we don't do like test if
* !first_run (when thisProc is holder and non-waiter on
* lock) and so we call DeadLockCheck below for every
* un-blocked by thisProc. Should we? This could save us
* some time...
*/
- if (!(lockctl->conflictTab[waitProc->token] & MyProc->holdLock) &&
- !(lockctl->conflictTab[waitProc->token] & (1 << MyProc->token)))
- {
- waitProc = (PROC *) MAKE_PTR(waitProc->links.prev);
- continue;
- }
+ if (!(lockctl->conflictTab[waitProc->waitLockMode] & MyProc->holdLock) &&
+ !(lockctl->conflictTab[waitProc->waitLockMode] & (1 << MyProc->waitLockMode)))
+ goto nextWaitProc;
}
/*
- * Look in lockQueue of this waitProc, if didn't do this
- * before.
+ * Skip this waiter if already checked.
*/
for (j = 0; j < nprocs; j++)
{
if (checked_procs[j] == waitProc)
- break;
+ goto nextWaitProc;
}
- if (j >= nprocs)
+
+ /* Recursively check this process's lockQueue. */
+ Assert(nprocs < MAXBACKENDS);
+ checked_procs[nprocs++] = waitProc;
+
+ if (DeadLockCheck(waitProc, findlock))
{
- Assert(nprocs < MAXBACKENDS);
- checked_procs[nprocs++] = waitProc;
+ int holdLock;
- if (DeadLockCheck(waitProc, findlock))
+ /*
+ * Ok, but is waitProc waiting for me (thisProc) ?
+ */
+ if (thisProc->waitLock == lock)
+ {
+ Assert(first_run);
+ holdLock = thisProc->holdLock;
+ }
+ else
+ {
+ /* should we cache holdLock to speed this up? */
+ holdLock = LockGetMyHoldLocks(holder->tag.lock, thisProc);
+ Assert(holdLock != 0);
+ }
+ if (lockctl->conflictTab[waitProc->waitLockMode] & holdLock)
{
- LOCKMETHODCTL *lockctl =
- LockMethodTable[DEFAULT_LOCKMETHOD]->ctl;
- int holdLock;
-
/*
- * Ok, but is waitProc waiting for me (thisProc) ?
+ * Last attempt to avoid deadlock: try to wakeup myself.
*/
- if (thisProc->waitLock == lock)
- {
- Assert(first_run);
- holdLock = thisProc->holdLock;
- }
- else
-/* should we cache holdLock ? */
- {
- int lm,
- tmpMask = 2;
-
- Assert(xidLook->nHolding > 0);
- for (holdLock = 0, lm = 1;
- lm <= lockctl->numLockModes;
- lm++, tmpMask <<= 1)
- {
- if (xidLook->holders[lm] > 0)
- holdLock |= tmpMask;
- }
- Assert(holdLock != 0);
- }
- if (lockctl->conflictTab[waitProc->token] & holdLock)
+ if (first_run)
{
-
- /*
- * Last attempt to avoid deadlock - try to wakeup
- * myself.
- */
- if (first_run)
+ if (LockResolveConflicts(DEFAULT_LOCKMETHOD,
+ MyProc->waitLockMode,
+ MyProc->waitLock,
+ MyProc->waitHolder,
+ MyProc,
+ NULL) == STATUS_OK)
{
- if (LockResolveConflicts(DEFAULT_LOCKMETHOD,
- MyProc->waitLock,
- MyProc->token,
- MyProc->xid,
- NULL) == STATUS_OK)
- {
- SetWaitingForLock(false);
- GrantLock(MyProc->waitLock, MyProc->token);
- (MyProc->waitLock->waitProcs.size)--;
- ProcWakeup(MyProc, NO_ERROR);
- return false;
- }
+ SetWaitingForLock(false);
+ GrantLock(MyProc->waitLock,
+ MyProc->waitHolder,
+ MyProc->waitLockMode);
+ ProcWakeup(MyProc, NO_ERROR);
+ return false;
}
- return true;
- }
-
- /*
- * Hell! Is he blocked by any (other) holder ?
- */
- if (LockResolveConflicts(DEFAULT_LOCKMETHOD,
- lock,
- waitProc->token,
- waitProc->xid,
- NULL) != STATUS_OK)
- {
-
- /*
- * Blocked by others - no deadlock...
- */
- LOCK_PRINT("DeadLockCheck: blocked by others", lock, waitProc->token);
- waitProc = (PROC *) MAKE_PTR(waitProc->links.prev);
- continue;
}
+ return true;
+ }
+ /*
+ * Hell! Is he blocked by any (other) holder ?
+ */
+ if (LockResolveConflicts(DEFAULT_LOCKMETHOD,
+ waitProc->waitLockMode,
+ lock,
+ waitProc->waitHolder,
+ waitProc,
+ NULL) != STATUS_OK)
+ {
/*
- * Well - wakeup this guy! This is the case of
- * implicit blocking: thisProc blocked someone who
- * blocked waitProc by the fact that he/someone is
- * already waiting for lock. We do this for
- * anti-starving.
+ * Blocked by others - no deadlock...
*/
- GrantLock(lock, waitProc->token);
- waitQueue->size--;
- waitProc = ProcWakeup(waitProc, NO_ERROR);
- continue;
+ LOCK_PRINT("DeadLockCheck: blocked by others",
+ lock, waitProc->waitLockMode);
+ goto nextWaitProc;
}
+
+ /*
+ * Well - wakeup this guy! This is the case of
+ * implicit blocking: thisProc blocked someone who
+ * blocked waitProc by the fact that he/someone is
+ * already waiting for lock. We do this for
+ * anti-starving.
+ */
+ GrantLock(lock, waitProc->waitHolder, waitProc->waitLockMode);
+ waitProc = ProcWakeup(waitProc, NO_ERROR);
+ /*
+ * Use next-proc link returned by ProcWakeup, since this
+ * proc's own links field is now cleared.
+ */
+ continue;
}
+
+nextWaitProc:
waitProc = (PROC *) MAKE_PTR(waitProc->links.prev);
}
-nxtl: ;
- if (done)
- break;
- SHMQueueFirst(&xidLook->queue, (Pointer *) &tmp, &tmp->queue);
- xidLook = tmp;
- }
+nxtl:
+ holder = nextHolder;
+ } while (holder);
/* if we got here, no deadlock */
return false;
* the masterLock.
*/
void
-DumpLocks()
+DumpLocks(void)
{
SHMEM_OFFSET location;
PROC *proc;
SHM_QUEUE *lockQueue;
- int done;
- XIDLookupEnt *xidLook = NULL;
- XIDLookupEnt *tmp = NULL;
+ HOLDER *holder = NULL;
+ HOLDER *nextHolder = NULL;
SHMEM_OFFSET end;
LOCK *lock;
- int count = 0;
int lockmethod = DEFAULT_LOCKMETHOD;
LOCKMETHODTABLE *lockMethodTable;
if (proc != MyProc)
return;
lockQueue = &proc->lockQueue;
+ end = MAKE_OFFSET(lockQueue);
Assert(lockmethod < NumLockMethods);
lockMethodTable = LockMethodTable[lockmethod];
if (!lockMethodTable)
return;
+ if (proc->waitLock)
+ LOCK_PRINT("DumpLocks: waiting on", proc->waitLock, 0);
+
if (SHMQueueEmpty(lockQueue))
return;
- SHMQueueFirst(lockQueue, (Pointer *) &xidLook, &xidLook->queue);
- end = MAKE_OFFSET(lockQueue);
-
- if (MyProc->waitLock)
- LOCK_PRINT("DumpLocks: waiting on", MyProc->waitLock, 0);
+ SHMQueueFirst(lockQueue, (Pointer *) &holder, &holder->queue);
- for (;;)
+ do
{
- if (count++ > 2000)
- {
- elog(NOTICE, "DumpLocks: xid loop detected, giving up");
- break;
- }
-
/* ---------------------------
* XXX Here we assume the shared memory queue is circular and
* that we know its internal structure. Should have some sort of
* macros to allow one to walk it. mer 20 July 1991
* ---------------------------
*/
- done = (xidLook->queue.next == end);
- lock = (LOCK *) MAKE_PTR(xidLook->tag.lock);
+ if (holder->queue.next == end)
+ nextHolder = NULL;
+ else
+ SHMQueueFirst(&holder->queue,
+ (Pointer *) &nextHolder, &nextHolder->queue);
- XID_PRINT("DumpLocks", xidLook);
- LOCK_PRINT("DumpLocks", lock, 0);
+ Assert(holder->tag.pid == proc->pid);
- if (done)
- break;
+ lock = (LOCK *) MAKE_PTR(holder->tag.lock);
- SHMQueueFirst(&xidLook->queue, (Pointer *) &tmp, &tmp->queue);
- xidLook = tmp;
- }
+ HOLDER_PRINT("DumpLocks", holder);
+ LOCK_PRINT("DumpLocks", lock, 0);
+
+ holder = nextHolder;
+ } while (holder);
}
/*
* Dump all postgres locks. Must have already acquired the masterLock.
*/
void
-DumpAllLocks()
+DumpAllLocks(void)
{
SHMEM_OFFSET location;
PROC *proc;
- XIDLookupEnt *xidLook = NULL;
+ HOLDER *holder = NULL;
LOCK *lock;
int pid;
- int count = 0;
int lockmethod = DEFAULT_LOCKMETHOD;
LOCKMETHODTABLE *lockMethodTable;
- HTAB *xidTable;
+ HTAB *holderTable;
pid = getpid();
ShmemPIDLookup(pid, &location);
if (!lockMethodTable)
return;
- xidTable = lockMethodTable->xidHash;
+ holderTable = lockMethodTable->holderHash;
- if (MyProc->waitLock)
- LOCK_PRINT("DumpAllLocks: waiting on", MyProc->waitLock, 0);
+ if (proc->waitLock)
+ LOCK_PRINT("DumpAllLocks: waiting on", proc->waitLock, 0);
hash_seq(NULL);
- while ((xidLook = (XIDLookupEnt *) hash_seq(xidTable)) &&
- (xidLook != (XIDLookupEnt *) TRUE))
+ while ((holder = (HOLDER *) hash_seq(holderTable)) &&
+ (holder != (HOLDER *) TRUE))
{
- XID_PRINT("DumpAllLocks", xidLook);
+ HOLDER_PRINT("DumpAllLocks", holder);
- if (xidLook->tag.lock)
+ if (holder->tag.lock)
{
- lock = (LOCK *) MAKE_PTR(xidLook->tag.lock);
+ lock = (LOCK *) MAKE_PTR(holder->tag.lock);
LOCK_PRINT("DumpAllLocks", lock, 0);
}
else
- elog(DEBUG, "DumpAllLocks: xidLook->tag.lock = NULL");
-
- if (count++ > 2000)
- {
- elog(NOTICE, "DumpAllLocks: possible loop, giving up");
- break;
- }
+ elog(DEBUG, "DumpAllLocks: holder->tag.lock = NULL");
}
}
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.88 2000/12/18 17:33:41 tgl Exp $
+ * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.89 2000/12/22 00:51:54 tgl Exp $
*
*-------------------------------------------------------------------------
*/
* This is so that we can support more backends. (system-wide semaphore
* sets run out pretty fast.) -ay 4/95
*
- * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.88 2000/12/18 17:33:41 tgl Exp $
+ * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.89 2000/12/22 00:51:54 tgl Exp $
*/
#include "postgres.h"
#include <sys/sem.h>
#endif
+#include "access/xact.h"
#include "storage/proc.h"
void HandleDeadLock(SIGNAL_ARGS);
static void ProcFreeAllSemaphores(void);
-static bool GetOffWaitqueue(PROC *);
+static bool GetOffWaitQueue(PROC *);
int DeadlockTimeout = 1000;
/* -----------------------
* get process off any wait queue it might be on
+ *
+ * NB: this does not remove the process' holder object, nor the lock object,
+ * even though their holder counts might now have gone to zero. That will
+ * happen during a subsequent LockReleaseAll call, which we expect will happen
+ * during transaction cleanup. (Removal of a proc from its wait queue by
+ * this routine can only happen if we are aborting the transaction.)
* -----------------------
*/
static bool
-GetOffWaitqueue(PROC *proc)
+GetOffWaitQueue(PROC *proc)
{
- bool getoffed = false;
+ bool gotoff = false;
LockLockTable();
if (proc->links.next != INVALID_OFFSET)
{
- int lockmode = proc->token;
- LOCK *waitLock = proc->waitLock;
+ LOCK *waitLock = proc->waitLock;
+ LOCKMODE lockmode = proc->waitLockMode;
+ /* Remove proc from lock's wait queue */
Assert(waitLock);
Assert(waitLock->waitProcs.size > 0);
SHMQueueDelete(&(proc->links));
--waitLock->waitProcs.size;
+
+ /* Undo increments of holder counts by waiting process */
Assert(waitLock->nHolding > 0);
Assert(waitLock->nHolding > proc->waitLock->nActive);
--waitLock->nHolding;
Assert(waitLock->holders[lockmode] > 0);
--waitLock->holders[lockmode];
+ /* don't forget to clear waitMask bit if appropriate */
if (waitLock->activeHolders[lockmode] == waitLock->holders[lockmode])
waitLock->waitMask &= ~(1 << lockmode);
- ProcLockWakeup(&(waitLock->waitProcs), LOCK_LOCKMETHOD(*waitLock), waitLock);
- getoffed = true;
+
+ /* Clean up the proc's own state */
+ SHMQueueElemInit(&(proc->links));
+ proc->waitLock = NULL;
+ proc->waitHolder = NULL;
+
+ /* See if any other waiters can be woken up now */
+ ProcLockWakeup(LOCK_LOCKMETHOD(*waitLock), waitLock);
+
+ gotoff = true;
}
- SHMQueueElemInit(&(proc->links));
UnlockLockTable();
- return getoffed;
+ return gotoff;
}
/*
- * ProcReleaseLocks() -- release all locks associated with current transaction
+ * ProcReleaseLocks() -- release locks associated with current transaction
+ * at transaction commit or abort
+ *
+ * At commit, we release only locks tagged with the current transaction's XID,
+ * leaving those marked with XID 0 (ie, session locks) undisturbed. At abort,
+ * we release all locks including XID 0, because we need to clean up after
+ * a failure. This logic will need extension if we ever support nested
+ * transactions.
*
+ * Note that user locks are not released in either case.
*/
void
-ProcReleaseLocks()
+ProcReleaseLocks(bool isCommit)
{
if (!MyProc)
return;
- LockReleaseAll(DEFAULT_LOCKMETHOD, &MyProc->lockQueue);
- GetOffWaitqueue(MyProc);
+ GetOffWaitQueue(MyProc);
+ LockReleaseAll(DEFAULT_LOCKMETHOD, MyProc,
+ !isCommit, GetCurrentTransactionId());
}
/*
ProcKill(int exitStatus, Datum pid)
{
PROC *proc;
- SHMEM_OFFSET location;
/* --------------------
* If this is a FATAL exit the postmaster will have to kill all the
- * existing backends and reinitialize shared memory. So all we don't
+ * existing backends and reinitialize shared memory. So we don't
* need to do anything here.
* --------------------
*/
if (exitStatus != 0)
return;
- ShmemPIDLookup(MyProcPid, &location);
- if (location == INVALID_OFFSET)
- return;
-
- proc = (PROC *) MAKE_PTR(location);
+ if ((int) pid == MyProcPid)
+ {
+ proc = MyProc;
+ MyProc = NULL;
+ }
+ else
+ {
+ /* This path is dead code at the moment ... */
+ SHMEM_OFFSET location = INVALID_OFFSET;
- Assert(proc == MyProc || (int)pid != MyProcPid);
+ ShmemPIDLookup((int) pid, &location);
+ if (location == INVALID_OFFSET)
+ return;
+ proc = (PROC *) MAKE_PTR(location);
+ }
- MyProc = NULL;
+ Assert(proc);
- /* ---------------
- * Assume one lock table.
- * ---------------
- */
+ /* Release any spinlocks the proc is holding */
ProcReleaseSpins(proc);
- LockReleaseAll(DEFAULT_LOCKMETHOD, &proc->lockQueue);
-#ifdef USER_LOCKS
+ /* Get the proc off any wait queue it might be on */
+ GetOffWaitQueue(proc);
- /*
- * Assume we have a second lock table.
- */
- LockReleaseAll(USER_LOCKMETHOD, &proc->lockQueue);
-#endif
+ /* Remove from the standard lock table */
+ LockReleaseAll(DEFAULT_LOCKMETHOD, proc, true, InvalidTransactionId);
- /* ----------------
- * get off the wait queue
- * ----------------
- */
- GetOffWaitqueue(proc);
+#ifdef USER_LOCKS
+ /* Remove from the user lock table */
+ LockReleaseAll(USER_LOCKMETHOD, proc, true, InvalidTransactionId);
+#endif
}
/*
}
if (QueryCancel) /* cancel request pending */
{
- if (GetOffWaitqueue(MyProc))
+ if (GetOffWaitQueue(MyProc))
{
lockWaiting = false;
- elog(ERROR, "Query cancel requested while waiting lock");
+ elog(ERROR, "Query cancel requested while waiting for lock");
}
}
}
set_alarm(B_INFINITE_TIMEOUT, B_PERIODIC_ALARM);
#endif /* __BEOS__ */
- if (GetOffWaitqueue(MyProc))
- elog(ERROR, "Query cancel requested while waiting lock");
+ if (GetOffWaitQueue(MyProc))
+ elog(ERROR, "Query cancel requested while waiting for lock");
}
/*
* NOTES: The process queue is now a priority queue for locking.
*/
int
-ProcSleep(PROC_QUEUE *waitQueue,/* lock->waitProcs */
- LOCKMETHODCTL *lockctl,
- int token, /* lockmode */
- LOCK *lock)
+ProcSleep(LOCKMETHODCTL *lockctl,
+ LOCKMODE lockmode,
+ LOCK *lock,
+ HOLDER *holder)
{
- int i;
+ PROC_QUEUE *waitQueue = &(lock->waitProcs);
SPINLOCK spinlock = lockctl->masterLock;
- PROC *proc;
- int myMask = (1 << token);
+ int myMask = (1 << lockmode);
int waitMask = lock->waitMask;
+ PROC *proc;
+ int i;
int aheadHolders[MAX_LOCKMODES];
- bool selfConflict = (lockctl->conflictTab[token] & myMask),
+ bool selfConflict = (lockctl->conflictTab[lockmode] & myMask),
prevSame = false;
#ifndef __BEOS__
struct itimerval timeval,
bigtime_t time_interval;
#endif
- MyProc->token = token;
MyProc->waitLock = lock;
+ MyProc->waitHolder = holder;
+ MyProc->waitLockMode = lockmode;
+ /* We assume the caller set up MyProc->holdLock */
proc = (PROC *) MAKE_PTR(waitQueue->links.prev);
/* if we don't conflict with any waiter - be first in queue */
- if (!(lockctl->conflictTab[token] & waitMask))
+ if (!(lockctl->conflictTab[lockmode] & waitMask))
goto ins;
for (i = 1; i < MAX_LOCKMODES; i++)
aheadHolders[i] = lock->activeHolders[i];
- (aheadHolders[token])++;
+ (aheadHolders[lockmode])++;
for (i = 0; i < waitQueue->size; i++)
{
/* am I waiting for him ? */
- if (lockctl->conflictTab[token] & proc->holdLock)
+ if (lockctl->conflictTab[lockmode] & proc->holdLock)
{
/* is he waiting for me ? */
- if (lockctl->conflictTab[proc->token] & MyProc->holdLock)
+ if (lockctl->conflictTab[proc->waitLockMode] & MyProc->holdLock)
{
/* Yes, report deadlock failure */
MyProc->errType = STATUS_ERROR;
/* being waiting for him - go past */
}
/* if he waits for me */
- else if (lockctl->conflictTab[proc->token] & MyProc->holdLock)
+ else if (lockctl->conflictTab[proc->waitLockMode] & MyProc->holdLock)
break;
/* if conflicting locks requested */
- else if (lockctl->conflictTab[proc->token] & myMask)
+ else if (lockctl->conflictTab[proc->waitLockMode] & myMask)
{
/*
* Last attempt to don't move any more: if we don't conflict with
* rest waiters in queue.
*/
- else if (!(lockctl->conflictTab[token] & waitMask))
+ else if (!(lockctl->conflictTab[lockmode] & waitMask))
break;
- prevSame = (proc->token == token);
- (aheadHolders[proc->token])++;
- if (aheadHolders[proc->token] == lock->holders[proc->token])
- waitMask &= ~(1 << proc->token);
+ prevSame = (proc->waitLockMode == lockmode);
+ (aheadHolders[proc->waitLockMode])++;
+ if (aheadHolders[proc->waitLockMode] == lock->holders[proc->waitLockMode])
+ waitMask &= ~(1 << proc->waitLockMode);
proc = (PROC *) MAKE_PTR(proc->links.prev);
}
rt:;
-#ifdef LOCK_DEBUG
- /* Just to get meaningful debug messages from DumpLocks() */
- MyProc->waitLock = (LOCK *) NULL;
-#endif
+ MyProc->waitLock = NULL;
+ MyProc->waitHolder = NULL;
return MyProc->errType;
}
/*
* ProcWakeup -- wake up a process by releasing its private semaphore.
*
- * remove the process from the wait queue and set its links invalid.
+ * Also remove the process from the wait queue and set its links invalid.
* RETURN: the next process in the wait queue.
*/
PROC *
retProc = (PROC *) MAKE_PTR(proc->links.prev);
- /* you have to update waitLock->waitProcs.size yourself */
SHMQueueDelete(&(proc->links));
SHMQueueElemInit(&(proc->links));
+ (proc->waitLock->waitProcs.size)--;
proc->errType = errType;
* released.
*/
int
-ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod, LOCK *lock)
+ProcLockWakeup(LOCKMETHOD lockmethod, LOCK *lock)
{
+ PROC_QUEUE *queue = &(lock->waitProcs);
PROC *proc;
- int count = 0;
- int last_locktype = 0;
+ int awoken = 0;
+ LOCKMODE last_lockmode = 0;
int queue_size = queue->size;
- Assert(queue->size >= 0);
+ Assert(queue_size >= 0);
- if (!queue->size)
+ if (!queue_size)
return STATUS_NOT_FOUND;
proc = (PROC *) MAKE_PTR(queue->links.prev);
- while ((queue_size--) && (proc))
- {
- /*
- * This proc will conflict as the previous one did, don't even
- * try.
- */
- if (proc->token == last_locktype)
- continue;
+ while (queue_size-- > 0)
+ {
+ if (proc->waitLockMode == last_lockmode)
+ {
+ /*
+ * This proc will conflict as the previous one did, don't even
+ * try.
+ */
+ goto nextProc;
+ }
/*
* Does this proc conflict with locks held by others ?
*/
if (LockResolveConflicts(lockmethod,
+ proc->waitLockMode,
lock,
- proc->token,
- proc->xid,
- (XIDLookupEnt *) NULL) != STATUS_OK)
+ proc->waitHolder,
+ proc,
+ NULL) != STATUS_OK)
{
- if (count != 0)
+ /* Yes. Quit if we already awoke at least one process. */
+ if (awoken != 0)
break;
- last_locktype = proc->token;
- continue;
+ /* Otherwise, see if any later waiters can be awoken. */
+ last_lockmode = proc->waitLockMode;
+ goto nextProc;
}
/*
- * there was a waiting process, grant it the lock before waking it
- * up. This will prevent another process from seizing the lock
- * between the time we release the lock master (spinlock) and the
- * time that the awoken process begins executing again.
+ * OK to wake up this sleeping process.
*/
- GrantLock(lock, proc->token);
+ GrantLock(lock, proc->waitHolder, proc->waitLockMode);
+ proc = ProcWakeup(proc, NO_ERROR);
+ awoken++;
/*
- * ProcWakeup removes proc from the lock waiting process queue and
- * returns the next proc in chain.
+ * ProcWakeup removes proc from the lock's waiting process queue
+ * and returns the next proc in chain; don't use prev link.
*/
+ continue;
- count++;
- queue->size--;
- proc = ProcWakeup(proc, NO_ERROR);
+nextProc:
+ proc = (PROC *) MAKE_PTR(proc->links.prev);
}
Assert(queue->size >= 0);
- if (count)
+ if (awoken)
return STATUS_OK;
else
{
#ifdef LOCK_DEBUG
if (lock->tag.lockmethod == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks)
{
- elog(DEBUG, "ProcLockWakeup: lock(%lx) can't wake up any process", MAKE_OFFSET(lock));
+ elog(DEBUG, "ProcLockWakeup: lock(%lx) can't wake up any process",
+ MAKE_OFFSET(lock));
if (Debug_deadlocks)
- DumpAllLocks();
+ DumpAllLocks();
}
#endif
return STATUS_NOT_FOUND;
*/
mywaitlock = MyProc->waitLock;
Assert(mywaitlock->waitProcs.size > 0);
- lockWaiting = false;
--mywaitlock->waitProcs.size;
SHMQueueDelete(&(MyProc->links));
SHMQueueElemInit(&(MyProc->links));
+ MyProc->waitLock = NULL;
+ MyProc->waitHolder = NULL;
+ lockWaiting = false;
/* ------------------
* Unlock my semaphore so that the interrupted ProcSleep() call can finish.