OSDN Git Service

I have written some patches to the postgres lock manager which allow the
authorMarc G. Fournier <scrappy@hub.org>
Fri, 11 Oct 1996 03:22:59 +0000 (03:22 +0000)
committerMarc G. Fournier <scrappy@hub.org>
Fri, 11 Oct 1996 03:22:59 +0000 (03:22 +0000)
use of long term cooperative locks managed by the user applications.

Submitted by: Massimo Dal Zotto <dz@cs.unitn.it>

src/backend/storage/lmgr/lock.c
src/backend/storage/lmgr/proc.c

index 9e618f0..353f010 100644 (file)
@@ -7,7 +7,7 @@
  *
  *
  * IDENTIFICATION
- *    $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.2 1996/07/30 07:47:33 scrappy Exp $
+ *    $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.3 1996/10/11 03:22:56 scrappy Exp $
  *
  * NOTES
  *    Outside modules can create a lock table and acquire/release
@@ -366,6 +366,48 @@ LockTabRename(LockTableId tableId)
  * Side Effects: The lock is always acquired.  No way to abort
  *     a lock acquisition other than aborting the transaction.
  *     Lock is recorded in the lkchain.
+#ifdef USER_LOCKS
+ * Note on User Locks: 
+ *     User locks are handled totally on the application side as
+ *     long term cooperative locks which extend beyond the normal
+ *     transaction boundaries.  Their purpose is to indicate to an
+ *     application that someone is `working' on an item.  So it is
+ *     possible to put an user lock on a tuple's oid, retrieve the
+ *     tuple, work on it for an hour and then update it and remove
+ *     the lock.  While the lock is active other clients can still
+ *     read and write the tuple but they can be aware that it has
+ *     been locked at the application level by someone.
+ *     User locks use lock tags made of an uint16 and an uint32, for
+ *     example 0 and a tuple oid, or any other arbitrary pair of
+ *     numbers following a convention established by the application.
+ *     In this sense tags don't refer to tuples or database entities.
+ *     User locks and normal locks are completely orthogonal and
+ *     they don't interfere with each other, so it is possible
+ *     to acquire a normal lock on an user-locked tuple or user-lock
+ *     a tuple for which a normal write lock already exists.
+ *     User locks are always non blocking, therefore they are never
+ *     acquired if already held by another process.  They must be
+ *     released explicitly by the application but they are released
+ *     automatically when a backend terminates.
+ *     They are indicated by a dummy tableId 0 which doesn't have
+ *     any table allocated but uses the normal lock table, and are
+ *     distinguished from normal locks for the following differences:
+ *
+ *                                     normal lock     user lock
+ *
+ *     tableId                         1               0
+ *     tag.relId                       rel oid         0
+ *     tag.ItemPointerData.ip_blkid    block id        lock id2
+ *     tag.ItemPointerData.ip_posid    tuple offset    lock id1
+ *     xid.pid                         0               backend pid
+ *     xid.xid                         current xid     0
+ *     persistence                     transaction     user or backend
+ *
+ *     The lockt parameter can have the same values for normal locks
+ *     although probably only WRITE_LOCK can have some practical use.
+ *
+ *                                                     DZ - 4 Oct 1996
+#endif
  */
 bool
 LockAcquire(LockTableId tableId, LOCKTAG *lockName, LOCKT lockt)
@@ -379,6 +421,22 @@ LockAcquire(LockTableId tableId, LOCKTAG *lockName, LOCKT lockt)
     int                status;
     TransactionId      myXid;
     
+#ifdef USER_LOCKS
+    int is_user_lock;
+
+    is_user_lock = (tableId == 0);
+    if (is_user_lock) {
+       tableId = 1;
+#ifdef USER_LOCKS_DEBUG
+       elog(NOTICE,"LockAcquire: user lock tag [%u,%u] %d",
+            lockName->tupleId.ip_posid,
+            ((lockName->tupleId.ip_blkid.bi_hi<<16)+
+             lockName->tupleId.ip_blkid.bi_lo),
+            lockt);
+#endif
+    }
+#endif
+
     Assert (tableId < NumTables);
     ltable = AllTables[tableId];
     if (!ltable)
@@ -445,6 +503,17 @@ LockAcquire(LockTableId tableId, LOCKTAG *lockName, LOCKT lockt)
     item.tag.pid = MyPid;
 #endif
     
+#ifdef USER_LOCKS
+    if (is_user_lock) {
+       item.tag.pid = getpid();
+       item.tag.xid = myXid = 0;
+#ifdef USER_LOCKS_DEBUG
+       elog(NOTICE,"LockAcquire: user lock xid [%d,%d,%d]",
+            item.tag.lock, item.tag.pid, item.tag.xid);
+#endif
+    }
+#endif
+
     result = (XIDLookupEnt *)hash_search(xidTable, (Pointer)&item, HASH_ENTER, &found);
     if (!result)
        {
@@ -494,6 +563,25 @@ LockAcquire(LockTableId tableId, LOCKTAG *lockName, LOCKT lockt)
        }
     else if (status == STATUS_FOUND)
        {
+#ifdef USER_LOCKS
+           /*
+            * User locks are non blocking. If we can't acquire a lock
+            * remove the xid entry and return FALSE without waiting.
+            */
+           if (is_user_lock) {
+               if (!result->nHolding) {
+                   SHMQueueDelete(&result->queue);
+                   hash_search(xidTable, (Pointer)&item, HASH_REMOVE, &found);
+               }
+               lock->nHolding--;
+               lock->holders[lockt]--;
+               SpinRelease(masterLock);
+#ifdef USER_LOCKS_DEBUG
+               elog(NOTICE,"LockAcquire: user lock failed");
+#endif
+               return(FALSE);
+           }
+#endif
            status = WaitOnLock(ltable, tableId, lock, lockt);
            XID_PRINT("Someone granted me the lock", result);
        }
@@ -690,6 +778,22 @@ LockRelease(LockTableId tableId, LOCKTAG *lockName, LOCKT lockt)
     HTAB               *xidTable;
     bool               wakeupNeeded = true;
     
+#ifdef USER_LOCKS
+    int is_user_lock;
+
+    is_user_lock = (tableId == 0);
+    if (is_user_lock) {
+       tableId = 1;
+#ifdef USER_LOCKS_DEBUG
+       elog(NOTICE,"LockRelease: user lock tag [%u,%u] %d",
+            lockName->tupleId.ip_posid,
+            ((lockName->tupleId.ip_blkid.bi_hi<<16)+
+             lockName->tupleId.ip_blkid.bi_lo),
+            lockt);
+#endif
+    }
+#endif
+
     Assert (tableId < NumTables);
     ltable = AllTables[tableId];
     if (!ltable) {
@@ -713,6 +817,18 @@ LockRelease(LockTableId tableId, LOCKTAG *lockName, LOCKT lockt)
     lock = (LOCK *)
        hash_search(ltable->lockHash,(Pointer)lockName,HASH_FIND_SAVE,&found);
     
+#ifdef USER_LOCKS
+    /*
+     * If the entry is not found hash_search returns TRUE
+     * instead of NULL, so we must check it explicitly.
+     */
+    if ((is_user_lock) && (lock == (LOCK *)TRUE)) {
+       SpinRelease(masterLock);
+       elog(NOTICE,"LockRelease: there are no locks with this tag");
+       return(FALSE);
+    }
+#endif
+
     /* let the caller print its own error message, too.
      * Do not elog(WARN).
      */
@@ -732,6 +848,14 @@ LockRelease(LockTableId tableId, LOCKTAG *lockName, LOCKT lockt)
     
     Assert(lock->nHolding > 0);
     
+#ifdef USER_LOCKS
+    /*
+     * If this is an user lock it can be removed only after
+     * checking that it was acquired by the current process,
+     * so this code is skipped and executed later.
+     */
+  if (!is_user_lock) {
+#endif
     /*
      * fix the general lock stats
      */
@@ -758,6 +882,9 @@ LockRelease(LockTableId tableId, LOCKTAG *lockName, LOCKT lockt)
            Assert(lock && found);
            wakeupNeeded = false;
        }
+#ifdef USER_LOCKS
+  }
+#endif
     
     /* ------------------
      * Zero out all of the tag bytes (this clears the padding bytes for long
@@ -772,6 +899,17 @@ LockRelease(LockTableId tableId, LOCKTAG *lockName, LOCKT lockt)
     item.tag.pid = MyPid;
 #endif
     
+#ifdef USER_LOCKS
+    if (is_user_lock) {
+       item.tag.pid = getpid();
+       item.tag.xid = 0;
+#ifdef USER_LOCKS_DEBUG
+       elog(NOTICE,"LockRelease: user lock xid [%d,%d,%d]",
+            item.tag.lock, item.tag.pid, item.tag.xid);
+#endif
+    }
+#endif
+
     if (! ( result = (XIDLookupEnt *) hash_search(xidTable,
                                                  (Pointer)&item,
                                                  HASH_FIND_SAVE,
@@ -779,7 +917,15 @@ LockRelease(LockTableId tableId, LOCKTAG *lockName, LOCKT lockt)
        || !found)
        {
            SpinRelease(masterLock);
+#ifdef USER_LOCKS
+           if ((is_user_lock) && (result)) {
+               elog(NOTICE,"LockRelease: you don't have a lock on this tag");
+           } else {
+               elog(NOTICE,"LockRelease: find xid, table corrupted");
+           }
+#else
            elog(NOTICE,"LockReplace: xid table corrupted");
+#endif
            return(FALSE);
        }
     /*
@@ -797,6 +943,14 @@ LockRelease(LockTableId tableId, LOCKTAG *lockName, LOCKT lockt)
      */
     if (! result->nHolding)
        {
+#ifdef USER_LOCKS
+           if (result->queue.prev == INVALID_OFFSET) {
+               elog(NOTICE,"LockRelease: xid.prev == INVALID_OFFSET");
+           }
+           if (result->queue.next == INVALID_OFFSET) {
+               elog(NOTICE,"LockRelease: xid.next == INVALID_OFFSET");
+           }
+#endif
            if (result->queue.next != INVALID_OFFSET)
                SHMQueueDelete(&result->queue);
            if (! (result = (XIDLookupEnt *)
@@ -804,11 +958,50 @@ LockRelease(LockTableId tableId, LOCKTAG *lockName, LOCKT lockt)
                ! found)
                {
                    SpinRelease(masterLock);
+#ifdef USER_LOCKS
+                   elog(NOTICE,"LockRelease: remove xid, table corrupted");
+#else
                    elog(NOTICE,"LockReplace: xid table corrupted");
+#endif
                    return(FALSE);
                }
        }
     
+#ifdef USER_LOCKS
+    /*
+     * If this is an user lock remove it now, after the
+     * corresponding xid entry has been found and deleted.
+     */
+  if (is_user_lock) {
+    /*
+     * fix the general lock stats
+     */
+    lock->nHolding--;
+    lock->holders[lockt]--;
+    lock->nActive--;
+    lock->activeHolders[lockt]--;
+    
+    Assert(lock->nActive >= 0);
+    
+    if (! lock->nHolding)
+       {
+           /* ------------------
+            * if there's no one waiting in the queue,
+            * we just released the last lock.
+            * Delete it from the lock table.
+            * ------------------
+            */
+           Assert( ltable->lockHash->hash == tag_hash);
+           lock = (LOCK *) hash_search(ltable->lockHash,
+                                       (Pointer) &(lock->tag),
+                                       HASH_REMOVE,
+                                       &found);
+           Assert(lock && found);
+           wakeupNeeded = false;
+       }
+  }
+#endif
+
     /* --------------------------
      * If there are still active locks of the type I just released, no one
      * should be woken up.  Whoever is asleep will still conflict
@@ -848,6 +1041,18 @@ GrantLock(LOCK *lock, LOCKT lockt)
     lock->mask |= BITS_ON[lockt];
 }
 
+#ifdef USER_LOCKS
+/*
+ * LockReleaseAll -- Release all locks in a process lock queue.
+ *
+ * Note: This code is a little complicated by the presence in the
+ *       same queue of user locks which can't be removed from the
+ *       normal lock queue at the end of a transaction. They must
+ *       however be removed when the backend exits.
+ *       A dummy tableId 0 is used to indicate that we are releasing
+ *       the user locks, from the code added to ProcKill().
+ */
+#endif
 bool
 LockReleaseAll(LockTableId tableId, SHM_QUEUE *lockQueue)
 {
@@ -862,6 +1067,19 @@ LockReleaseAll(LockTableId tableId, SHM_QUEUE *lockQueue)
     LOCK               *lock;
     bool       found;
     
+#ifdef USER_LOCKS
+    int is_user_lock_table, my_pid, count, nskip;
+
+    is_user_lock_table = (tableId == 0);
+    my_pid = getpid();
+#ifdef USER_LOCKS_DEBUG
+    elog(NOTICE,"LockReleaseAll: tableId=%d, pid=%d", tableId, my_pid);
+#endif
+    if (is_user_lock_table) {
+       tableId = 1;
+    }
+#endif
+
     Assert (tableId < NumTables);
     ltable = AllTables[tableId];
     if (!ltable)
@@ -873,11 +1091,18 @@ LockReleaseAll(LockTableId tableId, SHM_QUEUE *lockQueue)
     if (SHMQueueEmpty(lockQueue))
        return TRUE;
     
+#ifdef USER_LOCKS
+    SpinAcquire(masterLock);
+#endif
     SHMQueueFirst(lockQueue,(Pointer*)&xidLook,&xidLook->queue);
     
     XID_PRINT("LockReleaseAll:", xidLook);
     
+#ifndef USER_LOCKS
     SpinAcquire(masterLock);
+#else
+    count = nskip = 0;
+#endif
     for (;;)
        {
            /* ---------------------------
@@ -891,6 +1116,65 @@ LockReleaseAll(LockTableId tableId, SHM_QUEUE *lockQueue)
            
            LOCK_PRINT("ReleaseAll",(&lock->tag),0);
            
+#ifdef USER_LOCKS
+           /*
+            * Sometimes the queue appears to be messed up.
+            */
+           if (count++ > 2000) {
+               elog(NOTICE,"LockReleaseAll: xid loop detected, giving up");
+               nskip = 0;
+               break;
+           }
+           if (is_user_lock_table) {
+               if ((xidLook->tag.pid == 0) || (xidLook->tag.xid != 0)) {
+#ifdef USER_LOCKS_DEBUG
+                   elog(NOTICE,"LockReleaseAll: skip normal lock [%d,%d,%d]",
+                        xidLook->tag.lock,xidLook->tag.pid,xidLook->tag.xid);
+#endif
+                   nskip++;
+                   goto next_item;
+               }
+               if (xidLook->tag.pid != my_pid) {
+                   /* This should never happen */
+#ifdef USER_LOCKS_DEBUG
+                   elog(NOTICE,
+                        "LockReleaseAll: skip other pid [%u,%u] [%d,%d,%d]",
+                        lock->tag.tupleId.ip_posid,
+                        ((lock->tag.tupleId.ip_blkid.bi_hi<<16)+
+                         lock->tag.tupleId.ip_blkid.bi_lo),
+                        xidLook->tag.lock,xidLook->tag.pid,xidLook->tag.xid);
+#endif
+                   nskip++;
+                   goto next_item;
+               }
+#ifdef USER_LOCKS_DEBUG
+               elog(NOTICE,
+                    "LockReleaseAll: release user lock [%u,%u] [%d,%d,%d]",
+                    lock->tag.tupleId.ip_posid,
+                    ((lock->tag.tupleId.ip_blkid.bi_hi<<16)+
+                     lock->tag.tupleId.ip_blkid.bi_lo),
+                    xidLook->tag.lock,xidLook->tag.pid,xidLook->tag.xid);
+#endif
+           } else {
+               if ((xidLook->tag.pid != 0) || (xidLook->tag.xid == 0)) {
+#ifdef USER_LOCKS_DEBUG
+                   elog(NOTICE,
+                        "LockReleaseAll: skip user lock [%u,%u] [%d,%d,%d]",
+                        lock->tag.tupleId.ip_posid,
+                        ((lock->tag.tupleId.ip_blkid.bi_hi<<16)+
+                         lock->tag.tupleId.ip_blkid.bi_lo),
+                        xidLook->tag.lock,xidLook->tag.pid,xidLook->tag.xid);
+#endif
+                   nskip++;
+                   goto next_item;
+               }
+#ifdef USER_LOCKS_DEBUG
+               elog(NOTICE,"LockReleaseAll: release normal lock [%d,%d,%d]",
+                    xidLook->tag.lock,xidLook->tag.pid,xidLook->tag.xid);
+#endif
+           }
+#endif
+
            /* ------------------
             * fix the general lock stats
             * ------------------
@@ -921,11 +1205,18 @@ LockReleaseAll(LockTableId tableId, SHM_QUEUE *lockQueue)
             * always remove the xidLookup entry, we're done with it now
             * ----------------
             */
+#ifdef USER_LOCKS
+           SHMQueueDelete(&xidLook->queue);
+#endif
            if ((! hash_search(ltable->xidHash, (Pointer)xidLook, HASH_REMOVE, &found))
                || !found)
                {
                    SpinRelease(masterLock);
+#ifdef USER_LOCKS
+                   elog(NOTICE,"LockReleaseAll: xid table corrupted");
+#else
                    elog(NOTICE,"LockReplace: xid table corrupted");
+#endif
                    return(FALSE);
                }
            
@@ -943,7 +1234,11 @@ LockReleaseAll(LockTableId tableId, SHM_QUEUE *lockQueue)
                    if ((! lock) || (!found))
                        {
                            SpinRelease(masterLock);
+#ifdef USER_LOCKS
+                           elog(NOTICE,"LockReleaseAll: cannot remove lock from HTAB");
+#else
                            elog(NOTICE,"LockReplace: cannot remove lock from HTAB");
+#endif
                            return(FALSE);
                        }
                }
@@ -959,12 +1254,21 @@ LockReleaseAll(LockTableId tableId, SHM_QUEUE *lockQueue)
                    (void) ProcLockWakeup(waitQueue, (char *) ltable, (char *) lock);
                }
            
+#ifdef USER_LOCKS
+         next_item:
+#endif
            if (done)
                break;
            SHMQueueFirst(&xidLook->queue,(Pointer*)&tmp,&tmp->queue);
            xidLook = tmp;
        }
     SpinRelease(masterLock);
+#ifdef USER_LOCKS
+    /*
+     * Reinitialize the queue only if nothing has been left in.
+     */
+  if (nskip == 0)
+#endif
     SHMQueueInit(lockQueue);
     return TRUE;
 }
index edba19c..6588f5c 100644 (file)
@@ -7,7 +7,7 @@
  *
  *
  * IDENTIFICATION
- *    $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.5 1996/08/01 05:11:33 scrappy Exp $
+ *    $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.6 1996/10/11 03:22:59 scrappy Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -46,7 +46,7 @@
  *      This is so that we can support more backends. (system-wide semaphore
  *      sets run out pretty fast.)                -ay 4/95
  *
- * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.5 1996/08/01 05:11:33 scrappy Exp $
+ * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.6 1996/10/11 03:22:59 scrappy Exp $
  */
 #include <sys/time.h>
 #ifndef WIN32
@@ -361,6 +361,10 @@ ProcKill(int exitStatus, int pid)
     ProcReleaseSpins(proc);
     LockReleaseAll(1,&proc->lockQueue);
     
+#ifdef USER_LOCKS
+    LockReleaseAll(0,&proc->lockQueue);
+#endif
+
     /* ----------------
      * get off the wait queue
      * ----------------