*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/storage/ipc/shmqueue.c,v 1.13 2000/01/26 05:56:58 momjian Exp $
+ * $Header: /cvsroot/pgsql/src/backend/storage/ipc/shmqueue.c,v 1.14 2001/01/22 22:30:06 tgl Exp $
*
* NOTES
*
* Package for managing doubly-linked lists in shared memory.
* The only tricky thing is that SHM_QUEUE will usually be a field
- * in a larger record. SHMQueueGetFirst has to return a pointer
+ * in a larger record. SHMQueueNext has to return a pointer
* to the record itself instead of a pointer to the SHMQueue field
- * of the record. It takes an extra pointer and does some extra
+ * of the record. It takes an extra parameter and does some extra
* pointer arithmetic to do this correctly.
*
* NOTE: These are set up so they can be turned into macros some day.
*
*-------------------------------------------------------------------------
*/
-
#include "postgres.h"
+
#include "storage/shmem.h"
/*#define SHMQUEUE_DEBUG*/
#ifdef SHMQUEUE_DEBUG
-#define SHMQUEUE_DEBUG_DEL /* deletions */
-#define SHMQUEUE_DEBUG_HD /* head inserts */
-#define SHMQUEUE_DEBUG_TL /* tail inserts */
+
#define SHMQUEUE_DEBUG_ELOG NOTICE
-#endif /* SHMQUEUE_DEBUG */
+
+static void dumpQ(SHM_QUEUE *q, char *s);
+
+#endif
+
/*
* ShmemQueueInit -- make the head of a new queue point
Assert(SHM_PTR_VALID(nextElem));
Assert(SHM_PTR_VALID(prevElem));
-#ifdef SHMQUEUE_DEBUG_DEL
+#ifdef SHMQUEUE_DEBUG
dumpQ(queue, "in SHMQueueDelete: begin");
-#endif /* SHMQUEUE_DEBUG_DEL */
+#endif
prevElem->next = (queue)->next;
nextElem->prev = (queue)->prev;
-#ifdef SHMQUEUE_DEBUG_DEL
- dumpQ((SHM_QUEUE *) MAKE_PTR(queue->prev), "in SHMQueueDelete: end");
-#endif /* SHMQUEUE_DEBUG_DEL */
-}
-
-#ifdef SHMQUEUE_DEBUG
-void
-dumpQ(SHM_QUEUE *q, char *s)
-{
- char elem[NAMEDATALEN];
- char buf[1024];
- SHM_QUEUE *start = q;
- int count = 0;
-
- sprintf(buf, "q prevs: %x", MAKE_OFFSET(q));
- q = (SHM_QUEUE *) MAKE_PTR(q->prev);
- while (q != start)
- {
- sprintf(elem, "--->%x", MAKE_OFFSET(q));
- strcat(buf, elem);
- q = (SHM_QUEUE *) MAKE_PTR(q->prev);
- if (q->prev == MAKE_OFFSET(q))
- break;
- if (count++ > 40)
- {
- strcat(buf, "BAD PREV QUEUE!!");
- break;
- }
- }
- sprintf(elem, "--->%x", MAKE_OFFSET(q));
- strcat(buf, elem);
- elog(SHMQUEUE_DEBUG_ELOG, "%s: %s", s, buf);
-
- sprintf(buf, "q nexts: %x", MAKE_OFFSET(q));
- count = 0;
- q = (SHM_QUEUE *) MAKE_PTR(q->next);
- while (q != start)
- {
- sprintf(elem, "--->%x", MAKE_OFFSET(q));
- strcat(buf, elem);
- q = (SHM_QUEUE *) MAKE_PTR(q->next);
- if (q->next == MAKE_OFFSET(q))
- break;
- if (count++ > 10)
- {
- strcat(buf, "BAD NEXT QUEUE!!");
- break;
- }
- }
- sprintf(elem, "--->%x", MAKE_OFFSET(q));
- strcat(buf, elem);
- elog(SHMQUEUE_DEBUG_ELOG, "%s: %s", s, buf);
+ (queue)->prev = (queue)->next = INVALID_OFFSET;
}
-#endif /* SHMQUEUE_DEBUG */
-
/*
- * SHMQueueInsertHD -- put elem in queue between the queue head
- * and its "prev" element.
+ * SHMQueueInsertBefore -- put elem in queue before the given queue
+ * element. Inserting "before" the queue head puts the elem
+ * at the tail of the queue.
*/
-#ifdef NOT_USED
void
-SHMQueueInsertHD(SHM_QUEUE *queue, SHM_QUEUE *elem)
+SHMQueueInsertBefore(SHM_QUEUE *queue, SHM_QUEUE *elem)
{
SHM_QUEUE *prevPtr = (SHM_QUEUE *) MAKE_PTR((queue)->prev);
SHMEM_OFFSET elemOffset = MAKE_OFFSET(elem);
Assert(SHM_PTR_VALID(queue));
Assert(SHM_PTR_VALID(elem));
-#ifdef SHMQUEUE_DEBUG_HD
- dumpQ(queue, "in SHMQueueInsertHD: begin");
-#endif /* SHMQUEUE_DEBUG_HD */
+#ifdef SHMQUEUE_DEBUG
+ dumpQ(queue, "in SHMQueueInsertBefore: begin");
+#endif
(elem)->next = prevPtr->next;
(elem)->prev = queue->prev;
(queue)->prev = elemOffset;
prevPtr->next = elemOffset;
-#ifdef SHMQUEUE_DEBUG_HD
- dumpQ(queue, "in SHMQueueInsertHD: end");
-#endif /* SHMQUEUE_DEBUG_HD */
-}
-
+#ifdef SHMQUEUE_DEBUG
+ dumpQ(queue, "in SHMQueueInsertBefore: end");
#endif
+}
+/*
+ * SHMQueueInsertAfter -- put elem in queue after the given queue
+ * element. Inserting "after" the queue head puts the elem
+ * at the head of the queue.
+ */
+#ifdef NOT_USED
void
-SHMQueueInsertTL(SHM_QUEUE *queue, SHM_QUEUE *elem)
+SHMQueueInsertAfter(SHM_QUEUE *queue, SHM_QUEUE *elem)
{
SHM_QUEUE *nextPtr = (SHM_QUEUE *) MAKE_PTR((queue)->next);
SHMEM_OFFSET elemOffset = MAKE_OFFSET(elem);
Assert(SHM_PTR_VALID(queue));
Assert(SHM_PTR_VALID(elem));
-#ifdef SHMQUEUE_DEBUG_TL
- dumpQ(queue, "in SHMQueueInsertTL: begin");
-#endif /* SHMQUEUE_DEBUG_TL */
+#ifdef SHMQUEUE_DEBUG
+ dumpQ(queue, "in SHMQueueInsertAfter: begin");
+#endif
(elem)->prev = nextPtr->prev;
(elem)->next = queue->next;
(queue)->next = elemOffset;
nextPtr->prev = elemOffset;
-#ifdef SHMQUEUE_DEBUG_TL
- dumpQ(queue, "in SHMQueueInsertTL: end");
-#endif /* SHMQUEUE_DEBUG_TL */
+#ifdef SHMQUEUE_DEBUG
+ dumpQ(queue, "in SHMQueueInsertAfter: end");
+#endif
}
+#endif /* NOT_USED */
-/*
- * SHMQueueFirst -- Get the first element from a queue
+/*--------------------
+ * SHMQueueNext -- Get the next element from a queue
*
- * First element is queue->next. If SHMQueue is part of
+ * To start the iteration, pass the queue head as both queue and curElem.
+ * Returns NULL if no more elements.
+ *
+ * Next element is at curElem->next. If SHMQueue is part of
* a larger structure, we want to return a pointer to the
* whole structure rather than a pointer to its SHMQueue field.
* I.E. struct {
* int stuff;
* SHMQueue elem;
* } ELEMType;
- * when this element is in a queue (queue->next) is struct.elem.
- * nextQueue allows us to calculate the offset of the SHMQueue
- * field in the structure.
- *
- * call to SHMQueueFirst should take these parameters:
+ * When this element is in a queue, (curElem->next) is struct.elem.
+ * We subtract linkOffset to get the correct start address of the structure.
*
- * &(queueHead),&firstElem,&(firstElem->next)
+ * calls to SHMQueueNext should take these parameters:
*
- * Note that firstElem may well be uninitialized. if firstElem
- * is initially K, &(firstElem->next) will be K+ the offset to
- * next.
+ * &(queueHead), &(queueHead), offsetof(ELEMType, elem)
+ * or
+ * &(queueHead), &(curElem->elem), offsetof(ELEMType, elem)
+ *--------------------
*/
-void
-SHMQueueFirst(SHM_QUEUE *queue, Pointer *nextPtrPtr, SHM_QUEUE *nextQueue)
+Pointer
+SHMQueueNext(SHM_QUEUE *queue, SHM_QUEUE *curElem, Size linkOffset)
{
- SHM_QUEUE *elemPtr = (SHM_QUEUE *) MAKE_PTR((queue)->next);
+ SHM_QUEUE *elemPtr = (SHM_QUEUE *) MAKE_PTR((curElem)->next);
- Assert(SHM_PTR_VALID(queue));
- *nextPtrPtr = (Pointer) (((unsigned long) *nextPtrPtr) +
- ((unsigned long) elemPtr) - ((unsigned long) nextQueue));
-
- /*
- * nextPtrPtr a ptr to a structure linked in the queue nextQueue is
- * the SHMQueue field of the structure nextPtrPtr - nextQueue is 0
- * minus the offset of the queue field n the record elemPtr +
- * (*nextPtrPtr - nexQueue) is the start of the structure containing
- * elemPtr.
- */
+ Assert(SHM_PTR_VALID(curElem));
+
+ if (elemPtr == queue) /* back to the queue head? */
+ return NULL;
+
+ return (Pointer) (((char *) elemPtr) - linkOffset);
}
/*
}
return FALSE;
}
+
+#ifdef SHMQUEUE_DEBUG
+
+static void
+dumpQ(SHM_QUEUE *q, char *s)
+{
+ char elem[NAMEDATALEN];
+ char buf[1024];
+ SHM_QUEUE *start = q;
+ int count = 0;
+
+ sprintf(buf, "q prevs: %lx", MAKE_OFFSET(q));
+ q = (SHM_QUEUE *) MAKE_PTR(q->prev);
+ while (q != start)
+ {
+ sprintf(elem, "--->%lx", MAKE_OFFSET(q));
+ strcat(buf, elem);
+ q = (SHM_QUEUE *) MAKE_PTR(q->prev);
+ if (q->prev == MAKE_OFFSET(q))
+ break;
+ if (count++ > 40)
+ {
+ strcat(buf, "BAD PREV QUEUE!!");
+ break;
+ }
+ }
+ sprintf(elem, "--->%lx", MAKE_OFFSET(q));
+ strcat(buf, elem);
+ elog(SHMQUEUE_DEBUG_ELOG, "%s: %s", s, buf);
+
+ sprintf(buf, "q nexts: %lx", MAKE_OFFSET(q));
+ count = 0;
+ q = (SHM_QUEUE *) MAKE_PTR(q->next);
+ while (q != start)
+ {
+ sprintf(elem, "--->%lx", MAKE_OFFSET(q));
+ strcat(buf, elem);
+ q = (SHM_QUEUE *) MAKE_PTR(q->next);
+ if (q->next == MAKE_OFFSET(q))
+ break;
+ if (count++ > 10)
+ {
+ strcat(buf, "BAD NEXT QUEUE!!");
+ break;
+ }
+ }
+ sprintf(elem, "--->%lx", MAKE_OFFSET(q));
+ strcat(buf, elem);
+ elog(SHMQUEUE_DEBUG_ELOG, "%s: %s", s, buf);
+}
+
+#endif /* SHMQUEUE_DEBUG */
-$Header: /cvsroot/pgsql/src/backend/storage/lmgr/README,v 1.5 2001/01/16 06:11:34 tgl Exp $
+$Header: /cvsroot/pgsql/src/backend/storage/lmgr/README,v 1.6 2001/01/22 22:30:06 tgl Exp $
There are two fundamental lock structures: the per-lockable-object LOCK
struct, and the per-lock-holder HOLDER struct. A LOCK object exists
---------------------------------------------------------------------------
-The lock manager's LOCK:
+The lock manager's LOCK objects contain:
tag -
The key fields that are used for hashing locks in the shared memory
tag.dbId -
Uniquely identifies the database in which the relation lives. If
- this is a shared system relation (e.g. pg_user) the dbId should be
- set to 0.
+ this is a shared system relation (e.g. pg_database) the dbId must
+ be set to 0.
- tag.tupleId -
+ tag.objId -
Uniquely identifies the block/page within the relation and the
tuple within the block. If we are setting a table level lock
both the blockId and tupleId (in an item pointer this is called
This bitmask shows the types of locks being waited for. Bit i of waitMask
is 1 if and only if requested[i] > granted[i].
+lockHolders -
+ This is a shared memory queue of all the HOLDER structs associated with
+ the lock object. Note that both granted and waiting HOLDERs are in this
+ list (indeed, the same HOLDER might have some already-granted locks and
+ be waiting for more!).
+
waitProcs -
This is a shared memory queue of all process structures corresponding to
a backend that is waiting (sleeping) until another backend releases this
---------------------------------------------------------------------------
-The lock manager's HOLDER:
+The lock manager's HOLDER objects contain:
tag -
The key fields that are used for hashing entries in the shared memory
tag.lock
SHMEM offset of the LOCK object this holder is for.
- tag.pid
- PID of backend process that owns this holder.
+ tag.proc
+ SHMEM offset of PROC of backend process that owns this holder.
tag.xid
XID of transaction this holder is for, or InvalidTransactionId
nHolding -
Sum of the holding[] array.
-queue -
+lockLink -
+ List link for shared memory queue of all the HOLDER objects for the
+ same LOCK.
+
+procLink -
List link for shared memory queue of all the HOLDER objects for the
same backend.
+
+---------------------------------------------------------------------------
+
+The deadlock detection algorithm:
+
+Since we allow user transactions to request locks in any order, deadlock
+is possible. We use a deadlock detection/breaking algorithm that is
+fairly standard in essence, but there are many special considerations
+needed to deal with Postgres' generalized locking model.
+
+A key design consideration is that we want to make routine operations
+(lock grant and release) run quickly when there is no deadlock, and avoid
+the overhead of deadlock handling as much as possible. We do this using
+an "optimistic waiting" approach: if a process cannot acquire the lock
+it wants immediately, it goes to sleep without any deadlock check. But
+it also sets a delay timer, with a delay of DeadlockTimeout milliseconds
+(typically set to one second). If the delay expires before the process is
+granted the lock it wants, it runs the deadlock detection/breaking code.
+Normally this code will determine that there is no deadlock condition,
+and then the process will go back to sleep and wait quietly until it is
+granted the lock. But if a deadlock condition does exist, it will be
+resolved, usually by aborting the detecting process' transaction. In this
+way, we avoid deadlock handling overhead whenever the wait time for a lock
+is less than DeadlockTimeout, while not imposing an unreasonable delay of
+detection when there actually is a deadlock.
+
+Lock acquisition (routines LockAcquire and ProcSleep) follows these rules:
+
+1. A lock request is granted immediately if it does not conflict with any
+existing or waiting lock request, or if the process already holds an
+instance of the same lock type (eg, there's no penalty to acquire a read
+lock twice). Note that a process never conflicts with itself, eg one can
+obtain read lock when one already holds exclusive lock.
+
+2. Otherwise the process joins the lock's wait queue. Normally it will be
+added to the end of the queue, but there is an exception: if the process
+already holds locks on this same lockable object that conflict with the
+request of any pending waiter, then the process will be inserted in the
+wait queue just ahead of the first such waiter. (If we did not make this
+check, the deadlock detection code would adjust the queue order to resolve
+the conflict, but it's relatively cheap to make the check in ProcSleep and
+avoid a deadlock timeout delay in this case.) Note special case: if the
+process holds locks that conflict with the first waiter, so that it would
+go at the front of the queue, and its request does not conflict with the
+already-granted locks, then the process will be granted the lock without
+going to sleep at all.
+
+When a lock is released, the lock release routine (ProcLockWakeup) scans
+the lock object's wait queue. Each waiter is awoken if (a) its request
+does not conflict with already-granted locks, and (b) its request does
+not conflict with the requests of prior un-wakable waiters. Rule (b)
+ensures that conflicting requests are granted in order of arrival.
+There are cases where a later waiter must be allowed to go in front of
+conflicting earlier waiters to avoid deadlock, but it is not
+ProcLockWakeup's responsibility to recognize these cases; instead, the
+deadlock detection code re-orders the wait queue when necessary.
+
+To perform deadlock checking, we use the standard method of viewing the
+various processes as nodes in a directed graph (the waits-for graph or
+WFG). There is a graph edge leading from process A to process B if A
+waits for B, ie, A is waiting for some lock and B holds a conflicting
+lock. There is a deadlock condition if and only if the WFG contains
+a cycle. We detect cycles by searching outward along waits-for edges
+to see if we return to our starting point. There are three possible
+outcomes:
+
+1. All outgoing paths terminate at a running process (which has no
+outgoing edge).
+
+2. A deadlock is detected by looping back to the start point. We resolve
+such a deadlock by canceling the start point's lock request and reporting
+an error in that transaction, which normally leads to transaction abort
+and release of that transaction's held locks. Note that it's sufficient
+to cancel one request to remove the cycle; we don't need to kill all the
+transactions involved.
+
+3. Some path(s) loop back to a node other than the start point. This
+indicates a deadlock, but one that does not involve our starting process.
+We ignore this condition on the grounds that resolving such a deadlock
+is the responsibility of the processes involved --- killing our start-
+point process would not resolve the deadlock. So, cases 1 and 3 both
+report "no deadlock".
+
+Postgres' situation is a little more complex than the standard discussion
+of deadlock detection, for two reasons:
+
+1. A process can be waiting for more than one other process, since there
+might be multiple holders of (nonconflicting) lock types that all conflict
+with the waiter's request. This creates no real difficulty however; we
+simply need to be prepared to trace more than one outgoing edge.
+
+2. If a process A is behind a process B in some lock's wait queue, and
+their requested locks conflict, then we must say that A waits for B, since
+ProcLockWakeup will never awaken A before B. This creates additional
+edges in the WFG. We call these "soft" edges, as opposed to the "hard"
+edges induced by locks already held. Note that if B already holds any
+locks conflicting with A's request, then their relationship is a hard edge
+not a soft edge.
+
+A "soft" block, or wait-priority block, has the same potential for
+inducing deadlock as a hard block. However, we may be able to resolve
+a soft block without aborting the transactions involved: we can instead
+rearrange the order of the wait queue. This rearrangement reverses the
+direction of the soft edge between two processes with conflicting requests
+whose queue order is reversed. If we can find a rearrangement that
+eliminates a cycle without creating new ones, then we can avoid an abort.
+Checking for such possible rearrangements is the trickiest part of the
+algorithm.
+
+The workhorse of the deadlock detector is a routine FindLockCycle() which
+is given a starting point process (which must be a waiting process).
+It recursively scans outwards across waits-for edges as discussed above.
+If it finds no cycle involving the start point, it returns "false".
+(As discussed above, we can ignore cycles not involving the start point.)
+When a cycle involving the start point is found, FindLockCycle() returns
+"true", and as it unwinds it also builds a list of any "soft" edges
+involved in the cycle.
+If the resulting list is empty then there is a hard deadlock and the
+configuration cannot succeed. However, if the list is not empty, then
+reversing any one of the listed edges through wait-queue rearrangement
+will eliminate that cycle. Since such a reversal might create cycles
+elsewhere, we may need to try every possibility. Therefore, we need to
+be able to invoke FindLockCycle() on hypothetical configurations (wait
+orders) as well as the current real order.
+
+The easiest way to handle this seems to be to have a lookaside table that
+shows the proposed new queue order for each wait queue that we are
+considering rearranging. This table is passed to FindLockCycle, and it
+believes the given queue order rather than the "real" order for each lock
+that has an entry in the lookaside table.
+
+We build a proposed new queue order by doing a "topological sort" of the
+existing entries. Each soft edge that we are currently considering
+reversing is a property of the partial order that the topological sort
+has to enforce. We must use a sort method that preserves the input
+ordering as much as possible, so as not to gratuitously break arrival
+order for processes not involved in a deadlock. (This is not true of the
+tsort method shown in Knuth, for example, but it's easily done by a simple
+doubly-nested-loop method that emits the first legal candidate at each
+step. Fortunately, we don't need a highly efficient sort algorithm, since
+the number of partial order constraints is not likely to be large.) Note
+that failure of the topological sort tells us we have conflicting ordering
+constraints, and therefore that the last-added soft edge reversal
+conflicts with a prior edge reversal. We need to detect this case to
+avoid an infinite loop in the case where no possible rearrangement will
+work: otherwise, we might try a reversal, find that it still leads to
+a cycle, then try to un-reverse the reversal while trying to get rid of
+that cycle, etc etc. Topological sort failure tells us the un-reversal
+is not a legitimate move in this context.
+
+So, the basic step in our rearrangement method is to take a list of
+soft edges in a cycle (as returned by FindLockCycle()) and successively
+try the reversal of each one as a topological-sort constraint added to
+whatever constraints we are already considering. We recursively search
+through all such sets of constraints to see if any one eliminates all
+the deadlock cycles at once. Although this might seem impossibly
+inefficient, it shouldn't be a big problem in practice, because there
+will normally be very few, and not very large, deadlock cycles --- if
+any at all. So the combinatorial inefficiency isn't going to hurt us.
+Besides, it's better to spend some time to guarantee that we've checked
+all possible escape routes than to abort a transaction when we didn't
+really have to.
+
+Each edge reversal constraint can be viewed as requesting that the waiting
+process A be moved to before the blocking process B in the wait queue they
+are both in. This action will reverse the desired soft edge, as well as
+any other soft edges between A and other processes it is advanced over.
+No other edges will be affected (note this is actually a constraint on our
+topological sort method to not re-order the queue more than necessary).
+Therefore, we can be sure we have not created any new deadlock cycles if
+neither FindLockCycle(A) nor FindLockCycle(B) discovers any cycle. Given
+the above-defined behavior of FindLockCycle, each of these searches is
+necessary as well as sufficient, since FindLockCycle starting at the
+original start point will not complain about cycles that include A or B
+but not the original start point.
+
+In short then, a proposed rearrangement of the wait queue(s) is determined
+by one or more broken soft edges A->B, fully specified by the output of
+topological sorts of each wait queue involved, and then tested by invoking
+FindLockCycle() starting at the original start point as well as each of
+the mentioned processes (A's and B's). If none of the tests detect a
+cycle, then we have a valid configuration and can implement it by
+reordering the wait queues per the sort outputs (and then applying
+ProcLockWakeup on each reordered queue, in case a waiter has become wakable).
+If any test detects a soft cycle, we can try to resolve it by adding each
+soft link in that cycle, in turn, to the proposed rearrangement list.
+This is repeated recursively until we either find a workable rearrangement
+or determine that none exists. In the latter case, the outer level
+resolves the deadlock by aborting the original start-point transaction.
+
+The particular order in which rearrangements are tried depends on the
+order FindLockCycle() happens to scan in, so if there are multiple
+workable rearrangements of the wait queues, then it is unspecified which
+one will be chosen. What's more important is that we guarantee to try
+every queue rearrangement that could lead to success. (For example,
+if we have A before B before C and the needed order constraints are
+C before A and B before C, we would first discover that A before C
+doesn't work and try the rearrangement C before A before B. This would
+eventually lead to the discovery of the additional constraint B before C.)
+
+Got that?
+
+Miscellaneous notes:
+
+1. It is easily proven that no deadlock will be missed due to our
+asynchronous invocation of deadlock checking. A deadlock cycle in the WFG
+is formed when the last edge in the cycle is added; therefore the last
+process in the cycle to wait (the one from which that edge is outgoing) is
+certain to detect and resolve the cycle when it later runs HandleDeadLock.
+This holds even if that edge addition created multiple cycles; the process
+may indeed abort without ever noticing those additional cycles, but we
+don't particularly care. The only other possible creation of deadlocks is
+during deadlock resolution's rearrangement of wait queues, and we already
+saw that that algorithm will prove that it creates no new deadlocks before
+it attempts to actually execute any rearrangement.
+
+2. It is not certain that a deadlock will be resolved by aborting the
+last-to-wait process. If earlier waiters in the cycle have not yet run
+HandleDeadLock, then the first one to do so will be the victim.
+
+3. No live (wakable) process can be missed by ProcLockWakeup, since it
+examines every member of the wait queue (this was not true in the 7.0
+implementation, BTW). Therefore, if ProcLockWakeup is always invoked
+after a lock is released or a wait queue is rearranged, there can be no
+failure to wake a wakable process. One should also note that
+LockWaitCancel (abort a waiter due to outside factors) must run
+ProcLockWakeup, in case the cancelled waiter was soft-blocking other
+waiters.
+
+4. We can minimize excess rearrangement-trial work by being careful to scan
+the wait queue from the front when looking for soft edges. For example,
+if we have queue order A,B,C and C has deadlock conflicts with both A and B,
+we want to generate the "C before A" constraint first, rather than wasting
+time with "C before B", which won't move C far enough up. So we look for
+soft edges outgoing from C starting at the front of the wait queue.
+
+5. The working data structures needed by the deadlock detection code can
+be proven not to need more than MAXBACKENDS entries. Therefore the
+working storage can be statically allocated instead of depending on
+palloc(). This is a good thing, since if the deadlock detector could
+fail for extraneous reasons, all the above safety proofs fall down.
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.78 2001/01/16 06:11:34 tgl Exp $
+ * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.79 2001/01/22 22:30:06 tgl Exp $
*
* NOTES
* Outside modules can create a lock table and acquire/release
|| (Trace_lock_table && (((LOCK *)MAKE_PTR(holderP->tag.lock))->tag.relId == Trace_lock_table))
)
elog(DEBUG,
- "%s: holder(%lx) lock(%lx) tbl(%d) pid(%d) xid(%u) hold(%d,%d,%d,%d,%d,%d,%d)=%d",
+ "%s: holder(%lx) lock(%lx) tbl(%d) proc(%lx) xid(%u) hold(%d,%d,%d,%d,%d,%d,%d)=%d",
where, MAKE_OFFSET(holderP), holderP->tag.lock,
HOLDER_LOCKMETHOD(*(holderP)),
- holderP->tag.pid, holderP->tag.xid,
+ holderP->tag.proc, holderP->tag.xid,
holderP->holding[1], holderP->holding[2], holderP->holding[3],
holderP->holding[4], holderP->holding[5], holderP->holding[6],
holderP->holding[7], holderP->nHolding);
* tag.objId block id lock id2
* or xact id
* tag.offnum 0 lock id1
- * xid.pid backend pid backend pid
- * xid.xid xid or 0 0
+ * holder.xid xid or 0 0
* persistence transaction user or backend
* or backend
*
{
lock->grantMask = 0;
lock->waitMask = 0;
+ SHMQueueInit(&(lock->lockHolders));
+ ProcQueueInit(&(lock->waitProcs));
lock->nRequested = 0;
lock->nGranted = 0;
MemSet((char *) lock->requested, 0, sizeof(int) * MAX_LOCKMODES);
MemSet((char *) lock->granted, 0, sizeof(int) * MAX_LOCKMODES);
- ProcQueueInit(&(lock->waitProcs));
LOCK_PRINT("LockAcquire: new", lock, lockmode);
}
else
*/
MemSet(&holdertag, 0, sizeof(HOLDERTAG)); /* must clear padding, needed */
holdertag.lock = MAKE_OFFSET(lock);
- holdertag.pid = MyProcPid;
+ holdertag.proc = MAKE_OFFSET(MyProc);
TransactionIdStore(xid, &holdertag.xid);
/*
{
holder->nHolding = 0;
MemSet((char *) holder->holding, 0, sizeof(int) * MAX_LOCKMODES);
- ProcAddLock(&holder->queue);
+ /* Add holder to appropriate lists */
+ SHMQueueInsertBefore(&lock->lockHolders, &holder->lockLink);
+ SHMQueueInsertBefore(&MyProc->procHolders, &holder->procLink);
HOLDER_PRINT("LockAcquire: new", holder);
}
else
{
if (holder->nHolding == 0)
{
- SHMQueueDelete(&holder->queue);
+ SHMQueueDelete(&holder->lockLink);
+ SHMQueueDelete(&holder->procLink);
holder = (HOLDER *) hash_search(holderTable,
(Pointer) holder,
HASH_REMOVE, &found);
static void
LockCountMyLocks(SHMEM_OFFSET lockOffset, PROC *proc, int *myHolding)
{
- HOLDER *holder = NULL;
- HOLDER *nextHolder = NULL;
- SHM_QUEUE *holderQueue = &(proc->holderQueue);
- SHMEM_OFFSET end = MAKE_OFFSET(holderQueue);
+ SHM_QUEUE *procHolders = &(proc->procHolders);
+ HOLDER *holder;
int i;
MemSet(myHolding, 0, MAX_LOCKMODES * sizeof(int));
- if (SHMQueueEmpty(holderQueue))
- return;
-
- SHMQueueFirst(holderQueue, (Pointer *) &holder, &holder->queue);
+ holder = (HOLDER *) SHMQueueNext(procHolders, procHolders,
+ offsetof(HOLDER, procLink));
- do
+ while (holder)
{
- /* ---------------------------
- * XXX Here we assume the shared memory queue is circular and
- * that we know its internal structure. Should have some sort of
- * macros to allow one to walk it. mer 20 July 1991
- * ---------------------------
- */
- if (holder->queue.next == end)
- nextHolder = NULL;
- else
- SHMQueueFirst(&holder->queue,
- (Pointer *) &nextHolder, &nextHolder->queue);
-
if (lockOffset == holder->tag.lock)
{
for (i = 1; i < MAX_LOCKMODES; i++)
}
}
- holder = nextHolder;
- } while (holder);
+ holder = (HOLDER *) SHMQueueNext(procHolders, &holder->procLink,
+ offsetof(HOLDER, procLink));
+ }
}
/*
*/
MemSet(&holdertag, 0, sizeof(HOLDERTAG)); /* must clear padding, needed */
holdertag.lock = MAKE_OFFSET(lock);
- holdertag.pid = MyProcPid;
+ holdertag.proc = MAKE_OFFSET(MyProc);
TransactionIdStore(xid, &holdertag.xid);
holderTable = lockMethodTable->holderHash;
Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
Assert(lock->nGranted <= lock->nRequested);
- if (!lock->nRequested)
+ if (lock->nRequested == 0)
{
/* ------------------
* if there's no one waiting in the queue,
* If this was my last hold on this lock, delete my entry in the holder
* table.
*/
- if (!holder->nHolding)
+ if (holder->nHolding == 0)
{
- if (holder->queue.prev == INVALID_OFFSET)
- elog(NOTICE, "LockRelease: holder.prev == INVALID_OFFSET");
- if (holder->queue.next == INVALID_OFFSET)
- elog(NOTICE, "LockRelease: holder.next == INVALID_OFFSET");
- if (holder->queue.next != INVALID_OFFSET)
- SHMQueueDelete(&holder->queue);
HOLDER_PRINT("LockRelease: deleting", holder);
+ SHMQueueDelete(&holder->lockLink);
+ SHMQueueDelete(&holder->procLink);
holder = (HOLDER *) hash_search(holderTable, (Pointer) &holder,
HASH_REMOVE_SAVED, &found);
if (!holder || !found)
}
/*
- * LockReleaseAll -- Release all locks in a process's lock queue.
+ * LockReleaseAll -- Release all locks in a process's lock list.
*
* Well, not really *all* locks.
*
LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc,
bool allxids, TransactionId xid)
{
- HOLDER *holder = NULL;
- HOLDER *nextHolder = NULL;
- SHM_QUEUE *holderQueue = &(proc->holderQueue);
- SHMEM_OFFSET end = MAKE_OFFSET(holderQueue);
+ SHM_QUEUE *procHolders = &(proc->procHolders);
+ HOLDER *holder;
+ HOLDER *nextHolder;
SPINLOCK masterLock;
LOCKMETHODTABLE *lockMethodTable;
int i,
numLockModes;
LOCK *lock;
bool found;
- int nleft;
#ifdef LOCK_DEBUG
if (lockmethod == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks)
elog(DEBUG, "LockReleaseAll: lockmethod=%d, pid=%d",
- lockmethod, MyProcPid);
+ lockmethod, proc->pid);
#endif
Assert(lockmethod < NumLockMethods);
return FALSE;
}
- if (SHMQueueEmpty(holderQueue))
- return TRUE;
-
numLockModes = lockMethodTable->ctl->numLockModes;
masterLock = lockMethodTable->ctl->masterLock;
SpinAcquire(masterLock);
- SHMQueueFirst(holderQueue, (Pointer *) &holder, &holder->queue);
-
- nleft = 0;
+ holder = (HOLDER *) SHMQueueNext(procHolders, procHolders,
+ offsetof(HOLDER, procLink));
- do
+ while (holder)
{
bool wakeupNeeded = false;
- /* ---------------------------
- * XXX Here we assume the shared memory queue is circular and
- * that we know its internal structure. Should have some sort of
- * macros to allow one to walk it. mer 20 July 1991
- * ---------------------------
- */
- if (holder->queue.next == end)
- nextHolder = NULL;
- else
- SHMQueueFirst(&holder->queue,
- (Pointer *) &nextHolder, &nextHolder->queue);
+ /* Get link first, since we may unlink/delete this holder */
+ nextHolder = (HOLDER *) SHMQueueNext(procHolders, &holder->procLink,
+ offsetof(HOLDER, procLink));
- Assert(holder->tag.pid == proc->pid);
+ Assert(holder->tag.proc == MAKE_OFFSET(proc));
lock = (LOCK *) MAKE_PTR(holder->tag.lock);
/* Ignore items that are not of the lockmethod to be removed */
if (LOCK_LOCKMETHOD(*lock) != lockmethod)
- {
- nleft++;
goto next_item;
- }
/* If not allxids, ignore items that are of the wrong xid */
if (!allxids && xid != holder->tag.xid)
- {
- nleft++;
goto next_item;
- }
HOLDER_PRINT("LockReleaseAll", holder);
LOCK_PRINT("LockReleaseAll", lock, 0);
HOLDER_PRINT("LockReleaseAll: deleting", holder);
/*
- * Remove the holder entry from the process' lock queue
+ * Remove the holder entry from the linked lists
*/
- SHMQueueDelete(&holder->queue);
+ SHMQueueDelete(&holder->lockLink);
+ SHMQueueDelete(&holder->procLink);
/*
* remove the holder entry from the hashtable
next_item:
holder = nextHolder;
- } while (holder);
-
- /*
- * Reinitialize the queue only if nothing has been left in.
- */
- if (nleft == 0)
- {
-#ifdef LOCK_DEBUG
- if (lockmethod == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks)
- elog(DEBUG, "LockReleaseAll: reinitializing holderQueue");
-#endif
- SHMQueueInit(holderQueue);
}
SpinRelease(masterLock);
bool
DeadLockCheck(PROC *thisProc, LOCK *findlock)
{
- HOLDER *holder = NULL;
- HOLDER *nextHolder = NULL;
PROC *waitProc;
PROC_QUEUE *waitQueue;
- SHM_QUEUE *holderQueue = &(thisProc->holderQueue);
- SHMEM_OFFSET end = MAKE_OFFSET(holderQueue);
+ SHM_QUEUE *procHolders = &(thisProc->procHolders);
+ HOLDER *holder;
+ HOLDER *nextHolder;
LOCKMETHODCTL *lockctl = LockMethodTable[DEFAULT_LOCKMETHOD]->ctl;
LOCK *lock;
int i,
/*
* Scan over all the locks held/awaited by thisProc.
*/
- if (SHMQueueEmpty(holderQueue))
- return false;
-
- SHMQueueFirst(holderQueue, (Pointer *) &holder, &holder->queue);
+ holder = (HOLDER *) SHMQueueNext(procHolders, procHolders,
+ offsetof(HOLDER, procLink));
- do
+ while (holder)
{
- /* ---------------------------
- * XXX Here we assume the shared memory queue is circular and
- * that we know its internal structure. Should have some sort of
- * macros to allow one to walk it. mer 20 July 1991
- * ---------------------------
- */
- if (holder->queue.next == end)
- nextHolder = NULL;
- else
- SHMQueueFirst(&holder->queue,
- (Pointer *) &nextHolder, &nextHolder->queue);
+ /* Get link first, since we may unlink/delete this holder */
+ nextHolder = (HOLDER *) SHMQueueNext(procHolders, &holder->procLink,
+ offsetof(HOLDER, procLink));
- Assert(holder->tag.pid == thisProc->pid);
+ Assert(holder->tag.proc == MAKE_OFFSET(thisProc));
lock = (LOCK *) MAKE_PTR(holder->tag.lock);
LOCK_PRINT("DeadLockCheck", lock, 0);
/*
- * waitLock is always in holderQueue of waiting proc, if !first_run
+ * waitLock is always in procHolders of waiting proc, if !first_run
* then upper caller will handle waitProcs queue of waitLock.
*/
if (thisProc->waitLock == lock && !first_run)
}
/*
- * Else - get the next lock from thisProc's holderQueue
+ * Else - get the next lock from thisProc's procHolders
*/
goto nxtl;
}
waitQueue = &(lock->waitProcs);
- waitProc = (PROC *) MAKE_PTR(waitQueue->links.prev);
+ waitProc = (PROC *) MAKE_PTR(waitQueue->links.next);
/*
* Inner loop scans over all processes waiting for this lock.
/* and he blocked by me -> deadlock */
if (lockctl->conflictTab[waitProc->waitLockMode] & MyProc->heldLocks)
return true;
- /* we shouldn't look at holderQueue of our blockers */
+ /* we shouldn't look at procHolders of our blockers */
goto nextWaitProc;
}
* implicitly). Note that we don't do like test if
* !first_run (when thisProc is holder and non-waiter on
* lock) and so we call DeadLockCheck below for every
- * waitProc in thisProc->holderQueue, even for waitProc-s
+ * waitProc in thisProc->procHolders, even for waitProc-s
* un-blocked by thisProc. Should we? This could save us
* some time...
*/
goto nextWaitProc;
}
- /* Recursively check this process's holderQueue. */
+ /* Recursively check this process's procHolders. */
Assert(nprocs < MAXBACKENDS);
checked_procs[nprocs++] = waitProc;
}
nextWaitProc:
- waitProc = (PROC *) MAKE_PTR(waitProc->links.prev);
+ waitProc = (PROC *) MAKE_PTR(waitProc->links.next);
}
nxtl:
holder = nextHolder;
- } while (holder);
+ }
/* if we got here, no deadlock */
return false;
#ifdef LOCK_DEBUG
/*
- * Dump all locks in the proc->holderQueue. Must have already acquired
- * the masterLock.
+ * Dump all locks in the proc->procHolders list.
+ *
+ * Must have already acquired the masterLock.
*/
void
DumpLocks(void)
{
SHMEM_OFFSET location;
PROC *proc;
- SHM_QUEUE *holderQueue;
- HOLDER *holder = NULL;
- HOLDER *nextHolder = NULL;
- SHMEM_OFFSET end;
+ SHM_QUEUE *procHolders;
+ HOLDER *holder;
LOCK *lock;
int lockmethod = DEFAULT_LOCKMETHOD;
LOCKMETHODTABLE *lockMethodTable;
proc = (PROC *) MAKE_PTR(location);
if (proc != MyProc)
return;
- holderQueue = &proc->holderQueue;
- end = MAKE_OFFSET(holderQueue);
+ procHolders = &proc->procHolders;
Assert(lockmethod < NumLockMethods);
lockMethodTable = LockMethodTable[lockmethod];
if (proc->waitLock)
LOCK_PRINT("DumpLocks: waiting on", proc->waitLock, 0);
- if (SHMQueueEmpty(holderQueue))
- return;
-
- SHMQueueFirst(holderQueue, (Pointer *) &holder, &holder->queue);
+ holder = (HOLDER *) SHMQueueNext(procHolders, procHolders,
+ offsetof(HOLDER, procLink));
- do
+ while (holder)
{
- /* ---------------------------
- * XXX Here we assume the shared memory queue is circular and
- * that we know its internal structure. Should have some sort of
- * macros to allow one to walk it. mer 20 July 1991
- * ---------------------------
- */
- if (holder->queue.next == end)
- nextHolder = NULL;
- else
- SHMQueueFirst(&holder->queue,
- (Pointer *) &nextHolder, &nextHolder->queue);
-
- Assert(holder->tag.pid == proc->pid);
+ Assert(holder->tag.proc == MAKE_OFFSET(proc));
lock = (LOCK *) MAKE_PTR(holder->tag.lock);
HOLDER_PRINT("DumpLocks", holder);
LOCK_PRINT("DumpLocks", lock, 0);
- holder = nextHolder;
- } while (holder);
+ holder = (HOLDER *) SHMQueueNext(procHolders, &holder->procLink,
+ offsetof(HOLDER, procLink));
+ }
}
/*
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.94 2001/01/16 20:59:34 tgl Exp $
+ * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.95 2001/01/22 22:30:06 tgl Exp $
*
*-------------------------------------------------------------------------
*/
* This is so that we can support more backends. (system-wide semaphore
* sets run out pretty fast.) -ay 4/95
*
- * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.94 2001/01/16 20:59:34 tgl Exp $
+ * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.95 2001/01/22 22:30:06 tgl Exp $
*/
#include "postgres.h"
SpinRelease(ProcStructLock);
elog(FATAL, "cannot create new proc: out of memory");
}
-
- /* this cannot be initialized until after the buffer pool */
- SHMQueueInit(&(MyProc->holderQueue));
}
/*
MyProc->sem.semNum = -1;
}
+ SHMQueueElemInit(&(MyProc->links));
+ MyProc->errType = NO_ERROR;
MyProc->pid = MyProcPid;
MyProc->databaseId = MyDatabaseId;
MyProc->xid = InvalidTransactionId;
MyProc->xmin = InvalidTransactionId;
+ MyProc->waitLock = NULL;
+ MyProc->waitHolder = NULL;
+ SHMQueueInit(&(MyProc->procHolders));
/* ----------------------
* Release the lock.
(location != MAKE_OFFSET(MyProc)))
elog(STOP, "InitProcess: ShmemPID table broken");
- MyProc->errType = NO_ERROR;
- SHMQueueElemInit(&(MyProc->links));
-
on_shmem_exit(ProcKill, 0);
}
waitLock->waitMask &= ~(1 << lockmode);
/* Clean up the proc's own state */
- SHMQueueElemInit(&(proc->links));
proc->waitLock = NULL;
proc->waitHolder = NULL;
ProcFreeSem(proc->sem.semId, proc->sem.semNum);
+ /* Add PROC struct to freelist so space can be recycled in future */
proc->links.next = ProcGlobal->freeProcs;
ProcGlobal->freeProcs = MAKE_OFFSET(proc);
bigtime_t time_interval;
#endif
- MyProc->waitLock = lock;
- MyProc->waitHolder = holder;
- MyProc->waitLockMode = lockmode;
- /* We assume the caller set up MyProc->heldLocks */
-
- proc = (PROC *) MAKE_PTR(waitQueue->links.prev);
+ proc = (PROC *) MAKE_PTR(waitQueue->links.next);
/* if we don't conflict with any waiter - be first in queue */
if (!(lockctl->conflictTab[lockmode] & waitMask))
{
/* Yes, report deadlock failure */
MyProc->errType = STATUS_ERROR;
- goto rt;
+ return STATUS_ERROR;
}
/* I must go after him in queue - so continue loop */
}
(aheadGranted[procWaitMode])++;
if (aheadGranted[procWaitMode] == lock->requested[procWaitMode])
waitMask &= ~(1 << procWaitMode);
- proc = (PROC *) MAKE_PTR(proc->links.prev);
+ proc = (PROC *) MAKE_PTR(proc->links.next);
}
ins:;
/* -------------------
- * Insert self into queue, ahead of the given proc.
- * These operations are atomic (because of the spinlock).
+ * Insert self into queue, ahead of the given proc (or at tail of queue).
* -------------------
*/
- SHMQueueInsertTL(&(proc->links), &(MyProc->links));
+ SHMQueueInsertBefore(&(proc->links), &(MyProc->links));
waitQueue->size++;
lock->waitMask |= myMask;
+ /* Set up wait information in PROC object, too */
+ MyProc->waitLock = lock;
+ MyProc->waitHolder = holder;
+ MyProc->waitLockMode = lockmode;
+ /* We assume the caller set up MyProc->heldLocks */
+
MyProc->errType = NO_ERROR; /* initialize result for success */
/* mark that we are waiting for a lock */
*/
SpinAcquire(spinlock);
-rt:;
-
- MyProc->waitLock = NULL;
- MyProc->waitHolder = NULL;
-
+ /*
+ * We don't have to do anything else, because the process that woke us
+ * has already made all necessary updates to the lock table and MyProc.
+ */
return MyProc->errType;
}
/* assume that spinlock has been acquired */
+ /* Proc should be sleeping ... */
if (proc->links.prev == INVALID_OFFSET ||
proc->links.next == INVALID_OFFSET)
return (PROC *) NULL;
- retProc = (PROC *) MAKE_PTR(proc->links.prev);
+ /* Save next process before we zap the list link */
+ retProc = (PROC *) MAKE_PTR(proc->links.next);
+ /* Remove process from wait queue */
SHMQueueDelete(&(proc->links));
- SHMQueueElemInit(&(proc->links));
(proc->waitLock->waitProcs.size)--;
+ /* Clean up process' state and pass it the ok/fail signal */
+ proc->waitLock = NULL;
+ proc->waitHolder = NULL;
proc->errType = errType;
+ /* And awaken it */
IpcSemaphoreUnlock(proc->sem.semId, proc->sem.semNum);
return retProc;
if (!queue_size)
return STATUS_NOT_FOUND;
- proc = (PROC *) MAKE_PTR(queue->links.prev);
+ proc = (PROC *) MAKE_PTR(queue->links.next);
while (queue_size-- > 0)
{
/*
* ProcWakeup removes proc from the lock's waiting process queue
- * and returns the next proc in chain; don't use prev link.
+ * and returns the next proc in chain; don't use proc's next-link,
+ * because it's been cleared.
*/
continue;
nextProc:
- proc = (PROC *) MAKE_PTR(proc->links.prev);
+ proc = (PROC *) MAKE_PTR(proc->links.next);
}
Assert(queue->size >= 0);
}
}
-void
-ProcAddLock(SHM_QUEUE *elem)
-{
- SHMQueueInsertTL(&MyProc->holderQueue, elem);
-}
-
/* --------------------
* We only get to this routine if we got SIGALRM after DeadlockTimeout
* while waiting for a lock to be released by some other process. Look
* Portions Copyright (c) 1996-2000, PostgreSQL, Inc
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $Id: lock.h,v 1.41 2001/01/16 06:11:34 tgl Exp $
+ * $Id: lock.h,v 1.42 2001/01/22 22:30:06 tgl Exp $
*
*-------------------------------------------------------------------------
*/
/* originally in procq.h */
typedef struct PROC_QUEUE
{
- SHM_QUEUE links;
- int size;
+ SHM_QUEUE links; /* head of list of PROC objects */
+ int size; /* number of entries in list */
} PROC_QUEUE;
/* struct proc is declared in storage/proc.h, but must forward-reference it */
typedef int LOCKMODE;
typedef int LOCKMETHOD;
-/* MAX_LOCKMODES cannot be larger than the bits in LOCKMASK */
+/* MAX_LOCKMODES cannot be larger than the # of bits in LOCKMASK */
#define MAX_LOCKMODES 8
/*
* tag -- uniquely identifies the object being locked
* grantMask -- bitmask for all lock types currently granted on this object.
* waitMask -- bitmask for all lock types currently awaited on this object.
+ * lockHolders -- list of HOLDER objects for this lock.
* waitProcs -- queue of processes waiting for this lock.
* requested -- count of each lock type currently requested on the lock
* (includes requests already granted!!).
/* data */
int grantMask; /* bitmask for lock types already granted */
int waitMask; /* bitmask for lock types awaited */
+ SHM_QUEUE lockHolders; /* list of HOLDER objects assoc. with lock */
PROC_QUEUE waitProcs; /* list of PROC objects waiting on lock */
int requested[MAX_LOCKMODES]; /* counts of requested locks */
int nRequested; /* total of requested[] array */
* holder hashtable. A HOLDERTAG value uniquely identifies a lock holder.
*
* There are two possible kinds of holder tags: a transaction (identified
- * both by the PID of the backend running it, and the xact's own ID) and
- * a session (identified by backend PID, with xid = InvalidTransactionId).
+ * both by the PROC of the backend running it, and the xact's own ID) and
+ * a session (identified by backend PROC, with xid = InvalidTransactionId).
*
* Currently, session holders are used for user locks and for cross-xact
* locks obtained for VACUUM. We assume that a session lock never conflicts
* zero holding[], for any lock that the process is currently waiting on.
* Otherwise, holder objects whose counts have gone to zero are recycled
* as soon as convenient.
+ *
+ * Each HOLDER object is linked into lists for both the associated LOCK object
+ * and the owning PROC object. Note that the HOLDER is entered into these
+ * lists as soon as it is created, even if no lock has yet been granted.
+ * A PROC that is waiting for a lock to be granted will also be linked into
+ * the lock's waitProcs queue.
*/
typedef struct HOLDERTAG
{
SHMEM_OFFSET lock; /* link to per-lockable-object information */
- int pid; /* PID of backend */
+ SHMEM_OFFSET proc; /* link to PROC of owning backend */
TransactionId xid; /* xact ID, or InvalidTransactionId */
} HOLDERTAG;
/* data */
int holding[MAX_LOCKMODES]; /* count of locks currently held */
int nHolding; /* total of holding[] array */
- SHM_QUEUE queue; /* list link for process' list of holders */
+ SHM_QUEUE lockLink; /* list link for lock's list of holders */
+ SHM_QUEUE procLink; /* list link for process's list of holders */
} HOLDER;
#define SHMEM_HOLDERTAB_KEYSIZE sizeof(HOLDERTAG)
* Portions Copyright (c) 1996-2000, PostgreSQL, Inc
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $Id: proc.h,v 1.36 2001/01/16 20:59:34 tgl Exp $
+ * $Id: proc.h,v 1.37 2001/01/22 22:30:06 tgl Exp $
*
*-------------------------------------------------------------------------
*/
} SEMA;
/*
- * Each backend has a PROC struct in shared memory. There is also a list
- * of currently-unused PROC structs that will be reallocated to new backends
- * (a fairly pointless optimization, but it's there anyway).
+ * Each backend has a PROC struct in shared memory. There is also a list of
+ * currently-unused PROC structs that will be reallocated to new backends.
*
* links: list link for any list the PROC is in. When waiting for a lock,
* the PROC is linked into that lock's waitProcs queue. A recycled PROC
*/
struct proc
{
- /* proc->links MUST BE THE FIRST ELEMENT OF STRUCT (see ProcWakeup()) */
+ /* proc->links MUST BE FIRST IN STRUCT (see ProcSleep, ProcWakeup, etc.) */
SHM_QUEUE links; /* list link if process is in a list */
XLogRecPtr logRec;
- /* Info about lock the process is currently waiting for, if any */
+ /* Info about lock the process is currently waiting for, if any. */
+ /* waitLock and waitHolder are NULL if not currently waiting. */
LOCK *waitLock; /* Lock object we're sleeping on ... */
HOLDER *waitHolder; /* Per-holder info for awaited lock */
LOCKMODE waitLockMode; /* type of lock we're waiting for */
Oid databaseId; /* OID of database this backend is using */
short sLocks[MAX_SPINS]; /* Spin lock stats */
- SHM_QUEUE holderQueue; /* list of HOLDER objects for locks held or
+ SHM_QUEUE procHolders; /* list of HOLDER objects for locks held or
* awaited by this backend */
};
LOCK *lock, HOLDER *holder);
extern PROC *ProcWakeup(PROC *proc, int errType);
extern int ProcLockWakeup(LOCKMETHOD lockmethod, LOCK *lock);
-extern void ProcAddLock(SHM_QUEUE *elem);
extern void ProcReleaseSpins(PROC *proc);
extern bool LockWaitCancel(void);
extern void HandleDeadLock(SIGNAL_ARGS);
* Portions Copyright (c) 1996-2000, PostgreSQL, Inc
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $Id: shmem.h,v 1.24 2000/11/28 23:27:57 tgl Exp $
+ * $Id: shmem.h,v 1.25 2001/01/22 22:30:06 tgl Exp $
*
*-------------------------------------------------------------------------
*/
extern void SHMQueueInit(SHM_QUEUE *queue);
extern void SHMQueueElemInit(SHM_QUEUE *queue);
extern void SHMQueueDelete(SHM_QUEUE *queue);
-extern void SHMQueueInsertTL(SHM_QUEUE *queue, SHM_QUEUE *elem);
-extern void SHMQueueFirst(SHM_QUEUE *queue, Pointer *nextPtrPtr,
- SHM_QUEUE *nextQueue);
+extern void SHMQueueInsertBefore(SHM_QUEUE *queue, SHM_QUEUE *elem);
+extern Pointer SHMQueueNext(SHM_QUEUE *queue, SHM_QUEUE *curElem,
+ Size linkOffset);
extern bool SHMQueueEmpty(SHM_QUEUE *queue);
#endif /* SHMEM_H */