OSDN Git Service

Unify spelling of "canceled", "canceling", "cancellation"
[pg-rex/syncrep.git] / src / backend / storage / lmgr / predicate.c
1 /*-------------------------------------------------------------------------
2  *
3  * predicate.c
4  *        POSTGRES predicate locking
5  *        to support full serializable transaction isolation
6  *
7  *
8  * The approach taken is to implement Serializable Snapshot Isolation (SSI)
9  * as initially described in this paper:
10  *
11  *      Michael J. Cahill, Uwe Röhm, and Alan D. Fekete. 2008.
12  *      Serializable isolation for snapshot databases.
13  *      In SIGMOD '08: Proceedings of the 2008 ACM SIGMOD
14  *      international conference on Management of data,
15  *      pages 729-738, New York, NY, USA. ACM.
16  *      http://doi.acm.org/10.1145/1376616.1376690
17  *
18  * and further elaborated in Cahill's doctoral thesis:
19  *
20  *      Michael James Cahill. 2009.
21  *      Serializable Isolation for Snapshot Databases.
22  *      Sydney Digital Theses.
23  *      University of Sydney, School of Information Technologies.
24  *      http://hdl.handle.net/2123/5353
25  *
26  *
27  * Predicate locks for Serializable Snapshot Isolation (SSI) are SIREAD
28  * locks, which are so different from normal locks that a distinct set of
29  * structures is required to handle them.  They are needed to detect
30  * rw-conflicts when the read happens before the write.  (When the write
31  * occurs first, the reading transaction can check for a conflict by
32  * examining the MVCC data.)
33  *
34  * (1)  Besides tuples actually read, they must cover ranges of tuples
35  *              which would have been read based on the predicate.      This will
36  *              require modelling the predicates through locks against database
37  *              objects such as pages, index ranges, or entire tables.
38  *
39  * (2)  They must be kept in RAM for quick access.      Because of this, it
40  *              isn't possible to always maintain tuple-level granularity -- when
41  *              the space allocated to store these approaches exhaustion, a
42  *              request for a lock may need to scan for situations where a single
43  *              transaction holds many fine-grained locks which can be coalesced
44  *              into a single coarser-grained lock.
45  *
46  * (3)  They never block anything; they are more like flags than locks
47  *              in that regard; although they refer to database objects and are
48  *              used to identify rw-conflicts with normal write locks.
49  *
50  * (4)  While they are associated with a transaction, they must survive
51  *              a successful COMMIT of that transaction, and remain until all
52  *              overlapping transactions complete.      This even means that they
53  *              must survive termination of the transaction's process.  If a
54  *              top level transaction is rolled back, however, it is immediately
55  *              flagged so that it can be ignored, and its SIREAD locks can be
56  *              released any time after that.
57  *
58  * (5)  The only transactions which create SIREAD locks or check for
59  *              conflicts with them are serializable transactions.
60  *
61  * (6)  When a write lock for a top level transaction is found to cover
62  *              an existing SIREAD lock for the same transaction, the SIREAD lock
63  *              can be deleted.
64  *
65  * (7)  A write from a serializable transaction must ensure that a xact
66  *              record exists for the transaction, with the same lifespan (until
67  *              all concurrent transactions complete or the transaction is rolled
68  *              back) so that rw-dependencies to that transaction can be
69  *              detected.
70  *
71  * We use an optimization for read-only transactions. Under certain
72  * circumstances, a read-only transaction's snapshot can be shown to
73  * never have conflicts with other transactions.  This is referred to
74  * as a "safe" snapshot (and one known not to be is "unsafe").
75  * However, it can't be determined whether a snapshot is safe until
76  * all concurrent read/write transactions complete.
77  *
78  * Once a read-only transaction is known to have a safe snapshot, it
79  * can release its predicate locks and exempt itself from further
80  * predicate lock tracking. READ ONLY DEFERRABLE transactions run only
81  * on safe snapshots, waiting as necessary for one to be available.
82  *
83  *
84  * Lightweight locks to manage access to the predicate locking shared
85  * memory objects must be taken in this order, and should be released in
86  * reverse order:
87  *
88  *      SerializableFinishedListLock
89  *              - Protects the list of transactions which have completed but which
90  *                      may yet matter because they overlap still-active transactions.
91  *
92  *      SerializablePredicateLockListLock
93  *              - Protects the linked list of locks held by a transaction.      Note
94  *                      that the locks themselves are also covered by the partition
95  *                      locks of their respective lock targets; this lock only affects
96  *                      the linked list connecting the locks related to a transaction.
97  *              - All transactions share this single lock (with no partitioning).
98  *              - There is never a need for a process other than the one running
99  *                      an active transaction to walk the list of locks held by that
100  *                      transaction.
101  *              - It is relatively infrequent that another process needs to
102  *                      modify the list for a transaction, but it does happen for such
103  *                      things as index page splits for pages with predicate locks and
104  *                      freeing of predicate locked pages by a vacuum process.  When
105  *                      removing a lock in such cases, the lock itself contains the
106  *                      pointers needed to remove it from the list.  When adding a
107  *                      lock in such cases, the lock can be added using the anchor in
108  *                      the transaction structure.      Neither requires walking the list.
109  *              - Cleaning up the list for a terminated transaction is sometimes
110  *                      not done on a retail basis, in which case no lock is required.
111  *              - Due to the above, a process accessing its active transaction's
112  *                      list always uses a shared lock, regardless of whether it is
113  *                      walking or maintaining the list.  This improves concurrency
114  *                      for the common access patterns.
115  *              - A process which needs to alter the list of a transaction other
116  *                      than its own active transaction must acquire an exclusive
117  *                      lock.
118  *
119  *      FirstPredicateLockMgrLock based partition locks
120  *              - The same lock protects a target, all locks on that target, and
121  *                      the linked list of locks on the target.
122  *              - When more than one is needed, acquire in ascending order.
123  *
124  *      SerializableXactHashLock
125  *              - Protects both PredXact and SerializableXidHash.
126  *
127  *
128  * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
129  * Portions Copyright (c) 1994, Regents of the University of California
130  *
131  *
132  * IDENTIFICATION
133  *        src/backend/storage/lmgr/predicate.c
134  *
135  *-------------------------------------------------------------------------
136  */
137 /*
138  * INTERFACE ROUTINES
139  *
140  * housekeeping for setting up shared memory predicate lock structures
141  *              InitPredicateLocks(void)
142  *              PredicateLockShmemSize(void)
143  *
144  * predicate lock reporting
145  *              GetPredicateLockStatusData(void)
146  *              PageIsPredicateLocked(Relation relation, BlockNumber blkno)
147  *
148  * predicate lock maintenance
149  *              RegisterSerializableTransaction(Snapshot snapshot)
150  *              RegisterPredicateLockingXid(void)
151  *              PredicateLockRelation(Relation relation, Snapshot snapshot)
152  *              PredicateLockPage(Relation relation, BlockNumber blkno,
153  *                                              Snapshot snapshot)
154  *              PredicateLockTuple(Relation relation, HeapTuple tuple,
155  *                                              Snapshot snapshot)
156  *              PredicateLockPageSplit(Relation relation, BlockNumber oldblkno,
157  *                                                         BlockNumber newblkno);
158  *              PredicateLockPageCombine(Relation relation, BlockNumber oldblkno,
159  *                                                               BlockNumber newblkno);
160  *              TransferPredicateLocksToHeapRelation(Relation relation)
161  *              ReleasePredicateLocks(bool isCommit)
162  *
163  * conflict detection (may also trigger rollback)
164  *              CheckForSerializableConflictOut(bool visible, Relation relation,
165  *                                                                              HeapTupleData *tup, Buffer buffer,
166  *                                                                              Snapshot snapshot)
167  *              CheckForSerializableConflictIn(Relation relation, HeapTupleData *tup,
168  *                                                                         Buffer buffer)
169  *              CheckTableForSerializableConflictIn(Relation relation)
170  *
171  * final rollback checking
172  *              PreCommit_CheckForSerializationFailure(void)
173  *
174  * two-phase commit support
175  *              AtPrepare_PredicateLocks(void);
176  *              PostPrepare_PredicateLocks(TransactionId xid);
177  *              PredicateLockTwoPhaseFinish(TransactionId xid, bool isCommit);
178  *              predicatelock_twophase_recover(TransactionId xid, uint16 info,
179  *                                                                         void *recdata, uint32 len);
180  */
181
182 #include "postgres.h"
183
184 #include "access/slru.h"
185 #include "access/subtrans.h"
186 #include "access/transam.h"
187 #include "access/twophase.h"
188 #include "access/twophase_rmgr.h"
189 #include "access/xact.h"
190 #include "miscadmin.h"
191 #include "storage/bufmgr.h"
192 #include "storage/predicate.h"
193 #include "storage/predicate_internals.h"
194 #include "storage/procarray.h"
195 #include "utils/rel.h"
196 #include "utils/snapmgr.h"
197 #include "utils/tqual.h"
198
199 /* Uncomment the next line to test the graceful degradation code. */
200 /* #define TEST_OLDSERXID */
201
/*
 * TargetTagIsCoveredBy
 *		Is the "covered" target tag subsumed by the "covering" target tag?
 *
 * The most selective fields are tested first, for performance.
 *
 * With a = covered and b = covering, a is covered by b when all of these
 * hold:
 *	1) a.database = b.database
 *	2) a.relation = b.relation
 *	3) b.offset is invalid (b is page-granularity or coarser)
 *	4) either of the following:
 *		4a) a.offset is valid (a is tuple-granularity) and a.page = b.page
 *	 or 4b) b.page is invalid (b is relation-granularity) and a.page is
 *			valid (a is finer than relation-granularity)
 *
 * Each argument is expanded more than once, so neither may have side
 * effects.
 */
#define TargetTagIsCoveredBy(covered, covering)								\
	((GET_PREDICATELOCKTARGETTAG_RELATION(covered) ==		/* (2) */		\
	  GET_PREDICATELOCKTARGETTAG_RELATION(covering))						\
	 && (GET_PREDICATELOCKTARGETTAG_OFFSET(covering) ==		/* (3) */		\
		 InvalidOffsetNumber)												\
	 && (((GET_PREDICATELOCKTARGETTAG_OFFSET(covered) !=	/* (4a) */		\
		   InvalidOffsetNumber)												\
		  && (GET_PREDICATELOCKTARGETTAG_PAGE(covering) ==					\
			  GET_PREDICATELOCKTARGETTAG_PAGE(covered)))					\
		 || ((GET_PREDICATELOCKTARGETTAG_PAGE(covering) ==	/* (4b) */		\
			  InvalidBlockNumber)											\
			 && (GET_PREDICATELOCKTARGETTAG_PAGE(covered)					\
				 != InvalidBlockNumber)))									\
	 && (GET_PREDICATELOCKTARGETTAG_DB(covered) ==			/* (1) */		\
		 GET_PREDICATELOCKTARGETTAG_DB(covering)))
229
/*
 * Partitioning of the shared predicate lock target and lock hash tables.
 *
 * Both tables are split into partitions to reduce contention.  To find the
 * partition for a target, first obtain the tag's hash code with
 * PredicateLockTargetTagHashCode() and then feed it to one of the macros
 * below: the first yields the partition number, the second the LWLockId
 * guarding that partition.
 *
 * NB: NUM_PREDICATELOCK_PARTITIONS must be a power of 2!
 */
#define PredicateLockHashPartition(hash) \
	((hash) % NUM_PREDICATELOCK_PARTITIONS)
#define PredicateLockHashPartitionLock(hash) \
	((LWLockId) (FirstPredicateLockMgrLock + PredicateLockHashPartition(hash)))

/*
 * max_predicate_locks_per_xact multiplied by the sum of MaxBackends and
 * max_prepared_xacts, computed through mul_size()/add_size().
 */
#define NPREDICATELOCKTARGETENTS() \
	mul_size(max_predicate_locks_per_xact, add_size(MaxBackends, max_prepared_xacts))
244
/* True when the sxact's finishedLink is currently attached to a queue. */
#define SxactIsOnFinishedList(sxact) (!SHMQueueIsDetached(&((sxact)->finishedLink)))

/*
 * Predicates over the flags word of a SERIALIZABLEXACT.  Every one of them
 * is the same bit test, so they all go through SxactHasFlag().
 */
#define SxactHasFlag(sxact, flag) (((sxact)->flags & (flag)) != 0)

#define SxactIsCommitted(sxact) SxactHasFlag(sxact, SXACT_FLAG_COMMITTED)
#define SxactIsPrepared(sxact) SxactHasFlag(sxact, SXACT_FLAG_PREPARED)
#define SxactIsRolledBack(sxact) SxactHasFlag(sxact, SXACT_FLAG_ROLLED_BACK)
#define SxactIsDoomed(sxact) SxactHasFlag(sxact, SXACT_FLAG_DOOMED)
#define SxactIsReadOnly(sxact) SxactHasFlag(sxact, SXACT_FLAG_READ_ONLY)
#define SxactHasSummaryConflictIn(sxact) SxactHasFlag(sxact, SXACT_FLAG_SUMMARY_CONFLICT_IN)
#define SxactHasSummaryConflictOut(sxact) SxactHasFlag(sxact, SXACT_FLAG_SUMMARY_CONFLICT_OUT)
/*
 * Despite the short name, the next macro means that the transaction has a
 * conflict out *to a transaction which committed ahead of it*.  A name
 * saying all of that would be unreasonably long.
 */
#define SxactHasConflictOut(sxact) SxactHasFlag(sxact, SXACT_FLAG_CONFLICT_OUT)
#define SxactIsDeferrableWaiting(sxact) SxactHasFlag(sxact, SXACT_FLAG_DEFERRABLE_WAITING)
#define SxactIsROSafe(sxact) SxactHasFlag(sxact, SXACT_FLAG_RO_SAFE)
#define SxactIsROUnsafe(sxact) SxactHasFlag(sxact, SXACT_FLAG_RO_UNSAFE)
263
/*
 * Hash code of a PREDICATELOCKTARGETTAG.
 *
 * The intent is to compute this once per function and hand the value
 * around, rather than recomputing it.  The same value serves both as the
 * argument to hash_search_with_hash_value() and as the source of the lock
 * partition number.
 */
#define PredicateLockTargetTagHashCode(tagptr) \
	(tag_hash((tagptr), sizeof(PREDICATELOCKTARGETTAG)))

/*
 * Hash code of a predicate lock, derived from the lock's tag and the
 * already-computed hash code of its target.
 *
 * So that the result also depends on the owning transaction, the address
 * stored in the tag's myXact field is xor'ed in after being shifted left by
 * LOG2_NUM_PREDICATELOCK_PARTITIONS; the shift keeps the partition-number
 * bits of the target hash unchanged.  This is only a hash, so truncating
 * the address to uint32 (via PointerGetDatum) is harmless.
 */
#define PredicateLockHashCodeFromTargetHashCode(locktag, targethash) \
	((targethash) ^ \
	 (((uint32) PointerGetDatum((locktag)->myXact)) \
	  << LOG2_NUM_PREDICATELOCK_PARTITIONS))
