OSDN Git Service

Clean up lockmanager data structures some more, in preparation for planned
authorTom Lane <tgl@sss.pgh.pa.us>
Mon, 22 Jan 2001 22:30:06 +0000 (22:30 +0000)
committerTom Lane <tgl@sss.pgh.pa.us>
Mon, 22 Jan 2001 22:30:06 +0000 (22:30 +0000)
rewrite of deadlock checking.  Lock holder objects are now reachable from
the associated LOCK as well as from the owning PROC.  This makes it
practical to find all the processes holding a lock, as well as all those
waiting on the lock.  Also, clean up some of the grottier aspects of the
SHMQueue API, and cause the waitProcs list to be stored in the intuitive
direction instead of the nonintuitive one.  (Bet you didn't know that
the code followed the 'prev' link to get to the next waiting process,
instead of the 'next' link.  It doesn't do that anymore.)

src/backend/storage/ipc/shmqueue.c
src/backend/storage/lmgr/README
src/backend/storage/lmgr/lock.c
src/backend/storage/lmgr/proc.c
src/include/storage/lock.h
src/include/storage/proc.h
src/include/storage/shmem.h

index 2cdccd0..ae6950c 100644 (file)
@@ -8,32 +8,34 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/storage/ipc/shmqueue.c,v 1.13 2000/01/26 05:56:58 momjian Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/storage/ipc/shmqueue.c,v 1.14 2001/01/22 22:30:06 tgl Exp $
  *
  * NOTES
  *
  * Package for managing doubly-linked lists in shared memory.
  * The only tricky thing is that SHM_QUEUE will usually be a field
- * in a larger record. SHMQueueGetFirst has to return a pointer
+ * in a larger record. SHMQueueNext has to return a pointer
  * to the record itself instead of a pointer to the SHMQueue field
- * of the record.  It takes an extra pointer and does some extra
+ * of the record.  It takes an extra parameter and does some extra
  * pointer arithmetic to do this correctly.
  *
  * NOTE: These are set up so they can be turned into macros some day.
  *
  *-------------------------------------------------------------------------
  */
-
 #include "postgres.h"
+
 #include "storage/shmem.h"
 
 /*#define SHMQUEUE_DEBUG*/
 #ifdef SHMQUEUE_DEBUG
-#define SHMQUEUE_DEBUG_DEL             /* deletions */
-#define SHMQUEUE_DEBUG_HD              /* head inserts */
-#define SHMQUEUE_DEBUG_TL              /* tail inserts */
+
 #define SHMQUEUE_DEBUG_ELOG NOTICE
-#endif  /* SHMQUEUE_DEBUG */
+
+static void dumpQ(SHM_QUEUE *q, char *s);
+
+#endif
+
 
 /*
  * ShmemQueueInit -- make the head of a new queue point
@@ -84,76 +86,23 @@ SHMQueueDelete(SHM_QUEUE *queue)
        Assert(SHM_PTR_VALID(nextElem));
        Assert(SHM_PTR_VALID(prevElem));
 
-#ifdef SHMQUEUE_DEBUG_DEL
+#ifdef SHMQUEUE_DEBUG
        dumpQ(queue, "in SHMQueueDelete: begin");
-#endif  /* SHMQUEUE_DEBUG_DEL */
+#endif
 
        prevElem->next = (queue)->next;
        nextElem->prev = (queue)->prev;
 
-#ifdef SHMQUEUE_DEBUG_DEL
-       dumpQ((SHM_QUEUE *) MAKE_PTR(queue->prev), "in SHMQueueDelete: end");
-#endif  /* SHMQUEUE_DEBUG_DEL */
-}
-
-#ifdef SHMQUEUE_DEBUG
-void
-dumpQ(SHM_QUEUE *q, char *s)
-{
-       char            elem[NAMEDATALEN];
-       char            buf[1024];
-       SHM_QUEUE  *start = q;
-       int                     count = 0;
-
-       sprintf(buf, "q prevs: %x", MAKE_OFFSET(q));
-       q = (SHM_QUEUE *) MAKE_PTR(q->prev);
-       while (q != start)
-       {
-               sprintf(elem, "--->%x", MAKE_OFFSET(q));
-               strcat(buf, elem);
-               q = (SHM_QUEUE *) MAKE_PTR(q->prev);
-               if (q->prev == MAKE_OFFSET(q))
-                       break;
-               if (count++ > 40)
-               {
-                       strcat(buf, "BAD PREV QUEUE!!");
-                       break;
-               }
-       }
-       sprintf(elem, "--->%x", MAKE_OFFSET(q));
-       strcat(buf, elem);
-       elog(SHMQUEUE_DEBUG_ELOG, "%s: %s", s, buf);
-
-       sprintf(buf, "q nexts: %x", MAKE_OFFSET(q));
-       count = 0;
-       q = (SHM_QUEUE *) MAKE_PTR(q->next);
-       while (q != start)
-       {
-               sprintf(elem, "--->%x", MAKE_OFFSET(q));
-               strcat(buf, elem);
-               q = (SHM_QUEUE *) MAKE_PTR(q->next);
-               if (q->next == MAKE_OFFSET(q))
-                       break;
-               if (count++ > 10)
-               {
-                       strcat(buf, "BAD NEXT QUEUE!!");
-                       break;
-               }
-       }
-       sprintf(elem, "--->%x", MAKE_OFFSET(q));
-       strcat(buf, elem);
-       elog(SHMQUEUE_DEBUG_ELOG, "%s: %s", s, buf);
+       (queue)->prev = (queue)->next = INVALID_OFFSET;
 }
 
-#endif  /* SHMQUEUE_DEBUG */
-
 /*
- * SHMQueueInsertHD -- put elem in queue between the queue head
- *             and its "prev" element.
+ * SHMQueueInsertBefore -- put elem in queue before the given queue
+ *             element.  Inserting "before" the queue head puts the elem
+ *             at the tail of the queue.
  */
-#ifdef NOT_USED
 void
-SHMQueueInsertHD(SHM_QUEUE *queue, SHM_QUEUE *elem)
+SHMQueueInsertBefore(SHM_QUEUE *queue, SHM_QUEUE *elem)
 {
        SHM_QUEUE  *prevPtr = (SHM_QUEUE *) MAKE_PTR((queue)->prev);
        SHMEM_OFFSET elemOffset = MAKE_OFFSET(elem);
@@ -161,24 +110,28 @@ SHMQueueInsertHD(SHM_QUEUE *queue, SHM_QUEUE *elem)
        Assert(SHM_PTR_VALID(queue));
        Assert(SHM_PTR_VALID(elem));
 
-#ifdef SHMQUEUE_DEBUG_HD
-       dumpQ(queue, "in SHMQueueInsertHD: begin");
-#endif  /* SHMQUEUE_DEBUG_HD */
+#ifdef SHMQUEUE_DEBUG
+       dumpQ(queue, "in SHMQueueInsertBefore: begin");
+#endif
 
        (elem)->next = prevPtr->next;
        (elem)->prev = queue->prev;
        (queue)->prev = elemOffset;
        prevPtr->next = elemOffset;
 
-#ifdef SHMQUEUE_DEBUG_HD
-       dumpQ(queue, "in SHMQueueInsertHD: end");
-#endif  /* SHMQUEUE_DEBUG_HD */
-}
-
+#ifdef SHMQUEUE_DEBUG
+       dumpQ(queue, "in SHMQueueInsertBefore: end");
 #endif
+}
 
+/*
+ * SHMQueueInsertAfter -- put elem in queue after the given queue
+ *             element.  Inserting "after" the queue head puts the elem
+ *             at the head of the queue.
+ */
+#ifdef NOT_USED
 void
