1 /*-------------------------------------------------------------------------
2  *
3  * bufmgr.c
4  *        buffer manager interface routines
5  *
6  * Copyright (c) 1994, Regents of the University of California
7  *
8  *
9  * IDENTIFICATION
10  *        $Header: /cvsroot/pgsql/src/backend/storage/buffer/bufmgr.c,v 1.51 1999/05/10 00:45:35 momjian Exp $
11  *
12  *-------------------------------------------------------------------------
13  */
14 /*
15  *
16  * BufferAlloc() -- look up a buffer in the buffer table.  If
17  *              it isn't there, add it, but do not read it into memory.
18  *              This is used when we are about to reinitialize the
19  *              buffer so we don't care what the current disk contents are.
20  *              BufferAlloc() pins the new buffer in memory.
21  *
22  * ReadBuffer() -- same as BufferAlloc() but reads the data
23  *              on a buffer cache miss.
24  *
25  * ReleaseBuffer() -- unpin the buffer
26  *
27  * WriteNoReleaseBuffer() -- mark the buffer contents as "dirty"
28  *              but don't unpin.  The disk IO is delayed until buffer
29  *              replacement if WriteMode is BUFFER_LATE_WRITE.
30  *
31  * WriteBuffer() -- WriteNoReleaseBuffer() + ReleaseBuffer()
32  *
33  * FlushBuffer() -- as above but never delayed write.
34  *
35  * BufferSync() -- flush all dirty buffers in the buffer pool.
36  *
37  * InitBufferPool() -- Init the buffer module.
38  *
39  * See other files:
40  *              freelist.c -- chooses victim for buffer replacement
41  *              buf_table.c -- manages the buffer lookup table
42  */
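/*
 * Editor's note (illustrative sketch, not part of the original file):
 * typical caller-side use of the routines listed above; "reln", "blockNum"
 * and "pageWasModified" are placeholders, not identifiers from this file.
 *
 *              Buffer          buf;
 *
 *              buf = ReadBuffer(reln, blockNum);
 *              if (!BufferIsValid(buf))
 *                      elog(ERROR, "cannot read block %u of %s",
 *                               blockNum, reln->rd_rel->relname.data);
 *
 *              (inspect or modify the page image here)
 *
 *              if (pageWasModified)
 *                      WriteBuffer(buf);               (marks dirty and unpins)
 *              else
 *                      ReleaseBuffer(buf);             (just unpins)
 *
 * Under the default BUFFER_LATE_WRITE mode the actual disk write is
 * deferred until buffer replacement or BufferSync(); FlushBuffer() forces
 * it out immediately.
 */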
43 #include <sys/types.h>
44 #include <sys/file.h>
45 #include <stdio.h>
46 #include <string.h>
47 #include <math.h>
48 #include <signal.h>
49
50 #include "postgres.h"
51
52 /* declarations split between these three files */
53 #include "storage/buf.h"
54 #include "storage/buf_internals.h"
55 #include "storage/bufmgr.h"
56
57 #include "storage/fd.h"
58 #include "storage/ipc.h"
59 #include "storage/s_lock.h"
60 #include "storage/shmem.h"
61 #include "storage/spin.h"
62 #include "storage/smgr.h"
63 #include "storage/lmgr.h"
64 #include "miscadmin.h"
65 #include "utils/builtins.h"
66 #include "utils/hsearch.h"
67 #include "utils/palloc.h"
68 #include "utils/memutils.h"
69 #include "utils/relcache.h"
70 #include "executor/execdebug.h" /* for NDirectFileRead */
71 #include "catalog/catalog.h"
72
73 extern SPINLOCK BufMgrLock;
74 extern long int ReadBufferCount;
75 extern long int ReadLocalBufferCount;
76 extern long int BufferHitCount;
77 extern long int LocalBufferHitCount;
78 extern long int BufferFlushCount;
79 extern long int LocalBufferFlushCount;
80
81 static int      WriteMode = BUFFER_LATE_WRITE;          /* Delayed write is
82                                                                                                  * default */
83
84 static void WaitIO(BufferDesc *buf, SPINLOCK spinlock);
85
86 #ifndef HAS_TEST_AND_SET
87 static void SignalIO(BufferDesc *buf);
88 extern long *NWaitIOBackendP;   /* defined in buf_init.c */
89 #endif   /* HAS_TEST_AND_SET */
90
91 static Buffer ReadBufferWithBufferLock(Relation relation, BlockNumber blockNum,
92                                                  bool bufferLockHeld);
93 static BufferDesc *BufferAlloc(Relation reln, BlockNumber blockNum,
94                         bool *foundPtr, bool bufferLockHeld);
95 static int      FlushBuffer(Buffer buffer, bool release);
96 static void BufferSync(void);
97 static int      BufferReplace(BufferDesc *bufHdr, bool bufferLockHeld);
98 void PrintBufferDescs(void);
99
100 /* not static but used by vacuum only ... */
101 int                     BlowawayRelationBuffers(Relation rel, BlockNumber block);
102
103 /* ---------------------------------------------------
104  * RelationGetBufferWithBuffer
105  *              see if the given buffer is what we want
106  *              if yes, we don't need to bother the buffer manager
107  * ---------------------------------------------------
108  */
109 Buffer
110 RelationGetBufferWithBuffer(Relation relation,
111                                                         BlockNumber blockNumber,
112                                                         Buffer buffer)
113 {
114         BufferDesc *bufHdr;
115
116         if (BufferIsValid(buffer))
117         {
118                 if (!BufferIsLocal(buffer))
119                 {
120                         LockRelId  *lrelId = &(((LockInfo) (relation->lockInfo))->lockRelId);
121
122                         bufHdr = &BufferDescriptors[buffer - 1];
123                         SpinAcquire(BufMgrLock);
124                         if (bufHdr->tag.blockNum == blockNumber &&
125                                 bufHdr->tag.relId.relId == lrelId->relId &&
126                                 bufHdr->tag.relId.dbId == lrelId->dbId)
127                         {
128                                 SpinRelease(BufMgrLock);
129                                 return buffer;
130                         }
131                         return ReadBufferWithBufferLock(relation, blockNumber, true);
132                 }
133                 else
134                 {
135                         bufHdr = &LocalBufferDescriptors[-buffer - 1];
136                         if (bufHdr->tag.relId.relId == RelationGetRelid(relation) &&
137                                 bufHdr->tag.blockNum == blockNumber)
138                                 return buffer;
139                 }
140         }
141         return ReadBuffer(relation, blockNumber);
142 }
143
144 /*
145  * ReadBuffer -- returns a buffer containing the requested
146  *              block of the requested relation.  If the blknum
147  *              requested is P_NEW, extend the relation file and
148  *              allocate a new block.
149  *
150  * Returns: the buffer number for the buffer containing
151  *              the block read, or InvalidBuffer on an error.
152  *
153  * Assumes that reln has already been opened when this
154  *              function is called.
155  */
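/*
 * Editor's note (illustrative sketch, not part of the original file):
 * using the P_NEW convention described above to extend a relation;
 * "reln" and "newBlkno" are placeholders.  P_NEW appends one zero-filled
 * block and returns a buffer pinned on it.
 *
 *              Buffer          buf;
 *              BlockNumber newBlkno;
 *
 *              buf = ReadBuffer(reln, P_NEW);
 *              if (!BufferIsValid(buf))
 *                      elog(ERROR, "cannot extend relation %s",
 *                               reln->rd_rel->relname.data);
 *              newBlkno = BufferGetBlockNumber(buf);
 *              (initialize the new page, then)
 *              WriteBuffer(buf);
 */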
156
157 extern int      ShowPinTrace;
158
159
160 #undef ReadBuffer                               /* conflicts with macro when BUFMGR_DEBUG
161                                                                  * defined */
162
163 /*
164  * ReadBuffer 
165  *
166  */
167 Buffer
168 ReadBuffer(Relation reln, BlockNumber blockNum)
169 {
170         return ReadBufferWithBufferLock(reln, blockNum, false);
171 }
172
173 /*
174  * is_userbuffer
175  *
176  * XXX caller must have already acquired BufMgrLock
177  */
178 #ifdef NOT_USED
179 static bool
180 is_userbuffer(Buffer buffer)
181 {
182         BufferDesc *buf = &BufferDescriptors[buffer - 1];
183
184         if (IsSystemRelationName(buf->sb_relname))
185                 return false;
186         return true;
187 }
188
189 #endif
190
191 #ifdef NOT_USED
192 Buffer
193 ReadBuffer_Debug(char *file,
194                                  int line,
195                                  Relation reln,
196                                  BlockNumber blockNum)
197 {
198         Buffer          buffer;
199
200         buffer = ReadBufferWithBufferLock(reln, blockNum, false);
201         if (ShowPinTrace && !BufferIsLocal(buffer) && is_userbuffer(buffer))
202         {
203                 BufferDesc *buf = &BufferDescriptors[buffer - 1];
204
205                 fprintf(stderr, "PIN(RD) %ld relname = %s, blockNum = %d, \
206 refcount = %ld, file: %s, line: %d\n",
207                                 buffer, buf->sb_relname, buf->tag.blockNum,
208                                 PrivateRefCount[buffer - 1], file, line);
209         }
210         return buffer;
211 }
212
213 #endif
214
215 /*
216  * ReadBufferWithBufferLock -- does the work of
217  *              ReadBuffer() but with the possibility that
218  *              the buffer lock has already been held. this
219  *              is yet another effort to reduce the number of
220  *              semops in the system.
221  */
222 static Buffer
223 ReadBufferWithBufferLock(Relation reln,
224                                                  BlockNumber blockNum,
225                                                  bool bufferLockHeld)
226 {
227         BufferDesc *bufHdr;
228         int                     extend;                 /* extending the file by one block */
229         int                     status;
230         bool            found;
231         bool            isLocalBuf;
232
233         extend = (blockNum == P_NEW);
234         isLocalBuf = reln->rd_myxactonly;
235
236         if (isLocalBuf)
237         {
238                 ReadLocalBufferCount++;
239                 bufHdr = LocalBufferAlloc(reln, blockNum, &found);
240                 if (found)
241                         LocalBufferHitCount++;
242         }
243         else
244         {
245                 ReadBufferCount++;
246
247                 /*
248                  * lookup the buffer.  IO_IN_PROGRESS is set if the requested
249                  * block is not currently in memory.
250                  */
251                 bufHdr = BufferAlloc(reln, blockNum, &found, bufferLockHeld);
252                 if (found)
253                         BufferHitCount++;
254         }
255
256         if (!bufHdr)
257                 return InvalidBuffer;
258
259         /* if it's already in the buffer pool, we're done */
260         if (found)
261         {
262
263                 /*
264                  * This happens when a bogus buffer was returned previously and is
265                  * floating around in the buffer pool.  A routine calling this
266                  * would want this extended.
267                  */
268                 if (extend)
269                 {
270                         /* new buffers are zero-filled */
271                         MemSet((char *) MAKE_PTR(bufHdr->data), 0, BLCKSZ);
272                         smgrextend(DEFAULT_SMGR, reln,
273                                            (char *) MAKE_PTR(bufHdr->data));
274                 }
275                 return BufferDescriptorGetBuffer(bufHdr);
276
277         }
278
279         /*
280          * if we have gotten to this point, the reln pointer must be ok and
281          * the relation file must be open.
282          */
283         if (extend)
284         {
285                 /* new buffers are zero-filled */
286                 MemSet((char *) MAKE_PTR(bufHdr->data), 0, BLCKSZ);
287                 status = smgrextend(DEFAULT_SMGR, reln,
288                                                         (char *) MAKE_PTR(bufHdr->data));
289         }
290         else
291         {
292                 status = smgrread(DEFAULT_SMGR, reln, blockNum,
293                                                   (char *) MAKE_PTR(bufHdr->data));
294         }
295
296         if (isLocalBuf)
297                 return BufferDescriptorGetBuffer(bufHdr);
298
299         /* lock buffer manager again to update IO IN PROGRESS */
300         SpinAcquire(BufMgrLock);
301
302         if (status == SM_FAIL)
303         {
304                 /* IO Failed.  cleanup the data structures and go home */
305
306                 if (!BufTableDelete(bufHdr))
307                 {
308                         SpinRelease(BufMgrLock);
309                         elog(FATAL, "BufRead: buffer table broken after IO error\n");
310                 }
311                 /* remember that BufferAlloc() pinned the buffer */
312                 UnpinBuffer(bufHdr);
313
314                 /*
315                  * Have to reset the flag so that anyone waiting for the buffer
316                  * can tell that the contents are invalid.
317                  */
318                 bufHdr->flags |= BM_IO_ERROR;
319                 bufHdr->flags &= ~BM_IO_IN_PROGRESS;
320         }
321         else
322         {
323                 /* IO Succeeded.  clear the flags, finish buffer update */
324
325                 bufHdr->flags &= ~(BM_IO_ERROR | BM_IO_IN_PROGRESS);
326         }
327
328         /* If anyone was waiting for IO to complete, wake them up now */
329 #ifdef HAS_TEST_AND_SET
330         S_UNLOCK(&(bufHdr->io_in_progress_lock));
331 #else
332         if (bufHdr->refcount > 1)
333                 SignalIO(bufHdr);
334 #endif
335
336         SpinRelease(BufMgrLock);
337
338         if (status == SM_FAIL)
339                 return InvalidBuffer;
340
341         return BufferDescriptorGetBuffer(bufHdr);
342 }
343
344 /*
345  * BufferAlloc -- Get a buffer from the buffer pool but don't
346  *              read it.
347  *
348  * Returns: descriptor for buffer
349  *
350  * When this routine returns, the BufMgrLock is guaranteed NOT to be held.
351  */
352 static BufferDesc *
353 BufferAlloc(Relation reln,
354                         BlockNumber blockNum,
355                         bool *foundPtr,
356                         bool bufferLockHeld)
357 {
358         BufferDesc *buf,
359                            *buf2;
360         BufferTag       newTag;                 /* identity of requested block */
361         bool            inProgress;             /* buffer undergoing IO */
362         bool            newblock = FALSE;
363
364         /* create a new tag so we can lookup the buffer */
365         /* assume that the relation is already open */
366         if (blockNum == P_NEW)
367         {
368                 newblock = TRUE;
369                 blockNum = smgrnblocks(DEFAULT_SMGR, reln);
370         }
371
372         INIT_BUFFERTAG(&newTag, reln, blockNum);
373
374         if (!bufferLockHeld)
375                 SpinAcquire(BufMgrLock);
376
377         /* see if the block is in the buffer pool already */
378         buf = BufTableLookup(&newTag);
379         if (buf != NULL)
380         {
381
382                 /*
383                  * Found it.  Now, (a) pin the buffer so no one steals it from the
384                  * buffer pool, (b) check IO_IN_PROGRESS, someone may be faulting
385                  * the buffer into the buffer pool.
386                  */
387
388                 PinBuffer(buf);
389                 inProgress = (buf->flags & BM_IO_IN_PROGRESS);
390
391                 *foundPtr = TRUE;
392                 if (inProgress)
393                 {
394                         WaitIO(buf, BufMgrLock);
395                         if (buf->flags & BM_IO_ERROR)
396                         {
397
398                                 /*
399                                  * weird race condition:
400                                  *
401                                  * We were waiting for someone else to read the buffer. While
402                                  * we were waiting, the reader boof'd in some way, so the
403                                  * contents of the buffer are still invalid.  By saying
404                                  * that we didn't find it, we can make the caller
405                                  * reinitialize the buffer.  If two processes are waiting
406                                  * for this block, both will read the block.  The second
407                                  * one to finish may overwrite any updates made by the
408                                  * first.  (Assume higher level synchronization prevents
409                                  * this from happening).
410                                  *
411                                  * This is never going to happen, don't worry about it.
412                                  */
413                                 *foundPtr = FALSE;
414                         }
415                 }
416 #ifdef BMTRACE
417                 _bm_trace((reln->rd_rel->relisshared ? 0 : MyDatabaseId), RelationGetRelid(reln), blockNum, BufferDescriptorGetBuffer(buf), BMT_ALLOCFND);
418 #endif   /* BMTRACE */
419
420                 SpinRelease(BufMgrLock);
421
422                 return buf;
423         }
424
425         *foundPtr = FALSE;
426
427         /*
428          * Didn't find it in the buffer pool.  We'll have to initialize a new
429          * buffer.      First, grab one from the free list.  If it's dirty, flush
430          * it to disk. Remember to unlock BufMgr spinlock while doing the IOs.
431          */
432         inProgress = FALSE;
433         for (buf = (BufferDesc *) NULL; buf == (BufferDesc *) NULL;)
434         {
435
436                 /* GetFreeBuffer will abort if it can't find a free buffer */
437                 buf = GetFreeBuffer();
438
439                 /*
440                  * But it can return buf == NULL if we are in aborting transaction
441                  * now and so elog(ERROR,...) in GetFreeBuffer will not abort
442                  * again.
443                  */
444                 if (buf == NULL)
445                         return NULL;
446
447                 /*
448                  * There should be exactly one pin on the buffer after it is
449                  * allocated -- ours.  If it had a pin it wouldn't have been on
450                  * the free list.  No one else could have pinned it between
451                  * GetFreeBuffer and here because we have the BufMgrLock.
452                  */
453                 Assert(buf->refcount == 0);
454                 buf->refcount = 1;
455                 PrivateRefCount[BufferDescriptorGetBuffer(buf) - 1] = 1;
456
457                 if (buf->flags & BM_DIRTY)
458                 {
459                         bool            smok;
460
461                         /*
462                          * Set BM_IO_IN_PROGRESS to keep anyone from doing anything
463                          * with the contents of the buffer while we write it out. We
464                          * don't really care if they try to read it, but if they can
465                          * complete a BufferAlloc on it they can then scribble into
466                          * it, and we'd really like to avoid that while we are
467                          * flushing the buffer.  Setting this flag should block them
468                          * in WaitIO until we're done.
469                          */
470                         inProgress = TRUE;
471                         buf->flags |= BM_IO_IN_PROGRESS;
472 #ifdef HAS_TEST_AND_SET
473
474                         /*
475                          * All code paths that acquire this lock pin the buffer first;
476                          * since no one had it pinned (it just came off the free
477                          * list), no one else can have this lock.
478                          */
479                         Assert(S_LOCK_FREE(&(buf->io_in_progress_lock)));
480                         S_LOCK(&(buf->io_in_progress_lock));
481 #endif   /* HAS_TEST_AND_SET */
482
483                         /*
484                          * Write the buffer out, being careful to release BufMgrLock
485                          * before starting the I/O.
486                          *
487                          * This #ifndef is here because a few extra semops REALLY kill
488                          * you on machines that don't have spinlocks.  If you don't
489                          * operate with much concurrency, well...
490                          */
491                         smok = BufferReplace(buf, true);
492 #ifndef OPTIMIZE_SINGLE
493                         SpinAcquire(BufMgrLock);
494 #endif   /* OPTIMIZE_SINGLE */
495
496                         if (smok == FALSE)
497                         {
498                                 elog(NOTICE, "BufferAlloc: cannot write block %u for %s/%s",
499                                          buf->tag.blockNum, buf->sb_dbname, buf->sb_relname);
500                                 inProgress = FALSE;
501                                 buf->flags |= BM_IO_ERROR;
502                                 buf->flags &= ~BM_IO_IN_PROGRESS;
503 #ifdef HAS_TEST_AND_SET
504                                 S_UNLOCK(&(buf->io_in_progress_lock));
505 #else                                                   /* !HAS_TEST_AND_SET */
506                                 if (buf->refcount > 1)
507                                         SignalIO(buf);
508 #endif   /* !HAS_TEST_AND_SET */
509                                 PrivateRefCount[BufferDescriptorGetBuffer(buf) - 1] = 0;
510                                 buf->refcount--;
511                                 if (buf->refcount == 0)
512                                 {
513                                         AddBufferToFreelist(buf);
514                                         buf->flags |= BM_FREE;
515                                 }
516                                 buf = (BufferDesc *) NULL;
517                         }
518                         else
519                         {
520
521                                 /*
522                                  * BM_JUST_DIRTIED is cleared by BufferReplace and shouldn't
523                                  * be set by anyone.            - vadim 01/17/97
524                                  */
525                                 if (buf->flags & BM_JUST_DIRTIED)
526                                 {
527                                         elog(FATAL, "BufferAlloc: content of block %u (%s) changed while flushing",
528                                                  buf->tag.blockNum, buf->sb_relname);
529                                 }
530                                 else
531                                         buf->flags &= ~BM_DIRTY;
532                         }
533
534                         /*
535                          * Somebody could have pinned the buffer while we were doing
536                          * the I/O and had given up the BufMgrLock (though they would
537                          * be waiting for us to clear the BM_IO_IN_PROGRESS flag).
538                          * That's why this is a loop -- if so, we need to clear the
539                          * I/O flags, remove our pin and start all over again.
540                          *
541                          * People may be making buffers free at any time, so there's no
542                          * reason to think that we have an immediate disaster on our
543                          * hands.
544                          */
545                         if (buf && buf->refcount > 1)
546                         {
547                                 inProgress = FALSE;
548                                 buf->flags &= ~BM_IO_IN_PROGRESS;
549 #ifdef HAS_TEST_AND_SET
550                                 S_UNLOCK(&(buf->io_in_progress_lock));
551 #else                                                   /* !HAS_TEST_AND_SET */
552                                 if (buf->refcount > 1)
553                                         SignalIO(buf);
554 #endif   /* !HAS_TEST_AND_SET */
555                                 PrivateRefCount[BufferDescriptorGetBuffer(buf) - 1] = 0;
556                                 buf->refcount--;
557                                 buf = (BufferDesc *) NULL;
558                         }
559
560                         /*
561                          * Somebody could have allocated another buffer for the same
562                          * block we are about to read in. (While we flush out the
563                          * dirty buffer, we don't hold the lock and someone could have
564                          * allocated another buffer for the same block. The problem is
565                          * we haven't gotten around to insert the new tag into the
566                          * buffer table. So we need to check here.              -ay 3/95
567                          */
568                         buf2 = BufTableLookup(&newTag);
569                         if (buf2 != NULL)
570                         {
571
572                                 /*
573                                  * Found it. Someone has already done what we're about to
574                                  * do. We'll just handle this as if it were found in the
575                                  * buffer pool in the first place.
576                                  */
577                                 if (buf != NULL)
578                                 {
579 #ifdef HAS_TEST_AND_SET
580                                         S_UNLOCK(&(buf->io_in_progress_lock));
581 #else                                                   /* !HAS_TEST_AND_SET */
582                                         if (buf->refcount > 1)
583                                                 SignalIO(buf);
584 #endif   /* !HAS_TEST_AND_SET */
585                                         /* give up the buffer since we don't need it any more */
586                                         buf->refcount--;
587                                         PrivateRefCount[BufferDescriptorGetBuffer(buf) - 1] = 0;
588                                         AddBufferToFreelist(buf);
589                                         buf->flags |= BM_FREE;
590                                         buf->flags &= ~BM_IO_IN_PROGRESS;
591                                 }
592
593                                 PinBuffer(buf2);
594                                 inProgress = (buf2->flags & BM_IO_IN_PROGRESS);
595
596                                 *foundPtr = TRUE;
597                                 if (inProgress)
598                                 {
599                                         WaitIO(buf2, BufMgrLock);
600                                         if (buf2->flags & BM_IO_ERROR)
601                                                 *foundPtr = FALSE;
602                                 }
603
604                                 SpinRelease(BufMgrLock);
605
606                                 return buf2;
607                         }
608                 }
609         }
610
611         /*
612          * At this point we should have the sole pin on a non-dirty buffer and
613          * we may or may not already have the BM_IO_IN_PROGRESS flag set.
614          */
615
616         /*
617          * Change the name of the buffer in the lookup table:
618          *
619          * Need to update the lookup table before the read starts. If someone
620          * comes along looking for the buffer while we are reading it in, we
621          * don't want them to allocate a new buffer.  For the same reason, we
622          * didn't want to erase the buf table entry for the buffer we were
623          * writing back until now, either.
624          */
625
626         if (!BufTableDelete(buf))
627         {
628                 SpinRelease(BufMgrLock);
629                 elog(FATAL, "buffer wasn't in the buffer table\n");
630
631         }
632
633         /* record the database name and relation name for this buffer */
634         strcpy(buf->sb_relname, reln->rd_rel->relname.data);
635         strcpy(buf->sb_dbname, DatabaseName);
636
637         INIT_BUFFERTAG(&(buf->tag), reln, blockNum);
638         if (!BufTableInsert(buf))
639         {
640                 SpinRelease(BufMgrLock);
641                 elog(FATAL, "Buffer in lookup table twice \n");
642         }
643
644         /*
645          * Buffer contents are currently invalid.  Have to mark IO IN PROGRESS
646          * so no one fiddles with them until the read completes.  If this
647          * routine has been called simply to allocate a buffer, no io will be
648          * attempted, so the flag isn't set.
649          */
650         if (!inProgress)
651         {
652                 buf->flags |= BM_IO_IN_PROGRESS;
653 #ifdef HAS_TEST_AND_SET
654                 Assert(S_LOCK_FREE(&(buf->io_in_progress_lock)));
655                 S_LOCK(&(buf->io_in_progress_lock));
656 #endif   /* HAS_TEST_AND_SET */
657         }
658
659 #ifdef BMTRACE
660         _bm_trace((reln->rd_rel->relisshared ? 0 : MyDatabaseId), RelationGetRelid(reln), blockNum, BufferDescriptorGetBuffer(buf), BMT_ALLOCNOTFND);
661 #endif   /* BMTRACE */
662
663         SpinRelease(BufMgrLock);
664
665         return buf;
666 }
667
668 /*
669  * WriteBuffer
670  *
671  *              Pushes buffer contents to disk if WriteMode is BUFFER_FLUSH_WRITE.
672  *              Otherwise, marks contents as dirty.
673  *
674  * Assume that buffer is pinned.  Assume that reln is
675  *              valid.
676  *
677  * Side Effects:
678  *              Pin count is decremented.
679  */
680
681 #undef WriteBuffer
682
683 int
684 WriteBuffer(Buffer buffer)
685 {
686         BufferDesc *bufHdr;
687
688         if (WriteMode == BUFFER_FLUSH_WRITE)
689                 return FlushBuffer(buffer, TRUE);
690         else
691         {
692
693                 if (BufferIsLocal(buffer))
694                         return WriteLocalBuffer(buffer, TRUE);
695
696                 if (BAD_BUFFER_ID(buffer))
697                         return FALSE;
698
699                 bufHdr = &BufferDescriptors[buffer - 1];
700
701                 SpinAcquire(BufMgrLock);
702                 Assert(bufHdr->refcount > 0);
703                 bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
704                 UnpinBuffer(bufHdr);
705                 SpinRelease(BufMgrLock);
706                 CommitInfoNeedsSave[buffer - 1] = 0;
707         }
708         return TRUE;
709 }
710
711 #ifdef NOT_USED
712 void
713 WriteBuffer_Debug(char *file, int line, Buffer buffer)
714 {
715         WriteBuffer(buffer);
716         if (ShowPinTrace && BufferIsLocal(buffer) && is_userbuffer(buffer))
717         {
718                 BufferDesc *buf;
719
720                 buf = &BufferDescriptors[buffer - 1];
721                 fprintf(stderr, "UNPIN(WR) %ld relname = %s, blockNum = %d, \
722 refcount = %ld, file: %s, line: %d\n",
723                                 buffer, buf->sb_relname, buf->tag.blockNum,
724                                 PrivateRefCount[buffer - 1], file, line);
725         }
726 }
727
728 #endif
729
730 /*
731  * DirtyBufferCopy() -- For a given dbid/relid/blockno, if the buffer is
732  *                                              in the cache and is dirty, mark it clean and copy
733  *                                              it to the requested location.  This is a logical
734  *                                              write, and has been installed to support the cache
735  *                                              management code for write-once storage managers.
736  *
737  *      DirtyBufferCopy() -- Copy a given dirty buffer to the requested
738  *                                               destination.
739  *
740  *              We treat this as a write.  If the requested buffer is in the pool
741  *              and is dirty, we copy it to the location requested and mark it
742  *              clean.  This routine supports the Sony jukebox storage manager,
743  *              which agrees to take responsibility for the data once we mark
744  *              it clean.
745  *
746  *      NOTE: used by sony jukebox code in postgres 4.2   - ay 2/95
747  */
748 #ifdef NOT_USED
749 void
750 DirtyBufferCopy(Oid dbid, Oid relid, BlockNumber blkno, char *dest)
751 {
752         BufferDesc *buf;
753         BufferTag       btag;
754
755         btag.relId.relId = relid;
756         btag.relId.dbId = dbid;
757         btag.blockNum = blkno;
758
759         SpinAcquire(BufMgrLock);
760         buf = BufTableLookup(&btag);
761
762         if (buf == (BufferDesc *) NULL
763                 || !(buf->flags & BM_DIRTY)
764                 || !(buf->flags & BM_VALID))
765         {
766                 SpinRelease(BufMgrLock);
767                 return;
768         }
769
770         /*
771          * hate to do this holding the lock, but release and reacquire is
772          * slower
773          */
774         memmove(dest, (char *) MAKE_PTR(buf->data), BLCKSZ);
775
776         buf->flags &= ~BM_DIRTY;
777
778         SpinRelease(BufMgrLock);
779 }
780
781 #endif
782
783 /*
784  * FlushBuffer -- like WriteBuffer, but force the page to disk.
785  *
786  * 'buffer' is known to be dirty/pinned, so there should not be a
787  * problem reading the BufferDesc members without the BufMgrLock
788  * (nobody should be able to change tags, flags, etc. out from under
789  * us).
790  */
791 static int
792 FlushBuffer(Buffer buffer, bool release)
793 {
794         BufferDesc *bufHdr;
795         Oid                     bufdb;
796         Relation        bufrel;
797         int                     status;
798
799         if (BufferIsLocal(buffer))
800                 return FlushLocalBuffer(buffer, release);
801
802         if (BAD_BUFFER_ID(buffer))
803                 return STATUS_ERROR;
804
805         bufHdr = &BufferDescriptors[buffer - 1];
806         bufdb = bufHdr->tag.relId.dbId;
807
808         Assert(bufdb == MyDatabaseId || bufdb == (Oid) NULL);
809         bufrel = RelationIdCacheGetRelation(bufHdr->tag.relId.relId);
810         Assert(bufrel != (Relation) NULL);
811
812         /* To check if block content changed while flushing. - vadim 01/17/97 */
813         SpinAcquire(BufMgrLock);
814         bufHdr->flags &= ~BM_JUST_DIRTIED;
815         SpinRelease(BufMgrLock);
816
817         status = smgrflush(DEFAULT_SMGR, bufrel, bufHdr->tag.blockNum,
818                                            (char *) MAKE_PTR(bufHdr->data));
819
820         RelationDecrementReferenceCount(bufrel);
821
822         if (status == SM_FAIL)
823         {
824                 elog(ERROR, "FlushBuffer: cannot flush block %u of the relation %s",
825                          bufHdr->tag.blockNum, bufHdr->sb_relname);
826                 return STATUS_ERROR;
827         }
828         BufferFlushCount++;
829
830         SpinAcquire(BufMgrLock);
831
832         /*
833          * If this buffer was marked by someone as DIRTY while we were
834          * flushing it out we must not clear DIRTY flag - vadim 01/17/97
835          */
836         if (bufHdr->flags & BM_JUST_DIRTIED)
837         {
838                 elog(NOTICE, "FlushBuffer: content of block %u (%s) changed while flushing",
839                          bufHdr->tag.blockNum, bufHdr->sb_relname);
840         }
841         else
842                 bufHdr->flags &= ~BM_DIRTY;
843         if (release)
844                 UnpinBuffer(bufHdr);
845         SpinRelease(BufMgrLock);
846         CommitInfoNeedsSave[buffer - 1] = 0;
847
848         return STATUS_OK;
849 }
850
851 /*
852  * WriteNoReleaseBuffer -- like WriteBuffer, but do not unpin the buffer
853  *                                                 when the operation is complete.
854  *
855  *              We know that the buffer is for a relation in our private cache,
856  *              because this routine is called only to write out buffers that
857  *              were changed by the executing backend.
858  */
859 int
860 WriteNoReleaseBuffer(Buffer buffer)
861 {
862         BufferDesc *bufHdr;
863
864         if (WriteMode == BUFFER_FLUSH_WRITE)
865                 return FlushBuffer(buffer, FALSE);
866         else
867         {
868
869                 if (BufferIsLocal(buffer))
870                         return WriteLocalBuffer(buffer, FALSE);
871
872                 if (BAD_BUFFER_ID(buffer))
873                         return STATUS_ERROR;
874
875                 bufHdr = &BufferDescriptors[buffer - 1];
876
877                 SpinAcquire(BufMgrLock);
878                 bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
879                 SpinRelease(BufMgrLock);
880                 CommitInfoNeedsSave[buffer - 1] = 0;
881         }
882         return STATUS_OK;
883 }
884
885
886 #undef ReleaseAndReadBuffer
887 /*
888  * ReleaseAndReadBuffer -- combine ReleaseBuffer() and ReadBuffer()
889  *              so that only one semop needs to be called.
890  *
891  */
892 Buffer
893 ReleaseAndReadBuffer(Buffer buffer,
894                                          Relation relation,
895                                          BlockNumber blockNum)
896 {
897         BufferDesc *bufHdr;
898         Buffer          retbuf;
899
900         if (BufferIsLocal(buffer))
901         {
902                 Assert(LocalRefCount[-buffer - 1] > 0);
903                 LocalRefCount[-buffer - 1]--;
904         }
905         else
906         {
907                 if (BufferIsValid(buffer))
908                 {
909                         bufHdr = &BufferDescriptors[buffer - 1];
910                         Assert(PrivateRefCount[buffer - 1] > 0);
911                         PrivateRefCount[buffer - 1]--;
912                         if (PrivateRefCount[buffer - 1] == 0 &&
913                                 LastRefCount[buffer - 1] == 0)
914                         {
915
916                                 /*
917                                  * only release buffer if it is not pinned in previous
918                                  * ExecMain level
919                                  */
920                                 SpinAcquire(BufMgrLock);
921                                 bufHdr->refcount--;
922                                 if (bufHdr->refcount == 0)
923                                 {
924                                         AddBufferToFreelist(bufHdr);
925                                         bufHdr->flags |= BM_FREE;
926                                 }
927                                 if (CommitInfoNeedsSave[buffer - 1])
928                                 {
929                                         bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
930                                         CommitInfoNeedsSave[buffer - 1] = 0;
931                                 }
932                                 retbuf = ReadBufferWithBufferLock(relation, blockNum, true);
933                                 return retbuf;
934                         }
935                 }
936         }
937
938         return ReadBuffer(relation, blockNum);
939 }
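/*
 * Editor's note (illustrative sketch, not part of the original file):
 * walking a relation block by block with ReleaseAndReadBuffer(), so that
 * dropping the old pin and acquiring the next one take a single call per
 * step; "reln", "blkno", "nblocks" and "buf" are placeholders.  Starting
 * with InvalidBuffer works because the routine above falls through to a
 * plain ReadBuffer() when the passed-in buffer is not valid.
 *
 *              nblocks = RelationGetNumberOfBlocks(reln);
 *              buf = InvalidBuffer;
 *              for (blkno = 0; blkno < nblocks; blkno++)
 *              {
 *                      buf = ReleaseAndReadBuffer(buf, reln, blkno);
 *                      (examine the page held in buf here)
 *              }
 *              if (BufferIsValid(buf))
 *                      ReleaseBuffer(buf);
 */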
940
941 /*
942  * BufferSync -- Flush all dirty buffers in the pool.
943  *
944  *              This is called at transaction commit time.      It does the wrong thing,
945  *              right now.      We should flush only our own changes to stable storage,
946  *              and we should obey the lock protocol on the buffer manager metadata
947  *              as we do it.  Also, we need to be sure that no other transaction is
948  *              modifying the page as we flush it.      This is only a problem for objects
949  *              that use a non-two-phase locking protocol, like btree indices.  For
950  *              those objects, we would like to set a write lock for the duration of
951  *              our IO.  Another possibility is to code updates to btree pages
952  *              carefully, so that writing them out out of order cannot cause
953  *              any unrecoverable errors.
954  *
955  *              I don't want to think hard about this right now, so I will try
956  *              to come back to it later.
957  */
958 static void
959 BufferSync()
960 {
961         int                     i;
962         Oid                     bufdb;
963         Oid                     bufrel;
964         Relation        reln;
965         BufferDesc *bufHdr;
966         int                     status;
967
968         SpinAcquire(BufMgrLock);
969         for (i = 0, bufHdr = BufferDescriptors; i < NBuffers; i++, bufHdr++)
970         {
971                 if ((bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY))
972                 {
973                         bufdb = bufHdr->tag.relId.dbId;
974                         bufrel = bufHdr->tag.relId.relId;
975                         if (bufdb == MyDatabaseId || bufdb == (Oid) 0)
976                         {
977                                 reln = RelationIdCacheGetRelation(bufrel);
978
979                                 /*
980                                  * We have to pin buffer to keep anyone from stealing it
981                                  * from the buffer pool while we are flushing it or
982                                  * waiting in WaitIO. It's bad for GetFreeBuffer in
983                                  * BufferAlloc, but there is no other way to prevent
984                                  * writing into disk block data from some other buffer,
985                                  * getting smgr status of some other block and clearing
986                                  * BM_DIRTY of ...                        - VAdim 09/16/96
987                                  */
988                                 PinBuffer(bufHdr);
989                                 if (bufHdr->flags & BM_IO_IN_PROGRESS)
990                                 {
991                                         WaitIO(bufHdr, BufMgrLock);
992                                         UnpinBuffer(bufHdr);
993                                         if (bufHdr->flags & BM_IO_ERROR)
994                                         {
995                                                 elog(ERROR, "BufferSync: write error %u for %s",
996                                                          bufHdr->tag.blockNum, bufHdr->sb_relname);
997                                         }
998                                         if (reln != (Relation) NULL)
999                                                 RelationDecrementReferenceCount(reln);
1000                                         continue;
1001                                 }
1002
1003                                 /*
1004                                  * To check if block content changed while flushing (see
1005                                  * below). - vadim 01/17/97
1006                                  */
1007                                 bufHdr->flags &= ~BM_JUST_DIRTIED;
1008
1009                                 /*
1010                                  * If we didn't have the reldesc in our local cache, flush
1011                                  * this page out using the 'blind write' storage manager
1012                                  * routine.  If we did find it, use the standard
1013                                  * interface.
1014                                  */
1015
1016 #ifndef OPTIMIZE_SINGLE
1017                                 SpinRelease(BufMgrLock);
1018 #endif   /* OPTIMIZE_SINGLE */
1019                                 if (reln == (Relation) NULL)
1020                                 {
1021                                         status = smgrblindwrt(DEFAULT_SMGR, bufHdr->sb_dbname,
1022                                                                            bufHdr->sb_relname, bufdb, bufrel,
1023                                                                                   bufHdr->tag.blockNum,
1024                                                                                 (char *) MAKE_PTR(bufHdr->data));
1025                                 }
1026                                 else
1027                                 {
1028                                         status = smgrwrite(DEFAULT_SMGR, reln,
1029                                                                            bufHdr->tag.blockNum,
1030                                                                            (char *) MAKE_PTR(bufHdr->data));
1031                                 }
1032 #ifndef OPTIMIZE_SINGLE
1033                                 SpinAcquire(BufMgrLock);
1034 #endif   /* OPTIMIZE_SINGLE */
1035
1036                                 UnpinBuffer(bufHdr);
1037                                 if (status == SM_FAIL)
1038                                 {
1039                                         bufHdr->flags |= BM_IO_ERROR;
1040                                         elog(ERROR, "BufferSync: cannot write %u for %s",
1041                                                  bufHdr->tag.blockNum, bufHdr->sb_relname);
1042                                 }
1043                                 BufferFlushCount++;
1044
1045                                 /*
1046                                  * If this buffer was marked by someone as DIRTY while we
1047                                  * were flushing it out we must not clear DIRTY flag -
1048                                  * vadim 01/17/97
1049                                  */
1050                                 if (!(bufHdr->flags & BM_JUST_DIRTIED))
1051                                         bufHdr->flags &= ~BM_DIRTY;
1052                                 if (reln != (Relation) NULL)
1053                                         RelationDecrementReferenceCount(reln);
1054                         }
1055                 }
1056         }
1057         SpinRelease(BufMgrLock);
1058
1059         LocalBufferSync();
1060 }
1061
1062
1063 /*
1064  * WaitIO -- Block until the IO_IN_PROGRESS flag on 'buf'
1065  *              is cleared.  Because IO_IN_PROGRESS conflicts are
1066  *              expected to be rare, there is only one BufferIO
1067  *              lock in the entire system.      All processes block
1068  *              on this semaphore when they try to use a buffer
1069  *              that someone else is faulting in.  Whenever a
1070  *              process finishes an IO and someone is waiting for
1071  *              the buffer, BufferIO is signaled (SignalIO).  All
1072  *              waiting processes then wake up and check to see
1073  *              if their buffer is now ready.  This implementation
1074  *              is simple, but efficient enough if WaitIO is
1075  *              rarely called by multiple processes simultaneously.
1076  *
1077  *      ProcSleep atomically releases the spinlock and goes to
1078  *              sleep.
1079  *
1080  *      Note: there is an easy fix if the queue becomes long.
1081  *              save the id of the buffer we are waiting for in
1082  *              the queue structure.  That way signal can figure
1083  *              out which proc to wake up.
1084  */
1085 #ifdef HAS_TEST_AND_SET
1086 static void
1087 WaitIO(BufferDesc *buf, SPINLOCK spinlock)
1088 {
1089         SpinRelease(spinlock);
1090         S_LOCK(&(buf->io_in_progress_lock));
1091         S_UNLOCK(&(buf->io_in_progress_lock));
1092         SpinAcquire(spinlock);
1093 }
1094
1095 #else                                                   /* HAS_TEST_AND_SET */
1096 IpcSemaphoreId WaitIOSemId;
1097 IpcSemaphoreId WaitCLSemId;
1098
1099 static void
1100 WaitIO(BufferDesc *buf, SPINLOCK spinlock)
1101 {
1102         bool            inProgress;
1103
1104         for (;;)
1105         {
1106
1107                 /* wait until someone releases IO lock */
1108                 (*NWaitIOBackendP)++;
1109                 SpinRelease(spinlock);
1110                 IpcSemaphoreLock(WaitIOSemId, 0, 1);
1111                 SpinAcquire(spinlock);
1112                 inProgress = (buf->flags & BM_IO_IN_PROGRESS);
1113                 if (!inProgress)
1114                         break;
1115         }
1116 }
1117
1118 /*
1119  * SignalIO 
1120  */
1121 static void
1122 SignalIO(BufferDesc *buf)
1123 {
1124         /* somebody better be waiting. */
1125         Assert(buf->refcount > 1);
1126         IpcSemaphoreUnlock(WaitIOSemId, 0, *NWaitIOBackendP);
1127         *NWaitIOBackendP = 0;
1128 }
1129
1130 #endif   /* HAS_TEST_AND_SET */
1131
1132 long            NDirectFileRead;        /* some I/O's are direct file access.
1133                                                                  * bypass bufmgr */
1134 long            NDirectFileWrite;       /* e.g., I/O in psort and hashjoin.                                     */
1135
1136 void
1137 PrintBufferUsage(FILE *statfp)
1138 {
1139         float           hitrate;
1140         float           localhitrate;
1141
1142         if (ReadBufferCount == 0)
1143                 hitrate = 0.0;
1144         else
1145                 hitrate = (float) BufferHitCount *100.0 / ReadBufferCount;
1146
1147         if (ReadLocalBufferCount == 0)
1148                 localhitrate = 0.0;
1149         else
1150                 localhitrate = (float) LocalBufferHitCount *100.0 / ReadLocalBufferCount;
1151
1152         fprintf(statfp, "!\tShared blocks: %10ld read, %10ld written, buffer hit rate = %.2f%%\n",
1153                         ReadBufferCount - BufferHitCount, BufferFlushCount, hitrate);
1154         fprintf(statfp, "!\tLocal  blocks: %10ld read, %10ld written, buffer hit rate = %.2f%%\n",
1155                         ReadLocalBufferCount - LocalBufferHitCount, LocalBufferFlushCount, localhitrate);
1156         fprintf(statfp, "!\tDirect blocks: %10ld read, %10ld written\n",
1157                         NDirectFileRead, NDirectFileWrite);
1158 }
1159
1160 void
1161 ResetBufferUsage()
1162 {
1163         BufferHitCount = 0;
1164         ReadBufferCount = 0;
1165         BufferFlushCount = 0;
1166         LocalBufferHitCount = 0;
1167         ReadLocalBufferCount = 0;
1168         LocalBufferFlushCount = 0;
1169         NDirectFileRead = 0;
1170         NDirectFileWrite = 0;
1171 }
1172
1173 /* ----------------------------------------------
1174  *              ResetBufferPool
1175  *
1176  *              this routine is supposed to be called when a transaction aborts.
1177  *              it will release all the buffer pins held by the transaction.
1178  *
1179  * ----------------------------------------------
1180  */
1181 void
1182 ResetBufferPool()
1183 {
1184         int                     i;
1185
1186         for (i = 1; i <= NBuffers; i++)
1187         {
1188                 CommitInfoNeedsSave[i - 1] = 0;
1189                 if (BufferIsValid(i))
1190                 {
1191                         while (PrivateRefCount[i - 1] > 0)
1192                                 ReleaseBuffer(i);
1193                 }
1194                 LastRefCount[i - 1] = 0;
1195         }
1196
1197         ResetLocalBufferPool();
1198 }
1199
1200 /* -----------------------------------------------
1201  *              BufferPoolCheckLeak
1202  *
1203  *              check if there is a buffer leak
1204  *
1205  * -----------------------------------------------
1206  */
1207 int
1208 BufferPoolCheckLeak()
1209 {
1210         int                     i;
1211         int                     result = 0;
1212
1213         for (i = 1; i <= NBuffers; i++)
1214         {
1215                 if (BufferIsValid(i))
1216                 {
1217                         BufferDesc *buf = &(BufferDescriptors[i - 1]);
1218
1219                         elog(NOTICE,
1220                         "Buffer Leak: [%03d] (freeNext=%d, freePrev=%d, \
1221 relname=%s, blockNum=%d, flags=0x%x, refcount=%d %d)",
1222                                  i - 1, buf->freeNext, buf->freePrev,
1223                                  buf->sb_relname, buf->tag.blockNum, buf->flags,
1224                                  buf->refcount, PrivateRefCount[i - 1]);
1225                         result = 1;
1226                 }
1227         }
1228         return (result);
1229 }
1230
1231 /* ------------------------------------------------
1232  *              FlushBufferPool
1233  *
1234  *              flush all dirty blocks in buffer pool to disk
1235  *
1236  * ------------------------------------------------
1237  */
1238 void
1239 FlushBufferPool(int StableMainMemoryFlag)
1240 {
1241         if (!StableMainMemoryFlag)
1242         {
1243                 BufferSync();
1244                 smgrcommit();
1245         }
1246 }
1247
1248 /*
1249  * BufferGetBlockNumber 
1250  *              Returns the block number associated with a buffer.
1251  *
1252  * Note:
1253  *              Assumes that the buffer is valid.
1254  */
1255 BlockNumber
1256 BufferGetBlockNumber(Buffer buffer)
1257 {
1258         Assert(BufferIsValid(buffer));
1259
1260         /* XXX should be a critical section */
1261         if (BufferIsLocal(buffer))
1262                 return LocalBufferDescriptors[-buffer - 1].tag.blockNum;
1263         else
1264                 return BufferDescriptors[buffer - 1].tag.blockNum;
1265 }
1266
1267 #ifdef NOT_USED
1268 /*
1269  * BufferGetRelation 
1270  *              Returns the relation descriptor associated with a buffer.
1271  *
1272  * Note:
1273  *              Assumes buffer is valid.
1274  */
1275 Relation
1276 BufferGetRelation(Buffer buffer)
1277 {
1278         Relation        relation;
1279         Oid                     relid;
1280
1281         Assert(BufferIsValid(buffer));
1282         Assert(!BufferIsLocal(buffer));         /* not supported for local buffers */
1283
1284         /* XXX should be a critical section */
1285         relid = BufferDescriptors[buffer - 1].tag.relId.relId;
1286         relation = RelationIdGetRelation(relid);
1287
1288         RelationDecrementReferenceCount(relation);
1289
1290         if (RelationHasReferenceCountZero(relation))
1291         {
1292
1293                 /*
1294                  * elog(NOTICE, "BufferGetRelation: 0->1");
1295                  */
1296
1297                 RelationIncrementReferenceCount(relation);
1298         }
1299
1300         return relation;
1301 }
1302 #endif
1303
1304 /*
1305  * BufferReplace
1306  *
1307  * Flush the buffer corresponding to 'bufHdr'
1308  *
1309  */
1310 static int
1311 BufferReplace(BufferDesc *bufHdr, bool bufferLockHeld)
1312 {
1313         Relation        reln;
1314         Oid                     bufdb,
1315                                 bufrel;
1316         int                     status;
1317
1318         if (!bufferLockHeld)
1319                 SpinAcquire(BufMgrLock);
1320
1321         /*
1322          * first try to find the reldesc in the cache, if no luck, don't
1323          * bother to build the reldesc from scratch, just do a blind write.
1324          */
1325
1326         bufdb = bufHdr->tag.relId.dbId;
1327         bufrel = bufHdr->tag.relId.relId;
1328
1329         if (bufdb == MyDatabaseId || bufdb == (Oid) NULL)
1330                 reln = RelationIdCacheGetRelation(bufrel);
1331         else
1332                 reln = (Relation) NULL;
1333
1334         /* To check if block content changed while flushing. - vadim 01/17/97 */
1335         bufHdr->flags &= ~BM_JUST_DIRTIED;
1336
1337         SpinRelease(BufMgrLock);
1338
1339         if (reln != (Relation) NULL)
1340         {
1341                 status = smgrflush(DEFAULT_SMGR, reln, bufHdr->tag.blockNum,
1342                                                    (char *) MAKE_PTR(bufHdr->data));
1343         }
1344         else
1345         {
1346
1347                 /* blind write always flushes */
1348                 status = smgrblindwrt(DEFAULT_SMGR, bufHdr->sb_dbname,
1349                                                           bufHdr->sb_relname, bufdb, bufrel,
1350                                                           bufHdr->tag.blockNum,
1351                                                           (char *) MAKE_PTR(bufHdr->data));
1352         }
1353
1354         if (reln != (Relation) NULL)
1355                 RelationDecrementReferenceCount(reln);
1356
1357         if (status == SM_FAIL)
1358                 return FALSE;
1359
1360         BufferFlushCount++;
1361
1362         return TRUE;
1363 }
1364
1365 /*
1366  * RelationGetNumberOfBlocks 
1367  *              Returns the current number of blocks in the relation.
1368  *
1369  * Note:
1370  *              XXX may fail for huge relations.
1371  *              XXX should be elsewhere.
1372  *              XXX maybe should be hidden
1373  */
1374 BlockNumber
1375 RelationGetNumberOfBlocks(Relation relation)
1376 {
1377         return ((relation->rd_myxactonly) ? relation->rd_nblocks :
1378          smgrnblocks(DEFAULT_SMGR, relation));
1379 }
1380
1381 /* ---------------------------------------------------------------------
1382  *              ReleaseRelationBuffers
1383  *
1384  *              this function unmarks all the dirty pages of a relation
1385  *              in the buffer pool so that at the end of transaction
1386  *              these pages will not be flushed.
1387  *              XXX currently it sequentially searches the buffer pool, should be
1388  *              changed to more clever ways of searching.
1389  * --------------------------------------------------------------------
1390  */
1391 void
1392 ReleaseRelationBuffers(Relation rel)
1393 {
1394         int                     i;
1395         int                     holding = 0;
1396         BufferDesc *buf;
1397
1398         if (rel->rd_myxactonly)
1399         {
1400                 for (i = 0; i < NLocBuffer; i++)
1401                 {
1402                         buf = &LocalBufferDescriptors[i];
1403                         if ((buf->flags & BM_DIRTY) &&
1404                                 (buf->tag.relId.relId == RelationGetRelid(rel)))
1405                                 buf->flags &= ~BM_DIRTY;
1406                 }
1407                 return;
1408         }
1409
1410         for (i = 1; i <= NBuffers; i++)
1411         {
1412                 buf = &BufferDescriptors[i - 1];
1413                 if (!holding)
1414                 {
1415                         SpinAcquire(BufMgrLock);
1416                         holding = 1;
1417                 }
1418                 if ((buf->flags & BM_DIRTY) &&
1419                         (buf->tag.relId.dbId == MyDatabaseId) &&
1420                         (buf->tag.relId.relId == RelationGetRelid(rel)))
1421                 {
1422                         buf->flags &= ~BM_DIRTY;
1423                         if (!(buf->flags & BM_FREE))
1424                         {
1425                                 SpinRelease(BufMgrLock);
1426                                 holding = 0;
1427                                 ReleaseBuffer(i);
1428                         }
1429                 }
1430         }
1431         if (holding)
1432                 SpinRelease(BufMgrLock);
1433 }
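
/*
 * Example (illustrative sketch; the ordering shown is an assumption, not
 * taken from an actual caller): code that is about to unlink a relation's
 * file would discard its dirty pages first, so nothing is flushed to a
 * file that no longer exists:
 *
 *              ReleaseRelationBuffers(rel);
 *              smgrunlink(DEFAULT_SMGR, rel);
 */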
1434
1435 /* ---------------------------------------------------------------------
1436  *              DropBuffers
1437  *
1438  *              This function marks all the buffers in the buffer cache for a
1439  *              particular database as clean.  This is used when we destroy a
1440  *              database, to avoid trying to flush data to disk when the directory
1441  *              tree no longer exists.
1442  *
1443  *              This is an exceedingly non-public interface.
1444  * --------------------------------------------------------------------
1445  */
1446 void
1447 DropBuffers(Oid dbid)
1448 {
1449         int                     i;
1450         BufferDesc *buf;
1451
1452         SpinAcquire(BufMgrLock);
1453         for (i = 1; i <= NBuffers; i++)
1454         {
1455                 buf = &BufferDescriptors[i - 1];
1456                 if ((buf->tag.relId.dbId == dbid) && (buf->flags & BM_DIRTY))
1457                         buf->flags &= ~BM_DIRTY;
1458         }
1459         SpinRelease(BufMgrLock);
1460 }
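
/*
 * Example (illustrative sketch; 'db_id' is a hypothetical variable): a
 * destroy-database path would clean the cache before removing the files:
 *
 *              DropBuffers(db_id);
 *              ... then remove the database's directory tree ...
 */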
1461
1462 /* -----------------------------------------------------------------
1463  *              PrintBufferDescs
1464  *
1465  *              this function prints all the buffer descriptors, for debugging
1466  *              use only.
1467  * -----------------------------------------------------------------
1468  */
1469 void
1470 PrintBufferDescs()
1471 {
1472         int                     i;
1473         BufferDesc *buf = BufferDescriptors;
1474
1475         if (IsUnderPostmaster)
1476         {
1477                 SpinAcquire(BufMgrLock);
1478                 for (i = 0; i < NBuffers; ++i, ++buf)
1479                 {
1480                         elog(DEBUG, "[%02d] (freeNext=%d, freePrev=%d, relname=%s, \
1481 blockNum=%d, flags=0x%x, refcount=%d %ld)",
1482                                  i, buf->freeNext, buf->freePrev,
1483                                  buf->sb_relname, buf->tag.blockNum, buf->flags,
1484                                  buf->refcount, PrivateRefCount[i]);
1485                 }
1486                 SpinRelease(BufMgrLock);
1487         }
1488         else
1489         {
1490                 /* interactive backend */
1491                 for (i = 0; i < NBuffers; ++i, ++buf)
1492                 {
1493                         printf("[%-2d] (%s, %d) flags=0x%x, refcnt=%d %ld\n",
1494                                    i, buf->sb_relname, buf->tag.blockNum,
1495                                    buf->flags, buf->refcount, PrivateRefCount[i]);
1496                 }
1497         }
1498 }
1499
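/* -----------------------------------------------------------------
 *              PrintPinnedBufs
 *
 *              prints the descriptors of all buffers this backend currently
 *              has pinned (PrivateRefCount > 0), for debugging use only.
 * -----------------------------------------------------------------
 */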
1500 void
1501 PrintPinnedBufs()
1502 {
1503         int                     i;
1504         BufferDesc *buf = BufferDescriptors;
1505
1506         SpinAcquire(BufMgrLock);
1507         for (i = 0; i < NBuffers; ++i, ++buf)
1508         {
1509                 if (PrivateRefCount[i] > 0)
1510                         elog(NOTICE, "[%02d] (freeNext=%d, freePrev=%d, relname=%s, \
1511 blockNum=%d, flags=0x%x, refcount=%d %ld)\n",
1512                                  i, buf->freeNext, buf->freePrev, buf->sb_relname,
1513                                  buf->tag.blockNum, buf->flags,
1514                                  buf->refcount, PrivateRefCount[i]);
1515         }
1516         SpinRelease(BufMgrLock);
1517 }
1518
1519 /*
1520  * BufferPoolBlowaway
1521  *
1522  * this routine exists solely for experiments -- sometimes you may
1523  * want to blow away whatever is left over from the past in the
1524  * buffer pool and start measuring performance with a clean, empty
1525  * buffer pool.
1526  */
1527 #ifdef NOT_USED
1528 void
1529 BufferPoolBlowaway()
1530 {
1531         int                     i;
1532
1533         BufferSync();
1534         for (i = 1; i <= NBuffers; i++)
1535         {
1536                 if (BufferIsValid(i))
1537                 {
1538                         while (BufferIsValid(i))
1539                                 ReleaseBuffer(i);
1540                 }
1541                 BufTableDelete(&BufferDescriptors[i - 1]);
1542         }
1543 }
1544
1545 #endif
1546
1547 /* ---------------------------------------------------------------------
1548  *              BlowawayRelationBuffers
1549  *
1550  *              This function blows away all buffer-pool pages of a relation
1551  *              whose block number is >= the passed block.  Used by vacuum before truncation...
1552  *
1553  *              Returns: 0 - Ok, -1 - DIRTY, -2 - PINNED
1554  *
1555  *              XXX currently it sequentially searches the buffer pool, should be
1556  *              changed to more clever ways of searching.
1557  * --------------------------------------------------------------------
1558  */
1559 int
1560 BlowawayRelationBuffers(Relation rel, BlockNumber block)
1561 {
1562         int                     i;
1563         BufferDesc *buf;
1564
1565         if (rel->rd_myxactonly)
1566         {
1567                 for (i = 0; i < NLocBuffer; i++)
1568                 {
1569                         buf = &LocalBufferDescriptors[i];
1570                         if (buf->tag.relId.relId == RelationGetRelid(rel) &&
1571                                 buf->tag.blockNum >= block)
1572                         {
1573                                 if (buf->flags & BM_DIRTY)
1574                                 {
1575                                         elog(NOTICE, "BlowawayRelationBuffers(%s (local), %u): block %u is dirty",
1576                                         rel->rd_rel->relname.data, block, buf->tag.blockNum);
1577                                         return -1;
1578                                 }
1579                                 if (LocalRefCount[i] > 0)
1580                                 {
1581                                         elog(NOTICE, "BlowawayRelationBuffers(%s (local), %u): block %u is referenced (%d)",
1582                                                  rel->rd_rel->relname.data, block,
1583                                                  buf->tag.blockNum, LocalRefCount[i]);
1584                                         return -2;
1585                                 }
1586                                 buf->tag.relId.relId = InvalidOid;
1587                         }
1588                 }
1589                 return 0;
1590         }
1591
1592         SpinAcquire(BufMgrLock);
1593         for (i = 0; i < NBuffers; i++)
1594         {
1595                 buf = &BufferDescriptors[i];
1596                 if (buf->tag.relId.dbId == MyDatabaseId &&
1597                         buf->tag.relId.relId == RelationGetRelid(rel) &&
1598                         buf->tag.blockNum >= block)
1599                 {
1600                         if (buf->flags & BM_DIRTY)
1601                         {
1602                                 elog(NOTICE, "BlowawayRelationBuffers(%s, %u): block %u is dirty (private %d, last %d, global %d)",
1603                                          buf->sb_relname, block, buf->tag.blockNum,
1604                                          PrivateRefCount[i], LastRefCount[i], buf->refcount);
1605                                 SpinRelease(BufMgrLock);
1606                                 return -1;
1607                         }
1608                         if (!(buf->flags & BM_FREE))
1609                         {
1610                                 elog(NOTICE, "BlowawayRelationBuffers(%s, %u): block %u is referenced (private %d, last %d, global %d)",
1611                                          buf->sb_relname, block, buf->tag.blockNum,
1612                                          PrivateRefCount[i], LastRefCount[i], buf->refcount);
1613                                 SpinRelease(BufMgrLock);
1614                                 return -2;
1615                         }
1616                         BufTableDelete(buf);
1617                 }
1618         }
1619         SpinRelease(BufMgrLock);
1620         return 0;
1621 }
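
/*
 * Example (illustrative sketch; 'new_nblocks' is hypothetical and the
 * smgrtruncate call is an assumption about the caller): a vacuum-style
 * caller would check the return code before physically truncating:
 *
 *              if (BlowawayRelationBuffers(rel, new_nblocks) != 0)
 *                      elog(ERROR, "cannot truncate %s", rel->rd_rel->relname.data);
 *              smgrtruncate(DEFAULT_SMGR, rel, new_nblocks);
 */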
1622
1623 #undef ReleaseBuffer
1624
1625 /*
1626  * ReleaseBuffer -- remove the pin on a buffer without
1627  *              marking it dirty.
1628  *
1629  */
1630 int
1631 ReleaseBuffer(Buffer buffer)
1632 {
1633         BufferDesc *bufHdr;
1634
1635         if (BufferIsLocal(buffer))
1636         {
1637                 Assert(LocalRefCount[-buffer - 1] > 0);
1638                 LocalRefCount[-buffer - 1]--;
1639                 return STATUS_OK;
1640         }
1641
1642         if (BAD_BUFFER_ID(buffer))
1643                 return STATUS_ERROR;
1644
1645         bufHdr = &BufferDescriptors[buffer - 1];
1646
1647         Assert(PrivateRefCount[buffer - 1] > 0);
1648         PrivateRefCount[buffer - 1]--;
1649         if (PrivateRefCount[buffer - 1] == 0 && LastRefCount[buffer - 1] == 0)
1650         {
1651
1652                 /*
1653                  * only release buffer if it is not pinned in previous ExecMain
1654                  * levels
1655                  */
1656                 SpinAcquire(BufMgrLock);
1657                 bufHdr->refcount--;
1658                 if (bufHdr->refcount == 0)
1659                 {
1660                         AddBufferToFreelist(bufHdr);
1661                         bufHdr->flags |= BM_FREE;
1662                 }
1663                 if (CommitInfoNeedsSave[buffer - 1])
1664                 {
1665                         bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
1666                         CommitInfoNeedsSave[buffer - 1] = 0;
1667                 }
1668                 SpinRelease(BufMgrLock);
1669         }
1670
1671         return STATUS_OK;
1672 }
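
/*
 * Example (illustrative sketch; 'rel' and 'blocknum' are hypothetical):
 * ReleaseBuffer drops the pin without dirtying the page, so it is the
 * right call when the page was only read; a caller that modified the
 * page would call WriteBuffer instead so the change eventually reaches
 * disk:
 *
 *              Buffer          buf = ReadBuffer(rel, blocknum);
 *
 *              ... examine (but do not modify) the page ...
 *
 *              ReleaseBuffer(buf);
 */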
1673
1674 #ifdef NOT_USED
1675 void
1676 IncrBufferRefCount_Debug(char *file, int line, Buffer buffer)
1677 {
1678         IncrBufferRefCount(buffer);
1679         if (ShowPinTrace && !BufferIsLocal(buffer) && is_userbuffer(buffer))
1680         {
1681                 BufferDesc *buf = &BufferDescriptors[buffer - 1];
1682
1683                 fprintf(stderr, "PIN(Incr) %d relname = %s, blockNum = %d, \
1684 refcount = %ld, file: %s, line: %d\n",
1685                                 buffer, buf->sb_relname, buf->tag.blockNum,
1686                                 PrivateRefCount[buffer - 1], file, line);
1687         }
1688 }
1689
1690 #endif
1691
1692 #ifdef NOT_USED
1693 void
1694 ReleaseBuffer_Debug(char *file, int line, Buffer buffer)
1695 {
1696         ReleaseBuffer(buffer);
1697         if (ShowPinTrace && !BufferIsLocal(buffer) && is_userbuffer(buffer))
1698         {
1699                 BufferDesc *buf = &BufferDescriptors[buffer - 1];
1700
1701                 fprintf(stderr, "UNPIN(Rel) %d relname = %s, blockNum = %d, \
1702 refcount = %ld, file: %s, line: %d\n",
1703                                 buffer, buf->sb_relname, buf->tag.blockNum,
1704                                 PrivateRefCount[buffer - 1], file, line);
1705         }
1706 }
1707
1708 #endif
1709
1710 #ifdef NOT_USED
1711 int
1712 ReleaseAndReadBuffer_Debug(char *file,
1713                                                    int line,
1714                                                    Buffer buffer,
1715                                                    Relation relation,
1716                                                    BlockNumber blockNum)
1717 {
1718         bool            bufferValid;
1719         Buffer          b;
1720
1721         bufferValid = BufferIsValid(buffer);
1722         b = ReleaseAndReadBuffer(buffer, relation, blockNum);
1723         if (ShowPinTrace && bufferValid && BufferIsLocal(buffer)
1724                 && is_userbuffer(buffer))
1725         {
1726                 BufferDesc *buf = &BufferDescriptors[buffer - 1];
1727
1728                 fprintf(stderr, "UNPIN(Rel&Rd) %d relname = %s, blockNum = %d, \
1729 refcount = %ld, file: %s, line: %d\n",
1730                                 buffer, buf->sb_relname, buf->tag.blockNum,
1731                                 PrivateRefCount[buffer - 1], file, line);
1732         }
1733         if (ShowPinTrace && BufferIsLocal(buffer) && is_userbuffer(buffer))
1734         {
1735                 BufferDesc *buf = &BufferDescriptors[b - 1];
1736
1737                 fprintf(stderr, "PIN(Rel&Rd) %d relname = %s, blockNum = %d, \
1738 refcount = %ld, file: %s, line: %d\n",
1739                                 b, buf->sb_relname, buf->tag.blockNum,
1740                                 PrivateRefCount[b - 1], file, line);
1741         }
1742         return b;
1743 }
1744
1745 #endif
1746
1747 #ifdef BMTRACE
1748
1749 /*
1750  *      trace allocations and deallocations in a circular buffer in
1751  *      shared memory.  check the buffer before doing the allocation,
1752  *      and die if there's anything fishy.
1753  */
1754
1755 _bm_trace(Oid dbId, Oid relId, int blkNo, int bufNo, int allocType)
1756 {
1757         long            start,
1758                                 cur;
1759         bmtrace    *tb;
1760
1761         start = *CurTraceBuf;
1762
1763         if (start > 0)
1764                 cur = start - 1;
1765         else
1766                 cur = BMT_LIMIT - 1;
1767
1768         for (;;)
1769         {
1770                 tb = &TraceBuf[cur];
1771                 if (tb->bmt_op != BMT_NOTUSED)
1772                 {
1773                         if (tb->bmt_buf == bufNo)
1774                         {
1775                                 if ((tb->bmt_op == BMT_DEALLOC)
1776                                         || (tb->bmt_dbid == dbId && tb->bmt_relid == relId
1777                                                 && tb->bmt_blkno == blkNo))
1778                                         goto okay;
1779
1780                                 /* die holding the buffer lock */
1781                                 _bm_die(dbId, relId, blkNo, bufNo, allocType, start, cur);
1782                         }
1783                 }
1784
1785                 if (cur == start)
1786                         goto okay;
1787
1788                 if (cur == 0)
1789                         cur = BMT_LIMIT - 1;
1790                 else
1791                         cur--;
1792         }
1793
1794 okay:
1795         tb = &TraceBuf[start];
1796         tb->bmt_pid = MyProcPid;
1797         tb->bmt_buf = bufNo;
1798         tb->bmt_dbid = dbId;
1799         tb->bmt_relid = relId;
1800         tb->bmt_blkno = blkNo;
1801         tb->bmt_op = allocType;
1802
1803         *CurTraceBuf = (start + 1) % BMT_LIMIT;
1804 }
1805
1806 _bm_die(Oid dbId, Oid relId, int blkNo, int bufNo,
1807                 int allocType, long start, long cur)
1808 {
1809         FILE       *fp;
1810         bmtrace    *tb;
1811         int                     i;
1812
1813         tb = &TraceBuf[cur];
1814
1815         if ((fp = AllocateFile("/tmp/death_notice", "w")) == NULL)
1816                 elog(FATAL, "buffer alloc trace error and can't open log file");
1817
1818         fprintf(fp, "buffer alloc trace detected the following error:\n\n");
1819         fprintf(fp, "    buffer %d being %s inconsistently with a previous %s\n\n",
1820                  bufNo, (allocType == BMT_DEALLOC ? "deallocated" : "allocated"),
1821                         (tb->bmt_op == BMT_DEALLOC ? "deallocation" : "allocation"));
1822
1823         fprintf(fp, "the trace buffer contains:\n");
1824
1825         i = start;
1826         for (;;)
1827         {
1828                 tb = &TraceBuf[i];
1829                 if (tb->bmt_op != BMT_NOTUSED)
1830                 {
1831                         fprintf(fp, "     [%3d]%spid %d buf %2d for <%u,%u,%d> ",
1832                                         i, (i == cur ? " ---> " : "\t"),
1833                                         tb->bmt_pid, tb->bmt_buf,
1834                                         tb->bmt_dbid, tb->bmt_relid, tb->bmt_blkno);
1835
1836                         switch (tb->bmt_op)
1837                         {
1838                                 case BMT_ALLOCFND:
1839                                         fprintf(fp, "allocate (found)\n");
1840                                         break;
1841
1842                                 case BMT_ALLOCNOTFND:
1843                                         fprintf(fp, "allocate (not found)\n");
1844                                         break;
1845
1846                                 case BMT_DEALLOC:
1847                                         fprintf(fp, "deallocate\n");
1848                                         break;
1849
1850                                 default:
1851                                         fprintf(fp, "unknown op type %d\n", tb->bmt_op);
1852                                         break;
1853                         }
1854                 }
1855
1856                 i = (i + 1) % BMT_LIMIT;
1857                 if (i == start)
1858                         break;
1859         }
1860
1861         fprintf(fp, "\noperation causing error:\n");
1862         fprintf(fp, "\tpid %d buf %d for <%u,%u,%d> ",
1863                         getpid(), bufNo, dbId, relId, blkNo);
1864
1865         switch (allocType)
1866         {
1867                 case BMT_ALLOCFND:
1868                         fprintf(fp, "allocate (found)\n");
1869                         break;
1870
1871                 case BMT_ALLOCNOTFND:
1872                         fprintf(fp, "allocate (not found)\n");
1873                         break;
1874
1875                 case BMT_DEALLOC:
1876                         fprintf(fp, "deallocate\n");
1877                         break;
1878
1879                 default:
1880                         fprintf(fp, "unknown op type %d\n", allocType);
1881                         break;
1882         }
1883
1884         FreeFile(fp);
1885
1886         kill(getpid(), SIGILL);
1887 }
1888
1889 #endif   /* BMTRACE */
1890
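/*
 * BufferRefCountReset
 *
 *              Saves this backend's current buffer pin counts into 'refcountsave',
 *              folds them into LastRefCount, and clears PrivateRefCount, so that
 *              pins taken at an outer execution level are not released by a
 *              nested one (cf. the LastRefCount test in ReleaseBuffer).
 */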
1891 void
1892 BufferRefCountReset(int *refcountsave)
1893 {
1894         int                     i;
1895
1896         for (i = 0; i < NBuffers; i++)
1897         {
1898                 refcountsave[i] = PrivateRefCount[i];
1899                 LastRefCount[i] += PrivateRefCount[i];
1900                 PrivateRefCount[i] = 0;
1901         }
1902 }
1903
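/*
 * BufferRefCountRestore
 *
 *              Restores the pin counts saved by a matching BufferRefCountReset.
 */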
1904 void
1905 BufferRefCountRestore(int *refcountsave)
1906 {
1907         int                     i;
1908
1909         for (i = 0; i < NBuffers; i++)
1910         {
1911                 PrivateRefCount[i] = refcountsave[i];
1912                 LastRefCount[i] -= refcountsave[i];
1913                 refcountsave[i] = 0;
1914         }
1915 }
1916
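/*
 * SetBufferWriteMode
 *
 *              Sets the global buffer write mode and returns the previous setting.
 */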
1917 int
1918 SetBufferWriteMode(int mode)
1919 {
1920         int                     old;
1921
1922         old = WriteMode;
1923         WriteMode = mode;
1924         return old;
1925 }
1926
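/*
 * SetBufferCommitInfoNeedsSave
 *
 *              Notes that tuple commit-status bits were updated on this page, so
 *              the buffer must be marked dirty when it is released.  Local
 *              buffers are ignored.
 */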
1927 void
1928 SetBufferCommitInfoNeedsSave(Buffer buffer)
1929 {
1930         if (!BufferIsLocal(buffer))
1931                 CommitInfoNeedsSave[buffer - 1]++;
1932 }
1933
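/*
 * UnlockBuffers
 *
 *              Releases any buffer context locks recorded in BufferLocks[] for
 *              this backend; typically used for cleanup after an error.
 */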
1934 void
1935 UnlockBuffers()
1936 {
1937         BufferDesc *buf;
1938         int                     i;
1939
1940         for (i = 0; i < NBuffers; i++)
1941         {
1942                 if (BufferLocks[i] == 0)
1943                         continue;
1944                 
1945                 Assert(BufferIsValid(i+1));
1946                 buf = &(BufferDescriptors[i]);
1947
1948 #ifdef HAS_TEST_AND_SET
1949                 S_LOCK(&(buf->cntx_lock));
1950 #else
1951                 IpcSemaphoreLock(WaitCLSemId, 0, IpcExclusiveLock);
1952 #endif
1953
1954                 if (BufferLocks[i] & BL_R_LOCK)
1955                 {
1956                         Assert(buf->r_locks > 0);
1957                         (buf->r_locks)--;
1958                 }
1959                 if (BufferLocks[i] & BL_RI_LOCK)
1960                 {
1961                         Assert(buf->ri_lock);
1962                         buf->ri_lock = false;
1963                 }
1964                 if (BufferLocks[i] & BL_W_LOCK)
1965                 {
1966                         Assert(buf->w_lock);
1967                         buf->w_lock = false;
1968                 }
1969 #ifdef HAS_TEST_AND_SET
1970                 S_UNLOCK(&(buf->cntx_lock));
1971 #else
1972                 IpcSemaphoreUnlock(WaitCLSemId, 0, IpcExclusiveLock);
1973 #endif
1974                 BufferLocks[i] = 0;
1975         }
1976 }
1977
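/*
 * LockBuffer
 *
 *              Acquires or releases a context lock on a shared buffer.  'mode'
 *              is BUFFER_LOCK_SHARE, BUFFER_LOCK_EXCLUSIVE, or BUFFER_LOCK_UNLOCK.
 *              Local buffers need no such locking and are ignored.
 */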
1978 void
1979 LockBuffer (Buffer buffer, int mode)
1980 {
1981         BufferDesc *buf;
1982
1983         Assert(BufferIsValid(buffer));
1984         if (BufferIsLocal(buffer))
1985                 return;
1986
1987         buf = &(BufferDescriptors[buffer-1]);
1988
1989 #ifdef HAS_TEST_AND_SET
1990         S_LOCK(&(buf->cntx_lock));
1991 #else
1992         IpcSemaphoreLock(WaitCLSemId, 0, IpcExclusiveLock);
1993 #endif
1994
1995         if (mode == BUFFER_LOCK_UNLOCK)
1996         {
1997                 if (BufferLocks[buffer-1] & BL_R_LOCK)
1998                 {
1999                         Assert(buf->r_locks > 0);
2000                         Assert(!(buf->w_lock));
2001                         Assert(!(BufferLocks[buffer-1] & (BL_W_LOCK | BL_RI_LOCK)));
2002                         (buf->r_locks)--;
2003                         BufferLocks[buffer-1] &= ~BL_R_LOCK;
2004                 }
2005                 else if (BufferLocks[buffer-1] & BL_W_LOCK)
2006                 {
2007                         Assert(buf->w_lock);
2008                         Assert(buf->r_locks == 0 && !buf->ri_lock);
2009                         Assert(!(BufferLocks[buffer-1] & (BL_R_LOCK | BL_RI_LOCK)));
2010                         buf->w_lock = false;
2011                         BufferLocks[buffer-1] &= ~BL_W_LOCK;
2012                 }
2013                 else
2014                         elog(ERROR, "LockBuffer: buffer %d is not locked", buffer);
2015         }
2016         else if (mode == BUFFER_LOCK_SHARE)
2017         {
2018                 unsigned        i = 0;
2019
2020                 Assert(!(BufferLocks[buffer-1] & (BL_R_LOCK | BL_W_LOCK | BL_RI_LOCK)));
2021                 while (buf->ri_lock || buf->w_lock)
2022                 {
2023 #ifdef HAS_TEST_AND_SET
2024                         S_UNLOCK(&(buf->cntx_lock));
2025                         s_lock_sleep(i++);
2026                         S_LOCK(&(buf->cntx_lock));
2027 #else
2028                         IpcSemaphoreUnlock(WaitCLSemId, 0, IpcExclusiveLock);
2029                         s_lock_sleep(i++);
2030                         IpcSemaphoreLock(WaitCLSemId, 0, IpcExclusiveLock);
2031 #endif
2032                 }
2033                 (buf->r_locks)++;
2034                 BufferLocks[buffer-1] |= BL_R_LOCK;
2035         }
2036         else if (mode == BUFFER_LOCK_EXCLUSIVE)
2037         {
2038                 unsigned        i = 0;
2039                 
2040                 Assert(!(BufferLocks[buffer-1] & (BL_R_LOCK | BL_W_LOCK | BL_RI_LOCK)));
2041                 while (buf->r_locks > 0 || buf->w_lock)
2042                 {
2043                         if (buf->r_locks > 3)
2044                         {
2045                                 if (!(BufferLocks[buffer-1] & BL_RI_LOCK))
2046                                         BufferLocks[buffer-1] |= BL_RI_LOCK;
2047                                 buf->ri_lock = true;
2048                         }
2049 #ifdef HAS_TEST_AND_SET
2050                         S_UNLOCK(&(buf->cntx_lock));
2051                         s_lock_sleep(i++);
2052                         S_LOCK(&(buf->cntx_lock));
2053 #else
2054                         IpcSemaphoreUnlock(WaitCLSemId, 0, IpcExclusiveLock);
2055                         s_lock_sleep(i++);
2056                         IpcSemaphoreLock(WaitCLSemId, 0, IpcExclusiveLock);
2057 #endif
2058                 }
2059                 buf->w_lock = true;
2060                 BufferLocks[buffer-1] |= BL_W_LOCK;
2061                 if (BufferLocks[buffer-1] & BL_RI_LOCK)
2062                 {
2063                         buf->ri_lock = false;
2064                         BufferLocks[buffer-1] &= ~BL_RI_LOCK;
2065                 }
2066         }
2067         else
2068                 elog(ERROR, "LockBuffer: unknown lock mode %d", mode);
2069
2070 #ifdef HAS_TEST_AND_SET
2071         S_UNLOCK(&(buf->cntx_lock));
2072 #else
2073         IpcSemaphoreUnlock(WaitCLSemId, 0, IpcExclusiveLock);
2074 #endif
2075
2076 }
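
/*
 * Example (illustrative sketch; 'rel' and 'blocknum' are hypothetical):
 * a typical shared-lock access pattern built from the routines above:
 *
 *              Buffer          buf = ReadBuffer(rel, blocknum);
 *
 *              LockBuffer(buf, BUFFER_LOCK_SHARE);
 *              ... read the page contents ...
 *              LockBuffer(buf, BUFFER_LOCK_UNLOCK);
 *
 *              ReleaseBuffer(buf);
 */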