OSDN Git Service

c1996fe0713a9f0cfec276ee269f5ed95792fcd4
[pg-rex/syncrep.git] / src / backend / storage / lmgr / lock.c
1 /*-------------------------------------------------------------------------
2  *
3  * lock.c
4  *        simple lock acquisition
5  *
6  * Copyright (c) 1994, Regents of the University of California
7  *
8  *
9  * IDENTIFICATION
10  *        $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.50 1999/05/07 01:23:03 vadim Exp $
11  *
12  * NOTES
13  *        Outside modules can create a lock table and acquire/release
14  *        locks.  A lock table is a shared memory hash table.  When
15  *        a process tries to acquire a lock of a type that conflicts
16  *        with existing locks, it is put to sleep using the routines
17  *        in storage/lmgr/proc.c.
18  *
19  *      Interface:
20  *
21  *      LockAcquire(), LockRelease(), LockMethodTableInit(),
22  *      LockMethodTableRename(), LockReleaseAll, LockOwners()
23  *      LockResolveConflicts(), GrantLock()
24  *
25  *      NOTE: This module is used to define new lock tables.  The
26  *              multi-level lock table (multi.c) used by the heap
27  *              access methods calls these routines.  See multi.c for
28  *              examples showing how to use this interface.
29  *
30  *-------------------------------------------------------------------------
31  */
32 #include <stdio.h>                              /* for sprintf() */
33 #include <string.h>
34 #include <sys/types.h>
35 #include <unistd.h>
36 #include <signal.h>
37
38 #include "postgres.h"
39 #include "miscadmin.h"
40 #include "storage/shmem.h"
41 #include "storage/sinvaladt.h"
42 #include "storage/spin.h"
43 #include "storage/proc.h"
44 #include "storage/lock.h"
45 #include "utils/hsearch.h"
46 #include "utils/memutils.h"
47 #include "utils/palloc.h"
48 #include "access/xact.h"
49 #include "access/transam.h"
50 #include "utils/trace.h"
51 #include "utils/ps_status.h"
52
53 static int WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode);
54
55 /*
56  * lockDebugRelation can be used to trace unconditionally a single relation,
57  * for example pg_listener, if you suspect there are locking problems.
58  *
59  * lockDebugOidMin is used to avoid tracing postgres relations, which
60  * would produce a lot of output. Unfortunately most system relations are
61  * created after bootstrap and have oid greater than BootstrapObjectIdData.
62  * If you are using tprintf you should specify a value greater than the max
63  * oid of system relations, which can be found with the following query:
64  *
65  *       select max(int4in(int4out(oid))) from pg_class where relname ~ '^pg_';
66  *
67  * To get a useful lock trace you can use the following pg_options:
68  *
69  *       -T "verbose,query,locks,userlocks,lock_debug_oidmin=17500"
70  */
/*
 * Accessors for the lock-related slots of the pg_options[] array.
 *
 * LOCKDEBUG(lockmethod) selects the trace level slot for a given lock
 * method; its argument is parenthesized so that an expression argument
 * (e.g. "a ? b : c") cannot be re-associated with the "+".
 */
#define LOCKDEBUG(lockmethod)	(pg_options[TRACE_SHORTLOCKS + (lockmethod)])
#define lockDebugRelation		(pg_options[TRACE_LOCKRELATION])
#define lockDebugOidMin			(pg_options[TRACE_LOCKOIDMIN])
#define lockReadPriority		(pg_options[OPT_LOCKREADPRIORITY])
75
#ifdef LOCK_MGR_DEBUG
/*
 * LOCK_PRINT -- trace a LOCK if tracing is enabled for its lock method and
 * its relation oid is at least lockDebugOidMin, or if the lock belongs to
 * the single relation selected by lockDebugRelation.
 *
 * Wrapped in do { } while (0) so the macro behaves as one statement and
 * cannot capture a following "else".  Invoke it with a trailing semicolon.
 */
#define LOCK_PRINT(where,lock,type) \
	do { \
		if (((LOCKDEBUG(LOCK_LOCKMETHOD(*(lock))) >= 1) \
			 && ((lock)->tag.relId >= lockDebugOidMin)) \
			|| \
			(lockDebugRelation && ((lock)->tag.relId == lockDebugRelation))) \
			LOCK_PRINT_AUX(where,lock,type); \
	} while (0)

/*
 * LOCK_PRINT_AUX -- unconditionally trace a LOCK: its tag fields, mask,
 * the holders[] / activeHolders[] counters for modes 1..5 with their
 * totals, the wait queue size and the name of the requested lock type.
 */
#define LOCK_PRINT_AUX(where,lock,type) \
	TPRINTF(TRACE_ALL, \
		 "%s: lock(%x) tbl(%d) rel(%d) db(%d) obj(%u) mask(%x) " \
		 "hold(%d,%d,%d,%d,%d)=%d " \
		 "act(%d,%d,%d,%d,%d)=%d wait(%d) type(%s)", \
		 where, \
		 MAKE_OFFSET(lock), \
		 (lock)->tag.lockmethod, \
		 (lock)->tag.relId, \
		 (lock)->tag.dbId, \
		 (lock)->tag.objId.blkno, \
		 (lock)->mask, \
		 (lock)->holders[1], \
		 (lock)->holders[2], \
		 (lock)->holders[3], \
		 (lock)->holders[4], \
		 (lock)->holders[5], \
		 (lock)->nHolding, \
		 (lock)->activeHolders[1], \
		 (lock)->activeHolders[2], \
		 (lock)->activeHolders[3], \
		 (lock)->activeHolders[4], \
		 (lock)->activeHolders[5], \
		 (lock)->nActive, \
		 (lock)->waitProcs.size, \
		 lock_types[type])

/*
 * XID_PRINT -- trace an xid table entry under the same enabling rules as
 * LOCK_PRINT, applied to the LOCK the entry's tag points to.
 */
#define XID_PRINT(where,xidentP) \
	do { \
		if (((LOCKDEBUG(XIDENT_LOCKMETHOD(*(xidentP))) >= 1) \
			 && (((LOCK *)MAKE_PTR((xidentP)->tag.lock))->tag.relId \
				 >= lockDebugOidMin)) \
			|| (lockDebugRelation && \
				(((LOCK *)MAKE_PTR((xidentP)->tag.lock))->tag.relId \
				 == lockDebugRelation))) \
			XID_PRINT_AUX(where,xidentP); \
	} while (0)

/*
 * XID_PRINT_AUX -- unconditionally trace an xid table entry: its tag and
 * the holders[] counters for modes 1..5 with their total.
 */
#define XID_PRINT_AUX(where,xidentP) \
	TPRINTF(TRACE_ALL, \
		 "%s: xid(%x) lock(%x) tbl(%d) pid(%d) xid(%d) " \
		 "hold(%d,%d,%d,%d,%d)=%d", \
		 where, \
		 MAKE_OFFSET(xidentP), \
		 (xidentP)->tag.lock, \
		 XIDENT_LOCKMETHOD(*(xidentP)), \
		 (xidentP)->tag.pid, \
		 (xidentP)->tag.xid, \
		 (xidentP)->holders[1], \
		 (xidentP)->holders[2], \
		 (xidentP)->holders[3], \
		 (xidentP)->holders[4], \
		 (xidentP)->holders[5], \
		 (xidentP)->nHolding)

#else							/* !LOCK_MGR_DEBUG */
/* Tracing compiled out: every macro expands to nothing. */
#define LOCK_PRINT(where,lock,type)
#define LOCK_PRINT_AUX(where,lock,type)
#define XID_PRINT(where,xidentP)
#define XID_PRINT_AUX(where,xidentP)
#endif	 /* !LOCK_MGR_DEBUG */
143
/*
 * Printable names of the lock modes, indexed by lock mode number.
 * Entry 0 is the invalid mode.  The strings are literals, so both the
 * characters and the table itself are declared const.
 */