-SHMQueueInsertTL(SHM_QUEUE *queue, SHM_QUEUE *elem)
+SHMQueueInsertAfter(SHM_QUEUE *queue, SHM_QUEUE *elem)
 {
        SHM_QUEUE  *nextPtr = (SHM_QUEUE *) MAKE_PTR((queue)->next);
        SHMEM_OFFSET elemOffset = MAKE_OFFSET(elem);
@@ -186,58 +139,55 @@ SHMQueueInsertTL(SHM_QUEUE *queue, SHM_QUEUE *elem)
        Assert(SHM_PTR_VALID(queue));
        Assert(SHM_PTR_VALID(elem));
 
-#ifdef SHMQUEUE_DEBUG_TL
-       dumpQ(queue, "in SHMQueueInsertTL: begin");
-#endif  /* SHMQUEUE_DEBUG_TL */
+#ifdef SHMQUEUE_DEBUG
+       dumpQ(queue, "in SHMQueueInsertAfter: begin");
+#endif
 
        (elem)->prev = nextPtr->prev;
        (elem)->next = queue->next;
        (queue)->next = elemOffset;
        nextPtr->prev = elemOffset;
 
-#ifdef SHMQUEUE_DEBUG_TL
-       dumpQ(queue, "in SHMQueueInsertTL: end");
-#endif  /* SHMQUEUE_DEBUG_TL */
+#ifdef SHMQUEUE_DEBUG
+       dumpQ(queue, "in SHMQueueInsertAfter: end");
+#endif
 }
+#endif /* NOT_USED */
 
-/*
- * SHMQueueFirst -- Get the first element from a queue
+/*--------------------
+ * SHMQueueNext -- Get the next element from a queue
  *
- * First element is queue->next.  If SHMQueue is part of
+ * To start the iteration, pass the queue head as both queue and curElem.
+ * Returns NULL if no more elements.
+ *
+ * Next element is at curElem->next.  If SHMQueue is part of
  * a larger structure, we want to return a pointer to the
  * whole structure rather than a pointer to its SHMQueue field.
  * I.E. struct {
  *             int                             stuff;
  *             SHMQueue                elem;
  * } ELEMType;
- * when this element is in a queue (queue->next) is struct.elem.
- * nextQueue allows us to calculate the offset of the SHMQueue
- * field in the structure.
- *
- * call to SHMQueueFirst should take these parameters:
+ * When this element is in a queue, (prevElem->next) is struct.elem.
+ * We subtract linkOffset to get the correct start address of the structure.
  *
- *      &(queueHead),&firstElem,&(firstElem->next)
+ * calls to SHMQueueNext should take these parameters:
  *
- * Note that firstElem may well be uninitialized.  if firstElem
- * is initially K, &(firstElem->next) will be K+ the offset to
- * next.
+ *      &(queueHead), &(queueHead), offsetof(ELEMType, elem)
+ * or
+ *      &(queueHead), &(curElem->elem), offsetof(ELEMType, elem)
+ *--------------------
  */
-void
-SHMQueueFirst(SHM_QUEUE *queue, Pointer *nextPtrPtr, SHM_QUEUE *nextQueue)
+Pointer
+SHMQueueNext(SHM_QUEUE *queue, SHM_QUEUE *curElem, Size linkOffset)
 {
-       SHM_QUEUE  *elemPtr = (SHM_QUEUE *) MAKE_PTR((queue)->next);
+       SHM_QUEUE  *elemPtr = (SHM_QUEUE *) MAKE_PTR((curElem)->next);
 
-       Assert(SHM_PTR_VALID(queue));
-       *nextPtrPtr = (Pointer) (((unsigned long) *nextPtrPtr) +
-                               ((unsigned long) elemPtr) - ((unsigned long) nextQueue));
-
-       /*
-        * nextPtrPtr a ptr to a structure linked in the queue nextQueue is
-        * the SHMQueue field of the structure nextPtrPtr - nextQueue is 0
-        * minus the offset of the queue field n the record elemPtr +
-        * (*nextPtrPtr - nexQueue) is the start of the structure containing
-        * elemPtr.
-        */
+       Assert(SHM_PTR_VALID(curElem));
+
+       if (elemPtr == queue)           /* back to the queue head? */
+               return NULL;
+
+       return (Pointer) (((char *) elemPtr) - linkOffset);
 }
 
 /*
@@ -255,3 +205,55 @@ SHMQueueEmpty(SHM_QUEUE *queue)
        }
        return FALSE;
 }
+
+#ifdef SHMQUEUE_DEBUG
+
+static void
+dumpQ(SHM_QUEUE *q, char *s)
+{
+       char            elem[NAMEDATALEN];
+       char            buf[1024];
+       SHM_QUEUE  *start = q;
+       int                     count = 0;
+
+       sprintf(buf, "q prevs: %lx", MAKE_OFFSET(q));
+       q = (SHM_QUEUE *) MAKE_PTR(q->prev);
+       while (q != start)
+       {
+               sprintf(elem, "--->%lx", MAKE_OFFSET(q));
+               strcat(buf, elem);
+               q = (SHM_QUEUE *) MAKE_PTR(q->prev);
+               if (q->prev == MAKE_OFFSET(q))
+                       break;
+               if (count++ > 40)
+               {
+                       strcat(buf, "BAD PREV QUEUE!!");
+                       break;
+               }
+       }
+       sprintf(elem, "--->%lx", MAKE_OFFSET(q));
+       strcat(buf, elem);
+       elog(SHMQUEUE_DEBUG_ELOG, "%s: %s", s, buf);
+
+       sprintf(buf, "q nexts: %lx", MAKE_OFFSET(q));
+       count = 0;
+       q = (SHM_QUEUE *) MAKE_PTR(q->next);
+       while (q != start)
+       {
+               sprintf(elem, "--->%lx", MAKE_OFFSET(q));
+               strcat(buf, elem);
+               q = (SHM_QUEUE *) MAKE_PTR(q->next);
+               if (q->next == MAKE_OFFSET(q))
+                       break;
+               if (count++ > 10)
+               {
+                       strcat(buf, "BAD NEXT QUEUE!!");
+                       break;
+               }
+       }
+       sprintf(elem, "--->%lx", MAKE_OFFSET(q));
+       strcat(buf, elem);
+       elog(SHMQUEUE_DEBUG_ELOG, "%s: %s", s, buf);
+}
+
+#endif  /* SHMQUEUE_DEBUG */
index 7d881ff..af9fbc8 100644 (file)
@@ -1,4 +1,4 @@
-$Header: /cvsroot/pgsql/src/backend/storage/lmgr/README,v 1.5 2001/01/16 06:11:34 tgl Exp $
+$Header: /cvsroot/pgsql/src/backend/storage/lmgr/README,v 1.6 2001/01/22 22:30:06 tgl Exp $
 
 There are two fundamental lock structures: the per-lockable-object LOCK
 struct, and the per-lock-holder HOLDER struct.  A LOCK object exists
@@ -15,7 +15,7 @@ details.
 
 ---------------------------------------------------------------------------
 
-The lock manager's LOCK:
+The lock manager's LOCK objects contain:
 
 tag -
     The key fields that are used for hashing locks in the shared memory
@@ -29,10 +29,10 @@ tag -
     
     tag.dbId -
        Uniquely identifies the database in which the relation lives.  If
-       this is a shared system relation (e.g. pg_user) the dbId should be
-       set to 0.
+       this is a shared system relation (e.g. pg_database) the dbId must
+       be set to 0.
 
