OSDN Git Service

Fix locking while setting flags in MySerializableXact.
authorHeikki Linnakangas <heikki.linnakangas@iki.fi>
Fri, 10 Jun 2011 20:15:05 +0000 (23:15 +0300)
committerHeikki Linnakangas <heikki.linnakangas@iki.fi>
Fri, 10 Jun 2011 20:41:10 +0000 (23:41 +0300)
Even if a flag is modified only by the backend owning the transaction, it's
not safe to modify it without a lock. Another backend might be setting or
clearing a different flag in the flags field concurrently, and that
operation might be lost because setting or clearing a bit in a word is not
atomic.

Make did-write flag a simple backend-private boolean variable, because it
was only set or tested in the owning backend (except when committing a
prepared transaction, but it's not worthwhile to optimize for the case of a
read-only prepared transaction). This also eliminates the need to add
locking where that flag is set.

Also, set the did-write flag when doing DDL operations like DROP TABLE or
TRUNCATE -- that was missed earlier.

src/backend/storage/lmgr/predicate.c
src/include/storage/predicate_internals.h

index d5cb7da..9f79f54 100644 (file)
@@ -392,10 +392,11 @@ static HTAB *LocalPredicateLockHash = NULL;
 
 /*
  * Keep a pointer to the currently-running serializable transaction (if any)
- * for quick reference.
- * TODO SSI: Remove volatile qualifier and the then-unnecessary casts?
+ * for quick reference. Also, remember if we have written anything that could
+ * cause a rw-conflict.
  */
-static volatile SERIALIZABLEXACT *MySerializableXact = InvalidSerializableXact;
+static SERIALIZABLEXACT *MySerializableXact = InvalidSerializableXact;
+static bool MyXactDidWrite = false;
 
 /* local functions */
 
@@ -1424,20 +1425,30 @@ GetSafeSnapshot(Snapshot origSnapshot)
                if (MySerializableXact == InvalidSerializableXact)
                        return snapshot;        /* no concurrent r/w xacts; it's safe */
 
-               MySerializableXact->flags |= SXACT_FLAG_DEFERRABLE_WAITING;
+               LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
 
                /*
                 * Wait for concurrent transactions to finish. Stop early if one of
                 * them marked us as conflicted.
                 */
+               MySerializableXact->flags |= SXACT_FLAG_DEFERRABLE_WAITING;
                while (!(SHMQueueEmpty((SHM_QUEUE *)
                                                         &MySerializableXact->possibleUnsafeConflicts) ||
                                 SxactIsROUnsafe(MySerializableXact)))
+               {
+                       LWLockRelease(SerializableXactHashLock);
                        ProcWaitForSignal();
-
+                       LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+               }
                MySerializableXact->flags &= ~SXACT_FLAG_DEFERRABLE_WAITING;
+
                if (!SxactIsROUnsafe(MySerializableXact))
+               {
+                       LWLockRelease(SerializableXactHashLock);
                        break;                          /* success */
+               }
+
+               LWLockRelease(SerializableXactHashLock);
 
                /* else, need to retry... */
                ereport(DEBUG2,
@@ -1600,6 +1611,7 @@ RegisterSerializableTransactionInt(Snapshot snapshot)
        }
 
        MySerializableXact = sxact;
+       MyXactDidWrite = false;         /* haven't written anything yet */
 
        LWLockRelease(SerializableXactHashLock);
 
@@ -1635,24 +1647,24 @@ RegisterPredicateLockingXid(const TransactionId xid)
        if (MySerializableXact == InvalidSerializableXact)
                return;
 
-       /* This should only be done once per transaction. */
-       Assert(MySerializableXact->topXid == InvalidTransactionId);
-
        /* We should have a valid XID and be at the top level. */
        Assert(TransactionIdIsValid(xid));
 
+       LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
+
+       /* This should only be done once per transaction. */
+       Assert(MySerializableXact->topXid == InvalidTransactionId);
+
        MySerializableXact->topXid = xid;
 
        sxidtag.xid = xid;
-       LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
        sxid = (SERIALIZABLEXID *) hash_search(SerializableXidHash,
                                                                                   &sxidtag,
                                                                                   HASH_ENTER, &found);
-       Assert(sxid != NULL);
        Assert(!found);
 
        /* Initialize the structure. */
-       sxid->myXact = (SERIALIZABLEXACT *) MySerializableXact;
+       sxid->myXact = MySerializableXact;
        LWLockRelease(SerializableXactHashLock);
 }
 
@@ -1881,7 +1893,7 @@ DeleteChildTargetLocks(const PREDICATELOCKTARGETTAG *newtargettag)
        PREDICATELOCK *predlock;
 
        LWLockAcquire(SerializablePredicateLockListLock, LW_SHARED);
