OSDN Git Service

Update copyrights to 2003.
[pg-rex/syncrep.git] / src / backend / access / transam / xlog.c
1 /*-------------------------------------------------------------------------
2  *
3  * xlog.c
4  *              PostgreSQL transaction log manager
5  *
6  *
7  * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * $Header: /cvsroot/pgsql/src/backend/access/transam/xlog.c,v 1.122 2003/08/04 02:39:57 momjian Exp $
11  *
12  *-------------------------------------------------------------------------
13  */
14
15 #include "postgres.h"
16
17 #include <fcntl.h>
18 #include <signal.h>
19 #include <unistd.h>
20 #include <errno.h>
21 #include <sys/stat.h>
22 #include <sys/time.h>
23 #include <dirent.h>
24
25 #include "access/clog.h"
26 #include "access/transam.h"
27 #include "access/xact.h"
28 #include "access/xlog.h"
29 #include "access/xlogutils.h"
30 #include "catalog/catversion.h"
31 #include "catalog/pg_control.h"
32 #include "storage/bufpage.h"
33 #include "storage/lwlock.h"
34 #include "storage/pmsignal.h"
35 #include "storage/proc.h"
36 #include "storage/sinval.h"
37 #include "storage/spin.h"
38 #include "utils/builtins.h"
39 #include "utils/guc.h"
40 #include "utils/relcache.h"
41 #include "miscadmin.h"
42
43
/*
 * This chunk of hackery attempts to determine which file sync methods
 * are available on the current platform, and to choose an appropriate
 * default method.  We assume that fsync() is always available, and that
 * configure determined whether fdatasync() is.
 *
 * Results:
 *   OPEN_SYNC_FLAG       - O_SYNC, else O_FSYNC, else left undefined
 *   OPEN_DATASYNC_FLAG   - O_DSYNC, defined only when a sync flag exists
 *                          and O_DSYNC is a different value from it
 *   DEFAULT_SYNC_METHOD* - first available of: open with the datasync
 *                          flag, fdatasync, fsync
 */
#define SYNC_METHOD_FSYNC       0
#define SYNC_METHOD_FDATASYNC   1
#define SYNC_METHOD_OPEN        2   /* used for both O_SYNC and O_DSYNC */

#if defined(O_SYNC)
#define OPEN_SYNC_FLAG          O_SYNC
#elif defined(O_FSYNC)
#define OPEN_SYNC_FLAG          O_FSYNC
#endif

/* Keep this test nested so O_DSYNC is only examined when a sync flag exists */
#if defined(OPEN_SYNC_FLAG)
#if defined(O_DSYNC) && (O_DSYNC != OPEN_SYNC_FLAG)
#define OPEN_DATASYNC_FLAG      O_DSYNC
#endif
#endif

#if defined(OPEN_DATASYNC_FLAG)
#define DEFAULT_SYNC_METHOD_STR "open_datasync"
#define DEFAULT_SYNC_METHOD     SYNC_METHOD_OPEN
#define DEFAULT_SYNC_FLAGBIT    OPEN_DATASYNC_FLAG
#elif defined(HAVE_FDATASYNC)
#define DEFAULT_SYNC_METHOD_STR "fdatasync"
#define DEFAULT_SYNC_METHOD     SYNC_METHOD_FDATASYNC
#define DEFAULT_SYNC_FLAGBIT    0
#else
#define DEFAULT_SYNC_METHOD_STR "fsync"
#define DEFAULT_SYNC_METHOD     SYNC_METHOD_FSYNC
#define DEFAULT_SYNC_FLAGBIT    0
#endif
84
85
86 /* User-settable parameters */
87 int                     CheckPointSegments = 3;
88 int                     XLOGbuffers = 8;
89 int                     XLOG_DEBUG = 0;
90 char       *XLOG_sync_method = NULL;
91 const char      XLOG_sync_method_default[] = DEFAULT_SYNC_METHOD_STR;
92 char            XLOG_archive_dir[MAXPGPATH];            /* null string means
93                                                                                                  * delete 'em */
94
/*
 * XLOGfileslop is used in the code as the allowed "fuzz" in the number of
 * preallocated XLOG segments --- we try to have at least XLOGfiles advance
 * segments but no more than XLOGfileslop segments.  This could
 * be made a separate GUC variable, but at present I think it's sufficient
 * to hardwire it as 2*CheckPointSegments+1.  Under normal conditions, a
 * checkpoint will free no more than 2*CheckPointSegments log segments, and
 * we want to recycle all of them; the +1 allows boundary cases to happen
 * without wasting a delete/create-segment cycle.
 *
 * NOTE(review): no XLOGfiles variable is defined in the visible part of
 * this file; confirm that the reference above is still current.
 *
 * The macro reads CheckPointSegments each time it is expanded, so its value
 * follows any later change to that variable.
 */

#define XLOGfileslop    (2*CheckPointSegments + 1)
107
108
109 /* these are derived from XLOG_sync_method by assign_xlog_sync_method */
110 static int      sync_method = DEFAULT_SYNC_METHOD;
111 static int      open_sync_bit = DEFAULT_SYNC_FLAGBIT;
112
113 #define XLOG_SYNC_BIT  (enableFsync ? open_sync_bit : 0)
114
115 #define MinXLOGbuffers  4
116
117
118 /*
119  * ThisStartUpID will be same in all backends --- it identifies current
120  * instance of the database system.
121  */
122 StartUpID       ThisStartUpID = 0;
123
124 /* Are we doing recovery by reading XLOG? */
125 bool            InRecovery = false;
126
127 /*
128  * MyLastRecPtr points to the start of the last XLOG record inserted by the
129  * current transaction.  If MyLastRecPtr.xrecoff == 0, then the current
130  * xact hasn't yet inserted any transaction-controlled XLOG records.
131  *
132  * Note that XLOG records inserted outside transaction control are not
133  * reflected into MyLastRecPtr.  They do, however, cause MyXactMadeXLogEntry
134  * to be set true.      The latter can be used to test whether the current xact
135  * made any loggable changes (including out-of-xact changes, such as
136  * sequence updates).
137  *
138  * When we insert/update/delete a tuple in a temporary relation, we do not
139  * make any XLOG record, since we don't care about recovering the state of
140  * the temp rel after a crash.  However, we will still need to remember
141  * whether our transaction committed or aborted in that case.  So, we must
142  * set MyXactMadeTempRelUpdate true to indicate that the XID will be of
143  * interest later.
144  */
145 XLogRecPtr      MyLastRecPtr = {0, 0};
146
147 bool            MyXactMadeXLogEntry = false;
148
149 bool            MyXactMadeTempRelUpdate = false;
150
151 /*
152  * ProcLastRecPtr points to the start of the last XLOG record inserted by the
153  * current backend.  It is updated for all inserts, transaction-controlled
154  * or not.      ProcLastRecEnd is similar but points to end+1 of last record.
155  */
156 static XLogRecPtr ProcLastRecPtr = {0, 0};
157
158 XLogRecPtr      ProcLastRecEnd = {0, 0};
159
160 /*
161  * RedoRecPtr is this backend's local copy of the REDO record pointer
162  * (which is almost but not quite the same as a pointer to the most recent
163  * CHECKPOINT record).  We update this from the shared-memory copy,
164  * XLogCtl->Insert.RedoRecPtr, whenever we can safely do so (ie, when we
165  * hold the Insert lock).  See XLogInsert for details.  We are also allowed
166  * to update from XLogCtl->Insert.RedoRecPtr if we hold the info_lck;
167  * see GetRedoRecPtr.
168  */
169 static XLogRecPtr RedoRecPtr;
170
171 /*----------
172  * Shared-memory data structures for XLOG control
173  *
174  * LogwrtRqst indicates a byte position that we need to write and/or fsync
175  * the log up to (all records before that point must be written or fsynced).
176  * LogwrtResult indicates the byte positions we have already written/fsynced.
177  * These structs are identical but are declared separately to indicate their
178  * slightly different functions.
179  *
180  * We do a lot of pushups to minimize the amount of access to lockable
181  * shared memory values.  There are actually three shared-memory copies of
182  * LogwrtResult, plus one unshared copy in each backend.  Here's how it works:
183  *              XLogCtl->LogwrtResult is protected by info_lck
184  *              XLogCtl->Write.LogwrtResult is protected by WALWriteLock
185  *              XLogCtl->Insert.LogwrtResult is protected by WALInsertLock
186  * One must hold the associated lock to read or write any of these, but
187  * of course no lock is needed to read/write the unshared LogwrtResult.
188  *
189  * XLogCtl->LogwrtResult and XLogCtl->Write.LogwrtResult are both "always
190  * right", since both are updated by a write or flush operation before
191  * it releases WALWriteLock.  The point of keeping XLogCtl->Write.LogwrtResult
192  * is that it can be examined/modified by code that already holds WALWriteLock
193  * without needing to grab info_lck as well.
194  *
195  * XLogCtl->Insert.LogwrtResult may lag behind the reality of the other two,
196  * but is updated when convenient.      Again, it exists for the convenience of
197  * code that is already holding WALInsertLock but not the other locks.
198  *
199  * The unshared LogwrtResult may lag behind any or all of these, and again
200  * is updated when convenient.
201  *
202  * The request bookkeeping is simpler: there is a shared XLogCtl->LogwrtRqst
203  * (protected by info_lck), but we don't need to cache any copies of it.
204  *
205  * Note that this all works because the request and result positions can only
206  * advance forward, never back up, and so we can easily determine which of two
207  * values is "more up to date".
208  *
209  * info_lck is only held long enough to read/update the protected variables,
210  * so it's a plain spinlock.  The other locks are held longer (potentially
211  * over I/O operations), so we use LWLocks for them.  These locks are:
212  *
213  * WALInsertLock: must be held to insert a record into the WAL buffers.
214  *
215  * WALWriteLock: must be held to write WAL buffers to disk (XLogWrite or
216  * XLogFlush).
217  *
218  * ControlFileLock: must be held to read/update control file or create
219  * new log file.
220  *
221  * CheckpointLock: must be held to do a checkpoint (ensures only one
222  * checkpointer at a time; even though the postmaster won't launch
223  * parallel checkpoint processes, we need this because manual checkpoints
224  * could be launched simultaneously).
225  *
226  *----------
227  */
228 typedef struct XLogwrtRqst
229 {
230         XLogRecPtr      Write;                  /* last byte + 1 to write out */
231         XLogRecPtr      Flush;                  /* last byte + 1 to flush */
232 } XLogwrtRqst;
233
234 typedef struct XLogwrtResult
235 {
236         XLogRecPtr      Write;                  /* last byte + 1 written out */
237         XLogRecPtr      Flush;                  /* last byte + 1 flushed */
238 } XLogwrtResult;
239
240 /*
241  * Shared state data for XLogInsert.
242  */
243 typedef struct XLogCtlInsert
244 {
245         XLogwrtResult LogwrtResult; /* a recent value of LogwrtResult */
246         XLogRecPtr      PrevRecord;             /* start of previously-inserted record */
247         uint16          curridx;                /* current block index in cache */
248         XLogPageHeader currpage;        /* points to header of block in cache */
249         char       *currpos;            /* current insertion point in cache */
250         XLogRecPtr      RedoRecPtr;             /* current redo point for insertions */
251 } XLogCtlInsert;
252
253 /*
254  * Shared state data for XLogWrite/XLogFlush.
255  */
256 typedef struct XLogCtlWrite
257 {
258         XLogwrtResult LogwrtResult; /* current value of LogwrtResult */
259         uint16          curridx;                /* cache index of next block to write */
260 } XLogCtlWrite;
261
262 /*
263  * Total shared-memory state for XLOG.
264  */
265 typedef struct XLogCtlData
266 {
267         /* Protected by WALInsertLock: */
268         XLogCtlInsert Insert;
269         /* Protected by info_lck: */
270         XLogwrtRqst LogwrtRqst;
271         XLogwrtResult LogwrtResult;
272         /* Protected by WALWriteLock: */
273         XLogCtlWrite Write;
274
275         /*
276          * These values do not change after startup, although the pointed-to
277          * pages and xlblocks values certainly do.      Permission to read/write
278          * the pages and xlblocks values depends on WALInsertLock and
279          * WALWriteLock.
280          */
281         char       *pages;                      /* buffers for unwritten XLOG pages */
282         XLogRecPtr *xlblocks;           /* 1st byte ptr-s + BLCKSZ */
283         uint32          XLogCacheByte;  /* # bytes in xlog buffers */
284         uint32          XLogCacheBlck;  /* highest allocated xlog buffer index */
285         StartUpID       ThisStartUpID;
286
287         /* This value is not protected by *any* lock... */
288         /* see SetSavedRedoRecPtr/GetSavedRedoRecPtr */
289         XLogRecPtr      SavedRedoRecPtr;
290
291         slock_t         info_lck;               /* locks shared LogwrtRqst/LogwrtResult */
292 } XLogCtlData;
293
294 static XLogCtlData *XLogCtl = NULL;
295
296 /*
297  * We maintain an image of pg_control in shared memory.
298  */
299 static ControlFileData *ControlFile = NULL;
300
/*
 * Macros for managing XLogInsert state.  In most cases, the calling routine
 * has local copies of &XLogCtl->Insert and/or XLogCtl->Insert.curridx,
 * so these are passed as parameters instead of being fetched via XLogCtl.
 *
 * Both macros evaluate their Insert argument more than once; do not pass
 * expressions with side effects.
 */

/*
 * Free space remaining in the current xlog page buffer: BLCKSZ minus the
 * distance from the start of the current page to the insertion point.
 */
#define INSERT_FREESPACE(Insert)  \
    (BLCKSZ - ((Insert)->currpos - (char *) (Insert)->currpage))

/*
 * Construct XLogRecPtr value for current insertion point.
 *
 * xlblocks[curridx] holds the position just past the end of the page
 * (first byte + BLCKSZ), so subtracting the remaining free space gives the
 * position of the insertion point itself.
 */
#define INSERT_RECPTR(recptr,Insert,curridx)  \
    ( \
      (recptr).xlogid = XLogCtl->xlblocks[curridx].xlogid, \
      (recptr).xrecoff = \
        XLogCtl->xlblocks[curridx].xrecoff - INSERT_FREESPACE(Insert) \
    )
318
319
/*
 * Increment an xlogid/segment pair.
 *
 * When logSeg is already the last segment of its log file
 * (XLogSegsPerFile-1 or beyond), advance logId and reset logSeg to 0;
 * otherwise just advance logSeg.  Both arguments must be modifiable
 * lvalues and are evaluated more than once.
 */
#define NextLogSeg(logId, logSeg)   \
    do { \
        if ((logSeg) >= XLogSegsPerFile-1) \
        { \
            (logId)++; \
            (logSeg) = 0; \
        } \
        else \
            (logSeg)++; \
    } while (0)
331
/*
 * Decrement an xlogid/segment pair (assume it's not 0,0).
 *
 * When logSeg is 0, step back to the last segment (XLogSegsPerFile-1) of
 * the previous log file; otherwise just decrement logSeg.  Both arguments
 * must be modifiable lvalues and are evaluated more than once.
 */
#define PrevLogSeg(logId, logSeg)   \
    do { \
        if ((logSeg) != 0) \
            (logSeg)--; \
        else \
        { \
            (logId)--; \
            (logSeg) = XLogSegsPerFile-1; \
        } \
    } while (0)
343
/*
 * Compute ID and segment from an XLogRecPtr.
 *
 * For XLByteToSeg, do the computation at face value.  For XLByteToPrevSeg,
 * a boundary byte is taken to be in the previous segment.  This is suitable
 * for deciding which segment to write given a pointer to a record end,
 * for example.  (We can assume xrecoff is not zero, since no valid recptr
 * can have that.)
 *
 * logId and logSeg are output arguments and must be modifiable lvalues.
 */
#define XLByteToSeg(xlrp, logId, logSeg)    \
    ( (logId) = (xlrp).xlogid, \
      (logSeg) = (xlrp).xrecoff / XLogSegSize \
    )
#define XLByteToPrevSeg(xlrp, logId, logSeg)    \
    ( (logId) = (xlrp).xlogid, \
      (logSeg) = ((xlrp).xrecoff - 1) / XLogSegSize \
    )
361
/*
 * Is an XLogRecPtr within a particular XLOG segment?
 *
 * For XLByteInSeg, do the computation at face value.  For XLByteInPrevSeg,
 * a boundary byte is taken to be in the previous segment.
 *
 * Both macros yield nonzero when the pointer's log id equals logId and its
 * computed segment number equals logSeg.
 */
#define XLByteInSeg(xlrp, logId, logSeg)    \
    ((xlrp).xlogid == (logId) && \
     (xlrp).xrecoff / XLogSegSize == (logSeg))

#define XLByteInPrevSeg(xlrp, logId, logSeg)    \
    ((xlrp).xlogid == (logId) && \
     ((xlrp).xrecoff - 1) / XLogSegSize == (logSeg))
375
376
/*
 * Build the path name of an XLOG segment file into path, which must have
 * room for MAXPGPATH bytes: XLogDir, a slash, then log and seg each printed
 * as 8 uppercase hex digits.  The macro's value is snprintf's return value.
 */
#define XLogFileName(path, log, seg)    \
            snprintf((path), MAXPGPATH, "%s/%08X%08X",  \
                     XLogDir, (log), (seg))
380
/*
 * Step an xlog buffer index backward or forward, wrapping around between
 * 0 and XLogCtl->XLogCacheBlck (the highest allocated buffer index).
 * The argument is evaluated twice.
 */
#define PrevBufIdx(idx)     \
        (((idx) == 0) ? XLogCtl->XLogCacheBlck : ((idx) - 1))

#define NextBufIdx(idx)     \
        (((idx) == XLogCtl->XLogCacheBlck) ? 0 : ((idx) + 1))
386
/*
 * Check a record offset's position within its BLCKSZ-sized page: it must
 * lie at or beyond the page header (SizeOfXLogPHD), and at least
 * SizeOfXLogRecord bytes must remain between it and the end of the page.
 * The argument is evaluated twice.
 */
#define XRecOffIsValid(xrecoff) \
        ((xrecoff) % BLCKSZ >= SizeOfXLogPHD && \
        (BLCKSZ - (xrecoff) % BLCKSZ) >= SizeOfXLogRecord)
390
/*
 * _INTL_MAXLOGRECSZ: max space needed for a record including header and
 * any backup-block data.  That is the record header, up to MAXLOGRECSZ
 * bytes of record data, and XLR_MAX_BKP_BLOCKS backup blocks, each a
 * BkpBlock header followed by a full BLCKSZ page.
 */
#define _INTL_MAXLOGRECSZ   (SizeOfXLogRecord + MAXLOGRECSZ + \
                             XLR_MAX_BKP_BLOCKS * (sizeof(BkpBlock) + BLCKSZ))