-    tag.tupleId -
+    tag.objId -
        Uniquely identifies the block/page within the relation and the
        tuple within the block.  If we are setting a table level lock
        both the blockId and tupleId (in an item pointer this is called
@@ -56,6 +56,12 @@ waitMask -
     This bitmask shows the types of locks being waited for.  Bit i of waitMask
     is 1 if and only if requested[i] > granted[i].
 
+lockHolders -
+    This is a shared memory queue of all the HOLDER structs associated with
+    the lock object.  Note that both granted and waiting HOLDERs are in this
+    list (indeed, the same HOLDER might have some already-granted locks and
+    be waiting for more!).
+
 waitProcs -
     This is a shared memory queue of all process structures corresponding to
     a backend that is waiting (sleeping) until another backend releases this
@@ -93,7 +99,7 @@ zero, the lock object is no longer needed and can be freed.
 
 ---------------------------------------------------------------------------
 
-The lock manager's HOLDER:
+The lock manager's HOLDER objects contain:
 
 tag -
     The key fields that are used for hashing entries in the shared memory
@@ -103,8 +109,8 @@ tag -
     tag.lock
         SHMEM offset of the LOCK object this holder is for.
 
-    tag.pid
-        PID of backend process that owns this holder.
+    tag.proc
+        SHMEM offset of PROC of backend process that owns this holder.
 
     tag.xid
         XID of transaction this holder is for, or InvalidTransactionId
@@ -124,6 +130,250 @@ holding -
 nHolding -
     Sum of the holding[] array.
 
-queue -
+lockLink -
+    List link for shared memory queue of all the HOLDER objects for the
+    same LOCK.
+
+procLink -
     List link for shared memory queue of all the HOLDER objects for the
     same backend.
+
+---------------------------------------------------------------------------
+
+The deadlock detection algorithm:
+
+Since we allow user transactions to request locks in any order, deadlock
+is possible.  We use a deadlock detection/breaking algorithm that is
+fairly standard in essence, but there are many special considerations
+needed to deal with Postgres' generalized locking model.
+
+A key design consideration is that we want to make routine operations
+(lock grant and release) run quickly when there is no deadlock, and avoid
+the overhead of deadlock handling as much as possible.  We do this using
+an "optimistic waiting" approach: if a process cannot acquire the lock
+it wants immediately, it goes to sleep without any deadlock check.  But
+it also sets a delay timer, with a delay of DeadlockTimeout milliseconds
+(typically set to one second).  If the delay expires before the process is
+granted the lock it wants, it runs the deadlock detection/breaking code.
+Normally this code will determine that there is no deadlock condition,
+and then the process will go back to sleep and wait quietly until it is
+granted the lock.  But if a deadlock condition does exist, it will be
+resolved, usually by aborting the detecting process' transaction.  In this
+way, we avoid deadlock handling overhead whenever the wait time for a lock
+is less than DeadlockTimeout, while not imposing an unreasonable delay of
+detection when there is an error.
+
+Lock acquisition (routines LockAcquire and ProcSleep) follows these rules:
+
+1. A lock request is granted immediately if it does not conflict with any
+existing or waiting lock request, or if the process already holds an
+instance of the same lock type (eg, there's no penalty to acquire a read
+lock twice).  Note that a process never conflicts with itself, eg one can
+obtain read lock when one already holds exclusive lock.
+
+2. Otherwise the process joins the lock's wait queue.  Normally it will be
+added to the end of the queue, but there is an exception: if the process
+already holds locks on this same lockable object that conflict with the
+request of any pending waiter, then the process will be inserted in the
+wait queue just ahead of the first such waiter.  (If we did not make this
+check, the deadlock detection code would adjust the queue order to resolve
+the conflict, but it's relatively cheap to make the check in ProcSleep and
+avoid a deadlock timeout delay in this case.)  Note special case: if the
+process holds locks that conflict with the first waiter, so that it would
+go at the front of the queue, and its request does not conflict with the
+already-granted locks, then the process will be granted the lock without
+going to sleep at all.
+
+When a lock is released, the lock release routine (ProcLockWakeup) scans
+the lock object's wait queue.  Each waiter is awoken if (a) its request
+does not conflict with already-granted locks, and (b) its request does
+not conflict with the requests of prior un-wakable waiters.  Rule (b)
+ensures that conflicting requests are granted in order of arrival.
+There are cases where a later waiter must be allowed to go in front of
+conflicting earlier waiters to avoid deadlock, but it is not
+ProcLockWakeup's responsibility to recognize these cases; instead, the
+deadlock detection code re-orders the wait queue when necessary.
+
+To perform deadlock checking, we use the standard method of viewing the
+various processes as nodes in a directed graph (the waits-for graph or
+WFG).  There is a graph edge leading from process A to process B if A
+waits for B, ie, A is waiting for some lock and B holds a conflicting
+lock.  There is a deadlock condition if and only if the WFG contains
+a cycle.  We detect cycles by searching outward along waits-for edges
+to see if we return to our starting point.  There are three possible
+outcomes:
+
+1. All outgoing paths terminate at a running process (which has no
+outgoing edge).
+
+2. A deadlock is detected by looping back to the start point.  We resolve
+such a deadlock by canceling the start point's lock request and reporting
+an error in that transaction, which normally leads to transaction abort
+and release of that transaction's held locks.  Note that it's sufficient
+to cancel one request to remove the cycle; we don't need to kill all the
+transactions involved.
+
+3. Some path(s) loop back to a node other than the start point.  This
+indicates a deadlock, but one that does not involve our starting process.
+We ignore this condition on the grounds that resolving such a deadlock
+is the responsibility of the processes involved --- killing our start-
+point process would not resolve the deadlock.  So, cases 1 and 3 both
+report "no deadlock".
+
+Postgres' situation is a little more complex than the standard discussion
+of deadlock detection, for two reasons:
+
+1. A process can be waiting for more than one other process, since there
+might be multiple holders of (nonconflicting) lock types that all conflict
+with the waiter's request.  This creates no real difficulty however; we
+simply need to be prepared to trace more than one outgoing edge.
+
+2. If a process A is behind a process B in some lock's wait queue, and
+their requested locks conflict, then we must say that A waits for B, since
+ProcLockWakeup will never awaken A before B.  This creates additional
+edges in the WFG.  We call these "soft" edges, as opposed to the "hard"
+edges induced by locks already held.  Note that if B already holds any
+locks conflicting with A's request, then their relationship is a hard edge
+not a soft edge.
+
+A "soft" block, or wait-priority block, has the same potential for
+inducing deadlock as a hard block.  However, we may be able to resolve
+a soft block without aborting the transactions involved: we can instead
+rearrange the order of the wait queue.  This rearrangement reverses the
+direction of the soft edge between two processes with conflicting requests
+whose queue order is reversed.  If we can find a rearrangement that
+eliminates a cycle without creating new ones, then we can avoid an abort.
+Checking for such possible rearrangements is the trickiest part of the
+algorithm.
+
+The workhorse of the deadlock detector is a routine FindLockCycle() which
+is given a starting point process (which must be a waiting process).
+It recursively scans outwards across waits-for edges as discussed above.
+If it finds no cycle involving the start point, it returns "false".
+(As discussed above, we can ignore cycles not involving the start point.)
+When such a cycle is found, FindLockCycle() returns "true", and as it
+unwinds it also builds a list of any "soft" edges involved in the cycle.
+If the resulting list is empty then there is a hard deadlock and the
+configuration cannot succeed.  However, if the list is not empty, then
+reversing any one of the listed edges through wait-queue rearrangement
+will eliminate that cycle.  Since such a reversal might create cycles
+elsewhere, we may need to try every possibility.  Therefore, we need to
+be able to invoke FindLockCycle() on hypothetical configurations (wait
+orders) as well as the current real order.
+
+The easiest way to handle this seems to be to have a lookaside table that
+shows the proposed new queue order for each wait queue that we are
+considering rearranging.  This table is passed to FindLockCycle, and it
+believes the given queue order rather than the "real" order for each lock
+that has an entry in the lookaside table.
+
+We build a proposed new queue order by doing a "topological sort" of the
+existing entries.  Each soft edge that we are currently considering
+reversing is a property of the partial order that the topological sort
+has to enforce.  We must use a sort method that preserves the input
+ordering as much as possible, so as not to gratuituously break arrival
+order for processes not involved in a deadlock.  (This is not true of the
+tsort method shown in Knuth, for example, but it's easily done by a simple
+doubly-nested-loop method that emits the first legal candidate at each
+step.  Fortunately, we don't need a highly efficient sort algorithm, since
+the number of partial order constraints is not likely to be large.)  Note
+that failure of the topological sort tells us we have conflicting ordering
+constraints, and therefore that the last-added soft edge reversal
+conflicts with a prior edge reversal.  We need to detect this case to
+avoid an infinite loop in the case where no possible rearrangement will
+work: otherwise, we might try a reversal, find that it still leads to
+a cycle, then try to un-reverse the reversal while trying to get rid of
+that cycle, etc etc.  Topological sort failure tells us the un-reversal
+is not a legitimate move in this context.
+
+So, the basic step in our rearrangement method is to take a list of
+soft edges in a cycle (as returned by FindLockCycle()) and successively
+try the reversal of each one as a topological-sort constraint added to
+whatever constraints we are already considering.  We recursively search
+through all such sets of constraints to see if any one eliminates all
+the deadlock cycles at once.  Although this might seem impossibly
+inefficient, it shouldn't be a big problem in practice, because there
+will normally be very few, and not very large, deadlock cycles --- if
+any at all.  So the combinatorial inefficiency isn't going to hurt us.
+Besides, it's better to spend some time to guarantee that we've checked
+all possible escape routes than to abort a transaction when we didn't
+really have to.
+
+Each edge reversal constraint can be viewed as requesting that the waiting
+process A be moved to before the blocking process B in the wait queue they
+are both in.  This action will reverse the desired soft edge, as well as
+any other soft edges between A and other processes it is advanced over.
+No other edges will be affected (note this is actually a constraint on our
+topological sort method to not re-order the queue more than necessary.)
+Therefore, we can be sure we have not created any new deadlock cycles if
+neither FindLockCycle(A) nor FindLockCycle(B) discovers any cycle.  Given
+the above-defined behavior of FindLockCycle, each of these searches is
+necessary as well as sufficient, since FindLockCycle starting at the
+original start point will not complain about cycles that include A or B
+but not the original start point.
+
+In short then, a proposed rearrangement of the wait queue(s) is determined
+by one or more broken soft edges A->B, fully specified by the output of
+topological sorts of each wait queue involved, and then tested by invoking
+FindLockCycle() starting at the original start point as well as each of
+the mentioned processes (A's and B's).  If none of the tests detect a
+cycle, then we have a valid configuration and can implement it by
+reordering the wait queues per the sort outputs (and then applying
+ProcLockWakeup on each reordered queue, in case a waiter has become wakable).
+If any test detects a soft cycle, we can try to resolve it by adding each
+soft link in that cycle, in turn, to the proposed rearrangement list.
+This is repeated recursively until we either find a workable rearrangement
+or determine that none exists.  In the latter case, the outer level
+resolves the deadlock by aborting the original start-point transaction.
+
+The particular order in which rearrangements are tried depends on the
+order FindLockCycle() happens to scan in, so if there are multiple
+workable rearrangements of the wait queues, then it is unspecified which
+one will be chosen.  What's more important is that we guarantee to try
+every queue rearrangement that could lead to success.  (For example,
+if we have A before B before C and the needed order constraints are
+C before A and B before C, we would first discover that A before C
+doesn't work and try the rearrangement C before A before B.  This would
+eventually lead to the discovery of the additional constraint B before C.)
+
+Got that?
+
+Miscellaneous notes:
+
+1. It is easily proven that no deadlock will be missed due to our
+asynchronous invocation of deadlock checking.  A deadlock cycle in the WFG
+is formed when the last edge in the cycle is added; therefore the last
+process in the cycle to wait (the one from which that edge is outgoing) is
+certain to detect and resolve the cycle when it later runs HandleDeadLock.
+This holds even if that edge addition created multiple cycles; the process
+may indeed abort without ever noticing those additional cycles, but we
+don't particularly care.  The only other possible creation of deadlocks is
+during deadlock resolution's rearrangement of wait queues, and we already
+saw that that algorithm will prove that it creates no new deadlocks before
+it attempts to actually execute any rearrangement.
+
+2. It is not certain that a deadlock will be resolved by aborting the
+last-to-wait process.  If earlier waiters in the cycle have not yet run
+HandleDeadLock, then the first one to do so will be the victim.
+
+3. No live (wakable) process can be missed by ProcLockWakeup, since it
+examines every member of the wait queue (this was not true in the 7.0
+implementation, BTW).  Therefore, if ProcLockWakeup is always invoked
+after a lock is released or a wait queue is rearranged, there can be no
+failure to wake a wakable process.  One should also note that
+LockWaitCancel (abort a waiter due to outside factors) must run
+ProcLockWakeup, in case the cancelled waiter was soft-blocking other
+waiters.
+
+4. We can minimize excess rearrangement-trial work by being careful to scan
+the wait queue from the front when looking for soft edges.  For example,
+if we have queue order A,B,C and C has deadlock conflicts with both A and B,
+we want to generate the "C before A" constraint first, rather than wasting
+time with "C before B", which won't move C far enough up.  So we look for
+soft edges outgoing from C starting at the front of the wait queue.
+
+5. The working data structures needed by the deadlock detection code can
+be proven not to need more than MAXBACKENDS entries.  Therefore the
+working storage can be statically allocated instead of depending on
+palloc().  This is a good thing, since if the deadlock detector could
+fail for extraneous reasons, all the above safety proofs fall down.
index fa2b98c..35e960e 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.78 2001/01/16 06:11:34 tgl Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.79 2001/01/22 22:30:06 tgl Exp $
  *
  * NOTES
  *       Outside modules can create a lock table and acquire/release
@@ -127,10 +127,10 @@ HOLDER_PRINT(const char * where, const HOLDER * holderP)
                || (Trace_lock_table && (((LOCK *)MAKE_PTR(holderP->tag.lock))->tag.relId == Trace_lock_table))
         )
         elog(DEBUG,
-             "%s: holder(%lx) lock(%lx) tbl(%d) pid(%d) xid(%u) hold(%d,%d,%d,%d,%d,%d,%d)=%d",
+             "%s: holder(%lx) lock(%lx) tbl(%d) proc(%lx) xid(%u) hold(%d,%d,%d,%d,%d,%d,%d)=%d",
              where, MAKE_OFFSET(holderP), holderP->tag.lock,
                         HOLDER_LOCKMETHOD(*(holderP)),
-             holderP->tag.pid, holderP->tag.xid,
+             holderP->tag.proc, holderP->tag.xid,
              holderP->holding[1], holderP->holding[2], holderP->holding[3],
                         holderP->holding[4], holderP->holding[5], holderP->holding[6],
                         holderP->holding[7], holderP->nHolding);