static const char *const lock_types[] = {
	"INVALID",
	"AccessShareLock",
	"RowShareLock",
	"RowExclusiveLock",
	"ShareLock",
	"ShareRowExclusiveLock",
	"ExclusiveLock",
	"AccessExclusiveLock"
};
154
155 SPINLOCK        LockMgrLock;            /* in Shmem or created in
156                                                                  * CreateSpinlocks() */
157
158 /* This is to simplify/speed up some bit arithmetic */
159
160 static MASK BITS_OFF[MAX_LOCKMODES];
161 static MASK BITS_ON[MAX_LOCKMODES];
162
163 /* -----------------
164  * XXX Want to move this to this file
165  * -----------------
166  */
167 static bool LockingIsDisabled;
168
169 /* -------------------
170  * map from lockmethod to the lock table structure
171  * -------------------
172  */
173 static LOCKMETHODTABLE *LockMethodTable[MAX_LOCK_METHODS];
174
175 static int      NumLockMethods;
176
177 /* -------------------
178  * InitLocks -- Init the lock module.  Create a private data
179  *              structure for constructing conflict masks.
180  * -------------------
181  */
182 void
183 InitLocks()
184 {
185         int                     i;
186         int                     bit;
187
188         bit = 1;
189         /* -------------------
190          * remember 0th lockmode is invalid
191          * -------------------
192          */
193         for (i = 0; i < MAX_LOCKMODES; i++, bit <<= 1)
194         {
195                 BITS_ON[i] = bit;
196                 BITS_OFF[i] = ~bit;
197         }
198
199 #ifdef LOCK_MGR_DEBUG
200
201         /*
202          * If lockDebugOidMin value has not been specified in pg_options set a
203          * default value.
204          */
205         if (!lockDebugOidMin)
206                 lockDebugOidMin = BootstrapObjectIdData;
207 #endif
208 }
209
210 /* -------------------
211  * LockDisable -- sets LockingIsDisabled flag to TRUE or FALSE.
212  * ------------------
213  */
214 void
215 LockDisable(int status)
216 {
217         LockingIsDisabled = status;
218 }
219
220
221 /*
222  * LockMethodInit -- initialize the lock table's lock type
223  *              structures
224  *
225  * Notes: just copying.  Should only be called once.
226  */
227 static void
228 LockMethodInit(LOCKMETHODTABLE * lockMethodTable,
229                            MASK *conflictsP,
230                            int *prioP,
231                            int numModes)
232 {
233         int                     i;
234
235         lockMethodTable->ctl->numLockModes = numModes;
236         numModes++;
237         for (i = 0; i < numModes; i++, prioP++, conflictsP++)
238         {
239                 lockMethodTable->ctl->conflictTab[i] = *conflictsP;
240                 lockMethodTable->ctl->prio[i] = *prioP;
241         }
242 }
243
244 /*
245  * LockMethodTableInit -- initialize a lock table structure
246  *
247  * Notes:
248  *              (a) a lock table has four separate entries in the shmem index
249  *              table.  This is because every shared hash table and spinlock
250  *              has its name stored in the shmem index at its creation.  It
251  *              is wasteful, in this case, but not much space is involved.
252  *
253  */
254 LOCKMETHOD
255 LockMethodTableInit(char *tabName,
256                                         MASK *conflictsP,
257                                         int *prioP,
258                                         int numModes)
259 {
260         LOCKMETHODTABLE *lockMethodTable;
261         char       *shmemName;
262         HASHCTL         info;
263         int                     hash_flags;
264         bool            found;
265         int                     status = TRUE;
266
267         if (numModes > MAX_LOCKMODES)
268         {
269                 elog(NOTICE, "LockMethodTableInit: too many lock types %d greater than %d",
270                          numModes, MAX_LOCKMODES);
271                 return INVALID_LOCKMETHOD;
272         }
273
274         /* allocate a string for the shmem index table lookup */
275         shmemName = (char *) palloc((unsigned) (strlen(tabName) + 32));
276         if (!shmemName)
277         {
278                 elog(NOTICE, "LockMethodTableInit: couldn't malloc string %s \n", tabName);
279                 return INVALID_LOCKMETHOD;
280         }
281
282         /* each lock table has a non-shared header */
283         lockMethodTable = (LOCKMETHODTABLE *) palloc((unsigned) sizeof(LOCKMETHODTABLE));
284         if (!lockMethodTable)
285         {
286                 elog(NOTICE, "LockMethodTableInit: couldn't malloc lock table %s\n", tabName);
287                 pfree(shmemName);
288                 return INVALID_LOCKMETHOD;
289         }
290
291         /* ------------------------
292          * find/acquire the spinlock for the table
293          * ------------------------
294          */
295         SpinAcquire(LockMgrLock);
296
297
298         /* -----------------------
299          * allocate a control structure from shared memory or attach to it
300          * if it already exists.
301          * -----------------------
302          */
303         sprintf(shmemName, "%s (ctl)", tabName);
304         lockMethodTable->ctl = (LOCKMETHODCTL *)
305                 ShmemInitStruct(shmemName, (unsigned) sizeof(LOCKMETHODCTL), &found);
306
307         if (!lockMethodTable->ctl)
308         {
309                 elog(FATAL, "LockMethodTableInit: couldn't initialize %s", tabName);
310                 status = FALSE;
311         }
312
313         /* -------------------
314          * no zero-th table
315          * -------------------
316          */
317         NumLockMethods = 1;
318
319         /* ----------------
320          * we're first - initialize
321          * ----------------
322          */
323         if (!found)
324         {
325                 MemSet(lockMethodTable->ctl, 0, sizeof(LOCKMETHODCTL));
326                 lockMethodTable->ctl->masterLock = LockMgrLock;
327                 lockMethodTable->ctl->lockmethod = NumLockMethods;
328         }
329
330         /* --------------------
331          * other modules refer to the lock table by a lockmethod
332          * --------------------
333          */
334         LockMethodTable[NumLockMethods] = lockMethodTable;
335         NumLockMethods++;
336         Assert(NumLockMethods <= MAX_LOCK_METHODS);
337
338         /* ----------------------
339          * allocate a hash table for the lock tags.  This is used
340          * to find the different locks.
341          * ----------------------
342          */
343         info.keysize = SHMEM_LOCKTAB_KEYSIZE;
344         info.datasize = SHMEM_LOCKTAB_DATASIZE;
345         info.hash = tag_hash;
346         hash_flags = (HASH_ELEM | HASH_FUNCTION);
347
348         sprintf(shmemName, "%s (lock hash)", tabName);
349         lockMethodTable->lockHash = (HTAB *) ShmemInitHash(shmemName,
350                                                                                  INIT_TABLE_SIZE, MAX_TABLE_SIZE,
351                                                                                                            &info, hash_flags);
352
353         Assert(lockMethodTable->lockHash->hash == tag_hash);
354         if (!lockMethodTable->lockHash)
355         {
356                 elog(FATAL, "LockMethodTableInit: couldn't initialize %s", tabName);
357                 status = FALSE;
358         }
359
360         /* -------------------------
361          * allocate an xid table.  When different transactions hold
362          * the same lock, additional information must be saved (locks per tx).
363          * -------------------------
364          */
365         info.keysize = SHMEM_XIDTAB_KEYSIZE;
366         info.datasize = SHMEM_XIDTAB_DATASIZE;
367         info.hash = tag_hash;
368         hash_flags = (HASH_ELEM | HASH_FUNCTION);
369
370         sprintf(shmemName, "%s (xid hash)", tabName);
371         lockMethodTable->xidHash = (HTAB *) ShmemInitHash(shmemName,
372                                                                                  INIT_TABLE_SIZE, MAX_TABLE_SIZE,
373                                                                                                           &info, hash_flags);
374
375         if (!lockMethodTable->xidHash)
376         {
377                 elog(FATAL, "LockMethodTableInit: couldn't initialize %s", tabName);
378                 status = FALSE;
379         }
380
381         /* init ctl data structures */
382         LockMethodInit(lockMethodTable, conflictsP, prioP, numModes);
383
384         SpinRelease(LockMgrLock);
385
386         pfree(shmemName);
387
388         if (status)
389                 return lockMethodTable->ctl->lockmethod;
390         else
391                 return INVALID_LOCKMETHOD;
392 }
393
394 /*
395  * LockMethodTableRename -- allocate another lockmethod to the same
396  *              lock table.
397  *
398  * NOTES: Both the lock module and the lock chain (lchain.c)
399  *              module use table id's to distinguish between different
400  *              kinds of locks.  Short term and long term locks look
401  *              the same to the lock table, but are handled differently
402  *              by the lock chain manager.      This function allows the
403  *              client to use different lockmethods when acquiring/releasing
404  *              short term and long term locks.
405  */
406
407 LOCKMETHOD
408 LockMethodTableRename(LOCKMETHOD lockmethod)
409 {
410         LOCKMETHOD      newLockMethod;
411
412         if (NumLockMethods >= MAX_LOCK_METHODS)
413                 return INVALID_LOCKMETHOD;
414         if (LockMethodTable[lockmethod] == INVALID_LOCKMETHOD)
415                 return INVALID_LOCKMETHOD;
416
417         /* other modules refer to the lock table by a lockmethod */
418         newLockMethod = NumLockMethods;
419         NumLockMethods++;
420
421         LockMethodTable[newLockMethod] = LockMethodTable[lockmethod];
422         return newLockMethod;
423 }
424
425 /*
426  * LockAcquire -- Check for lock conflicts, sleep if conflict found,
427  *              set lock if/when no conflicts.
428  *
429  * Returns: TRUE if parameters are correct, FALSE otherwise.
430  *
431  * Side Effects: The lock is always acquired.  No way to abort
432  *              a lock acquisition other than aborting the transaction.
433  *              Lock is recorded in the lkchain.
434  *
435 #ifdef USER_LOCKS
436  *
437  * Note on User Locks:
438  *
439  *              User locks are handled totally on the application side as
440  *              long term cooperative locks which extend beyond the normal
441  *              transaction boundaries.  Their purpose is to indicate to an
442  *              application that someone is `working' on an item.  So it is
443  *              possible to put an user lock on a tuple's oid, retrieve the
444  *              tuple, work on it for an hour and then update it and remove
445  *              the lock.  While the lock is active other clients can still
446  *              read and write the tuple but they can be aware that it has
447  *              been locked at the application level by someone.
448  *              User locks use lock tags made of an uint16 and an uint32, for
449  *              example 0 and a tuple oid, or any other arbitrary pair of
450  *              numbers following a convention established by the application.
451  *              In this sense tags don't refer to tuples or database entities.
452  *              User locks and normal locks are completely orthogonal and
453  *              they don't interfere with each other, so it is possible
454  *              to acquire a normal lock on an user-locked tuple or user-lock
455  *              a tuple for which a normal write lock already exists.
456  *              User locks are always non blocking, therefore they are never
457  *              acquired if already held by another process.  They must be
458  *              released explicitly by the application but they are released
459  *              automatically when a backend terminates.
460  *              They are indicated by a lockmethod 2 which is an alias for the
461  *              normal lock table, and are distinguished from normal locks
462  *              for the following differences:
463  *
464  *                                                                              normal lock             user lock
465  *
466  *              lockmethod                                              1                               2
467  *              tag.relId                                               rel oid                 0
468  *              tag.ItemPointerData.ip_blkid    block id                lock id2
469  *              tag.ItemPointerData.ip_posid    tuple offset    lock id1
470  *              xid.pid                                                 0                               backend pid
471  *              xid.xid                                                 xid or 0                0
472  *              persistence                                             transaction             user or backend
473  *
474  *              The lockmode parameter can have the same values for normal locks
475  *              although probably only WRITE_LOCK can have some practical use.
476  *
477  *                                                                                                              DZ - 22 Nov 1997
478 #endif
479  */
480
481 bool
482 LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
483 {
484         XIDLookupEnt *result,
485                                 item;
486         HTAB       *xidTable;
487         bool            found;
488         LOCK       *lock = NULL;
489         SPINLOCK        masterLock;
490         LOCKMETHODTABLE *lockMethodTable;
491         int                     status;
492         TransactionId xid;
493
494 #ifdef USER_LOCKS
495         int                     is_user_lock;
496
497         is_user_lock = (lockmethod == USER_LOCKMETHOD);
498         if (is_user_lock)
499         {
500 #ifdef USER_LOCKS_DEBUG
501                 TPRINTF(TRACE_USERLOCKS, "LockAcquire: user lock [%u] %s",
502                                 locktag->objId.blkno,
503                                 lock_types[lockmode]);
504 #endif
505         }
506 #endif
507
508         /* ???????? This must be changed when short term locks will be used */
509         locktag->lockmethod = lockmethod;
510
511         Assert(lockmethod < NumLockMethods);
512         lockMethodTable = LockMethodTable[lockmethod];
513         if (!lockMethodTable)
514         {
515                 elog(NOTICE, "LockAcquire: bad lock table %d", lockmethod);
516                 return FALSE;
517         }
518
519         if (LockingIsDisabled)
520                 return TRUE;
521
522         masterLock = lockMethodTable->ctl->masterLock;
523
524         SpinAcquire(masterLock);
525
526         /*
527          * Find or create a lock with this tag
528          */
529         Assert(lockMethodTable->lockHash->hash == tag_hash);
530         lock = (LOCK *) hash_search(lockMethodTable->lockHash, (Pointer) locktag,
531                                                                 HASH_ENTER, &found);
532         if (!lock)
533         {
534                 SpinRelease(masterLock);
535                 elog(FATAL, "LockAcquire: lock table %d is corrupted", lockmethod);
536                 return FALSE;
537         }
538
539         /* --------------------
540          * if there was nothing else there, complete initialization
541          * --------------------
542          */
543         if (!found)
544         {
545                 lock->mask = 0;
546                 lock->nHolding = 0;
547                 lock->nActive = 0;
548                 MemSet((char *) lock->holders, 0, sizeof(int) * MAX_LOCKMODES);
549                 MemSet((char *) lock->activeHolders, 0, sizeof(int) * MAX_LOCKMODES);
550                 ProcQueueInit(&(lock->waitProcs));
551                 Assert(lock->tag.objId.blkno == locktag->objId.blkno);
552                 LOCK_PRINT("LockAcquire: new", lock, lockmode);
553         }
554         else
555         {
556                 LOCK_PRINT("LockAcquire: found", lock, lockmode);
557                 Assert((lock->nHolding > 0) && (lock->holders[lockmode] >= 0));
558                 Assert((lock->nActive > 0) && (lock->activeHolders[lockmode] >= 0));
559                 Assert(lock->nActive <= lock->nHolding);
560         }
561
562         /* ------------------
563          * add an element to the lock queue so that we can clear the
564          * locks at end of transaction.
565          * ------------------
566          */
567         xidTable = lockMethodTable->xidHash;
568
569         /* ------------------
570          * Zero out all of the tag bytes (this clears the padding bytes for long
571          * word alignment and ensures hashing consistency).
572          * ------------------
573          */
574         MemSet(&item, 0, XID_TAGSIZE);          /* must clear padding, needed */
575         item.tag.lock = MAKE_OFFSET(lock);
576 #ifdef USE_XIDTAG_LOCKMETHOD
577         item.tag.lockmethod = lockmethod;
578 #endif
579 #ifdef USER_LOCKS
580         if (is_user_lock)
581         {
582                 item.tag.pid = MyProcPid;
583                 item.tag.xid = xid = 0;
584         }
585         else
586         {
587                 xid = GetCurrentTransactionId();
588                 TransactionIdStore(xid, &item.tag.xid);
589         }
590 #else
591         xid = GetCurrentTransactionId();
592         TransactionIdStore(xid, &item.tag.xid);
593 #endif
594
595         /*
596          * Find or create an xid entry with this tag
597          */
598         result = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &item,
599                                                                                   HASH_ENTER, &found);
600         if (!result)
601         {
602                 elog(NOTICE, "LockAcquire: xid table corrupted");
603                 return STATUS_ERROR;
604         }
605
606         /*
607          * If not found initialize the new entry
608          */
609         if (!found)
610         {
611                 result->nHolding = 0;
612                 MemSet((char *) result->holders, 0, sizeof(int) * MAX_LOCKMODES);
613                 ProcAddLock(&result->queue);
614                 XID_PRINT("LockAcquire: new", result);
615         }
616         else
617         {
618                 XID_PRINT("LockAcquire: found", result);
619                 Assert((result->nHolding > 0) && (result->holders[lockmode] >= 0));
620                 Assert(result->nHolding <= lock->nActive);
621         }
622
623         /* ----------------
624          * lock->nholding tells us how many processes have _tried_ to
625          * acquire this lock,  Regardless of whether they succeeded or
626          * failed in doing so.
627          * ----------------
628          */
629         lock->nHolding++;
630         lock->holders[lockmode]++;
631         Assert((lock->nHolding > 0) && (lock->holders[lockmode] > 0));
632
633         /* --------------------
634          * If I'm the only one holding a lock, then there
635          * cannot be a conflict. The same is true if we already
636          * hold this lock.
637          * --------------------
638          */
639         if (result->nHolding == lock->nActive || result->holders[lockmode] != 0)
640         {
641                 result->holders[lockmode]++;
642                 result->nHolding++;
643                 XID_PRINT("LockAcquire: owning", result);
644                 Assert((result->nHolding > 0) && (result->holders[lockmode] > 0));
645                 GrantLock(lock, lockmode);
646                 SpinRelease(masterLock);
647                 return TRUE;
648         }
649
650         /*
651          * If lock requested conflicts with locks requested by waiters...
652          */
653         if (lockMethodTable->ctl->conflictTab[lockmode] & lock->waitMask)
654         {
655                 int             i = 1;
656
657                 /*
658                  * If I don't hold locks or my locks don't conflict 
659                  * with waiters then force to sleep.
660                  */
661                 if (result->nHolding > 0)
662                 {
663                         for ( ; i <= lockMethodTable->ctl->numLockModes; i++)
664                         {
665                                 if (result->holders[i] > 0 && 
666                                         lockMethodTable->ctl->conflictTab[i] & lock->waitMask)
667                                         break;  /* conflict */
668                         }
669                 }
670         
671                 if (result->nHolding == 0 || i > lockMethodTable->ctl->numLockModes)
672                 {
673                         XID_PRINT("LockAcquire: higher priority proc waiting",
674                                           result);
675                         status = STATUS_FOUND;
676                 }
677                 else
678                         status = LockResolveConflicts(lockmethod, lock, lockmode, xid, result);
679         }
680         else
681                 status = LockResolveConflicts(lockmethod, lock, lockmode, xid, result);
682
683         if (status == STATUS_OK)
684                 GrantLock(lock, lockmode);
685         else if (status == STATUS_FOUND)
686         {
687 #ifdef USER_LOCKS
688
689                 /*
690                  * User locks are non blocking. If we can't acquire a lock we must
691                  * remove the xid entry and return FALSE without waiting.
692                  */
693                 if (is_user_lock)
694                 {
695                         if (!result->nHolding)
696                         {
697                                 SHMQueueDelete(&result->queue);
698                                 result = (XIDLookupEnt *) hash_search(xidTable,
699                                                                                                           (Pointer) result,
700                                                                                                         HASH_REMOVE, &found);
701                                 if (!result || !found)
702                                         elog(NOTICE, "LockAcquire: remove xid, table corrupted");
703                         }
704                         else
705                                 XID_PRINT_AUX("LockAcquire: NHOLDING", result);
706                         lock->nHolding--;
707                         lock->holders[lockmode]--;
708                         LOCK_PRINT("LockAcquire: user lock failed", lock, lockmode);
709                         Assert((lock->nHolding > 0) && (lock->holders[lockmode] >= 0));
710                         Assert(lock->nActive <= lock->nHolding);
711                         SpinRelease(masterLock);
712                         return FALSE;
713                 }
714 #endif
715                 /*
716                  * Construct bitmask of locks we hold before going to sleep.
717                  */
718                 MyProc->holdLock = 0;
719                 if (result->nHolding > 0)
720                 {
721                         int             i,
722                                         tmpMask = 2;
723
724                         for (i = 1; i <= lockMethodTable->ctl->numLockModes; 
725                                         i++, tmpMask <<= 1)
726                         {
727                                 if (result->holders[i] > 0)
728                                         MyProc->holdLock |= tmpMask;
729                         }
730                         Assert(MyProc->holdLock != 0);
731                 }
732
733                 status = WaitOnLock(lockmethod, lock, lockmode);
734
735                 /*
736                  * Check the xid entry status, in case something in the ipc
737                  * communication doesn't work correctly.
738                  */
739                 if (!((result->nHolding > 0) && (result->holders[lockmode] > 0)))
740                 {
741                         XID_PRINT_AUX("LockAcquire: INCONSISTENT ", result);
742                         LOCK_PRINT_AUX("LockAcquire: INCONSISTENT ", lock, lockmode);
743                         /* Should we retry ? */
744                         return FALSE;
745                 }
746                 XID_PRINT("LockAcquire: granted", result);
747                 LOCK_PRINT("LockAcquire: granted", lock, lockmode);
748         }
749
750         SpinRelease(masterLock);
751
752         return status == STATUS_OK;
753 }
754
755 /* ----------------------------
756  * LockResolveConflicts -- test for lock conflicts
757  *
758  * NOTES:
759  *              Here's what makes this complicated: one transaction's
760  * locks don't conflict with one another.  When many processes
761  * hold locks, each has to subtract off the other's locks when
762  * determining whether or not any new lock acquired conflicts with
763  * the old ones.
764  *
765  * ----------------------------
766  */
767 int
768 LockResolveConflicts(LOCKMETHOD lockmethod,
769                                          LOCK *lock,
770                                          LOCKMODE lockmode,
771                                          TransactionId xid,
772                                          XIDLookupEnt *xidentP)         /* xident ptr or NULL */
773 {
774         XIDLookupEnt *result,
775                                 item;
776         int                *myHolders;
777         int                     numLockModes;
778         HTAB       *xidTable;
779         bool            found;
780         int                     bitmask;
781         int                     i,
782                                 tmpMask;
783
784 #ifdef USER_LOCKS
785         int                     is_user_lock;
786
787 #endif
788
789         numLockModes = LockMethodTable[lockmethod]->ctl->numLockModes;
790         xidTable = LockMethodTable[lockmethod]->xidHash;
791
792         if (xidentP)
793         {
794
795                 /*
796                  * A pointer to the xid entry was supplied from the caller.
797                  * Actually only LockAcquire can do it.
798                  */
799                 result = xidentP;
800         }
801         else
802         {
803                 /* ---------------------
804                  * read my own statistics from the xid table.  If there
805                  * isn't an entry, then we'll just add one.
806                  *
807                  * Zero out the tag, this clears the padding bytes for long
808                  * word alignment and ensures hashing consistency.
809                  * ------------------
810                  */
811                 MemSet(&item, 0, XID_TAGSIZE);
812                 item.tag.lock = MAKE_OFFSET(lock);
813 #ifdef USE_XIDTAG_LOCKMETHOD
814                 item.tag.lockmethod = lockmethod;
815 #endif
816 #ifdef USER_LOCKS
817                 is_user_lock = (lockmethod == 2);
818                 if (is_user_lock)
819                 {
820                         item.tag.pid = MyProcPid;
821                         item.tag.xid = 0;
822                 }
823                 else
824                         TransactionIdStore(xid, &item.tag.xid);
825 #else
826                 TransactionIdStore(xid, &item.tag.xid);
827 #endif
828
829                 /*
830                  * Find or create an xid entry with this tag
831                  */
832                 result = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &item,
833                                                                                           HASH_ENTER, &found);
834                 if (!result)
835                 {
836                         elog(NOTICE, "LockResolveConflicts: xid table corrupted");
837                         return STATUS_ERROR;
838                 }
839
840                 /*
841                  * If not found initialize the new entry. THIS SHOULD NEVER
842                  * HAPPEN, if we are trying to resolve a conflict we must already
843                  * have allocated an xid entry for this lock.    dz 21-11-1997
844                  */
845                 if (!found)
846                 {
847                         /* ---------------
848                          * we're not holding any type of lock yet.  Clear
849                          * the lock stats.
850                          * ---------------
851                          */
852                         MemSet(result->holders, 0, numLockModes * sizeof(*(lock->holders)));
853                         result->nHolding = 0;
854                         XID_PRINT_AUX("LockResolveConflicts: NOT FOUND", result);
855                 }
856                 else
857                         XID_PRINT("LockResolveConflicts: found", result);
858         }
859         Assert((result->nHolding >= 0) && (result->holders[lockmode] >= 0));
860
861         /* ----------------------------
862          * first check for global conflicts: If no locks conflict
863          * with mine, then I get the lock.
864          *
865          * Checking for conflict: lock->mask represents the types of
866          * currently held locks.  conflictTable[lockmode] has a bit
867          * set for each type of lock that conflicts with mine.  Bitwise
868          * compare tells if there is a conflict.
869          * ----------------------------
870          */
871         if (!(LockMethodTable[lockmethod]->ctl->conflictTab[lockmode] & lock->mask))
872         {
873                 result->holders[lockmode]++;
874                 result->nHolding++;
875                 XID_PRINT("LockResolveConflicts: no conflict", result);
876                 Assert((result->nHolding > 0) && (result->holders[lockmode] > 0));
877                 return STATUS_OK;
878         }
879
880         /* ------------------------
881          * Rats.  Something conflicts. But it could still be my own
882          * lock.  We have to construct a conflict mask
883          * that does not reflect our own locks.
884          * ------------------------
885          */
886         myHolders = result->holders;
887         bitmask = 0;
888         tmpMask = 2;
889         for (i = 1; i <= numLockModes; i++, tmpMask <<= 1)
890         {
891                 if (lock->activeHolders[i] != myHolders[i])
892                         bitmask |= tmpMask;
893         }
894
895         /* ------------------------
896          * now check again for conflicts.  'bitmask' describes the types
897          * of locks held by other processes.  If one of these
898          * conflicts with the kind of lock that I want, there is a
899          * conflict and I have to sleep.
900          * ------------------------
901          */
902         if (!(LockMethodTable[lockmethod]->ctl->conflictTab[lockmode] & bitmask))
903         {
904                 /* no conflict. Get the lock and go on */
905                 result->holders[lockmode]++;
906                 result->nHolding++;
907                 XID_PRINT("LockResolveConflicts: resolved", result);
908                 Assert((result->nHolding > 0) && (result->holders[lockmode] > 0));
909                 return STATUS_OK;
910         }
911
912         XID_PRINT("LockResolveConflicts: conflicting", result);
913         return STATUS_FOUND;
914 }
915
916 /*
917  * GrantLock -- update the lock data structure to show
918  *              the new lock holder.
919  */
920 void
921 GrantLock(LOCK *lock, LOCKMODE lockmode)
922 {
923         lock->nActive++;
924         lock->activeHolders[lockmode]++;
925         lock->mask |= BITS_ON[lockmode];
926         LOCK_PRINT("GrantLock", lock, lockmode);
927         Assert((lock->nActive > 0) && (lock->activeHolders[lockmode] > 0));
928         Assert(lock->nActive <= lock->nHolding);
929 }
930
931 static int
932 WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode)
933 {
934         PROC_QUEUE *waitQueue = &(lock->waitProcs);
935         LOCKMETHODTABLE *lockMethodTable = LockMethodTable[lockmethod];
936         char            old_status[64],
937                                 new_status[64];
938
939         Assert(lockmethod < NumLockMethods);
940
941         /*
942          * the waitqueue is ordered by priority. I insert myself according to
943          * the priority of the lock I am acquiring.
944          *
945          * SYNC NOTE: I am assuming that the lock table spinlock is sufficient
946          * synchronization for this queue.      That will not be true if/when
947          * people can be deleted from the queue by a SIGINT or something.
948          */
949         LOCK_PRINT_AUX("WaitOnLock: sleeping on lock", lock, lockmode);
950         strcpy(old_status, PS_STATUS);
951         strcpy(new_status, PS_STATUS);
952         strcat(new_status, " waiting");
953         PS_SET_STATUS(new_status);
954         if (ProcSleep(waitQueue,
955                                   lockMethodTable->ctl,
956                                   lockmode,
957                                   lock) != NO_ERROR)
958         {
959                 /* -------------------
960                  * This could have happend as a result of a deadlock,
961                  * see HandleDeadLock().
962                  * Decrement the lock nHolding and holders fields as
963                  * we are no longer waiting on this lock.
964                  * -------------------
965                  */
966                 lock->nHolding--;
967                 lock->holders[lockmode]--;
968                 LOCK_PRINT_AUX("WaitOnLock: aborting on lock", lock, lockmode);
969                 Assert((lock->nHolding >= 0) && (lock->holders[lockmode] >= 0));
970                 Assert(lock->nActive <= lock->nHolding);
971                 if (lock->activeHolders[lockmode] == lock->holders[lockmode])
972                         lock->waitMask &= BITS_OFF[lockmode];
973                 SpinRelease(lockMethodTable->ctl->masterLock);
974                 elog(ERROR, "WaitOnLock: error on wakeup - Aborting this transaction");
975
976                 /* not reached */
977         }
978
979         if (lock->activeHolders[lockmode] == lock->holders[lockmode])
980                 lock->waitMask &= BITS_OFF[lockmode];
981         PS_SET_STATUS(old_status);
982         LOCK_PRINT_AUX("WaitOnLock: wakeup on lock", lock, lockmode);
983         return STATUS_OK;
984 }
985
986 /*
987  * LockRelease -- look up 'locktag' in lock table 'lockmethod' and
988  *              release it.
989  *
990  * Side Effects: if the lock no longer conflicts with the highest
991  *              priority waiting process, that process is granted the lock
992  *              and awoken. (We have to grant the lock here to avoid a
993  *              race between the waking process and any new process to
994  *              come along and request the lock).
995  */
996 bool
997 LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
998 {
999         LOCK       *lock = NULL;
1000         SPINLOCK        masterLock;
1001         bool            found;
1002         LOCKMETHODTABLE *lockMethodTable;
1003         XIDLookupEnt *result,
1004                                 item;
1005         HTAB       *xidTable;
1006         TransactionId xid;
1007         bool            wakeupNeeded = true;
1008         int                     trace_flag;
1009
1010 #ifdef USER_LOCKS
1011         int                     is_user_lock;
1012
1013         is_user_lock = (lockmethod == USER_LOCKMETHOD);
1014         if (is_user_lock)
1015         {
1016                 TPRINTF(TRACE_USERLOCKS, "LockRelease: user lock tag [%u] %d",
1017                                 locktag->objId.blkno,
1018                                 lockmode);
1019         }
1020 #endif
1021
1022         /* ???????? This must be changed when short term locks will be used */
1023         locktag->lockmethod = lockmethod;
1024
1025 #ifdef USER_LOCKS
1026         trace_flag = \
1027                 (lockmethod == USER_LOCKMETHOD) ? TRACE_USERLOCKS : TRACE_LOCKS;
1028 #else
1029         trace_flag = TRACE_LOCKS;
1030 #endif
1031
1032         Assert(lockmethod < NumLockMethods);
1033         lockMethodTable = LockMethodTable[lockmethod];
1034         if (!lockMethodTable)
1035         {
1036                 elog(NOTICE, "lockMethodTable is null in LockRelease");
1037                 return FALSE;
1038         }
1039
1040         if (LockingIsDisabled)
1041                 return TRUE;
1042
1043         masterLock = lockMethodTable->ctl->masterLock;
1044         SpinAcquire(masterLock);
1045
1046         /*
1047          * Find a lock with this tag
1048          */
1049         Assert(lockMethodTable->lockHash->hash == tag_hash);
1050         lock = (LOCK *) hash_search(lockMethodTable->lockHash, (Pointer) locktag,
1051                                                                 HASH_FIND, &found);
1052
1053         /*
1054          * let the caller print its own error message, too. Do not
1055          * elog(ERROR).
1056          */
1057         if (!lock)
1058         {
1059                 SpinRelease(masterLock);
1060                 elog(NOTICE, "LockRelease: locktable corrupted");
1061                 return FALSE;
1062         }
1063
1064         if (!found)
1065         {
1066                 SpinRelease(masterLock);
1067 #ifdef USER_LOCKS
1068                 if (is_user_lock)
1069                 {
1070                         TPRINTF(TRACE_USERLOCKS, "LockRelease: no lock with this tag");
1071                         return FALSE;
1072                 }
1073 #endif
1074                 elog(NOTICE, "LockRelease: locktable lookup failed, no lock");
1075                 return FALSE;
1076         }
1077         LOCK_PRINT("LockRelease: found", lock, lockmode);
1078         Assert((lock->nHolding > 0) && (lock->holders[lockmode] >= 0));
1079         Assert((lock->nActive > 0) && (lock->activeHolders[lockmode] >= 0));
1080         Assert(lock->nActive <= lock->nHolding);
1081
1082
1083         /* ------------------
1084          * Zero out all of the tag bytes (this clears the padding bytes for long
1085          * word alignment and ensures hashing consistency).
1086          * ------------------
1087          */
1088         MemSet(&item, 0, XID_TAGSIZE);
1089         item.tag.lock = MAKE_OFFSET(lock);
1090 #ifdef USE_XIDTAG_LOCKMETHOD
1091         item.tag.lockmethod = lockmethod;
1092 #endif
1093 #ifdef USER_LOCKS
1094         if (is_user_lock)
1095         {
1096                 item.tag.pid = MyProcPid;
1097                 item.tag.xid = xid = 0;
1098         }
1099         else
1100         {
1101                 xid = GetCurrentTransactionId();
1102                 TransactionIdStore(xid, &item.tag.xid);
1103         }
1104 #else
1105         xid = GetCurrentTransactionId();
1106         TransactionIdStore(xid, &item.tag.xid);
1107 #endif
1108
1109         /*
1110          * Find an xid entry with this tag
1111          */
1112         xidTable = lockMethodTable->xidHash;
1113         result = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &item,
1114                                                                                   HASH_FIND_SAVE, &found);
1115         if (!result || !found)
1116         {
1117                 SpinRelease(masterLock);
1118 #ifdef USER_LOCKS
1119                 if (!found && is_user_lock)
1120                         TPRINTF(TRACE_USERLOCKS, "LockRelease: no lock with this tag");
1121                 else
1122 #endif
1123                         elog(NOTICE, "LockReplace: xid table corrupted");
1124                 return FALSE;
1125         }
1126         XID_PRINT("LockRelease: found", result);
1127         Assert(result->tag.lock == MAKE_OFFSET(lock));
1128
1129         /*
1130          * Check that we are actually holding a lock of the type we want to
1131          * release.
1132          */
1133         if (!(result->holders[lockmode] > 0))
1134         {
1135                 SpinRelease(masterLock);
1136                 XID_PRINT_AUX("LockAcquire: WRONGTYPE", result);
1137                 elog(NOTICE, "LockRelease: you don't own a lock of type %s",
1138                          lock_types[lockmode]);
1139                 Assert(result->holders[lockmode] >= 0);
1140                 return FALSE;
1141         }
1142         Assert(result->nHolding > 0);
1143
1144         /*
1145          * fix the general lock stats
1146          */
1147         lock->nHolding--;
1148         lock->holders[lockmode]--;
1149         lock->nActive--;
1150         lock->activeHolders[lockmode]--;
1151
1152 #ifdef NOT_USED
1153         /* --------------------------
1154          * If there are still active locks of the type I just released, no one
1155          * should be woken up.  Whoever is asleep will still conflict
1156          * with the remaining locks.
1157          * --------------------------
1158          */
1159         if (lock->activeHolders[lockmode])
1160                 wakeupNeeded = false;
1161         else
1162 #endif
1163         /*
1164          * Above is not valid any more (due to MVCC lock modes). 
1165          * Actually we should compare activeHolders[lockmode] with 
1166          * number of waiters holding lock of this type and try to
1167          * wakeup only if these numbers are equal (and lock released 
1168          * conflicts with locks requested by waiters). For the moment
1169          * we only check the last condition.
1170          */
1171         if (lockMethodTable->ctl->conflictTab[lockmode] & lock->waitMask)
1172                 wakeupNeeded = true;
1173
1174         if (!(lock->activeHolders[lockmode]))
1175         {
1176                 /* change the conflict mask.  No more of this lock type. */
1177                 lock->mask &= BITS_OFF[lockmode];
1178         }
1179
1180         LOCK_PRINT("LockRelease: updated", lock, lockmode);
1181         Assert((lock->nHolding >= 0) && (lock->holders[lockmode] >= 0));
1182         Assert((lock->nActive >= 0) && (lock->activeHolders[lockmode] >= 0));
1183         Assert(lock->nActive <= lock->nHolding);
1184
1185         if (!lock->nHolding)
1186         {
1187                 /* ------------------
1188                  * if there's no one waiting in the queue,
1189                  * we just released the last lock.
1190                  * Delete it from the lock table.
1191                  * ------------------
1192                  */
1193                 Assert(lockMethodTable->lockHash->hash == tag_hash);
1194                 lock = (LOCK *) hash_search(lockMethodTable->lockHash,
1195                                                                         (Pointer) &(lock->tag),
1196                                                                         HASH_REMOVE,
1197                                                                         &found);
1198                 Assert(lock && found);
1199                 wakeupNeeded = false;
1200         }
1201
1202         /*
1203          * now check to see if I have any private locks.  If I do, decrement
1204          * the counts associated with them.
1205          */
1206         result->holders[lockmode]--;
1207         result->nHolding--;
1208         XID_PRINT("LockRelease: updated", result);
1209         Assert((result->nHolding >= 0) && (result->holders[lockmode] >= 0));
1210
1211         /*
1212          * If this was my last hold on this lock, delete my entry in the XID
1213          * table.
1214          */
1215         if (!result->nHolding)
1216         {
1217                 if (result->queue.prev == INVALID_OFFSET)
1218                         elog(NOTICE, "LockRelease: xid.prev == INVALID_OFFSET");
1219                 if (result->queue.next == INVALID_OFFSET)
1220                         elog(NOTICE, "LockRelease: xid.next == INVALID_OFFSET");
1221                 if (result->queue.next != INVALID_OFFSET)
1222                         SHMQueueDelete(&result->queue);
1223                 XID_PRINT("LockRelease: deleting", result);
1224                 result = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &result,
1225                                                                                           HASH_REMOVE_SAVED, &found);
1226                 if (!result || !found)
1227                 {
1228                         SpinRelease(masterLock);
1229                         elog(NOTICE, "LockRelease: remove xid, table corrupted");
1230                         return FALSE;
1231                 }
1232         }
1233
1234         if (wakeupNeeded)
1235         {
1236                 ProcLockWakeup(&(lock->waitProcs), lockmethod, lock);
1237         }
1238         else
1239         {
1240                 if (((LOCKDEBUG(LOCK_LOCKMETHOD(*(lock))) >= 1) \
1241                          && (lock->tag.relId >= lockDebugOidMin)) \
1242                         || \
1243                         (lockDebugRelation && (lock->tag.relId == lockDebugRelation)))
1244                         TPRINTF(TRACE_ALL, "LockRelease: no wakeup needed");
1245         }
1246
1247         SpinRelease(masterLock);
1248         return TRUE;
1249 }
1250
1251 /*
1252  * LockReleaseAll -- Release all locks in a process lock queue.
1253  */
1254 bool
1255 LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue)
1256 {
1257         PROC_QUEUE *waitQueue;
1258         int                     done;
1259         XIDLookupEnt *xidLook = NULL;
1260         XIDLookupEnt *tmp = NULL;
1261         XIDLookupEnt *result;
1262         SHMEM_OFFSET end = MAKE_OFFSET(lockQueue);
1263         SPINLOCK        masterLock;
1264         LOCKMETHODTABLE *lockMethodTable;
1265         int                     i,
1266                                 numLockModes;
1267         LOCK       *lock;
1268         bool            found;
1269         int                     trace_flag;
1270         int                     xidtag_lockmethod;
1271
1272 #ifdef USER_LOCKS
1273         int                     is_user_lock_table,
1274                                 count,
1275                                 nleft;
1276
1277         count = nleft = 0;
1278
1279         is_user_lock_table = (lockmethod == USER_LOCKMETHOD);
1280         trace_flag = (lockmethod == 2) ? TRACE_USERLOCKS : TRACE_LOCKS;
1281 #else
1282         trace_flag = TRACE_LOCKS;
1283 #endif
1284         TPRINTF(trace_flag, "LockReleaseAll: lockmethod=%d, pid=%d",
1285                         lockmethod, MyProcPid);
1286
1287         Assert(lockmethod < NumLockMethods);
1288         lockMethodTable = LockMethodTable[lockmethod];
1289         if (!lockMethodTable)
1290         {
1291                 elog(NOTICE, "LockAcquire: bad lockmethod %d", lockmethod);
1292                 return FALSE;
1293         }
1294
1295         if (SHMQueueEmpty(lockQueue))
1296                 return TRUE;
1297
1298         numLockModes = lockMethodTable->ctl->numLockModes;
1299         masterLock = lockMethodTable->ctl->masterLock;
1300
1301         SpinAcquire(masterLock);
1302         SHMQueueFirst(lockQueue, (Pointer *) &xidLook, &xidLook->queue);
1303
1304         for (;;)
1305         {
1306                 bool    wakeupNeeded = false;
1307
1308                 /*
1309                  * Sometimes the queue appears to be messed up.
1310                  */
1311                 if (count++ > 1000)
1312                 {
1313                         elog(NOTICE, "LockReleaseAll: xid loop detected, giving up");
1314                         nleft = 0;
1315                         break;
1316                 }
1317
1318                 /* ---------------------------
1319                  * XXX Here we assume the shared memory queue is circular and
1320                  * that we know its internal structure.  Should have some sort of
1321                  * macros to allow one to walk it.      mer 20 July 1991
1322                  * ---------------------------
1323                  */
1324                 done = (xidLook->queue.next == end);
1325                 lock = (LOCK *) MAKE_PTR(xidLook->tag.lock);
1326
1327                 xidtag_lockmethod = XIDENT_LOCKMETHOD(*xidLook);
1328                 if ((xidtag_lockmethod == lockmethod) && pg_options[trace_flag])
1329                 {
1330                         XID_PRINT("LockReleaseAll", xidLook);
1331                         LOCK_PRINT("LockReleaseAll", lock, 0);
1332                 }
1333
1334 #ifdef USE_XIDTAG_LOCKMETHOD
1335                 if (xidtag_lockmethod != LOCK_LOCKMETHOD(*lock))
1336                         elog(NOTICE, "LockReleaseAll: xid/lock method mismatch: %d != %d",
1337                                  xidtag_lockmethod, lock->tag.lockmethod);
1338 #endif
1339                 if ((xidtag_lockmethod != lockmethod) && (trace_flag >= 2))
1340                 {
1341                         TPRINTF(trace_flag, "LockReleaseAll: skipping other table");
1342                         nleft++;
1343                         goto next_item;
1344                 }
1345
1346                 Assert(lock->nHolding > 0);
1347                 Assert(lock->nActive > 0);
1348                 Assert(lock->nActive <= lock->nHolding);
1349                 Assert(xidLook->nHolding >= 0);
1350                 Assert(xidLook->nHolding <= lock->nHolding);
1351
1352 #ifdef USER_LOCKS
1353                 if (is_user_lock_table)
1354                 {
1355                         if ((xidLook->tag.pid == 0) || (xidLook->tag.xid != 0))
1356                         {
1357                                 TPRINTF(TRACE_USERLOCKS,
1358                                                 "LockReleaseAll: skiping normal lock [%d,%d,%d]",
1359                                   xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
1360                                 nleft++;
1361                                 goto next_item;
1362                         }
1363                         if (xidLook->tag.pid != MyProcPid)
1364                         {
1365                                 /* Should never happen */
1366                                 elog(NOTICE,
1367                                          "LockReleaseAll: INVALID PID: [%u] [%d,%d,%d]",
1368                                          lock->tag.objId.blkno,
1369                                   xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
1370                                 nleft++;
1371                                 goto next_item;
1372                         }
1373                         TPRINTF(TRACE_USERLOCKS,
1374                                 "LockReleaseAll: releasing user lock [%u] [%d,%d,%d]",
1375                                         lock->tag.objId.blkno,
1376                                   xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
1377                 }
1378                 else
1379                 {
1380
1381                         /*
1382                          * Can't check xidLook->tag.xid, can be 0 also for normal
1383                          * locks
1384                          */
1385                         if (xidLook->tag.pid != 0)
1386                         {
1387                                 TPRINTF(TRACE_LOCKS,
1388                                   "LockReleaseAll: skiping user lock [%u] [%d,%d,%d]",
1389                                                 lock->tag.objId.blkno,
1390                                   xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
1391                                 nleft++;
1392                                 goto next_item;
1393                         }
1394                 }
1395 #endif
1396
1397                 /* ------------------
1398                  * fix the general lock stats
1399                  * ------------------
1400                  */
1401                 if (lock->nHolding != xidLook->nHolding)
1402                 {
1403                         for (i = 1; i <= numLockModes; i++)
1404                         {
1405                                 Assert(xidLook->holders[i] >= 0);
1406                                 lock->holders[i] -= xidLook->holders[i];
1407                                 lock->activeHolders[i] -= xidLook->holders[i];
1408                                 Assert((lock->holders[i] >= 0) \
1409                                            &&(lock->activeHolders[i] >= 0));
1410                                 if (!lock->activeHolders[i])
1411                                         lock->mask &= BITS_OFF[i];
1412                                 /*
1413                                  * Read comments in LockRelease
1414                                  */
1415                                 if (!wakeupNeeded && xidLook->holders[i] > 0 && 
1416                                         lockMethodTable->ctl->conflictTab[i] & lock->waitMask)
1417                                         wakeupNeeded = true;
1418                         }
1419                         lock->nHolding -= xidLook->nHolding;
1420                         lock->nActive -= xidLook->nHolding;
1421                         Assert((lock->nHolding >= 0) && (lock->nActive >= 0));
1422                         Assert(lock->nActive <= lock->nHolding);
1423                 }
1424                 else
1425                 {
1426                         /* --------------
1427                          * set nHolding to zero so that we can garbage collect the lock
1428                          * down below...
1429                          * --------------
1430                          */
1431                         lock->nHolding = 0;
1432                         /* Fix the lock status, just for next LOCK_PRINT message. */
1433                         for (i = 1; i <= numLockModes; i++)
1434                         {
1435                                 Assert(lock->holders[i] == lock->activeHolders[i]);
1436                                 lock->holders[i] = lock->activeHolders[i] = 0;
1437                         }
1438                 }
1439                 LOCK_PRINT("LockReleaseAll: updated", lock, 0);
1440
1441                 /*
1442                  * Remove the xid from the process lock queue
1443                  */
1444                 SHMQueueDelete(&xidLook->queue);
1445
1446                 /* ----------------
1447                  * always remove the xidLookup entry, we're done with it now
1448                  * ----------------
1449                  */
1450
1451                 XID_PRINT("LockReleaseAll: deleting", xidLook);
1452                 result = (XIDLookupEnt *) hash_search(lockMethodTable->xidHash,
1453                                                                                           (Pointer) xidLook,
1454                                                                                           HASH_REMOVE,
1455                                                                                           &found);
1456                 if (!result || !found)
1457                 {
1458                         SpinRelease(masterLock);
1459                         elog(NOTICE, "LockReleaseAll: xid table corrupted");
1460                         return FALSE;
1461                 }
1462
1463                 if (!lock->nHolding)
1464                 {
1465                         /* --------------------
1466                          * if there's no one waiting in the queue, we've just released
1467                          * the last lock.
1468                          * --------------------
1469                          */
1470                         LOCK_PRINT("LockReleaseAll: deleting", lock, 0);
1471                         Assert(lockMethodTable->lockHash->hash == tag_hash);
1472                         lock = (LOCK *) hash_search(lockMethodTable->lockHash,
1473                                                                                 (Pointer) &(lock->tag),
1474                                                                                 HASH_REMOVE, &found);
1475                         if ((!lock) || (!found))
1476                         {
1477                                 SpinRelease(masterLock);
1478                                 elog(NOTICE, "LockReleaseAll: cannot remove lock from HTAB");
1479                                 return FALSE;
1480                         }
1481                 }
1482                 else if (wakeupNeeded)
1483                 {
1484                         waitQueue = &(lock->waitProcs);
1485                         ProcLockWakeup(waitQueue, lockmethod, lock);
1486                 }
1487
1488 #ifdef USER_LOCKS
1489 next_item:
1490 #endif
1491                 if (done)
1492                         break;
1493                 SHMQueueFirst(&xidLook->queue, (Pointer *) &tmp, &tmp->queue);
1494                 xidLook = tmp;
1495         }
1496
1497         /*
1498          * Reinitialize the queue only if nothing has been left in.
1499          */
1500         if (nleft == 0)
1501         {
1502                 TPRINTF(trace_flag, "LockReleaseAll: reinitializing lockQueue");
1503                 SHMQueueInit(lockQueue);
1504         }
1505
1506         SpinRelease(masterLock);
1507         TPRINTF(trace_flag, "LockReleaseAll: done");
1508
1509         return TRUE;
1510 }
1511
1512 int
1513 LockShmemSize(int maxBackends)
1514 {
1515         int                     size = 0;
1516
1517         size += MAXALIGN(sizeof(PROC_HDR)); /* ProcGlobal */
1518         size += MAXALIGN(maxBackends * sizeof(PROC)); /* each MyProc */
1519         size += MAXALIGN(maxBackends * sizeof(LOCKMETHODCTL));          /* each
1520                                                                                                                                  * lockMethodTable->ctl */
1521
1522         /* lockHash table */
1523         size += hash_estimate_size(NLOCKENTS(maxBackends),
1524                                                            SHMEM_LOCKTAB_KEYSIZE,
1525                                                            SHMEM_LOCKTAB_DATASIZE);
1526
1527         /* xidHash table */
1528         size += hash_estimate_size(maxBackends,
1529                                                            SHMEM_XIDTAB_KEYSIZE,
1530                                                            SHMEM_XIDTAB_DATASIZE);
1531
1532         /* Since the lockHash entry count above is only an estimate,
1533          * add 10% safety margin.
1534          */
1535         size += size / 10;
1536
1537         return size;
1538 }
1539
1540 /* -----------------
1541  * Boolean function to determine current locking status
1542  * -----------------
1543  */
1544 bool
1545 LockingDisabled()
1546 {
1547         return LockingIsDisabled;
1548 }
1549
1550 /*
1551  * DeadlockCheck -- Checks for deadlocks for a given process
1552  *
1553  * We can't block on user locks, so no sense testing for deadlock
1554  * because there is no blocking, and no timer for the block.
1555  *
1556  * This code takes a list of locks a process holds, and the lock that
1557  * the process is sleeping on, and tries to find if any of the processes
1558  * waiting on its locks hold the lock it is waiting for.  If no deadlock
1559  * is found, it goes on to look at all the processes waiting on their locks.
1560  *
1561  * We have already locked the master lock before being called.
1562  */
1563 bool
1564 DeadLockCheck(SHM_QUEUE *lockQueue, LOCK *findlock, bool skip_check)
1565 {
1566         int                                             done;
1567         XIDLookupEnt               *xidLook = NULL;
1568         XIDLookupEnt               *tmp = NULL;
1569         SHMEM_OFFSET                    end = MAKE_OFFSET(lockQueue);
1570         LOCK                               *lock;
1571
1572         static PROC                        *checked_procs[MAXBACKENDS];
1573         static int                              nprocs;
1574
1575         /* initialize at start of recursion */
1576         if (skip_check)
1577         {
1578                 checked_procs[0] = MyProc;
1579                 nprocs = 1;
1580         }
1581
1582         if (SHMQueueEmpty(lockQueue))
1583                 return false;
1584
1585         SHMQueueFirst(lockQueue, (Pointer *) &xidLook, &xidLook->queue);
1586
1587         XID_PRINT("DeadLockCheck", xidLook);
1588
1589         for (;;)
1590         {
1591                 done = (xidLook->queue.next == end);
1592                 lock = (LOCK *) MAKE_PTR(xidLook->tag.lock);
1593
1594                 LOCK_PRINT("DeadLockCheck", lock, 0);
1595
1596                 /*
1597                  * This is our only check to see if we found the lock we want.
1598                  *
1599                  * The lock we are waiting for is already in MyProc->lockQueue so we
1600                  * need to skip it here.  We are trying to find it in someone
1601                  * else's lockQueue.   bjm
1602                  */
1603                 if (lock == findlock && !skip_check)
1604                         return true;
1605
1606                 {
1607                         PROC_QUEUE *waitQueue = &(lock->waitProcs);
1608                         PROC       *proc;
1609                         int                     i;
1610                         int                     j;
1611
1612                         proc = (PROC *) MAKE_PTR(waitQueue->links.prev);
1613                         for (i = 0; i < waitQueue->size; i++)
1614                         {
1615                                 /*
1616                                  * If I hold some locks on findlock and another proc 
1617                                  * waits on it holding locks too - check if we are
1618                                  * waiting one another.
1619                                  */
1620                                 if (proc != MyProc &&
1621                                         lock == findlock &&     /* skip_check also true */
1622                                         MyProc->holdLock)
1623                                 {
1624                                         LOCKMETHODCTL  *lockctl = 
1625                                                         LockMethodTable[DEFAULT_LOCKMETHOD]->ctl;
1626
1627                                         Assert(skip_check);
1628                                         if (lockctl->conflictTab[MyProc->token] & proc->holdLock && 
1629                                                 lockctl->conflictTab[proc->token] & MyProc->holdLock)
1630                                                 return true;
1631                                 }
1632
1633                                 /*
1634                                  * No sense in looking at the wait queue of the lock we
1635                                  * are looking for. If lock == findlock, and I got here,
1636                                  * skip_check must be true too.
1637                                  */
1638                                 if (lock != findlock)
1639                                 {
1640                                         for (j = 0; j < nprocs; j++)
1641                                                 if (checked_procs[j] == proc)
1642                                                         break;
1643                                         if (j >= nprocs && lock != findlock)
1644                                         {
1645                                                 Assert(nprocs < MAXBACKENDS);
1646                                                 checked_procs[nprocs++] = proc;
1647
1648                                                 /*
1649                                                  * For non-MyProc entries, we are looking only
1650                                                  * waiters, not necessarily people who already
1651                                                  * hold locks and are waiting. Now we check for
1652                                                  * cases where we have two or more tables in a
1653                                                  * deadlock.  We do this by continuing to search
1654                                                  * for someone holding a lock       bjm
1655                                                  */
1656                                                 if (DeadLockCheck(&(proc->lockQueue), findlock, false))
1657                                                         return true;
1658                                         }
1659                                 }
1660                                 proc = (PROC *) MAKE_PTR(proc->links.prev);
1661                         }
1662                 }
1663
1664                 if (done)
1665                         break;
1666                 SHMQueueFirst(&xidLook->queue, (Pointer *) &tmp, &tmp->queue);
1667                 xidLook = tmp;
1668         }
1669
1670         /* if we got here, no deadlock */
1671         return false;
1672 }
1673
1674 #ifdef NOT_USED
1675 /*
1676  * Return an array with the pids of all processes owning a lock.
1677  * This works only for user locks because normal locks have no
1678  * pid information in the corresponding XIDLookupEnt.
1679  */
1680 ArrayType  *
1681 LockOwners(LOCKMETHOD lockmethod, LOCKTAG *locktag)
1682 {
1683         XIDLookupEnt *xidLook = NULL;
1684         SPINLOCK        masterLock;
1685         LOCK       *lock;
1686         SHMEM_OFFSET lock_offset;
1687         int                     count = 0;
1688         LOCKMETHODTABLE *lockMethodTable;
1689         HTAB       *xidTable;
1690         bool            found;
1691         int                     ndims,
1692                                 nitems,
1693                                 hdrlen,
1694                                 size;
1695         int                     lbounds[1],
1696                                 hbounds[1];
1697         ArrayType  *array;
1698         int                *data_ptr;
1699
1700         /* Assume that no one will modify the result */
1701         static int      empty_array[] = {20, 1, 0, 0, 0};
1702
1703 #ifdef USER_LOCKS
1704         int                     is_user_lock;
1705
1706         is_user_lock = (lockmethod == USER_LOCKMETHOD);
1707         if (is_user_lock)
1708         {
1709                 TPRINTF(TRACE_USERLOCKS, "LockOwners: user lock tag [%u]",
1710                                 locktag->objId.blkno;,
1711         }
1712 #endif
1713
1714         /* This must be changed when short term locks will be used */
1715         locktag->lockmethod = lockmethod;
1716
1717         Assert((lockmethod >= MIN_LOCKMETHOD) && (lockmethod < NumLockMethods));
1718         lockMethodTable = LockMethodTable[lockmethod];
1719         if (!lockMethodTable)
1720         {
1721                 elog(NOTICE, "lockMethodTable is null in LockOwners");
1722                 return (ArrayType *) &empty_array;
1723         }
1724
1725         if (LockingIsDisabled)
1726                 return (ArrayType *) &empty_array;
1727
1728         masterLock = lockMethodTable->ctl->masterLock;
1729         SpinAcquire(masterLock);
1730
1731         /*
1732          * Find a lock with this tag
1733          */
1734         Assert(lockMethodTable->lockHash->hash == tag_hash);
1735         lock = (LOCK *) hash_search(lockMethodTable->lockHash, (Pointer) locktag,
1736                                                                 HASH_FIND, &found);
1737
1738         /*
1739          * let the caller print its own error message, too. Do not elog(WARN).
1740          */
1741         if (!lock)
1742         {
1743                 SpinRelease(masterLock);
1744                 elog(NOTICE, "LockOwners: locktable corrupted");
1745                 return (ArrayType *) &empty_array;
1746         }
1747
1748         if (!found)
1749         {
1750                 SpinRelease(masterLock);
1751 #ifdef USER_LOCKS
1752                 if (is_user_lock)
1753                 {
1754                         TPRINTF(TRACE_USERLOCKS, "LockOwners: no lock with this tag");
1755                         return (ArrayType *) &empty_array;
1756                 }
1757 #endif
1758                 elog(NOTICE, "LockOwners: locktable lookup failed, no lock");
1759                 return (ArrayType *) &empty_array;
1760         }
1761         LOCK_PRINT("LockOwners: found", lock, 0);
1762         Assert((lock->nHolding > 0) && (lock->nActive > 0));
1763         Assert(lock->nActive <= lock->nHolding);
1764         lock_offset = MAKE_OFFSET(lock);
1765
1766         /* Construct a 1-dimensional array */
1767         ndims = 1;
1768         hdrlen = ARR_OVERHEAD(ndims);
1769         lbounds[0] = 0;
1770         hbounds[0] = lock->nActive;
1771         size = hdrlen + sizeof(int) * hbounds[0];
1772         array = (ArrayType *) palloc(size);
1773         MemSet(array, 0, size);
1774         memmove((char *) array, (char *) &size, sizeof(int));
1775         memmove((char *) ARR_NDIM_PTR(array), (char *) &ndims, sizeof(int));
1776         memmove((char *) ARR_DIMS(array), (char *) hbounds, ndims * sizeof(int));
1777         memmove((char *) ARR_LBOUND(array), (char *) lbounds, ndims * sizeof(int));
1778         SET_LO_FLAG(false, array);
1779         data_ptr = (int *) ARR_DATA_PTR(array);
1780
1781         xidTable = lockMethodTable->xidHash;
1782         hash_seq(NULL);
1783         nitems = 0;
1784         while ((xidLook = (XIDLookupEnt *) hash_seq(xidTable)) &&
1785                    (xidLook != (XIDLookupEnt *) TRUE))
1786         {
1787                 if (count++ > 1000)
1788                 {
1789                         elog(NOTICE, "LockOwners: possible loop, giving up");
1790                         break;
1791                 }
1792
1793                 if (xidLook->tag.pid == 0)
1794                 {
1795                         XID_PRINT("LockOwners: no pid", xidLook);
1796                         continue;
1797                 }
1798
1799                 if (!xidLook->tag.lock)
1800                 {
1801                         XID_PRINT("LockOwners: NULL LOCK", xidLook);
1802                         continue;
1803                 }
1804
1805                 if (xidLook->tag.lock != lock_offset)
1806                 {
1807                         XID_PRINT("LockOwners: different lock", xidLook);
1808                         continue;
1809                 }
1810
1811                 if (LOCK_LOCKMETHOD(*lock) != lockmethod)
1812                 {
1813                         XID_PRINT("LockOwners: other table", xidLook);
1814                         continue;
1815                 }
1816
1817                 if (xidLook->nHolding <= 0)
1818                 {
1819                         XID_PRINT("LockOwners: not holding", xidLook);
1820                         continue;
1821                 }
1822
1823                 if (nitems >= hbounds[0])
1824                 {
1825                         elog(NOTICE, "LockOwners: array size exceeded");
1826                         break;
1827                 }
1828
1829                 /*
1830                  * Check that the holding process is still alive by sending him an
1831                  * unused (ignored) signal. If the kill fails the process is not
1832                  * alive.
1833                  */
1834                 if ((xidLook->tag.pid != MyProcPid) \
1835                         &&(kill(xidLook->tag.pid, SIGCHLD)) != 0)
1836                 {
1837                         /* Return a negative pid to signal that process is dead */
1838                         data_ptr[nitems++] = -(xidLook->tag.pid);
1839                         XID_PRINT("LockOwners: not alive", xidLook);
1840                         /* XXX - TODO: remove this entry and update lock stats */
1841                         continue;
1842                 }
1843
1844                 /* Found a process holding the lock */
1845                 XID_PRINT("LockOwners: holding", xidLook);
1846                 data_ptr[nitems++] = xidLook->tag.pid;
1847         }
1848
1849         SpinRelease(masterLock);
1850
1851         /* Adjust the actual size of the array */
1852         hbounds[0] = nitems;
1853         size = hdrlen + sizeof(int) * hbounds[0];
1854         memmove((char *) array, (char *) &size, sizeof(int));
1855         memmove((char *) ARR_DIMS(array), (char *) hbounds, ndims * sizeof(int));
1856
1857         return array;
1858 }
1859 #endif
1860
1861 #ifdef DEADLOCK_DEBUG
1862 /*
1863  * Dump all locks in the proc->lockQueue. Must have already acquired
1864  * the masterLock.
1865  */
1866 void
1867 DumpLocks()
1868 {
1869         SHMEM_OFFSET location;
1870         PROC       *proc;
1871         SHM_QUEUE  *lockQueue;
1872         int                     done;
1873         XIDLookupEnt *xidLook = NULL;
1874         XIDLookupEnt *tmp = NULL;
1875         SHMEM_OFFSET end;
1876         SPINLOCK        masterLock;
1877         int                     numLockModes;
1878         LOCK       *lock;
1879         int                     count = 0;
1880         int                     lockmethod = DEFAULT_LOCKMETHOD;
1881         LOCKMETHODTABLE *lockMethodTable;
1882
1883         ShmemPIDLookup(MyProcPid, &location);
1884         if (location == INVALID_OFFSET)
1885                 return;
1886         proc = (PROC *) MAKE_PTR(location);
1887         if (proc != MyProc)
1888                 return;
1889         lockQueue = &proc->lockQueue;
1890
1891         Assert(lockmethod < NumLockMethods);
1892         lockMethodTable = LockMethodTable[lockmethod];
1893         if (!lockMethodTable)
1894                 return;
1895
1896         numLockModes = lockMethodTable->ctl->numLockModes;
1897         masterLock = lockMethodTable->ctl->masterLock;
1898
1899         if (SHMQueueEmpty(lockQueue))
1900                 return;
1901
1902         SHMQueueFirst(lockQueue, (Pointer *) &xidLook, &xidLook->queue);
1903         end = MAKE_OFFSET(lockQueue);
1904
1905         if (MyProc->waitLock)
1906                 LOCK_PRINT_AUX("DumpLocks: waiting on", MyProc->waitLock, 0);
1907
1908         for (;;)
1909         {
1910                 if (count++ > 2000)
1911                 {
1912                         elog(NOTICE, "DumpLocks: xid loop detected, giving up");
1913                         break;
1914                 }
1915
1916                 /* ---------------------------
1917                  * XXX Here we assume the shared memory queue is circular and
1918                  * that we know its internal structure.  Should have some sort of
1919                  * macros to allow one to walk it.      mer 20 July 1991
1920                  * ---------------------------
1921                  */
1922                 done = (xidLook->queue.next == end);
1923                 lock = (LOCK *) MAKE_PTR(xidLook->tag.lock);
1924
1925                 XID_PRINT_AUX("DumpLocks", xidLook);
1926                 LOCK_PRINT_AUX("DumpLocks", lock, 0);
1927
1928                 if (done)
1929                         break;
1930
1931                 SHMQueueFirst(&xidLook->queue, (Pointer *) &tmp, &tmp->queue);
1932                 xidLook = tmp;
1933         }
1934 }
1935
1936 /*
1937  * Dump all postgres locks. Must have already acquired the masterLock.
1938  */
1939 void
1940 DumpAllLocks()
1941 {
1942         SHMEM_OFFSET location;
1943         PROC       *proc;
1944         XIDLookupEnt *xidLook = NULL;
1945         LOCK       *lock;
1946         int                     pid;
1947         int                     count = 0;
1948         int                     lockmethod = DEFAULT_LOCKMETHOD;
1949         LOCKMETHODTABLE *lockMethodTable;
1950         HTAB       *xidTable;
1951
1952         pid = getpid();
1953         ShmemPIDLookup(pid, &location);
1954         if (location == INVALID_OFFSET)
1955                 return;
1956         proc = (PROC *) MAKE_PTR(location);
1957         if (proc != MyProc)
1958                 return;
1959
1960         Assert(lockmethod < NumLockMethods);
1961         lockMethodTable = LockMethodTable[lockmethod];
1962         if (!lockMethodTable)
1963                 return;
1964
1965         xidTable = lockMethodTable->xidHash;
1966
1967         if (MyProc->waitLock)
1968                 LOCK_PRINT_AUX("DumpAllLocks: waiting on", MyProc->waitLock, 0);
1969
1970         hash_seq(NULL);
1971         while ((xidLook = (XIDLookupEnt *) hash_seq(xidTable)) &&
1972                    (xidLook != (XIDLookupEnt *) TRUE))
1973         {
1974                 XID_PRINT_AUX("DumpAllLocks", xidLook);
1975
1976                 if (xidLook->tag.lock)
1977                 {
1978                         lock = (LOCK *) MAKE_PTR(xidLook->tag.lock);
1979                         LOCK_PRINT_AUX("DumpAllLocks", lock, 0);
1980                 }
1981                 else
1982                         elog(DEBUG, "DumpAllLocks: xidLook->tag.lock = NULL");
1983
1984                 if (count++ > 2000)
1985                 {
1986                         elog(NOTICE, "DumpAllLocks: possible loop, giving up");
1987                         break;
1988                 }
1989         }
1990 }
1991
1992 #endif