397
398
399 /* File path names */
400 static char XLogDir[MAXPGPATH];
401 static char ControlFilePath[MAXPGPATH];
402
403 /*
404  * Private, possibly out-of-date copy of shared LogwrtResult.
405  * See discussion above.
406  */
407 static XLogwrtResult LogwrtResult = {{0, 0}, {0, 0}};
408
409 /*
410  * openLogFile is -1 or a kernel FD for an open log file segment.
411  * When it's open, openLogOff is the current seek offset in the file.
412  * openLogId/openLogSeg identify the segment.  These variables are only
413  * used to write the XLOG, and so will normally refer to the active segment.
414  */
415 static int      openLogFile = -1;
416 static uint32 openLogId = 0;
417 static uint32 openLogSeg = 0;
418 static uint32 openLogOff = 0;
419
420 /*
421  * These variables are used similarly to the ones above, but for reading
422  * the XLOG.  Note, however, that readOff generally represents the offset
423  * of the page just read, not the seek position of the FD itself, which
424  * will be just past that page.
425  */
426 static int      readFile = -1;
427 static uint32 readId = 0;
428 static uint32 readSeg = 0;
429 static uint32 readOff = 0;
430
431 /* Buffer for currently read page (BLCKSZ bytes) */
432 static char *readBuf = NULL;
433
434 /* State information for XLOG reading */
435 static XLogRecPtr ReadRecPtr;
436 static XLogRecPtr EndRecPtr;
437 static XLogRecord *nextRecord = NULL;
438 static StartUpID lastReadSUI;
439
440 static bool InRedo = false;
441
442
443 static bool AdvanceXLInsertBuffer(void);
444 static void XLogWrite(XLogwrtRqst WriteRqst);
445 static int XLogFileInit(uint32 log, uint32 seg,
446                          bool *use_existent, bool use_lock);
447 static bool InstallXLogFileSegment(uint32 log, uint32 seg, char *tmppath,
448                                            bool find_free, int max_advance,
449                                            bool use_lock);
450 static int      XLogFileOpen(uint32 log, uint32 seg, bool econt);
451 static void PreallocXlogFiles(XLogRecPtr endptr);
452 static void MoveOfflineLogs(uint32 log, uint32 seg, XLogRecPtr endptr);
453 static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, int emode, char *buffer);
454 static bool ValidXLOGHeader(XLogPageHeader hdr, int emode, bool checkSUI);
455 static XLogRecord *ReadCheckpointRecord(XLogRecPtr RecPtr,
456                                          int whichChkpt,
457                                          char *buffer);
458 static void WriteControlFile(void);
459 static void ReadControlFile(void);
460 static char *str_time(time_t tnow);
461 static void xlog_outrec(char *buf, XLogRecord *record);
462 static void issue_xlog_fsync(void);
463
464
465 /*
466  * Insert an XLOG record having the specified RMID and info bytes,
467  * with the body of the record being the data chunk(s) described by
468  * the rdata list (see xlog.h for notes about rdata).
469  *
470  * Returns XLOG pointer to end of record (beginning of next record).
471  * This can be used as LSN for data pages affected by the logged action.
472  * (LSN is the XLOG point up to which the XLOG must be flushed to disk
473  * before the data page can be written out.  This implements the basic
474  * WAL rule "write the log before the data".)
475  *
476  * NB: this routine feels free to scribble on the XLogRecData structs,
477  * though not on the data they reference.  This is OK since the XLogRecData
478  * structs are always just temporaries in the calling code.
479  */
480 XLogRecPtr
481 XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
482 {
483         XLogCtlInsert *Insert = &XLogCtl->Insert;
484         XLogRecord *record;
485         XLogContRecord *contrecord;
486         XLogRecPtr      RecPtr;
487         XLogRecPtr      WriteRqst;
488         uint32          freespace;
489         uint16          curridx;
490         XLogRecData *rdt;
491         Buffer          dtbuf[XLR_MAX_BKP_BLOCKS];
492         bool            dtbuf_bkp[XLR_MAX_BKP_BLOCKS];
493         BkpBlock        dtbuf_xlg[XLR_MAX_BKP_BLOCKS];
494         XLogRecPtr      dtbuf_lsn[XLR_MAX_BKP_BLOCKS];
495         XLogRecData dtbuf_rdt[2 * XLR_MAX_BKP_BLOCKS];
496         crc64           rdata_crc;
497         uint32          len,
498                                 write_len;
499         unsigned        i;
500         XLogwrtRqst LogwrtRqst;
501         bool            updrqst;
502         bool            no_tran = (rmid == RM_XLOG_ID) ? true : false;
503
504         if (info & XLR_INFO_MASK)
505         {
506                 if ((info & XLR_INFO_MASK) != XLOG_NO_TRAN)
507                         elog(PANIC, "invalid xlog info mask %02X", (info & XLR_INFO_MASK));
508                 no_tran = true;
509                 info &= ~XLR_INFO_MASK;
510         }
511
512         /*
513          * In bootstrap mode, we don't actually log anything but XLOG
514          * resources; return a phony record pointer.
515          */
516         if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID)
517         {
518                 RecPtr.xlogid = 0;
519                 RecPtr.xrecoff = SizeOfXLogPHD; /* start of 1st checkpoint record */
520                 return (RecPtr);
521         }
522
523         /*
524          * Here we scan the rdata list, determine which buffers must be backed
525          * up, and compute the CRC values for the data.  Note that the record
526          * header isn't added into the CRC yet since we don't know the final
527          * length or info bits quite yet.
528          *
529          * We may have to loop back to here if a race condition is detected
530          * below. We could prevent the race by doing all this work while
531          * holding the insert lock, but it seems better to avoid doing CRC
532          * calculations while holding the lock.  This means we have to be
533          * careful about modifying the rdata list until we know we aren't
534          * going to loop back again.  The only change we allow ourselves to
535          * make earlier is to set rdt->data = NULL in list items we have
536          * decided we will have to back up the whole buffer for.  This is OK
537          * because we will certainly decide the same thing again for those
538          * items if we do it over; doing it here saves an extra pass over the
539          * list later.
540          */
541 begin:;
542         for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
543         {
544                 dtbuf[i] = InvalidBuffer;
545                 dtbuf_bkp[i] = false;
546         }
547
548         INIT_CRC64(rdata_crc);
549         len = 0;
550         for (rdt = rdata;;)
551         {
552                 if (rdt->buffer == InvalidBuffer)
553                 {
554                         /* Simple data, just include it */
555                         len += rdt->len;
556                         COMP_CRC64(rdata_crc, rdt->data, rdt->len);
557                 }
558                 else
559                 {
560                         /* Find info for buffer */
561                         for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
562                         {
563                                 if (rdt->buffer == dtbuf[i])
564                                 {
565                                         /* Buffer already referenced by earlier list item */
566                                         if (dtbuf_bkp[i])
567                                                 rdt->data = NULL;
568                                         else if (rdt->data)
569                                         {
570                                                 len += rdt->len;
571                                                 COMP_CRC64(rdata_crc, rdt->data, rdt->len);
572                                         }
573                                         break;
574                                 }
575                                 if (dtbuf[i] == InvalidBuffer)
576                                 {
577                                         /* OK, put it in this slot */
578                                         dtbuf[i] = rdt->buffer;
579
580                                         /*
581                                          * XXX We assume page LSN is first data on page
582                                          */
583                                         dtbuf_lsn[i] = *((XLogRecPtr *) BufferGetBlock(rdt->buffer));
584                                         if (XLByteLE(dtbuf_lsn[i], RedoRecPtr))
585                                         {
586                                                 crc64           dtcrc;
587
588                                                 dtbuf_bkp[i] = true;
589                                                 rdt->data = NULL;
590                                                 INIT_CRC64(dtcrc);
591                                                 COMP_CRC64(dtcrc,
592                                                                    BufferGetBlock(dtbuf[i]),
593                                                                    BLCKSZ);
594                                                 dtbuf_xlg[i].node = BufferGetFileNode(dtbuf[i]);
595                                                 dtbuf_xlg[i].block = BufferGetBlockNumber(dtbuf[i]);
596                                                 COMP_CRC64(dtcrc,
597                                                                 (char *) &(dtbuf_xlg[i]) + sizeof(crc64),
598                                                                    sizeof(BkpBlock) - sizeof(crc64));
599                                                 FIN_CRC64(dtcrc);
600                                                 dtbuf_xlg[i].crc = dtcrc;
601                                         }
602                                         else if (rdt->data)
603                                         {
604                                                 len += rdt->len;
605                                                 COMP_CRC64(rdata_crc, rdt->data, rdt->len);
606                                         }
607                                         break;
608                                 }
609                         }
610                         if (i >= XLR_MAX_BKP_BLOCKS)
611                                 elog(PANIC, "can backup at most %d blocks per xlog record",
612                                          XLR_MAX_BKP_BLOCKS);
613                 }
614                 /* Break out of loop when rdt points to last list item */
615                 if (rdt->next == NULL)
616                         break;
617                 rdt = rdt->next;
618         }
619
620         /*
621          * NOTE: the test for len == 0 here is somewhat fishy, since in theory
622          * all of the rmgr data might have been suppressed in favor of backup
623          * blocks.      Currently, all callers of XLogInsert provide at least some
624          * not-in-a-buffer data and so len == 0 should never happen, but that
625          * may not be true forever.  If you need to remove the len == 0 check,
626          * also remove the check for xl_len == 0 in ReadRecord, below.
627          */
628         if (len == 0 || len > MAXLOGRECSZ)
629                 elog(PANIC, "invalid xlog record length %u", len);
630
631         START_CRIT_SECTION();
632
633         /* update LogwrtResult before doing cache fill check */
634         {
635                 /* use volatile pointer to prevent code rearrangement */
636                 volatile XLogCtlData *xlogctl = XLogCtl;
637
638                 SpinLockAcquire_NoHoldoff(&xlogctl->info_lck);
639                 LogwrtRqst = xlogctl->LogwrtRqst;
640                 LogwrtResult = xlogctl->LogwrtResult;
641                 SpinLockRelease_NoHoldoff(&xlogctl->info_lck);
642         }
643
644         /*
645          * If cache is half filled then try to acquire write lock and do
646          * XLogWrite. Ignore any fractional blocks in performing this check.
647          */
648         LogwrtRqst.Write.xrecoff -= LogwrtRqst.Write.xrecoff % BLCKSZ;
649         if (LogwrtRqst.Write.xlogid != LogwrtResult.Write.xlogid ||
650                 (LogwrtRqst.Write.xrecoff >= LogwrtResult.Write.xrecoff +
651                  XLogCtl->XLogCacheByte / 2))
652         {
653                 if (LWLockConditionalAcquire(WALWriteLock, LW_EXCLUSIVE))
654                 {
655                         LogwrtResult = XLogCtl->Write.LogwrtResult;
656                         if (XLByteLT(LogwrtResult.Write, LogwrtRqst.Write))
657                                 XLogWrite(LogwrtRqst);
658                         LWLockRelease(WALWriteLock);
659                 }
660         }
661
662         /* Now wait to get insert lock */
663         LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
664
665         /*
666          * Check to see if my RedoRecPtr is out of date.  If so, may have to
667          * go back and recompute everything.  This can only happen just after
668          * a checkpoint, so it's better to be slow in this case and fast
669          * otherwise.
670          */
671         if (!XLByteEQ(RedoRecPtr, Insert->RedoRecPtr))
672         {
673                 Assert(XLByteLT(RedoRecPtr, Insert->RedoRecPtr));
674                 RedoRecPtr = Insert->RedoRecPtr;
675
676                 for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
677                 {
678                         if (dtbuf[i] == InvalidBuffer)
679                                 continue;
680                         if (dtbuf_bkp[i] == false &&
681                                 XLByteLE(dtbuf_lsn[i], RedoRecPtr))
682                         {
683                                 /*
684                                  * Oops, this buffer now needs to be backed up, but we
685                                  * didn't think so above.  Start over.
686                                  */
687                                 LWLockRelease(WALInsertLock);
688                                 END_CRIT_SECTION();
689                                 goto begin;
690                         }
691                 }
692         }
693
694         /*
695          * Make additional rdata list entries for the backup blocks, so that
696          * we don't need to special-case them in the write loop.  Note that we
697          * have now irrevocably changed the input rdata list.  At the exit of
698          * this loop, write_len includes the backup block data.
699          *
700          * Also set the appropriate info bits to show which buffers were backed
701          * up.  The i'th XLR_SET_BKP_BLOCK bit corresponds to the i'th
702          * distinct buffer value (ignoring InvalidBuffer) appearing in the
703          * rdata list.
704          */
705         write_len = len;
706         for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
707         {
708                 if (dtbuf[i] == InvalidBuffer || !(dtbuf_bkp[i]))
709                         continue;
710
711                 info |= XLR_SET_BKP_BLOCK(i);
712
713                 rdt->next = &(dtbuf_rdt[2 * i]);
714
715                 dtbuf_rdt[2 * i].data = (char *) &(dtbuf_xlg[i]);
716                 dtbuf_rdt[2 * i].len = sizeof(BkpBlock);
717                 write_len += sizeof(BkpBlock);
718
719                 rdt = dtbuf_rdt[2 * i].next = &(dtbuf_rdt[2 * i + 1]);
720
721                 dtbuf_rdt[2 * i + 1].data = (char *) BufferGetBlock(dtbuf[i]);
722                 dtbuf_rdt[2 * i + 1].len = BLCKSZ;
723                 write_len += BLCKSZ;
724                 dtbuf_rdt[2 * i + 1].next = NULL;
725         }
726
727         /* Insert record header */
728
729         updrqst = false;
730         freespace = INSERT_FREESPACE(Insert);
731         if (freespace < SizeOfXLogRecord)
732         {
733                 updrqst = AdvanceXLInsertBuffer();
734                 freespace = BLCKSZ - SizeOfXLogPHD;
735         }
736
737         curridx = Insert->curridx;
738         record = (XLogRecord *) Insert->currpos;
739
740         record->xl_prev = Insert->PrevRecord;
741         if (no_tran)
742         {
743                 record->xl_xact_prev.xlogid = 0;
744                 record->xl_xact_prev.xrecoff = 0;
745         }
746         else
747                 record->xl_xact_prev = MyLastRecPtr;
748
749         record->xl_xid = GetCurrentTransactionId();
750         record->xl_len = len;           /* doesn't include backup blocks */
751         record->xl_info = info;
752         record->xl_rmid = rmid;
753
754         /* Now we can finish computing the main CRC */
755         COMP_CRC64(rdata_crc, (char *) record + sizeof(crc64),
756                            SizeOfXLogRecord - sizeof(crc64));
757         FIN_CRC64(rdata_crc);
758         record->xl_crc = rdata_crc;
759
760         /* Compute record's XLOG location */
761         INSERT_RECPTR(RecPtr, Insert, curridx);
762
763         /* If first XLOG record of transaction, save it in PGPROC array */
764         if (MyLastRecPtr.xrecoff == 0 && !no_tran)
765         {
766                 /*
767                  * We do not acquire SInvalLock here because of possible deadlock.
768                  * Anyone who wants to inspect other procs' logRec must acquire
769                  * WALInsertLock, instead.      A better solution would be a per-PROC
770                  * spinlock, but no time for that before 7.2 --- tgl 12/19/01.
771                  */
772                 MyProc->logRec = RecPtr;
773         }
774
775         if (XLOG_DEBUG)
776         {
777                 char            buf[8192];
778
779                 sprintf(buf, "INSERT @ %X/%X: ", RecPtr.xlogid, RecPtr.xrecoff);
780                 xlog_outrec(buf, record);
781                 if (rdata->data != NULL)
782                 {
783                         strcat(buf, " - ");
784                         RmgrTable[record->xl_rmid].rm_desc(buf, record->xl_info, rdata->data);
785                 }
786                 elog(LOG, "%s", buf);
787         }
788
789         /* Record begin of record in appropriate places */
790         if (!no_tran)
791                 MyLastRecPtr = RecPtr;
792         ProcLastRecPtr = RecPtr;
793         Insert->PrevRecord = RecPtr;
794         MyXactMadeXLogEntry = true;
795
796         Insert->currpos += SizeOfXLogRecord;
797         freespace -= SizeOfXLogRecord;
798
799         /*
800          * Append the data, including backup blocks if any
801          */
802         while (write_len)
803         {
804                 while (rdata->data == NULL)
805                         rdata = rdata->next;
806
807                 if (freespace > 0)
808                 {
809                         if (rdata->len > freespace)
810                         {
811                                 memcpy(Insert->currpos, rdata->data, freespace);
812                                 rdata->data += freespace;
813                                 rdata->len -= freespace;
814                                 write_len -= freespace;
815                         }
816                         else
817                         {
818                                 memcpy(Insert->currpos, rdata->data, rdata->len);
819                                 freespace -= rdata->len;
820                                 write_len -= rdata->len;
821                                 Insert->currpos += rdata->len;
822                                 rdata = rdata->next;
823                                 continue;
824                         }
825                 }
826
827                 /* Use next buffer */
828                 updrqst = AdvanceXLInsertBuffer();
829                 curridx = Insert->curridx;
830                 /* Insert cont-record header */
831                 Insert->currpage->xlp_info |= XLP_FIRST_IS_CONTRECORD;
832                 contrecord = (XLogContRecord *) Insert->currpos;
833                 contrecord->xl_rem_len = write_len;
834                 Insert->currpos += SizeOfXLogContRecord;
835                 freespace = BLCKSZ - SizeOfXLogPHD - SizeOfXLogContRecord;
836         }
837
838         /* Ensure next record will be properly aligned */
839         Insert->currpos = (char *) Insert->currpage +
840                 MAXALIGN(Insert->currpos - (char *) Insert->currpage);
841         freespace = INSERT_FREESPACE(Insert);
842
843         /*
844          * The recptr I return is the beginning of the *next* record. This
845          * will be stored as LSN for changed data pages...
846          */
847         INSERT_RECPTR(RecPtr, Insert, curridx);
848
849         /* Need to update shared LogwrtRqst if some block was filled up */
850         if (freespace < SizeOfXLogRecord)
851                 updrqst = true;                 /* curridx is filled and available for
852                                                                  * writing out */
853         else
854                 curridx = PrevBufIdx(curridx);
855         WriteRqst = XLogCtl->xlblocks[curridx];
856
857         LWLockRelease(WALInsertLock);
858
859         if (updrqst)
860         {
861                 /* use volatile pointer to prevent code rearrangement */
862                 volatile XLogCtlData *xlogctl = XLogCtl;
863
864                 SpinLockAcquire_NoHoldoff(&xlogctl->info_lck);
865                 /* advance global request to include new block(s) */
866                 if (XLByteLT(xlogctl->LogwrtRqst.Write, WriteRqst))
867                         xlogctl->LogwrtRqst.Write = WriteRqst;
868                 /* update local result copy while I have the chance */
869                 LogwrtResult = xlogctl->LogwrtResult;
870                 SpinLockRelease_NoHoldoff(&xlogctl->info_lck);
871         }
872
873         ProcLastRecEnd = RecPtr;
874
875         END_CRIT_SECTION();
876
877         return (RecPtr);
878 }
879
880 /*
881  * Advance the Insert state to the next buffer page, writing out the next
882  * buffer if it still contains unwritten data.
883  *
884  * The global LogwrtRqst.Write pointer needs to be advanced to include the
885  * just-filled page.  If we can do this for free (without an extra lock),
886  * we do so here.  Otherwise the caller must do it.  We return TRUE if the
887  * request update still needs to be done, FALSE if we did it internally.
888  *
889  * Must be called with WALInsertLock held.
890  */
891 static bool
892 AdvanceXLInsertBuffer(void)
893 {
894         XLogCtlInsert *Insert = &XLogCtl->Insert;
895         XLogCtlWrite *Write = &XLogCtl->Write;
896         uint16          nextidx = NextBufIdx(Insert->curridx);
897         bool            update_needed = true;
898         XLogRecPtr      OldPageRqstPtr;
899         XLogwrtRqst WriteRqst;
900         XLogRecPtr      NewPageEndPtr;
901         XLogPageHeader NewPage;
902
903         /* Use Insert->LogwrtResult copy if it's more fresh */
904         if (XLByteLT(LogwrtResult.Write, Insert->LogwrtResult.Write))
905                 LogwrtResult = Insert->LogwrtResult;
906
907         /*
908          * Get ending-offset of the buffer page we need to replace (this may
909          * be zero if the buffer hasn't been used yet).  Fall through if it's
910          * already written out.
911          */
912         OldPageRqstPtr = XLogCtl->xlblocks[nextidx];
913         if (!XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
914         {
915                 /* nope, got work to do... */
916                 XLogRecPtr      FinishedPageRqstPtr;
917
918                 FinishedPageRqstPtr = XLogCtl->xlblocks[Insert->curridx];
919
920                 /* Before waiting, get info_lck and update LogwrtResult */
921                 {
922                         /* use volatile pointer to prevent code rearrangement */
923                         volatile XLogCtlData *xlogctl = XLogCtl;
924
925                         SpinLockAcquire_NoHoldoff(&xlogctl->info_lck);
926                         if (XLByteLT(xlogctl->LogwrtRqst.Write, FinishedPageRqstPtr))
927                                 xlogctl->LogwrtRqst.Write = FinishedPageRqstPtr;
928                         LogwrtResult = xlogctl->LogwrtResult;
929                         SpinLockRelease_NoHoldoff(&xlogctl->info_lck);
930                 }
931
932                 update_needed = false;  /* Did the shared-request update */
933
934                 if (XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
935                 {
936                         /* OK, someone wrote it already */
937                         Insert->LogwrtResult = LogwrtResult;
938                 }
939                 else
940                 {
941                         /* Must acquire write lock */
942                         LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
943                         LogwrtResult = Write->LogwrtResult;
944                         if (XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
945                         {
946                                 /* OK, someone wrote it already */
947                                 LWLockRelease(WALWriteLock);
948                                 Insert->LogwrtResult = LogwrtResult;
949                         }
950                         else
951                         {
952                                 /*
953                                  * Have to write buffers while holding insert lock. This
954                                  * is not good, so only write as much as we absolutely
955                                  * must.
956                                  */
957                                 WriteRqst.Write = OldPageRqstPtr;
958                                 WriteRqst.Flush.xlogid = 0;
959                                 WriteRqst.Flush.xrecoff = 0;
960                                 XLogWrite(WriteRqst);
961                                 LWLockRelease(WALWriteLock);
962                                 Insert->LogwrtResult = LogwrtResult;
963                         }
964                 }
965         }
966
967         /*
968          * Now the next buffer slot is free and we can set it up to be the
969          * next output page.
970          */
971         NewPageEndPtr = XLogCtl->xlblocks[Insert->curridx];
972         if (NewPageEndPtr.xrecoff >= XLogFileSize)
973         {
974                 /* crossing a logid boundary */
975                 NewPageEndPtr.xlogid += 1;
976                 NewPageEndPtr.xrecoff = BLCKSZ;
977         }
978         else
979                 NewPageEndPtr.xrecoff += BLCKSZ;
980         XLogCtl->xlblocks[nextidx] = NewPageEndPtr;
981         NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * BLCKSZ);
982         Insert->curridx = nextidx;
983         Insert->currpage = NewPage;
984         Insert->currpos = ((char *) NewPage) + SizeOfXLogPHD;
985
986         /*
987          * Be sure to re-zero the buffer so that bytes beyond what we've
988          * written will look like zeroes and not valid XLOG records...
989          */
990         MemSet((char *) NewPage, 0, BLCKSZ);
991
992         /* And fill the new page's header */
993         NewPage->xlp_magic = XLOG_PAGE_MAGIC;
994         /* NewPage->xlp_info = 0; */    /* done by memset */
995         NewPage->xlp_sui = ThisStartUpID;
996         NewPage->xlp_pageaddr.xlogid = NewPageEndPtr.xlogid;
997         NewPage->xlp_pageaddr.xrecoff = NewPageEndPtr.xrecoff - BLCKSZ;
998
999         return update_needed;
1000 }
1001
1002 /*
1003  * Write and/or fsync the log at least as far as WriteRqst indicates.
1004  *
1005  * Must be called with WALWriteLock held.
1006  */
1007 static void
1008 XLogWrite(XLogwrtRqst WriteRqst)
1009 {
1010         XLogCtlWrite *Write = &XLogCtl->Write;
1011         char       *from;
1012         bool            ispartialpage;
1013         bool            use_existent;
1014
1015         /*
1016          * Update local LogwrtResult (caller probably did this already,
1017          * but...)
1018          */
1019         LogwrtResult = Write->LogwrtResult;
1020
1021         while (XLByteLT(LogwrtResult.Write, WriteRqst.Write))
1022         {
1023                 /*
1024                  * Make sure we're not ahead of the insert process.  This could
1025                  * happen if we're passed a bogus WriteRqst.Write that is past the
1026                  * end of the last page that's been initialized by
1027                  * AdvanceXLInsertBuffer.
1028                  */
1029                 if (!XLByteLT(LogwrtResult.Write, XLogCtl->xlblocks[Write->curridx]))
1030                         elog(PANIC, "xlog write request %X/%X is past end of log %X/%X",
1031                                  LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff,
1032                                  XLogCtl->xlblocks[Write->curridx].xlogid,
1033                                  XLogCtl->xlblocks[Write->curridx].xrecoff);
1034
1035                 /* Advance LogwrtResult.Write to end of current buffer page */
1036                 LogwrtResult.Write = XLogCtl->xlblocks[Write->curridx];
1037                 ispartialpage = XLByteLT(WriteRqst.Write, LogwrtResult.Write);
1038
1039                 if (!XLByteInPrevSeg(LogwrtResult.Write, openLogId, openLogSeg))
1040                 {
1041                         /*
1042                          * Switch to new logfile segment.
1043                          */
1044                         if (openLogFile >= 0)
1045                         {
1046                                 if (close(openLogFile) != 0)
1047                                         ereport(PANIC,
1048                                                         (errcode_for_file_access(),
1049                                         errmsg("close of log file %u, segment %u failed: %m",
1050                                                    openLogId, openLogSeg)));
1051                                 openLogFile = -1;
1052                         }
1053                         XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg);
1054
1055                         /* create/use new log file */
1056                         use_existent = true;
1057                         openLogFile = XLogFileInit(openLogId, openLogSeg,
1058                                                                            &use_existent, true);
1059                         openLogOff = 0;
1060
1061                         /* update pg_control, unless someone else already did */
1062                         LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
1063                         if (ControlFile->logId < openLogId ||
1064                                 (ControlFile->logId == openLogId &&
1065                                  ControlFile->logSeg < openLogSeg + 1))
1066                         {
1067                                 ControlFile->logId = openLogId;
1068                                 ControlFile->logSeg = openLogSeg + 1;
1069                                 ControlFile->time = time(NULL);
1070                                 UpdateControlFile();
1071
1072                                 /*
1073                                  * Signal postmaster to start a checkpoint if it's been
1074                                  * too long since the last one.  (We look at local copy of
1075                                  * RedoRecPtr which might be a little out of date, but
1076                                  * should be close enough for this purpose.)
1077                                  */
1078                                 if (IsUnderPostmaster &&
1079                                         (openLogId != RedoRecPtr.xlogid ||
1080                                          openLogSeg >= (RedoRecPtr.xrecoff / XLogSegSize) +
1081                                          (uint32) CheckPointSegments))
1082                                 {
1083                                         if (XLOG_DEBUG)
1084                                                 elog(LOG, "time for a checkpoint, signaling postmaster");
1085                                         SendPostmasterSignal(PMSIGNAL_DO_CHECKPOINT);
1086                                 }
1087                         }
1088                         LWLockRelease(ControlFileLock);
1089                 }
1090
1091                 if (openLogFile < 0)
1092                 {
1093                         XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg);
1094                         openLogFile = XLogFileOpen(openLogId, openLogSeg, false);
1095                         openLogOff = 0;
1096                 }
1097
1098                 /* Need to seek in the file? */
1099                 if (openLogOff != (LogwrtResult.Write.xrecoff - BLCKSZ) % XLogSegSize)
1100                 {
1101                         openLogOff = (LogwrtResult.Write.xrecoff - BLCKSZ) % XLogSegSize;
1102                         if (lseek(openLogFile, (off_t) openLogOff, SEEK_SET) < 0)
1103                                 ereport(PANIC,
1104                                                 (errcode_for_file_access(),
1105                                                  errmsg("lseek of log file %u, segment %u, offset %u failed: %m",
1106                                                                 openLogId, openLogSeg, openLogOff)));
1107                 }
1108
1109                 /* OK to write the page */
1110                 from = XLogCtl->pages + Write->curridx * BLCKSZ;
1111                 errno = 0;
1112                 if (write(openLogFile, from, BLCKSZ) != BLCKSZ)
1113                 {
1114                         /* if write didn't set errno, assume problem is no disk space */
1115                         if (errno == 0)
1116                                 errno = ENOSPC;
1117                         ereport(PANIC,
1118                                         (errcode_for_file_access(),
1119                                          errmsg("write of log file %u, segment %u, offset %u failed: %m",
1120                                                         openLogId, openLogSeg, openLogOff)));
1121                 }
1122                 openLogOff += BLCKSZ;
1123
1124                 /*
1125                  * If we just wrote the whole last page of a logfile segment,
1126                  * fsync the segment immediately.  This avoids having to go back
1127                  * and re-open prior segments when an fsync request comes along
1128                  * later. Doing it here ensures that one and only one backend will
1129                  * perform this fsync.
1130                  */
1131                 if (openLogOff >= XLogSegSize && !ispartialpage)
1132                 {
1133                         issue_xlog_fsync();
1134                         LogwrtResult.Flush = LogwrtResult.Write;        /* end of current page */
1135                 }
1136
1137                 if (ispartialpage)
1138                 {
1139                         /* Only asked to write a partial page */
1140                         LogwrtResult.Write = WriteRqst.Write;
1141                         break;
1142                 }
1143                 Write->curridx = NextBufIdx(Write->curridx);
1144         }
1145
1146         /*
1147          * If asked to flush, do so
1148          */
1149         if (XLByteLT(LogwrtResult.Flush, WriteRqst.Flush) &&
1150                 XLByteLT(LogwrtResult.Flush, LogwrtResult.Write))
1151         {
1152                 /*
1153                  * Could get here without iterating above loop, in which case we
1154                  * might have no open file or the wrong one.  However, we do not
1155                  * need to fsync more than one file.
1156                  */
1157                 if (sync_method != SYNC_METHOD_OPEN)
1158                 {
1159                         if (openLogFile >= 0 &&
1160                          !XLByteInPrevSeg(LogwrtResult.Write, openLogId, openLogSeg))
1161                         {
1162                                 if (close(openLogFile) != 0)
1163                                         ereport(PANIC,
1164                                                         (errcode_for_file_access(),
1165                                         errmsg("close of log file %u, segment %u failed: %m",
1166                                                    openLogId, openLogSeg)));
1167                                 openLogFile = -1;
1168                         }
1169                         if (openLogFile < 0)
1170                         {
1171                                 XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg);
1172                                 openLogFile = XLogFileOpen(openLogId, openLogSeg, false);
1173                                 openLogOff = 0;
1174                         }
1175                         issue_xlog_fsync();
1176                 }
1177                 LogwrtResult.Flush = LogwrtResult.Write;
1178         }
1179
1180         /*
1181          * Update shared-memory status
1182          *
1183          * We make sure that the shared 'request' values do not fall behind the
1184          * 'result' values.  This is not absolutely essential, but it saves
1185          * some code in a couple of places.
1186          */
1187         {
1188                 /* use volatile pointer to prevent code rearrangement */
1189                 volatile XLogCtlData *xlogctl = XLogCtl;
1190
1191                 SpinLockAcquire_NoHoldoff(&xlogctl->info_lck);
1192                 xlogctl->LogwrtResult = LogwrtResult;
1193                 if (XLByteLT(xlogctl->LogwrtRqst.Write, LogwrtResult.Write))
1194                         xlogctl->LogwrtRqst.Write = LogwrtResult.Write;
1195                 if (XLByteLT(xlogctl->LogwrtRqst.Flush, LogwrtResult.Flush))
1196                         xlogctl->LogwrtRqst.Flush = LogwrtResult.Flush;
1197                 SpinLockRelease_NoHoldoff(&xlogctl->info_lck);
1198         }
1199
1200         Write->LogwrtResult = LogwrtResult;
1201 }
1202
1203 /*
1204  * Ensure that all XLOG data through the given position is flushed to disk.
1205  *
1206  * NOTE: this differs from XLogWrite mainly in that the WALWriteLock is not
1207  * already held, and we try to avoid acquiring it if possible.
1208  */
1209 void
1210 XLogFlush(XLogRecPtr record)
1211 {
1212         XLogRecPtr      WriteRqstPtr;
1213         XLogwrtRqst WriteRqst;
1214
1215         /* Disabled during REDO */
1216         if (InRedo)
1217                 return;
1218
1219         /* Quick exit if already known flushed */
1220         if (XLByteLE(record, LogwrtResult.Flush))
1221                 return;
1222
1223         if (XLOG_DEBUG)
1224                 elog(LOG, "xlog flush request %X/%X; write %X/%X; flush %X/%X",
1225                          record.xlogid, record.xrecoff,
1226                          LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff,
1227                          LogwrtResult.Flush.xlogid, LogwrtResult.Flush.xrecoff);
1228
1229         START_CRIT_SECTION();
1230
1231         /*
1232          * Since fsync is usually a horribly expensive operation, we try to
1233          * piggyback as much data as we can on each fsync: if we see any more
1234          * data entered into the xlog buffer, we'll write and fsync that too,
1235          * so that the final value of LogwrtResult.Flush is as large as
1236          * possible. This gives us some chance of avoiding another fsync
1237          * immediately after.
1238          */
1239
1240         /* initialize to given target; may increase below */
1241         WriteRqstPtr = record;
1242
1243         /* read LogwrtResult and update local state */
1244         {
1245                 /* use volatile pointer to prevent code rearrangement */
1246                 volatile XLogCtlData *xlogctl = XLogCtl;
1247
1248                 SpinLockAcquire_NoHoldoff(&xlogctl->info_lck);
1249                 if (XLByteLT(WriteRqstPtr, xlogctl->LogwrtRqst.Write))
1250                         WriteRqstPtr = xlogctl->LogwrtRqst.Write;
1251                 LogwrtResult = xlogctl->LogwrtResult;
1252                 SpinLockRelease_NoHoldoff(&xlogctl->info_lck);
1253         }
1254
1255         /* done already? */
1256         if (!XLByteLE(record, LogwrtResult.Flush))
1257         {
1258                 /* now wait for the write lock */
1259                 LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
1260                 LogwrtResult = XLogCtl->Write.LogwrtResult;
1261                 if (!XLByteLE(record, LogwrtResult.Flush))
1262                 {
1263                         /* try to write/flush later additions to XLOG as well */
1264                         if (LWLockConditionalAcquire(WALInsertLock, LW_EXCLUSIVE))
1265                         {
1266                                 XLogCtlInsert *Insert = &XLogCtl->Insert;
1267                                 uint32          freespace = INSERT_FREESPACE(Insert);
1268
1269                                 if (freespace < SizeOfXLogRecord)               /* buffer is full */
1270                                         WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx];
1271                                 else
1272                                 {
1273                                         WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx];
1274                                         WriteRqstPtr.xrecoff -= freespace;
1275                                 }
1276                                 LWLockRelease(WALInsertLock);
1277                                 WriteRqst.Write = WriteRqstPtr;
1278                                 WriteRqst.Flush = WriteRqstPtr;
1279                         }
1280                         else
1281                         {
1282                                 WriteRqst.Write = WriteRqstPtr;
1283                                 WriteRqst.Flush = record;
1284                         }
1285                         XLogWrite(WriteRqst);
1286                 }
1287                 LWLockRelease(WALWriteLock);
1288         }
1289
1290         END_CRIT_SECTION();
1291
1292         /*
1293          * If we still haven't flushed to the request point then we have a
1294          * problem; most likely, the requested flush point is past end of
1295          * XLOG. This has been seen to occur when a disk page has a corrupted
1296          * LSN.
1297          *
1298          * Formerly we treated this as a PANIC condition, but that hurts the
1299          * system's robustness rather than helping it: we do not want to take
1300          * down the whole system due to corruption on one data page.  In
1301          * particular, if the bad page is encountered again during recovery
1302          * then we would be unable to restart the database at all!      (This
1303          * scenario has actually happened in the field several times with 7.1
1304          * releases. Note that we cannot get here while InRedo is true, but if
1305          * the bad page is brought in and marked dirty during recovery then
1306          * CreateCheckpoint will try to flush it at the end of recovery.)
1307          *
1308          * The current approach is to ERROR under normal conditions, but only
1309          * WARNING during recovery, so that the system can be brought up even
1310          * if there's a corrupt LSN.  Note that for calls from xact.c, the
1311          * ERROR will be promoted to PANIC since xact.c calls this routine
1312          * inside a critical section.  However, calls from bufmgr.c are not
1313          * within critical sections and so we will not force a restart for a
1314          * bad LSN on a data page.
1315          */
1316         if (XLByteLT(LogwrtResult.Flush, record))
1317                 elog(InRecovery ? WARNING : ERROR,
1318                          "xlog flush request %X/%X is not satisfied --- flushed only to %X/%X",
1319                          record.xlogid, record.xrecoff,
1320                          LogwrtResult.Flush.xlogid, LogwrtResult.Flush.xrecoff);
1321 }
1322
1323 /*
1324  * Create a new XLOG file segment, or open a pre-existing one.
1325  *
1326  * log, seg: identify segment to be created/opened.
1327  *
1328  * *use_existent: if TRUE, OK to use a pre-existing file (else, any
1329  * pre-existing file will be deleted).  On return, TRUE if a pre-existing
1330  * file was used.
1331  *
1332  * use_lock: if TRUE, acquire ControlFileLock while moving file into
1333  * place.  This should be TRUE except during bootstrap log creation.  The
1334  * caller must *not* hold the lock at call.
1335  *
1336  * Returns FD of opened file.
1337  */
1338 static int
1339 XLogFileInit(uint32 log, uint32 seg,
1340                          bool *use_existent, bool use_lock)
1341 {
1342         char            path[MAXPGPATH];
1343         char            tmppath[MAXPGPATH];
1344         char            zbuffer[BLCKSZ];
1345         int                     fd;
1346         int                     nbytes;
1347
1348         XLogFileName(path, log, seg);
1349
1350         /*
1351          * Try to use existent file (checkpoint maker may have created it
1352          * already)
1353          */
1354         if (*use_existent)
1355         {
1356                 fd = BasicOpenFile(path, O_RDWR | PG_BINARY | XLOG_SYNC_BIT,
1357                                                    S_IRUSR | S_IWUSR);
1358                 if (fd < 0)
1359                 {
1360                         if (errno != ENOENT)
1361                                 ereport(PANIC,
1362                                                 (errcode_for_file_access(),
1363                                                  errmsg("open of \"%s\" (log file %u, segment %u) failed: %m",
1364                                                                 path, log, seg)));
1365                 }
1366                 else
1367                         return (fd);
1368         }
1369
1370         /*
1371          * Initialize an empty (all zeroes) segment.  NOTE: it is possible
1372          * that another process is doing the same thing.  If so, we will end
1373          * up pre-creating an extra log segment.  That seems OK, and better
1374          * than holding the lock throughout this lengthy process.
1375          */
1376         snprintf(tmppath, MAXPGPATH, "%s/xlogtemp.%d",
1377                          XLogDir, (int) getpid());
1378
1379         unlink(tmppath);
1380
1381         /* do not use XLOG_SYNC_BIT here --- want to fsync only at end of fill */
1382         fd = BasicOpenFile(tmppath, O_RDWR | O_CREAT | O_EXCL | PG_BINARY,
1383                                            S_IRUSR | S_IWUSR);
1384         if (fd < 0)
1385                 ereport(PANIC,
1386                                 (errcode_for_file_access(),
1387                                  errmsg("creation of file \"%s\" failed: %m", tmppath)));
1388
1389         /*
1390          * Zero-fill the file.  We have to do this the hard way to ensure that
1391          * all the file space has really been allocated --- on platforms that
1392          * allow "holes" in files, just seeking to the end doesn't allocate
1393          * intermediate space.  This way, we know that we have all the space
1394          * and (after the fsync below) that all the indirect blocks are down
1395          * on disk.  Therefore, fdatasync(2) or O_DSYNC will be sufficient to
1396          * sync future writes to the log file.
1397          */
1398         MemSet(zbuffer, 0, sizeof(zbuffer));
1399         for (nbytes = 0; nbytes < XLogSegSize; nbytes += sizeof(zbuffer))
1400         {
1401                 errno = 0;
1402                 if ((int) write(fd, zbuffer, sizeof(zbuffer)) != (int) sizeof(zbuffer))
1403                 {
1404                         int                     save_errno = errno;
1405
1406                         /*
1407                          * If we fail to make the file, delete it to release disk
1408                          * space
1409                          */
1410                         unlink(tmppath);
1411                         /* if write didn't set errno, assume problem is no disk space */
1412                         errno = save_errno ? save_errno : ENOSPC;
1413
1414                         ereport(PANIC,
1415                                         (errcode_for_file_access(),
1416                                          errmsg("failed to write \"%s\": %m", tmppath)));
1417                 }
1418         }
1419
1420         if (pg_fsync(fd) != 0)
1421                 ereport(PANIC,
1422                                 (errcode_for_file_access(),
1423                                  errmsg("fsync of file \"%s\" failed: %m", tmppath)));
1424
1425         close(fd);
1426
1427         /*
1428          * Now move the segment into place with its final name.
1429          *
1430          * If caller didn't want to use a pre-existing file, get rid of any
1431          * pre-existing file.  Otherwise, cope with possibility that someone
1432          * else has created the file while we were filling ours: if so, use
1433          * ours to pre-create a future log segment.
1434          */
1435         if (!InstallXLogFileSegment(log, seg, tmppath,
1436                                                                 *use_existent, XLOGfileslop,
1437                                                                 use_lock))
1438         {
1439                 /* No need for any more future segments... */
1440                 unlink(tmppath);
1441         }
1442
1443         /* Set flag to tell caller there was no existent file */
1444         *use_existent = false;
1445
1446         /* Now open original target segment (might not be file I just made) */
1447         fd = BasicOpenFile(path, O_RDWR | PG_BINARY | XLOG_SYNC_BIT,
1448                                            S_IRUSR | S_IWUSR);
1449         if (fd < 0)
1450                 ereport(PANIC,
1451                                 (errcode_for_file_access(),
1452                         errmsg("open of \"%s\" (log file %u, segment %u) failed: %m",
1453                                    path, log, seg)));
1454
1455         return (fd);
1456 }
1457
1458 /*
1459  * Install a new XLOG segment file as a current or future log segment.
1460  *
1461  * This is used both to install a newly-created segment (which has a temp
1462  * filename while it's being created) and to recycle an old segment.
1463  *
1464  * log, seg: identify segment to install as (or first possible target).
1465  *
1466  * tmppath: initial name of file to install.  It will be renamed into place.
1467  *
1468  * find_free: if TRUE, install the new segment at the first empty log/seg
1469  * number at or after the passed numbers.  If FALSE, install the new segment
1470  * exactly where specified, deleting any existing segment file there.
1471  *
1472  * max_advance: maximum number of log/seg slots to advance past the starting
1473  * point.  Fail if no free slot is found in this range.  (Irrelevant if
1474  * find_free is FALSE.)
1475  *
1476  * use_lock: if TRUE, acquire ControlFileLock while moving file into
1477  * place.  This should be TRUE except during bootstrap log creation.  The
1478  * caller must *not* hold the lock at call.
1479  *
1480  * Returns TRUE if file installed, FALSE if not installed because of
1481  * exceeding max_advance limit.  (Any other kind of failure causes ereport().)
1482  */
1483 static bool
1484 InstallXLogFileSegment(uint32 log, uint32 seg, char *tmppath,
1485                                            bool find_free, int max_advance,
1486                                            bool use_lock)
1487 {
1488         char            path[MAXPGPATH];
1489         struct stat stat_buf;
1490
1491         XLogFileName(path, log, seg);
1492
1493         /*
1494          * We want to be sure that only one process does this at a time.
1495          */
1496         if (use_lock)
1497                 LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
1498
1499         if (!find_free)
1500         {
1501                 /* Force installation: get rid of any pre-existing segment file */
1502                 unlink(path);
1503         }
1504         else
1505         {
1506                 /* Find a free slot to put it in */
1507                 while (stat(path, &stat_buf) == 0)
1508                 {
1509                         if (--max_advance < 0)
1510                         {
1511                                 /* Failed to find a free slot within specified range */
1512                                 if (use_lock)
1513                                         LWLockRelease(ControlFileLock);
1514                                 return false;
1515                         }
1516                         NextLogSeg(log, seg);
1517                         XLogFileName(path, log, seg);
1518                 }
1519         }
1520
1521         /*
1522          * Prefer link() to rename() here just to be really sure that we don't
1523          * overwrite an existing logfile.  However, there shouldn't be one, so
1524          * rename() is an acceptable substitute except for the truly paranoid.
1525          */
1526 #if HAVE_WORKING_LINK
1527         if (link(tmppath, path) < 0)
1528                 ereport(PANIC,
1529                                 (errcode_for_file_access(),
1530                                  errmsg("link from \"%s\" to \"%s\" (initialization of log file %u, segment %u) failed: %m",
1531                                                 tmppath, path, log, seg)));
1532         unlink(tmppath);
1533 #else
1534         if (rename(tmppath, path) < 0)
1535                 ereport(PANIC,
1536                                 (errcode_for_file_access(),
1537                                  errmsg("rename from \"%s\" to \"%s\" (initialization of log file %u, segment %u) failed: %m",
1538                                                 tmppath, path, log, seg)));
1539 #endif
1540
1541         if (use_lock)
1542                 LWLockRelease(ControlFileLock);
1543
1544         return true;
1545 }
1546
1547 /*
1548  * Open a pre-existing logfile segment.
1549  */
1550 static int
1551 XLogFileOpen(uint32 log, uint32 seg, bool econt)
1552 {
1553         char            path[MAXPGPATH];
1554         int                     fd;
1555
1556         XLogFileName(path, log, seg);
1557
1558         fd = BasicOpenFile(path, O_RDWR | PG_BINARY | XLOG_SYNC_BIT,
1559                                            S_IRUSR | S_IWUSR);
1560         if (fd < 0)
1561         {
1562                 if (econt && errno == ENOENT)
1563                 {
1564                         ereport(LOG,
1565                                         (errcode_for_file_access(),
1566                         errmsg("open of \"%s\" (log file %u, segment %u) failed: %m",
1567                                    path, log, seg)));
1568                         return (fd);
1569                 }
1570                 ereport(PANIC,
1571                                 (errcode_for_file_access(),
1572                         errmsg("open of \"%s\" (log file %u, segment %u) failed: %m",
1573                                    path, log, seg)));
1574         }
1575
1576         return (fd);
1577 }
1578
1579 /*
1580  * Preallocate log files beyond the specified log endpoint, according to
1581  * the XLOGfile user parameter.
1582  */
1583 static void
1584 PreallocXlogFiles(XLogRecPtr endptr)
1585 {
1586         uint32          _logId;
1587         uint32          _logSeg;
1588         int                     lf;
1589         bool            use_existent;
1590
1591         XLByteToPrevSeg(endptr, _logId, _logSeg);
1592         if ((endptr.xrecoff - 1) % XLogSegSize >=
1593                 (uint32) (0.75 * XLogSegSize))
1594         {
1595                 NextLogSeg(_logId, _logSeg);
1596                 use_existent = true;
1597                 lf = XLogFileInit(_logId, _logSeg, &use_existent, true);
1598                 close(lf);
1599         }
1600 }
1601
1602 /*
1603  * Remove or move offline all log files older or equal to passed log/seg#
1604  *
1605  * endptr is current (or recent) end of xlog; this is used to determine
1606  * whether we want to recycle rather than delete no-longer-wanted log files.
1607  */
1608 static void
1609 MoveOfflineLogs(uint32 log, uint32 seg, XLogRecPtr endptr)
1610 {
1611         uint32          endlogId;
1612         uint32          endlogSeg;
1613         DIR                *xldir;
1614         struct dirent *xlde;
1615         char            lastoff[32];
1616         char            path[MAXPGPATH];
1617
1618         XLByteToPrevSeg(endptr, endlogId, endlogSeg);
1619
1620         xldir = opendir(XLogDir);
1621         if (xldir == NULL)
1622                 ereport(PANIC,
1623                                 (errcode_for_file_access(),
1624                         errmsg("could not open transaction log directory \"%s\": %m",
1625                                    XLogDir)));
1626
1627         sprintf(lastoff, "%08X%08X", log, seg);
1628
1629         errno = 0;
1630         while ((xlde = readdir(xldir)) != NULL)
1631         {
1632                 if (strlen(xlde->d_name) == 16 &&
1633                         strspn(xlde->d_name, "0123456789ABCDEF") == 16 &&
1634                         strcmp(xlde->d_name, lastoff) <= 0)
1635                 {
1636                         snprintf(path, MAXPGPATH, "%s/%s", XLogDir, xlde->d_name);
1637                         if (XLOG_archive_dir[0])
1638                         {
1639                                 ereport(LOG,
1640                                                 (errmsg("archiving transaction log file \"%s\"",
1641                                                                 xlde->d_name)));
1642                                 elog(WARNING, "archiving log files is not implemented");
1643                         }
1644                         else
1645                         {
1646                                 /*
1647                                  * Before deleting the file, see if it can be recycled as
1648                                  * a future log segment.  We allow recycling segments up
1649                                  * to XLOGfileslop segments beyond the current XLOG
1650                                  * location.
1651                                  */
1652                                 if (InstallXLogFileSegment(endlogId, endlogSeg, path,
1653                                                                                    true, XLOGfileslop,
1654                                                                                    true))
1655                                 {
1656                                         ereport(LOG,
1657                                                   (errmsg("recycled transaction log file \"%s\"",
1658                                                                   xlde->d_name)));
1659                                 }
1660                                 else
1661                                 {
1662                                         /* No need for any more future segments... */
1663                                         ereport(LOG,
1664                                                   (errmsg("removing transaction log file \"%s\"",
1665                                                                   xlde->d_name)));
1666                                         unlink(path);
1667                                 }
1668                         }
1669                 }
1670                 errno = 0;
1671         }
1672         if (errno)
1673                 ereport(PANIC,
1674                                 (errcode_for_file_access(),
1675                         errmsg("could not read transaction log directory \"%s\": %m",
1676                                    XLogDir)));
1677         closedir(xldir);
1678 }
1679
1680 /*
1681  * Restore the backup blocks present in an XLOG record, if any.
1682  *
1683  * We assume all of the record has been read into memory at *record.
1684  */
1685 static void
1686 RestoreBkpBlocks(XLogRecord *record, XLogRecPtr lsn)
1687 {
1688         Relation        reln;
1689         Buffer          buffer;
1690         Page            page;
1691         BkpBlock        bkpb;
1692         char       *blk;
1693         int                     i;
1694
1695         blk = (char *) XLogRecGetData(record) + record->xl_len;
1696         for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
1697         {
1698                 if (!(record->xl_info & XLR_SET_BKP_BLOCK(i)))
1699                         continue;
1700
1701                 memcpy((char *) &bkpb, blk, sizeof(BkpBlock));
1702                 blk += sizeof(BkpBlock);
1703
1704                 reln = XLogOpenRelation(true, record->xl_rmid, bkpb.node);
1705
1706                 if (reln)
1707                 {
1708                         buffer = XLogReadBuffer(true, reln, bkpb.block);
1709                         if (BufferIsValid(buffer))
1710                         {
1711                                 page = (Page) BufferGetPage(buffer);
1712                                 memcpy((char *) page, blk, BLCKSZ);
1713                                 PageSetLSN(page, lsn);
1714                                 PageSetSUI(page, ThisStartUpID);
1715                                 UnlockAndWriteBuffer(buffer);
1716                         }
1717                 }
1718
1719                 blk += BLCKSZ;
1720         }
1721 }
1722
1723 /*
1724  * CRC-check an XLOG record.  We do not believe the contents of an XLOG
1725  * record (other than to the minimal extent of computing the amount of
1726  * data to read in) until we've checked the CRCs.
1727  *
1728  * We assume all of the record has been read into memory at *record.
1729  */
1730 static bool
1731 RecordIsValid(XLogRecord *record, XLogRecPtr recptr, int emode)
1732 {
1733         crc64           crc;
1734         crc64           cbuf;
1735         int                     i;
1736         uint32          len = record->xl_len;
1737         char       *blk;
1738
1739         /* Check CRC of rmgr data and record header */
1740         INIT_CRC64(crc);
1741         COMP_CRC64(crc, XLogRecGetData(record), len);
1742         COMP_CRC64(crc, (char *) record + sizeof(crc64),
1743                            SizeOfXLogRecord - sizeof(crc64));
1744         FIN_CRC64(crc);
1745
1746         if (!EQ_CRC64(record->xl_crc, crc))
1747         {
1748                 ereport(emode,
1749                  (errmsg("bad resource manager data checksum in record at %X/%X",
1750                                  recptr.xlogid, recptr.xrecoff)));
1751                 return (false);
1752         }
1753
1754         /* Check CRCs of backup blocks, if any */
1755         blk = (char *) XLogRecGetData(record) + len;
1756         for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
1757         {
1758                 if (!(record->xl_info & XLR_SET_BKP_BLOCK(i)))
1759                         continue;
1760
1761                 INIT_CRC64(crc);
1762                 COMP_CRC64(crc, blk + sizeof(BkpBlock), BLCKSZ);
1763                 COMP_CRC64(crc, blk + sizeof(crc64),
1764                                    sizeof(BkpBlock) - sizeof(crc64));
1765                 FIN_CRC64(crc);
1766                 memcpy((char *) &cbuf, blk, sizeof(crc64));             /* don't assume
1767                                                                                                                  * alignment */
1768
1769                 if (!EQ_CRC64(cbuf, crc))
1770                 {
1771                         ereport(emode,
1772                         (errmsg("bad checksum of backup block %d in record at %X/%X",
1773                                         i + 1, recptr.xlogid, recptr.xrecoff)));
1774                         return (false);
1775                 }
1776                 blk += sizeof(BkpBlock) + BLCKSZ;
1777         }
1778
1779         return (true);
1780 }
1781
1782 /*
1783  * Attempt to read an XLOG record.
1784  *
1785  * If RecPtr is not NULL, try to read a record at that position.  Otherwise
1786  * try to read a record just after the last one previously read.
1787  *
1788  * If no valid record is available, returns NULL, or fails if emode is PANIC.
1789  * (emode must be either PANIC or LOG.)
1790  *
1791  * buffer is a workspace at least _INTL_MAXLOGRECSZ bytes long.  It is needed
1792  * to reassemble a record that crosses block boundaries.  Note that on
1793  * successful return, the returned record pointer always points at buffer.
1794  */
1795 static XLogRecord *
1796 ReadRecord(XLogRecPtr *RecPtr, int emode, char *buffer)
1797 {
1798         XLogRecord *record;
1799         XLogRecPtr      tmpRecPtr = EndRecPtr;
1800         uint32          len,
1801                                 total_len;
1802         uint32          targetPageOff;
1803         unsigned        i;
1804         bool            nextmode = false;
1805
1806         if (readBuf == NULL)
1807         {
1808                 /*
1809                  * First time through, permanently allocate readBuf.  We do it
1810                  * this way, rather than just making a static array, for two
1811                  * reasons: (1) no need to waste the storage in most
1812                  * instantiations of the backend; (2) a static char array isn't
1813                  * guaranteed to have any particular alignment, whereas malloc()
1814                  * will provide MAXALIGN'd storage.
1815                  */
1816                 readBuf = (char *) malloc(BLCKSZ);
1817                 Assert(readBuf != NULL);
1818         }
1819
1820         if (RecPtr == NULL)
1821         {
1822                 RecPtr = &tmpRecPtr;
1823                 nextmode = true;
1824                 /* fast case if next record is on same page */
1825                 if (nextRecord != NULL)
1826                 {
1827                         record = nextRecord;
1828                         goto got_record;
1829                 }
1830                 /* align old recptr to next page */
1831                 if (tmpRecPtr.xrecoff % BLCKSZ != 0)
1832                         tmpRecPtr.xrecoff += (BLCKSZ - tmpRecPtr.xrecoff % BLCKSZ);
1833                 if (tmpRecPtr.xrecoff >= XLogFileSize)
1834                 {
1835                         (tmpRecPtr.xlogid)++;
1836                         tmpRecPtr.xrecoff = 0;
1837                 }
1838                 tmpRecPtr.xrecoff += SizeOfXLogPHD;
1839         }
1840         else if (!XRecOffIsValid(RecPtr->xrecoff))
1841                 ereport(PANIC,
1842                                 (errmsg("invalid record offset at %X/%X",
1843                                                 RecPtr->xlogid, RecPtr->xrecoff)));
1844
1845         if (readFile >= 0 && !XLByteInSeg(*RecPtr, readId, readSeg))
1846         {
1847                 close(readFile);
1848                 readFile = -1;
1849         }
1850         XLByteToSeg(*RecPtr, readId, readSeg);
1851         if (readFile < 0)
1852         {
1853                 readFile = XLogFileOpen(readId, readSeg, (emode == LOG));
1854                 if (readFile < 0)
1855                         goto next_record_is_invalid;
1856                 readOff = (uint32) (-1);        /* force read to occur below */
1857         }
1858
1859         targetPageOff = ((RecPtr->xrecoff % XLogSegSize) / BLCKSZ) * BLCKSZ;
1860         if (readOff != targetPageOff)
1861         {
1862                 readOff = targetPageOff;
1863                 if (lseek(readFile, (off_t) readOff, SEEK_SET) < 0)
1864                 {
1865                         ereport(emode,
1866                                         (errcode_for_file_access(),
1867                                          errmsg("lseek of log file %u, segment %u, offset %u failed: %m",
1868                                                         readId, readSeg, readOff)));
1869                         goto next_record_is_invalid;
1870                 }
1871                 if (read(readFile, readBuf, BLCKSZ) != BLCKSZ)
1872                 {
1873                         ereport(emode,
1874                                         (errcode_for_file_access(),
1875                                          errmsg("read of log file %u, segment %u, offset %u failed: %m",
1876                                                         readId, readSeg, readOff)));
1877                         goto next_record_is_invalid;
1878                 }
1879                 if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode, nextmode))
1880                         goto next_record_is_invalid;
1881         }
1882         if ((((XLogPageHeader) readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) &&
1883                 RecPtr->xrecoff % BLCKSZ == SizeOfXLogPHD)
1884         {
1885                 ereport(emode,
1886                                 (errmsg("contrecord is requested by %X/%X",
1887                                                 RecPtr->xlogid, RecPtr->xrecoff)));
1888                 goto next_record_is_invalid;
1889         }
1890         record = (XLogRecord *) ((char *) readBuf + RecPtr->xrecoff % BLCKSZ);
1891
1892 got_record:;
1893
1894         /*
1895          * Currently, xl_len == 0 must be bad data, but that might not be true
1896          * forever.  See note in XLogInsert.
1897          */
1898         if (record->xl_len == 0)
1899         {
1900                 ereport(emode,
1901                                 (errmsg("record with zero length at %X/%X",
1902                                                 RecPtr->xlogid, RecPtr->xrecoff)));
1903                 goto next_record_is_invalid;
1904         }
1905
1906         /*
1907          * Compute total length of record including any appended backup
1908          * blocks.
1909          */
1910         total_len = SizeOfXLogRecord + record->xl_len;
1911         for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
1912         {
1913                 if (!(record->xl_info & XLR_SET_BKP_BLOCK(i)))
1914                         continue;
1915                 total_len += sizeof(BkpBlock) + BLCKSZ;
1916         }
1917
1918         /*
1919          * Make sure it will fit in buffer (currently, it is mechanically
1920          * impossible for this test to fail, but it seems like a good idea
1921          * anyway).
1922          */
1923         if (total_len > _INTL_MAXLOGRECSZ)
1924         {
1925                 ereport(emode,
1926                                 (errmsg("record length %u at %X/%X too long",
1927                                                 total_len, RecPtr->xlogid, RecPtr->xrecoff)));
1928                 goto next_record_is_invalid;
1929         }
1930         if (record->xl_rmid > RM_MAX_ID)
1931         {
1932                 ereport(emode,
1933                                 (errmsg("invalid resource manager id %u at %X/%X",
1934                                          record->xl_rmid, RecPtr->xlogid, RecPtr->xrecoff)));
1935                 goto next_record_is_invalid;
1936         }
1937         nextRecord = NULL;
1938         len = BLCKSZ - RecPtr->xrecoff % BLCKSZ;
1939         if (total_len > len)
1940         {
1941                 /* Need to reassemble record */
1942                 XLogContRecord *contrecord;
1943                 uint32          gotlen = len;
1944
1945                 memcpy(buffer, record, len);
1946                 record = (XLogRecord *) buffer;
1947                 buffer += len;
1948                 for (;;)
1949                 {
1950                         readOff += BLCKSZ;
1951                         if (readOff >= XLogSegSize)
1952                         {
1953                                 close(readFile);
1954                                 readFile = -1;
1955                                 NextLogSeg(readId, readSeg);
1956                                 readFile = XLogFileOpen(readId, readSeg, (emode == LOG));
1957                                 if (readFile < 0)
1958                                         goto next_record_is_invalid;
1959                                 readOff = 0;
1960                         }
1961                         if (read(readFile, readBuf, BLCKSZ) != BLCKSZ)
1962                         {
1963                                 ereport(emode,
1964                                                 (errcode_for_file_access(),
1965                                                  errmsg("read of log file %u, segment %u, offset %u failed: %m",
1966                                                                 readId, readSeg, readOff)));
1967                                 goto next_record_is_invalid;
1968                         }
1969                         if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode, true))
1970                                 goto next_record_is_invalid;
1971                         if (!(((XLogPageHeader) readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD))
1972                         {
1973                                 ereport(emode,
1974                                                 (errmsg("there is no contrecord flag in log file %u, segment %u, offset %u",
1975                                                                 readId, readSeg, readOff)));
1976                                 goto next_record_is_invalid;
1977                         }
1978                         contrecord = (XLogContRecord *) ((char *) readBuf + SizeOfXLogPHD);
1979                         if (contrecord->xl_rem_len == 0 ||
1980                                 total_len != (contrecord->xl_rem_len + gotlen))
1981                         {
1982                                 ereport(emode,
1983                                                 (errmsg("invalid contrecord length %u in log file %u, segment %u, offset %u",
1984                                                                 contrecord->xl_rem_len,
1985                                                                 readId, readSeg, readOff)));
1986                                 goto next_record_is_invalid;
1987                         }
1988                         len = BLCKSZ - SizeOfXLogPHD - SizeOfXLogContRecord;
1989                         if (contrecord->xl_rem_len > len)
1990                         {
1991                                 memcpy(buffer, (char *) contrecord + SizeOfXLogContRecord, len);
1992                                 gotlen += len;
1993                                 buffer += len;
1994                                 continue;
1995                         }
1996                         memcpy(buffer, (char *) contrecord + SizeOfXLogContRecord,
1997                                    contrecord->xl_rem_len);
1998                         break;
1999                 }
2000                 if (!RecordIsValid(record, *RecPtr, emode))
2001                         goto next_record_is_invalid;
2002                 if (BLCKSZ - SizeOfXLogRecord >= SizeOfXLogPHD +
2003                         SizeOfXLogContRecord + MAXALIGN(contrecord->xl_rem_len))
2004                 {
2005                         nextRecord = (XLogRecord *) ((char *) contrecord +
2006                                 SizeOfXLogContRecord + MAXALIGN(contrecord->xl_rem_len));
2007                 }
2008                 EndRecPtr.xlogid = readId;
2009                 EndRecPtr.xrecoff = readSeg * XLogSegSize + readOff +
2010                         SizeOfXLogPHD + SizeOfXLogContRecord +
2011                         MAXALIGN(contrecord->xl_rem_len);
2012                 ReadRecPtr = *RecPtr;
2013                 return record;
2014         }
2015
2016         /* Record does not cross a page boundary */
2017         if (!RecordIsValid(record, *RecPtr, emode))
2018                 goto next_record_is_invalid;
2019         if (BLCKSZ - SizeOfXLogRecord >= RecPtr->xrecoff % BLCKSZ +
2020                 MAXALIGN(total_len))
2021                 nextRecord = (XLogRecord *) ((char *) record + MAXALIGN(total_len));
2022         EndRecPtr.xlogid = RecPtr->xlogid;
2023         EndRecPtr.xrecoff = RecPtr->xrecoff + MAXALIGN(total_len);
2024         ReadRecPtr = *RecPtr;
2025         memcpy(buffer, record, total_len);
2026         return (XLogRecord *) buffer;
2027
2028 next_record_is_invalid:;
2029         close(readFile);
2030         readFile = -1;
2031         nextRecord = NULL;
2032         return NULL;
2033 }
2034
2035 /*
2036  * Check whether the xlog header of a page just read in looks valid.
2037  *
2038  * This is just a convenience subroutine to avoid duplicated code in
2039  * ReadRecord.  It's not intended for use from anywhere else.
2040  */
2041 static bool
2042 ValidXLOGHeader(XLogPageHeader hdr, int emode, bool checkSUI)
2043 {
2044         XLogRecPtr      recaddr;
2045
2046         if (hdr->xlp_magic != XLOG_PAGE_MAGIC)
2047         {
2048                 ereport(emode,
2049                                 (errmsg("invalid magic number %04X in log file %u, segment %u, offset %u",
2050                                                 hdr->xlp_magic, readId, readSeg, readOff)));
2051                 return false;
2052         }
2053         if ((hdr->xlp_info & ~XLP_ALL_FLAGS) != 0)
2054         {
2055                 ereport(emode,
2056                                 (errmsg("invalid info bits %04X in log file %u, segment %u, offset %u",
2057                                                 hdr->xlp_info, readId, readSeg, readOff)));
2058                 return false;
2059         }
2060         recaddr.xlogid = readId;
2061         recaddr.xrecoff = readSeg * XLogSegSize + readOff;
2062         if (!XLByteEQ(hdr->xlp_pageaddr, recaddr))
2063         {
2064                 ereport(emode,
2065                                 (errmsg("unexpected pageaddr %X/%X in log file %u, segment %u, offset %u",
2066                                          hdr->xlp_pageaddr.xlogid, hdr->xlp_pageaddr.xrecoff,
2067                                                 readId, readSeg, readOff)));
2068                 return false;
2069         }
2070
2071         /*
2072          * We disbelieve a SUI less than the previous page's SUI, or more than
2073          * a few counts greater.  In theory as many as 512 shutdown checkpoint
2074          * records could appear on a 32K-sized xlog page, so that's the most
2075          * differential there could legitimately be.
2076          *
2077          * Note this check can only be applied when we are reading the next page
2078          * in sequence, so ReadRecord passes a flag indicating whether to
2079          * check.
2080          */
2081         if (checkSUI)
2082         {
2083                 if (hdr->xlp_sui < lastReadSUI ||
2084                         hdr->xlp_sui > lastReadSUI + 512)
2085                 {
2086                         ereport(emode,
2087                         /* translator: SUI = startup id */
2088                                         (errmsg("out-of-sequence SUI %u (after %u) in log file %u, segment %u, offset %u",
2089                                                         hdr->xlp_sui, lastReadSUI,
2090                                                         readId, readSeg, readOff)));
2091                         return false;
2092                 }
2093         }
2094         lastReadSUI = hdr->xlp_sui;
2095         return true;
2096 }
2097
2098 /*
2099  * I/O routines for pg_control
2100  *
2101  * *ControlFile is a buffer in shared memory that holds an image of the
2102  * contents of pg_control.      WriteControlFile() initializes pg_control
2103  * given a preloaded buffer, ReadControlFile() loads the buffer from
2104  * the pg_control file (during postmaster or standalone-backend startup),
2105  * and UpdateControlFile() rewrites pg_control after we modify xlog state.
2106  *
2107  * For simplicity, WriteControlFile() initializes the fields of pg_control
2108  * that are related to checking backend/database compatibility, and
2109  * ReadControlFile() verifies they are correct.  We could split out the
2110  * I/O and compatibility-check functions, but there seems no need currently.
2111  */
2112
2113 void
2114 XLOGPathInit(void)
2115 {
2116         /* Init XLOG file paths */
2117         snprintf(XLogDir, MAXPGPATH, "%s/pg_xlog", DataDir);
2118         snprintf(ControlFilePath, MAXPGPATH, "%s/global/pg_control", DataDir);
2119 }
2120
2121 static void
2122 WriteControlFile(void)
2123 {
2124         int                     fd;
2125         char            buffer[BLCKSZ]; /* need not be aligned */
2126         char       *localeptr;
2127
2128         /*
2129          * Initialize version and compatibility-check fields
2130          */
2131         ControlFile->pg_control_version = PG_CONTROL_VERSION;
2132         ControlFile->catalog_version_no = CATALOG_VERSION_NO;
2133         ControlFile->blcksz = BLCKSZ;
2134         ControlFile->relseg_size = RELSEG_SIZE;
2135
2136         ControlFile->nameDataLen = NAMEDATALEN;
2137         ControlFile->funcMaxArgs = FUNC_MAX_ARGS;
2138
2139 #ifdef HAVE_INT64_TIMESTAMP
2140         ControlFile->enableIntTimes = TRUE;
2141 #else
2142         ControlFile->enableIntTimes = FALSE;
2143 #endif
2144
2145         ControlFile->localeBuflen = LOCALE_NAME_BUFLEN;
2146         localeptr = setlocale(LC_COLLATE, NULL);
2147         if (!localeptr)
2148                 ereport(PANIC,
2149                                 (errmsg("invalid LC_COLLATE setting")));
2150         StrNCpy(ControlFile->lc_collate, localeptr, LOCALE_NAME_BUFLEN);
2151         localeptr = setlocale(LC_CTYPE, NULL);
2152         if (!localeptr)
2153                 ereport(PANIC,
2154                                 (errmsg("invalid LC_CTYPE setting")));
2155         StrNCpy(ControlFile->lc_ctype, localeptr, LOCALE_NAME_BUFLEN);
2156
2157         /* Contents are protected with a CRC */
2158         INIT_CRC64(ControlFile->crc);
2159         COMP_CRC64(ControlFile->crc,
2160                            (char *) ControlFile + sizeof(crc64),
2161                            sizeof(ControlFileData) - sizeof(crc64));
2162         FIN_CRC64(ControlFile->crc);
2163
2164         /*
2165          * We write out BLCKSZ bytes into pg_control, zero-padding the excess
2166          * over sizeof(ControlFileData).  This reduces the odds of
2167          * premature-EOF errors when reading pg_control.  We'll still fail
2168          * when we check the contents of the file, but hopefully with a more
2169          * specific error than "couldn't read pg_control".
2170          */
2171         if (sizeof(ControlFileData) > BLCKSZ)
2172                 ereport(PANIC,
2173                                 (errmsg("sizeof(ControlFileData) is larger than BLCKSZ; fix either one")));
2174
2175         memset(buffer, 0, BLCKSZ);
2176         memcpy(buffer, ControlFile, sizeof(ControlFileData));
2177
2178         fd = BasicOpenFile(ControlFilePath, O_RDWR | O_CREAT | O_EXCL | PG_BINARY,
2179                                            S_IRUSR | S_IWUSR);
2180         if (fd < 0)
2181                 ereport(PANIC,
2182                                 (errcode_for_file_access(),
2183                                  errmsg("could not create control file \"%s\": %m",
2184                                                 ControlFilePath)));
2185
2186         errno = 0;
2187         if (write(fd, buffer, BLCKSZ) != BLCKSZ)
2188         {
2189                 /* if write didn't set errno, assume problem is no disk space */
2190                 if (errno == 0)
2191                         errno = ENOSPC;
2192                 ereport(PANIC,
2193                                 (errcode_for_file_access(),
2194                                  errmsg("write to control file failed: %m")));
2195         }
2196
2197         if (pg_fsync(fd) != 0)
2198                 ereport(PANIC,
2199                                 (errcode_for_file_access(),
2200                                  errmsg("fsync of control file failed: %m")));
2201
2202         close(fd);
2203 }
2204
2205 static void
2206 ReadControlFile(void)
2207 {
2208         crc64           crc;
2209         int                     fd;
2210
2211         /*
2212          * Read data...
2213          */
2214         fd = BasicOpenFile(ControlFilePath, O_RDWR | PG_BINARY, S_IRUSR | S_IWUSR);
2215         if (fd < 0)
2216                 ereport(PANIC,
2217                                 (errcode_for_file_access(),
2218                                  errmsg("could not open control file \"%s\": %m",
2219                                                 ControlFilePath)));
2220
2221         if (read(fd, ControlFile, sizeof(ControlFileData)) != sizeof(ControlFileData))
2222                 ereport(PANIC,
2223                                 (errcode_for_file_access(),
2224                                  errmsg("read from control file failed: %m")));
2225
2226         close(fd);
2227
2228         /*
2229          * Check for expected pg_control format version.  If this is wrong,
2230          * the CRC check will likely fail because we'll be checking the wrong
2231          * number of bytes.  Complaining about wrong version will probably be
2232          * more enlightening than complaining about wrong CRC.
2233          */
2234         if (ControlFile->pg_control_version != PG_CONTROL_VERSION)
2235                 ereport(FATAL,
2236                                 (errmsg("database files are incompatible with server"),
2237                                  errdetail("The database cluster was initialized with PG_CONTROL_VERSION %d,"
2238                           " but the server was compiled with PG_CONTROL_VERSION %d.",
2239                                         ControlFile->pg_control_version, PG_CONTROL_VERSION),
2240                                  errhint("It looks like you need to initdb.")));
2241         /* Now check the CRC. */
2242         INIT_CRC64(crc);
2243         COMP_CRC64(crc,
2244                            (char *) ControlFile + sizeof(crc64),
2245                            sizeof(ControlFileData) - sizeof(crc64));
2246         FIN_CRC64(crc);
2247
2248         if (!EQ_CRC64(crc, ControlFile->crc))
2249                 ereport(FATAL,
2250                                 (errmsg("invalid checksum in control file")));
2251
2252         /*
2253          * Do compatibility checking immediately.  We do this here for 2
2254          * reasons:
2255          *
2256          * (1) if the database isn't compatible with the backend executable, we
2257          * want to abort before we can possibly do any damage;
2258          *
2259          * (2) this code is executed in the postmaster, so the setlocale() will
2260          * propagate to forked backends, which aren't going to read this file
2261          * for themselves.      (These locale settings are considered critical
2262          * compatibility items because they can affect sort order of indexes.)
2263          */
2264         if (ControlFile->catalog_version_no != CATALOG_VERSION_NO)
2265                 ereport(FATAL,
2266                                 (errmsg("database files are incompatible with server"),
2267                                  errdetail("The database cluster was initialized with CATALOG_VERSION_NO %d,"
2268                           " but the server was compiled with CATALOG_VERSION_NO %d.",
2269                                         ControlFile->catalog_version_no, CATALOG_VERSION_NO),
2270                                  errhint("It looks like you need to initdb.")));
2271         if (ControlFile->blcksz != BLCKSZ)
2272                 ereport(FATAL,
2273                                 (errmsg("database files are incompatible with server"),
2274                  errdetail("The database cluster was initialized with BLCKSZ %d,"
2275                                    " but the server was compiled with BLCKSZ %d.",
2276                                    ControlFile->blcksz, BLCKSZ),
2277                          errhint("It looks like you need to recompile or initdb.")));
2278         if (ControlFile->relseg_size != RELSEG_SIZE)
2279                 ereport(FATAL,
2280                                 (errmsg("database files are incompatible with server"),
2281                                  errdetail("The database cluster was initialized with RELSEG_SIZE %d,"
2282                                          " but the server was compiled with RELSEG_SIZE %d.",
2283                                                    ControlFile->relseg_size, RELSEG_SIZE),
2284                          errhint("It looks like you need to recompile or initdb.")));
2285         if (ControlFile->nameDataLen != NAMEDATALEN)
2286                 ereport(FATAL,
2287                                 (errmsg("database files are incompatible with server"),
2288                                  errdetail("The database cluster was initialized with NAMEDATALEN %d,"
2289                                          " but the server was compiled with NAMEDATALEN %d.",
2290                                                    ControlFile->nameDataLen, NAMEDATALEN),
2291                          errhint("It looks like you need to recompile or initdb.")));
2292         if (ControlFile->funcMaxArgs != FUNC_MAX_ARGS)
2293                 ereport(FATAL,
2294                                 (errmsg("database files are incompatible with server"),
2295                                  errdetail("The database cluster was initialized with FUNC_MAX_ARGS %d,"
2296                                    " but the server was compiled with FUNC_MAX_ARGS %d.",
2297                                                    ControlFile->funcMaxArgs, FUNC_MAX_ARGS),
2298                          errhint("It looks like you need to recompile or initdb.")));
2299
2300 #ifdef HAVE_INT64_TIMESTAMP
2301         if (ControlFile->enableIntTimes != TRUE)
2302                 ereport(FATAL,
2303                                 (errmsg("database files are incompatible with server"),
2304                                  errdetail("The database cluster was initialized without HAVE_INT64_TIMESTAMP"
2305                           " but the server was compiled with HAVE_INT64_TIMESTAMP."),
2306                          errhint("It looks like you need to recompile or initdb.")));
2307 #else
2308         if (ControlFile->enableIntTimes != FALSE)
2309                 ereport(FATAL,
2310                                 (errmsg("database files are incompatible with server"),
2311                                  errdetail("The database cluster was initialized with HAVE_INT64_TIMESTAMP"
2312                    " but the server was compiled without HAVE_INT64_TIMESTAMP."),
2313                          errhint("It looks like you need to recompile or initdb.")));
2314 #endif
2315
2316         if (ControlFile->localeBuflen != LOCALE_NAME_BUFLEN)
2317                 ereport(FATAL,
2318                                 (errmsg("database files are incompatible with server"),
2319                                  errdetail("The database cluster was initialized with LOCALE_NAME_BUFLEN %d,"
2320                           " but the server was compiled with LOCALE_NAME_BUFLEN %d.",
2321                                                    ControlFile->localeBuflen, LOCALE_NAME_BUFLEN),
2322                          errhint("It looks like you need to recompile or initdb.")));
2323         if (setlocale(LC_COLLATE, ControlFile->lc_collate) == NULL)
2324                 ereport(FATAL,
2325                 (errmsg("database files are incompatible with operating system"),
2326                  errdetail("The database cluster was initialized with LC_COLLATE \"%s\","
2327                                    " which is not recognized by setlocale().",
2328                                    ControlFile->lc_collate),
2329                  errhint("It looks like you need to initdb or install locale support.")));
2330         if (setlocale(LC_CTYPE, ControlFile->lc_ctype) == NULL)
2331                 ereport(FATAL,
2332                 (errmsg("database files are incompatible with operating system"),
2333                  errdetail("The database cluster was initialized with LC_CTYPE \"%s\","
2334                                    " which is not recognized by setlocale().",
2335                                    ControlFile->lc_ctype),
2336                  errhint("It looks like you need to initdb or install locale support.")));
2337
2338         /* Make the fixed locale settings visible as GUC variables, too */
2339         SetConfigOption("lc_collate", ControlFile->lc_collate,
2340                                         PGC_INTERNAL, PGC_S_OVERRIDE);
2341         SetConfigOption("lc_ctype", ControlFile->lc_ctype,
2342                                         PGC_INTERNAL, PGC_S_OVERRIDE);
2343 }
2344
2345 void
2346 UpdateControlFile(void)
2347 {
2348         int                     fd;
2349
2350         INIT_CRC64(ControlFile->crc);
2351         COMP_CRC64(ControlFile->crc,
2352                            (char *) ControlFile + sizeof(crc64),
2353                            sizeof(ControlFileData) - sizeof(crc64));
2354         FIN_CRC64(ControlFile->crc);
2355
2356         fd = BasicOpenFile(ControlFilePath, O_RDWR | PG_BINARY, S_IRUSR | S_IWUSR);
2357         if (fd < 0)
2358                 ereport(PANIC,
2359                                 (errcode_for_file_access(),
2360                                  errmsg("could not open control file \"%s\": %m",
2361                                                 ControlFilePath)));
2362
2363         errno = 0;
2364         if (write(fd, ControlFile, sizeof(ControlFileData)) != sizeof(ControlFileData))
2365         {
2366                 /* if write didn't set errno, assume problem is no disk space */
2367                 if (errno == 0)
2368                         errno = ENOSPC;
2369                 ereport(PANIC,
2370                                 (errcode_for_file_access(),
2371                                  errmsg("write to control file failed: %m")));
2372         }
2373
2374         if (pg_fsync(fd) != 0)
2375                 ereport(PANIC,
2376                                 (errcode_for_file_access(),
2377                                  errmsg("fsync of control file failed: %m")));
2378
2379         close(fd);
2380 }
2381
2382 /*
2383  * Initialization of shared memory for XLOG
2384  */
2385
2386 int
2387 XLOGShmemSize(void)
2388 {
2389         if (XLOGbuffers < MinXLOGbuffers)
2390                 XLOGbuffers = MinXLOGbuffers;
2391
2392         return MAXALIGN(sizeof(XLogCtlData) + sizeof(XLogRecPtr) * XLOGbuffers)
2393                 + BLCKSZ * XLOGbuffers +
2394                 MAXALIGN(sizeof(ControlFileData));
2395 }
2396
2397 void
2398 XLOGShmemInit(void)
2399 {
2400         bool            found;
2401
2402         /* this must agree with space requested by XLOGShmemSize() */
2403         if (XLOGbuffers < MinXLOGbuffers)
2404                 XLOGbuffers = MinXLOGbuffers;
2405
2406         XLogCtl = (XLogCtlData *)
2407                 ShmemInitStruct("XLOG Ctl",
2408                                                 MAXALIGN(sizeof(XLogCtlData) +
2409                                                                  sizeof(XLogRecPtr) * XLOGbuffers)
2410                                                 + BLCKSZ * XLOGbuffers,
2411                                                 &found);
2412         Assert(!found);
2413         ControlFile = (ControlFileData *)
2414                 ShmemInitStruct("Control File", sizeof(ControlFileData), &found);
2415         Assert(!found);
2416
2417         memset(XLogCtl, 0, sizeof(XLogCtlData));
2418
2419         /*
2420          * Since XLogCtlData contains XLogRecPtr fields, its sizeof should be
2421          * a multiple of the alignment for same, so no extra alignment padding
2422          * is needed here.
2423          */
2424         XLogCtl->xlblocks = (XLogRecPtr *)
2425                 (((char *) XLogCtl) + sizeof(XLogCtlData));
2426         memset(XLogCtl->xlblocks, 0, sizeof(XLogRecPtr) * XLOGbuffers);
2427
2428         /*
2429          * Here, on the other hand, we must MAXALIGN to ensure the page
2430          * buffers have worst-case alignment.
2431          */
2432         XLogCtl->pages =
2433                 ((char *) XLogCtl) + MAXALIGN(sizeof(XLogCtlData) +
2434                                                                           sizeof(XLogRecPtr) * XLOGbuffers);
2435         memset(XLogCtl->pages, 0, BLCKSZ * XLOGbuffers);
2436
2437         /*
2438          * Do basic initialization of XLogCtl shared data. (StartupXLOG will
2439          * fill in additional info.)
2440          */
2441         XLogCtl->XLogCacheByte = BLCKSZ * XLOGbuffers;
2442         XLogCtl->XLogCacheBlck = XLOGbuffers - 1;
2443         XLogCtl->Insert.currpage = (XLogPageHeader) (XLogCtl->pages);
2444         SpinLockInit(&XLogCtl->info_lck);
2445
2446         /*
2447          * If we are not in bootstrap mode, pg_control should already exist.
2448          * Read and validate it immediately (see comments in ReadControlFile()
2449          * for the reasons why).
2450          */
2451         if (!IsBootstrapProcessingMode())
2452                 ReadControlFile();
2453 }
2454
2455 /*
2456  * This func must be called ONCE on system install.  It creates pg_control
2457  * and the initial XLOG segment.
2458  */
2459 void
2460 BootStrapXLOG(void)
2461 {
2462         CheckPoint      checkPoint;
2463         char       *buffer;
2464         XLogPageHeader page;
2465         XLogRecord *record;
2466         bool            use_existent;
2467         crc64           crc;
2468
2469         /* Use malloc() to ensure buffer is MAXALIGNED */
2470         buffer = (char *) malloc(BLCKSZ);
2471         page = (XLogPageHeader) buffer;
2472
2473         checkPoint.redo.xlogid = 0;
2474         checkPoint.redo.xrecoff = SizeOfXLogPHD;
2475         checkPoint.undo = checkPoint.redo;
2476         checkPoint.ThisStartUpID = 0;
2477         checkPoint.nextXid = FirstNormalTransactionId;
2478         checkPoint.nextOid = BootstrapObjectIdData;
2479         checkPoint.time = time(NULL);
2480
2481         ShmemVariableCache->nextXid = checkPoint.nextXid;
2482         ShmemVariableCache->nextOid = checkPoint.nextOid;
2483         ShmemVariableCache->oidCount = 0;
2484
2485         memset(buffer, 0, BLCKSZ);
2486         page->xlp_magic = XLOG_PAGE_MAGIC;
2487         page->xlp_info = 0;
2488         page->xlp_sui = checkPoint.ThisStartUpID;
2489         page->xlp_pageaddr.xlogid = 0;
2490         page->xlp_pageaddr.xrecoff = 0;
2491         record = (XLogRecord *) ((char *) page + SizeOfXLogPHD);
2492         record->xl_prev.xlogid = 0;
2493         record->xl_prev.xrecoff = 0;
2494         record->xl_xact_prev = record->xl_prev;
2495         record->xl_xid = InvalidTransactionId;
2496         record->xl_len = sizeof(checkPoint);
2497         record->xl_info = XLOG_CHECKPOINT_SHUTDOWN;
2498         record->xl_rmid = RM_XLOG_ID;
2499         memcpy(XLogRecGetData(record), &checkPoint, sizeof(checkPoint));
2500
2501         INIT_CRC64(crc);
2502         COMP_CRC64(crc, &checkPoint, sizeof(checkPoint));
2503         COMP_CRC64(crc, (char *) record + sizeof(crc64),
2504                            SizeOfXLogRecord - sizeof(crc64));
2505         FIN_CRC64(crc);
2506         record->xl_crc = crc;
2507
2508         use_existent = false;
2509         openLogFile = XLogFileInit(0, 0, &use_existent, false);
2510
2511         errno = 0;
2512         if (write(openLogFile, buffer, BLCKSZ) != BLCKSZ)
2513         {
2514                 /* if write didn't set errno, assume problem is no disk space */
2515                 if (errno == 0)
2516                         errno = ENOSPC;
2517                 ereport(PANIC,
2518                                 (errcode_for_file_access(),
2519                                  errmsg("failed to write bootstrap xlog file: %m")));
2520         }
2521
2522         if (pg_fsync(openLogFile) != 0)
2523                 ereport(PANIC,
2524                                 (errcode_for_file_access(),
2525                                  errmsg("failed to fsync bootstrap xlog file: %m")));
2526
2527         close(openLogFile);
2528         openLogFile = -1;
2529
2530         memset(ControlFile, 0, sizeof(ControlFileData));
2531         /* Initialize pg_control status fields */
2532         ControlFile->state = DB_SHUTDOWNED;
2533         ControlFile->time = checkPoint.time;
2534         ControlFile->logId = 0;
2535         ControlFile->logSeg = 1;
2536         ControlFile->checkPoint = checkPoint.redo;
2537         ControlFile->checkPointCopy = checkPoint;
2538         /* some additional ControlFile fields are set in WriteControlFile() */
2539
2540         WriteControlFile();
2541
2542         /* Bootstrap the commit log, too */
2543         BootStrapCLOG();
2544 }
2545
/*
 * str_time
 *		Format a time_t as "YYYY-MM-DD HH:MM:SS ZONE" in local time.
 *
 * Returns a pointer to a static buffer, so the result is overwritten by the
 * next call and must be consumed (or copied) before calling again.
 *
 * If the time cannot be converted or formatted, the string
 * "(invalid time)" is returned instead of handing strftime() a NULL
 * pointer or returning a buffer with indeterminate contents.
 */
