OSDN Git Service

Don't try to call posix_fadvise() unless <fcntl.h> supplies a declaration
[pg-rex/syncrep.git] / src / backend / access / transam / xlog.c
1 /*-------------------------------------------------------------------------
2  *
3  * xlog.c
4  *              PostgreSQL transaction log manager
5  *
6  *
7  * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.240 2006/06/18 18:30:20 tgl Exp $
11  *
12  *-------------------------------------------------------------------------
13  */
14
15 #include "postgres.h"
16
17 #include <ctype.h>
18 #include <fcntl.h>
19 #include <signal.h>
20 #include <time.h>
21 #include <unistd.h>
22 #include <sys/stat.h>
23 #include <sys/time.h>
24
25 #include "access/clog.h"
26 #include "access/multixact.h"
27 #include "access/subtrans.h"
28 #include "access/twophase.h"
29 #include "access/xact.h"
30 #include "access/xlog.h"
31 #include "access/xlog_internal.h"
32 #include "access/xlogutils.h"
33 #include "catalog/catversion.h"
34 #include "catalog/pg_control.h"
35 #include "miscadmin.h"
36 #include "pgstat.h"
37 #include "postmaster/bgwriter.h"
38 #include "storage/bufpage.h"
39 #include "storage/fd.h"
40 #include "storage/lwlock.h"
41 #include "storage/pmsignal.h"
42 #include "storage/proc.h"
43 #include "storage/procarray.h"
44 #include "storage/spin.h"
45 #include "utils/builtins.h"
46 #include "utils/guc.h"
47 #include "utils/nabstime.h"
48 #include "utils/pg_locale.h"
49 #include "utils/relcache.h"
50
51
52 /*
53  *      Because O_DIRECT bypasses the kernel buffers, and because we never
54  *      read those buffers except during crash recovery, it is a win to use
55  *      it in all cases where we sync on each write().  We could allow O_DIRECT
56  *      with fsync(), but because skipping the kernel buffer forces writes out
57  *      quickly, it seems best just to use it for O_SYNC.  It is hard to imagine
58  *      how fsync() could be a win for O_DIRECT compared to O_SYNC and O_DIRECT.
59  *      Also, O_DIRECT is never enough to force data to the drives, it merely
60  *      tries to bypass the kernel cache, so we still need O_SYNC or fsync().
 *
 *      PG_O_DIRECT expands to O_DIRECT where the platform defines it and to 0
 *      otherwise, so it can be combined with "|" into the flag macros below
 *      without any further #ifdef.
61  */
62 #ifdef O_DIRECT
63 #define PG_O_DIRECT                             O_DIRECT
64 #else
65 #define PG_O_DIRECT                             0
66 #endif
67
68 /*
69  * This chunk of hackery attempts to determine which file sync methods
70  * are available on the current platform, and to choose an appropriate
71  * default method.  We assume that fsync() is always available, and that
72  * configure determined whether fdatasync() is.
73  */
/* Prefer O_SYNC, else O_FSYNC; BARE_OPEN_SYNC_FLAG stays undefined if neither exists. */
74 #if defined(O_SYNC)
75 #define BARE_OPEN_SYNC_FLAG             O_SYNC
76 #elif defined(O_FSYNC)
77 #define BARE_OPEN_SYNC_FLAG             O_FSYNC
78 #endif
/* OPEN_SYNC_FLAG is the bare sync flag plus PG_O_DIRECT (which may be 0). */
79 #ifdef BARE_OPEN_SYNC_FLAG
80 #define OPEN_SYNC_FLAG                  (BARE_OPEN_SYNC_FLAG | PG_O_DIRECT)
81 #endif
82
/*
 * OPEN_DATASYNC_FLAG is defined only when O_DSYNC exists and either has a
 * different value from BARE_OPEN_SYNC_FLAG, or no OPEN_SYNC_FLAG exists at all.
 */
83 #if defined(O_DSYNC)
84 #if defined(OPEN_SYNC_FLAG)
85 /* define it only if O_DSYNC is a distinct value from the plain sync flag */
86 #if O_DSYNC != BARE_OPEN_SYNC_FLAG
87 #define OPEN_DATASYNC_FLAG              (O_DSYNC | PG_O_DIRECT)
88 #endif
89 #else                                                   /* !defined(OPEN_SYNC_FLAG) */
90 /* Win32 only has O_DSYNC */
91 #define OPEN_DATASYNC_FLAG              (O_DSYNC | PG_O_DIRECT)
92 #endif
93 #endif
94
/*
 * Choose the compiled-in default sync method, in order of preference:
 * open_datasync, fdatasync, fsync_writethrough, plain fsync.
 *
 * Each branch defines three macros:
 *   DEFAULT_SYNC_METHOD_STR  - the method's name as a string
 *   DEFAULT_SYNC_METHOD      - the matching SYNC_METHOD_* code
 *   DEFAULT_SYNC_FLAGBIT     - flag bits for the open-based method, 0 for
 *                              every other method
 */
95 #if defined(OPEN_DATASYNC_FLAG)
96 #define DEFAULT_SYNC_METHOD_STR "open_datasync"
97 #define DEFAULT_SYNC_METHOD             SYNC_METHOD_OPEN
98 #define DEFAULT_SYNC_FLAGBIT    OPEN_DATASYNC_FLAG
99 #elif defined(HAVE_FDATASYNC)
100 #define DEFAULT_SYNC_METHOD_STR "fdatasync"
101 #define DEFAULT_SYNC_METHOD             SYNC_METHOD_FDATASYNC
102 #define DEFAULT_SYNC_FLAGBIT    0
103 #elif defined(HAVE_FSYNC_WRITETHROUGH_ONLY)
104 #define DEFAULT_SYNC_METHOD_STR "fsync_writethrough"
105 #define DEFAULT_SYNC_METHOD             SYNC_METHOD_FSYNC_WRITETHROUGH
106 #define DEFAULT_SYNC_FLAGBIT    0
107 #else
108 #define DEFAULT_SYNC_METHOD_STR "fsync"
109 #define DEFAULT_SYNC_METHOD             SYNC_METHOD_FSYNC
110 #define DEFAULT_SYNC_FLAGBIT    0
111 #endif
112
113
114 /*
115  * Limitation of buffer-alignment for direct IO depends on OS and filesystem,
116  * but XLOG_BLCKSZ is assumed to be enough for it.
 *
 * When O_DIRECT is not defined, the ordinary ALIGNOF_BUFFER alignment is
 * used instead.
117  */
118 #ifdef O_DIRECT
119 #define ALIGNOF_XLOG_BUFFER             XLOG_BLCKSZ
120 #else
121 #define ALIGNOF_XLOG_BUFFER             ALIGNOF_BUFFER
122 #endif
123
124
125 /* File path names (all relative to $PGDATA) */
126 #define BACKUP_LABEL_FILE               "backup_label"
127 #define RECOVERY_COMMAND_FILE   "recovery.conf"
128 #define RECOVERY_COMMAND_DONE   "recovery.done"
129
130
131 /* User-settable parameters */
132 int                     CheckPointSegments = 3; /* also determines XLOGfileslop */
133 int                     XLOGbuffers = 8;        /* NOTE(review): presumably the number of XLOG page buffers -- confirm */
134 char       *XLogArchiveCommand = NULL;
135 char       *XLOG_sync_method = NULL;
136 const char      XLOG_sync_method_default[] = DEFAULT_SYNC_METHOD_STR;   /* compiled-in default method name */
137 bool            fullPageWrites = true;  /* consulted by XLogInsert when deciding on full-page writes */
138
/* XLOG_DEBUG exists only in builds with WAL_DEBUG defined */
139 #ifdef WAL_DEBUG
140 bool            XLOG_DEBUG = false;
141 #endif
142
143 /*
144  * XLOGfileslop is used in the code as the allowed "fuzz" in the number of
145  * preallocated XLOG segments --- we try to have at least XLOGfiles advance
146  * segments but no more than XLOGfileslop segments.  This could
147  * be made a separate GUC variable, but at present I think it's sufficient
148  * to hardwire it as 2*CheckPointSegments+1.  Under normal conditions, a
149  * checkpoint will free no more than 2*CheckPointSegments log segments, and
150  * we want to recycle all of them; the +1 allows boundary cases to happen
151  * without wasting a delete/create-segment cycle.
 *
 * Being a macro over the CheckPointSegments variable, the value is
 * recomputed at every use and so follows changes to that setting.
 *
 * NOTE(review): "XLOGfiles" is not defined anywhere in the visible part of
 * this file -- confirm whether that reference is still current.
152  */
153
154 #define XLOGfileslop    (2*CheckPointSegments + 1)
155
156
157 /* these are derived from XLOG_sync_method by assign_xlog_sync_method */
158 int                     sync_method = DEFAULT_SYNC_METHOD;
159 static int      open_sync_bit = DEFAULT_SYNC_FLAGBIT;
160
/* Flag bits to use: open_sync_bit while enableFsync is true, otherwise 0 */
161 #define XLOG_SYNC_BIT  (enableFsync ? open_sync_bit : 0)
162
163
164 /*
165  * ThisTimeLineID will be same in all backends --- it identifies current
166  * WAL timeline for the database system.
167  */
168 TimeLineID      ThisTimeLineID = 0;
169
170 /* Are we doing recovery from XLOG? */
171 bool            InRecovery = false;
172
173 /* Are we recovering using offline XLOG archives? */
174 static bool InArchiveRecovery = false;
175
176 /* Was the last xlog file restored from archive, or local? */
177 static bool restoredFromArchive = false;
178
179 /* options taken from recovery.conf */
180 static char *recoveryRestoreCommand = NULL;
181 static bool recoveryTarget = false;
182 static bool recoveryTargetExact = false;
183 static bool recoveryTargetInclusive = true;
/*
 * NOTE(review): the two target values below have no explicit initializer;
 * presumably they are only consulted once recoveryTarget is true -- confirm.
 */
184 static TransactionId recoveryTargetXid;
185 static time_t recoveryTargetTime;
186
187 /* if recoveryStopsHere returns true, it saves actual stop xid/time here */
188 static TransactionId recoveryStopXid;
189 static time_t recoveryStopTime;
190 static bool recoveryStopAfter;
191
192 /* constraint set by read_backup_label */
193 static XLogRecPtr recoveryMinXlogOffset = {0, 0};
194
195 /*
196  * During normal operation, the only timeline we care about is ThisTimeLineID.
197  * During recovery, however, things are more complicated.  To simplify life
198  * for rmgr code, we keep ThisTimeLineID set to the "current" timeline as we
199  * scan through the WAL history (that is, it is the line that was active when
200  * the currently-scanned WAL record was generated).  We also need these
201  * timeline values:
202  *
203  * recoveryTargetTLI: the desired timeline that we want to end in.
204  *
205  * expectedTLIs: an integer list of recoveryTargetTLI and the TLIs of
206  * its known parents, newest first (so recoveryTargetTLI is always the
207  * first list member).  Only these TLIs are expected to be seen in the WAL
208  * segments we read, and indeed only these TLIs will be considered as
209  * candidate WAL files to open at all.
210  *
211  * curFileTLI: the TLI appearing in the name of the current input WAL file.
212  * (This is not necessarily the same as ThisTimeLineID, because we could
213  * be scanning data that was copied from an ancestor timeline when the current
214  * file was created.)  During a sequential scan we do not allow this value
215  * to decrease.
216  */
217 static TimeLineID recoveryTargetTLI;    /* timeline we want to end in */
218 static List *expectedTLIs;      /* recoveryTargetTLI and its known parents, newest first */
219 static TimeLineID curFileTLI;   /* TLI in the name of the current input WAL file */
220
221 /*
222  * MyLastRecPtr points to the start of the last XLOG record inserted by the
223  * current transaction.  If MyLastRecPtr.xrecoff == 0, then the current
224  * xact hasn't yet inserted any transaction-controlled XLOG records.
225  *
226  * Note that XLOG records inserted outside transaction control are not
227  * reflected into MyLastRecPtr.  They do, however, cause MyXactMadeXLogEntry
228  * to be set true.      The latter can be used to test whether the current xact
229  * made any loggable changes (including out-of-xact changes, such as
230  * sequence updates).
231  *
232  * When we insert/update/delete a tuple in a temporary relation, we do not
233  * make any XLOG record, since we don't care about recovering the state of
234  * the temp rel after a crash.  However, we will still need to remember
235  * whether our transaction committed or aborted in that case.  So, we must
236  * set MyXactMadeTempRelUpdate true to indicate that the XID will be of
237  * interest later.
238  */
239 XLogRecPtr      MyLastRecPtr = {0, 0};  /* xrecoff == 0: no transaction-controlled record yet */
240
241 bool            MyXactMadeXLogEntry = false;    /* also set by records outside transaction control */
242
243 bool            MyXactMadeTempRelUpdate = false;        /* set for temp-relation changes, which write no XLOG */
244
245 /*
246  * ProcLastRecPtr points to the start of the last XLOG record inserted by the
247  * current backend.  It is updated for all inserts, transaction-controlled
248  * or not.  ProcLastRecEnd is similar but points to end+1 of last record.
249  */
250 static XLogRecPtr ProcLastRecPtr = {0, 0};      /* start of last record */
251
252 XLogRecPtr      ProcLastRecEnd = {0, 0};        /* end+1 of last record; not static, unlike ProcLastRecPtr */
253
254 /*
255  * RedoRecPtr is this backend's local copy of the REDO record pointer
256  * (which is almost but not quite the same as a pointer to the most recent
257  * CHECKPOINT record).  We update this from the shared-memory copy,
258  * XLogCtl->Insert.RedoRecPtr, whenever we can safely do so (ie, when we
259  * hold the Insert lock).  See XLogInsert for details.  We are also allowed
260  * to update from XLogCtl->Insert.RedoRecPtr if we hold the info_lck;
261  * see GetRedoRecPtr.  A freshly spawned backend obtains the value during
262  * InitXLOGAccess.
 *
 * Because it is only a local copy it can be out of date; XLogInsert
 * compares it against the shared value after taking the insert lock.
263  */
264 static XLogRecPtr RedoRecPtr;
265
266 /*----------
267  * Shared-memory data structures for XLOG control
268  *
269  * LogwrtRqst indicates a byte position that we need to write and/or fsync
270  * the log up to (all records before that point must be written or fsynced).
271  * LogwrtResult indicates the byte positions we have already written/fsynced.
272  * These structs are identical but are declared separately to indicate their
273  * slightly different functions.
274  *
275  * We do a lot of pushups to minimize the amount of access to lockable
276  * shared memory values.  There are actually three shared-memory copies of
277  * LogwrtResult, plus one unshared copy in each backend.  Here's how it works:
278  *              XLogCtl->LogwrtResult is protected by info_lck
279  *              XLogCtl->Write.LogwrtResult is protected by WALWriteLock
280  *              XLogCtl->Insert.LogwrtResult is protected by WALInsertLock
281  * One must hold the associated lock to read or write any of these, but
282  * of course no lock is needed to read/write the unshared LogwrtResult.
283  *
284  * XLogCtl->LogwrtResult and XLogCtl->Write.LogwrtResult are both "always
285  * right", since both are updated by a write or flush operation before
286  * it releases WALWriteLock.  The point of keeping XLogCtl->Write.LogwrtResult
287  * is that it can be examined/modified by code that already holds WALWriteLock
288  * without needing to grab info_lck as well.
289  *
290  * XLogCtl->Insert.LogwrtResult may lag behind the reality of the other two,
291  * but is updated when convenient.      Again, it exists for the convenience of
292  * code that is already holding WALInsertLock but not the other locks.
293  *
294  * The unshared LogwrtResult may lag behind any or all of these, and again
295  * is updated when convenient.
296  *
297  * The request bookkeeping is simpler: there is a shared XLogCtl->LogwrtRqst
298  * (protected by info_lck), but we don't need to cache any copies of it.
299  *
300  * Note that this all works because the request and result positions can only
301  * advance forward, never back up, and so we can easily determine which of two
302  * values is "more up to date".
303  *
304  * info_lck is only held long enough to read/update the protected variables,
305  * so it's a plain spinlock.  The other locks are held longer (potentially
306  * over I/O operations), so we use LWLocks for them.  These locks are:
307  *
308  * WALInsertLock: must be held to insert a record into the WAL buffers.
309  *
310  * WALWriteLock: must be held to write WAL buffers to disk (XLogWrite or
311  * XLogFlush).
312  *
313  * ControlFileLock: must be held to read/update control file or create
314  * new log file.
315  *
316  * CheckpointLock: must be held to do a checkpoint (ensures only one
317  * checkpointer at a time; even though the postmaster won't launch
318  * parallel checkpoint processes, we need this because manual checkpoints
319  * could be launched simultaneously).
320  *
321  *----------
322  */
323
/*
 * Requested positions: the byte positions up to which the log needs to be
 * written out and/or fsynced (see the discussion above).
 */
324 typedef struct XLogwrtRqst
325 {
326         XLogRecPtr      Write;                  /* last byte + 1 to write out */
327         XLogRecPtr      Flush;                  /* last byte + 1 to flush */
328 } XLogwrtRqst;
329
/*
 * Achieved positions: the byte positions already written out / fsynced.
 * Same layout as XLogwrtRqst, but declared as its own type because it
 * serves a different purpose (see the discussion above).
 */
330 typedef struct XLogwrtResult
331 {
332         XLogRecPtr      Write;                  /* last byte + 1 written out */
333         XLogRecPtr      Flush;                  /* last byte + 1 flushed */
334 } XLogwrtResult;
335
336 /*
337  * Shared state data for XLogInsert.
 *
 * Embedded in XLogCtlData as the "Insert" member, which is marked there as
 * protected by WALInsertLock.
338  */
339 typedef struct XLogCtlInsert
340 {
341         XLogwrtResult LogwrtResult; /* a recent value of LogwrtResult */
342         XLogRecPtr      PrevRecord;             /* start of previously-inserted record */
343         int                     curridx;                /* current block index in cache */
344         XLogPageHeader currpage;        /* points to header of block in cache */
345         char       *currpos;            /* current insertion point in cache */
346         XLogRecPtr      RedoRecPtr;             /* current redo point for insertions */
347         bool            forcePageWrites;        /* forcing full-page writes for PITR? */
348 } XLogCtlInsert;
349
350 /*
351  * Shared state data for XLogWrite/XLogFlush.
 *
 * Embedded in XLogCtlData as the "Write" member, which is marked there as
 * protected by WALWriteLock.
352  */
353 typedef struct XLogCtlWrite
354 {
355         XLogwrtResult LogwrtResult; /* current value of LogwrtResult */
356         int                     curridx;                /* cache index of next block to write */
357 } XLogCtlWrite;
358
359 /*
360  * Total shared-memory state for XLOG.
 *
 * Members are grouped by the lock that protects them; the comment in front
 * of each group names that lock.
361  */
362 typedef struct XLogCtlData
363 {
364         /* Protected by WALInsertLock: */
365         XLogCtlInsert Insert;
366         /* Protected by info_lck: */
367         XLogwrtRqst LogwrtRqst;
368         XLogwrtResult LogwrtResult;
369         /* Protected by WALWriteLock: */
370         XLogCtlWrite Write;
371
372         /*
373          * These values do not change after startup, although the pointed-to pages
374          * and xlblocks values certainly do.  Permission to read/write the pages
375          * and xlblocks values depends on WALInsertLock and WALWriteLock.
376          */
377         char       *pages;                      /* buffers for unwritten XLOG pages */
378         XLogRecPtr *xlblocks;           /* per buffer: XLOG pointer of the page's 1st byte, plus XLOG_BLCKSZ */
379         Size            XLogCacheByte;  /* # bytes in xlog buffers */
380         int                     XLogCacheBlck;  /* highest allocated xlog buffer index */
381         TimeLineID      ThisTimeLineID; /* NOTE(review): presumably the shared copy of the global of the same name -- confirm */
382
383         slock_t         info_lck;               /* locks shared LogwrtRqst/LogwrtResult */
384 } XLogCtlData;
385
/* Pointer to the XLOG shared state; starts out NULL and is set elsewhere (not in view here). */
386 static XLogCtlData *XLogCtl = NULL;
387
388 /*
389  * We maintain an image of pg_control in shared memory.
 * The pointer starts out NULL; where it is set is not in view here.
390  */
391 static ControlFileData *ControlFile = NULL;
392
393 /*
394  * Macros for managing XLogInsert state.  In most cases, the calling routine
395  * has local copies of XLogCtl->Insert and/or XLogCtl->Insert->curridx,
396  * so these are passed as parameters instead of being fetched via XLogCtl.
 *
 * These are plain textual macros that mention their arguments more than
 * once, so an argument expression may be evaluated more than once.
397  */
398
399 /* Free space remaining in the current xlog page buffer */
/* (XLOG_BLCKSZ minus the byte offset of currpos from the start of currpage) */
400 #define INSERT_FREESPACE(Insert)  \
401         (XLOG_BLCKSZ - ((Insert)->currpos - (char *) (Insert)->currpage))
402
403 /* Construct XLogRecPtr value for current insertion point */
/*
 * Assigns recptr.xlogid and recptr.xrecoff.  xlblocks[curridx] is the page's
 * first-byte position plus XLOG_BLCKSZ, so subtracting the free space still
 * left in the page yields the insertion position.
 */
404 #define INSERT_RECPTR(recptr,Insert,curridx)  \
405         ( \
406           (recptr).xlogid = XLogCtl->xlblocks[curridx].xlogid, \
407           (recptr).xrecoff = \
408                 XLogCtl->xlblocks[curridx].xrecoff - INSERT_FREESPACE(Insert) \
409         )
410
/* Buffer index before idx; index 0 wraps around to XLogCacheBlck */
411 #define PrevBufIdx(idx)         \
412                 (((idx) == 0) ? XLogCtl->XLogCacheBlck : ((idx) - 1))
413
/* Buffer index after idx; XLogCacheBlck wraps around to 0 */
414 #define NextBufIdx(idx)         \
415                 (((idx) == XLogCtl->XLogCacheBlck) ? 0 : ((idx) + 1))
416
417 /*
418  * Private, possibly out-of-date copy of shared LogwrtResult.
419  * See discussion above.
420  */
421 static XLogwrtResult LogwrtResult = {{0, 0}, {0, 0}};
422
423 /*
424  * openLogFile is -1 or a kernel FD for an open log file segment.
425  * When it's open, openLogOff is the current seek offset in the file.
426  * openLogId/openLogSeg identify the segment.  These variables are only
427  * used to write the XLOG, and so will normally refer to the active segment.
428  */
429 static int      openLogFile = -1;       /* -1 means no segment is open */
430 static uint32 openLogId = 0;
431 static uint32 openLogSeg = 0;
432 static uint32 openLogOff = 0;
433
434 /*
435  * These variables are used similarly to the ones above, but for reading
436  * the XLOG.  Note, however, that readOff generally represents the offset
437  * of the page just read, not the seek position of the FD itself, which
438  * will be just past that page.
439  */
440 static int      readFile = -1;
441 static uint32 readId = 0;
442 static uint32 readSeg = 0;
443 static uint32 readOff = 0;
444
445 /* Buffer for currently read page (XLOG_BLCKSZ bytes) */
446 static char *readBuf = NULL;
447
448 /* Buffer for current ReadRecord result (expandable) */
449 static char *readRecordBuf = NULL;
450 static uint32 readRecordBufSize = 0;
451
452 /* State information for XLOG reading */
453 static XLogRecPtr ReadRecPtr;   /* start of last record read */
454 static XLogRecPtr EndRecPtr;    /* end+1 of last record read */
455 static XLogRecord *nextRecord = NULL;
456 static TimeLineID lastPageTLI = 0;
457
/* NOTE(review): presumably true while WAL records are being replayed -- confirm against the code that sets it */
458 static bool InRedo = false;
459
460
/*
 * Prototypes for functions private to this file (all are declared static).
 * xlog_outrec is declared only in builds with WAL_DEBUG defined.
 */