-       sxact = (SERIALIZABLEXACT *) MySerializableXact;
+       sxact = MySerializableXact;
        predlock = (PREDICATELOCK *)
                SHMQueueNext(&(sxact->predicateLocks),
                                         &(sxact->predicateLocks),
@@ -2200,8 +2212,7 @@ PredicateLockAcquire(const PREDICATELOCKTARGETTAG *targettag)
                locallock->childLocks = 0;
 
        /* Actually create the lock */
-       CreatePredicateLock(targettag, targettaghash,
-                                               (SERIALIZABLEXACT *) MySerializableXact);
+       CreatePredicateLock(targettag, targettaghash, MySerializableXact);
 
        /*
         * Lock has been acquired. Check whether it should be promoted to a
@@ -3042,7 +3053,7 @@ ReleasePredicateLocks(const bool isCommit)
                Assert(IsolationIsSerializable());
 
        /* We'd better not already be on the cleanup list. */
-       Assert(!SxactIsOnFinishedList((SERIALIZABLEXACT *) MySerializableXact));
+       Assert(!SxactIsOnFinishedList(MySerializableXact));
 
        topLevelIsDeclaredReadOnly = SxactIsReadOnly(MySerializableXact);
 
@@ -3070,7 +3081,7 @@ ReleasePredicateLocks(const bool isCommit)
                MySerializableXact->flags |= SXACT_FLAG_COMMITTED;
                MySerializableXact->commitSeqNo = ++(PredXact->LastSxactCommitSeqNo);
                /* Recognize implicit read-only transaction (commit without write). */
-               if (!(MySerializableXact->flags & SXACT_FLAG_DID_WRITE))
+               if (!MyXactDidWrite)
                        MySerializableXact->flags |= SXACT_FLAG_READ_ONLY;
        }
        else
@@ -3218,7 +3229,7 @@ ReleasePredicateLocks(const bool isCommit)
 
                        /* Mark conflicted if necessary. */
                        if (isCommit
-                               && (MySerializableXact->flags & SXACT_FLAG_DID_WRITE)
+                               && MyXactDidWrite
                                && SxactHasConflictOut(MySerializableXact)
                                && (MySerializableXact->SeqNo.earliestOutConflictCommit
                                        <= roXact->SeqNo.lastCommitBeforeSnapshot))
@@ -3282,8 +3293,7 @@ ReleasePredicateLocks(const bool isCommit)
                                                  (SHM_QUEUE *) &(MySerializableXact->finishedLink));
 
        if (!isCommit)
-               ReleaseOneSerializableXact((SERIALIZABLEXACT *) MySerializableXact,
-                                                                  false, false);
+               ReleaseOneSerializableXact(MySerializableXact, false, false);
 
        LWLockRelease(SerializableFinishedListLock);
 
@@ -3291,6 +3301,7 @@ ReleasePredicateLocks(const bool isCommit)
                ClearOldPredicateLocks();
 
        MySerializableXact = InvalidSerializableXact;
+       MyXactDidWrite = false;
 
        /* Delete per-transaction lock table */
        if (LocalPredicateLockHash != NULL)
@@ -3851,7 +3862,7 @@ CheckForSerializableConflictOut(const bool visible, const Relation relation,
                return;
        }
 
-       if (RWConflictExists((SERIALIZABLEXACT *) MySerializableXact, sxact))
+       if (RWConflictExists(MySerializableXact, sxact))
        {
                /* We don't want duplicate conflict records in the list. */
                LWLockRelease(SerializableXactHashLock);
@@ -3862,7 +3873,7 @@ CheckForSerializableConflictOut(const bool visible, const Relation relation,
         * Flag the conflict.  But first, if this conflict creates a dangerous
         * structure, ereport an error.
         */
-       FlagRWConflict((SERIALIZABLEXACT *) MySerializableXact, sxact);
+       FlagRWConflict(MySerializableXact, sxact);
        LWLockRelease(SerializableXactHashLock);
 }
 
@@ -3944,7 +3955,7 @@ CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag)
                                 && (!SxactIsCommitted(sxact)
                                         || TransactionIdPrecedes(GetTransactionSnapshot()->xmin,
                                                                                          sxact->finishedBefore))
-               && !RWConflictExists(sxact, (SERIALIZABLEXACT *) MySerializableXact))
+                                && !RWConflictExists(sxact, MySerializableXact))
                {
                        LWLockRelease(SerializableXactHashLock);
                        LWLockAcquire(SerializableXactHashLock, LW_EXCLUSIVE);
@@ -3957,10 +3968,9 @@ CheckTargetForConflictsIn(PREDICATELOCKTARGETTAG *targettag)
                                && (!SxactIsCommitted(sxact)
                                        || TransactionIdPrecedes(GetTransactionSnapshot()->xmin,
                                                                                         sxact->finishedBefore))
-                               && !RWConflictExists(sxact,
-                                                                        (SERIALIZABLEXACT *) MySerializableXact))
+                               && !RWConflictExists(sxact, MySerializableXact))
                        {
-                               FlagRWConflict(sxact, (SERIALIZABLEXACT *) MySerializableXact);
+                               FlagRWConflict(sxact, MySerializableXact);
                        }
 
                        LWLockRelease(SerializableXactHashLock);