@@ -455,8 +455,7 @@ LockMethodTableRename(LOCKMETHOD lockmethod)
  *             tag.objId                                               block id                lock id2
  *                                                                             or xact id
  *             tag.offnum                                              0                               lock id1
- *             xid.pid                                                 backend pid             backend pid
- *             xid.xid                                                 xid or 0                0
+ *             holder.xid                                              xid or 0                0
  *             persistence                                             transaction             user or backend
  *                                                                             or backend
  *
@@ -526,11 +525,12 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
        {
                lock->grantMask = 0;
                lock->waitMask = 0;
+               SHMQueueInit(&(lock->lockHolders));
+               ProcQueueInit(&(lock->waitProcs));
                lock->nRequested = 0;
                lock->nGranted = 0;
                MemSet((char *) lock->requested, 0, sizeof(int) * MAX_LOCKMODES);
                MemSet((char *) lock->granted, 0, sizeof(int) * MAX_LOCKMODES);
-               ProcQueueInit(&(lock->waitProcs));
                LOCK_PRINT("LockAcquire: new", lock, lockmode);
        }
        else
@@ -547,7 +547,7 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
         */
        MemSet(&holdertag, 0, sizeof(HOLDERTAG)); /* must clear padding, needed */
        holdertag.lock = MAKE_OFFSET(lock);