461 static void XLogArchiveNotify(const char *xlog);
462 static void XLogArchiveNotifySeg(uint32 log, uint32 seg);
463 static bool XLogArchiveIsDone(const char *xlog);
464 static void XLogArchiveCleanup(const char *xlog);
465 static void readRecoveryCommandFile(void);
466 static void exitArchiveRecovery(TimeLineID endTLI,
467                                         uint32 endLogId, uint32 endLogSeg);
468 static bool recoveryStopsHere(XLogRecord *record, bool *includeThis);
469
470 static bool XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites,
471                                 XLogRecPtr *lsn, BkpBlock *bkpb);
472 static bool AdvanceXLInsertBuffer(void);
473 static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible);
474 static int XLogFileInit(uint32 log, uint32 seg,
475                          bool *use_existent, bool use_lock);
476 static bool InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath,
477                                            bool find_free, int *max_advance,
478                                            bool use_lock);
479 static int      XLogFileOpen(uint32 log, uint32 seg);
480 static int      XLogFileRead(uint32 log, uint32 seg, int emode);
481 static void     XLogFileClose(void);
482 static bool RestoreArchivedFile(char *path, const char *xlogfname,
483                                         const char *recovername, off_t expectedSize);
484 static int      PreallocXlogFiles(XLogRecPtr endptr);
485 static void MoveOfflineLogs(uint32 log, uint32 seg, XLogRecPtr endptr,
486                                 int *nsegsremoved, int *nsegsrecycled);
487 static void RemoveOldBackupHistory(void);
488 static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, int emode);
489 static bool ValidXLOGHeader(XLogPageHeader hdr, int emode);
490 static XLogRecord *ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt);
491 static List *readTimeLineHistory(TimeLineID targetTLI);
492 static bool existsTimeLineHistory(TimeLineID probeTLI);
493 static TimeLineID findNewestTimeLine(TimeLineID startTLI);
494 static void writeTimeLineHistory(TimeLineID newTLI, TimeLineID parentTLI,
495                                          TimeLineID endTLI,
496                                          uint32 endLogId, uint32 endLogSeg);
497 static void WriteControlFile(void);
498 static void ReadControlFile(void);
499 static char *str_time(time_t tnow);
500 static void issue_xlog_fsync(void);
501
502 #ifdef WAL_DEBUG
503 static void xlog_outrec(StringInfo buf, XLogRecord *record);
504 #endif
505 static bool read_backup_label(XLogRecPtr *checkPointLoc);
506 static void remove_backup_label(void);
507 static void rm_redo_error_callback(void *arg);
508
509
510 /*
511  * Insert an XLOG record having the specified RMID and info bytes,
512  * with the body of the record being the data chunk(s) described by
513  * the rdata chain (see xlog.h for notes about rdata).
514  *
515  * Returns XLOG pointer to end of record (beginning of next record).
516  * This can be used as LSN for data pages affected by the logged action.
517  * (LSN is the XLOG point up to which the XLOG must be flushed to disk
518  * before the data page can be written out.  This implements the basic
519  * WAL rule "write the log before the data".)
520  *
521  * NB: this routine feels free to scribble on the XLogRecData structs,
522  * though not on the data they reference.  This is OK since the XLogRecData
523  * structs are always just temporaries in the calling code.
524  */
525 XLogRecPtr
526 XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
527 {
528         XLogCtlInsert *Insert = &XLogCtl->Insert;
529         XLogRecord *record;
530         XLogContRecord *contrecord;
531         XLogRecPtr      RecPtr;
532         XLogRecPtr      WriteRqst;
533         uint32          freespace;
534         int                     curridx;
535         XLogRecData *rdt;
536         Buffer          dtbuf[XLR_MAX_BKP_BLOCKS];
537         bool            dtbuf_bkp[XLR_MAX_BKP_BLOCKS];
538         BkpBlock        dtbuf_xlg[XLR_MAX_BKP_BLOCKS];
539         XLogRecPtr      dtbuf_lsn[XLR_MAX_BKP_BLOCKS];
540         XLogRecData dtbuf_rdt1[XLR_MAX_BKP_BLOCKS];
541         XLogRecData dtbuf_rdt2[XLR_MAX_BKP_BLOCKS];
542         XLogRecData dtbuf_rdt3[XLR_MAX_BKP_BLOCKS];
543         pg_crc32        rdata_crc;
544         uint32          len,
545                                 write_len;
546         unsigned        i;
547         XLogwrtRqst LogwrtRqst;
548         bool            updrqst;
549         bool            doPageWrites;
550         bool            no_tran = (rmid == RM_XLOG_ID) ? true : false;
551
552         if (info & XLR_INFO_MASK)
553         {
554                 if ((info & XLR_INFO_MASK) != XLOG_NO_TRAN)
555                         elog(PANIC, "invalid xlog info mask %02X", (info & XLR_INFO_MASK));
556                 no_tran = true;
557                 info &= ~XLR_INFO_MASK;
558         }
559
560         /*
561          * In bootstrap mode, we don't actually log anything but XLOG resources;
562          * return a phony record pointer.
563          */
564         if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID)
565         {
566                 RecPtr.xlogid = 0;
567                 RecPtr.xrecoff = SizeOfXLogLongPHD;             /* start of 1st chkpt record */
568                 return RecPtr;
569         }
570
571         /*
572          * Here we scan the rdata chain, determine which buffers must be backed
573          * up, and compute the CRC values for the data.  Note that the record
574          * header isn't added into the CRC initially since we don't know the final
575          * length or info bits quite yet.  Thus, the CRC will represent the CRC of
576          * the whole record in the order "rdata, then backup blocks, then record
577          * header".
578          *
579          * We may have to loop back to here if a race condition is detected below.
580          * We could prevent the race by doing all this work while holding the
581          * insert lock, but it seems better to avoid doing CRC calculations while
582          * holding the lock.  This means we have to be careful about modifying the
583          * rdata chain until we know we aren't going to loop back again.  The only
584          * change we allow ourselves to make earlier is to set rdt->data = NULL in
585          * chain items we have decided we will have to back up the whole buffer
586          * for.  This is OK because we will certainly decide the same thing again
587          * for those items if we do it over; doing it here saves an extra pass
588          * over the chain later.
589          */
590 begin:;
591         for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
592         {
593                 dtbuf[i] = InvalidBuffer;
594                 dtbuf_bkp[i] = false;
595         }
596
597         /*
598          * Decide if we need to do full-page writes in this XLOG record: true if
599          * full_page_writes is on or we have a PITR request for it.  Since we
600          * don't yet have the insert lock, forcePageWrites could change under us,
601          * but we'll recheck it once we have the lock.
602          */
603         doPageWrites = fullPageWrites || Insert->forcePageWrites;
604
605         INIT_CRC32(rdata_crc);
606         len = 0;
607         for (rdt = rdata;;)
608         {
609                 if (rdt->buffer == InvalidBuffer)
610                 {
611                         /* Simple data, just include it */
612                         len += rdt->len;
613                         COMP_CRC32(rdata_crc, rdt->data, rdt->len);
614                 }
615                 else
616                 {
617                         /* Find info for buffer */
618                         for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
619                         {
620                                 if (rdt->buffer == dtbuf[i])
621                                 {
622                                         /* Buffer already referenced by earlier chain item */
623                                         if (dtbuf_bkp[i])
624                                                 rdt->data = NULL;
625                                         else if (rdt->data)
626                                         {
627                                                 len += rdt->len;
628                                                 COMP_CRC32(rdata_crc, rdt->data, rdt->len);
629                                         }
630                                         break;
631                                 }
632                                 if (dtbuf[i] == InvalidBuffer)
633                                 {
634                                         /* OK, put it in this slot */
635                                         dtbuf[i] = rdt->buffer;
636                                         if (XLogCheckBuffer(rdt, doPageWrites,
637                                                                                 &(dtbuf_lsn[i]), &(dtbuf_xlg[i])))
638                                         {
639                                                 dtbuf_bkp[i] = true;
640                                                 rdt->data = NULL;
641                                         }
642                                         else if (rdt->data)
643                                         {
644                                                 len += rdt->len;
645                                                 COMP_CRC32(rdata_crc, rdt->data, rdt->len);
646                                         }
647                                         break;
648                                 }
649                         }
650                         if (i >= XLR_MAX_BKP_BLOCKS)
651                                 elog(PANIC, "can backup at most %d blocks per xlog record",
652                                          XLR_MAX_BKP_BLOCKS);
653                 }
654                 /* Break out of loop when rdt points to last chain item */
655                 if (rdt->next == NULL)
656                         break;
657                 rdt = rdt->next;
658         }
659
660         /*
661          * Now add the backup block headers and data into the CRC
662          */
663         for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
664         {
665                 if (dtbuf_bkp[i])
666                 {
667                         BkpBlock   *bkpb = &(dtbuf_xlg[i]);
668                         char       *page;
669
670                         COMP_CRC32(rdata_crc,
671                                            (char *) bkpb,
672                                            sizeof(BkpBlock));
673                         page = (char *) BufferGetBlock(dtbuf[i]);
674                         if (bkpb->hole_length == 0)
675                         {
676                                 COMP_CRC32(rdata_crc,
677                                                    page,
678                                                    BLCKSZ);
679                         }
680                         else
681                         {
682                                 /* must skip the hole */
683                                 COMP_CRC32(rdata_crc,
684                                                    page,
685                                                    bkpb->hole_offset);
686                                 COMP_CRC32(rdata_crc,
687                                                    page + (bkpb->hole_offset + bkpb->hole_length),
688                                                    BLCKSZ - (bkpb->hole_offset + bkpb->hole_length));
689                         }
690                 }
691         }
692
693         /*
694          * NOTE: the test for len == 0 here is somewhat fishy, since in theory all
695          * of the rmgr data might have been suppressed in favor of backup blocks.
696          * Currently, all callers of XLogInsert provide at least some
697          * not-in-a-buffer data and so len == 0 should never happen, but that may
698          * not be true forever.  If you need to remove the len == 0 check, also
699          * remove the check for xl_len == 0 in ReadRecord, below.
700          */
701         if (len == 0)
702                 elog(PANIC, "invalid xlog record length %u", len);
703
704         START_CRIT_SECTION();
705
706         /* update LogwrtResult before doing cache fill check */
707         {
708                 /* use volatile pointer to prevent code rearrangement */
709                 volatile XLogCtlData *xlogctl = XLogCtl;
710
711                 SpinLockAcquire(&xlogctl->info_lck);
712                 LogwrtRqst = xlogctl->LogwrtRqst;
713                 LogwrtResult = xlogctl->LogwrtResult;
714                 SpinLockRelease(&xlogctl->info_lck);
715         }
716
717         /*
718          * If cache is half filled then try to acquire write lock and do
719          * XLogWrite. Ignore any fractional blocks in performing this check.
720          */
721         LogwrtRqst.Write.xrecoff -= LogwrtRqst.Write.xrecoff % XLOG_BLCKSZ;
722         if (LogwrtRqst.Write.xlogid != LogwrtResult.Write.xlogid ||
723                 (LogwrtRqst.Write.xrecoff >= LogwrtResult.Write.xrecoff +
724                  XLogCtl->XLogCacheByte / 2))
725         {
726                 if (LWLockConditionalAcquire(WALWriteLock, LW_EXCLUSIVE))
727                 {
728                         /*
729                          * Since the amount of data we write here is completely optional
730                          * anyway, tell XLogWrite it can be "flexible" and stop at a
731                          * convenient boundary.  This allows writes triggered by this
732                          * mechanism to synchronize with the cache boundaries, so that in
733                          * a long transaction we'll basically dump alternating halves of
734                          * the buffer array.
735                          */
736                         LogwrtResult = XLogCtl->Write.LogwrtResult;
737                         if (XLByteLT(LogwrtResult.Write, LogwrtRqst.Write))
738                                 XLogWrite(LogwrtRqst, true);
739                         LWLockRelease(WALWriteLock);
740                 }
741         }
742
743         /* Now wait to get insert lock */
744         LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
745
746         /*
747          * Check to see if my RedoRecPtr is out of date.  If so, may have to go
748          * back and recompute everything.  This can only happen just after a
749          * checkpoint, so it's better to be slow in this case and fast otherwise.
750          *
751          * If we aren't doing full-page writes then RedoRecPtr doesn't actually
752          * affect the contents of the XLOG record, so we'll update our local
753          * copy but not force a recomputation.
754          */
755         if (!XLByteEQ(RedoRecPtr, Insert->RedoRecPtr))
756         {
757                 Assert(XLByteLT(RedoRecPtr, Insert->RedoRecPtr));
758                 RedoRecPtr = Insert->RedoRecPtr;
759
760                 if (doPageWrites)
761                 {
762                         for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
763                         {
764                                 if (dtbuf[i] == InvalidBuffer)
765                                         continue;
766                                 if (dtbuf_bkp[i] == false &&
767                                         XLByteLE(dtbuf_lsn[i], RedoRecPtr))
768                                 {
769                                         /*
770                                          * Oops, this buffer now needs to be backed up, but we
771                                          * didn't think so above.  Start over.
772                                          */
773                                         LWLockRelease(WALInsertLock);
774                                         END_CRIT_SECTION();
775                                         goto begin;
776                                 }
777                         }
778                 }
779         }
780
781         /*
782          * Also check to see if forcePageWrites was just turned on; if we
783          * weren't already doing full-page writes then go back and recompute.
784          * (If it was just turned off, we could recompute the record without
785          * full pages, but we choose not to bother.)
786          */
787         if (Insert->forcePageWrites && !doPageWrites)
788         {
789                 /* Oops, must redo it with full-page data */
790                 LWLockRelease(WALInsertLock);
791                 END_CRIT_SECTION();
792                 goto begin;
793         }
794
795         /*
796          * Make additional rdata chain entries for the backup blocks, so that we
797          * don't need to special-case them in the write loop.  Note that we have
798          * now irrevocably changed the input rdata chain.  At the exit of this
799          * loop, write_len includes the backup block data.
800          *
801          * Also set the appropriate info bits to show which buffers were backed
802          * up. The i'th XLR_SET_BKP_BLOCK bit corresponds to the i'th distinct
803          * buffer value (ignoring InvalidBuffer) appearing in the rdata chain.
804          */
805         write_len = len;
806         for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
807         {
808                 BkpBlock   *bkpb;
809                 char       *page;
810
811                 if (!dtbuf_bkp[i])
812                         continue;
813
814                 info |= XLR_SET_BKP_BLOCK(i);
815
816                 bkpb = &(dtbuf_xlg[i]);
817                 page = (char *) BufferGetBlock(dtbuf[i]);
818
819                 rdt->next = &(dtbuf_rdt1[i]);
820                 rdt = rdt->next;
821
822                 rdt->data = (char *) bkpb;
823                 rdt->len = sizeof(BkpBlock);
824                 write_len += sizeof(BkpBlock);
825
826                 rdt->next = &(dtbuf_rdt2[i]);
827                 rdt = rdt->next;
828
829                 if (bkpb->hole_length == 0)
830                 {
831                         rdt->data = page;
832                         rdt->len = BLCKSZ;
833                         write_len += BLCKSZ;
834                         rdt->next = NULL;
835                 }
836                 else
837                 {
838                         /* must skip the hole */
839                         rdt->data = page;
840                         rdt->len = bkpb->hole_offset;
841                         write_len += bkpb->hole_offset;
842
843                         rdt->next = &(dtbuf_rdt3[i]);
844                         rdt = rdt->next;
845
846                         rdt->data = page + (bkpb->hole_offset + bkpb->hole_length);
847                         rdt->len = BLCKSZ - (bkpb->hole_offset + bkpb->hole_length);
848                         write_len += rdt->len;
849                         rdt->next = NULL;
850                 }
851         }
852
853         /*
854          * If there isn't enough space on the current XLOG page for a record
855          * header, advance to the next page (leaving the unused space as zeroes).
856          */
857         updrqst = false;
858         freespace = INSERT_FREESPACE(Insert);
859         if (freespace < SizeOfXLogRecord)
860         {
861                 updrqst = AdvanceXLInsertBuffer();
862                 freespace = INSERT_FREESPACE(Insert);
863         }
864
865         curridx = Insert->curridx;
866         record = (XLogRecord *) Insert->currpos;
867
868         /* Insert record header */
869
870         record->xl_prev = Insert->PrevRecord;
871         record->xl_xid = GetCurrentTransactionIdIfAny();
872         record->xl_tot_len = SizeOfXLogRecord + write_len;
873         record->xl_len = len;           /* doesn't include backup blocks */
874         record->xl_info = info;
875         record->xl_rmid = rmid;
876
877         /* Now we can finish computing the record's CRC */
878         COMP_CRC32(rdata_crc, (char *) record + sizeof(pg_crc32),
879                            SizeOfXLogRecord - sizeof(pg_crc32));
880         FIN_CRC32(rdata_crc);
881         record->xl_crc = rdata_crc;
882
883         /* Compute record's XLOG location */
884         INSERT_RECPTR(RecPtr, Insert, curridx);
885
886 #ifdef WAL_DEBUG
887         if (XLOG_DEBUG)
888         {
889                 StringInfoData  buf;
890
891                 initStringInfo(&buf);
892                 appendStringInfo(&buf, "INSERT @ %X/%X: ", 
893                                                         RecPtr.xlogid, RecPtr.xrecoff);
894                 xlog_outrec(&buf, record);
895                 if (rdata->data != NULL)
896                 {
897                         appendStringInfo(&buf, " - ");
898                         RmgrTable[record->xl_rmid].rm_desc(&buf, record->xl_info, rdata->data);
899                 }
900                 elog(LOG, "%s", buf.data);
901                 pfree(buf.data);
902         }
903 #endif
904
905         /* Record begin of record in appropriate places */
906         if (!no_tran)
907                 MyLastRecPtr = RecPtr;
908         ProcLastRecPtr = RecPtr;
909         Insert->PrevRecord = RecPtr;
910         MyXactMadeXLogEntry = true;
911
912         Insert->currpos += SizeOfXLogRecord;
913         freespace -= SizeOfXLogRecord;
914
915         /*
916          * Append the data, including backup blocks if any
917          */
918         while (write_len)
919         {
920                 while (rdata->data == NULL)
921                         rdata = rdata->next;
922
923                 if (freespace > 0)
924                 {
925                         if (rdata->len > freespace)
926                         {
927                                 memcpy(Insert->currpos, rdata->data, freespace);
928                                 rdata->data += freespace;
929                                 rdata->len -= freespace;
930                                 write_len -= freespace;
931                         }
932                         else
933                         {
934                                 memcpy(Insert->currpos, rdata->data, rdata->len);
935                                 freespace -= rdata->len;
936                                 write_len -= rdata->len;
937                                 Insert->currpos += rdata->len;
938                                 rdata = rdata->next;
939                                 continue;
940                         }
941                 }
942
943                 /* Use next buffer */
944                 updrqst = AdvanceXLInsertBuffer();
945                 curridx = Insert->curridx;
946                 /* Insert cont-record header */
947                 Insert->currpage->xlp_info |= XLP_FIRST_IS_CONTRECORD;
948                 contrecord = (XLogContRecord *) Insert->currpos;
949                 contrecord->xl_rem_len = write_len;
950                 Insert->currpos += SizeOfXLogContRecord;
951                 freespace = INSERT_FREESPACE(Insert);
952         }
953
954         /* Ensure next record will be properly aligned */
955         Insert->currpos = (char *) Insert->currpage +
956                 MAXALIGN(Insert->currpos - (char *) Insert->currpage);
957         freespace = INSERT_FREESPACE(Insert);
958
959         /*
960          * The recptr I return is the beginning of the *next* record. This will be
961          * stored as LSN for changed data pages...
962          */
963         INSERT_RECPTR(RecPtr, Insert, curridx);
964
965         /* Need to update shared LogwrtRqst if some block was filled up */
966         if (freespace < SizeOfXLogRecord)
967                 updrqst = true;                 /* curridx is filled and available for writing
968                                                                  * out */
969         else
970                 curridx = PrevBufIdx(curridx);
971         WriteRqst = XLogCtl->xlblocks[curridx];
972
973         LWLockRelease(WALInsertLock);
974
975         if (updrqst)
976         {
977                 /* use volatile pointer to prevent code rearrangement */
978                 volatile XLogCtlData *xlogctl = XLogCtl;
979
980                 SpinLockAcquire(&xlogctl->info_lck);
981                 /* advance global request to include new block(s) */
982                 if (XLByteLT(xlogctl->LogwrtRqst.Write, WriteRqst))
983                         xlogctl->LogwrtRqst.Write = WriteRqst;
984                 /* update local result copy while I have the chance */
985                 LogwrtResult = xlogctl->LogwrtResult;
986                 SpinLockRelease(&xlogctl->info_lck);
987         }
988
989         ProcLastRecEnd = RecPtr;
990
991         END_CRIT_SECTION();
992
993         return RecPtr;
994 }
995
996 /*
997  * Determine whether the buffer referenced by an XLogRecData item has to
998  * be backed up, and if so fill a BkpBlock struct for it.  In any case
999  * save the buffer's LSN at *lsn.
1000  */
1001 static bool
1002 XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites,
1003                                 XLogRecPtr *lsn, BkpBlock *bkpb)
1004 {
1005         PageHeader      page;
1006
1007         page = (PageHeader) BufferGetBlock(rdata->buffer);
1008
1009         /*
1010          * XXX We assume page LSN is first data on *every* page that can be passed
1011          * to XLogInsert, whether it otherwise has the standard page layout or
1012          * not.
1013          */
1014         *lsn = page->pd_lsn;
1015
1016         if (doPageWrites &&
1017                 XLByteLE(page->pd_lsn, RedoRecPtr))
1018         {
1019                 /*
1020                  * The page needs to be backed up, so set up *bkpb
1021                  */
1022                 bkpb->node = BufferGetFileNode(rdata->buffer);
1023                 bkpb->block = BufferGetBlockNumber(rdata->buffer);
1024
1025                 if (rdata->buffer_std)
1026                 {
1027                         /* Assume we can omit data between pd_lower and pd_upper */
1028                         uint16          lower = page->pd_lower;
1029                         uint16          upper = page->pd_upper;
1030
1031                         if (lower >= SizeOfPageHeaderData &&
1032                                 upper > lower &&
1033                                 upper <= BLCKSZ)
1034                         {
1035                                 bkpb->hole_offset = lower;
1036                                 bkpb->hole_length = upper - lower;
1037                         }
1038                         else
1039                         {
1040                                 /* No "hole" to compress out */
1041                                 bkpb->hole_offset = 0;
1042                                 bkpb->hole_length = 0;
1043                         }
1044                 }
1045                 else
1046                 {
1047                         /* Not a standard page header, don't try to eliminate "hole" */
1048                         bkpb->hole_offset = 0;
1049                         bkpb->hole_length = 0;
1050                 }
1051
1052                 return true;                    /* buffer requires backup */
1053         }
1054
1055         return false;                           /* buffer does not need to be backed up */
1056 }
1057
1058 /*
1059  * XLogArchiveNotify
1060  *
1061  * Create an archive notification file
1062  *
1063  * The name of the notification file is the message that will be picked up
1064  * by the archiver, e.g. we write 0000000100000001000000C6.ready
1065  * and the archiver then knows to archive XLOGDIR/0000000100000001000000C6,
1066  * then when complete, rename it to 0000000100000001000000C6.done
1067  */
1068 static void
1069 XLogArchiveNotify(const char *xlog)
1070 {
1071         char            archiveStatusPath[MAXPGPATH];
1072         FILE       *fd;
1073
1074         /* insert an otherwise empty file called <XLOG>.ready */
1075         StatusFilePath(archiveStatusPath, xlog, ".ready");
1076         fd = AllocateFile(archiveStatusPath, "w");
1077         if (fd == NULL)
1078         {
1079                 ereport(LOG,
1080                                 (errcode_for_file_access(),
1081                                  errmsg("could not create archive status file \"%s\": %m",
1082                                                 archiveStatusPath)));
1083                 return;
1084         }
1085         if (FreeFile(fd))
1086         {
1087                 ereport(LOG,
1088                                 (errcode_for_file_access(),
1089                                  errmsg("could not write archive status file \"%s\": %m",
1090                                                 archiveStatusPath)));
1091                 return;
1092         }
1093
1094         /* Notify archiver that it's got something to do */
1095         if (IsUnderPostmaster)
1096                 SendPostmasterSignal(PMSIGNAL_WAKEN_ARCHIVER);
1097 }
1098
1099 /*
1100  * Convenience routine to notify using log/seg representation of filename
1101  */
1102 static void
1103 XLogArchiveNotifySeg(uint32 log, uint32 seg)
1104 {
1105         char            xlog[MAXFNAMELEN];
1106
1107         XLogFileName(xlog, ThisTimeLineID, log, seg);
1108         XLogArchiveNotify(xlog);
1109 }
1110
1111 /*
1112  * XLogArchiveIsDone
1113  *
1114  * Checks for a ".done" archive notification file.      This is called when we
1115  * are ready to delete or recycle an old XLOG segment file.  If it is okay
1116  * to delete it then return true.
1117  *
1118  * If <XLOG>.done exists, then return true; else if <XLOG>.ready exists,
1119  * then return false; else create <XLOG>.ready and return false.  The
1120  * last case covers the possibility that the original attempt to create
1121  * <XLOG>.ready failed.
1122  */
1123 static bool
1124 XLogArchiveIsDone(const char *xlog)
1125 {
1126         char            archiveStatusPath[MAXPGPATH];
1127         struct stat stat_buf;
1128
1129         /* First check for .done --- this is the expected case */
1130         StatusFilePath(archiveStatusPath, xlog, ".done");
1131         if (stat(archiveStatusPath, &stat_buf) == 0)
1132                 return true;
1133
1134         /* check for .ready --- this means archiver is still busy with it */
1135         StatusFilePath(archiveStatusPath, xlog, ".ready");
1136         if (stat(archiveStatusPath, &stat_buf) == 0)
1137                 return false;
1138
1139         /* Race condition --- maybe archiver just finished, so recheck */
1140         StatusFilePath(archiveStatusPath, xlog, ".done");
1141         if (stat(archiveStatusPath, &stat_buf) == 0)
1142                 return true;
1143
1144         /* Retry creation of the .ready file */
1145         XLogArchiveNotify(xlog);
1146         return false;
1147 }
1148
1149 /*
1150  * XLogArchiveCleanup
1151  *
1152  * Cleanup archive notification file(s) for a particular xlog segment
1153  */
1154 static void
1155 XLogArchiveCleanup(const char *xlog)
1156 {
1157         char            archiveStatusPath[MAXPGPATH];
1158
1159         /* Remove the .done file */
1160         StatusFilePath(archiveStatusPath, xlog, ".done");
1161         unlink(archiveStatusPath);
1162         /* should we complain about failure? */
1163
1164         /* Remove the .ready file if present --- normally it shouldn't be */
1165         StatusFilePath(archiveStatusPath, xlog, ".ready");
1166         unlink(archiveStatusPath);
1167         /* should we complain about failure? */
1168 }
1169
1170 /*
1171  * Advance the Insert state to the next buffer page, writing out the next
1172  * buffer if it still contains unwritten data.
1173  *
1174  * The global LogwrtRqst.Write pointer needs to be advanced to include the
1175  * just-filled page.  If we can do this for free (without an extra lock),
1176  * we do so here.  Otherwise the caller must do it.  We return TRUE if the
1177  * request update still needs to be done, FALSE if we did it internally.
1178  *
1179  * Must be called with WALInsertLock held.
1180  */
1181 static bool
1182 AdvanceXLInsertBuffer(void)
1183 {
1184         XLogCtlInsert *Insert = &XLogCtl->Insert;
1185         XLogCtlWrite *Write = &XLogCtl->Write;
1186         int                     nextidx = NextBufIdx(Insert->curridx);
1187         bool            update_needed = true;
1188         XLogRecPtr      OldPageRqstPtr;
1189         XLogwrtRqst WriteRqst;
1190         XLogRecPtr      NewPageEndPtr;
1191         XLogPageHeader NewPage;
1192
1193         /* Use Insert->LogwrtResult copy if it's more fresh */
1194         if (XLByteLT(LogwrtResult.Write, Insert->LogwrtResult.Write))
1195                 LogwrtResult = Insert->LogwrtResult;
1196
1197         /*
1198          * Get ending-offset of the buffer page we need to replace (this may be
1199          * zero if the buffer hasn't been used yet).  Fall through if it's already
1200          * written out.
1201          */
1202         OldPageRqstPtr = XLogCtl->xlblocks[nextidx];
1203         if (!XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
1204         {
1205                 /* nope, got work to do... */
1206                 XLogRecPtr      FinishedPageRqstPtr;
1207
1208                 FinishedPageRqstPtr = XLogCtl->xlblocks[Insert->curridx];
1209
1210                 /* Before waiting, get info_lck and update LogwrtResult */
1211                 {
1212                         /* use volatile pointer to prevent code rearrangement */
1213                         volatile XLogCtlData *xlogctl = XLogCtl;
1214
1215                         SpinLockAcquire(&xlogctl->info_lck);
1216                         if (XLByteLT(xlogctl->LogwrtRqst.Write, FinishedPageRqstPtr))
1217                                 xlogctl->LogwrtRqst.Write = FinishedPageRqstPtr;
1218                         LogwrtResult = xlogctl->LogwrtResult;
1219                         SpinLockRelease(&xlogctl->info_lck);
1220                 }
1221
1222                 update_needed = false;  /* Did the shared-request update */
1223
1224                 if (XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
1225                 {
1226                         /* OK, someone wrote it already */
1227                         Insert->LogwrtResult = LogwrtResult;
1228                 }
1229                 else
1230                 {
1231                         /* Must acquire write lock */
1232                         LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
1233                         LogwrtResult = Write->LogwrtResult;
1234                         if (XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
1235                         {
1236                                 /* OK, someone wrote it already */
1237                                 LWLockRelease(WALWriteLock);
1238                                 Insert->LogwrtResult = LogwrtResult;
1239                         }
1240                         else
1241                         {
1242                                 /*
1243                                  * Have to write buffers while holding insert lock. This is
1244                                  * not good, so only write as much as we absolutely must.
1245                                  */
1246                                 WriteRqst.Write = OldPageRqstPtr;
1247                                 WriteRqst.Flush.xlogid = 0;
1248                                 WriteRqst.Flush.xrecoff = 0;
1249                                 XLogWrite(WriteRqst, false);
1250                                 LWLockRelease(WALWriteLock);
1251                                 Insert->LogwrtResult = LogwrtResult;
1252                         }
1253                 }
1254         }
1255
1256         /*
1257          * Now the next buffer slot is free and we can set it up to be the next
1258          * output page.
1259          */
1260         NewPageEndPtr = XLogCtl->xlblocks[Insert->curridx];
1261         if (NewPageEndPtr.xrecoff >= XLogFileSize)
1262         {
1263                 /* crossing a logid boundary */
1264                 NewPageEndPtr.xlogid += 1;
1265                 NewPageEndPtr.xrecoff = XLOG_BLCKSZ;
1266         }
1267         else
1268                 NewPageEndPtr.xrecoff += XLOG_BLCKSZ;
1269         XLogCtl->xlblocks[nextidx] = NewPageEndPtr;
1270         NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * (Size) XLOG_BLCKSZ);
1271
1272         Insert->curridx = nextidx;
1273         Insert->currpage = NewPage;
1274
1275         Insert->currpos = ((char *) NewPage) +SizeOfXLogShortPHD;
1276
1277         /*
1278          * Be sure to re-zero the buffer so that bytes beyond what we've written
1279          * will look like zeroes and not valid XLOG records...
1280          */
1281         MemSet((char *) NewPage, 0, XLOG_BLCKSZ);
1282
1283         /*
1284          * Fill the new page's header
1285          */
1286         NewPage   ->xlp_magic = XLOG_PAGE_MAGIC;
1287
1288         /* NewPage->xlp_info = 0; */    /* done by memset */
1289         NewPage   ->xlp_tli = ThisTimeLineID;
1290         NewPage   ->xlp_pageaddr.xlogid = NewPageEndPtr.xlogid;
1291         NewPage   ->xlp_pageaddr.xrecoff = NewPageEndPtr.xrecoff - XLOG_BLCKSZ;
1292
1293         /*
1294          * If first page of an XLOG segment file, make it a long header.
1295          */
1296         if ((NewPage->xlp_pageaddr.xrecoff % XLogSegSize) == 0)
1297         {
1298                 XLogLongPageHeader NewLongPage = (XLogLongPageHeader) NewPage;
1299
1300                 NewLongPage->xlp_sysid = ControlFile->system_identifier;
1301                 NewLongPage->xlp_seg_size = XLogSegSize;
1302                 NewLongPage->xlp_xlog_blcksz = XLOG_BLCKSZ;
1303                 NewPage   ->xlp_info |= XLP_LONG_HEADER;
1304
1305                 Insert->currpos = ((char *) NewPage) +SizeOfXLogLongPHD;
1306         }
1307
1308         return update_needed;
1309 }
1310
1311 /*
1312  * Write and/or fsync the log at least as far as WriteRqst indicates.
1313  *
1314  * If flexible == TRUE, we don't have to write as far as WriteRqst, but
1315  * may stop at any convenient boundary (such as a cache or logfile boundary).
1316  * This option allows us to avoid uselessly issuing multiple writes when a
1317  * single one would do.
1318  *
1319  * Must be called with WALWriteLock held.
1320  */
1321 static void
1322 XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
1323 {
1324         XLogCtlWrite *Write = &XLogCtl->Write;
1325         bool            ispartialpage;
1326         bool            finishing_seg;
1327         bool            use_existent;
1328         int                     curridx;
1329         int                     npages;
1330         int                     startidx;
1331         uint32          startoffset;
1332
1333         /* We should always be inside a critical section here */
1334         Assert(CritSectionCount > 0);
1335
1336         /*
1337          * Update local LogwrtResult (caller probably did this already, but...)
1338          */
1339         LogwrtResult = Write->LogwrtResult;
1340
1341         /*
1342          * Since successive pages in the xlog cache are consecutively allocated,
1343          * we can usually gather multiple pages together and issue just one
1344          * write() call.  npages is the number of pages we have determined can be
1345          * written together; startidx is the cache block index of the first one,
1346          * and startoffset is the file offset at which it should go. The latter
1347          * two variables are only valid when npages > 0, but we must initialize
1348          * all of them to keep the compiler quiet.
1349          */
1350         npages = 0;
1351         startidx = 0;
1352         startoffset = 0;
1353
1354         /*
1355          * Within the loop, curridx is the cache block index of the page to
1356          * consider writing.  We advance Write->curridx only after successfully
1357          * writing pages.  (Right now, this refinement is useless since we are
1358          * going to PANIC if any error occurs anyway; but someday it may come in
1359          * useful.)
1360          */
1361         curridx = Write->curridx;
1362
1363         while (XLByteLT(LogwrtResult.Write, WriteRqst.Write))
1364         {
1365                 /*
1366                  * Make sure we're not ahead of the insert process.  This could happen
1367                  * if we're passed a bogus WriteRqst.Write that is past the end of the
1368                  * last page that's been initialized by AdvanceXLInsertBuffer.
1369                  */
1370                 if (!XLByteLT(LogwrtResult.Write, XLogCtl->xlblocks[curridx]))
1371                         elog(PANIC, "xlog write request %X/%X is past end of log %X/%X",
1372                                  LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff,
1373                                  XLogCtl->xlblocks[curridx].xlogid,
1374                                  XLogCtl->xlblocks[curridx].xrecoff);
1375
1376                 /* Advance LogwrtResult.Write to end of current buffer page */
1377                 LogwrtResult.Write = XLogCtl->xlblocks[curridx];
1378                 ispartialpage = XLByteLT(WriteRqst.Write, LogwrtResult.Write);
1379
1380                 if (!XLByteInPrevSeg(LogwrtResult.Write, openLogId, openLogSeg))
1381                 {
1382                         /*
1383                          * Switch to new logfile segment.  We cannot have any pending
1384                          * pages here (since we dump what we have at segment end).
1385                          */
1386                         Assert(npages == 0);
1387                         if (openLogFile >= 0)
1388                                 XLogFileClose();
1389                         XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg);
1390
1391                         /* create/use new log file */
1392                         use_existent = true;
1393                         openLogFile = XLogFileInit(openLogId, openLogSeg,
1394                                                                            &use_existent, true);
1395                         openLogOff = 0;
1396
1397                         /* update pg_control, unless someone else already did */
1398                         LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
1399                         if (ControlFile->logId < openLogId ||
1400                                 (ControlFile->logId == openLogId &&
1401                                  ControlFile->logSeg < openLogSeg + 1))
1402                         {
1403                                 ControlFile->logId = openLogId;
1404                                 ControlFile->logSeg = openLogSeg + 1;
1405                                 ControlFile->time = time(NULL);
1406                                 UpdateControlFile();
1407
1408                                 /*
1409                                  * Signal bgwriter to start a checkpoint if it's been too long
1410                                  * since the last one.  (We look at local copy of RedoRecPtr
1411                                  * which might be a little out of date, but should be close
1412                                  * enough for this purpose.)
1413                                  *
1414                                  * A straight computation of segment number could overflow 32
1415                                  * bits.  Rather than assuming we have working 64-bit
1416                                  * arithmetic, we compare the highest-order bits separately,
1417                                  * and force a checkpoint immediately when they change.
1418                                  */
1419                                 if (IsUnderPostmaster)
1420                                 {
1421                                         uint32          old_segno,
1422                                                                 new_segno;
1423                                         uint32          old_highbits,
1424                                                                 new_highbits;
1425
1426                                         old_segno = (RedoRecPtr.xlogid % XLogSegSize) * XLogSegsPerFile +
1427                                                 (RedoRecPtr.xrecoff / XLogSegSize);
1428                                         old_highbits = RedoRecPtr.xlogid / XLogSegSize;
1429                                         new_segno = (openLogId % XLogSegSize) * XLogSegsPerFile +
1430                                                 openLogSeg;
1431                                         new_highbits = openLogId / XLogSegSize;
1432                                         if (new_highbits != old_highbits ||
1433                                                 new_segno >= old_segno + (uint32) CheckPointSegments)
1434                                         {
1435 #ifdef WAL_DEBUG
1436                                                 if (XLOG_DEBUG)
1437                                                         elog(LOG, "time for a checkpoint, signaling bgwriter");
1438 #endif
1439                                                 RequestCheckpoint(false, true);
1440                                         }
1441                                 }
1442                         }
1443                         LWLockRelease(ControlFileLock);
1444                 }
1445
1446                 /* Make sure we have the current logfile open */
1447                 if (openLogFile < 0)
1448                 {
1449                         XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg);
1450                         openLogFile = XLogFileOpen(openLogId, openLogSeg);
1451                         openLogOff = 0;
1452                 }
1453
1454                 /* Add current page to the set of pending pages-to-dump */
1455                 if (npages == 0)
1456                 {
1457                         /* first of group */
1458                         startidx = curridx;
1459                         startoffset = (LogwrtResult.Write.xrecoff - XLOG_BLCKSZ) % XLogSegSize;
1460                 }
1461                 npages++;
1462
1463                 /*
1464                  * Dump the set if this will be the last loop iteration, or if we are
1465                  * at the last page of the cache area (since the next page won't be
1466                  * contiguous in memory), or if we are at the end of the logfile
1467                  * segment.
1468                  */
1469                 finishing_seg = !ispartialpage &&
1470                         (startoffset + npages * XLOG_BLCKSZ) >= XLogSegSize;
1471
1472                 if (!XLByteLT(LogwrtResult.Write, WriteRqst.Write) ||
1473                         curridx == XLogCtl->XLogCacheBlck ||
1474                         finishing_seg)
1475                 {
1476                         char       *from;
1477                         Size            nbytes;
1478
1479                         /* Need to seek in the file? */
1480                         if (openLogOff != startoffset)
1481                         {
1482                                 if (lseek(openLogFile, (off_t) startoffset, SEEK_SET) < 0)
1483                                         ereport(PANIC,
1484                                                         (errcode_for_file_access(),
1485                                                          errmsg("could not seek in log file %u, "
1486                                                                         "segment %u to offset %u: %m",
1487                                                                         openLogId, openLogSeg, startoffset)));
1488                                 openLogOff = startoffset;
1489                         }
1490
1491                         /* OK to write the page(s) */
1492                         from = XLogCtl->pages + startidx * (Size) XLOG_BLCKSZ;
1493                         nbytes = npages * (Size) XLOG_BLCKSZ;
1494                         errno = 0;
1495                         if (write(openLogFile, from, nbytes) != nbytes)
1496                         {
1497                                 /* if write didn't set errno, assume no disk space */
1498                                 if (errno == 0)
1499                                         errno = ENOSPC;
1500                                 ereport(PANIC,
1501                                                 (errcode_for_file_access(),
1502                                                  errmsg("could not write to log file %u, segment %u "
1503                                                                 "at offset %u, length %lu: %m",
1504                                                                 openLogId, openLogSeg,
1505                                                                 openLogOff, (unsigned long) nbytes)));
1506                         }
1507
1508                         /* Update state for write */
1509                         openLogOff += nbytes;
1510                         Write->curridx = ispartialpage ? curridx : NextBufIdx(curridx);
1511                         npages = 0;
1512
1513                         /*
1514                          * If we just wrote the whole last page of a logfile segment,
1515                          * fsync the segment immediately.  This avoids having to go back
1516                          * and re-open prior segments when an fsync request comes along
1517                          * later. Doing it here ensures that one and only one backend will
1518                          * perform this fsync.
1519                          *
1520                          * This is also the right place to notify the Archiver that the
1521                          * segment is ready to copy to archival storage.
1522                          */
1523                         if (finishing_seg)
1524                         {
1525                                 issue_xlog_fsync();
1526                                 LogwrtResult.Flush = LogwrtResult.Write;                /* end of page */
1527
1528                                 if (XLogArchivingActive())
1529                                         XLogArchiveNotifySeg(openLogId, openLogSeg);
1530                         }
1531                 }
1532
1533                 if (ispartialpage)
1534                 {
1535                         /* Only asked to write a partial page */
1536                         LogwrtResult.Write = WriteRqst.Write;
1537                         break;
1538                 }
1539                 curridx = NextBufIdx(curridx);
1540
1541                 /* If flexible, break out of loop as soon as we wrote something */
1542                 if (flexible && npages == 0)
1543                         break;
1544         }
1545
1546         Assert(npages == 0);
1547         Assert(curridx == Write->curridx);
1548
1549         /*
1550          * If asked to flush, do so
1551          */
1552         if (XLByteLT(LogwrtResult.Flush, WriteRqst.Flush) &&
1553                 XLByteLT(LogwrtResult.Flush, LogwrtResult.Write))
1554         {
1555                 /*
1556                  * Could get here without iterating above loop, in which case we might
1557                  * have no open file or the wrong one.  However, we do not need to
1558                  * fsync more than one file.
1559                  */
1560                 if (sync_method != SYNC_METHOD_OPEN)
1561                 {
1562                         if (openLogFile >= 0 &&
1563                                 !XLByteInPrevSeg(LogwrtResult.Write, openLogId, openLogSeg))
1564                                 XLogFileClose();
1565                         if (openLogFile < 0)
1566                         {
1567                                 XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg);
1568                                 openLogFile = XLogFileOpen(openLogId, openLogSeg);
1569                                 openLogOff = 0;
1570                         }
1571                         issue_xlog_fsync();
1572                 }
1573                 LogwrtResult.Flush = LogwrtResult.Write;
1574         }
1575
1576         /*
1577          * Update shared-memory status
1578          *
1579          * We make sure that the shared 'request' values do not fall behind the
1580          * 'result' values.  This is not absolutely essential, but it saves some
1581          * code in a couple of places.
1582          */
1583         {
1584                 /* use volatile pointer to prevent code rearrangement */
1585                 volatile XLogCtlData *xlogctl = XLogCtl;
1586
1587                 SpinLockAcquire(&xlogctl->info_lck);
1588                 xlogctl->LogwrtResult = LogwrtResult;
1589                 if (XLByteLT(xlogctl->LogwrtRqst.Write, LogwrtResult.Write))
1590                         xlogctl->LogwrtRqst.Write = LogwrtResult.Write;
1591                 if (XLByteLT(xlogctl->LogwrtRqst.Flush, LogwrtResult.Flush))
1592                         xlogctl->LogwrtRqst.Flush = LogwrtResult.Flush;
1593                 SpinLockRelease(&xlogctl->info_lck);
1594         }
1595
1596         Write->LogwrtResult = LogwrtResult;
1597 }
1598
1599 /*
1600  * Ensure that all XLOG data through the given position is flushed to disk.
1601  *
1602  * NOTE: this differs from XLogWrite mainly in that the WALWriteLock is not
1603  * already held, and we try to avoid acquiring it if possible.
1604  */
1605 void
1606 XLogFlush(XLogRecPtr record)
1607 {
1608         XLogRecPtr      WriteRqstPtr;
1609         XLogwrtRqst WriteRqst;
1610
1611         /* Disabled during REDO */
1612         if (InRedo)
1613                 return;
1614
1615         /* Quick exit if already known flushed */
1616         if (XLByteLE(record, LogwrtResult.Flush))
1617                 return;
1618
1619 #ifdef WAL_DEBUG
1620         if (XLOG_DEBUG)
1621                 elog(LOG, "xlog flush request %X/%X; write %X/%X; flush %X/%X",
1622                          record.xlogid, record.xrecoff,
1623                          LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff,
1624                          LogwrtResult.Flush.xlogid, LogwrtResult.Flush.xrecoff);
1625 #endif
1626
1627         START_CRIT_SECTION();
1628
1629         /*
1630          * Since fsync is usually a horribly expensive operation, we try to
1631          * piggyback as much data as we can on each fsync: if we see any more data
1632          * entered into the xlog buffer, we'll write and fsync that too, so that
1633          * the final value of LogwrtResult.Flush is as large as possible. This
1634          * gives us some chance of avoiding another fsync immediately after.
1635          */
1636
1637         /* initialize to given target; may increase below */
1638         WriteRqstPtr = record;
1639
1640         /* read LogwrtResult and update local state */
1641         {
1642                 /* use volatile pointer to prevent code rearrangement */
1643                 volatile XLogCtlData *xlogctl = XLogCtl;
1644
1645                 SpinLockAcquire(&xlogctl->info_lck);
1646                 if (XLByteLT(WriteRqstPtr, xlogctl->LogwrtRqst.Write))
1647                         WriteRqstPtr = xlogctl->LogwrtRqst.Write;
1648                 LogwrtResult = xlogctl->LogwrtResult;
1649                 SpinLockRelease(&xlogctl->info_lck);
1650         }
1651
1652         /* done already? */
1653         if (!XLByteLE(record, LogwrtResult.Flush))
1654         {
1655                 /* now wait for the write lock */
1656                 LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
1657                 LogwrtResult = XLogCtl->Write.LogwrtResult;
1658                 if (!XLByteLE(record, LogwrtResult.Flush))
1659                 {
1660                         /* try to write/flush later additions to XLOG as well */
1661                         if (LWLockConditionalAcquire(WALInsertLock, LW_EXCLUSIVE))
1662                         {
1663                                 XLogCtlInsert *Insert = &XLogCtl->Insert;
1664                                 uint32          freespace = INSERT_FREESPACE(Insert);
1665
1666                                 if (freespace < SizeOfXLogRecord)               /* buffer is full */
1667                                         WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx];
1668                                 else
1669                                 {
1670                                         WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx];
1671                                         WriteRqstPtr.xrecoff -= freespace;
1672                                 }
1673                                 LWLockRelease(WALInsertLock);
1674                                 WriteRqst.Write = WriteRqstPtr;
1675                                 WriteRqst.Flush = WriteRqstPtr;
1676                         }
1677                         else
1678                         {
1679                                 WriteRqst.Write = WriteRqstPtr;
1680                                 WriteRqst.Flush = record;
1681                         }
1682                         XLogWrite(WriteRqst, false);
1683                 }
1684                 LWLockRelease(WALWriteLock);
1685         }
1686
1687         END_CRIT_SECTION();
1688
1689         /*
1690          * If we still haven't flushed to the request point then we have a
1691          * problem; most likely, the requested flush point is past end of XLOG.
1692          * This has been seen to occur when a disk page has a corrupted LSN.
1693          *
1694          * Formerly we treated this as a PANIC condition, but that hurts the
1695          * system's robustness rather than helping it: we do not want to take down
1696          * the whole system due to corruption on one data page.  In particular, if
1697          * the bad page is encountered again during recovery then we would be
1698          * unable to restart the database at all!  (This scenario has actually
1699          * happened in the field several times with 7.1 releases. Note that we
1700          * cannot get here while InRedo is true, but if the bad page is brought in
1701          * and marked dirty during recovery then CreateCheckPoint will try to
1702          * flush it at the end of recovery.)
1703          *
1704          * The current approach is to ERROR under normal conditions, but only
1705          * WARNING during recovery, so that the system can be brought up even if
1706          * there's a corrupt LSN.  Note that for calls from xact.c, the ERROR will
1707          * be promoted to PANIC since xact.c calls this routine inside a critical
1708          * section.  However, calls from bufmgr.c are not within critical sections
1709          * and so we will not force a restart for a bad LSN on a data page.
1710          */
1711         if (XLByteLT(LogwrtResult.Flush, record))
1712                 elog(InRecovery ? WARNING : ERROR,
1713                 "xlog flush request %X/%X is not satisfied --- flushed only to %X/%X",
1714                          record.xlogid, record.xrecoff,
1715                          LogwrtResult.Flush.xlogid, LogwrtResult.Flush.xrecoff);
1716 }
1717
1718 /*
1719  * Create a new XLOG file segment, or open a pre-existing one.
1720  *
1721  * log, seg: identify segment to be created/opened.
1722  *
1723  * *use_existent: if TRUE, OK to use a pre-existing file (else, any
1724  * pre-existing file will be deleted).  On return, TRUE if a pre-existing
1725  * file was used.
1726  *
1727  * use_lock: if TRUE, acquire ControlFileLock while moving file into
1728  * place.  This should be TRUE except during bootstrap log creation.  The
1729  * caller must *not* hold the lock at call.
1730  *
1731  * Returns FD of opened file.
1732  *
1733  * Note: errors here are ERROR not PANIC because we might or might not be
1734  * inside a critical section (eg, during checkpoint there is no reason to
1735  * take down the system on failure).  They will promote to PANIC if we are
1736  * in a critical section.
1737  */
1738 static int
1739 XLogFileInit(uint32 log, uint32 seg,
1740                          bool *use_existent, bool use_lock)
1741 {
1742         char            path[MAXPGPATH];
1743         char            tmppath[MAXPGPATH];
1744         char            zbuffer[XLOG_BLCKSZ];
1745         uint32          installed_log;
1746         uint32          installed_seg;
1747         int                     max_advance;
1748         int                     fd;
1749         int                     nbytes;
1750
1751         XLogFilePath(path, ThisTimeLineID, log, seg);
1752
1753         /*
1754          * Try to use existent file (checkpoint maker may have created it already)
1755          */
1756         if (*use_existent)
1757         {
1758                 fd = BasicOpenFile(path, O_RDWR | PG_BINARY | XLOG_SYNC_BIT,
1759                                                    S_IRUSR | S_IWUSR);
1760                 if (fd < 0)
1761                 {
1762                         if (errno != ENOENT)
1763                                 ereport(ERROR,
1764                                                 (errcode_for_file_access(),
1765                                                  errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
1766                                                                 path, log, seg)));
1767                 }
1768                 else
1769                         return fd;
1770         }
1771
1772         /*
1773          * Initialize an empty (all zeroes) segment.  NOTE: it is possible that
1774          * another process is doing the same thing.  If so, we will end up
1775          * pre-creating an extra log segment.  That seems OK, and better than
1776          * holding the lock throughout this lengthy process.
1777          */
1778         snprintf(tmppath, MAXPGPATH, XLOGDIR "/xlogtemp.%d", (int) getpid());
1779
1780         unlink(tmppath);
1781
1782         /* do not use XLOG_SYNC_BIT here --- want to fsync only at end of fill */
1783         fd = BasicOpenFile(tmppath, O_RDWR | O_CREAT | O_EXCL | PG_BINARY,
1784                                            S_IRUSR | S_IWUSR);
1785         if (fd < 0)
1786                 ereport(ERROR,
1787                                 (errcode_for_file_access(),
1788                                  errmsg("could not create file \"%s\": %m", tmppath)));
1789
1790         /*
1791          * Zero-fill the file.  We have to do this the hard way to ensure that all
1792          * the file space has really been allocated --- on platforms that allow
1793          * "holes" in files, just seeking to the end doesn't allocate intermediate
1794          * space.  This way, we know that we have all the space and (after the
1795          * fsync below) that all the indirect blocks are down on disk.  Therefore,
1796          * fdatasync(2) or O_DSYNC will be sufficient to sync future writes to the
1797          * log file.
1798          */
1799         MemSet(zbuffer, 0, sizeof(zbuffer));
1800         for (nbytes = 0; nbytes < XLogSegSize; nbytes += sizeof(zbuffer))
1801         {
1802                 errno = 0;
1803                 if ((int) write(fd, zbuffer, sizeof(zbuffer)) != (int) sizeof(zbuffer))
1804                 {
1805                         int                     save_errno = errno;
1806
1807                         /*
1808                          * If we fail to make the file, delete it to release disk space
1809                          */
1810                         unlink(tmppath);
1811                         /* if write didn't set errno, assume problem is no disk space */
1812                         errno = save_errno ? save_errno : ENOSPC;
1813
1814                         ereport(ERROR,
1815                                         (errcode_for_file_access(),
1816                                          errmsg("could not write to file \"%s\": %m", tmppath)));
1817                 }
1818         }
1819
1820         if (pg_fsync(fd) != 0)
1821                 ereport(ERROR,
1822                                 (errcode_for_file_access(),
1823                                  errmsg("could not fsync file \"%s\": %m", tmppath)));
1824
1825         if (close(fd))
1826                 ereport(ERROR,
1827                                 (errcode_for_file_access(),
1828                                  errmsg("could not close file \"%s\": %m", tmppath)));
1829
1830         /*
1831          * Now move the segment into place with its final name.
1832          *
1833          * If caller didn't want to use a pre-existing file, get rid of any
1834          * pre-existing file.  Otherwise, cope with possibility that someone else
1835          * has created the file while we were filling ours: if so, use ours to
1836          * pre-create a future log segment.
1837          */
1838         installed_log = log;
1839         installed_seg = seg;
1840         max_advance = XLOGfileslop;
1841         if (!InstallXLogFileSegment(&installed_log, &installed_seg, tmppath,
1842                                                                 *use_existent, &max_advance,
1843                                                                 use_lock))
1844         {
1845                 /* No need for any more future segments... */
1846                 unlink(tmppath);
1847         }
1848
1849         /* Set flag to tell caller there was no existent file */
1850         *use_existent = false;
1851
1852         /* Now open original target segment (might not be file I just made) */
1853         fd = BasicOpenFile(path, O_RDWR | PG_BINARY | XLOG_SYNC_BIT,
1854                                            S_IRUSR | S_IWUSR);
1855         if (fd < 0)
1856                 ereport(ERROR,
1857                                 (errcode_for_file_access(),
1858                    errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
1859                                   path, log, seg)));
1860
1861         return fd;
1862 }
1863
1864 /*
1865  * Create a new XLOG file segment by copying a pre-existing one.
1866  *
1867  * log, seg: identify segment to be created.
1868  *
1869  * srcTLI, srclog, srcseg: identify segment to be copied (could be from
1870  *              a different timeline)
1871  *
1872  * Currently this is only used during recovery, and so there are no locking
1873  * considerations.      But we should be just as tense as XLogFileInit to avoid
1874  * emplacing a bogus file.
1875  */
1876 static void
1877 XLogFileCopy(uint32 log, uint32 seg,
1878                          TimeLineID srcTLI, uint32 srclog, uint32 srcseg)
1879 {
1880         char            path[MAXPGPATH];
1881         char            tmppath[MAXPGPATH];
1882         char            buffer[XLOG_BLCKSZ];
1883         int                     srcfd;
1884         int                     fd;
1885         int                     nbytes;
1886
1887         /*
1888          * Open the source file
1889          */
1890         XLogFilePath(path, srcTLI, srclog, srcseg);
1891         srcfd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
1892         if (srcfd < 0)
1893                 ereport(ERROR,
1894                                 (errcode_for_file_access(),
1895                                  errmsg("could not open file \"%s\": %m", path)));
1896
1897         /*
1898          * Copy into a temp file name.
1899          */
1900         snprintf(tmppath, MAXPGPATH, XLOGDIR "/xlogtemp.%d", (int) getpid());
1901
1902         unlink(tmppath);
1903
1904         /* do not use XLOG_SYNC_BIT here --- want to fsync only at end of fill */
1905         fd = BasicOpenFile(tmppath, O_RDWR | O_CREAT | O_EXCL | PG_BINARY,
1906                                            S_IRUSR | S_IWUSR);
1907         if (fd < 0)
1908                 ereport(ERROR,
1909                                 (errcode_for_file_access(),
1910                                  errmsg("could not create file \"%s\": %m", tmppath)));
1911
1912         /*
1913          * Do the data copying.
1914          */
1915         for (nbytes = 0; nbytes < XLogSegSize; nbytes += sizeof(buffer))
1916         {
1917                 errno = 0;
1918                 if ((int) read(srcfd, buffer, sizeof(buffer)) != (int) sizeof(buffer))
1919                 {
1920                         if (errno != 0)
1921                                 ereport(ERROR,
1922                                                 (errcode_for_file_access(),
1923                                                  errmsg("could not read file \"%s\": %m", path)));
1924                         else
1925                                 ereport(ERROR,
1926                                                 (errmsg("not enough data in file \"%s\"", path)));
1927                 }
1928                 errno = 0;
1929                 if ((int) write(fd, buffer, sizeof(buffer)) != (int) sizeof(buffer))
1930                 {
1931                         int                     save_errno = errno;
1932
1933                         /*
1934                          * If we fail to make the file, delete it to release disk space
1935                          */
1936                         unlink(tmppath);
1937                         /* if write didn't set errno, assume problem is no disk space */
1938                         errno = save_errno ? save_errno : ENOSPC;
1939
1940                         ereport(ERROR,
1941                                         (errcode_for_file_access(),
1942                                          errmsg("could not write to file \"%s\": %m", tmppath)));
1943                 }
1944         }
1945
1946         if (pg_fsync(fd) != 0)
1947                 ereport(ERROR,
1948                                 (errcode_for_file_access(),
1949                                  errmsg("could not fsync file \"%s\": %m", tmppath)));
1950
1951         if (close(fd))
1952                 ereport(ERROR,
1953                                 (errcode_for_file_access(),
1954                                  errmsg("could not close file \"%s\": %m", tmppath)));
1955
1956         close(srcfd);
1957
1958         /*
1959          * Now move the segment into place with its final name.
1960          */
1961         if (!InstallXLogFileSegment(&log, &seg, tmppath, false, NULL, false))
1962                 elog(ERROR, "InstallXLogFileSegment should not have failed");
1963 }
1964
1965 /*
1966  * Install a new XLOG segment file as a current or future log segment.
1967  *
1968  * This is used both to install a newly-created segment (which has a temp
1969  * filename while it's being created) and to recycle an old segment.
1970  *
1971  * *log, *seg: identify segment to install as (or first possible target).
1972  * When find_free is TRUE, these are modified on return to indicate the
1973  * actual installation location or last segment searched.
1974  *
1975  * tmppath: initial name of file to install.  It will be renamed into place.
1976  *
1977  * find_free: if TRUE, install the new segment at the first empty log/seg
1978  * number at or after the passed numbers.  If FALSE, install the new segment
1979  * exactly where specified, deleting any existing segment file there.
1980  *
1981  * *max_advance: maximum number of log/seg slots to advance past the starting
1982  * point.  Fail if no free slot is found in this range.  On return, reduced
1983  * by the number of slots skipped over.  (Irrelevant, and may be NULL,
1984  * when find_free is FALSE.)
1985  *
1986  * use_lock: if TRUE, acquire ControlFileLock while moving file into
1987  * place.  This should be TRUE except during bootstrap log creation.  The
1988  * caller must *not* hold the lock at call.
1989  *
1990  * Returns TRUE if file installed, FALSE if not installed because of
1991  * exceeding max_advance limit.  (Any other kind of failure causes ereport().)
1992  */
1993 static bool
1994 InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath,
1995                                            bool find_free, int *max_advance,
1996                                            bool use_lock)
1997 {
1998         char            path[MAXPGPATH];
1999         struct stat stat_buf;
2000
2001         XLogFilePath(path, ThisTimeLineID, *log, *seg);
2002
2003         /*
2004          * We want to be sure that only one process does this at a time.
2005          */
2006         if (use_lock)
2007                 LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
2008
2009         if (!find_free)
2010         {
2011                 /* Force installation: get rid of any pre-existing segment file */
2012                 unlink(path);
2013         }
2014         else
2015         {
2016                 /* Find a free slot to put it in */
2017                 while (stat(path, &stat_buf) == 0)
2018                 {
2019                         if (*max_advance <= 0)
2020                         {
2021                                 /* Failed to find a free slot within specified range */
2022                                 if (use_lock)
2023                                         LWLockRelease(ControlFileLock);
2024                                 return false;
2025                         }
2026                         NextLogSeg(*log, *seg);
2027                         (*max_advance)--;
2028                         XLogFilePath(path, ThisTimeLineID, *log, *seg);
2029                 }
2030         }
2031
2032         /*
2033          * Prefer link() to rename() here just to be really sure that we don't
2034          * overwrite an existing logfile.  However, there shouldn't be one, so
2035          * rename() is an acceptable substitute except for the truly paranoid.
2036          */
2037 #if HAVE_WORKING_LINK
2038         if (link(tmppath, path) < 0)
2039                 ereport(ERROR,
2040                                 (errcode_for_file_access(),
2041                                  errmsg("could not link file \"%s\" to \"%s\" (initialization of log file %u, segment %u): %m",
2042                                                 tmppath, path, *log, *seg)));
2043         unlink(tmppath);
2044 #else
2045         if (rename(tmppath, path) < 0)
2046                 ereport(ERROR,
2047                                 (errcode_for_file_access(),
2048                                  errmsg("could not rename file \"%s\" to \"%s\" (initialization of log file %u, segment %u): %m",
2049                                                 tmppath, path, *log, *seg)));
2050 #endif
2051
2052         if (use_lock)
2053                 LWLockRelease(ControlFileLock);
2054
2055         return true;
2056 }
2057
2058 /*
2059  * Open a pre-existing logfile segment for writing.
2060  */
2061 static int
2062 XLogFileOpen(uint32 log, uint32 seg)
2063 {
2064         char            path[MAXPGPATH];
2065         int                     fd;
2066
2067         XLogFilePath(path, ThisTimeLineID, log, seg);
2068
2069         fd = BasicOpenFile(path, O_RDWR | PG_BINARY | XLOG_SYNC_BIT,
2070                                            S_IRUSR | S_IWUSR);
2071         if (fd < 0)
2072                 ereport(PANIC,
2073                                 (errcode_for_file_access(),
2074                    errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
2075                                   path, log, seg)));
2076
2077         return fd;
2078 }
2079
2080 /*
2081  * Open a logfile segment for reading (during recovery).
2082  */
2083 static int
2084 XLogFileRead(uint32 log, uint32 seg, int emode)
2085 {
2086         char            path[MAXPGPATH];
2087         char            xlogfname[MAXFNAMELEN];
2088         ListCell   *cell;
2089         int                     fd;
2090
2091         /*
2092          * Loop looking for a suitable timeline ID: we might need to read any of
2093          * the timelines listed in expectedTLIs.
2094          *
2095          * We expect curFileTLI on entry to be the TLI of the preceding file in
2096          * sequence, or 0 if there was no predecessor.  We do not allow curFileTLI
2097          * to go backwards; this prevents us from picking up the wrong file when a
2098          * parent timeline extends to higher segment numbers than the child we
2099          * want to read.
2100          */
2101         foreach(cell, expectedTLIs)
2102         {
2103                 TimeLineID      tli = (TimeLineID) lfirst_int(cell);
2104
2105                 if (tli < curFileTLI)
2106                         break;                          /* don't bother looking at too-old TLIs */
2107
2108                 if (InArchiveRecovery)
2109                 {
2110                         XLogFileName(xlogfname, tli, log, seg);
2111                         restoredFromArchive = RestoreArchivedFile(path, xlogfname,
2112                                                                                                           "RECOVERYXLOG",
2113                                                                                                           XLogSegSize);
2114                 }
2115                 else
2116                         XLogFilePath(path, tli, log, seg);
2117
2118                 fd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
2119                 if (fd >= 0)
2120                 {
2121                         /* Success! */
2122                         curFileTLI = tli;
2123                         return fd;
2124                 }
2125                 if (errno != ENOENT)    /* unexpected failure? */
2126                         ereport(PANIC,
2127                                         (errcode_for_file_access(),
2128                         errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
2129                                    path, log, seg)));
2130         }
2131
2132         /* Couldn't find it.  For simplicity, complain about front timeline */
2133         XLogFilePath(path, recoveryTargetTLI, log, seg);
2134         errno = ENOENT;
2135         ereport(emode,
2136                         (errcode_for_file_access(),
2137                    errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
2138                                   path, log, seg)));
2139         return -1;
2140 }
2141
2142 /*
2143  * Close the current logfile segment for writing.
2144  */
2145 static void
2146 XLogFileClose(void)
2147 {
2148         Assert(openLogFile >= 0);
2149
2150 #if defined(HAVE_DECL_POSIX_FADVISE) && defined(POSIX_FADV_DONTNEED)
2151         /*
2152          * WAL segment files will not be re-read in normal operation, so we advise
2153          * OS to release any cached pages.  But do not do so if WAL archiving is
2154          * active, because archiver process could use the cache to read the WAL
2155          * segment.
2156          *
2157          * While O_DIRECT works for O_SYNC, posix_fadvise() works for fsync()
2158          * and O_SYNC, and some platforms only have posix_fadvise().
2159          */
2160         if (!XLogArchivingActive())
2161                 posix_fadvise(openLogFile, 0, 0, POSIX_FADV_DONTNEED);
2162 #endif
2163
2164         if (close(openLogFile))
2165                 ereport(PANIC,
2166                         (errcode_for_file_access(),
2167                         errmsg("could not close log file %u, segment %u: %m",
2168                                    openLogId, openLogSeg)));
2169         openLogFile = -1;
2170 }
2171
2172 /*
2173  * Attempt to retrieve the specified file from off-line archival storage.
2174  * If successful, fill "path" with its complete path (note that this will be
2175  * a temp file name that doesn't follow the normal naming convention), and
2176  * return TRUE.
2177  *
2178  * If not successful, fill "path" with the name of the normal on-line file
2179  * (which may or may not actually exist, but we'll try to use it), and return
2180  * FALSE.
2181  *
2182  * For fixed-size files, the caller may pass the expected size as an
2183  * additional crosscheck on successful recovery.  If the file size is not
2184  * known, set expectedSize = 0.
2185  */
2186 static bool
2187 RestoreArchivedFile(char *path, const char *xlogfname,
2188                                         const char *recovername, off_t expectedSize)
2189 {
2190         char            xlogpath[MAXPGPATH];
2191         char            xlogRestoreCmd[MAXPGPATH];
2192         char       *dp;
2193         char       *endp;
2194         const char *sp;
2195         int                     rc;
2196         struct stat stat_buf;
2197
2198         /*
2199          * When doing archive recovery, we always prefer an archived log file even
2200          * if a file of the same name exists in XLOGDIR.  The reason is that the
2201          * file in XLOGDIR could be an old, un-filled or partly-filled version
2202          * that was copied and restored as part of backing up $PGDATA.
2203          *
2204          * We could try to optimize this slightly by checking the local copy
2205          * lastchange timestamp against the archived copy, but we have no API to
2206          * do this, nor can we guarantee that the lastchange timestamp was
2207          * preserved correctly when we copied to archive. Our aim is robustness,
2208          * so we elect not to do this.
2209          *
2210          * If we cannot obtain the log file from the archive, however, we will try
2211          * to use the XLOGDIR file if it exists.  This is so that we can make use
2212          * of log segments that weren't yet transferred to the archive.
2213          *
2214          * Notice that we don't actually overwrite any files when we copy back
2215          * from archive because the recoveryRestoreCommand may inadvertently
2216          * restore inappropriate xlogs, or they may be corrupt, so we may wish to
2217          * fallback to the segments remaining in current XLOGDIR later. The
2218          * copy-from-archive filename is always the same, ensuring that we don't
2219          * run out of disk space on long recoveries.
2220          */
2221         snprintf(xlogpath, MAXPGPATH, XLOGDIR "/%s", recovername);
2222
2223         /*
2224          * Make sure there is no existing file named recovername.
2225          */
2226         if (stat(xlogpath, &stat_buf) != 0)
2227         {
2228                 if (errno != ENOENT)
2229                         ereport(FATAL,
2230                                         (errcode_for_file_access(),
2231                                          errmsg("could not stat file \"%s\": %m",
2232                                                         xlogpath)));
2233         }
2234         else
2235         {
2236                 if (unlink(xlogpath) != 0)
2237                         ereport(FATAL,
2238                                         (errcode_for_file_access(),
2239                                          errmsg("could not remove file \"%s\": %m",
2240                                                         xlogpath)));
2241         }
2242
2243         /*
2244          * construct the command to be executed
2245          */
2246         dp = xlogRestoreCmd;
2247         endp = xlogRestoreCmd + MAXPGPATH - 1;
2248         *endp = '\0';
2249
2250         for (sp = recoveryRestoreCommand; *sp; sp++)
2251         {
2252                 if (*sp == '%')
2253                 {
2254                         switch (sp[1])
2255                         {
2256                                 case 'p':
2257                                         /* %p: full path of target file */
2258                                         sp++;
2259                                         StrNCpy(dp, xlogpath, endp - dp);
2260                                         make_native_path(dp);
2261                                         dp += strlen(dp);
2262                                         break;
2263                                 case 'f':
2264                                         /* %f: filename of desired file */
2265                                         sp++;
2266                                         StrNCpy(dp, xlogfname, endp - dp);
2267                                         dp += strlen(dp);
2268                                         break;
2269                                 case '%':
2270                                         /* convert %% to a single % */
2271                                         sp++;
2272                                         if (dp < endp)
2273                                                 *dp++ = *sp;
2274                                         break;
2275                                 default:
2276                                         /* otherwise treat the % as not special */
2277                                         if (dp < endp)
2278                                                 *dp++ = *sp;
2279                                         break;
2280                         }
2281                 }
2282                 else
2283                 {
2284                         if (dp < endp)
2285                                 *dp++ = *sp;
2286                 }
2287         }
2288         *dp = '\0';
2289
2290         ereport(DEBUG3,
2291                         (errmsg_internal("executing restore command \"%s\"",
2292                                                          xlogRestoreCmd)));
2293
2294         /*
2295          * Copy xlog from archival storage to XLOGDIR
2296          */
2297         rc = system(xlogRestoreCmd);
2298         if (rc == 0)
2299         {
2300                 /*
2301                  * command apparently succeeded, but let's make sure the file is
2302                  * really there now and has the correct size.
2303                  *
2304                  * XXX I made wrong-size a fatal error to ensure the DBA would notice
2305                  * it, but is that too strong?  We could try to plow ahead with a
2306                  * local copy of the file ... but the problem is that there probably
2307                  * isn't one, and we'd incorrectly conclude we've reached the end of
2308                  * WAL and we're done recovering ...
2309                  */
2310                 if (stat(xlogpath, &stat_buf) == 0)
2311                 {
2312                         if (expectedSize > 0 && stat_buf.st_size != expectedSize)
2313                                 ereport(FATAL,
2314                                                 (errmsg("archive file \"%s\" has wrong size: %lu instead of %lu",
2315                                                                 xlogfname,
2316                                                                 (unsigned long) stat_buf.st_size,
2317                                                                 (unsigned long) expectedSize)));
2318                         else
2319                         {
2320                                 ereport(LOG,
2321                                                 (errmsg("restored log file \"%s\" from archive",
2322                                                                 xlogfname)));
2323                                 strcpy(path, xlogpath);
2324                                 return true;
2325                         }
2326                 }
2327                 else
2328                 {
2329                         /* stat failed */
2330                         if (errno != ENOENT)
2331                                 ereport(FATAL,
2332                                                 (errcode_for_file_access(),
2333                                                  errmsg("could not stat file \"%s\": %m",
2334                                                                 xlogpath)));
2335                 }
2336         }
2337
2338         /*
2339          * remember, we rollforward UNTIL the restore fails so failure here is
2340          * just part of the process... that makes it difficult to determine
2341          * whether the restore failed because there isn't an archive to restore,
2342          * or because the administrator has specified the restore program
2343          * incorrectly.  We have to assume the former.
2344          */
2345         ereport(DEBUG2,
2346                 (errmsg("could not restore file \"%s\" from archive: return code %d",
2347                                 xlogfname, rc)));
2348
2349         /*
2350          * if an archived file is not available, there might still be a version of
2351          * this file in XLOGDIR, so return that as the filename to open.
2352          *
2353          * In many recovery scenarios we expect this to fail also, but if so that
2354          * just means we've reached the end of WAL.
2355          */
2356         snprintf(path, MAXPGPATH, XLOGDIR "/%s", xlogfname);
2357         return false;
2358 }
2359
2360 /*
2361  * Preallocate log files beyond the specified log endpoint, according to
2362  * the XLOGfile user parameter.
2363  */
2364 static int
2365 PreallocXlogFiles(XLogRecPtr endptr)
2366 {
2367         int                     nsegsadded = 0;
2368         uint32          _logId;
2369         uint32          _logSeg;
2370         int                     lf;
2371         bool            use_existent;
2372
2373         XLByteToPrevSeg(endptr, _logId, _logSeg);
2374         if ((endptr.xrecoff - 1) % XLogSegSize >=
2375                 (uint32) (0.75 * XLogSegSize))
2376         {
2377                 NextLogSeg(_logId, _logSeg);
2378                 use_existent = true;
2379                 lf = XLogFileInit(_logId, _logSeg, &use_existent, true);
2380                 close(lf);
2381                 if (!use_existent)
2382                         nsegsadded++;
2383         }
2384         return nsegsadded;
2385 }
2386
2387 /*
2388  * Remove or move offline all log files older or equal to passed log/seg#
2389  *
2390  * endptr is current (or recent) end of xlog; this is used to determine
2391  * whether we want to recycle rather than delete no-longer-wanted log files.
2392  */
2393 static void
2394 MoveOfflineLogs(uint32 log, uint32 seg, XLogRecPtr endptr,
2395                                 int *nsegsremoved, int *nsegsrecycled)
2396 {
2397         uint32          endlogId;
2398         uint32          endlogSeg;
2399         int                     max_advance;
2400         DIR                *xldir;
2401         struct dirent *xlde;
2402         char            lastoff[MAXFNAMELEN];
2403         char            path[MAXPGPATH];
2404
2405         *nsegsremoved = 0;
2406         *nsegsrecycled = 0;
2407
2408         /*
2409          * Initialize info about where to try to recycle to.  We allow recycling
2410          * segments up to XLOGfileslop segments beyond the current XLOG location.
2411          */
2412         XLByteToPrevSeg(endptr, endlogId, endlogSeg);
2413         max_advance = XLOGfileslop;
2414
2415         xldir = AllocateDir(XLOGDIR);
2416         if (xldir == NULL)
2417                 ereport(ERROR,
2418                                 (errcode_for_file_access(),
2419                                  errmsg("could not open transaction log directory \"%s\": %m",
2420                                                 XLOGDIR)));
2421
2422         XLogFileName(lastoff, ThisTimeLineID, log, seg);
2423
2424         while ((xlde = ReadDir(xldir, XLOGDIR)) != NULL)
2425         {
2426                 /*
2427                  * We ignore the timeline part of the XLOG segment identifiers in
2428                  * deciding whether a segment is still needed.  This ensures that we
2429                  * won't prematurely remove a segment from a parent timeline. We could
2430                  * probably be a little more proactive about removing segments of
2431                  * non-parent timelines, but that would be a whole lot more
2432                  * complicated.
2433                  *
2434                  * We use the alphanumeric sorting property of the filenames to decide
2435                  * which ones are earlier than the lastoff segment.
2436                  */
2437                 if (strlen(xlde->d_name) == 24 &&
2438                         strspn(xlde->d_name, "0123456789ABCDEF") == 24 &&
2439                         strcmp(xlde->d_name + 8, lastoff + 8) <= 0)
2440                 {
2441                         bool            recycle;
2442
2443                         if (XLogArchivingActive())
2444                                 recycle = XLogArchiveIsDone(xlde->d_name);
2445                         else
2446                                 recycle = true;
2447
2448                         if (recycle)
2449                         {
2450                                 snprintf(path, MAXPGPATH, XLOGDIR "/%s", xlde->d_name);
2451
2452                                 /*
2453                                  * Before deleting the file, see if it can be recycled as a
2454                                  * future log segment.
2455                                  */
2456                                 if (InstallXLogFileSegment(&endlogId, &endlogSeg, path,
2457                                                                                    true, &max_advance,
2458                                                                                    true))
2459                                 {
2460                                         ereport(DEBUG2,
2461                                                         (errmsg("recycled transaction log file \"%s\"",
2462                                                                         xlde->d_name)));
2463                                         (*nsegsrecycled)++;
2464                                         /* Needn't recheck that slot on future iterations */
2465                                         if (max_advance > 0)
2466                                         {
2467                                                 NextLogSeg(endlogId, endlogSeg);
2468                                                 max_advance--;
2469                                         }
2470                                 }
2471                                 else
2472                                 {
2473                                         /* No need for any more future segments... */
2474                                         ereport(DEBUG2,
2475                                                         (errmsg("removing transaction log file \"%s\"",
2476                                                                         xlde->d_name)));
2477                                         unlink(path);
2478                                         (*nsegsremoved)++;
2479                                 }
2480
2481                                 XLogArchiveCleanup(xlde->d_name);
2482                         }
2483                 }
2484         }
2485
2486         FreeDir(xldir);
2487 }
2488
2489 /*
2490  * Remove previous backup history files
2491  */
2492 static void
2493 RemoveOldBackupHistory(void)
2494 {
2495         DIR                *xldir;
2496         struct dirent *xlde;
2497         char            path[MAXPGPATH];
2498
2499         xldir = AllocateDir(XLOGDIR);
2500         if (xldir == NULL)
2501                 ereport(ERROR,
2502                                 (errcode_for_file_access(),
2503                                  errmsg("could not open transaction log directory \"%s\": %m",
2504                                                 XLOGDIR)));
2505
2506         while ((xlde = ReadDir(xldir, XLOGDIR)) != NULL)
2507         {
2508                 if (strlen(xlde->d_name) > 24 &&
2509                         strspn(xlde->d_name, "0123456789ABCDEF") == 24 &&
2510                         strcmp(xlde->d_name + strlen(xlde->d_name) - strlen(".backup"),
2511                                    ".backup") == 0)
2512                 {
2513                         /* Remove any *.backup files that have been archived. */
2514                         if (!XLogArchivingActive() || XLogArchiveIsDone(xlde->d_name))
2515                         {
2516                                 ereport(DEBUG2,
2517                                 (errmsg("removing transaction log backup history file \"%s\"",
2518                                                 xlde->d_name)));
2519                                 snprintf(path, MAXPGPATH, XLOGDIR "/%s", xlde->d_name);
2520                                 unlink(path);
2521                                 XLogArchiveCleanup(xlde->d_name);
2522                         }
2523                 }
2524         }
2525
2526         FreeDir(xldir);
2527 }
2528
2529 /*
2530  * Restore the backup blocks present in an XLOG record, if any.
2531  *
2532  * We assume all of the record has been read into memory at *record.
2533  *
2534  * Note: when a backup block is available in XLOG, we restore it
2535  * unconditionally, even if the page in the database appears newer.
2536  * This is to protect ourselves against database pages that were partially
2537  * or incorrectly written during a crash.  We assume that the XLOG data
2538  * must be good because it has passed a CRC check, while the database
2539  * page might not be.  This will force us to replay all subsequent
2540  * modifications of the page that appear in XLOG, rather than possibly
2541  * ignoring them as already applied, but that's not a huge drawback.
2542  */
2543 static void
2544 RestoreBkpBlocks(XLogRecord *record, XLogRecPtr lsn)
2545 {
2546         Relation        reln;
2547         Buffer          buffer;
2548         Page            page;
2549         BkpBlock        bkpb;
2550         char       *blk;
2551         int                     i;
2552
2553         blk = (char *) XLogRecGetData(record) + record->xl_len;
2554         for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
2555         {
2556                 if (!(record->xl_info & XLR_SET_BKP_BLOCK(i)))
2557                         continue;
2558
2559                 memcpy(&bkpb, blk, sizeof(BkpBlock));
2560                 blk += sizeof(BkpBlock);
2561
2562                 reln = XLogOpenRelation(bkpb.node);
2563                 buffer = XLogReadBuffer(reln, bkpb.block, true);
2564                 Assert(BufferIsValid(buffer));
2565                 page = (Page) BufferGetPage(buffer);
2566
2567                 if (bkpb.hole_length == 0)
2568                 {
2569                         memcpy((char *) page, blk, BLCKSZ);
2570                 }
2571                 else
2572                 {
2573                         /* must zero-fill the hole */
2574                         MemSet((char *) page, 0, BLCKSZ);
2575                         memcpy((char *) page, blk, bkpb.hole_offset);
2576                         memcpy((char *) page + (bkpb.hole_offset + bkpb.hole_length),
2577                                    blk + bkpb.hole_offset,
2578                                    BLCKSZ - (bkpb.hole_offset + bkpb.hole_length));
2579                 }
2580
2581                 PageSetLSN(page, lsn);
2582                 PageSetTLI(page, ThisTimeLineID);
2583                 MarkBufferDirty(buffer);
2584                 UnlockReleaseBuffer(buffer);
2585
2586                 blk += BLCKSZ - bkpb.hole_length;
2587         }
2588 }
2589
2590 /*
2591  * CRC-check an XLOG record.  We do not believe the contents of an XLOG
2592  * record (other than to the minimal extent of computing the amount of
2593  * data to read in) until we've checked the CRCs.
2594  *
2595  * We assume all of the record has been read into memory at *record.
2596  */
2597 static bool
2598 RecordIsValid(XLogRecord *record, XLogRecPtr recptr, int emode)
2599 {
2600         pg_crc32        crc;
2601         int                     i;
2602         uint32          len = record->xl_len;
2603         BkpBlock        bkpb;
2604         char       *blk;
2605
2606         /* First the rmgr data */
2607         INIT_CRC32(crc);
2608         COMP_CRC32(crc, XLogRecGetData(record), len);
2609
2610         /* Add in the backup blocks, if any */
2611         blk = (char *) XLogRecGetData(record) + len;
2612         for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
2613         {
2614                 uint32          blen;
2615
2616                 if (!(record->xl_info & XLR_SET_BKP_BLOCK(i)))
2617                         continue;
2618
2619                 memcpy(&bkpb, blk, sizeof(BkpBlock));
2620                 if (bkpb.hole_offset + bkpb.hole_length > BLCKSZ)
2621                 {
2622                         ereport(emode,
2623                                         (errmsg("incorrect hole size in record at %X/%X",
2624                                                         recptr.xlogid, recptr.xrecoff)));
2625                         return false;
2626                 }
2627                 blen = sizeof(BkpBlock) + BLCKSZ - bkpb.hole_length;
2628                 COMP_CRC32(crc, blk, blen);
2629                 blk += blen;
2630         }
2631
2632         /* Check that xl_tot_len agrees with our calculation */
2633         if (blk != (char *) record + record->xl_tot_len)
2634         {
2635                 ereport(emode,
2636                                 (errmsg("incorrect total length in record at %X/%X",
2637                                                 recptr.xlogid, recptr.xrecoff)));
2638                 return false;
2639         }
2640
2641         /* Finally include the record header */
2642         COMP_CRC32(crc, (char *) record + sizeof(pg_crc32),
2643                            SizeOfXLogRecord - sizeof(pg_crc32));
2644         FIN_CRC32(crc);
2645
2646         if (!EQ_CRC32(record->xl_crc, crc))
2647         {
2648                 ereport(emode,
2649                 (errmsg("incorrect resource manager data checksum in record at %X/%X",
2650                                 recptr.xlogid, recptr.xrecoff)));
2651                 return false;
2652         }
2653
2654         return true;
2655 }
2656
2657 /*
2658  * Attempt to read an XLOG record.
2659  *
2660  * If RecPtr is not NULL, try to read a record at that position.  Otherwise
2661  * try to read a record just after the last one previously read.
2662  *
2663  * If no valid record is available, returns NULL, or fails if emode is PANIC.
2664  * (emode must be either PANIC or LOG.)
2665  *
2666  * The record is copied into readRecordBuf, so that on successful return,
2667  * the returned record pointer always points there.
2668  */
2669 static XLogRecord *
2670 ReadRecord(XLogRecPtr *RecPtr, int emode)
2671 {
2672         XLogRecord *record;
2673         char       *buffer;
2674         XLogRecPtr      tmpRecPtr = EndRecPtr;
2675         bool            randAccess = false;
2676         uint32          len,
2677                                 total_len;
2678         uint32          targetPageOff;
2679         uint32          targetRecOff;
2680         uint32          pageHeaderSize;
2681
2682         if (readBuf == NULL)
2683         {
2684                 /*
2685                  * First time through, permanently allocate readBuf.  We do it this
2686                  * way, rather than just making a static array, for two reasons: (1)
2687                  * no need to waste the storage in most instantiations of the backend;
2688                  * (2) a static char array isn't guaranteed to have any particular
2689                  * alignment, whereas malloc() will provide MAXALIGN'd storage.
2690                  */
2691                 readBuf = (char *) malloc(XLOG_BLCKSZ);
2692                 Assert(readBuf != NULL);
2693         }
2694
2695         if (RecPtr == NULL)
2696         {
2697                 RecPtr = &tmpRecPtr;
2698                 /* fast case if next record is on same page */
2699                 if (nextRecord != NULL)
2700                 {
2701                         record = nextRecord;
2702                         goto got_record;
2703                 }
2704                 /* align old recptr to next page */
2705                 if (tmpRecPtr.xrecoff % XLOG_BLCKSZ != 0)
2706                         tmpRecPtr.xrecoff += (XLOG_BLCKSZ - tmpRecPtr.xrecoff % XLOG_BLCKSZ);
2707                 if (tmpRecPtr.xrecoff >= XLogFileSize)
2708                 {
2709                         (tmpRecPtr.xlogid)++;
2710                         tmpRecPtr.xrecoff = 0;
2711                 }
2712                 /* We will account for page header size below */
2713         }
2714         else
2715         {
2716                 if (!XRecOffIsValid(RecPtr->xrecoff))
2717                         ereport(PANIC,
2718                                         (errmsg("invalid record offset at %X/%X",
2719                                                         RecPtr->xlogid, RecPtr->xrecoff)));
2720
2721                 /*
2722                  * Since we are going to a random position in WAL, forget any prior
2723                  * state about what timeline we were in, and allow it to be any
2724                  * timeline in expectedTLIs.  We also set a flag to allow curFileTLI
2725                  * to go backwards (but we can't reset that variable right here, since
2726                  * we might not change files at all).
2727                  */
2728                 lastPageTLI = 0;                /* see comment in ValidXLOGHeader */
2729                 randAccess = true;              /* allow curFileTLI to go backwards too */
2730         }
2731
2732         if (readFile >= 0 && !XLByteInSeg(*RecPtr, readId, readSeg))
2733         {
2734                 close(readFile);
2735                 readFile = -1;
2736         }
2737         XLByteToSeg(*RecPtr, readId, readSeg);
2738         if (readFile < 0)
2739         {
2740                 /* Now it's okay to reset curFileTLI if random fetch */
2741                 if (randAccess)
2742                         curFileTLI = 0;
2743
2744                 readFile = XLogFileRead(readId, readSeg, emode);
2745                 if (readFile < 0)
2746                         goto next_record_is_invalid;
2747
2748                 /*
2749                  * Whenever switching to a new WAL segment, we read the first page of
2750                  * the file and validate its header, even if that's not where the
2751                  * target record is.  This is so that we can check the additional
2752                  * identification info that is present in the first page's "long"
2753                  * header.
2754                  */
2755                 readOff = 0;
2756                 if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
2757                 {
2758                         ereport(emode,
2759                                         (errcode_for_file_access(),
2760                                          errmsg("could not read from log file %u, segment %u, offset %u: %m",
2761                                                         readId, readSeg, readOff)));
2762                         goto next_record_is_invalid;
2763                 }
2764                 if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode))
2765                         goto next_record_is_invalid;
2766         }
2767
2768         targetPageOff = ((RecPtr->xrecoff % XLogSegSize) / XLOG_BLCKSZ) * XLOG_BLCKSZ;
2769         if (readOff != targetPageOff)
2770         {
2771                 readOff = targetPageOff;
2772                 if (lseek(readFile, (off_t) readOff, SEEK_SET) < 0)
2773                 {
2774                         ereport(emode,
2775                                         (errcode_for_file_access(),
2776                                          errmsg("could not seek in log file %u, segment %u to offset %u: %m",
2777                                                         readId, readSeg, readOff)));
2778                         goto next_record_is_invalid;
2779                 }
2780                 if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
2781                 {
2782                         ereport(emode,
2783                                         (errcode_for_file_access(),
2784                                          errmsg("could not read from log file %u, segment %u at offset %u: %m",
2785                                                         readId, readSeg, readOff)));
2786                         goto next_record_is_invalid;
2787                 }
2788                 if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode))
2789                         goto next_record_is_invalid;
2790         }
2791         pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
2792         targetRecOff = RecPtr->xrecoff % XLOG_BLCKSZ;
2793         if (targetRecOff == 0)
2794         {
2795                 /*
2796                  * Can only get here in the continuing-from-prev-page case, because
2797                  * XRecOffIsValid eliminated the zero-page-offset case otherwise. Need
2798                  * to skip over the new page's header.
2799                  */
2800                 tmpRecPtr.xrecoff += pageHeaderSize;
2801                 targetRecOff = pageHeaderSize;
2802         }
2803         else if (targetRecOff < pageHeaderSize)
2804         {
2805                 ereport(emode,
2806                                 (errmsg("invalid record offset at %X/%X",
2807                                                 RecPtr->xlogid, RecPtr->xrecoff)));
2808                 goto next_record_is_invalid;
2809         }
2810         if ((((XLogPageHeader) readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) &&
2811                 targetRecOff == pageHeaderSize)
2812         {
2813                 ereport(emode,
2814                                 (errmsg("contrecord is requested by %X/%X",
2815                                                 RecPtr->xlogid, RecPtr->xrecoff)));
2816                 goto next_record_is_invalid;
2817         }
2818         record = (XLogRecord *) ((char *) readBuf + RecPtr->xrecoff % XLOG_BLCKSZ);
2819
2820 got_record:;
2821
2822         /*
2823          * Currently, xl_len == 0 must be bad data, but that might not be true
2824          * forever.  See note in XLogInsert.
2825          */
2826         if (record->xl_len == 0)
2827         {
2828                 ereport(emode,
2829                                 (errmsg("record with zero length at %X/%X",
2830                                                 RecPtr->xlogid, RecPtr->xrecoff)));
2831                 goto next_record_is_invalid;
2832         }
2833         if (record->xl_tot_len < SizeOfXLogRecord + record->xl_len ||
2834                 record->xl_tot_len > SizeOfXLogRecord + record->xl_len +
2835                 XLR_MAX_BKP_BLOCKS * (sizeof(BkpBlock) + BLCKSZ))
2836         {
2837                 ereport(emode,
2838                                 (errmsg("invalid record length at %X/%X",
2839                                                 RecPtr->xlogid, RecPtr->xrecoff)));
2840                 goto next_record_is_invalid;
2841         }
2842         if (record->xl_rmid > RM_MAX_ID)
2843         {
2844                 ereport(emode,
2845                                 (errmsg("invalid resource manager ID %u at %X/%X",
2846                                                 record->xl_rmid, RecPtr->xlogid, RecPtr->xrecoff)));
2847                 goto next_record_is_invalid;
2848         }
2849         if (randAccess)
2850         {
2851                 /*
2852                  * We can't exactly verify the prev-link, but surely it should be less
2853                  * than the record's own address.
2854                  */
2855                 if (!XLByteLT(record->xl_prev, *RecPtr))
2856                 {
2857                         ereport(emode,
2858                                         (errmsg("record with incorrect prev-link %X/%X at %X/%X",
2859                                                         record->xl_prev.xlogid, record->xl_prev.xrecoff,
2860                                                         RecPtr->xlogid, RecPtr->xrecoff)));
2861                         goto next_record_is_invalid;
2862                 }
2863         }
2864         else
2865         {
2866                 /*
2867                  * Record's prev-link should exactly match our previous location. This
2868                  * check guards against torn WAL pages where a stale but valid-looking
2869                  * WAL record starts on a sector boundary.
2870                  */
2871                 if (!XLByteEQ(record->xl_prev, ReadRecPtr))
2872                 {
2873                         ereport(emode,
2874                                         (errmsg("record with incorrect prev-link %X/%X at %X/%X",
2875                                                         record->xl_prev.xlogid, record->xl_prev.xrecoff,
2876                                                         RecPtr->xlogid, RecPtr->xrecoff)));
2877                         goto next_record_is_invalid;
2878                 }
2879         }
2880
2881         /*
2882          * Allocate or enlarge readRecordBuf as needed.  To avoid useless small
2883          * increases, round its size to a multiple of XLOG_BLCKSZ, and make sure
2884          * it's at least 4*Max(BLCKSZ, XLOG_BLCKSZ) to start with.  (That is
2885          * enough for all "normal" records, but very large commit or abort records
2886          * might need more space.)
2887          */
2888         total_len = record->xl_tot_len;
2889         if (total_len > readRecordBufSize)
2890         {
2891                 uint32          newSize = total_len;
2892
2893                 newSize += XLOG_BLCKSZ - (newSize % XLOG_BLCKSZ);
2894                 newSize = Max(newSize, 4 * Max(BLCKSZ, XLOG_BLCKSZ));
2895                 if (readRecordBuf)
2896                         free(readRecordBuf);
2897                 readRecordBuf = (char *) malloc(newSize);
2898                 if (!readRecordBuf)
2899                 {
2900                         readRecordBufSize = 0;
2901                         /* We treat this as a "bogus data" condition */
2902                         ereport(emode,
2903                                         (errmsg("record length %u at %X/%X too long",
2904                                                         total_len, RecPtr->xlogid, RecPtr->xrecoff)));
2905                         goto next_record_is_invalid;
2906                 }
2907                 readRecordBufSize = newSize;
2908         }
2909
2910         buffer = readRecordBuf;
2911         nextRecord = NULL;
2912         len = XLOG_BLCKSZ - RecPtr->xrecoff % XLOG_BLCKSZ;
2913         if (total_len > len)
2914         {
2915                 /* Need to reassemble record */
2916                 XLogContRecord *contrecord;
2917                 uint32          gotlen = len;
2918
2919                 memcpy(buffer, record, len);
2920                 record = (XLogRecord *) buffer;
2921                 buffer += len;
2922                 for (;;)
2923                 {
2924                         readOff += XLOG_BLCKSZ;
2925                         if (readOff >= XLogSegSize)
2926                         {
2927                                 close(readFile);
2928                                 readFile = -1;
2929                                 NextLogSeg(readId, readSeg);
2930                                 readFile = XLogFileRead(readId, readSeg, emode);
2931                                 if (readFile < 0)
2932                                         goto next_record_is_invalid;
2933                                 readOff = 0;
2934                         }
2935                         if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
2936                         {
2937                                 ereport(emode,
2938                                                 (errcode_for_file_access(),
2939                                                  errmsg("could not read from log file %u, segment %u, offset %u: %m",
2940                                                                 readId, readSeg, readOff)));
2941                                 goto next_record_is_invalid;
2942                         }
2943                         if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode))
2944                                 goto next_record_is_invalid;
2945                         if (!(((XLogPageHeader) readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD))
2946                         {
2947                                 ereport(emode,
2948                                                 (errmsg("there is no contrecord flag in log file %u, segment %u, offset %u",
2949                                                                 readId, readSeg, readOff)));
2950                                 goto next_record_is_invalid;
2951                         }
2952                         pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
2953                         contrecord = (XLogContRecord *) ((char *) readBuf + pageHeaderSize);
2954                         if (contrecord->xl_rem_len == 0 ||
2955                                 total_len != (contrecord->xl_rem_len + gotlen))
2956                         {
2957                                 ereport(emode,
2958                                                 (errmsg("invalid contrecord length %u in log file %u, segment %u, offset %u",
2959                                                                 contrecord->xl_rem_len,
2960                                                                 readId, readSeg, readOff)));
2961                                 goto next_record_is_invalid;
2962                         }
2963                         len = XLOG_BLCKSZ - pageHeaderSize - SizeOfXLogContRecord;
2964                         if (contrecord->xl_rem_len > len)
2965                         {
2966                                 memcpy(buffer, (char *) contrecord + SizeOfXLogContRecord, len);
2967                                 gotlen += len;
2968                                 buffer += len;
2969                                 continue;
2970                         }
2971                         memcpy(buffer, (char *) contrecord + SizeOfXLogContRecord,
2972                                    contrecord->xl_rem_len);
2973                         break;
2974                 }
2975                 if (!RecordIsValid(record, *RecPtr, emode))
2976                         goto next_record_is_invalid;
2977                 pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
2978                 if (XLOG_BLCKSZ - SizeOfXLogRecord >= pageHeaderSize +
2979                         MAXALIGN(SizeOfXLogContRecord + contrecord->xl_rem_len))
2980                 {
2981                         nextRecord = (XLogRecord *) ((char *) contrecord +
2982                                         MAXALIGN(SizeOfXLogContRecord + contrecord->xl_rem_len));
2983                 }
2984                 EndRecPtr.xlogid = readId;
2985                 EndRecPtr.xrecoff = readSeg * XLogSegSize + readOff +
2986                         pageHeaderSize +
2987                         MAXALIGN(SizeOfXLogContRecord + contrecord->xl_rem_len);
2988                 ReadRecPtr = *RecPtr;
2989                 return record;
2990         }
2991
2992         /* Record does not cross a page boundary */
2993         if (!RecordIsValid(record, *RecPtr, emode))
2994                 goto next_record_is_invalid;
2995         if (XLOG_BLCKSZ - SizeOfXLogRecord >= RecPtr->xrecoff % XLOG_BLCKSZ +
2996                 MAXALIGN(total_len))
2997                 nextRecord = (XLogRecord *) ((char *) record + MAXALIGN(total_len));
2998         EndRecPtr.xlogid = RecPtr->xlogid;
2999         EndRecPtr.xrecoff = RecPtr->xrecoff + MAXALIGN(total_len);
3000         ReadRecPtr = *RecPtr;
3001         memcpy(buffer, record, total_len);
3002         return (XLogRecord *) buffer;
3003
3004 next_record_is_invalid:;
3005         close(readFile);
3006         readFile = -1;
3007         nextRecord = NULL;
3008         return NULL;
3009 }
3010
3011 /*
3012  * Check whether the xlog header of a page just read in looks valid.
3013  *
3014  * This is just a convenience subroutine to avoid duplicated code in
3015  * ReadRecord.  It's not intended for use from anywhere else.
3016  */
3017 static bool
3018 ValidXLOGHeader(XLogPageHeader hdr, int emode)
3019 {
3020         XLogRecPtr      recaddr;
3021
3022         if (hdr->xlp_magic != XLOG_PAGE_MAGIC)
3023         {
3024                 ereport(emode,
3025                                 (errmsg("invalid magic number %04X in log file %u, segment %u, offset %u",
3026                                                 hdr->xlp_magic, readId, readSeg, readOff)));
3027                 return false;
3028         }
3029         if ((hdr->xlp_info & ~XLP_ALL_FLAGS) != 0)
3030         {
3031                 ereport(emode,
3032                                 (errmsg("invalid info bits %04X in log file %u, segment %u, offset %u",
3033                                                 hdr->xlp_info, readId, readSeg, readOff)));
3034                 return false;
3035         }
3036         if (hdr->xlp_info & XLP_LONG_HEADER)
3037         {
3038                 XLogLongPageHeader longhdr = (XLogLongPageHeader) hdr;
3039
3040                 if (longhdr->xlp_sysid != ControlFile->system_identifier)
3041                 {
3042                         char            fhdrident_str[32];
3043                         char            sysident_str[32];
3044
3045                         /*
3046                          * Format sysids separately to keep platform-dependent format code
3047                          * out of the translatable message string.
3048                          */
3049                         snprintf(fhdrident_str, sizeof(fhdrident_str), UINT64_FORMAT,
3050                                          longhdr->xlp_sysid);
3051                         snprintf(sysident_str, sizeof(sysident_str), UINT64_FORMAT,
3052                                          ControlFile->system_identifier);
3053                         ereport(emode,
3054                                         (errmsg("WAL file is from different system"),
3055                                          errdetail("WAL file SYSID is %s, pg_control SYSID is %s",
3056                                                            fhdrident_str, sysident_str)));
3057                         return false;
3058                 }
3059                 if (longhdr->xlp_seg_size != XLogSegSize)
3060                 {
3061                         ereport(emode,
3062                                         (errmsg("WAL file is from different system"),
3063                                          errdetail("Incorrect XLOG_SEG_SIZE in page header.")));
3064                         return false;
3065                 }
3066                 if (longhdr->xlp_xlog_blcksz != XLOG_BLCKSZ)
3067                 {
3068                         ereport(emode,
3069                                         (errmsg("WAL file is from different system"),
3070                                          errdetail("Incorrect XLOG_BLCKSZ in page header.")));
3071                         return false;
3072                 }
3073         }
3074         else if (readOff == 0)
3075         {
3076                 /* hmm, first page of file doesn't have a long header? */
3077                 ereport(emode,
3078                                 (errmsg("invalid info bits %04X in log file %u, segment %u, offset %u",
3079                                                 hdr->xlp_info, readId, readSeg, readOff)));
3080                 return false;
3081         }
3082
3083         recaddr.xlogid = readId;
3084         recaddr.xrecoff = readSeg * XLogSegSize + readOff;
3085         if (!XLByteEQ(hdr->xlp_pageaddr, recaddr))
3086         {
3087                 ereport(emode,
3088                                 (errmsg("unexpected pageaddr %X/%X in log file %u, segment %u, offset %u",
3089                                                 hdr->xlp_pageaddr.xlogid, hdr->xlp_pageaddr.xrecoff,
3090                                                 readId, readSeg, readOff)));
3091                 return false;
3092         }
3093
3094         /*
3095          * Check page TLI is one of the expected values.
3096          */
3097         if (!list_member_int(expectedTLIs, (int) hdr->xlp_tli))
3098         {
3099                 ereport(emode,
3100                                 (errmsg("unexpected timeline ID %u in log file %u, segment %u, offset %u",
3101                                                 hdr->xlp_tli,
3102                                                 readId, readSeg, readOff)));
3103                 return false;
3104         }
3105
3106         /*
3107          * Since child timelines are always assigned a TLI greater than their
3108          * immediate parent's TLI, we should never see TLI go backwards across
3109          * successive pages of a consistent WAL sequence.
3110          *
3111          * Of course this check should only be applied when advancing sequentially
3112          * across pages; therefore ReadRecord resets lastPageTLI to zero when
3113          * going to a random page.
3114          */
3115         if (hdr->xlp_tli < lastPageTLI)
3116         {
3117                 ereport(emode,
3118                                 (errmsg("out-of-sequence timeline ID %u (after %u) in log file %u, segment %u, offset %u",
3119                                                 hdr->xlp_tli, lastPageTLI,
3120                                                 readId, readSeg, readOff)));
3121                 return false;
3122         }
3123         lastPageTLI = hdr->xlp_tli;
3124         return true;
3125 }
3126
3127 /*
3128  * Try to read a timeline's history file.
3129  *
3130  * If successful, return the list of component TLIs (the given TLI followed by
3131  * its ancestor TLIs).  If we can't find the history file, assume that the
3132  * timeline has no parents, and return a list of just the specified timeline
3133  * ID.
3134  */
3135 static List *
3136 readTimeLineHistory(TimeLineID targetTLI)
3137 {
3138         List       *result;
3139         char            path[MAXPGPATH];
3140         char            histfname[MAXFNAMELEN];
3141         char            fline[MAXPGPATH];
3142         FILE       *fd;
3143
3144         if (InArchiveRecovery)
3145         {
3146                 TLHistoryFileName(histfname, targetTLI);
3147                 RestoreArchivedFile(path, histfname, "RECOVERYHISTORY", 0);
3148         }
3149         else
3150                 TLHistoryFilePath(path, targetTLI);
3151
3152         fd = AllocateFile(path, "r");
3153         if (fd == NULL)
3154         {
3155                 if (errno != ENOENT)
3156                         ereport(FATAL,
3157                                         (errcode_for_file_access(),
3158                                          errmsg("could not open file \"%s\": %m", path)));
3159                 /* Not there, so assume no parents */
3160                 return list_make1_int((int) targetTLI);
3161         }
3162
3163         result = NIL;
3164
3165         /*
3166          * Parse the file...
3167          */
3168         while (fgets(fline, MAXPGPATH, fd) != NULL)
3169         {
3170                 /* skip leading whitespace and check for # comment */
3171                 char       *ptr;
3172                 char       *endptr;
3173                 TimeLineID      tli;
3174
3175                 for (ptr = fline; *ptr; ptr++)
3176                 {
3177                         if (!isspace((unsigned char) *ptr))
3178                                 break;
3179                 }
3180                 if (*ptr == '\0' || *ptr == '#')
3181                         continue;
3182
3183                 /* expect a numeric timeline ID as first field of line */
3184                 tli = (TimeLineID) strtoul(ptr, &endptr, 0);
3185                 if (endptr == ptr)
3186                         ereport(FATAL,
3187                                         (errmsg("syntax error in history file: %s", fline),
3188                                          errhint("Expected a numeric timeline ID.")));
3189
3190                 if (result &&
3191                         tli <= (TimeLineID) linitial_int(result))
3192                         ereport(FATAL,
3193                                         (errmsg("invalid data in history file: %s", fline),
3194                                    errhint("Timeline IDs must be in increasing sequence.")));
3195
3196                 /* Build list with newest item first */
3197                 result = lcons_int((int) tli, result);
3198
3199                 /* we ignore the remainder of each line */
3200         }
3201
3202         FreeFile(fd);
3203
3204         if (result &&
3205                 targetTLI <= (TimeLineID) linitial_int(result))
3206                 ereport(FATAL,
3207                                 (errmsg("invalid data in history file \"%s\"", path),
3208                         errhint("Timeline IDs must be less than child timeline's ID.")));
3209
3210         result = lcons_int((int) targetTLI, result);
3211
3212         ereport(DEBUG3,
3213                         (errmsg_internal("history of timeline %u is %s",
3214                                                          targetTLI, nodeToString(result))));
3215
3216         return result;
3217 }
3218
3219 /*
3220  * Probe whether a timeline history file exists for the given timeline ID
3221  */
3222 static bool
3223 existsTimeLineHistory(TimeLineID probeTLI)
3224 {
3225         char            path[MAXPGPATH];
3226         char            histfname[MAXFNAMELEN];
3227         FILE       *fd;
3228
3229         if (InArchiveRecovery)
3230         {
3231                 TLHistoryFileName(histfname, probeTLI);
3232                 RestoreArchivedFile(path, histfname, "RECOVERYHISTORY", 0);
3233         }
3234         else
3235                 TLHistoryFilePath(path, probeTLI);
3236
3237         fd = AllocateFile(path, "r");
3238         if (fd != NULL)
3239         {
3240                 FreeFile(fd);
3241                 return true;
3242         }
3243         else
3244         {
3245                 if (errno != ENOENT)
3246                         ereport(FATAL,
3247                                         (errcode_for_file_access(),
3248                                          errmsg("could not open file \"%s\": %m", path)));
3249                 return false;
3250         }
3251 }
3252
3253 /*
3254  * Find the newest existing timeline, assuming that startTLI exists.
3255  *
3256  * Note: while this is somewhat heuristic, it does positively guarantee
3257  * that (result + 1) is not a known timeline, and therefore it should
3258  * be safe to assign that ID to a new timeline.
3259  */
3260 static TimeLineID
3261 findNewestTimeLine(TimeLineID startTLI)
3262 {
3263         TimeLineID      newestTLI;
3264         TimeLineID      probeTLI;
3265
3266         /*
3267          * The algorithm is just to probe for the existence of timeline history
3268          * files.  XXX is it useful to allow gaps in the sequence?
3269          */
3270         newestTLI = startTLI;
3271
3272         for (probeTLI = startTLI + 1;; probeTLI++)
3273         {
3274                 if (existsTimeLineHistory(probeTLI))
3275                 {
3276                         newestTLI = probeTLI;           /* probeTLI exists */
3277                 }
3278                 else
3279                 {
3280                         /* doesn't exist, assume we're done */
3281                         break;
3282                 }
3283         }
3284
3285         return newestTLI;
3286 }
3287
3288 /*
3289  * Create a new timeline history file.
3290  *
3291  *      newTLI: ID of the new timeline
3292  *      parentTLI: ID of its immediate parent
3293  *      endTLI et al: ID of the last used WAL file, for annotation purposes
3294  *
3295  * Currently this is only used during recovery, and so there are no locking
3296  * considerations.      But we should be just as tense as XLogFileInit to avoid
3297  * emplacing a bogus file.
3298  */
3299 static void
3300 writeTimeLineHistory(TimeLineID newTLI, TimeLineID parentTLI,
3301                                          TimeLineID endTLI, uint32 endLogId, uint32 endLogSeg)
3302 {
3303         char            path[MAXPGPATH];
3304         char            tmppath[MAXPGPATH];
3305         char            histfname[MAXFNAMELEN];
3306         char            xlogfname[MAXFNAMELEN];
3307         char            buffer[BLCKSZ];
3308         int                     srcfd;
3309         int                     fd;
3310         int                     nbytes;
3311
3312         Assert(newTLI > parentTLI); /* else bad selection of newTLI */
3313
3314         /*
3315          * Write into a temp file name.
3316          */
3317         snprintf(tmppath, MAXPGPATH, XLOGDIR "/xlogtemp.%d", (int) getpid());
3318
3319         unlink(tmppath);
3320
3321         /* do not use XLOG_SYNC_BIT here --- want to fsync only at end of fill */
3322         fd = BasicOpenFile(tmppath, O_RDWR | O_CREAT | O_EXCL,
3323                                            S_IRUSR | S_IWUSR);
3324         if (fd < 0)
3325                 ereport(ERROR,
3326                                 (errcode_for_file_access(),
3327                                  errmsg("could not create file \"%s\": %m", tmppath)));
3328
3329         /*
3330          * If a history file exists for the parent, copy it verbatim
3331          */
3332         if (InArchiveRecovery)
3333         {
3334                 TLHistoryFileName(histfname, parentTLI);
3335                 RestoreArchivedFile(path, histfname, "RECOVERYHISTORY", 0);
3336         }
3337         else
3338                 TLHistoryFilePath(path, parentTLI);
3339
3340         srcfd = BasicOpenFile(path, O_RDONLY, 0);
3341         if (srcfd < 0)
3342         {
3343                 if (errno != ENOENT)
3344                         ereport(ERROR,
3345                                         (errcode_for_file_access(),
3346                                          errmsg("could not open file \"%s\": %m", path)));
3347                 /* Not there, so assume parent has no parents */
3348         }
3349         else
3350         {
3351                 for (;;)
3352                 {
3353                         errno = 0;
3354                         nbytes = (int) read(srcfd, buffer, sizeof(buffer));
3355                         if (nbytes < 0 || errno != 0)
3356                                 ereport(ERROR,
3357                                                 (errcode_for_file_access(),
3358                                                  errmsg("could not read file \"%s\": %m", path)));
3359                         if (nbytes == 0)
3360                                 break;
3361                         errno = 0;
3362                         if ((int) write(fd, buffer, nbytes) != nbytes)
3363                         {
3364                                 int                     save_errno = errno;
3365
3366                                 /*
3367                                  * If we fail to make the file, delete it to release disk
3368                                  * space
3369                                  */
3370                                 unlink(tmppath);
3371
3372                                 /*
3373                                  * if write didn't set errno, assume problem is no disk space
3374                                  */
3375                                 errno = save_errno ? save_errno : ENOSPC;
3376
3377                                 ereport(ERROR,
3378                                                 (errcode_for_file_access(),
3379                                          errmsg("could not write to file \"%s\": %m", tmppath)));
3380                         }
3381                 }
3382                 close(srcfd);
3383         }
3384
3385         /*
3386          * Append one line with the details of this timeline split.
3387          *
3388          * If we did have a parent file, insert an extra newline just in case the
3389          * parent file failed to end with one.
3390          */
3391         XLogFileName(xlogfname, endTLI, endLogId, endLogSeg);
3392
3393         snprintf(buffer, sizeof(buffer),
3394                          "%s%u\t%s\t%s transaction %u at %s\n",
3395                          (srcfd < 0) ? "" : "\n",
3396                          parentTLI,
3397                          xlogfname,
3398                          recoveryStopAfter ? "after" : "before",
3399                          recoveryStopXid,
3400                          str_time(recoveryStopTime));
3401
3402         nbytes = strlen(buffer);
3403         errno = 0;
3404         if ((int) write(fd, buffer, nbytes) != nbytes)
3405         {
3406                 int                     save_errno = errno;
3407
3408                 /*
3409                  * If we fail to make the file, delete it to release disk space
3410                  */
3411                 unlink(tmppath);
3412                 /* if write didn't set errno, assume problem is no disk space */
3413                 errno = save_errno ? save_errno : ENOSPC;
3414
3415                 ereport(ERROR,
3416                                 (errcode_for_file_access(),
3417                                  errmsg("could not write to file \"%s\": %m", tmppath)));
3418         }
3419
3420         if (pg_fsync(fd) != 0)
3421                 ereport(ERROR,
3422                                 (errcode_for_file_access(),
3423                                  errmsg("could not fsync file \"%s\": %m", tmppath)));
3424
3425         if (close(fd))
3426                 ereport(ERROR,
3427                                 (errcode_for_file_access(),
3428                                  errmsg("could not close file \"%s\": %m", tmppath)));
3429
3430
3431         /*
3432          * Now move the completed history file into place with its final name.
3433          */
3434         TLHistoryFilePath(path, newTLI);
3435
3436         /*
3437          * Prefer link() to rename() here just to be really sure that we don't
3438          * overwrite an existing logfile.  However, there shouldn't be one, so
3439          * rename() is an acceptable substitute except for the truly paranoid.
3440          */
3441 #if HAVE_WORKING_LINK
3442         if (link(tmppath, path) < 0)
3443                 ereport(ERROR,
3444                                 (errcode_for_file_access(),
3445                                  errmsg("could not link file \"%s\" to \"%s\": %m",
3446                                                 tmppath, path)));
3447         unlink(tmppath);
3448 #else
3449         if (rename(tmppath, path) < 0)
3450                 ereport(ERROR,
3451                                 (errcode_for_file_access(),
3452                                  errmsg("could not rename file \"%s\" to \"%s\": %m",
3453                                                 tmppath, path)));
3454 #endif
3455
3456         /* The history file can be archived immediately. */
3457         TLHistoryFileName(histfname, newTLI);
3458         XLogArchiveNotify(histfname);
3459 }
3460
3461 /*
3462  * I/O routines for pg_control
3463  *
3464  * *ControlFile is a buffer in shared memory that holds an image of the
3465  * contents of pg_control.      WriteControlFile() initializes pg_control
3466  * given a preloaded buffer, ReadControlFile() loads the buffer from
3467  * the pg_control file (during postmaster or standalone-backend startup),
3468  * and UpdateControlFile() rewrites pg_control after we modify xlog state.
3469  *
3470  * For simplicity, WriteControlFile() initializes the fields of pg_control
3471  * that are related to checking backend/database compatibility, and
3472  * ReadControlFile() verifies they are correct.  We could split out the
3473  * I/O and compatibility-check functions, but there seems no need currently.
3474  */
3475 static void
3476 WriteControlFile(void)
3477 {
3478         int                     fd;
3479         char            buffer[PG_CONTROL_SIZE]; /* need not be aligned */
3480         char       *localeptr;
3481
3482         /*
3483          * Initialize version and compatibility-check fields
3484          */
3485         ControlFile->pg_control_version = PG_CONTROL_VERSION;
3486         ControlFile->catalog_version_no = CATALOG_VERSION_NO;
3487
3488         ControlFile->maxAlign = MAXIMUM_ALIGNOF;
3489         ControlFile->floatFormat = FLOATFORMAT_VALUE;
3490
3491         ControlFile->blcksz = BLCKSZ;
3492         ControlFile->relseg_size = RELSEG_SIZE;
3493         ControlFile->xlog_blcksz = XLOG_BLCKSZ;
3494         ControlFile->xlog_seg_size = XLOG_SEG_SIZE;
3495
3496         ControlFile->nameDataLen = NAMEDATALEN;
3497         ControlFile->indexMaxKeys = INDEX_MAX_KEYS;
3498
3499 #ifdef HAVE_INT64_TIMESTAMP
3500         ControlFile->enableIntTimes = TRUE;
3501 #else
3502         ControlFile->enableIntTimes = FALSE;
3503 #endif
3504
3505         ControlFile->localeBuflen = LOCALE_NAME_BUFLEN;
3506         localeptr = setlocale(LC_COLLATE, NULL);
3507         if (!localeptr)
3508                 ereport(PANIC,
3509                                 (errmsg("invalid LC_COLLATE setting")));
3510         StrNCpy(ControlFile->lc_collate, localeptr, LOCALE_NAME_BUFLEN);
3511         localeptr = setlocale(LC_CTYPE, NULL);
3512         if (!localeptr)
3513                 ereport(PANIC,
3514                                 (errmsg("invalid LC_CTYPE setting")));
3515         StrNCpy(ControlFile->lc_ctype, localeptr, LOCALE_NAME_BUFLEN);
3516
3517         /* Contents are protected with a CRC */
3518         INIT_CRC32(ControlFile->crc);
3519         COMP_CRC32(ControlFile->crc,
3520                            (char *) ControlFile,
3521                            offsetof(ControlFileData, crc));
3522         FIN_CRC32(ControlFile->crc);
3523
3524         /*
3525          * We write out PG_CONTROL_SIZE bytes into pg_control, zero-padding the
3526          * excess over sizeof(ControlFileData).  This reduces the odds of
3527          * premature-EOF errors when reading pg_control.  We'll still fail when we
3528          * check the contents of the file, but hopefully with a more specific
3529          * error than "couldn't read pg_control".
3530          */
3531         if (sizeof(ControlFileData) > PG_CONTROL_SIZE)
3532                 elog(PANIC, "sizeof(ControlFileData) is larger than PG_CONTROL_SIZE; fix either one");
3533
3534         memset(buffer, 0, PG_CONTROL_SIZE);
3535         memcpy(buffer, ControlFile, sizeof(ControlFileData));
3536
3537         fd = BasicOpenFile(XLOG_CONTROL_FILE,
3538                                            O_RDWR | O_CREAT | O_EXCL | PG_BINARY,
3539                                            S_IRUSR | S_IWUSR);
3540         if (fd < 0)
3541                 ereport(PANIC,
3542                                 (errcode_for_file_access(),
3543                                  errmsg("could not create control file \"%s\": %m",
3544                                                 XLOG_CONTROL_FILE)));
3545
3546         errno = 0;
3547         if (write(fd, buffer, PG_CONTROL_SIZE) != PG_CONTROL_SIZE)
3548         {
3549                 /* if write didn't set errno, assume problem is no disk space */
3550                 if (errno == 0)
3551                         errno = ENOSPC;
3552                 ereport(PANIC,
3553                                 (errcode_for_file_access(),
3554                                  errmsg("could not write to control file: %m")));
3555         }
3556
3557         if (pg_fsync(fd) != 0)
3558                 ereport(PANIC,
3559                                 (errcode_for_file_access(),
3560                                  errmsg("could not fsync control file: %m")));
3561
3562         if (close(fd))
3563                 ereport(PANIC,
3564                                 (errcode_for_file_access(),
3565                                  errmsg("could not close control file: %m")));
3566 }
3567
3568 static void
3569 ReadControlFile(void)
3570 {
3571         pg_crc32        crc;
3572         int                     fd;
3573
3574         /*
3575          * Read data...
3576          */
3577         fd = BasicOpenFile(XLOG_CONTROL_FILE,
3578                                            O_RDWR | PG_BINARY,
3579                                            S_IRUSR | S_IWUSR);
3580         if (fd < 0)
3581                 ereport(PANIC,
3582                                 (errcode_for_file_access(),
3583                                  errmsg("could not open control file \"%s\": %m",
3584                                                 XLOG_CONTROL_FILE)));
3585
3586         if (read(fd, ControlFile, sizeof(ControlFileData)) != sizeof(ControlFileData))
3587                 ereport(PANIC,
3588                                 (errcode_for_file_access(),
3589                                  errmsg("could not read from control file: %m")));
3590
3591         close(fd);
3592
3593         /*
3594          * Check for expected pg_control format version.  If this is wrong, the
3595          * CRC check will likely fail because we'll be checking the wrong number
3596          * of bytes.  Complaining about wrong version will probably be more
3597          * enlightening than complaining about wrong CRC.
3598          */
3599         if (ControlFile->pg_control_version != PG_CONTROL_VERSION)
3600                 ereport(FATAL,
3601                                 (errmsg("database files are incompatible with server"),
3602                                  errdetail("The database cluster was initialized with PG_CONTROL_VERSION %d,"
3603                                   " but the server was compiled with PG_CONTROL_VERSION %d.",
3604                                                 ControlFile->pg_control_version, PG_CONTROL_VERSION),
3605                                  errhint("It looks like you need to initdb.")));
3606         /* Now check the CRC. */
3607         INIT_CRC32(crc);
3608         COMP_CRC32(crc,
3609                            (char *) ControlFile,
3610                            offsetof(ControlFileData, crc));
3611         FIN_CRC32(crc);
3612
3613         if (!EQ_CRC32(crc, ControlFile->crc))
3614                 ereport(FATAL,
3615                                 (errmsg("incorrect checksum in control file")));
3616
3617         /*
3618          * Do compatibility checking immediately.  We do this here for 2 reasons:
3619          *
3620          * (1) if the database isn't compatible with the backend executable, we
3621          * want to abort before we can possibly do any damage;
3622          *
3623          * (2) this code is executed in the postmaster, so the setlocale() will
3624          * propagate to forked backends, which aren't going to read this file for
3625          * themselves.  (These locale settings are considered critical
3626          * compatibility items because they can affect sort order of indexes.)
3627          */
3628         if (ControlFile->catalog_version_no != CATALOG_VERSION_NO)
3629                 ereport(FATAL,
3630                                 (errmsg("database files are incompatible with server"),
3631                                  errdetail("The database cluster was initialized with CATALOG_VERSION_NO %d,"
3632                                   " but the server was compiled with CATALOG_VERSION_NO %d.",
3633                                                 ControlFile->catalog_version_no, CATALOG_VERSION_NO),
3634                                  errhint("It looks like you need to initdb.")));
3635         if (ControlFile->maxAlign != MAXIMUM_ALIGNOF)
3636                 ereport(FATAL,
3637                                 (errmsg("database files are incompatible with server"),
3638                    errdetail("The database cluster was initialized with MAXALIGN %d,"
3639                                          " but the server was compiled with MAXALIGN %d.",
3640                                          ControlFile->maxAlign, MAXIMUM_ALIGNOF),
3641                                  errhint("It looks like you need to initdb.")));
3642         if (ControlFile->floatFormat != FLOATFORMAT_VALUE)
3643                 ereport(FATAL,
3644                                 (errmsg("database files are incompatible with server"),
3645                                  errdetail("The database cluster appears to use a different floating-point number format than the server executable."),
3646                                  errhint("It looks like you need to initdb.")));
3647         if (ControlFile->blcksz != BLCKSZ)
3648                 ereport(FATAL,
3649                                 (errmsg("database files are incompatible with server"),
3650                          errdetail("The database cluster was initialized with BLCKSZ %d,"
3651                                            " but the server was compiled with BLCKSZ %d.",
3652                                            ControlFile->blcksz, BLCKSZ),
3653                                  errhint("It looks like you need to recompile or initdb.")));
3654         if (ControlFile->relseg_size != RELSEG_SIZE)
3655                 ereport(FATAL,
3656                                 (errmsg("database files are incompatible with server"),
3657                 errdetail("The database cluster was initialized with RELSEG_SIZE %d,"
3658                                   " but the server was compiled with RELSEG_SIZE %d.",
3659                                   ControlFile->relseg_size, RELSEG_SIZE),
3660                                  errhint("It looks like you need to recompile or initdb.")));
3661         if (ControlFile->xlog_blcksz != XLOG_BLCKSZ)
3662                 ereport(FATAL,
3663                                 (errmsg("database files are incompatible with server"),
3664                          errdetail("The database cluster was initialized with XLOG_BLCKSZ %d,"
3665                                            " but the server was compiled with XLOG_BLCKSZ %d.",
3666                                            ControlFile->xlog_blcksz, XLOG_BLCKSZ),
3667                                  errhint("It looks like you need to recompile or initdb.")));
3668         if (ControlFile->xlog_seg_size != XLOG_SEG_SIZE)
3669                 ereport(FATAL,
3670                                 (errmsg("database files are incompatible with server"),
3671                                  errdetail("The database cluster was initialized with XLOG_SEG_SIZE %d,"
3672                                            " but the server was compiled with XLOG_SEG_SIZE %d.",
3673                                                    ControlFile->xlog_seg_size, XLOG_SEG_SIZE),
3674                                  errhint("It looks like you need to recompile or initdb.")));
3675         if (ControlFile->nameDataLen != NAMEDATALEN)
3676                 ereport(FATAL,
3677                                 (errmsg("database files are incompatible with server"),
3678                 errdetail("The database cluster was initialized with NAMEDATALEN %d,"
3679                                   " but the server was compiled with NAMEDATALEN %d.",
3680                                   ControlFile->nameDataLen, NAMEDATALEN),
3681                                  errhint("It looks like you need to recompile or initdb.")));
3682         if (ControlFile->indexMaxKeys != INDEX_MAX_KEYS)
3683                 ereport(FATAL,
3684                                 (errmsg("database files are incompatible with server"),
3685                                  errdetail("The database cluster was initialized with INDEX_MAX_KEYS %d,"
3686                                           " but the server was compiled with INDEX_MAX_KEYS %d.",
3687                                                    ControlFile->indexMaxKeys, INDEX_MAX_KEYS),
3688                                  errhint("It looks like you need to recompile or initdb.")));
3689
3690 #ifdef HAVE_INT64_TIMESTAMP
3691         if (ControlFile->enableIntTimes != TRUE)
3692                 ereport(FATAL,
3693                                 (errmsg("database files are incompatible with server"),
3694                                  errdetail("The database cluster was initialized without HAVE_INT64_TIMESTAMP"
3695                                   " but the server was compiled with HAVE_INT64_TIMESTAMP."),
3696                                  errhint("It looks like you need to recompile or initdb.")));
3697 #else
3698         if (ControlFile->enableIntTimes != FALSE)
3699                 ereport(FATAL,
3700                                 (errmsg("database files are incompatible with server"),
3701                                  errdetail("The database cluster was initialized with HAVE_INT64_TIMESTAMP"
3702                            " but the server was compiled without HAVE_INT64_TIMESTAMP."),
3703                                  errhint("It looks like you need to recompile or initdb.")));
3704 #endif
3705
3706         if (ControlFile->localeBuflen != LOCALE_NAME_BUFLEN)
3707                 ereport(FATAL,
3708                                 (errmsg("database files are incompatible with server"),
3709                                  errdetail("The database cluster was initialized with LOCALE_NAME_BUFLEN %d,"
3710                                   " but the server was compiled with LOCALE_NAME_BUFLEN %d.",
3711                                                    ControlFile->localeBuflen, LOCALE_NAME_BUFLEN),
3712                                  errhint("It looks like you need to recompile or initdb.")));
3713         if (pg_perm_setlocale(LC_COLLATE, ControlFile->lc_collate) == NULL)
3714                 ereport(FATAL,
3715                         (errmsg("database files are incompatible with operating system"),
3716                          errdetail("The database cluster was initialized with LC_COLLATE \"%s\","
3717                                            " which is not recognized by setlocale().",
3718                                            ControlFile->lc_collate),
3719                          errhint("It looks like you need to initdb or install locale support.")));
3720         if (pg_perm_setlocale(LC_CTYPE, ControlFile->lc_ctype) == NULL)
3721                 ereport(FATAL,
3722                         (errmsg("database files are incompatible with operating system"),
3723                 errdetail("The database cluster was initialized with LC_CTYPE \"%s\","
3724                                   " which is not recognized by setlocale().",
3725                                   ControlFile->lc_ctype),
3726                          errhint("It looks like you need to initdb or install locale support.")));
3727
3728         /* Make the fixed locale settings visible as GUC variables, too */
3729         SetConfigOption("lc_collate", ControlFile->lc_collate,
3730                                         PGC_INTERNAL, PGC_S_OVERRIDE);
3731         SetConfigOption("lc_ctype", ControlFile->lc_ctype,
3732                                         PGC_INTERNAL, PGC_S_OVERRIDE);
3733 }
3734
3735 void
3736 UpdateControlFile(void)
3737 {
3738         int                     fd;
3739
3740         INIT_CRC32(ControlFile->crc);
3741         COMP_CRC32(ControlFile->crc,
3742                            (char *) ControlFile,
3743                            offsetof(ControlFileData, crc));
3744         FIN_CRC32(ControlFile->crc);
3745
3746         fd = BasicOpenFile(XLOG_CONTROL_FILE,
3747                                            O_RDWR | PG_BINARY,
3748                                            S_IRUSR | S_IWUSR);
3749         if (fd < 0)
3750                 ereport(PANIC,
3751                                 (errcode_for_file_access(),
3752                                  errmsg("could not open control file \"%s\": %m",
3753                                                 XLOG_CONTROL_FILE)));
3754
3755         errno = 0;
3756         if (write(fd, ControlFile, sizeof(ControlFileData)) != sizeof(ControlFileData))
3757         {
3758                 /* if write didn't set errno, assume problem is no disk space */
3759                 if (errno == 0)
3760                         errno = ENOSPC;
3761                 ereport(PANIC,
3762                                 (errcode_for_file_access(),
3763                                  errmsg("could not write to control file: %m")));
3764         }
3765
3766         if (pg_fsync(fd) != 0)
3767                 ereport(PANIC,
3768                                 (errcode_for_file_access(),
3769                                  errmsg("could not fsync control file: %m")));
3770
3771         if (close(fd))
3772                 ereport(PANIC,
3773                                 (errcode_for_file_access(),
3774                                  errmsg("could not close control file: %m")));
3775 }
3776
3777 /*
3778  * Initialization of shared memory for XLOG
3779  */
3780 Size
3781 XLOGShmemSize(void)
3782 {
3783         Size            size;
3784
3785         /* XLogCtl */
3786         size = sizeof(XLogCtlData);
3787         /* xlblocks array */
3788         size = add_size(size, mul_size(sizeof(XLogRecPtr), XLOGbuffers));
3789         /* extra alignment padding for XLOG I/O buffers */
3790         size = add_size(size, ALIGNOF_XLOG_BUFFER);
3791         /* and the buffers themselves */
3792         size = add_size(size, mul_size(XLOG_BLCKSZ, XLOGbuffers));
3793
3794         /*
3795          * Note: we don't count ControlFileData, it comes out of the "slop factor"
3796          * added by CreateSharedMemoryAndSemaphores.  This lets us use this
3797          * routine again below to compute the actual allocation size.
3798          */
3799
3800         return size;
3801 }
3802
3803 void
3804 XLOGShmemInit(void)
3805 {
3806         bool            foundCFile,
3807                                 foundXLog;
3808         char       *allocptr;
3809
3810         ControlFile = (ControlFileData *)
3811                 ShmemInitStruct("Control File", sizeof(ControlFileData), &foundCFile);
3812         XLogCtl = (XLogCtlData *)
3813                 ShmemInitStruct("XLOG Ctl", XLOGShmemSize(), &foundXLog);
3814
3815         if (foundCFile || foundXLog)
3816         {
3817                 /* both should be present or neither */
3818                 Assert(foundCFile && foundXLog);
3819                 return;
3820         }
3821
3822         memset(XLogCtl, 0, sizeof(XLogCtlData));
3823
3824         /*
3825          * Since XLogCtlData contains XLogRecPtr fields, its sizeof should be a
3826          * multiple of the alignment for same, so no extra alignment padding is
3827          * needed here.
3828          */
3829         allocptr = ((char *) XLogCtl) + sizeof(XLogCtlData);
3830         XLogCtl->xlblocks = (XLogRecPtr *) allocptr;
3831         memset(XLogCtl->xlblocks, 0, sizeof(XLogRecPtr) * XLOGbuffers);
3832         allocptr += sizeof(XLogRecPtr) * XLOGbuffers;
3833
3834         /*
3835          * Align the start of the page buffers to an ALIGNOF_XLOG_BUFFER boundary.
3836          */
3837         allocptr = (char *) TYPEALIGN(ALIGNOF_XLOG_BUFFER, allocptr);
3838         XLogCtl->pages = allocptr;
3839         memset(XLogCtl->pages, 0, (Size) XLOG_BLCKSZ * XLOGbuffers);
3840
3841         /*
3842          * Do basic initialization of XLogCtl shared data. (StartupXLOG will fill
3843          * in additional info.)
3844          */
3845         XLogCtl->XLogCacheByte = (Size) XLOG_BLCKSZ * XLOGbuffers;
3846
3847         XLogCtl->XLogCacheBlck = XLOGbuffers - 1;
3848         XLogCtl->Insert.currpage = (XLogPageHeader) (XLogCtl->pages);
3849         SpinLockInit(&XLogCtl->info_lck);
3850
3851         /*
3852          * If we are not in bootstrap mode, pg_control should already exist. Read
3853          * and validate it immediately (see comments in ReadControlFile() for the
3854          * reasons why).
3855          */
3856         if (!IsBootstrapProcessingMode())
3857                 ReadControlFile();
3858 }
3859
3860 /*
3861  * This func must be called ONCE on system install.  It creates pg_control
3862  * and the initial XLOG segment.
3863  */
3864 void
3865 BootStrapXLOG(void)
3866 {
3867         CheckPoint      checkPoint;
3868         char       *buffer;
3869         XLogPageHeader page;
3870         XLogLongPageHeader longpage;
3871         XLogRecord *record;
3872         bool            use_existent;
3873         uint64          sysidentifier;
3874         struct timeval tv;
3875         pg_crc32        crc;
3876
3877         /*
3878          * Select a hopefully-unique system identifier code for this installation.
3879          * We use the result of gettimeofday(), including the fractional seconds
3880          * field, as being about as unique as we can easily get.  (Think not to
3881          * use random(), since it hasn't been seeded and there's no portable way
3882          * to seed it other than the system clock value...)  The upper half of the
3883          * uint64 value is just the tv_sec part, while the lower half is the XOR
3884          * of tv_sec and tv_usec.  This is to ensure that we don't lose uniqueness
3885          * unnecessarily if "uint64" is really only 32 bits wide.  A person
3886          * knowing this encoding can determine the initialization time of the
3887          * installation, which could perhaps be useful sometimes.
3888          */
3889         gettimeofday(&tv, NULL);
3890         sysidentifier = ((uint64) tv.tv_sec) << 32;
3891         sysidentifier |= (uint32) (tv.tv_sec | tv.tv_usec);
3892
3893         /* First timeline ID is always 1 */
3894         ThisTimeLineID = 1;
3895
3896         /* page buffer must be aligned suitably for O_DIRECT */
3897         buffer = (char *) palloc(XLOG_BLCKSZ + ALIGNOF_XLOG_BUFFER);
3898         page = (XLogPageHeader) TYPEALIGN(ALIGNOF_XLOG_BUFFER, buffer);
3899         memset(page, 0, XLOG_BLCKSZ);
3900
3901         /* Set up information for the initial checkpoint record */
3902         checkPoint.redo.xlogid = 0;
3903         checkPoint.redo.xrecoff = SizeOfXLogLongPHD;
3904         checkPoint.undo = checkPoint.redo;
3905         checkPoint.ThisTimeLineID = ThisTimeLineID;
3906         checkPoint.nextXid = FirstNormalTransactionId;
3907         checkPoint.nextOid = FirstBootstrapObjectId;
3908         checkPoint.nextMulti = FirstMultiXactId;
3909         checkPoint.nextMultiOffset = 0;
3910         checkPoint.time = time(NULL);
3911
3912         ShmemVariableCache->nextXid = checkPoint.nextXid;
3913         ShmemVariableCache->nextOid = checkPoint.nextOid;
3914         ShmemVariableCache->oidCount = 0;
3915         MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset);
3916
3917         /* Set up the XLOG page header */
3918         page->xlp_magic = XLOG_PAGE_MAGIC;
3919         page->xlp_info = XLP_LONG_HEADER;
3920         page->xlp_tli = ThisTimeLineID;
3921         page->xlp_pageaddr.xlogid = 0;
3922         page->xlp_pageaddr.xrecoff = 0;
3923         longpage = (XLogLongPageHeader) page;
3924         longpage->xlp_sysid = sysidentifier;
3925         longpage->xlp_seg_size = XLogSegSize;
3926         longpage->xlp_xlog_blcksz = XLOG_BLCKSZ;
3927
3928         /* Insert the initial checkpoint record */
3929         record = (XLogRecord *) ((char *) page + SizeOfXLogLongPHD);
3930         record->xl_prev.xlogid = 0;
3931         record->xl_prev.xrecoff = 0;
3932         record->xl_xid = InvalidTransactionId;
3933         record->xl_tot_len = SizeOfXLogRecord + sizeof(checkPoint);
3934         record->xl_len = sizeof(checkPoint);
3935         record->xl_info = XLOG_CHECKPOINT_SHUTDOWN;
3936         record->xl_rmid = RM_XLOG_ID;
3937         memcpy(XLogRecGetData(record), &checkPoint, sizeof(checkPoint));
3938
3939         INIT_CRC32(crc);
3940         COMP_CRC32(crc, &checkPoint, sizeof(checkPoint));
3941         COMP_CRC32(crc, (char *) record + sizeof(pg_crc32),
3942                            SizeOfXLogRecord - sizeof(pg_crc32));
3943         FIN_CRC32(crc);
3944         record->xl_crc = crc;
3945
3946         /* Create first XLOG segment file */
3947         use_existent = false;
3948         openLogFile = XLogFileInit(0, 0, &use_existent, false);
3949
3950         /* Write the first page with the initial record */
3951         errno = 0;
3952         if (write(openLogFile, page, XLOG_BLCKSZ) != XLOG_BLCKSZ)
3953         {
3954                 /* if write didn't set errno, assume problem is no disk space */
3955                 if (errno == 0)
3956                         errno = ENOSPC;
3957                 ereport(PANIC,
3958                                 (errcode_for_file_access(),
3959                           errmsg("could not write bootstrap transaction log file: %m")));
3960         }
3961
3962         if (pg_fsync(openLogFile) != 0)
3963                 ereport(PANIC,
3964                                 (errcode_for_file_access(),
3965                           errmsg("could not fsync bootstrap transaction log file: %m")));
3966
3967         if (close(openLogFile))
3968                 ereport(PANIC,
3969                                 (errcode_for_file_access(),
3970                           errmsg("could not close bootstrap transaction log file: %m")));
3971
3972         openLogFile = -1;
3973
3974         /* Now create pg_control */
3975
3976         memset(ControlFile, 0, sizeof(ControlFileData));
3977         /* Initialize pg_control status fields */
3978         ControlFile->system_identifier = sysidentifier;
3979         ControlFile->state = DB_SHUTDOWNED;
3980         ControlFile->time = checkPoint.time;
3981         ControlFile->logId = 0;
3982         ControlFile->logSeg = 1;
3983         ControlFile->checkPoint = checkPoint.redo;
3984         ControlFile->checkPointCopy = checkPoint;
3985         /* some additional ControlFile fields are set in WriteControlFile() */
3986
3987         WriteControlFile();
3988
3989         /* Bootstrap the commit log, too */
3990         BootStrapCLOG();
3991         BootStrapSUBTRANS();
3992         BootStrapMultiXact();
3993
3994         pfree(buffer);
3995 }
3996
/*
 * str_time
 *
 * Format a time_t as "YYYY-MM-DD HH:MM:SS TZ" in the local time zone.
 *
 * The result points into a static buffer that the next call overwrites, so
 * the caller must use or copy it before calling again.
 *
 * localtime() returns NULL when the value cannot be broken down, and
 * strftime() returns 0 (leaving the buffer contents indeterminate) when the
 * result does not fit.  In either case a numeric rendering of the raw value
 * is returned instead, so the caller always gets a valid string.
 */
