From 93b24772782e7518dee2f85366aa7a80a2beb937 Mon Sep 17 00:00:00 2001 From: Tom Lane Date: Sat, 30 Apr 2005 19:03:33 +0000 Subject: [PATCH] Use the standard lock manager to establish priority order when there is contention for a tuple-level lock. This solves the problem of a would-be exclusive locker being starved out by an indefinite succession of share-lockers. Per recent discussion with Alvaro. --- src/backend/access/heap/heapam.c | 294 +++++++++++++++++++++++++++------------ src/backend/storage/lmgr/lmgr.c | 126 ++++++++++++++++- src/include/storage/lmgr.h | 19 ++- 3 files changed, 349 insertions(+), 90 deletions(-) diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index ee604df2ca..06b1fdb644 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/heap/heapam.c,v 1.188 2005/04/28 21:47:10 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/access/heap/heapam.c,v 1.189 2005/04/30 19:03:32 tgl Exp $ * * * INTERFACE ROUTINES @@ -1209,12 +1209,13 @@ heap_delete(Relation relation, ItemPointer tid, ItemPointer ctid, CommandId cid, Snapshot crosscheck, bool wait) { + HTSU_Result result; TransactionId xid = GetCurrentTransactionId(); ItemId lp; HeapTupleData tp; PageHeader dp; Buffer buffer; - HTSU_Result result; + bool have_tuple_lock = false; Assert(ItemPointerIsValid(tid)); @@ -1243,20 +1244,36 @@ l1: TransactionId xwait; uint16 infomask; + /* must copy state data before unlocking buffer */ + xwait = HeapTupleHeaderGetXmax(tp.t_data); + infomask = tp.t_data->t_infomask; + + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + + /* + * Acquire tuple lock to establish our priority for the tuple + * (see heap_lock_tuple). LockTuple will release us when we are + * next-in-line for the tuple. + * + * If we are forced to "start over" below, we keep the tuple lock; + * this arranges that we stay at the head of the line while + * rechecking tuple state. + */ + if (!have_tuple_lock) + { + LockTuple(relation, &(tp.t_self), ExclusiveLock); + have_tuple_lock = true; + } + /* * Sleep until concurrent transaction ends. Note that we don't care * if the locker has an exclusive or shared lock, because we need * exclusive. */ - /* must copy state data before unlocking buffer */ - xwait = HeapTupleHeaderGetXmax(tp.t_data); - infomask = tp.t_data->t_infomask; - if (infomask & HEAP_XMAX_IS_MULTI) { /* wait for multixact */ - LockBuffer(buffer, BUFFER_LOCK_UNLOCK); MultiXactIdWait((MultiXactId) xwait); LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); @@ -1283,7 +1300,6 @@ l1: else { /* wait for regular transaction to end */ - LockBuffer(buffer, BUFFER_LOCK_UNLOCK); XactLockTableWait(xwait); LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); @@ -1335,6 +1351,8 @@ l1: *ctid = tp.t_data->t_ctid; LockBuffer(buffer, BUFFER_LOCK_UNLOCK); ReleaseBuffer(buffer); + if (have_tuple_lock) + UnlockTuple(relation, &(tp.t_self), ExclusiveLock); return result; } @@ -1406,6 +1424,12 @@ l1: WriteBuffer(buffer); + /* + * Release the lmgr tuple lock, if we had it. + */ + if (have_tuple_lock) + UnlockTuple(relation, &(tp.t_self), ExclusiveLock); + return HeapTupleMayBeUpdated; } @@ -1476,6 +1500,7 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup, ItemPointer ctid, CommandId cid, Snapshot crosscheck, bool wait) { + HTSU_Result result; TransactionId xid = GetCurrentTransactionId(); ItemId lp; HeapTupleData oldtup; @@ -1486,7 +1511,7 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup, already_marked; Size newtupsize, pagefree; - HTSU_Result result; + bool have_tuple_lock = false; Assert(ItemPointerIsValid(otid)); @@ -1522,20 +1547,36 @@ l2: TransactionId xwait; uint16 infomask; + /* must copy state data before unlocking buffer */ + xwait = HeapTupleHeaderGetXmax(oldtup.t_data); + infomask = oldtup.t_data->t_infomask; + + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + + /* + * Acquire tuple lock to establish our priority for the tuple + * (see heap_lock_tuple). LockTuple will release us when we are + * next-in-line for the tuple. + * + * If we are forced to "start over" below, we keep the tuple lock; + * this arranges that we stay at the head of the line while + * rechecking tuple state. + */ + if (!have_tuple_lock) + { + LockTuple(relation, &(oldtup.t_self), ExclusiveLock); + have_tuple_lock = true; + } + /* * Sleep until concurrent transaction ends. Note that we don't care * if the locker has an exclusive or shared lock, because we need * exclusive. */ - /* must copy state data before unlocking buffer */ - xwait = HeapTupleHeaderGetXmax(oldtup.t_data); - infomask = oldtup.t_data->t_infomask; - if (infomask & HEAP_XMAX_IS_MULTI) { /* wait for multixact */ - LockBuffer(buffer, BUFFER_LOCK_UNLOCK); MultiXactIdWait((MultiXactId) xwait); LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); @@ -1562,7 +1603,6 @@ l2: else { /* wait for regular transaction to end */ - LockBuffer(buffer, BUFFER_LOCK_UNLOCK); XactLockTableWait(xwait); LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); @@ -1614,6 +1654,8 @@ l2: *ctid = oldtup.t_data->t_ctid; LockBuffer(buffer, BUFFER_LOCK_UNLOCK); ReleaseBuffer(buffer); + if (have_tuple_lock) + UnlockTuple(relation, &(oldtup.t_self), ExclusiveLock); return result; } @@ -1803,6 +1845,12 @@ l2: */ CacheInvalidateHeapTuple(relation, newtup); + /* + * Release the lmgr tuple lock, if we had it. + */ + if (have_tuple_lock) + UnlockTuple(relation, &(oldtup.t_self), ExclusiveLock); + return HeapTupleMayBeUpdated; } @@ -1847,17 +1895,53 @@ simple_heap_update(Relation relation, ItemPointer otid, HeapTuple tup) /* * heap_lock_tuple - lock a tuple in shared or exclusive mode + * + * NOTES: because the shared-memory lock table is of finite size, but users + * could reasonably want to lock large numbers of tuples, we do not rely on + * the standard lock manager to store tuple-level locks over the long term. + * Instead, a tuple is marked as locked by setting the current transaction's + * XID as its XMAX, and setting additional infomask bits to distinguish this + * usage from the more normal case of having deleted the tuple. When + * multiple transactions concurrently share-lock a tuple, the first locker's + * XID is replaced in XMAX with a MultiTransactionId representing the set of + * XIDs currently holding share-locks. + * + * When it is necessary to wait for a tuple-level lock to be released, the + * basic delay is provided by XactLockTableWait or MultiXactIdWait on the + * contents of the tuple's XMAX. However, that mechanism will release all + * waiters concurrently, so there would be a race condition as to which + * waiter gets the tuple, potentially leading to indefinite starvation of + * some waiters. The possibility of share-locking makes the problem much + * worse --- a steady stream of share-lockers can easily block an exclusive + * locker forever. To provide more reliable semantics about who gets a + * tuple-level lock first, we use the standard lock manager. The protocol + * for waiting for a tuple-level lock is really + * LockTuple() + * XactLockTableWait() + * mark tuple as locked by me + * UnlockTuple() + * When there are multiple waiters, arbitration of who is to get the lock next + * is provided by LockTuple(). However, at most one tuple-level lock will + * be held or awaited per backend at any time, so we don't risk overflow + * of the lock table. Note that incoming share-lockers are required to + * do LockTuple as well, if there is any conflict, to ensure that they don't + * starve out waiting exclusive-lockers. However, if there is not any active + * conflict for a tuple, we don't incur any extra overhead. */ HTSU_Result heap_lock_tuple(Relation relation, HeapTuple tuple, Buffer *buffer, CommandId cid, LockTupleMode mode) { - TransactionId xid; + HTSU_Result result; ItemPointer tid = &(tuple->t_self); ItemId lp; PageHeader dp; - HTSU_Result result; + TransactionId xid; uint16 new_infomask; + LOCKMODE tuple_lock_type; + bool have_tuple_lock = false; + + tuple_lock_type = (mode == LockTupleShared) ? ShareLock : ExclusiveLock; *buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid)); LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE); @@ -1879,94 +1963,121 @@ l3: } else if (result == HeapTupleBeingUpdated) { - if (mode == LockTupleShared && - (tuple->t_data->t_infomask & HEAP_XMAX_SHARED_LOCK)) - result = HeapTupleMayBeUpdated; - else + TransactionId xwait; + uint16 infomask; + + /* must copy state data before unlocking buffer */ + xwait = HeapTupleHeaderGetXmax(tuple->t_data); + infomask = tuple->t_data->t_infomask; + + LockBuffer(*buffer, BUFFER_LOCK_UNLOCK); + + /* + * Acquire tuple lock to establish our priority for the tuple. + * LockTuple will release us when we are next-in-line for the + * tuple. We must do this even if we are share-locking. + * + * If we are forced to "start over" below, we keep the tuple lock; + * this arranges that we stay at the head of the line while + * rechecking tuple state. + */ + if (!have_tuple_lock) { - TransactionId xwait; - uint16 infomask; + LockTuple(relation, tid, tuple_lock_type); + have_tuple_lock = true; + } + if (mode == LockTupleShared && (infomask & HEAP_XMAX_SHARED_LOCK)) + { /* - * Sleep until concurrent transaction ends. + * Acquiring sharelock when there's at least one sharelocker + * already. We need not wait for him/them to complete. */ + LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE); - /* must copy state data before unlocking buffer */ - xwait = HeapTupleHeaderGetXmax(tuple->t_data); - infomask = tuple->t_data->t_infomask; - - if (infomask & HEAP_XMAX_IS_MULTI) - { - /* wait for multixact */ - LockBuffer(*buffer, BUFFER_LOCK_UNLOCK); - MultiXactIdWait((MultiXactId) xwait); - LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE); - - /* - * If xwait had just locked the tuple then some other xact - * could update this tuple before we get to this point. - * Check for xmax change, and start over if so. - */ - if (!(tuple->t_data->t_infomask & HEAP_XMAX_IS_MULTI) || - !TransactionIdEquals(HeapTupleHeaderGetXmax(tuple->t_data), - xwait)) - goto l3; + /* + * Make sure it's still a shared lock, else start over. (It's + * OK if the ownership of the shared lock has changed, though.) + */ + if (!(tuple->t_data->t_infomask & HEAP_XMAX_SHARED_LOCK)) + goto l3; + } + else if (infomask & HEAP_XMAX_IS_MULTI) + { + /* wait for multixact to end */ + MultiXactIdWait((MultiXactId) xwait); + LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE); - /* - * You might think the multixact is necessarily done here, but - * not so: it could have surviving members, namely our own xact - * or other subxacts of this backend. It is legal for us to - * lock the tuple in either case, however. We don't bother - * changing the on-disk hint bits since we are about to - * overwrite the xmax altogether. - */ - } - else - { - /* wait for regular transaction to end */ - LockBuffer(*buffer, BUFFER_LOCK_UNLOCK); - XactLockTableWait(xwait); - LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE); + /* + * If xwait had just locked the tuple then some other xact + * could update this tuple before we get to this point. + * Check for xmax change, and start over if so. + */ + if (!(tuple->t_data->t_infomask & HEAP_XMAX_IS_MULTI) || + !TransactionIdEquals(HeapTupleHeaderGetXmax(tuple->t_data), + xwait)) + goto l3; - /* - * xwait is done, but if xwait had just locked the tuple then - * some other xact could update this tuple before we get to - * this point. Check for xmax change, and start over if so. - */ - if ((tuple->t_data->t_infomask & HEAP_XMAX_IS_MULTI) || - !TransactionIdEquals(HeapTupleHeaderGetXmax(tuple->t_data), - xwait)) - goto l3; - - /* Otherwise we can mark it committed or aborted */ - if (!(tuple->t_data->t_infomask & (HEAP_XMAX_COMMITTED | - HEAP_XMAX_INVALID))) - { - if (TransactionIdDidCommit(xwait)) - tuple->t_data->t_infomask |= HEAP_XMAX_COMMITTED; - else - tuple->t_data->t_infomask |= HEAP_XMAX_INVALID; - SetBufferCommitInfoNeedsSave(*buffer); - } - } + /* + * You might think the multixact is necessarily done here, but + * not so: it could have surviving members, namely our own xact + * or other subxacts of this backend. It is legal for us to + * lock the tuple in either case, however. We don't bother + * changing the on-disk hint bits since we are about to + * overwrite the xmax altogether. + */ + } + else + { + /* wait for regular transaction to end */ + XactLockTableWait(xwait); + LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE); /* - * We may lock if previous xmax aborted, or if it committed - * but only locked the tuple without updating it. + * xwait is done, but if xwait had just locked the tuple then + * some other xact could update this tuple before we get to + * this point. Check for xmax change, and start over if so. */ - if (tuple->t_data->t_infomask & (HEAP_XMAX_INVALID | - HEAP_IS_LOCKED)) - result = HeapTupleMayBeUpdated; - else - result = HeapTupleUpdated; + if ((tuple->t_data->t_infomask & HEAP_XMAX_IS_MULTI) || + !TransactionIdEquals(HeapTupleHeaderGetXmax(tuple->t_data), + xwait)) + goto l3; + + /* Otherwise we can mark it committed or aborted */ + if (!(tuple->t_data->t_infomask & (HEAP_XMAX_COMMITTED | + HEAP_XMAX_INVALID))) + { + if (TransactionIdDidCommit(xwait)) + tuple->t_data->t_infomask |= HEAP_XMAX_COMMITTED; + else + tuple->t_data->t_infomask |= HEAP_XMAX_INVALID; + SetBufferCommitInfoNeedsSave(*buffer); + } } + + /* + * We may lock if previous xmax aborted, or if it committed + * but only locked the tuple without updating it. The case where + * we didn't wait because we are joining an existing shared lock + * is correctly handled, too. + */ + if (tuple->t_data->t_infomask & (HEAP_XMAX_INVALID | + HEAP_IS_LOCKED)) + result = HeapTupleMayBeUpdated; + else + result = HeapTupleUpdated; } if (result != HeapTupleMayBeUpdated) { + ItemPointerData newctid = tuple->t_data->t_ctid; + Assert(result == HeapTupleSelfUpdated || result == HeapTupleUpdated); - tuple->t_self = tuple->t_data->t_ctid; LockBuffer(*buffer, BUFFER_LOCK_UNLOCK); + if (have_tuple_lock) + UnlockTuple(relation, tid, tuple_lock_type); + /* can't overwrite t_self (== *tid) until after above Unlock */ + tuple->t_self = newctid; return result; } @@ -2142,6 +2253,13 @@ l3: WriteNoReleaseBuffer(*buffer); + /* + * Now that we have successfully marked the tuple as locked, we can + * release the lmgr tuple lock, if we had it. + */ + if (have_tuple_lock) + UnlockTuple(relation, tid, tuple_lock_type); + return HeapTupleMayBeUpdated; } diff --git a/src/backend/storage/lmgr/lmgr.c b/src/backend/storage/lmgr/lmgr.c index d527724b9a..f0e2dadfa8 100644 --- a/src/backend/storage/lmgr/lmgr.c +++ b/src/backend/storage/lmgr/lmgr.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/storage/lmgr/lmgr.c,v 1.72 2005/04/29 22:28:24 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/storage/lmgr/lmgr.c,v 1.73 2005/04/30 19:03:33 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -340,6 +340,46 @@ UnlockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode) } /* + * LockTuple + * + * Obtain a tuple-level lock. This is used in a less-than-intuitive fashion + * because we can't afford to keep a separate lock in shared memory for every + * tuple. See heap_lock_tuple before using this! + */ +void +LockTuple(Relation relation, ItemPointer tid, LOCKMODE lockmode) +{ + LOCKTAG tag; + + SET_LOCKTAG_TUPLE(tag, + relation->rd_lockInfo.lockRelId.dbId, + relation->rd_lockInfo.lockRelId.relId, + ItemPointerGetBlockNumber(tid), + ItemPointerGetOffsetNumber(tid)); + + if (!LockAcquire(LockTableId, &tag, GetTopTransactionId(), + lockmode, false)) + elog(ERROR, "LockAcquire failed"); +} + +/* + * UnlockTuple + */ +void +UnlockTuple(Relation relation, ItemPointer tid, LOCKMODE lockmode) +{ + LOCKTAG tag; + + SET_LOCKTAG_TUPLE(tag, + relation->rd_lockInfo.lockRelId.dbId, + relation->rd_lockInfo.lockRelId.relId, + ItemPointerGetBlockNumber(tid), + ItemPointerGetOffsetNumber(tid)); + + LockRelease(LockTableId, &tag, GetTopTransactionId(), lockmode); +} + +/* * XactLockTableInsert * * Insert a lock showing that the given transaction ID is running --- @@ -417,3 +457,87 @@ XactLockTableWait(TransactionId xid) if (!TransactionIdDidCommit(xid) && !TransactionIdDidAbort(xid)) TransactionIdAbort(xid); } + + +/* + * LockDatabaseObject + * + * Obtain a lock on a general object of the current database. Don't use + * this for shared objects (such as tablespaces). It's usually unwise to + * apply it to entire relations, also, since a lock taken this way will + * NOT conflict with LockRelation. + */ +void +LockDatabaseObject(Oid classid, Oid objid, uint16 objsubid, + LOCKMODE lockmode) +{ + LOCKTAG tag; + + SET_LOCKTAG_OBJECT(tag, + MyDatabaseId, + classid, + objid, + objsubid); + + if (!LockAcquire(LockTableId, &tag, GetTopTransactionId(), + lockmode, false)) + elog(ERROR, "LockAcquire failed"); +} + +/* + * UnlockDatabaseObject + */ +void +UnlockDatabaseObject(Oid classid, Oid objid, uint16 objsubid, + LOCKMODE lockmode) +{ + LOCKTAG tag; + + SET_LOCKTAG_OBJECT(tag, + MyDatabaseId, + classid, + objid, + objsubid); + + LockRelease(LockTableId, &tag, GetTopTransactionId(), lockmode); +} + +/* + * LockSharedObject + * + * Obtain a lock on a shared-across-databases object. + */ +void +LockSharedObject(Oid classid, Oid objid, uint16 objsubid, + LOCKMODE lockmode) +{ + LOCKTAG tag; + + SET_LOCKTAG_OBJECT(tag, + InvalidOid, + classid, + objid, + objsubid); + + if (!LockAcquire(LockTableId, &tag, GetTopTransactionId(), + lockmode, false)) + elog(ERROR, "LockAcquire failed"); +} + +/* + * UnlockSharedObject + */ +void +UnlockSharedObject(Oid classid, Oid objid, uint16 objsubid, + LOCKMODE lockmode) +{ + LOCKTAG tag; + + SET_LOCKTAG_OBJECT(tag, + InvalidOid, + classid, + objid, + objsubid); + + LockRelease(LockTableId, &tag, GetTopTransactionId(), lockmode); +} diff --git a/src/include/storage/lmgr.h b/src/include/storage/lmgr.h index 4d1027c174..e983880706 100644 --- a/src/include/storage/lmgr.h +++ b/src/include/storage/lmgr.h @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/include/storage/lmgr.h,v 1.47 2005/04/29 22:28:24 tgl Exp $ + * $PostgreSQL: pgsql/src/include/storage/lmgr.h,v 1.48 2005/04/30 19:03:33 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -17,6 +17,7 @@ #include "storage/lock.h" #include "utils/rel.h" + /* These are the valid values of type LOCKMODE: */ /* NoLock is not a lock mode, but a flag value meaning "don't get a lock" */ @@ -60,9 +61,25 @@ extern void LockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode); extern bool ConditionalLockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode); extern void UnlockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode); +/* Lock a tuple (see heap_lock_tuple before assuming you understand this) */ +extern void LockTuple(Relation relation, ItemPointer tid, LOCKMODE lockmode); +extern void UnlockTuple(Relation relation, ItemPointer tid, LOCKMODE lockmode); + /* Lock an XID (used to wait for a transaction to finish) */ extern void XactLockTableInsert(TransactionId xid); extern void XactLockTableDelete(TransactionId xid); extern void XactLockTableWait(TransactionId xid); +/* Lock a general object (other than a relation) of the current database */ +extern void LockDatabaseObject(Oid classid, Oid objid, uint16 objsubid, + LOCKMODE lockmode); +extern void UnlockDatabaseObject(Oid classid, Oid objid, uint16 objsubid, + LOCKMODE lockmode); + +/* Lock a shared-across-databases object (other than a relation) */ +extern void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, + LOCKMODE lockmode); +extern void UnlockSharedObject(Oid classid, Oid objid, uint16 objsubid, + LOCKMODE lockmode); + #endif /* LMGR_H */ -- 2.11.0