static char *
str_time(time_t tnow)
{
	/* generous size: %Z can expand to a long zone name on some platforms */
	static char buf[128];
	struct tm  *tmval;

	tmval = localtime(&tnow);

	/* strftime() returns 0 when the result does not fit in the buffer */
	if (tmval == NULL ||
		strftime(buf, sizeof(buf),
				 "%Y-%m-%d %H:%M:%S %Z",
				 tmval) == 0)
		snprintf(buf, sizeof(buf), "%s", "(invalid time)");

	return buf;
}
2557
2558 /*
2559  * This must be called ONCE during postmaster or standalone-backend startup
2560  */
2561 void
2562 StartupXLOG(void)
2563 {
2564         XLogCtlInsert *Insert;
2565         CheckPoint      checkPoint;
2566         bool            wasShutdown;
2567         XLogRecPtr      RecPtr,
2568                                 LastRec,
2569                                 checkPointLoc,
2570                                 EndOfLog;
2571         XLogRecord *record;
2572         char       *buffer;
2573         uint32          freespace;
2574
2575         /* Use malloc() to ensure record buffer is MAXALIGNED */
2576         buffer = (char *) malloc(_INTL_MAXLOGRECSZ);
2577
2578         CritSectionCount++;
2579
2580         /*
2581          * Read control file and check XLOG status looks valid.
2582          *
2583          * Note: in most control paths, *ControlFile is already valid and we need
2584          * not do ReadControlFile() here, but might as well do it to be sure.
2585          */
2586         ReadControlFile();
2587
2588         if (ControlFile->logSeg == 0 ||
2589                 ControlFile->state < DB_SHUTDOWNED ||
2590                 ControlFile->state > DB_IN_PRODUCTION ||
2591                 !XRecOffIsValid(ControlFile->checkPoint.xrecoff))
2592                 ereport(FATAL,
2593                                 (errmsg("control file contains invalid data")));
2594
2595         if (ControlFile->state == DB_SHUTDOWNED)
2596                 ereport(LOG,
2597                                 (errmsg("database system was shut down at %s",
2598                                                 str_time(ControlFile->time))));
2599         else if (ControlFile->state == DB_SHUTDOWNING)
2600                 ereport(LOG,
2601                                 (errmsg("database system shutdown was interrupted at %s",
2602                                                 str_time(ControlFile->time))));
2603         else if (ControlFile->state == DB_IN_RECOVERY)
2604                 ereport(LOG,
2605                 (errmsg("database system was interrupted while in recovery at %s",
2606                                 str_time(ControlFile->time)),
2607                  errhint("This probably means that some data is corrupted and"
2608                                  " you will have to use the last backup for recovery.")));
2609         else if (ControlFile->state == DB_IN_PRODUCTION)
2610                 ereport(LOG,
2611                                 (errmsg("database system was interrupted at %s",
2612                                                 str_time(ControlFile->time))));
2613
2614         /* This is just to allow attaching to startup process with a debugger */
2615 #ifdef XLOG_REPLAY_DELAY
2616         if (XLOG_DEBUG && ControlFile->state != DB_SHUTDOWNED)
2617                 sleep(60);
2618 #endif
2619
2620         /*
2621          * Get the last valid checkpoint record.  If the latest one according
2622          * to pg_control is broken, try the next-to-last one.
2623          */
2624         record = ReadCheckpointRecord(ControlFile->checkPoint, 1, buffer);
2625         if (record != NULL)
2626         {
2627                 checkPointLoc = ControlFile->checkPoint;
2628                 ereport(LOG,
2629                                 (errmsg("checkpoint record is at %X/%X",
2630                                                 checkPointLoc.xlogid, checkPointLoc.xrecoff)));
2631         }
2632         else
2633         {
2634                 record = ReadCheckpointRecord(ControlFile->prevCheckPoint, 2, buffer);
2635                 if (record != NULL)
2636                 {
2637                         checkPointLoc = ControlFile->prevCheckPoint;
2638                         ereport(LOG,
2639                                         (errmsg("using previous checkpoint record at %X/%X",
2640                                                   checkPointLoc.xlogid, checkPointLoc.xrecoff)));
2641                         InRecovery = true;      /* force recovery even if SHUTDOWNED */
2642                 }
2643                 else
2644                         ereport(PANIC,
2645                                  (errmsg("could not locate a valid checkpoint record")));
2646         }
2647         LastRec = RecPtr = checkPointLoc;
2648         memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
2649         wasShutdown = (record->xl_info == XLOG_CHECKPOINT_SHUTDOWN);
2650
2651         ereport(LOG,
2652                         (errmsg("redo record is at %X/%X; undo record is at %X/%X; shutdown %s",
2653                                         checkPoint.redo.xlogid, checkPoint.redo.xrecoff,
2654                                         checkPoint.undo.xlogid, checkPoint.undo.xrecoff,
2655                                         wasShutdown ? "TRUE" : "FALSE")));
2656         ereport(LOG,
2657                         (errmsg("next transaction id: %u; next oid: %u",
2658                                         checkPoint.nextXid, checkPoint.nextOid)));
2659         if (!TransactionIdIsNormal(checkPoint.nextXid))
2660                 ereport(PANIC,
2661                                 (errmsg("invalid next transaction id")));
2662
2663         ShmemVariableCache->nextXid = checkPoint.nextXid;
2664         ShmemVariableCache->nextOid = checkPoint.nextOid;
2665         ShmemVariableCache->oidCount = 0;
2666
2667         /*
2668          * If it was a shutdown checkpoint, then any following WAL entries
2669          * were created under the next StartUpID; if it was a regular
2670          * checkpoint then any following WAL entries were created under the
2671          * same StartUpID. We must replay WAL entries using the same StartUpID
2672          * they were created under, so temporarily adopt that SUI (see also
2673          * xlog_redo()).
2674          */
2675         if (wasShutdown)
2676                 ThisStartUpID = checkPoint.ThisStartUpID + 1;
2677         else
2678                 ThisStartUpID = checkPoint.ThisStartUpID;
2679
2680         RedoRecPtr = XLogCtl->Insert.RedoRecPtr =
2681                 XLogCtl->SavedRedoRecPtr = checkPoint.redo;
2682
2683         if (XLByteLT(RecPtr, checkPoint.redo))
2684                 ereport(PANIC,
2685                                 (errmsg("invalid redo in checkpoint record")));
2686         if (checkPoint.undo.xrecoff == 0)
2687                 checkPoint.undo = RecPtr;
2688
2689         if (XLByteLT(checkPoint.undo, RecPtr) ||
2690                 XLByteLT(checkPoint.redo, RecPtr))
2691         {
2692                 if (wasShutdown)
2693                         ereport(PANIC,
2694                         (errmsg("invalid redo/undo record in shutdown checkpoint")));
2695                 InRecovery = true;
2696         }
2697         else if (ControlFile->state != DB_SHUTDOWNED)
2698                 InRecovery = true;
2699
2700         /* REDO */
2701         if (InRecovery)
2702         {
2703                 int                     rmid;
2704
2705                 ereport(LOG,
2706                                 (errmsg("database system was not properly shut down; "
2707                                                 "automatic recovery in progress")));
2708                 ControlFile->state = DB_IN_RECOVERY;
2709                 ControlFile->time = time(NULL);
2710                 UpdateControlFile();
2711
2712                 /* Start up the recovery environment */
2713                 XLogInitRelationCache();
2714
2715                 for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
2716                 {
2717                         if (RmgrTable[rmid].rm_startup != NULL)
2718                                 RmgrTable[rmid].rm_startup();
2719                 }
2720
2721                 /* Is REDO required ? */
2722                 if (XLByteLT(checkPoint.redo, RecPtr))
2723                         record = ReadRecord(&(checkPoint.redo), PANIC, buffer);
2724                 else
2725                 {
2726                         /* read past CheckPoint record */
2727                         record = ReadRecord(NULL, LOG, buffer);
2728                 }
2729
2730                 if (record != NULL)
2731                 {
2732                         InRedo = true;
2733                         ereport(LOG,
2734                                         (errmsg("redo starts at %X/%X",
2735                                                         ReadRecPtr.xlogid, ReadRecPtr.xrecoff)));
2736                         do
2737                         {
2738                                 /* nextXid must be beyond record's xid */
2739                                 if (TransactionIdFollowsOrEquals(record->xl_xid,
2740                                                                                         ShmemVariableCache->nextXid))
2741                                 {
2742                                         ShmemVariableCache->nextXid = record->xl_xid;
2743                                         TransactionIdAdvance(ShmemVariableCache->nextXid);
2744                                 }
2745                                 if (XLOG_DEBUG)
2746                                 {
2747                                         char            buf[8192];
2748
2749                                         sprintf(buf, "REDO @ %X/%X; LSN %X/%X: ",
2750                                                         ReadRecPtr.xlogid, ReadRecPtr.xrecoff,
2751                                                         EndRecPtr.xlogid, EndRecPtr.xrecoff);
2752                                         xlog_outrec(buf, record);
2753                                         strcat(buf, " - ");
2754                                         RmgrTable[record->xl_rmid].rm_desc(buf,
2755                                                                 record->xl_info, XLogRecGetData(record));
2756                                         elog(LOG, "%s", buf);
2757                                 }
2758
2759                                 if (record->xl_info & XLR_BKP_BLOCK_MASK)
2760                                         RestoreBkpBlocks(record, EndRecPtr);
2761
2762                                 RmgrTable[record->xl_rmid].rm_redo(EndRecPtr, record);
2763                                 record = ReadRecord(NULL, LOG, buffer);
2764                         } while (record != NULL);
2765                         ereport(LOG,
2766                                         (errmsg("redo done at %X/%X",
2767                                                         ReadRecPtr.xlogid, ReadRecPtr.xrecoff)));
2768                         LastRec = ReadRecPtr;
2769                         InRedo = false;
2770                 }
2771                 else
2772                         ereport(LOG,
2773                                         (errmsg("redo is not required")));
2774         }
2775
2776         /*
2777          * Init xlog buffer cache using the block containing the last valid
2778          * record from the previous incarnation.
2779          */
2780         record = ReadRecord(&LastRec, PANIC, buffer);
2781         EndOfLog = EndRecPtr;
2782         XLByteToPrevSeg(EndOfLog, openLogId, openLogSeg);
2783         openLogFile = XLogFileOpen(openLogId, openLogSeg, false);
2784         openLogOff = 0;
2785         ControlFile->logId = openLogId;
2786         ControlFile->logSeg = openLogSeg + 1;
2787         Insert = &XLogCtl->Insert;
2788         Insert->PrevRecord = LastRec;
2789         XLogCtl->xlblocks[0].xlogid = openLogId;
2790         XLogCtl->xlblocks[0].xrecoff =
2791                 ((EndOfLog.xrecoff - 1) / BLCKSZ + 1) * BLCKSZ;
2792
2793         /*
2794          * Tricky point here: readBuf contains the *last* block that the
2795          * LastRec record spans, not the one it starts in.      The last block is
2796          * indeed the one we want to use.
2797          */
2798         Assert(readOff == (XLogCtl->xlblocks[0].xrecoff - BLCKSZ) % XLogSegSize);
2799         memcpy((char *) Insert->currpage, readBuf, BLCKSZ);
2800         Insert->currpos = (char *) Insert->currpage +
2801                 (EndOfLog.xrecoff + BLCKSZ - XLogCtl->xlblocks[0].xrecoff);
2802
2803         LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
2804
2805         XLogCtl->Write.LogwrtResult = LogwrtResult;
2806         Insert->LogwrtResult = LogwrtResult;
2807         XLogCtl->LogwrtResult = LogwrtResult;
2808
2809         XLogCtl->LogwrtRqst.Write = EndOfLog;
2810         XLogCtl->LogwrtRqst.Flush = EndOfLog;
2811
2812         freespace = INSERT_FREESPACE(Insert);
2813         if (freespace > 0)
2814         {
2815                 /* Make sure rest of page is zero */
2816                 MemSet(Insert->currpos, 0, freespace);
2817                 XLogCtl->Write.curridx = 0;
2818         }
2819         else
2820         {
2821                 /*
2822                  * Whenever Write.LogwrtResult points to exactly the end of a
2823                  * page, Write.curridx must point to the *next* page (see
2824                  * XLogWrite()).
2825                  *
2826                  * Note: it might seem we should do AdvanceXLInsertBuffer() here, but
2827                  * we can't since we haven't yet determined the correct StartUpID
2828                  * to put into the new page's header.  The first actual attempt to
2829                  * insert a log record will advance the insert state.
2830                  */
2831                 XLogCtl->Write.curridx = NextBufIdx(0);
2832         }
2833
2834 #ifdef NOT_USED
2835         /* UNDO */
2836         if (InRecovery)
2837         {
2838                 RecPtr = ReadRecPtr;
2839                 if (XLByteLT(checkPoint.undo, RecPtr))
2840                 {
2841                         ereport(LOG,
2842                                         (errmsg("undo starts at %X/%X",
2843                                                         RecPtr.xlogid, RecPtr.xrecoff)));
2844                         do
2845                         {
2846                                 record = ReadRecord(&RecPtr, PANIC, buffer);
2847                                 if (TransactionIdIsValid(record->xl_xid) &&
2848                                         !TransactionIdDidCommit(record->xl_xid))
2849                                         RmgrTable[record->xl_rmid].rm_undo(EndRecPtr, record);
2850                                 RecPtr = record->xl_prev;
2851                         } while (XLByteLE(checkPoint.undo, RecPtr));
2852                         ereport(LOG,
2853                                         (errmsg("undo done at %X/%X",
2854                                                         ReadRecPtr.xlogid, ReadRecPtr.xrecoff)));
2855                 }
2856                 else
2857                         ereport(LOG,
2858                                         (errmsg("undo is not required")));
2859         }
2860 #endif
2861
2862         if (InRecovery)
2863         {
2864                 int                     rmid;
2865
2866                 /*
2867                  * Allow resource managers to do any required cleanup.
2868                  */
2869                 for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
2870                 {
2871                         if (RmgrTable[rmid].rm_cleanup != NULL)
2872                                 RmgrTable[rmid].rm_cleanup();
2873                 }
2874
2875                 /* suppress in-transaction check in CreateCheckPoint */
2876                 MyLastRecPtr.xrecoff = 0;
2877                 MyXactMadeXLogEntry = false;
2878                 MyXactMadeTempRelUpdate = false;
2879
2880                 /*
2881                  * At this point, ThisStartUpID is the largest SUI that we could
2882                  * find evidence for in the WAL entries.  But check it against
2883                  * pg_control's latest checkpoint, to make sure that we can't
2884                  * accidentally re-use an already-used SUI.
2885                  */
2886                 if (ThisStartUpID < ControlFile->checkPointCopy.ThisStartUpID)
2887                         ThisStartUpID = ControlFile->checkPointCopy.ThisStartUpID;
2888
2889                 /*
2890                  * Perform a new checkpoint to update our recovery activity to
2891                  * disk.
2892                  *
2893                  * Note that we write a shutdown checkpoint.  This is correct since
2894                  * the records following it will use SUI one more than what is
2895                  * shown in the checkpoint's ThisStartUpID.
2896                  *
2897                  * In case we had to use the secondary checkpoint, make sure that it
2898                  * will still be shown as the secondary checkpoint after this
2899                  * CreateCheckPoint operation; we don't want the broken primary
2900                  * checkpoint to become prevCheckPoint...
2901                  */
2902                 ControlFile->checkPoint = checkPointLoc;
2903                 CreateCheckPoint(true, true);
2904
2905                 /*
2906                  * Close down recovery environment
2907                  */
2908                 XLogCloseRelationCache();
2909         }
2910         else
2911         {
2912                 /*
2913                  * If we are not doing recovery, then we saw a checkpoint with
2914                  * nothing after it, and we can safely use StartUpID equal to one
2915                  * more than the checkpoint's SUI.  But just for paranoia's sake,
2916                  * check against pg_control too.
2917                  */
2918                 ThisStartUpID = checkPoint.ThisStartUpID;
2919                 if (ThisStartUpID < ControlFile->checkPointCopy.ThisStartUpID)
2920                         ThisStartUpID = ControlFile->checkPointCopy.ThisStartUpID;
2921         }
2922
2923         /*
2924          * Preallocate additional log files, if wanted.
2925          */
2926         PreallocXlogFiles(EndOfLog);
2927
2928         /*
2929          * Advance StartUpID to one more than the highest value used
2930          * previously.
2931          */
2932         ThisStartUpID++;
2933         XLogCtl->ThisStartUpID = ThisStartUpID;
2934
2935         /*
2936          * Okay, we're officially UP.
2937          */
2938         InRecovery = false;
2939
2940         ControlFile->state = DB_IN_PRODUCTION;
2941         ControlFile->time = time(NULL);
2942         UpdateControlFile();
2943
2944         /* Start up the commit log, too */
2945         StartupCLOG();
2946
2947         ereport(LOG,
2948                         (errmsg("database system is ready")));
2949         CritSectionCount--;
2950
2951         /* Shut down readFile facility, free space */
2952         if (readFile >= 0)
2953         {
2954                 close(readFile);
2955                 readFile = -1;
2956         }
2957         if (readBuf)
2958         {
2959                 free(readBuf);
2960                 readBuf = NULL;
2961         }
2962
2963         free(buffer);
2964 }
2965
2966 /*
2967  * Subroutine to try to fetch and validate a prior checkpoint record.
2968  * whichChkpt = 1 for "primary", 2 for "secondary", merely informative
2969  */
2970 static XLogRecord *
2971 ReadCheckpointRecord(XLogRecPtr RecPtr,
2972                                          int whichChkpt,
2973                                          char *buffer)
2974 {
2975         XLogRecord *record;
2976
2977         if (!XRecOffIsValid(RecPtr.xrecoff))
2978         {
2979                 ereport(LOG,
2980                 /* translator: %s is "primary" or "secondary" */
2981                                 (errmsg("invalid %s checkpoint link in control file",
2982                 (whichChkpt == 1) ? gettext("primary") : gettext("secondary"))));
2983                 return NULL;
2984         }
2985
2986         record = ReadRecord(&RecPtr, LOG, buffer);
2987
2988         if (record == NULL)
2989         {
2990                 ereport(LOG,
2991                 /* translator: %s is "primary" or "secondary" */
2992                                 (errmsg("invalid %s checkpoint record",
2993                 (whichChkpt == 1) ? gettext("primary") : gettext("secondary"))));
2994                 return NULL;
2995         }
2996         if (record->xl_rmid != RM_XLOG_ID)
2997         {
2998                 ereport(LOG,
2999                 /* translator: %s is "primary" or "secondary" */
3000                    (errmsg("invalid resource manager id in %s checkpoint record",
3001                 (whichChkpt == 1) ? gettext("primary") : gettext("secondary"))));
3002                 return NULL;
3003         }
3004         if (record->xl_info != XLOG_CHECKPOINT_SHUTDOWN &&
3005                 record->xl_info != XLOG_CHECKPOINT_ONLINE)
3006         {
3007                 ereport(LOG,
3008                 /* translator: %s is "primary" or "secondary" */
3009                                 (errmsg("invalid xl_info in %s checkpoint record",
3010                 (whichChkpt == 1) ? gettext("primary") : gettext("secondary"))));
3011                 return NULL;
3012         }
3013         if (record->xl_len != sizeof(CheckPoint))
3014         {
3015                 ereport(LOG,
3016                 /* translator: %s is "primary" or "secondary" */
3017                                 (errmsg("invalid length of %s checkpoint record",
3018                 (whichChkpt == 1) ? gettext("primary") : gettext("secondary"))));
3019                 return NULL;
3020         }
3021         return record;
3022 }
3023
3024 /*
3025  * Postmaster uses this to initialize ThisStartUpID & RedoRecPtr from
3026  * XLogCtlData located in shmem after successful startup.
      *
      * Both values are plain copies out of the XLogCtl structure
      * (XLogCtl->ThisStartUpID and XLogCtl->SavedRedoRecPtr); no lock is
      * taken here.
3027  */
3028 void
3029 SetThisStartUpID(void)
3030 {
3031         ThisStartUpID = XLogCtl->ThisStartUpID;
3032         RedoRecPtr = XLogCtl->SavedRedoRecPtr;
3033 }
3034
3035 /*
3036  * CheckPoint process called by postmaster saves copy of new RedoRecPtr
3037  * in shmem (using SetSavedRedoRecPtr).  When checkpointer completes,
3038  * postmaster calls GetSavedRedoRecPtr to update its own copy of RedoRecPtr,
3039  * so that subsequently-spawned backends will start out with a reasonably
3040  * up-to-date local RedoRecPtr.  Since these operations are not protected by
3041  * any lock and copying an XLogRecPtr isn't atomic, it's unsafe to use either
3042  * of these routines at other times!
      *
      * SetSavedRedoRecPtr itself is a single unlocked struct assignment from
      * RedoRecPtr into XLogCtl->SavedRedoRecPtr.
3043  */
3044 void
3045 SetSavedRedoRecPtr(void)
3046 {
3047         XLogCtl->SavedRedoRecPtr = RedoRecPtr;
3048 }
3049
/*
 * Copy XLogCtl->SavedRedoRecPtr into RedoRecPtr.  This is a single struct
 * assignment and no lock is taken here.
 */