-       holdertag.pid = MyProcPid;
+       holdertag.proc = MAKE_OFFSET(MyProc);
        TransactionIdStore(xid, &holdertag.xid);
 
        /*
@@ -570,7 +570,9 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
        {
                holder->nHolding = 0;
                MemSet((char *) holder->holding, 0, sizeof(int) * MAX_LOCKMODES);
-               ProcAddLock(&holder->queue);
+               /* Add holder to appropriate lists */
+               SHMQueueInsertBefore(&lock->lockHolders, &holder->lockLink);
+               SHMQueueInsertBefore(&MyProc->procHolders, &holder->procLink);
                HOLDER_PRINT("LockAcquire: new", holder);
        }
        else
@@ -693,7 +695,8 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
                {
                        if (holder->nHolding == 0)
                        {
-                               SHMQueueDelete(&holder->queue);
+                               SHMQueueDelete(&holder->lockLink);
+                               SHMQueueDelete(&holder->procLink);
                                holder = (HOLDER *) hash_search(holderTable,
                                                                                                (Pointer) holder,
                                                                                                HASH_REMOVE, &found);
@@ -862,33 +865,17 @@ LockResolveConflicts(LOCKMETHOD lockmethod,
 static void
 LockCountMyLocks(SHMEM_OFFSET lockOffset, PROC *proc, int *myHolding)
 {
-       HOLDER     *holder = NULL;
-       HOLDER     *nextHolder = NULL;
-       SHM_QUEUE  *holderQueue = &(proc->holderQueue);
-       SHMEM_OFFSET end = MAKE_OFFSET(holderQueue);
+       SHM_QUEUE  *procHolders = &(proc->procHolders);
+       HOLDER     *holder;
        int                     i;
 
        MemSet(myHolding, 0, MAX_LOCKMODES * sizeof(int));
 
-       if (SHMQueueEmpty(holderQueue))
-               return;
-
-       SHMQueueFirst(holderQueue, (Pointer *) &holder, &holder->queue);
+       holder = (HOLDER *) SHMQueueNext(procHolders, procHolders,
+                                                                        offsetof(HOLDER, procLink));
 
-       do
+       while (holder)
        {
-               /* ---------------------------
-                * XXX Here we assume the shared memory queue is circular and
-                * that we know its internal structure.  Should have some sort of
-                * macros to allow one to walk it.      mer 20 July 1991
-                * ---------------------------
-                */
-               if (holder->queue.next == end)
-                       nextHolder = NULL;
-               else
-                       SHMQueueFirst(&holder->queue,
-                                                 (Pointer *) &nextHolder, &nextHolder->queue);
-
                if (lockOffset == holder->tag.lock)
                {
                        for (i = 1; i < MAX_LOCKMODES; i++)
@@ -897,8 +884,9 @@ LockCountMyLocks(SHMEM_OFFSET lockOffset, PROC *proc, int *myHolding)
                        }
                }
 
-               holder = nextHolder;
-       } while (holder);
+               holder = (HOLDER *) SHMQueueNext(procHolders, &holder->procLink,
+                                                                                offsetof(HOLDER, procLink));
+       }
 }
 
 /*
@@ -1080,7 +1068,7 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag,
         */
        MemSet(&holdertag, 0, sizeof(HOLDERTAG)); /* must clear padding, needed */
        holdertag.lock = MAKE_OFFSET(lock);
-       holdertag.pid = MyProcPid;
+       holdertag.proc = MAKE_OFFSET(MyProc);
        TransactionIdStore(xid, &holdertag.xid);
 
        holderTable = lockMethodTable->holderHash;
@@ -1160,7 +1148,7 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag,
        Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
        Assert(lock->nGranted <= lock->nRequested);
 
-       if (!lock->nRequested)
+       if (lock->nRequested == 0)
        {
                /* ------------------
                 * if there's no one waiting in the queue,
@@ -1189,15 +1177,11 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag,
         * If this was my last hold on this lock, delete my entry in the holder
         * table.
         */
-       if (!holder->nHolding)
+       if (holder->nHolding == 0)
        {
-               if (holder->queue.prev == INVALID_OFFSET)
-                       elog(NOTICE, "LockRelease: holder.prev == INVALID_OFFSET");
-               if (holder->queue.next == INVALID_OFFSET)
-                       elog(NOTICE, "LockRelease: holder.next == INVALID_OFFSET");
-               if (holder->queue.next != INVALID_OFFSET)
-                       SHMQueueDelete(&holder->queue);
                HOLDER_PRINT("LockRelease: deleting", holder);
+               SHMQueueDelete(&holder->lockLink);
+               SHMQueueDelete(&holder->procLink);
                holder = (HOLDER *) hash_search(holderTable, (Pointer) &holder,
                                                                                HASH_REMOVE_SAVED, &found);
                if (!holder || !found)
@@ -1220,7 +1204,7 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag,
 }
 
 /*
- * LockReleaseAll -- Release all locks in a process's lock queue.
+ * LockReleaseAll -- Release all locks in a process's lock list.
  *
  * Well, not really *all* locks.
  *
@@ -1234,22 +1218,20 @@ bool
 LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc,
                           bool allxids, TransactionId xid)
 {
-       HOLDER     *holder = NULL;
-       HOLDER     *nextHolder = NULL;
-       SHM_QUEUE  *holderQueue = &(proc->holderQueue);
-       SHMEM_OFFSET end = MAKE_OFFSET(holderQueue);
+       SHM_QUEUE  *procHolders = &(proc->procHolders);
+       HOLDER     *holder;
+       HOLDER     *nextHolder;
        SPINLOCK        masterLock;
        LOCKMETHODTABLE *lockMethodTable;
        int                     i,
                                numLockModes;
        LOCK       *lock;
        bool            found;
-       int                     nleft;
 
 #ifdef LOCK_DEBUG
        if (lockmethod == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks)
                elog(DEBUG, "LockReleaseAll: lockmethod=%d, pid=%d",
-                        lockmethod, MyProcPid);
+                        lockmethod, proc->pid);
 #endif
 
        Assert(lockmethod < NumLockMethods);
@@ -1260,51 +1242,33 @@ LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc,
                return FALSE;
        }
 
-       if (SHMQueueEmpty(holderQueue))
-               return TRUE;
-
        numLockModes = lockMethodTable->ctl->numLockModes;
        masterLock = lockMethodTable->ctl->masterLock;
 
        SpinAcquire(masterLock);
 
-       SHMQueueFirst(holderQueue, (Pointer *) &holder, &holder->queue);
-
-       nleft = 0;
+       holder = (HOLDER *) SHMQueueNext(procHolders, procHolders,
+                                                                        offsetof(HOLDER, procLink));
 
-       do
+       while (holder)
        {
                bool            wakeupNeeded = false;
 
-               /* ---------------------------
-                * XXX Here we assume the shared memory queue is circular and
-                * that we know its internal structure.  Should have some sort of
-                * macros to allow one to walk it.      mer 20 July 1991
-                * ---------------------------
-                */
-               if (holder->queue.next == end)
-                       nextHolder = NULL;
-               else
-                       SHMQueueFirst(&holder->queue,
-                                                 (Pointer *) &nextHolder, &nextHolder->queue);
+               /* Get link first, since we may unlink/delete this holder */
+               nextHolder = (HOLDER *) SHMQueueNext(procHolders, &holder->procLink,
+                                                                                        offsetof(HOLDER, procLink));
 
-               Assert(holder->tag.pid == proc->pid);
+               Assert(holder->tag.proc == MAKE_OFFSET(proc));
 
                lock = (LOCK *) MAKE_PTR(holder->tag.lock);
 
                /* Ignore items that are not of the lockmethod to be removed */
                if (LOCK_LOCKMETHOD(*lock) != lockmethod)
-               {
-                       nleft++;
                        goto next_item;
-               }
 
                /* If not allxids, ignore items that are of the wrong xid */
                if (!allxids && xid != holder->tag.xid)
-               {
-                       nleft++;
                        goto next_item;
-               }
 
                HOLDER_PRINT("LockReleaseAll", holder);
                LOCK_PRINT("LockReleaseAll", lock, 0);
@@ -1364,9 +1328,10 @@ LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc,
                HOLDER_PRINT("LockReleaseAll: deleting", holder);
 
                /*
-                * Remove the holder entry from the process' lock queue
+                * Remove the holder entry from the linked lists
                 */
-               SHMQueueDelete(&holder->queue);
+               SHMQueueDelete(&holder->lockLink);
+               SHMQueueDelete(&holder->procLink);
 
                /*
                 * remove the holder entry from the hashtable
@@ -1406,18 +1371,6 @@ LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc,
 
 next_item:
                holder = nextHolder;
-       } while (holder);
-
-       /*
-        * Reinitialize the queue only if nothing has been left in.
-        */
-       if (nleft == 0)
-       {
-#ifdef LOCK_DEBUG
-        if (lockmethod == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks)
-            elog(DEBUG, "LockReleaseAll: reinitializing holderQueue");
-#endif
-               SHMQueueInit(holderQueue);
        }
 
        SpinRelease(masterLock);
@@ -1476,12 +1429,11 @@ LockShmemSize(int maxBackends)
 bool
 DeadLockCheck(PROC *thisProc, LOCK *findlock)
 {
-       HOLDER     *holder = NULL;
-       HOLDER     *nextHolder = NULL;
        PROC       *waitProc;
        PROC_QUEUE *waitQueue;
-       SHM_QUEUE  *holderQueue = &(thisProc->holderQueue);
-       SHMEM_OFFSET end = MAKE_OFFSET(holderQueue);
+       SHM_QUEUE  *procHolders = &(thisProc->procHolders);
+       HOLDER     *holder;
+       HOLDER     *nextHolder;
        LOCKMETHODCTL *lockctl = LockMethodTable[DEFAULT_LOCKMETHOD]->ctl;
        LOCK       *lock;
        int                     i,
@@ -1501,26 +1453,16 @@ DeadLockCheck(PROC *thisProc, LOCK *findlock)
        /*
         * Scan over all the locks held/awaited by thisProc.
         */
-       if (SHMQueueEmpty(holderQueue))
-               return false;
-
-       SHMQueueFirst(holderQueue, (Pointer *) &holder, &holder->queue);
+       holder = (HOLDER *) SHMQueueNext(procHolders, procHolders,
+                                                                        offsetof(HOLDER, procLink));
 
-       do
+       while (holder)
        {
-               /* ---------------------------
-                * XXX Here we assume the shared memory queue is circular and
-                * that we know its internal structure.  Should have some sort of
-                * macros to allow one to walk it.      mer 20 July 1991
-                * ---------------------------
-                */
-               if (holder->queue.next == end)
-                       nextHolder = NULL;
-               else
-                       SHMQueueFirst(&holder->queue,
-                                                 (Pointer *) &nextHolder, &nextHolder->queue);
+               /* Get link first, since we may unlink/delete this holder */
+               nextHolder = (HOLDER *) SHMQueueNext(procHolders, &holder->procLink,
+                                                                                        offsetof(HOLDER, procLink));
 
-               Assert(holder->tag.pid == thisProc->pid);
+               Assert(holder->tag.proc == MAKE_OFFSET(thisProc));
 
                lock = (LOCK *) MAKE_PTR(holder->tag.lock);
 
@@ -1532,7 +1474,7 @@ DeadLockCheck(PROC *thisProc, LOCK *findlock)
                LOCK_PRINT("DeadLockCheck", lock, 0);
 
                /*
-                * waitLock is always in holderQueue of waiting proc, if !first_run
+                * waitLock is always in procHolders of waiting proc, if !first_run
                 * then upper caller will handle waitProcs queue of waitLock.
                 */
                if (thisProc->waitLock == lock && !first_run)
@@ -1555,13 +1497,13 @@ DeadLockCheck(PROC *thisProc, LOCK *findlock)
                        }
 
                        /*
-                        * Else - get the next lock from thisProc's holderQueue
+                        * Else - get the next lock from thisProc's procHolders
                         */
                        goto nxtl;
                }
 
                waitQueue = &(lock->waitProcs);
-               waitProc = (PROC *) MAKE_PTR(waitQueue->links.prev);
+               waitProc = (PROC *) MAKE_PTR(waitQueue->links.next);
 
                /*
                 * Inner loop scans over all processes waiting for this lock.
@@ -1589,7 +1531,7 @@ DeadLockCheck(PROC *thisProc, LOCK *findlock)
                                        /* and he blocked by me -> deadlock */
                                        if (lockctl->conflictTab[waitProc->waitLockMode] & MyProc->heldLocks)
                                                return true;
-                                       /* we shouldn't look at holderQueue of our blockers */
+                                       /* we shouldn't look at procHolders of our blockers */
                                        goto nextWaitProc;
                                }
 
@@ -1600,7 +1542,7 @@ DeadLockCheck(PROC *thisProc, LOCK *findlock)
                                 * implicitly). Note that we don't do like test if
                                 * !first_run (when thisProc is holder and non-waiter on
                                 * lock) and so we call DeadLockCheck below for every
