OSDN Git Service

Allow include files to compile on their own.
[pg-rex/syncrep.git] / src / backend / access / transam / xlog.c
1 /*-------------------------------------------------------------------------
2  *
3  * xlog.c
4  *              PostgreSQL transaction log manager
5  *
6  *
7  * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.243 2006/07/13 16:49:13 momjian Exp $
11  *
12  *-------------------------------------------------------------------------
13  */
14
15 #include "postgres.h"
16
17 #include <ctype.h>
18 #include <fcntl.h>
19 #include <signal.h>
20 #include <time.h>
21 #include <unistd.h>
22 #include <sys/stat.h>
23 #include <sys/time.h>
24
25 #include "access/clog.h"
26 #include "access/multixact.h"
27 #include "access/subtrans.h"
28 #include "access/transam.h"
29 #include "access/twophase.h"
30 #include "access/xact.h"
31 #include "access/xlog.h"
32 #include "access/xlog_internal.h"
33 #include "access/xlogutils.h"
34 #include "catalog/catversion.h"
35 #include "catalog/pg_control.h"
36 #include "miscadmin.h"
37 #include "pgstat.h"
38 #include "postmaster/bgwriter.h"
39 #include "storage/bufpage.h"
40 #include "storage/fd.h"
41 #include "storage/lwlock.h"
42 #include "storage/pmsignal.h"
43 #include "storage/proc.h"
44 #include "storage/procarray.h"
45 #include "storage/spin.h"
46 #include "utils/builtins.h"
47 #include "utils/guc.h"
48 #include "utils/nabstime.h"
49 #include "utils/pg_locale.h"
50 #include "utils/relcache.h"
51
52
53 /*
54  *      Because O_DIRECT bypasses the kernel buffers, and because we never
55  *      read those buffers except during crash recovery, it is a win to use
56  *      it in all cases where we sync on each write().  We could allow O_DIRECT
57  *      with fsync(), but because skipping the kernel buffer forces writes out
58  *      quickly, it seems best just to use it for O_SYNC.  It is hard to imagine
59  *      how fsync() could be a win for O_DIRECT compared to O_SYNC and O_DIRECT.
60  *      Also, O_DIRECT is never enough to force data to the drives, it merely
61  *      tries to bypass the kernel cache, so we still need O_SYNC or fsync().
62  */
/*
 * PG_O_DIRECT is O_DIRECT where the platform defines it, otherwise 0 so that
 * OR-ing it into the sync flag macros below changes nothing.
 */
63 #ifdef O_DIRECT
64 #define PG_O_DIRECT                             O_DIRECT
65 #else
66 #define PG_O_DIRECT                             0
67 #endif
68
69 /*
70  * This chunk of hackery attempts to determine which file sync methods
71  * are available on the current platform, and to choose an appropriate
72  * default method.      We assume that fsync() is always available, and that
73  * configure determined whether fdatasync() is.
74  */
/*
 * OPEN_SYNC_FLAG exists only when the platform offers O_SYNC or, failing
 * that, O_FSYNC; it always has PG_O_DIRECT OR-ed in as well.
 */
75 #if defined(O_SYNC)
76 #define BARE_OPEN_SYNC_FLAG             O_SYNC
77 #elif defined(O_FSYNC)
78 #define BARE_OPEN_SYNC_FLAG             O_FSYNC
79 #endif
80 #ifdef BARE_OPEN_SYNC_FLAG
81 #define OPEN_SYNC_FLAG                  (BARE_OPEN_SYNC_FLAG | PG_O_DIRECT)
82 #endif
83
/*
 * OPEN_DATASYNC_FLAG exists only when O_DSYNC is defined and is either a
 * different value from the bare sync flag, or the only sync-on-open flag
 * the platform has.
 */
84 #if defined(O_DSYNC)
85 #if defined(OPEN_SYNC_FLAG)
86 /* O_DSYNC is distinct? */
87 #if O_DSYNC != BARE_OPEN_SYNC_FLAG
88 #define OPEN_DATASYNC_FLAG              (O_DSYNC | PG_O_DIRECT)
89 #endif
90 #else                                                   /* !defined(OPEN_SYNC_FLAG) */
91 /* Win32 only has O_DSYNC */
92 #define OPEN_DATASYNC_FLAG              (O_DSYNC | PG_O_DIRECT)
93 #endif
94 #endif
95
/*
 * Pick the default method.  Order of preference: open_datasync, then
 * fdatasync, then fsync_writethrough (only if HAVE_FSYNC_WRITETHROUGH_ONLY
 * is defined), and finally plain fsync.  Only the open_datasync choice has
 * a nonzero DEFAULT_SYNC_FLAGBIT.
 */
96 #if defined(OPEN_DATASYNC_FLAG)
97 #define DEFAULT_SYNC_METHOD_STR "open_datasync"
98 #define DEFAULT_SYNC_METHOD             SYNC_METHOD_OPEN
99 #define DEFAULT_SYNC_FLAGBIT    OPEN_DATASYNC_FLAG
100 #elif defined(HAVE_FDATASYNC)
101 #define DEFAULT_SYNC_METHOD_STR "fdatasync"
102 #define DEFAULT_SYNC_METHOD             SYNC_METHOD_FDATASYNC
103 #define DEFAULT_SYNC_FLAGBIT    0
104 #elif defined(HAVE_FSYNC_WRITETHROUGH_ONLY)
105 #define DEFAULT_SYNC_METHOD_STR "fsync_writethrough"
106 #define DEFAULT_SYNC_METHOD             SYNC_METHOD_FSYNC_WRITETHROUGH
107 #define DEFAULT_SYNC_FLAGBIT    0
108 #else
109 #define DEFAULT_SYNC_METHOD_STR "fsync"
110 #define DEFAULT_SYNC_METHOD             SYNC_METHOD_FSYNC
111 #define DEFAULT_SYNC_FLAGBIT    0
112 #endif
113
114
115 /*
116  * Limitation of buffer-alignment for direct IO depends on OS and filesystem,
117  * but XLOG_BLCKSZ is assumed to be enough for it.
118  */
/*
 * Alignment for the XLOG buffers: a full XLOG_BLCKSZ when O_DIRECT is
 * available, otherwise ALIGNOF_BUFFER.
 */
119 #ifdef O_DIRECT
120 #define ALIGNOF_XLOG_BUFFER             XLOG_BLCKSZ
121 #else
122 #define ALIGNOF_XLOG_BUFFER             ALIGNOF_BUFFER
123 #endif
124
125
126 /* File path names (all relative to $PGDATA) */
/*
 * NOTE(review): presumably used by read_backup_label()/remove_backup_label()
 * and readRecoveryCommandFile(), which are declared later in this file --
 * confirm at the use sites.
 */
127 #define BACKUP_LABEL_FILE               "backup_label"
128 #define RECOVERY_COMMAND_FILE   "recovery.conf"
129 #define RECOVERY_COMMAND_DONE   "recovery.done"
130
131
132 /* User-settable parameters */
/*
 * All of these have external linkage.  The values given here are only the
 * compiled-in initial values.  XLOG_sync_method_default is a constant copy
 * of the sync-method name selected by the preprocessor logic above.
 */
133 int                     CheckPointSegments = 3;
134 int                     XLOGbuffers = 8;
135 char       *XLogArchiveCommand = NULL;
136 char       *XLOG_sync_method = NULL;
137 const char      XLOG_sync_method_default[] = DEFAULT_SYNC_METHOD_STR;
138 bool            fullPageWrites = true;
139
/* XLOG_DEBUG exists only in builds that define WAL_DEBUG */
140 #ifdef WAL_DEBUG
141 bool            XLOG_DEBUG = false;
142 #endif
143
144 /*
145  * XLOGfileslop is used in the code as the allowed "fuzz" in the number of
146  * preallocated XLOG segments --- we try to have at least XLOGfiles advance
147  * segments but no more than XLOGfileslop segments.  This could
148  * be made a separate GUC variable, but at present I think it's sufficient
149  * to hardwire it as 2*CheckPointSegments+1.  Under normal conditions, a
150  * checkpoint will free no more than 2*CheckPointSegments log segments, and
151  * we want to recycle all of them; the +1 allows boundary cases to happen
152  * without wasting a delete/create-segment cycle.
153  */
154
/* Expands to an expression, so it follows the current CheckPointSegments value */
155 #define XLOGfileslop    (2*CheckPointSegments + 1)
156
157
158 /* these are derived from XLOG_sync_method by assign_xlog_sync_method */
/* sync_method is externally visible; open_sync_bit is private to this file */
159 int                     sync_method = DEFAULT_SYNC_METHOD;
160 static int      open_sync_bit = DEFAULT_SYNC_FLAGBIT;
161
/*
 * Yields open_sync_bit while enableFsync is true, otherwise 0.
 * NOTE(review): presumably OR-ed into the open() flags for WAL segment
 * files -- confirm at the use sites.
 */
162 #define XLOG_SYNC_BIT  (enableFsync ? open_sync_bit : 0)
163
164
165 /*
166  * ThisTimeLineID will be same in all backends --- it identifies current
167  * WAL timeline for the database system.
168  */
/* ThisTimeLineID and InRecovery have external linkage; the rest are file-local */
169 TimeLineID      ThisTimeLineID = 0;
170
171 /* Are we doing recovery from XLOG? */
172 bool            InRecovery = false;
173
174 /* Are we recovering using offline XLOG archives? */
175 static bool InArchiveRecovery = false;
176
177 /* Was the last xlog file restored from archive, or local? */
178 static bool restoredFromArchive = false;
179
180 /* options taken from recovery.conf */
181 static char *recoveryRestoreCommand = NULL;
182 static bool recoveryTarget = false;
183 static bool recoveryTargetExact = false;
184 static bool recoveryTargetInclusive = true;
/*
 * The next two have no initializer, so as static objects they start at zero.
 * NOTE(review): presumably meaningful only once recoveryTarget is set --
 * confirm in readRecoveryCommandFile().
 */
185 static TransactionId recoveryTargetXid;
186 static time_t recoveryTargetTime;
187
188 /* if recoveryStopsHere returns true, it saves actual stop xid/time here */
189 static TransactionId recoveryStopXid;
190 static time_t recoveryStopTime;
191 static bool recoveryStopAfter;
192
193 /* constraint set by read_backup_label */
194 static XLogRecPtr recoveryMinXlogOffset = {0, 0};
195
196 /*
197  * During normal operation, the only timeline we care about is ThisTimeLineID.
198  * During recovery, however, things are more complicated.  To simplify life
199  * for rmgr code, we keep ThisTimeLineID set to the "current" timeline as we
200  * scan through the WAL history (that is, it is the line that was active when
201  * the currently-scanned WAL record was generated).  We also need these
202  * timeline values:
203  *
204  * recoveryTargetTLI: the desired timeline that we want to end in.
205  *
206  * expectedTLIs: an integer list of recoveryTargetTLI and the TLIs of
207  * its known parents, newest first (so recoveryTargetTLI is always the
208  * first list member).  Only these TLIs are expected to be seen in the WAL
209  * segments we read, and indeed only these TLIs will be considered as
210  * candidate WAL files to open at all.
211  *
212  * curFileTLI: the TLI appearing in the name of the current input WAL file.
213  * (This is not necessarily the same as ThisTimeLineID, because we could
214  * be scanning data that was copied from an ancestor timeline when the current
215  * file was created.)  During a sequential scan we do not allow this value
216  * to decrease.
217  */
/*
 * File-local recovery timeline state; see the comment above for the meaning
 * of each.  None has an initializer, so as static objects they start out
 * zero/NULL.
 */
218 static TimeLineID recoveryTargetTLI;
219 static List *expectedTLIs;
220 static TimeLineID curFileTLI;
221
222 /*
223  * MyLastRecPtr points to the start of the last XLOG record inserted by the
224  * current transaction.  If MyLastRecPtr.xrecoff == 0, then the current
225  * xact hasn't yet inserted any transaction-controlled XLOG records.
226  *
227  * Note that XLOG records inserted outside transaction control are not
228  * reflected into MyLastRecPtr.  They do, however, cause MyXactMadeXLogEntry
229  * to be set true.      The latter can be used to test whether the current xact
230  * made any loggable changes (including out-of-xact changes, such as
231  * sequence updates).
232  *
233  * When we insert/update/delete a tuple in a temporary relation, we do not
234  * make any XLOG record, since we don't care about recovering the state of
235  * the temp rel after a crash.  However, we will still need to remember
236  * whether our transaction committed or aborted in that case.  So, we must
237  * set MyXactMadeTempRelUpdate true to indicate that the XID will be of
238  * interest later.
239  */
/*
 * MyLastRecPtr, MyXactMadeXLogEntry, MyXactMadeTempRelUpdate and
 * ProcLastRecEnd have external linkage; ProcLastRecPtr is private to this
 * file.
 */
240 XLogRecPtr      MyLastRecPtr = {0, 0};
241
242 bool            MyXactMadeXLogEntry = false;
243
244 bool            MyXactMadeTempRelUpdate = false;
245
246 /*
247  * ProcLastRecPtr points to the start of the last XLOG record inserted by the
248  * current backend.  It is updated for all inserts, transaction-controlled
249  * or not.      ProcLastRecEnd is similar but points to end+1 of last record.
250  */
251 static XLogRecPtr ProcLastRecPtr = {0, 0};
252
253 XLogRecPtr      ProcLastRecEnd = {0, 0};
254
255 /*
256  * RedoRecPtr is this backend's local copy of the REDO record pointer
257  * (which is almost but not quite the same as a pointer to the most recent
258  * CHECKPOINT record).  We update this from the shared-memory copy,
259  * XLogCtl->Insert.RedoRecPtr, whenever we can safely do so (ie, when we
260  * hold the Insert lock).  See XLogInsert for details.  We are also allowed
261  * to update from XLogCtl->Insert.RedoRecPtr if we hold the info_lck;
262  * see GetRedoRecPtr.  A freshly spawned backend obtains the value during
263  * InitXLOGAccess.
264  */
/* Backend-local copy; the shared original is XLogCtl->Insert.RedoRecPtr */
265 static XLogRecPtr RedoRecPtr;
266
267 /*----------
268  * Shared-memory data structures for XLOG control
269  *
270  * LogwrtRqst indicates a byte position that we need to write and/or fsync
271  * the log up to (all records before that point must be written or fsynced).
272  * LogwrtResult indicates the byte positions we have already written/fsynced.
273  * These structs are identical but are declared separately to indicate their
274  * slightly different functions.
275  *
276  * We do a lot of pushups to minimize the amount of access to lockable
277  * shared memory values.  There are actually three shared-memory copies of
278  * LogwrtResult, plus one unshared copy in each backend.  Here's how it works:
279  *              XLogCtl->LogwrtResult is protected by info_lck
280  *              XLogCtl->Write.LogwrtResult is protected by WALWriteLock
281  *              XLogCtl->Insert.LogwrtResult is protected by WALInsertLock
282  * One must hold the associated lock to read or write any of these, but
283  * of course no lock is needed to read/write the unshared LogwrtResult.
284  *
285  * XLogCtl->LogwrtResult and XLogCtl->Write.LogwrtResult are both "always
286  * right", since both are updated by a write or flush operation before
287  * it releases WALWriteLock.  The point of keeping XLogCtl->Write.LogwrtResult
288  * is that it can be examined/modified by code that already holds WALWriteLock
289  * without needing to grab info_lck as well.
290  *
291  * XLogCtl->Insert.LogwrtResult may lag behind the reality of the other two,
292  * but is updated when convenient.      Again, it exists for the convenience of
293  * code that is already holding WALInsertLock but not the other locks.
294  *
295  * The unshared LogwrtResult may lag behind any or all of these, and again
296  * is updated when convenient.
297  *
298  * The request bookkeeping is simpler: there is a shared XLogCtl->LogwrtRqst
299  * (protected by info_lck), but we don't need to cache any copies of it.
300  *
301  * Note that this all works because the request and result positions can only
302  * advance forward, never back up, and so we can easily determine which of two
303  * values is "more up to date".
304  *
305  * info_lck is only held long enough to read/update the protected variables,
306  * so it's a plain spinlock.  The other locks are held longer (potentially
307  * over I/O operations), so we use LWLocks for them.  These locks are:
308  *
309  * WALInsertLock: must be held to insert a record into the WAL buffers.
310  *
311  * WALWriteLock: must be held to write WAL buffers to disk (XLogWrite or
312  * XLogFlush).
313  *
314  * ControlFileLock: must be held to read/update control file or create
315  * new log file.
316  *
317  * CheckpointLock: must be held to do a checkpoint (ensures only one
318  * checkpointer at a time; even though the postmaster won't launch
319  * parallel checkpoint processes, we need this because manual checkpoints
320  * could be launched simultaneously).
321  *
322  *----------
323  */
324
/*
 * Requested positions: how far the log needs to be written out and how far
 * it needs to be flushed.  Same layout as XLogwrtResult; kept as a separate
 * type to make the request/result distinction visible.
 */
325 typedef struct XLogwrtRqst
326 {
327         XLogRecPtr      Write;                  /* last byte + 1 to write out */
328         XLogRecPtr      Flush;                  /* last byte + 1 to flush */
329 } XLogwrtRqst;
330
/*
 * Achieved positions: how far the log has actually been written out and how
 * far it has been flushed.  Same layout as XLogwrtRqst.
 */
331 typedef struct XLogwrtResult
332 {
333         XLogRecPtr      Write;                  /* last byte + 1 written out */
334         XLogRecPtr      Flush;                  /* last byte + 1 flushed */
335 } XLogwrtResult;
336
337 /*
338  * Shared state data for XLogInsert.
339  */
/*
 * Embedded in XLogCtlData as the Insert member, which that struct marks as
 * protected by WALInsertLock.  currpage and currpos together determine the
 * free space left in the current page (see INSERT_FREESPACE).
 */
340 typedef struct XLogCtlInsert
341 {
342         XLogwrtResult LogwrtResult; /* a recent value of LogwrtResult */
343         XLogRecPtr      PrevRecord;             /* start of previously-inserted record */
344         int                     curridx;                /* current block index in cache */
345         XLogPageHeader currpage;        /* points to header of block in cache */
346         char       *currpos;            /* current insertion point in cache */
347         XLogRecPtr      RedoRecPtr;             /* current redo point for insertions */
348         bool            forcePageWrites;        /* forcing full-page writes for PITR? */
349 } XLogCtlInsert;
350
351 /*
352  * Shared state data for XLogWrite/XLogFlush.
353  */
/*
 * Embedded in XLogCtlData as the Write member, which that struct marks as
 * protected by WALWriteLock.
 */
354 typedef struct XLogCtlWrite
355 {
356         XLogwrtResult LogwrtResult; /* current value of LogwrtResult */
357         int                     curridx;                /* cache index of next block to write */
358 } XLogCtlWrite;
359
360 /*
361  * Total shared-memory state for XLOG.
362  */
/*
 * Members are grouped by the lock that guards them.  XLogCacheBlck is the
 * index at which PrevBufIdx/NextBufIdx wrap around.
 */
363 typedef struct XLogCtlData
364 {
365         /* Protected by WALInsertLock: */
366         XLogCtlInsert Insert;
367         /* Protected by info_lck: */
368         XLogwrtRqst LogwrtRqst;
369         XLogwrtResult LogwrtResult;
370         /* Protected by WALWriteLock: */
371         XLogCtlWrite Write;
372
373         /*
374          * These values do not change after startup, although the pointed-to pages
375          * and xlblocks values certainly do.  Permission to read/write the pages
376          * and xlblocks values depends on WALInsertLock and WALWriteLock.
377          */
378         char       *pages;                      /* buffers for unwritten XLOG pages */
379         XLogRecPtr *xlblocks;           /* 1st byte ptr-s + XLOG_BLCKSZ */
380         Size            XLogCacheByte;  /* # bytes in xlog buffers */
381         int                     XLogCacheBlck;  /* highest allocated xlog buffer index */
        /* NOTE(review): same name as the global ThisTimeLineID; confirm how the two are kept in step */
382         TimeLineID      ThisTimeLineID;
383
384         slock_t         info_lck;               /* locks shared LogwrtRqst/LogwrtResult */
385 } XLogCtlData;
386
/*
 * Both pointers start out NULL.  NOTE(review): presumably pointed at the
 * shared-memory structures during startup -- the code that does so is not
 * nearby; confirm.
 */
387 static XLogCtlData *XLogCtl = NULL;
388
389 /*
390  * We maintain an image of pg_control in shared memory.
391  */
392 static ControlFileData *ControlFile = NULL;
393
394 /*
395  * Macros for managing XLogInsert state.  In most cases, the calling routine
396  * has a local pointer to XLogCtl->Insert and/or a copy of its curridx field,
397  * so these are passed as parameters instead of being fetched via XLogCtl.
398  */
399
400 /* Free space remaining in the current xlog page buffer */
/*
 * Insert is a pointer to an XLogCtlInsert.  The result is XLOG_BLCKSZ minus
 * the distance from the start of the current page to the insertion point.
 * The argument is evaluated twice.
 */
401 #define INSERT_FREESPACE(Insert)  \
402         (XLOG_BLCKSZ - ((Insert)->currpos - (char *) (Insert)->currpage))
403
404 /* Construct XLogRecPtr value for current insertion point */
/*
 * Fills in recptr (an XLogRecPtr lvalue).  xlogid is copied from
 * xlblocks[curridx]; xrecoff is that entry's xrecoff minus the free space
 * left in the page.  This relies on xlblocks[] holding the position just
 * past the end of each page, as its field comment in XLogCtlData indicates.
 * recptr and curridx are each evaluated twice.
 */
405 #define INSERT_RECPTR(recptr,Insert,curridx)  \
406         ( \
407           (recptr).xlogid = XLogCtl->xlblocks[curridx].xlogid, \
408           (recptr).xrecoff = \
409                 XLogCtl->xlblocks[curridx].xrecoff - INSERT_FREESPACE(Insert) \
410         )
411
/*
 * Index of the buffer before idx, wrapping from 0 back to
 * XLogCtl->XLogCacheBlck.  idx is evaluated twice.
 */
412 #define PrevBufIdx(idx)         \
413                 (((idx) == 0) ? XLogCtl->XLogCacheBlck : ((idx) - 1))
414
/*
 * Index of the buffer after idx, wrapping from XLogCtl->XLogCacheBlck back
 * to 0.  idx is evaluated twice.
 */