@@ -4065,7 +4075,11 @@ CheckForSerializableConflictIn(const Relation relation, const HeapTuple tuple,
                                 errdetail("Cancelled on identification as a pivot, during conflict in checking."),
                                 errhint("The transaction might succeed if retried.")));
 
-       MySerializableXact->flags |= SXACT_FLAG_DID_WRITE;
+       /*
+        * We're doing a write which might cause rw-conflicts now or later.
+        * Memorize that fact.
+        */
+       MyXactDidWrite = true;
 
        /*
         * It is important that we check for locks from the finest granularity to
@@ -4150,6 +4164,12 @@ CheckTableForSerializableConflictIn(const Relation relation)
        if (SkipSerialization(relation))
                return;
 
+       /*
+        * We're doing a write which might cause rw-conflicts now or later.
+        * Memorize that fact.
+        */
+       MyXactDidWrite = true;
+
        Assert(relation->rd_index == NULL); /* not an index relation */
 
        dbId = relation->rd_node.dbNode;
@@ -4192,10 +4212,10 @@ CheckTableForSerializableConflictIn(const Relation relation)
                                                         offsetof(PREDICATELOCK, targetLink));
 
                        if (predlock->tag.myXact != MySerializableXact
-                               && !RWConflictExists(predlock->tag.myXact,
-                                                                        (SERIALIZABLEXACT *) MySerializableXact))
-                               FlagRWConflict(predlock->tag.myXact,
-                                                          (SERIALIZABLEXACT *) MySerializableXact);
+                         && !RWConflictExists(predlock->tag.myXact, MySerializableXact))
+                       {
+                               FlagRWConflict(predlock->tag.myXact, MySerializableXact);
+                       }
 
                        predlock = nextpredlock;
                }
@@ -4506,7 +4526,7 @@ AtPrepare_PredicateLocks(void)
        TwoPhasePredicateXactRecord *xactRecord;
        TwoPhasePredicateLockRecord *lockRecord;
 
-       sxact = (SERIALIZABLEXACT *) MySerializableXact;
+       sxact = MySerializableXact;
        xactRecord = &(record.data.xactRecord);
        lockRecord = &(record.data.lockRecord);
 
@@ -4583,6 +4603,7 @@ PostPrepare_PredicateLocks(TransactionId xid)
        LocalPredicateLockHash = NULL;
 
        MySerializableXact = InvalidSerializableXact;
+       MyXactDidWrite = false;
 }
 
 /*
@@ -4609,6 +4630,8 @@ PredicateLockTwoPhaseFinish(TransactionId xid, bool isCommit)
 
        /* Release its locks */
        MySerializableXact = sxid->myXact;
+       MyXactDidWrite = true;          /* conservatively assume that we wrote
+                                                                * something */
        ReleasePredicateLocks(isCommit);
 }
 
index e731595..da6e641 100644 (file)
@@ -99,14 +99,13 @@ typedef struct SERIALIZABLEXACT
  */
 #define SXACT_FLAG_CONFLICT_OUT                                0x00000004
 #define SXACT_FLAG_READ_ONLY                           0x00000008
-#define SXACT_FLAG_DID_WRITE                           0x00000010
-#define SXACT_FLAG_MARKED_FOR_DEATH                    0x00000020
-#define SXACT_FLAG_DEFERRABLE_WAITING          0x00000040
-#define SXACT_FLAG_RO_SAFE                                     0x00000080
-#define SXACT_FLAG_RO_UNSAFE                           0x00000100
-#define SXACT_FLAG_SUMMARY_CONFLICT_IN         0x00000200
-#define SXACT_FLAG_SUMMARY_CONFLICT_OUT                0x00000400
-#define SXACT_FLAG_PREPARED                                    0x00000800
+#define SXACT_FLAG_MARKED_FOR_DEATH                    0x00000010
+#define SXACT_FLAG_DEFERRABLE_WAITING          0x00000020
+#define SXACT_FLAG_RO_SAFE                                     0x00000040
+#define SXACT_FLAG_RO_UNSAFE                           0x00000080
+#define SXACT_FLAG_SUMMARY_CONFLICT_IN         0x00000100
+#define SXACT_FLAG_SUMMARY_CONFLICT_OUT                0x00000200
+#define SXACT_FLAG_PREPARED                                    0x00000400
 
 /*
  * The following types are used to provide an ad hoc list for holding