From dab421d2f0c5111f8549b90142e743b9c6e9e5e0 Mon Sep 17 00:00:00 2001 From: Tom Lane Date: Fri, 20 Jun 2008 00:24:53 +0000 Subject: [PATCH] Seems I was too optimistic in supposing that sinval's maxMsgNum could be read and written without a lock. The value itself is atomic, sure, but on processors with weak memory ordering it's possible for a reader to see the value change before it sees the associated message written into the buffer array. Fix by introducing a spinlock that's used just to read and write maxMsgNum. (We could do this with less overhead if we recognized a concept of "memory access barrier"; is it worth introducing such a thing? At the moment probably not --- I can't measure any clear slowdown from adding the spinlock, so this solution is probably fine.) Per buildfarm results. --- src/backend/storage/ipc/sinvaladt.c | 63 ++++++++++++++++++++++++++++--------- 1 file changed, 48 insertions(+), 15 deletions(-) diff --git a/src/backend/storage/ipc/sinvaladt.c b/src/backend/storage/ipc/sinvaladt.c index 0befc4a934..e414e4a507 100644 --- a/src/backend/storage/ipc/sinvaladt.c +++ b/src/backend/storage/ipc/sinvaladt.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/storage/ipc/sinvaladt.c,v 1.71 2008/06/19 21:32:56 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/storage/ipc/sinvaladt.c,v 1.72 2008/06/20 00:24:53 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -23,6 +23,7 @@ #include "storage/proc.h" #include "storage/shmem.h" #include "storage/sinvaladt.h" +#include "storage/spin.h" /* @@ -84,10 +85,20 @@ * can operate in parallel with one or more readers, because the writer * has no need to touch anyone's ProcState, except in the infrequent cases * when SICleanupQueue is needed. The only point of overlap is that - * the writer might change maxMsgNum while readers are looking at it. - * This should be okay: we are assuming that fetching or storing an int - * is atomic, an assumption also made elsewhere in Postgres. However - * readers mustn't assume that maxMsgNum isn't changing under them. + * the writer wants to change maxMsgNum while readers need to read it. + * We deal with that by having a spinlock that readers must take for just + * long enough to read maxMsgNum, while writers take it for just long enough + * to write maxMsgNum. (The exact rule is that you need the spinlock to + * read maxMsgNum if you are not holding SInvalWriteLock, and you need the + * spinlock to write maxMsgNum unless you are holding both locks.) + * + * Note: since maxMsgNum is an int and hence presumably atomically readable/ + * writable, the spinlock might seem unnecessary. The reason it is needed + * is to provide a memory barrier: we need to be sure that messages written + * to the array are actually there before maxMsgNum is increased, and that + * readers will see that data after fetching maxMsgNum. Multiprocessors + * that have weak memory-ordering guarantees can fail without the memory + * barrier instructions that are included in the spinlock sequences. */ @@ -153,6 +164,8 @@ typedef struct SISeg int lastBackend; /* index of last active procState entry, +1 */ int maxBackends; /* size of procState array */ + slock_t msgnumLock; /* spinlock protecting maxMsgNum */ + /* * Circular buffer holding shared-inval messages */ @@ -209,12 +222,13 @@ CreateSharedInvalidationState(void) if (found) return; - /* Clear message counters, save size of procState array */ + /* Clear message counters, save size of procState array, init spinlock */ shmInvalBuffer->minMsgNum = 0; shmInvalBuffer->maxMsgNum = 0; shmInvalBuffer->nextThreshold = CLEANUP_MIN; shmInvalBuffer->lastBackend = 0; shmInvalBuffer->maxBackends = MaxBackends; + SpinLockInit(&shmInvalBuffer->msgnumLock); /* The buffer[] array is initially all unused, so we need not fill it */ @@ -365,6 +379,7 @@ SIInsertDataEntries(const SharedInvalidationMessage *data, int n) { int nthistime = Min(n, WRITE_QUANTUM); int numMsgs; + int max; n -= nthistime; @@ -388,10 +403,21 @@ SIInsertDataEntries(const SharedInvalidationMessage *data, int n) /* * Insert new message(s) into proper slot of circular buffer */ + max = segP->maxMsgNum; while (nthistime-- > 0) { - segP->buffer[segP->maxMsgNum % MAXNUMMESSAGES] = *data++; - segP->maxMsgNum++; + segP->buffer[max % MAXNUMMESSAGES] = *data++; + max++; + } + + /* Update current value of maxMsgNum using spinlock */ + { + /* use volatile pointer to prevent code rearrangement */ + volatile SISeg *vsegP = segP; + + SpinLockAcquire(&vsegP->msgnumLock); + vsegP->maxMsgNum = max; + SpinLockRelease(&vsegP->msgnumLock); } LWLockRelease(SInvalWriteLock); @@ -431,6 +457,7 @@ SIGetDataEntries(SharedInvalidationMessage *data, int datasize) { SISeg *segP; ProcState *stateP; + int max; int n; LWLockAcquire(SInvalReadLock, LW_SHARED); @@ -438,6 +465,16 @@ SIGetDataEntries(SharedInvalidationMessage *data, int datasize) segP = shmInvalBuffer; stateP = &segP->procState[MyBackendId - 1]; + /* Fetch current value of maxMsgNum using spinlock */ + { + /* use volatile pointer to prevent code rearrangement */ + volatile SISeg *vsegP = segP; + + SpinLockAcquire(&vsegP->msgnumLock); + max = vsegP->maxMsgNum; + SpinLockRelease(&vsegP->msgnumLock); + } + if (stateP->resetState) { /* @@ -445,7 +482,7 @@ SIGetDataEntries(SharedInvalidationMessage *data, int datasize) * since the reset, as well; and that means we should clear the * signaled flag, too. */ - stateP->nextMsgNum = segP->maxMsgNum; + stateP->nextMsgNum = max; stateP->resetState = false; stateP->signaled = false; LWLockRelease(SInvalReadLock); @@ -459,13 +496,9 @@ SIGetDataEntries(SharedInvalidationMessage *data, int datasize) * There may be other backends that haven't read the message(s), so we * cannot delete them here. SICleanupQueue() will eventually remove them * from the queue. - * - * Note: depending on the compiler, we might read maxMsgNum only once - * here, or each time through the loop. It doesn't matter (as long as - * each fetch is atomic). */ n = 0; - while (n < datasize && stateP->nextMsgNum < segP->maxMsgNum) + while (n < datasize && stateP->nextMsgNum < max) { data[n++] = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES]; stateP->nextMsgNum++; @@ -474,7 +507,7 @@ SIGetDataEntries(SharedInvalidationMessage *data, int datasize) /* * Reset our "signaled" flag whenever we have caught up completely. */ - if (stateP->nextMsgNum >= segP->maxMsgNum) + if (stateP->nextMsgNum >= max) stateP->signaled = false; LWLockRelease(SInvalReadLock); -- 2.11.0