-                                * waitProc in thisProc->holderQueue, even for waitProc-s
+                                * waitProc in thisProc->procHolders, even for waitProc-s
                                 * un-blocked by thisProc. Should we? This could save us
                                 * some time...
                                 */
@@ -1618,7 +1560,7 @@ DeadLockCheck(PROC *thisProc, LOCK *findlock)
                                        goto nextWaitProc;
                        }
 
-                       /* Recursively check this process's holderQueue. */
+                       /* Recursively check this process's procHolders. */
                        Assert(nprocs < MAXBACKENDS);
                        checked_procs[nprocs++] = waitProc;
 
@@ -1699,12 +1641,12 @@ DeadLockCheck(PROC *thisProc, LOCK *findlock)
                        }
 
 nextWaitProc:
-                       waitProc = (PROC *) MAKE_PTR(waitProc->links.prev);
+                       waitProc = (PROC *) MAKE_PTR(waitProc->links.next);
                }
 
 nxtl:
                holder = nextHolder;
-       } while (holder);
+       }
 
        /* if we got here, no deadlock */
        return false;
@@ -1712,18 +1654,17 @@ nxtl:
 
 #ifdef LOCK_DEBUG
 /*
- * Dump all locks in the proc->holderQueue. Must have already acquired
- * the masterLock.
+ * Dump all locks in the proc->procHolders list.
+ *
+ * Must have already acquired the masterLock.
  */
 void
 DumpLocks(void)
 {
        SHMEM_OFFSET location;
        PROC       *proc;
-       SHM_QUEUE  *holderQueue;
-       HOLDER     *holder = NULL;
-       HOLDER     *nextHolder = NULL;
-       SHMEM_OFFSET end;
+       SHM_QUEUE  *procHolders;
+       HOLDER     *holder;
        LOCK       *lock;
        int                     lockmethod = DEFAULT_LOCKMETHOD;
        LOCKMETHODTABLE *lockMethodTable;
@@ -1734,8 +1675,7 @@ DumpLocks(void)
        proc = (PROC *) MAKE_PTR(location);
        if (proc != MyProc)
                return;
-       holderQueue = &proc->holderQueue;
-       end = MAKE_OFFSET(holderQueue);
+       procHolders = &proc->procHolders;
 
        Assert(lockmethod < NumLockMethods);
        lockMethodTable = LockMethodTable[lockmethod];
@@ -1745,34 +1685,21 @@ DumpLocks(void)
        if (proc->waitLock)
                LOCK_PRINT("DumpLocks: waiting on", proc->waitLock, 0);
 
-       if (SHMQueueEmpty(holderQueue))
-               return;
-
-       SHMQueueFirst(holderQueue, (Pointer *) &holder, &holder->queue);
+       holder = (HOLDER *) SHMQueueNext(procHolders, procHolders,
+                                                                        offsetof(HOLDER, procLink));
 
-       do
+       while (holder)
        {
-               /* ---------------------------
-                * XXX Here we assume the shared memory queue is circular and
-                * that we know its internal structure.  Should have some sort of
-                * macros to allow one to walk it.      mer 20 July 1991
-                * ---------------------------
-                */
-               if (holder->queue.next == end)
-                       nextHolder = NULL;
-               else
-                       SHMQueueFirst(&holder->queue,
-                                                 (Pointer *) &nextHolder, &nextHolder->queue);
-
-               Assert(holder->tag.pid == proc->pid);
+               Assert(holder->tag.proc == MAKE_OFFSET(proc));
 
                lock = (LOCK *) MAKE_PTR(holder->tag.lock);
 
                HOLDER_PRINT("DumpLocks", holder);
                LOCK_PRINT("DumpLocks", lock, 0);
 
-               holder = nextHolder;
-       } while (holder);
+               holder = (HOLDER *) SHMQueueNext(procHolders, &holder->procLink,
+                                                                                offsetof(HOLDER, procLink));
+       }
 }
 
 /*
index bc461f0..af345e6 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.94 2001/01/16 20:59:34 tgl Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.95 2001/01/22 22:30:06 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -48,7 +48,7 @@
  *             This is so that we can support more backends. (system-wide semaphore
  *             sets run out pretty fast.)                                -ay 4/95
  *
- * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.94 2001/01/16 20:59:34 tgl Exp $
+ * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.95 2001/01/22 22:30:06 tgl Exp $
  */
 #include "postgres.h"
 