3050 void
3051 GetSavedRedoRecPtr(void)
3052 {
3053         RedoRecPtr = XLogCtl->SavedRedoRecPtr;
3054 }
3055
3056 /*
3057  * Once spawned, a backend may update its local RedoRecPtr from
3058  * XLogCtl->Insert.RedoRecPtr; it must hold the insert lock or info_lck
3059  * to do so.  This is done in XLogInsert() or GetRedoRecPtr().
3060  */
3061 XLogRecPtr
3062 GetRedoRecPtr(void)
3063 {
3064         /* use volatile pointer to prevent code rearrangement */
3065         volatile XLogCtlData *xlogctl = XLogCtl;
3066
3067         SpinLockAcquire_NoHoldoff(&xlogctl->info_lck);
3068         Assert(XLByteLE(RedoRecPtr, xlogctl->Insert.RedoRecPtr));
3069         RedoRecPtr = xlogctl->Insert.RedoRecPtr;
3070         SpinLockRelease_NoHoldoff(&xlogctl->info_lck);
3071
3072         return RedoRecPtr;
3073 }
3074
3075 /*
3076  * This must be called ONCE during postmaster or standalone-backend shutdown
      *
      * Logs "shutting down", writes a forced shutdown checkpoint via
      * CreateCheckPoint(true, true), calls ShutdownCLOG(), and finally logs
      * "database system is shut down".
3077  */
3078 void
3079 ShutdownXLOG(void)
3080 {
3081         ereport(LOG,
3082                         (errmsg("shutting down")));
3083
             /* CreateCheckPoint raises ERROR when MyXactMadeXLogEntry is set */
3084         /* suppress in-transaction check in CreateCheckPoint */
3085         MyLastRecPtr.xrecoff = 0;
3086         MyXactMadeXLogEntry = false;
3087         MyXactMadeTempRelUpdate = false;
3088
             /*
              * CritSectionCount is raised by hand around the final steps and
              * lowered again right after ShutdownCLOG().
              * NOTE(review): presumably this makes any error in between be
              * treated as inside a critical section -- confirm against the
              * START_CRIT_SECTION/END_CRIT_SECTION definitions.
              */
3089         CritSectionCount++;
3090         CreateDummyCaches();
3091         CreateCheckPoint(true, true);
3092         ShutdownCLOG();
3093         CritSectionCount--;
3094
3095         ereport(LOG,
3096                         (errmsg("database system is shut down")));
3097 }
3098
3099 /*
3100  * Perform a checkpoint --- either during shutdown, or on-the-fly
3101  *
3102  * If force is true, we force a checkpoint regardless of whether any XLOG
3103  * activity has occurred since the last one.
      *
      * If shutdown is true, the control file state is set to DB_SHUTDOWNING
      * up front and to DB_SHUTDOWNED at the end, the record is written as
      * XLOG_CHECKPOINT_SHUTDOWN rather than XLOG_CHECKPOINT_ONLINE, and no
      * further log segments are preallocated afterwards.
      *
      * Raises ERROR if MyXactMadeXLogEntry is set.  When neither shutdown nor
      * force is given, the function may return without writing anything (see
      * the idle-system test below).
      *
      * Overall sequence: take CheckpointLock; under WALInsertLock pick the
      * new redo pointer and publish it; collect nextXid/nextOid; flush CLOG
      * and the buffer pool outside the critical section; insert and flush the
      * checkpoint record; update the control file; recycle old segments and
      * preallocate new ones; release CheckpointLock.
3104  */
3105 void
3106 CreateCheckPoint(bool shutdown, bool force)
3107 {
3108         CheckPoint      checkPoint;
3109         XLogRecPtr      recptr;
3110         XLogCtlInsert *Insert = &XLogCtl->Insert;
3111         XLogRecData rdata;
3112         uint32          freespace;
3113         uint32          _logId;
3114         uint32          _logSeg;
3115
3116         if (MyXactMadeXLogEntry)
3117                 ereport(ERROR,
3118                                 (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
3119                   errmsg("checkpoint cannot be made inside transaction block")));
3120
3121         /*
3122          * Acquire CheckpointLock to ensure only one checkpoint happens at a
3123          * time.
3124          *
3125          * The CheckpointLock can be held for quite a while, which is not good
3126          * because we won't respond to a cancel/die request while waiting for
3127          * an LWLock.  (But the alternative of using a regular lock won't work
3128          * for background checkpoint processes, which are not regular
3129          * backends.)  So, rather than use a plain LWLockAcquire, use this
3130          * kluge to allow an interrupt to be accepted while we are waiting:
3131          */
3132         while (!LWLockConditionalAcquire(CheckpointLock, LW_EXCLUSIVE))
3133         {
3134                 CHECK_FOR_INTERRUPTS();
3135                 sleep(1);
3136         }
3137
3138         /*
3139          * Use a critical section to force system panic if we have trouble.
3140          */
3141         START_CRIT_SECTION();
3142
3143         if (shutdown)
3144         {
3145                 ControlFile->state = DB_SHUTDOWNING;
3146                 ControlFile->time = time(NULL);
3147                 UpdateControlFile();
3148         }
3149
3150         MemSet(&checkPoint, 0, sizeof(checkPoint));
3151         checkPoint.ThisStartUpID = ThisStartUpID;
3152         checkPoint.time = time(NULL);
3153
3154         LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
3155
3156         /*
3157          * If this isn't a shutdown or forced checkpoint, and we have not
3158          * inserted any XLOG records since the start of the last checkpoint,
3159          * skip the checkpoint.  The idea here is to avoid inserting duplicate
3160          * checkpoints when the system is idle. That wastes log space, and
3161          * more importantly it exposes us to possible loss of both current and
3162          * previous checkpoint records if the machine crashes just as we're
3163          * writing the update. (Perhaps it'd make even more sense to
3164          * checkpoint only when the previous checkpoint record is in a
3165          * different xlog page?)
3166          *
3167          * We have to make two tests to determine that nothing has happened since
3168          * the start of the last checkpoint: current insertion point must
3169          * match the end of the last checkpoint record, and its redo pointer
3170          * must point to itself.
3171          */
3172         if (!shutdown && !force)
3173         {
3174                 XLogRecPtr      curInsert;
3175
3176                 INSERT_RECPTR(curInsert, Insert, Insert->curridx);
3177                 if (curInsert.xlogid == ControlFile->checkPoint.xlogid &&
3178                         curInsert.xrecoff == ControlFile->checkPoint.xrecoff +
3179                         MAXALIGN(SizeOfXLogRecord + sizeof(CheckPoint)) &&
3180                         ControlFile->checkPoint.xlogid ==
3181                         ControlFile->checkPointCopy.redo.xlogid &&
3182                         ControlFile->checkPoint.xrecoff ==
3183                         ControlFile->checkPointCopy.redo.xrecoff)
3184                 {
                             /* nothing to do: drop both locks and leave the critical section */
3185                         LWLockRelease(WALInsertLock);
3186                         LWLockRelease(CheckpointLock);
3187                         END_CRIT_SECTION();
3188                         return;
3189                 }
3190         }
3191
3192         /*
3193          * Compute new REDO record ptr = location of next XLOG record.
3194          *
3195          * NB: this is NOT necessarily where the checkpoint record itself will
3196          * be, since other backends may insert more XLOG records while we're
3197          * off doing the buffer flush work.  Those XLOG records are logically
3198          * after the checkpoint, even though physically before it.      Got that?
3199          */
3200         freespace = INSERT_FREESPACE(Insert);
3201         if (freespace < SizeOfXLogRecord)
3202         {
3203                 (void) AdvanceXLInsertBuffer();
3204                 /* OK to ignore update return flag, since we will do flush anyway */
3205                 freespace = BLCKSZ - SizeOfXLogPHD;
                     /* NOTE(review): freespace is not read again below in this function */
3206         }
3207         INSERT_RECPTR(checkPoint.redo, Insert, Insert->curridx);
3208
3209         /*
3210          * Here we update the shared RedoRecPtr for future XLogInsert calls;
3211          * this must be done while holding the insert lock AND the info_lck.
3212          *
3213          * Note: if we fail to complete the checkpoint, RedoRecPtr will be left
3214          * pointing past where it really needs to point.  This is okay; the
3215          * only consequence is that XLogInsert might back up whole buffers
3216          * that it didn't really need to.  We can't postpone advancing
3217          * RedoRecPtr because XLogInserts that happen while we are dumping
3218          * buffers must assume that their buffer changes are not included in
3219          * the checkpoint.
3220          */
3221         {
3222                 /* use volatile pointer to prevent code rearrangement */
3223                 volatile XLogCtlData *xlogctl = XLogCtl;
3224
3225                 SpinLockAcquire_NoHoldoff(&xlogctl->info_lck);
3226                 RedoRecPtr = xlogctl->Insert.RedoRecPtr = checkPoint.redo;
3227                 SpinLockRelease_NoHoldoff(&xlogctl->info_lck);
3228         }
3229
3230         /*
3231          * Get UNDO record ptr - this is oldest of PGPROC->logRec values. We
3232          * do this while holding insert lock to ensure that we won't miss any
3233          * about-to-commit transactions (UNDO must include all xacts that have
3234          * commits after REDO point).
3235          *
3236          * XXX temporarily ifdef'd out to avoid three-way deadlock condition:
3237          * GetUndoRecPtr needs to grab SInvalLock to ensure that it is looking
3238          * at a stable set of proc records, but grabbing SInvalLock while
3239          * holding WALInsertLock is no good.  GetNewTransactionId may cause a
3240          * WAL record to be written while holding XidGenLock, and
3241          * GetSnapshotData needs to get XidGenLock while holding SInvalLock,
3242          * so there's a risk of deadlock. Need to find a better solution.  See
3243          * pgsql-hackers discussion of 17-Dec-01.
3244          */
3245 #ifdef NOT_USED
3246         checkPoint.undo = GetUndoRecPtr();
3247
3248         if (shutdown && checkPoint.undo.xrecoff != 0)
3249                 elog(PANIC, "active transaction while database system is shutting down");
3250 #endif
3251
3252         /*
3253          * Now we can release insert lock, allowing other xacts to proceed
3254          * even while we are flushing disk buffers.
3255          */
3256         LWLockRelease(WALInsertLock);
3257
3258         /*
3259          * Get the other info we need for the checkpoint record.
3260          */
3261         LWLockAcquire(XidGenLock, LW_SHARED);
3262         checkPoint.nextXid = ShmemVariableCache->nextXid;
3263         LWLockRelease(XidGenLock);
3264
3265         LWLockAcquire(OidGenLock, LW_SHARED);
3266         checkPoint.nextOid = ShmemVariableCache->nextOid;
             /*
              * NOTE(review): for a non-shutdown checkpoint oidCount is added on
              * top of nextOid; presumably this skips past OIDs that were
              * already reserved -- confirm against the OID allocator.
              */
3267         if (!shutdown)
3268                 checkPoint.nextOid += ShmemVariableCache->oidCount;
3269         LWLockRelease(OidGenLock);
3270
3271         /*
3272          * Having constructed the checkpoint record, ensure all shmem disk
3273          * buffers and commit-log buffers are flushed to disk.
3274          *
3275          * This I/O could fail for various reasons.  If so, we will fail to
3276          * complete the checkpoint, but there is no reason to force a system
3277          * panic.  Accordingly, exit critical section while doing it.
3278          */
3279         END_CRIT_SECTION();
3280
3281         CheckPointCLOG();
3282         FlushBufferPool();
3283
3284         START_CRIT_SECTION();
3285
3286         /*
3287          * Now insert the checkpoint record into XLOG.
3288          */
3289         rdata.buffer = InvalidBuffer;
3290         rdata.data = (char *) (&checkPoint);
3291         rdata.len = sizeof(checkPoint);
3292         rdata.next = NULL;
3293
3294         recptr = XLogInsert(RM_XLOG_ID,
3295                                                 shutdown ? XLOG_CHECKPOINT_SHUTDOWN :
3296                                                 XLOG_CHECKPOINT_ONLINE,
3297                                                 &rdata);
3298
3299         XLogFlush(recptr);
3300
3301         /*
3302          * We now have ProcLastRecPtr = start of actual checkpoint record,
3303          * recptr = end of actual checkpoint record.
3304          */
             /*
              * In a shutdown checkpoint the record must sit exactly at the redo
              * pointer chosen above; anything else means some other XLOG record
              * was inserted in between.
              */
3305         if (shutdown && !XLByteEQ(checkPoint.redo, ProcLastRecPtr))
3306                 ereport(PANIC,
3307                                 (errmsg("concurrent transaction log activity while database system is shutting down")));
3308
3309         /*
3310          * Select point at which we can truncate the log, which we base on the
3311          * prior checkpoint's earliest info.
3312          *
3313          * With UNDO support: oldest item is redo or undo, whichever is older;
3314          * but watch out for case that undo = 0.
3315          *
3316          * Without UNDO support: just use the redo pointer.  This allows xlog
3317          * space to be freed much faster when there are long-running
3318          * transactions.
3319          */
             /*
              * This reads ControlFile->checkPointCopy before it is overwritten
              * with the new checkpoint further down, so it still describes the
              * prior checkpoint here.
              */
3320 #ifdef NOT_USED
3321         if (ControlFile->checkPointCopy.undo.xrecoff != 0 &&
3322                 XLByteLT(ControlFile->checkPointCopy.undo,
3323                                  ControlFile->checkPointCopy.redo))
3324                 XLByteToSeg(ControlFile->checkPointCopy.undo, _logId, _logSeg);
3325         else
3326 #endif
3327                 XLByteToSeg(ControlFile->checkPointCopy.redo, _logId, _logSeg);
3328
3329         /*
3330          * Update the control file.
3331          */
3332         LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
3333         if (shutdown)
3334                 ControlFile->state = DB_SHUTDOWNED;
3335         ControlFile->prevCheckPoint = ControlFile->checkPoint;
3336         ControlFile->checkPoint = ProcLastRecPtr;
3337         ControlFile->checkPointCopy = checkPoint;
3338         ControlFile->time = time(NULL);
3339         UpdateControlFile();
3340         LWLockRelease(ControlFileLock);
3341
3342         /*
3343          * We are now done with critical updates; no need for system panic if
3344          * we have trouble while fooling with offline log segments.
3345          */
3346         END_CRIT_SECTION();
3347
3348         /*
3349          * Delete offline log files (those no longer needed even for previous
3350          * checkpoint).
3351          */
             /*
              * Skipped when both _logId and _logSeg are zero.
              * NOTE(review): presumably because PrevLogSeg cannot step back
              * from the very first segment -- confirm against its definition.
              */
3352         if (_logId || _logSeg)
3353         {
3354                 PrevLogSeg(_logId, _logSeg);
3355                 MoveOfflineLogs(_logId, _logSeg, recptr);
3356         }
3357
3358         /*
3359          * Make more log segments if needed.  (Do this after deleting offline
3360          * log segments, to avoid having peak disk space usage higher than
3361          * necessary.)
3362          */
3363         if (!shutdown)
3364                 PreallocXlogFiles(recptr);
3365
3366         LWLockRelease(CheckpointLock);
3367 }
3368
3369 /*
3370  * Write a NEXTOID log record
3371  */
3372 void
3373 XLogPutNextOid(Oid nextOid)
3374 {
3375         XLogRecData rdata;
3376
3377         rdata.buffer = InvalidBuffer;
3378         rdata.data = (char *) (&nextOid);
3379         rdata.len = sizeof(Oid);
3380         rdata.next = NULL;
3381         (void) XLogInsert(RM_XLOG_ID, XLOG_NEXTOID, &rdata);
3382 }
3383
3384 /*
3385  * XLOG resource manager's routines
3386  */
3387 void
3388 xlog_redo(XLogRecPtr lsn, XLogRecord *record)
3389 {
3390         uint8           info = record->xl_info & ~XLR_INFO_MASK;
3391
3392         if (info == XLOG_NEXTOID)
3393         {
3394                 Oid                     nextOid;
3395
3396                 memcpy(&nextOid, XLogRecGetData(record), sizeof(Oid));
3397                 if (ShmemVariableCache->nextOid < nextOid)
3398                 {
3399                         ShmemVariableCache->nextOid = nextOid;
3400                         ShmemVariableCache->oidCount = 0;
3401                 }
3402         }
3403         else if (info == XLOG_CHECKPOINT_SHUTDOWN)
3404         {
3405                 CheckPoint      checkPoint;
3406
3407                 memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
3408                 /* In a SHUTDOWN checkpoint, believe the counters exactly */
3409                 ShmemVariableCache->nextXid = checkPoint.nextXid;
3410                 ShmemVariableCache->nextOid = checkPoint.nextOid;
3411                 ShmemVariableCache->oidCount = 0;
3412                 /* Any later WAL records should be run with shutdown SUI plus 1 */
3413                 ThisStartUpID = checkPoint.ThisStartUpID + 1;
3414         }
3415         else if (info == XLOG_CHECKPOINT_ONLINE)
3416         {
3417                 CheckPoint      checkPoint;
3418
3419                 memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
3420                 /* In an ONLINE checkpoint, treat the counters like NEXTOID */
3421                 if (TransactionIdPrecedes(ShmemVariableCache->nextXid,
3422                                                                   checkPoint.nextXid))
3423                         ShmemVariableCache->nextXid = checkPoint.nextXid;
3424                 if (ShmemVariableCache->nextOid < checkPoint.nextOid)
3425                 {
3426                         ShmemVariableCache->nextOid = checkPoint.nextOid;
3427                         ShmemVariableCache->oidCount = 0;
3428                 }
3429                 /* Any later WAL records should be run with the then-active SUI */
3430                 ThisStartUpID = checkPoint.ThisStartUpID;
3431         }
3432 }
3433
/*
 * Undo routine of the XLOG resource manager.  The body is empty, so both
 * lsn and record are ignored and nothing is done.
 */