288
289
290 /*
291  * The SLRU buffer area through which we access the old xids.
292  */
293 static SlruCtlData OldSerXidSlruCtlData;
294
295 #define OldSerXidSlruCtl                        (&OldSerXidSlruCtlData)
296
297 #define OLDSERXID_PAGESIZE                      BLCKSZ
298 #define OLDSERXID_ENTRYSIZE                     sizeof(SerCommitSeqNo)
299 #define OLDSERXID_ENTRIESPERPAGE        (OLDSERXID_PAGESIZE / OLDSERXID_ENTRYSIZE)
300 #define OLDSERXID_MAX_PAGE                      (SLRU_PAGES_PER_SEGMENT * 0x10000 - 1)
301
302 #define OldSerXidNextPage(page) (((page) >= OLDSERXID_MAX_PAGE) ? 0 : (page) + 1)
303
304 #define OldSerXidValue(slotno, xid) (*((SerCommitSeqNo *) \
305         (OldSerXidSlruCtl->shared->page_buffer[slotno] + \
306         ((((uint32) (xid)) % OLDSERXID_ENTRIESPERPAGE) * OLDSERXID_ENTRYSIZE))))
307
308 #define OldSerXidPage(xid)      ((((uint32) (xid)) / OLDSERXID_ENTRIESPERPAGE) % (OLDSERXID_MAX_PAGE + 1))
309 #define OldSerXidSegment(page)  ((page) / SLRU_PAGES_PER_SEGMENT)
310
311 typedef struct OldSerXidControlData
312 {
313         int                     headPage;               /* newest initialized page */
314         TransactionId headXid;          /* newest valid Xid in the SLRU */
315         TransactionId tailXid;          /* oldest xmin we might be interested in */
316         bool            warningIssued;  /* have we issued SLRU wrap-around warning? */
317 }       OldSerXidControlData;
318
319 typedef struct OldSerXidControlData *OldSerXidControl;
320
321 static OldSerXidControl oldSerXidControl;
322
323 /*
324  * When the oldest committed transaction on the "finished" list is moved to
325  * SLRU, its predicate locks will be moved to this "dummy" transaction,
326  * collapsing duplicate targets.  When a duplicate is found, the later
327  * commitSeqNo is used.
328  */
329 static SERIALIZABLEXACT *OldCommittedSxact;
330
331
332 /* This configuration variable is used to set the predicate lock table size */
333 int                     max_predicate_locks_per_xact;           /* set by guc.c */
334
335 /*
336  * This provides a list of objects in order to track transactions
337  * participating in predicate locking.  Entries in the list are fixed size,
338  * and reside in shared memory.  The memory address of an entry must remain
339  * fixed during its lifetime.  The list will be protected from concurrent
340  * update externally; no provision is made in this code to manage that.  The
341  * number of entries in the list, and the size allowed for each entry is
342  * fixed upon creation.
343  */
344 static PredXactList PredXact;
345
346 /*
347  * This provides a pool of RWConflict data elements to use in conflict lists
348  * between transactions.
349  */
350 static RWConflictPoolHeader RWConflictPool;
351
352 /*
353  * The predicate locking hash tables are in shared memory.
354  * Each backend keeps pointers to them.
355  */
356 static HTAB *SerializableXidHash;
357 static HTAB *PredicateLockTargetHash;
358 static HTAB *PredicateLockHash;
359 static SHM_QUEUE *FinishedSerializableTransactions;
360
361 /*
362  * Tag for a dummy entry in PredicateLockTargetHash. By temporarily removing
363  * this entry, you can ensure that there's enough scratch space available for
364  * inserting one entry in the hash table. This is an otherwise-invalid tag.
365  */
366 static const PREDICATELOCKTARGETTAG ScratchTargetTag = {0, 0, 0, 0, 0};
367 static uint32 ScratchTargetTagHash;
368 static int      ScratchPartitionLock;
369
370 /*
371  * The local hash table used to determine when to combine multiple fine-
372  * grained locks into a single courser-grained lock.
373  */
374 static HTAB *LocalPredicateLockHash = NULL;
375
376 /*
377  * Keep a pointer to the currently-running serializable transaction (if any)
378  * for quick reference. Also, remember if we have written anything that could
379  * cause a rw-conflict.
380  */
381 static SERIALIZABLEXACT *MySerializableXact = InvalidSerializableXact;
382 static bool MyXactDidWrite = false;
383
384 /* local functions */
385
386 static SERIALIZABLEXACT *CreatePredXact(void);
387 static void ReleasePredXact(SERIALIZABLEXACT *sxact);
388 static SERIALIZABLEXACT *FirstPredXact(void);
389 static SERIALIZABLEXACT *NextPredXact(SERIALIZABLEXACT *sxact);
390
391 static bool RWConflictExists(const SERIALIZABLEXACT *reader, const SERIALIZABLEXACT *writer);
392 static void SetRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer);
393 static void SetPossibleUnsafeConflict(SERIALIZABLEXACT *roXact, SERIALIZABLEXACT *activeXact);
394 static void ReleaseRWConflict(RWConflict conflict);
395 static void FlagSxactUnsafe(SERIALIZABLEXACT *sxact);
396
397 static bool OldSerXidPagePrecedesLogically(int p, int q);
398 static void OldSerXidInit(void);
399 static void OldSerXidAdd(TransactionId xid, SerCommitSeqNo minConflictCommitSeqNo);
400 static SerCommitSeqNo OldSerXidGetMinConflictCommitSeqNo(TransactionId xid);
401 static void OldSerXidSetActiveSerXmin(TransactionId xid);
402
403 static uint32 predicatelock_hash(const void *key, Size keysize);
404 static void SummarizeOldestCommittedSxact(void);
405 static Snapshot GetSafeSnapshot(Snapshot snapshot);
406 static Snapshot RegisterSerializableTransactionInt(Snapshot snapshot);
407 static bool PredicateLockExists(const PREDICATELOCKTARGETTAG *targettag);
408 static bool GetParentPredicateLockTag(const PREDICATELOCKTARGETTAG *tag,
409                                                   PREDICATELOCKTARGETTAG *parent);
410 static bool CoarserLockCovers(const PREDICATELOCKTARGETTAG *newtargettag);
411 static void RemoveScratchTarget(bool lockheld);
412 static void RestoreScratchTarget(bool lockheld);
413 static void RemoveTargetIfNoLongerUsed(PREDICATELOCKTARGET *target,
414                                                    uint32 targettaghash);
415 static void DeleteChildTargetLocks(const PREDICATELOCKTARGETTAG *newtargettag);
416 static int      PredicateLockPromotionThreshold(const PREDICATELOCKTARGETTAG *tag);
417 static bool CheckAndPromotePredicateLockRequest(const PREDICATELOCKTARGETTAG *reqtag);
418 static void DecrementParentLocks(const PREDICATELOCKTARGETTAG *targettag);
419 static void CreatePredicateLock(const PREDICATELOCKTARGETTAG *targettag,
420                                         uint32 targettaghash,
421                                         SERIALIZABLEXACT *sxact);
422 static void DeleteLockTarget(PREDICATELOCKTARGET *target, uint32 targettaghash);
423 static bool TransferPredicateLocksToNewTarget(PREDICATELOCKTARGETTAG oldtargettag,
424                                                                   PREDICATELOCKTARGETTAG newtargettag,
425                                                                   bool removeOld);
426 static void PredicateLockAcquire(const PREDICATELOCKTARGETTAG *targettag);
427 static void DropAllPredicateLocksFromTable(Relation relation,
428                                                            bool transfer);
429 static void SetNewSxactGlobalXmin(void);
430 static void ClearOldPredicateLocks(void);
431 static void ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial,
432                                                    bool summarize);
433 static bool XidIsConcurrent(TransactionId xid);
434 static void CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag);
435 static void FlagRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer);
436 static void OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *reader,
437                                                                                 SERIALIZABLEXACT *writer);
438
439
440 /*------------------------------------------------------------------------*/
441
442 /*
443  * Does this relation participate in predicate locking? Temporary and system
444  * relations are exempt.
445  */
446 static inline bool
447 PredicateLockingNeededForRelation(Relation relation)
448 {
449         return !(relation->rd_id < FirstBootstrapObjectId ||
450                          RelationUsesLocalBuffers(relation));
451 }
452
453 /*
454  * When a public interface method is called for a read, this is the test to
455  * see if we should do a quick return.
456  *
457  * Note: this function has side-effects! If this transaction has been flagged
458  * as RO-safe since the last call, we release all predicate locks and reset
459  * MySerializableXact. That makes subsequent calls to return quickly.
460  *
461  * This is marked as 'inline' to make to eliminate the function call overhead
462  * in the common case that serialization is not needed.
463  */
464 static inline bool
465 SerializationNeededForRead(Relation relation, Snapshot snapshot)
466 {
467         /* Nothing to do if this is not a serializable transaction */
468         if (MySerializableXact == InvalidSerializableXact)
469                 return false;
470
471         /*
472          * Don't acquire locks or conflict when scanning with a special snapshot.
473          * This excludes things like CLUSTER and REINDEX. They use the wholesale
474          * functions TransferPredicateLocksToHeapRelation() and
475          * CheckTableForSerializableConflictIn() to participate serialization, but
476          * the scans involved don't need serialization.
477          */
478         if (!IsMVCCSnapshot(snapshot))
479                 return false;
480
481         /*
482          * Check if we have just become "RO-safe". If we have, immediately release
483          * all locks as they're not needed anymore. This also resets
484          * MySerializableXact, so that subsequent calls to this function can exit
485          * quickly.
486          *
487          * A transaction is flagged as RO_SAFE if all concurrent R/W transactions
488          * commit without having conflicts out to an earlier snapshot, thus
489          * ensuring that no conflicts are possible for this transaction.
490          */
491         if (SxactIsROSafe(MySerializableXact))
492         {
493                 ReleasePredicateLocks(false);
494                 return false;
495         }
496
497         /* Check if the relation doesn't participate in predicate locking */
498         if (!PredicateLockingNeededForRelation(relation))
499                 return false;
500
501         return true;                            /* no excuse to skip predicate locking */
502 }
503
504 /*
505  * Like SerializationNeededForRead(), but called on writes.
506  * The logic is the same, but there is no snapshot and we can't be RO-safe.
507  */
508 static inline bool
509 SerializationNeededForWrite(Relation relation)
510 {
511         /* Nothing to do if this is not a serializable transaction */
512         if (MySerializableXact == InvalidSerializableXact)
513                 return false;
514
515         /* Check if the relation doesn't participate in predicate locking */
516         if (!PredicateLockingNeededForRelation(relation))
517                 return false;
518
519         return true;                            /* no excuse to skip predicate locking */
520 }
521
522
523 /*------------------------------------------------------------------------*/
524
525 /*
526  * These functions are a simple implementation of a list for this specific
527  * type of struct.      If there is ever a generalized shared memory list, we
528  * should probably switch to that.
529  */
530 static SERIALIZABLEXACT *
531 CreatePredXact(void)
532 {
533         PredXactListElement ptle;
534
535         ptle = (PredXactListElement)
536                 SHMQueueNext(&PredXact->availableList,
537                                          &PredXact->availableList,
538                                          offsetof(PredXactListElementData, link));
539         if (!ptle)
540                 return NULL;
541
542         SHMQueueDelete(&ptle->link);
543         SHMQueueInsertBefore(&PredXact->activeList, &ptle->link);
544         return &ptle->sxact;
545 }
546
547 static void
548 ReleasePredXact(SERIALIZABLEXACT *sxact)
549 {
550         PredXactListElement ptle;
551
552         Assert(ShmemAddrIsValid(sxact));
553
554         ptle = (PredXactListElement)
555                 (((char *) sxact)
556                  - offsetof(PredXactListElementData, sxact)
557                  + offsetof(PredXactListElementData, link));
558         SHMQueueDelete(&ptle->link);
559         SHMQueueInsertBefore(&PredXact->availableList, &ptle->link);
560 }
561
562 static SERIALIZABLEXACT *
563 FirstPredXact(void)
564 {
565         PredXactListElement ptle;
566
567         ptle = (PredXactListElement)
568                 SHMQueueNext(&PredXact->activeList,
569                                          &PredXact->activeList,
570                                          offsetof(PredXactListElementData, link));
571         if (!ptle)
572                 return NULL;
573
574         return &ptle->sxact;
575 }
576
577 static SERIALIZABLEXACT *
578 NextPredXact(SERIALIZABLEXACT *sxact)
579 {
580         PredXactListElement ptle;
581
582         Assert(ShmemAddrIsValid(sxact));
583
584         ptle = (PredXactListElement)
585                 (((char *) sxact)
586                  - offsetof(PredXactListElementData, sxact)
587                  + offsetof(PredXactListElementData, link));
588         ptle = (PredXactListElement)
589                 SHMQueueNext(&PredXact->activeList,
590                                          &ptle->link,
591                                          offsetof(PredXactListElementData, link));
592         if (!ptle)
593                 return NULL;
594
595         return &ptle->sxact;
596 }
597
598 /*------------------------------------------------------------------------*/
599
600 /*
601  * These functions manage primitive access to the RWConflict pool and lists.
602  */
603 static bool
604 RWConflictExists(const SERIALIZABLEXACT *reader, const SERIALIZABLEXACT *writer)
605 {
606         RWConflict      conflict;
607
608         Assert(reader != writer);
609
610         /* Check the ends of the purported conflict first. */
611         if (SxactIsDoomed(reader)
612                 || SxactIsDoomed(writer)
613                 || SHMQueueEmpty(&reader->outConflicts)
614                 || SHMQueueEmpty(&writer->inConflicts))
615                 return false;
616
617         /* A conflict is possible; walk the list to find out. */
618         conflict = (RWConflict)
619                 SHMQueueNext(&reader->outConflicts,
620                                          &reader->outConflicts,
621                                          offsetof(RWConflictData, outLink));
622         while (conflict)
623         {
624                 if (conflict->sxactIn == writer)
625                         return true;
626                 conflict = (RWConflict)
627                         SHMQueueNext(&reader->outConflicts,
628                                                  &conflict->outLink,
629                                                  offsetof(RWConflictData, outLink));
630         }
631
632         /* No conflict found. */
633         return false;
634 }
635
636 static void
637 SetRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer)
638 {
639         RWConflict      conflict;
640
641         Assert(reader != writer);
642         Assert(!RWConflictExists(reader, writer));
643
644         conflict = (RWConflict)
645                 SHMQueueNext(&RWConflictPool->availableList,
646                                          &RWConflictPool->availableList,
647                                          offsetof(RWConflictData, outLink));
648         if (!conflict)
649                 ereport(ERROR,
650                                 (errcode(ERRCODE_OUT_OF_MEMORY),
651                                  errmsg("not enough elements in RWConflictPool to record a rw-conflict"),
652                                  errhint("You might need to run fewer transactions at a time or increase max_connections.")));
653
654         SHMQueueDelete(&conflict->outLink);
655
656         conflict->sxactOut = reader;
657         conflict->sxactIn = writer;
658         SHMQueueInsertBefore(&reader->outConflicts, &conflict->outLink);
659         SHMQueueInsertBefore(&writer->inConflicts, &conflict->inLink);
660 }
661
662 static void
663 SetPossibleUnsafeConflict(SERIALIZABLEXACT *roXact,
664                                                   SERIALIZABLEXACT *activeXact)
665 {
666         RWConflict      conflict;
667
668         Assert(roXact != activeXact);
669         Assert(SxactIsReadOnly(roXact));
670         Assert(!SxactIsReadOnly(activeXact));
671
672         conflict = (RWConflict)
673                 SHMQueueNext(&RWConflictPool->availableList,
674                                          &RWConflictPool->availableList,
675                                          offsetof(RWConflictData, outLink));
676         if (!conflict)
677                 ereport(ERROR,
678                                 (errcode(ERRCODE_OUT_OF_MEMORY),
679                                  errmsg("not enough elements in RWConflictPool to record a potential rw-conflict"),
680                                  errhint("You might need to run fewer transactions at a time or increase max_connections.")));
681
682         SHMQueueDelete(&conflict->outLink);
683
684         conflict->sxactOut = activeXact;
685         conflict->sxactIn = roXact;
686         SHMQueueInsertBefore(&activeXact->possibleUnsafeConflicts,
687                                                  &conflict->outLink);
688         SHMQueueInsertBefore(&roXact->possibleUnsafeConflicts,
689                                                  &conflict->inLink);
690 }
691
692 static void
693 ReleaseRWConflict(RWConflict conflict)
694 {
695         SHMQueueDelete(&conflict->inLink);
696         SHMQueueDelete(&conflict->outLink);
697         SHMQueueInsertBefore(&RWConflictPool->availableList, &conflict->outLink);
698 }
699
700 static void
701 FlagSxactUnsafe(SERIALIZABLEXACT *sxact)
702 {
703         RWConflict      conflict,
704                                 nextConflict;
705
706         Assert(SxactIsReadOnly(sxact));
707         Assert(!SxactIsROSafe(sxact));
708
709         sxact->flags |= SXACT_FLAG_RO_UNSAFE;
710
711         /*
712          * We know this isn't a safe snapshot, so we can stop looking for other
713          * potential conflicts.
714          */
715         conflict = (RWConflict)
716                 SHMQueueNext(&sxact->possibleUnsafeConflicts,
717                                          &sxact->possibleUnsafeConflicts,
718                                          offsetof(RWConflictData, inLink));
719         while (conflict)
720         {
721                 nextConflict = (RWConflict)
722                         SHMQueueNext(&sxact->possibleUnsafeConflicts,
723                                                  &conflict->inLink,
724                                                  offsetof(RWConflictData, inLink));
725
726                 Assert(!SxactIsReadOnly(conflict->sxactOut));
727                 Assert(sxact == conflict->sxactIn);
728
729                 ReleaseRWConflict(conflict);
730
731                 conflict = nextConflict;
732         }
733 }
734
735 /*------------------------------------------------------------------------*/
736
737 /*
738  * We will work on the page range of 0..OLDSERXID_MAX_PAGE.
739  * Compares using wraparound logic, as is required by slru.c.
740  */
741 static bool
742 OldSerXidPagePrecedesLogically(int p, int q)
743 {
744         int                     diff;
745
746         /*
747          * We have to compare modulo (OLDSERXID_MAX_PAGE+1)/2.  Both inputs should
748          * be in the range 0..OLDSERXID_MAX_PAGE.
749          */
750         Assert(p >= 0 && p <= OLDSERXID_MAX_PAGE);
751         Assert(q >= 0 && q <= OLDSERXID_MAX_PAGE);
752
753         diff = p - q;
754         if (diff >= ((OLDSERXID_MAX_PAGE + 1) / 2))
755                 diff -= OLDSERXID_MAX_PAGE + 1;
756         else if (diff < -((OLDSERXID_MAX_PAGE + 1) / 2))
757                 diff += OLDSERXID_MAX_PAGE + 1;
758         return diff < 0;
759 }
760
761 /*
762  * Initialize for the tracking of old serializable committed xids.
763  */
764 static void
765 OldSerXidInit(void)
766 {
767         bool            found;
768
769         /*
770          * Set up SLRU management of the pg_serial data.
771          */
772         OldSerXidSlruCtl->PagePrecedes = OldSerXidPagePrecedesLogically;
773         SimpleLruInit(OldSerXidSlruCtl, "OldSerXid SLRU Ctl",
774                                   NUM_OLDSERXID_BUFFERS, 0, OldSerXidLock, "pg_serial");
775         /* Override default assumption that writes should be fsync'd */
776         OldSerXidSlruCtl->do_fsync = false;
777
778         /*
779          * Create or attach to the OldSerXidControl structure.
780          */
781         oldSerXidControl = (OldSerXidControl)
782                 ShmemInitStruct("OldSerXidControlData", sizeof(OldSerXidControlData), &found);
783
784         if (!found)
785         {
786                 /*
787                  * Set control information to reflect empty SLRU.
788                  */
789                 oldSerXidControl->headPage = -1;
790                 oldSerXidControl->headXid = InvalidTransactionId;
791                 oldSerXidControl->tailXid = InvalidTransactionId;
792                 oldSerXidControl->warningIssued = false;
793         }
794 }
795
796 /*
797  * Record a committed read write serializable xid and the minimum
798  * commitSeqNo of any transactions to which this xid had a rw-conflict out.
799  * An invalid seqNo means that there were no conflicts out from xid.
800  */
801 static void
802 OldSerXidAdd(TransactionId xid, SerCommitSeqNo minConflictCommitSeqNo)
803 {
804         TransactionId tailXid;
805         int                     targetPage;
806         int                     slotno;
807         int                     firstZeroPage;
808         bool            isNewPage;
809
810         Assert(TransactionIdIsValid(xid));
811
812         targetPage = OldSerXidPage(xid);
813
814         LWLockAcquire(OldSerXidLock, LW_EXCLUSIVE);
815
816         /*
817          * If no serializable transactions are active, there shouldn't be anything
818          * to push out to the SLRU.  Hitting this assert would mean there's
819          * something wrong with the earlier cleanup logic.
820          */
821         tailXid = oldSerXidControl->tailXid;
822         Assert(TransactionIdIsValid(tailXid));
823
824         /*
825          * If the SLRU is currently unused, zero out the whole active region from
826          * tailXid to headXid before taking it into use. Otherwise zero out only
827          * any new pages that enter the tailXid-headXid range as we advance
828          * headXid.
829          */
830         if (oldSerXidControl->headPage < 0)
831         {
832                 firstZeroPage = OldSerXidPage(tailXid);
833                 isNewPage = true;
834         }
835         else
836         {
837                 firstZeroPage = OldSerXidNextPage(oldSerXidControl->headPage);
838                 isNewPage = OldSerXidPagePrecedesLogically(oldSerXidControl->headPage,
839                                                                                                    targetPage);
840         }
841
842         if (!TransactionIdIsValid(oldSerXidControl->headXid)
843                 || TransactionIdFollows(xid, oldSerXidControl->headXid))
844                 oldSerXidControl->headXid = xid;
845         if (isNewPage)
846                 oldSerXidControl->headPage = targetPage;
847
848         /*
849          * Give a warning if we're about to run out of SLRU pages.
850          *
851          * slru.c has a maximum of 64k segments, with 32 (SLRU_PAGES_PER_SEGMENT)
852          * pages each. We need to store a 64-bit integer for each Xid, and with
853          * default 8k block size, 65536*32 pages is only enough to cover 2^30
854          * XIDs. If we're about to hit that limit and wrap around, warn the user.
855          *
856          * To avoid spamming the user, we only give one warning when we've used 1
857          * billion XIDs, and stay silent until the situation is fixed and the
858          * number of XIDs used falls below 800 million again.
859          *
860          * XXX: We have no safeguard to actually *prevent* the wrap-around,
861          * though. All you get is a warning.
862          */
863         if (oldSerXidControl->warningIssued)
864         {
865                 TransactionId lowWatermark;
866
867                 lowWatermark = tailXid + 800000000;
868                 if (lowWatermark < FirstNormalTransactionId)
869                         lowWatermark = FirstNormalTransactionId;
870                 if (TransactionIdPrecedes(xid, lowWatermark))
871                         oldSerXidControl->warningIssued = false;
872         }
873         else
874         {
875                 TransactionId highWatermark;
876
877                 highWatermark = tailXid + 1000000000;
878                 if (highWatermark < FirstNormalTransactionId)
879                         highWatermark = FirstNormalTransactionId;
880                 if (TransactionIdFollows(xid, highWatermark))
881                 {
882                         oldSerXidControl->warningIssued = true;
883                         ereport(WARNING,
884                                         (errmsg("memory for serializable conflict tracking is nearly exhausted"),
885                                          errhint("There may be an idle transaction or a forgotten prepared transaction causing this.")));
886                 }
887         }
888
889         if (isNewPage)
890         {
891                 /* Initialize intervening pages. */
892                 while (firstZeroPage != targetPage)
893                 {
894                         (void) SimpleLruZeroPage(OldSerXidSlruCtl, firstZeroPage);
895                         firstZeroPage = OldSerXidNextPage(firstZeroPage);
896                 }
897                 slotno = SimpleLruZeroPage(OldSerXidSlruCtl, targetPage);
898         }
899         else
900                 slotno = SimpleLruReadPage(OldSerXidSlruCtl, targetPage, true, xid);
901
902         OldSerXidValue(slotno, xid) = minConflictCommitSeqNo;
903         OldSerXidSlruCtl->shared->page_dirty[slotno] = true;
904
905         LWLockRelease(OldSerXidLock);
906 }
907
908 /*
909  * Get the minimum commitSeqNo for any conflict out for the given xid.  For
910  * a transaction which exists but has no conflict out, InvalidSerCommitSeqNo
911  * will be returned.
912  */
913 static SerCommitSeqNo
914 OldSerXidGetMinConflictCommitSeqNo(TransactionId xid)
915 {
916         TransactionId headXid;
917         TransactionId tailXid;
918         SerCommitSeqNo val;
919         int                     slotno;
920
921         Assert(TransactionIdIsValid(xid));
922
923         LWLockAcquire(OldSerXidLock, LW_SHARED);
924         headXid = oldSerXidControl->headXid;
925         tailXid = oldSerXidControl->tailXid;
926         LWLockRelease(OldSerXidLock);
927
928         if (!TransactionIdIsValid(headXid))
929                 return 0;
930
931         Assert(TransactionIdIsValid(tailXid));
932
933         if (TransactionIdPrecedes(xid, tailXid)
934                 || TransactionIdFollows(xid, headXid))
935                 return 0;
936
937         /*
938          * The following function must be called without holding OldSerXidLock,
939          * but will return with that lock held, which must then be released.
940          */
941         slotno = SimpleLruReadPage_ReadOnly(OldSerXidSlruCtl,
942                                                                                 OldSerXidPage(xid), xid);
943         val = OldSerXidValue(slotno, xid);
944         LWLockRelease(OldSerXidLock);
945         return val;
946 }
947
948 /*
949  * Call this whenever there is a new xmin for active serializable
950  * transactions.  We don't need to keep information on transactions which
951  * precede that.  InvalidTransactionId means none active, so everything in
952  * the SLRU can be discarded.
953  */
954 static void
955 OldSerXidSetActiveSerXmin(TransactionId xid)
956 {
957         LWLockAcquire(OldSerXidLock, LW_EXCLUSIVE);
958
959         /*
960          * When no sxacts are active, nothing overlaps, set the xid values to
961          * invalid to show that there are no valid entries.  Don't clear headPage,
962          * though.      A new xmin might still land on that page, and we don't want to
963          * repeatedly zero out the same page.
964          */
965         if (!TransactionIdIsValid(xid))
966         {
967                 oldSerXidControl->tailXid = InvalidTransactionId;
968                 oldSerXidControl->headXid = InvalidTransactionId;
969                 LWLockRelease(OldSerXidLock);
970                 return;
971         }
972
973         /*
974          * When we're recovering prepared transactions, the global xmin might move
975          * backwards depending on the order they're recovered. Normally that's not
976          * OK, but during recovery no serializable transactions will commit, so
977          * the SLRU is empty and we can get away with it.
978          */
979         if (RecoveryInProgress())
980         {
981                 Assert(oldSerXidControl->headPage < 0);
982                 if (!TransactionIdIsValid(oldSerXidControl->tailXid)
983                         || TransactionIdPrecedes(xid, oldSerXidControl->tailXid))
984                 {
985                         oldSerXidControl->tailXid = xid;
986                 }
987                 LWLockRelease(OldSerXidLock);
988                 return;
989         }
990
991         Assert(!TransactionIdIsValid(oldSerXidControl->tailXid)
992                    || TransactionIdFollows(xid, oldSerXidControl->tailXid));
993
994         oldSerXidControl->tailXid = xid;
995
996         LWLockRelease(OldSerXidLock);
997 }
998
999 /*
1000  * Perform a checkpoint --- either during shutdown, or on-the-fly
1001  *
1002  * We don't have any data that needs to survive a restart, but this is a
1003  * convenient place to truncate the SLRU.
1004  */
1005 void
1006 CheckPointPredicate(void)
1007 {
1008         int                     tailPage;
1009
1010         LWLockAcquire(OldSerXidLock, LW_EXCLUSIVE);
1011
1012         /* Exit quickly if the SLRU is currently not in use. */
1013         if (oldSerXidControl->headPage < 0)
1014         {
1015                 LWLockRelease(OldSerXidLock);
1016                 return;
1017         }
1018
1019         if (TransactionIdIsValid(oldSerXidControl->tailXid))
1020         {
1021                 /* We can truncate the SLRU up to the page containing tailXid */
1022                 tailPage = OldSerXidPage(oldSerXidControl->tailXid);
1023         }
1024         else
1025         {
1026                 /*
1027                  * The SLRU is no longer needed. Truncate to head before we set head
1028                  * invalid.
1029                  *
1030                  * XXX: It's possible that the SLRU is not needed again until XID
1031                  * wrap-around has happened, so that the segment containing headPage
1032                  * that we leave behind will appear to be new again. In that case it
1033                  * won't be removed until XID horizon advances enough to make it
1034                  * current again.
1035                  */
1036                 tailPage = oldSerXidControl->headPage;
1037                 oldSerXidControl->headPage = -1;
1038         }
1039
1040         LWLockRelease(OldSerXidLock);
1041
1042         /* Truncate away pages that are no longer required */
1043         SimpleLruTruncate(OldSerXidSlruCtl, tailPage);
1044
1045         /*
1046          * Flush dirty SLRU pages to disk
1047          *
1048          * This is not actually necessary from a correctness point of view. We do
1049          * it merely as a debugging aid.
1050          *
1051          * We're doing this after the truncation to avoid writing pages right
1052          * before deleting the file in which they sit, which would be completely
1053          * pointless.
1054          */
1055         SimpleLruFlush(OldSerXidSlruCtl, true);
1056 }
1057
1058 /*------------------------------------------------------------------------*/
1059
1060 /*
1061  * InitPredicateLocks -- Initialize the predicate locking data structures.
1062  *
1063  * This is called from CreateSharedMemoryAndSemaphores(), which see for
1064  * more comments.  In the normal postmaster case, the shared hash tables
1065  * are created here.  Backends inherit the pointers
1066  * to the shared tables via fork().  In the EXEC_BACKEND case, each
1067  * backend re-executes this code to obtain pointers to the already existing
1068  * shared hash tables.
1069  */
1070 void
1071 InitPredicateLocks(void)
1072 {
1073         HASHCTL         info;
1074         int                     hash_flags;
1075         long            max_table_size;
1076         Size            requestSize;
1077         bool            found;
1078
1079         /*
1080          * Compute size of predicate lock target hashtable. Note these
1081          * calculations must agree with PredicateLockShmemSize!
1082          */
1083         max_table_size = NPREDICATELOCKTARGETENTS();
1084
1085         /*
1086          * Allocate hash table for PREDICATELOCKTARGET structs.  This stores
1087          * per-predicate-lock-target information.
1088          */
1089         MemSet(&info, 0, sizeof(info));
1090         info.keysize = sizeof(PREDICATELOCKTARGETTAG);
1091         info.entrysize = sizeof(PREDICATELOCKTARGET);
1092         info.hash = tag_hash;
1093         info.num_partitions = NUM_PREDICATELOCK_PARTITIONS;
1094         hash_flags = (HASH_ELEM | HASH_FUNCTION | HASH_PARTITION | HASH_FIXED_SIZE);
1095
1096         PredicateLockTargetHash = ShmemInitHash("PREDICATELOCKTARGET hash",
1097                                                                                         max_table_size,
1098                                                                                         max_table_size,
1099                                                                                         &info,
1100                                                                                         hash_flags);
1101
1102         /* Assume an average of 2 xacts per target */
1103         max_table_size *= 2;
1104
1105         /*
1106          * Reserve a dummy entry in the hash table; we use it to make sure there's
1107          * always one entry available when we need to split or combine a page,
1108          * because running out of space there could mean aborting a
1109          * non-serializable transaction.
1110          */
1111         hash_search(PredicateLockTargetHash, &ScratchTargetTag, HASH_ENTER, NULL);
1112
1113         /*
1114          * Allocate hash table for PREDICATELOCK structs.  This stores per
1115          * xact-lock-of-a-target information.
1116          */
1117         MemSet(&info, 0, sizeof(info));
1118         info.keysize = sizeof(PREDICATELOCKTAG);
1119         info.entrysize = sizeof(PREDICATELOCK);
1120         info.hash = predicatelock_hash;
1121         info.num_partitions = NUM_PREDICATELOCK_PARTITIONS;
1122         hash_flags = (HASH_ELEM | HASH_FUNCTION | HASH_PARTITION | HASH_FIXED_SIZE);
1123
1124         PredicateLockHash = ShmemInitHash("PREDICATELOCK hash",
1125                                                                           max_table_size,
1126                                                                           max_table_size,
1127                                                                           &info,
1128                                                                           hash_flags);
1129
1130         /*
1131          * Compute size for serializable transaction hashtable. Note these
1132          * calculations must agree with PredicateLockShmemSize!
1133          */
1134         max_table_size = (MaxBackends + max_prepared_xacts);
1135
1136         /*
1137          * Allocate a list to hold information on transactions participating in
1138          * predicate locking.
1139          *
1140          * Assume an average of 10 predicate locking transactions per backend.
1141          * This allows aggressive cleanup while detail is present before data must
1142          * be summarized for storage in SLRU and the "dummy" transaction.
1143          */
1144         max_table_size *= 10;
1145
1146         PredXact = ShmemInitStruct("PredXactList",
1147                                                            PredXactListDataSize,
1148                                                            &found);
1149         if (!found)
1150         {
1151                 int                     i;
1152
1153                 SHMQueueInit(&PredXact->availableList);
1154                 SHMQueueInit(&PredXact->activeList);
1155                 PredXact->SxactGlobalXmin = InvalidTransactionId;
1156                 PredXact->SxactGlobalXminCount = 0;
1157                 PredXact->WritableSxactCount = 0;
1158                 PredXact->LastSxactCommitSeqNo = FirstNormalSerCommitSeqNo - 1;
1159                 PredXact->CanPartialClearThrough = 0;
1160                 PredXact->HavePartialClearedThrough = 0;
1161                 requestSize = mul_size((Size) max_table_size,
1162                                                            PredXactListElementDataSize);
1163                 PredXact->element = ShmemAlloc(requestSize);
1164                 if (PredXact->element == NULL)
1165                         ereport(ERROR,
1166                                         (errcode(ERRCODE_OUT_OF_MEMORY),
1167                          errmsg("not enough shared memory for elements of data structure"
1168                                         " \"%s\" (%lu bytes requested)",
1169                                         "PredXactList", (unsigned long) requestSize)));
1170                 /* Add all elements to available list, clean. */
1171                 memset(PredXact->element, 0, requestSize);
1172                 for (i = 0; i < max_table_size; i++)
1173                 {
1174                         SHMQueueInsertBefore(&(PredXact->availableList),
1175                                                                  &(PredXact->element[i].link));
1176                 }
1177                 PredXact->OldCommittedSxact = CreatePredXact();
1178                 SetInvalidVirtualTransactionId(PredXact->OldCommittedSxact->vxid);
1179                 PredXact->OldCommittedSxact->commitSeqNo = 0;
1180                 PredXact->OldCommittedSxact->SeqNo.lastCommitBeforeSnapshot = 0;
1181                 SHMQueueInit(&PredXact->OldCommittedSxact->outConflicts);
1182                 SHMQueueInit(&PredXact->OldCommittedSxact->inConflicts);
1183                 SHMQueueInit(&PredXact->OldCommittedSxact->predicateLocks);
1184                 SHMQueueInit(&PredXact->OldCommittedSxact->finishedLink);
1185                 SHMQueueInit(&PredXact->OldCommittedSxact->possibleUnsafeConflicts);
1186                 PredXact->OldCommittedSxact->topXid = InvalidTransactionId;
1187                 PredXact->OldCommittedSxact->finishedBefore = InvalidTransactionId;
1188                 PredXact->OldCommittedSxact->xmin = InvalidTransactionId;
1189                 PredXact->OldCommittedSxact->flags = SXACT_FLAG_COMMITTED;
1190                 PredXact->OldCommittedSxact->pid = 0;
1191         }
1192         /* This never changes, so let's keep a local copy. */
1193         OldCommittedSxact = PredXact->OldCommittedSxact;
1194
1195         /*
1196          * Allocate hash table for SERIALIZABLEXID structs.  This stores per-xid
1197          * information for serializable transactions which have accessed data.
1198          */
1199         MemSet(&info, 0, sizeof(info));
1200         info.keysize = sizeof(SERIALIZABLEXIDTAG);
1201         info.entrysize = sizeof(SERIALIZABLEXID);
1202         info.hash = tag_hash;
1203         hash_flags = (HASH_ELEM | HASH_FUNCTION | HASH_FIXED_SIZE);
1204
1205         SerializableXidHash = ShmemInitHash("SERIALIZABLEXID hash",
1206                                                                                 max_table_size,
1207                                                                                 max_table_size,
1208                                                                                 &info,
1209                                                                                 hash_flags);
1210
1211         /*
1212          * Allocate space for tracking rw-conflicts in lists attached to the
1213          * transactions.
1214          *
1215          * Assume an average of 5 conflicts per transaction.  Calculations suggest
1216          * that this will prevent resource exhaustion in even the most pessimal
1217          * loads up to max_connections = 200 with all 200 connections pounding the
1218          * database with serializable transactions.  Beyond that, there may be
1219          * occassional transactions canceled when trying to flag conflicts. That's
1220          * probably OK.
1221          */
1222         max_table_size *= 5;
1223
1224         RWConflictPool = ShmemInitStruct("RWConflictPool",
1225                                                                          RWConflictPoolHeaderDataSize,
1226                                                                          &found);
1227         if (!found)
1228         {
1229                 int                     i;
1230
1231                 SHMQueueInit(&RWConflictPool->availableList);
1232                 requestSize = mul_size((Size) max_table_size,
1233                                                            RWConflictDataSize);
1234                 RWConflictPool->element = ShmemAlloc(requestSize);
1235                 if (RWConflictPool->element == NULL)
1236                         ereport(ERROR,
1237                                         (errcode(ERRCODE_OUT_OF_MEMORY),
1238                          errmsg("not enough shared memory for elements of data structure"
1239                                         " \"%s\" (%lu bytes requested)",
1240                                         "RWConflictPool", (unsigned long) requestSize)));
1241                 /* Add all elements to available list, clean. */
1242                 memset(RWConflictPool->element, 0, requestSize);
1243                 for (i = 0; i < max_table_size; i++)
1244                 {
1245                         SHMQueueInsertBefore(&(RWConflictPool->availableList),
1246                                                                  &(RWConflictPool->element[i].outLink));
1247                 }
1248         }
1249
1250         /*
1251          * Create or attach to the header for the list of finished serializable
1252          * transactions.
1253          */
1254         FinishedSerializableTransactions = (SHM_QUEUE *)
1255                 ShmemInitStruct("FinishedSerializableTransactions",
1256                                                 sizeof(SHM_QUEUE),
1257                                                 &found);
1258         if (!found)
1259                 SHMQueueInit(FinishedSerializableTransactions);
1260
1261         /*
1262          * Initialize the SLRU storage for old committed serializable
1263          * transactions.
1264          */
1265         OldSerXidInit();
1266
1267         /* Pre-calculate the hash and partition lock of the scratch entry */
1268         ScratchTargetTagHash = PredicateLockTargetTagHashCode(&ScratchTargetTag);
1269         ScratchPartitionLock = PredicateLockHashPartitionLock(ScratchTargetTagHash);
1270 }
1271
1272 /*
1273  * Estimate shared-memory space used for predicate lock table
1274  */
1275 Size
1276 PredicateLockShmemSize(void)
1277 {
1278         Size            size = 0;
1279         long            max_table_size;
1280
1281         /* predicate lock target hash table */
1282         max_table_size = NPREDICATELOCKTARGETENTS();
1283         size = add_size(size, hash_estimate_size(max_table_size,
1284                                                                                          sizeof(PREDICATELOCKTARGET)));
1285
1286         /* predicate lock hash table */
1287         max_table_size *= 2;
1288         size = add_size(size, hash_estimate_size(max_table_size,
1289                                                                                          sizeof(PREDICATELOCK)));
1290
1291         /*
1292          * Since NPREDICATELOCKTARGETENTS is only an estimate, add 10% safety
1293          * margin.
1294          */
1295         size = add_size(size, size / 10);
1296
1297         /* transaction list */
1298         max_table_size = MaxBackends + max_prepared_xacts;
1299         max_table_size *= 10;
1300         size = add_size(size, PredXactListDataSize);
1301         size = add_size(size, mul_size((Size) max_table_size,
1302                                                                    PredXactListElementDataSize));
1303
1304         /* transaction xid table */
1305         size = add_size(size, hash_estimate_size(max_table_size,
1306                                                                                          sizeof(SERIALIZABLEXID)));
1307
1308         /* rw-conflict pool */
1309         max_table_size *= 5;
1310         size = add_size(size, RWConflictPoolHeaderDataSize);
1311         size = add_size(size, mul_size((Size) max_table_size,
1312                                                                    RWConflictDataSize));
1313
1314         /* Head for list of finished serializable transactions. */
1315         size = add_size(size, sizeof(SHM_QUEUE));
1316
1317         /* Shared memory structures for SLRU tracking of old committed xids. */
1318         size = add_size(size, sizeof(OldSerXidControlData));
1319         size = add_size(size, SimpleLruShmemSize(NUM_OLDSERXID_BUFFERS, 0));
1320
1321         return size;
1322 }
1323
1324
1325 /*
1326  * Compute the hash code associated with a PREDICATELOCKTAG.
1327  *
1328  * Because we want to use just one set of partition locks for both the
1329  * PREDICATELOCKTARGET and PREDICATELOCK hash tables, we have to make sure
1330  * that PREDICATELOCKs fall into the same partition number as their
1331  * associated PREDICATELOCKTARGETs.  dynahash.c expects the partition number
1332  * to be the low-order bits of the hash code, and therefore a
1333  * PREDICATELOCKTAG's hash code must have the same low-order bits as the
1334  * associated PREDICATELOCKTARGETTAG's hash code.  We achieve this with this
1335  * specialized hash function.
1336  */
1337 static uint32
1338 predicatelock_hash(const void *key, Size keysize)
1339 {
1340         const PREDICATELOCKTAG *predicatelocktag = (const PREDICATELOCKTAG *) key;
1341         uint32          targethash;
1342
1343         Assert(keysize == sizeof(PREDICATELOCKTAG));
1344
1345         /* Look into the associated target object, and compute its hash code */
1346         targethash = PredicateLockTargetTagHashCode(&predicatelocktag->myTarget->tag);
1347
1348         return PredicateLockHashCodeFromTargetHashCode(predicatelocktag, targethash);
1349 }
1350
1351
1352 /*
1353  * GetPredicateLockStatusData
1354  *              Return a table containing the internal state of the predicate
1355  *              lock manager for use in pg_lock_status.
1356  *
1357  * Like GetLockStatusData, this function tries to hold the partition LWLocks
1358  * for as short a time as possible by returning two arrays that simply
1359  * contain the PREDICATELOCKTARGETTAG and SERIALIZABLEXACT for each lock
1360  * table entry. Multiple copies of the same PREDICATELOCKTARGETTAG and
1361  * SERIALIZABLEXACT will likely appear.
1362  */
1363 PredicateLockData *
1364 GetPredicateLockStatusData(void)
1365 {
1366         PredicateLockData *data;
1367         int                     i;
1368         int                     els,
1369                                 el;
1370         HASH_SEQ_STATUS seqstat;
1371         PREDICATELOCK *predlock;
1372
1373         data = (PredicateLockData *) palloc(sizeof(PredicateLockData));
1374
1375         /*
1376          * To ensure consistency, take simultaneous locks on all partition locks
1377          * in ascending order, then SerializableXactHashLock.
1378          */
1379         for (i = 0; i < NUM_PREDICATELOCK_PARTITIONS; i++)
1380                 LWLockAcquire(FirstPredicateLockMgrLock + i, LW_SHARED);
1381         LWLockAcquire(SerializableXactHashLock, LW_SHARED);
1382
1383         /* Get number of locks and allocate appropriately-sized arrays. */
1384         els = hash_get_num_entries(PredicateLockHash);
1385         data->nelements = els;
1386         data->locktags = (PREDICATELOCKTARGETTAG *)
1387                 palloc(sizeof(PREDICATELOCKTARGETTAG) * els);
1388         data->xacts = (SERIALIZABLEXACT *)
1389                 palloc(sizeof(SERIALIZABLEXACT) * els);
1390
1391
1392         /* Scan through PredicateLockHash and copy contents */
1393         hash_seq_init(&seqstat, PredicateLockHash);
1394
1395         el = 0;
1396
1397         while ((predlock = (PREDICATELOCK *) hash_seq_search(&seqstat)))
1398         {
1399                 data->locktags[el] = predlock->tag.myTarget->tag;
1400                 data->xacts[el] = *predlock->tag.myXact;
1401                 el++;
1402         }
1403
1404         Assert(el == els);
1405
1406         /* Release locks in reverse order */
1407         LWLockRelease(SerializableXactHashLock);
1408         for (i = NUM_PREDICATELOCK_PARTITIONS - 1; i >= 0; i--)
1409                 LWLockRelease(FirstPredicateLockMgrLock + i);
1410
1411         return data;
1412 }
1413
1414 /*
1415  * Free up shared memory structures by pushing the oldest sxact (the one at
1416  * the front of the SummarizeOldestCommittedSxact queue) into summary form.
1417  * Each call will free exactly one SERIALIZABLEXACT structure and may also
1418  * free one or more of these structures: SERIALIZABLEXID, PREDICATELOCK,
1419  * PREDICATELOCKTARGET, RWConflictData.
1420  */
1421 static void
1422 SummarizeOldestCommittedSxact(void)
1423 {
1424         SERIALIZABLEXACT *sxact;
1425
1426         LWLockAcquire(SerializableFinishedListLock, LW_EXCLUSIVE);
1427
1428         /*
1429          * This function is only called if there are no sxact slots available.
1430          * Some of them must belong to old, already-finished transactions, so
1431          * there should be something in FinishedSerializableTransactions list that
1432          * we can summarize. However, there's a race condition: while we were not
1433          * holding any locks, a transaction might have ended and cleaned up all
1434          * the finished sxact entries already, freeing up their sxact slots. In
1435          * that case, we have nothing to do here. The caller will find one of the
1436          * slots released by the other backend when it retries.
1437          */
1438         if (SHMQueueEmpty(FinishedSerializableTransactions))
1439         {
1440                 LWLockRelease(SerializableFinishedListLock);
1441                 return;
1442         }
1443
1444         /*
1445          * Grab the first sxact off the finished list -- this will be the earliest
1446          * commit.      Remove it from the list.
1447          */
1448         sxact = (SERIALIZABLEXACT *)
1449                 SHMQueueNext(FinishedSerializableTransactions,
1450                                          FinishedSerializableTransactions,
1451                                          offsetof(SERIALIZABLEXACT, finishedLink));
1452         SHMQueueDelete(&(sxact->finishedLink));
1453
1454         /* Add to SLRU summary information. */
1455         if (TransactionIdIsValid(sxact->topXid) && !SxactIsReadOnly(sxact))
1456                 OldSerXidAdd(sxact->topXid, SxactHasConflictOut(sxact)
1457                    ? sxact->SeqNo.earliestOutConflictCommit : InvalidSerCommitSeqNo);
1458
1459         /* Summarize and release the detail. */
1460         ReleaseOneSerializableXact(sxact, false, true);
1461
1462         LWLockRelease(SerializableFinishedListLock);
1463 }
1464
1465 /*
1466  * GetSafeSnapshot
1467  *              Obtain and register a snapshot for a READ ONLY DEFERRABLE
1468  *              transaction. Ensures that the snapshot is "safe", i.e. a
1469  *              read-only transaction running on it can execute serializably
1470  *              without further checks. This requires waiting for concurrent
1471  *              transactions to complete, and retrying with a new snapshot if
1472  *              one of them could possibly create a conflict.
1473  */
1474 static Snapshot
1475 GetSafeSnapshot(Snapshot origSnapshot)
1476 {
1477         Snapshot        snapshot;
1478
1479         Assert(XactReadOnly && XactDeferrable);
1480
1481         while (true)
1482         {
1483                 /*
1484                  * RegisterSerializableTransactionInt is going to call
1485                  * GetSnapshotData, so we need to provide it the static snapshot our
1486                  * caller passed to us. It returns a copy of that snapshot and
1487                  * registers it on TopTransactionResourceOwner.
1488                  */
1489                 snapshot = RegisterSerializableTransactionInt(origSnapshot);
1490
1491                 if (MySerializableXact == InvalidSerializableXact)
1492                         return snapshot;        /* no concurrent r/w xacts; it's safe */
1493
1494                 LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
1495
1496                 /*
1497                  * Wait for concurrent transactions to finish. Stop early if one of
1498                  * them marked us as conflicted.
1499                  */
1500                 MySerializableXact->flags |= SXACT_FLAG_DEFERRABLE_WAITING;
1501                 while (!(SHMQueueEmpty(&MySerializableXact->possibleUnsafeConflicts) ||
1502                                  SxactIsROUnsafe(MySerializableXact)))
1503                 {
1504                         LWLockRelease(SerializableXactHashLock);
1505                         ProcWaitForSignal();
1506                         LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
1507                 }
1508                 MySerializableXact->flags &= ~SXACT_FLAG_DEFERRABLE_WAITING;
1509
1510                 if (!SxactIsROUnsafe(MySerializableXact))
1511                 {
1512                         LWLockRelease(SerializableXactHashLock);
1513                         break;                          /* success */
1514                 }
1515
1516                 LWLockRelease(SerializableXactHashLock);
1517
1518                 /* else, need to retry... */
1519                 ereport(DEBUG2,
1520                                 (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
1521                                  errmsg("deferrable snapshot was unsafe; trying a new one")));
1522                 ReleasePredicateLocks(false);
1523                 UnregisterSnapshotFromOwner(snapshot,
1524                                                                         TopTransactionResourceOwner);
1525         }
1526
1527         /*
1528          * Now we have a safe snapshot, so we don't need to do any further checks.
1529          */
1530         Assert(SxactIsROSafe(MySerializableXact));
1531         ReleasePredicateLocks(false);
1532
1533         return snapshot;
1534 }
1535
1536 /*
1537  * Acquire and register a snapshot which can be used for this transaction..
1538  * Make sure we have a SERIALIZABLEXACT reference in MySerializableXact.
1539  * It should be current for this process and be contained in PredXact.
1540  */
1541 Snapshot
1542 RegisterSerializableTransaction(Snapshot snapshot)
1543 {
1544         Assert(IsolationIsSerializable());
1545
1546         /*
1547          * A special optimization is available for SERIALIZABLE READ ONLY
1548          * DEFERRABLE transactions -- we can wait for a suitable snapshot and
1549          * thereby avoid all SSI overhead once it's running..
1550          */
1551         if (XactReadOnly && XactDeferrable)
1552                 return GetSafeSnapshot(snapshot);
1553
1554         return RegisterSerializableTransactionInt(snapshot);
1555 }
1556
1557 static Snapshot
1558 RegisterSerializableTransactionInt(Snapshot snapshot)
1559 {
1560         PGPROC     *proc;
1561         VirtualTransactionId vxid;
1562         SERIALIZABLEXACT *sxact,
1563                            *othersxact;
1564         HASHCTL         hash_ctl;
1565
1566         /* We only do this for serializable transactions.  Once. */
1567         Assert(MySerializableXact == InvalidSerializableXact);
1568
1569         Assert(!RecoveryInProgress());
1570
1571         proc = MyProc;
1572         Assert(proc != NULL);
1573         GET_VXID_FROM_PGPROC(vxid, *proc);
1574
1575         /*
1576          * First we get the sxact structure, which may involve looping and access
1577          * to the "finished" list to free a structure for use.
1578          */
1579 #ifdef TEST_OLDSERXID
1580         SummarizeOldestCommittedSxact();
1581 #endif
1582         LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
1583         do
1584         {
1585                 sxact = CreatePredXact();
1586                 /* If null, push out committed sxact to SLRU summary & retry. */
1587                 if (!sxact)
1588                 {
1589                         LWLockRelease(SerializableXactHashLock);
1590                         SummarizeOldestCommittedSxact();
1591                         LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
1592                 }
1593         } while (!sxact);
1594
1595         /* Get and register a snapshot */
1596         snapshot = GetSnapshotData(snapshot);
1597         snapshot = RegisterSnapshotOnOwner(snapshot, TopTransactionResourceOwner);
1598
1599         /*
1600          * If there are no serializable transactions which are not read-only, we
1601          * can "opt out" of predicate locking and conflict checking for a
1602          * read-only transaction.
1603          *
1604          * The reason this is safe is that a read-only transaction can only become
1605          * part of a dangerous structure if it overlaps a writable transaction
1606          * which in turn overlaps a writable transaction which committed before
1607          * the read-only transaction started.  A new writable transaction can
1608          * overlap this one, but it can't meet the other condition of overlapping
1609          * a transaction which committed before this one started.
1610          */
1611         if (XactReadOnly && PredXact->WritableSxactCount == 0)
1612         {
1613                 ReleasePredXact(sxact);
1614                 LWLockRelease(SerializableXactHashLock);
1615                 return snapshot;
1616         }
1617
1618         /* Maintain serializable global xmin info. */
1619         if (!TransactionIdIsValid(PredXact->SxactGlobalXmin))
1620         {
1621                 Assert(PredXact->SxactGlobalXminCount == 0);
1622                 PredXact->SxactGlobalXmin = snapshot->xmin;
1623                 PredXact->SxactGlobalXminCount = 1;
1624                 OldSerXidSetActiveSerXmin(snapshot->xmin);
1625         }
1626         else if (TransactionIdEquals(snapshot->xmin, PredXact->SxactGlobalXmin))
1627         {
1628                 Assert(PredXact->SxactGlobalXminCount > 0);
1629                 PredXact->SxactGlobalXminCount++;
1630         }
1631         else
1632         {
1633                 Assert(TransactionIdFollows(snapshot->xmin, PredXact->SxactGlobalXmin));
1634         }
1635
1636         /* Initialize the structure. */
1637         sxact->vxid = vxid;
1638         sxact->SeqNo.lastCommitBeforeSnapshot = PredXact->LastSxactCommitSeqNo;
1639         sxact->commitSeqNo = InvalidSerCommitSeqNo;
1640         SHMQueueInit(&(sxact->outConflicts));
1641         SHMQueueInit(&(sxact->inConflicts));
1642         SHMQueueInit(&(sxact->possibleUnsafeConflicts));
1643         sxact->topXid = GetTopTransactionIdIfAny();
1644         sxact->finishedBefore = InvalidTransactionId;
1645         sxact->xmin = snapshot->xmin;
1646         sxact->pid = MyProcPid;
1647         SHMQueueInit(&(sxact->predicateLocks));
1648         SHMQueueElemInit(&(sxact->finishedLink));
1649         sxact->flags = 0;
1650         if (XactReadOnly)
1651         {
1652                 sxact->flags |= SXACT_FLAG_READ_ONLY;
1653
1654                 /*
1655                  * Register all concurrent r/w transactions as possible conflicts; if
1656                  * all of them commit without any outgoing conflicts to earlier
1657                  * transactions then this snapshot can be deemed safe (and we can run
1658                  * without tracking predicate locks).
1659                  */
1660                 for (othersxact = FirstPredXact();
1661                          othersxact != NULL;
1662                          othersxact = NextPredXact(othersxact))
1663                 {
1664                         if (!SxactIsOnFinishedList(othersxact) &&
1665                                 !SxactIsReadOnly(othersxact))
1666                         {
1667                                 SetPossibleUnsafeConflict(sxact, othersxact);
1668                         }
1669                 }
1670         }
1671         else
1672         {
1673                 ++(PredXact->WritableSxactCount);
1674                 Assert(PredXact->WritableSxactCount <=
1675                            (MaxBackends + max_prepared_xacts));
1676         }
1677
1678         MySerializableXact = sxact;
1679         MyXactDidWrite = false;         /* haven't written anything yet */
1680
1681         LWLockRelease(SerializableXactHashLock);
1682
1683         /* Initialize the backend-local hash table of parent locks */
1684         Assert(LocalPredicateLockHash == NULL);
1685         MemSet(&hash_ctl, 0, sizeof(hash_ctl));
1686         hash_ctl.keysize = sizeof(PREDICATELOCKTARGETTAG);
1687         hash_ctl.entrysize = sizeof(LOCALPREDICATELOCK);
1688         hash_ctl.hash = tag_hash;
1689         LocalPredicateLockHash = hash_create("Local predicate lock",
1690                                                                                  max_predicate_locks_per_xact,
1691                                                                                  &hash_ctl,
1692                                                                                  HASH_ELEM | HASH_FUNCTION);
1693
1694         return snapshot;
1695 }
1696
1697 /*
1698  * Register the top level XID in SerializableXidHash.
1699  * Also store it for easy reference in MySerializableXact.
1700  */
1701 void
1702 RegisterPredicateLockingXid(TransactionId xid)
1703 {
1704         SERIALIZABLEXIDTAG sxidtag;
1705         SERIALIZABLEXID *sxid;
1706         bool            found;
1707
1708         /*
1709          * If we're not tracking predicate lock data for this transaction, we
1710          * should ignore the request and return quickly.
1711          */
1712         if (MySerializableXact == InvalidSerializableXact)
1713                 return;
1714
1715         /* We should have a valid XID and be at the top level. */
1716         Assert(TransactionIdIsValid(xid));
1717
1718         LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
1719
1720         /* This should only be done once per transaction. */
1721         Assert(MySerializableXact->topXid == InvalidTransactionId);
1722
1723         MySerializableXact->topXid = xid;
1724
1725         sxidtag.xid = xid;
1726         sxid = (SERIALIZABLEXID *) hash_search(SerializableXidHash,
1727                                                                                    &sxidtag,
1728                                                                                    HASH_ENTER, &found);
1729         Assert(!found);
1730
1731         /* Initialize the structure. */
1732         sxid->myXact = MySerializableXact;
1733         LWLockRelease(SerializableXactHashLock);
1734 }
1735
1736
1737 /*
1738  * Check whether there are any predicate locks held by any transaction
1739  * for the page at the given block number.
1740  *
1741  * Note that the transaction may be completed but not yet subject to
1742  * cleanup due to overlapping serializable transactions.  This must
1743  * return valid information regardless of transaction isolation level.
1744  *
1745  * Also note that this doesn't check for a conflicting relation lock,
1746  * just a lock specifically on the given page.
1747  *
1748  * One use is to support proper behavior during GiST index vacuum.
1749  */
1750 bool
1751 PageIsPredicateLocked(Relation relation, BlockNumber blkno)
1752 {
1753         PREDICATELOCKTARGETTAG targettag;
1754         uint32          targettaghash;
1755         LWLockId        partitionLock;
1756         PREDICATELOCKTARGET *target;
1757
1758         SET_PREDICATELOCKTARGETTAG_PAGE(targettag,
1759                                                                         relation->rd_node.dbNode,
1760                                                                         relation->rd_id,
1761                                                                         blkno);
1762
1763         targettaghash = PredicateLockTargetTagHashCode(&targettag);
1764         partitionLock = PredicateLockHashPartitionLock(targettaghash);
1765         LWLockAcquire(partitionLock, LW_SHARED);
1766         target = (PREDICATELOCKTARGET *)
1767                 hash_search_with_hash_value(PredicateLockTargetHash,
1768                                                                         &targettag, targettaghash,
1769                                                                         HASH_FIND, NULL);
1770         LWLockRelease(partitionLock);
1771
1772         return (target != NULL);
1773 }
1774
1775
1776 /*
1777  * Check whether a particular lock is held by this transaction.
1778  *
1779  * Important note: this function may return false even if the lock is
1780  * being held, because it uses the local lock table which is not
1781  * updated if another transaction modifies our lock list (e.g. to
1782  * split an index page). It can also return true when a coarser
1783  * granularity lock that covers this target is being held. Be careful
1784  * to only use this function in circumstances where such errors are
1785  * acceptable!
1786  */
1787 static bool
1788 PredicateLockExists(const PREDICATELOCKTARGETTAG *targettag)
1789 {
1790         LOCALPREDICATELOCK *lock;
1791
1792         /* check local hash table */
1793         lock = (LOCALPREDICATELOCK *) hash_search(LocalPredicateLockHash,
1794                                                                                           targettag,
1795                                                                                           HASH_FIND, NULL);
1796
1797         if (!lock)
1798                 return false;
1799
1800         /*
1801          * Found entry in the table, but still need to check whether it's actually
1802          * held -- it could just be a parent of some held lock.
1803          */
1804         return lock->held;
1805 }
1806
1807 /*
1808  * Return the parent lock tag in the lock hierarchy: the next coarser
1809  * lock that covers the provided tag.
1810  *
1811  * Returns true and sets *parent to the parent tag if one exists,
1812  * returns false if none exists.
1813  */
1814 static bool
1815 GetParentPredicateLockTag(const PREDICATELOCKTARGETTAG *tag,
1816                                                   PREDICATELOCKTARGETTAG *parent)
1817 {
1818         switch (GET_PREDICATELOCKTARGETTAG_TYPE(*tag))
1819         {
1820                 case PREDLOCKTAG_RELATION:
1821                         /* relation locks have no parent lock */
1822                         return false;
1823
1824                 case PREDLOCKTAG_PAGE:
1825                         /* parent lock is relation lock */
1826                         SET_PREDICATELOCKTARGETTAG_RELATION(*parent,
1827                                                                                  GET_PREDICATELOCKTARGETTAG_DB(*tag),
1828                                                                   GET_PREDICATELOCKTARGETTAG_RELATION(*tag));
1829
1830                         return true;
1831
1832                 case PREDLOCKTAG_TUPLE:
1833                         /* parent lock is page lock */
1834                         SET_PREDICATELOCKTARGETTAG_PAGE(*parent,
1835                                                                                  GET_PREDICATELOCKTARGETTAG_DB(*tag),
1836                                                                    GET_PREDICATELOCKTARGETTAG_RELATION(*tag),
1837                                                                           GET_PREDICATELOCKTARGETTAG_PAGE(*tag));
1838                         return true;
1839         }
1840
1841         /* not reachable */
1842         Assert(false);
1843         return false;
1844 }
1845
1846 /*
1847  * Check whether the lock we are considering is already covered by a
1848  * coarser lock for our transaction.
1849  *
1850  * Like PredicateLockExists, this function might return a false
1851  * negative, but it will never return a false positive.
1852  */
1853 static bool
1854 CoarserLockCovers(const PREDICATELOCKTARGETTAG *newtargettag)
1855 {
1856         PREDICATELOCKTARGETTAG targettag,
1857                                 parenttag;
1858
1859         targettag = *newtargettag;
1860
1861         /* check parents iteratively until no more */
1862         while (GetParentPredicateLockTag(&targettag, &parenttag))
1863         {
1864                 targettag = parenttag;
1865                 if (PredicateLockExists(&targettag))
1866                         return true;
1867         }
1868
1869         /* no more parents to check; lock is not covered */
1870         return false;
1871 }
1872
1873 /*
1874  * Remove the dummy entry from the predicate lock target hash, to free up some
1875  * scratch space. The caller must be holding SerializablePredicateLockListLock,
1876  * and must restore the entry with RestoreScratchTarget() before releasing the
1877  * lock.
1878  *
1879  * If lockheld is true, the caller is already holding the partition lock
1880  * of the partition containing the scratch entry.
1881  */
1882 static void
1883 RemoveScratchTarget(bool lockheld)
1884 {
1885         bool            found;
1886
1887         Assert(LWLockHeldByMe(SerializablePredicateLockListLock));
1888
1889         if (!lockheld)
1890                 LWLockAcquire(ScratchPartitionLock, LW_EXCLUSIVE);
1891         hash_search_with_hash_value(PredicateLockTargetHash,
1892                                                                 &ScratchTargetTag,
1893                                                                 ScratchTargetTagHash,
1894                                                                 HASH_REMOVE, &found);
1895         Assert(found);
1896         if (!lockheld)
1897                 LWLockRelease(ScratchPartitionLock);
1898 }
1899
1900 /*
1901  * Re-insert the dummy entry in predicate lock target hash.
1902  */
1903 static void
1904 RestoreScratchTarget(bool lockheld)
1905 {
1906         bool            found;
1907
1908         Assert(LWLockHeldByMe(SerializablePredicateLockListLock));
1909
1910         if (!lockheld)
1911                 LWLockAcquire(ScratchPartitionLock, LW_EXCLUSIVE);
1912         hash_search_with_hash_value(PredicateLockTargetHash,
1913                                                                 &ScratchTargetTag,
1914                                                                 ScratchTargetTagHash,
1915                                                                 HASH_ENTER, &found);
1916         Assert(!found);
1917         if (!lockheld)
1918                 LWLockRelease(ScratchPartitionLock);
1919 }
1920
1921 /*
1922  * Check whether the list of related predicate locks is empty for a
1923  * predicate lock target, and remove the target if it is.
1924  */
1925 static void
1926 RemoveTargetIfNoLongerUsed(PREDICATELOCKTARGET *target, uint32 targettaghash)
1927 {
1928         PREDICATELOCKTARGET *rmtarget;
1929
1930         Assert(LWLockHeldByMe(SerializablePredicateLockListLock));
1931
1932         /* Can't remove it until no locks at this target. */
1933         if (!SHMQueueEmpty(&target->predicateLocks))
1934                 return;
1935
1936         /* Actually remove the target. */
1937         rmtarget = hash_search_with_hash_value(PredicateLockTargetHash,
1938                                                                                    &target->tag,
1939                                                                                    targettaghash,
1940                                                                                    HASH_REMOVE, NULL);
1941         Assert(rmtarget == target);
1942 }
1943
1944 /*
1945  * Delete child target locks owned by this process.
1946  * This implementation is assuming that the usage of each target tag field
1947  * is uniform.  No need to make this hard if we don't have to.
1948  *
1949  * We aren't acquiring lightweight locks for the predicate lock or lock
1950  * target structures associated with this transaction unless we're going
1951  * to modify them, because no other process is permitted to modify our
1952  * locks.
1953  */
1954 static void
1955 DeleteChildTargetLocks(const PREDICATELOCKTARGETTAG *newtargettag)
1956 {
1957         SERIALIZABLEXACT *sxact;
1958         PREDICATELOCK *predlock;
1959
1960         LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
1961         sxact = MySerializableXact;
1962         predlock = (PREDICATELOCK *)
1963                 SHMQueueNext(&(sxact->predicateLocks),
1964                                          &(sxact->predicateLocks),
1965                                          offsetof(PREDICATELOCK, xactLink));
1966         while (predlock)
1967         {
1968                 SHM_QUEUE  *predlocksxactlink;
1969                 PREDICATELOCK *nextpredlock;
1970                 PREDICATELOCKTAG oldlocktag;
1971                 PREDICATELOCKTARGET *oldtarget;
1972                 PREDICATELOCKTARGETTAG oldtargettag;
1973
1974                 predlocksxactlink = &(predlock->xactLink);
1975                 nextpredlock = (PREDICATELOCK *)
1976                         SHMQueueNext(&(sxact->predicateLocks),
1977                                                  predlocksxactlink,
1978                                                  offsetof(PREDICATELOCK, xactLink));
1979
1980                 oldlocktag = predlock->tag;
1981                 Assert(oldlocktag.myXact == sxact);
1982                 oldtarget = oldlocktag.myTarget;
1983                 oldtargettag = oldtarget->tag;
1984
1985                 if (TargetTagIsCoveredBy(oldtargettag, *newtargettag))
1986                 {
1987                         uint32          oldtargettaghash;
1988                         LWLockId        partitionLock;
1989                         PREDICATELOCK *rmpredlock;
1990
1991                         oldtargettaghash = PredicateLockTargetTagHashCode(&oldtargettag);
1992                         partitionLock = PredicateLockHashPartitionLock(oldtargettaghash);
1993
1994                         LWLockAcquire(partitionLock, LW_EXCLUSIVE);
1995
1996                         SHMQueueDelete(predlocksxactlink);
1997                         SHMQueueDelete(&(predlock->targetLink));
1998                         rmpredlock = hash_search_with_hash_value
1999                                 (PredicateLockHash,
2000                                  &oldlocktag,
2001                                  PredicateLockHashCodeFromTargetHashCode(&oldlocktag,
2002                                                                                                                  oldtargettaghash),
2003                                  HASH_REMOVE, NULL);
2004                         Assert(rmpredlock == predlock);
2005
2006                         RemoveTargetIfNoLongerUsed(oldtarget, oldtargettaghash);
2007
2008                         LWLockRelease(partitionLock);
2009
2010                         DecrementParentLocks(&oldtargettag);
2011                 }
2012
2013                 predlock = nextpredlock;
2014         }
2015         LWLockRelease(SerializablePredicateLockListLock);
2016 }
2017
2018 /*
2019  * Returns the promotion threshold for a given predicate lock
2020  * target. This is the number of descendant locks required to promote
2021  * to the specified tag. Note that the threshold includes non-direct
2022  * descendants, e.g. both tuples and pages for a relation lock.
2023  *
2024  * TODO SSI: We should do something more intelligent about what the
2025  * thresholds are, either making it proportional to the number of
2026  * tuples in a page & pages in a relation, or at least making it a
2027  * GUC. Currently the threshold is 3 for a page lock, and
2028  * max_pred_locks_per_transaction/2 for a relation lock, chosen
2029  * entirely arbitrarily (and without benchmarking).
2030  */
2031 static int
2032 PredicateLockPromotionThreshold(const PREDICATELOCKTARGETTAG *tag)
2033 {
2034         switch (GET_PREDICATELOCKTARGETTAG_TYPE(*tag))
2035         {
2036                 case PREDLOCKTAG_RELATION:
2037                         return max_predicate_locks_per_xact / 2;
2038
2039                 case PREDLOCKTAG_PAGE:
2040                         return 3;
2041
2042                 case PREDLOCKTAG_TUPLE:
2043
2044                         /*
2045                          * not reachable: nothing is finer-granularity than a tuple, so we
2046                          * should never try to promote to it.
2047                          */
2048                         Assert(false);
2049                         return 0;
2050         }
2051
2052         /* not reachable */
2053         Assert(false);
2054         return 0;
2055 }
2056
2057 /*
2058  * For all ancestors of a newly-acquired predicate lock, increment
2059  * their child count in the parent hash table. If any of them have
2060  * more descendants than their promotion threshold, acquire the
2061  * coarsest such lock.
2062  *
2063  * Returns true if a parent lock was acquired and false otherwise.
2064  */
2065 static bool
2066 CheckAndPromotePredicateLockRequest(const PREDICATELOCKTARGETTAG *reqtag)
2067 {
2068         PREDICATELOCKTARGETTAG targettag,
2069                                 nexttag,
2070                                 promotiontag;
2071         LOCALPREDICATELOCK *parentlock;
2072         bool            found,
2073                                 promote;
2074
2075         promote = false;
2076
2077         targettag = *reqtag;
2078
2079         /* check parents iteratively */
2080         while (GetParentPredicateLockTag(&targettag, &nexttag))
2081         {
2082                 targettag = nexttag;
2083                 parentlock = (LOCALPREDICATELOCK *) hash_search(LocalPredicateLockHash,
2084                                                                                                                 &targettag,
2085                                                                                                                 HASH_ENTER,
2086                                                                                                                 &found);
2087                 if (!found)
2088                 {
2089                         parentlock->held = false;
2090                         parentlock->childLocks = 1;
2091                 }
2092                 else
2093                         parentlock->childLocks++;
2094
2095                 if (parentlock->childLocks >=
2096                         PredicateLockPromotionThreshold(&targettag))
2097                 {
2098                         /*
2099                          * We should promote to this parent lock. Continue to check its
2100                          * ancestors, however, both to get their child counts right and to
2101                          * check whether we should just go ahead and promote to one of
2102                          * them.
2103                          */
2104                         promotiontag = targettag;
2105                         promote = true;
2106                 }
2107         }
2108
2109         if (promote)
2110         {
2111                 /* acquire coarsest ancestor eligible for promotion */
2112                 PredicateLockAcquire(&promotiontag);
2113                 return true;
2114         }
2115         else
2116                 return false;
2117 }
2118
2119 /*
2120  * When releasing a lock, decrement the child count on all ancestor
2121  * locks.
2122  *
2123  * This is called only when releasing a lock via
2124  * DeleteChildTargetLocks (i.e. when a lock becomes redundant because
2125  * we've acquired its parent, possibly due to promotion) or when a new
2126  * MVCC write lock makes the predicate lock unnecessary. There's no
2127  * point in calling it when locks are released at transaction end, as
2128  * this information is no longer needed.
2129  */
2130 static void
2131 DecrementParentLocks(const PREDICATELOCKTARGETTAG *targettag)
2132 {
2133         PREDICATELOCKTARGETTAG parenttag,
2134                                 nexttag;
2135
2136         parenttag = *targettag;
2137
2138         while (GetParentPredicateLockTag(&parenttag, &nexttag))
2139         {
2140                 uint32          targettaghash;
2141                 LOCALPREDICATELOCK *parentlock,
2142                                    *rmlock;
2143
2144                 parenttag = nexttag;
2145                 targettaghash = PredicateLockTargetTagHashCode(&parenttag);
2146                 parentlock = (LOCALPREDICATELOCK *)
2147                         hash_search_with_hash_value(LocalPredicateLockHash,
2148                                                                                 &parenttag, targettaghash,
2149                                                                                 HASH_FIND, NULL);
2150
2151                 /*
2152                  * There's a small chance the parent lock doesn't exist in the lock
2153                  * table. This can happen if we prematurely removed it because an
2154                  * index split caused the child refcount to be off.
2155                  */
2156                 if (parentlock == NULL)
2157                         continue;
2158
2159                 parentlock->childLocks--;
2160
2161                 /*
2162                  * Under similar circumstances the parent lock's refcount might be
2163                  * zero. This only happens if we're holding that lock (otherwise we
2164                  * would have removed the entry).
2165                  */
2166                 if (parentlock->childLocks < 0)
2167                 {
2168                         Assert(parentlock->held);
2169                         parentlock->childLocks = 0;
2170                 }
2171
2172                 if ((parentlock->childLocks == 0) && (!parentlock->held))
2173                 {
2174                         rmlock = (LOCALPREDICATELOCK *)
2175                                 hash_search_with_hash_value(LocalPredicateLockHash,
2176                                                                                         &parenttag, targettaghash,
2177                                                                                         HASH_REMOVE, NULL);
2178                         Assert(rmlock == parentlock);
2179                 }
2180         }
2181 }
2182
2183 /*
2184  * Indicate that a predicate lock on the given target is held by the
2185  * specified transaction. Has no effect if the lock is already held.
2186  *
2187  * This updates the lock table and the sxact's lock list, and creates
2188  * the lock target if necessary, but does *not* do anything related to
2189  * granularity promotion or the local lock table. See
2190  * PredicateLockAcquire for that.
2191  */
2192 static void
2193 CreatePredicateLock(const PREDICATELOCKTARGETTAG *targettag,
2194                                         uint32 targettaghash,
2195                                         SERIALIZABLEXACT *sxact)
2196 {
2197         PREDICATELOCKTARGET *target;
2198         PREDICATELOCKTAG locktag;
2199         PREDICATELOCK *lock;
2200         LWLockId        partitionLock;
2201         bool            found;
2202
2203         partitionLock = PredicateLockHashPartitionLock(targettaghash);
2204
2205         LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
2206         LWLockAcquire(partitionLock, LW_EXCLUSIVE);
2207
2208         /* Make sure that the target is represented. */
2209         target = (PREDICATELOCKTARGET *)
2210                 hash_search_with_hash_value(PredicateLockTargetHash,
2211                                                                         targettag, targettaghash,
2212                                                                         HASH_ENTER_NULL, &found);
2213         if (!target)
2214                 ereport(ERROR,
2215                                 (errcode(ERRCODE_OUT_OF_MEMORY),
2216                                  errmsg("out of shared memory"),
2217                                  errhint("You might need to increase max_pred_locks_per_transaction.")));
2218         if (!found)
2219                 SHMQueueInit(&(target->predicateLocks));
2220
2221         /* We've got the sxact and target, make sure they're joined. */
2222         locktag.myTarget = target;
2223         locktag.myXact = sxact;
2224         lock = (PREDICATELOCK *)
2225                 hash_search_with_hash_value(PredicateLockHash, &locktag,
2226                         PredicateLockHashCodeFromTargetHashCode(&locktag, targettaghash),
2227                                                                         HASH_ENTER_NULL, &found);
2228         if (!lock)
2229                 ereport(ERROR,
2230                                 (errcode(ERRCODE_OUT_OF_MEMORY),
2231                                  errmsg("out of shared memory"),
2232                                  errhint("You might need to increase max_pred_locks_per_transaction.")));
2233
2234         if (!found)
2235         {
2236                 SHMQueueInsertBefore(&(target->predicateLocks), &(lock->targetLink));
2237                 SHMQueueInsertBefore(&(sxact->predicateLocks),
2238                                                          &(lock->xactLink));
2239                 lock->commitSeqNo = InvalidSerCommitSeqNo;
2240         }
2241
2242         LWLockRelease(partitionLock);
2243         LWLockRelease(SerializablePredicateLockListLock);
2244 }
2245
2246 /*
2247  * Acquire a predicate lock on the specified target for the current
2248  * connection if not already held. This updates the local lock table
2249  * and uses it to implement granularity promotion. It will consolidate
2250  * multiple locks into a coarser lock if warranted, and will release
2251  * any finer-grained locks covered by the new one.
2252  */
2253 static void
2254 PredicateLockAcquire(const PREDICATELOCKTARGETTAG *targettag)
2255 {
2256         uint32          targettaghash;
2257         bool            found;
2258         LOCALPREDICATELOCK *locallock;
2259
2260         /* Do we have the lock already, or a covering lock? */
2261         if (PredicateLockExists(targettag))
2262                 return;
2263
2264         if (CoarserLockCovers(targettag))
2265                 return;
2266
2267         /* the same hash and LW lock apply to the lock target and the local lock. */
2268         targettaghash = PredicateLockTargetTagHashCode(targettag);
2269
2270         /* Acquire lock in local table */
2271         locallock = (LOCALPREDICATELOCK *)
2272                 hash_search_with_hash_value(LocalPredicateLockHash,
2273                                                                         targettag, targettaghash,
2274                                                                         HASH_ENTER, &found);
2275         locallock->held = true;
2276         if (!found)
2277                 locallock->childLocks = 0;
2278
2279         /* Actually create the lock */
2280         CreatePredicateLock(targettag, targettaghash, MySerializableXact);
2281
2282         /*
2283          * Lock has been acquired. Check whether it should be promoted to a
2284          * coarser granularity, or whether there are finer-granularity locks to
2285          * clean up.
2286          */
2287         if (CheckAndPromotePredicateLockRequest(targettag))
2288         {
2289                 /*
2290                  * Lock request was promoted to a coarser-granularity lock, and that
2291                  * lock was acquired. It will delete this lock and any of its
2292                  * children, so we're done.
2293                  */
2294         }
2295         else
2296         {
2297                 /* Clean up any finer-granularity locks */
2298                 if (GET_PREDICATELOCKTARGETTAG_TYPE(*targettag) != PREDLOCKTAG_TUPLE)
2299                         DeleteChildTargetLocks(targettag);
2300         }
2301 }
2302
2303
2304 /*
2305  *              PredicateLockRelation
2306  *
2307  * Gets a predicate lock at the relation level.
2308  * Skip if not in full serializable transaction isolation level.
2309  * Skip if this is a temporary table.
2310  * Clear any finer-grained predicate locks this session has on the relation.
2311  */
2312 void
2313 PredicateLockRelation(Relation relation, Snapshot snapshot)
2314 {
2315         PREDICATELOCKTARGETTAG tag;
2316
2317         if (!SerializationNeededForRead(relation, snapshot))
2318                 return;
2319
2320         SET_PREDICATELOCKTARGETTAG_RELATION(tag,
2321                                                                                 relation->rd_node.dbNode,
2322                                                                                 relation->rd_id);
2323         PredicateLockAcquire(&tag);
2324 }
2325
2326 /*
2327  *              PredicateLockPage
2328  *
2329  * Gets a predicate lock at the page level.
2330  * Skip if not in full serializable transaction isolation level.
2331  * Skip if this is a temporary table.
2332  * Skip if a coarser predicate lock already covers this page.
2333  * Clear any finer-grained predicate locks this session has on the relation.
2334  */
2335 void
2336 PredicateLockPage(Relation relation, BlockNumber blkno, Snapshot snapshot)
2337 {
2338         PREDICATELOCKTARGETTAG tag;
2339
2340         if (!SerializationNeededForRead(relation, snapshot))
2341                 return;
2342
2343         SET_PREDICATELOCKTARGETTAG_PAGE(tag,
2344                                                                         relation->rd_node.dbNode,
2345                                                                         relation->rd_id,
2346                                                                         blkno);
2347         PredicateLockAcquire(&tag);
2348 }
2349
2350 /*
2351  *              PredicateLockTuple
2352  *
2353  * Gets a predicate lock at the tuple level.
2354  * Skip if not in full serializable transaction isolation level.
2355  * Skip if this is a temporary table.
2356  */
2357 void
2358 PredicateLockTuple(Relation relation, HeapTuple tuple, Snapshot snapshot)
2359 {
2360         PREDICATELOCKTARGETTAG tag;
2361         ItemPointer tid;
2362         TransactionId targetxmin;
2363
2364         if (!SerializationNeededForRead(relation, snapshot))
2365                 return;
2366
2367         /*
2368          * If it's a heap tuple, return if this xact wrote it.
2369          */
2370         if (relation->rd_index == NULL)
2371         {
2372                 TransactionId myxid;
2373
2374                 targetxmin = HeapTupleHeaderGetXmin(tuple->t_data);
2375
2376                 myxid = GetTopTransactionIdIfAny();
2377                 if (TransactionIdIsValid(myxid))
2378                 {
2379                         if (TransactionIdFollowsOrEquals(targetxmin, TransactionXmin))
2380                         {
2381                                 TransactionId xid = SubTransGetTopmostTransaction(targetxmin);
2382
2383                                 if (TransactionIdEquals(xid, myxid))
2384                                 {
2385                                         /* We wrote it; we already have a write lock. */
2386                                         return;
2387                                 }
2388                         }
2389                 }
2390         }
2391         else
2392                 targetxmin = InvalidTransactionId;
2393
2394         /*
2395          * Do quick-but-not-definitive test for a relation lock first.  This will
2396          * never cause a return when the relation is *not* locked, but will
2397          * occasionally let the check continue when there really *is* a relation
2398          * level lock.
2399          */
2400         SET_PREDICATELOCKTARGETTAG_RELATION(tag,
2401                                                                                 relation->rd_node.dbNode,
2402                                                                                 relation->rd_id);
2403         if (PredicateLockExists(&tag))
2404                 return;
2405
2406         tid = &(tuple->t_self);
2407         SET_PREDICATELOCKTARGETTAG_TUPLE(tag,
2408                                                                          relation->rd_node.dbNode,
2409                                                                          relation->rd_id,
2410                                                                          ItemPointerGetBlockNumber(tid),
2411                                                                          ItemPointerGetOffsetNumber(tid),
2412                                                                          targetxmin);
2413         PredicateLockAcquire(&tag);
2414 }
2415
2416
2417 /*
2418  *              DeleteLockTarget
2419  *
2420  * Remove a predicate lock target along with any locks held for it.
2421  *
2422  * Caller must hold SerializablePredicateLockListLock and the
2423  * appropriate hash partition lock for the target.
2424  */
2425 static void
2426 DeleteLockTarget(PREDICATELOCKTARGET *target, uint32 targettaghash)
2427 {
2428         PREDICATELOCK *predlock;
2429         SHM_QUEUE  *predlocktargetlink;
2430         PREDICATELOCK *nextpredlock;
2431         bool            found;
2432
2433         Assert(LWLockHeldByMe(SerializablePredicateLockListLock));
2434         Assert(LWLockHeldByMe(PredicateLockHashPartitionLock(targettaghash)));
2435
2436         predlock = (PREDICATELOCK *)
2437                 SHMQueueNext(&(target->predicateLocks),
2438                                          &(target->predicateLocks),
2439                                          offsetof(PREDICATELOCK, targetLink));
2440         LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
2441         while (predlock)
2442         {
2443                 predlocktargetlink = &(predlock->targetLink);
2444                 nextpredlock = (PREDICATELOCK *)
2445                         SHMQueueNext(&(target->predicateLocks),
2446                                                  predlocktargetlink,
2447                                                  offsetof(PREDICATELOCK, targetLink));
2448
2449                 SHMQueueDelete(&(predlock->xactLink));
2450                 SHMQueueDelete(&(predlock->targetLink));
2451
2452                 hash_search_with_hash_value
2453                         (PredicateLockHash,
2454                          &predlock->tag,
2455                          PredicateLockHashCodeFromTargetHashCode(&predlock->tag,
2456                                                                                                          targettaghash),
2457                          HASH_REMOVE, &found);
2458                 Assert(found);
2459
2460                 predlock = nextpredlock;
2461         }
2462         LWLockRelease(SerializableXactHashLock);
2463
2464         /* Remove the target itself, if possible. */
2465         RemoveTargetIfNoLongerUsed(target, targettaghash);
2466 }
2467
2468
2469 /*
2470  *              TransferPredicateLocksToNewTarget
2471  *
2472  * Move or copy all the predicate locks for a lock target, for use by
2473  * index page splits/combines and other things that create or replace
2474  * lock targets. If 'removeOld' is true, the old locks and the target
2475  * will be removed.
2476  *
2477  * Returns true on success, or false if we ran out of shared memory to
2478  * allocate the new target or locks. Guaranteed to always succeed if
2479  * removeOld is set (by using the scratch entry in PredicateLockTargetHash
2480  * for scratch space).
2481  *
2482  * Warning: the "removeOld" option should be used only with care,
2483  * because this function does not (indeed, can not) update other
2484  * backends' LocalPredicateLockHash. If we are only adding new
2485  * entries, this is not a problem: the local lock table is used only
2486  * as a hint, so missing entries for locks that are held are
2487  * OK. Having entries for locks that are no longer held, as can happen
2488  * when using "removeOld", is not in general OK. We can only use it
2489  * safely when replacing a lock with a coarser-granularity lock that
2490  * covers it, or if we are absolutely certain that no one will need to
2491  * refer to that lock in the future.
2492  *
2493  * Caller must hold SerializablePredicateLockListLock.
2494  */
2495 static bool
2496 TransferPredicateLocksToNewTarget(PREDICATELOCKTARGETTAG oldtargettag,
2497                                                                   PREDICATELOCKTARGETTAG newtargettag,
2498                                                                   bool removeOld)
2499 {
2500         uint32          oldtargettaghash;
2501         LWLockId        oldpartitionLock;
2502         PREDICATELOCKTARGET *oldtarget;
2503         uint32          newtargettaghash;
2504         LWLockId        newpartitionLock;
2505         bool            found;
2506         bool            outOfShmem = false;
2507
2508         Assert(LWLockHeldByMe(SerializablePredicateLockListLock));
2509
2510         oldtargettaghash = PredicateLockTargetTagHashCode(&oldtargettag);
2511         newtargettaghash = PredicateLockTargetTagHashCode(&newtargettag);
2512         oldpartitionLock = PredicateLockHashPartitionLock(oldtargettaghash);
2513         newpartitionLock = PredicateLockHashPartitionLock(newtargettaghash);
2514
2515         if (removeOld)
2516         {
2517                 /*
2518                  * Remove the dummy entry to give us scratch space, so we know we'll
2519                  * be able to create the new lock target.
2520                  */
2521                 RemoveScratchTarget(false);
2522         }
2523
2524         /*
2525          * We must get the partition locks in ascending sequence to avoid
2526          * deadlocks. If old and new partitions are the same, we must request the
2527          * lock only once.
2528          */
2529         if (oldpartitionLock < newpartitionLock)
2530         {
2531                 LWLockAcquire(oldpartitionLock,
2532                                           (removeOld ? LW_EXCLUSIVE : LW_SHARED));
2533                 LWLockAcquire(newpartitionLock, LW_EXCLUSIVE);
2534         }
2535         else if (oldpartitionLock > newpartitionLock)
2536         {
2537                 LWLockAcquire(newpartitionLock, LW_EXCLUSIVE);
2538                 LWLockAcquire(oldpartitionLock,
2539                                           (removeOld ? LW_EXCLUSIVE : LW_SHARED));
2540         }
2541         else
2542                 LWLockAcquire(newpartitionLock, LW_EXCLUSIVE);
2543
2544         /*
2545          * Look for the old target.  If not found, that's OK; no predicate locks
2546          * are affected, so we can just clean up and return. If it does exist,
2547          * walk its list of predicate locks and move or copy them to the new
2548          * target.
2549          */
2550         oldtarget = hash_search_with_hash_value(PredicateLockTargetHash,
2551                                                                                         &oldtargettag,
2552                                                                                         oldtargettaghash,
2553                                                                                         HASH_FIND, NULL);
2554
2555         if (oldtarget)
2556         {
2557                 PREDICATELOCKTARGET *newtarget;
2558                 PREDICATELOCK *oldpredlock;
2559                 PREDICATELOCKTAG newpredlocktag;
2560
2561                 newtarget = hash_search_with_hash_value(PredicateLockTargetHash,
2562                                                                                                 &newtargettag,
2563                                                                                                 newtargettaghash,
2564                                                                                                 HASH_ENTER_NULL, &found);
2565
2566                 if (!newtarget)
2567                 {
2568                         /* Failed to allocate due to insufficient shmem */
2569                         outOfShmem = true;
2570                         goto exit;
2571                 }
2572
2573                 /* If we created a new entry, initialize it */
2574                 if (!found)
2575                         SHMQueueInit(&(newtarget->predicateLocks));
2576
2577                 newpredlocktag.myTarget = newtarget;
2578
2579                 /*
2580                  * Loop through all the locks on the old target, replacing them with
2581                  * locks on the new target.
2582                  */
2583                 oldpredlock = (PREDICATELOCK *)
2584                         SHMQueueNext(&(oldtarget->predicateLocks),
2585                                                  &(oldtarget->predicateLocks),
2586                                                  offsetof(PREDICATELOCK, targetLink));
2587                 LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
2588                 while (oldpredlock)
2589                 {
2590                         SHM_QUEUE  *predlocktargetlink;
2591                         PREDICATELOCK *nextpredlock;
2592                         PREDICATELOCK *newpredlock;
2593                         SerCommitSeqNo oldCommitSeqNo = oldpredlock->commitSeqNo;
2594
2595                         predlocktargetlink = &(oldpredlock->targetLink);
2596                         nextpredlock = (PREDICATELOCK *)
2597                                 SHMQueueNext(&(oldtarget->predicateLocks),
2598                                                          predlocktargetlink,
2599                                                          offsetof(PREDICATELOCK, targetLink));
2600                         newpredlocktag.myXact = oldpredlock->tag.myXact;
2601
2602                         if (removeOld)
2603                         {
2604                                 SHMQueueDelete(&(oldpredlock->xactLink));
2605                                 SHMQueueDelete(&(oldpredlock->targetLink));
2606
2607                                 hash_search_with_hash_value
2608                                         (PredicateLockHash,
2609                                          &oldpredlock->tag,
2610                                    PredicateLockHashCodeFromTargetHashCode(&oldpredlock->tag,
2611                                                                                                                    oldtargettaghash),
2612                                          HASH_REMOVE, &found);
2613                                 Assert(found);
2614                         }
2615
2616                         newpredlock = (PREDICATELOCK *)
2617                                 hash_search_with_hash_value(PredicateLockHash,
2618                                                                                         &newpredlocktag,
2619                                                                                         PredicateLockHashCodeFromTargetHashCode(&newpredlocktag,
2620                                                                                                                                                                         newtargettaghash),
2621                                                                                         HASH_ENTER_NULL,
2622                                                                                         &found);
2623                         if (!newpredlock)
2624                         {
2625                                 /* Out of shared memory. Undo what we've done so far. */
2626                                 LWLockRelease(SerializableXactHashLock);
2627                                 DeleteLockTarget(newtarget, newtargettaghash);
2628                                 outOfShmem = true;
2629                                 goto exit;
2630                         }
2631                         if (!found)
2632                         {
2633                                 SHMQueueInsertBefore(&(newtarget->predicateLocks),
2634                                                                          &(newpredlock->targetLink));
2635                                 SHMQueueInsertBefore(&(newpredlocktag.myXact->predicateLocks),
2636                                                                          &(newpredlock->xactLink));
2637                                 newpredlock->commitSeqNo = oldCommitSeqNo;
2638                         }
2639                         else
2640                         {
2641                                 if (newpredlock->commitSeqNo < oldCommitSeqNo)
2642                                         newpredlock->commitSeqNo = oldCommitSeqNo;
2643                         }
2644
2645                         Assert(newpredlock->commitSeqNo != 0);
2646                         Assert((newpredlock->commitSeqNo == InvalidSerCommitSeqNo)
2647                                    || (newpredlock->tag.myXact == OldCommittedSxact));
2648
2649                         oldpredlock = nextpredlock;
2650                 }
2651                 LWLockRelease(SerializableXactHashLock);
2652
2653                 if (removeOld)
2654                 {
2655                         Assert(SHMQueueEmpty(&oldtarget->predicateLocks));
2656                         RemoveTargetIfNoLongerUsed(oldtarget, oldtargettaghash);
2657                 }
2658         }
2659
2660
2661 exit:
2662         /* Release partition locks in reverse order of acquisition. */
2663         if (oldpartitionLock < newpartitionLock)
2664         {
2665                 LWLockRelease(newpartitionLock);
2666                 LWLockRelease(oldpartitionLock);
2667         }
2668         else if (oldpartitionLock > newpartitionLock)
2669         {
2670                 LWLockRelease(oldpartitionLock);
2671                 LWLockRelease(newpartitionLock);
2672         }
2673         else
2674                 LWLockRelease(newpartitionLock);
2675
2676         if (removeOld)
2677         {
2678                 /* We shouldn't run out of memory if we're moving locks */
2679                 Assert(!outOfShmem);
2680
2681                 /* Put the scrach entry back */
2682                 RestoreScratchTarget(false);
2683         }
2684
2685         return !outOfShmem;
2686 }
2687
2688 /*
2689  * Drop all predicate locks of any granularity from the specified relation,
2690  * which can be a heap relation or an index relation.  If 'transfer' is true,
2691  * acquire a relation lock on the heap for any transactions with any lock(s)
2692  * on the specified relation.
2693  *
2694  * This requires grabbing a lot of LW locks and scanning the entire lock
2695  * target table for matches.  That makes this more expensive than most
2696  * predicate lock management functions, but it will only be called for DDL
2697  * type commands that are expensive anyway, and there are fast returns when
2698  * no serializable transactions are active or the relation is temporary.
2699  *
2700  * We don't use the TransferPredicateLocksToNewTarget function because it
2701  * acquires its own locks on the partitions of the two targets involved,
2702  * and we'll already be holding all partition locks.
2703  *
2704  * We can't throw an error from here, because the call could be from a
2705  * transaction which is not serializable.
2706  *
2707  * NOTE: This is currently only called with transfer set to true, but that may
2708  * change.      If we decide to clean up the locks from a table on commit of a
2709  * transaction which executed DROP TABLE, the false condition will be useful.
2710  */
2711 static void
2712 DropAllPredicateLocksFromTable(Relation relation, bool transfer)
2713 {
2714         HASH_SEQ_STATUS seqstat;
2715         PREDICATELOCKTARGET *oldtarget;
2716         PREDICATELOCKTARGET *heaptarget;
2717         Oid                     dbId;
2718         Oid                     relId;
2719         Oid                     heapId;
2720         int                     i;
2721         bool            isIndex;
2722         bool            found;
2723         uint32          heaptargettaghash;
2724
2725         /*
2726          * Bail out quickly if there are no serializable transactions running.
2727          * It's safe to check this without taking locks because the caller is
2728          * holding an ACCESS EXCLUSIVE lock on the relation.  No new locks which
2729          * would matter here can be acquired while that is held.
2730          */
2731         if (!TransactionIdIsValid(PredXact->SxactGlobalXmin))
2732                 return;
2733
2734         if (!PredicateLockingNeededForRelation(relation))
2735                 return;
2736
2737         dbId = relation->rd_node.dbNode;
2738         relId = relation->rd_id;
2739         if (relation->rd_index == NULL)
2740         {
2741                 isIndex = false;
2742                 heapId = relId;
2743         }
2744         else
2745         {
2746                 isIndex = true;
2747                 heapId = relation->rd_index->indrelid;
2748         }
2749         Assert(heapId != InvalidOid);
2750         Assert(transfer || !isIndex);           /* index OID only makes sense with
2751                                                                                  * transfer */
2752
2753         /* Retrieve first time needed, then keep. */
2754         heaptargettaghash = 0;
2755         heaptarget = NULL;
2756
2757         /* Acquire locks on all lock partitions */
2758         LWLockAcquire(SerializablePredicateLockListLock, LW_EXCLUSIVE);
2759         for (i = 0; i < NUM_PREDICATELOCK_PARTITIONS; i++)
2760                 LWLockAcquire(FirstPredicateLockMgrLock + i, LW_EXCLUSIVE);
2761         LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
2762
2763         /*
2764          * Remove the dummy entry to give us scratch space, so we know we'll be
2765          * able to create the new lock target.
2766          */
2767         if (transfer)
2768                 RemoveScratchTarget(true);
2769
2770         /* Scan through target map */
2771         hash_seq_init(&seqstat, PredicateLockTargetHash);
2772
2773         while ((oldtarget = (PREDICATELOCKTARGET *) hash_seq_search(&seqstat)))
2774         {
2775                 PREDICATELOCK *oldpredlock;
2776
2777                 /*
2778                  * Check whether this is a target which needs attention.
2779                  */
2780                 if (GET_PREDICATELOCKTARGETTAG_RELATION(oldtarget->tag) != relId)
2781                         continue;                       /* wrong relation id */
2782                 if (GET_PREDICATELOCKTARGETTAG_DB(oldtarget->tag) != dbId)
2783                         continue;                       /* wrong database id */
2784                 if (transfer && !isIndex
2785                         && GET_PREDICATELOCKTARGETTAG_TYPE(oldtarget->tag) == PREDLOCKTAG_RELATION)
2786                         continue;                       /* already the right lock */
2787
2788                 /*
2789                  * If we made it here, we have work to do.      We make sure the heap
2790                  * relation lock exists, then we walk the list of predicate locks for
2791                  * the old target we found, moving all locks to the heap relation lock
2792                  * -- unless they already hold that.
2793                  */
2794
2795                 /*
2796                  * First make sure we have the heap relation target.  We only need to
2797                  * do this once.
2798                  */
2799                 if (transfer && heaptarget == NULL)
2800                 {
2801                         PREDICATELOCKTARGETTAG heaptargettag;
2802
2803                         SET_PREDICATELOCKTARGETTAG_RELATION(heaptargettag, dbId, heapId);
2804                         heaptargettaghash = PredicateLockTargetTagHashCode(&heaptargettag);
2805                         heaptarget = hash_search_with_hash_value(PredicateLockTargetHash,
2806                                                                                                          &heaptargettag,
2807                                                                                                          heaptargettaghash,
2808                                                                                                          HASH_ENTER, &found);
2809                         if (!found)
2810                                 SHMQueueInit(&heaptarget->predicateLocks);
2811                 }
2812
2813                 /*
2814                  * Loop through all the locks on the old target, replacing them with
2815                  * locks on the new target.
2816                  */
2817                 oldpredlock = (PREDICATELOCK *)
2818                         SHMQueueNext(&(oldtarget->predicateLocks),
2819                                                  &(oldtarget->predicateLocks),
2820                                                  offsetof(PREDICATELOCK, targetLink));
2821                 while (oldpredlock)
2822                 {
2823                         PREDICATELOCK *nextpredlock;
2824                         PREDICATELOCK *newpredlock;
2825                         SerCommitSeqNo oldCommitSeqNo;
2826                         SERIALIZABLEXACT *oldXact;
2827
2828                         nextpredlock = (PREDICATELOCK *)
2829                                 SHMQueueNext(&(oldtarget->predicateLocks),
2830                                                          &(oldpredlock->targetLink),
2831                                                          offsetof(PREDICATELOCK, targetLink));
2832
2833                         /*
2834                          * Remove the old lock first. This avoids the chance of running
2835                          * out of lock structure entries for the hash table.
2836                          */
2837                         oldCommitSeqNo = oldpredlock->commitSeqNo;
2838                         oldXact = oldpredlock->tag.myXact;
2839
2840                         SHMQueueDelete(&(oldpredlock->xactLink));
2841
2842                         /*
2843                          * No need for retail delete from oldtarget list, we're removing
2844                          * the whole target anyway.
2845                          */
2846                         hash_search(PredicateLockHash,
2847                                                 &oldpredlock->tag,
2848                                                 HASH_REMOVE, &found);
2849                         Assert(found);
2850
2851                         if (transfer)
2852                         {
2853                                 PREDICATELOCKTAG newpredlocktag;
2854
2855                                 newpredlocktag.myTarget = heaptarget;
2856                                 newpredlocktag.myXact = oldXact;
2857                                 newpredlock = (PREDICATELOCK *)
2858                                         hash_search_with_hash_value(PredicateLockHash,
2859                                                                                                 &newpredlocktag,
2860                                                                                                 PredicateLockHashCodeFromTargetHashCode(&newpredlocktag,
2861                                                                                                                                                                                 heaptargettaghash),
2862                                                                                                 HASH_ENTER,
2863                                                                                                 &found);
2864                                 if (!found)
2865                                 {
2866                                         SHMQueueInsertBefore(&(heaptarget->predicateLocks),
2867                                                                                  &(newpredlock->targetLink));
2868                                         SHMQueueInsertBefore(&(newpredlocktag.myXact->predicateLocks),
2869                                                                                  &(newpredlock->xactLink));
2870                                         newpredlock->commitSeqNo = oldCommitSeqNo;
2871                                 }
2872                                 else
2873                                 {
2874                                         if (newpredlock->commitSeqNo < oldCommitSeqNo)
2875                                                 newpredlock->commitSeqNo = oldCommitSeqNo;
2876                                 }
2877
2878                                 Assert(newpredlock->commitSeqNo != 0);
2879                                 Assert((newpredlock->commitSeqNo == InvalidSerCommitSeqNo)
2880                                            || (newpredlock->tag.myXact == OldCommittedSxact));
2881                         }
2882
2883                         oldpredlock = nextpredlock;
2884                 }
2885
2886                 hash_search(PredicateLockTargetHash, &oldtarget->tag, HASH_REMOVE,
2887                                         &found);
2888                 Assert(found);
2889         }
2890
2891         /* Put the scratch entry back */
2892         if (transfer)
2893                 RestoreScratchTarget(true);
2894
2895         /* Release locks in reverse order */
2896         LWLockRelease(SerializableXactHashLock);
2897         for (i = NUM_PREDICATELOCK_PARTITIONS - 1; i >= 0; i--)
2898                 LWLockRelease(FirstPredicateLockMgrLock + i);
2899         LWLockRelease(SerializablePredicateLockListLock);
2900 }
2901
2902 /*
2903  * TransferPredicateLocksToHeapRelation
2904  *              For all transactions, replace every predicate lock on the given
2905  *              relation with a single relation-level lock on the heap.
2906  */
2907 void
2908 TransferPredicateLocksToHeapRelation(Relation relation)
2909 {
2910         DropAllPredicateLocksFromTable(relation, true); /* true => transfer, per this wrapper's purpose */
2911 }
2912
2913
2914 /*
2915  *              PredicateLockPageSplit
2916  *
2917  * Copies predicate locks on page oldblkno of relation to page newblkno, or
2918  * promotes them to a relation lock if that fails.  Skipped for temporary
2919  * and toast tables, and when no serializable transaction is running.
2920  *
2921  * NOTE: A page split (or overflow) affects all serializable transactions,
2922  * even if it occurs in the context of another transaction isolation level.
2923  *
2924  * NOTE: This currently leaves the local copy of the locks without
2925  * information on the new lock which is in shared memory.  This could cause
2926  * problems if enough locked pages split before the lock holders notice.
2927  */
2928 void
2929 PredicateLockPageSplit(Relation relation, BlockNumber oldblkno,
2930                                            BlockNumber newblkno)
2931 {
2932         PREDICATELOCKTARGETTAG oldtargettag;
2933         PREDICATELOCKTARGETTAG newtargettag;
2934         bool            success;
2935
2936         /*
2937          * Bail out quickly if there are no serializable transactions running.
2938          *
2939          * It's safe to do this check without taking any additional locks. Even if
2940          * a serializable transaction starts concurrently, we know it can't take
2941          * any SIREAD locks on the page being split because the caller is holding
2942          * the associated buffer page lock. Memory reordering isn't an issue; the
2943          * memory barrier in the LWLock acquisition guarantees that this read
2944          * occurs while the buffer page lock is held.
2945          */
2946         if (!TransactionIdIsValid(PredXact->SxactGlobalXmin))
2947                 return;
2948
2949         if (!PredicateLockingNeededForRelation(relation))
2950                 return;
2951
2952         Assert(oldblkno != newblkno);
2953         Assert(BlockNumberIsValid(oldblkno));
2954         Assert(BlockNumberIsValid(newblkno));
2955         /* Build page-level lock tags for the old and the new block. */
2956         SET_PREDICATELOCKTARGETTAG_PAGE(oldtargettag,
2957                                                                         relation->rd_node.dbNode,
2958                                                                         relation->rd_id,
2959                                                                         oldblkno);
2960         SET_PREDICATELOCKTARGETTAG_PAGE(newtargettag,
2961                                                                         relation->rd_node.dbNode,
2962                                                                         relation->rd_id,
2963                                                                         newblkno);
2964
2965         LWLockAcquire(SerializablePredicateLockListLock, LW_EXCLUSIVE);
2966
2967         /*
2968          * Try copying the locks over to the new page's tag, creating it if
2969          * necessary.
2970          */
2971         success = TransferPredicateLocksToNewTarget(oldtargettag,
2972                                                                                                 newtargettag,
2973                                                                                                 false); /* copy; old page keeps its locks */
2974
2975         if (!success)
2976         {
2977                 /*
2978                  * No more predicate lock entries are available. Failure isn't an
2979                  * option here, so promote the page lock to a relation lock.
2980                  */
2981
2982                 /* Overwrite newtargettag with the tag of the old page's parent lock */
2983                 success = GetParentPredicateLockTag(&oldtargettag,
2984                                                                                         &newtargettag);
2985                 Assert(success);
2986
2987                 /*
2988                  * Move the locks to the parent. This shouldn't fail.
2989                  *
2990                  * Note that here we are removing locks held by other backends,
2991                  * leading to a possible inconsistency in their local lock hash table.
2992                  * This is OK because we're replacing it with a lock that covers the
2993                  * old one.
2994                  */
2995                 success = TransferPredicateLocksToNewTarget(oldtargettag,
2996                                                                                                         newtargettag,
2997                                                                                                         true); /* move; old page's locks go away */
2998                 Assert(success);
2999         }
3000
3001         LWLockRelease(SerializablePredicateLockListLock);
3002 }
3003
3004 /*
3005  *              PredicateLockPageCombine
3006  *
3007  * Combines predicate locks for two existing pages by treating the combine
3008  * exactly like a page split.  Skip if temporary table or toast table.
3009  *
3010  * NOTE: A page combine affects all serializable transactions, even if it
3011  * occurs in the context of another transaction isolation level.
3012  */
3013 void
3014 PredicateLockPageCombine(Relation relation, BlockNumber oldblkno,
3015                                                  BlockNumber newblkno)
3016 {
3017         /*
3018          * Page combines differ from page splits in that we ought to be able to
3019          * remove the locks on the old page after transferring them to the new
3020          * page, instead of duplicating them. However, because we can't edit other
3021          * backends' local lock tables, removing the old lock would leave them
3022          * with an entry in their LocalPredicateLockHash for a lock they're not
3023          * holding, which isn't acceptable. So we wind up having to do the same
3024          * work as a page split, acquiring a lock on the new page and keeping the
3025          * old page locked too. That can lead to some false positives, but should
3026          * be rare in practice.
3027          */
3028         PredicateLockPageSplit(relation, oldblkno, newblkno); /* same direction: old -> new */
3029 }
3030
3031 /*
3032  * Recompute PredXact->SxactGlobalXmin and SxactGlobalXminCount from the
3033  * serializable transactions still in progress.  Needs SerializableXactHashLock.
3034  */
3035 static void
3036 SetNewSxactGlobalXmin(void)
3037 {
3038         SERIALIZABLEXACT *sxact;
3039
3040         Assert(LWLockHeldByMe(SerializableXactHashLock));
3041         /* Start from "no xmin"; this is the result if nothing qualifies. */
3042         PredXact->SxactGlobalXmin = InvalidTransactionId;
3043         PredXact->SxactGlobalXminCount = 0;
3044         /* Skip rolled-back and committed transactions and OldCommittedSxact. */
3045         for (sxact = FirstPredXact(); sxact != NULL; sxact = NextPredXact(sxact))
3046         {
3047                 if (!SxactIsRolledBack(sxact)
3048                         && !SxactIsCommitted(sxact)
3049                         && sxact != OldCommittedSxact)
3050                 {
3051                         Assert(sxact->xmin != InvalidTransactionId);
3052                         if (!TransactionIdIsValid(PredXact->SxactGlobalXmin)
3053                                 || TransactionIdPrecedes(sxact->xmin,
3054                                                                                  PredXact->SxactGlobalXmin))
3055                         {       /* first xmin seen, or an older one: restart the count */
3056                                 PredXact->SxactGlobalXmin = sxact->xmin;
3057                                 PredXact->SxactGlobalXminCount = 1;
3058                         }
3059                         else if (TransactionIdEquals(sxact->xmin,
3060                                                                                  PredXact->SxactGlobalXmin))
3061                                 PredXact->SxactGlobalXminCount++; /* another holder of the same xmin */
3062                 }
3063         }
3064
3065         OldSerXidSetActiveSerXmin(PredXact->SxactGlobalXmin);
3066 }
3067
3068 /*
3069  *              ReleasePredicateLocks
3070  *
3071  * Releases predicate locks based on completion of the current transaction,
3072  * whether committed or rolled back.  It can also be called for a read only
3073  * transaction when it becomes impossible for the transaction to become
3074  * part of a dangerous structure.
3075  *
3076  * We do nothing unless this is a serializable transaction.  isCommit is
3077  * true when the transaction commits and false when it rolls back.
3078  *
3079  * This function must ensure that shared memory hash tables are cleaned up
3080  * in some relatively timely fashion.  A committing transaction is put on
3081  * the list of finished serializable transactions, to be checked for
3082  * cleanup later; one that rolls back is passed to
3083  * ReleaseOneSerializableXact() right away.
3084  */
3085 void
3086 ReleasePredicateLocks(bool isCommit)
3087 {
3088         bool            needToClear;
3089         RWConflict      conflict,
3090                                 nextConflict,
3091                                 possibleUnsafeConflict;
3092         SERIALIZABLEXACT *roXact;
3093
3094         /*
3095          * We can't trust XactReadOnly here, because a transaction which started
3096          * as READ WRITE can show as READ ONLY later, e.g., within
3097          * subtransactions.  We want to flag a transaction as READ ONLY if it
3098          * commits without writing so that de facto READ ONLY transactions get the
3099          * benefit of some RO optimizations, so we will use this local variable to
3100          * get some cleanup logic right which is based on whether the transaction
3101          * was declared READ ONLY at the top level.
3102          */
3103         bool            topLevelIsDeclaredReadOnly;
3104
3105         if (MySerializableXact == InvalidSerializableXact)
3106         {
3107                 Assert(LocalPredicateLockHash == NULL);
3108                 return;
3109         }
3110
3111         Assert(!isCommit || SxactIsPrepared(MySerializableXact));
3112         Assert(!isCommit || !SxactIsDoomed(MySerializableXact));
3113         Assert(!SxactIsCommitted(MySerializableXact));
3114         Assert(!SxactIsRolledBack(MySerializableXact));
3115
3116         /* may not be serializable during COMMIT/ROLLBACK PREPARED */
3117         if (MySerializableXact->pid != 0)
3118                 Assert(IsolationIsSerializable());
3119
3120         /* We'd better not already be on the cleanup list. */
3121         Assert(!SxactIsOnFinishedList(MySerializableXact));
3122         /* Sample this before the commit path below may set READ_ONLY itself. */
3123         topLevelIsDeclaredReadOnly = SxactIsReadOnly(MySerializableXact);
3124
3125         LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
3126
3127         /*
3128          * We don't hold XidGenLock here, assuming that reading a TransactionId
3129          * is atomic!
3130          *
3131          * If this value is changing, we don't care that much whether we get the
3132          * old or new value -- it is just used to determine how far
3133          * SxactGlobalXmin must advance before this transaction can be fully
3134          * cleaned up.  The worst that could happen is we wait for one more
3135          * transaction to complete before freeing some RAM; correctness of visible
3136          * behavior is not affected.
3137          */
3138         MySerializableXact->finishedBefore = ShmemVariableCache->nextXid;
3139
3140         /*
3141          * If it's not a commit it's a rollback, and we can clear our locks
3142          * immediately.
3143          */
3144         if (isCommit)
3145         {
3146                 MySerializableXact->flags |= SXACT_FLAG_COMMITTED;
3147                 MySerializableXact->commitSeqNo = ++(PredXact->LastSxactCommitSeqNo);
3148                 /* Recognize implicit read-only transaction (commit without write). */
3149                 if (!MyXactDidWrite)
3150                         MySerializableXact->flags |= SXACT_FLAG_READ_ONLY;
3151         }
3152         else
3153         {
3154                 /*
3155                  * The DOOMED flag indicates that we intend to roll back this
3156                  * transaction and so it should not cause serialization failures for
3157                  * other transactions that conflict with it. Note that this flag might
3158                  * already be set, if another backend marked this transaction for
3159                  * abort.
3160                  *
3161                  * The ROLLED_BACK flag further indicates that ReleasePredicateLocks
3162                  * has been called, and so the SerializableXact is eligible for
3163                  * cleanup. This means it should not be considered when calculating
3164                  * SxactGlobalXmin.
3165                  */
3166                 MySerializableXact->flags |= SXACT_FLAG_DOOMED;
3167                 MySerializableXact->flags |= SXACT_FLAG_ROLLED_BACK;
3168         }
3169
3170         if (!topLevelIsDeclaredReadOnly)
3171         {
3172                 Assert(PredXact->WritableSxactCount > 0);
3173                 if (--(PredXact->WritableSxactCount) == 0)
3174                 {
3175                         /*
3176                          * We were the last writable serializable transaction.  Allow
3177                          * release of predicate locks and rw-conflicts in for all
3178                          * transactions committed so far: no transaction remains which
3179                          * might conflict with the locks, and new ones cannot overlap.
3180                          * Similarly, existing conflicts in can't cause pivots, and any
3181                          * which could have completed a dangerous structure would
3182                          * already have caused a rollback, so the rest must be benign.
3183                          */
3184                         PredXact->CanPartialClearThrough = PredXact->LastSxactCommitSeqNo;
3185                 }
3186         }
3187         else
3188         {
3189                 /*
3190                  * Read-only transactions: clear the list of transactions that might
3191                  * make us unsafe. Note that we use 'inLink' for the iteration as
3192                  * opposed to 'outLink' for the r/w xacts.
3193                  */
3194                 possibleUnsafeConflict = (RWConflict)
3195                         SHMQueueNext(&MySerializableXact->possibleUnsafeConflicts,
3196                                                  &MySerializableXact->possibleUnsafeConflicts,
3197                                                  offsetof(RWConflictData, inLink));
3198                 while (possibleUnsafeConflict)
3199                 {
3200                         nextConflict = (RWConflict)
3201                                 SHMQueueNext(&MySerializableXact->possibleUnsafeConflicts,
3202                                                          &possibleUnsafeConflict->inLink,
3203                                                          offsetof(RWConflictData, inLink));
3204
3205                         Assert(!SxactIsReadOnly(possibleUnsafeConflict->sxactOut));
3206                         Assert(MySerializableXact == possibleUnsafeConflict->sxactIn);
3207
3208                         ReleaseRWConflict(possibleUnsafeConflict);
3209
3210                         possibleUnsafeConflict = nextConflict;
3211                 }
3212         }
3213
3214         /* Check for conflict out to old committed transactions. */
3215         if (isCommit
3216                 && !SxactIsReadOnly(MySerializableXact)
3217                 && SxactHasSummaryConflictOut(MySerializableXact))
3218         {
3219                 /*
3220                  * We don't know which old committed transaction we conflicted with,
3221                  * so be conservative and use FirstNormalSerCommitSeqNo here.
3222                  */
3223                 MySerializableXact->SeqNo.earliestOutConflictCommit =
3224                         FirstNormalSerCommitSeqNo;
3225                 MySerializableXact->flags |= SXACT_FLAG_CONFLICT_OUT;
3226         }
3227
3228         /*
3229          * Release all outConflicts to committed transactions.  If we're rolling
3230          * back clear them all.  Set SXACT_FLAG_CONFLICT_OUT, and track the
3231          * earliest such commit, if any point to committed transactions.
3232          */
3233         conflict = (RWConflict)
3234                 SHMQueueNext(&MySerializableXact->outConflicts,
3235                                          &MySerializableXact->outConflicts,
3236                                          offsetof(RWConflictData, outLink));
3237         while (conflict)
3238         {
3239                 nextConflict = (RWConflict)
3240                         SHMQueueNext(&MySerializableXact->outConflicts,
3241                                                  &conflict->outLink,
3242                                                  offsetof(RWConflictData, outLink));
3243
3244                 if (isCommit
3245                         && !SxactIsReadOnly(MySerializableXact)
3246                         && SxactIsCommitted(conflict->sxactIn))
3247                 {
3248                         if ((MySerializableXact->flags & SXACT_FLAG_CONFLICT_OUT) == 0
3249                                 || conflict->sxactIn->commitSeqNo < MySerializableXact->SeqNo.earliestOutConflictCommit)
3250                                 MySerializableXact->SeqNo.earliestOutConflictCommit = conflict->sxactIn->commitSeqNo;
3251                         MySerializableXact->flags |= SXACT_FLAG_CONFLICT_OUT;
3252                 }
3253
3254                 if (!isCommit
3255                         || SxactIsCommitted(conflict->sxactIn)
3256                         || (conflict->sxactIn->SeqNo.lastCommitBeforeSnapshot >= PredXact->LastSxactCommitSeqNo))
3257                         ReleaseRWConflict(conflict);
3258
3259                 conflict = nextConflict;
3260         }
3261
3262         /*
3263          * Release all inConflicts from committed and read-only transactions. If
3264          * we're rolling back, clear them all.
3265          */
3266         conflict = (RWConflict)
3267                 SHMQueueNext(&MySerializableXact->inConflicts,
3268                                          &MySerializableXact->inConflicts,
3269                                          offsetof(RWConflictData, inLink));
3270         while (conflict)
3271         {
3272                 nextConflict = (RWConflict)
3273                         SHMQueueNext(&MySerializableXact->inConflicts,
3274                                                  &conflict->inLink,
3275                                                  offsetof(RWConflictData, inLink));
3276
3277                 if (!isCommit
3278                         || SxactIsCommitted(conflict->sxactOut)
3279                         || SxactIsReadOnly(conflict->sxactOut))
3280                         ReleaseRWConflict(conflict);
3281
3282                 conflict = nextConflict;
3283         }
3284
3285         if (!topLevelIsDeclaredReadOnly)
3286         {
3287                 /*
3288                  * Remove ourselves from the list of possible conflicts for concurrent
3289                  * READ ONLY transactions, flagging them as unsafe if we have a
3290                  * conflict out. If any are waiting DEFERRABLE transactions, wake them
3291                  * up if they are known safe or known unsafe.
3292                  */
3293                 possibleUnsafeConflict = (RWConflict)
3294                         SHMQueueNext(&MySerializableXact->possibleUnsafeConflicts,
3295                                                  &MySerializableXact->possibleUnsafeConflicts,
3296                                                  offsetof(RWConflictData, outLink));
3297                 while (possibleUnsafeConflict)
3298                 {
3299                         nextConflict = (RWConflict)
3300                                 SHMQueueNext(&MySerializableXact->possibleUnsafeConflicts,
3301                                                          &possibleUnsafeConflict->outLink,
3302                                                          offsetof(RWConflictData, outLink));
3303
3304                         roXact = possibleUnsafeConflict->sxactIn;
3305                         Assert(MySerializableXact == possibleUnsafeConflict->sxactOut);
3306                         Assert(SxactIsReadOnly(roXact));
3307
3308                         /* Mark conflicted if necessary. */
3309                         if (isCommit
3310                                 && MyXactDidWrite
3311                                 && SxactHasConflictOut(MySerializableXact)
3312                                 && (MySerializableXact->SeqNo.earliestOutConflictCommit
3313                                         <= roXact->SeqNo.lastCommitBeforeSnapshot))
3314                         {
3315                                 /*
3316                                  * This releases possibleUnsafeConflict (as well as all other
3317                                  * possible conflicts for roXact)
3318                                  */
3319                                 FlagSxactUnsafe(roXact);
3320                         }
3321                         else
3322                         {
3323                                 ReleaseRWConflict(possibleUnsafeConflict);
3324
3325                                 /*
3326                                  * If we were the last possible conflict, flag it safe. The
3327                                  * transaction can now safely release its predicate locks (but
3328                                  * that transaction's backend has to do that itself).
3329                                  */
3330                                 if (SHMQueueEmpty(&roXact->possibleUnsafeConflicts))
3331                                         roXact->flags |= SXACT_FLAG_RO_SAFE;
3332                         }
3333
3334                         /*
3335                          * Wake up the process for a waiting DEFERRABLE transaction if we
3336                          * now know it's either safe or conflicted.
3337                          */
3338                         if (SxactIsDeferrableWaiting(roXact) &&
3339                                 (SxactIsROUnsafe(roXact) || SxactIsROSafe(roXact)))
3340                                 ProcSendSignal(roXact->pid);
3341
3342                         possibleUnsafeConflict = nextConflict;
3343                 }
3344         }
3345
3346         /*
3347          * Check whether it's time to clean up old transactions. This can only be
3348          * done when the last serializable transaction with the oldest xmin among
3349          * serializable transactions completes.  We then find the "new oldest"
3350          * xmin and purge any transactions which finished before this transaction
3351          * was launched.
3352          */
3353         needToClear = false;
3354         if (TransactionIdEquals(MySerializableXact->xmin, PredXact->SxactGlobalXmin))
3355         {
3356                 Assert(PredXact->SxactGlobalXminCount > 0);
3357                 if (--(PredXact->SxactGlobalXminCount) == 0)
3358                 {
3359                         SetNewSxactGlobalXmin();
3360                         needToClear = true;
3361                 }
3362         }
3363
3364         LWLockRelease(SerializableXactHashLock);
3365
3366         LWLockAcquire(SerializableFinishedListLock, LW_EXCLUSIVE);
3367
3368         /* Add this to the list of transactions to check for later cleanup. */
3369         if (isCommit)
3370                 SHMQueueInsertBefore(FinishedSerializableTransactions,
3371                                                          &MySerializableXact->finishedLink);
3372         /* On rollback, hand it to ReleaseOneSerializableXact() right away. */
3373         if (!isCommit)
3374                 ReleaseOneSerializableXact(MySerializableXact, false, false);
3375
3376         LWLockRelease(SerializableFinishedListLock);
3377
3378         if (needToClear)
3379                 ClearOldPredicateLocks();
3380         /* Reset the serializable-transaction state variables. */
3381         MySerializableXact = InvalidSerializableXact;
3382         MyXactDidWrite = false;
3383
3384         /* Delete per-transaction lock table */
3385         if (LocalPredicateLockHash != NULL)
3386         {
3387                 hash_destroy(LocalPredicateLockHash);
3388                 LocalPredicateLockHash = NULL;
3389         }
3390 }
3391
3392 /*
3393  * Clear old predicate locks, belonging to committed transactions that are no
3394  * longer interesting to any in-progress transaction.
3395  */
3396 static void
3397 ClearOldPredicateLocks(void)
3398 {
3399         SERIALIZABLEXACT *finishedSxact;
3400         PREDICATELOCK *predlock;
3401
3402         /*
3403          * Loop through finished transactions. They are in commit order, so we can
3404          * stop as soon as we find one that's still interesting.
3405          */
3406         LWLockAcquire(SerializableFinishedListLock, LW_EXCLUSIVE);
3407         finishedSxact = (SERIALIZABLEXACT *)
3408                 SHMQueueNext(FinishedSerializableTransactions,
3409                                          FinishedSerializableTransactions,
3410                                          offsetof(SERIALIZABLEXACT, finishedLink));
3411         LWLockAcquire(SerializableXactHashLock, LW_SHARED);
3412         while (finishedSxact)
3413         {
3414                 SERIALIZABLEXACT *nextSxact;
3415
3416                 nextSxact = (SERIALIZABLEXACT *)
3417                         SHMQueueNext(FinishedSerializableTransactions,
3418                                                  &(finishedSxact->finishedLink),
3419                                                  offsetof(SERIALIZABLEXACT, finishedLink));
3420                 if (!TransactionIdIsValid(PredXact->SxactGlobalXmin)
3421                         || TransactionIdPrecedesOrEquals(finishedSxact->finishedBefore,
3422                                                                                          PredXact->SxactGlobalXmin))
3423                 {
3424                         /*
3425                          * This transaction committed before any in-progress transaction
3426                          * took its snapshot. It's no longer interesting.
3427                          */
3428                         LWLockRelease(SerializableXactHashLock);
3429                         SHMQueueDelete(&(finishedSxact->finishedLink));
3430                         ReleaseOneSerializableXact(finishedSxact, false, false);
3431                         LWLockAcquire(SerializableXactHashLock, LW_SHARED);
3432                 }
3433                 else if (finishedSxact->commitSeqNo > PredXact->HavePartialClearedThrough
3434                    && finishedSxact->commitSeqNo <= PredXact->CanPartialClearThrough)
3435                 {
3436                         LWLockRelease(SerializableXactHashLock);
3437                         ReleaseOneSerializableXact(finishedSxact,
3438                                                                            !SxactIsReadOnly(finishedSxact),
3439                                                                            false);
3440                         PredXact->HavePartialClearedThrough = finishedSxact->commitSeqNo;
3441                         LWLockAcquire(SerializableXactHashLock, LW_SHARED);
3442                 }
3443                 else
3444                 {
3445                         /* Still interesting. */
3446                         break;
3447                 }
3448                 finishedSxact = nextSxact;
3449         }
3450         LWLockRelease(SerializableXactHashLock);
3451
3452         /*
3453          * Loop through predicate locks on dummy transaction for summarized data.
3454          */
3455         LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
3456         predlock = (PREDICATELOCK *)
3457                 SHMQueueNext(&OldCommittedSxact->predicateLocks,
3458                                          &OldCommittedSxact->predicateLocks,
3459                                          offsetof(PREDICATELOCK, xactLink));
3460         while (predlock)
3461         {
3462                 PREDICATELOCK *nextpredlock;
3463                 bool            canDoPartialCleanup;
3464
3465                 nextpredlock = (PREDICATELOCK *)
3466                         SHMQueueNext(&OldCommittedSxact->predicateLocks,
3467                                                  &predlock->xactLink,
3468                                                  offsetof(PREDICATELOCK, xactLink));
3469
3470                 LWLockAcquire(SerializableXactHashLock, LW_SHARED);
3471                 Assert(predlock->commitSeqNo != 0);
3472                 Assert(predlock->commitSeqNo != InvalidSerCommitSeqNo);
3473                 canDoPartialCleanup = (predlock->commitSeqNo <= PredXact->CanPartialClearThrough);
3474                 LWLockRelease(SerializableXactHashLock);
3475
3476                 /*
3477                  * If this lock originally belonged to an old enough transaction, we
3478                  * can release it.
3479                  */
3480                 if (canDoPartialCleanup)
3481                 {
3482                         PREDICATELOCKTAG tag;
3483                         PREDICATELOCKTARGET *target;
3484                         PREDICATELOCKTARGETTAG targettag;
3485                         uint32          targettaghash;
3486                         LWLockId        partitionLock;
3487
3488                         tag = predlock->tag;
3489                         target = tag.myTarget;
3490                         targettag = target->tag;
3491                         targettaghash = PredicateLockTargetTagHashCode(&targettag);
3492                         partitionLock = PredicateLockHashPartitionLock(targettaghash);
3493
3494                         LWLockAcquire(partitionLock, LW_EXCLUSIVE);
3495
3496                         SHMQueueDelete(&(predlock->targetLink));
3497                         SHMQueueDelete(&(predlock->xactLink));
3498
3499                         hash_search_with_hash_value(PredicateLockHash, &tag,
3500                                                                 PredicateLockHashCodeFromTargetHashCode(&tag,
3501                                                                                                                           targettaghash),
3502                                                                                 HASH_REMOVE, NULL);
3503                         RemoveTargetIfNoLongerUsed(target, targettaghash);
3504
3505                         LWLockRelease(partitionLock);
3506                 }
3507
3508                 predlock = nextpredlock;
3509         }
3510
3511         LWLockRelease(SerializablePredicateLockListLock);
3512         LWLockRelease(SerializableFinishedListLock);
3513 }
3514
3515 /*
3516  * This is the normal way to delete anything from any of the predicate
3517  * locking hash tables.  Given a transaction which we know can be deleted:
3518  * delete all predicate locks held by that transaction and any predicate
3519  * lock targets which are now unreferenced by a lock; delete all conflicts
3520  * for the transaction; delete all xid values for the transaction; then
3521  * delete the transaction.
3522  *
3523  * When the partial flag is set, we can release all predicate locks and
3524  * in-conflict information -- we've established that there are no longer
3525  * any overlapping read write transactions for which this transaction could
3526  * matter -- but keep the transaction entry itself and any outConflicts.
3527  *
3528  * When the summarize flag is set, we've run short of room for sxact data
3529  * and must summarize to the SLRU.      Predicate locks are transferred to a
3530  * dummy "old" transaction, with duplicate locks on a single target
3531  * collapsing to a single lock with the "latest" commitSeqNo from among
3532  * the conflicting locks..
3533  */
3534 static void
3535 ReleaseOneSerializableXact(SERIALIZABLEXACT *sxact, bool partial,
3536                                                    bool summarize)
3537 {
3538         PREDICATELOCK *predlock;
3539         SERIALIZABLEXIDTAG sxidtag;
3540         RWConflict      conflict,
3541                                 nextConflict;
3542
3543         Assert(sxact != NULL);
3544         Assert(SxactIsRolledBack(sxact) || SxactIsCommitted(sxact));
3545         Assert(LWLockHeldByMe(SerializableFinishedListLock));
3546
3547         /*
3548          * First release all the predicate locks held by this xact (or transfer
3549          * them to OldCommittedSxact if summarize is true)
3550          */
3551         LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
3552         predlock = (PREDICATELOCK *)
3553                 SHMQueueNext(&(sxact->predicateLocks),
3554                                          &(sxact->predicateLocks),
3555                                          offsetof(PREDICATELOCK, xactLink));
3556         while (predlock)
3557         {
3558                 PREDICATELOCK *nextpredlock;
3559                 PREDICATELOCKTAG tag;
3560                 SHM_QUEUE  *targetLink;
3561                 PREDICATELOCKTARGET *target;
3562                 PREDICATELOCKTARGETTAG targettag;
3563                 uint32          targettaghash;
3564                 LWLockId        partitionLock;
3565
3566                 nextpredlock = (PREDICATELOCK *)
3567                         SHMQueueNext(&(sxact->predicateLocks),
3568                                                  &(predlock->xactLink),
3569                                                  offsetof(PREDICATELOCK, xactLink));
3570
3571                 tag = predlock->tag;
3572                 targetLink = &(predlock->targetLink);
3573                 target = tag.myTarget;
3574                 targettag = target->tag;
3575                 targettaghash = PredicateLockTargetTagHashCode(&targettag);
3576                 partitionLock = PredicateLockHashPartitionLock(targettaghash);
3577
3578                 LWLockAcquire(partitionLock, LW_EXCLUSIVE);
3579
3580                 SHMQueueDelete(targetLink);
3581
3582                 hash_search_with_hash_value(PredicateLockHash, &tag,
3583                                                                 PredicateLockHashCodeFromTargetHashCode(&tag,
3584                                                                                                                           targettaghash),
3585                                                                         HASH_REMOVE, NULL);
3586                 if (summarize)
3587                 {
3588                         bool            found;
3589
3590                         /* Fold into dummy transaction list. */
3591                         tag.myXact = OldCommittedSxact;
3592                         predlock = hash_search_with_hash_value(PredicateLockHash, &tag,
3593                                                                 PredicateLockHashCodeFromTargetHashCode(&tag,
3594                                                                                                                           targettaghash),
3595                                                                                                    HASH_ENTER_NULL, &found);
3596                         if (!predlock)
3597                                 ereport(ERROR,
3598                                                 (errcode(ERRCODE_OUT_OF_MEMORY),
3599                                                  errmsg("out of shared memory"),
3600                                                  errhint("You might need to increase max_pred_locks_per_transaction.")));
3601                         if (found)
3602                         {
3603                                 Assert(predlock->commitSeqNo != 0);
3604                                 Assert(predlock->commitSeqNo != InvalidSerCommitSeqNo);
3605                                 if (predlock->commitSeqNo < sxact->commitSeqNo)
3606                                         predlock->commitSeqNo = sxact->commitSeqNo;
3607                         }
3608                         else
3609                         {
3610                                 SHMQueueInsertBefore(&(target->predicateLocks),
3611                                                                          &(predlock->targetLink));
3612                                 SHMQueueInsertBefore(&(OldCommittedSxact->predicateLocks),
3613                                                                          &(predlock->xactLink));
3614                                 predlock->commitSeqNo = sxact->commitSeqNo;
3615                         }
3616                 }
3617                 else
3618                         RemoveTargetIfNoLongerUsed(target, targettaghash);
3619
3620                 LWLockRelease(partitionLock);
3621
3622                 predlock = nextpredlock;
3623         }
3624
3625         /*
3626          * Rather than retail removal, just re-init the head after we've run
3627          * through the list.
3628          */
3629         SHMQueueInit(&sxact->predicateLocks);
3630
3631         LWLockRelease(SerializablePredicateLockListLock);
3632
3633         sxidtag.xid = sxact->topXid;
3634         LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
3635
3636         /* Release all outConflicts (unless 'partial' is true) */
3637         if (!partial)
3638         {
3639                 conflict = (RWConflict)
3640                         SHMQueueNext(&sxact->outConflicts,
3641                                                  &sxact->outConflicts,
3642                                                  offsetof(RWConflictData, outLink));
3643                 while (conflict)
3644                 {
3645                         nextConflict = (RWConflict)
3646                                 SHMQueueNext(&sxact->outConflicts,
3647                                                          &conflict->outLink,
3648                                                          offsetof(RWConflictData, outLink));
3649                         if (summarize)
3650                                 conflict->sxactIn->flags |= SXACT_FLAG_SUMMARY_CONFLICT_IN;
3651                         ReleaseRWConflict(conflict);
3652                         conflict = nextConflict;
3653                 }
3654         }
3655
3656         /* Release all inConflicts. */
3657         conflict = (RWConflict)
3658                 SHMQueueNext(&sxact->inConflicts,
3659                                          &sxact->inConflicts,
3660                                          offsetof(RWConflictData, inLink));
3661         while (conflict)
3662         {
3663                 nextConflict = (RWConflict)
3664                         SHMQueueNext(&sxact->inConflicts,
3665                                                  &conflict->inLink,
3666                                                  offsetof(RWConflictData, inLink));
3667                 if (summarize)
3668                         conflict->sxactOut->flags |= SXACT_FLAG_SUMMARY_CONFLICT_OUT;
3669                 ReleaseRWConflict(conflict);
3670                 conflict = nextConflict;
3671         }
3672
3673         /* Finally, get rid of the xid and the record of the transaction itself. */
3674         if (!partial)
3675         {
3676                 if (sxidtag.xid != InvalidTransactionId)
3677                         hash_search(SerializableXidHash, &sxidtag, HASH_REMOVE, NULL);
3678                 ReleasePredXact(sxact);
3679         }
3680
3681         LWLockRelease(SerializableXactHashLock);
3682 }
3683
3684 /*
3685  * Tests whether the given top level transaction is concurrent with
3686  * (overlaps) our current transaction.
3687  *
3688  * We need to identify the top level transaction for SSI, anyway, so pass
3689  * that to this function to save the overhead of checking the snapshot's
3690  * subxip array.
3691  */
3692 static bool
3693 XidIsConcurrent(TransactionId xid)
3694 {
3695         Snapshot        snap;
3696         uint32          i;
3697
3698         Assert(TransactionIdIsValid(xid));
3699         Assert(!TransactionIdEquals(xid, GetTopTransactionIdIfAny()));
3700
3701         snap = GetTransactionSnapshot();
3702
3703         if (TransactionIdPrecedes(xid, snap->xmin))
3704                 return false;
3705
3706         if (TransactionIdFollowsOrEquals(xid, snap->xmax))
3707                 return true;
3708
3709         for (i = 0; i < snap->xcnt; i++)
3710         {
3711                 if (xid == snap->xip[i])
3712                         return true;
3713         }
3714
3715         return false;
3716 }
3717
3718 /*
3719  * CheckForSerializableConflictOut
3720  *              We are reading a tuple which has been modified.  If it is visible to
3721  *              us but has been deleted, that indicates a rw-conflict out.      If it's
3722  *              not visible and was created by a concurrent (overlapping)
3723  *              serializable transaction, that is also a rw-conflict out,
3724  *
3725  * We will determine the top level xid of the writing transaction with which
3726  * we may be in conflict, and check for overlap with our own transaction.
3727  * If the transactions overlap (i.e., they cannot see each other's writes),
3728  * then we have a conflict out.
3729  *
3730  * This function should be called just about anywhere in heapam.c where a
3731  * tuple has been read. The caller must hold at least a shared lock on the
3732  * buffer, because this function might set hint bits on the tuple. There is
3733  * currently no known reason to call this function from an index AM.
3734  */
3735 void
3736 CheckForSerializableConflictOut(bool visible, Relation relation,
3737                                                                 HeapTuple tuple, Buffer buffer,
3738                                                                 Snapshot snapshot)
3739 {
3740         TransactionId xid;
3741         SERIALIZABLEXIDTAG sxidtag;
3742         SERIALIZABLEXID *sxid;
3743         SERIALIZABLEXACT *sxact;
3744         HTSV_Result htsvResult;
3745
3746         if (!SerializationNeededForRead(relation, snapshot))
3747                 return;
3748
3749         /* Check if someone else has already decided that we need to die */
3750         if (SxactIsDoomed(MySerializableXact))
3751         {
3752                 ereport(ERROR,
3753                                 (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
3754                                  errmsg("could not serialize access due to read/write dependencies among transactions"),
3755                                  errdetail("Canceled on identification as a pivot, during conflict out checking."),
3756                                  errhint("The transaction might succeed if retried.")));
3757         }
3758
3759         /*
3760          * Check to see whether the tuple has been written to by a concurrent
3761          * transaction, either to create it not visible to us, or to delete it
3762          * while it is visible to us.  The "visible" bool indicates whether the
3763          * tuple is visible to us, while HeapTupleSatisfiesVacuum checks what else
3764          * is going on with it.
3765          */
3766         htsvResult = HeapTupleSatisfiesVacuum(tuple->t_data, TransactionXmin, buffer);
3767         switch (htsvResult)
3768         {
3769                 case HEAPTUPLE_LIVE:
3770                         if (visible)
3771                                 return;
3772                         xid = HeapTupleHeaderGetXmin(tuple->t_data);
3773                         break;
3774                 case HEAPTUPLE_RECENTLY_DEAD:
3775                         if (!visible)
3776                                 return;
3777                         xid = HeapTupleHeaderGetXmax(tuple->t_data);
3778                         break;
3779                 case HEAPTUPLE_DELETE_IN_PROGRESS:
3780                         xid = HeapTupleHeaderGetXmax(tuple->t_data);
3781                         break;
3782                 case HEAPTUPLE_INSERT_IN_PROGRESS:
3783                         xid = HeapTupleHeaderGetXmin(tuple->t_data);
3784                         break;
3785                 case HEAPTUPLE_DEAD:
3786                         return;
3787                 default:
3788
3789                         /*
3790                          * The only way to get to this default clause is if a new value is
3791                          * added to the enum type without adding it to this switch
3792                          * statement.  That's a bug, so elog.
3793                          */
3794                         elog(ERROR, "unrecognized return value from HeapTupleSatisfiesVacuum: %u", htsvResult);
3795
3796                         /*
3797                          * In spite of having all enum values covered and calling elog on
3798                          * this default, some compilers think this is a code path which
3799                          * allows xid to be used below without initialization. Silence
3800                          * that warning.
3801                          */
3802                         xid = InvalidTransactionId;
3803         }
3804         Assert(TransactionIdIsValid(xid));
3805         Assert(TransactionIdFollowsOrEquals(xid, TransactionXmin));
3806
3807         /*
3808          * Find top level xid.  Bail out if xid is too early to be a conflict, or
3809          * if it's our own xid.
3810          */
3811         if (TransactionIdEquals(xid, GetTopTransactionIdIfAny()))
3812                 return;
3813         xid = SubTransGetTopmostTransaction(xid);
3814         if (TransactionIdPrecedes(xid, TransactionXmin))
3815                 return;
3816         if (TransactionIdEquals(xid, GetTopTransactionIdIfAny()))
3817                 return;
3818
3819         /*
3820          * Find sxact or summarized info for the top level xid.
3821          */
3822         sxidtag.xid = xid;
3823         LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
3824         sxid = (SERIALIZABLEXID *)
3825                 hash_search(SerializableXidHash, &sxidtag, HASH_FIND, NULL);
3826         if (!sxid)
3827         {
3828                 /*
3829                  * Transaction not found in "normal" SSI structures.  Check whether it
3830                  * got pushed out to SLRU storage for "old committed" transactions.
3831                  */
3832                 SerCommitSeqNo conflictCommitSeqNo;
3833
3834                 conflictCommitSeqNo = OldSerXidGetMinConflictCommitSeqNo(xid);
3835                 if (conflictCommitSeqNo != 0)
3836                 {
3837                         if (conflictCommitSeqNo != InvalidSerCommitSeqNo
3838                                 && (!SxactIsReadOnly(MySerializableXact)
3839                                         || conflictCommitSeqNo
3840                                         <= MySerializableXact->SeqNo.lastCommitBeforeSnapshot))
3841                                 ereport(ERROR,
3842                                                 (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
3843                                                  errmsg("could not serialize access due to read/write dependencies among transactions"),
3844                                 errdetail("Canceled on conflict out to old pivot %u.", xid),
3845                                           errhint("The transaction might succeed if retried.")));
3846
3847                         if (SxactHasSummaryConflictIn(MySerializableXact)
3848                                 || !SHMQueueEmpty(&MySerializableXact->inConflicts))
3849                                 ereport(ERROR,
3850                                                 (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
3851                                                  errmsg("could not serialize access due to read/write dependencies among transactions"),
3852                                                  errdetail("Canceled on identification as a pivot, with conflict out to old committed transaction %u.", xid),
3853                                           errhint("The transaction might succeed if retried.")));
3854
3855                         MySerializableXact->flags |= SXACT_FLAG_SUMMARY_CONFLICT_OUT;
3856                 }
3857
3858                 /* It's not serializable or otherwise not important. */
3859                 LWLockRelease(SerializableXactHashLock);
3860                 return;
3861         }
3862         sxact = sxid->myXact;
3863         Assert(TransactionIdEquals(sxact->topXid, xid));
3864         if (sxact == MySerializableXact || SxactIsDoomed(sxact))
3865         {
3866                 /* Can't conflict with ourself or a transaction that will roll back. */
3867                 LWLockRelease(SerializableXactHashLock);
3868                 return;
3869         }
3870
3871         /*
3872          * We have a conflict out to a transaction which has a conflict out to a
3873          * summarized transaction.      That summarized transaction must have
3874          * committed first, and we can't tell when it committed in relation to our
3875          * snapshot acquisition, so something needs to be canceled.
3876          */
3877         if (SxactHasSummaryConflictOut(sxact))
3878         {
3879                 if (!SxactIsPrepared(sxact))
3880                 {
3881                         sxact->flags |= SXACT_FLAG_DOOMED;
3882                         LWLockRelease(SerializableXactHashLock);
3883                         return;
3884                 }
3885                 else
3886                 {
3887                         LWLockRelease(SerializableXactHashLock);
3888                         ereport(ERROR,
3889                                         (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
3890                                          errmsg("could not serialize access due to read/write dependencies among transactions"),
3891                                          errdetail("Canceled on conflict out to old pivot."),
3892                                          errhint("The transaction might succeed if retried.")));
3893                 }
3894         }
3895
3896         /*
3897          * If this is a read-only transaction and the writing transaction has
3898          * committed, and it doesn't have a rw-conflict to a transaction which
3899          * committed before it, no conflict.
3900          */
3901         if (SxactIsReadOnly(MySerializableXact)
3902                 && SxactIsCommitted(sxact)
3903                 && !SxactHasSummaryConflictOut(sxact)
3904                 && (!SxactHasConflictOut(sxact)
3905                         || MySerializableXact->SeqNo.lastCommitBeforeSnapshot < sxact->SeqNo.earliestOutConflictCommit))
3906         {
3907                 /* Read-only transaction will appear to run first.      No conflict. */
3908                 LWLockRelease(SerializableXactHashLock);
3909                 return;
3910         }
3911
3912         if (!XidIsConcurrent(xid))
3913         {
3914                 /* This write was already in our snapshot; no conflict. */
3915                 LWLockRelease(SerializableXactHashLock);
3916                 return;
3917         }
3918
3919         if (RWConflictExists(MySerializableXact, sxact))
3920         {
3921                 /* We don't want duplicate conflict records in the list. */
3922                 LWLockRelease(SerializableXactHashLock);
3923                 return;
3924         }
3925
3926         /*
3927          * Flag the conflict.  But first, if this conflict creates a dangerous
3928          * structure, ereport an error.
3929          */
3930         FlagRWConflict(MySerializableXact, sxact);
3931         LWLockRelease(SerializableXactHashLock);
3932 }
3933
3934 /*
3935  * Check a particular target for rw-dependency conflict in. A subroutine of
3936  * CheckForSerializableConflictIn().
3937  */
3938 static void
3939 CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag)
3940 {
3941         uint32          targettaghash;
3942         LWLockId        partitionLock;
3943         PREDICATELOCKTARGET *target;
3944         PREDICATELOCK *predlock;
3945         PREDICATELOCK *mypredlock = NULL;
3946         PREDICATELOCKTAG mypredlocktag;
3947
3948         Assert(MySerializableXact != InvalidSerializableXact);
3949
3950         /*
3951          * The same hash and LW lock apply to the lock target and the lock itself.
3952          */
3953         targettaghash = PredicateLockTargetTagHashCode(targettag);
3954         partitionLock = PredicateLockHashPartitionLock(targettaghash);
3955         LWLockAcquire(partitionLock, LW_SHARED);
3956         target = (PREDICATELOCKTARGET *)
3957                 hash_search_with_hash_value(PredicateLockTargetHash,
3958                                                                         targettag, targettaghash,
3959                                                                         HASH_FIND, NULL);
3960         if (!target)
3961         {
3962                 /* Nothing has this target locked; we're done here. */
3963                 LWLockRelease(partitionLock);
3964                 return;
3965         }
3966
3967         /*
3968          * Each lock for an overlapping transaction represents a conflict: a
3969          * rw-dependency in to this transaction.
3970          */
3971         predlock = (PREDICATELOCK *)
3972                 SHMQueueNext(&(target->predicateLocks),
3973                                          &(target->predicateLocks),
3974                                          offsetof(PREDICATELOCK, targetLink));
3975         LWLockAcquire(SerializableXactHashLock, LW_SHARED);
3976         while (predlock)
3977         {
3978                 SHM_QUEUE  *predlocktargetlink;
3979                 PREDICATELOCK *nextpredlock;
3980                 SERIALIZABLEXACT *sxact;
3981
3982                 predlocktargetlink = &(predlock->targetLink);
3983                 nextpredlock = (PREDICATELOCK *)
3984                         SHMQueueNext(&(target->predicateLocks),
3985                                                  predlocktargetlink,
3986                                                  offsetof(PREDICATELOCK, targetLink));
3987
3988                 sxact = predlock->tag.myXact;
3989                 if (sxact == MySerializableXact)
3990                 {
3991                         /*
3992                          * If we're getting a write lock on a tuple, we don't need a
3993                          * predicate (SIREAD) lock on the same tuple. We can safely remove
3994                          * our SIREAD lock, but we'll defer doing so until after the loop
3995                          * because that requires upgrading to an exclusive partition lock.
3996                          *
3997                          * We can't use this optimization within a subtransaction because
3998                          * the subtransaction could roll back, and we would be left
3999                          * without any lock at the top level.
4000                          */
4001                         if (!IsSubTransaction()
4002                                 && GET_PREDICATELOCKTARGETTAG_OFFSET(*targettag))
4003                         {
4004                                 mypredlock = predlock;
4005                                 mypredlocktag = predlock->tag;
4006                         }
4007                 }
4008                 else if (!SxactIsDoomed(sxact)
4009                                  && (!SxactIsCommitted(sxact)
4010                                          || TransactionIdPrecedes(GetTransactionSnapshot()->xmin,
4011                                                                                           sxact->finishedBefore))
4012                                  && !RWConflictExists(sxact, MySerializableXact))
4013                 {
4014                         LWLockRelease(SerializableXactHashLock);
4015                         LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
4016
4017                         /*
4018                          * Re-check after getting exclusive lock because the other
4019                          * transaction may have flagged a conflict.
4020                          */
4021                         if (!SxactIsDoomed(sxact)
4022                                 && (!SxactIsCommitted(sxact)
4023                                         || TransactionIdPrecedes(GetTransactionSnapshot()->xmin,
4024                                                                                          sxact->finishedBefore))
4025                                 && !RWConflictExists(sxact, MySerializableXact))
4026                         {
4027                                 FlagRWConflict(sxact, MySerializableXact);
4028                         }
4029
4030                         LWLockRelease(SerializableXactHashLock);
4031                         LWLockAcquire(SerializableXactHashLock, LW_SHARED);
4032                 }
4033
4034                 predlock = nextpredlock;
4035         }
4036         LWLockRelease(SerializableXactHashLock);
4037         LWLockRelease(partitionLock);
4038
4039         /*
4040          * If we found one of our own SIREAD locks to remove, remove it now.
4041          *
4042          * At this point our transaction already has an ExclusiveRowLock on the
4043          * relation, so we are OK to drop the predicate lock on the tuple, if
4044          * found, without fearing that another write against the tuple will occur
4045          * before the MVCC information makes it to the buffer.
4046          */
4047         if (mypredlock != NULL)
4048         {
4049                 uint32          predlockhashcode;
4050                 PREDICATELOCK *rmpredlock;
4051
4052                 LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
4053                 LWLockAcquire(partitionLock, LW_EXCLUSIVE);
4054                 LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
4055
4056                 /*
4057                  * Remove the predicate lock from shared memory, if it wasn't removed
4058                  * while the locks were released.  One way that could happen is from
4059                  * autovacuum cleaning up an index.
4060                  */
4061                 predlockhashcode = PredicateLockHashCodeFromTargetHashCode
4062                         (&mypredlocktag, targettaghash);
4063                 rmpredlock = (PREDICATELOCK *)
4064                         hash_search_with_hash_value(PredicateLockHash,
4065                                                                                 &mypredlocktag,
4066                                                                                 predlockhashcode,
4067                                                                                 HASH_FIND, NULL);
4068                 if (rmpredlock != NULL)
4069                 {
4070                         Assert(rmpredlock == mypredlock);
4071
4072                         SHMQueueDelete(&(mypredlock->targetLink));
4073                         SHMQueueDelete(&(mypredlock->xactLink));
4074
4075                         rmpredlock = (PREDICATELOCK *)
4076                                 hash_search_with_hash_value(PredicateLockHash,
4077                                                                                         &mypredlocktag,
4078                                                                                         predlockhashcode,
4079                                                                                         HASH_REMOVE, NULL);
4080                         Assert(rmpredlock == mypredlock);
4081
4082                         RemoveTargetIfNoLongerUsed(target, targettaghash);
4083                 }
4084
4085                 LWLockRelease(SerializableXactHashLock);
4086                 LWLockRelease(partitionLock);
4087                 LWLockRelease(SerializablePredicateLockListLock);
4088
4089                 if (rmpredlock != NULL)
4090                 {
4091                         /*
4092                          * Remove entry in local lock table if it exists. It's OK if it
4093                          * doesn't exist; that means the lock was transferred to a new
4094                          * target by a different backend.
4095                          */
4096                         hash_search_with_hash_value(LocalPredicateLockHash,
4097                                                                                 targettag, targettaghash,
4098                                                                                 HASH_REMOVE, NULL);
4099
4100                         DecrementParentLocks(targettag);
4101                 }
4102         }
4103 }
4104
4105 /*
4106  * CheckForSerializableConflictIn
4107  *              We are writing the given tuple.  If that indicates a rw-conflict
4108  *              in from another serializable transaction, take appropriate action.
4109  *
4110  * Skip checking for any granularity for which a parameter is missing.
4111  *
4112  * A tuple update or delete is in conflict if we have a predicate lock
4113  * against the relation or page in which the tuple exists, or against the
4114  * tuple itself.
4115  */
4116 void
4117 CheckForSerializableConflictIn(Relation relation, HeapTuple tuple,
4118                                                            Buffer buffer)
4119 {
4120         PREDICATELOCKTARGETTAG targettag;
4121
4122         if (!SerializationNeededForWrite(relation))
4123                 return;
4124
4125         /* Check if someone else has already decided that we need to die */
4126         if (SxactIsDoomed(MySerializableXact))
4127                 ereport(ERROR,
4128                                 (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
4129                                  errmsg("could not serialize access due to read/write dependencies among transactions"),
4130                                  errdetail("Canceled on identification as a pivot, during conflict in checking."),
4131                                  errhint("The transaction might succeed if retried.")));
4132
4133         /*
4134          * We're doing a write which might cause rw-conflicts now or later.
4135          * Memorize that fact.
4136          */
4137         MyXactDidWrite = true;
4138
4139         /*
4140          * It is important that we check for locks from the finest granularity to
4141          * the coarsest granularity, so that granularity promotion doesn't cause
4142          * us to miss a lock.  The new (coarser) lock will be acquired before the
4143          * old (finer) locks are released.
4144          *
4145          * It is not possible to take and hold a lock across the checks for all
4146          * granularities because each target could be in a separate partition.
4147          */
4148         if (tuple != NULL)
4149         {
4150                 SET_PREDICATELOCKTARGETTAG_TUPLE(targettag,
4151                                                                                  relation->rd_node.dbNode,
4152                                                                                  relation->rd_id,
4153                                                  ItemPointerGetBlockNumber(&(tuple->t_data->t_ctid)),
4154                                                 ItemPointerGetOffsetNumber(&(tuple->t_data->t_ctid)),
4155                                                                           HeapTupleHeaderGetXmin(tuple->t_data));
4156                 CheckTargetForConflictsIn(&targettag);
4157         }
4158
4159         if (BufferIsValid(buffer))
4160         {
4161                 SET_PREDICATELOCKTARGETTAG_PAGE(targettag,
4162                                                                                 relation->rd_node.dbNode,
4163                                                                                 relation->rd_id,
4164                                                                                 BufferGetBlockNumber(buffer));
4165                 CheckTargetForConflictsIn(&targettag);
4166         }
4167
4168         SET_PREDICATELOCKTARGETTAG_RELATION(targettag,
4169                                                                                 relation->rd_node.dbNode,
4170                                                                                 relation->rd_id);
4171         CheckTargetForConflictsIn(&targettag);
4172 }
4173
4174 /*
4175  * CheckTableForSerializableConflictIn
4176  *              The entire table is going through a DDL-style logical mass delete
4177  *              like TRUNCATE or DROP TABLE.  If that causes a rw-conflict in from
4178  *              another serializable transaction, take appropriate action.
4179  *
4180  * While these operations do not operate entirely within the bounds of
4181  * snapshot isolation, they can occur inside a serializable transaction, and
4182  * will logically occur after any reads which saw rows which were destroyed
4183  * by these operations, so we do what we can to serialize properly under
4184  * SSI.
4185  *
4186  * The relation passed in must be a heap relation. Any predicate lock of any
4187  * granularity on the heap will cause a rw-conflict in to this transaction.
4188  * Predicate locks on indexes do not matter because they only exist to guard
4189  * against conflicting inserts into the index, and this is a mass *delete*.
4190  * When a table is truncated or dropped, the index will also be truncated
4191  * or dropped, and we'll deal with locks on the index when that happens.
4192  *
4193  * Dropping or truncating a table also needs to drop any existing predicate
4194  * locks on heap tuples or pages, because they're about to go away. This
4195  * should be done before altering the predicate locks because the transaction
4196  * could be rolled back because of a conflict, in which case the lock changes
4197  * are not needed. (At the moment, we don't actually bother to drop the
4198  * existing locks on a dropped or truncated table at the moment. That might
4199  * lead to some false positives, but it doesn't seem worth the trouble.)
4200  */
4201 void
4202 CheckTableForSerializableConflictIn(Relation relation)
4203 {
4204         HASH_SEQ_STATUS seqstat;
4205         PREDICATELOCKTARGET *target;
4206         Oid                     dbId;
4207         Oid                     heapId;
4208         int                     i;
4209
4210         /*
4211          * Bail out quickly if there are no serializable transactions running.
4212          * It's safe to check this without taking locks because the caller is
4213          * holding an ACCESS EXCLUSIVE lock on the relation.  No new locks which
4214          * would matter here can be acquired while that is held.
4215          */
4216         if (!TransactionIdIsValid(PredXact->SxactGlobalXmin))
4217                 return;
4218
4219         if (!SerializationNeededForWrite(relation))
4220                 return;
4221
4222         /*
4223          * We're doing a write which might cause rw-conflicts now or later.
4224          * Memorize that fact.
4225          */
4226         MyXactDidWrite = true;
4227
4228         Assert(relation->rd_index == NULL); /* not an index relation */
4229
4230         dbId = relation->rd_node.dbNode;
4231         heapId = relation->rd_id;
4232
4233         LWLockAcquire(SerializablePredicateLockListLock, LW_EXCLUSIVE);
4234         for (i = 0; i < NUM_PREDICATELOCK_PARTITIONS; i++)
4235                 LWLockAcquire(FirstPredicateLockMgrLock + i, LW_SHARED);
4236         LWLockAcquire(SerializableXactHashLock, LW_SHARED);
4237
4238         /* Scan through target list */
4239         hash_seq_init(&seqstat, PredicateLockTargetHash);
4240
4241         while ((target = (PREDICATELOCKTARGET *) hash_seq_search(&seqstat)))
4242         {
4243                 PREDICATELOCK *predlock;
4244
4245                 /*
4246                  * Check whether this is a target which needs attention.
4247                  */
4248                 if (GET_PREDICATELOCKTARGETTAG_RELATION(target->tag) != heapId)
4249                         continue;                       /* wrong relation id */
4250                 if (GET_PREDICATELOCKTARGETTAG_DB(target->tag) != dbId)
4251                         continue;                       /* wrong database id */
4252
4253                 /*
4254                  * Loop through locks for this target and flag conflicts.
4255                  */
4256                 predlock = (PREDICATELOCK *)
4257                         SHMQueueNext(&(target->predicateLocks),
4258                                                  &(target->predicateLocks),
4259                                                  offsetof(PREDICATELOCK, targetLink));
4260                 while (predlock)
4261                 {
4262                         PREDICATELOCK *nextpredlock;
4263
4264                         nextpredlock = (PREDICATELOCK *)
4265                                 SHMQueueNext(&(target->predicateLocks),
4266                                                          &(predlock->targetLink),
4267                                                          offsetof(PREDICATELOCK, targetLink));
4268
4269                         if (predlock->tag.myXact != MySerializableXact
4270                           && !RWConflictExists(predlock->tag.myXact, MySerializableXact))
4271                         {
4272                                 FlagRWConflict(predlock->tag.myXact, MySerializableXact);
4273                         }
4274
4275                         predlock = nextpredlock;
4276                 }
4277         }
4278
4279         /* Release locks in reverse order */
4280         LWLockRelease(SerializableXactHashLock);
4281         for (i = NUM_PREDICATELOCK_PARTITIONS - 1; i >= 0; i--)
4282                 LWLockRelease(FirstPredicateLockMgrLock + i);
4283         LWLockRelease(SerializablePredicateLockListLock);
4284 }
4285
4286
4287 /*
4288  * Flag a rw-dependency between two serializable transactions.
4289  *
4290  * The caller is responsible for ensuring that we have a LW lock on
4291  * the transaction hash table.
4292  */
4293 static void
4294 FlagRWConflict(SERIALIZABLEXACT *reader, SERIALIZABLEXACT *writer)
4295 {
4296         Assert(reader != writer);
4297
4298         /* First, see if this conflict causes failure. */
4299         OnConflict_CheckForSerializationFailure(reader, writer);
4300
4301         /* Actually do the conflict flagging. */
4302         if (reader == OldCommittedSxact)
4303                 writer->flags |= SXACT_FLAG_SUMMARY_CONFLICT_IN;
4304         else if (writer == OldCommittedSxact)
4305                 reader->flags |= SXACT_FLAG_SUMMARY_CONFLICT_OUT;
4306         else
4307                 SetRWConflict(reader, writer);
4308 }
4309
4310 /*----------------------------------------------------------------------------
4311  * We are about to add a RW-edge to the dependency graph - check that we don't
4312  * introduce a dangerous structure by doing so, and abort one of the
4313  * transactions if so.
4314  *
4315  * A serialization failure can only occur if there is a dangerous structure
4316  * in the dependency graph:
4317  *
4318  *              Tin ------> Tpivot ------> Tout
4319  *                        rw                     rw
4320  *
4321  * Furthermore, Tout must commit first.
4322  *
4323  * One more optimization is that if Tin is declared READ ONLY (or commits
4324  * without writing), we can only have a problem if Tout committed before Tin
4325  * acquired its snapshot.
4326  *----------------------------------------------------------------------------
4327  */
4328 static void
4329 OnConflict_CheckForSerializationFailure(const SERIALIZABLEXACT *reader,
4330                                                                                 SERIALIZABLEXACT *writer)
4331 {
4332         bool            failure;
4333         RWConflict      conflict;
4334
4335         Assert(LWLockHeldByMe(SerializableXactHashLock));
4336
4337         failure = false;
4338
4339         /*------------------------------------------------------------------------
4340          * Check for already-committed writer with rw-conflict out flagged
4341          * (conflict-flag on W means that T2 committed before W):
4342          *
4343          *              R ------> W ------> T2
4344          *                      rw                rw
4345          *
4346          * That is a dangerous structure, so we must abort. (Since the writer
4347          * has already committed, we must be the reader)
4348          *------------------------------------------------------------------------
4349          */
4350         if (SxactIsCommitted(writer)
4351           && (SxactHasConflictOut(writer) || SxactHasSummaryConflictOut(writer)))
4352                 failure = true;
4353
4354         /*------------------------------------------------------------------------
4355          * Check whether the writer has become a pivot with an out-conflict
4356          * committed transaction (T2), and T2 committed first:
4357          *
4358          *              R ------> W ------> T2
4359          *                      rw                rw
4360          *
4361          * Because T2 must've committed first, there is no anomaly if:
4362          * - the reader committed before T2
4363          * - the writer committed before T2
4364          * - the reader is a READ ONLY transaction and the reader was concurrent
4365          *       with T2 (= reader acquired its snapshot before T2 committed)
4366          *------------------------------------------------------------------------
4367          */
4368         if (!failure)
4369         {
4370                 if (SxactHasSummaryConflictOut(writer))
4371                 {
4372                         failure = true;
4373                         conflict = NULL;
4374                 }
4375                 else
4376                         conflict = (RWConflict)
4377                                 SHMQueueNext(&writer->outConflicts,
4378                                                          &writer->outConflicts,
4379                                                          offsetof(RWConflictData, outLink));
4380                 while (conflict)
4381                 {
4382                         SERIALIZABLEXACT *t2 = conflict->sxactIn;
4383
4384                         if (SxactIsCommitted(t2)
4385                                 && (!SxactIsCommitted(reader)
4386                                         || t2->commitSeqNo <= reader->commitSeqNo)
4387                                 && (!SxactIsCommitted(writer)
4388                                         || t2->commitSeqNo <= writer->commitSeqNo)
4389                                 && (!SxactIsReadOnly(reader)
4390                            || t2->commitSeqNo <= reader->SeqNo.lastCommitBeforeSnapshot))
4391                         {
4392                                 failure = true;
4393                                 break;
4394                         }
4395                         conflict = (RWConflict)
4396                                 SHMQueueNext(&writer->outConflicts,
4397                                                          &conflict->outLink,
4398                                                          offsetof(RWConflictData, outLink));
4399                 }
4400         }
4401
4402         /*------------------------------------------------------------------------
4403          * Check whether the reader has become a pivot with a committed writer:
4404          *
4405          *              T0 ------> R ------> W
4406          *                       rw                rw
4407          *
4408          * Because W must've committed first for an anomaly to occur, there is no
4409          * anomaly if:
4410          * - T0 committed before the writer
4411          * - T0 is READ ONLY, and overlaps the writer
4412          *------------------------------------------------------------------------
4413          */
4414         if (!failure && SxactIsCommitted(writer) && !SxactIsReadOnly(reader))
4415         {
4416                 if (SxactHasSummaryConflictIn(reader))
4417                 {
4418                         failure = true;
4419                         conflict = NULL;
4420                 }
4421                 else
4422                         conflict = (RWConflict)
4423                                 SHMQueueNext(&reader->inConflicts,
4424                                                          &reader->inConflicts,
4425                                                          offsetof(RWConflictData, inLink));
4426                 while (conflict)
4427                 {
4428                         SERIALIZABLEXACT *t0 = conflict->sxactOut;
4429
4430                         if (!SxactIsDoomed(t0)
4431                                 && (!SxactIsCommitted(t0)
4432                                         || t0->commitSeqNo >= writer->commitSeqNo)
4433                                 && (!SxactIsReadOnly(t0)
4434                            || t0->SeqNo.lastCommitBeforeSnapshot >= writer->commitSeqNo))
4435                         {
4436                                 failure = true;
4437                                 break;
4438                         }
4439                         conflict = (RWConflict)
4440                                 SHMQueueNext(&reader->inConflicts,
4441                                                          &conflict->inLink,
4442                                                          offsetof(RWConflictData, inLink));
4443                 }
4444         }
4445
4446         if (failure)
4447         {
4448                 /*
4449                  * We have to kill a transaction to avoid a possible anomaly from
4450                  * occurring. If the writer is us, we can just ereport() to cause a
4451                  * transaction abort. Otherwise we flag the writer for termination,
4452                  * causing it to abort when it tries to commit. However, if the writer
4453                  * is a prepared transaction, already prepared, we can't abort it
4454                  * anymore, so we have to kill the reader instead.
4455                  */
4456                 if (MySerializableXact == writer)
4457                 {
4458                         LWLockRelease(SerializableXactHashLock);
4459                         ereport(ERROR,
4460                                         (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
4461                                          errmsg("could not serialize access due to read/write dependencies among transactions"),
4462                                          errdetail("Canceled on identification as a pivot, during write."),
4463                                          errhint("The transaction might succeed if retried.")));
4464                 }
4465                 else if (SxactIsPrepared(writer))
4466                 {
4467                         LWLockRelease(SerializableXactHashLock);
4468
4469                         /* if we're not the writer, we have to be the reader */
4470                         Assert(MySerializableXact == reader);
4471                         ereport(ERROR,
4472                                         (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
4473                                          errmsg("could not serialize access due to read/write dependencies among transactions"),
4474                                          errdetail("Canceled on conflict out to pivot %u, during read.", writer->topXid),
4475                                          errhint("The transaction might succeed if retried.")));
4476                 }
4477                 writer->flags |= SXACT_FLAG_DOOMED;
4478         }
4479 }
4480
4481 /*
4482  * PreCommit_CheckForSerializableConflicts
4483  *              Check for dangerous structures in a serializable transaction
4484  *              at commit.
4485  *
4486  * We're checking for a dangerous structure as each conflict is recorded.
4487  * The only way we could have a problem at commit is if this is the "out"
4488  * side of a pivot, and neither the "in" side nor the pivot has yet
4489  * committed.
4490  *
4491  * If a dangerous structure is found, the pivot (the near conflict) is
4492  * marked for death, because rolling back another transaction might mean
4493  * that we flail without ever making progress.  This transaction is
4494  * committing writes, so letting it commit ensures progress.  If we
4495  * canceled the far conflict, it might immediately fail again on retry.
4496  */
4497 void
4498 PreCommit_CheckForSerializationFailure(void)
4499 {
4500         RWConflict      nearConflict;
4501
4502         if (MySerializableXact == InvalidSerializableXact)
4503                 return;
4504
4505         Assert(IsolationIsSerializable());
4506
4507         LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
4508
4509         /* Check if someone else has already decided that we need to die */
4510         if (SxactIsDoomed(MySerializableXact))
4511         {
4512                 LWLockRelease(SerializableXactHashLock);
4513                 ereport(ERROR,
4514                                 (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
4515                                  errmsg("could not serialize access due to read/write dependencies among transactions"),
4516                                  errdetail("Canceled on identification as a pivot, during commit attempt."),
4517                                  errhint("The transaction might succeed if retried.")));
4518         }
4519
4520         nearConflict = (RWConflict)
4521                 SHMQueueNext(&MySerializableXact->inConflicts,
4522                                          &MySerializableXact->inConflicts,
4523                                          offsetof(RWConflictData, inLink));
4524         while (nearConflict)
4525         {
4526                 if (!SxactIsCommitted(nearConflict->sxactOut)
4527                         && !SxactIsDoomed(nearConflict->sxactOut))
4528                 {
4529                         RWConflict      farConflict;
4530
4531                         farConflict = (RWConflict)
4532                                 SHMQueueNext(&nearConflict->sxactOut->inConflicts,
4533                                                          &nearConflict->sxactOut->inConflicts,
4534                                                          offsetof(RWConflictData, inLink));
4535                         while (farConflict)
4536                         {
4537                                 if (farConflict->sxactOut == MySerializableXact
4538                                         || (!SxactIsCommitted(farConflict->sxactOut)
4539                                                 && !SxactIsReadOnly(farConflict->sxactOut)
4540                                                 && !SxactIsDoomed(farConflict->sxactOut)))
4541                                 {
4542                                         /*
4543                                          * Normally, we kill the pivot transaction to make sure we
4544                                          * make progress if the failing transaction is retried.
4545                                          * However, we can't kill it if it's already prepared, so
4546                                          * in that case we commit suicide instead.
4547                                          */
4548                                         if (SxactIsPrepared(nearConflict->sxactOut))
4549                                         {
4550                                                 LWLockRelease(SerializableXactHashLock);
4551                                                 ereport(ERROR,
4552                                                                 (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
4553                                                                  errmsg("could not serialize access due to read/write dependencies among transactions"),
4554                                                                  errdetail("Canceled on commit attempt with conflict in from prepared pivot."),
4555                                                                  errhint("The transaction might succeed if retried.")));
4556                                         }
4557                                         nearConflict->sxactOut->flags |= SXACT_FLAG_DOOMED;
4558                                         break;
4559                                 }
4560                                 farConflict = (RWConflict)
4561                                         SHMQueueNext(&nearConflict->sxactOut->inConflicts,
4562                                                                  &farConflict->inLink,
4563                                                                  offsetof(RWConflictData, inLink));
4564                         }
4565                 }
4566
4567                 nearConflict = (RWConflict)
4568                         SHMQueueNext(&MySerializableXact->inConflicts,
4569                                                  &nearConflict->inLink,
4570                                                  offsetof(RWConflictData, inLink));
4571         }
4572
4573         MySerializableXact->flags |= SXACT_FLAG_PREPARED;
4574
4575         LWLockRelease(SerializableXactHashLock);
4576 }
4577
4578 /*------------------------------------------------------------------------*/
4579
4580 /*
4581  * Two-phase commit support
4582  */
4583
4584 /*
4585  * AtPrepare_Locks
4586  *              Do the preparatory work for a PREPARE: make 2PC state file
4587  *              records for all predicate locks currently held.
4588  */
4589 void
4590 AtPrepare_PredicateLocks(void)
4591 {
4592         PREDICATELOCK *predlock;
4593         SERIALIZABLEXACT *sxact;
4594         TwoPhasePredicateRecord record;
4595         TwoPhasePredicateXactRecord *xactRecord;
4596         TwoPhasePredicateLockRecord *lockRecord;
4597
4598         sxact = MySerializableXact;
4599         xactRecord = &(record.data.xactRecord);
4600         lockRecord = &(record.data.lockRecord);
4601
4602         if (MySerializableXact == InvalidSerializableXact)
4603                 return;
4604
4605         /* Generate a xact record for our SERIALIZABLEXACT */
4606         record.type = TWOPHASEPREDICATERECORD_XACT;
4607         xactRecord->xmin = MySerializableXact->xmin;
4608         xactRecord->flags = MySerializableXact->flags;
4609
4610         /*
4611          * Tweak the flags. Since we're not going to output the inConflicts and
4612          * outConflicts lists, if they're non-empty we'll represent that by
4613          * setting the appropriate summary conflict flags.
4614          */
4615         if (!SHMQueueEmpty(&MySerializableXact->inConflicts))
4616                 xactRecord->flags |= SXACT_FLAG_SUMMARY_CONFLICT_IN;
4617         if (!SHMQueueEmpty(&MySerializableXact->outConflicts))
4618                 xactRecord->flags |= SXACT_FLAG_SUMMARY_CONFLICT_OUT;
4619
4620         RegisterTwoPhaseRecord(TWOPHASE_RM_PREDICATELOCK_ID, 0,
4621                                                    &record, sizeof(record));
4622
4623         /*
4624          * Generate a lock record for each lock.
4625          *
4626          * To do this, we need to walk the predicate lock list in our sxact rather
4627          * than using the local predicate lock table because the latter is not
4628          * guaranteed to be accurate.
4629          */
4630         LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
4631
4632         predlock = (PREDICATELOCK *)
4633                 SHMQueueNext(&(sxact->predicateLocks),
4634                                          &(sxact->predicateLocks),
4635                                          offsetof(PREDICATELOCK, xactLink));
4636
4637         while (predlock != NULL)
4638         {
4639                 record.type = TWOPHASEPREDICATERECORD_LOCK;
4640                 lockRecord->target = predlock->tag.myTarget->tag;
4641
4642                 RegisterTwoPhaseRecord(TWOPHASE_RM_PREDICATELOCK_ID, 0,
4643                                                            &record, sizeof(record));
4644
4645                 predlock = (PREDICATELOCK *)
4646                         SHMQueueNext(&(sxact->predicateLocks),
4647                                                  &(predlock->xactLink),
4648                                                  offsetof(PREDICATELOCK, xactLink));
4649         }
4650
4651         LWLockRelease(SerializablePredicateLockListLock);
4652 }
4653
4654 /*
4655  * PostPrepare_Locks
4656  *              Clean up after successful PREPARE. Unlike the non-predicate
4657  *              lock manager, we do not need to transfer locks to a dummy
4658  *              PGPROC because our SERIALIZABLEXACT will stay around
4659  *              anyway. We only need to clean up our local state.
4660  */
4661 void
4662 PostPrepare_PredicateLocks(TransactionId xid)
4663 {
4664         if (MySerializableXact == InvalidSerializableXact)
4665                 return;
4666
4667         Assert(SxactIsPrepared(MySerializableXact));
4668
4669         MySerializableXact->pid = 0;
4670
4671         hash_destroy(LocalPredicateLockHash);
4672         LocalPredicateLockHash = NULL;
4673
4674         MySerializableXact = InvalidSerializableXact;
4675         MyXactDidWrite = false;
4676 }
4677
4678 /*
4679  * PredicateLockTwoPhaseFinish
4680  *              Release a prepared transaction's predicate locks once it
4681  *              commits or aborts.
4682  */
4683 void
4684 PredicateLockTwoPhaseFinish(TransactionId xid, bool isCommit)
4685 {
4686         SERIALIZABLEXID *sxid;
4687         SERIALIZABLEXIDTAG sxidtag;
4688
4689         sxidtag.xid = xid;
4690
4691         LWLockAcquire(SerializableXactHashLock, LW_SHARED);
4692         sxid = (SERIALIZABLEXID *)
4693                 hash_search(SerializableXidHash, &sxidtag, HASH_FIND, NULL);
4694         LWLockRelease(SerializableXactHashLock);
4695
4696         /* xid will not be found if it wasn't a serializable transaction */
4697         if (sxid == NULL)
4698                 return;
4699
4700         /* Release its locks */
4701         MySerializableXact = sxid->myXact;
4702         MyXactDidWrite = true;          /* conservatively assume that we wrote
4703                                                                  * something */
4704         ReleasePredicateLocks(isCommit);
4705 }
4706
4707 /*
4708  * Re-acquire a predicate lock belonging to a transaction that was prepared.
4709  */
4710 void
4711 predicatelock_twophase_recover(TransactionId xid, uint16 info,
4712                                                            void *recdata, uint32 len)
4713 {
4714         TwoPhasePredicateRecord *record;
4715
4716         Assert(len == sizeof(TwoPhasePredicateRecord));
4717
4718         record = (TwoPhasePredicateRecord *) recdata;
4719
4720         Assert((record->type == TWOPHASEPREDICATERECORD_XACT) ||
4721                    (record->type == TWOPHASEPREDICATERECORD_LOCK));
4722
4723         if (record->type == TWOPHASEPREDICATERECORD_XACT)
4724         {
4725                 /* Per-transaction record. Set up a SERIALIZABLEXACT. */
4726                 TwoPhasePredicateXactRecord *xactRecord;
4727                 SERIALIZABLEXACT *sxact;
4728                 SERIALIZABLEXID *sxid;
4729                 SERIALIZABLEXIDTAG sxidtag;
4730                 bool            found;
4731
4732                 xactRecord = (TwoPhasePredicateXactRecord *) &record->data.xactRecord;
4733
4734                 LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
4735                 sxact = CreatePredXact();
4736                 if (!sxact)
4737                         ereport(ERROR,
4738                                         (errcode(ERRCODE_OUT_OF_MEMORY),
4739                                          errmsg("out of shared memory")));
4740
4741                 /* vxid for a prepared xact is InvalidBackendId/xid; no pid */
4742                 sxact->vxid.backendId = InvalidBackendId;
4743                 sxact->vxid.localTransactionId = (LocalTransactionId) xid;
4744                 sxact->pid = 0;
4745
4746                 /* a prepared xact hasn't committed yet */
4747                 sxact->commitSeqNo = InvalidSerCommitSeqNo;
4748                 sxact->finishedBefore = InvalidTransactionId;
4749
4750                 sxact->SeqNo.lastCommitBeforeSnapshot = RecoverySerCommitSeqNo;
4751
4752
4753                 /*
4754                  * We don't need the details of a prepared transaction's conflicts,
4755                  * just whether it had conflicts in or out (which we get from the
4756                  * flags)
4757                  */
4758                 SHMQueueInit(&(sxact->outConflicts));
4759                 SHMQueueInit(&(sxact->inConflicts));
4760
4761                 /*
4762                  * Don't need to track this; no transactions running at the time the
4763                  * recovered xact started are still active, except possibly other
4764                  * prepared xacts and we don't care whether those are RO_SAFE or not.
4765                  */
4766                 SHMQueueInit(&(sxact->possibleUnsafeConflicts));
4767
4768                 SHMQueueInit(&(sxact->predicateLocks));
4769                 SHMQueueElemInit(&(sxact->finishedLink));
4770
4771                 sxact->topXid = xid;
4772                 sxact->xmin = xactRecord->xmin;
4773                 sxact->flags = xactRecord->flags;
4774                 Assert(SxactIsPrepared(sxact));
4775                 if (!SxactIsReadOnly(sxact))
4776                 {
4777                         ++(PredXact->WritableSxactCount);
4778                         Assert(PredXact->WritableSxactCount <=
4779                                    (MaxBackends + max_prepared_xacts));
4780                 }
4781
4782                 /* Register the transaction's xid */
4783                 sxidtag.xid = xid;
4784                 sxid = (SERIALIZABLEXID *) hash_search(SerializableXidHash,
4785                                                                                            &sxidtag,
4786                                                                                            HASH_ENTER, &found);
4787                 Assert(sxid != NULL);
4788                 Assert(!found);
4789                 sxid->myXact = (SERIALIZABLEXACT *) sxact;
4790
4791                 /*
4792                  * Update global xmin. Note that this is a special case compared to
4793                  * registering a normal transaction, because the global xmin might go
4794                  * backwards. That's OK, because until recovery is over we're not
4795                  * going to complete any transactions or create any non-prepared
4796                  * transactions, so there's no danger of throwing away.
4797                  */
4798                 if ((!TransactionIdIsValid(PredXact->SxactGlobalXmin)) ||
4799                         (TransactionIdFollows(PredXact->SxactGlobalXmin, sxact->xmin)))
4800                 {
4801                         PredXact->SxactGlobalXmin = sxact->xmin;
4802                         PredXact->SxactGlobalXminCount = 1;
4803                         OldSerXidSetActiveSerXmin(sxact->xmin);
4804                 }
4805                 else if (TransactionIdEquals(sxact->xmin, PredXact->SxactGlobalXmin))
4806                 {
4807                         Assert(PredXact->SxactGlobalXminCount > 0);
4808                         PredXact->SxactGlobalXminCount++;
4809                 }
4810
4811                 LWLockRelease(SerializableXactHashLock);
4812         }
4813         else if (record->type == TWOPHASEPREDICATERECORD_LOCK)
4814         {
4815                 /* Lock record. Recreate the PREDICATELOCK */
4816                 TwoPhasePredicateLockRecord *lockRecord;
4817                 SERIALIZABLEXID *sxid;
4818                 SERIALIZABLEXACT *sxact;
4819                 SERIALIZABLEXIDTAG sxidtag;
4820                 uint32          targettaghash;
4821
4822                 lockRecord = (TwoPhasePredicateLockRecord *) &record->data.lockRecord;
4823                 targettaghash = PredicateLockTargetTagHashCode(&lockRecord->target);
4824
4825                 LWLockAcquire(SerializableXactHashLock, LW_SHARED);
4826                 sxidtag.xid = xid;
4827                 sxid = (SERIALIZABLEXID *)
4828                         hash_search(SerializableXidHash, &sxidtag, HASH_FIND, NULL);
4829                 LWLockRelease(SerializableXactHashLock);
4830
4831                 Assert(sxid != NULL);
4832                 sxact = sxid->myXact;
4833                 Assert(sxact != InvalidSerializableXact);
4834
4835                 CreatePredicateLock(&lockRecord->target, targettaghash, sxact);
4836         }
4837 }