@@ -228,9 +228,6 @@ InitProcess(void)
                        SpinRelease(ProcStructLock);
                        elog(FATAL, "cannot create new proc: out of memory");
                }
-
-               /* this cannot be initialized until after the buffer pool */
-               SHMQueueInit(&(MyProc->holderQueue));
        }
 
        /*
@@ -259,10 +256,15 @@ InitProcess(void)
                MyProc->sem.semNum = -1;
        }
 
+       SHMQueueElemInit(&(MyProc->links));
+       MyProc->errType = NO_ERROR;
        MyProc->pid = MyProcPid;
        MyProc->databaseId = MyDatabaseId;
        MyProc->xid = InvalidTransactionId;
        MyProc->xmin = InvalidTransactionId;
+       MyProc->waitLock = NULL;
+       MyProc->waitHolder = NULL;
+       SHMQueueInit(&(MyProc->procHolders));
 
        /* ----------------------
         * Release the lock.
@@ -282,9 +284,6 @@ InitProcess(void)
                (location != MAKE_OFFSET(MyProc)))
                elog(STOP, "InitProcess: ShmemPID table broken");
 
-       MyProc->errType = NO_ERROR;
-       SHMQueueElemInit(&(MyProc->links));
-
        on_shmem_exit(ProcKill, 0);
 }
 
@@ -342,7 +341,6 @@ RemoveFromWaitQueue(PROC *proc)
                waitLock->waitMask &= ~(1 << lockmode);
 
        /* Clean up the proc's own state */
-       SHMQueueElemInit(&(proc->links));
        proc->waitLock = NULL;
        proc->waitHolder = NULL;
 
@@ -451,6 +449,7 @@ ProcRemove(int pid)
 
        ProcFreeSem(proc->sem.semId, proc->sem.semNum);
 
+       /* Add PROC struct to freelist so space can be recycled in future */
        proc->links.next = ProcGlobal->freeProcs;
        ProcGlobal->freeProcs = MAKE_OFFSET(proc);
 
@@ -565,12 +564,7 @@ ProcSleep(LOCKMETHODCTL *lockctl,
     bigtime_t time_interval;
 #endif
 
-       MyProc->waitLock = lock;
-       MyProc->waitHolder = holder;
-       MyProc->waitLockMode = lockmode;
-       /* We assume the caller set up MyProc->heldLocks */
-
-       proc = (PROC *) MAKE_PTR(waitQueue->links.prev);
+       proc = (PROC *) MAKE_PTR(waitQueue->links.next);
 
        /* if we don't conflict with any waiter - be first in queue */
        if (!(lockctl->conflictTab[lockmode] & waitMask))
@@ -593,7 +587,7 @@ ProcSleep(LOCKMETHODCTL *lockctl,
                        {
                                /* Yes, report deadlock failure */
                                MyProc->errType = STATUS_ERROR;
-                               goto rt;
+                               return STATUS_ERROR;
                        }
                        /* I must go after him in queue - so continue loop */
                }
@@ -624,20 +618,25 @@ ProcSleep(LOCKMETHODCTL *lockctl,
                (aheadGranted[procWaitMode])++;
                if (aheadGranted[procWaitMode] == lock->requested[procWaitMode])
                        waitMask &= ~(1 << procWaitMode);
-               proc = (PROC *) MAKE_PTR(proc->links.prev);
+               proc = (PROC *) MAKE_PTR(proc->links.next);
        }
 
 ins:;
        /* -------------------
-        * Insert self into queue, ahead of the given proc.
-        * These operations are atomic (because of the spinlock).
+        * Insert self into queue, ahead of the given proc (or at tail of queue).
         * -------------------
         */
-       SHMQueueInsertTL(&(proc->links), &(MyProc->links));
+       SHMQueueInsertBefore(&(proc->links), &(MyProc->links));
        waitQueue->size++;
 
        lock->waitMask |= myMask;
 
+       /* Set up wait information in PROC object, too */
+       MyProc->waitLock = lock;
+       MyProc->waitHolder = holder;
+       MyProc->waitLockMode = lockmode;
+       /* We assume the caller set up MyProc->heldLocks */
+
        MyProc->errType = NO_ERROR;             /* initialize result for success */
 
        /* mark that we are waiting for a lock */
@@ -723,11 +722,10 @@ ins:;
         */
        SpinAcquire(spinlock);
 
-rt:;
-
-       MyProc->waitLock = NULL;
-       MyProc->waitHolder = NULL;
-
+       /*
+        * We don't have to do anything else, because the awaker did all the
+        * necessary update of the lock table and MyProc.
+        */
        return MyProc->errType;
 }
 
@@ -745,18 +743,24 @@ ProcWakeup(PROC *proc, int errType)
 
        /* assume that spinlock has been acquired */
 
+       /* Proc should be sleeping ... */
        if (proc->links.prev == INVALID_OFFSET ||
                proc->links.next == INVALID_OFFSET)
                return (PROC *) NULL;
 
-       retProc = (PROC *) MAKE_PTR(proc->links.prev);
+       /* Save next process before we zap the list link */
+       retProc = (PROC *) MAKE_PTR(proc->links.next);
 
+       /* Remove process from wait queue */
        SHMQueueDelete(&(proc->links));
-       SHMQueueElemInit(&(proc->links));
        (proc->waitLock->waitProcs.size)--;
 
+       /* Clean up process' state and pass it the ok/fail signal */
+       proc->waitLock = NULL;
+       proc->waitHolder = NULL;
        proc->errType = errType;
 
+       /* And awaken it */
        IpcSemaphoreUnlock(proc->sem.semId, proc->sem.semNum);
 
        return retProc;
@@ -780,7 +784,7 @@ ProcLockWakeup(LOCKMETHOD lockmethod, LOCK *lock)
        if (!queue_size)
                return STATUS_NOT_FOUND;
 
-       proc = (PROC *) MAKE_PTR(queue->links.prev);
+       proc = (PROC *) MAKE_PTR(queue->links.next);
 
        while (queue_size-- > 0)
        {
@@ -820,12 +824,13 @@ ProcLockWakeup(LOCKMETHOD lockmethod, LOCK *lock)
 
                /*
                 * ProcWakeup removes proc from the lock's waiting process queue
-                * and returns the next proc in chain; don't use prev link.
+                * and returns the next proc in chain; don't use proc's next-link,
+                * because it's been cleared.
                 */
                continue;
 
 nextProc:
-               proc = (PROC *) MAKE_PTR(proc->links.prev);
+               proc = (PROC *) MAKE_PTR(proc->links.next);
        }
 
        Assert(queue->size >= 0);
@@ -848,12 +853,6 @@ nextProc:
        }
 }
 