415 #define NextBufIdx(idx)         \
416                 (((idx) == XLogCtl->XLogCacheBlck) ? 0 : ((idx) + 1))
417
418 /*
419  * Private, possibly out-of-date copy of shared LogwrtResult.
420  * See discussion above.
421  */
/* Both Write and Flush positions start at {0, 0} */
422 static XLogwrtResult LogwrtResult = {{0, 0}, {0, 0}};
423
424 /*
425  * openLogFile is -1 or a kernel FD for an open log file segment.
426  * When it's open, openLogOff is the current seek offset in the file.
427  * openLogId/openLogSeg identify the segment.  These variables are only
428  * used to write the XLOG, and so will normally refer to the active segment.
429  */
/* Write-side segment state; openLogFile == -1 means no segment is open */
430 static int      openLogFile = -1;
431 static uint32 openLogId = 0;
432 static uint32 openLogSeg = 0;
433 static uint32 openLogOff = 0;
434
435 /*
436  * These variables are used similarly to the ones above, but for reading
437  * the XLOG.  Note, however, that readOff generally represents the offset
438  * of the page just read, not the seek position of the FD itself, which
439  * will be just past that page.
440  */
/* Read-side counterparts of openLogFile/openLogId/openLogSeg/openLogOff */
441 static int      readFile = -1;
442 static uint32 readId = 0;
443 static uint32 readSeg = 0;
444 static uint32 readOff = 0;
445
446 /* Buffer for currently read page (XLOG_BLCKSZ bytes) */
447 static char *readBuf = NULL;
448
449 /* Buffer for current ReadRecord result (expandable) */
450 static char *readRecordBuf = NULL;
/* NOTE(review): presumably the current allocated size of readRecordBuf -- confirm in ReadRecord */
451 static uint32 readRecordBufSize = 0;
452
453 /* State information for XLOG reading */
454 static XLogRecPtr ReadRecPtr;   /* start of last record read */
455 static XLogRecPtr EndRecPtr;    /* end+1 of last record read */
456 static XLogRecord *nextRecord = NULL;
457 static TimeLineID lastPageTLI = 0;
458
/* NOTE(review): presumably true only while WAL records are being replayed -- confirm where it is set */
459 static bool InRedo = false;
460
461
/* Prototypes for this file's static helper routines */
462 static void XLogArchiveNotify(const char *xlog);
463 static void XLogArchiveNotifySeg(uint32 log, uint32 seg);
464 static bool XLogArchiveCheckDone(const char *xlog);
465 static void XLogArchiveCleanup(const char *xlog);
466 static void readRecoveryCommandFile(void);
467 static void exitArchiveRecovery(TimeLineID endTLI,
468                                         uint32 endLogId, uint32 endLogSeg);
469 static bool recoveryStopsHere(XLogRecord *record, bool *includeThis);
470
471 static bool XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites,
472                                 XLogRecPtr *lsn, BkpBlock *bkpb);
473 static bool AdvanceXLInsertBuffer(void);
474 static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible);
475 static int XLogFileInit(uint32 log, uint32 seg,
476                          bool *use_existent, bool use_lock);
477 static bool InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath,
478                                            bool find_free, int *max_advance,
479                                            bool use_lock);
480 static int      XLogFileOpen(uint32 log, uint32 seg);
481 static int      XLogFileRead(uint32 log, uint32 seg, int emode);
482 static void     XLogFileClose(void);
483 static bool RestoreArchivedFile(char *path, const char *xlogfname,
484                                         const char *recovername, off_t expectedSize);
485 static int      PreallocXlogFiles(XLogRecPtr endptr);
486 static void MoveOfflineLogs(uint32 log, uint32 seg, XLogRecPtr endptr,
487                                 int *nsegsremoved, int *nsegsrecycled);
488 static void CleanupBackupHistory(void);
489 static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, int emode);
490 static bool ValidXLOGHeader(XLogPageHeader hdr, int emode);
491 static XLogRecord *ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt);
492 static List *readTimeLineHistory(TimeLineID targetTLI);
493 static bool existsTimeLineHistory(TimeLineID probeTLI);
494 static TimeLineID findNewestTimeLine(TimeLineID startTLI);
495 static void writeTimeLineHistory(TimeLineID newTLI, TimeLineID parentTLI,
496                                          TimeLineID endTLI,
497                                          uint32 endLogId, uint32 endLogSeg);
498 static void WriteControlFile(void);
499 static void ReadControlFile(void);
500 static char *str_time(time_t tnow);
501 static void issue_xlog_fsync(void);
502
/* xlog_outrec is declared only in builds that define WAL_DEBUG */
503 #ifdef WAL_DEBUG
504 static void xlog_outrec(StringInfo buf, XLogRecord *record);
505 #endif
506 static bool read_backup_label(XLogRecPtr *checkPointLoc);
507 static void remove_backup_label(void);
508 static void rm_redo_error_callback(void *arg);
509
510
511 /*
512  * Insert an XLOG record having the specified RMID and info bytes,
513  * with the body of the record being the data chunk(s) described by
514  * the rdata chain (see xlog.h for notes about rdata).
515  *
516  * Returns XLOG pointer to end of record (beginning of next record).
517  * This can be used as LSN for data pages affected by the logged action.
518  * (LSN is the XLOG point up to which the XLOG must be flushed to disk
519  * before the data page can be written out.  This implements the basic
520  * WAL rule "write the log before the data".)
521  *
522  * NB: this routine feels free to scribble on the XLogRecData structs,
523  * though not on the data they reference.  This is OK since the XLogRecData
524  * structs are always just temporaries in the calling code.
525  */
526 XLogRecPtr
527 XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
528 {
529         XLogCtlInsert *Insert = &XLogCtl->Insert;
530         XLogRecord *record;
531         XLogContRecord *contrecord;
532         XLogRecPtr      RecPtr;
533         XLogRecPtr      WriteRqst;
534         uint32          freespace;
535         int                     curridx;
536         XLogRecData *rdt;
537         Buffer          dtbuf[XLR_MAX_BKP_BLOCKS];
538         bool            dtbuf_bkp[XLR_MAX_BKP_BLOCKS];
539         BkpBlock        dtbuf_xlg[XLR_MAX_BKP_BLOCKS];
540         XLogRecPtr      dtbuf_lsn[XLR_MAX_BKP_BLOCKS];
541         XLogRecData dtbuf_rdt1[XLR_MAX_BKP_BLOCKS];
542         XLogRecData dtbuf_rdt2[XLR_MAX_BKP_BLOCKS];
543         XLogRecData dtbuf_rdt3[XLR_MAX_BKP_BLOCKS];
544         pg_crc32        rdata_crc;
545         uint32          len,
546                                 write_len;
547         unsigned        i;
548         XLogwrtRqst LogwrtRqst;
549         bool            updrqst;
550         bool            doPageWrites;
551         bool            no_tran = (rmid == RM_XLOG_ID) ? true : false;
552
553         if (info & XLR_INFO_MASK)
554         {
555                 if ((info & XLR_INFO_MASK) != XLOG_NO_TRAN)
556                         elog(PANIC, "invalid xlog info mask %02X", (info & XLR_INFO_MASK));
557                 no_tran = true;
558                 info &= ~XLR_INFO_MASK;
559         }
560
561         /*
562          * In bootstrap mode, we don't actually log anything but XLOG resources;
563          * return a phony record pointer.
564          */
565         if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID)
566         {
567                 RecPtr.xlogid = 0;
568                 RecPtr.xrecoff = SizeOfXLogLongPHD;             /* start of 1st chkpt record */
569                 return RecPtr;
570         }
571
572         /*
573          * Here we scan the rdata chain, determine which buffers must be backed
574          * up, and compute the CRC values for the data.  Note that the record
575          * header isn't added into the CRC initially since we don't know the final
576          * length or info bits quite yet.  Thus, the CRC will represent the CRC of
577          * the whole record in the order "rdata, then backup blocks, then record
578          * header".
579          *
580          * We may have to loop back to here if a race condition is detected below.
581          * We could prevent the race by doing all this work while holding the
582          * insert lock, but it seems better to avoid doing CRC calculations while
583          * holding the lock.  This means we have to be careful about modifying the
584          * rdata chain until we know we aren't going to loop back again.  The only
585          * change we allow ourselves to make earlier is to set rdt->data = NULL in
586          * chain items we have decided we will have to back up the whole buffer
587          * for.  This is OK because we will certainly decide the same thing again
588          * for those items if we do it over; doing it here saves an extra pass
589          * over the chain later.
590          */
591 begin:;
592         for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
593         {
594                 dtbuf[i] = InvalidBuffer;
595                 dtbuf_bkp[i] = false;
596         }
597
598         /*
599          * Decide if we need to do full-page writes in this XLOG record: true if
600          * full_page_writes is on or we have a PITR request for it.  Since we
601          * don't yet have the insert lock, forcePageWrites could change under us,
602          * but we'll recheck it once we have the lock.
603          */
604         doPageWrites = fullPageWrites || Insert->forcePageWrites;
605
606         INIT_CRC32(rdata_crc);
607         len = 0;
608         for (rdt = rdata;;)
609         {
610                 if (rdt->buffer == InvalidBuffer)
611                 {
612                         /* Simple data, just include it */
613                         len += rdt->len;
614                         COMP_CRC32(rdata_crc, rdt->data, rdt->len);
615                 }
616                 else
617                 {
618                         /* Find info for buffer */
619                         for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
620                         {
621                                 if (rdt->buffer == dtbuf[i])
622                                 {
623                                         /* Buffer already referenced by earlier chain item */
624                                         if (dtbuf_bkp[i])
625                                                 rdt->data = NULL;
626                                         else if (rdt->data)
627                                         {
628                                                 len += rdt->len;
629                                                 COMP_CRC32(rdata_crc, rdt->data, rdt->len);
630                                         }
631                                         break;
632                                 }
633                                 if (dtbuf[i] == InvalidBuffer)
634                                 {
635                                         /* OK, put it in this slot */
636                                         dtbuf[i] = rdt->buffer;
637                                         if (XLogCheckBuffer(rdt, doPageWrites,
638                                                                                 &(dtbuf_lsn[i]), &(dtbuf_xlg[i])))
639                                         {
640                                                 dtbuf_bkp[i] = true;
641                                                 rdt->data = NULL;
642                                         }
643                                         else if (rdt->data)
644                                         {
645                                                 len += rdt->len;
646                                                 COMP_CRC32(rdata_crc, rdt->data, rdt->len);
647                                         }
648                                         break;
649                                 }
650                         }
651                         if (i >= XLR_MAX_BKP_BLOCKS)
652                                 elog(PANIC, "can backup at most %d blocks per xlog record",
653                                          XLR_MAX_BKP_BLOCKS);
654                 }
655                 /* Break out of loop when rdt points to last chain item */
656                 if (rdt->next == NULL)
657                         break;
658                 rdt = rdt->next;
659         }
660
661         /*
662          * Now add the backup block headers and data into the CRC
663          */
664         for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
665         {
666                 if (dtbuf_bkp[i])
667                 {
668                         BkpBlock   *bkpb = &(dtbuf_xlg[i]);
669                         char       *page;
670
671                         COMP_CRC32(rdata_crc,
672                                            (char *) bkpb,
673                                            sizeof(BkpBlock));
674                         page = (char *) BufferGetBlock(dtbuf[i]);
675                         if (bkpb->hole_length == 0)
676                         {
677                                 COMP_CRC32(rdata_crc,
678                                                    page,
679                                                    BLCKSZ);
680                         }
681                         else
682                         {
683                                 /* must skip the hole */
684                                 COMP_CRC32(rdata_crc,
685                                                    page,
686                                                    bkpb->hole_offset);
687                                 COMP_CRC32(rdata_crc,
688                                                    page + (bkpb->hole_offset + bkpb->hole_length),
689                                                    BLCKSZ - (bkpb->hole_offset + bkpb->hole_length));
690                         }
691                 }
692         }
693
694         /*
695          * NOTE: the test for len == 0 here is somewhat fishy, since in theory all
696          * of the rmgr data might have been suppressed in favor of backup blocks.
697          * Currently, all callers of XLogInsert provide at least some
698          * not-in-a-buffer data and so len == 0 should never happen, but that may
699          * not be true forever.  If you need to remove the len == 0 check, also
700          * remove the check for xl_len == 0 in ReadRecord, below.
701          */
702         if (len == 0)
703                 elog(PANIC, "invalid xlog record length %u", len);
704
705         START_CRIT_SECTION();
706
707         /* update LogwrtResult before doing cache fill check */
708         {
709                 /* use volatile pointer to prevent code rearrangement */
710                 volatile XLogCtlData *xlogctl = XLogCtl;
711
712                 SpinLockAcquire(&xlogctl->info_lck);
713                 LogwrtRqst = xlogctl->LogwrtRqst;
714                 LogwrtResult = xlogctl->LogwrtResult;
715                 SpinLockRelease(&xlogctl->info_lck);
716         }
717
718         /*
719          * If cache is half filled then try to acquire write lock and do
720          * XLogWrite. Ignore any fractional blocks in performing this check.
721          */
722         LogwrtRqst.Write.xrecoff -= LogwrtRqst.Write.xrecoff % XLOG_BLCKSZ;
723         if (LogwrtRqst.Write.xlogid != LogwrtResult.Write.xlogid ||
724                 (LogwrtRqst.Write.xrecoff >= LogwrtResult.Write.xrecoff +
725                  XLogCtl->XLogCacheByte / 2))
726         {
727                 if (LWLockConditionalAcquire(WALWriteLock, LW_EXCLUSIVE))
728                 {
729                         /*
730                          * Since the amount of data we write here is completely optional
731                          * anyway, tell XLogWrite it can be "flexible" and stop at a
732                          * convenient boundary.  This allows writes triggered by this
733                          * mechanism to synchronize with the cache boundaries, so that in
734                          * a long transaction we'll basically dump alternating halves of
735                          * the buffer array.
736                          */
737                         LogwrtResult = XLogCtl->Write.LogwrtResult;
738                         if (XLByteLT(LogwrtResult.Write, LogwrtRqst.Write))
739                                 XLogWrite(LogwrtRqst, true);
740                         LWLockRelease(WALWriteLock);
741                 }
742         }
743
744         /* Now wait to get insert lock */
745         LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
746
747         /*
748          * Check to see if my RedoRecPtr is out of date.  If so, may have to go
749          * back and recompute everything.  This can only happen just after a
750          * checkpoint, so it's better to be slow in this case and fast otherwise.
751          *
752          * If we aren't doing full-page writes then RedoRecPtr doesn't actually
753          * affect the contents of the XLOG record, so we'll update our local
754          * copy but not force a recomputation.
755          */
756         if (!XLByteEQ(RedoRecPtr, Insert->RedoRecPtr))
757         {
758                 Assert(XLByteLT(RedoRecPtr, Insert->RedoRecPtr));
759                 RedoRecPtr = Insert->RedoRecPtr;
760
761                 if (doPageWrites)
762                 {
763                         for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
764                         {
765                                 if (dtbuf[i] == InvalidBuffer)
766                                         continue;
767                                 if (dtbuf_bkp[i] == false &&
768                                         XLByteLE(dtbuf_lsn[i], RedoRecPtr))
769                                 {
770                                         /*
771                                          * Oops, this buffer now needs to be backed up, but we
772                                          * didn't think so above.  Start over.
773                                          */
774                                         LWLockRelease(WALInsertLock);
775                                         END_CRIT_SECTION();
776                                         goto begin;
777                                 }
778                         }
779                 }
780         }
781
782         /*
783          * Also check to see if forcePageWrites was just turned on; if we
784          * weren't already doing full-page writes then go back and recompute.
785          * (If it was just turned off, we could recompute the record without
786          * full pages, but we choose not to bother.)
787          */
788         if (Insert->forcePageWrites && !doPageWrites)
789         {
790                 /* Oops, must redo it with full-page data */
791                 LWLockRelease(WALInsertLock);
792                 END_CRIT_SECTION();
793                 goto begin;
794         }
795
796         /*
797          * Make additional rdata chain entries for the backup blocks, so that we
798          * don't need to special-case them in the write loop.  Note that we have
799          * now irrevocably changed the input rdata chain.  At the exit of this
800          * loop, write_len includes the backup block data.
801          *
802          * Also set the appropriate info bits to show which buffers were backed
803          * up. The i'th XLR_SET_BKP_BLOCK bit corresponds to the i'th distinct
804          * buffer value (ignoring InvalidBuffer) appearing in the rdata chain.
805          */
806         write_len = len;
807         for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
808         {
809                 BkpBlock   *bkpb;
810                 char       *page;
811
812                 if (!dtbuf_bkp[i])
813                         continue;
814
815                 info |= XLR_SET_BKP_BLOCK(i);
816
817                 bkpb = &(dtbuf_xlg[i]);
818                 page = (char *) BufferGetBlock(dtbuf[i]);
819
820                 rdt->next = &(dtbuf_rdt1[i]);
821                 rdt = rdt->next;
822
823                 rdt->data = (char *) bkpb;
824                 rdt->len = sizeof(BkpBlock);
825                 write_len += sizeof(BkpBlock);
826
827                 rdt->next = &(dtbuf_rdt2[i]);
828                 rdt = rdt->next;
829
830                 if (bkpb->hole_length == 0)
831                 {
832                         rdt->data = page;
833                         rdt->len = BLCKSZ;
834                         write_len += BLCKSZ;
835                         rdt->next = NULL;
836                 }
837                 else
838                 {
839                         /* must skip the hole */
840                         rdt->data = page;
841                         rdt->len = bkpb->hole_offset;
842                         write_len += bkpb->hole_offset;
843
844                         rdt->next = &(dtbuf_rdt3[i]);
845                         rdt = rdt->next;
846
847                         rdt->data = page + (bkpb->hole_offset + bkpb->hole_length);
848                         rdt->len = BLCKSZ - (bkpb->hole_offset + bkpb->hole_length);
849                         write_len += rdt->len;
850                         rdt->next = NULL;
851                 }
852         }
853
854         /*
855          * If there isn't enough space on the current XLOG page for a record
856          * header, advance to the next page (leaving the unused space as zeroes).
857          */
858         updrqst = false;
859         freespace = INSERT_FREESPACE(Insert);
860         if (freespace < SizeOfXLogRecord)
861         {
862                 updrqst = AdvanceXLInsertBuffer();
863                 freespace = INSERT_FREESPACE(Insert);
864         }
865
866         curridx = Insert->curridx;
867         record = (XLogRecord *) Insert->currpos;
868
869         /* Insert record header */
870
871         record->xl_prev = Insert->PrevRecord;
872         record->xl_xid = GetCurrentTransactionIdIfAny();
873         record->xl_tot_len = SizeOfXLogRecord + write_len;
874         record->xl_len = len;           /* doesn't include backup blocks */
875         record->xl_info = info;
876         record->xl_rmid = rmid;
877
878         /* Now we can finish computing the record's CRC */
879         COMP_CRC32(rdata_crc, (char *) record + sizeof(pg_crc32),
880                            SizeOfXLogRecord - sizeof(pg_crc32));
881         FIN_CRC32(rdata_crc);
882         record->xl_crc = rdata_crc;
883
884         /* Compute record's XLOG location */
885         INSERT_RECPTR(RecPtr, Insert, curridx);
886
887 #ifdef WAL_DEBUG
888         if (XLOG_DEBUG)
889         {
890                 StringInfoData  buf;
891
892                 initStringInfo(&buf);
893                 appendStringInfo(&buf, "INSERT @ %X/%X: ", 
894                                                         RecPtr.xlogid, RecPtr.xrecoff);
895                 xlog_outrec(&buf, record);
896                 if (rdata->data != NULL)
897                 {
898                         appendStringInfo(&buf, " - ");
899                         RmgrTable[record->xl_rmid].rm_desc(&buf, record->xl_info, rdata->data);
900                 }
901                 elog(LOG, "%s", buf.data);
902                 pfree(buf.data);
903         }
904 #endif
905
906         /* Record begin of record in appropriate places */
907         if (!no_tran)
908                 MyLastRecPtr = RecPtr;
909         ProcLastRecPtr = RecPtr;
910         Insert->PrevRecord = RecPtr;
911         MyXactMadeXLogEntry = true;
912
913         Insert->currpos += SizeOfXLogRecord;
914         freespace -= SizeOfXLogRecord;
915
916         /*
917          * Append the data, including backup blocks if any
918          */
919         while (write_len)
920         {
921                 while (rdata->data == NULL)
922                         rdata = rdata->next;
923
924                 if (freespace > 0)
925                 {
926                         if (rdata->len > freespace)
927                         {
928                                 memcpy(Insert->currpos, rdata->data, freespace);
929                                 rdata->data += freespace;
930                                 rdata->len -= freespace;
931                                 write_len -= freespace;
932                         }
933                         else
934                         {
935                                 memcpy(Insert->currpos, rdata->data, rdata->len);
936                                 freespace -= rdata->len;
937                                 write_len -= rdata->len;
938                                 Insert->currpos += rdata->len;
939                                 rdata = rdata->next;
940                                 continue;
941                         }
942                 }
943
944                 /* Use next buffer */
945                 updrqst = AdvanceXLInsertBuffer();
946                 curridx = Insert->curridx;
947                 /* Insert cont-record header */
948                 Insert->currpage->xlp_info |= XLP_FIRST_IS_CONTRECORD;
949                 contrecord = (XLogContRecord *) Insert->currpos;
950                 contrecord->xl_rem_len = write_len;
951                 Insert->currpos += SizeOfXLogContRecord;
952                 freespace = INSERT_FREESPACE(Insert);
953         }
954
955         /* Ensure next record will be properly aligned */
956         Insert->currpos = (char *) Insert->currpage +
957                 MAXALIGN(Insert->currpos - (char *) Insert->currpage);
958         freespace = INSERT_FREESPACE(Insert);
959
960         /*
961          * The recptr I return is the beginning of the *next* record. This will be
962          * stored as LSN for changed data pages...
963          */
964         INSERT_RECPTR(RecPtr, Insert, curridx);
965
966         /* Need to update shared LogwrtRqst if some block was filled up */
967         if (freespace < SizeOfXLogRecord)
968                 updrqst = true;                 /* curridx is filled and available for writing
969                                                                  * out */
970         else
971                 curridx = PrevBufIdx(curridx);
972         WriteRqst = XLogCtl->xlblocks[curridx];
973
974         LWLockRelease(WALInsertLock);
975
976         if (updrqst)
977         {
978                 /* use volatile pointer to prevent code rearrangement */
979                 volatile XLogCtlData *xlogctl = XLogCtl;
980
981                 SpinLockAcquire(&xlogctl->info_lck);
982                 /* advance global request to include new block(s) */
983                 if (XLByteLT(xlogctl->LogwrtRqst.Write, WriteRqst))
984                         xlogctl->LogwrtRqst.Write = WriteRqst;
985                 /* update local result copy while I have the chance */
986                 LogwrtResult = xlogctl->LogwrtResult;
987                 SpinLockRelease(&xlogctl->info_lck);
988         }
989
990         ProcLastRecEnd = RecPtr;
991
992         END_CRIT_SECTION();
993
994         return RecPtr;
995 }
996
997 /*
998  * Determine whether the buffer referenced by an XLogRecData item has to
999  * be backed up, and if so fill a BkpBlock struct for it.  In any case
1000  * save the buffer's LSN at *lsn.
1001  */
1002 static bool
1003 XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites,
1004                                 XLogRecPtr *lsn, BkpBlock *bkpb)
1005 {
1006         PageHeader      page;
1007
1008         page = (PageHeader) BufferGetBlock(rdata->buffer);
1009
1010         /*
1011          * XXX We assume page LSN is first data on *every* page that can be passed
1012          * to XLogInsert, whether it otherwise has the standard page layout or
1013          * not.
1014          */
1015         *lsn = page->pd_lsn;
1016
1017         if (doPageWrites &&
1018                 XLByteLE(page->pd_lsn, RedoRecPtr))
1019         {
1020                 /*
1021                  * The page needs to be backed up, so set up *bkpb
1022                  */
1023                 bkpb->node = BufferGetFileNode(rdata->buffer);
1024                 bkpb->block = BufferGetBlockNumber(rdata->buffer);
1025
1026                 if (rdata->buffer_std)
1027                 {
1028                         /* Assume we can omit data between pd_lower and pd_upper */
1029                         uint16          lower = page->pd_lower;
1030                         uint16          upper = page->pd_upper;
1031
1032                         if (lower >= SizeOfPageHeaderData &&
1033                                 upper > lower &&
1034                                 upper <= BLCKSZ)
1035                         {
1036                                 bkpb->hole_offset = lower;
1037                                 bkpb->hole_length = upper - lower;
1038                         }
1039                         else
1040                         {
1041                                 /* No "hole" to compress out */
1042                                 bkpb->hole_offset = 0;
1043                                 bkpb->hole_length = 0;
1044                         }
1045                 }
1046                 else
1047                 {
1048                         /* Not a standard page header, don't try to eliminate "hole" */
1049                         bkpb->hole_offset = 0;
1050                         bkpb->hole_length = 0;
1051                 }
1052
1053                 return true;                    /* buffer requires backup */
1054         }
1055
1056         return false;                           /* buffer does not need to be backed up */
1057 }
1058
1059 /*
1060  * XLogArchiveNotify
1061  *
1062  * Create an archive notification file
1063  *
1064  * The name of the notification file is the message that will be picked up
1065  * by the archiver, e.g. we write 0000000100000001000000C6.ready
1066  * and the archiver then knows to archive XLOGDIR/0000000100000001000000C6,
1067  * then when complete, rename it to 0000000100000001000000C6.done
1068  */
1069 static void
1070 XLogArchiveNotify(const char *xlog)
1071 {
1072         char            archiveStatusPath[MAXPGPATH];
1073         FILE       *fd;
1074
1075         /* insert an otherwise empty file called <XLOG>.ready */
1076         StatusFilePath(archiveStatusPath, xlog, ".ready");
1077         fd = AllocateFile(archiveStatusPath, "w");
1078         if (fd == NULL)
1079         {
1080                 ereport(LOG,
1081                                 (errcode_for_file_access(),
1082                                  errmsg("could not create archive status file \"%s\": %m",
1083                                                 archiveStatusPath)));
1084                 return;
1085         }
1086         if (FreeFile(fd))
1087         {
1088                 ereport(LOG,
1089                                 (errcode_for_file_access(),
1090                                  errmsg("could not write archive status file \"%s\": %m",
1091                                                 archiveStatusPath)));
1092                 return;
1093         }
1094
1095         /* Notify archiver that it's got something to do */
1096         if (IsUnderPostmaster)
1097                 SendPostmasterSignal(PMSIGNAL_WAKEN_ARCHIVER);
1098 }
1099
1100 /*
1101  * Convenience routine to notify using log/seg representation of filename
1102  */
1103 static void
1104 XLogArchiveNotifySeg(uint32 log, uint32 seg)
1105 {
1106         char            xlog[MAXFNAMELEN];
1107
1108         XLogFileName(xlog, ThisTimeLineID, log, seg);
1109         XLogArchiveNotify(xlog);
1110 }
1111
1112 /*
1113  * XLogArchiveCheckDone
1114  *
1115  * This is called when we are ready to delete or recycle an old XLOG segment
1116  * file or backup history file.  If it is okay to delete it then return true.
1117  * If it is not time to delete it, make sure a .ready file exists, and return
1118  * false.
1119  *
1120  * If <XLOG>.done exists, then return true; else if <XLOG>.ready exists,
1121  * then return false; else create <XLOG>.ready and return false.
1122  *
1123  * The reason we do things this way is so that if the original attempt to
1124  * create <XLOG>.ready fails, we'll retry during subsequent checkpoints.
1125  */
1126 static bool
1127 XLogArchiveCheckDone(const char *xlog)
1128 {
1129         char            archiveStatusPath[MAXPGPATH];
1130         struct stat stat_buf;
1131
1132         /* Always deletable if archiving is off */
1133         if (!XLogArchivingActive())
1134                 return true;
1135
1136         /* First check for .done --- this means archiver is done with it */
1137         StatusFilePath(archiveStatusPath, xlog, ".done");
1138         if (stat(archiveStatusPath, &stat_buf) == 0)
1139                 return true;
1140
1141         /* check for .ready --- this means archiver is still busy with it */
1142         StatusFilePath(archiveStatusPath, xlog, ".ready");
1143         if (stat(archiveStatusPath, &stat_buf) == 0)
1144                 return false;
1145
1146         /* Race condition --- maybe archiver just finished, so recheck */
1147         StatusFilePath(archiveStatusPath, xlog, ".done");
1148         if (stat(archiveStatusPath, &stat_buf) == 0)
1149                 return true;
1150
1151         /* Retry creation of the .ready file */
1152         XLogArchiveNotify(xlog);
1153         return false;
1154 }
1155
1156 /*
1157  * XLogArchiveCleanup
1158  *
1159  * Cleanup archive notification file(s) for a particular xlog segment
1160  */
1161 static void
1162 XLogArchiveCleanup(const char *xlog)
1163 {
1164         char            archiveStatusPath[MAXPGPATH];
1165
1166         /* Remove the .done file */
1167         StatusFilePath(archiveStatusPath, xlog, ".done");
1168         unlink(archiveStatusPath);
1169         /* should we complain about failure? */
1170
1171         /* Remove the .ready file if present --- normally it shouldn't be */
1172         StatusFilePath(archiveStatusPath, xlog, ".ready");
1173         unlink(archiveStatusPath);
1174         /* should we complain about failure? */
1175 }
1176
1177 /*
1178  * Advance the Insert state to the next buffer page, writing out the next
1179  * buffer if it still contains unwritten data.
1180  *
1181  * The global LogwrtRqst.Write pointer needs to be advanced to include the
1182  * just-filled page.  If we can do this for free (without an extra lock),
1183  * we do so here.  Otherwise the caller must do it.  We return TRUE if the
1184  * request update still needs to be done, FALSE if we did it internally.
1185  *
1186  * Must be called with WALInsertLock held.
1187  */
1188 static bool
1189 AdvanceXLInsertBuffer(void)
1190 {
1191         XLogCtlInsert *Insert = &XLogCtl->Insert;
1192         XLogCtlWrite *Write = &XLogCtl->Write;
1193         int                     nextidx = NextBufIdx(Insert->curridx);
1194         bool            update_needed = true;
1195         XLogRecPtr      OldPageRqstPtr;
1196         XLogwrtRqst WriteRqst;
1197         XLogRecPtr      NewPageEndPtr;
1198         XLogPageHeader NewPage;
1199
1200         /* Use Insert->LogwrtResult copy if it's more fresh */
1201         if (XLByteLT(LogwrtResult.Write, Insert->LogwrtResult.Write))
1202                 LogwrtResult = Insert->LogwrtResult;
1203
1204         /*
1205          * Get ending-offset of the buffer page we need to replace (this may be
1206          * zero if the buffer hasn't been used yet).  Fall through if it's already
1207          * written out.
1208          */
1209         OldPageRqstPtr = XLogCtl->xlblocks[nextidx];
1210         if (!XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
1211         {
1212                 /* nope, got work to do... */
1213                 XLogRecPtr      FinishedPageRqstPtr;
1214
1215                 FinishedPageRqstPtr = XLogCtl->xlblocks[Insert->curridx];
1216
1217                 /* Before waiting, get info_lck and update LogwrtResult */
1218                 {
1219                         /* use volatile pointer to prevent code rearrangement */
1220                         volatile XLogCtlData *xlogctl = XLogCtl;
1221
1222                         SpinLockAcquire(&xlogctl->info_lck);
1223                         if (XLByteLT(xlogctl->LogwrtRqst.Write, FinishedPageRqstPtr))
1224                                 xlogctl->LogwrtRqst.Write = FinishedPageRqstPtr;
1225                         LogwrtResult = xlogctl->LogwrtResult;
1226                         SpinLockRelease(&xlogctl->info_lck);
1227                 }
1228
1229                 update_needed = false;  /* Did the shared-request update */
1230
1231                 if (XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
1232                 {
1233                         /* OK, someone wrote it already */
1234                         Insert->LogwrtResult = LogwrtResult;
1235                 }
1236                 else
1237                 {
1238                         /* Must acquire write lock */
1239                         LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
1240                         LogwrtResult = Write->LogwrtResult;
1241                         if (XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
1242                         {
1243                                 /* OK, someone wrote it already */
1244                                 LWLockRelease(WALWriteLock);
1245                                 Insert->LogwrtResult = LogwrtResult;
1246                         }
1247                         else
1248                         {
1249                                 /*
1250                                  * Have to write buffers while holding insert lock. This is
1251                                  * not good, so only write as much as we absolutely must.
1252                                  */
1253                                 WriteRqst.Write = OldPageRqstPtr;
1254                                 WriteRqst.Flush.xlogid = 0;
1255                                 WriteRqst.Flush.xrecoff = 0;
1256                                 XLogWrite(WriteRqst, false);
1257                                 LWLockRelease(WALWriteLock);
1258                                 Insert->LogwrtResult = LogwrtResult;
1259                         }
1260                 }
1261         }
1262
1263         /*
1264          * Now the next buffer slot is free and we can set it up to be the next
1265          * output page.
1266          */
1267         NewPageEndPtr = XLogCtl->xlblocks[Insert->curridx];
1268         if (NewPageEndPtr.xrecoff >= XLogFileSize)
1269         {
1270                 /* crossing a logid boundary */
1271                 NewPageEndPtr.xlogid += 1;
1272                 NewPageEndPtr.xrecoff = XLOG_BLCKSZ;
1273         }
1274         else
1275                 NewPageEndPtr.xrecoff += XLOG_BLCKSZ;
1276         XLogCtl->xlblocks[nextidx] = NewPageEndPtr;
1277         NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * (Size) XLOG_BLCKSZ);
1278
1279         Insert->curridx = nextidx;
1280         Insert->currpage = NewPage;
1281
1282         Insert->currpos = ((char *) NewPage) +SizeOfXLogShortPHD;
1283
1284         /*
1285          * Be sure to re-zero the buffer so that bytes beyond what we've written
1286          * will look like zeroes and not valid XLOG records...
1287          */
1288         MemSet((char *) NewPage, 0, XLOG_BLCKSZ);
1289
1290         /*
1291          * Fill the new page's header
1292          */
1293         NewPage   ->xlp_magic = XLOG_PAGE_MAGIC;
1294
1295         /* NewPage->xlp_info = 0; */    /* done by memset */
1296         NewPage   ->xlp_tli = ThisTimeLineID;
1297         NewPage   ->xlp_pageaddr.xlogid = NewPageEndPtr.xlogid;
1298         NewPage   ->xlp_pageaddr.xrecoff = NewPageEndPtr.xrecoff - XLOG_BLCKSZ;
1299
1300         /*
1301          * If first page of an XLOG segment file, make it a long header.
1302          */
1303         if ((NewPage->xlp_pageaddr.xrecoff % XLogSegSize) == 0)
1304         {
1305                 XLogLongPageHeader NewLongPage = (XLogLongPageHeader) NewPage;
1306
1307                 NewLongPage->xlp_sysid = ControlFile->system_identifier;
1308                 NewLongPage->xlp_seg_size = XLogSegSize;
1309                 NewLongPage->xlp_xlog_blcksz = XLOG_BLCKSZ;
1310                 NewPage   ->xlp_info |= XLP_LONG_HEADER;
1311
1312                 Insert->currpos = ((char *) NewPage) +SizeOfXLogLongPHD;
1313         }
1314
1315         return update_needed;
1316 }
1317
1318 /*
1319  * Write and/or fsync the log at least as far as WriteRqst indicates.
1320  *
1321  * If flexible == TRUE, we don't have to write as far as WriteRqst, but
1322  * may stop at any convenient boundary (such as a cache or logfile boundary).
1323  * This option allows us to avoid uselessly issuing multiple writes when a
1324  * single one would do.
1325  *
1326  * Must be called with WALWriteLock held.
1327  */
1328 static void
1329 XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
1330 {
1331         XLogCtlWrite *Write = &XLogCtl->Write;
1332         bool            ispartialpage;
1333         bool            finishing_seg;
1334         bool            use_existent;
1335         int                     curridx;
1336         int                     npages;
1337         int                     startidx;
1338         uint32          startoffset;
1339
1340         /* We should always be inside a critical section here */
1341         Assert(CritSectionCount > 0);
1342
1343         /*
1344          * Update local LogwrtResult (caller probably did this already, but...)
1345          */
1346         LogwrtResult = Write->LogwrtResult;
1347
1348         /*
1349          * Since successive pages in the xlog cache are consecutively allocated,
1350          * we can usually gather multiple pages together and issue just one
1351          * write() call.  npages is the number of pages we have determined can be
1352          * written together; startidx is the cache block index of the first one,
1353          * and startoffset is the file offset at which it should go. The latter
1354          * two variables are only valid when npages > 0, but we must initialize
1355          * all of them to keep the compiler quiet.
1356          */
1357         npages = 0;
1358         startidx = 0;
1359         startoffset = 0;
1360
1361         /*
1362          * Within the loop, curridx is the cache block index of the page to
1363          * consider writing.  We advance Write->curridx only after successfully
1364          * writing pages.  (Right now, this refinement is useless since we are
1365          * going to PANIC if any error occurs anyway; but someday it may come in
1366          * useful.)
1367          */
1368         curridx = Write->curridx;
1369
1370         while (XLByteLT(LogwrtResult.Write, WriteRqst.Write))
1371         {
1372                 /*
1373                  * Make sure we're not ahead of the insert process.  This could happen
1374                  * if we're passed a bogus WriteRqst.Write that is past the end of the
1375                  * last page that's been initialized by AdvanceXLInsertBuffer.
1376                  */
1377                 if (!XLByteLT(LogwrtResult.Write, XLogCtl->xlblocks[curridx]))
1378                         elog(PANIC, "xlog write request %X/%X is past end of log %X/%X",
1379                                  LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff,
1380                                  XLogCtl->xlblocks[curridx].xlogid,
1381                                  XLogCtl->xlblocks[curridx].xrecoff);
1382
1383                 /* Advance LogwrtResult.Write to end of current buffer page */
1384                 LogwrtResult.Write = XLogCtl->xlblocks[curridx];
1385                 ispartialpage = XLByteLT(WriteRqst.Write, LogwrtResult.Write);
1386
1387                 if (!XLByteInPrevSeg(LogwrtResult.Write, openLogId, openLogSeg))
1388                 {
1389                         /*
1390                          * Switch to new logfile segment.  We cannot have any pending
1391                          * pages here (since we dump what we have at segment end).
1392                          */
1393                         Assert(npages == 0);
1394                         if (openLogFile >= 0)
1395                                 XLogFileClose();
1396                         XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg);
1397
1398                         /* create/use new log file */
1399                         use_existent = true;
1400                         openLogFile = XLogFileInit(openLogId, openLogSeg,
1401                                                                            &use_existent, true);
1402                         openLogOff = 0;
1403
1404                         /* update pg_control, unless someone else already did */
1405                         LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
1406                         if (ControlFile->logId < openLogId ||
1407                                 (ControlFile->logId == openLogId &&
1408                                  ControlFile->logSeg < openLogSeg + 1))
1409                         {
1410                                 ControlFile->logId = openLogId;
1411                                 ControlFile->logSeg = openLogSeg + 1;
1412                                 ControlFile->time = time(NULL);
1413                                 UpdateControlFile();
1414
1415                                 /*
1416                                  * Signal bgwriter to start a checkpoint if it's been too long
1417                                  * since the last one.  (We look at local copy of RedoRecPtr
1418                                  * which might be a little out of date, but should be close
1419                                  * enough for this purpose.)
1420                                  *
1421                                  * A straight computation of segment number could overflow 32
1422                                  * bits.  Rather than assuming we have working 64-bit
1423                                  * arithmetic, we compare the highest-order bits separately,
1424                                  * and force a checkpoint immediately when they change.
1425                                  */
1426                                 if (IsUnderPostmaster)
1427                                 {
1428                                         uint32          old_segno,
1429                                                                 new_segno;
1430                                         uint32          old_highbits,
1431                                                                 new_highbits;
1432
1433                                         old_segno = (RedoRecPtr.xlogid % XLogSegSize) * XLogSegsPerFile +
1434                                                 (RedoRecPtr.xrecoff / XLogSegSize);
1435                                         old_highbits = RedoRecPtr.xlogid / XLogSegSize;
1436                                         new_segno = (openLogId % XLogSegSize) * XLogSegsPerFile +
1437                                                 openLogSeg;
1438                                         new_highbits = openLogId / XLogSegSize;
1439                                         if (new_highbits != old_highbits ||
1440                                                 new_segno >= old_segno + (uint32) CheckPointSegments)
1441                                         {
1442 #ifdef WAL_DEBUG
1443                                                 if (XLOG_DEBUG)
1444                                                         elog(LOG, "time for a checkpoint, signaling bgwriter");
1445 #endif
1446                                                 RequestCheckpoint(false, true);
1447                                         }
1448                                 }
1449                         }
1450                         LWLockRelease(ControlFileLock);
1451                 }
1452
1453                 /* Make sure we have the current logfile open */
1454                 if (openLogFile < 0)
1455                 {
1456                         XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg);
1457                         openLogFile = XLogFileOpen(openLogId, openLogSeg);
1458                         openLogOff = 0;
1459                 }
1460
1461                 /* Add current page to the set of pending pages-to-dump */
1462                 if (npages == 0)
1463                 {
1464                         /* first of group */
1465                         startidx = curridx;
1466                         startoffset = (LogwrtResult.Write.xrecoff - XLOG_BLCKSZ) % XLogSegSize;
1467                 }
1468                 npages++;
1469
1470                 /*
1471                  * Dump the set if this will be the last loop iteration, or if we are
1472                  * at the last page of the cache area (since the next page won't be
1473                  * contiguous in memory), or if we are at the end of the logfile
1474                  * segment.
1475                  */
1476                 finishing_seg = !ispartialpage &&
1477                         (startoffset + npages * XLOG_BLCKSZ) >= XLogSegSize;
1478
1479                 if (!XLByteLT(LogwrtResult.Write, WriteRqst.Write) ||
1480                         curridx == XLogCtl->XLogCacheBlck ||
1481                         finishing_seg)
1482                 {
1483                         char       *from;
1484                         Size            nbytes;
1485
1486                         /* Need to seek in the file? */
1487                         if (openLogOff != startoffset)
1488                         {
1489                                 if (lseek(openLogFile, (off_t) startoffset, SEEK_SET) < 0)
1490                                         ereport(PANIC,
1491                                                         (errcode_for_file_access(),
1492                                                          errmsg("could not seek in log file %u, "
1493                                                                         "segment %u to offset %u: %m",
1494                                                                         openLogId, openLogSeg, startoffset)));
1495                                 openLogOff = startoffset;
1496                         }
1497
1498                         /* OK to write the page(s) */
1499                         from = XLogCtl->pages + startidx * (Size) XLOG_BLCKSZ;
1500                         nbytes = npages * (Size) XLOG_BLCKSZ;
1501                         errno = 0;
1502                         if (write(openLogFile, from, nbytes) != nbytes)
1503                         {
1504                                 /* if write didn't set errno, assume no disk space */
1505                                 if (errno == 0)
1506                                         errno = ENOSPC;
1507                                 ereport(PANIC,
1508                                                 (errcode_for_file_access(),
1509                                                  errmsg("could not write to log file %u, segment %u "
1510                                                                 "at offset %u, length %lu: %m",
1511                                                                 openLogId, openLogSeg,
1512                                                                 openLogOff, (unsigned long) nbytes)));
1513                         }
1514
1515                         /* Update state for write */
1516                         openLogOff += nbytes;
1517                         Write->curridx = ispartialpage ? curridx : NextBufIdx(curridx);
1518                         npages = 0;
1519
1520                         /*
1521                          * If we just wrote the whole last page of a logfile segment,
1522                          * fsync the segment immediately.  This avoids having to go back
1523                          * and re-open prior segments when an fsync request comes along
1524                          * later. Doing it here ensures that one and only one backend will
1525                          * perform this fsync.
1526                          *
1527                          * This is also the right place to notify the Archiver that the
1528                          * segment is ready to copy to archival storage.
1529                          */
1530                         if (finishing_seg)
1531                         {
1532                                 issue_xlog_fsync();
1533                                 LogwrtResult.Flush = LogwrtResult.Write;                /* end of page */
1534
1535                                 if (XLogArchivingActive())
1536                                         XLogArchiveNotifySeg(openLogId, openLogSeg);
1537                         }
1538                 }
1539
1540                 if (ispartialpage)
1541                 {
1542                         /* Only asked to write a partial page */
1543                         LogwrtResult.Write = WriteRqst.Write;
1544                         break;
1545                 }
1546                 curridx = NextBufIdx(curridx);
1547
1548                 /* If flexible, break out of loop as soon as we wrote something */
1549                 if (flexible && npages == 0)
1550                         break;
1551         }
1552
1553         Assert(npages == 0);
1554         Assert(curridx == Write->curridx);
1555
1556         /*
1557          * If asked to flush, do so
1558          */
1559         if (XLByteLT(LogwrtResult.Flush, WriteRqst.Flush) &&
1560                 XLByteLT(LogwrtResult.Flush, LogwrtResult.Write))
1561         {
1562                 /*
1563                  * Could get here without iterating above loop, in which case we might
1564                  * have no open file or the wrong one.  However, we do not need to
1565                  * fsync more than one file.
1566                  */
1567                 if (sync_method != SYNC_METHOD_OPEN)
1568                 {
1569                         if (openLogFile >= 0 &&
1570                                 !XLByteInPrevSeg(LogwrtResult.Write, openLogId, openLogSeg))
1571                                 XLogFileClose();
1572                         if (openLogFile < 0)
1573                         {
1574                                 XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg);
1575                                 openLogFile = XLogFileOpen(openLogId, openLogSeg);
1576                                 openLogOff = 0;
1577                         }
1578                         issue_xlog_fsync();
1579                 }
1580                 LogwrtResult.Flush = LogwrtResult.Write;
1581         }
1582
1583         /*
1584          * Update shared-memory status
1585          *
1586          * We make sure that the shared 'request' values do not fall behind the
1587          * 'result' values.  This is not absolutely essential, but it saves some
1588          * code in a couple of places.
1589          */
1590         {
1591                 /* use volatile pointer to prevent code rearrangement */
1592                 volatile XLogCtlData *xlogctl = XLogCtl;
1593
1594                 SpinLockAcquire(&xlogctl->info_lck);
1595                 xlogctl->LogwrtResult = LogwrtResult;
1596                 if (XLByteLT(xlogctl->LogwrtRqst.Write, LogwrtResult.Write))
1597                         xlogctl->LogwrtRqst.Write = LogwrtResult.Write;
1598                 if (XLByteLT(xlogctl->LogwrtRqst.Flush, LogwrtResult.Flush))
1599                         xlogctl->LogwrtRqst.Flush = LogwrtResult.Flush;
1600                 SpinLockRelease(&xlogctl->info_lck);
1601         }
1602
1603         Write->LogwrtResult = LogwrtResult;
1604 }
1605
1606 /*
1607  * Ensure that all XLOG data through the given position is flushed to disk.
1608  *
1609  * NOTE: this differs from XLogWrite mainly in that the WALWriteLock is not
1610  * already held, and we try to avoid acquiring it if possible.
1611  */
1612 void
1613 XLogFlush(XLogRecPtr record)
1614 {
1615         XLogRecPtr      WriteRqstPtr;
1616         XLogwrtRqst WriteRqst;
1617
1618         /* Disabled during REDO */
1619         if (InRedo)
1620                 return;
1621
1622         /* Quick exit if already known flushed */
1623         if (XLByteLE(record, LogwrtResult.Flush))
1624                 return;
1625
1626 #ifdef WAL_DEBUG
1627         if (XLOG_DEBUG)
1628                 elog(LOG, "xlog flush request %X/%X; write %X/%X; flush %X/%X",
1629                          record.xlogid, record.xrecoff,
1630                          LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff,
1631                          LogwrtResult.Flush.xlogid, LogwrtResult.Flush.xrecoff);
1632 #endif
1633
1634         START_CRIT_SECTION();
1635
1636         /*
1637          * Since fsync is usually a horribly expensive operation, we try to
1638          * piggyback as much data as we can on each fsync: if we see any more data
1639          * entered into the xlog buffer, we'll write and fsync that too, so that
1640          * the final value of LogwrtResult.Flush is as large as possible. This
1641          * gives us some chance of avoiding another fsync immediately after.
1642          */
1643
1644         /* initialize to given target; may increase below */
1645         WriteRqstPtr = record;
1646
1647         /* read LogwrtResult and update local state */
1648         {
1649                 /* use volatile pointer to prevent code rearrangement */
1650                 volatile XLogCtlData *xlogctl = XLogCtl;
1651
1652                 SpinLockAcquire(&xlogctl->info_lck);
1653                 if (XLByteLT(WriteRqstPtr, xlogctl->LogwrtRqst.Write))
1654                         WriteRqstPtr = xlogctl->LogwrtRqst.Write;
1655                 LogwrtResult = xlogctl->LogwrtResult;
1656                 SpinLockRelease(&xlogctl->info_lck);
1657         }
1658
1659         /* done already? */
1660         if (!XLByteLE(record, LogwrtResult.Flush))
1661         {
1662                 /* now wait for the write lock */
1663                 LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
1664                 LogwrtResult = XLogCtl->Write.LogwrtResult;
1665                 if (!XLByteLE(record, LogwrtResult.Flush))
1666                 {
1667                         /* try to write/flush later additions to XLOG as well */
1668                         if (LWLockConditionalAcquire(WALInsertLock, LW_EXCLUSIVE))
1669                         {
1670                                 XLogCtlInsert *Insert = &XLogCtl->Insert;
1671                                 uint32          freespace = INSERT_FREESPACE(Insert);
1672
1673                                 if (freespace < SizeOfXLogRecord)               /* buffer is full */
1674                                         WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx];
1675                                 else
1676                                 {
1677                                         WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx];
1678                                         WriteRqstPtr.xrecoff -= freespace;
1679                                 }
1680                                 LWLockRelease(WALInsertLock);
1681                                 WriteRqst.Write = WriteRqstPtr;
1682                                 WriteRqst.Flush = WriteRqstPtr;
1683                         }
1684                         else
1685                         {
1686                                 WriteRqst.Write = WriteRqstPtr;
1687                                 WriteRqst.Flush = record;
1688                         }
1689                         XLogWrite(WriteRqst, false);
1690                 }
1691                 LWLockRelease(WALWriteLock);
1692         }
1693
1694         END_CRIT_SECTION();
1695
1696         /*
1697          * If we still haven't flushed to the request point then we have a
1698          * problem; most likely, the requested flush point is past end of XLOG.
1699          * This has been seen to occur when a disk page has a corrupted LSN.
1700          *
1701          * Formerly we treated this as a PANIC condition, but that hurts the
1702          * system's robustness rather than helping it: we do not want to take down
1703          * the whole system due to corruption on one data page.  In particular, if
1704          * the bad page is encountered again during recovery then we would be
1705          * unable to restart the database at all!  (This scenario has actually
1706          * happened in the field several times with 7.1 releases. Note that we
1707          * cannot get here while InRedo is true, but if the bad page is brought in
1708          * and marked dirty during recovery then CreateCheckPoint will try to
1709          * flush it at the end of recovery.)
1710          *
1711          * The current approach is to ERROR under normal conditions, but only
1712          * WARNING during recovery, so that the system can be brought up even if
1713          * there's a corrupt LSN.  Note that for calls from xact.c, the ERROR will
1714          * be promoted to PANIC since xact.c calls this routine inside a critical
1715          * section.  However, calls from bufmgr.c are not within critical sections
1716          * and so we will not force a restart for a bad LSN on a data page.
1717          */
1718         if (XLByteLT(LogwrtResult.Flush, record))
1719                 elog(InRecovery ? WARNING : ERROR,
1720                 "xlog flush request %X/%X is not satisfied --- flushed only to %X/%X",
1721                          record.xlogid, record.xrecoff,
1722                          LogwrtResult.Flush.xlogid, LogwrtResult.Flush.xrecoff);
1723 }
1724
1725 /*
1726  * Create a new XLOG file segment, or open a pre-existing one.
1727  *
1728  * log, seg: identify segment to be created/opened.
1729  *
1730  * *use_existent: if TRUE, OK to use a pre-existing file (else, any
1731  * pre-existing file will be deleted).  On return, TRUE if a pre-existing
1732  * file was used.
1733  *
1734  * use_lock: if TRUE, acquire ControlFileLock while moving file into
1735  * place.  This should be TRUE except during bootstrap log creation.  The
1736  * caller must *not* hold the lock at call.
1737  *
1738  * Returns FD of opened file.
1739  *
1740  * Note: errors here are ERROR not PANIC because we might or might not be
1741  * inside a critical section (eg, during checkpoint there is no reason to
1742  * take down the system on failure).  They will promote to PANIC if we are
1743  * in a critical section.
1744  */
1745 static int
1746 XLogFileInit(uint32 log, uint32 seg,
1747                          bool *use_existent, bool use_lock)
1748 {
1749         char            path[MAXPGPATH];
1750         char            tmppath[MAXPGPATH];
1751         char            zbuffer[XLOG_BLCKSZ];
1752         uint32          installed_log;
1753         uint32          installed_seg;
1754         int                     max_advance;
1755         int                     fd;
1756         int                     nbytes;
1757
1758         XLogFilePath(path, ThisTimeLineID, log, seg);
1759
1760         /*
1761          * Try to use existent file (checkpoint maker may have created it already)
1762          */
1763         if (*use_existent)
1764         {
1765                 fd = BasicOpenFile(path, O_RDWR | PG_BINARY | XLOG_SYNC_BIT,
1766                                                    S_IRUSR | S_IWUSR);
1767                 if (fd < 0)
1768                 {
1769                         if (errno != ENOENT)
1770                                 ereport(ERROR,
1771                                                 (errcode_for_file_access(),
1772                                                  errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
1773                                                                 path, log, seg)));
1774                 }
1775                 else
1776                         return fd;
1777         }
1778
1779         /*
1780          * Initialize an empty (all zeroes) segment.  NOTE: it is possible that
1781          * another process is doing the same thing.  If so, we will end up
1782          * pre-creating an extra log segment.  That seems OK, and better than
1783          * holding the lock throughout this lengthy process.
1784          */
1785         snprintf(tmppath, MAXPGPATH, XLOGDIR "/xlogtemp.%d", (int) getpid());
1786
1787         unlink(tmppath);
1788
1789         /* do not use XLOG_SYNC_BIT here --- want to fsync only at end of fill */
1790         fd = BasicOpenFile(tmppath, O_RDWR | O_CREAT | O_EXCL | PG_BINARY,
1791                                            S_IRUSR | S_IWUSR);
1792         if (fd < 0)
1793                 ereport(ERROR,
1794                                 (errcode_for_file_access(),
1795                                  errmsg("could not create file \"%s\": %m", tmppath)));
1796
1797         /*
1798          * Zero-fill the file.  We have to do this the hard way to ensure that all
1799          * the file space has really been allocated --- on platforms that allow
1800          * "holes" in files, just seeking to the end doesn't allocate intermediate
1801          * space.  This way, we know that we have all the space and (after the
1802          * fsync below) that all the indirect blocks are down on disk.  Therefore,
1803          * fdatasync(2) or O_DSYNC will be sufficient to sync future writes to the
1804          * log file.
1805          */
1806         MemSet(zbuffer, 0, sizeof(zbuffer));
1807         for (nbytes = 0; nbytes < XLogSegSize; nbytes += sizeof(zbuffer))
1808         {
1809                 errno = 0;
1810                 if ((int) write(fd, zbuffer, sizeof(zbuffer)) != (int) sizeof(zbuffer))
1811                 {
1812                         int                     save_errno = errno;
1813
1814                         /*
1815                          * If we fail to make the file, delete it to release disk space
1816                          */
1817                         unlink(tmppath);
1818                         /* if write didn't set errno, assume problem is no disk space */
1819                         errno = save_errno ? save_errno : ENOSPC;
1820
1821                         ereport(ERROR,
1822                                         (errcode_for_file_access(),
1823                                          errmsg("could not write to file \"%s\": %m", tmppath)));
1824                 }
1825         }
1826
1827         if (pg_fsync(fd) != 0)
1828                 ereport(ERROR,
1829                                 (errcode_for_file_access(),
1830                                  errmsg("could not fsync file \"%s\": %m", tmppath)));
1831
1832         if (close(fd))
1833                 ereport(ERROR,
1834                                 (errcode_for_file_access(),
1835                                  errmsg("could not close file \"%s\": %m", tmppath)));
1836
1837         /*
1838          * Now move the segment into place with its final name.
1839          *
1840          * If caller didn't want to use a pre-existing file, get rid of any
1841          * pre-existing file.  Otherwise, cope with possibility that someone else
1842          * has created the file while we were filling ours: if so, use ours to
1843          * pre-create a future log segment.
1844          */
1845         installed_log = log;
1846         installed_seg = seg;
1847         max_advance = XLOGfileslop;
1848         if (!InstallXLogFileSegment(&installed_log, &installed_seg, tmppath,
1849                                                                 *use_existent, &max_advance,
1850                                                                 use_lock))
1851         {
1852                 /* No need for any more future segments... */
1853                 unlink(tmppath);
1854         }
1855
1856         /* Set flag to tell caller there was no existent file */
1857         *use_existent = false;
1858
1859         /* Now open original target segment (might not be file I just made) */
1860         fd = BasicOpenFile(path, O_RDWR | PG_BINARY | XLOG_SYNC_BIT,
1861                                            S_IRUSR | S_IWUSR);
1862         if (fd < 0)
1863                 ereport(ERROR,
1864                                 (errcode_for_file_access(),
1865                    errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
1866                                   path, log, seg)));
1867
1868         return fd;
1869 }
1870
1871 /*
1872  * Create a new XLOG file segment by copying a pre-existing one.
1873  *
1874  * log, seg: identify segment to be created.
1875  *
1876  * srcTLI, srclog, srcseg: identify segment to be copied (could be from
1877  *              a different timeline)
1878  *
1879  * Currently this is only used during recovery, and so there are no locking
1880  * considerations.      But we should be just as tense as XLogFileInit to avoid
1881  * emplacing a bogus file.
1882  */
1883 static void
1884 XLogFileCopy(uint32 log, uint32 seg,
1885                          TimeLineID srcTLI, uint32 srclog, uint32 srcseg)
1886 {
1887         char            path[MAXPGPATH];
1888         char            tmppath[MAXPGPATH];
1889         char            buffer[XLOG_BLCKSZ];
1890         int                     srcfd;
1891         int                     fd;
1892         int                     nbytes;
1893
1894         /*
1895          * Open the source file
1896          */
1897         XLogFilePath(path, srcTLI, srclog, srcseg);
1898         srcfd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
1899         if (srcfd < 0)
1900                 ereport(ERROR,
1901                                 (errcode_for_file_access(),
1902                                  errmsg("could not open file \"%s\": %m", path)));
1903
1904         /*
1905          * Copy into a temp file name.
1906          */
1907         snprintf(tmppath, MAXPGPATH, XLOGDIR "/xlogtemp.%d", (int) getpid());
1908
1909         unlink(tmppath);
1910
1911         /* do not use XLOG_SYNC_BIT here --- want to fsync only at end of fill */
1912         fd = BasicOpenFile(tmppath, O_RDWR | O_CREAT | O_EXCL | PG_BINARY,
1913                                            S_IRUSR | S_IWUSR);
1914         if (fd < 0)
1915                 ereport(ERROR,
1916                                 (errcode_for_file_access(),
1917                                  errmsg("could not create file \"%s\": %m", tmppath)));
1918
1919         /*
1920          * Do the data copying.
1921          */
1922         for (nbytes = 0; nbytes < XLogSegSize; nbytes += sizeof(buffer))
1923         {
1924                 errno = 0;
1925                 if ((int) read(srcfd, buffer, sizeof(buffer)) != (int) sizeof(buffer))
1926                 {
1927                         if (errno != 0)
1928                                 ereport(ERROR,
1929                                                 (errcode_for_file_access(),
1930                                                  errmsg("could not read file \"%s\": %m", path)));
1931                         else
1932                                 ereport(ERROR,
1933                                                 (errmsg("not enough data in file \"%s\"", path)));
1934                 }
1935                 errno = 0;
1936                 if ((int) write(fd, buffer, sizeof(buffer)) != (int) sizeof(buffer))
1937                 {
1938                         int                     save_errno = errno;
1939
1940                         /*
1941                          * If we fail to make the file, delete it to release disk space
1942                          */
1943                         unlink(tmppath);
1944                         /* if write didn't set errno, assume problem is no disk space */
1945                         errno = save_errno ? save_errno : ENOSPC;
1946
1947                         ereport(ERROR,
1948                                         (errcode_for_file_access(),
1949                                          errmsg("could not write to file \"%s\": %m", tmppath)));
1950                 }
1951         }
1952
1953         if (pg_fsync(fd) != 0)
1954                 ereport(ERROR,
1955                                 (errcode_for_file_access(),
1956                                  errmsg("could not fsync file \"%s\": %m", tmppath)));
1957
1958         if (close(fd))
1959                 ereport(ERROR,
1960                                 (errcode_for_file_access(),
1961                                  errmsg("could not close file \"%s\": %m", tmppath)));
1962
1963         close(srcfd);
1964
1965         /*
1966          * Now move the segment into place with its final name.
1967          */
1968         if (!InstallXLogFileSegment(&log, &seg, tmppath, false, NULL, false))
1969                 elog(ERROR, "InstallXLogFileSegment should not have failed");
1970 }
1971
1972 /*
1973  * Install a new XLOG segment file as a current or future log segment.
1974  *
1975  * This is used both to install a newly-created segment (which has a temp
1976  * filename while it's being created) and to recycle an old segment.
1977  *
1978  * *log, *seg: identify segment to install as (or first possible target).
1979  * When find_free is TRUE, these are modified on return to indicate the
1980  * actual installation location or last segment searched.
1981  *
1982  * tmppath: initial name of file to install.  It will be renamed into place.
1983  *
1984  * find_free: if TRUE, install the new segment at the first empty log/seg
1985  * number at or after the passed numbers.  If FALSE, install the new segment
1986  * exactly where specified, deleting any existing segment file there.
1987  *
1988  * *max_advance: maximum number of log/seg slots to advance past the starting
1989  * point.  Fail if no free slot is found in this range.  On return, reduced
1990  * by the number of slots skipped over.  (Irrelevant, and may be NULL,
1991  * when find_free is FALSE.)
1992  *
1993  * use_lock: if TRUE, acquire ControlFileLock while moving file into
1994  * place.  This should be TRUE except during bootstrap log creation.  The
1995  * caller must *not* hold the lock at call.
1996  *
1997  * Returns TRUE if file installed, FALSE if not installed because of
1998  * exceeding max_advance limit.  (Any other kind of failure causes ereport().)
1999  */
2000 static bool
2001 InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath,
2002                                            bool find_free, int *max_advance,
2003                                            bool use_lock)
2004 {
2005         char            path[MAXPGPATH];
2006         struct stat stat_buf;
2007
2008         XLogFilePath(path, ThisTimeLineID, *log, *seg);
2009
2010         /*
2011          * We want to be sure that only one process does this at a time.
2012          */
2013         if (use_lock)
2014                 LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
2015
2016         if (!find_free)
2017         {
2018                 /* Force installation: get rid of any pre-existing segment file */
2019                 unlink(path);
2020         }
2021         else
2022         {
2023                 /* Find a free slot to put it in */
2024                 while (stat(path, &stat_buf) == 0)
2025                 {
2026                         if (*max_advance <= 0)
2027                         {
2028                                 /* Failed to find a free slot within specified range */
2029                                 if (use_lock)
2030                                         LWLockRelease(ControlFileLock);
2031                                 return false;
2032                         }
2033                         NextLogSeg(*log, *seg);
2034                         (*max_advance)--;
2035                         XLogFilePath(path, ThisTimeLineID, *log, *seg);
2036                 }
2037         }
2038
2039         /*
2040          * Prefer link() to rename() here just to be really sure that we don't
2041          * overwrite an existing logfile.  However, there shouldn't be one, so
2042          * rename() is an acceptable substitute except for the truly paranoid.
2043          */
2044 #if HAVE_WORKING_LINK
2045         if (link(tmppath, path) < 0)
2046                 ereport(ERROR,
2047                                 (errcode_for_file_access(),
2048                                  errmsg("could not link file \"%s\" to \"%s\" (initialization of log file %u, segment %u): %m",
2049                                                 tmppath, path, *log, *seg)));
2050         unlink(tmppath);
2051 #else
2052         if (rename(tmppath, path) < 0)
2053                 ereport(ERROR,
2054                                 (errcode_for_file_access(),
2055                                  errmsg("could not rename file \"%s\" to \"%s\" (initialization of log file %u, segment %u): %m",
2056                                                 tmppath, path, *log, *seg)));
2057 #endif
2058
2059         if (use_lock)
2060                 LWLockRelease(ControlFileLock);
2061
2062         return true;
2063 }
2064
2065 /*
2066  * Open a pre-existing logfile segment for writing.
2067  */
2068 static int
2069 XLogFileOpen(uint32 log, uint32 seg)
2070 {
2071         char            path[MAXPGPATH];
2072         int                     fd;
2073
2074         XLogFilePath(path, ThisTimeLineID, log, seg);
2075
2076         fd = BasicOpenFile(path, O_RDWR | PG_BINARY | XLOG_SYNC_BIT,
2077                                            S_IRUSR | S_IWUSR);
2078         if (fd < 0)
2079                 ereport(PANIC,
2080                                 (errcode_for_file_access(),
2081                    errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
2082                                   path, log, seg)));
2083
2084         return fd;
2085 }
2086
2087 /*
2088  * Open a logfile segment for reading (during recovery).
2089  */
2090 static int
2091 XLogFileRead(uint32 log, uint32 seg, int emode)
2092 {
2093         char            path[MAXPGPATH];
2094         char            xlogfname[MAXFNAMELEN];
2095         ListCell   *cell;
2096         int                     fd;
2097
2098         /*
2099          * Loop looking for a suitable timeline ID: we might need to read any of
2100          * the timelines listed in expectedTLIs.
2101          *
2102          * We expect curFileTLI on entry to be the TLI of the preceding file in
2103          * sequence, or 0 if there was no predecessor.  We do not allow curFileTLI
2104          * to go backwards; this prevents us from picking up the wrong file when a
2105          * parent timeline extends to higher segment numbers than the child we
2106          * want to read.
2107          */
2108         foreach(cell, expectedTLIs)
2109         {
2110                 TimeLineID      tli = (TimeLineID) lfirst_int(cell);
2111
2112                 if (tli < curFileTLI)
2113                         break;                          /* don't bother looking at too-old TLIs */
2114
2115                 if (InArchiveRecovery)
2116                 {
2117                         XLogFileName(xlogfname, tli, log, seg);
2118                         restoredFromArchive = RestoreArchivedFile(path, xlogfname,
2119                                                                                                           "RECOVERYXLOG",
2120                                                                                                           XLogSegSize);
2121                 }
2122                 else
2123                         XLogFilePath(path, tli, log, seg);
2124
2125                 fd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
2126                 if (fd >= 0)
2127                 {
2128                         /* Success! */
2129                         curFileTLI = tli;
2130                         return fd;
2131                 }
2132                 if (errno != ENOENT)    /* unexpected failure? */
2133                         ereport(PANIC,
2134                                         (errcode_for_file_access(),
2135                         errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
2136                                    path, log, seg)));
2137         }
2138
2139         /* Couldn't find it.  For simplicity, complain about front timeline */
2140         XLogFilePath(path, recoveryTargetTLI, log, seg);
2141         errno = ENOENT;
2142         ereport(emode,
2143                         (errcode_for_file_access(),
2144                    errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
2145                                   path, log, seg)));
2146         return -1;
2147 }
2148
2149 /*
2150  * Close the current logfile segment for writing.
2151  */
2152 static void
2153 XLogFileClose(void)
2154 {
2155         Assert(openLogFile >= 0);
2156
2157         /*
2158          * posix_fadvise is problematic on many platforms: on older x86 Linux
2159          * it just dumps core, and there are reports of problems on PPC platforms
2160          * as well.  The following is therefore disabled for the time being.
2161          * We could consider some kind of configure test to see if it's safe to
2162          * use, but since we lack hard evidence that there's any useful performance
2163          * gain to be had, spending time on that seems unprofitable for now.
2164          */
2165 #ifdef NOT_USED
2166
2167         /*
2168          * WAL segment files will not be re-read in normal operation, so we advise
2169          * OS to release any cached pages.  But do not do so if WAL archiving is
2170          * active, because archiver process could use the cache to read the WAL
2171          * segment.
2172          *
2173          * While O_DIRECT works for O_SYNC, posix_fadvise() works for fsync()
2174          * and O_SYNC, and some platforms only have posix_fadvise().
2175          */
2176 #if defined(HAVE_DECL_POSIX_FADVISE) && defined(POSIX_FADV_DONTNEED)
2177         if (!XLogArchivingActive())
2178                 posix_fadvise(openLogFile, 0, 0, POSIX_FADV_DONTNEED);
2179 #endif
2180
2181 #endif /* NOT_USED */
2182
2183         if (close(openLogFile))
2184                 ereport(PANIC,
2185                         (errcode_for_file_access(),
2186                         errmsg("could not close log file %u, segment %u: %m",
2187                                    openLogId, openLogSeg)));
2188         openLogFile = -1;
2189 }
2190
2191 /*
2192  * Attempt to retrieve the specified file from off-line archival storage.
2193  * If successful, fill "path" with its complete path (note that this will be
2194  * a temp file name that doesn't follow the normal naming convention), and
2195  * return TRUE.
2196  *
2197  * If not successful, fill "path" with the name of the normal on-line file
2198  * (which may or may not actually exist, but we'll try to use it), and return
2199  * FALSE.
2200  *
2201  * For fixed-size files, the caller may pass the expected size as an
2202  * additional crosscheck on successful recovery.  If the file size is not
2203  * known, set expectedSize = 0.
2204  */
2205 static bool
2206 RestoreArchivedFile(char *path, const char *xlogfname,
2207                                         const char *recovername, off_t expectedSize)
2208 {
2209         char            xlogpath[MAXPGPATH];
2210         char            xlogRestoreCmd[MAXPGPATH];
2211         char       *dp;
2212         char       *endp;
2213         const char *sp;
2214         int                     rc;
2215         struct stat stat_buf;
2216
2217         /*
2218          * When doing archive recovery, we always prefer an archived log file even
2219          * if a file of the same name exists in XLOGDIR.  The reason is that the
2220          * file in XLOGDIR could be an old, un-filled or partly-filled version
2221          * that was copied and restored as part of backing up $PGDATA.
2222          *
2223          * We could try to optimize this slightly by checking the local copy
2224          * lastchange timestamp against the archived copy, but we have no API to
2225          * do this, nor can we guarantee that the lastchange timestamp was
2226          * preserved correctly when we copied to archive. Our aim is robustness,
2227          * so we elect not to do this.
2228          *
2229          * If we cannot obtain the log file from the archive, however, we will try
2230          * to use the XLOGDIR file if it exists.  This is so that we can make use
2231          * of log segments that weren't yet transferred to the archive.
2232          *
2233          * Notice that we don't actually overwrite any files when we copy back
2234          * from archive because the recoveryRestoreCommand may inadvertently
2235          * restore inappropriate xlogs, or they may be corrupt, so we may wish to
2236          * fallback to the segments remaining in current XLOGDIR later. The
2237          * copy-from-archive filename is always the same, ensuring that we don't
2238          * run out of disk space on long recoveries.
2239          */
2240         snprintf(xlogpath, MAXPGPATH, XLOGDIR "/%s", recovername);
2241
2242         /*
2243          * Make sure there is no existing file named recovername.
2244          */
2245         if (stat(xlogpath, &stat_buf) != 0)
2246         {
2247                 if (errno != ENOENT)
2248                         ereport(FATAL,
2249                                         (errcode_for_file_access(),
2250                                          errmsg("could not stat file \"%s\": %m",
2251                                                         xlogpath)));
2252         }
2253         else
2254         {
2255                 if (unlink(xlogpath) != 0)
2256                         ereport(FATAL,
2257                                         (errcode_for_file_access(),
2258                                          errmsg("could not remove file \"%s\": %m",
2259                                                         xlogpath)));
2260         }
2261
2262         /*
2263          * construct the command to be executed
2264          */
2265         dp = xlogRestoreCmd;
2266         endp = xlogRestoreCmd + MAXPGPATH - 1;
2267         *endp = '\0';
2268
2269         for (sp = recoveryRestoreCommand; *sp; sp++)
2270         {
2271                 if (*sp == '%')
2272                 {
2273                         switch (sp[1])
2274                         {
2275                                 case 'p':
2276                                         /* %p: full path of target file */
2277                                         sp++;
2278                                         StrNCpy(dp, xlogpath, endp - dp);
2279                                         make_native_path(dp);
2280                                         dp += strlen(dp);
2281                                         break;
2282                                 case 'f':
2283                                         /* %f: filename of desired file */
2284                                         sp++;
2285                                         StrNCpy(dp, xlogfname, endp - dp);
2286                                         dp += strlen(dp);
2287                                         break;
2288                                 case '%':
2289                                         /* convert %% to a single % */
2290                                         sp++;
2291                                         if (dp < endp)
2292                                                 *dp++ = *sp;
2293                                         break;
2294                                 default:
2295                                         /* otherwise treat the % as not special */
2296                                         if (dp < endp)
2297                                                 *dp++ = *sp;
2298                                         break;
2299                         }
2300                 }
2301                 else
2302                 {
2303                         if (dp < endp)
2304                                 *dp++ = *sp;
2305                 }
2306         }
2307         *dp = '\0';
2308
2309         ereport(DEBUG3,
2310                         (errmsg_internal("executing restore command \"%s\"",
2311                                                          xlogRestoreCmd)));
2312
2313         /*
2314          * Copy xlog from archival storage to XLOGDIR
2315          */
2316         rc = system(xlogRestoreCmd);
2317         if (rc == 0)
2318         {
2319                 /*
2320                  * command apparently succeeded, but let's make sure the file is
2321                  * really there now and has the correct size.
2322                  *
2323                  * XXX I made wrong-size a fatal error to ensure the DBA would notice
2324                  * it, but is that too strong?  We could try to plow ahead with a
2325                  * local copy of the file ... but the problem is that there probably
2326                  * isn't one, and we'd incorrectly conclude we've reached the end of
2327                  * WAL and we're done recovering ...
2328                  */
2329                 if (stat(xlogpath, &stat_buf) == 0)
2330                 {
2331                         if (expectedSize > 0 && stat_buf.st_size != expectedSize)
2332                                 ereport(FATAL,
2333                                                 (errmsg("archive file \"%s\" has wrong size: %lu instead of %lu",
2334                                                                 xlogfname,
2335                                                                 (unsigned long) stat_buf.st_size,
2336                                                                 (unsigned long) expectedSize)));
2337                         else
2338                         {
2339                                 ereport(LOG,
2340                                                 (errmsg("restored log file \"%s\" from archive",
2341                                                                 xlogfname)));
2342                                 strcpy(path, xlogpath);
2343                                 return true;
2344                         }
2345                 }
2346                 else
2347                 {
2348                         /* stat failed */
2349                         if (errno != ENOENT)
2350                                 ereport(FATAL,
2351                                                 (errcode_for_file_access(),
2352                                                  errmsg("could not stat file \"%s\": %m",
2353                                                                 xlogpath)));
2354                 }
2355         }
2356
2357         /*
2358          * remember, we rollforward UNTIL the restore fails so failure here is
2359          * just part of the process... that makes it difficult to determine
2360          * whether the restore failed because there isn't an archive to restore,
2361          * or because the administrator has specified the restore program
2362          * incorrectly.  We have to assume the former.
2363          */
2364         ereport(DEBUG2,
2365                 (errmsg("could not restore file \"%s\" from archive: return code %d",
2366                                 xlogfname, rc)));
2367
2368         /*
2369          * if an archived file is not available, there might still be a version of
2370          * this file in XLOGDIR, so return that as the filename to open.
2371          *
2372          * In many recovery scenarios we expect this to fail also, but if so that
2373          * just means we've reached the end of WAL.
2374          */
2375         snprintf(path, MAXPGPATH, XLOGDIR "/%s", xlogfname);
2376         return false;
2377 }
2378
2379 /*
2380  * Preallocate log files beyond the specified log endpoint, according to
2381  * the XLOGfile user parameter.
2382  */
2383 static int
2384 PreallocXlogFiles(XLogRecPtr endptr)
2385 {
2386         int                     nsegsadded = 0;
2387         uint32          _logId;
2388         uint32          _logSeg;
2389         int                     lf;
2390         bool            use_existent;
2391
2392         XLByteToPrevSeg(endptr, _logId, _logSeg);
2393         if ((endptr.xrecoff - 1) % XLogSegSize >=
2394                 (uint32) (0.75 * XLogSegSize))
2395         {
2396                 NextLogSeg(_logId, _logSeg);
2397                 use_existent = true;
2398                 lf = XLogFileInit(_logId, _logSeg, &use_existent, true);
2399                 close(lf);
2400                 if (!use_existent)
2401                         nsegsadded++;
2402         }
2403         return nsegsadded;
2404 }
2405
2406 /*
2407  * Remove or move offline all log files older or equal to passed log/seg#
2408  *
2409  * endptr is current (or recent) end of xlog; this is used to determine
2410  * whether we want to recycle rather than delete no-longer-wanted log files.
2411  */
2412 static void
2413 MoveOfflineLogs(uint32 log, uint32 seg, XLogRecPtr endptr,
2414                                 int *nsegsremoved, int *nsegsrecycled)
2415 {
2416         uint32          endlogId;
2417         uint32          endlogSeg;
2418         int                     max_advance;
2419         DIR                *xldir;
2420         struct dirent *xlde;
2421         char            lastoff[MAXFNAMELEN];
2422         char            path[MAXPGPATH];
2423
2424         *nsegsremoved = 0;
2425         *nsegsrecycled = 0;
2426
2427         /*
2428          * Initialize info about where to try to recycle to.  We allow recycling
2429          * segments up to XLOGfileslop segments beyond the current XLOG location.
2430          */
2431         XLByteToPrevSeg(endptr, endlogId, endlogSeg);
2432         max_advance = XLOGfileslop;
2433
2434         xldir = AllocateDir(XLOGDIR);
2435         if (xldir == NULL)
2436                 ereport(ERROR,
2437                                 (errcode_for_file_access(),
2438                                  errmsg("could not open transaction log directory \"%s\": %m",
2439                                                 XLOGDIR)));
2440
2441         XLogFileName(lastoff, ThisTimeLineID, log, seg);
2442
2443         while ((xlde = ReadDir(xldir, XLOGDIR)) != NULL)
2444         {
2445                 /*
2446                  * We ignore the timeline part of the XLOG segment identifiers in
2447                  * deciding whether a segment is still needed.  This ensures that we
2448                  * won't prematurely remove a segment from a parent timeline. We could
2449                  * probably be a little more proactive about removing segments of
2450                  * non-parent timelines, but that would be a whole lot more
2451                  * complicated.
2452                  *
2453                  * We use the alphanumeric sorting property of the filenames to decide
2454                  * which ones are earlier than the lastoff segment.
2455                  */
2456                 if (strlen(xlde->d_name) == 24 &&
2457                         strspn(xlde->d_name, "0123456789ABCDEF") == 24 &&
2458                         strcmp(xlde->d_name + 8, lastoff + 8) <= 0)
2459                 {
2460                         if (XLogArchiveCheckDone(xlde->d_name))
2461                         {
2462                                 snprintf(path, MAXPGPATH, XLOGDIR "/%s", xlde->d_name);
2463
2464                                 /*
2465                                  * Before deleting the file, see if it can be recycled as a
2466                                  * future log segment.
2467                                  */
2468                                 if (InstallXLogFileSegment(&endlogId, &endlogSeg, path,
2469                                                                                    true, &max_advance,
2470                                                                                    true))
2471                                 {
2472                                         ereport(DEBUG2,
2473                                                         (errmsg("recycled transaction log file \"%s\"",
2474                                                                         xlde->d_name)));
2475                                         (*nsegsrecycled)++;
2476                                         /* Needn't recheck that slot on future iterations */
2477                                         if (max_advance > 0)
2478                                         {
2479                                                 NextLogSeg(endlogId, endlogSeg);
2480                                                 max_advance--;
2481                                         }
2482                                 }
2483                                 else
2484                                 {
2485                                         /* No need for any more future segments... */
2486                                         ereport(DEBUG2,
2487                                                         (errmsg("removing transaction log file \"%s\"",
2488                                                                         xlde->d_name)));
2489                                         unlink(path);
2490                                         (*nsegsremoved)++;
2491                                 }
2492
2493                                 XLogArchiveCleanup(xlde->d_name);
2494                         }
2495                 }
2496         }
2497
2498         FreeDir(xldir);
2499 }
2500
2501 /*
2502  * Remove previous backup history files.  This also retries creation of
2503  * .ready files for any backup history files for which XLogArchiveNotify
2504  * failed earlier.
2505  */
2506 static void
2507 CleanupBackupHistory(void)
2508 {
2509         DIR                *xldir;
2510         struct dirent *xlde;
2511         char            path[MAXPGPATH];
2512
2513         xldir = AllocateDir(XLOGDIR);
2514         if (xldir == NULL)
2515                 ereport(ERROR,
2516                                 (errcode_for_file_access(),
2517                                  errmsg("could not open transaction log directory \"%s\": %m",
2518                                                 XLOGDIR)));
2519
2520         while ((xlde = ReadDir(xldir, XLOGDIR)) != NULL)
2521         {
2522                 if (strlen(xlde->d_name) > 24 &&
2523                         strspn(xlde->d_name, "0123456789ABCDEF") == 24 &&
2524                         strcmp(xlde->d_name + strlen(xlde->d_name) - strlen(".backup"),
2525                                    ".backup") == 0)
2526                 {
2527                         if (XLogArchiveCheckDone(xlde->d_name))
2528                         {
2529                                 ereport(DEBUG2,
2530                                 (errmsg("removing transaction log backup history file \"%s\"",
2531                                                 xlde->d_name)));
2532                                 snprintf(path, MAXPGPATH, XLOGDIR "/%s", xlde->d_name);
2533                                 unlink(path);
2534                                 XLogArchiveCleanup(xlde->d_name);
2535                         }
2536                 }
2537         }
2538
2539         FreeDir(xldir);
2540 }
2541
2542 /*
2543  * Restore the backup blocks present in an XLOG record, if any.
2544  *
2545  * We assume all of the record has been read into memory at *record.
2546  *
2547  * Note: when a backup block is available in XLOG, we restore it
2548  * unconditionally, even if the page in the database appears newer.
2549  * This is to protect ourselves against database pages that were partially
2550  * or incorrectly written during a crash.  We assume that the XLOG data
2551  * must be good because it has passed a CRC check, while the database
2552  * page might not be.  This will force us to replay all subsequent
2553  * modifications of the page that appear in XLOG, rather than possibly
2554  * ignoring them as already applied, but that's not a huge drawback.
2555  */
2556 static void
2557 RestoreBkpBlocks(XLogRecord *record, XLogRecPtr lsn)
2558 {
2559         Relation        reln;
2560         Buffer          buffer;
2561         Page            page;
2562         BkpBlock        bkpb;
2563         char       *blk;
2564         int                     i;
2565
2566         blk = (char *) XLogRecGetData(record) + record->xl_len;
2567         for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
2568         {
2569                 if (!(record->xl_info & XLR_SET_BKP_BLOCK(i)))
2570                         continue;
2571
2572                 memcpy(&bkpb, blk, sizeof(BkpBlock));
2573                 blk += sizeof(BkpBlock);
2574
2575                 reln = XLogOpenRelation(bkpb.node);
2576                 buffer = XLogReadBuffer(reln, bkpb.block, true);
2577                 Assert(BufferIsValid(buffer));
2578                 page = (Page) BufferGetPage(buffer);
2579
2580                 if (bkpb.hole_length == 0)
2581                 {
2582                         memcpy((char *) page, blk, BLCKSZ);
2583                 }
2584                 else
2585                 {
2586                         /* must zero-fill the hole */
2587                         MemSet((char *) page, 0, BLCKSZ);
2588                         memcpy((char *) page, blk, bkpb.hole_offset);
2589                         memcpy((char *) page + (bkpb.hole_offset + bkpb.hole_length),
2590                                    blk + bkpb.hole_offset,
2591                                    BLCKSZ - (bkpb.hole_offset + bkpb.hole_length));
2592                 }
2593
2594                 PageSetLSN(page, lsn);
2595                 PageSetTLI(page, ThisTimeLineID);
2596                 MarkBufferDirty(buffer);
2597                 UnlockReleaseBuffer(buffer);
2598
2599                 blk += BLCKSZ - bkpb.hole_length;
2600         }
2601 }
2602
2603 /*
2604  * CRC-check an XLOG record.  We do not believe the contents of an XLOG
2605  * record (other than to the minimal extent of computing the amount of
2606  * data to read in) until we've checked the CRCs.
2607  *
2608  * We assume all of the record has been read into memory at *record.
2609  */
2610 static bool
2611 RecordIsValid(XLogRecord *record, XLogRecPtr recptr, int emode)
2612 {
2613         pg_crc32        crc;
2614         int                     i;
2615         uint32          len = record->xl_len;
2616         BkpBlock        bkpb;
2617         char       *blk;
2618
2619         /* First the rmgr data */
2620         INIT_CRC32(crc);
2621         COMP_CRC32(crc, XLogRecGetData(record), len);
2622
2623         /* Add in the backup blocks, if any */
2624         blk = (char *) XLogRecGetData(record) + len;
2625         for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
2626         {
2627                 uint32          blen;
2628
2629                 if (!(record->xl_info & XLR_SET_BKP_BLOCK(i)))
2630                         continue;
2631
2632                 memcpy(&bkpb, blk, sizeof(BkpBlock));
2633                 if (bkpb.hole_offset + bkpb.hole_length > BLCKSZ)
2634                 {
2635                         ereport(emode,
2636                                         (errmsg("incorrect hole size in record at %X/%X",
2637                                                         recptr.xlogid, recptr.xrecoff)));
2638                         return false;
2639                 }
2640                 blen = sizeof(BkpBlock) + BLCKSZ - bkpb.hole_length;
2641                 COMP_CRC32(crc, blk, blen);
2642                 blk += blen;
2643         }
2644
2645         /* Check that xl_tot_len agrees with our calculation */
2646         if (blk != (char *) record + record->xl_tot_len)
2647         {
2648                 ereport(emode,
2649                                 (errmsg("incorrect total length in record at %X/%X",
2650                                                 recptr.xlogid, recptr.xrecoff)));
2651                 return false;
2652         }
2653
2654         /* Finally include the record header */
2655         COMP_CRC32(crc, (char *) record + sizeof(pg_crc32),
2656                            SizeOfXLogRecord - sizeof(pg_crc32));
2657         FIN_CRC32(crc);
2658
2659         if (!EQ_CRC32(record->xl_crc, crc))
2660         {
2661                 ereport(emode,
2662                 (errmsg("incorrect resource manager data checksum in record at %X/%X",
2663                                 recptr.xlogid, recptr.xrecoff)));
2664                 return false;
2665         }
2666
2667         return true;
2668 }
2669
2670 /*
2671  * Attempt to read an XLOG record.
2672  *
2673  * If RecPtr is not NULL, try to read a record at that position.  Otherwise
2674  * try to read a record just after the last one previously read.
2675  *
2676  * If no valid record is available, returns NULL, or fails if emode is PANIC.
2677  * (emode must be either PANIC or LOG.)
2678  *
2679  * The record is copied into readRecordBuf, so that on successful return,
2680  * the returned record pointer always points there.
2681  */
2682 static XLogRecord *
2683 ReadRecord(XLogRecPtr *RecPtr, int emode)
2684 {
2685         XLogRecord *record;
2686         char       *buffer;
2687         XLogRecPtr      tmpRecPtr = EndRecPtr;
2688         bool            randAccess = false;
2689         uint32          len,
2690                                 total_len;
2691         uint32          targetPageOff;
2692         uint32          targetRecOff;
2693         uint32          pageHeaderSize;
2694
2695         if (readBuf == NULL)
2696         {
2697                 /*
2698                  * First time through, permanently allocate readBuf.  We do it this
2699                  * way, rather than just making a static array, for two reasons: (1)
2700                  * no need to waste the storage in most instantiations of the backend;
2701                  * (2) a static char array isn't guaranteed to have any particular
2702                  * alignment, whereas malloc() will provide MAXALIGN'd storage.
2703                  */
2704                 readBuf = (char *) malloc(XLOG_BLCKSZ);
2705                 Assert(readBuf != NULL);
2706         }
2707
2708         if (RecPtr == NULL)
2709         {
2710                 RecPtr = &tmpRecPtr;
2711                 /* fast case if next record is on same page */
2712                 if (nextRecord != NULL)
2713                 {
2714                         record = nextRecord;
2715                         goto got_record;
2716                 }
2717                 /* align old recptr to next page */
2718                 if (tmpRecPtr.xrecoff % XLOG_BLCKSZ != 0)
2719                         tmpRecPtr.xrecoff += (XLOG_BLCKSZ - tmpRecPtr.xrecoff % XLOG_BLCKSZ);
2720                 if (tmpRecPtr.xrecoff >= XLogFileSize)
2721                 {
2722                         (tmpRecPtr.xlogid)++;
2723                         tmpRecPtr.xrecoff = 0;
2724                 }
2725                 /* We will account for page header size below */
2726         }
2727         else
2728         {
2729                 if (!XRecOffIsValid(RecPtr->xrecoff))
2730                         ereport(PANIC,
2731                                         (errmsg("invalid record offset at %X/%X",
2732                                                         RecPtr->xlogid, RecPtr->xrecoff)));
2733
2734                 /*
2735                  * Since we are going to a random position in WAL, forget any prior
2736                  * state about what timeline we were in, and allow it to be any
2737                  * timeline in expectedTLIs.  We also set a flag to allow curFileTLI
2738                  * to go backwards (but we can't reset that variable right here, since
2739                  * we might not change files at all).
2740                  */
2741                 lastPageTLI = 0;                /* see comment in ValidXLOGHeader */
2742                 randAccess = true;              /* allow curFileTLI to go backwards too */
2743         }
2744
2745         if (readFile >= 0 && !XLByteInSeg(*RecPtr, readId, readSeg))
2746         {
2747                 close(readFile);
2748                 readFile = -1;
2749         }
2750         XLByteToSeg(*RecPtr, readId, readSeg);
2751         if (readFile < 0)
2752         {
2753                 /* Now it's okay to reset curFileTLI if random fetch */
2754                 if (randAccess)
2755                         curFileTLI = 0;
2756
2757                 readFile = XLogFileRead(readId, readSeg, emode);
2758                 if (readFile < 0)
2759                         goto next_record_is_invalid;
2760
2761                 /*
2762                  * Whenever switching to a new WAL segment, we read the first page of
2763                  * the file and validate its header, even if that's not where the
2764                  * target record is.  This is so that we can check the additional
2765                  * identification info that is present in the first page's "long"
2766                  * header.
2767                  */
2768                 readOff = 0;
2769                 if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
2770                 {
2771                         ereport(emode,
2772                                         (errcode_for_file_access(),
2773                                          errmsg("could not read from log file %u, segment %u, offset %u: %m",
2774                                                         readId, readSeg, readOff)));
2775                         goto next_record_is_invalid;
2776                 }
2777                 if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode))
2778                         goto next_record_is_invalid;
2779         }
2780
2781         targetPageOff = ((RecPtr->xrecoff % XLogSegSize) / XLOG_BLCKSZ) * XLOG_BLCKSZ;
2782         if (readOff != targetPageOff)
2783         {
2784                 readOff = targetPageOff;
2785                 if (lseek(readFile, (off_t) readOff, SEEK_SET) < 0)
2786                 {
2787                         ereport(emode,
2788                                         (errcode_for_file_access(),
2789                                          errmsg("could not seek in log file %u, segment %u to offset %u: %m",
2790                                                         readId, readSeg, readOff)));
2791                         goto next_record_is_invalid;
2792                 }
2793                 if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
2794                 {
2795                         ereport(emode,
2796                                         (errcode_for_file_access(),
2797                                          errmsg("could not read from log file %u, segment %u at offset %u: %m",
2798                                                         readId, readSeg, readOff)));
2799                         goto next_record_is_invalid;
2800                 }
2801                 if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode))
2802                         goto next_record_is_invalid;
2803         }
2804         pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
2805         targetRecOff = RecPtr->xrecoff % XLOG_BLCKSZ;
2806         if (targetRecOff == 0)
2807         {
2808                 /*
2809                  * Can only get here in the continuing-from-prev-page case, because
2810                  * XRecOffIsValid eliminated the zero-page-offset case otherwise. Need
2811                  * to skip over the new page's header.
2812                  */
2813                 tmpRecPtr.xrecoff += pageHeaderSize;
2814                 targetRecOff = pageHeaderSize;
2815         }
2816         else if (targetRecOff < pageHeaderSize)
2817         {
2818                 ereport(emode,
2819                                 (errmsg("invalid record offset at %X/%X",
2820                                                 RecPtr->xlogid, RecPtr->xrecoff)));
2821                 goto next_record_is_invalid;
2822         }
2823         if ((((XLogPageHeader) readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) &&
2824                 targetRecOff == pageHeaderSize)
2825         {
2826                 ereport(emode,
2827                                 (errmsg("contrecord is requested by %X/%X",
2828                                                 RecPtr->xlogid, RecPtr->xrecoff)));
2829                 goto next_record_is_invalid;
2830         }
2831         record = (XLogRecord *) ((char *) readBuf + RecPtr->xrecoff % XLOG_BLCKSZ);
2832
2833 got_record:;
2834
2835         /*
2836          * Currently, xl_len == 0 must be bad data, but that might not be true
2837          * forever.  See note in XLogInsert.
2838          */
2839         if (record->xl_len == 0)
2840         {
2841                 ereport(emode,
2842                                 (errmsg("record with zero length at %X/%X",
2843                                                 RecPtr->xlogid, RecPtr->xrecoff)));
2844                 goto next_record_is_invalid;
2845         }
2846         if (record->xl_tot_len < SizeOfXLogRecord + record->xl_len ||
2847                 record->xl_tot_len > SizeOfXLogRecord + record->xl_len +
2848                 XLR_MAX_BKP_BLOCKS * (sizeof(BkpBlock) + BLCKSZ))
2849         {
2850                 ereport(emode,
2851                                 (errmsg("invalid record length at %X/%X",
2852                                                 RecPtr->xlogid, RecPtr->xrecoff)));
2853                 goto next_record_is_invalid;
2854         }
2855         if (record->xl_rmid > RM_MAX_ID)
2856         {
2857                 ereport(emode,
2858                                 (errmsg("invalid resource manager ID %u at %X/%X",
2859                                                 record->xl_rmid, RecPtr->xlogid, RecPtr->xrecoff)));
2860                 goto next_record_is_invalid;
2861         }
2862         if (randAccess)
2863         {
2864                 /*
2865                  * We can't exactly verify the prev-link, but surely it should be less
2866                  * than the record's own address.
2867                  */
2868                 if (!XLByteLT(record->xl_prev, *RecPtr))
2869                 {
2870                         ereport(emode,
2871                                         (errmsg("record with incorrect prev-link %X/%X at %X/%X",
2872                                                         record->xl_prev.xlogid, record->xl_prev.xrecoff,
2873                                                         RecPtr->xlogid, RecPtr->xrecoff)));
2874                         goto next_record_is_invalid;
2875                 }
2876         }
2877         else
2878         {
2879                 /*
2880                  * Record's prev-link should exactly match our previous location. This
2881                  * check guards against torn WAL pages where a stale but valid-looking
2882                  * WAL record starts on a sector boundary.
2883                  */
2884                 if (!XLByteEQ(record->xl_prev, ReadRecPtr))
2885                 {
2886                         ereport(emode,
2887                                         (errmsg("record with incorrect prev-link %X/%X at %X/%X",
2888                                                         record->xl_prev.xlogid, record->xl_prev.xrecoff,
2889                                                         RecPtr->xlogid, RecPtr->xrecoff)));
2890                         goto next_record_is_invalid;
2891                 }
2892         }
2893
2894         /*
2895          * Allocate or enlarge readRecordBuf as needed.  To avoid useless small
2896          * increases, round its size to a multiple of XLOG_BLCKSZ, and make sure
2897          * it's at least 4*Max(BLCKSZ, XLOG_BLCKSZ) to start with.  (That is
2898          * enough for all "normal" records, but very large commit or abort records
2899          * might need more space.)
2900          */
2901         total_len = record->xl_tot_len;
2902         if (total_len > readRecordBufSize)
2903         {
2904                 uint32          newSize = total_len;
2905
2906                 newSize += XLOG_BLCKSZ - (newSize % XLOG_BLCKSZ);
2907                 newSize = Max(newSize, 4 * Max(BLCKSZ, XLOG_BLCKSZ));
2908                 if (readRecordBuf)
2909                         free(readRecordBuf);
2910                 readRecordBuf = (char *) malloc(newSize);
2911                 if (!readRecordBuf)
2912                 {
2913                         readRecordBufSize = 0;
2914                         /* We treat this as a "bogus data" condition */
2915                         ereport(emode,
2916                                         (errmsg("record length %u at %X/%X too long",
2917                                                         total_len, RecPtr->xlogid, RecPtr->xrecoff)));
2918                         goto next_record_is_invalid;
2919                 }
2920                 readRecordBufSize = newSize;
2921         }
2922
2923         buffer = readRecordBuf;
2924         nextRecord = NULL;
2925         len = XLOG_BLCKSZ - RecPtr->xrecoff % XLOG_BLCKSZ;
2926         if (total_len > len)
2927         {
2928                 /* Need to reassemble record */
2929                 XLogContRecord *contrecord;
2930                 uint32          gotlen = len;
2931
2932                 memcpy(buffer, record, len);
2933                 record = (XLogRecord *) buffer;
2934                 buffer += len;
2935                 for (;;)
2936                 {
2937                         readOff += XLOG_BLCKSZ;
2938                         if (readOff >= XLogSegSize)
2939                         {
2940                                 close(readFile);
2941                                 readFile = -1;
2942                                 NextLogSeg(readId, readSeg);
2943                                 readFile = XLogFileRead(readId, readSeg, emode);
2944                                 if (readFile < 0)
2945                                         goto next_record_is_invalid;
2946                                 readOff = 0;
2947                         }
2948                         if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
2949                         {
2950                                 ereport(emode,
2951                                                 (errcode_for_file_access(),
2952                                                  errmsg("could not read from log file %u, segment %u, offset %u: %m",
2953                                                                 readId, readSeg, readOff)));
2954                                 goto next_record_is_invalid;
2955                         }
2956                         if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode))
2957                                 goto next_record_is_invalid;
2958                         if (!(((XLogPageHeader) readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD))
2959                         {
2960                                 ereport(emode,
2961                                                 (errmsg("there is no contrecord flag in log file %u, segment %u, offset %u",
2962                                                                 readId, readSeg, readOff)));
2963                                 goto next_record_is_invalid;
2964                         }
2965                         pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
2966                         contrecord = (XLogContRecord *) ((char *) readBuf + pageHeaderSize);
2967                         if (contrecord->xl_rem_len == 0 ||
2968                                 total_len != (contrecord->xl_rem_len + gotlen))
2969                         {
2970                                 ereport(emode,
2971                                                 (errmsg("invalid contrecord length %u in log file %u, segment %u, offset %u",
2972                                                                 contrecord->xl_rem_len,
2973                                                                 readId, readSeg, readOff)));
2974                                 goto next_record_is_invalid;
2975                         }
2976                         len = XLOG_BLCKSZ - pageHeaderSize - SizeOfXLogContRecord;
2977                         if (contrecord->xl_rem_len > len)
2978                         {
2979                                 memcpy(buffer, (char *) contrecord + SizeOfXLogContRecord, len);
2980                                 gotlen += len;
2981                                 buffer += len;
2982                                 continue;
2983                         }
2984                         memcpy(buffer, (char *) contrecord + SizeOfXLogContRecord,
2985                                    contrecord->xl_rem_len);
2986                         break;
2987                 }
2988                 if (!RecordIsValid(record, *RecPtr, emode))
2989                         goto next_record_is_invalid;
2990                 pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
2991                 if (XLOG_BLCKSZ - SizeOfXLogRecord >= pageHeaderSize +
2992                         MAXALIGN(SizeOfXLogContRecord + contrecord->xl_rem_len))
2993                 {
2994                         nextRecord = (XLogRecord *) ((char *) contrecord +
2995                                         MAXALIGN(SizeOfXLogContRecord + contrecord->xl_rem_len));
2996                 }
2997                 EndRecPtr.xlogid = readId;
2998                 EndRecPtr.xrecoff = readSeg * XLogSegSize + readOff +
2999                         pageHeaderSize +
3000                         MAXALIGN(SizeOfXLogContRecord + contrecord->xl_rem_len);
3001                 ReadRecPtr = *RecPtr;
3002                 return record;
3003         }
3004
3005         /* Record does not cross a page boundary */
3006         if (!RecordIsValid(record, *RecPtr, emode))
3007                 goto next_record_is_invalid;
3008         if (XLOG_BLCKSZ - SizeOfXLogRecord >= RecPtr->xrecoff % XLOG_BLCKSZ +
3009                 MAXALIGN(total_len))
3010                 nextRecord = (XLogRecord *) ((char *) record + MAXALIGN(total_len));
3011         EndRecPtr.xlogid = RecPtr->xlogid;
3012         EndRecPtr.xrecoff = RecPtr->xrecoff + MAXALIGN(total_len);
3013         ReadRecPtr = *RecPtr;
3014         memcpy(buffer, record, total_len);
3015         return (XLogRecord *) buffer;
3016
3017 next_record_is_invalid:;
3018         close(readFile);
3019         readFile = -1;
3020         nextRecord = NULL;
3021         return NULL;
3022 }
3023
3024 /*
3025  * Check whether the xlog header of a page just read in looks valid.
3026  *
3027  * This is just a convenience subroutine to avoid duplicated code in
3028  * ReadRecord.  It's not intended for use from anywhere else.
3029  */
3030 static bool
3031 ValidXLOGHeader(XLogPageHeader hdr, int emode)
3032 {
3033         XLogRecPtr      recaddr;
3034
3035         if (hdr->xlp_magic != XLOG_PAGE_MAGIC)
3036         {
3037                 ereport(emode,
3038                                 (errmsg("invalid magic number %04X in log file %u, segment %u, offset %u",
3039                                                 hdr->xlp_magic, readId, readSeg, readOff)));
3040                 return false;
3041         }
3042         if ((hdr->xlp_info & ~XLP_ALL_FLAGS) != 0)
3043         {
3044                 ereport(emode,
3045                                 (errmsg("invalid info bits %04X in log file %u, segment %u, offset %u",
3046                                                 hdr->xlp_info, readId, readSeg, readOff)));
3047                 return false;
3048         }
3049         if (hdr->xlp_info & XLP_LONG_HEADER)
3050         {
3051                 XLogLongPageHeader longhdr = (XLogLongPageHeader) hdr;
3052
3053                 if (longhdr->xlp_sysid != ControlFile->system_identifier)
3054                 {
3055                         char            fhdrident_str[32];
3056                         char            sysident_str[32];
3057
3058                         /*
3059                          * Format sysids separately to keep platform-dependent format code
3060                          * out of the translatable message string.
3061                          */
3062                         snprintf(fhdrident_str, sizeof(fhdrident_str), UINT64_FORMAT,
3063                                          longhdr->xlp_sysid);
3064                         snprintf(sysident_str, sizeof(sysident_str), UINT64_FORMAT,
3065                                          ControlFile->system_identifier);
3066                         ereport(emode,
3067                                         (errmsg("WAL file is from different system"),
3068                                          errdetail("WAL file SYSID is %s, pg_control SYSID is %s",
3069                                                            fhdrident_str, sysident_str)));
3070                         return false;
3071                 }
3072                 if (longhdr->xlp_seg_size != XLogSegSize)
3073                 {
3074                         ereport(emode,
3075                                         (errmsg("WAL file is from different system"),
3076                                          errdetail("Incorrect XLOG_SEG_SIZE in page header.")));
3077                         return false;
3078                 }
3079                 if (longhdr->xlp_xlog_blcksz != XLOG_BLCKSZ)
3080                 {
3081                         ereport(emode,
3082                                         (errmsg("WAL file is from different system"),
3083                                          errdetail("Incorrect XLOG_BLCKSZ in page header.")));
3084                         return false;
3085                 }
3086         }
3087         else if (readOff == 0)
3088         {
3089                 /* hmm, first page of file doesn't have a long header? */
3090                 ereport(emode,
3091                                 (errmsg("invalid info bits %04X in log file %u, segment %u, offset %u",
3092                                                 hdr->xlp_info, readId, readSeg, readOff)));
3093                 return false;
3094         }
3095
3096         recaddr.xlogid = readId;
3097         recaddr.xrecoff = readSeg * XLogSegSize + readOff;
3098         if (!XLByteEQ(hdr->xlp_pageaddr, recaddr))
3099         {
3100                 ereport(emode,
3101                                 (errmsg("unexpected pageaddr %X/%X in log file %u, segment %u, offset %u",
3102                                                 hdr->xlp_pageaddr.xlogid, hdr->xlp_pageaddr.xrecoff,
3103                                                 readId, readSeg, readOff)));
3104                 return false;
3105         }
3106
3107         /*
3108          * Check page TLI is one of the expected values.
3109          */
3110         if (!list_member_int(expectedTLIs, (int) hdr->xlp_tli))
3111         {
3112                 ereport(emode,
3113                                 (errmsg("unexpected timeline ID %u in log file %u, segment %u, offset %u",
3114                                                 hdr->xlp_tli,
3115                                                 readId, readSeg, readOff)));
3116                 return false;
3117         }
3118
3119         /*
3120          * Since child timelines are always assigned a TLI greater than their
3121          * immediate parent's TLI, we should never see TLI go backwards across
3122          * successive pages of a consistent WAL sequence.
3123          *
3124          * Of course this check should only be applied when advancing sequentially
3125          * across pages; therefore ReadRecord resets lastPageTLI to zero when
3126          * going to a random page.
3127          */
3128         if (hdr->xlp_tli < lastPageTLI)
3129         {
3130                 ereport(emode,
3131                                 (errmsg("out-of-sequence timeline ID %u (after %u) in log file %u, segment %u, offset %u",
3132                                                 hdr->xlp_tli, lastPageTLI,
3133                                                 readId, readSeg, readOff)));
3134                 return false;
3135         }
3136         lastPageTLI = hdr->xlp_tli;
3137         return true;
3138 }
3139
3140 /*
3141  * Try to read a timeline's history file.
3142  *
3143  * If successful, return the list of component TLIs (the given TLI followed by
3144  * its ancestor TLIs).  If we can't find the history file, assume that the
3145  * timeline has no parents, and return a list of just the specified timeline
3146  * ID.
3147  */
3148 static List *
3149 readTimeLineHistory(TimeLineID targetTLI)
3150 {
3151         List       *result;
3152         char            path[MAXPGPATH];
3153         char            histfname[MAXFNAMELEN];
3154         char            fline[MAXPGPATH];
3155         FILE       *fd;
3156
3157         if (InArchiveRecovery)
3158         {
3159                 TLHistoryFileName(histfname, targetTLI);
3160                 RestoreArchivedFile(path, histfname, "RECOVERYHISTORY", 0);
3161         }
3162         else
3163                 TLHistoryFilePath(path, targetTLI);
3164
3165         fd = AllocateFile(path, "r");
3166         if (fd == NULL)
3167         {
3168                 if (errno != ENOENT)
3169                         ereport(FATAL,
3170                                         (errcode_for_file_access(),
3171                                          errmsg("could not open file \"%s\": %m", path)));
3172                 /* Not there, so assume no parents */
3173                 return list_make1_int((int) targetTLI);
3174         }
3175
3176         result = NIL;
3177
3178         /*
3179          * Parse the file...
3180          */
3181         while (fgets(fline, MAXPGPATH, fd) != NULL)
3182         {
3183                 /* skip leading whitespace and check for # comment */
3184                 char       *ptr;
3185                 char       *endptr;
3186                 TimeLineID      tli;
3187
3188                 for (ptr = fline; *ptr; ptr++)
3189                 {
3190                         if (!isspace((unsigned char) *ptr))
3191                                 break;
3192                 }
3193                 if (*ptr == '\0' || *ptr == '#')
3194                         continue;
3195
3196                 /* expect a numeric timeline ID as first field of line */
3197                 tli = (TimeLineID) strtoul(ptr, &endptr, 0);
3198                 if (endptr == ptr)
3199                         ereport(FATAL,
3200                                         (errmsg("syntax error in history file: %s", fline),
3201                                          errhint("Expected a numeric timeline ID.")));
3202
3203                 if (result &&
3204                         tli <= (TimeLineID) linitial_int(result))
3205                         ereport(FATAL,
3206                                         (errmsg("invalid data in history file: %s", fline),
3207                                    errhint("Timeline IDs must be in increasing sequence.")));
3208
3209                 /* Build list with newest item first */
3210                 result = lcons_int((int) tli, result);
3211
3212                 /* we ignore the remainder of each line */
3213         }
3214
3215         FreeFile(fd);
3216
3217         if (result &&
3218                 targetTLI <= (TimeLineID) linitial_int(result))
3219                 ereport(FATAL,
3220                                 (errmsg("invalid data in history file \"%s\"", path),
3221                         errhint("Timeline IDs must be less than child timeline's ID.")));
3222
3223         result = lcons_int((int) targetTLI, result);
3224
3225         ereport(DEBUG3,
3226                         (errmsg_internal("history of timeline %u is %s",
3227                                                          targetTLI, nodeToString(result))));
3228
3229         return result;
3230 }
3231
3232 /*
3233  * Probe whether a timeline history file exists for the given timeline ID
3234  */
3235 static bool
3236 existsTimeLineHistory(TimeLineID probeTLI)
3237 {
3238         char            path[MAXPGPATH];
3239         char            histfname[MAXFNAMELEN];
3240         FILE       *fd;
3241
3242         if (InArchiveRecovery)
3243         {
3244                 TLHistoryFileName(histfname, probeTLI);
3245                 RestoreArchivedFile(path, histfname, "RECOVERYHISTORY", 0);
3246         }
3247         else
3248                 TLHistoryFilePath(path, probeTLI);
3249
3250         fd = AllocateFile(path, "r");
3251         if (fd != NULL)
3252         {
3253                 FreeFile(fd);
3254                 return true;
3255         }
3256         else
3257         {
3258                 if (errno != ENOENT)
3259                         ereport(FATAL,
3260                                         (errcode_for_file_access(),
3261                                          errmsg("could not open file \"%s\": %m", path)));
3262                 return false;
3263         }
3264 }
3265
3266 /*
3267  * Find the newest existing timeline, assuming that startTLI exists.
3268  *
3269  * Note: while this is somewhat heuristic, it does positively guarantee
3270  * that (result + 1) is not a known timeline, and therefore it should
3271  * be safe to assign that ID to a new timeline.
3272  */
3273 static TimeLineID
3274 findNewestTimeLine(TimeLineID startTLI)
3275 {
3276         TimeLineID      newestTLI;
3277         TimeLineID      probeTLI;
3278
3279         /*
3280          * The algorithm is just to probe for the existence of timeline history
3281          * files.  XXX is it useful to allow gaps in the sequence?
3282          */
3283         newestTLI = startTLI;
3284
3285         for (probeTLI = startTLI + 1;; probeTLI++)
3286         {
3287                 if (existsTimeLineHistory(probeTLI))
3288                 {
3289                         newestTLI = probeTLI;           /* probeTLI exists */
3290                 }
3291                 else
3292                 {
3293                         /* doesn't exist, assume we're done */
3294                         break;
3295                 }
3296         }
3297
3298         return newestTLI;
3299 }
3300
3301 /*
3302  * Create a new timeline history file.
3303  *
3304  *      newTLI: ID of the new timeline
3305  *      parentTLI: ID of its immediate parent
3306  *      endTLI et al: ID of the last used WAL file, for annotation purposes
3307  *
3308  * Currently this is only used during recovery, and so there are no locking
3309  * considerations.      But we should be just as tense as XLogFileInit to avoid
3310  * emplacing a bogus file.
3311  */
3312 static void
3313 writeTimeLineHistory(TimeLineID newTLI, TimeLineID parentTLI,
3314                                          TimeLineID endTLI, uint32 endLogId, uint32 endLogSeg)
3315 {
3316         char            path[MAXPGPATH];
3317         char            tmppath[MAXPGPATH];
3318         char            histfname[MAXFNAMELEN];
3319         char            xlogfname[MAXFNAMELEN];
3320         char            buffer[BLCKSZ];
3321         int                     srcfd;
3322         int                     fd;
3323         int                     nbytes;
3324
3325         Assert(newTLI > parentTLI); /* else bad selection of newTLI */
3326
3327         /*
3328          * Write into a temp file name.
3329          */
3330         snprintf(tmppath, MAXPGPATH, XLOGDIR "/xlogtemp.%d", (int) getpid());
3331
3332         unlink(tmppath);
3333
3334         /* do not use XLOG_SYNC_BIT here --- want to fsync only at end of fill */
3335         fd = BasicOpenFile(tmppath, O_RDWR | O_CREAT | O_EXCL,
3336                                            S_IRUSR | S_IWUSR);
3337         if (fd < 0)
3338                 ereport(ERROR,
3339                                 (errcode_for_file_access(),
3340                                  errmsg("could not create file \"%s\": %m", tmppath)));
3341
3342         /*
3343          * If a history file exists for the parent, copy it verbatim
3344          */
3345         if (InArchiveRecovery)
3346         {
3347                 TLHistoryFileName(histfname, parentTLI);
3348                 RestoreArchivedFile(path, histfname, "RECOVERYHISTORY", 0);
3349         }
3350         else
3351                 TLHistoryFilePath(path, parentTLI);
3352
3353         srcfd = BasicOpenFile(path, O_RDONLY, 0);
3354         if (srcfd < 0)
3355         {
3356                 if (errno != ENOENT)
3357                         ereport(ERROR,
3358                                         (errcode_for_file_access(),
3359                                          errmsg("could not open file \"%s\": %m", path)));
3360                 /* Not there, so assume parent has no parents */
3361         }
3362         else
3363         {
3364                 for (;;)
3365                 {
3366                         errno = 0;
3367                         nbytes = (int) read(srcfd, buffer, sizeof(buffer));
3368                         if (nbytes < 0 || errno != 0)
3369                                 ereport(ERROR,
3370                                                 (errcode_for_file_access(),
3371                                                  errmsg("could not read file \"%s\": %m", path)));
3372                         if (nbytes == 0)
3373                                 break;
3374                         errno = 0;
3375                         if ((int) write(fd, buffer, nbytes) != nbytes)
3376                         {
3377                                 int                     save_errno = errno;
3378
3379                                 /*
3380                                  * If we fail to make the file, delete it to release disk
3381                                  * space
3382                                  */
3383                                 unlink(tmppath);
3384
3385                                 /*
3386                                  * if write didn't set errno, assume problem is no disk space
3387                                  */
3388                                 errno = save_errno ? save_errno : ENOSPC;
3389
3390                                 ereport(ERROR,
3391                                                 (errcode_for_file_access(),
3392                                          errmsg("could not write to file \"%s\": %m", tmppath)));
3393                         }
3394                 }
3395                 close(srcfd);
3396         }
3397
3398         /*
3399          * Append one line with the details of this timeline split.
3400          *
3401          * If we did have a parent file, insert an extra newline just in case the
3402          * parent file failed to end with one.
3403          */
3404         XLogFileName(xlogfname, endTLI, endLogId, endLogSeg);
3405
3406         snprintf(buffer, sizeof(buffer),
3407                          "%s%u\t%s\t%s transaction %u at %s\n",
3408                          (srcfd < 0) ? "" : "\n",
3409                          parentTLI,
3410                          xlogfname,
3411                          recoveryStopAfter ? "after" : "before",
3412                          recoveryStopXid,
3413                          str_time(recoveryStopTime));
3414
3415         nbytes = strlen(buffer);
3416         errno = 0;
3417         if ((int) write(fd, buffer, nbytes) != nbytes)
3418         {
3419                 int                     save_errno = errno;
3420
3421                 /*
3422                  * If we fail to make the file, delete it to release disk space
3423                  */
3424                 unlink(tmppath);
3425                 /* if write didn't set errno, assume problem is no disk space */
3426                 errno = save_errno ? save_errno : ENOSPC;
3427
3428                 ereport(ERROR,
3429                                 (errcode_for_file_access(),
3430                                  errmsg("could not write to file \"%s\": %m", tmppath)));
3431         }
3432
3433         if (pg_fsync(fd) != 0)
3434                 ereport(ERROR,
3435                                 (errcode_for_file_access(),
3436                                  errmsg("could not fsync file \"%s\": %m", tmppath)));
3437
3438         if (close(fd))
3439                 ereport(ERROR,
3440                                 (errcode_for_file_access(),
3441                                  errmsg("could not close file \"%s\": %m", tmppath)));
3442
3443
3444         /*
3445          * Now move the completed history file into place with its final name.
3446          */
3447         TLHistoryFilePath(path, newTLI);
3448
3449         /*
3450          * Prefer link() to rename() here just to be really sure that we don't
3451          * overwrite an existing logfile.  However, there shouldn't be one, so
3452          * rename() is an acceptable substitute except for the truly paranoid.
3453          */
3454 #if HAVE_WORKING_LINK
3455         if (link(tmppath, path) < 0)
3456                 ereport(ERROR,
3457                                 (errcode_for_file_access(),
3458                                  errmsg("could not link file \"%s\" to \"%s\": %m",
3459                                                 tmppath, path)));
3460         unlink(tmppath);
3461 #else
3462         if (rename(tmppath, path) < 0)
3463                 ereport(ERROR,
3464                                 (errcode_for_file_access(),
3465                                  errmsg("could not rename file \"%s\" to \"%s\": %m",
3466                                                 tmppath, path)));
3467 #endif
3468
3469         /* The history file can be archived immediately. */
3470         TLHistoryFileName(histfname, newTLI);
3471         XLogArchiveNotify(histfname);
3472 }
3473
3474 /*
3475  * I/O routines for pg_control
3476  *
3477  * *ControlFile is a buffer in shared memory that holds an image of the
3478  * contents of pg_control.      WriteControlFile() initializes pg_control
3479  * given a preloaded buffer, ReadControlFile() loads the buffer from
3480  * the pg_control file (during postmaster or standalone-backend startup),
3481  * and UpdateControlFile() rewrites pg_control after we modify xlog state.
3482  *
3483  * For simplicity, WriteControlFile() initializes the fields of pg_control
3484  * that are related to checking backend/database compatibility, and
3485  * ReadControlFile() verifies they are correct.  We could split out the
3486  * I/O and compatibility-check functions, but there seems no need currently.
3487  */
3488 static void
3489 WriteControlFile(void)
3490 {
3491         int                     fd;
3492         char            buffer[PG_CONTROL_SIZE]; /* need not be aligned */
3493         char       *localeptr;
3494
3495         /*
3496          * Initialize version and compatibility-check fields
3497          */
3498         ControlFile->pg_control_version = PG_CONTROL_VERSION;
3499         ControlFile->catalog_version_no = CATALOG_VERSION_NO;
3500
3501         ControlFile->maxAlign = MAXIMUM_ALIGNOF;
3502         ControlFile->floatFormat = FLOATFORMAT_VALUE;
3503
3504         ControlFile->blcksz = BLCKSZ;
3505         ControlFile->relseg_size = RELSEG_SIZE;
3506         ControlFile->xlog_blcksz = XLOG_BLCKSZ;
3507         ControlFile->xlog_seg_size = XLOG_SEG_SIZE;
3508
3509         ControlFile->nameDataLen = NAMEDATALEN;
3510         ControlFile->indexMaxKeys = INDEX_MAX_KEYS;
3511
3512 #ifdef HAVE_INT64_TIMESTAMP
3513         ControlFile->enableIntTimes = TRUE;
3514 #else
3515         ControlFile->enableIntTimes = FALSE;
3516 #endif
3517
3518         ControlFile->localeBuflen = LOCALE_NAME_BUFLEN;
3519         localeptr = setlocale(LC_COLLATE, NULL);
3520         if (!localeptr)
3521                 ereport(PANIC,
3522                                 (errmsg("invalid LC_COLLATE setting")));
3523         StrNCpy(ControlFile->lc_collate, localeptr, LOCALE_NAME_BUFLEN);
3524         localeptr = setlocale(LC_CTYPE, NULL);
3525         if (!localeptr)
3526                 ereport(PANIC,
3527                                 (errmsg("invalid LC_CTYPE setting")));
3528         StrNCpy(ControlFile->lc_ctype, localeptr, LOCALE_NAME_BUFLEN);
3529
3530         /* Contents are protected with a CRC */
3531         INIT_CRC32(ControlFile->crc);
3532         COMP_CRC32(ControlFile->crc,
3533                            (char *) ControlFile,
3534                            offsetof(ControlFileData, crc));
3535         FIN_CRC32(ControlFile->crc);
3536
3537         /*
3538          * We write out PG_CONTROL_SIZE bytes into pg_control, zero-padding the
3539          * excess over sizeof(ControlFileData).  This reduces the odds of
3540          * premature-EOF errors when reading pg_control.  We'll still fail when we
3541          * check the contents of the file, but hopefully with a more specific
3542          * error than "couldn't read pg_control".
3543          */
3544         if (sizeof(ControlFileData) > PG_CONTROL_SIZE)
3545                 elog(PANIC, "sizeof(ControlFileData) is larger than PG_CONTROL_SIZE; fix either one");
3546
3547         memset(buffer, 0, PG_CONTROL_SIZE);
3548         memcpy(buffer, ControlFile, sizeof(ControlFileData));
3549
3550         fd = BasicOpenFile(XLOG_CONTROL_FILE,
3551                                            O_RDWR | O_CREAT | O_EXCL | PG_BINARY,
3552                                            S_IRUSR | S_IWUSR);
3553         if (fd < 0)
3554                 ereport(PANIC,
3555                                 (errcode_for_file_access(),
3556                                  errmsg("could not create control file \"%s\": %m",
3557                                                 XLOG_CONTROL_FILE)));
3558
3559         errno = 0;
3560         if (write(fd, buffer, PG_CONTROL_SIZE) != PG_CONTROL_SIZE)
3561         {
3562                 /* if write didn't set errno, assume problem is no disk space */
3563                 if (errno == 0)
3564                         errno = ENOSPC;
3565                 ereport(PANIC,
3566                                 (errcode_for_file_access(),
3567                                  errmsg("could not write to control file: %m")));
3568         }
3569
3570         if (pg_fsync(fd) != 0)
3571                 ereport(PANIC,
3572                                 (errcode_for_file_access(),
3573                                  errmsg("could not fsync control file: %m")));
3574
3575         if (close(fd))
3576                 ereport(PANIC,
3577                                 (errcode_for_file_access(),
3578                                  errmsg("could not close control file: %m")));
3579 }
3580
3581 static void
3582 ReadControlFile(void)
3583 {
3584         pg_crc32        crc;
3585         int                     fd;
3586
3587         /*
3588          * Read data...
3589          */
3590         fd = BasicOpenFile(XLOG_CONTROL_FILE,
3591                                            O_RDWR | PG_BINARY,
3592                                            S_IRUSR | S_IWUSR);
3593         if (fd < 0)
3594                 ereport(PANIC,
3595                                 (errcode_for_file_access(),
3596                                  errmsg("could not open control file \"%s\": %m",
3597                                                 XLOG_CONTROL_FILE)));
3598
3599         if (read(fd, ControlFile, sizeof(ControlFileData)) != sizeof(ControlFileData))
3600                 ereport(PANIC,
3601                                 (errcode_for_file_access(),
3602                                  errmsg("could not read from control file: %m")));
3603
3604         close(fd);
3605
3606         /*
3607          * Check for expected pg_control format version.  If this is wrong, the
3608          * CRC check will likely fail because we'll be checking the wrong number
3609          * of bytes.  Complaining about wrong version will probably be more
3610          * enlightening than complaining about wrong CRC.
3611          */
3612         if (ControlFile->pg_control_version != PG_CONTROL_VERSION)
3613                 ereport(FATAL,
3614                                 (errmsg("database files are incompatible with server"),
3615                                  errdetail("The database cluster was initialized with PG_CONTROL_VERSION %d,"
3616                                   " but the server was compiled with PG_CONTROL_VERSION %d.",
3617                                                 ControlFile->pg_control_version, PG_CONTROL_VERSION),
3618                                  errhint("It looks like you need to initdb.")));
3619         /* Now check the CRC. */
3620         INIT_CRC32(crc);
3621         COMP_CRC32(crc,
3622                            (char *) ControlFile,
3623                            offsetof(ControlFileData, crc));
3624         FIN_CRC32(crc);
3625
3626         if (!EQ_CRC32(crc, ControlFile->crc))
3627                 ereport(FATAL,
3628                                 (errmsg("incorrect checksum in control file")));
3629
3630         /*
3631          * Do compatibility checking immediately.  We do this here for 2 reasons:
3632          *
3633          * (1) if the database isn't compatible with the backend executable, we
3634          * want to abort before we can possibly do any damage;
3635          *
3636          * (2) this code is executed in the postmaster, so the setlocale() will
3637          * propagate to forked backends, which aren't going to read this file for
3638          * themselves.  (These locale settings are considered critical
3639          * compatibility items because they can affect sort order of indexes.)
3640          */
3641         if (ControlFile->catalog_version_no != CATALOG_VERSION_NO)
3642                 ereport(FATAL,
3643                                 (errmsg("database files are incompatible with server"),
3644                                  errdetail("The database cluster was initialized with CATALOG_VERSION_NO %d,"
3645                                   " but the server was compiled with CATALOG_VERSION_NO %d.",
3646                                                 ControlFile->catalog_version_no, CATALOG_VERSION_NO),
3647                                  errhint("It looks like you need to initdb.")));
3648         if (ControlFile->maxAlign != MAXIMUM_ALIGNOF)
3649                 ereport(FATAL,
3650                                 (errmsg("database files are incompatible with server"),
3651                    errdetail("The database cluster was initialized with MAXALIGN %d,"
3652                                          " but the server was compiled with MAXALIGN %d.",
3653                                          ControlFile->maxAlign, MAXIMUM_ALIGNOF),
3654                                  errhint("It looks like you need to initdb.")));
3655         if (ControlFile->floatFormat != FLOATFORMAT_VALUE)
3656                 ereport(FATAL,
3657                                 (errmsg("database files are incompatible with server"),
3658                                  errdetail("The database cluster appears to use a different floating-point number format than the server executable."),
3659                                  errhint("It looks like you need to initdb.")));
3660         if (ControlFile->blcksz != BLCKSZ)
3661                 ereport(FATAL,
3662                                 (errmsg("database files are incompatible with server"),
3663                          errdetail("The database cluster was initialized with BLCKSZ %d,"
3664                                            " but the server was compiled with BLCKSZ %d.",
3665                                            ControlFile->blcksz, BLCKSZ),
3666                                  errhint("It looks like you need to recompile or initdb.")));
3667         if (ControlFile->relseg_size != RELSEG_SIZE)
3668                 ereport(FATAL,
3669                                 (errmsg("database files are incompatible with server"),
3670                 errdetail("The database cluster was initialized with RELSEG_SIZE %d,"
3671                                   " but the server was compiled with RELSEG_SIZE %d.",
3672                                   ControlFile->relseg_size, RELSEG_SIZE),
3673                                  errhint("It looks like you need to recompile or initdb.")));
3674         if (ControlFile->xlog_blcksz != XLOG_BLCKSZ)
3675                 ereport(FATAL,
3676                                 (errmsg("database files are incompatible with server"),
3677                          errdetail("The database cluster was initialized with XLOG_BLCKSZ %d,"
3678                                            " but the server was compiled with XLOG_BLCKSZ %d.",
3679                                            ControlFile->xlog_blcksz, XLOG_BLCKSZ),
3680                                  errhint("It looks like you need to recompile or initdb.")));
3681         if (ControlFile->xlog_seg_size != XLOG_SEG_SIZE)
3682                 ereport(FATAL,
3683                                 (errmsg("database files are incompatible with server"),
3684                                  errdetail("The database cluster was initialized with XLOG_SEG_SIZE %d,"
3685                                            " but the server was compiled with XLOG_SEG_SIZE %d.",
3686                                                    ControlFile->xlog_seg_size, XLOG_SEG_SIZE),
3687                                  errhint("It looks like you need to recompile or initdb.")));
3688         if (ControlFile->nameDataLen != NAMEDATALEN)
3689                 ereport(FATAL,
3690                                 (errmsg("database files are incompatible with server"),
3691                 errdetail("The database cluster was initialized with NAMEDATALEN %d,"
3692                                   " but the server was compiled with NAMEDATALEN %d.",
3693                                   ControlFile->nameDataLen, NAMEDATALEN),
3694                                  errhint("It looks like you need to recompile or initdb.")));
3695         if (ControlFile->indexMaxKeys != INDEX_MAX_KEYS)
3696                 ereport(FATAL,
3697                                 (errmsg("database files are incompatible with server"),
3698                                  errdetail("The database cluster was initialized with INDEX_MAX_KEYS %d,"
3699                                           " but the server was compiled with INDEX_MAX_KEYS %d.",
3700                                                    ControlFile->indexMaxKeys, INDEX_MAX_KEYS),
3701                                  errhint("It looks like you need to recompile or initdb.")));
3702
3703 #ifdef HAVE_INT64_TIMESTAMP
3704         if (ControlFile->enableIntTimes != TRUE)
3705                 ereport(FATAL,
3706                                 (errmsg("database files are incompatible with server"),
3707                                  errdetail("The database cluster was initialized without HAVE_INT64_TIMESTAMP"
3708                                   " but the server was compiled with HAVE_INT64_TIMESTAMP."),
3709                                  errhint("It looks like you need to recompile or initdb.")));
3710 #else
3711         if (ControlFile->enableIntTimes != FALSE)
3712                 ereport(FATAL,
3713                                 (errmsg("database files are incompatible with server"),
3714                                  errdetail("The database cluster was initialized with HAVE_INT64_TIMESTAMP"
3715                            " but the server was compiled without HAVE_INT64_TIMESTAMP."),
3716                                  errhint("It looks like you need to recompile or initdb.")));
3717 #endif
3718
3719         if (ControlFile->localeBuflen != LOCALE_NAME_BUFLEN)
3720                 ereport(FATAL,
3721                                 (errmsg("database files are incompatible with server"),
3722                                  errdetail("The database cluster was initialized with LOCALE_NAME_BUFLEN %d,"
3723                                   " but the server was compiled with LOCALE_NAME_BUFLEN %d.",
3724                                                    ControlFile->localeBuflen, LOCALE_NAME_BUFLEN),
3725                                  errhint("It looks like you need to recompile or initdb.")));
3726         if (pg_perm_setlocale(LC_COLLATE, ControlFile->lc_collate) == NULL)
3727                 ereport(FATAL,
3728                         (errmsg("database files are incompatible with operating system"),
3729                          errdetail("The database cluster was initialized with LC_COLLATE \"%s\","
3730                                            " which is not recognized by setlocale().",
3731                                            ControlFile->lc_collate),
3732                          errhint("It looks like you need to initdb or install locale support.")));
3733         if (pg_perm_setlocale(LC_CTYPE, ControlFile->lc_ctype) == NULL)
3734                 ereport(FATAL,
3735                         (errmsg("database files are incompatible with operating system"),
3736                 errdetail("The database cluster was initialized with LC_CTYPE \"%s\","
3737                                   " which is not recognized by setlocale().",
3738                                   ControlFile->lc_ctype),
3739                          errhint("It looks like you need to initdb or install locale support.")));
3740
3741         /* Make the fixed locale settings visible as GUC variables, too */
3742         SetConfigOption("lc_collate", ControlFile->lc_collate,
3743                                         PGC_INTERNAL, PGC_S_OVERRIDE);
3744         SetConfigOption("lc_ctype", ControlFile->lc_ctype,
3745                                         PGC_INTERNAL, PGC_S_OVERRIDE);
3746 }
3747
3748 void
3749 UpdateControlFile(void)
3750 {
3751         int                     fd;
3752
3753         INIT_CRC32(ControlFile->crc);
3754         COMP_CRC32(ControlFile->crc,
3755                            (char *) ControlFile,
3756                            offsetof(ControlFileData, crc));
3757         FIN_CRC32(ControlFile->crc);
3758
3759         fd = BasicOpenFile(XLOG_CONTROL_FILE,
3760                                            O_RDWR | PG_BINARY,
3761                                            S_IRUSR | S_IWUSR);
3762         if (fd < 0)
3763                 ereport(PANIC,
3764                                 (errcode_for_file_access(),
3765                                  errmsg("could not open control file \"%s\": %m",
3766                                                 XLOG_CONTROL_FILE)));
3767
3768         errno = 0;
3769         if (write(fd, ControlFile, sizeof(ControlFileData)) != sizeof(ControlFileData))
3770         {
3771                 /* if write didn't set errno, assume problem is no disk space */
3772                 if (errno == 0)
3773                         errno = ENOSPC;
3774                 ereport(PANIC,
3775                                 (errcode_for_file_access(),
3776                                  errmsg("could not write to control file: %m")));
3777         }
3778
3779         if (pg_fsync(fd) != 0)
3780                 ereport(PANIC,
3781                                 (errcode_for_file_access(),
3782                                  errmsg("could not fsync control file: %m")));
3783
3784         if (close(fd))
3785                 ereport(PANIC,
3786                                 (errcode_for_file_access(),
3787                                  errmsg("could not close control file: %m")));
3788 }
3789
3790 /*
3791  * Initialization of shared memory for XLOG
3792  */
3793 Size
3794 XLOGShmemSize(void)
3795 {
3796         Size            size;
3797
3798         /* XLogCtl */
3799         size = sizeof(XLogCtlData);
3800         /* xlblocks array */
3801         size = add_size(size, mul_size(sizeof(XLogRecPtr), XLOGbuffers));
3802         /* extra alignment padding for XLOG I/O buffers */
3803         size = add_size(size, ALIGNOF_XLOG_BUFFER);
3804         /* and the buffers themselves */
3805         size = add_size(size, mul_size(XLOG_BLCKSZ, XLOGbuffers));
3806
3807         /*
3808          * Note: we don't count ControlFileData, it comes out of the "slop factor"
3809          * added by CreateSharedMemoryAndSemaphores.  This lets us use this
3810          * routine again below to compute the actual allocation size.
3811          */
3812
3813         return size;
3814 }
3815
3816 void
3817 XLOGShmemInit(void)
3818 {
3819         bool            foundCFile,
3820                                 foundXLog;
3821         char       *allocptr;
3822
3823         ControlFile = (ControlFileData *)
3824                 ShmemInitStruct("Control File", sizeof(ControlFileData), &foundCFile);
3825         XLogCtl = (XLogCtlData *)
3826                 ShmemInitStruct("XLOG Ctl", XLOGShmemSize(), &foundXLog);
3827
3828         if (foundCFile || foundXLog)
3829         {
3830                 /* both should be present or neither */
3831                 Assert(foundCFile && foundXLog);
3832                 return;
3833         }
3834
3835         memset(XLogCtl, 0, sizeof(XLogCtlData));
3836
3837         /*
3838          * Since XLogCtlData contains XLogRecPtr fields, its sizeof should be a
3839          * multiple of the alignment for same, so no extra alignment padding is
3840          * needed here.
3841          */
3842         allocptr = ((char *) XLogCtl) + sizeof(XLogCtlData);
3843         XLogCtl->xlblocks = (XLogRecPtr *) allocptr;
3844         memset(XLogCtl->xlblocks, 0, sizeof(XLogRecPtr) * XLOGbuffers);
3845         allocptr += sizeof(XLogRecPtr) * XLOGbuffers;
3846
3847         /*
3848          * Align the start of the page buffers to an ALIGNOF_XLOG_BUFFER boundary.
3849          */
3850         allocptr = (char *) TYPEALIGN(ALIGNOF_XLOG_BUFFER, allocptr);
3851         XLogCtl->pages = allocptr;
3852         memset(XLogCtl->pages, 0, (Size) XLOG_BLCKSZ * XLOGbuffers);
3853
3854         /*
3855          * Do basic initialization of XLogCtl shared data. (StartupXLOG will fill
3856          * in additional info.)
3857          */
3858         XLogCtl->XLogCacheByte = (Size) XLOG_BLCKSZ * XLOGbuffers;
3859
3860         XLogCtl->XLogCacheBlck = XLOGbuffers - 1;
3861         XLogCtl->Insert.currpage = (XLogPageHeader) (XLogCtl->pages);
3862         SpinLockInit(&XLogCtl->info_lck);
3863
3864         /*
3865          * If we are not in bootstrap mode, pg_control should already exist. Read
3866          * and validate it immediately (see comments in ReadControlFile() for the
3867          * reasons why).
3868          */
3869         if (!IsBootstrapProcessingMode())
3870                 ReadControlFile();
3871 }
3872
3873 /*
3874  * This func must be called ONCE on system install.  It creates pg_control
3875  * and the initial XLOG segment.
3876  */
3877 void
3878 BootStrapXLOG(void)
3879 {
3880         CheckPoint      checkPoint;
3881         char       *buffer;
3882         XLogPageHeader page;
3883         XLogLongPageHeader longpage;
3884         XLogRecord *record;
3885         bool            use_existent;
3886         uint64          sysidentifier;
3887         struct timeval tv;
3888         pg_crc32        crc;
3889
3890         /*
3891          * Select a hopefully-unique system identifier code for this installation.
3892          * We use the result of gettimeofday(), including the fractional seconds
3893          * field, as being about as unique as we can easily get.  (Think not to
3894          * use random(), since it hasn't been seeded and there's no portable way
3895          * to seed it other than the system clock value...)  The upper half of the
3896          * uint64 value is just the tv_sec part, while the lower half is the XOR
3897          * of tv_sec and tv_usec.  This is to ensure that we don't lose uniqueness
3898          * unnecessarily if "uint64" is really only 32 bits wide.  A person
3899          * knowing this encoding can determine the initialization time of the
3900          * installation, which could perhaps be useful sometimes.
3901          */
3902         gettimeofday(&tv, NULL);
3903         sysidentifier = ((uint64) tv.tv_sec) << 32;
3904         sysidentifier |= (uint32) (tv.tv_sec | tv.tv_usec);
3905
3906         /* First timeline ID is always 1 */
3907         ThisTimeLineID = 1;
3908
3909         /* page buffer must be aligned suitably for O_DIRECT */
3910         buffer = (char *) palloc(XLOG_BLCKSZ + ALIGNOF_XLOG_BUFFER);
3911         page = (XLogPageHeader) TYPEALIGN(ALIGNOF_XLOG_BUFFER, buffer);
3912         memset(page, 0, XLOG_BLCKSZ);
3913
3914         /* Set up information for the initial checkpoint record */
3915         checkPoint.redo.xlogid = 0;
3916         checkPoint.redo.xrecoff = SizeOfXLogLongPHD;
3917         checkPoint.undo = checkPoint.redo;
3918         checkPoint.ThisTimeLineID = ThisTimeLineID;
3919         checkPoint.nextXid = FirstNormalTransactionId;
3920         checkPoint.nextOid = FirstBootstrapObjectId;
3921         checkPoint.nextMulti = FirstMultiXactId;
3922         checkPoint.nextMultiOffset = 0;
3923         checkPoint.time = time(NULL);
3924
3925         ShmemVariableCache->nextXid = checkPoint.nextXid;
3926         ShmemVariableCache->nextOid = checkPoint.nextOid;
3927         ShmemVariableCache->oidCount = 0;
3928         MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset);
3929
3930         /* Set up the XLOG page header */
3931         page->xlp_magic = XLOG_PAGE_MAGIC;
3932         page->xlp_info = XLP_LONG_HEADER;
3933         page->xlp_tli = ThisTimeLineID;
3934         page->xlp_pageaddr.xlogid = 0;
3935         page->xlp_pageaddr.xrecoff = 0;
3936         longpage = (XLogLongPageHeader) page;
3937         longpage->xlp_sysid = sysidentifier;
3938         longpage->xlp_seg_size = XLogSegSize;
3939         longpage->xlp_xlog_blcksz = XLOG_BLCKSZ;
3940
3941         /* Insert the initial checkpoint record */
3942         record = (XLogRecord *) ((char *) page + SizeOfXLogLongPHD);
3943         record->xl_prev.xlogid = 0;
3944         record->xl_prev.xrecoff = 0;
3945         record->xl_xid = InvalidTransactionId;
3946         record->xl_tot_len = SizeOfXLogRecord + sizeof(checkPoint);
3947         record->xl_len = sizeof(checkPoint);
3948         record->xl_info = XLOG_CHECKPOINT_SHUTDOWN;
3949         record->xl_rmid = RM_XLOG_ID;
3950         memcpy(XLogRecGetData(record), &checkPoint, sizeof(checkPoint));
3951
3952         INIT_CRC32(crc);
3953         COMP_CRC32(crc, &checkPoint, sizeof(checkPoint));
3954         COMP_CRC32(crc, (char *) record + sizeof(pg_crc32),
3955                            SizeOfXLogRecord - sizeof(pg_crc32));
3956         FIN_CRC32(crc);
3957         record->xl_crc = crc;
3958
3959         /* Create first XLOG segment file */
3960         use_existent = false;
3961         openLogFile = XLogFileInit(0, 0, &use_existent, false);
3962
3963         /* Write the first page with the initial record */
3964         errno = 0;
3965         if (write(openLogFile, page, XLOG_BLCKSZ) != XLOG_BLCKSZ)
3966         {
3967                 /* if write didn't set errno, assume problem is no disk space */
3968                 if (errno == 0)
3969                         errno = ENOSPC;
3970                 ereport(PANIC,
3971                                 (errcode_for_file_access(),
3972                           errmsg("could not write bootstrap transaction log file: %m")));
3973         }
3974
3975         if (pg_fsync(openLogFile) != 0)
3976                 ereport(PANIC,
3977                                 (errcode_for_file_access(),
3978                           errmsg("could not fsync bootstrap transaction log file: %m")));
3979
3980         if (close(openLogFile))
3981                 ereport(PANIC,
3982                                 (errcode_for_file_access(),
3983                           errmsg("could not close bootstrap transaction log file: %m")));
3984
3985         openLogFile = -1;
3986
3987         /* Now create pg_control */
3988
3989         memset(ControlFile, 0, sizeof(ControlFileData));
3990         /* Initialize pg_control status fields */
3991         ControlFile->system_identifier = sysidentifier;
3992         ControlFile->state = DB_SHUTDOWNED;
3993         ControlFile->time = checkPoint.time;
3994         ControlFile->logId = 0;
3995         ControlFile->logSeg = 1;
3996         ControlFile->checkPoint = checkPoint.redo;
3997         ControlFile->checkPointCopy = checkPoint;
3998         /* some additional ControlFile fields are set in WriteControlFile() */
3999
4000         WriteControlFile();
4001
4002         /* Bootstrap the commit log, too */
4003         BootStrapCLOG();
4004         BootStrapSUBTRANS();
4005         BootStrapMultiXact();
4006
4007         pfree(buffer);
4008 }
4009
/*
 * str_time
 *
 * Format a time_t as "YYYY-MM-DD HH:MM:SS TZ" in the local time zone.
 *
 * The result is held in a static buffer that the next call overwrites, so
 * the caller must use (or copy) it before calling str_time() again.
 *
 * If the value cannot be converted by localtime() or formatted by
 * strftime(), a placeholder string is returned instead; this avoids handing
 * a NULL struct tm to strftime() and avoids returning a buffer whose
 * contents are unspecified.
 */