3434 void
3435 xlog_undo(XLogRecPtr lsn, XLogRecord *record)
3436 {
3437 }
3438
3439 void
3440 xlog_desc(char *buf, uint8 xl_info, char *rec)
3441 {
3442         uint8           info = xl_info & ~XLR_INFO_MASK;
3443
3444         if (info == XLOG_CHECKPOINT_SHUTDOWN ||
3445                 info == XLOG_CHECKPOINT_ONLINE)
3446         {
3447                 CheckPoint *checkpoint = (CheckPoint *) rec;
3448
3449                 sprintf(buf + strlen(buf), "checkpoint: redo %X/%X; undo %X/%X; "
3450                                 "sui %u; xid %u; oid %u; %s",
3451                                 checkpoint->redo.xlogid, checkpoint->redo.xrecoff,
3452                                 checkpoint->undo.xlogid, checkpoint->undo.xrecoff,
3453                                 checkpoint->ThisStartUpID, checkpoint->nextXid,
3454                                 checkpoint->nextOid,
3455                          (info == XLOG_CHECKPOINT_SHUTDOWN) ? "shutdown" : "online");
3456         }
3457         else if (info == XLOG_NEXTOID)
3458         {
3459                 Oid                     nextOid;
3460
3461                 memcpy(&nextOid, rec, sizeof(Oid));
3462                 sprintf(buf + strlen(buf), "nextOid: %u", nextOid);
3463         }
3464         else
3465                 strcat(buf, "UNKNOWN");
3466 }
3467
3468 static void
3469 xlog_outrec(char *buf, XLogRecord *record)
3470 {
3471         int                     bkpb;
3472         int                     i;
3473
3474         sprintf(buf + strlen(buf), "prev %X/%X; xprev %X/%X; xid %u",
3475                         record->xl_prev.xlogid, record->xl_prev.xrecoff,
3476                         record->xl_xact_prev.xlogid, record->xl_xact_prev.xrecoff,
3477                         record->xl_xid);
3478
3479         for (i = 0, bkpb = 0; i < XLR_MAX_BKP_BLOCKS; i++)
3480         {
3481                 if (!(record->xl_info & (XLR_SET_BKP_BLOCK(i))))
3482                         continue;
3483                 bkpb++;
3484         }
3485
3486         if (bkpb)
3487                 sprintf(buf + strlen(buf), "; bkpb %d", bkpb);
3488
3489         sprintf(buf + strlen(buf), ": %s",
3490                         RmgrTable[record->xl_rmid].rm_name);
3491 }
3492
3493
3494 /*
3495  * GUC support
3496  */
3497 const char *
3498 assign_xlog_sync_method(const char *method, bool doit, bool interactive)
3499 {
3500         int                     new_sync_method;
3501         int                     new_sync_bit;
3502
3503         if (strcasecmp(method, "fsync") == 0)
3504         {
3505                 new_sync_method = SYNC_METHOD_FSYNC;
3506                 new_sync_bit = 0;
3507         }
3508 #ifdef HAVE_FDATASYNC
3509         else if (strcasecmp(method, "fdatasync") == 0)
3510         {
3511                 new_sync_method = SYNC_METHOD_FDATASYNC;
3512                 new_sync_bit = 0;
3513         }
3514 #endif
3515 #ifdef OPEN_SYNC_FLAG
3516         else if (strcasecmp(method, "open_sync") == 0)
3517         {
3518                 new_sync_method = SYNC_METHOD_OPEN;
3519                 new_sync_bit = OPEN_SYNC_FLAG;
3520         }
3521 #endif
3522 #ifdef OPEN_DATASYNC_FLAG
3523         else if (strcasecmp(method, "open_datasync") == 0)
3524         {
3525                 new_sync_method = SYNC_METHOD_OPEN;
3526                 new_sync_bit = OPEN_DATASYNC_FLAG;
3527         }
3528 #endif
3529         else
3530                 return NULL;
3531
3532         if (!doit)
3533                 return method;
3534
3535         if (sync_method != new_sync_method || open_sync_bit != new_sync_bit)
3536         {
3537                 /*
3538                  * To ensure that no blocks escape unsynced, force an fsync on the
3539                  * currently open log segment (if any).  Also, if the open flag is
3540                  * changing, close the log file so it will be reopened (with new
3541                  * flag bit) at next use.
3542                  */
3543                 if (openLogFile >= 0)
3544                 {
3545                         if (pg_fsync(openLogFile) != 0)
3546                                 ereport(PANIC,
3547                                                 (errcode_for_file_access(),
3548                                         errmsg("fsync of log file %u, segment %u failed: %m",
3549                                                    openLogId, openLogSeg)));
3550                         if (open_sync_bit != new_sync_bit)
3551                         {
3552                                 if (close(openLogFile) != 0)
3553                                         ereport(PANIC,
3554                                                         (errcode_for_file_access(),
3555                                         errmsg("close of log file %u, segment %u failed: %m",
3556                                                    openLogId, openLogSeg)));
3557                                 openLogFile = -1;
3558                         }
3559                 }
3560                 sync_method = new_sync_method;
3561                 open_sync_bit = new_sync_bit;
3562         }
3563
3564         return method;
3565 }
3566
3567
3568 /*
3569  * Issue appropriate kind of fsync (if any) on the current XLOG output file
3570  */
3571 static void
3572 issue_xlog_fsync(void)
3573 {
3574         switch (sync_method)
3575         {
3576                 case SYNC_METHOD_FSYNC:
3577                         if (pg_fsync(openLogFile) != 0)
3578                                 ereport(PANIC,
3579                                                 (errcode_for_file_access(),
3580                                         errmsg("fsync of log file %u, segment %u failed: %m",
3581                                                    openLogId, openLogSeg)));
3582                         break;
3583 #ifdef HAVE_FDATASYNC
3584                 case SYNC_METHOD_FDATASYNC:
3585                         if (pg_fdatasync(openLogFile) != 0)
3586                                 ereport(PANIC,
3587                                                 (errcode_for_file_access(),
3588                                 errmsg("fdatasync of log file %u, segment %u failed: %m",
3589                                            openLogId, openLogSeg)));
3590                         break;
3591 #endif
3592                 case SYNC_METHOD_OPEN:
3593                         /* write synced it already */
3594                         break;
3595                 default:
3596                         elog(PANIC, "unrecognized wal_sync_method: %d", sync_method);
3597                         break;
3598         }
3599 }