1 /*-------------------------------------------------------------------------
4 * simple lock acquisition
6 * Copyright (c) 1994, Regents of the University of California
10 * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.50 1999/05/07 01:23:03 vadim Exp $
13 * Outside modules can create a lock table and acquire/release
14 * locks. A lock table is a shared memory hash table. When
15 * a process tries to acquire a lock of a type that conflicts
16 * with existing locks, it is put to sleep using the routines
17 * in storage/lmgr/proc.c.
21 * LockAcquire(), LockRelease(), LockMethodTableInit(),
22 * LockMethodTableRename(), LockReleaseAll(), LockOwners()
23 * LockResolveConflicts(), GrantLock()
25 * NOTE: This module is used to define new lock tables. The
26 * multi-level lock table (multi.c) used by the heap
27 * access methods calls these routines. See multi.c for
28 * examples showing how to use this interface.
30 *-------------------------------------------------------------------------
32 #include <stdio.h> /* for sprintf() */
34 #include <sys/types.h>
39 #include "miscadmin.h"
40 #include "storage/shmem.h"
41 #include "storage/sinvaladt.h"
42 #include "storage/spin.h"
43 #include "storage/proc.h"
44 #include "storage/lock.h"
45 #include "utils/hsearch.h"
46 #include "utils/memutils.h"
47 #include "utils/palloc.h"
48 #include "access/xact.h"
49 #include "access/transam.h"
50 #include "utils/trace.h"
51 #include "utils/ps_status.h"
53 static int WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode);
56 * lockDebugRelation can be used to trace unconditionally a single relation,
57 * for example pg_listener, if you suspect there are locking problems.
59 * lockDebugOidMin is used to avoid tracing postgres relations, which
60 * would produce a lot of output. Unfortunately most system relations are
61 * created after bootstrap and have oid greater than BootstrapObjectIdData.
62 * If you are using tprintf you should specify a value greater than the max
63 * oid of system relations, which can be found with the following query:
65 * select max(int4in(int4out(oid))) from pg_class where relname ~ '^pg_';
67 * To get a useful lock trace you can use the following pg_options:
69 * -T "verbose,query,locks,userlocks,lock_debug_oidmin=17500"
71 #define LOCKDEBUG(lockmethod) (pg_options[TRACE_SHORTLOCKS+lockmethod])
72 #define lockDebugRelation (pg_options[TRACE_LOCKRELATION])
73 #define lockDebugOidMin (pg_options[TRACE_LOCKOIDMIN])
74 #define lockReadPriority (pg_options[OPT_LOCKREADPRIORITY])
77 #define LOCK_PRINT(where,lock,type) \
78 if (((LOCKDEBUG(LOCK_LOCKMETHOD(*(lock))) >= 1) \
79 && (lock->tag.relId >= lockDebugOidMin)) \
81 (lockDebugRelation && (lock->tag.relId == lockDebugRelation))) \
82 LOCK_PRINT_AUX(where,lock,type)
84 #define LOCK_PRINT_AUX(where,lock,type) \
86 "%s: lock(%x) tbl(%d) rel(%d) db(%d) obj(%u) mask(%x) " \
87 "hold(%d,%d,%d,%d,%d)=%d " \
88 "act(%d,%d,%d,%d,%d)=%d wait(%d) type(%s)", \
91 lock->tag.lockmethod, \
94 lock->tag.objId.blkno, \
102 lock->activeHolders[1], \
103 lock->activeHolders[2], \
104 lock->activeHolders[3], \
105 lock->activeHolders[4], \
106 lock->activeHolders[5], \
108 lock->waitProcs.size, \
111 #define XID_PRINT(where,xidentP) \
112 if (((LOCKDEBUG(XIDENT_LOCKMETHOD(*(xidentP))) >= 1) \
113 && (((LOCK *)MAKE_PTR(xidentP->tag.lock))->tag.relId \
114 >= lockDebugOidMin)) \
115 || (lockDebugRelation && \
116 (((LOCK *)MAKE_PTR(xidentP->tag.lock))->tag.relId \
117 == lockDebugRelation))) \
118 XID_PRINT_AUX(where,xidentP)
120 #define XID_PRINT_AUX(where,xidentP) \
122 "%s: xid(%x) lock(%x) tbl(%d) pid(%d) xid(%d) " \
123 "hold(%d,%d,%d,%d,%d)=%d", \
125 MAKE_OFFSET(xidentP), \
127 XIDENT_LOCKMETHOD(*(xidentP)), \
130 xidentP->holders[1], \
131 xidentP->holders[2], \
132 xidentP->holders[3], \
133 xidentP->holders[4], \
134 xidentP->holders[5], \
137 #else /* !LOCK_MGR_DEBUG */
138 #define LOCK_PRINT(where,lock,type)
139 #define LOCK_PRINT_AUX(where,lock,type)
140 #define XID_PRINT(where,xidentP)
141 #define XID_PRINT_AUX(where,xidentP)
142 #endif /* !LOCK_MGR_DEBUG */
144 static char *lock_types[] = {
150 "ShareRowExclusiveLock",
152 "AccessExclusiveLock"
155 SPINLOCK LockMgrLock; /* in Shmem or created in
156 * CreateSpinlocks() */
158 /* This is to simplify/speed up some bit arithmetic */
160 static MASK BITS_OFF[MAX_LOCKMODES];
161 static MASK BITS_ON[MAX_LOCKMODES];
164 * XXX Want to move this to this file
167 static bool LockingIsDisabled;
169 /* -------------------
170 * map from lockmethod to the lock table structure
171 * -------------------
173 static LOCKMETHODTABLE *LockMethodTable[MAX_LOCK_METHODS];
175 static int NumLockMethods;
177 /* -------------------
178 * InitLocks -- Init the lock module. Create a private data
179 * structure for constructing conflict masks.
180 * -------------------
189 /* -------------------
190 * remember 0th lockmode is invalid
191 * -------------------
193 for (i = 0; i < MAX_LOCKMODES; i++, bit <<= 1)
199 #ifdef LOCK_MGR_DEBUG
202 * If lockDebugOidMin value has not been specified in pg_options set a
205 if (!lockDebugOidMin)
206 lockDebugOidMin = BootstrapObjectIdData;
210 /* -------------------
211 * LockDisable -- sets LockingIsDisabled flag to TRUE or FALSE.
215 LockDisable(int status)
217 LockingIsDisabled = status;
222 * LockMethodInit -- initialize the lock table's lock type
225 * Notes: just copying. Should only be called once.
228 LockMethodInit(LOCKMETHODTABLE * lockMethodTable,
235 lockMethodTable->ctl->numLockModes = numModes;
237 for (i = 0; i < numModes; i++, prioP++, conflictsP++)
239 lockMethodTable->ctl->conflictTab[i] = *conflictsP;
240 lockMethodTable->ctl->prio[i] = *prioP;
245 * LockMethodTableInit -- initialize a lock table structure
248 * (a) a lock table has four separate entries in the shmem index
249 * table. This is because every shared hash table and spinlock
250 * has its name stored in the shmem index at its creation. It
251 * is wasteful, in this case, but not much space is involved.
255 LockMethodTableInit(char *tabName,
260 LOCKMETHODTABLE *lockMethodTable;
267 if (numModes > MAX_LOCKMODES)
269 elog(NOTICE, "LockMethodTableInit: too many lock types %d greater than %d",
270 numModes, MAX_LOCKMODES);
271 return INVALID_LOCKMETHOD;
274 /* allocate a string for the shmem index table lookup */
275 shmemName = (char *) palloc((unsigned) (strlen(tabName) + 32));
278 elog(NOTICE, "LockMethodTableInit: couldn't malloc string %s \n", tabName);
279 return INVALID_LOCKMETHOD;
282 /* each lock table has a non-shared header */
283 lockMethodTable = (LOCKMETHODTABLE *) palloc((unsigned) sizeof(LOCKMETHODTABLE));
284 if (!lockMethodTable)
286 elog(NOTICE, "LockMethodTableInit: couldn't malloc lock table %s\n", tabName);
288 return INVALID_LOCKMETHOD;
291 /* ------------------------
292 * find/acquire the spinlock for the table
293 * ------------------------
295 SpinAcquire(LockMgrLock);
298 /* -----------------------
299 * allocate a control structure from shared memory or attach to it
300 * if it already exists.
301 * -----------------------
303 sprintf(shmemName, "%s (ctl)", tabName);
304 lockMethodTable->ctl = (LOCKMETHODCTL *)
305 ShmemInitStruct(shmemName, (unsigned) sizeof(LOCKMETHODCTL), &found);
307 if (!lockMethodTable->ctl)
309 elog(FATAL, "LockMethodTableInit: couldn't initialize %s", tabName);
313 /* -------------------
315 * -------------------
320 * we're first - initialize
325 MemSet(lockMethodTable->ctl, 0, sizeof(LOCKMETHODCTL));
326 lockMethodTable->ctl->masterLock = LockMgrLock;
327 lockMethodTable->ctl->lockmethod = NumLockMethods;
330 /* --------------------
331 * other modules refer to the lock table by a lockmethod
332 * --------------------
334 LockMethodTable[NumLockMethods] = lockMethodTable;
336 Assert(NumLockMethods <= MAX_LOCK_METHODS);
338 /* ----------------------
339 * allocate a hash table for the lock tags. This is used
340 * to find the different locks.
341 * ----------------------
343 info.keysize = SHMEM_LOCKTAB_KEYSIZE;
344 info.datasize = SHMEM_LOCKTAB_DATASIZE;
345 info.hash = tag_hash;
346 hash_flags = (HASH_ELEM | HASH_FUNCTION);
348 sprintf(shmemName, "%s (lock hash)", tabName);
349 lockMethodTable->lockHash = (HTAB *) ShmemInitHash(shmemName,
350 INIT_TABLE_SIZE, MAX_TABLE_SIZE,
353 Assert(lockMethodTable->lockHash->hash == tag_hash);
354 if (!lockMethodTable->lockHash)
356 elog(FATAL, "LockMethodTableInit: couldn't initialize %s", tabName);
360 /* -------------------------
361 * allocate an xid table. When different transactions hold
362 * the same lock, additional information must be saved (locks per tx).
363 * -------------------------
365 info.keysize = SHMEM_XIDTAB_KEYSIZE;
366 info.datasize = SHMEM_XIDTAB_DATASIZE;
367 info.hash = tag_hash;
368 hash_flags = (HASH_ELEM | HASH_FUNCTION);
370 sprintf(shmemName, "%s (xid hash)", tabName);
371 lockMethodTable->xidHash = (HTAB *) ShmemInitHash(shmemName,
372 INIT_TABLE_SIZE, MAX_TABLE_SIZE,
375 if (!lockMethodTable->xidHash)
377 elog(FATAL, "LockMethodTableInit: couldn't initialize %s", tabName);
381 /* init ctl data structures */
382 LockMethodInit(lockMethodTable, conflictsP, prioP, numModes);
384 SpinRelease(LockMgrLock);
389 return lockMethodTable->ctl->lockmethod;
391 return INVALID_LOCKMETHOD;
395 * LockMethodTableRename -- allocate another lockmethod to the same
398 * NOTES: Both the lock module and the lock chain (lchain.c)
399 * module use table id's to distinguish between different
400 * kinds of locks. Short term and long term locks look
401 * the same to the lock table, but are handled differently
402 * by the lock chain manager. This function allows the
403 * client to use different lockmethods when acquiring/releasing
404 * short term and long term locks.
408 LockMethodTableRename(LOCKMETHOD lockmethod)
410 LOCKMETHOD newLockMethod;
412 if (NumLockMethods >= MAX_LOCK_METHODS)
413 return INVALID_LOCKMETHOD;
414 if (LockMethodTable[lockmethod] == INVALID_LOCKMETHOD)
415 return INVALID_LOCKMETHOD;
417 /* other modules refer to the lock table by a lockmethod */
418 newLockMethod = NumLockMethods;
421 LockMethodTable[newLockMethod] = LockMethodTable[lockmethod];
422 return newLockMethod;
426 * LockAcquire -- Check for lock conflicts, sleep if conflict found,
427 * set lock if/when no conflicts.
429 * Returns: TRUE if parameters are correct, FALSE otherwise.
431 * Side Effects: The lock is always acquired. No way to abort
432 * a lock acquisition other than aborting the transaction.
433 * Lock is recorded in the lkchain.
437 * Note on User Locks:
439 * User locks are handled totally on the application side as
440 * long term cooperative locks which extend beyond the normal
441 * transaction boundaries. Their purpose is to indicate to an
442 * application that someone is `working' on an item. So it is
443 * possible to put an user lock on a tuple's oid, retrieve the
444 * tuple, work on it for an hour and then update it and remove
445 * the lock. While the lock is active other clients can still
446 * read and write the tuple but they can be aware that it has
447 * been locked at the application level by someone.
448 * User locks use lock tags made of an uint16 and an uint32, for
449 * example 0 and a tuple oid, or any other arbitrary pair of
450 * numbers following a convention established by the application.
451 * In this sense tags don't refer to tuples or database entities.
452 * User locks and normal locks are completely orthogonal and
453 * they don't interfere with each other, so it is possible
454 * to acquire a normal lock on an user-locked tuple or user-lock
455 * a tuple for which a normal write lock already exists.
456 * User locks are always non blocking, therefore they are never
457 * acquired if already held by another process. They must be
458 * released explicitly by the application but they are released
459 * automatically when a backend terminates.
460 * They are indicated by a lockmethod 2 which is an alias for the
461 * normal lock table, and are distinguished from normal locks
462 * for the following differences:
464 * normal lock user lock
467 * tag.relId rel oid 0
468 * tag.ItemPointerData.ip_blkid block id lock id2
469 * tag.ItemPointerData.ip_posid tuple offset lock id1
470 * xid.pid 0 backend pid
472 * persistence transaction user or backend
474 * The lockmode parameter can have the same values for normal locks
475 * although probably only WRITE_LOCK can have some practical use.
482 LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
484 XIDLookupEnt *result,
490 LOCKMETHODTABLE *lockMethodTable;
497 is_user_lock = (lockmethod == USER_LOCKMETHOD);
500 #ifdef USER_LOCKS_DEBUG
501 TPRINTF(TRACE_USERLOCKS, "LockAcquire: user lock [%u] %s",
502 locktag->objId.blkno,
503 lock_types[lockmode]);
508 /* ???????? This must be changed when short term locks will be used */
509 locktag->lockmethod = lockmethod;
511 Assert(lockmethod < NumLockMethods);
512 lockMethodTable = LockMethodTable[lockmethod];
513 if (!lockMethodTable)
515 elog(NOTICE, "LockAcquire: bad lock table %d", lockmethod);
519 if (LockingIsDisabled)
522 masterLock = lockMethodTable->ctl->masterLock;
524 SpinAcquire(masterLock);
527 * Find or create a lock with this tag
529 Assert(lockMethodTable->lockHash->hash == tag_hash);
530 lock = (LOCK *) hash_search(lockMethodTable->lockHash, (Pointer) locktag,
534 SpinRelease(masterLock);
535 elog(FATAL, "LockAcquire: lock table %d is corrupted", lockmethod);
539 /* --------------------
540 * if there was nothing else there, complete initialization
541 * --------------------
548 MemSet((char *) lock->holders, 0, sizeof(int) * MAX_LOCKMODES);
549 MemSet((char *) lock->activeHolders, 0, sizeof(int) * MAX_LOCKMODES);
550 ProcQueueInit(&(lock->waitProcs));
551 Assert(lock->tag.objId.blkno == locktag->objId.blkno);
552 LOCK_PRINT("LockAcquire: new", lock, lockmode);
556 LOCK_PRINT("LockAcquire: found", lock, lockmode);
557 Assert((lock->nHolding > 0) && (lock->holders[lockmode] >= 0));
558 Assert((lock->nActive > 0) && (lock->activeHolders[lockmode] >= 0));
559 Assert(lock->nActive <= lock->nHolding);
562 /* ------------------
563 * add an element to the lock queue so that we can clear the
564 * locks at end of transaction.
567 xidTable = lockMethodTable->xidHash;
569 /* ------------------
570 * Zero out all of the tag bytes (this clears the padding bytes for long
571 * word alignment and ensures hashing consistency).
574 MemSet(&item, 0, XID_TAGSIZE); /* must clear padding, needed */
575 item.tag.lock = MAKE_OFFSET(lock);
576 #ifdef USE_XIDTAG_LOCKMETHOD
577 item.tag.lockmethod = lockmethod;
582 item.tag.pid = MyProcPid;
583 item.tag.xid = xid = 0;
587 xid = GetCurrentTransactionId();
588 TransactionIdStore(xid, &item.tag.xid);
591 xid = GetCurrentTransactionId();
592 TransactionIdStore(xid, &item.tag.xid);
596 * Find or create an xid entry with this tag
598 result = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &item,
602 elog(NOTICE, "LockAcquire: xid table corrupted");
607 * If not found initialize the new entry
611 result->nHolding = 0;
612 MemSet((char *) result->holders, 0, sizeof(int) * MAX_LOCKMODES);
613 ProcAddLock(&result->queue);
614 XID_PRINT("LockAcquire: new", result);
618 XID_PRINT("LockAcquire: found", result);
619 Assert((result->nHolding > 0) && (result->holders[lockmode] >= 0));
620 Assert(result->nHolding <= lock->nActive);
624 * lock->nholding tells us how many processes have _tried_ to
625 * acquire this lock, Regardless of whether they succeeded or
626 * failed in doing so.
630 lock->holders[lockmode]++;
631 Assert((lock->nHolding > 0) && (lock->holders[lockmode] > 0));
633 /* --------------------
634 * If I'm the only one holding a lock, then there
635 * cannot be a conflict. The same is true if we already
637 * --------------------
639 if (result->nHolding == lock->nActive || result->holders[lockmode] != 0)
641 result->holders[lockmode]++;
643 XID_PRINT("LockAcquire: owning", result);
644 Assert((result->nHolding > 0) && (result->holders[lockmode] > 0));
645 GrantLock(lock, lockmode);
646 SpinRelease(masterLock);
651 * If lock requested conflicts with locks requested by waiters...
653 if (lockMethodTable->ctl->conflictTab[lockmode] & lock->waitMask)
658 * If I don't hold locks or my locks don't conflict
659 * with waiters then force to sleep.
661 if (result->nHolding > 0)
663 for ( ; i <= lockMethodTable->ctl->numLockModes; i++)
665 if (result->holders[i] > 0 &&
666 lockMethodTable->ctl->conflictTab[i] & lock->waitMask)
667 break; /* conflict */
671 if (result->nHolding == 0 || i > lockMethodTable->ctl->numLockModes)
673 XID_PRINT("LockAcquire: higher priority proc waiting",
675 status = STATUS_FOUND;
678 status = LockResolveConflicts(lockmethod, lock, lockmode, xid, result);
681 status = LockResolveConflicts(lockmethod, lock, lockmode, xid, result);
683 if (status == STATUS_OK)
684 GrantLock(lock, lockmode);
685 else if (status == STATUS_FOUND)
690 * User locks are non blocking. If we can't acquire a lock we must
691 * remove the xid entry and return FALSE without waiting.
695 if (!result->nHolding)
697 SHMQueueDelete(&result->queue);
698 result = (XIDLookupEnt *) hash_search(xidTable,
700 HASH_REMOVE, &found);
701 if (!result || !found)
702 elog(NOTICE, "LockAcquire: remove xid, table corrupted");
705 XID_PRINT_AUX("LockAcquire: NHOLDING", result);
707 lock->holders[lockmode]--;
708 LOCK_PRINT("LockAcquire: user lock failed", lock, lockmode);
709 Assert((lock->nHolding > 0) && (lock->holders[lockmode] >= 0));
710 Assert(lock->nActive <= lock->nHolding);
711 SpinRelease(masterLock);
716 * Construct bitmask of locks we hold before going to sleep.
718 MyProc->holdLock = 0;
719 if (result->nHolding > 0)
724 for (i = 1; i <= lockMethodTable->ctl->numLockModes;
727 if (result->holders[i] > 0)
728 MyProc->holdLock |= tmpMask;
730 Assert(MyProc->holdLock != 0);
733 status = WaitOnLock(lockmethod, lock, lockmode);
736 * Check the xid entry status, in case something in the ipc
737 * communication doesn't work correctly.
739 if (!((result->nHolding > 0) && (result->holders[lockmode] > 0)))
741 XID_PRINT_AUX("LockAcquire: INCONSISTENT ", result);
742 LOCK_PRINT_AUX("LockAcquire: INCONSISTENT ", lock, lockmode);
743 /* Should we retry ? */
746 XID_PRINT("LockAcquire: granted", result);
747 LOCK_PRINT("LockAcquire: granted", lock, lockmode);
750 SpinRelease(masterLock);
752 return status == STATUS_OK;
755 /* ----------------------------
756 * LockResolveConflicts -- test for lock conflicts
759 * Here's what makes this complicated: one transaction's
760 * locks don't conflict with one another. When many processes
761 * hold locks, each has to subtract off the other's locks when
762 * determining whether or not any new lock acquired conflicts with
765 * ----------------------------
768 LockResolveConflicts(LOCKMETHOD lockmethod,
772 XIDLookupEnt *xidentP) /* xident ptr or NULL */
774 XIDLookupEnt *result,
789 numLockModes = LockMethodTable[lockmethod]->ctl->numLockModes;
790 xidTable = LockMethodTable[lockmethod]->xidHash;
796 * A pointer to the xid entry was supplied from the caller.
797 * Actually only LockAcquire can do it.
803 /* ---------------------
804 * read my own statistics from the xid table. If there
805 * isn't an entry, then we'll just add one.
807 * Zero out the tag, this clears the padding bytes for long
808 * word alignment and ensures hashing consistency.
811 MemSet(&item, 0, XID_TAGSIZE);
812 item.tag.lock = MAKE_OFFSET(lock);
813 #ifdef USE_XIDTAG_LOCKMETHOD
814 item.tag.lockmethod = lockmethod;
817 is_user_lock = (lockmethod == 2);
820 item.tag.pid = MyProcPid;
824 TransactionIdStore(xid, &item.tag.xid);
826 TransactionIdStore(xid, &item.tag.xid);
830 * Find or create an xid entry with this tag
832 result = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &item,
836 elog(NOTICE, "LockResolveConflicts: xid table corrupted");
841 * If not found initialize the new entry. THIS SHOULD NEVER
842 * HAPPEN, if we are trying to resolve a conflict we must already
843 * have allocated an xid entry for this lock. dz 21-11-1997
848 * we're not holding any type of lock yet. Clear
852 MemSet(result->holders, 0, numLockModes * sizeof(*(lock->holders)));
853 result->nHolding = 0;
854 XID_PRINT_AUX("LockResolveConflicts: NOT FOUND", result);
857 XID_PRINT("LockResolveConflicts: found", result);
859 Assert((result->nHolding >= 0) && (result->holders[lockmode] >= 0));
861 /* ----------------------------
862 * first check for global conflicts: If no locks conflict
863 * with mine, then I get the lock.
865 * Checking for conflict: lock->mask represents the types of
866 * currently held locks. conflictTable[lockmode] has a bit
867 * set for each type of lock that conflicts with mine. Bitwise
868 * compare tells if there is a conflict.
869 * ----------------------------
871 if (!(LockMethodTable[lockmethod]->ctl->conflictTab[lockmode] & lock->mask))
873 result->holders[lockmode]++;
875 XID_PRINT("LockResolveConflicts: no conflict", result);
876 Assert((result->nHolding > 0) && (result->holders[lockmode] > 0));
880 /* ------------------------
881 * Rats. Something conflicts. But it could still be my own
882 * lock. We have to construct a conflict mask
883 * that does not reflect our own locks.
884 * ------------------------
886 myHolders = result->holders;
889 for (i = 1; i <= numLockModes; i++, tmpMask <<= 1)
891 if (lock->activeHolders[i] != myHolders[i])
895 /* ------------------------
896 * now check again for conflicts. 'bitmask' describes the types
897 * of locks held by other processes. If one of these
898 * conflicts with the kind of lock that I want, there is a
899 * conflict and I have to sleep.
900 * ------------------------
902 if (!(LockMethodTable[lockmethod]->ctl->conflictTab[lockmode] & bitmask))
904 /* no conflict. Get the lock and go on */
905 result->holders[lockmode]++;
907 XID_PRINT("LockResolveConflicts: resolved", result);
908 Assert((result->nHolding > 0) && (result->holders[lockmode] > 0));
912 XID_PRINT("LockResolveConflicts: conflicting", result);
917 * GrantLock -- update the lock data structure to show
918 * the new lock holder.
921 GrantLock(LOCK *lock, LOCKMODE lockmode)
924 lock->activeHolders[lockmode]++;
925 lock->mask |= BITS_ON[lockmode];
926 LOCK_PRINT("GrantLock", lock, lockmode);
927 Assert((lock->nActive > 0) && (lock->activeHolders[lockmode] > 0));
928 Assert(lock->nActive <= lock->nHolding);
932 WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode)
934 PROC_QUEUE *waitQueue = &(lock->waitProcs);
935 LOCKMETHODTABLE *lockMethodTable = LockMethodTable[lockmethod];
939 Assert(lockmethod < NumLockMethods);
942 * the waitqueue is ordered by priority. I insert myself according to
943 * the priority of the lock I am acquiring.
945 * SYNC NOTE: I am assuming that the lock table spinlock is sufficient
946 * synchronization for this queue. That will not be true if/when
947 * people can be deleted from the queue by a SIGINT or something.
949 LOCK_PRINT_AUX("WaitOnLock: sleeping on lock", lock, lockmode);
950 strcpy(old_status, PS_STATUS);
951 strcpy(new_status, PS_STATUS);
952 strcat(new_status, " waiting");
953 PS_SET_STATUS(new_status);
954 if (ProcSleep(waitQueue,
955 lockMethodTable->ctl,
959 /* -------------------
960 * This could have happend as a result of a deadlock,
961 * see HandleDeadLock().
962 * Decrement the lock nHolding and holders fields as
963 * we are no longer waiting on this lock.
964 * -------------------
967 lock->holders[lockmode]--;
968 LOCK_PRINT_AUX("WaitOnLock: aborting on lock", lock, lockmode);
969 Assert((lock->nHolding >= 0) && (lock->holders[lockmode] >= 0));
970 Assert(lock->nActive <= lock->nHolding);
971 if (lock->activeHolders[lockmode] == lock->holders[lockmode])
972 lock->waitMask &= BITS_OFF[lockmode];
973 SpinRelease(lockMethodTable->ctl->masterLock);
974 elog(ERROR, "WaitOnLock: error on wakeup - Aborting this transaction");
979 if (lock->activeHolders[lockmode] == lock->holders[lockmode])
980 lock->waitMask &= BITS_OFF[lockmode];
981 PS_SET_STATUS(old_status);
982 LOCK_PRINT_AUX("WaitOnLock: wakeup on lock", lock, lockmode);
987 * LockRelease -- look up 'locktag' in lock table 'lockmethod' and
990 * Side Effects: if the lock no longer conflicts with the highest
991 * priority waiting process, that process is granted the lock
992 * and awoken. (We have to grant the lock here to avoid a
993 * race between the waking process and any new process to
994 * come along and request the lock).
997 LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
1000 SPINLOCK masterLock;
1002 LOCKMETHODTABLE *lockMethodTable;
1003 XIDLookupEnt *result,
1007 bool wakeupNeeded = true;
1013 is_user_lock = (lockmethod == USER_LOCKMETHOD);
1016 TPRINTF(TRACE_USERLOCKS, "LockRelease: user lock tag [%u] %d",
1017 locktag->objId.blkno,
1022 /* ???????? This must be changed when short term locks will be used */
1023 locktag->lockmethod = lockmethod;
1027 (lockmethod == USER_LOCKMETHOD) ? TRACE_USERLOCKS : TRACE_LOCKS;
1029 trace_flag = TRACE_LOCKS;
1032 Assert(lockmethod < NumLockMethods);
1033 lockMethodTable = LockMethodTable[lockmethod];
1034 if (!lockMethodTable)
1036 elog(NOTICE, "lockMethodTable is null in LockRelease");
1040 if (LockingIsDisabled)
1043 masterLock = lockMethodTable->ctl->masterLock;
1044 SpinAcquire(masterLock);
1047 * Find a lock with this tag
1049 Assert(lockMethodTable->lockHash->hash == tag_hash);
1050 lock = (LOCK *) hash_search(lockMethodTable->lockHash, (Pointer) locktag,
1054 * let the caller print its own error message, too. Do not
1059 SpinRelease(masterLock);
1060 elog(NOTICE, "LockRelease: locktable corrupted");
1066 SpinRelease(masterLock);
1070 TPRINTF(TRACE_USERLOCKS, "LockRelease: no lock with this tag");
1074 elog(NOTICE, "LockRelease: locktable lookup failed, no lock");
1077 LOCK_PRINT("LockRelease: found", lock, lockmode);
1078 Assert((lock->nHolding > 0) && (lock->holders[lockmode] >= 0));
1079 Assert((lock->nActive > 0) && (lock->activeHolders[lockmode] >= 0));
1080 Assert(lock->nActive <= lock->nHolding);
1083 /* ------------------
1084 * Zero out all of the tag bytes (this clears the padding bytes for long
1085 * word alignment and ensures hashing consistency).
1086 * ------------------
1088 MemSet(&item, 0, XID_TAGSIZE);
1089 item.tag.lock = MAKE_OFFSET(lock);
1090 #ifdef USE_XIDTAG_LOCKMETHOD
1091 item.tag.lockmethod = lockmethod;
1096 item.tag.pid = MyProcPid;
1097 item.tag.xid = xid = 0;
1101 xid = GetCurrentTransactionId();
1102 TransactionIdStore(xid, &item.tag.xid);
1105 xid = GetCurrentTransactionId();
1106 TransactionIdStore(xid, &item.tag.xid);
1110 * Find an xid entry with this tag
1112 xidTable = lockMethodTable->xidHash;
1113 result = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &item,
1114 HASH_FIND_SAVE, &found);
1115 if (!result || !found)
1117 SpinRelease(masterLock);
1119 if (!found && is_user_lock)
1120 TPRINTF(TRACE_USERLOCKS, "LockRelease: no lock with this tag");
1123 elog(NOTICE, "LockReplace: xid table corrupted");
1126 XID_PRINT("LockRelease: found", result);
1127 Assert(result->tag.lock == MAKE_OFFSET(lock));
1130 * Check that we are actually holding a lock of the type we want to
1133 if (!(result->holders[lockmode] > 0))
1135 SpinRelease(masterLock);
1136 XID_PRINT_AUX("LockAcquire: WRONGTYPE", result);
1137 elog(NOTICE, "LockRelease: you don't own a lock of type %s",
1138 lock_types[lockmode]);
1139 Assert(result->holders[lockmode] >= 0);
1142 Assert(result->nHolding > 0);
1145 * fix the general lock stats
1148 lock->holders[lockmode]--;
1150 lock->activeHolders[lockmode]--;
1153 /* --------------------------
1154 * If there are still active locks of the type I just released, no one
1155 * should be woken up. Whoever is asleep will still conflict
1156 * with the remaining locks.
1157 * --------------------------
1159 if (lock->activeHolders[lockmode])
1160 wakeupNeeded = false;
1164 * Above is not valid any more (due to MVCC lock modes).
1165 * Actually we should compare activeHolders[lockmode] with
1166 * number of waiters holding lock of this type and try to
1167 * wakeup only if these numbers are equal (and lock released
1168 * conflicts with locks requested by waiters). For the moment
1169 * we only check the last condition.
1171 if (lockMethodTable->ctl->conflictTab[lockmode] & lock->waitMask)
1172 wakeupNeeded = true;
1174 if (!(lock->activeHolders[lockmode]))
1176 /* change the conflict mask. No more of this lock type. */
1177 lock->mask &= BITS_OFF[lockmode];
1180 LOCK_PRINT("LockRelease: updated", lock, lockmode);
1181 Assert((lock->nHolding >= 0) && (lock->holders[lockmode] >= 0));
1182 Assert((lock->nActive >= 0) && (lock->activeHolders[lockmode] >= 0));
1183 Assert(lock->nActive <= lock->nHolding);
1185 if (!lock->nHolding)
1187 /* ------------------
1188 * if there's no one waiting in the queue,
1189 * we just released the last lock.
1190 * Delete it from the lock table.
1191 * ------------------
1193 Assert(lockMethodTable->lockHash->hash == tag_hash);
1194 lock = (LOCK *) hash_search(lockMethodTable->lockHash,
1195 (Pointer) &(lock->tag),
1198 Assert(lock && found);
1199 wakeupNeeded = false;
1203 * now check to see if I have any private locks. If I do, decrement
1204 * the counts associated with them.
1206 result->holders[lockmode]--;
1208 XID_PRINT("LockRelease: updated", result);
1209 Assert((result->nHolding >= 0) && (result->holders[lockmode] >= 0));
1212 * If this was my last hold on this lock, delete my entry in the XID
1215 if (!result->nHolding)
1217 if (result->queue.prev == INVALID_OFFSET)
1218 elog(NOTICE, "LockRelease: xid.prev == INVALID_OFFSET");
1219 if (result->queue.next == INVALID_OFFSET)
1220 elog(NOTICE, "LockRelease: xid.next == INVALID_OFFSET");
1221 if (result->queue.next != INVALID_OFFSET)
1222 SHMQueueDelete(&result->queue);
1223 XID_PRINT("LockRelease: deleting", result);
1224 result = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &result,
1225 HASH_REMOVE_SAVED, &found);
1226 if (!result || !found)
1228 SpinRelease(masterLock);
1229 elog(NOTICE, "LockRelease: remove xid, table corrupted");
1236 ProcLockWakeup(&(lock->waitProcs), lockmethod, lock);
1240 if (((LOCKDEBUG(LOCK_LOCKMETHOD(*(lock))) >= 1) \
1241 && (lock->tag.relId >= lockDebugOidMin)) \
1243 (lockDebugRelation && (lock->tag.relId == lockDebugRelation)))
1244 TPRINTF(TRACE_ALL, "LockRelease: no wakeup needed");
1247 SpinRelease(masterLock);
/*
 * NOTE(review): this chunk is an incomplete (line-sampled) listing of
 * storage/lmgr/lock.c -- many interior lines are absent, so only comments
 * are added below; no code bytes are changed.
 *
 * LockReleaseAll -- walk a backend's shared-memory lock queue and release
 * every lock it holds for the given lock method, fixing per-mode holder
 * counts, garbage-collecting empty LOCK entries, and waking any waiters
 * whose conflicts have cleared.  Caller's queue is reinitialized if it
 * ends up empty.
 */
1252	 * LockReleaseAll -- Release all locks in a process lock queue.
1255	LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue)
1257	 PROC_QUEUE *waitQueue;
1259	 XIDLookupEnt *xidLook = NULL;
1260	 XIDLookupEnt *tmp = NULL;
1261	 XIDLookupEnt *result;
1262	 SHMEM_OFFSET end = MAKE_OFFSET(lockQueue);
1263	 SPINLOCK masterLock;
1264	 LOCKMETHODTABLE *lockMethodTable;
1270	 int xidtag_lockmethod;
1273	 int is_user_lock_table,
	/* User locks (lockmethod == USER_LOCKMETHOD) are keyed by pid, not xid,
	 * and get special skip/ownership checks below. */
1279	 is_user_lock_table = (lockmethod == USER_LOCKMETHOD);
1280	 trace_flag = (lockmethod == 2) ? TRACE_USERLOCKS : TRACE_LOCKS;
1282	 trace_flag = TRACE_LOCKS;
1284	 TPRINTF(trace_flag, "LockReleaseAll: lockmethod=%d, pid=%d",
1285	 lockmethod, MyProcPid);
1287	 Assert(lockmethod < NumLockMethods);
1288	 lockMethodTable = LockMethodTable[lockmethod];
1289	 if (!lockMethodTable)
	/* NOTE(review): message says "LockAcquire" but we are in LockReleaseAll --
	 * looks like a copy-paste slip in the original; string left untouched. */
1291	 elog(NOTICE, "LockAcquire: bad lockmethod %d", lockmethod);
1295	 if (SHMQueueEmpty(lockQueue))
1298	 numLockModes = lockMethodTable->ctl->numLockModes;
1299	 masterLock = lockMethodTable->ctl->masterLock;
	/* Hold the master spinlock for the entire queue walk. */
1301	 SpinAcquire(masterLock);
1302	 SHMQueueFirst(lockQueue, (Pointer *) &xidLook, &xidLook->queue);
1306	 bool wakeupNeeded = false;
	/* Defensive loop-detection: a corrupted circular queue would otherwise
	 * spin forever (count check elided from this listing). */
1309	 * Sometimes the queue appears to be messed up.
1313	 elog(NOTICE, "LockReleaseAll: xid loop detected, giving up");
1318	 /* ---------------------------
1319	 * XXX Here we assume the shared memory queue is circular and
1320	 * that we know its internal structure. Should have some sort of
1321	 * macros to allow one to walk it. mer 20 July 1991
1322	 * ---------------------------
1324	 done = (xidLook->queue.next == end);
1325	 lock = (LOCK *) MAKE_PTR(xidLook->tag.lock);
1327	 xidtag_lockmethod = XIDENT_LOCKMETHOD(*xidLook);
1328	 if ((xidtag_lockmethod == lockmethod) && pg_options[trace_flag])
1330	 XID_PRINT("LockReleaseAll", xidLook);
1331	 LOCK_PRINT("LockReleaseAll", lock, 0);
1334	 #ifdef USE_XIDTAG_LOCKMETHOD
1335	 if (xidtag_lockmethod != LOCK_LOCKMETHOD(*lock))
1336	 elog(NOTICE, "LockReleaseAll: xid/lock method mismatch: %d != %d",
1337	 xidtag_lockmethod, lock->tag.lockmethod);
	/* Entries belonging to a different lock method stay in the queue. */
1339	 if ((xidtag_lockmethod != lockmethod) && (trace_flag >= 2))
1341	 TPRINTF(trace_flag, "LockReleaseAll: skipping other table");
1346	 Assert(lock->nHolding > 0);
1347	 Assert(lock->nActive > 0);
1348	 Assert(lock->nActive <= lock->nHolding);
1349	 Assert(xidLook->nHolding >= 0);
1350	 Assert(xidLook->nHolding <= lock->nHolding);
	/* User-lock table: skip entries that are really normal locks
	 * (pid == 0 or xid != 0), and sanity-check pid ownership. */
1353	 if (is_user_lock_table)
1355	 if ((xidLook->tag.pid == 0) || (xidLook->tag.xid != 0))
1357	 TPRINTF(TRACE_USERLOCKS,
	/* NOTE(review): "skiping" typo is in the original runtime string;
	 * intentionally not corrected in this comments-only pass. */
1358	 "LockReleaseAll: skiping normal lock [%d,%d,%d]",
1359	 xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
1363	 if (xidLook->tag.pid != MyProcPid)
1365	 /* Should never happen */
1367	 "LockReleaseAll: INVALID PID: [%u] [%d,%d,%d]",
1368	 lock->tag.objId.blkno,
1369	 xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
1373	 TPRINTF(TRACE_USERLOCKS,
1374	 "LockReleaseAll: releasing user lock [%u] [%d,%d,%d]",
1375	 lock->tag.objId.blkno,
1376	 xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
	/* Normal-lock table: conversely, skip user-lock entries (pid != 0). */
1382	 * Can't check xidLook->tag.xid, can be 0 also for normal
1385	 if (xidLook->tag.pid != 0)
1387	 TPRINTF(TRACE_LOCKS,
1388	 "LockReleaseAll: skiping user lock [%u] [%d,%d,%d]",
1389	 lock->tag.objId.blkno,
1390	 xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
1397	 /* ------------------
1398	 * fix the general lock stats
1399	 * ------------------
	/* If this xid held only some of the lock's holders, subtract its
	 * per-mode counts; otherwise (equal) the whole lock is being freed
	 * and the zero-out branch below applies. */
1401	 if (lock->nHolding != xidLook->nHolding)
1403	 for (i = 1; i <= numLockModes; i++)
1405	 Assert(xidLook->holders[i] >= 0);
1406	 lock->holders[i] -= xidLook->holders[i];
1407	 lock->activeHolders[i] -= xidLook->holders[i];
1408	 Assert((lock->holders[i] >= 0) \
1409	 &&(lock->activeHolders[i] >= 0));
1410	 if (!lock->activeHolders[i])
1411	 lock->mask &= BITS_OFF[i];
1413	 * Read comments in LockRelease
	/* Wake waiters only if a mode we held conflicted with someone waiting. */
1415	 if (!wakeupNeeded && xidLook->holders[i] > 0 &&
1416	 lockMethodTable->ctl->conflictTab[i] & lock->waitMask)
1417	 wakeupNeeded = true;
1419	 lock->nHolding -= xidLook->nHolding;
1420	 lock->nActive -= xidLook->nHolding;
1421	 Assert((lock->nHolding >= 0) && (lock->nActive >= 0));
1422	 Assert(lock->nActive <= lock->nHolding);
1427	 * set nHolding to zero so that we can garbage collect the lock
1432	 /* Fix the lock status, just for next LOCK_PRINT message. */
1433	 for (i = 1; i <= numLockModes; i++)
1435	 Assert(lock->holders[i] == lock->activeHolders[i]);
1436	 lock->holders[i] = lock->activeHolders[i] = 0;
1439	 LOCK_PRINT("LockReleaseAll: updated", lock, 0);
1442	 * Remove the xid from the process lock queue
1444	 SHMQueueDelete(&xidLook->queue);
1447	 * always remove the xidLookup entry, we're done with it now
1451	 XID_PRINT("LockReleaseAll: deleting", xidLook);
1452	 result = (XIDLookupEnt *) hash_search(lockMethodTable->xidHash,
	/* Corruption paths release the spinlock before elog -- elog may longjmp
	 * and must not leave the master lock held. */
1456	 if (!result || !found)
1458	 SpinRelease(masterLock);
1459	 elog(NOTICE, "LockReleaseAll: xid table corrupted");
1463	 if (!lock->nHolding)
1465	 /* --------------------
1466	 * if there's no one waiting in the queue, we've just released
1468	 * --------------------
1470	 LOCK_PRINT("LockReleaseAll: deleting", lock, 0);
1471	 Assert(lockMethodTable->lockHash->hash == tag_hash);
1472	 lock = (LOCK *) hash_search(lockMethodTable->lockHash,
1473	 (Pointer) &(lock->tag),
1474	 HASH_REMOVE, &found);
1475	 if ((!lock) || (!found))
1477	 SpinRelease(masterLock);
1478	 elog(NOTICE, "LockReleaseAll: cannot remove lock from HTAB");
1482	 else if (wakeupNeeded)
1484	 waitQueue = &(lock->waitProcs);
1485	 ProcLockWakeup(waitQueue, lockmethod, lock);
1493	 SHMQueueFirst(&xidLook->queue, (Pointer *) &tmp, &tmp->queue);
1498	 * Reinitialize the queue only if nothing has been left in.
1502	 TPRINTF(trace_flag, "LockReleaseAll: reinitializing lockQueue");
1503	 SHMQueueInit(lockQueue);
1506	 SpinRelease(masterLock);
1507	 TPRINTF(trace_flag, "LockReleaseAll: done");
/*
 * LockShmemSize -- estimate the shared-memory space needed by the lock
 * manager for maxBackends backends: the PROC global header, one PROC per
 * backend, one LOCKMETHODCTL per backend, plus hash-table estimates for
 * the lock and xid tables.  (Declarations/return are outside this sampled
 * view.)
 */
1513	LockShmemSize(int maxBackends)
1517	 size += MAXALIGN(sizeof(PROC_HDR)); /* ProcGlobal */
1518	 size += MAXALIGN(maxBackends * sizeof(PROC)); /* each MyProc */
1519	 size += MAXALIGN(maxBackends * sizeof(LOCKMETHODCTL)); /* each
1520	 * lockMethodTable->ctl */
1522	 /* lockHash table */
1523	 size += hash_estimate_size(NLOCKENTS(maxBackends),
1524	 SHMEM_LOCKTAB_KEYSIZE,
1525	 SHMEM_LOCKTAB_DATASIZE);
	/* xidHash table: one entry per backend. */
1528	 size += hash_estimate_size(maxBackends,
1529	 SHMEM_XIDTAB_KEYSIZE,
1530	 SHMEM_XIDTAB_DATASIZE);
1532	 /* Since the lockHash entry count above is only an estimate,
1533	 * add 10% safety margin.
1540	/* -----------------
1541	 * Boolean function to determine current locking status
	/* Trivial accessor: reports the global LockingIsDisabled flag.
	 * (Function header line missing from this sampled listing.) */
1547	 return LockingIsDisabled;
/*
 * DeadLockCheck -- recursive wait-for-graph search.  Given a process's
 * lockQueue and the lock it is sleeping on (findlock), determine whether
 * any process waiting on a lock we hold is (transitively) waiting on us.
 * Master spinlock must already be held by the caller.  Returns true on
 * deadlock (per visible return paths; some lines are missing from this
 * sampled listing).
 */
1551	 * DeadlockCheck -- Checks for deadlocks for a given process
1553	 * We can't block on user locks, so no sense testing for deadlock
1554	 * because there is no blocking, and no timer for the block.
1556	 * This code takes a list of locks a process holds, and the lock that
1557	 * the process is sleeping on, and tries to find if any of the processes
1558	 * waiting on its locks hold the lock it is waiting for. If no deadlock
1559	 * is found, it goes on to look at all the processes waiting on their locks.
1561	 * We have already locked the master lock before being called.
1564	DeadLockCheck(SHM_QUEUE *lockQueue, LOCK *findlock, bool skip_check)
1567	 XIDLookupEnt *xidLook = NULL;
1568	 XIDLookupEnt *tmp = NULL;
1569	 SHMEM_OFFSET end = MAKE_OFFSET(lockQueue);
	/* Static visited-set shared across recursive calls; reset at the top
	 * level (skip_check recursion depth handling is partly elided here). */
1572	 static PROC *checked_procs[MAXBACKENDS];
1575	 /* initialize at start of recursion */
1578	 checked_procs[0] = MyProc;
1582	 if (SHMQueueEmpty(lockQueue))
1585	 SHMQueueFirst(lockQueue, (Pointer *) &xidLook, &xidLook->queue);
1587	 XID_PRINT("DeadLockCheck", xidLook);
1591	 done = (xidLook->queue.next == end);
1592	 lock = (LOCK *) MAKE_PTR(xidLook->tag.lock);
1594	 LOCK_PRINT("DeadLockCheck", lock, 0);
1597	 * This is our only check to see if we found the lock we want.
1599	 * The lock we are waiting for is already in MyProc->lockQueue so we
1600	 * need to skip it here. We are trying to find it in someone
1601	 * else's lockQueue. bjm
1603	 if (lock == findlock && !skip_check)
1607	 PROC_QUEUE *waitQueue = &(lock->waitProcs);
	/* Walk the lock's wait queue backwards via links.prev. */
1612	 proc = (PROC *) MAKE_PTR(waitQueue->links.prev);
1613	 for (i = 0; i < waitQueue->size; i++)
1616	 * If I hold some locks on findlock and another proc
1617	 * waits on it holding locks too - check if we are
1618	 * waiting one another.
1620	 if (proc != MyProc &&
1621	 lock == findlock && /* skip_check also true */
1624	 LOCKMETHODCTL *lockctl =
1625	 LockMethodTable[DEFAULT_LOCKMETHOD]->ctl;
	/* Mutual-conflict test: my requested mode conflicts with his held
	 * modes AND vice versa => direct deadlock. */
1628	 if (lockctl->conflictTab[MyProc->token] & proc->holdLock &&
1629	 lockctl->conflictTab[proc->token] & MyProc->holdLock)
1634	 * No sense in looking at the wait queue of the lock we
1635	 * are looking for. If lock == findlock, and I got here,
1636	 * skip_check must be true too.
1638	 if (lock != findlock)
	/* Recurse into each not-yet-visited waiter's own lock queue. */
1640	 for (j = 0; j < nprocs; j++)
1641	 if (checked_procs[j] == proc)
1643	 if (j >= nprocs && lock != findlock)
1645	 Assert(nprocs < MAXBACKENDS);
1646	 checked_procs[nprocs++] = proc;
1649	 * For non-MyProc entries, we are looking only
1650	 * waiters, not necessarily people who already
1651	 * hold locks and are waiting. Now we check for
1652	 * cases where we have two or more tables in a
1653	 * deadlock. We do this by continuing to search
1654	 * for someone holding a lock bjm
1656	 if (DeadLockCheck(&(proc->lockQueue), findlock, false))
1660	 proc = (PROC *) MAKE_PTR(proc->links.prev);
1666	 SHMQueueFirst(&xidLook->queue, (Pointer *) &tmp, &tmp->queue);
1670	 /* if we got here, no deadlock */
/*
 * LockOwners -- build and return a 1-D Postgres ArrayType of the pids of
 * all processes holding the given (user) lock.  Dead holders are reported
 * as negative pids (liveness probed with an ignored SIGCHLD).  On any
 * lookup failure a shared static empty array is returned.
 * (Sampled listing: some declarations and early lines are missing.)
 */
1676	 * Return an array with the pids of all processes owning a lock.
1677	 * This works only for user locks because normal locks have no
1678	 * pid information in the corresponding XIDLookupEnt.
1681	LockOwners(LOCKMETHOD lockmethod, LOCKTAG *locktag)
1683	 XIDLookupEnt *xidLook = NULL;
1684	 SPINLOCK masterLock;
1686	 SHMEM_OFFSET lock_offset;
1688	 LOCKMETHODTABLE *lockMethodTable;
1700	 /* Assume that no one will modify the result */
	/* Pre-built varlena: {size=20, ndim=1, dims[0]=0, lbound=0, no data}. */
1701	 static int empty_array[] = {20, 1, 0, 0, 0};
1706	 is_user_lock = (lockmethod == USER_LOCKMETHOD);
1709	 TPRINTF(TRACE_USERLOCKS, "LockOwners: user lock tag [%u]",
	/* NOTE(review): stray ";," after blkno below looks like transcription
	 * damage -- presumably "locktag->objId.blkno);" in the original.
	 * Left byte-identical in this comments-only pass; confirm vs upstream. */
1710	 locktag->objId.blkno;,
1714	 /* This must be changed when short term locks will be used */
1715	 locktag->lockmethod = lockmethod;
1717	 Assert((lockmethod >= MIN_LOCKMETHOD) && (lockmethod < NumLockMethods));
1718	 lockMethodTable = LockMethodTable[lockmethod];
1719	 if (!lockMethodTable)
1721	 elog(NOTICE, "lockMethodTable is null in LockOwners");
1722	 return (ArrayType *) &empty_array;
1725	 if (LockingIsDisabled)
1726	 return (ArrayType *) &empty_array;
1728	 masterLock = lockMethodTable->ctl->masterLock;
1729	 SpinAcquire(masterLock);
1732	 * Find a lock with this tag
1734	 Assert(lockMethodTable->lockHash->hash == tag_hash);
1735	 lock = (LOCK *) hash_search(lockMethodTable->lockHash, (Pointer) locktag,
1739	 * let the caller print its own error message, too. Do not elog(WARN).
	/* Release spinlock before elog: elog may longjmp. */
1743	 SpinRelease(masterLock);
1744	 elog(NOTICE, "LockOwners: locktable corrupted");
1745	 return (ArrayType *) &empty_array;
1750	 SpinRelease(masterLock);
1754	 TPRINTF(TRACE_USERLOCKS, "LockOwners: no lock with this tag");
1755	 return (ArrayType *) &empty_array;
1758	 elog(NOTICE, "LockOwners: locktable lookup failed, no lock");
1759	 return (ArrayType *) &empty_array;
1761	 LOCK_PRINT("LockOwners: found", lock, 0);
1762	 Assert((lock->nHolding > 0) && (lock->nActive > 0));
1763	 Assert(lock->nActive <= lock->nHolding);
1764	 lock_offset = MAKE_OFFSET(lock);
1766	 /* Construct a 1-dimensional array */
1768	 hdrlen = ARR_OVERHEAD(ndims);
	/* Size the array for the current active-holder count; it is shrunk to
	 * the actual item count after the scan (see end of function). */
1770	 hbounds[0] = lock->nActive;
1771	 size = hdrlen + sizeof(int) * hbounds[0];
1772	 array = (ArrayType *) palloc(size);
1773	 MemSet(array, 0, size);
1774	 memmove((char *) array, (char *) &size, sizeof(int));
1775	 memmove((char *) ARR_NDIM_PTR(array), (char *) &ndims, sizeof(int));
1776	 memmove((char *) ARR_DIMS(array), (char *) hbounds, ndims * sizeof(int));
1777	 memmove((char *) ARR_LBOUND(array), (char *) lbounds, ndims * sizeof(int));
1778	 SET_LO_FLAG(false, array);
1779	 data_ptr = (int *) ARR_DATA_PTR(array);
	/* Sequential scan of the whole xid hash, filtering to entries that
	 * reference this lock, carry a pid, and actually hold something. */
1781	 xidTable = lockMethodTable->xidHash;
1784	 while ((xidLook = (XIDLookupEnt *) hash_seq(xidTable)) &&
1785	 (xidLook != (XIDLookupEnt *) TRUE))
1789	 elog(NOTICE, "LockOwners: possible loop, giving up");
1793	 if (xidLook->tag.pid == 0)
1795	 XID_PRINT("LockOwners: no pid", xidLook);
1799	 if (!xidLook->tag.lock)
1801	 XID_PRINT("LockOwners: NULL LOCK", xidLook);
1805	 if (xidLook->tag.lock != lock_offset)
1807	 XID_PRINT("LockOwners: different lock", xidLook);
1811	 if (LOCK_LOCKMETHOD(*lock) != lockmethod)
1813	 XID_PRINT("LockOwners: other table", xidLook);
1817	 if (xidLook->nHolding <= 0)
1819	 XID_PRINT("LockOwners: not holding", xidLook);
1823	 if (nitems >= hbounds[0])
1825	 elog(NOTICE, "LockOwners: array size exceeded");
1830	 * Check that the holding process is still alive by sending him an
1831	 * unused (ignored) signal. If the kill fails the process is not
	/* kill(pid, SIGCHLD) as a liveness probe: SIGCHLD is ignored by
	 * default, so a 0 return just means the pid exists. */
1834	 if ((xidLook->tag.pid != MyProcPid) \
1835	 &&(kill(xidLook->tag.pid, SIGCHLD)) != 0)
1837	 /* Return a negative pid to signal that process is dead */
1838	 data_ptr[nitems++] = -(xidLook->tag.pid);
1839	 XID_PRINT("LockOwners: not alive", xidLook);
1840	 /* XXX - TODO: remove this entry and update lock stats */
1844	 /* Found a process holding the lock */
1845	 XID_PRINT("LockOwners: holding", xidLook);
1846	 data_ptr[nitems++] = xidLook->tag.pid;
1849	 SpinRelease(masterLock);
1851	 /* Adjust the actual size of the array */
1852	 hbounds[0] = nitems;
1853	 size = hdrlen + sizeof(int) * hbounds[0];
1854	 memmove((char *) array, (char *) &size, sizeof(int));
1855	 memmove((char *) ARR_DIMS(array), (char *) hbounds, ndims * sizeof(int));
1861	#ifdef DEADLOCK_DEBUG
/*
 * DumpLocks -- debug-only (DEADLOCK_DEBUG): print every xid entry and lock
 * in the current backend's lockQueue via XID_PRINT_AUX/LOCK_PRINT_AUX.
 * The comment below says the masterLock must already be held by the caller.
 */
1863	 * Dump all locks in the proc->lockQueue. Must have already acquired
1869	 SHMEM_OFFSET location;
1871	 SHM_QUEUE *lockQueue;
1873	 XIDLookupEnt *xidLook = NULL;
1874	 XIDLookupEnt *tmp = NULL;
1876	 SPINLOCK masterLock;
1880	 int lockmethod = DEFAULT_LOCKMETHOD;
1881	 LOCKMETHODTABLE *lockMethodTable;
	/* Locate our own PROC struct through the shmem pid lookup table. */
1883	 ShmemPIDLookup(MyProcPid, &location);
1884	 if (location == INVALID_OFFSET)
1886	 proc = (PROC *) MAKE_PTR(location);
1889	 lockQueue = &proc->lockQueue;
1891	 Assert(lockmethod < NumLockMethods);
1892	 lockMethodTable = LockMethodTable[lockmethod];
1893	 if (!lockMethodTable)
1896	 numLockModes = lockMethodTable->ctl->numLockModes;
1897	 masterLock = lockMethodTable->ctl->masterLock;
1899	 if (SHMQueueEmpty(lockQueue))
1902	 SHMQueueFirst(lockQueue, (Pointer *) &xidLook, &xidLook->queue);
1903	 end = MAKE_OFFSET(lockQueue);
1905	 if (MyProc->waitLock)
1906	 LOCK_PRINT_AUX("DumpLocks: waiting on", MyProc->waitLock, 0);
	/* Same circular-queue walk (and loop guard) as LockReleaseAll. */
1912	 elog(NOTICE, "DumpLocks: xid loop detected, giving up");
1916	 /* ---------------------------
1917	 * XXX Here we assume the shared memory queue is circular and
1918	 * that we know its internal structure. Should have some sort of
1919	 * macros to allow one to walk it. mer 20 July 1991
1920	 * ---------------------------
1922	 done = (xidLook->queue.next == end);
1923	 lock = (LOCK *) MAKE_PTR(xidLook->tag.lock);
1925	 XID_PRINT_AUX("DumpLocks", xidLook);
1926	 LOCK_PRINT_AUX("DumpLocks", lock, 0);
1931	 SHMQueueFirst(&xidLook->queue, (Pointer *) &tmp, &tmp->queue);
1937 * Dump all postgres locks. Must have already acquired the masterLock.
1942 SHMEM_OFFSET location;
1944 XIDLookupEnt *xidLook = NULL;
1948 int lockmethod = DEFAULT_LOCKMETHOD;
1949 LOCKMETHODTABLE *lockMethodTable;
1953 ShmemPIDLookup(pid, &location);
1954 if (location == INVALID_OFFSET)
1956 proc = (PROC *) MAKE_PTR(location);
1960 Assert(lockmethod < NumLockMethods);
1961 lockMethodTable = LockMethodTable[lockmethod];
1962 if (!lockMethodTable)
1965 xidTable = lockMethodTable->xidHash;
1967 if (MyProc->waitLock)
1968 LOCK_PRINT_AUX("DumpAllLocks: waiting on", MyProc->waitLock, 0);
1971 while ((xidLook = (XIDLookupEnt *) hash_seq(xidTable)) &&
1972 (xidLook != (XIDLookupEnt *) TRUE))
1974 XID_PRINT_AUX("DumpAllLocks", xidLook);
1976 if (xidLook->tag.lock)
1978 lock = (LOCK *) MAKE_PTR(xidLook->tag.lock);
1979 LOCK_PRINT_AUX("DumpAllLocks", lock, 0);
1982 elog(DEBUG, "DumpAllLocks: xidLook->tag.lock = NULL");
1986 elog(NOTICE, "DumpAllLocks: possible loop, giving up");