static char *
str_time(time_t tnow)
{
	static char buf[128];
	struct tm  *tm;

	tm = localtime(&tnow);
	if (tm == NULL ||
		strftime(buf, sizeof(buf), "%Y-%m-%d %H:%M:%S %Z", tm) == 0)
		snprintf(buf, sizeof(buf), "%s", "(invalid time)");

	return buf;
}
4021
4022 /*
4023  * See if there is a recovery command file (recovery.conf), and if so
4024  * read in parameters for archive recovery.
4025  *
4026  * XXX longer term intention is to expand this to
4027  * cater for additional parameters and controls
4028  * possibly use a flex lexer similar to the GUC one
4029  */
4030 static void
4031 readRecoveryCommandFile(void)
4032 {
4033         FILE       *fd;
4034         char            cmdline[MAXPGPATH];
4035         TimeLineID      rtli = 0;
4036         bool            rtliGiven = false;
4037         bool            syntaxError = false;
4038
4039         fd = AllocateFile(RECOVERY_COMMAND_FILE, "r");
4040         if (fd == NULL)
4041         {
4042                 if (errno == ENOENT)
4043                         return;                         /* not there, so no archive recovery */
4044                 ereport(FATAL,
4045                                 (errcode_for_file_access(),
4046                                  errmsg("could not open recovery command file \"%s\": %m",
4047                                                 RECOVERY_COMMAND_FILE)));
4048         }
4049
4050         ereport(LOG,
4051                         (errmsg("starting archive recovery")));
4052
4053         /*
4054          * Parse the file...
4055          */
4056         while (fgets(cmdline, MAXPGPATH, fd) != NULL)
4057         {
4058                 /* skip leading whitespace and check for # comment */
4059                 char       *ptr;
4060                 char       *tok1;
4061                 char       *tok2;
4062
4063                 for (ptr = cmdline; *ptr; ptr++)
4064                 {
4065                         if (!isspace((unsigned char) *ptr))
4066                                 break;
4067                 }
4068                 if (*ptr == '\0' || *ptr == '#')
4069                         continue;
4070
4071                 /* identify the quoted parameter value */
4072                 tok1 = strtok(ptr, "'");
4073                 if (!tok1)
4074                 {
4075                         syntaxError = true;
4076                         break;
4077                 }
4078                 tok2 = strtok(NULL, "'");
4079                 if (!tok2)
4080                 {
4081                         syntaxError = true;
4082                         break;
4083                 }
4084                 /* reparse to get just the parameter name */
4085                 tok1 = strtok(ptr, " \t=");
4086                 if (!tok1)
4087                 {
4088                         syntaxError = true;
4089                         break;
4090                 }
4091
4092                 if (strcmp(tok1, "restore_command") == 0)
4093                 {
4094                         recoveryRestoreCommand = pstrdup(tok2);
4095                         ereport(LOG,
4096                                         (errmsg("restore_command = \"%s\"",
4097                                                         recoveryRestoreCommand)));
4098                 }
4099                 else if (strcmp(tok1, "recovery_target_timeline") == 0)
4100                 {
4101                         rtliGiven = true;
4102                         if (strcmp(tok2, "latest") == 0)
4103                                 rtli = 0;
4104                         else
4105                         {
4106                                 errno = 0;
4107                                 rtli = (TimeLineID) strtoul(tok2, NULL, 0);
4108                                 if (errno == EINVAL || errno == ERANGE)
4109                                         ereport(FATAL,
4110                                                         (errmsg("recovery_target_timeline is not a valid number: \"%s\"",
4111                                                                         tok2)));
4112                         }
4113                         if (rtli)
4114                                 ereport(LOG,
4115                                                 (errmsg("recovery_target_timeline = %u", rtli)));
4116                         else
4117                                 ereport(LOG,
4118                                                 (errmsg("recovery_target_timeline = latest")));
4119                 }
4120                 else if (strcmp(tok1, "recovery_target_xid") == 0)
4121                 {
4122                         errno = 0;
4123                         recoveryTargetXid = (TransactionId) strtoul(tok2, NULL, 0);
4124                         if (errno == EINVAL || errno == ERANGE)
4125                                 ereport(FATAL,
4126                                  (errmsg("recovery_target_xid is not a valid number: \"%s\"",
4127                                                  tok2)));
4128                         ereport(LOG,
4129                                         (errmsg("recovery_target_xid = %u",
4130                                                         recoveryTargetXid)));
4131                         recoveryTarget = true;
4132                         recoveryTargetExact = true;
4133                 }
4134                 else if (strcmp(tok1, "recovery_target_time") == 0)
4135                 {
4136                         /*
4137                          * if recovery_target_xid specified, then this overrides
4138                          * recovery_target_time
4139                          */
4140                         if (recoveryTargetExact)
4141                                 continue;
4142                         recoveryTarget = true;
4143                         recoveryTargetExact = false;
4144
4145                         /*
4146                          * Convert the time string given by the user to the time_t format.
4147                          * We use type abstime's input converter because we know abstime
4148                          * has the same representation as time_t.
4149                          */
4150                         recoveryTargetTime = (time_t)
4151                                 DatumGetAbsoluteTime(DirectFunctionCall1(abstimein,
4152                                                                                                          CStringGetDatum(tok2)));
4153                         ereport(LOG,
4154                                         (errmsg("recovery_target_time = %s",
4155                                                         DatumGetCString(DirectFunctionCall1(abstimeout,
4156                                 AbsoluteTimeGetDatum((AbsoluteTime) recoveryTargetTime))))));
4157                 }
4158                 else if (strcmp(tok1, "recovery_target_inclusive") == 0)
4159                 {
4160                         /*
4161                          * does nothing if a recovery_target is not also set
4162                          */
4163                         if (strcmp(tok2, "true") == 0)
4164                                 recoveryTargetInclusive = true;
4165                         else
4166                         {
4167                                 recoveryTargetInclusive = false;
4168                                 tok2 = "false";
4169                         }
4170                         ereport(LOG,
4171                                         (errmsg("recovery_target_inclusive = %s", tok2)));
4172                 }
4173                 else
4174                         ereport(FATAL,
4175                                         (errmsg("unrecognized recovery parameter \"%s\"",
4176                                                         tok1)));
4177         }
4178
4179         FreeFile(fd);
4180
4181         if (syntaxError)
4182                 ereport(FATAL,
4183                                 (errmsg("syntax error in recovery command file: %s",
4184                                                 cmdline),
4185                           errhint("Lines should have the format parameter = 'value'.")));
4186
4187         /* Check that required parameters were supplied */
4188         if (recoveryRestoreCommand == NULL)
4189                 ereport(FATAL,
4190                                 (errmsg("recovery command file \"%s\" did not specify restore_command",
4191                                                 RECOVERY_COMMAND_FILE)));
4192
4193         /* Enable fetching from archive recovery area */
4194         InArchiveRecovery = true;
4195
4196         /*
4197          * If user specified recovery_target_timeline, validate it or compute the
4198          * "latest" value.      We can't do this until after we've gotten the restore
4199          * command and set InArchiveRecovery, because we need to fetch timeline
4200          * history files from the archive.
4201          */
4202         if (rtliGiven)
4203         {
4204                 if (rtli)
4205                 {
4206                         /* Timeline 1 does not have a history file, all else should */
4207                         if (rtli != 1 && !existsTimeLineHistory(rtli))
4208                                 ereport(FATAL,
4209                                                 (errmsg("recovery_target_timeline %u does not exist",
4210                                                                 rtli)));
4211                         recoveryTargetTLI = rtli;
4212                 }
4213                 else
4214                 {
4215                         /* We start the "latest" search from pg_control's timeline */
4216                         recoveryTargetTLI = findNewestTimeLine(recoveryTargetTLI);
4217                 }
4218         }
4219 }
4220
4221 /*
4222  * Exit archive-recovery state
4223  */
4224 static void
4225 exitArchiveRecovery(TimeLineID endTLI, uint32 endLogId, uint32 endLogSeg)
4226 {
4227         char            recoveryPath[MAXPGPATH];
4228         char            xlogpath[MAXPGPATH];
4229
4230         /*
4231          * We are no longer in archive recovery state.
4232          */
4233         InArchiveRecovery = false;
4234
4235         /*
4236          * We should have the ending log segment currently open.  Verify, and then
4237          * close it (to avoid problems on Windows with trying to rename or delete
4238          * an open file).
4239          */
4240         Assert(readFile >= 0);
4241         Assert(readId == endLogId);
4242         Assert(readSeg == endLogSeg);
4243
4244         close(readFile);
4245         readFile = -1;
4246
4247         /*
4248          * If the segment was fetched from archival storage, we want to replace
4249          * the existing xlog segment (if any) with the archival version.  This is
4250          * because whatever is in XLOGDIR is very possibly older than what we have
4251          * from the archives, since it could have come from restoring a PGDATA
4252          * backup.      In any case, the archival version certainly is more
4253          * descriptive of what our current database state is, because that is what
4254          * we replayed from.
4255          *
4256          * Note that if we are establishing a new timeline, ThisTimeLineID is
4257          * already set to the new value, and so we will create a new file instead
4258          * of overwriting any existing file.
4259          */
4260         snprintf(recoveryPath, MAXPGPATH, XLOGDIR "/RECOVERYXLOG");
4261         XLogFilePath(xlogpath, ThisTimeLineID, endLogId, endLogSeg);
4262
4263         if (restoredFromArchive)
4264         {
4265                 ereport(DEBUG3,
4266                                 (errmsg_internal("moving last restored xlog to \"%s\"",
4267                                                                  xlogpath)));
4268                 unlink(xlogpath);               /* might or might not exist */
4269                 if (rename(recoveryPath, xlogpath) != 0)
4270                         ereport(FATAL,
4271                                         (errcode_for_file_access(),
4272                                          errmsg("could not rename file \"%s\" to \"%s\": %m",
4273                                                         recoveryPath, xlogpath)));
4274                 /* XXX might we need to fix permissions on the file? */
4275         }
4276         else
4277         {
4278                 /*
4279                  * If the latest segment is not archival, but there's still a
4280                  * RECOVERYXLOG laying about, get rid of it.
4281                  */
4282                 unlink(recoveryPath);   /* ignore any error */
4283
4284                 /*
4285                  * If we are establishing a new timeline, we have to copy data from
4286                  * the last WAL segment of the old timeline to create a starting WAL
4287                  * segment for the new timeline.
4288                  */
4289                 if (endTLI != ThisTimeLineID)
4290                         XLogFileCopy(endLogId, endLogSeg,
4291                                                  endTLI, endLogId, endLogSeg);
4292         }
4293
4294         /*
4295          * Let's just make real sure there are not .ready or .done flags posted
4296          * for the new segment.
4297          */
4298         XLogFileName(xlogpath, ThisTimeLineID, endLogId, endLogSeg);
4299         XLogArchiveCleanup(xlogpath);
4300
4301         /* Get rid of any remaining recovered timeline-history file, too */
4302         snprintf(recoveryPath, MAXPGPATH, XLOGDIR "/RECOVERYHISTORY");
4303         unlink(recoveryPath);           /* ignore any error */
4304
4305         /*
4306          * Rename the config file out of the way, so that we don't accidentally
4307          * re-enter archive recovery mode in a subsequent crash.
4308          */
4309         unlink(RECOVERY_COMMAND_DONE);
4310         if (rename(RECOVERY_COMMAND_FILE, RECOVERY_COMMAND_DONE) != 0)
4311                 ereport(FATAL,
4312                                 (errcode_for_file_access(),
4313                                  errmsg("could not rename file \"%s\" to \"%s\": %m",
4314                                                 RECOVERY_COMMAND_FILE, RECOVERY_COMMAND_DONE)));
4315
4316         ereport(LOG,
4317                         (errmsg("archive recovery complete")));
4318 }
4319
4320 /*
4321  * For point-in-time recovery, this function decides whether we want to
4322  * stop applying the XLOG at or after the current record.
4323  *
4324  * Returns TRUE if we are stopping, FALSE otherwise.  On TRUE return,
4325  * *includeThis is set TRUE if we should apply this record before stopping.
4326  * Also, some information is saved in recoveryStopXid et al for use in
4327  * annotating the new timeline's history file.
4328  */
4329 static bool
4330 recoveryStopsHere(XLogRecord *record, bool *includeThis)
4331 {
4332         bool            stopsHere;
4333         uint8           record_info;
4334         time_t          recordXtime;
4335
4336         /* Do we have a PITR target at all? */
4337         if (!recoveryTarget)
4338                 return false;
4339
4340         /* We only consider stopping at COMMIT or ABORT records */
4341         if (record->xl_rmid != RM_XACT_ID)
4342                 return false;
4343         record_info = record->xl_info & ~XLR_INFO_MASK;
4344         if (record_info == XLOG_XACT_COMMIT)
4345         {
4346                 xl_xact_commit *recordXactCommitData;
4347
4348                 recordXactCommitData = (xl_xact_commit *) XLogRecGetData(record);
4349                 recordXtime = recordXactCommitData->xtime;
4350         }
4351         else if (record_info == XLOG_XACT_ABORT)
4352         {
4353                 xl_xact_abort *recordXactAbortData;
4354
4355                 recordXactAbortData = (xl_xact_abort *) XLogRecGetData(record);
4356                 recordXtime = recordXactAbortData->xtime;
4357         }
4358         else
4359                 return false;
4360
4361         if (recoveryTargetExact)
4362         {
4363                 /*
4364                  * there can be only one transaction end record with this exact
4365                  * transactionid
4366                  *
4367                  * when testing for an xid, we MUST test for equality only, since
4368                  * transactions are numbered in the order they start, not the order
4369                  * they complete. A higher numbered xid will complete before you about
4370                  * 50% of the time...
4371                  */
4372                 stopsHere = (record->xl_xid == recoveryTargetXid);
4373                 if (stopsHere)
4374                         *includeThis = recoveryTargetInclusive;
4375         }
4376         else
4377         {
4378                 /*
4379                  * there can be many transactions that share the same commit time, so
4380                  * we stop after the last one, if we are inclusive, or stop at the
4381                  * first one if we are exclusive
4382                  */
4383                 if (recoveryTargetInclusive)
4384                         stopsHere = (recordXtime > recoveryTargetTime);
4385                 else
4386                         stopsHere = (recordXtime >= recoveryTargetTime);
4387                 if (stopsHere)
4388                         *includeThis = false;
4389         }
4390
4391         if (stopsHere)
4392         {
4393                 recoveryStopXid = record->xl_xid;
4394                 recoveryStopTime = recordXtime;
4395                 recoveryStopAfter = *includeThis;
4396
4397                 if (record_info == XLOG_XACT_COMMIT)
4398                 {
4399                         if (recoveryStopAfter)
4400                                 ereport(LOG,
4401                                                 (errmsg("recovery stopping after commit of transaction %u, time %s",
4402                                                           recoveryStopXid, str_time(recoveryStopTime))));
4403                         else
4404                                 ereport(LOG,
4405                                                 (errmsg("recovery stopping before commit of transaction %u, time %s",
4406                                                           recoveryStopXid, str_time(recoveryStopTime))));
4407                 }
4408                 else
4409                 {
4410                         if (recoveryStopAfter)
4411                                 ereport(LOG,
4412                                                 (errmsg("recovery stopping after abort of transaction %u, time %s",
4413                                                           recoveryStopXid, str_time(recoveryStopTime))));
4414                         else
4415                                 ereport(LOG,
4416                                                 (errmsg("recovery stopping before abort of transaction %u, time %s",
4417                                                           recoveryStopXid, str_time(recoveryStopTime))));
4418                 }
4419         }
4420
4421         return stopsHere;
4422 }
4423
4424 /*
4425  * This must be called ONCE during postmaster or standalone-backend startup
4426  */
4427 void
4428 StartupXLOG(void)
4429 {
4430         XLogCtlInsert *Insert;
4431         CheckPoint      checkPoint;
4432         bool            wasShutdown;
4433         bool            needNewTimeLine = false;
4434         XLogRecPtr      RecPtr,
4435                                 LastRec,
4436                                 checkPointLoc,
4437                                 EndOfLog;
4438         uint32          endLogId;
4439         uint32          endLogSeg;
4440         XLogRecord *record;
4441         uint32          freespace;
4442         TransactionId oldestActiveXID;
4443
4444         CritSectionCount++;
4445
4446         /*
4447          * Read control file and check XLOG status looks valid.
4448          *
4449          * Note: in most control paths, *ControlFile is already valid and we need
4450          * not do ReadControlFile() here, but might as well do it to be sure.
4451          */
4452         ReadControlFile();
4453
4454         if (ControlFile->logSeg == 0 ||
4455                 ControlFile->state < DB_SHUTDOWNED ||
4456                 ControlFile->state > DB_IN_PRODUCTION ||
4457                 !XRecOffIsValid(ControlFile->checkPoint.xrecoff))
4458                 ereport(FATAL,
4459                                 (errmsg("control file contains invalid data")));
4460
4461         if (ControlFile->state == DB_SHUTDOWNED)
4462                 ereport(LOG,
4463                                 (errmsg("database system was shut down at %s",
4464                                                 str_time(ControlFile->time))));
4465         else if (ControlFile->state == DB_SHUTDOWNING)
4466                 ereport(LOG,
4467                                 (errmsg("database system shutdown was interrupted at %s",
4468                                                 str_time(ControlFile->time))));
4469         else if (ControlFile->state == DB_IN_RECOVERY)
4470                 ereport(LOG,
4471                    (errmsg("database system was interrupted while in recovery at %s",
4472                                    str_time(ControlFile->time)),
4473                         errhint("This probably means that some data is corrupted and"
4474                                         " you will have to use the last backup for recovery.")));
4475         else if (ControlFile->state == DB_IN_PRODUCTION)
4476                 ereport(LOG,
4477                                 (errmsg("database system was interrupted at %s",
4478                                                 str_time(ControlFile->time))));
4479
4480         /* This is just to allow attaching to startup process with a debugger */
4481 #ifdef XLOG_REPLAY_DELAY
4482         if (ControlFile->state != DB_SHUTDOWNED)
4483                 pg_usleep(60000000L);
4484 #endif
4485
4486         /*
4487          * Initialize on the assumption we want to recover to the same timeline
4488          * that's active according to pg_control.
4489          */
4490         recoveryTargetTLI = ControlFile->checkPointCopy.ThisTimeLineID;
4491
4492         /*
4493          * Check for recovery control file, and if so set up state for offline
4494          * recovery
4495          */
4496         readRecoveryCommandFile();
4497
4498         /* Now we can determine the list of expected TLIs */
4499         expectedTLIs = readTimeLineHistory(recoveryTargetTLI);
4500
4501         /*
4502          * If pg_control's timeline is not in expectedTLIs, then we cannot
4503          * proceed: the backup is not part of the history of the requested
4504          * timeline.
4505          */
4506         if (!list_member_int(expectedTLIs,
4507                                                  (int) ControlFile->checkPointCopy.ThisTimeLineID))
4508                 ereport(FATAL,
4509                                 (errmsg("requested timeline %u is not a child of database system timeline %u",
4510                                                 recoveryTargetTLI,
4511                                                 ControlFile->checkPointCopy.ThisTimeLineID)));
4512
4513         if (read_backup_label(&checkPointLoc))
4514         {
4515                 /*
4516                  * When a backup_label file is present, we want to roll forward from
4517                  * the checkpoint it identifies, rather than using pg_control.
4518                  */
4519                 record = ReadCheckpointRecord(checkPointLoc, 0);
4520                 if (record != NULL)
4521                 {
4522                         ereport(LOG,
4523                                         (errmsg("checkpoint record is at %X/%X",
4524                                                         checkPointLoc.xlogid, checkPointLoc.xrecoff)));
4525                         InRecovery = true;      /* force recovery even if SHUTDOWNED */
4526                 }
4527                 else
4528                 {
4529                         ereport(PANIC,
4530                                         (errmsg("could not locate required checkpoint record"),
4531                                          errhint("If you are not restoring from a backup, try removing the file \"%s/backup_label\".", DataDir)));
4532                 }
4533         }
4534         else
4535         {
4536                 /*
4537                  * Get the last valid checkpoint record.  If the latest one according
4538                  * to pg_control is broken, try the next-to-last one.
4539                  */
4540                 checkPointLoc = ControlFile->checkPoint;
4541                 record = ReadCheckpointRecord(checkPointLoc, 1);
4542                 if (record != NULL)
4543                 {
4544                         ereport(LOG,
4545                                         (errmsg("checkpoint record is at %X/%X",
4546                                                         checkPointLoc.xlogid, checkPointLoc.xrecoff)));
4547                 }
4548                 else
4549                 {
4550                         checkPointLoc = ControlFile->prevCheckPoint;
4551                         record = ReadCheckpointRecord(checkPointLoc, 2);
4552                         if (record != NULL)
4553                         {
4554                                 ereport(LOG,
4555                                                 (errmsg("using previous checkpoint record at %X/%X",
4556                                                           checkPointLoc.xlogid, checkPointLoc.xrecoff)));
4557                                 InRecovery = true;              /* force recovery even if SHUTDOWNED */
4558                         }
4559                         else
4560                                 ereport(PANIC,
4561                                          (errmsg("could not locate a valid checkpoint record")));
4562                 }
4563         }
4564
4565         LastRec = RecPtr = checkPointLoc;
4566         memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
4567         wasShutdown = (record->xl_info == XLOG_CHECKPOINT_SHUTDOWN);
4568
4569         ereport(LOG,
4570          (errmsg("redo record is at %X/%X; undo record is at %X/%X; shutdown %s",
4571                          checkPoint.redo.xlogid, checkPoint.redo.xrecoff,
4572                          checkPoint.undo.xlogid, checkPoint.undo.xrecoff,
4573                          wasShutdown ? "TRUE" : "FALSE")));
4574         ereport(LOG,
4575                         (errmsg("next transaction ID: %u; next OID: %u",
4576                                         checkPoint.nextXid, checkPoint.nextOid)));
4577         ereport(LOG,
4578                         (errmsg("next MultiXactId: %u; next MultiXactOffset: %u",
4579                                         checkPoint.nextMulti, checkPoint.nextMultiOffset)));
4580         if (!TransactionIdIsNormal(checkPoint.nextXid))
4581                 ereport(PANIC,
4582                                 (errmsg("invalid next transaction ID")));
4583
4584         ShmemVariableCache->nextXid = checkPoint.nextXid;
4585         ShmemVariableCache->nextOid = checkPoint.nextOid;
4586         ShmemVariableCache->oidCount = 0;
4587         MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset);
4588
4589         /*
4590          * We must replay WAL entries using the same TimeLineID they were created
4591          * under, so temporarily adopt the TLI indicated by the checkpoint (see
4592          * also xlog_redo()).
4593          */
4594         ThisTimeLineID = checkPoint.ThisTimeLineID;
4595
4596         RedoRecPtr = XLogCtl->Insert.RedoRecPtr = checkPoint.redo;
4597
4598         if (XLByteLT(RecPtr, checkPoint.redo))
4599                 ereport(PANIC,
4600                                 (errmsg("invalid redo in checkpoint record")));
4601         if (checkPoint.undo.xrecoff == 0)
4602                 checkPoint.undo = RecPtr;
4603
4604         /*
4605          * Check whether we need to force recovery from WAL.  If it appears to
4606          * have been a clean shutdown and we did not have a recovery.conf file,
4607          * then assume no recovery needed.
4608          */
4609         if (XLByteLT(checkPoint.undo, RecPtr) ||
4610                 XLByteLT(checkPoint.redo, RecPtr))
4611         {
4612                 if (wasShutdown)
4613                         ereport(PANIC,
4614                                 (errmsg("invalid redo/undo record in shutdown checkpoint")));
4615                 InRecovery = true;
4616         }
4617         else if (ControlFile->state != DB_SHUTDOWNED)
4618                 InRecovery = true;
4619         else if (InArchiveRecovery)
4620         {
4621                 /* force recovery due to presence of recovery.conf */
4622                 InRecovery = true;
4623         }
4624
4625         /* REDO */
4626         if (InRecovery)
4627         {
4628                 int                     rmid;
4629
4630                 if (InArchiveRecovery)
4631                         ereport(LOG,
4632                                         (errmsg("automatic recovery in progress")));
4633                 else
4634                         ereport(LOG,
4635                                         (errmsg("database system was not properly shut down; "
4636                                                         "automatic recovery in progress")));
4637                 ControlFile->state = DB_IN_RECOVERY;
4638                 ControlFile->time = time(NULL);
4639                 UpdateControlFile();
4640
4641                 /* Start up the recovery environment */
4642                 XLogInitRelationCache();
4643
4644                 for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
4645                 {
4646                         if (RmgrTable[rmid].rm_startup != NULL)
4647                                 RmgrTable[rmid].rm_startup();
4648                 }
4649
4650                 /*
4651                  * Find the first record that logically follows the checkpoint --- it
4652                  * might physically precede it, though.
4653                  */
4654                 if (XLByteLT(checkPoint.redo, RecPtr))
4655                 {
4656                         /* back up to find the record */
4657                         record = ReadRecord(&(checkPoint.redo), PANIC);
4658                 }
4659                 else
4660                 {
4661                         /* just have to read next record after CheckPoint */
4662                         record = ReadRecord(NULL, LOG);
4663                 }
4664
4665                 if (record != NULL)
4666                 {
4667                         bool            recoveryContinue = true;
4668                         bool            recoveryApply = true;
4669                         ErrorContextCallback    errcontext;
4670
4671                         InRedo = true;
4672                         ereport(LOG,
4673                                         (errmsg("redo starts at %X/%X",
4674                                                         ReadRecPtr.xlogid, ReadRecPtr.xrecoff)));
4675
4676                         /*
4677                          * main redo apply loop
4678                          */
4679                         do
4680                         {
4681 #ifdef WAL_DEBUG
4682                                 if (XLOG_DEBUG)
4683                                 {
4684                                         StringInfoData  buf;
4685
4686                                         initStringInfo(&buf);
4687                                         appendStringInfo(&buf, "REDO @ %X/%X; LSN %X/%X: ",
4688                                                         ReadRecPtr.xlogid, ReadRecPtr.xrecoff,
4689                                                         EndRecPtr.xlogid, EndRecPtr.xrecoff);
4690                                         xlog_outrec(&buf, record);
4691                                         appendStringInfo(&buf, " - ");
4692                                         RmgrTable[record->xl_rmid].rm_desc(&buf,
4693                                                                                                            record->xl_info,
4694                                                                                                            XLogRecGetData(record));
4695                                         elog(LOG, "%s", buf.data);
4696                                         pfree(buf.data);
4697                                 }
4698 #endif
4699
4700                                 /*
4701                                  * Have we reached our recovery target?
4702                                  */
4703                                 if (recoveryStopsHere(record, &recoveryApply))
4704                                 {
4705                                         needNewTimeLine = true;         /* see below */
4706                                         recoveryContinue = false;
4707                                         if (!recoveryApply)
4708                                                 break;
4709                                 }
4710
4711                                 /* Setup error traceback support for ereport() */
4712                                 errcontext.callback = rm_redo_error_callback;
4713                                 errcontext.arg = (void *) record;
4714                                 errcontext.previous = error_context_stack;
4715                                 error_context_stack = &errcontext;
4716
4717                                 /* nextXid must be beyond record's xid */
4718                                 if (TransactionIdFollowsOrEquals(record->xl_xid,
4719                                                                                                  ShmemVariableCache->nextXid))
4720                                 {
4721                                         ShmemVariableCache->nextXid = record->xl_xid;
4722                                         TransactionIdAdvance(ShmemVariableCache->nextXid);
4723                                 }
4724
4725                                 if (record->xl_info & XLR_BKP_BLOCK_MASK)
4726                                         RestoreBkpBlocks(record, EndRecPtr);
4727
4728                                 RmgrTable[record->xl_rmid].rm_redo(EndRecPtr, record);
4729
4730                                 /* Pop the error context stack */
4731                                 error_context_stack = errcontext.previous;
4732
4733                                 LastRec = ReadRecPtr;
4734
4735                                 record = ReadRecord(NULL, LOG);
4736                         } while (record != NULL && recoveryContinue);
4737
4738                         /*
4739                          * end of main redo apply loop
4740                          */
4741
4742                         ereport(LOG,
4743                                         (errmsg("redo done at %X/%X",
4744                                                         ReadRecPtr.xlogid, ReadRecPtr.xrecoff)));
4745                         InRedo = false;
4746                 }
4747                 else
4748                 {
4749                         /* there are no WAL records following the checkpoint */
4750                         ereport(LOG,
4751                                         (errmsg("redo is not required")));
4752                 }
4753         }
4754
4755         /*
4756          * Re-fetch the last valid or last applied record, so we can identify the
4757          * exact endpoint of what we consider the valid portion of WAL.
4758          */
4759         record = ReadRecord(&LastRec, PANIC);
4760         EndOfLog = EndRecPtr;
4761         XLByteToPrevSeg(EndOfLog, endLogId, endLogSeg);
4762
4763         /*
4764          * Complain if we did not roll forward far enough to render the backup
4765          * dump consistent.
4766          */
4767         if (XLByteLT(EndOfLog, recoveryMinXlogOffset))
4768         {
4769                 if (needNewTimeLine)    /* stopped because of stop request */
4770                         ereport(FATAL,
4771                                         (errmsg("requested recovery stop point is before end time of backup dump")));
4772                 else
4773                         /* ran off end of WAL */
4774                         ereport(FATAL,
4775                                         (errmsg("WAL ends before end time of backup dump")));
4776         }
4777
4778         /*
4779          * Consider whether we need to assign a new timeline ID.
4780          *
4781          * If we stopped short of the end of WAL during recovery, then we are
4782          * generating a new timeline and must assign it a unique new ID.
4783          * Otherwise, we can just extend the timeline we were in when we ran out
4784          * of WAL.
4785          */
4786         if (needNewTimeLine)
4787         {
4788                 ThisTimeLineID = findNewestTimeLine(recoveryTargetTLI) + 1;
4789                 ereport(LOG,
4790                                 (errmsg("selected new timeline ID: %u", ThisTimeLineID)));
4791                 writeTimeLineHistory(ThisTimeLineID, recoveryTargetTLI,
4792                                                          curFileTLI, endLogId, endLogSeg);
4793         }
4794
4795         /* Save the selected TimeLineID in shared memory, too */
4796         XLogCtl->ThisTimeLineID = ThisTimeLineID;
4797
4798         /*
4799          * We are now done reading the old WAL.  Turn off archive fetching if it
4800          * was active, and make a writable copy of the last WAL segment. (Note
4801          * that we also have a copy of the last block of the old WAL in readBuf;
4802          * we will use that below.)
4803          */
4804         if (InArchiveRecovery)
4805                 exitArchiveRecovery(curFileTLI, endLogId, endLogSeg);
4806
4807         /*
4808          * Prepare to write WAL starting at EndOfLog position, and init xlog
4809          * buffer cache using the block containing the last record from the
4810          * previous incarnation.
4811          */
4812         openLogId = endLogId;
4813         openLogSeg = endLogSeg;
4814         openLogFile = XLogFileOpen(openLogId, openLogSeg);
4815         openLogOff = 0;
4816         ControlFile->logId = openLogId;
4817         ControlFile->logSeg = openLogSeg + 1;
4818         Insert = &XLogCtl->Insert;
4819         Insert->PrevRecord = LastRec;
4820         XLogCtl->xlblocks[0].xlogid = openLogId;
4821         XLogCtl->xlblocks[0].xrecoff =
4822                 ((EndOfLog.xrecoff - 1) / XLOG_BLCKSZ + 1) * XLOG_BLCKSZ;
4823
4824         /*
4825          * Tricky point here: readBuf contains the *last* block that the LastRec
4826          * record spans, not the one it starts in.      The last block is indeed the
4827          * one we want to use.
4828          */
4829         Assert(readOff == (XLogCtl->xlblocks[0].xrecoff - XLOG_BLCKSZ) % XLogSegSize);
4830         memcpy((char *) Insert->currpage, readBuf, XLOG_BLCKSZ);
4831         Insert->currpos = (char *) Insert->currpage +
4832                 (EndOfLog.xrecoff + XLOG_BLCKSZ - XLogCtl->xlblocks[0].xrecoff);
4833
4834         LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
4835
4836         XLogCtl->Write.LogwrtResult = LogwrtResult;
4837         Insert->LogwrtResult = LogwrtResult;
4838         XLogCtl->LogwrtResult = LogwrtResult;
4839
4840         XLogCtl->LogwrtRqst.Write = EndOfLog;
4841         XLogCtl->LogwrtRqst.Flush = EndOfLog;
4842
4843         freespace = INSERT_FREESPACE(Insert);
4844         if (freespace > 0)
4845         {
4846                 /* Make sure rest of page is zero */
4847                 MemSet(Insert->currpos, 0, freespace);
4848                 XLogCtl->Write.curridx = 0;
4849         }
4850         else
4851         {
4852                 /*
4853                  * Whenever Write.LogwrtResult points to exactly the end of a page,
4854                  * Write.curridx must point to the *next* page (see XLogWrite()).
4855                  *
4856                  * Note: it might seem we should do AdvanceXLInsertBuffer() here, but
4857                  * this is sufficient.  The first actual attempt to insert a log
4858                  * record will advance the insert state.
4859                  */
4860                 XLogCtl->Write.curridx = NextBufIdx(0);
4861         }
4862
4863         /* Pre-scan prepared transactions to find out the range of XIDs present */
4864         oldestActiveXID = PrescanPreparedTransactions();
4865
4866         if (InRecovery)
4867         {
4868                 int                     rmid;
4869
4870                 /*
4871                  * Allow resource managers to do any required cleanup.
4872                  */
4873                 for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
4874                 {
4875                         if (RmgrTable[rmid].rm_cleanup != NULL)
4876                                 RmgrTable[rmid].rm_cleanup();
4877                 }
4878
4879                 /*
4880                  * Check to see if the XLOG sequence contained any unresolved
4881                  * references to uninitialized pages.
4882                  */
4883                 XLogCheckInvalidPages();
4884
4885                 /*
4886                  * Reset pgstat data, because it may be invalid after recovery.
4887                  */
4888                 pgstat_reset_all();
4889
4890                 /*
4891                  * Perform a new checkpoint to update our recovery activity to disk.
4892                  *
4893                  * Note that we write a shutdown checkpoint rather than an on-line
4894                  * one. This is not particularly critical, but since we may be
4895                  * assigning a new TLI, using a shutdown checkpoint allows us to have
4896                  * the rule that TLI only changes in shutdown checkpoints, which
4897                  * allows some extra error checking in xlog_redo.
4898                  *
4899                  * In case we had to use the secondary checkpoint, make sure that it
4900                  * will still be shown as the secondary checkpoint after this
4901                  * CreateCheckPoint operation; we don't want the broken primary
4902                  * checkpoint to become prevCheckPoint...
4903                  */
4904                 if (XLByteEQ(checkPointLoc, ControlFile->prevCheckPoint))
4905                         ControlFile->checkPoint = checkPointLoc;
4906
4907                 CreateCheckPoint(true, true);
4908
4909                 /*
4910                  * Close down recovery environment
4911                  */
4912                 XLogCloseRelationCache();
4913
4914                 /*
4915                  * Now that we've checkpointed the recovery, it's safe to flush old
4916                  * backup_label, if present.
4917                  */
4918                 remove_backup_label();
4919         }
4920
4921         /*
4922          * Preallocate additional log files, if wanted.
4923          */
4924         (void) PreallocXlogFiles(EndOfLog);
4925
4926         /*
4927          * Okay, we're officially UP.
4928          */
4929         InRecovery = false;
4930
4931         ControlFile->state = DB_IN_PRODUCTION;
4932         ControlFile->time = time(NULL);
4933         UpdateControlFile();
4934
4935         /* Start up the commit log and related stuff, too */
4936         StartupCLOG();
4937         StartupSUBTRANS(oldestActiveXID);
4938         StartupMultiXact();
4939
4940         /* Reload shared-memory state for prepared transactions */
4941         RecoverPreparedTransactions();
4942
4943         ereport(LOG,
4944                         (errmsg("database system is ready")));
4945         CritSectionCount--;
4946
4947         /* Shut down readFile facility, free space */
4948         if (readFile >= 0)
4949         {
4950                 close(readFile);
4951                 readFile = -1;
4952         }
4953         if (readBuf)
4954         {
4955                 free(readBuf);
4956                 readBuf = NULL;
4957         }
4958         if (readRecordBuf)
4959         {
4960                 free(readRecordBuf);
4961                 readRecordBuf = NULL;
4962                 readRecordBufSize = 0;
4963         }
4964 }
4965
4966 /*
4967  * Subroutine to try to fetch and validate a prior checkpoint record.
4968  *
4969  * whichChkpt identifies the checkpoint (merely for reporting purposes).
4970  * 1 for "primary", 2 for "secondary", 0 for "other" (backup_label)
4971  */
4972 static XLogRecord *
4973 ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt)
4974 {
4975         XLogRecord *record;
4976
4977         if (!XRecOffIsValid(RecPtr.xrecoff))
4978         {
4979                 switch (whichChkpt)
4980                 {
4981                         case 1:
4982                                 ereport(LOG,
4983                                 (errmsg("invalid primary checkpoint link in control file")));
4984                                 break;
4985                         case 2:
4986                                 ereport(LOG,
4987                                                 (errmsg("invalid secondary checkpoint link in control file")));
4988                                 break;
4989                         default:
4990                                 ereport(LOG,
4991                                    (errmsg("invalid checkpoint link in backup_label file")));
4992                                 break;
4993                 }
4994                 return NULL;
4995         }
4996
4997         record = ReadRecord(&RecPtr, LOG);
4998
4999         if (record == NULL)
5000         {
5001                 switch (whichChkpt)
5002                 {
5003                         case 1:
5004                                 ereport(LOG,
5005                                                 (errmsg("invalid primary checkpoint record")));
5006                                 break;
5007                         case 2:
5008                                 ereport(LOG,
5009                                                 (errmsg("invalid secondary checkpoint record")));
5010                                 break;
5011                         default:
5012                                 ereport(LOG,
5013                                                 (errmsg("invalid checkpoint record")));
5014                                 break;
5015                 }
5016                 return NULL;
5017         }
5018         if (record->xl_rmid != RM_XLOG_ID)
5019         {
5020                 switch (whichChkpt)
5021                 {
5022                         case 1:
5023                                 ereport(LOG,
5024                                                 (errmsg("invalid resource manager ID in primary checkpoint record")));
5025                                 break;
5026                         case 2:
5027                                 ereport(LOG,
5028                                                 (errmsg("invalid resource manager ID in secondary checkpoint record")));
5029                                 break;
5030                         default:
5031                                 ereport(LOG,
5032                                 (errmsg("invalid resource manager ID in checkpoint record")));
5033                                 break;
5034                 }
5035                 return NULL;
5036         }
5037         if (record->xl_info != XLOG_CHECKPOINT_SHUTDOWN &&
5038                 record->xl_info != XLOG_CHECKPOINT_ONLINE)
5039         {
5040                 switch (whichChkpt)
5041                 {
5042                         case 1:
5043                                 ereport(LOG,
5044                                    (errmsg("invalid xl_info in primary checkpoint record")));
5045                                 break;
5046                         case 2:
5047                                 ereport(LOG,
5048                                  (errmsg("invalid xl_info in secondary checkpoint record")));
5049                                 break;
5050                         default:
5051                                 ereport(LOG,
5052                                                 (errmsg("invalid xl_info in checkpoint record")));
5053                                 break;
5054                 }
5055                 return NULL;
5056         }
5057         if (record->xl_len != sizeof(CheckPoint) ||
5058                 record->xl_tot_len != SizeOfXLogRecord + sizeof(CheckPoint))
5059         {
5060                 switch (whichChkpt)
5061                 {
5062                         case 1:
5063                                 ereport(LOG,
5064                                         (errmsg("invalid length of primary checkpoint record")));
5065                                 break;
5066                         case 2:
5067                                 ereport(LOG,
5068                                   (errmsg("invalid length of secondary checkpoint record")));
5069                                 break;
5070                         default:
5071                                 ereport(LOG,
5072                                                 (errmsg("invalid length of checkpoint record")));
5073                                 break;
5074                 }
5075                 return NULL;
5076         }
5077         return record;
5078 }
5079
5080 /*
5081  * This must be called during startup of a backend process, except that
5082  * it need not be called in a standalone backend (which does StartupXLOG
5083  * instead).  We need to initialize the local copies of ThisTimeLineID and
5084  * RedoRecPtr.
5085  *
5086  * Note: before Postgres 8.0, we went to some effort to keep the postmaster
5087  * process's copies of ThisTimeLineID and RedoRecPtr valid too.  This was
5088  * unnecessary however, since the postmaster itself never touches XLOG anyway.
5089  */
5090 void
5091 InitXLOGAccess(void)
5092 {
5093         /* ThisTimeLineID doesn't change so we need no lock to copy it */
5094         ThisTimeLineID = XLogCtl->ThisTimeLineID;
5095         /* Use GetRedoRecPtr to copy the RedoRecPtr safely */
5096         (void) GetRedoRecPtr();
5097 }
5098
5099 /*
5100  * Once spawned, a backend may update its local RedoRecPtr from
5101  * XLogCtl->Insert.RedoRecPtr; it must hold the insert lock or info_lck
5102  * to do so.  This is done in XLogInsert() or GetRedoRecPtr().
5103  */
5104 XLogRecPtr
5105 GetRedoRecPtr(void)
5106 {
5107         /* use volatile pointer to prevent code rearrangement */
5108         volatile XLogCtlData *xlogctl = XLogCtl;
5109
5110         SpinLockAcquire(&xlogctl->info_lck);
5111         Assert(XLByteLE(RedoRecPtr, xlogctl->Insert.RedoRecPtr));
5112         RedoRecPtr = xlogctl->Insert.RedoRecPtr;
5113         SpinLockRelease(&xlogctl->info_lck);
5114
5115         return RedoRecPtr;
5116 }
5117
5118 /*
5119  * GetRecentNextXid - get the nextXid value saved by the most recent checkpoint
5120  *
5121  * This is currently used only by the autovacuum daemon.  To check for
5122  * impending XID wraparound, autovac needs an approximate idea of the current
5123  * XID counter, and it needs it before choosing which DB to attach to, hence
5124  * before it sets up a PGPROC, hence before it can take any LWLocks.  But it
5125  * has attached to shared memory, and so we can let it reach into the shared
5126  * ControlFile structure and pull out the last checkpoint nextXID.
5127  *
5128  * Since we don't take any sort of lock, we have to assume that reading a
5129  * TransactionId is atomic ... but that assumption is made elsewhere, too,
5130  * and in any case the worst possible consequence of a bogus result is that
5131  * autovac issues an unnecessary database-wide VACUUM.
5132  *
5133  * Note: we could also choose to read ShmemVariableCache->nextXid in an
5134  * unlocked fashion, thus getting a more up-to-date result; but since that
5135  * changes far more frequently than the controlfile checkpoint copy, it would
5136  * pose a far higher risk of bogus result if we did have a nonatomic-read
5137  * problem.
5138  *
5139  * A (theoretically) completely safe answer is to read the actual pg_control
5140  * file into local process memory, but that certainly seems like overkill.
5141  */
5142 TransactionId
5143 GetRecentNextXid(void)
5144 {
5145         return ControlFile->checkPointCopy.nextXid;
5146 }
5147
5148 /*
5149  * This must be called ONCE during postmaster or standalone-backend shutdown
5150  */
5151 void
5152 ShutdownXLOG(int code, Datum arg)
5153 {
5154         ereport(LOG,
5155                         (errmsg("shutting down")));
5156
5157         CritSectionCount++;
5158         CreateCheckPoint(true, true);
5159         ShutdownCLOG();
5160         ShutdownSUBTRANS();
5161         ShutdownMultiXact();
5162         CritSectionCount--;
5163
5164         ereport(LOG,
5165                         (errmsg("database system is shut down")));
5166 }
5167
5168 /*
5169  * Perform a checkpoint --- either during shutdown, or on-the-fly
5170  *
5171  * If force is true, we force a checkpoint regardless of whether any XLOG
5172  * activity has occurred since the last one.
5173  */
5174 void
5175 CreateCheckPoint(bool shutdown, bool force)
5176 {
5177         CheckPoint      checkPoint;
5178         XLogRecPtr      recptr;
5179         XLogCtlInsert *Insert = &XLogCtl->Insert;
5180         XLogRecData rdata;
5181         uint32          freespace;
5182         uint32          _logId;
5183         uint32          _logSeg;
5184         int                     nsegsadded = 0;
5185         int                     nsegsremoved = 0;
5186         int                     nsegsrecycled = 0;
5187
5188         /*
5189          * Acquire CheckpointLock to ensure only one checkpoint happens at a time.
5190          * (This is just pro forma, since in the present system structure there is
5191          * only one process that is allowed to issue checkpoints at any given
5192          * time.)
5193          */
5194         LWLockAcquire(CheckpointLock, LW_EXCLUSIVE);
5195
5196         /*
5197          * Use a critical section to force system panic if we have trouble.
5198          */
5199         START_CRIT_SECTION();
5200
5201         if (shutdown)
5202         {
5203                 ControlFile->state = DB_SHUTDOWNING;
5204                 ControlFile->time = time(NULL);
5205                 UpdateControlFile();
5206         }
5207
5208         MemSet(&checkPoint, 0, sizeof(checkPoint));
5209         checkPoint.ThisTimeLineID = ThisTimeLineID;
5210         checkPoint.time = time(NULL);
5211
5212         /*
5213          * We must hold CheckpointStartLock while determining the checkpoint REDO
5214          * pointer.  This ensures that any concurrent transaction commits will be
5215          * either not yet logged, or logged and recorded in pg_clog. See notes in
5216          * RecordTransactionCommit().
5217          */
5218         LWLockAcquire(CheckpointStartLock, LW_EXCLUSIVE);
5219
5220         /* And we need WALInsertLock too */
5221         LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
5222
5223         /*
5224          * If this isn't a shutdown or forced checkpoint, and we have not inserted
5225          * any XLOG records since the start of the last checkpoint, skip the
5226          * checkpoint.  The idea here is to avoid inserting duplicate checkpoints
5227          * when the system is idle. That wastes log space, and more importantly it
5228          * exposes us to possible loss of both current and previous checkpoint
5229          * records if the machine crashes just as we're writing the update.
5230          * (Perhaps it'd make even more sense to checkpoint only when the previous
5231          * checkpoint record is in a different xlog page?)
5232          *
5233          * We have to make two tests to determine that nothing has happened since
5234          * the start of the last checkpoint: current insertion point must match
5235          * the end of the last checkpoint record, and its redo pointer must point
5236          * to itself.
5237          */
5238         if (!shutdown && !force)
5239         {
5240                 XLogRecPtr      curInsert;
5241
5242                 INSERT_RECPTR(curInsert, Insert, Insert->curridx);
5243                 if (curInsert.xlogid == ControlFile->checkPoint.xlogid &&
5244                         curInsert.xrecoff == ControlFile->checkPoint.xrecoff +
5245                         MAXALIGN(SizeOfXLogRecord + sizeof(CheckPoint)) &&
5246                         ControlFile->checkPoint.xlogid ==
5247                         ControlFile->checkPointCopy.redo.xlogid &&
5248                         ControlFile->checkPoint.xrecoff ==
5249                         ControlFile->checkPointCopy.redo.xrecoff)
5250                 {
5251                         LWLockRelease(WALInsertLock);
5252                         LWLockRelease(CheckpointStartLock);
5253                         LWLockRelease(CheckpointLock);
5254                         END_CRIT_SECTION();
5255                         return;
5256                 }
5257         }
5258
5259         /*
5260          * Compute new REDO record ptr = location of next XLOG record.
5261          *
5262          * NB: this is NOT necessarily where the checkpoint record itself will be,
5263          * since other backends may insert more XLOG records while we're off doing
5264          * the buffer flush work.  Those XLOG records are logically after the
5265          * checkpoint, even though physically before it.  Got that?
5266          */
5267         freespace = INSERT_FREESPACE(Insert);
5268         if (freespace < SizeOfXLogRecord)
5269         {
5270                 (void) AdvanceXLInsertBuffer();
5271                 /* OK to ignore update return flag, since we will do flush anyway */
5272                 freespace = INSERT_FREESPACE(Insert);
5273         }
5274         INSERT_RECPTR(checkPoint.redo, Insert, Insert->curridx);
5275
5276         /*
5277          * Here we update the shared RedoRecPtr for future XLogInsert calls; this
5278          * must be done while holding the insert lock AND the info_lck.
5279          *
5280          * Note: if we fail to complete the checkpoint, RedoRecPtr will be left
5281          * pointing past where it really needs to point.  This is okay; the only
5282          * consequence is that XLogInsert might back up whole buffers that it
5283          * didn't really need to.  We can't postpone advancing RedoRecPtr because
5284          * XLogInserts that happen while we are dumping buffers must assume that
5285          * their buffer changes are not included in the checkpoint.
5286          */
5287         {
5288                 /* use volatile pointer to prevent code rearrangement */
5289                 volatile XLogCtlData *xlogctl = XLogCtl;
5290
5291                 SpinLockAcquire(&xlogctl->info_lck);
5292                 RedoRecPtr = xlogctl->Insert.RedoRecPtr = checkPoint.redo;
5293                 SpinLockRelease(&xlogctl->info_lck);
5294         }
5295
5296         /*
5297          * Now we can release insert lock and checkpoint start lock, allowing
5298          * other xacts to proceed even while we are flushing disk buffers.
5299          */
5300         LWLockRelease(WALInsertLock);
5301
5302         LWLockRelease(CheckpointStartLock);
5303
5304         /*
5305          * Get the other info we need for the checkpoint record.
5306          */
5307         LWLockAcquire(XidGenLock, LW_SHARED);
5308         checkPoint.nextXid = ShmemVariableCache->nextXid;
5309         LWLockRelease(XidGenLock);
5310
5311         LWLockAcquire(OidGenLock, LW_SHARED);
5312         checkPoint.nextOid = ShmemVariableCache->nextOid;
5313         if (!shutdown)
5314                 checkPoint.nextOid += ShmemVariableCache->oidCount;
5315         LWLockRelease(OidGenLock);
5316
5317         MultiXactGetCheckptMulti(shutdown,
5318                                                          &checkPoint.nextMulti,
5319                                                          &checkPoint.nextMultiOffset);
5320
5321         /*
5322          * Having constructed the checkpoint record, ensure all shmem disk buffers
5323          * and commit-log buffers are flushed to disk.
5324          *
5325          * This I/O could fail for various reasons.  If so, we will fail to
5326          * complete the checkpoint, but there is no reason to force a system
5327          * panic. Accordingly, exit critical section while doing it.  (If we are
5328          * doing a shutdown checkpoint, we probably *should* panic --- but that
5329          * will happen anyway because we'll still be inside the critical section
5330          * established by ShutdownXLOG.)
5331          */
5332         END_CRIT_SECTION();
5333
5334         if (!shutdown)
5335                 ereport(DEBUG2,
5336                                 (errmsg("checkpoint starting")));
5337
5338         CheckPointCLOG();
5339         CheckPointSUBTRANS();
5340         CheckPointMultiXact();
5341         FlushBufferPool();
5342         /* We deliberately delay 2PC checkpointing as long as possible */
5343         CheckPointTwoPhase(checkPoint.redo);
5344
5345         START_CRIT_SECTION();
5346
5347         /*
5348          * Now insert the checkpoint record into XLOG.
5349          */
5350         rdata.data = (char *) (&checkPoint);
5351         rdata.len = sizeof(checkPoint);
5352         rdata.buffer = InvalidBuffer;
5353         rdata.next = NULL;
5354
5355         recptr = XLogInsert(RM_XLOG_ID,
5356                                                 shutdown ? XLOG_CHECKPOINT_SHUTDOWN :
5357                                                 XLOG_CHECKPOINT_ONLINE,
5358                                                 &rdata);
5359
5360         XLogFlush(recptr);
5361
5362         /*
5363          * We now have ProcLastRecPtr = start of actual checkpoint record, recptr
5364          * = end of actual checkpoint record.
5365          */
5366         if (shutdown && !XLByteEQ(checkPoint.redo, ProcLastRecPtr))
5367                 ereport(PANIC,
5368                                 (errmsg("concurrent transaction log activity while database system is shutting down")));
5369
5370         /*
5371          * Select point at which we can truncate the log, which we base on the
5372          * prior checkpoint's earliest info.
5373          */
5374         XLByteToSeg(ControlFile->checkPointCopy.redo, _logId, _logSeg);
5375
5376         /*
5377          * Update the control file.
5378          */
5379         LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
5380         if (shutdown)
5381                 ControlFile->state = DB_SHUTDOWNED;
5382         ControlFile->prevCheckPoint = ControlFile->checkPoint;
5383         ControlFile->checkPoint = ProcLastRecPtr;
5384         ControlFile->checkPointCopy = checkPoint;
5385         ControlFile->time = time(NULL);
5386         UpdateControlFile();
5387         LWLockRelease(ControlFileLock);
5388
5389         /*
5390          * We are now done with critical updates; no need for system panic if we
5391          * have trouble while fooling with offline log segments.
5392          */
5393         END_CRIT_SECTION();
5394
5395         /*
5396          * Delete offline log files (those no longer needed even for previous
5397          * checkpoint).
5398          */
5399         if (_logId || _logSeg)
5400         {
5401                 PrevLogSeg(_logId, _logSeg);
5402                 MoveOfflineLogs(_logId, _logSeg, recptr,
5403                                                 &nsegsremoved, &nsegsrecycled);
5404         }
5405
5406         /*
5407          * Make more log segments if needed.  (Do this after deleting offline log
5408          * segments, to avoid having peak disk space usage higher than necessary.)
5409          */
5410         if (!shutdown)
5411                 nsegsadded = PreallocXlogFiles(recptr);
5412
5413         /*
5414          * Truncate pg_subtrans if possible.  We can throw away all data before
5415          * the oldest XMIN of any running transaction.  No future transaction will
5416          * attempt to reference any pg_subtrans entry older than that (see Asserts
5417          * in subtrans.c).      During recovery, though, we mustn't do this because
5418          * StartupSUBTRANS hasn't been called yet.
5419          */
5420         if (!InRecovery)
5421                 TruncateSUBTRANS(GetOldestXmin(true));
5422
5423         if (!shutdown)
5424                 ereport(DEBUG2,
5425                                 (errmsg("checkpoint complete; %d transaction log file(s) added, %d removed, %d recycled",
5426                                                 nsegsadded, nsegsremoved, nsegsrecycled)));
5427
5428         LWLockRelease(CheckpointLock);
5429 }
5430
5431 /*
5432  * Write a NEXTOID log record
5433  */
5434 void
5435 XLogPutNextOid(Oid nextOid)
5436 {
5437         XLogRecData rdata;
5438
5439         rdata.data = (char *) (&nextOid);
5440         rdata.len = sizeof(Oid);
5441         rdata.buffer = InvalidBuffer;
5442         rdata.next = NULL;
5443         (void) XLogInsert(RM_XLOG_ID, XLOG_NEXTOID, &rdata);
5444
5445         /*
5446          * We need not flush the NEXTOID record immediately, because any of the
5447          * just-allocated OIDs could only reach disk as part of a tuple insert or
5448          * update that would have its own XLOG record that must follow the NEXTOID
5449          * record.      Therefore, the standard buffer LSN interlock applied to those
5450          * records will ensure no such OID reaches disk before the NEXTOID record
5451          * does.
5452          */
5453 }
5454
5455 /*
5456  * XLOG resource manager's routines
5457  */
5458 void
5459 xlog_redo(XLogRecPtr lsn, XLogRecord *record)
5460 {
5461         uint8           info = record->xl_info & ~XLR_INFO_MASK;
5462
5463         if (info == XLOG_NEXTOID)
5464         {
5465                 Oid                     nextOid;
5466
5467                 memcpy(&nextOid, XLogRecGetData(record), sizeof(Oid));
5468                 if (ShmemVariableCache->nextOid < nextOid)
5469                 {
5470                         ShmemVariableCache->nextOid = nextOid;
5471                         ShmemVariableCache->oidCount = 0;
5472                 }
5473         }
5474         else if (info == XLOG_CHECKPOINT_SHUTDOWN)
5475         {
5476                 CheckPoint      checkPoint;
5477
5478                 memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
5479                 /* In a SHUTDOWN checkpoint, believe the counters exactly */
5480                 ShmemVariableCache->nextXid = checkPoint.nextXid;
5481                 ShmemVariableCache->nextOid = checkPoint.nextOid;
5482                 ShmemVariableCache->oidCount = 0;
5483                 MultiXactSetNextMXact(checkPoint.nextMulti,
5484                                                           checkPoint.nextMultiOffset);
5485
5486                 /*
5487                  * TLI may change in a shutdown checkpoint, but it shouldn't decrease
5488                  */
5489                 if (checkPoint.ThisTimeLineID != ThisTimeLineID)
5490                 {
5491                         if (checkPoint.ThisTimeLineID < ThisTimeLineID ||
5492                                 !list_member_int(expectedTLIs,
5493                                                                  (int) checkPoint.ThisTimeLineID))
5494                                 ereport(PANIC,
5495                                                 (errmsg("unexpected timeline ID %u (after %u) in checkpoint record",
5496                                                                 checkPoint.ThisTimeLineID, ThisTimeLineID)));
5497                         /* Following WAL records should be run with new TLI */
5498                         ThisTimeLineID = checkPoint.ThisTimeLineID;
5499                 }
5500         }
5501         else if (info == XLOG_CHECKPOINT_ONLINE)
5502         {
5503                 CheckPoint      checkPoint;
5504
5505                 memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
5506                 /* In an ONLINE checkpoint, treat the counters like NEXTOID */
5507                 if (TransactionIdPrecedes(ShmemVariableCache->nextXid,
5508                                                                   checkPoint.nextXid))
5509                         ShmemVariableCache->nextXid = checkPoint.nextXid;
5510                 if (ShmemVariableCache->nextOid < checkPoint.nextOid)
5511                 {
5512                         ShmemVariableCache->nextOid = checkPoint.nextOid;
5513                         ShmemVariableCache->oidCount = 0;
5514                 }
5515                 MultiXactAdvanceNextMXact(checkPoint.nextMulti,
5516                                                                   checkPoint.nextMultiOffset);
5517                 /* TLI should not change in an on-line checkpoint */
5518                 if (checkPoint.ThisTimeLineID != ThisTimeLineID)
5519                         ereport(PANIC,
5520                                         (errmsg("unexpected timeline ID %u (should be %u) in checkpoint record",
5521                                                         checkPoint.ThisTimeLineID, ThisTimeLineID)));
5522         }
5523 }
5524
5525 void
5526 xlog_desc(StringInfo buf, uint8 xl_info, char *rec)
5527 {
5528         uint8                   info = xl_info & ~XLR_INFO_MASK;
5529
5530         if (info == XLOG_CHECKPOINT_SHUTDOWN ||
5531                 info == XLOG_CHECKPOINT_ONLINE)
5532         {
5533                 CheckPoint *checkpoint = (CheckPoint *) rec;
5534
5535                 appendStringInfo(buf, "checkpoint: redo %X/%X; undo %X/%X; "
5536                                 "tli %u; xid %u; oid %u; multi %u; offset %u; %s",
5537                                 checkpoint->redo.xlogid, checkpoint->redo.xrecoff,
5538                                 checkpoint->undo.xlogid, checkpoint->undo.xrecoff,
5539                                 checkpoint->ThisTimeLineID, checkpoint->nextXid,
5540                                 checkpoint->nextOid,
5541                                 checkpoint->nextMulti,
5542                                 checkpoint->nextMultiOffset,
5543                                 (info == XLOG_CHECKPOINT_SHUTDOWN) ? "shutdown" : "online");
5544         }
5545         else if (info == XLOG_NEXTOID)
5546         {
5547                 Oid                     nextOid;
5548
5549                 memcpy(&nextOid, rec, sizeof(Oid));
5550                 appendStringInfo(buf, "nextOid: %u", nextOid);
5551         }
5552         else
5553                 appendStringInfo(buf, "UNKNOWN");
5554 }
5555
5556 #ifdef WAL_DEBUG
5557
5558 static void
5559 xlog_outrec(StringInfo buf, XLogRecord *record)
5560 {
5561         int                     i;
5562
5563         appendStringInfo(buf, "prev %X/%X; xid %u",
5564                                          record->xl_prev.xlogid, record->xl_prev.xrecoff,
5565                                          record->xl_xid);
5566
5567         for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
5568         {
5569                 if (record->xl_info & XLR_SET_BKP_BLOCK(i))
5570                         appendStringInfo(buf, "; bkpb%d", i+1);
5571         }
5572
5573         appendStringInfo(buf, ": %s", RmgrTable[record->xl_rmid].rm_name);
5574 }
5575 #endif   /* WAL_DEBUG */
5576
5577
5578 /*
5579  * GUC support
5580  */
5581 const char *
5582 assign_xlog_sync_method(const char *method, bool doit, GucSource source)
5583 {
5584         int                     new_sync_method;
5585         int                     new_sync_bit;
5586
5587         if (pg_strcasecmp(method, "fsync") == 0)
5588         {
5589                 new_sync_method = SYNC_METHOD_FSYNC;
5590                 new_sync_bit = 0;
5591         }
5592 #ifdef HAVE_FSYNC_WRITETHROUGH
5593         else if (pg_strcasecmp(method, "fsync_writethrough") == 0)
5594         {
5595                 new_sync_method = SYNC_METHOD_FSYNC_WRITETHROUGH;
5596                 new_sync_bit = 0;
5597         }
5598 #endif
5599 #ifdef HAVE_FDATASYNC
5600         else if (pg_strcasecmp(method, "fdatasync") == 0)
5601         {
5602                 new_sync_method = SYNC_METHOD_FDATASYNC;
5603                 new_sync_bit = 0;
5604         }
5605 #endif
5606 #ifdef OPEN_SYNC_FLAG
5607         else if (pg_strcasecmp(method, "open_sync") == 0)
5608         {
5609                 new_sync_method = SYNC_METHOD_OPEN;
5610                 new_sync_bit = OPEN_SYNC_FLAG;
5611         }
5612 #endif
5613 #ifdef OPEN_DATASYNC_FLAG
5614         else if (pg_strcasecmp(method, "open_datasync") == 0)
5615         {
5616                 new_sync_method = SYNC_METHOD_OPEN;
5617                 new_sync_bit = OPEN_DATASYNC_FLAG;
5618         }
5619 #endif
5620         else
5621                 return NULL;
5622
5623         if (!doit)
5624                 return method;
5625
5626         if (sync_method != new_sync_method || open_sync_bit != new_sync_bit)
5627         {
5628                 /*
5629                  * To ensure that no blocks escape unsynced, force an fsync on the
5630                  * currently open log segment (if any).  Also, if the open flag is
5631                  * changing, close the log file so it will be reopened (with new flag
5632                  * bit) at next use.
5633                  */
5634                 if (openLogFile >= 0)
5635                 {
5636                         if (pg_fsync(openLogFile) != 0)
5637                                 ereport(PANIC,
5638                                                 (errcode_for_file_access(),
5639                                                  errmsg("could not fsync log file %u, segment %u: %m",
5640                                                                 openLogId, openLogSeg)));
5641                         if (open_sync_bit != new_sync_bit)
5642                                 XLogFileClose();
5643                 }
5644                 sync_method = new_sync_method;
5645                 open_sync_bit = new_sync_bit;
5646         }
5647
5648         return method;
5649 }
5650
5651
5652 /*
5653  * Issue appropriate kind of fsync (if any) on the current XLOG output file
5654  */
5655 static void
5656 issue_xlog_fsync(void)
5657 {
5658         switch (sync_method)
5659         {
5660                 case SYNC_METHOD_FSYNC:
5661                         if (pg_fsync_no_writethrough(openLogFile) != 0)
5662                                 ereport(PANIC,
5663                                                 (errcode_for_file_access(),
5664                                                  errmsg("could not fsync log file %u, segment %u: %m",
5665                                                                 openLogId, openLogSeg)));
5666                         break;
5667 #ifdef HAVE_FSYNC_WRITETHROUGH
5668                 case SYNC_METHOD_FSYNC_WRITETHROUGH:
5669                         if (pg_fsync_writethrough(openLogFile) != 0)
5670                                 ereport(PANIC,
5671                                                 (errcode_for_file_access(),
5672                                                  errmsg("could not fsync write-through log file %u, segment %u: %m",
5673                                                                 openLogId, openLogSeg)));
5674                         break;
5675 #endif
5676 #ifdef HAVE_FDATASYNC
5677                 case SYNC_METHOD_FDATASYNC:
5678                         if (pg_fdatasync(openLogFile) != 0)
5679                                 ereport(PANIC,
5680                                                 (errcode_for_file_access(),
5681                                         errmsg("could not fdatasync log file %u, segment %u: %m",
5682                                                    openLogId, openLogSeg)));
5683                         break;
5684 #endif
5685                 case SYNC_METHOD_OPEN:
5686                         /* write synced it already */
5687                         break;
5688                 default:
5689                         elog(PANIC, "unrecognized wal_sync_method: %d", sync_method);
5690                         break;
5691         }
5692 }
5693
5694
5695 /*
5696  * pg_start_backup: set up for taking an on-line backup dump
5697  *
5698  * Essentially what this does is to create a backup label file in $PGDATA,
5699  * where it will be archived as part of the backup dump.  The label file
5700  * contains the user-supplied label string (typically this would be used
5701  * to tell where the backup dump will be stored) and the starting time and
5702  * starting WAL offset for the dump.
5703  */
5704 Datum
5705 pg_start_backup(PG_FUNCTION_ARGS)
5706 {
5707         text       *backupid = PG_GETARG_TEXT_P(0);
5708         text       *result;
5709         char       *backupidstr;
5710         XLogRecPtr      checkpointloc;
5711         XLogRecPtr      startpoint;
5712         time_t          stamp_time;
5713         char            strfbuf[128];
5714         char            xlogfilename[MAXFNAMELEN];
5715         uint32          _logId;
5716         uint32          _logSeg;
5717         struct stat stat_buf;
5718         FILE       *fp;
5719
5720         if (!superuser())
5721                 ereport(ERROR,
5722                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
5723                                  (errmsg("must be superuser to run a backup"))));
5724
5725         if (!XLogArchivingActive())
5726                 ereport(ERROR,
5727                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
5728                                  (errmsg("WAL archiving is not active"),
5729                                   (errhint("archive_command must be defined before "
5730                                                    "online backups can be made safely.")))));
5731
5732         backupidstr = DatumGetCString(DirectFunctionCall1(textout,
5733                                                                                                  PointerGetDatum(backupid)));
5734
5735         /*
5736          * Mark backup active in shared memory.  We must do full-page WAL writes
5737          * during an on-line backup even if not doing so at other times, because
5738          * it's quite possible for the backup dump to obtain a "torn" (partially
5739          * written) copy of a database page if it reads the page concurrently
5740          * with our write to the same page.  This can be fixed as long as the
5741          * first write to the page in the WAL sequence is a full-page write.
5742          * Hence, we turn on forcePageWrites and then force a CHECKPOINT, to
5743          * ensure there are no dirty pages in shared memory that might get
5744          * dumped while the backup is in progress without having a corresponding
5745          * WAL record.  (Once the backup is complete, we need not force full-page
5746          * writes anymore, since we expect that any pages not modified during
5747          * the backup interval must have been correctly captured by the backup.)
5748          *
5749          * We must hold WALInsertLock to change the value of forcePageWrites,
5750          * to ensure adequate interlocking against XLogInsert().
5751          */
5752         LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
5753         if (XLogCtl->Insert.forcePageWrites)
5754         {
5755                 LWLockRelease(WALInsertLock);
5756                 ereport(ERROR,
5757                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
5758                                  errmsg("a backup is already in progress"),
5759                                  errhint("Run pg_stop_backup() and try again.")));
5760         }
5761         XLogCtl->Insert.forcePageWrites = true;
5762         LWLockRelease(WALInsertLock);
5763
5764         /* Use a TRY block to ensure we release forcePageWrites if fail below */
5765         PG_TRY();
5766         {
5767                 /*
5768                  * Force a CHECKPOINT.  Aside from being necessary to prevent torn
5769                  * page problems, this guarantees that two successive backup runs will
5770                  * have different checkpoint positions and hence different history
5771                  * file names, even if nothing happened in between.
5772                  */
5773                 RequestCheckpoint(true, false);
5774
5775                 /*
5776                  * Now we need to fetch the checkpoint record location, and also its
5777                  * REDO pointer.  The oldest point in WAL that would be needed to
5778                  * restore starting from the checkpoint is precisely the REDO pointer.
5779                  */
5780                 LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
5781                 checkpointloc = ControlFile->checkPoint;
5782                 startpoint = ControlFile->checkPointCopy.redo;
5783                 LWLockRelease(ControlFileLock);
5784
5785                 XLByteToSeg(startpoint, _logId, _logSeg);
5786                 XLogFileName(xlogfilename, ThisTimeLineID, _logId, _logSeg);
5787
5788                 /*
5789                  * We deliberately use strftime/localtime not the src/timezone
5790                  * functions, so that backup labels will consistently be recorded in
5791                  * the same timezone regardless of TimeZone setting.  This matches
5792                  * elog.c's practice.
5793                  */
5794                 stamp_time = time(NULL);
5795                 strftime(strfbuf, sizeof(strfbuf),
5796                                  "%Y-%m-%d %H:%M:%S %Z",
5797                                  localtime(&stamp_time));
5798
5799                 /*
5800                  * Check for existing backup label --- implies a backup is already
5801                  * running.  (XXX given that we checked forcePageWrites above, maybe
5802                  * it would be OK to just unlink any such label file?)
5803                  */
5804                 if (stat(BACKUP_LABEL_FILE, &stat_buf) != 0)
5805                 {
5806                         if (errno != ENOENT)
5807                                 ereport(ERROR,
5808                                                 (errcode_for_file_access(),
5809                                                  errmsg("could not stat file \"%s\": %m",
5810                                                                 BACKUP_LABEL_FILE)));
5811                 }
5812                 else
5813                         ereport(ERROR,
5814                                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
5815                                          errmsg("a backup is already in progress"),
5816                                          errhint("If you're sure there is no backup in progress, remove file \"%s\" and try again.",
5817                                                          BACKUP_LABEL_FILE)));
5818
5819                 /*
5820                  * Okay, write the file
5821                  */
5822                 fp = AllocateFile(BACKUP_LABEL_FILE, "w");
5823                 if (!fp)
5824                         ereport(ERROR,
5825                                         (errcode_for_file_access(),
5826                                          errmsg("could not create file \"%s\": %m",
5827                                                         BACKUP_LABEL_FILE)));
5828                 fprintf(fp, "START WAL LOCATION: %X/%X (file %s)\n",
5829                                 startpoint.xlogid, startpoint.xrecoff, xlogfilename);
5830                 fprintf(fp, "CHECKPOINT LOCATION: %X/%X\n",
5831                                 checkpointloc.xlogid, checkpointloc.xrecoff);
5832                 fprintf(fp, "START TIME: %s\n", strfbuf);
5833                 fprintf(fp, "LABEL: %s\n", backupidstr);
5834                 if (fflush(fp) || ferror(fp) || FreeFile(fp))
5835                         ereport(ERROR,
5836                                         (errcode_for_file_access(),
5837                                          errmsg("could not write file \"%s\": %m",
5838                                                         BACKUP_LABEL_FILE)));
5839         }
5840         PG_CATCH();
5841         {
5842                 /* Turn off forcePageWrites on failure */
5843                 LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
5844                 XLogCtl->Insert.forcePageWrites = false;
5845                 LWLockRelease(WALInsertLock);
5846
5847                 PG_RE_THROW();
5848         }
5849         PG_END_TRY();
5850
5851         /*
5852          * We're done.  As a convenience, return the starting WAL offset.
5853          */
5854         snprintf(xlogfilename, sizeof(xlogfilename), "%X/%X",
5855                          startpoint.xlogid, startpoint.xrecoff);
5856         result = DatumGetTextP(DirectFunctionCall1(textin,
5857                                                                                          CStringGetDatum(xlogfilename)));
5858         PG_RETURN_TEXT_P(result);
5859 }
5860
5861 /*
5862  * pg_stop_backup: finish taking an on-line backup dump
5863  *
5864  * We remove the backup label file created by pg_start_backup, and instead
5865  * create a backup history file in pg_xlog (whence it will immediately be
5866  * archived).  The backup history file contains the same info found in
5867  * the label file, plus the backup-end time and WAL offset.
5868  */
5869 Datum
5870 pg_stop_backup(PG_FUNCTION_ARGS)
5871 {
5872         text       *result;
5873         XLogCtlInsert *Insert = &XLogCtl->Insert;
5874         XLogRecPtr      startpoint;
5875         XLogRecPtr      stoppoint;
5876         time_t          stamp_time;
5877         char            strfbuf[128];
5878         char            histfilepath[MAXPGPATH];
5879         char            startxlogfilename[MAXFNAMELEN];
5880         char            stopxlogfilename[MAXFNAMELEN];
5881         uint32          _logId;
5882         uint32          _logSeg;
5883         FILE       *lfp;
5884         FILE       *fp;
5885         char            ch;
5886         int                     ich;
5887
5888         if (!superuser())
5889                 ereport(ERROR,
5890                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
5891                                  (errmsg("must be superuser to run a backup"))));
5892
5893         /*
5894          * Get the current end-of-WAL position; it will be unsafe to use this dump
5895          * to restore to a point in advance of this time.  We can also clear
5896          * forcePageWrites here.
5897          */
5898         LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
5899         INSERT_RECPTR(stoppoint, Insert, Insert->curridx);
5900         XLogCtl->Insert.forcePageWrites = false;
5901         LWLockRelease(WALInsertLock);
5902
5903         XLByteToSeg(stoppoint, _logId, _logSeg);
5904         XLogFileName(stopxlogfilename, ThisTimeLineID, _logId, _logSeg);
5905
5906         /*
5907          * We deliberately use strftime/localtime not the src/timezone functions,
5908          * so that backup labels will consistently be recorded in the same
5909          * timezone regardless of TimeZone setting.  This matches elog.c's
5910          * practice.
5911          */
5912         stamp_time = time(NULL);
5913         strftime(strfbuf, sizeof(strfbuf),
5914                          "%Y-%m-%d %H:%M:%S %Z",
5915                          localtime(&stamp_time));
5916
5917         /*
5918          * Open the existing label file
5919          */
5920         lfp = AllocateFile(BACKUP_LABEL_FILE, "r");
5921         if (!lfp)
5922         {
5923                 if (errno != ENOENT)
5924                         ereport(ERROR,
5925                                         (errcode_for_file_access(),
5926                                          errmsg("could not read file \"%s\": %m",
5927                                                         BACKUP_LABEL_FILE)));
5928                 ereport(ERROR,
5929                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
5930                                  errmsg("a backup is not in progress")));
5931         }
5932
5933         /*
5934          * Read and parse the START WAL LOCATION line (this code is pretty crude,
5935          * but we are not expecting any variability in the file format).
5936          */
5937         if (fscanf(lfp, "START WAL LOCATION: %X/%X (file %24s)%c",
5938                            &startpoint.xlogid, &startpoint.xrecoff, startxlogfilename,
5939                            &ch) != 4 || ch != '\n')
5940                 ereport(ERROR,
5941                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
5942                                  errmsg("invalid data in file \"%s\"", BACKUP_LABEL_FILE)));
5943
5944         /*
5945          * Write the backup history file
5946          */
5947         XLByteToSeg(startpoint, _logId, _logSeg);
5948         BackupHistoryFilePath(histfilepath, ThisTimeLineID, _logId, _logSeg,
5949                                                   startpoint.xrecoff % XLogSegSize);
5950         fp = AllocateFile(histfilepath, "w");
5951         if (!fp)
5952                 ereport(ERROR,
5953                                 (errcode_for_file_access(),
5954                                  errmsg("could not create file \"%s\": %m",
5955                                                 histfilepath)));
5956         fprintf(fp, "START WAL LOCATION: %X/%X (file %s)\n",
5957                         startpoint.xlogid, startpoint.xrecoff, startxlogfilename);
5958         fprintf(fp, "STOP WAL LOCATION: %X/%X (file %s)\n",
5959                         stoppoint.xlogid, stoppoint.xrecoff, stopxlogfilename);
5960         /* transfer remaining lines from label to history file */
5961         while ((ich = fgetc(lfp)) != EOF)
5962                 fputc(ich, fp);
5963         fprintf(fp, "STOP TIME: %s\n", strfbuf);
5964         if (fflush(fp) || ferror(fp) || FreeFile(fp))
5965                 ereport(ERROR,
5966                                 (errcode_for_file_access(),
5967                                  errmsg("could not write file \"%s\": %m",
5968                                                 histfilepath)));
5969
5970         /*
5971          * Close and remove the backup label file
5972          */
5973         if (ferror(lfp) || FreeFile(lfp))
5974                 ereport(ERROR,
5975                                 (errcode_for_file_access(),
5976                                  errmsg("could not read file \"%s\": %m",
5977                                                 BACKUP_LABEL_FILE)));
5978         if (unlink(BACKUP_LABEL_FILE) != 0)
5979                 ereport(ERROR,
5980                                 (errcode_for_file_access(),
5981                                  errmsg("could not remove file \"%s\": %m",
5982                                                 BACKUP_LABEL_FILE)));
5983
5984         /*
5985          * Clean out any no-longer-needed history files.  As a side effect,
5986          * this will post a .ready file for the newly created history file,
5987          * notifying the archiver that history file may be archived immediately.
5988          */
5989         CleanupBackupHistory();
5990
5991         /*
5992          * We're done.  As a convenience, return the ending WAL offset.
5993          */
5994         snprintf(stopxlogfilename, sizeof(stopxlogfilename), "%X/%X",
5995                          stoppoint.xlogid, stoppoint.xrecoff);
5996         result = DatumGetTextP(DirectFunctionCall1(textin,
5997                                                                                  CStringGetDatum(stopxlogfilename)));
5998         PG_RETURN_TEXT_P(result);
5999 }
6000
6001 /*
6002  * read_backup_label: check to see if a backup_label file is present
6003  *
6004  * If we see a backup_label during recovery, we assume that we are recovering
6005  * from a backup dump file, and we therefore roll forward from the checkpoint
6006  * identified by the label file, NOT what pg_control says.      This avoids the
6007  * problem that pg_control might have been archived one or more checkpoints
6008  * later than the start of the dump, and so if we rely on it as the start
6009  * point, we will fail to restore a consistent database state.
6010  *
6011  * We also attempt to retrieve the corresponding backup history file.
6012  * If successful, set recoveryMinXlogOffset to constrain valid PITR stopping
6013  * points.
6014  *
6015  * Returns TRUE if a backup_label was found (and fills the checkpoint
6016  * location into *checkPointLoc); returns FALSE if not.
6017  */
6018 static bool
6019 read_backup_label(XLogRecPtr *checkPointLoc)
6020 {
6021         XLogRecPtr      startpoint;
6022         XLogRecPtr      stoppoint;
6023         char            histfilename[MAXFNAMELEN];
6024         char            histfilepath[MAXPGPATH];
6025         char            startxlogfilename[MAXFNAMELEN];
6026         char            stopxlogfilename[MAXFNAMELEN];
6027         TimeLineID      tli;
6028         uint32          _logId;
6029         uint32          _logSeg;
6030         FILE       *lfp;
6031         FILE       *fp;
6032         char            ch;
6033
6034         /*
6035          * See if label file is present
6036          */
6037         lfp = AllocateFile(BACKUP_LABEL_FILE, "r");
6038         if (!lfp)
6039         {
6040                 if (errno != ENOENT)
6041                         ereport(FATAL,
6042                                         (errcode_for_file_access(),
6043                                          errmsg("could not read file \"%s\": %m",
6044                                                         BACKUP_LABEL_FILE)));
6045                 return false;                   /* it's not there, all is fine */
6046         }
6047
6048         /*
6049          * Read and parse the START WAL LOCATION and CHECKPOINT lines (this code
6050          * is pretty crude, but we are not expecting any variability in the file
6051          * format).
6052          */
6053         if (fscanf(lfp, "START WAL LOCATION: %X/%X (file %08X%16s)%c",
6054                            &startpoint.xlogid, &startpoint.xrecoff, &tli,
6055                            startxlogfilename, &ch) != 5 || ch != '\n')
6056                 ereport(FATAL,
6057                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
6058                                  errmsg("invalid data in file \"%s\"", BACKUP_LABEL_FILE)));
6059         if (fscanf(lfp, "CHECKPOINT LOCATION: %X/%X%c",
6060                            &checkPointLoc->xlogid, &checkPointLoc->xrecoff,
6061                            &ch) != 3 || ch != '\n')
6062                 ereport(FATAL,
6063                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
6064                                  errmsg("invalid data in file \"%s\"", BACKUP_LABEL_FILE)));
6065         if (ferror(lfp) || FreeFile(lfp))
6066                 ereport(FATAL,
6067                                 (errcode_for_file_access(),
6068                                  errmsg("could not read file \"%s\": %m",
6069                                                 BACKUP_LABEL_FILE)));
6070
6071         /*
6072          * Try to retrieve the backup history file (no error if we can't)
6073          */
6074         XLByteToSeg(startpoint, _logId, _logSeg);
6075         BackupHistoryFileName(histfilename, tli, _logId, _logSeg,
6076                                                   startpoint.xrecoff % XLogSegSize);
6077
6078         if (InArchiveRecovery)
6079                 RestoreArchivedFile(histfilepath, histfilename, "RECOVERYHISTORY", 0);
6080         else
6081                 BackupHistoryFilePath(histfilepath, tli, _logId, _logSeg,
6082                                                           startpoint.xrecoff % XLogSegSize);
6083
6084         fp = AllocateFile(histfilepath, "r");
6085         if (fp)
6086         {
6087                 /*
6088                  * Parse history file to identify stop point.
6089                  */
6090                 if (fscanf(fp, "START WAL LOCATION: %X/%X (file %24s)%c",
6091                                    &startpoint.xlogid, &startpoint.xrecoff, startxlogfilename,
6092                                    &ch) != 4 || ch != '\n')
6093                         ereport(FATAL,
6094                                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
6095                                          errmsg("invalid data in file \"%s\"", histfilename)));
6096                 if (fscanf(fp, "STOP WAL LOCATION: %X/%X (file %24s)%c",
6097                                    &stoppoint.xlogid, &stoppoint.xrecoff, stopxlogfilename,
6098                                    &ch) != 4 || ch != '\n')
6099                         ereport(FATAL,
6100                                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
6101                                          errmsg("invalid data in file \"%s\"", histfilename)));
6102                 recoveryMinXlogOffset = stoppoint;
6103                 if (ferror(fp) || FreeFile(fp))
6104                         ereport(FATAL,
6105                                         (errcode_for_file_access(),
6106                                          errmsg("could not read file \"%s\": %m",
6107                                                         histfilepath)));
6108         }
6109
6110         return true;
6111 }
6112
6113 /*
6114  * remove_backup_label: remove any extant backup_label after successful
6115  * recovery.  Once we have completed the end-of-recovery checkpoint there
6116  * is no reason to have to replay from the start point indicated by the
6117  * label (and indeed we'll probably have removed/recycled the needed WAL
6118  * segments), so remove the label to prevent trouble in later crash recoveries.
6119  */
6120 static void
6121 remove_backup_label(void)
6122 {
6123         if (unlink(BACKUP_LABEL_FILE) != 0)
6124                 if (errno != ENOENT)
6125                         ereport(FATAL,
6126                                         (errcode_for_file_access(),
6127                                          errmsg("could not remove file \"%s\": %m",
6128                                                         BACKUP_LABEL_FILE)));
6129 }
6130
6131 /*
6132  * Error context callback for errors occurring during rm_redo().
6133  */
6134 static void
6135 rm_redo_error_callback(void *arg)
6136 {
6137         XLogRecord              *record = (XLogRecord *) arg;
6138         StringInfoData   buf;
6139
6140         initStringInfo(&buf);
6141         RmgrTable[record->xl_rmid].rm_desc(&buf, 
6142                                                                            record->xl_info, 
6143                                                                            XLogRecGetData(record));
6144
6145         /* don't bother emitting empty description */
6146         if (buf.len > 0)
6147                 errcontext("xlog redo %s", buf.data);
6148
6149         pfree(buf.data);
6150 }