-void
-ProcAddLock(SHM_QUEUE *elem)
-{
-       SHMQueueInsertTL(&MyProc->holderQueue, elem);
-}
-
 /* --------------------
  * We only get to this routine if we got SIGALRM after DeadlockTimeout
  * while waiting for a lock to be released by some other process.  Look
index 85e2f37..6d84fea 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: lock.h,v 1.41 2001/01/16 06:11:34 tgl Exp $
+ * $Id: lock.h,v 1.42 2001/01/22 22:30:06 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -22,8 +22,8 @@
 /* originally in procq.h */
 typedef struct PROC_QUEUE
 {
-       SHM_QUEUE       links;
-       int                     size;
+       SHM_QUEUE       links;                  /* head of list of PROC objects */
+       int                     size;                   /* number of entries in list */
 } PROC_QUEUE;
 
 /* struct proc is declared in storage/proc.h, but must forward-reference it */
@@ -59,7 +59,7 @@ typedef int LOCKMASK;
 typedef int LOCKMODE;
 typedef int LOCKMETHOD;
 
-/* MAX_LOCKMODES cannot be larger than the bits in LOCKMASK */
+/* MAX_LOCKMODES cannot be larger than the # of bits in LOCKMASK */
 #define MAX_LOCKMODES  8
 
 /*
@@ -152,6 +152,7 @@ typedef struct LOCKTAG
  * tag -- uniquely identifies the object being locked
  * grantMask -- bitmask for all lock types currently granted on this object.
  * waitMask -- bitmask for all lock types currently awaited on this object.
+ * lockHolders -- list of HOLDER objects for this lock.
  * waitProcs -- queue of processes waiting for this lock.
  * requested -- count of each lock type currently requested on the lock
  *             (includes requests already granted!!).
@@ -167,6 +168,7 @@ typedef struct LOCK
        /* data */
        int                     grantMask;              /* bitmask for lock types already granted */
        int                     waitMask;               /* bitmask for lock types awaited */
+       SHM_QUEUE       lockHolders;    /* list of HOLDER objects assoc. with lock */
        PROC_QUEUE      waitProcs;              /* list of PROC objects waiting on lock */
        int                     requested[MAX_LOCKMODES]; /* counts of requested locks */
        int                     nRequested;             /* total of requested[] array */
@@ -189,8 +191,8 @@ typedef struct LOCK
  * holder hashtable.  A HOLDERTAG value uniquely identifies a lock holder.
  *
  * There are two possible kinds of holder tags: a transaction (identified
- * both by the PID of the backend running it, and the xact's own ID) and
- * a session (identified by backend PID, with xid = InvalidTransactionId).
+ * both by the PROC of the backend running it, and the xact's own ID) and
+ * a session (identified by backend PROC, with xid = InvalidTransactionId).
  *
  * Currently, session holders are used for user locks and for cross-xact
  * locks obtained for VACUUM.  We assume that a session lock never conflicts
@@ -201,11 +203,17 @@ typedef struct LOCK
  * zero holding[], for any lock that the process is currently waiting on.
  * Otherwise, holder objects whose counts have gone to zero are recycled
  * as soon as convenient.
+ *
+ * Each HOLDER object is linked into lists for both the associated LOCK object
+ * and the owning PROC object.  Note that the HOLDER is entered into these
+ * lists as soon as it is created, even if no lock has yet been granted.
+ * A PROC that is waiting for a lock to be granted will also be linked into
+ * the lock's waitProcs queue.
  */
 typedef struct HOLDERTAG
 {
        SHMEM_OFFSET lock;                      /* link to per-lockable-object information */
-       int                     pid;                    /* PID of backend */
+       SHMEM_OFFSET proc;                      /* link to PROC of owning backend */
        TransactionId xid;                      /* xact ID, or InvalidTransactionId */
 } HOLDERTAG;
 
@@ -217,7 +225,8 @@ typedef struct HOLDER
        /* data */
        int                     holding[MAX_LOCKMODES]; /* count of locks currently held */
        int                     nHolding;               /* total of holding[] array */
-       SHM_QUEUE       queue;                  /* list link for process' list of holders */
+       SHM_QUEUE       lockLink;               /* list link for lock's list of holders */
+       SHM_QUEUE       procLink;               /* list link for process's list of holders */
 } HOLDER;
 
 #define SHMEM_HOLDERTAB_KEYSIZE  sizeof(HOLDERTAG)
index 131c339..5fcd7c6 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: proc.h,v 1.36 2001/01/16 20:59:34 tgl Exp $
+ * $Id: proc.h,v 1.37 2001/01/22 22:30:06 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -27,9 +27,8 @@ typedef struct
 } SEMA;
 
 /*
- * Each backend has a PROC struct in shared memory.  There is also a list
- * of currently-unused PROC structs that will be reallocated to new backends
- * (a fairly pointless optimization, but it's there anyway).
+ * Each backend has a PROC struct in shared memory.  There is also a list of
+ * currently-unused PROC structs that will be reallocated to new backends.
  *
  * links: list link for any list the PROC is in.  When waiting for a lock,
  * the PROC is linked into that lock's waitProcs queue.  A recycled PROC
@@ -37,7 +36,7 @@ typedef struct
  */
 struct proc
 {
-       /* proc->links MUST BE THE FIRST ELEMENT OF STRUCT (see ProcWakeup()) */
+       /* proc->links MUST BE FIRST IN STRUCT (see ProcSleep,ProcWakeup,etc) */
 
        SHM_QUEUE       links;                  /* list link if process is in a list */
 
@@ -53,7 +52,8 @@ struct proc
 
        XLogRecPtr      logRec;
 
-       /* Info about lock the process is currently waiting for, if any */
+       /* Info about lock the process is currently waiting for, if any. */
+       /* waitLock and waitHolder are NULL if not currently waiting. */
        LOCK       *waitLock;           /* Lock object we're sleeping on ... */
        HOLDER     *waitHolder;         /* Per-holder info for awaited lock */
        LOCKMODE        waitLockMode;   /* type of lock we're waiting for */
@@ -64,7 +64,7 @@ struct proc
        Oid                     databaseId;             /* OID of database this backend is using */
 
        short           sLocks[MAX_SPINS];              /* Spin lock stats */
-       SHM_QUEUE       holderQueue;    /* list of HOLDER objects for locks held or
+       SHM_QUEUE       procHolders;    /* list of HOLDER objects for locks held or
                                                                 * awaited by this backend */
 };
 
@@ -138,7 +138,6 @@ extern int ProcSleep(LOCKMETHODCTL *lockctl, LOCKMODE lockmode,
                                         LOCK *lock, HOLDER *holder);
 extern PROC *ProcWakeup(PROC *proc, int errType);
 extern int ProcLockWakeup(LOCKMETHOD lockmethod, LOCK *lock);
-extern void ProcAddLock(SHM_QUEUE *elem);
 extern void ProcReleaseSpins(PROC *proc);
 extern bool LockWaitCancel(void);
 extern void HandleDeadLock(SIGNAL_ARGS);
index 8b2cc44..fb76297 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: shmem.h,v 1.24 2000/11/28 23:27:57 tgl Exp $
+ * $Id: shmem.h,v 1.25 2001/01/22 22:30:06 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -100,9 +100,9 @@ typedef struct
 extern void SHMQueueInit(SHM_QUEUE *queue);
 extern void SHMQueueElemInit(SHM_QUEUE *queue);
 extern void SHMQueueDelete(SHM_QUEUE *queue);
-extern void SHMQueueInsertTL(SHM_QUEUE *queue, SHM_QUEUE *elem);
-extern void SHMQueueFirst(SHM_QUEUE *queue, Pointer *nextPtrPtr,
-                         SHM_QUEUE *nextQueue);
+extern void SHMQueueInsertBefore(SHM_QUEUE *queue, SHM_QUEUE *elem);
+extern Pointer SHMQueueNext(SHM_QUEUE *queue, SHM_QUEUE *curElem,
+                                                       Size linkOffset);
 extern bool SHMQueueEmpty(SHM_QUEUE *queue);
 
 #endif  /* SHMEM_H */