static char *
str_time(time_t tnow)
{
	static char buf[128];
	struct tm  *tm = localtime(&tnow);

	if (tm == NULL ||
		strftime(buf, sizeof(buf),
				 "%Y-%m-%d %H:%M:%S %Z",
				 tm) == 0)
		snprintf(buf, sizeof(buf), "(time_t %ld)", (long) tnow);

	return buf;
}
4008
4009 /*
4010  * See if there is a recovery command file (recovery.conf), and if so
4011  * read in parameters for archive recovery.
4012  *
4013  * XXX longer term intention is to expand this to
4014  * cater for additional parameters and controls
4015  * possibly use a flex lexer similar to the GUC one
4016  */
4017 static void
4018 readRecoveryCommandFile(void)
4019 {
4020         FILE       *fd;
4021         char            cmdline[MAXPGPATH];
4022         TimeLineID      rtli = 0;
4023         bool            rtliGiven = false;
4024         bool            syntaxError = false;
4025
4026         fd = AllocateFile(RECOVERY_COMMAND_FILE, "r");
4027         if (fd == NULL)
4028         {
4029                 if (errno == ENOENT)
4030                         return;                         /* not there, so no archive recovery */
4031                 ereport(FATAL,
4032                                 (errcode_for_file_access(),
4033                                  errmsg("could not open recovery command file \"%s\": %m",
4034                                                 RECOVERY_COMMAND_FILE)));
4035         }
4036
4037         ereport(LOG,
4038                         (errmsg("starting archive recovery")));
4039
4040         /*
4041          * Parse the file...
4042          */
4043         while (fgets(cmdline, MAXPGPATH, fd) != NULL)
4044         {
4045                 /* skip leading whitespace and check for # comment */
4046                 char       *ptr;
4047                 char       *tok1;
4048                 char       *tok2;
4049
4050                 for (ptr = cmdline; *ptr; ptr++)
4051                 {
4052                         if (!isspace((unsigned char) *ptr))
4053                                 break;
4054                 }
4055                 if (*ptr == '\0' || *ptr == '#')
4056                         continue;
4057
4058                 /* identify the quoted parameter value */
4059                 tok1 = strtok(ptr, "'");
4060                 if (!tok1)
4061                 {
4062                         syntaxError = true;
4063                         break;
4064                 }
4065                 tok2 = strtok(NULL, "'");
4066                 if (!tok2)
4067                 {
4068                         syntaxError = true;
4069                         break;
4070                 }
4071                 /* reparse to get just the parameter name */
4072                 tok1 = strtok(ptr, " \t=");
4073                 if (!tok1)
4074                 {
4075                         syntaxError = true;
4076                         break;
4077                 }
4078
4079                 if (strcmp(tok1, "restore_command") == 0)
4080                 {
4081                         recoveryRestoreCommand = pstrdup(tok2);
4082                         ereport(LOG,
4083                                         (errmsg("restore_command = \"%s\"",
4084                                                         recoveryRestoreCommand)));
4085                 }
4086                 else if (strcmp(tok1, "recovery_target_timeline") == 0)
4087                 {
4088                         rtliGiven = true;
4089                         if (strcmp(tok2, "latest") == 0)
4090                                 rtli = 0;
4091                         else
4092                         {
4093                                 errno = 0;
4094                                 rtli = (TimeLineID) strtoul(tok2, NULL, 0);
4095                                 if (errno == EINVAL || errno == ERANGE)
4096                                         ereport(FATAL,
4097                                                         (errmsg("recovery_target_timeline is not a valid number: \"%s\"",
4098                                                                         tok2)));
4099                         }
4100                         if (rtli)
4101                                 ereport(LOG,
4102                                                 (errmsg("recovery_target_timeline = %u", rtli)));
4103                         else
4104                                 ereport(LOG,
4105                                                 (errmsg("recovery_target_timeline = latest")));
4106                 }
4107                 else if (strcmp(tok1, "recovery_target_xid") == 0)
4108                 {
4109                         errno = 0;
4110                         recoveryTargetXid = (TransactionId) strtoul(tok2, NULL, 0);
4111                         if (errno == EINVAL || errno == ERANGE)
4112                                 ereport(FATAL,
4113                                  (errmsg("recovery_target_xid is not a valid number: \"%s\"",
4114                                                  tok2)));
4115                         ereport(LOG,
4116                                         (errmsg("recovery_target_xid = %u",
4117                                                         recoveryTargetXid)));
4118                         recoveryTarget = true;
4119                         recoveryTargetExact = true;
4120                 }
4121                 else if (strcmp(tok1, "recovery_target_time") == 0)
4122                 {
4123                         /*
4124                          * if recovery_target_xid specified, then this overrides
4125                          * recovery_target_time
4126                          */
4127                         if (recoveryTargetExact)
4128                                 continue;
4129                         recoveryTarget = true;
4130                         recoveryTargetExact = false;
4131
4132                         /*
4133                          * Convert the time string given by the user to the time_t format.
4134                          * We use type abstime's input converter because we know abstime
4135                          * has the same representation as time_t.
4136                          */
4137                         recoveryTargetTime = (time_t)
4138                                 DatumGetAbsoluteTime(DirectFunctionCall1(abstimein,
4139                                                                                                          CStringGetDatum(tok2)));
4140                         ereport(LOG,
4141                                         (errmsg("recovery_target_time = %s",
4142                                                         DatumGetCString(DirectFunctionCall1(abstimeout,
4143                                 AbsoluteTimeGetDatum((AbsoluteTime) recoveryTargetTime))))));
4144                 }
4145                 else if (strcmp(tok1, "recovery_target_inclusive") == 0)
4146                 {
4147                         /*
4148                          * does nothing if a recovery_target is not also set
4149                          */
4150                         if (strcmp(tok2, "true") == 0)
4151                                 recoveryTargetInclusive = true;
4152                         else
4153                         {
4154                                 recoveryTargetInclusive = false;
4155                                 tok2 = "false";
4156                         }
4157                         ereport(LOG,
4158                                         (errmsg("recovery_target_inclusive = %s", tok2)));
4159                 }
4160                 else
4161                         ereport(FATAL,
4162                                         (errmsg("unrecognized recovery parameter \"%s\"",
4163                                                         tok1)));
4164         }
4165
4166         FreeFile(fd);
4167
4168         if (syntaxError)
4169                 ereport(FATAL,
4170                                 (errmsg("syntax error in recovery command file: %s",
4171                                                 cmdline),
4172                           errhint("Lines should have the format parameter = 'value'.")));
4173
4174         /* Check that required parameters were supplied */
4175         if (recoveryRestoreCommand == NULL)
4176                 ereport(FATAL,
4177                                 (errmsg("recovery command file \"%s\" did not specify restore_command",
4178                                                 RECOVERY_COMMAND_FILE)));
4179
4180         /* Enable fetching from archive recovery area */
4181         InArchiveRecovery = true;
4182
4183         /*
4184          * If user specified recovery_target_timeline, validate it or compute the
4185          * "latest" value.      We can't do this until after we've gotten the restore
4186          * command and set InArchiveRecovery, because we need to fetch timeline
4187          * history files from the archive.
4188          */
4189         if (rtliGiven)
4190         {
4191                 if (rtli)
4192                 {
4193                         /* Timeline 1 does not have a history file, all else should */
4194                         if (rtli != 1 && !existsTimeLineHistory(rtli))
4195                                 ereport(FATAL,
4196                                                 (errmsg("recovery_target_timeline %u does not exist",
4197                                                                 rtli)));
4198                         recoveryTargetTLI = rtli;
4199                 }
4200                 else
4201                 {
4202                         /* We start the "latest" search from pg_control's timeline */
4203                         recoveryTargetTLI = findNewestTimeLine(recoveryTargetTLI);
4204                 }
4205         }
4206 }
4207
4208 /*
4209  * Exit archive-recovery state
4210  */
4211 static void
4212 exitArchiveRecovery(TimeLineID endTLI, uint32 endLogId, uint32 endLogSeg)
4213 {
4214         char            recoveryPath[MAXPGPATH];
4215         char            xlogpath[MAXPGPATH];
4216
4217         /*
4218          * We are no longer in archive recovery state.
4219          */
4220         InArchiveRecovery = false;
4221
4222         /*
4223          * We should have the ending log segment currently open.  Verify, and then
4224          * close it (to avoid problems on Windows with trying to rename or delete
4225          * an open file).
4226          */
4227         Assert(readFile >= 0);
4228         Assert(readId == endLogId);
4229         Assert(readSeg == endLogSeg);
4230
4231         close(readFile);
4232         readFile = -1;
4233
4234         /*
4235          * If the segment was fetched from archival storage, we want to replace
4236          * the existing xlog segment (if any) with the archival version.  This is
4237          * because whatever is in XLOGDIR is very possibly older than what we have
4238          * from the archives, since it could have come from restoring a PGDATA
4239          * backup.      In any case, the archival version certainly is more
4240          * descriptive of what our current database state is, because that is what
4241          * we replayed from.
4242          *
4243          * Note that if we are establishing a new timeline, ThisTimeLineID is
4244          * already set to the new value, and so we will create a new file instead
4245          * of overwriting any existing file.
4246          */
4247         snprintf(recoveryPath, MAXPGPATH, XLOGDIR "/RECOVERYXLOG");
4248         XLogFilePath(xlogpath, ThisTimeLineID, endLogId, endLogSeg);
4249
4250         if (restoredFromArchive)
4251         {
4252                 ereport(DEBUG3,
4253                                 (errmsg_internal("moving last restored xlog to \"%s\"",
4254                                                                  xlogpath)));
4255                 unlink(xlogpath);               /* might or might not exist */
4256                 if (rename(recoveryPath, xlogpath) != 0)
4257                         ereport(FATAL,
4258                                         (errcode_for_file_access(),
4259                                          errmsg("could not rename file \"%s\" to \"%s\": %m",
4260                                                         recoveryPath, xlogpath)));
4261                 /* XXX might we need to fix permissions on the file? */
4262         }
4263         else
4264         {
4265                 /*
4266                  * If the latest segment is not archival, but there's still a
4267                  * RECOVERYXLOG laying about, get rid of it.
4268                  */
4269                 unlink(recoveryPath);   /* ignore any error */
4270
4271                 /*
4272                  * If we are establishing a new timeline, we have to copy data from
4273                  * the last WAL segment of the old timeline to create a starting WAL
4274                  * segment for the new timeline.
4275                  */
4276                 if (endTLI != ThisTimeLineID)
4277                         XLogFileCopy(endLogId, endLogSeg,
4278                                                  endTLI, endLogId, endLogSeg);
4279         }
4280
4281         /*
4282          * Let's just make real sure there are not .ready or .done flags posted
4283          * for the new segment.
4284          */
4285         XLogFileName(xlogpath, ThisTimeLineID, endLogId, endLogSeg);
4286         XLogArchiveCleanup(xlogpath);
4287
4288         /* Get rid of any remaining recovered timeline-history file, too */
4289         snprintf(recoveryPath, MAXPGPATH, XLOGDIR "/RECOVERYHISTORY");
4290         unlink(recoveryPath);           /* ignore any error */
4291
4292         /*
4293          * Rename the config file out of the way, so that we don't accidentally
4294          * re-enter archive recovery mode in a subsequent crash.
4295          */
4296         unlink(RECOVERY_COMMAND_DONE);
4297         if (rename(RECOVERY_COMMAND_FILE, RECOVERY_COMMAND_DONE) != 0)
4298                 ereport(FATAL,
4299                                 (errcode_for_file_access(),
4300                                  errmsg("could not rename file \"%s\" to \"%s\": %m",
4301                                                 RECOVERY_COMMAND_FILE, RECOVERY_COMMAND_DONE)));
4302
4303         ereport(LOG,
4304                         (errmsg("archive recovery complete")));
4305 }
4306
4307 /*
4308  * For point-in-time recovery, this function decides whether we want to
4309  * stop applying the XLOG at or after the current record.
4310  *
4311  * Returns TRUE if we are stopping, FALSE otherwise.  On TRUE return,
4312  * *includeThis is set TRUE if we should apply this record before stopping.
4313  * Also, some information is saved in recoveryStopXid et al for use in
4314  * annotating the new timeline's history file.
4315  */
4316 static bool
4317 recoveryStopsHere(XLogRecord *record, bool *includeThis)
4318 {
4319         bool            stopsHere;
4320         uint8           record_info;
4321         time_t          recordXtime;
4322
4323         /* Do we have a PITR target at all? */
4324         if (!recoveryTarget)
4325                 return false;
4326
4327         /* We only consider stopping at COMMIT or ABORT records */
4328         if (record->xl_rmid != RM_XACT_ID)
4329                 return false;
4330         record_info = record->xl_info & ~XLR_INFO_MASK;
4331         if (record_info == XLOG_XACT_COMMIT)
4332         {
4333                 xl_xact_commit *recordXactCommitData;
4334
4335                 recordXactCommitData = (xl_xact_commit *) XLogRecGetData(record);
4336                 recordXtime = recordXactCommitData->xtime;
4337         }
4338         else if (record_info == XLOG_XACT_ABORT)
4339         {
4340                 xl_xact_abort *recordXactAbortData;
4341
4342                 recordXactAbortData = (xl_xact_abort *) XLogRecGetData(record);
4343                 recordXtime = recordXactAbortData->xtime;
4344         }
4345         else
4346                 return false;
4347
4348         if (recoveryTargetExact)
4349         {
4350                 /*
4351                  * there can be only one transaction end record with this exact
4352                  * transactionid
4353                  *
4354                  * when testing for an xid, we MUST test for equality only, since
4355                  * transactions are numbered in the order they start, not the order
4356                  * they complete. A higher numbered xid will complete before you about
4357                  * 50% of the time...
4358                  */
4359                 stopsHere = (record->xl_xid == recoveryTargetXid);
4360                 if (stopsHere)
4361                         *includeThis = recoveryTargetInclusive;
4362         }
4363         else
4364         {
4365                 /*
4366                  * there can be many transactions that share the same commit time, so
4367                  * we stop after the last one, if we are inclusive, or stop at the
4368                  * first one if we are exclusive
4369                  */
4370                 if (recoveryTargetInclusive)
4371                         stopsHere = (recordXtime > recoveryTargetTime);
4372                 else
4373                         stopsHere = (recordXtime >= recoveryTargetTime);
4374                 if (stopsHere)
4375                         *includeThis = false;
4376         }
4377
4378         if (stopsHere)
4379         {
4380                 recoveryStopXid = record->xl_xid;
4381                 recoveryStopTime = recordXtime;
4382                 recoveryStopAfter = *includeThis;
4383
4384                 if (record_info == XLOG_XACT_COMMIT)
4385                 {
4386                         if (recoveryStopAfter)
4387                                 ereport(LOG,
4388                                                 (errmsg("recovery stopping after commit of transaction %u, time %s",
4389                                                           recoveryStopXid, str_time(recoveryStopTime))));
4390                         else
4391                                 ereport(LOG,
4392                                                 (errmsg("recovery stopping before commit of transaction %u, time %s",
4393                                                           recoveryStopXid, str_time(recoveryStopTime))));
4394                 }
4395                 else
4396                 {
4397                         if (recoveryStopAfter)
4398                                 ereport(LOG,
4399                                                 (errmsg("recovery stopping after abort of transaction %u, time %s",
4400                                                           recoveryStopXid, str_time(recoveryStopTime))));
4401                         else
4402                                 ereport(LOG,
4403                                                 (errmsg("recovery stopping before abort of transaction %u, time %s",
4404                                                           recoveryStopXid, str_time(recoveryStopTime))));
4405                 }
4406         }
4407
4408         return stopsHere;
4409 }
4410
4411 /*
4412  * This must be called ONCE during postmaster or standalone-backend startup
4413  */
4414 void
4415 StartupXLOG(void)
4416 {
4417         XLogCtlInsert *Insert;
4418         CheckPoint      checkPoint;
4419         bool            wasShutdown;
4420         bool            needNewTimeLine = false;
4421         XLogRecPtr      RecPtr,
4422                                 LastRec,
4423                                 checkPointLoc,
4424                                 EndOfLog;
4425         uint32          endLogId;
4426         uint32          endLogSeg;
4427         XLogRecord *record;
4428         uint32          freespace;
4429         TransactionId oldestActiveXID;
4430
4431         CritSectionCount++;
4432
4433         /*
4434          * Read control file and check XLOG status looks valid.
4435          *
4436          * Note: in most control paths, *ControlFile is already valid and we need
4437          * not do ReadControlFile() here, but might as well do it to be sure.
4438          */
4439         ReadControlFile();
4440
4441         if (ControlFile->logSeg == 0 ||
4442                 ControlFile->state < DB_SHUTDOWNED ||
4443                 ControlFile->state > DB_IN_PRODUCTION ||
4444                 !XRecOffIsValid(ControlFile->checkPoint.xrecoff))
4445                 ereport(FATAL,
4446                                 (errmsg("control file contains invalid data")));
4447
4448         if (ControlFile->state == DB_SHUTDOWNED)
4449                 ereport(LOG,
4450                                 (errmsg("database system was shut down at %s",
4451                                                 str_time(ControlFile->time))));
4452         else if (ControlFile->state == DB_SHUTDOWNING)
4453                 ereport(LOG,
4454                                 (errmsg("database system shutdown was interrupted at %s",
4455                                                 str_time(ControlFile->time))));
4456         else if (ControlFile->state == DB_IN_RECOVERY)
4457                 ereport(LOG,
4458                    (errmsg("database system was interrupted while in recovery at %s",
4459                                    str_time(ControlFile->time)),
4460                         errhint("This probably means that some data is corrupted and"
4461                                         " you will have to use the last backup for recovery.")));
4462         else if (ControlFile->state == DB_IN_PRODUCTION)
4463                 ereport(LOG,
4464                                 (errmsg("database system was interrupted at %s",
4465                                                 str_time(ControlFile->time))));
4466
4467         /* This is just to allow attaching to startup process with a debugger */
4468 #ifdef XLOG_REPLAY_DELAY
4469         if (ControlFile->state != DB_SHUTDOWNED)
4470                 pg_usleep(60000000L);
4471 #endif
4472
4473         /*
4474          * Initialize on the assumption we want to recover to the same timeline
4475          * that's active according to pg_control.
4476          */
4477         recoveryTargetTLI = ControlFile->checkPointCopy.ThisTimeLineID;
4478
4479         /*
4480          * Check for recovery control file, and if so set up state for offline
4481          * recovery
4482          */
4483         readRecoveryCommandFile();
4484
4485         /* Now we can determine the list of expected TLIs */
4486         expectedTLIs = readTimeLineHistory(recoveryTargetTLI);
4487
4488         /*
4489          * If pg_control's timeline is not in expectedTLIs, then we cannot
4490          * proceed: the backup is not part of the history of the requested
4491          * timeline.
4492          */
4493         if (!list_member_int(expectedTLIs,
4494                                                  (int) ControlFile->checkPointCopy.ThisTimeLineID))
4495                 ereport(FATAL,
4496                                 (errmsg("requested timeline %u is not a child of database system timeline %u",
4497                                                 recoveryTargetTLI,
4498                                                 ControlFile->checkPointCopy.ThisTimeLineID)));
4499
4500         if (read_backup_label(&checkPointLoc))
4501         {
4502                 /*
4503                  * When a backup_label file is present, we want to roll forward from
4504                  * the checkpoint it identifies, rather than using pg_control.
4505                  */
4506                 record = ReadCheckpointRecord(checkPointLoc, 0);
4507                 if (record != NULL)
4508                 {
4509                         ereport(LOG,
4510                                         (errmsg("checkpoint record is at %X/%X",
4511                                                         checkPointLoc.xlogid, checkPointLoc.xrecoff)));
4512                         InRecovery = true;      /* force recovery even if SHUTDOWNED */
4513                 }
4514                 else
4515                 {
4516                         ereport(PANIC,
4517                                         (errmsg("could not locate required checkpoint record"),
4518                                          errhint("If you are not restoring from a backup, try removing the file \"%s/backup_label\".", DataDir)));
4519                 }
4520         }
4521         else
4522         {
4523                 /*
4524                  * Get the last valid checkpoint record.  If the latest one according
4525                  * to pg_control is broken, try the next-to-last one.
4526                  */
4527                 checkPointLoc = ControlFile->checkPoint;
4528                 record = ReadCheckpointRecord(checkPointLoc, 1);
4529                 if (record != NULL)
4530                 {
4531                         ereport(LOG,
4532                                         (errmsg("checkpoint record is at %X/%X",
4533                                                         checkPointLoc.xlogid, checkPointLoc.xrecoff)));
4534                 }
4535                 else
4536                 {
4537                         checkPointLoc = ControlFile->prevCheckPoint;
4538                         record = ReadCheckpointRecord(checkPointLoc, 2);
4539                         if (record != NULL)
4540                         {
4541                                 ereport(LOG,
4542                                                 (errmsg("using previous checkpoint record at %X/%X",
4543                                                           checkPointLoc.xlogid, checkPointLoc.xrecoff)));
4544                                 InRecovery = true;              /* force recovery even if SHUTDOWNED */
4545                         }
4546                         else
4547                                 ereport(PANIC,
4548                                          (errmsg("could not locate a valid checkpoint record")));
4549                 }
4550         }
4551
4552         LastRec = RecPtr = checkPointLoc;
4553         memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
4554         wasShutdown = (record->xl_info == XLOG_CHECKPOINT_SHUTDOWN);
4555
4556         ereport(LOG,
4557          (errmsg("redo record is at %X/%X; undo record is at %X/%X; shutdown %s",
4558                          checkPoint.redo.xlogid, checkPoint.redo.xrecoff,
4559                          checkPoint.undo.xlogid, checkPoint.undo.xrecoff,
4560                          wasShutdown ? "TRUE" : "FALSE")));
4561         ereport(LOG,
4562                         (errmsg("next transaction ID: %u; next OID: %u",
4563                                         checkPoint.nextXid, checkPoint.nextOid)));
4564         ereport(LOG,
4565                         (errmsg("next MultiXactId: %u; next MultiXactOffset: %u",
4566                                         checkPoint.nextMulti, checkPoint.nextMultiOffset)));
4567         if (!TransactionIdIsNormal(checkPoint.nextXid))
4568                 ereport(PANIC,
4569                                 (errmsg("invalid next transaction ID")));
4570
4571         ShmemVariableCache->nextXid = checkPoint.nextXid;
4572         ShmemVariableCache->nextOid = checkPoint.nextOid;
4573         ShmemVariableCache->oidCount = 0;
4574         MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset);
4575
4576         /*
4577          * We must replay WAL entries using the same TimeLineID they were created
4578          * under, so temporarily adopt the TLI indicated by the checkpoint (see
4579          * also xlog_redo()).
4580          */
4581         ThisTimeLineID = checkPoint.ThisTimeLineID;
4582
4583         RedoRecPtr = XLogCtl->Insert.RedoRecPtr = checkPoint.redo;
4584
4585         if (XLByteLT(RecPtr, checkPoint.redo))
4586                 ereport(PANIC,
4587                                 (errmsg("invalid redo in checkpoint record")));
4588         if (checkPoint.undo.xrecoff == 0)
4589                 checkPoint.undo = RecPtr;
4590
4591         /*
4592          * Check whether we need to force recovery from WAL.  If it appears to
4593          * have been a clean shutdown and we did not have a recovery.conf file,
4594          * then assume no recovery needed.
4595          */
4596         if (XLByteLT(checkPoint.undo, RecPtr) ||
4597                 XLByteLT(checkPoint.redo, RecPtr))
4598         {
4599                 if (wasShutdown)
4600                         ereport(PANIC,
4601                                 (errmsg("invalid redo/undo record in shutdown checkpoint")));
4602                 InRecovery = true;
4603         }
4604         else if (ControlFile->state != DB_SHUTDOWNED)
4605                 InRecovery = true;
4606         else if (InArchiveRecovery)
4607         {
4608                 /* force recovery due to presence of recovery.conf */
4609                 InRecovery = true;
4610         }
4611
4612         /* REDO */
4613         if (InRecovery)
4614         {
4615                 int                     rmid;
4616
4617                 if (InArchiveRecovery)
4618                         ereport(LOG,
4619                                         (errmsg("automatic recovery in progress")));
4620                 else
4621                         ereport(LOG,
4622                                         (errmsg("database system was not properly shut down; "
4623                                                         "automatic recovery in progress")));
4624                 ControlFile->state = DB_IN_RECOVERY;
4625                 ControlFile->time = time(NULL);
4626                 UpdateControlFile();
4627
4628                 /* Start up the recovery environment */
4629                 XLogInitRelationCache();
4630
4631                 for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
4632                 {
4633                         if (RmgrTable[rmid].rm_startup != NULL)
4634                                 RmgrTable[rmid].rm_startup();
4635                 }
4636
4637                 /*
4638                  * Find the first record that logically follows the checkpoint --- it
4639                  * might physically precede it, though.
4640                  */
4641                 if (XLByteLT(checkPoint.redo, RecPtr))
4642                 {
4643                         /* back up to find the record */
4644                         record = ReadRecord(&(checkPoint.redo), PANIC);
4645                 }
4646                 else
4647                 {
4648                         /* just have to read next record after CheckPoint */
4649                         record = ReadRecord(NULL, LOG);
4650                 }
4651
4652                 if (record != NULL)
4653                 {
4654                         bool            recoveryContinue = true;
4655                         bool            recoveryApply = true;
4656                         ErrorContextCallback    errcontext;
4657
4658                         InRedo = true;
4659                         ereport(LOG,
4660                                         (errmsg("redo starts at %X/%X",
4661                                                         ReadRecPtr.xlogid, ReadRecPtr.xrecoff)));
4662
4663                         /*
4664                          * main redo apply loop
4665                          */
4666                         do
4667                         {
4668 #ifdef WAL_DEBUG
4669                                 if (XLOG_DEBUG)
4670                                 {
4671                                         StringInfoData  buf;
4672
4673                                         initStringInfo(&buf);
4674                                         appendStringInfo(&buf, "REDO @ %X/%X; LSN %X/%X: ",
4675                                                         ReadRecPtr.xlogid, ReadRecPtr.xrecoff,
4676                                                         EndRecPtr.xlogid, EndRecPtr.xrecoff);
4677                                         xlog_outrec(&buf, record);
4678                                         appendStringInfo(&buf, " - ");
4679                                         RmgrTable[record->xl_rmid].rm_desc(&buf,
4680                                                                                                            record->xl_info,
4681                                                                                                            XLogRecGetData(record));
4682                                         elog(LOG, "%s", buf.data);
4683                                         pfree(buf.data);
4684                                 }
4685 #endif
4686
4687                                 /*
4688                                  * Have we reached our recovery target?
4689                                  */
4690                                 if (recoveryStopsHere(record, &recoveryApply))
4691                                 {
4692                                         needNewTimeLine = true;         /* see below */
4693                                         recoveryContinue = false;
4694                                         if (!recoveryApply)
4695                                                 break;
4696                                 }
4697
4698                                 /* Setup error traceback support for ereport() */
4699                                 errcontext.callback = rm_redo_error_callback;
4700                                 errcontext.arg = (void *) record;
4701                                 errcontext.previous = error_context_stack;
4702                                 error_context_stack = &errcontext;
4703
4704                                 /* nextXid must be beyond record's xid */
4705                                 if (TransactionIdFollowsOrEquals(record->xl_xid,
4706                                                                                                  ShmemVariableCache->nextXid))
4707                                 {
4708                                         ShmemVariableCache->nextXid = record->xl_xid;
4709                                         TransactionIdAdvance(ShmemVariableCache->nextXid);
4710                                 }
4711
4712                                 if (record->xl_info & XLR_BKP_BLOCK_MASK)
4713                                         RestoreBkpBlocks(record, EndRecPtr);
4714
4715                                 RmgrTable[record->xl_rmid].rm_redo(EndRecPtr, record);
4716
4717                                 /* Pop the error context stack */
4718                                 error_context_stack = errcontext.previous;
4719
4720                                 LastRec = ReadRecPtr;
4721
4722                                 record = ReadRecord(NULL, LOG);
4723                         } while (record != NULL && recoveryContinue);
4724
4725                         /*
4726                          * end of main redo apply loop
4727                          */
4728
4729                         ereport(LOG,
4730                                         (errmsg("redo done at %X/%X",
4731                                                         ReadRecPtr.xlogid, ReadRecPtr.xrecoff)));
4732                         InRedo = false;
4733                 }
4734                 else
4735                 {
4736                         /* there are no WAL records following the checkpoint */
4737                         ereport(LOG,
4738                                         (errmsg("redo is not required")));
4739                 }
4740         }
4741
4742         /*
4743          * Re-fetch the last valid or last applied record, so we can identify the
4744          * exact endpoint of what we consider the valid portion of WAL.
4745          */
4746         record = ReadRecord(&LastRec, PANIC);
4747         EndOfLog = EndRecPtr;
4748         XLByteToPrevSeg(EndOfLog, endLogId, endLogSeg);
4749
4750         /*
4751          * Complain if we did not roll forward far enough to render the backup
4752          * dump consistent.
4753          */
4754         if (XLByteLT(EndOfLog, recoveryMinXlogOffset))
4755         {
4756                 if (needNewTimeLine)    /* stopped because of stop request */
4757                         ereport(FATAL,
4758                                         (errmsg("requested recovery stop point is before end time of backup dump")));
4759                 else
4760                         /* ran off end of WAL */
4761                         ereport(FATAL,
4762                                         (errmsg("WAL ends before end time of backup dump")));
4763         }
4764
4765         /*
4766          * Consider whether we need to assign a new timeline ID.
4767          *
4768          * If we stopped short of the end of WAL during recovery, then we are
4769          * generating a new timeline and must assign it a unique new ID.
4770          * Otherwise, we can just extend the timeline we were in when we ran out
4771          * of WAL.
4772          */
4773         if (needNewTimeLine)
4774         {
4775                 ThisTimeLineID = findNewestTimeLine(recoveryTargetTLI) + 1;
4776                 ereport(LOG,
4777                                 (errmsg("selected new timeline ID: %u", ThisTimeLineID)));
4778                 writeTimeLineHistory(ThisTimeLineID, recoveryTargetTLI,
4779                                                          curFileTLI, endLogId, endLogSeg);
4780         }
4781
4782         /* Save the selected TimeLineID in shared memory, too */
4783         XLogCtl->ThisTimeLineID = ThisTimeLineID;
4784
4785         /*
4786          * We are now done reading the old WAL.  Turn off archive fetching if it
4787          * was active, and make a writable copy of the last WAL segment. (Note
4788          * that we also have a copy of the last block of the old WAL in readBuf;
4789          * we will use that below.)
4790          */
4791         if (InArchiveRecovery)
4792                 exitArchiveRecovery(curFileTLI, endLogId, endLogSeg);
4793
4794         /*
4795          * Prepare to write WAL starting at EndOfLog position, and init xlog
4796          * buffer cache using the block containing the last record from the
4797          * previous incarnation.
4798          */
4799         openLogId = endLogId;
4800         openLogSeg = endLogSeg;
4801         openLogFile = XLogFileOpen(openLogId, openLogSeg);
4802         openLogOff = 0;
4803         ControlFile->logId = openLogId;
4804         ControlFile->logSeg = openLogSeg + 1;
4805         Insert = &XLogCtl->Insert;
4806         Insert->PrevRecord = LastRec;
4807         XLogCtl->xlblocks[0].xlogid = openLogId;
4808         XLogCtl->xlblocks[0].xrecoff =
4809                 ((EndOfLog.xrecoff - 1) / XLOG_BLCKSZ + 1) * XLOG_BLCKSZ;
4810
4811         /*
4812          * Tricky point here: readBuf contains the *last* block that the LastRec
4813          * record spans, not the one it starts in.      The last block is indeed the
4814          * one we want to use.
4815          */
4816         Assert(readOff == (XLogCtl->xlblocks[0].xrecoff - XLOG_BLCKSZ) % XLogSegSize);
4817         memcpy((char *) Insert->currpage, readBuf, XLOG_BLCKSZ);
4818         Insert->currpos = (char *) Insert->currpage +
4819                 (EndOfLog.xrecoff + XLOG_BLCKSZ - XLogCtl->xlblocks[0].xrecoff);
4820
4821         LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
4822
4823         XLogCtl->Write.LogwrtResult = LogwrtResult;
4824         Insert->LogwrtResult = LogwrtResult;
4825         XLogCtl->LogwrtResult = LogwrtResult;
4826
4827         XLogCtl->LogwrtRqst.Write = EndOfLog;
4828         XLogCtl->LogwrtRqst.Flush = EndOfLog;
4829
4830         freespace = INSERT_FREESPACE(Insert);
4831         if (freespace > 0)
4832         {
4833                 /* Make sure rest of page is zero */
4834                 MemSet(Insert->currpos, 0, freespace);
4835                 XLogCtl->Write.curridx = 0;
4836         }
4837         else
4838         {
4839                 /*
4840                  * Whenever Write.LogwrtResult points to exactly the end of a page,
4841                  * Write.curridx must point to the *next* page (see XLogWrite()).
4842                  *
4843                  * Note: it might seem we should do AdvanceXLInsertBuffer() here, but
4844                  * this is sufficient.  The first actual attempt to insert a log
4845                  * record will advance the insert state.
4846                  */
4847                 XLogCtl->Write.curridx = NextBufIdx(0);
4848         }
4849
4850         /* Pre-scan prepared transactions to find out the range of XIDs present */
4851         oldestActiveXID = PrescanPreparedTransactions();
4852
4853         if (InRecovery)
4854         {
4855                 int                     rmid;
4856
4857                 /*
4858                  * Allow resource managers to do any required cleanup.
4859                  */
4860                 for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
4861                 {
4862                         if (RmgrTable[rmid].rm_cleanup != NULL)
4863                                 RmgrTable[rmid].rm_cleanup();
4864                 }
4865
4866                 /*
4867                  * Check to see if the XLOG sequence contained any unresolved
4868                  * references to uninitialized pages.
4869                  */
4870                 XLogCheckInvalidPages();
4871
4872                 /*
4873                  * Reset pgstat data, because it may be invalid after recovery.
4874                  */
4875                 pgstat_reset_all();
4876
4877                 /*
4878                  * Perform a new checkpoint to update our recovery activity to disk.
4879                  *
4880                  * Note that we write a shutdown checkpoint rather than an on-line
4881                  * one. This is not particularly critical, but since we may be
4882                  * assigning a new TLI, using a shutdown checkpoint allows us to have
4883                  * the rule that TLI only changes in shutdown checkpoints, which
4884                  * allows some extra error checking in xlog_redo.
4885                  *
4886                  * In case we had to use the secondary checkpoint, make sure that it
4887                  * will still be shown as the secondary checkpoint after this
4888                  * CreateCheckPoint operation; we don't want the broken primary
4889                  * checkpoint to become prevCheckPoint...
4890                  */
4891                 if (XLByteEQ(checkPointLoc, ControlFile->prevCheckPoint))
4892                         ControlFile->checkPoint = checkPointLoc;
4893
4894                 CreateCheckPoint(true, true);
4895
4896                 /*
4897                  * Close down recovery environment
4898                  */
4899                 XLogCloseRelationCache();
4900
4901                 /*
4902                  * Now that we've checkpointed the recovery, it's safe to flush old
4903                  * backup_label, if present.
4904                  */
4905                 remove_backup_label();
4906         }
4907
4908         /*
4909          * Preallocate additional log files, if wanted.
4910          */
4911         (void) PreallocXlogFiles(EndOfLog);
4912
4913         /*
4914          * Okay, we're officially UP.
4915          */
4916         InRecovery = false;
4917
4918         ControlFile->state = DB_IN_PRODUCTION;
4919         ControlFile->time = time(NULL);
4920         UpdateControlFile();
4921
4922         /* Start up the commit log and related stuff, too */
4923         StartupCLOG();
4924         StartupSUBTRANS(oldestActiveXID);
4925         StartupMultiXact();
4926
4927         /* Reload shared-memory state for prepared transactions */
4928         RecoverPreparedTransactions();
4929
4930         ereport(LOG,
4931                         (errmsg("database system is ready")));
4932         CritSectionCount--;
4933
4934         /* Shut down readFile facility, free space */
4935         if (readFile >= 0)
4936         {
4937                 close(readFile);
4938                 readFile = -1;
4939         }
4940         if (readBuf)
4941         {
4942                 free(readBuf);
4943                 readBuf = NULL;
4944         }
4945         if (readRecordBuf)
4946         {
4947                 free(readRecordBuf);
4948                 readRecordBuf = NULL;
4949                 readRecordBufSize = 0;
4950         }
4951 }
4952
4953 /*
4954  * Subroutine to try to fetch and validate a prior checkpoint record.
4955  *
4956  * whichChkpt identifies the checkpoint (merely for reporting purposes).
4957  * 1 for "primary", 2 for "secondary", 0 for "other" (backup_label)
4958  */
4959 static XLogRecord *
4960 ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt)
4961 {
4962         XLogRecord *record;
4963
4964         if (!XRecOffIsValid(RecPtr.xrecoff))
4965         {
4966                 switch (whichChkpt)
4967                 {
4968                         case 1:
4969                                 ereport(LOG,
4970                                 (errmsg("invalid primary checkpoint link in control file")));
4971                                 break;
4972                         case 2:
4973                                 ereport(LOG,
4974                                                 (errmsg("invalid secondary checkpoint link in control file")));
4975                                 break;
4976                         default:
4977                                 ereport(LOG,
4978                                    (errmsg("invalid checkpoint link in backup_label file")));
4979                                 break;
4980                 }
4981                 return NULL;
4982         }
4983
4984         record = ReadRecord(&RecPtr, LOG);
4985
4986         if (record == NULL)
4987         {
4988                 switch (whichChkpt)
4989                 {
4990                         case 1:
4991                                 ereport(LOG,
4992                                                 (errmsg("invalid primary checkpoint record")));
4993                                 break;
4994                         case 2:
4995                                 ereport(LOG,
4996                                                 (errmsg("invalid secondary checkpoint record")));
4997                                 break;
4998                         default:
4999                                 ereport(LOG,
5000                                                 (errmsg("invalid checkpoint record")));
5001                                 break;
5002                 }
5003                 return NULL;
5004         }
5005         if (record->xl_rmid != RM_XLOG_ID)
5006         {
5007                 switch (whichChkpt)
5008                 {
5009                         case 1:
5010                                 ereport(LOG,
5011                                                 (errmsg("invalid resource manager ID in primary checkpoint record")));
5012                                 break;
5013                         case 2:
5014                                 ereport(LOG,
5015                                                 (errmsg("invalid resource manager ID in secondary checkpoint record")));
5016                                 break;
5017                         default:
5018                                 ereport(LOG,
5019                                 (errmsg("invalid resource manager ID in checkpoint record")));
5020                                 break;
5021                 }
5022                 return NULL;
5023         }
5024         if (record->xl_info != XLOG_CHECKPOINT_SHUTDOWN &&
5025                 record->xl_info != XLOG_CHECKPOINT_ONLINE)
5026         {
5027                 switch (whichChkpt)
5028                 {
5029                         case 1:
5030                                 ereport(LOG,
5031                                    (errmsg("invalid xl_info in primary checkpoint record")));
5032                                 break;
5033                         case 2:
5034                                 ereport(LOG,
5035                                  (errmsg("invalid xl_info in secondary checkpoint record")));
5036                                 break;
5037                         default:
5038                                 ereport(LOG,
5039                                                 (errmsg("invalid xl_info in checkpoint record")));
5040                                 break;
5041                 }
5042                 return NULL;
5043         }
5044         if (record->xl_len != sizeof(CheckPoint) ||
5045                 record->xl_tot_len != SizeOfXLogRecord + sizeof(CheckPoint))
5046         {
5047                 switch (whichChkpt)
5048                 {
5049                         case 1:
5050                                 ereport(LOG,
5051                                         (errmsg("invalid length of primary checkpoint record")));
5052                                 break;
5053                         case 2:
5054                                 ereport(LOG,
5055                                   (errmsg("invalid length of secondary checkpoint record")));
5056                                 break;
5057                         default:
5058                                 ereport(LOG,
5059                                                 (errmsg("invalid length of checkpoint record")));
5060                                 break;
5061                 }
5062                 return NULL;
5063         }
5064         return record;
5065 }
5066
5067 /*
5068  * This must be called during startup of a backend process, except that
5069  * it need not be called in a standalone backend (which does StartupXLOG
5070  * instead).  We need to initialize the local copies of ThisTimeLineID and
5071  * RedoRecPtr.
5072  *
5073  * Note: before Postgres 8.0, we went to some effort to keep the postmaster
5074  * process's copies of ThisTimeLineID and RedoRecPtr valid too.  This was
5075  * unnecessary however, since the postmaster itself never touches XLOG anyway.
5076  */
5077 void
5078 InitXLOGAccess(void)
5079 {
5080         /* ThisTimeLineID doesn't change so we need no lock to copy it */
5081         ThisTimeLineID = XLogCtl->ThisTimeLineID;
5082         /* Use GetRedoRecPtr to copy the RedoRecPtr safely */
5083         (void) GetRedoRecPtr();
5084 }
5085
5086 /*
5087  * Once spawned, a backend may update its local RedoRecPtr from
5088  * XLogCtl->Insert.RedoRecPtr; it must hold the insert lock or info_lck
5089  * to do so.  This is done in XLogInsert() or GetRedoRecPtr().
5090  */
5091 XLogRecPtr
5092 GetRedoRecPtr(void)
5093 {
5094         /* use volatile pointer to prevent code rearrangement */
5095         volatile XLogCtlData *xlogctl = XLogCtl;
5096
5097         SpinLockAcquire(&xlogctl->info_lck);
5098         Assert(XLByteLE(RedoRecPtr, xlogctl->Insert.RedoRecPtr));
5099         RedoRecPtr = xlogctl->Insert.RedoRecPtr;
5100         SpinLockRelease(&xlogctl->info_lck);
5101
5102         return RedoRecPtr;
5103 }
5104
5105 /*
5106  * GetRecentNextXid - get the nextXid value saved by the most recent checkpoint
5107  *
5108  * This is currently used only by the autovacuum daemon.  To check for
5109  * impending XID wraparound, autovac needs an approximate idea of the current
5110  * XID counter, and it needs it before choosing which DB to attach to, hence
5111  * before it sets up a PGPROC, hence before it can take any LWLocks.  But it
5112  * has attached to shared memory, and so we can let it reach into the shared
5113  * ControlFile structure and pull out the last checkpoint nextXID.
5114  *
5115  * Since we don't take any sort of lock, we have to assume that reading a
5116  * TransactionId is atomic ... but that assumption is made elsewhere, too,
5117  * and in any case the worst possible consequence of a bogus result is that
5118  * autovac issues an unnecessary database-wide VACUUM.
5119  *
5120  * Note: we could also choose to read ShmemVariableCache->nextXid in an
5121  * unlocked fashion, thus getting a more up-to-date result; but since that
5122  * changes far more frequently than the controlfile checkpoint copy, it would
5123  * pose a far higher risk of bogus result if we did have a nonatomic-read
5124  * problem.
5125  *
5126  * A (theoretically) completely safe answer is to read the actual pg_control
5127  * file into local process memory, but that certainly seems like overkill.
5128  */
5129 TransactionId
5130 GetRecentNextXid(void)
5131 {
5132         return ControlFile->checkPointCopy.nextXid;
5133 }
5134
5135 /*
5136  * This must be called ONCE during postmaster or standalone-backend shutdown
5137  */
5138 void
5139 ShutdownXLOG(int code, Datum arg)
5140 {
5141         ereport(LOG,
5142                         (errmsg("shutting down")));
5143
5144         CritSectionCount++;
5145         CreateCheckPoint(true, true);
5146         ShutdownCLOG();
5147         ShutdownSUBTRANS();
5148         ShutdownMultiXact();
5149         CritSectionCount--;
5150
5151         ereport(LOG,
5152                         (errmsg("database system is shut down")));
5153 }
5154
5155 /*
5156  * Perform a checkpoint --- either during shutdown, or on-the-fly
5157  *
5158  * If force is true, we force a checkpoint regardless of whether any XLOG
5159  * activity has occurred since the last one.
5160  */
5161 void
5162 CreateCheckPoint(bool shutdown, bool force)
5163 {
5164         CheckPoint      checkPoint;
5165         XLogRecPtr      recptr;
5166         XLogCtlInsert *Insert = &XLogCtl->Insert;
5167         XLogRecData rdata;
5168         uint32          freespace;
5169         uint32          _logId;
5170         uint32          _logSeg;
5171         int                     nsegsadded = 0;
5172         int                     nsegsremoved = 0;
5173         int                     nsegsrecycled = 0;
5174
5175         /*
5176          * Acquire CheckpointLock to ensure only one checkpoint happens at a time.
5177          * (This is just pro forma, since in the present system structure there is
5178          * only one process that is allowed to issue checkpoints at any given
5179          * time.)
5180          */
5181         LWLockAcquire(CheckpointLock, LW_EXCLUSIVE);
5182
5183         /*
5184          * Use a critical section to force system panic if we have trouble.
5185          */
5186         START_CRIT_SECTION();
5187
5188         if (shutdown)
5189         {
5190                 ControlFile->state = DB_SHUTDOWNING;
5191                 ControlFile->time = time(NULL);
5192                 UpdateControlFile();
5193         }
5194
5195         MemSet(&checkPoint, 0, sizeof(checkPoint));
5196         checkPoint.ThisTimeLineID = ThisTimeLineID;
5197         checkPoint.time = time(NULL);
5198
5199         /*
5200          * We must hold CheckpointStartLock while determining the checkpoint REDO
5201          * pointer.  This ensures that any concurrent transaction commits will be
5202          * either not yet logged, or logged and recorded in pg_clog. See notes in
5203          * RecordTransactionCommit().
5204          */
5205         LWLockAcquire(CheckpointStartLock, LW_EXCLUSIVE);
5206
5207         /* And we need WALInsertLock too */
5208         LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
5209
5210         /*
5211          * If this isn't a shutdown or forced checkpoint, and we have not inserted
5212          * any XLOG records since the start of the last checkpoint, skip the
5213          * checkpoint.  The idea here is to avoid inserting duplicate checkpoints
5214          * when the system is idle. That wastes log space, and more importantly it
5215          * exposes us to possible loss of both current and previous checkpoint
5216          * records if the machine crashes just as we're writing the update.
5217          * (Perhaps it'd make even more sense to checkpoint only when the previous
5218          * checkpoint record is in a different xlog page?)
5219          *
5220          * We have to make two tests to determine that nothing has happened since
5221          * the start of the last checkpoint: current insertion point must match
5222          * the end of the last checkpoint record, and its redo pointer must point
5223          * to itself.
5224          */
5225         if (!shutdown && !force)
5226         {
5227                 XLogRecPtr      curInsert;
5228
5229                 INSERT_RECPTR(curInsert, Insert, Insert->curridx);
5230                 if (curInsert.xlogid == ControlFile->checkPoint.xlogid &&
5231                         curInsert.xrecoff == ControlFile->checkPoint.xrecoff +
5232                         MAXALIGN(SizeOfXLogRecord + sizeof(CheckPoint)) &&
5233                         ControlFile->checkPoint.xlogid ==
5234                         ControlFile->checkPointCopy.redo.xlogid &&
5235                         ControlFile->checkPoint.xrecoff ==
5236                         ControlFile->checkPointCopy.redo.xrecoff)
5237                 {
5238                         LWLockRelease(WALInsertLock);
5239                         LWLockRelease(CheckpointStartLock);
5240                         LWLockRelease(CheckpointLock);
5241                         END_CRIT_SECTION();
5242                         return;
5243                 }
5244         }
5245
5246         /*
5247          * Compute new REDO record ptr = location of next XLOG record.
5248          *
5249          * NB: this is NOT necessarily where the checkpoint record itself will be,
5250          * since other backends may insert more XLOG records while we're off doing
5251          * the buffer flush work.  Those XLOG records are logically after the
5252          * checkpoint, even though physically before it.  Got that?
5253          */
5254         freespace = INSERT_FREESPACE(Insert);
5255         if (freespace < SizeOfXLogRecord)
5256         {
5257                 (void) AdvanceXLInsertBuffer();
5258                 /* OK to ignore update return flag, since we will do flush anyway */
5259                 freespace = INSERT_FREESPACE(Insert);
5260         }
5261         INSERT_RECPTR(checkPoint.redo, Insert, Insert->curridx);
5262
5263         /*
5264          * Here we update the shared RedoRecPtr for future XLogInsert calls; this
5265          * must be done while holding the insert lock AND the info_lck.
5266          *
5267          * Note: if we fail to complete the checkpoint, RedoRecPtr will be left
5268          * pointing past where it really needs to point.  This is okay; the only
5269          * consequence is that XLogInsert might back up whole buffers that it
5270          * didn't really need to.  We can't postpone advancing RedoRecPtr because
5271          * XLogInserts that happen while we are dumping buffers must assume that
5272          * their buffer changes are not included in the checkpoint.
5273          */
5274         {
5275                 /* use volatile pointer to prevent code rearrangement */
5276                 volatile XLogCtlData *xlogctl = XLogCtl;
5277
5278                 SpinLockAcquire(&xlogctl->info_lck);
5279                 RedoRecPtr = xlogctl->Insert.RedoRecPtr = checkPoint.redo;
5280                 SpinLockRelease(&xlogctl->info_lck);
5281         }
5282
5283         /*
5284          * Now we can release insert lock and checkpoint start lock, allowing
5285          * other xacts to proceed even while we are flushing disk buffers.
5286          */
5287         LWLockRelease(WALInsertLock);
5288
5289         LWLockRelease(CheckpointStartLock);
5290
5291         /*
5292          * Get the other info we need for the checkpoint record.
5293          */
5294         LWLockAcquire(XidGenLock, LW_SHARED);
5295         checkPoint.nextXid = ShmemVariableCache->nextXid;
5296         LWLockRelease(XidGenLock);
5297
5298         LWLockAcquire(OidGenLock, LW_SHARED);
5299         checkPoint.nextOid = ShmemVariableCache->nextOid;
5300         if (!shutdown)
5301                 checkPoint.nextOid += ShmemVariableCache->oidCount;
5302         LWLockRelease(OidGenLock);
5303
5304         MultiXactGetCheckptMulti(shutdown,
5305                                                          &checkPoint.nextMulti,
5306                                                          &checkPoint.nextMultiOffset);
5307
5308         /*
5309          * Having constructed the checkpoint record, ensure all shmem disk buffers
5310          * and commit-log buffers are flushed to disk.
5311          *
5312          * This I/O could fail for various reasons.  If so, we will fail to
5313          * complete the checkpoint, but there is no reason to force a system
5314          * panic. Accordingly, exit critical section while doing it.  (If we are
5315          * doing a shutdown checkpoint, we probably *should* panic --- but that
5316          * will happen anyway because we'll still be inside the critical section
5317          * established by ShutdownXLOG.)
5318          */
5319         END_CRIT_SECTION();
5320
5321         if (!shutdown)
5322                 ereport(DEBUG2,
5323                                 (errmsg("checkpoint starting")));
5324
5325         CheckPointCLOG();
5326         CheckPointSUBTRANS();
5327         CheckPointMultiXact();
5328         FlushBufferPool();
5329         /* We deliberately delay 2PC checkpointing as long as possible */
5330         CheckPointTwoPhase(checkPoint.redo);
5331
5332         START_CRIT_SECTION();
5333
5334         /*
5335          * Now insert the checkpoint record into XLOG.
5336          */
5337         rdata.data = (char *) (&checkPoint);
5338         rdata.len = sizeof(checkPoint);
5339         rdata.buffer = InvalidBuffer;
5340         rdata.next = NULL;
5341
5342         recptr = XLogInsert(RM_XLOG_ID,
5343                                                 shutdown ? XLOG_CHECKPOINT_SHUTDOWN :
5344                                                 XLOG_CHECKPOINT_ONLINE,
5345                                                 &rdata);
5346
5347         XLogFlush(recptr);
5348
5349         /*
5350          * We now have ProcLastRecPtr = start of actual checkpoint record, recptr
5351          * = end of actual checkpoint record.
5352          */
5353         if (shutdown && !XLByteEQ(checkPoint.redo, ProcLastRecPtr))
5354                 ereport(PANIC,
5355                                 (errmsg("concurrent transaction log activity while database system is shutting down")));
5356
5357         /*
5358          * Select point at which we can truncate the log, which we base on the
5359          * prior checkpoint's earliest info.
5360          */
5361         XLByteToSeg(ControlFile->checkPointCopy.redo, _logId, _logSeg);
5362
5363         /*
5364          * Update the control file.
5365          */
5366         LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
5367         if (shutdown)
5368                 ControlFile->state = DB_SHUTDOWNED;
5369         ControlFile->prevCheckPoint = ControlFile->checkPoint;
5370         ControlFile->checkPoint = ProcLastRecPtr;
5371         ControlFile->checkPointCopy = checkPoint;
5372         ControlFile->time = time(NULL);
5373         UpdateControlFile();
5374         LWLockRelease(ControlFileLock);
5375
5376         /*
5377          * We are now done with critical updates; no need for system panic if we
5378          * have trouble while fooling with offline log segments.
5379          */
5380         END_CRIT_SECTION();
5381
5382         /*
5383          * Delete offline log files (those no longer needed even for previous
5384          * checkpoint).
5385          */
5386         if (_logId || _logSeg)
5387         {
5388                 PrevLogSeg(_logId, _logSeg);
5389                 MoveOfflineLogs(_logId, _logSeg, recptr,
5390                                                 &nsegsremoved, &nsegsrecycled);
5391         }
5392
5393         /*
5394          * Make more log segments if needed.  (Do this after deleting offline log
5395          * segments, to avoid having peak disk space usage higher than necessary.)
5396          */
5397         if (!shutdown)
5398                 nsegsadded = PreallocXlogFiles(recptr);
5399
5400         /*
5401          * Truncate pg_subtrans if possible.  We can throw away all data before
5402          * the oldest XMIN of any running transaction.  No future transaction will
5403          * attempt to reference any pg_subtrans entry older than that (see Asserts
5404          * in subtrans.c).      During recovery, though, we mustn't do this because
5405          * StartupSUBTRANS hasn't been called yet.
5406          */
5407         if (!InRecovery)
5408                 TruncateSUBTRANS(GetOldestXmin(true));
5409
5410         if (!shutdown)
5411                 ereport(DEBUG2,
5412                                 (errmsg("checkpoint complete; %d transaction log file(s) added, %d removed, %d recycled",
5413                                                 nsegsadded, nsegsremoved, nsegsrecycled)));
5414
5415         LWLockRelease(CheckpointLock);
5416 }
5417
5418 /*
5419  * Write a NEXTOID log record
5420  */
5421 void
5422 XLogPutNextOid(Oid nextOid)
5423 {
5424         XLogRecData rdata;
5425
5426         rdata.data = (char *) (&nextOid);
5427         rdata.len = sizeof(Oid);
5428         rdata.buffer = InvalidBuffer;
5429         rdata.next = NULL;
5430         (void) XLogInsert(RM_XLOG_ID, XLOG_NEXTOID, &rdata);
5431
5432         /*
5433          * We need not flush the NEXTOID record immediately, because any of the
5434          * just-allocated OIDs could only reach disk as part of a tuple insert or
5435          * update that would have its own XLOG record that must follow the NEXTOID
5436          * record.      Therefore, the standard buffer LSN interlock applied to those
5437          * records will ensure no such OID reaches disk before the NEXTOID record
5438          * does.
5439          */
5440 }
5441
5442 /*
5443  * XLOG resource manager's routines
5444  */
5445 void
5446 xlog_redo(XLogRecPtr lsn, XLogRecord *record)
5447 {
5448         uint8           info = record->xl_info & ~XLR_INFO_MASK;
5449
5450         if (info == XLOG_NEXTOID)
5451         {
5452                 Oid                     nextOid;
5453
5454                 memcpy(&nextOid, XLogRecGetData(record), sizeof(Oid));
5455                 if (ShmemVariableCache->nextOid < nextOid)
5456                 {
5457                         ShmemVariableCache->nextOid = nextOid;
5458                         ShmemVariableCache->oidCount = 0;
5459                 }
5460         }
5461         else if (info == XLOG_CHECKPOINT_SHUTDOWN)
5462         {
5463                 CheckPoint      checkPoint;
5464
5465                 memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
5466                 /* In a SHUTDOWN checkpoint, believe the counters exactly */
5467                 ShmemVariableCache->nextXid = checkPoint.nextXid;
5468                 ShmemVariableCache->nextOid = checkPoint.nextOid;
5469                 ShmemVariableCache->oidCount = 0;
5470                 MultiXactSetNextMXact(checkPoint.nextMulti,
5471                                                           checkPoint.nextMultiOffset);
5472
5473                 /*
5474                  * TLI may change in a shutdown checkpoint, but it shouldn't decrease
5475                  */
5476                 if (checkPoint.ThisTimeLineID != ThisTimeLineID)
5477                 {
5478                         if (checkPoint.ThisTimeLineID < ThisTimeLineID ||
5479                                 !list_member_int(expectedTLIs,
5480                                                                  (int) checkPoint.ThisTimeLineID))
5481                                 ereport(PANIC,
5482                                                 (errmsg("unexpected timeline ID %u (after %u) in checkpoint record",
5483                                                                 checkPoint.ThisTimeLineID, ThisTimeLineID)));
5484                         /* Following WAL records should be run with new TLI */
5485                         ThisTimeLineID = checkPoint.ThisTimeLineID;
5486                 }
5487         }
5488         else if (info == XLOG_CHECKPOINT_ONLINE)
5489         {
5490                 CheckPoint      checkPoint;
5491
5492                 memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
5493                 /* In an ONLINE checkpoint, treat the counters like NEXTOID */
5494                 if (TransactionIdPrecedes(ShmemVariableCache->nextXid,
5495                                                                   checkPoint.nextXid))
5496                         ShmemVariableCache->nextXid = checkPoint.nextXid;
5497                 if (ShmemVariableCache->nextOid < checkPoint.nextOid)
5498                 {
5499                         ShmemVariableCache->nextOid = checkPoint.nextOid;
5500                         ShmemVariableCache->oidCount = 0;
5501                 }
5502                 MultiXactAdvanceNextMXact(checkPoint.nextMulti,
5503                                                                   checkPoint.nextMultiOffset);
5504                 /* TLI should not change in an on-line checkpoint */
5505                 if (checkPoint.ThisTimeLineID != ThisTimeLineID)
5506                         ereport(PANIC,
5507                                         (errmsg("unexpected timeline ID %u (should be %u) in checkpoint record",
5508                                                         checkPoint.ThisTimeLineID, ThisTimeLineID)));
5509         }
5510 }
5511
5512 void
5513 xlog_desc(StringInfo buf, uint8 xl_info, char *rec)
5514 {
5515         uint8                   info = xl_info & ~XLR_INFO_MASK;
5516
5517         if (info == XLOG_CHECKPOINT_SHUTDOWN ||
5518                 info == XLOG_CHECKPOINT_ONLINE)
5519         {
5520                 CheckPoint *checkpoint = (CheckPoint *) rec;
5521
5522                 appendStringInfo(buf, "checkpoint: redo %X/%X; undo %X/%X; "
5523                                 "tli %u; xid %u; oid %u; multi %u; offset %u; %s",
5524                                 checkpoint->redo.xlogid, checkpoint->redo.xrecoff,
5525                                 checkpoint->undo.xlogid, checkpoint->undo.xrecoff,
5526                                 checkpoint->ThisTimeLineID, checkpoint->nextXid,
5527                                 checkpoint->nextOid,
5528                                 checkpoint->nextMulti,
5529                                 checkpoint->nextMultiOffset,
5530                                 (info == XLOG_CHECKPOINT_SHUTDOWN) ? "shutdown" : "online");
5531         }
5532         else if (info == XLOG_NEXTOID)
5533         {
5534                 Oid                     nextOid;
5535
5536                 memcpy(&nextOid, rec, sizeof(Oid));
5537                 appendStringInfo(buf, "nextOid: %u", nextOid);
5538         }
5539         else
5540                 appendStringInfo(buf, "UNKNOWN");
5541 }
5542
5543 #ifdef WAL_DEBUG
5544
5545 static void
5546 xlog_outrec(StringInfo buf, XLogRecord *record)
5547 {
5548         int                     i;
5549
5550         appendStringInfo(buf, "prev %X/%X; xid %u",
5551                                          record->xl_prev.xlogid, record->xl_prev.xrecoff,
5552                                          record->xl_xid);
5553
5554         for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
5555         {
5556                 if (record->xl_info & XLR_SET_BKP_BLOCK(i))
5557                         appendStringInfo(buf, "; bkpb%d", i+1);
5558         }
5559
5560         appendStringInfo(buf, ": %s", RmgrTable[record->xl_rmid].rm_name);
5561 }
5562 #endif   /* WAL_DEBUG */
5563
5564
5565 /*
5566  * GUC support
5567  */
5568 const char *
5569 assign_xlog_sync_method(const char *method, bool doit, GucSource source)
5570 {
5571         int                     new_sync_method;
5572         int                     new_sync_bit;
5573
5574         if (pg_strcasecmp(method, "fsync") == 0)
5575         {
5576                 new_sync_method = SYNC_METHOD_FSYNC;
5577                 new_sync_bit = 0;
5578         }
5579 #ifdef HAVE_FSYNC_WRITETHROUGH
5580         else if (pg_strcasecmp(method, "fsync_writethrough") == 0)
5581         {
5582                 new_sync_method = SYNC_METHOD_FSYNC_WRITETHROUGH;
5583                 new_sync_bit = 0;
5584         }
5585 #endif
5586 #ifdef HAVE_FDATASYNC
5587         else if (pg_strcasecmp(method, "fdatasync") == 0)
5588         {
5589                 new_sync_method = SYNC_METHOD_FDATASYNC;
5590                 new_sync_bit = 0;
5591         }
5592 #endif
5593 #ifdef OPEN_SYNC_FLAG
5594         else if (pg_strcasecmp(method, "open_sync") == 0)
5595         {
5596                 new_sync_method = SYNC_METHOD_OPEN;
5597                 new_sync_bit = OPEN_SYNC_FLAG;
5598         }
5599 #endif
5600 #ifdef OPEN_DATASYNC_FLAG
5601         else if (pg_strcasecmp(method, "open_datasync") == 0)
5602         {
5603                 new_sync_method = SYNC_METHOD_OPEN;
5604                 new_sync_bit = OPEN_DATASYNC_FLAG;
5605         }
5606 #endif
5607         else
5608                 return NULL;
5609
5610         if (!doit)
5611                 return method;
5612
5613         if (sync_method != new_sync_method || open_sync_bit != new_sync_bit)
5614         {
5615                 /*
5616                  * To ensure that no blocks escape unsynced, force an fsync on the
5617                  * currently open log segment (if any).  Also, if the open flag is
5618                  * changing, close the log file so it will be reopened (with new flag
5619                  * bit) at next use.
5620                  */
5621                 if (openLogFile >= 0)
5622                 {
5623                         if (pg_fsync(openLogFile) != 0)
5624                                 ereport(PANIC,
5625                                                 (errcode_for_file_access(),
5626                                                  errmsg("could not fsync log file %u, segment %u: %m",
5627                                                                 openLogId, openLogSeg)));
5628                         if (open_sync_bit != new_sync_bit)
5629                                 XLogFileClose();
5630                 }
5631                 sync_method = new_sync_method;
5632                 open_sync_bit = new_sync_bit;
5633         }
5634
5635         return method;
5636 }
5637
5638
5639 /*
5640  * Issue appropriate kind of fsync (if any) on the current XLOG output file
5641  */
5642 static void
5643 issue_xlog_fsync(void)
5644 {
5645         switch (sync_method)
5646         {
5647                 case SYNC_METHOD_FSYNC:
5648                         if (pg_fsync_no_writethrough(openLogFile) != 0)
5649                                 ereport(PANIC,
5650                                                 (errcode_for_file_access(),
5651                                                  errmsg("could not fsync log file %u, segment %u: %m",
5652                                                                 openLogId, openLogSeg)));
5653                         break;
5654 #ifdef HAVE_FSYNC_WRITETHROUGH
5655                 case SYNC_METHOD_FSYNC_WRITETHROUGH:
5656                         if (pg_fsync_writethrough(openLogFile) != 0)
5657                                 ereport(PANIC,
5658                                                 (errcode_for_file_access(),
5659                                                  errmsg("could not fsync write-through log file %u, segment %u: %m",
5660                                                                 openLogId, openLogSeg)));
5661                         break;
5662 #endif
5663 #ifdef HAVE_FDATASYNC
5664                 case SYNC_METHOD_FDATASYNC:
5665                         if (pg_fdatasync(openLogFile) != 0)
5666                                 ereport(PANIC,
5667                                                 (errcode_for_file_access(),
5668                                         errmsg("could not fdatasync log file %u, segment %u: %m",
5669                                                    openLogId, openLogSeg)));
5670                         break;
5671 #endif
5672                 case SYNC_METHOD_OPEN:
5673                         /* write synced it already */
5674                         break;
5675                 default:
5676                         elog(PANIC, "unrecognized wal_sync_method: %d", sync_method);
5677                         break;
5678         }
5679 }
5680
5681
5682 /*
5683  * pg_start_backup: set up for taking an on-line backup dump
5684  *
5685  * Essentially what this does is to create a backup label file in $PGDATA,
5686  * where it will be archived as part of the backup dump.  The label file
5687  * contains the user-supplied label string (typically this would be used
5688  * to tell where the backup dump will be stored) and the starting time and
5689  * starting WAL offset for the dump.
5690  */
5691 Datum
5692 pg_start_backup(PG_FUNCTION_ARGS)
5693 {
5694         text       *backupid = PG_GETARG_TEXT_P(0);
5695         text       *result;
5696         char       *backupidstr;
5697         XLogRecPtr      checkpointloc;
5698         XLogRecPtr      startpoint;
5699         time_t          stamp_time;
5700         char            strfbuf[128];
5701         char            xlogfilename[MAXFNAMELEN];
5702         uint32          _logId;
5703         uint32          _logSeg;
5704         struct stat stat_buf;
5705         FILE       *fp;
5706
5707         if (!superuser())
5708                 ereport(ERROR,
5709                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
5710                                  (errmsg("must be superuser to run a backup"))));
5711
5712         if (!XLogArchivingActive())
5713                 ereport(ERROR,
5714                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
5715                                  (errmsg("WAL archiving is not active"),
5716                                   (errhint("archive_command must be defined before "
5717                                                    "online backups can be made safely.")))));
5718
5719         backupidstr = DatumGetCString(DirectFunctionCall1(textout,
5720                                                                                                  PointerGetDatum(backupid)));
5721
5722         /*
5723          * Mark backup active in shared memory.  We must do full-page WAL writes
5724          * during an on-line backup even if not doing so at other times, because
5725          * it's quite possible for the backup dump to obtain a "torn" (partially
5726          * written) copy of a database page if it reads the page concurrently
5727          * with our write to the same page.  This can be fixed as long as the
5728          * first write to the page in the WAL sequence is a full-page write.
5729          * Hence, we turn on forcePageWrites and then force a CHECKPOINT, to
5730          * ensure there are no dirty pages in shared memory that might get
5731          * dumped while the backup is in progress without having a corresponding
5732          * WAL record.  (Once the backup is complete, we need not force full-page
5733          * writes anymore, since we expect that any pages not modified during
5734          * the backup interval must have been correctly captured by the backup.)
5735          *
5736          * We must hold WALInsertLock to change the value of forcePageWrites,
5737          * to ensure adequate interlocking against XLogInsert().
5738          */
5739         LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
5740         if (XLogCtl->Insert.forcePageWrites)
5741         {
5742                 LWLockRelease(WALInsertLock);
5743                 ereport(ERROR,
5744                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
5745                                  errmsg("a backup is already in progress"),
5746                                  errhint("Run pg_stop_backup() and try again.")));
5747         }
5748         XLogCtl->Insert.forcePageWrites = true;
5749         LWLockRelease(WALInsertLock);
5750
5751         /* Use a TRY block to ensure we release forcePageWrites if fail below */
5752         PG_TRY();
5753         {
5754                 /*
5755                  * Force a CHECKPOINT.  Aside from being necessary to prevent torn
5756                  * page problems, this guarantees that two successive backup runs will
5757                  * have different checkpoint positions and hence different history
5758                  * file names, even if nothing happened in between.
5759                  */
5760                 RequestCheckpoint(true, false);
5761
5762                 /*
5763                  * Now we need to fetch the checkpoint record location, and also its
5764                  * REDO pointer.  The oldest point in WAL that would be needed to
5765                  * restore starting from the checkpoint is precisely the REDO pointer.
5766                  */
5767                 LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
5768                 checkpointloc = ControlFile->checkPoint;
5769                 startpoint = ControlFile->checkPointCopy.redo;
5770                 LWLockRelease(ControlFileLock);
5771
5772                 XLByteToSeg(startpoint, _logId, _logSeg);
5773                 XLogFileName(xlogfilename, ThisTimeLineID, _logId, _logSeg);
5774
5775                 /*
5776                  * We deliberately use strftime/localtime not the src/timezone
5777                  * functions, so that backup labels will consistently be recorded in
5778                  * the same timezone regardless of TimeZone setting.  This matches
5779                  * elog.c's practice.
5780                  */
5781                 stamp_time = time(NULL);
5782                 strftime(strfbuf, sizeof(strfbuf),
5783                                  "%Y-%m-%d %H:%M:%S %Z",
5784                                  localtime(&stamp_time));
5785
5786                 /*
5787                  * Check for existing backup label --- implies a backup is already
5788                  * running.  (XXX given that we checked forcePageWrites above, maybe
5789                  * it would be OK to just unlink any such label file?)
5790                  */
5791                 if (stat(BACKUP_LABEL_FILE, &stat_buf) != 0)
5792                 {
5793                         if (errno != ENOENT)
5794                                 ereport(ERROR,
5795                                                 (errcode_for_file_access(),
5796                                                  errmsg("could not stat file \"%s\": %m",
5797                                                                 BACKUP_LABEL_FILE)));
5798                 }
5799                 else
5800                         ereport(ERROR,
5801                                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
5802                                          errmsg("a backup is already in progress"),
5803                                          errhint("If you're sure there is no backup in progress, remove file \"%s\" and try again.",
5804                                                          BACKUP_LABEL_FILE)));
5805
5806                 /*
5807                  * Okay, write the file
5808                  */
5809                 fp = AllocateFile(BACKUP_LABEL_FILE, "w");
5810                 if (!fp)
5811                         ereport(ERROR,
5812                                         (errcode_for_file_access(),
5813                                          errmsg("could not create file \"%s\": %m",
5814                                                         BACKUP_LABEL_FILE)));
5815                 fprintf(fp, "START WAL LOCATION: %X/%X (file %s)\n",
5816                                 startpoint.xlogid, startpoint.xrecoff, xlogfilename);
5817                 fprintf(fp, "CHECKPOINT LOCATION: %X/%X\n",
5818                                 checkpointloc.xlogid, checkpointloc.xrecoff);
5819                 fprintf(fp, "START TIME: %s\n", strfbuf);
5820                 fprintf(fp, "LABEL: %s\n", backupidstr);
5821                 if (fflush(fp) || ferror(fp) || FreeFile(fp))
5822                         ereport(ERROR,
5823                                         (errcode_for_file_access(),
5824                                          errmsg("could not write file \"%s\": %m",
5825                                                         BACKUP_LABEL_FILE)));
5826         }
5827         PG_CATCH();
5828         {
5829                 /* Turn off forcePageWrites on failure */
5830                 LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
5831                 XLogCtl->Insert.forcePageWrites = false;
5832                 LWLockRelease(WALInsertLock);
5833
5834                 PG_RE_THROW();
5835         }
5836         PG_END_TRY();
5837
5838         /*
5839          * We're done.  As a convenience, return the starting WAL offset.
5840          */
5841         snprintf(xlogfilename, sizeof(xlogfilename), "%X/%X",
5842                          startpoint.xlogid, startpoint.xrecoff);
5843         result = DatumGetTextP(DirectFunctionCall1(textin,
5844                                                                                          CStringGetDatum(xlogfilename)));
5845         PG_RETURN_TEXT_P(result);
5846 }
5847
5848 /*
5849  * pg_stop_backup: finish taking an on-line backup dump
5850  *
5851  * We remove the backup label file created by pg_start_backup, and instead
5852  * create a backup history file in pg_xlog (whence it will immediately be
5853  * archived).  The backup history file contains the same info found in
5854  * the label file, plus the backup-end time and WAL offset.
5855  */
5856 Datum
5857 pg_stop_backup(PG_FUNCTION_ARGS)
5858 {
5859         text       *result;
5860         XLogCtlInsert *Insert = &XLogCtl->Insert;
5861         XLogRecPtr      startpoint;
5862         XLogRecPtr      stoppoint;
5863         time_t          stamp_time;
5864         char            strfbuf[128];
5865         char            histfilepath[MAXPGPATH];
5866         char            startxlogfilename[MAXFNAMELEN];
5867         char            stopxlogfilename[MAXFNAMELEN];
5868         uint32          _logId;
5869         uint32          _logSeg;
5870         FILE       *lfp;
5871         FILE       *fp;
5872         char            ch;
5873         int                     ich;
5874
5875         if (!superuser())
5876                 ereport(ERROR,
5877                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
5878                                  (errmsg("must be superuser to run a backup"))));
5879
5880         /*
5881          * Get the current end-of-WAL position; it will be unsafe to use this dump
5882          * to restore to a point in advance of this time.  We can also clear
5883          * forcePageWrites here.
5884          */
5885         LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
5886         INSERT_RECPTR(stoppoint, Insert, Insert->curridx);
5887         XLogCtl->Insert.forcePageWrites = false;
5888         LWLockRelease(WALInsertLock);
5889
5890         XLByteToSeg(stoppoint, _logId, _logSeg);
5891         XLogFileName(stopxlogfilename, ThisTimeLineID, _logId, _logSeg);
5892
5893         /*
5894          * We deliberately use strftime/localtime not the src/timezone functions,
5895          * so that backup labels will consistently be recorded in the same
5896          * timezone regardless of TimeZone setting.  This matches elog.c's
5897          * practice.
5898          */
5899         stamp_time = time(NULL);
5900         strftime(strfbuf, sizeof(strfbuf),
5901                          "%Y-%m-%d %H:%M:%S %Z",
5902                          localtime(&stamp_time));
5903
5904         /*
5905          * Open the existing label file
5906          */
5907         lfp = AllocateFile(BACKUP_LABEL_FILE, "r");
5908         if (!lfp)
5909         {
5910                 if (errno != ENOENT)
5911                         ereport(ERROR,
5912                                         (errcode_for_file_access(),
5913                                          errmsg("could not read file \"%s\": %m",
5914                                                         BACKUP_LABEL_FILE)));
5915                 ereport(ERROR,
5916                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
5917                                  errmsg("a backup is not in progress")));
5918         }
5919
5920         /*
5921          * Read and parse the START WAL LOCATION line (this code is pretty crude,
5922          * but we are not expecting any variability in the file format).
5923          */
5924         if (fscanf(lfp, "START WAL LOCATION: %X/%X (file %24s)%c",
5925                            &startpoint.xlogid, &startpoint.xrecoff, startxlogfilename,
5926                            &ch) != 4 || ch != '\n')
5927                 ereport(ERROR,
5928                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
5929                                  errmsg("invalid data in file \"%s\"", BACKUP_LABEL_FILE)));
5930
5931         /*
5932          * Write the backup history file
5933          */
5934         XLByteToSeg(startpoint, _logId, _logSeg);
5935         BackupHistoryFilePath(histfilepath, ThisTimeLineID, _logId, _logSeg,
5936                                                   startpoint.xrecoff % XLogSegSize);
5937         fp = AllocateFile(histfilepath, "w");
5938         if (!fp)
5939                 ereport(ERROR,
5940                                 (errcode_for_file_access(),
5941                                  errmsg("could not create file \"%s\": %m",
5942                                                 histfilepath)));
5943         fprintf(fp, "START WAL LOCATION: %X/%X (file %s)\n",
5944                         startpoint.xlogid, startpoint.xrecoff, startxlogfilename);
5945         fprintf(fp, "STOP WAL LOCATION: %X/%X (file %s)\n",
5946                         stoppoint.xlogid, stoppoint.xrecoff, stopxlogfilename);
5947         /* transfer remaining lines from label to history file */
5948         while ((ich = fgetc(lfp)) != EOF)
5949                 fputc(ich, fp);
5950         fprintf(fp, "STOP TIME: %s\n", strfbuf);
5951         if (fflush(fp) || ferror(fp) || FreeFile(fp))
5952                 ereport(ERROR,
5953                                 (errcode_for_file_access(),
5954                                  errmsg("could not write file \"%s\": %m",
5955                                                 histfilepath)));
5956
5957         /*
5958          * Close and remove the backup label file
5959          */
5960         if (ferror(lfp) || FreeFile(lfp))
5961                 ereport(ERROR,
5962                                 (errcode_for_file_access(),
5963                                  errmsg("could not read file \"%s\": %m",
5964                                                 BACKUP_LABEL_FILE)));
5965         if (unlink(BACKUP_LABEL_FILE) != 0)
5966                 ereport(ERROR,
5967                                 (errcode_for_file_access(),
5968                                  errmsg("could not remove file \"%s\": %m",
5969                                                 BACKUP_LABEL_FILE)));
5970
5971         RemoveOldBackupHistory();
5972
5973         /*
5974          * Notify archiver that history file may be archived immediately
5975          */
5976         if (XLogArchivingActive())
5977         {
5978                 BackupHistoryFileName(histfilepath, ThisTimeLineID, _logId, _logSeg,
5979                                                           startpoint.xrecoff % XLogSegSize);
5980                 XLogArchiveNotify(histfilepath);
5981         }
5982
5983         /*
5984          * We're done.  As a convenience, return the ending WAL offset.
5985          */
5986         snprintf(stopxlogfilename, sizeof(stopxlogfilename), "%X/%X",
5987                          stoppoint.xlogid, stoppoint.xrecoff);
5988         result = DatumGetTextP(DirectFunctionCall1(textin,
5989                                                                                  CStringGetDatum(stopxlogfilename)));
5990         PG_RETURN_TEXT_P(result);
5991 }
5992
5993 /*
5994  * read_backup_label: check to see if a backup_label file is present
5995  *
5996  * If we see a backup_label during recovery, we assume that we are recovering
5997  * from a backup dump file, and we therefore roll forward from the checkpoint
5998  * identified by the label file, NOT what pg_control says.      This avoids the
5999  * problem that pg_control might have been archived one or more checkpoints
6000  * later than the start of the dump, and so if we rely on it as the start
6001  * point, we will fail to restore a consistent database state.
6002  *
6003  * We also attempt to retrieve the corresponding backup history file.
6004  * If successful, set recoveryMinXlogOffset to constrain valid PITR stopping
6005  * points.
6006  *
6007  * Returns TRUE if a backup_label was found (and fills the checkpoint
6008  * location into *checkPointLoc); returns FALSE if not.
6009  */
6010 static bool
6011 read_backup_label(XLogRecPtr *checkPointLoc)
6012 {
6013         XLogRecPtr      startpoint;
6014         XLogRecPtr      stoppoint;
6015         char            histfilename[MAXFNAMELEN];
6016         char            histfilepath[MAXPGPATH];
6017         char            startxlogfilename[MAXFNAMELEN];
6018         char            stopxlogfilename[MAXFNAMELEN];
6019         TimeLineID      tli;
6020         uint32          _logId;
6021         uint32          _logSeg;
6022         FILE       *lfp;
6023         FILE       *fp;
6024         char            ch;
6025
6026         /*
6027          * See if label file is present
6028          */
6029         lfp = AllocateFile(BACKUP_LABEL_FILE, "r");
6030         if (!lfp)
6031         {
6032                 if (errno != ENOENT)
6033                         ereport(FATAL,
6034                                         (errcode_for_file_access(),
6035                                          errmsg("could not read file \"%s\": %m",
6036                                                         BACKUP_LABEL_FILE)));
6037                 return false;                   /* it's not there, all is fine */
6038         }
6039
6040         /*
6041          * Read and parse the START WAL LOCATION and CHECKPOINT lines (this code
6042          * is pretty crude, but we are not expecting any variability in the file
6043          * format).
6044          */
6045         if (fscanf(lfp, "START WAL LOCATION: %X/%X (file %08X%16s)%c",
6046                            &startpoint.xlogid, &startpoint.xrecoff, &tli,
6047                            startxlogfilename, &ch) != 5 || ch != '\n')
6048                 ereport(FATAL,
6049                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
6050                                  errmsg("invalid data in file \"%s\"", BACKUP_LABEL_FILE)));
6051         if (fscanf(lfp, "CHECKPOINT LOCATION: %X/%X%c",
6052                            &checkPointLoc->xlogid, &checkPointLoc->xrecoff,
6053                            &ch) != 3 || ch != '\n')
6054                 ereport(FATAL,
6055                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
6056                                  errmsg("invalid data in file \"%s\"", BACKUP_LABEL_FILE)));
6057         if (ferror(lfp) || FreeFile(lfp))
6058                 ereport(FATAL,
6059                                 (errcode_for_file_access(),
6060                                  errmsg("could not read file \"%s\": %m",
6061                                                 BACKUP_LABEL_FILE)));
6062
6063         /*
6064          * Try to retrieve the backup history file (no error if we can't)
6065          */
6066         XLByteToSeg(startpoint, _logId, _logSeg);
6067         BackupHistoryFileName(histfilename, tli, _logId, _logSeg,
6068                                                   startpoint.xrecoff % XLogSegSize);
6069
6070         if (InArchiveRecovery)
6071                 RestoreArchivedFile(histfilepath, histfilename, "RECOVERYHISTORY", 0);
6072         else
6073                 BackupHistoryFilePath(histfilepath, tli, _logId, _logSeg,
6074                                                           startpoint.xrecoff % XLogSegSize);
6075
6076         fp = AllocateFile(histfilepath, "r");
6077         if (fp)
6078         {
6079                 /*
6080                  * Parse history file to identify stop point.
6081                  */
6082                 if (fscanf(fp, "START WAL LOCATION: %X/%X (file %24s)%c",
6083                                    &startpoint.xlogid, &startpoint.xrecoff, startxlogfilename,
6084                                    &ch) != 4 || ch != '\n')
6085                         ereport(FATAL,
6086                                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
6087                                          errmsg("invalid data in file \"%s\"", histfilename)));
6088                 if (fscanf(fp, "STOP WAL LOCATION: %X/%X (file %24s)%c",
6089                                    &stoppoint.xlogid, &stoppoint.xrecoff, stopxlogfilename,
6090                                    &ch) != 4 || ch != '\n')
6091                         ereport(FATAL,
6092                                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
6093                                          errmsg("invalid data in file \"%s\"", histfilename)));
6094                 recoveryMinXlogOffset = stoppoint;
6095                 if (ferror(fp) || FreeFile(fp))
6096                         ereport(FATAL,
6097                                         (errcode_for_file_access(),
6098                                          errmsg("could not read file \"%s\": %m",
6099                                                         histfilepath)));
6100         }
6101
6102         return true;
6103 }
6104
6105 /*
6106  * remove_backup_label: remove any extant backup_label after successful
6107  * recovery.  Once we have completed the end-of-recovery checkpoint there
6108  * is no reason to have to replay from the start point indicated by the
6109  * label (and indeed we'll probably have removed/recycled the needed WAL
6110  * segments), so remove the label to prevent trouble in later crash recoveries.
6111  */
6112 static void
6113 remove_backup_label(void)
6114 {
6115         if (unlink(BACKUP_LABEL_FILE) != 0)
6116                 if (errno != ENOENT)
6117                         ereport(FATAL,
6118                                         (errcode_for_file_access(),
6119                                          errmsg("could not remove file \"%s\": %m",
6120                                                         BACKUP_LABEL_FILE)));
6121 }
6122
6123 /*
6124  * Error context callback for errors occurring during rm_redo().
6125  */
6126 static void
6127 rm_redo_error_callback(void *arg)
6128 {
6129         XLogRecord              *record = (XLogRecord *) arg;
6130         StringInfoData   buf;
6131
6132         initStringInfo(&buf);
6133         RmgrTable[record->xl_rmid].rm_desc(&buf, 
6134                                                                            record->xl_info, 
6135                                                                            XLogRecGetData(record));
6136
6137         /* don't bother emitting empty description */
6138         if (buf.len > 0)
6139                 errcontext("xlog redo %s", buf.data);
6140
6141         pfree(buf.data);
6142 }