OSDN Git Service

Wording cleanup for error messages. Also change can't -> cannot.
[pg-rex/syncrep.git] / src / backend / access / transam / xlog.c
1 /*-------------------------------------------------------------------------
2  *
3  * xlog.c
4  *              PostgreSQL transaction log manager
5  *
6  *
7  * Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.261 2007/02/01 19:10:25 momjian Exp $
11  *
12  *-------------------------------------------------------------------------
13  */
14
15 #include "postgres.h"
16
17 #include <ctype.h>
18 #include <fcntl.h>
19 #include <signal.h>
20 #include <time.h>
21 #include <sys/stat.h>
22 #include <sys/time.h>
23 #include <sys/wait.h>
24 #include <unistd.h>
25
26 #include "access/clog.h"
27 #include "access/heapam.h"
28 #include "access/multixact.h"
29 #include "access/subtrans.h"
30 #include "access/transam.h"
31 #include "access/twophase.h"
32 #include "access/xact.h"
33 #include "access/xlog_internal.h"
34 #include "access/xlogutils.h"
35 #include "catalog/catversion.h"
36 #include "catalog/pg_control.h"
37 #include "catalog/pg_type.h"
38 #include "funcapi.h"
39 #include "miscadmin.h"
40 #include "pgstat.h"
41 #include "postmaster/bgwriter.h"
42 #include "storage/bufpage.h"
43 #include "storage/fd.h"
44 #include "storage/pmsignal.h"
45 #include "storage/procarray.h"
46 #include "storage/spin.h"
47 #include "utils/builtins.h"
48 #include "utils/nabstime.h"
49 #include "utils/pg_locale.h"
50
51
52 /*
53  *      Because O_DIRECT bypasses the kernel buffers, and because we never
54  *      read those buffers except during crash recovery, it is a win to use
55  *      it in all cases where we sync on each write().  We could allow O_DIRECT
56  *      with fsync(), but because skipping the kernel buffer forces writes out
57  *      quickly, it seems best just to use it for O_SYNC.  It is hard to imagine
58  *      how fsync() could be a win for O_DIRECT compared to O_SYNC and O_DIRECT.
59  *      Also, O_DIRECT is never enough to force data to the drives, it merely
60  *      tries to bypass the kernel cache, so we still need O_SYNC or fsync().
61  */
62 #ifdef O_DIRECT
63 #define PG_O_DIRECT                             O_DIRECT
64 #else                                                   /* no O_DIRECT: contributes no flag bits */
65 #define PG_O_DIRECT                             0
66 #endif                                                  /* O_DIRECT */
67
68 /*
69  * This chunk of hackery attempts to determine which file sync methods
70  * are available on the current platform, and to choose an appropriate
71  * default method.      We assume that fsync() is always available, and that
72  * configure determined whether fdatasync() is.
73  */
74 #if defined(O_SYNC)
75 #define BARE_OPEN_SYNC_FLAG             O_SYNC
76 #elif defined(O_FSYNC)
77 #define BARE_OPEN_SYNC_FLAG             O_FSYNC
78 #endif
79 #ifdef BARE_OPEN_SYNC_FLAG
80 #define OPEN_SYNC_FLAG                  (BARE_OPEN_SYNC_FLAG | PG_O_DIRECT)
81 #endif
82
83 #if defined(O_DSYNC)
84 #if defined(OPEN_SYNC_FLAG)
85 /* Define OPEN_DATASYNC_FLAG only if O_DSYNC differs from the plain sync flag */
86 #if O_DSYNC != BARE_OPEN_SYNC_FLAG
87 #define OPEN_DATASYNC_FLAG              (O_DSYNC | PG_O_DIRECT)
88 #endif
89 #else                                                   /* !defined(OPEN_SYNC_FLAG) */
90 /* Win32 only has O_DSYNC */
91 #define OPEN_DATASYNC_FLAG              (O_DSYNC | PG_O_DIRECT)
92 #endif
93 #endif                                                  /* defined(O_DSYNC) */
94
95 #if defined(OPEN_DATASYNC_FLAG)
96 #define DEFAULT_SYNC_METHOD_STR "open_datasync"
97 #define DEFAULT_SYNC_METHOD             SYNC_METHOD_OPEN
98 #define DEFAULT_SYNC_FLAGBIT    OPEN_DATASYNC_FLAG
99 #elif defined(HAVE_FDATASYNC)
100 #define DEFAULT_SYNC_METHOD_STR "fdatasync"
101 #define DEFAULT_SYNC_METHOD             SYNC_METHOD_FDATASYNC
102 #define DEFAULT_SYNC_FLAGBIT    0
103 #elif defined(HAVE_FSYNC_WRITETHROUGH_ONLY)
104 #define DEFAULT_SYNC_METHOD_STR "fsync_writethrough"
105 #define DEFAULT_SYNC_METHOD             SYNC_METHOD_FSYNC_WRITETHROUGH
106 #define DEFAULT_SYNC_FLAGBIT    0
107 #else
108 #define DEFAULT_SYNC_METHOD_STR "fsync"
109 #define DEFAULT_SYNC_METHOD             SYNC_METHOD_FSYNC
110 #define DEFAULT_SYNC_FLAGBIT    0
111 #endif
112
113
114 /*
115  * Limitation of buffer-alignment for direct IO depends on OS and filesystem,
116  * but XLOG_BLCKSZ is assumed to be enough for it.
117  */
118 #ifdef O_DIRECT
119 #define ALIGNOF_XLOG_BUFFER             XLOG_BLCKSZ
120 #else
121 #define ALIGNOF_XLOG_BUFFER             ALIGNOF_BUFFER
122 #endif
123
124
125 /* File path names (all relative to $PGDATA) */
126 #define BACKUP_LABEL_FILE               "backup_label"
127 #define BACKUP_LABEL_OLD                "backup_label.old"
128 #define RECOVERY_COMMAND_FILE   "recovery.conf"
129 #define RECOVERY_COMMAND_DONE   "recovery.done"
130
131
132 /* User-settable parameters */
133 int                     CheckPointSegments = 3;
134 int                     XLOGbuffers = 8;
135 int                     XLogArchiveTimeout = 0;
136 char       *XLogArchiveCommand = NULL;
137 char       *XLOG_sync_method = NULL;
138 const char      XLOG_sync_method_default[] = DEFAULT_SYNC_METHOD_STR;
139 bool            fullPageWrites = true;
140
141 #ifdef WAL_DEBUG
142 bool            XLOG_DEBUG = false;
143 #endif
144
145 /*
146  * XLOGfileslop is the maximum number of preallocated future XLOG segments:
147  * a no-longer-needed segment is recycled as a future segment unless there
148  * are already XLOGfileslop of those, in which case it is deleted.  This could
149  * be made a separate GUC variable, but at present I think it's sufficient
150  * to hardwire it as 2*CheckPointSegments+1.  Under normal conditions, a
151  * checkpoint will free no more than 2*CheckPointSegments log segments, and
152  * we want to recycle all of them; the +1 allows boundary cases to happen
153  * without wasting a delete/create-segment cycle.
154  */
155
156 #define XLOGfileslop    (2*CheckPointSegments + 1)
157
158
159 /* these are derived from XLOG_sync_method by assign_xlog_sync_method */
160 int                     sync_method = DEFAULT_SYNC_METHOD;
161 static int      open_sync_bit = DEFAULT_SYNC_FLAGBIT;
162
163 #define XLOG_SYNC_BIT  (enableFsync ? open_sync_bit : 0)
164
165
166 /*
167  * ThisTimeLineID will be same in all backends --- it identifies current
168  * WAL timeline for the database system.
169  */
170 TimeLineID      ThisTimeLineID = 0;
171
172 /* Are we doing recovery from XLOG? */
173 bool            InRecovery = false;
174
175 /* Are we recovering using offline XLOG archives? */
176 static bool InArchiveRecovery = false;
177
178 /* Was the last xlog file restored from archive, or local? */
179 static bool restoredFromArchive = false;
180
181 /* options taken from recovery.conf */
182 static char *recoveryRestoreCommand = NULL;
183 static bool recoveryTarget = false;
184 static bool recoveryTargetExact = false;
185 static bool recoveryTargetInclusive = true;
186 static TransactionId recoveryTargetXid;
187 static time_t recoveryTargetTime;
188
189 /* if recoveryStopsHere returns true, it saves actual stop xid/time here */
190 static TransactionId recoveryStopXid;
191 static time_t recoveryStopTime;
192 static bool recoveryStopAfter;
193
194 /*
195  * During normal operation, the only timeline we care about is ThisTimeLineID.
196  * During recovery, however, things are more complicated.  To simplify life
197  * for rmgr code, we keep ThisTimeLineID set to the "current" timeline as we
198  * scan through the WAL history (that is, it is the line that was active when
199  * the currently-scanned WAL record was generated).  We also need these
200  * timeline values:
201  *
202  * recoveryTargetTLI: the desired timeline that we want to end in.
203  *
204  * expectedTLIs: an integer list of recoveryTargetTLI and the TLIs of
205  * its known parents, newest first (so recoveryTargetTLI is always the
206  * first list member).  Only these TLIs are expected to be seen in the WAL
207  * segments we read, and indeed only these TLIs will be considered as
208  * candidate WAL files to open at all.
209  *
210  * curFileTLI: the TLI appearing in the name of the current input WAL file.
211  * (This is not necessarily the same as ThisTimeLineID, because we could
212  * be scanning data that was copied from an ancestor timeline when the current
213  * file was created.)  During a sequential scan we do not allow this value
214  * to decrease.
215  */
216 static TimeLineID recoveryTargetTLI;
217 static List *expectedTLIs;
218 static TimeLineID curFileTLI;
219
220 /*
221  * MyLastRecPtr points to the start of the last XLOG record inserted by the
222  * current transaction.  If MyLastRecPtr.xrecoff == 0, then the current
223  * xact hasn't yet inserted any transaction-controlled XLOG records.
224  *
225  * Note that XLOG records inserted outside transaction control are not
226  * reflected into MyLastRecPtr.  They do, however, cause MyXactMadeXLogEntry
227  * to be set true.      The latter can be used to test whether the current xact
228  * made any loggable changes (including out-of-xact changes, such as
229  * sequence updates).
230  *
231  * When we insert/update/delete a tuple in a temporary relation, we do not
232  * make any XLOG record, since we don't care about recovering the state of
233  * the temp rel after a crash.  However, we will still need to remember
234  * whether our transaction committed or aborted in that case.  So, we must
235  * set MyXactMadeTempRelUpdate true to indicate that the XID will be of
236  * interest later.
237  */
238 XLogRecPtr      MyLastRecPtr = {0, 0};
239
240 bool            MyXactMadeXLogEntry = false;
241
242 bool            MyXactMadeTempRelUpdate = false;
243
244 /*
245  * ProcLastRecPtr points to the start of the last XLOG record inserted by the
246  * current backend.  It is updated for all inserts, transaction-controlled
247  * or not.      ProcLastRecEnd is similar but points to end+1 of last record.
248  */
249 static XLogRecPtr ProcLastRecPtr = {0, 0};
250
251 XLogRecPtr      ProcLastRecEnd = {0, 0};
252
253 /*
254  * RedoRecPtr is this backend's local copy of the REDO record pointer
255  * (which is almost but not quite the same as a pointer to the most recent
256  * CHECKPOINT record).  We update this from the shared-memory copy,
257  * XLogCtl->Insert.RedoRecPtr, whenever we can safely do so (ie, when we
258  * hold the Insert lock).  See XLogInsert for details.  We are also allowed
259  * to update from XLogCtl->Insert.RedoRecPtr if we hold the info_lck;
260  * see GetRedoRecPtr.  A freshly spawned backend obtains the value during
261  * InitXLOGAccess.
262  */
263 static XLogRecPtr RedoRecPtr;
264
265 /*----------
266  * Shared-memory data structures for XLOG control
267  *
268  * LogwrtRqst indicates a byte position that we need to write and/or fsync
269  * the log up to (all records before that point must be written or fsynced).
270  * LogwrtResult indicates the byte positions we have already written/fsynced.
271  * These structs are identical but are declared separately to indicate their
272  * slightly different functions.
273  *
274  * We do a lot of pushups to minimize the amount of access to lockable
275  * shared memory values.  There are actually three shared-memory copies of
276  * LogwrtResult, plus one unshared copy in each backend.  Here's how it works:
277  *              XLogCtl->LogwrtResult is protected by info_lck
278  *              XLogCtl->Write.LogwrtResult is protected by WALWriteLock
279  *              XLogCtl->Insert.LogwrtResult is protected by WALInsertLock
280  * One must hold the associated lock to read or write any of these, but
281  * of course no lock is needed to read/write the unshared LogwrtResult.
282  *
283  * XLogCtl->LogwrtResult and XLogCtl->Write.LogwrtResult are both "always
284  * right", since both are updated by a write or flush operation before
285  * it releases WALWriteLock.  The point of keeping XLogCtl->Write.LogwrtResult
286  * is that it can be examined/modified by code that already holds WALWriteLock
287  * without needing to grab info_lck as well.
288  *
289  * XLogCtl->Insert.LogwrtResult may lag behind the reality of the other two,
290  * but is updated when convenient.      Again, it exists for the convenience of
291  * code that is already holding WALInsertLock but not the other locks.
292  *
293  * The unshared LogwrtResult may lag behind any or all of these, and again
294  * is updated when convenient.
295  *
296  * The request bookkeeping is simpler: there is a shared XLogCtl->LogwrtRqst
297  * (protected by info_lck), but we don't need to cache any copies of it.
298  *
299  * Note that this all works because the request and result positions can only
300  * advance forward, never back up, and so we can easily determine which of two
301  * values is "more up to date".
302  *
303  * info_lck is only held long enough to read/update the protected variables,
304  * so it's a plain spinlock.  The other locks are held longer (potentially
305  * over I/O operations), so we use LWLocks for them.  These locks are:
306  *
307  * WALInsertLock: must be held to insert a record into the WAL buffers.
308  *
309  * WALWriteLock: must be held to write WAL buffers to disk (XLogWrite or
310  * XLogFlush).
311  *
312  * ControlFileLock: must be held to read/update control file or create
313  * new log file.
314  *
315  * CheckpointLock: must be held to do a checkpoint (ensures only one
316  * checkpointer at a time; currently, with all checkpoints done by the
317  * bgwriter, this is just pro forma).
318  *
319  *----------
320  */
321
/* Positions up to which the log is requested to be written/flushed (see LogwrtRqst above) */
322 typedef struct XLogwrtRqst
323 {
324         XLogRecPtr      Write;                  /* last byte + 1 to write out */
325         XLogRecPtr      Flush;                  /* last byte + 1 to flush */
326 } XLogwrtRqst;
327
/* Positions up to which the log has already been written/flushed (see LogwrtResult above) */
328 typedef struct XLogwrtResult
329 {
330         XLogRecPtr      Write;                  /* last byte + 1 written out */
331         XLogRecPtr      Flush;                  /* last byte + 1 flushed */
332 } XLogwrtResult;
333
334 /*
335  * Shared state data for XLogInsert.  Embedded in XLogCtlData as "Insert",
 * where it is marked as protected by WALInsertLock.
336  */
337 typedef struct XLogCtlInsert
338 {
339         XLogwrtResult LogwrtResult; /* a recent (possibly stale) LogwrtResult */
340         XLogRecPtr      PrevRecord;             /* start of previously-inserted record */
341         int                     curridx;                /* current block index in cache */
342         XLogPageHeader currpage;        /* points to header of block in cache */
343         char       *currpos;            /* current insertion point in cache */
344         XLogRecPtr      RedoRecPtr;             /* current redo point for insertions */
345         bool            forcePageWrites;        /* forcing full-page writes for PITR? */
346 } XLogCtlInsert;
347
348 /*
349  * Shared state data for XLogWrite/XLogFlush.  Embedded in XLogCtlData as
 * "Write", where it is marked as protected by WALWriteLock.
350  */
351 typedef struct XLogCtlWrite
352 {
353         XLogwrtResult LogwrtResult; /* current value of LogwrtResult */
354         int                     curridx;                /* cache index of next block to write */
355         time_t          lastSegSwitchTime;              /* time of last xlog segment switch */
356 } XLogCtlWrite;
357
358 /*
359  * Total shared-memory state for XLOG.
360  */
361 typedef struct XLogCtlData
362 {
363         /* Protected by WALInsertLock: */
364         XLogCtlInsert Insert;
365
366         /* Protected by info_lck: */
367         XLogwrtRqst LogwrtRqst;
368         XLogwrtResult LogwrtResult;
369         uint32          ckptXidEpoch;   /* epoch of latest checkpoint's nextXID */
370         TransactionId ckptXid;  /* nextXID of latest checkpoint */
371
372         /* Protected by WALWriteLock: */
373         XLogCtlWrite Write;
374
375         /*
376          * These values do not change after startup, although the pointed-to pages
377          * and xlblocks values certainly do.  Permission to read/write the pages
378          * and xlblocks values depends on WALInsertLock and WALWriteLock.
379          */
380         char       *pages;                      /* buffers for unwritten XLOG pages */
381         XLogRecPtr *xlblocks;           /* per page: address of its 1st byte + XLOG_BLCKSZ */
382         Size            XLogCacheByte;  /* # bytes in xlog buffers */
383         int                     XLogCacheBlck;  /* highest allocated xlog buffer index */
384         TimeLineID      ThisTimeLineID; /* presumably mirrors the global ThisTimeLineID -- confirm */
385
386         slock_t         info_lck;               /* spinlock for the fields marked "Protected by info_lck" */
387 } XLogCtlData;
388
389 static XLogCtlData *XLogCtl = NULL;
390
391 /*
392  * We maintain an image of pg_control in shared memory.
393  */
394 static ControlFileData *ControlFile = NULL;
395
396 /*
397  * Macros for managing XLogInsert state.  In most cases, the calling routine
398  * has local copies of XLogCtl->Insert and/or XLogCtl->Insert.curridx,
399  * so these are passed as parameters instead of being fetched via XLogCtl.
400  */
401
402 /* Free space remaining in the current xlog page buffer */
403 #define INSERT_FREESPACE(Insert)  \
404         (XLOG_BLCKSZ - ((Insert)->currpos - (char *) (Insert)->currpage))
405
406 /*
 * Construct XLogRecPtr value for current insertion point.  xlblocks[curridx]
 * is the address just past the end of the page (see XLogCtlData), so
 * subtracting the page's remaining free space yields the insert position.
 */
407 #define INSERT_RECPTR(recptr,Insert,curridx)  \
408         ( \
409           (recptr).xlogid = XLogCtl->xlblocks[curridx].xlogid, \
410           (recptr).xrecoff = \
411                 XLogCtl->xlblocks[curridx].xrecoff - INSERT_FREESPACE(Insert) \
412         )
413
/* Buffer index preceding idx, wrapping from 0 to XLogCacheBlck (the highest index) */
414 #define PrevBufIdx(idx)         \
415                 (((idx) == 0) ? XLogCtl->XLogCacheBlck : ((idx) - 1))
416
/* Buffer index following idx, wrapping from XLogCacheBlck (the highest index) to 0 */
417 #define NextBufIdx(idx)         \
418                 (((idx) == XLogCtl->XLogCacheBlck) ? 0 : ((idx) + 1))
419
420 /*
421  * Private, possibly out-of-date copy of shared LogwrtResult.
422  * See discussion above.
423  */
424 static XLogwrtResult LogwrtResult = {{0, 0}, {0, 0}};
425
426 /*
427  * openLogFile is -1 or a kernel FD for an open log file segment.
428  * When it's open, openLogOff is the current seek offset in the file.
429  * openLogId/openLogSeg identify the segment.  These variables are only
430  * used to write the XLOG, and so will normally refer to the active segment.
431  */
432 static int      openLogFile = -1;
433 static uint32 openLogId = 0;
434 static uint32 openLogSeg = 0;
435 static uint32 openLogOff = 0;
436
437 /*
438  * These variables are used similarly to the ones above, but for reading
439  * the XLOG.  Note, however, that readOff generally represents the offset
440  * of the page just read, not the seek position of the FD itself, which
441  * will be just past that page.
442  */
443 static int      readFile = -1;
444 static uint32 readId = 0;
445 static uint32 readSeg = 0;
446 static uint32 readOff = 0;
447
448 /* Buffer for currently read page (XLOG_BLCKSZ bytes) */
449 static char *readBuf = NULL;
450
451 /* Buffer for current ReadRecord result (expandable) */
452 static char *readRecordBuf = NULL;
453 static uint32 readRecordBufSize = 0;
454
455 /* State information for XLOG reading */
456 static XLogRecPtr ReadRecPtr;   /* start of last record read */
457 static XLogRecPtr EndRecPtr;    /* end+1 of last record read */
458 static XLogRecord *nextRecord = NULL;
459 static TimeLineID lastPageTLI = 0;
460
461 static bool InRedo = false;
462
463
464 static void XLogArchiveNotify(const char *xlog);
465 static void XLogArchiveNotifySeg(uint32 log, uint32 seg);
466 static bool XLogArchiveCheckDone(const char *xlog);
467 static void XLogArchiveCleanup(const char *xlog);
468 static void readRecoveryCommandFile(void);
469 static void exitArchiveRecovery(TimeLineID endTLI,
470                                         uint32 endLogId, uint32 endLogSeg);
471 static bool recoveryStopsHere(XLogRecord *record, bool *includeThis);
472 static void CheckPointGuts(XLogRecPtr checkPointRedo);
473
474 static bool XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites,
475                                 XLogRecPtr *lsn, BkpBlock *bkpb);
476 static bool AdvanceXLInsertBuffer(bool new_segment);
477 static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch);
478 static int XLogFileInit(uint32 log, uint32 seg,
479                          bool *use_existent, bool use_lock);
480 static bool InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath,
481                                            bool find_free, int *max_advance,
482                                            bool use_lock);
483 static int      XLogFileOpen(uint32 log, uint32 seg);
484 static int      XLogFileRead(uint32 log, uint32 seg, int emode);
485 static void XLogFileClose(void);
486 static bool RestoreArchivedFile(char *path, const char *xlogfname,
487                                         const char *recovername, off_t expectedSize);
488 static int      PreallocXlogFiles(XLogRecPtr endptr);
489 static void MoveOfflineLogs(uint32 log, uint32 seg, XLogRecPtr endptr,
490                                 int *nsegsremoved, int *nsegsrecycled);
491 static void CleanupBackupHistory(void);
492 static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, int emode);
493 static bool ValidXLOGHeader(XLogPageHeader hdr, int emode);
494 static XLogRecord *ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt);
495 static List *readTimeLineHistory(TimeLineID targetTLI);
496 static bool existsTimeLineHistory(TimeLineID probeTLI);
497 static TimeLineID findNewestTimeLine(TimeLineID startTLI);
498 static void writeTimeLineHistory(TimeLineID newTLI, TimeLineID parentTLI,
499                                          TimeLineID endTLI,
500                                          uint32 endLogId, uint32 endLogSeg);
501 static void WriteControlFile(void);
502 static void ReadControlFile(void);
503 static char *str_time(time_t tnow);
504 static void issue_xlog_fsync(void);
505
506 #ifdef WAL_DEBUG
507 static void xlog_outrec(StringInfo buf, XLogRecord *record);
508 #endif
509 static bool read_backup_label(XLogRecPtr *checkPointLoc,
510                                   XLogRecPtr *minRecoveryLoc);
511 static void rm_redo_error_callback(void *arg);
512
513
514 /*
515  * Insert an XLOG record having the specified RMID and info bytes,
516  * with the body of the record being the data chunk(s) described by
517  * the rdata chain (see xlog.h for notes about rdata).
518  *
519  * Returns XLOG pointer to end of record (beginning of next record).
520  * This can be used as LSN for data pages affected by the logged action.
521  * (LSN is the XLOG point up to which the XLOG must be flushed to disk
522  * before the data page can be written out.  This implements the basic
523  * WAL rule "write the log before the data".)
524  *
525  * NB: this routine feels free to scribble on the XLogRecData structs,
526  * though not on the data they reference.  This is OK since the XLogRecData
527  * structs are always just temporaries in the calling code.
528  */
529 XLogRecPtr
530 XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
531 {
532         XLogCtlInsert *Insert = &XLogCtl->Insert;
533         XLogRecord *record;
534         XLogContRecord *contrecord;
535         XLogRecPtr      RecPtr;
536         XLogRecPtr      WriteRqst;
537         uint32          freespace;
538         int                     curridx;
539         XLogRecData *rdt;
540         Buffer          dtbuf[XLR_MAX_BKP_BLOCKS];
541         bool            dtbuf_bkp[XLR_MAX_BKP_BLOCKS];
542         BkpBlock        dtbuf_xlg[XLR_MAX_BKP_BLOCKS];
543         XLogRecPtr      dtbuf_lsn[XLR_MAX_BKP_BLOCKS];
544         XLogRecData dtbuf_rdt1[XLR_MAX_BKP_BLOCKS];
545         XLogRecData dtbuf_rdt2[XLR_MAX_BKP_BLOCKS];
546         XLogRecData dtbuf_rdt3[XLR_MAX_BKP_BLOCKS];
547         pg_crc32        rdata_crc;
548         uint32          len,
549                                 write_len;
550         unsigned        i;
551         XLogwrtRqst LogwrtRqst;
552         bool            updrqst;
553         bool            doPageWrites;
554         bool            isLogSwitch = (rmid == RM_XLOG_ID && info == XLOG_SWITCH);
555         bool            no_tran = (rmid == RM_XLOG_ID);
556
557         if (info & XLR_INFO_MASK)
558         {
559                 if ((info & XLR_INFO_MASK) != XLOG_NO_TRAN)
560                         elog(PANIC, "invalid xlog info mask %02X", (info & XLR_INFO_MASK));
561                 no_tran = true;
562                 info &= ~XLR_INFO_MASK;
563         }
564
565         /*
566          * In bootstrap mode, we don't actually log anything but XLOG resources;
567          * return a phony record pointer.
568          */
569         if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID)
570         {
571                 RecPtr.xlogid = 0;
572                 RecPtr.xrecoff = SizeOfXLogLongPHD;             /* start of 1st chkpt record */
573                 return RecPtr;
574         }
575
576         /*
577          * Here we scan the rdata chain, determine which buffers must be backed
578          * up, and compute the CRC values for the data.  Note that the record
579          * header isn't added into the CRC initially since we don't know the final
580          * length or info bits quite yet.  Thus, the CRC will represent the CRC of
581          * the whole record in the order "rdata, then backup blocks, then record
582          * header".
583          *
584          * We may have to loop back to here if a race condition is detected below.
585          * We could prevent the race by doing all this work while holding the
586          * insert lock, but it seems better to avoid doing CRC calculations while
587          * holding the lock.  This means we have to be careful about modifying the
588          * rdata chain until we know we aren't going to loop back again.  The only
589          * change we allow ourselves to make earlier is to set rdt->data = NULL in
590          * chain items we have decided we will have to back up the whole buffer
591          * for.  This is OK because we will certainly decide the same thing again
592          * for those items if we do it over; doing it here saves an extra pass
593          * over the chain later.
594          */
595 begin:;
596         for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
597         {
598                 dtbuf[i] = InvalidBuffer;
599                 dtbuf_bkp[i] = false;
600         }
601
602         /*
603          * Decide if we need to do full-page writes in this XLOG record: true if
604          * full_page_writes is on or we have a PITR request for it.  Since we
605          * don't yet have the insert lock, forcePageWrites could change under us,
606          * but we'll recheck it once we have the lock.
607          */
608         doPageWrites = fullPageWrites || Insert->forcePageWrites;
609
610         INIT_CRC32(rdata_crc);
611         len = 0;
612         for (rdt = rdata;;)
613         {
614                 if (rdt->buffer == InvalidBuffer)
615                 {
616                         /* Simple data, just include it */
617                         len += rdt->len;
618                         COMP_CRC32(rdata_crc, rdt->data, rdt->len);
619                 }
620                 else
621                 {
622                         /* Find info for buffer */
623                         for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
624                         {
625                                 if (rdt->buffer == dtbuf[i])
626                                 {
627                                         /* Buffer already referenced by earlier chain item */
628                                         if (dtbuf_bkp[i])
629                                                 rdt->data = NULL;
630                                         else if (rdt->data)
631                                         {
632                                                 len += rdt->len;
633                                                 COMP_CRC32(rdata_crc, rdt->data, rdt->len);
634                                         }
635                                         break;
636                                 }
637                                 if (dtbuf[i] == InvalidBuffer)
638                                 {
639                                         /* OK, put it in this slot */
640                                         dtbuf[i] = rdt->buffer;
641                                         if (XLogCheckBuffer(rdt, doPageWrites,
642                                                                                 &(dtbuf_lsn[i]), &(dtbuf_xlg[i])))
643                                         {
644                                                 dtbuf_bkp[i] = true;
645                                                 rdt->data = NULL;
646                                         }
647                                         else if (rdt->data)
648                                         {
649                                                 len += rdt->len;
650                                                 COMP_CRC32(rdata_crc, rdt->data, rdt->len);
651                                         }
652                                         break;
653                                 }
654                         }
655                         if (i >= XLR_MAX_BKP_BLOCKS)
656                                 elog(PANIC, "can backup at most %d blocks per xlog record",
657                                          XLR_MAX_BKP_BLOCKS);
658                 }
659                 /* Break out of loop when rdt points to last chain item */
660                 if (rdt->next == NULL)
661                         break;
662                 rdt = rdt->next;
663         }
664
665         /*
666          * Now add the backup block headers and data into the CRC
667          */
668         for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
669         {
670                 if (dtbuf_bkp[i])
671                 {
672                         BkpBlock   *bkpb = &(dtbuf_xlg[i]);
673                         char       *page;
674
675                         COMP_CRC32(rdata_crc,
676                                            (char *) bkpb,
677                                            sizeof(BkpBlock));
678                         page = (char *) BufferGetBlock(dtbuf[i]);
679                         if (bkpb->hole_length == 0)
680                         {
681                                 COMP_CRC32(rdata_crc,
682                                                    page,
683                                                    BLCKSZ);
684                         }
685                         else
686                         {
687                                 /* must skip the hole */
688                                 COMP_CRC32(rdata_crc,
689                                                    page,
690                                                    bkpb->hole_offset);
691                                 COMP_CRC32(rdata_crc,
692                                                    page + (bkpb->hole_offset + bkpb->hole_length),
693                                                    BLCKSZ - (bkpb->hole_offset + bkpb->hole_length));
694                         }
695                 }
696         }
697
698         /*
699          * NOTE: We disallow len == 0 because it provides a useful bit of extra
700          * error checking in ReadRecord.  This means that all callers of
701          * XLogInsert must supply at least some not-in-a-buffer data.  However, we
702          * make an exception for XLOG SWITCH records because we don't want them to
703          * ever cross a segment boundary.
704          */
705         if (len == 0 && !isLogSwitch)
706                 elog(PANIC, "invalid xlog record length %u", len);
707
708         START_CRIT_SECTION();
709
710         /* update LogwrtResult before doing cache fill check */
711         {
712                 /* use volatile pointer to prevent code rearrangement */
713                 volatile XLogCtlData *xlogctl = XLogCtl;
714
715                 SpinLockAcquire(&xlogctl->info_lck);
716                 LogwrtRqst = xlogctl->LogwrtRqst;
717                 LogwrtResult = xlogctl->LogwrtResult;
718                 SpinLockRelease(&xlogctl->info_lck);
719         }
720
721         /*
722          * If cache is half filled then try to acquire write lock and do
723          * XLogWrite. Ignore any fractional blocks in performing this check.
724          */
725         LogwrtRqst.Write.xrecoff -= LogwrtRqst.Write.xrecoff % XLOG_BLCKSZ;
726         if (LogwrtRqst.Write.xlogid != LogwrtResult.Write.xlogid ||
727                 (LogwrtRqst.Write.xrecoff >= LogwrtResult.Write.xrecoff +
728                  XLogCtl->XLogCacheByte / 2))
729         {
730                 if (LWLockConditionalAcquire(WALWriteLock, LW_EXCLUSIVE))
731                 {
732                         /*
733                          * Since the amount of data we write here is completely optional
734                          * anyway, tell XLogWrite it can be "flexible" and stop at a
735                          * convenient boundary.  This allows writes triggered by this
736                          * mechanism to synchronize with the cache boundaries, so that in
737                          * a long transaction we'll basically dump alternating halves of
738                          * the buffer array.
739                          */
740                         LogwrtResult = XLogCtl->Write.LogwrtResult;
741                         if (XLByteLT(LogwrtResult.Write, LogwrtRqst.Write))
742                                 XLogWrite(LogwrtRqst, true, false);
743                         LWLockRelease(WALWriteLock);
744                 }
745         }
746
747         /* Now wait to get insert lock */
748         LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
749
750         /*
751          * Check to see if my RedoRecPtr is out of date.  If so, may have to go
752          * back and recompute everything.  This can only happen just after a
753          * checkpoint, so it's better to be slow in this case and fast otherwise.
754          *
755          * If we aren't doing full-page writes then RedoRecPtr doesn't actually
756          * affect the contents of the XLOG record, so we'll update our local copy
757          * but not force a recomputation.
758          */
759         if (!XLByteEQ(RedoRecPtr, Insert->RedoRecPtr))
760         {
761                 Assert(XLByteLT(RedoRecPtr, Insert->RedoRecPtr));
762                 RedoRecPtr = Insert->RedoRecPtr;
763
764                 if (doPageWrites)
765                 {
766                         for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
767                         {
768                                 if (dtbuf[i] == InvalidBuffer)
769                                         continue;
770                                 if (dtbuf_bkp[i] == false &&
771                                         XLByteLE(dtbuf_lsn[i], RedoRecPtr))
772                                 {
773                                         /*
774                                          * Oops, this buffer now needs to be backed up, but we
775                                          * didn't think so above.  Start over.
776                                          */
777                                         LWLockRelease(WALInsertLock);
778                                         END_CRIT_SECTION();
779                                         goto begin;
780                                 }
781                         }
782                 }
783         }
784
785         /*
786          * Also check to see if forcePageWrites was just turned on; if we weren't
787          * already doing full-page writes then go back and recompute. (If it was
788          * just turned off, we could recompute the record without full pages, but
789          * we choose not to bother.)
790          */
791         if (Insert->forcePageWrites && !doPageWrites)
792         {
793                 /* Oops, must redo it with full-page data */
794                 LWLockRelease(WALInsertLock);
795                 END_CRIT_SECTION();
796                 goto begin;
797         }
798
799         /*
800          * Make additional rdata chain entries for the backup blocks, so that we
801          * don't need to special-case them in the write loop.  Note that we have
802          * now irrevocably changed the input rdata chain.  At the exit of this
803          * loop, write_len includes the backup block data.
804          *
805          * Also set the appropriate info bits to show which buffers were backed
806          * up. The i'th XLR_SET_BKP_BLOCK bit corresponds to the i'th distinct
807          * buffer value (ignoring InvalidBuffer) appearing in the rdata chain.
808          */
809         write_len = len;
810         for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
811         {
812                 BkpBlock   *bkpb;
813                 char       *page;
814
815                 if (!dtbuf_bkp[i])
816                         continue;
817
818                 info |= XLR_SET_BKP_BLOCK(i);
819
820                 bkpb = &(dtbuf_xlg[i]);
821                 page = (char *) BufferGetBlock(dtbuf[i]);
822
823                 rdt->next = &(dtbuf_rdt1[i]);
824                 rdt = rdt->next;
825
826                 rdt->data = (char *) bkpb;
827                 rdt->len = sizeof(BkpBlock);
828                 write_len += sizeof(BkpBlock);
829
830                 rdt->next = &(dtbuf_rdt2[i]);
831                 rdt = rdt->next;
832
833                 if (bkpb->hole_length == 0)
834                 {
835                         rdt->data = page;
836                         rdt->len = BLCKSZ;
837                         write_len += BLCKSZ;
838                         rdt->next = NULL;
839                 }
840                 else
841                 {
842                         /* must skip the hole */
843                         rdt->data = page;
844                         rdt->len = bkpb->hole_offset;
845                         write_len += bkpb->hole_offset;
846
847                         rdt->next = &(dtbuf_rdt3[i]);
848                         rdt = rdt->next;
849
850                         rdt->data = page + (bkpb->hole_offset + bkpb->hole_length);
851                         rdt->len = BLCKSZ - (bkpb->hole_offset + bkpb->hole_length);
852                         write_len += rdt->len;
853                         rdt->next = NULL;
854                 }
855         }
856
857         /*
858          * If there isn't enough space on the current XLOG page for a record
859          * header, advance to the next page (leaving the unused space as zeroes).
860          */
861         updrqst = false;
862         freespace = INSERT_FREESPACE(Insert);
863         if (freespace < SizeOfXLogRecord)
864         {
865                 updrqst = AdvanceXLInsertBuffer(false);
866                 freespace = INSERT_FREESPACE(Insert);
867         }
868
869         /* Compute record's XLOG location */
870         curridx = Insert->curridx;
871         INSERT_RECPTR(RecPtr, Insert, curridx);
872
873         /*
874          * If the record is an XLOG_SWITCH, and we are exactly at the start of a
875          * segment, we need not insert it (and don't want to because we'd like
876          * consecutive switch requests to be no-ops).  Instead, make sure
877          * everything is written and flushed through the end of the prior segment,
878          * and return the prior segment's end address.
879          */
880         if (isLogSwitch &&
881                 (RecPtr.xrecoff % XLogSegSize) == SizeOfXLogLongPHD)
882         {
883                 /* We can release insert lock immediately */
884                 LWLockRelease(WALInsertLock);
885
886                 RecPtr.xrecoff -= SizeOfXLogLongPHD;
887                 if (RecPtr.xrecoff == 0)
888                 {
889                         /* crossing a logid boundary */
890                         RecPtr.xlogid -= 1;
891                         RecPtr.xrecoff = XLogFileSize;
892                 }
893
894                 LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
895                 LogwrtResult = XLogCtl->Write.LogwrtResult;
896                 if (!XLByteLE(RecPtr, LogwrtResult.Flush))
897                 {
898                         XLogwrtRqst FlushRqst;
899
900                         FlushRqst.Write = RecPtr;
901                         FlushRqst.Flush = RecPtr;
902                         XLogWrite(FlushRqst, false, false);
903                 }
904                 LWLockRelease(WALWriteLock);
905
906                 END_CRIT_SECTION();
907
908                 return RecPtr;
909         }
910
911         /* Insert record header */
912
913         record = (XLogRecord *) Insert->currpos;
914         record->xl_prev = Insert->PrevRecord;
915         record->xl_xid = GetCurrentTransactionIdIfAny();
916         record->xl_tot_len = SizeOfXLogRecord + write_len;
917         record->xl_len = len;           /* doesn't include backup blocks */
918         record->xl_info = info;
919         record->xl_rmid = rmid;
920
921         /* Now we can finish computing the record's CRC */
922         COMP_CRC32(rdata_crc, (char *) record + sizeof(pg_crc32),
923                            SizeOfXLogRecord - sizeof(pg_crc32));
924         FIN_CRC32(rdata_crc);
925         record->xl_crc = rdata_crc;
926
927 #ifdef WAL_DEBUG
928         if (XLOG_DEBUG)
929         {
930                 StringInfoData buf;
931
932                 initStringInfo(&buf);
933                 appendStringInfo(&buf, "INSERT @ %X/%X: ",
934                                                  RecPtr.xlogid, RecPtr.xrecoff);
935                 xlog_outrec(&buf, record);
936                 if (rdata->data != NULL)
937                 {
938                         appendStringInfo(&buf, " - ");
939                         RmgrTable[record->xl_rmid].rm_desc(&buf, record->xl_info, rdata->data);
940                 }
941                 elog(LOG, "%s", buf.data);
942                 pfree(buf.data);
943         }
944 #endif
945
946         /* Record begin of record in appropriate places */
947         if (!no_tran)
948                 MyLastRecPtr = RecPtr;
949         ProcLastRecPtr = RecPtr;
950         Insert->PrevRecord = RecPtr;
951         MyXactMadeXLogEntry = true;
952
953         Insert->currpos += SizeOfXLogRecord;
954         freespace -= SizeOfXLogRecord;
955
956         /*
957          * Append the data, including backup blocks if any
958          */
959         while (write_len)
960         {
961                 while (rdata->data == NULL)
962                         rdata = rdata->next;
963
964                 if (freespace > 0)
965                 {
966                         if (rdata->len > freespace)
967                         {
968                                 memcpy(Insert->currpos, rdata->data, freespace);
969                                 rdata->data += freespace;
970                                 rdata->len -= freespace;
971                                 write_len -= freespace;
972                         }
973                         else
974                         {
975                                 memcpy(Insert->currpos, rdata->data, rdata->len);
976                                 freespace -= rdata->len;
977                                 write_len -= rdata->len;
978                                 Insert->currpos += rdata->len;
979                                 rdata = rdata->next;
980                                 continue;
981                         }
982                 }
983
984                 /* Use next buffer */
985                 updrqst = AdvanceXLInsertBuffer(false);
986                 curridx = Insert->curridx;
987                 /* Insert cont-record header */
988                 Insert->currpage->xlp_info |= XLP_FIRST_IS_CONTRECORD;
989                 contrecord = (XLogContRecord *) Insert->currpos;
990                 contrecord->xl_rem_len = write_len;
991                 Insert->currpos += SizeOfXLogContRecord;
992                 freespace = INSERT_FREESPACE(Insert);
993         }
994
995         /* Ensure next record will be properly aligned */
996         Insert->currpos = (char *) Insert->currpage +
997                 MAXALIGN(Insert->currpos - (char *) Insert->currpage);
998         freespace = INSERT_FREESPACE(Insert);
999
1000         /*
1001          * The recptr I return is the beginning of the *next* record. This will be
1002          * stored as LSN for changed data pages...
1003          */
1004         INSERT_RECPTR(RecPtr, Insert, curridx);
1005
1006         /*
1007          * If the record is an XLOG_SWITCH, we must now write and flush all the
1008          * existing data, and then forcibly advance to the start of the next
1009          * segment.  It's not good to do this I/O while holding the insert lock,
1010          * but there seems too much risk of confusion if we try to release the
1011          * lock sooner.  Fortunately xlog switch needn't be a high-performance
1012          * operation anyway...
1013          */
1014         if (isLogSwitch)
1015         {
1016                 XLogCtlWrite *Write = &XLogCtl->Write;
1017                 XLogwrtRqst FlushRqst;
1018                 XLogRecPtr      OldSegEnd;
1019
1020                 LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
1021
1022                 /*
1023                  * Flush through the end of the page containing XLOG_SWITCH, and
1024                  * perform end-of-segment actions (eg, notifying archiver).
1025                  */
1026                 WriteRqst = XLogCtl->xlblocks[curridx];
1027                 FlushRqst.Write = WriteRqst;
1028                 FlushRqst.Flush = WriteRqst;
1029                 XLogWrite(FlushRqst, false, true);
1030
1031                 /* Set up the next buffer as first page of next segment */
1032                 /* Note: AdvanceXLInsertBuffer cannot need to do I/O here */
1033                 (void) AdvanceXLInsertBuffer(true);
1034
1035                 /* There should be no unwritten data */
1036                 curridx = Insert->curridx;
1037                 Assert(curridx == Write->curridx);
1038
1039                 /* Compute end address of old segment */
1040                 OldSegEnd = XLogCtl->xlblocks[curridx];
1041                 OldSegEnd.xrecoff -= XLOG_BLCKSZ;
1042                 if (OldSegEnd.xrecoff == 0)
1043                 {
1044                         /* crossing a logid boundary */
1045                         OldSegEnd.xlogid -= 1;
1046                         OldSegEnd.xrecoff = XLogFileSize;
1047                 }
1048
1049                 /* Make it look like we've written and synced all of old segment */
1050                 LogwrtResult.Write = OldSegEnd;
1051                 LogwrtResult.Flush = OldSegEnd;
1052
1053                 /*
1054                  * Update shared-memory status --- this code should match XLogWrite
1055                  */
1056                 {
1057                         /* use volatile pointer to prevent code rearrangement */
1058                         volatile XLogCtlData *xlogctl = XLogCtl;
1059
1060                         SpinLockAcquire(&xlogctl->info_lck);
1061                         xlogctl->LogwrtResult = LogwrtResult;
1062                         if (XLByteLT(xlogctl->LogwrtRqst.Write, LogwrtResult.Write))
1063                                 xlogctl->LogwrtRqst.Write = LogwrtResult.Write;
1064                         if (XLByteLT(xlogctl->LogwrtRqst.Flush, LogwrtResult.Flush))
1065                                 xlogctl->LogwrtRqst.Flush = LogwrtResult.Flush;
1066                         SpinLockRelease(&xlogctl->info_lck);
1067                 }
1068
1069                 Write->LogwrtResult = LogwrtResult;
1070
1071                 LWLockRelease(WALWriteLock);
1072
1073                 updrqst = false;                /* done already */
1074         }
1075         else
1076         {
1077                 /* normal case, ie not xlog switch */
1078
1079                 /* Need to update shared LogwrtRqst if some block was filled up */
1080                 if (freespace < SizeOfXLogRecord)
1081                 {
1082                         /* curridx is filled and available for writing out */
1083                         updrqst = true;
1084                 }
1085                 else
1086                 {
1087                         /* if updrqst already set, write through end of previous buf */
1088                         curridx = PrevBufIdx(curridx);
1089                 }
1090                 WriteRqst = XLogCtl->xlblocks[curridx];
1091         }
1092
1093         LWLockRelease(WALInsertLock);
1094
1095         if (updrqst)
1096         {
1097                 /* use volatile pointer to prevent code rearrangement */
1098                 volatile XLogCtlData *xlogctl = XLogCtl;
1099
1100                 SpinLockAcquire(&xlogctl->info_lck);
1101                 /* advance global request to include new block(s) */
1102                 if (XLByteLT(xlogctl->LogwrtRqst.Write, WriteRqst))
1103                         xlogctl->LogwrtRqst.Write = WriteRqst;
1104                 /* update local result copy while I have the chance */
1105                 LogwrtResult = xlogctl->LogwrtResult;
1106                 SpinLockRelease(&xlogctl->info_lck);
1107         }
1108
1109         ProcLastRecEnd = RecPtr;
1110
1111         END_CRIT_SECTION();
1112
1113         return RecPtr;
1114 }
1115
1116 /*
1117  * Determine whether the buffer referenced by an XLogRecData item has to
1118  * be backed up, and if so fill a BkpBlock struct for it.  In any case
1119  * save the buffer's LSN at *lsn.
1120  */
1121 static bool
1122 XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites,
1123                                 XLogRecPtr *lsn, BkpBlock *bkpb)
1124 {
1125         PageHeader      page;
1126
1127         page = (PageHeader) BufferGetBlock(rdata->buffer);
1128
1129         /*
1130          * XXX We assume page LSN is first data on *every* page that can be passed
1131          * to XLogInsert, whether it otherwise has the standard page layout or
1132          * not.
1133          */
1134         *lsn = page->pd_lsn;
1135
1136         if (doPageWrites &&
1137                 XLByteLE(page->pd_lsn, RedoRecPtr))
1138         {
1139                 /*
1140                  * The page needs to be backed up, so set up *bkpb
1141                  */
1142                 bkpb->node = BufferGetFileNode(rdata->buffer);
1143                 bkpb->block = BufferGetBlockNumber(rdata->buffer);
1144
1145                 if (rdata->buffer_std)
1146                 {
1147                         /* Assume we can omit data between pd_lower and pd_upper */
1148                         uint16          lower = page->pd_lower;
1149                         uint16          upper = page->pd_upper;
1150
1151                         if (lower >= SizeOfPageHeaderData &&
1152                                 upper > lower &&
1153                                 upper <= BLCKSZ)
1154                         {
1155                                 bkpb->hole_offset = lower;
1156                                 bkpb->hole_length = upper - lower;
1157                         }
1158                         else
1159                         {
1160                                 /* No "hole" to compress out */
1161                                 bkpb->hole_offset = 0;
1162                                 bkpb->hole_length = 0;
1163                         }
1164                 }
1165                 else
1166                 {
1167                         /* Not a standard page header, don't try to eliminate "hole" */
1168                         bkpb->hole_offset = 0;
1169                         bkpb->hole_length = 0;
1170                 }
1171
1172                 return true;                    /* buffer requires backup */
1173         }
1174
1175         return false;                           /* buffer does not need to be backed up */
1176 }
1177
1178 /*
1179  * XLogArchiveNotify
1180  *
1181  * Create an archive notification file
1182  *
1183  * The name of the notification file is the message that will be picked up
1184  * by the archiver, e.g. we write 0000000100000001000000C6.ready
1185  * and the archiver then knows to archive XLOGDIR/0000000100000001000000C6,
1186  * then when complete, rename it to 0000000100000001000000C6.done
1187  */
1188 static void
1189 XLogArchiveNotify(const char *xlog)
1190 {
1191         char            archiveStatusPath[MAXPGPATH];
1192         FILE       *fd;
1193
1194         /* insert an otherwise empty file called <XLOG>.ready */
1195         StatusFilePath(archiveStatusPath, xlog, ".ready");
1196         fd = AllocateFile(archiveStatusPath, "w");
1197         if (fd == NULL)
1198         {
1199                 ereport(LOG,
1200                                 (errcode_for_file_access(),
1201                                  errmsg("could not create archive status file \"%s\": %m",
1202                                                 archiveStatusPath)));
1203                 return;
1204         }
1205         if (FreeFile(fd))
1206         {
1207                 ereport(LOG,
1208                                 (errcode_for_file_access(),
1209                                  errmsg("could not write archive status file \"%s\": %m",
1210                                                 archiveStatusPath)));
1211                 return;
1212         }
1213
1214         /* Notify archiver that it's got something to do */
1215         if (IsUnderPostmaster)
1216                 SendPostmasterSignal(PMSIGNAL_WAKEN_ARCHIVER);
1217 }
1218
1219 /*
1220  * Convenience routine to notify using log/seg representation of filename
1221  */
1222 static void
1223 XLogArchiveNotifySeg(uint32 log, uint32 seg)
1224 {
1225         char            xlog[MAXFNAMELEN];
1226
1227         XLogFileName(xlog, ThisTimeLineID, log, seg);
1228         XLogArchiveNotify(xlog);
1229 }
1230
1231 /*
1232  * XLogArchiveCheckDone
1233  *
1234  * This is called when we are ready to delete or recycle an old XLOG segment
1235  * file or backup history file.  If it is okay to delete it then return true.
1236  * If it is not time to delete it, make sure a .ready file exists, and return
1237  * false.
1238  *
1239  * If <XLOG>.done exists, then return true; else if <XLOG>.ready exists,
1240  * then return false; else create <XLOG>.ready and return false.
1241  *
1242  * The reason we do things this way is so that if the original attempt to
1243  * create <XLOG>.ready fails, we'll retry during subsequent checkpoints.
1244  */
1245 static bool
1246 XLogArchiveCheckDone(const char *xlog)
1247 {
1248         char            archiveStatusPath[MAXPGPATH];
1249         struct stat stat_buf;
1250
1251         /* Always deletable if archiving is off */
1252         if (!XLogArchivingActive())
1253                 return true;
1254
1255         /* First check for .done --- this means archiver is done with it */
1256         StatusFilePath(archiveStatusPath, xlog, ".done");
1257         if (stat(archiveStatusPath, &stat_buf) == 0)
1258                 return true;
1259
1260         /* check for .ready --- this means archiver is still busy with it */
1261         StatusFilePath(archiveStatusPath, xlog, ".ready");
1262         if (stat(archiveStatusPath, &stat_buf) == 0)
1263                 return false;
1264
1265         /* Race condition --- maybe archiver just finished, so recheck */
1266         StatusFilePath(archiveStatusPath, xlog, ".done");
1267         if (stat(archiveStatusPath, &stat_buf) == 0)
1268                 return true;
1269
1270         /* Retry creation of the .ready file */
1271         XLogArchiveNotify(xlog);
1272         return false;
1273 }
1274
1275 /*
1276  * XLogArchiveCleanup
1277  *
1278  * Cleanup archive notification file(s) for a particular xlog segment
1279  */
1280 static void
1281 XLogArchiveCleanup(const char *xlog)
1282 {
1283         char            archiveStatusPath[MAXPGPATH];
1284
1285         /* Remove the .done file */
1286         StatusFilePath(archiveStatusPath, xlog, ".done");
1287         unlink(archiveStatusPath);
1288         /* should we complain about failure? */
1289
1290         /* Remove the .ready file if present --- normally it shouldn't be */
1291         StatusFilePath(archiveStatusPath, xlog, ".ready");
1292         unlink(archiveStatusPath);
1293         /* should we complain about failure? */
1294 }
1295
1296 /*
1297  * Advance the Insert state to the next buffer page, writing out the next
1298  * buffer if it still contains unwritten data.
1299  *
1300  * If new_segment is TRUE then we set up the next buffer page as the first
1301  * page of the next xlog segment file, possibly but not usually the next
1302  * consecutive file page.
1303  *
1304  * The global LogwrtRqst.Write pointer needs to be advanced to include the
1305  * just-filled page.  If we can do this for free (without an extra lock),
1306  * we do so here.  Otherwise the caller must do it.  We return TRUE if the
1307  * request update still needs to be done, FALSE if we did it internally.
1308  *
1309  * Must be called with WALInsertLock held.
1310  */
1311 static bool
1312 AdvanceXLInsertBuffer(bool new_segment)
1313 {
1314         XLogCtlInsert *Insert = &XLogCtl->Insert;
1315         XLogCtlWrite *Write = &XLogCtl->Write;
1316         int                     nextidx = NextBufIdx(Insert->curridx);
1317         bool            update_needed = true;
1318         XLogRecPtr      OldPageRqstPtr;
1319         XLogwrtRqst WriteRqst;
1320         XLogRecPtr      NewPageEndPtr;
1321         XLogPageHeader NewPage;
1322
1323         /* Use Insert->LogwrtResult copy if it's more fresh */
1324         if (XLByteLT(LogwrtResult.Write, Insert->LogwrtResult.Write))
1325                 LogwrtResult = Insert->LogwrtResult;
1326
1327         /*
1328          * Get ending-offset of the buffer page we need to replace (this may be
1329          * zero if the buffer hasn't been used yet).  Fall through if it's already
1330          * written out.
1331          */
1332         OldPageRqstPtr = XLogCtl->xlblocks[nextidx];
1333         if (!XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
1334         {
1335                 /* nope, got work to do... */
1336                 XLogRecPtr      FinishedPageRqstPtr;
1337
1338                 FinishedPageRqstPtr = XLogCtl->xlblocks[Insert->curridx];
1339
1340                 /* Before waiting, get info_lck and update LogwrtResult */
1341                 {
1342                         /* use volatile pointer to prevent code rearrangement */
1343                         volatile XLogCtlData *xlogctl = XLogCtl;
1344
1345                         SpinLockAcquire(&xlogctl->info_lck);
1346                         if (XLByteLT(xlogctl->LogwrtRqst.Write, FinishedPageRqstPtr))
1347                                 xlogctl->LogwrtRqst.Write = FinishedPageRqstPtr;
1348                         LogwrtResult = xlogctl->LogwrtResult;
1349                         SpinLockRelease(&xlogctl->info_lck);
1350                 }
1351
1352                 update_needed = false;  /* Did the shared-request update */
1353
1354                 if (XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
1355                 {
1356                         /* OK, someone wrote it already */
1357                         Insert->LogwrtResult = LogwrtResult;
1358                 }
1359                 else
1360                 {
1361                         /* Must acquire write lock */
1362                         LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
1363                         LogwrtResult = Write->LogwrtResult;
1364                         if (XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
1365                         {
1366                                 /* OK, someone wrote it already */
1367                                 LWLockRelease(WALWriteLock);
1368                                 Insert->LogwrtResult = LogwrtResult;
1369                         }
1370                         else
1371                         {
1372                                 /*
1373                                  * Have to write buffers while holding insert lock. This is
1374                                  * not good, so only write as much as we absolutely must.
1375                                  */
1376                                 WriteRqst.Write = OldPageRqstPtr;
1377                                 WriteRqst.Flush.xlogid = 0;
1378                                 WriteRqst.Flush.xrecoff = 0;
1379                                 XLogWrite(WriteRqst, false, false);
1380                                 LWLockRelease(WALWriteLock);
1381                                 Insert->LogwrtResult = LogwrtResult;
1382                         }
1383                 }
1384         }
1385
1386         /*
1387          * Now the next buffer slot is free and we can set it up to be the next
1388          * output page.
1389          */
1390         NewPageEndPtr = XLogCtl->xlblocks[Insert->curridx];
1391
1392         if (new_segment)
1393         {
1394                 /* force it to a segment start point */
1395                 NewPageEndPtr.xrecoff += XLogSegSize - 1;
1396                 NewPageEndPtr.xrecoff -= NewPageEndPtr.xrecoff % XLogSegSize;
1397         }
1398
1399         if (NewPageEndPtr.xrecoff >= XLogFileSize)
1400         {
1401                 /* crossing a logid boundary */
1402                 NewPageEndPtr.xlogid += 1;
1403                 NewPageEndPtr.xrecoff = XLOG_BLCKSZ;
1404         }
1405         else
1406                 NewPageEndPtr.xrecoff += XLOG_BLCKSZ;
1407         XLogCtl->xlblocks[nextidx] = NewPageEndPtr;
1408         NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * (Size) XLOG_BLCKSZ);
1409
1410         Insert->curridx = nextidx;
1411         Insert->currpage = NewPage;
1412
1413         Insert->currpos = ((char *) NewPage) +SizeOfXLogShortPHD;
1414
1415         /*
1416          * Be sure to re-zero the buffer so that bytes beyond what we've written
1417          * will look like zeroes and not valid XLOG records...
1418          */
1419         MemSet((char *) NewPage, 0, XLOG_BLCKSZ);
1420
1421         /*
1422          * Fill the new page's header
1423          */
1424         NewPage   ->xlp_magic = XLOG_PAGE_MAGIC;
1425
1426         /* NewPage->xlp_info = 0; */    /* done by memset */
1427         NewPage   ->xlp_tli = ThisTimeLineID;
1428         NewPage   ->xlp_pageaddr.xlogid = NewPageEndPtr.xlogid;
1429         NewPage   ->xlp_pageaddr.xrecoff = NewPageEndPtr.xrecoff - XLOG_BLCKSZ;
1430
1431         /*
1432          * If first page of an XLOG segment file, make it a long header.
1433          */
1434         if ((NewPage->xlp_pageaddr.xrecoff % XLogSegSize) == 0)
1435         {
1436                 XLogLongPageHeader NewLongPage = (XLogLongPageHeader) NewPage;
1437
1438                 NewLongPage->xlp_sysid = ControlFile->system_identifier;
1439                 NewLongPage->xlp_seg_size = XLogSegSize;
1440                 NewLongPage->xlp_xlog_blcksz = XLOG_BLCKSZ;
1441                 NewPage   ->xlp_info |= XLP_LONG_HEADER;
1442
1443                 Insert->currpos = ((char *) NewPage) +SizeOfXLogLongPHD;
1444         }
1445
1446         return update_needed;
1447 }
1448
1449 /*
1450  * Write and/or fsync the log at least as far as WriteRqst indicates.
1451  *
1452  * If flexible == TRUE, we don't have to write as far as WriteRqst, but
1453  * may stop at any convenient boundary (such as a cache or logfile boundary).
1454  * This option allows us to avoid uselessly issuing multiple writes when a
1455  * single one would do.
1456  *
1457  * If xlog_switch == TRUE, we are intending an xlog segment switch, so
1458  * perform end-of-segment actions after writing the last page, even if
1459  * it's not physically the end of its segment.  (NB: this will work properly
1460  * only if caller specifies WriteRqst == page-end and flexible == false,
1461  * and there is some data to write.)
1462  *
1463  * Must be called with WALWriteLock held.
1464  */
1465 static void
1466 XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
1467 {
1468         XLogCtlWrite *Write = &XLogCtl->Write;
1469         bool            ispartialpage;
1470         bool            last_iteration;
1471         bool            finishing_seg;
1472         bool            use_existent;
1473         int                     curridx;
1474         int                     npages;
1475         int                     startidx;
1476         uint32          startoffset;
1477
1478         /* We should always be inside a critical section here */
1479         Assert(CritSectionCount > 0);
1480
1481         /*
1482          * Update local LogwrtResult (caller probably did this already, but...)
1483          */
1484         LogwrtResult = Write->LogwrtResult;
1485
1486         /*
1487          * Since successive pages in the xlog cache are consecutively allocated,
1488          * we can usually gather multiple pages together and issue just one
1489          * write() call.  npages is the number of pages we have determined can be
1490          * written together; startidx is the cache block index of the first one,
1491          * and startoffset is the file offset at which it should go. The latter
1492          * two variables are only valid when npages > 0, but we must initialize
1493          * all of them to keep the compiler quiet.
1494          */
1495         npages = 0;
1496         startidx = 0;
1497         startoffset = 0;
1498
1499         /*
1500          * Within the loop, curridx is the cache block index of the page to
1501          * consider writing.  We advance Write->curridx only after successfully
1502          * writing pages.  (Right now, this refinement is useless since we are
1503          * going to PANIC if any error occurs anyway; but someday it may come in
1504          * useful.)
1505          */
1506         curridx = Write->curridx;
1507
1508         while (XLByteLT(LogwrtResult.Write, WriteRqst.Write))
1509         {
1510                 /*
1511                  * Make sure we're not ahead of the insert process.  This could happen
1512                  * if we're passed a bogus WriteRqst.Write that is past the end of the
1513                  * last page that's been initialized by AdvanceXLInsertBuffer.
1514                  */
1515                 if (!XLByteLT(LogwrtResult.Write, XLogCtl->xlblocks[curridx]))
1516                         elog(PANIC, "xlog write request %X/%X is past end of log %X/%X",
1517                                  LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff,
1518                                  XLogCtl->xlblocks[curridx].xlogid,
1519                                  XLogCtl->xlblocks[curridx].xrecoff);
1520
1521                 /* Advance LogwrtResult.Write to end of current buffer page */
1522                 LogwrtResult.Write = XLogCtl->xlblocks[curridx];
1523                 ispartialpage = XLByteLT(WriteRqst.Write, LogwrtResult.Write);
1524
1525                 if (!XLByteInPrevSeg(LogwrtResult.Write, openLogId, openLogSeg))
1526                 {
1527                         /*
1528                          * Switch to new logfile segment.  We cannot have any pending
1529                          * pages here (since we dump what we have at segment end).
1530                          */
1531                         Assert(npages == 0);
1532                         if (openLogFile >= 0)
1533                                 XLogFileClose();
1534                         XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg);
1535
1536                         /* create/use new log file */
1537                         use_existent = true;
1538                         openLogFile = XLogFileInit(openLogId, openLogSeg,
1539                                                                            &use_existent, true);
1540                         openLogOff = 0;
1541                 }
1542
1543                 /* Make sure we have the current logfile open */
1544                 if (openLogFile < 0)
1545                 {
1546                         XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg);
1547                         openLogFile = XLogFileOpen(openLogId, openLogSeg);
1548                         openLogOff = 0;
1549                 }
1550
1551                 /* Add current page to the set of pending pages-to-dump */
1552                 if (npages == 0)
1553                 {
1554                         /* first of group */
1555                         startidx = curridx;
1556                         startoffset = (LogwrtResult.Write.xrecoff - XLOG_BLCKSZ) % XLogSegSize;
1557                 }
1558                 npages++;
1559
1560                 /*
1561                  * Dump the set if this will be the last loop iteration, or if we are
1562                  * at the last page of the cache area (since the next page won't be
1563                  * contiguous in memory), or if we are at the end of the logfile
1564                  * segment.
1565                  */
1566                 last_iteration = !XLByteLT(LogwrtResult.Write, WriteRqst.Write);
1567
1568                 finishing_seg = !ispartialpage &&
1569                         (startoffset + npages * XLOG_BLCKSZ) >= XLogSegSize;
1570
1571                 if (last_iteration ||
1572                         curridx == XLogCtl->XLogCacheBlck ||
1573                         finishing_seg)
1574                 {
1575                         char       *from;
1576                         Size            nbytes;
1577
1578                         /* Need to seek in the file? */
1579                         if (openLogOff != startoffset)
1580                         {
1581                                 if (lseek(openLogFile, (off_t) startoffset, SEEK_SET) < 0)
1582                                         ereport(PANIC,
1583                                                         (errcode_for_file_access(),
1584                                                          errmsg("could not seek in log file %u, "
1585                                                                         "segment %u to offset %u: %m",
1586                                                                         openLogId, openLogSeg, startoffset)));
1587                                 openLogOff = startoffset;
1588                         }
1589
1590                         /* OK to write the page(s) */
1591                         from = XLogCtl->pages + startidx * (Size) XLOG_BLCKSZ;
1592                         nbytes = npages * (Size) XLOG_BLCKSZ;
1593                         errno = 0;
1594                         if (write(openLogFile, from, nbytes) != nbytes)
1595                         {
1596                                 /* if write didn't set errno, assume no disk space */
1597                                 if (errno == 0)
1598                                         errno = ENOSPC;
1599                                 ereport(PANIC,
1600                                                 (errcode_for_file_access(),
1601                                                  errmsg("could not write to log file %u, segment %u "
1602                                                                 "at offset %u, length %lu: %m",
1603                                                                 openLogId, openLogSeg,
1604                                                                 openLogOff, (unsigned long) nbytes)));
1605                         }
1606
1607                         /* Update state for write */
1608                         openLogOff += nbytes;
1609                         Write->curridx = ispartialpage ? curridx : NextBufIdx(curridx);
1610                         npages = 0;
1611
1612                         /*
1613                          * If we just wrote the whole last page of a logfile segment,
1614                          * fsync the segment immediately.  This avoids having to go back
1615                          * and re-open prior segments when an fsync request comes along
1616                          * later. Doing it here ensures that one and only one backend will
1617                          * perform this fsync.
1618                          *
1619                          * We also do this if this is the last page written for an xlog
1620                          * switch.
1621                          *
1622                          * This is also the right place to notify the Archiver that the
1623                          * segment is ready to copy to archival storage, and to update the
1624                          * timer for archive_timeout, and to signal for a checkpoint if
1625                          * too many logfile segments have been used since the last
1626                          * checkpoint.
1627                          */
1628                         if (finishing_seg || (xlog_switch && last_iteration))
1629                         {
1630                                 issue_xlog_fsync();
1631                                 LogwrtResult.Flush = LogwrtResult.Write;                /* end of page */
1632
1633                                 if (XLogArchivingActive())
1634                                         XLogArchiveNotifySeg(openLogId, openLogSeg);
1635
1636                                 Write->lastSegSwitchTime = time(NULL);
1637
1638                                 /*
1639                                  * Signal bgwriter to start a checkpoint if it's been too long
1640                                  * since the last one.  (We look at local copy of RedoRecPtr
1641                                  * which might be a little out of date, but should be close
1642                                  * enough for this purpose.)
1643                                  *
1644                                  * A straight computation of segment number could overflow 32
1645                                  * bits.  Rather than assuming we have working 64-bit
1646                                  * arithmetic, we compare the highest-order bits separately,
1647                                  * and force a checkpoint immediately when they change.
1648                                  */
1649                                 if (IsUnderPostmaster)
1650                                 {
1651                                         uint32          old_segno,
1652                                                                 new_segno;
1653                                         uint32          old_highbits,
1654                                                                 new_highbits;
1655
1656                                         old_segno = (RedoRecPtr.xlogid % XLogSegSize) * XLogSegsPerFile +
1657                                                 (RedoRecPtr.xrecoff / XLogSegSize);
1658                                         old_highbits = RedoRecPtr.xlogid / XLogSegSize;
1659                                         new_segno = (openLogId % XLogSegSize) * XLogSegsPerFile +
1660                                                 openLogSeg;
1661                                         new_highbits = openLogId / XLogSegSize;
1662                                         if (new_highbits != old_highbits ||
1663                                                 new_segno >= old_segno + (uint32) (CheckPointSegments-1))
1664                                         {
1665 #ifdef WAL_DEBUG
1666                                                 if (XLOG_DEBUG)
1667                                                         elog(LOG, "time for a checkpoint, signaling bgwriter");
1668 #endif
1669                                                 RequestCheckpoint(false, true);
1670                                         }
1671                                 }
1672                         }
1673                 }
1674
1675                 if (ispartialpage)
1676                 {
1677                         /* Only asked to write a partial page */
1678                         LogwrtResult.Write = WriteRqst.Write;
1679                         break;
1680                 }
1681                 curridx = NextBufIdx(curridx);
1682
1683                 /* If flexible, break out of loop as soon as we wrote something */
1684                 if (flexible && npages == 0)
1685                         break;
1686         }
1687
1688         Assert(npages == 0);
1689         Assert(curridx == Write->curridx);
1690
1691         /*
1692          * If asked to flush, do so
1693          */
1694         if (XLByteLT(LogwrtResult.Flush, WriteRqst.Flush) &&
1695                 XLByteLT(LogwrtResult.Flush, LogwrtResult.Write))
1696         {
1697                 /*
1698                  * Could get here without iterating above loop, in which case we might
1699                  * have no open file or the wrong one.  However, we do not need to
1700                  * fsync more than one file.
1701                  */
1702                 if (sync_method != SYNC_METHOD_OPEN)
1703                 {
1704                         if (openLogFile >= 0 &&
1705                                 !XLByteInPrevSeg(LogwrtResult.Write, openLogId, openLogSeg))
1706                                 XLogFileClose();
1707                         if (openLogFile < 0)
1708                         {
1709                                 XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg);
1710                                 openLogFile = XLogFileOpen(openLogId, openLogSeg);
1711                                 openLogOff = 0;
1712                         }
1713                         issue_xlog_fsync();
1714                 }
1715                 LogwrtResult.Flush = LogwrtResult.Write;
1716         }
1717
1718         /*
1719          * Update shared-memory status
1720          *
1721          * We make sure that the shared 'request' values do not fall behind the
1722          * 'result' values.  This is not absolutely essential, but it saves some
1723          * code in a couple of places.
1724          */
1725         {
1726                 /* use volatile pointer to prevent code rearrangement */
1727                 volatile XLogCtlData *xlogctl = XLogCtl;
1728
1729                 SpinLockAcquire(&xlogctl->info_lck);
1730                 xlogctl->LogwrtResult = LogwrtResult;
1731                 if (XLByteLT(xlogctl->LogwrtRqst.Write, LogwrtResult.Write))
1732                         xlogctl->LogwrtRqst.Write = LogwrtResult.Write;
1733                 if (XLByteLT(xlogctl->LogwrtRqst.Flush, LogwrtResult.Flush))
1734                         xlogctl->LogwrtRqst.Flush = LogwrtResult.Flush;
1735                 SpinLockRelease(&xlogctl->info_lck);
1736         }
1737
1738         Write->LogwrtResult = LogwrtResult;
1739 }
1740
1741 /*
1742  * Ensure that all XLOG data through the given position is flushed to disk.
1743  *
1744  * NOTE: this differs from XLogWrite mainly in that the WALWriteLock is not
1745  * already held, and we try to avoid acquiring it if possible.
1746  */
1747 void
1748 XLogFlush(XLogRecPtr record)
1749 {
1750         XLogRecPtr      WriteRqstPtr;
1751         XLogwrtRqst WriteRqst;
1752
1753         /* Disabled during REDO */
1754         if (InRedo)
1755                 return;
1756
1757         /* Quick exit if already known flushed */
1758         if (XLByteLE(record, LogwrtResult.Flush))
1759                 return;
1760
1761 #ifdef WAL_DEBUG
1762         if (XLOG_DEBUG)
1763                 elog(LOG, "xlog flush request %X/%X; write %X/%X; flush %X/%X",
1764                          record.xlogid, record.xrecoff,
1765                          LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff,
1766                          LogwrtResult.Flush.xlogid, LogwrtResult.Flush.xrecoff);
1767 #endif
1768
1769         START_CRIT_SECTION();
1770
1771         /*
1772          * Since fsync is usually a horribly expensive operation, we try to
1773          * piggyback as much data as we can on each fsync: if we see any more data
1774          * entered into the xlog buffer, we'll write and fsync that too, so that
1775          * the final value of LogwrtResult.Flush is as large as possible. This
1776          * gives us some chance of avoiding another fsync immediately after.
1777          */
1778
1779         /* initialize to given target; may increase below */
1780         WriteRqstPtr = record;
1781
1782         /* read LogwrtResult and update local state */
1783         {
1784                 /* use volatile pointer to prevent code rearrangement */
1785                 volatile XLogCtlData *xlogctl = XLogCtl;
1786
1787                 SpinLockAcquire(&xlogctl->info_lck);
1788                 if (XLByteLT(WriteRqstPtr, xlogctl->LogwrtRqst.Write))
1789                         WriteRqstPtr = xlogctl->LogwrtRqst.Write;
1790                 LogwrtResult = xlogctl->LogwrtResult;
1791                 SpinLockRelease(&xlogctl->info_lck);
1792         }
1793
1794         /* done already? */
1795         if (!XLByteLE(record, LogwrtResult.Flush))
1796         {
1797                 /* now wait for the write lock */
1798                 LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
1799                 LogwrtResult = XLogCtl->Write.LogwrtResult;
1800                 if (!XLByteLE(record, LogwrtResult.Flush))
1801                 {
1802                         /* try to write/flush later additions to XLOG as well */
1803                         if (LWLockConditionalAcquire(WALInsertLock, LW_EXCLUSIVE))
1804                         {
1805                                 XLogCtlInsert *Insert = &XLogCtl->Insert;
1806                                 uint32          freespace = INSERT_FREESPACE(Insert);
1807
1808                                 if (freespace < SizeOfXLogRecord)               /* buffer is full */
1809                                         WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx];
1810                                 else
1811                                 {
1812                                         WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx];
1813                                         WriteRqstPtr.xrecoff -= freespace;
1814                                 }
1815                                 LWLockRelease(WALInsertLock);
1816                                 WriteRqst.Write = WriteRqstPtr;
1817                                 WriteRqst.Flush = WriteRqstPtr;
1818                         }
1819                         else
1820                         {
1821                                 WriteRqst.Write = WriteRqstPtr;
1822                                 WriteRqst.Flush = record;
1823                         }
1824                         XLogWrite(WriteRqst, false, false);
1825                 }
1826                 LWLockRelease(WALWriteLock);
1827         }
1828
1829         END_CRIT_SECTION();
1830
1831         /*
1832          * If we still haven't flushed to the request point then we have a
1833          * problem; most likely, the requested flush point is past end of XLOG.
1834          * This has been seen to occur when a disk page has a corrupted LSN.
1835          *
1836          * Formerly we treated this as a PANIC condition, but that hurts the
1837          * system's robustness rather than helping it: we do not want to take down
1838          * the whole system due to corruption on one data page.  In particular, if
1839          * the bad page is encountered again during recovery then we would be
1840          * unable to restart the database at all!  (This scenario has actually
1841          * happened in the field several times with 7.1 releases. Note that we
1842          * cannot get here while InRedo is true, but if the bad page is brought in
1843          * and marked dirty during recovery then CreateCheckPoint will try to
1844          * flush it at the end of recovery.)
1845          *
1846          * The current approach is to ERROR under normal conditions, but only
1847          * WARNING during recovery, so that the system can be brought up even if
1848          * there's a corrupt LSN.  Note that for calls from xact.c, the ERROR will
1849          * be promoted to PANIC since xact.c calls this routine inside a critical
1850          * section.  However, calls from bufmgr.c are not within critical sections
1851          * and so we will not force a restart for a bad LSN on a data page.
1852          */
1853         if (XLByteLT(LogwrtResult.Flush, record))
1854                 elog(InRecovery ? WARNING : ERROR,
1855                 "xlog flush request %X/%X is not satisfied --- flushed only to %X/%X",
1856                          record.xlogid, record.xrecoff,
1857                          LogwrtResult.Flush.xlogid, LogwrtResult.Flush.xrecoff);
1858 }
1859
1860 /*
1861  * Create a new XLOG file segment, or open a pre-existing one.
1862  *
1863  * log, seg: identify segment to be created/opened.
1864  *
1865  * *use_existent: if TRUE, OK to use a pre-existing file (else, any
1866  * pre-existing file will be deleted).  On return, TRUE if a pre-existing
1867  * file was used.
1868  *
1869  * use_lock: if TRUE, acquire ControlFileLock while moving file into
1870  * place.  This should be TRUE except during bootstrap log creation.  The
1871  * caller must *not* hold the lock at call.
1872  *
1873  * Returns FD of opened file.
1874  *
1875  * Note: errors here are ERROR not PANIC because we might or might not be
1876  * inside a critical section (eg, during checkpoint there is no reason to
1877  * take down the system on failure).  They will promote to PANIC if we are
1878  * in a critical section.
1879  */
1880 static int
1881 XLogFileInit(uint32 log, uint32 seg,
1882                          bool *use_existent, bool use_lock)
1883 {
1884         char            path[MAXPGPATH];
1885         char            tmppath[MAXPGPATH];
1886         char            zbuffer[XLOG_BLCKSZ];
1887         uint32          installed_log;
1888         uint32          installed_seg;
1889         int                     max_advance;
1890         int                     fd;
1891         int                     nbytes;
1892
1893         XLogFilePath(path, ThisTimeLineID, log, seg);
1894
1895         /*
1896          * Try to use existent file (checkpoint maker may have created it already)
1897          */
1898         if (*use_existent)
1899         {
1900                 fd = BasicOpenFile(path, O_RDWR | PG_BINARY | XLOG_SYNC_BIT,
1901                                                    S_IRUSR | S_IWUSR);
1902                 if (fd < 0)
1903                 {
1904                         if (errno != ENOENT)
1905                                 ereport(ERROR,
1906                                                 (errcode_for_file_access(),
1907                                                  errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
1908                                                                 path, log, seg)));
1909                 }
1910                 else
1911                         return fd;
1912         }
1913
1914         /*
1915          * Initialize an empty (all zeroes) segment.  NOTE: it is possible that
1916          * another process is doing the same thing.  If so, we will end up
1917          * pre-creating an extra log segment.  That seems OK, and better than
1918          * holding the lock throughout this lengthy process.
1919          */
1920         snprintf(tmppath, MAXPGPATH, XLOGDIR "/xlogtemp.%d", (int) getpid());
1921
1922         unlink(tmppath);
1923
1924         /* do not use XLOG_SYNC_BIT here --- want to fsync only at end of fill */
1925         fd = BasicOpenFile(tmppath, O_RDWR | O_CREAT | O_EXCL | PG_BINARY,
1926                                            S_IRUSR | S_IWUSR);
1927         if (fd < 0)
1928                 ereport(ERROR,
1929                                 (errcode_for_file_access(),
1930                                  errmsg("could not create file \"%s\": %m", tmppath)));
1931
1932         /*
1933          * Zero-fill the file.  We have to do this the hard way to ensure that all
1934          * the file space has really been allocated --- on platforms that allow
1935          * "holes" in files, just seeking to the end doesn't allocate intermediate
1936          * space.  This way, we know that we have all the space and (after the
1937          * fsync below) that all the indirect blocks are down on disk.  Therefore,
1938          * fdatasync(2) or O_DSYNC will be sufficient to sync future writes to the
1939          * log file.
1940          */
1941         MemSet(zbuffer, 0, sizeof(zbuffer));
1942         for (nbytes = 0; nbytes < XLogSegSize; nbytes += sizeof(zbuffer))
1943         {
1944                 errno = 0;
1945                 if ((int) write(fd, zbuffer, sizeof(zbuffer)) != (int) sizeof(zbuffer))
1946                 {
1947                         int                     save_errno = errno;
1948
1949                         /*
1950                          * If we fail to make the file, delete it to release disk space
1951                          */
1952                         unlink(tmppath);
1953                         /* if write didn't set errno, assume problem is no disk space */
1954                         errno = save_errno ? save_errno : ENOSPC;
1955
1956                         ereport(ERROR,
1957                                         (errcode_for_file_access(),
1958                                          errmsg("could not write to file \"%s\": %m", tmppath)));
1959                 }
1960         }
1961
1962         if (pg_fsync(fd) != 0)
1963                 ereport(ERROR,
1964                                 (errcode_for_file_access(),
1965                                  errmsg("could not fsync file \"%s\": %m", tmppath)));
1966
1967         if (close(fd))
1968                 ereport(ERROR,
1969                                 (errcode_for_file_access(),
1970                                  errmsg("could not close file \"%s\": %m", tmppath)));
1971
1972         /*
1973          * Now move the segment into place with its final name.
1974          *
1975          * If caller didn't want to use a pre-existing file, get rid of any
1976          * pre-existing file.  Otherwise, cope with possibility that someone else
1977          * has created the file while we were filling ours: if so, use ours to
1978          * pre-create a future log segment.
1979          */
1980         installed_log = log;
1981         installed_seg = seg;
1982         max_advance = XLOGfileslop;
1983         if (!InstallXLogFileSegment(&installed_log, &installed_seg, tmppath,
1984                                                                 *use_existent, &max_advance,
1985                                                                 use_lock))
1986         {
1987                 /* No need for any more future segments... */
1988                 unlink(tmppath);
1989         }
1990
1991         /* Set flag to tell caller there was no existent file */
1992         *use_existent = false;
1993
1994         /* Now open original target segment (might not be file I just made) */
1995         fd = BasicOpenFile(path, O_RDWR | PG_BINARY | XLOG_SYNC_BIT,
1996                                            S_IRUSR | S_IWUSR);
1997         if (fd < 0)
1998                 ereport(ERROR,
1999                                 (errcode_for_file_access(),
2000                    errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
2001                                   path, log, seg)));
2002
2003         return fd;
2004 }
2005
2006 /*
2007  * Create a new XLOG file segment by copying a pre-existing one.
2008  *
2009  * log, seg: identify segment to be created.
2010  *
2011  * srcTLI, srclog, srcseg: identify segment to be copied (could be from
2012  *              a different timeline)
2013  *
2014  * Currently this is only used during recovery, and so there are no locking
2015  * considerations.      But we should be just as tense as XLogFileInit to avoid
2016  * emplacing a bogus file.
2017  */
2018 static void
2019 XLogFileCopy(uint32 log, uint32 seg,
2020                          TimeLineID srcTLI, uint32 srclog, uint32 srcseg)
2021 {
2022         char            path[MAXPGPATH];
2023         char            tmppath[MAXPGPATH];
2024         char            buffer[XLOG_BLCKSZ];
2025         int                     srcfd;
2026         int                     fd;
2027         int                     nbytes;
2028
2029         /*
2030          * Open the source file
2031          */
2032         XLogFilePath(path, srcTLI, srclog, srcseg);
2033         srcfd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
2034         if (srcfd < 0)
2035                 ereport(ERROR,
2036                                 (errcode_for_file_access(),
2037                                  errmsg("could not open file \"%s\": %m", path)));
2038
2039         /*
2040          * Copy into a temp file name.
2041          */
2042         snprintf(tmppath, MAXPGPATH, XLOGDIR "/xlogtemp.%d", (int) getpid());
2043
2044         unlink(tmppath);
2045
2046         /* do not use XLOG_SYNC_BIT here --- want to fsync only at end of fill */
2047         fd = BasicOpenFile(tmppath, O_RDWR | O_CREAT | O_EXCL | PG_BINARY,
2048                                            S_IRUSR | S_IWUSR);
2049         if (fd < 0)
2050                 ereport(ERROR,
2051                                 (errcode_for_file_access(),
2052                                  errmsg("could not create file \"%s\": %m", tmppath)));
2053
2054         /*
2055          * Do the data copying.
2056          */
2057         for (nbytes = 0; nbytes < XLogSegSize; nbytes += sizeof(buffer))
2058         {
2059                 errno = 0;
2060                 if ((int) read(srcfd, buffer, sizeof(buffer)) != (int) sizeof(buffer))
2061                 {
2062                         if (errno != 0)
2063                                 ereport(ERROR,
2064                                                 (errcode_for_file_access(),
2065                                                  errmsg("could not read file \"%s\": %m", path)));
2066                         else
2067                                 ereport(ERROR,
2068                                                 (errmsg("not enough data in file \"%s\"", path)));
2069                 }
2070                 errno = 0;
2071                 if ((int) write(fd, buffer, sizeof(buffer)) != (int) sizeof(buffer))
2072                 {
2073                         int                     save_errno = errno;
2074
2075                         /*
2076                          * If we fail to make the file, delete it to release disk space
2077                          */
2078                         unlink(tmppath);
2079                         /* if write didn't set errno, assume problem is no disk space */
2080                         errno = save_errno ? save_errno : ENOSPC;
2081
2082                         ereport(ERROR,
2083                                         (errcode_for_file_access(),
2084                                          errmsg("could not write to file \"%s\": %m", tmppath)));
2085                 }
2086         }
2087
2088         if (pg_fsync(fd) != 0)
2089                 ereport(ERROR,
2090                                 (errcode_for_file_access(),
2091                                  errmsg("could not fsync file \"%s\": %m", tmppath)));
2092
2093         if (close(fd))
2094                 ereport(ERROR,
2095                                 (errcode_for_file_access(),
2096                                  errmsg("could not close file \"%s\": %m", tmppath)));
2097
2098         close(srcfd);
2099
2100         /*
2101          * Now move the segment into place with its final name.
2102          */
2103         if (!InstallXLogFileSegment(&log, &seg, tmppath, false, NULL, false))
2104                 elog(ERROR, "InstallXLogFileSegment should not have failed");
2105 }
2106
2107 /*
2108  * Install a new XLOG segment file as a current or future log segment.
2109  *
2110  * This is used both to install a newly-created segment (which has a temp
2111  * filename while it's being created) and to recycle an old segment.
2112  *
2113  * *log, *seg: identify segment to install as (or first possible target).
2114  * When find_free is TRUE, these are modified on return to indicate the
2115  * actual installation location or last segment searched.
2116  *
2117  * tmppath: initial name of file to install.  It will be renamed into place.
2118  *
2119  * find_free: if TRUE, install the new segment at the first empty log/seg
2120  * number at or after the passed numbers.  If FALSE, install the new segment
2121  * exactly where specified, deleting any existing segment file there.
2122  *
2123  * *max_advance: maximum number of log/seg slots to advance past the starting
2124  * point.  Fail if no free slot is found in this range.  On return, reduced
2125  * by the number of slots skipped over.  (Irrelevant, and may be NULL,
2126  * when find_free is FALSE.)
2127  *
2128  * use_lock: if TRUE, acquire ControlFileLock while moving file into
2129  * place.  This should be TRUE except during bootstrap log creation.  The
2130  * caller must *not* hold the lock at call.
2131  *
2132  * Returns TRUE if file installed, FALSE if not installed because of
2133  * exceeding max_advance limit.  On Windows, we also return FALSE if we
2134  * can't rename the file into place because someone's got it open.
2135  * (Any other kind of failure causes ereport().)
2136  */
2137 static bool
2138 InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath,
2139                                            bool find_free, int *max_advance,
2140                                            bool use_lock)
2141 {
2142         char            path[MAXPGPATH];
2143         struct stat stat_buf;
2144
2145         XLogFilePath(path, ThisTimeLineID, *log, *seg);
2146
2147         /*
2148          * We want to be sure that only one process does this at a time.
2149          */
2150         if (use_lock)
2151                 LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
2152
2153         if (!find_free)
2154         {
2155                 /* Force installation: get rid of any pre-existing segment file */
2156                 unlink(path);
2157         }
2158         else
2159         {
2160                 /* Find a free slot to put it in */
2161                 while (stat(path, &stat_buf) == 0)
2162                 {
2163                         if (*max_advance <= 0)
2164                         {
2165                                 /* Failed to find a free slot within specified range */
2166                                 if (use_lock)
2167                                         LWLockRelease(ControlFileLock);
2168                                 return false;
2169                         }
2170                         NextLogSeg(*log, *seg);
2171                         (*max_advance)--;
2172                         XLogFilePath(path, ThisTimeLineID, *log, *seg);
2173                 }
2174         }
2175
2176         /*
2177          * Prefer link() to rename() here just to be really sure that we don't
2178          * overwrite an existing logfile.  However, there shouldn't be one, so
2179          * rename() is an acceptable substitute except for the truly paranoid.
2180          */
2181 #if HAVE_WORKING_LINK
2182         if (link(tmppath, path) < 0)
2183                 ereport(ERROR,
2184                                 (errcode_for_file_access(),
2185                                  errmsg("could not link file \"%s\" to \"%s\" (initialization of log file %u, segment %u): %m",
2186                                                 tmppath, path, *log, *seg)));
2187         unlink(tmppath);
2188 #else
2189         if (rename(tmppath, path) < 0)
2190         {
2191 #ifdef WIN32
2192 #if !defined(__CYGWIN__)
2193                 if (GetLastError() == ERROR_ACCESS_DENIED)
2194 #else
2195                 if (errno == EACCES)
2196 #endif
2197                 {
2198                         if (use_lock)
2199                                 LWLockRelease(ControlFileLock);
2200                         return false;
2201                 }
2202 #endif /* WIN32 */
2203
2204                 ereport(ERROR,
2205                                 (errcode_for_file_access(),
2206                                  errmsg("could not rename file \"%s\" to \"%s\" (initialization of log file %u, segment %u): %m",
2207                                                 tmppath, path, *log, *seg)));
2208         }
2209 #endif
2210
2211         if (use_lock)
2212                 LWLockRelease(ControlFileLock);
2213
2214         return true;
2215 }
2216
2217 /*
2218  * Open a pre-existing logfile segment for writing.
2219  */
2220 static int
2221 XLogFileOpen(uint32 log, uint32 seg)
2222 {
2223         char            path[MAXPGPATH];
2224         int                     fd;
2225
2226         XLogFilePath(path, ThisTimeLineID, log, seg);
2227
2228         fd = BasicOpenFile(path, O_RDWR | PG_BINARY | XLOG_SYNC_BIT,
2229                                            S_IRUSR | S_IWUSR);
2230         if (fd < 0)
2231                 ereport(PANIC,
2232                                 (errcode_for_file_access(),
2233                    errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
2234                                   path, log, seg)));
2235
2236         return fd;
2237 }
2238
2239 /*
2240  * Open a logfile segment for reading (during recovery).
2241  */
2242 static int
2243 XLogFileRead(uint32 log, uint32 seg, int emode)
2244 {
2245         char            path[MAXPGPATH];
2246         char            xlogfname[MAXFNAMELEN];
2247         ListCell   *cell;
2248         int                     fd;
2249
2250         /*
2251          * Loop looking for a suitable timeline ID: we might need to read any of
2252          * the timelines listed in expectedTLIs.
2253          *
2254          * We expect curFileTLI on entry to be the TLI of the preceding file in
2255          * sequence, or 0 if there was no predecessor.  We do not allow curFileTLI
2256          * to go backwards; this prevents us from picking up the wrong file when a
2257          * parent timeline extends to higher segment numbers than the child we
2258          * want to read.
2259          */
2260         foreach(cell, expectedTLIs)
2261         {
2262                 TimeLineID      tli = (TimeLineID) lfirst_int(cell);
2263
2264                 if (tli < curFileTLI)
2265                         break;                          /* don't bother looking at too-old TLIs */
2266
2267                 if (InArchiveRecovery)
2268                 {
2269                         XLogFileName(xlogfname, tli, log, seg);
2270                         restoredFromArchive = RestoreArchivedFile(path, xlogfname,
2271                                                                                                           "RECOVERYXLOG",
2272                                                                                                           XLogSegSize);
2273                 }
2274                 else
2275                         XLogFilePath(path, tli, log, seg);
2276
2277                 fd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
2278                 if (fd >= 0)
2279                 {
2280                         /* Success! */
2281                         curFileTLI = tli;
2282                         return fd;
2283                 }
2284                 if (errno != ENOENT)    /* unexpected failure? */
2285                         ereport(PANIC,
2286                                         (errcode_for_file_access(),
2287                         errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
2288                                    path, log, seg)));
2289         }
2290
2291         /* Couldn't find it.  For simplicity, complain about front timeline */
2292         XLogFilePath(path, recoveryTargetTLI, log, seg);
2293         errno = ENOENT;
2294         ereport(emode,
2295                         (errcode_for_file_access(),
2296                    errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
2297                                   path, log, seg)));
2298         return -1;
2299 }
2300
2301 /*
2302  * Close the current logfile segment for writing.
2303  */
2304 static void
2305 XLogFileClose(void)
2306 {
2307         Assert(openLogFile >= 0);
2308
2309         /*
2310          * posix_fadvise is problematic on many platforms: on older x86 Linux it
2311          * just dumps core, and there are reports of problems on PPC platforms as
2312          * well.  The following is therefore disabled for the time being. We could
2313          * consider some kind of configure test to see if it's safe to use, but
2314          * since we lack hard evidence that there's any useful performance gain to
2315          * be had, spending time on that seems unprofitable for now.
2316          */
2317 #ifdef NOT_USED
2318
2319         /*
2320          * WAL segment files will not be re-read in normal operation, so we advise
2321          * OS to release any cached pages.      But do not do so if WAL archiving is
2322          * active, because archiver process could use the cache to read the WAL
2323          * segment.
2324          *
2325          * While O_DIRECT works for O_SYNC, posix_fadvise() works for fsync() and
2326          * O_SYNC, and some platforms only have posix_fadvise().
2327          */
2328 #if defined(HAVE_DECL_POSIX_FADVISE) && defined(POSIX_FADV_DONTNEED)
2329         if (!XLogArchivingActive())
2330                 posix_fadvise(openLogFile, 0, 0, POSIX_FADV_DONTNEED);
2331 #endif
2332 #endif   /* NOT_USED */
2333
2334         if (close(openLogFile))
2335                 ereport(PANIC,
2336                                 (errcode_for_file_access(),
2337                                  errmsg("could not close log file %u, segment %u: %m",
2338                                                 openLogId, openLogSeg)));
2339         openLogFile = -1;
2340 }
2341
2342 /*
2343  * Attempt to retrieve the specified file from off-line archival storage.
2344  * If successful, fill "path" with its complete path (note that this will be
2345  * a temp file name that doesn't follow the normal naming convention), and
2346  * return TRUE.
2347  *
2348  * If not successful, fill "path" with the name of the normal on-line file
2349  * (which may or may not actually exist, but we'll try to use it), and return
2350  * FALSE.
2351  *
2352  * For fixed-size files, the caller may pass the expected size as an
2353  * additional crosscheck on successful recovery.  If the file size is not
2354  * known, set expectedSize = 0.
2355  */
2356 static bool
2357 RestoreArchivedFile(char *path, const char *xlogfname,
2358                                         const char *recovername, off_t expectedSize)
2359 {
2360         char            xlogpath[MAXPGPATH];
2361         char            xlogRestoreCmd[MAXPGPATH];
2362         char       *dp;
2363         char       *endp;
2364         const char *sp;
2365         int                     rc;
2366         bool            signaled;
2367         struct stat stat_buf;
2368
2369         /*
2370          * When doing archive recovery, we always prefer an archived log file even
2371          * if a file of the same name exists in XLOGDIR.  The reason is that the
2372          * file in XLOGDIR could be an old, un-filled or partly-filled version
2373          * that was copied and restored as part of backing up $PGDATA.
2374          *
2375          * We could try to optimize this slightly by checking the local copy
2376          * lastchange timestamp against the archived copy, but we have no API to
2377          * do this, nor can we guarantee that the lastchange timestamp was
2378          * preserved correctly when we copied to archive. Our aim is robustness,
2379          * so we elect not to do this.
2380          *
2381          * If we cannot obtain the log file from the archive, however, we will try
2382          * to use the XLOGDIR file if it exists.  This is so that we can make use
2383          * of log segments that weren't yet transferred to the archive.
2384          *
2385          * Notice that we don't actually overwrite any files when we copy back
2386          * from archive because the recoveryRestoreCommand may inadvertently
2387          * restore inappropriate xlogs, or they may be corrupt, so we may wish to
2388          * fallback to the segments remaining in current XLOGDIR later. The
2389          * copy-from-archive filename is always the same, ensuring that we don't
2390          * run out of disk space on long recoveries.
2391          */
2392         snprintf(xlogpath, MAXPGPATH, XLOGDIR "/%s", recovername);
2393
2394         /*
2395          * Make sure there is no existing file named recovername.
2396          */
2397         if (stat(xlogpath, &stat_buf) != 0)
2398         {
2399                 if (errno != ENOENT)
2400                         ereport(FATAL,
2401                                         (errcode_for_file_access(),
2402                                          errmsg("could not stat file \"%s\": %m",
2403                                                         xlogpath)));
2404         }
2405         else
2406         {
2407                 if (unlink(xlogpath) != 0)
2408                         ereport(FATAL,
2409                                         (errcode_for_file_access(),
2410                                          errmsg("could not remove file \"%s\": %m",
2411                                                         xlogpath)));
2412         }
2413
2414         /*
2415          * construct the command to be executed
2416          */
2417         dp = xlogRestoreCmd;
2418         endp = xlogRestoreCmd + MAXPGPATH - 1;
2419         *endp = '\0';
2420
2421         for (sp = recoveryRestoreCommand; *sp; sp++)
2422         {
2423                 if (*sp == '%')
2424                 {
2425                         switch (sp[1])
2426                         {
2427                                 case 'p':
2428                                         /* %p: relative path of target file */
2429                                         sp++;
2430                                         StrNCpy(dp, xlogpath, endp - dp);
2431                                         make_native_path(dp);
2432                                         dp += strlen(dp);
2433                                         break;
2434                                 case 'f':
2435                                         /* %f: filename of desired file */
2436                                         sp++;
2437                                         StrNCpy(dp, xlogfname, endp - dp);
2438                                         dp += strlen(dp);
2439                                         break;
2440                                 case '%':
2441                                         /* convert %% to a single % */
2442                                         sp++;
2443                                         if (dp < endp)
2444                                                 *dp++ = *sp;
2445                                         break;
2446                                 default:
2447                                         /* otherwise treat the % as not special */
2448                                         if (dp < endp)
2449                                                 *dp++ = *sp;
2450                                         break;
2451                         }
2452                 }
2453                 else
2454                 {
2455                         if (dp < endp)
2456                                 *dp++ = *sp;
2457                 }
2458         }
2459         *dp = '\0';
2460
2461         ereport(DEBUG3,
2462                         (errmsg_internal("executing restore command \"%s\"",
2463                                                          xlogRestoreCmd)));
2464
2465         /*
2466          * Copy xlog from archival storage to XLOGDIR
2467          */
2468         rc = system(xlogRestoreCmd);
2469         if (rc == 0)
2470         {
2471                 /*
2472                  * command apparently succeeded, but let's make sure the file is
2473                  * really there now and has the correct size.
2474                  *
2475                  * XXX I made wrong-size a fatal error to ensure the DBA would notice
2476                  * it, but is that too strong?  We could try to plow ahead with a
2477                  * local copy of the file ... but the problem is that there probably
2478                  * isn't one, and we'd incorrectly conclude we've reached the end of
2479                  * WAL and we're done recovering ...
2480                  */
2481                 if (stat(xlogpath, &stat_buf) == 0)
2482                 {
2483                         if (expectedSize > 0 && stat_buf.st_size != expectedSize)
2484                                 ereport(FATAL,
2485                                                 (errmsg("archive file \"%s\" has wrong size: %lu instead of %lu",
2486                                                                 xlogfname,
2487                                                                 (unsigned long) stat_buf.st_size,
2488                                                                 (unsigned long) expectedSize)));
2489                         else
2490                         {
2491                                 ereport(LOG,
2492                                                 (errmsg("restored log file \"%s\" from archive",
2493                                                                 xlogfname)));
2494                                 strcpy(path, xlogpath);
2495                                 return true;
2496                         }
2497                 }
2498                 else
2499                 {
2500                         /* stat failed */
2501                         if (errno != ENOENT)
2502                                 ereport(FATAL,
2503                                                 (errcode_for_file_access(),
2504                                                  errmsg("could not stat file \"%s\": %m",
2505                                                                 xlogpath)));
2506                 }
2507         }
2508
2509         /*
2510          * Remember, we rollforward UNTIL the restore fails so failure here is
2511          * just part of the process... that makes it difficult to determine
2512          * whether the restore failed because there isn't an archive to restore,
2513          * or because the administrator has specified the restore program
2514          * incorrectly.  We have to assume the former.
2515          *
2516          * However, if the failure was due to any sort of signal, it's best to
2517          * punt and abort recovery.  (If we "return false" here, upper levels
2518          * will assume that recovery is complete and start up the database!)
2519          * It's essential to abort on child SIGINT and SIGQUIT, because per spec
2520          * system() ignores SIGINT and SIGQUIT while waiting; if we see one of
2521          * those it's a good bet we should have gotten it too.  Aborting on other
2522          * signals such as SIGTERM seems a good idea as well.
2523          *
2524          * Per the Single Unix Spec, shells report exit status > 128 when
2525          * a called command died on a signal.  Also, 126 and 127 are used to
2526          * report problems such as an unfindable command; treat those as fatal
2527          * errors too.
2528          */
2529         signaled = WIFSIGNALED(rc) || WEXITSTATUS(rc) > 125;
2530
2531         ereport(signaled ? FATAL : DEBUG2,
2532                 (errmsg("could not restore file \"%s\" from archive: return code %d",
2533                                 xlogfname, rc)));
2534
2535         /*
2536          * if an archived file is not available, there might still be a version of
2537          * this file in XLOGDIR, so return that as the filename to open.
2538          *
2539          * In many recovery scenarios we expect this to fail also, but if so that
2540          * just means we've reached the end of WAL.
2541          */
2542         snprintf(path, MAXPGPATH, XLOGDIR "/%s", xlogfname);
2543         return false;
2544 }
2545
2546 /*
2547  * Preallocate log files beyond the specified log endpoint, according to
2548  * the XLOGfile user parameter.
2549  */
2550 static int
2551 PreallocXlogFiles(XLogRecPtr endptr)
2552 {
2553         int                     nsegsadded = 0;
2554         uint32          _logId;
2555         uint32          _logSeg;
2556         int                     lf;
2557         bool            use_existent;
2558
2559         XLByteToPrevSeg(endptr, _logId, _logSeg);
2560         if ((endptr.xrecoff - 1) % XLogSegSize >=
2561                 (uint32) (0.75 * XLogSegSize))
2562         {
2563                 NextLogSeg(_logId, _logSeg);
2564                 use_existent = true;
2565                 lf = XLogFileInit(_logId, _logSeg, &use_existent, true);
2566                 close(lf);
2567                 if (!use_existent)
2568                         nsegsadded++;
2569         }
2570         return nsegsadded;
2571 }
2572
2573 /*
2574  * Remove or move offline all log files older or equal to passed log/seg#
2575  *
2576  * endptr is current (or recent) end of xlog; this is used to determine
2577  * whether we want to recycle rather than delete no-longer-wanted log files.
2578  */
2579 static void
2580 MoveOfflineLogs(uint32 log, uint32 seg, XLogRecPtr endptr,
2581                                 int *nsegsremoved, int *nsegsrecycled)
2582 {
2583         uint32          endlogId;
2584         uint32          endlogSeg;
2585         int                     max_advance;
2586         DIR                *xldir;
2587         struct dirent *xlde;
2588         char            lastoff[MAXFNAMELEN];
2589         char            path[MAXPGPATH];
2590
2591         *nsegsremoved = 0;
2592         *nsegsrecycled = 0;
2593
2594         /*
2595          * Initialize info about where to try to recycle to.  We allow recycling
2596          * segments up to XLOGfileslop segments beyond the current XLOG location.
2597          */
2598         XLByteToPrevSeg(endptr, endlogId, endlogSeg);
2599         max_advance = XLOGfileslop;
2600
2601         xldir = AllocateDir(XLOGDIR);
2602         if (xldir == NULL)
2603                 ereport(ERROR,
2604                                 (errcode_for_file_access(),
2605                                  errmsg("could not open transaction log directory \"%s\": %m",
2606                                                 XLOGDIR)));
2607
2608         XLogFileName(lastoff, ThisTimeLineID, log, seg);
2609
2610         while ((xlde = ReadDir(xldir, XLOGDIR)) != NULL)
2611         {
2612                 /*
2613                  * We ignore the timeline part of the XLOG segment identifiers in
2614                  * deciding whether a segment is still needed.  This ensures that we
2615                  * won't prematurely remove a segment from a parent timeline. We could
2616                  * probably be a little more proactive about removing segments of
2617                  * non-parent timelines, but that would be a whole lot more
2618                  * complicated.
2619                  *
2620                  * We use the alphanumeric sorting property of the filenames to decide
2621                  * which ones are earlier than the lastoff segment.
2622                  */
2623                 if (strlen(xlde->d_name) == 24 &&
2624                         strspn(xlde->d_name, "0123456789ABCDEF") == 24 &&
2625                         strcmp(xlde->d_name + 8, lastoff + 8) <= 0)
2626                 {
2627                         if (XLogArchiveCheckDone(xlde->d_name))
2628                         {
2629                                 snprintf(path, MAXPGPATH, XLOGDIR "/%s", xlde->d_name);
2630
2631                                 /*
2632                                  * Before deleting the file, see if it can be recycled as a
2633                                  * future log segment.
2634                                  */
2635                                 if (InstallXLogFileSegment(&endlogId, &endlogSeg, path,
2636                                                                                    true, &max_advance,
2637                                                                                    true))
2638                                 {
2639                                         ereport(DEBUG2,
2640                                                         (errmsg("recycled transaction log file \"%s\"",
2641                                                                         xlde->d_name)));
2642                                         (*nsegsrecycled)++;
2643                                         /* Needn't recheck that slot on future iterations */
2644                                         if (max_advance > 0)
2645                                         {
2646                                                 NextLogSeg(endlogId, endlogSeg);
2647                                                 max_advance--;
2648                                         }
2649                                 }
2650                                 else
2651                                 {
2652                                         /* No need for any more future segments... */
2653                                         ereport(DEBUG2,
2654                                                         (errmsg("removing transaction log file \"%s\"",
2655                                                                         xlde->d_name)));
2656                                         unlink(path);
2657                                         (*nsegsremoved)++;
2658                                 }
2659
2660                                 XLogArchiveCleanup(xlde->d_name);
2661                         }
2662                 }
2663         }
2664
2665         FreeDir(xldir);
2666 }
2667
2668 /*
2669  * Remove previous backup history files.  This also retries creation of
2670  * .ready files for any backup history files for which XLogArchiveNotify
2671  * failed earlier.
2672  */
2673 static void
2674 CleanupBackupHistory(void)
2675 {
2676         DIR                *xldir;
2677         struct dirent *xlde;
2678         char            path[MAXPGPATH];
2679
2680         xldir = AllocateDir(XLOGDIR);
2681         if (xldir == NULL)
2682                 ereport(ERROR,
2683                                 (errcode_for_file_access(),
2684                                  errmsg("could not open transaction log directory \"%s\": %m",
2685                                                 XLOGDIR)));
2686
2687         while ((xlde = ReadDir(xldir, XLOGDIR)) != NULL)
2688         {
2689                 if (strlen(xlde->d_name) > 24 &&
2690                         strspn(xlde->d_name, "0123456789ABCDEF") == 24 &&
2691                         strcmp(xlde->d_name + strlen(xlde->d_name) - strlen(".backup"),
2692                                    ".backup") == 0)
2693                 {
2694                         if (XLogArchiveCheckDone(xlde->d_name))
2695                         {
2696                                 ereport(DEBUG2,
2697                                 (errmsg("removing transaction log backup history file \"%s\"",
2698                                                 xlde->d_name)));
2699                                 snprintf(path, MAXPGPATH, XLOGDIR "/%s", xlde->d_name);
2700                                 unlink(path);
2701                                 XLogArchiveCleanup(xlde->d_name);
2702                         }
2703                 }
2704         }
2705
2706         FreeDir(xldir);
2707 }
2708
2709 /*
2710  * Restore the backup blocks present in an XLOG record, if any.
2711  *
2712  * We assume all of the record has been read into memory at *record.
2713  *
2714  * Note: when a backup block is available in XLOG, we restore it
2715  * unconditionally, even if the page in the database appears newer.
2716  * This is to protect ourselves against database pages that were partially
2717  * or incorrectly written during a crash.  We assume that the XLOG data
2718  * must be good because it has passed a CRC check, while the database
2719  * page might not be.  This will force us to replay all subsequent
2720  * modifications of the page that appear in XLOG, rather than possibly
2721  * ignoring them as already applied, but that's not a huge drawback.
2722  */
2723 static void
2724 RestoreBkpBlocks(XLogRecord *record, XLogRecPtr lsn)
2725 {
2726         Relation        reln;
2727         Buffer          buffer;
2728         Page            page;
2729         BkpBlock        bkpb;
2730         char       *blk;
2731         int                     i;
2732
2733         blk = (char *) XLogRecGetData(record) + record->xl_len;
2734         for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
2735         {
2736                 if (!(record->xl_info & XLR_SET_BKP_BLOCK(i)))
2737                         continue;
2738
2739                 memcpy(&bkpb, blk, sizeof(BkpBlock));
2740                 blk += sizeof(BkpBlock);
2741
2742                 reln = XLogOpenRelation(bkpb.node);
2743                 buffer = XLogReadBuffer(reln, bkpb.block, true);
2744                 Assert(BufferIsValid(buffer));
2745                 page = (Page) BufferGetPage(buffer);
2746
2747                 if (bkpb.hole_length == 0)
2748                 {
2749                         memcpy((char *) page, blk, BLCKSZ);
2750                 }
2751                 else
2752                 {
2753                         /* must zero-fill the hole */
2754                         MemSet((char *) page, 0, BLCKSZ);
2755                         memcpy((char *) page, blk, bkpb.hole_offset);
2756                         memcpy((char *) page + (bkpb.hole_offset + bkpb.hole_length),
2757                                    blk + bkpb.hole_offset,
2758                                    BLCKSZ - (bkpb.hole_offset + bkpb.hole_length));
2759                 }
2760
2761                 PageSetLSN(page, lsn);
2762                 PageSetTLI(page, ThisTimeLineID);
2763                 MarkBufferDirty(buffer);
2764                 UnlockReleaseBuffer(buffer);
2765
2766                 blk += BLCKSZ - bkpb.hole_length;
2767         }
2768 }
2769
2770 /*
2771  * CRC-check an XLOG record.  We do not believe the contents of an XLOG
2772  * record (other than to the minimal extent of computing the amount of
2773  * data to read in) until we've checked the CRCs.
2774  *
2775  * We assume all of the record has been read into memory at *record.
2776  */
2777 static bool
2778 RecordIsValid(XLogRecord *record, XLogRecPtr recptr, int emode)
2779 {
2780         pg_crc32        crc;
2781         int                     i;
2782         uint32          len = record->xl_len;
2783         BkpBlock        bkpb;
2784         char       *blk;
2785
2786         /* First the rmgr data */
2787         INIT_CRC32(crc);
2788         COMP_CRC32(crc, XLogRecGetData(record), len);
2789
2790         /* Add in the backup blocks, if any */
2791         blk = (char *) XLogRecGetData(record) + len;
2792         for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
2793         {
2794                 uint32          blen;
2795
2796                 if (!(record->xl_info & XLR_SET_BKP_BLOCK(i)))
2797                         continue;
2798
2799                 memcpy(&bkpb, blk, sizeof(BkpBlock));
2800                 if (bkpb.hole_offset + bkpb.hole_length > BLCKSZ)
2801                 {
2802                         ereport(emode,
2803                                         (errmsg("incorrect hole size in record at %X/%X",
2804                                                         recptr.xlogid, recptr.xrecoff)));
2805                         return false;
2806                 }
2807                 blen = sizeof(BkpBlock) + BLCKSZ - bkpb.hole_length;
2808                 COMP_CRC32(crc, blk, blen);
2809                 blk += blen;
2810         }
2811
2812         /* Check that xl_tot_len agrees with our calculation */
2813         if (blk != (char *) record + record->xl_tot_len)
2814         {
2815                 ereport(emode,
2816                                 (errmsg("incorrect total length in record at %X/%X",
2817                                                 recptr.xlogid, recptr.xrecoff)));
2818                 return false;
2819         }
2820
2821         /* Finally include the record header */
2822         COMP_CRC32(crc, (char *) record + sizeof(pg_crc32),
2823                            SizeOfXLogRecord - sizeof(pg_crc32));
2824         FIN_CRC32(crc);
2825
2826         if (!EQ_CRC32(record->xl_crc, crc))
2827         {
2828                 ereport(emode,
2829                 (errmsg("incorrect resource manager data checksum in record at %X/%X",
2830                                 recptr.xlogid, recptr.xrecoff)));
2831                 return false;
2832         }
2833
2834         return true;
2835 }
2836
2837 /*
2838  * Attempt to read an XLOG record.
2839  *
2840  * If RecPtr is not NULL, try to read a record at that position.  Otherwise
2841  * try to read a record just after the last one previously read.
2842  *
2843  * If no valid record is available, returns NULL, or fails if emode is PANIC.
2844  * (emode must be either PANIC or LOG.)
2845  *
2846  * The record is copied into readRecordBuf, so that on successful return,
2847  * the returned record pointer always points there.
2848  */
2849 static XLogRecord *
2850 ReadRecord(XLogRecPtr *RecPtr, int emode)
2851 {
2852         XLogRecord *record;
2853         char       *buffer;
2854         XLogRecPtr      tmpRecPtr = EndRecPtr;
2855         bool            randAccess = false;
2856         uint32          len,
2857                                 total_len;
2858         uint32          targetPageOff;
2859         uint32          targetRecOff;
2860         uint32          pageHeaderSize;
2861
2862         if (readBuf == NULL)
2863         {
2864                 /*
2865                  * First time through, permanently allocate readBuf.  We do it this
2866                  * way, rather than just making a static array, for two reasons: (1)
2867                  * no need to waste the storage in most instantiations of the backend;
2868                  * (2) a static char array isn't guaranteed to have any particular
2869                  * alignment, whereas malloc() will provide MAXALIGN'd storage.
2870                  */
2871                 readBuf = (char *) malloc(XLOG_BLCKSZ);
2872                 Assert(readBuf != NULL);
2873         }
2874
2875         if (RecPtr == NULL)
2876         {
2877                 RecPtr = &tmpRecPtr;
2878                 /* fast case if next record is on same page */
2879                 if (nextRecord != NULL)
2880                 {
2881                         record = nextRecord;
2882                         goto got_record;
2883                 }
2884                 /* align old recptr to next page */
2885                 if (tmpRecPtr.xrecoff % XLOG_BLCKSZ != 0)
2886                         tmpRecPtr.xrecoff += (XLOG_BLCKSZ - tmpRecPtr.xrecoff % XLOG_BLCKSZ);
2887                 if (tmpRecPtr.xrecoff >= XLogFileSize)
2888                 {
2889                         (tmpRecPtr.xlogid)++;
2890                         tmpRecPtr.xrecoff = 0;
2891                 }
2892                 /* We will account for page header size below */
2893         }
2894         else
2895         {
2896                 if (!XRecOffIsValid(RecPtr->xrecoff))
2897                         ereport(PANIC,
2898                                         (errmsg("invalid record offset at %X/%X",
2899                                                         RecPtr->xlogid, RecPtr->xrecoff)));
2900
2901                 /*
2902                  * Since we are going to a random position in WAL, forget any prior
2903                  * state about what timeline we were in, and allow it to be any
2904                  * timeline in expectedTLIs.  We also set a flag to allow curFileTLI
2905                  * to go backwards (but we can't reset that variable right here, since
2906                  * we might not change files at all).
2907                  */
2908                 lastPageTLI = 0;                /* see comment in ValidXLOGHeader */
2909                 randAccess = true;              /* allow curFileTLI to go backwards too */
2910         }
2911
2912         if (readFile >= 0 && !XLByteInSeg(*RecPtr, readId, readSeg))
2913         {
2914                 close(readFile);
2915                 readFile = -1;
2916         }
2917         XLByteToSeg(*RecPtr, readId, readSeg);
2918         if (readFile < 0)
2919         {
2920                 /* Now it's okay to reset curFileTLI if random fetch */
2921                 if (randAccess)
2922                         curFileTLI = 0;
2923
2924                 readFile = XLogFileRead(readId, readSeg, emode);
2925                 if (readFile < 0)
2926                         goto next_record_is_invalid;
2927
2928                 /*
2929                  * Whenever switching to a new WAL segment, we read the first page of
2930                  * the file and validate its header, even if that's not where the
2931                  * target record is.  This is so that we can check the additional
2932                  * identification info that is present in the first page's "long"
2933                  * header.
2934                  */
2935                 readOff = 0;
2936                 if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
2937                 {
2938                         ereport(emode,
2939                                         (errcode_for_file_access(),
2940                                          errmsg("could not read from log file %u, segment %u, offset %u: %m",
2941                                                         readId, readSeg, readOff)));
2942                         goto next_record_is_invalid;
2943                 }
2944                 if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode))
2945                         goto next_record_is_invalid;
2946         }
2947
2948         targetPageOff = ((RecPtr->xrecoff % XLogSegSize) / XLOG_BLCKSZ) * XLOG_BLCKSZ;
2949         if (readOff != targetPageOff)
2950         {
2951                 readOff = targetPageOff;
2952                 if (lseek(readFile, (off_t) readOff, SEEK_SET) < 0)
2953                 {
2954                         ereport(emode,
2955                                         (errcode_for_file_access(),
2956                                          errmsg("could not seek in log file %u, segment %u to offset %u: %m",
2957                                                         readId, readSeg, readOff)));
2958                         goto next_record_is_invalid;
2959                 }
2960                 if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
2961                 {
2962                         ereport(emode,
2963                                         (errcode_for_file_access(),
2964                                          errmsg("could not read from log file %u, segment %u at offset %u: %m",
2965                                                         readId, readSeg, readOff)));
2966                         goto next_record_is_invalid;
2967                 }
2968                 if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode))
2969                         goto next_record_is_invalid;
2970         }
2971         pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
2972         targetRecOff = RecPtr->xrecoff % XLOG_BLCKSZ;
2973         if (targetRecOff == 0)
2974         {
2975                 /*
2976                  * Can only get here in the continuing-from-prev-page case, because
2977                  * XRecOffIsValid eliminated the zero-page-offset case otherwise. Need
2978                  * to skip over the new page's header.
2979                  */
2980                 tmpRecPtr.xrecoff += pageHeaderSize;
2981                 targetRecOff = pageHeaderSize;
2982         }
2983         else if (targetRecOff < pageHeaderSize)
2984         {
2985                 ereport(emode,
2986                                 (errmsg("invalid record offset at %X/%X",
2987                                                 RecPtr->xlogid, RecPtr->xrecoff)));
2988                 goto next_record_is_invalid;
2989         }
2990         if ((((XLogPageHeader) readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) &&
2991                 targetRecOff == pageHeaderSize)
2992         {
2993                 ereport(emode,
2994                                 (errmsg("contrecord is requested by %X/%X",
2995                                                 RecPtr->xlogid, RecPtr->xrecoff)));
2996                 goto next_record_is_invalid;
2997         }
2998         record = (XLogRecord *) ((char *) readBuf + RecPtr->xrecoff % XLOG_BLCKSZ);
2999
3000 got_record:;
3001
3002         /*
3003          * xl_len == 0 is bad data for everything except XLOG SWITCH, where it is
3004          * required.
3005          */
3006         if (record->xl_rmid == RM_XLOG_ID && record->xl_info == XLOG_SWITCH)
3007         {
3008                 if (record->xl_len != 0)
3009                 {
3010                         ereport(emode,
3011                                         (errmsg("invalid xlog switch record at %X/%X",
3012                                                         RecPtr->xlogid, RecPtr->xrecoff)));
3013                         goto next_record_is_invalid;
3014                 }
3015         }
3016         else if (record->xl_len == 0)
3017         {
3018                 ereport(emode,
3019                                 (errmsg("record with zero length at %X/%X",
3020                                                 RecPtr->xlogid, RecPtr->xrecoff)));
3021                 goto next_record_is_invalid;
3022         }
3023         if (record->xl_tot_len < SizeOfXLogRecord + record->xl_len ||
3024                 record->xl_tot_len > SizeOfXLogRecord + record->xl_len +
3025                 XLR_MAX_BKP_BLOCKS * (sizeof(BkpBlock) + BLCKSZ))
3026         {
3027                 ereport(emode,
3028                                 (errmsg("invalid record length at %X/%X",
3029                                                 RecPtr->xlogid, RecPtr->xrecoff)));
3030                 goto next_record_is_invalid;
3031         }
3032         if (record->xl_rmid > RM_MAX_ID)
3033         {
3034                 ereport(emode,
3035                                 (errmsg("invalid resource manager ID %u at %X/%X",
3036                                                 record->xl_rmid, RecPtr->xlogid, RecPtr->xrecoff)));
3037                 goto next_record_is_invalid;
3038         }
3039         if (randAccess)
3040         {
3041                 /*
3042                  * We can't exactly verify the prev-link, but surely it should be less
3043                  * than the record's own address.
3044                  */
3045                 if (!XLByteLT(record->xl_prev, *RecPtr))
3046                 {
3047                         ereport(emode,
3048                                         (errmsg("record with incorrect prev-link %X/%X at %X/%X",
3049                                                         record->xl_prev.xlogid, record->xl_prev.xrecoff,
3050                                                         RecPtr->xlogid, RecPtr->xrecoff)));
3051                         goto next_record_is_invalid;
3052                 }
3053         }
3054         else
3055         {
3056                 /*
3057                  * Record's prev-link should exactly match our previous location. This
3058                  * check guards against torn WAL pages where a stale but valid-looking
3059                  * WAL record starts on a sector boundary.
3060                  */
3061                 if (!XLByteEQ(record->xl_prev, ReadRecPtr))
3062                 {
3063                         ereport(emode,
3064                                         (errmsg("record with incorrect prev-link %X/%X at %X/%X",
3065                                                         record->xl_prev.xlogid, record->xl_prev.xrecoff,
3066                                                         RecPtr->xlogid, RecPtr->xrecoff)));
3067                         goto next_record_is_invalid;
3068                 }
3069         }
3070
3071         /*
3072          * Allocate or enlarge readRecordBuf as needed.  To avoid useless small
3073          * increases, round its size to a multiple of XLOG_BLCKSZ, and make sure
3074          * it's at least 4*Max(BLCKSZ, XLOG_BLCKSZ) to start with.  (That is
3075          * enough for all "normal" records, but very large commit or abort records
3076          * might need more space.)
3077          */
3078         total_len = record->xl_tot_len;
3079         if (total_len > readRecordBufSize)
3080         {
3081                 uint32          newSize = total_len;
3082
3083                 newSize += XLOG_BLCKSZ - (newSize % XLOG_BLCKSZ);
3084                 newSize = Max(newSize, 4 * Max(BLCKSZ, XLOG_BLCKSZ));
3085                 if (readRecordBuf)
3086                         free(readRecordBuf);
3087                 readRecordBuf = (char *) malloc(newSize);
3088                 if (!readRecordBuf)
3089                 {
3090                         readRecordBufSize = 0;
3091                         /* We treat this as a "bogus data" condition */
3092                         ereport(emode,
3093                                         (errmsg("record length %u at %X/%X too long",
3094                                                         total_len, RecPtr->xlogid, RecPtr->xrecoff)));
3095                         goto next_record_is_invalid;
3096                 }
3097                 readRecordBufSize = newSize;
3098         }
3099
3100         buffer = readRecordBuf;
3101         nextRecord = NULL;
3102         len = XLOG_BLCKSZ - RecPtr->xrecoff % XLOG_BLCKSZ;
3103         if (total_len > len)
3104         {
3105                 /* Need to reassemble record */
3106                 XLogContRecord *contrecord;
3107                 uint32          gotlen = len;
3108
3109                 memcpy(buffer, record, len);
3110                 record = (XLogRecord *) buffer;
3111                 buffer += len;
3112                 for (;;)
3113                 {
3114                         readOff += XLOG_BLCKSZ;
3115                         if (readOff >= XLogSegSize)
3116                         {
3117                                 close(readFile);
3118                                 readFile = -1;
3119                                 NextLogSeg(readId, readSeg);
3120                                 readFile = XLogFileRead(readId, readSeg, emode);
3121                                 if (readFile < 0)
3122                                         goto next_record_is_invalid;
3123                                 readOff = 0;
3124                         }
3125                         if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
3126                         {
3127                                 ereport(emode,
3128                                                 (errcode_for_file_access(),
3129                                                  errmsg("could not read from log file %u, segment %u, offset %u: %m",
3130                                                                 readId, readSeg, readOff)));
3131                                 goto next_record_is_invalid;
3132                         }
3133                         if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode))
3134                                 goto next_record_is_invalid;
3135                         if (!(((XLogPageHeader) readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD))
3136                         {
3137                                 ereport(emode,
3138                                                 (errmsg("there is no contrecord flag in log file %u, segment %u, offset %u",
3139                                                                 readId, readSeg, readOff)));
3140                                 goto next_record_is_invalid;
3141                         }
3142                         pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
3143                         contrecord = (XLogContRecord *) ((char *) readBuf + pageHeaderSize);
3144                         if (contrecord->xl_rem_len == 0 ||
3145                                 total_len != (contrecord->xl_rem_len + gotlen))
3146                         {
3147                                 ereport(emode,
3148                                                 (errmsg("invalid contrecord length %u in log file %u, segment %u, offset %u",
3149                                                                 contrecord->xl_rem_len,
3150                                                                 readId, readSeg, readOff)));
3151                                 goto next_record_is_invalid;
3152                         }
3153                         len = XLOG_BLCKSZ - pageHeaderSize - SizeOfXLogContRecord;
3154                         if (contrecord->xl_rem_len > len)
3155                         {
3156                                 memcpy(buffer, (char *) contrecord + SizeOfXLogContRecord, len);
3157                                 gotlen += len;
3158                                 buffer += len;
3159                                 continue;
3160                         }
3161                         memcpy(buffer, (char *) contrecord + SizeOfXLogContRecord,
3162                                    contrecord->xl_rem_len);
3163                         break;
3164                 }
3165                 if (!RecordIsValid(record, *RecPtr, emode))
3166                         goto next_record_is_invalid;
3167                 pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
3168                 if (XLOG_BLCKSZ - SizeOfXLogRecord >= pageHeaderSize +
3169                         MAXALIGN(SizeOfXLogContRecord + contrecord->xl_rem_len))
3170                 {
3171                         nextRecord = (XLogRecord *) ((char *) contrecord +
3172                                         MAXALIGN(SizeOfXLogContRecord + contrecord->xl_rem_len));
3173                 }
3174                 EndRecPtr.xlogid = readId;
3175                 EndRecPtr.xrecoff = readSeg * XLogSegSize + readOff +
3176                         pageHeaderSize +
3177                         MAXALIGN(SizeOfXLogContRecord + contrecord->xl_rem_len);
3178                 ReadRecPtr = *RecPtr;
3179                 /* needn't worry about XLOG SWITCH, it can't cross page boundaries */
3180                 return record;
3181         }
3182
3183         /* Record does not cross a page boundary */
3184         if (!RecordIsValid(record, *RecPtr, emode))
3185                 goto next_record_is_invalid;
3186         if (XLOG_BLCKSZ - SizeOfXLogRecord >= RecPtr->xrecoff % XLOG_BLCKSZ +
3187                 MAXALIGN(total_len))
3188                 nextRecord = (XLogRecord *) ((char *) record + MAXALIGN(total_len));
3189         EndRecPtr.xlogid = RecPtr->xlogid;
3190         EndRecPtr.xrecoff = RecPtr->xrecoff + MAXALIGN(total_len);
3191         ReadRecPtr = *RecPtr;
3192         memcpy(buffer, record, total_len);
3193
3194         /*
3195          * Special processing if it's an XLOG SWITCH record
3196          */
3197         if (record->xl_rmid == RM_XLOG_ID && record->xl_info == XLOG_SWITCH)
3198         {
3199                 /* Pretend it extends to end of segment */
3200                 EndRecPtr.xrecoff += XLogSegSize - 1;
3201                 EndRecPtr.xrecoff -= EndRecPtr.xrecoff % XLogSegSize;
3202                 nextRecord = NULL;              /* definitely not on same page */
3203
3204                 /*
3205                  * Pretend that readBuf contains the last page of the segment. This is
3206                  * just to avoid Assert failure in StartupXLOG if XLOG ends with this
3207                  * segment.
3208                  */
3209                 readOff = XLogSegSize - XLOG_BLCKSZ;
3210         }
3211         return (XLogRecord *) buffer;
3212
3213 next_record_is_invalid:;
3214         close(readFile);
3215         readFile = -1;
3216         nextRecord = NULL;
3217         return NULL;
3218 }
3219
3220 /*
3221  * Check whether the xlog header of a page just read in looks valid.
3222  *
3223  * This is just a convenience subroutine to avoid duplicated code in
3224  * ReadRecord.  It's not intended for use from anywhere else.
3225  */
3226 static bool
3227 ValidXLOGHeader(XLogPageHeader hdr, int emode)
3228 {
3229         XLogRecPtr      recaddr;
3230
3231         if (hdr->xlp_magic != XLOG_PAGE_MAGIC)
3232         {
3233                 ereport(emode,
3234                                 (errmsg("invalid magic number %04X in log file %u, segment %u, offset %u",
3235                                                 hdr->xlp_magic, readId, readSeg, readOff)));
3236                 return false;
3237         }
3238         if ((hdr->xlp_info & ~XLP_ALL_FLAGS) != 0)
3239         {
3240                 ereport(emode,
3241                                 (errmsg("invalid info bits %04X in log file %u, segment %u, offset %u",
3242                                                 hdr->xlp_info, readId, readSeg, readOff)));
3243                 return false;
3244         }
3245         if (hdr->xlp_info & XLP_LONG_HEADER)
3246         {
3247                 XLogLongPageHeader longhdr = (XLogLongPageHeader) hdr;
3248
3249                 if (longhdr->xlp_sysid != ControlFile->system_identifier)
3250                 {
3251                         char            fhdrident_str[32];
3252                         char            sysident_str[32];
3253
3254                         /*
3255                          * Format sysids separately to keep platform-dependent format code
3256                          * out of the translatable message string.
3257                          */
3258                         snprintf(fhdrident_str, sizeof(fhdrident_str), UINT64_FORMAT,
3259                                          longhdr->xlp_sysid);
3260                         snprintf(sysident_str, sizeof(sysident_str), UINT64_FORMAT,
3261                                          ControlFile->system_identifier);
3262                         ereport(emode,
3263                                         (errmsg("WAL file is from different system"),
3264                                          errdetail("WAL file SYSID is %s, pg_control SYSID is %s",
3265                                                            fhdrident_str, sysident_str)));
3266                         return false;
3267                 }
3268                 if (longhdr->xlp_seg_size != XLogSegSize)
3269                 {
3270                         ereport(emode,
3271                                         (errmsg("WAL file is from different system"),
3272                                          errdetail("Incorrect XLOG_SEG_SIZE in page header.")));
3273                         return false;
3274                 }
3275                 if (longhdr->xlp_xlog_blcksz != XLOG_BLCKSZ)
3276                 {
3277                         ereport(emode,
3278                                         (errmsg("WAL file is from different system"),
3279                                          errdetail("Incorrect XLOG_BLCKSZ in page header.")));
3280                         return false;
3281                 }
3282         }
3283         else if (readOff == 0)
3284         {
3285                 /* hmm, first page of file doesn't have a long header? */
3286                 ereport(emode,
3287                                 (errmsg("invalid info bits %04X in log file %u, segment %u, offset %u",
3288                                                 hdr->xlp_info, readId, readSeg, readOff)));
3289                 return false;
3290         }
3291
3292         recaddr.xlogid = readId;
3293         recaddr.xrecoff = readSeg * XLogSegSize + readOff;
3294         if (!XLByteEQ(hdr->xlp_pageaddr, recaddr))
3295         {
3296                 ereport(emode,
3297                                 (errmsg("unexpected pageaddr %X/%X in log file %u, segment %u, offset %u",
3298                                                 hdr->xlp_pageaddr.xlogid, hdr->xlp_pageaddr.xrecoff,
3299                                                 readId, readSeg, readOff)));
3300                 return false;
3301         }
3302
3303         /*
3304          * Check page TLI is one of the expected values.
3305          */
3306         if (!list_member_int(expectedTLIs, (int) hdr->xlp_tli))
3307         {
3308                 ereport(emode,
3309                                 (errmsg("unexpected timeline ID %u in log file %u, segment %u, offset %u",
3310                                                 hdr->xlp_tli,
3311                                                 readId, readSeg, readOff)));
3312                 return false;
3313         }
3314
3315         /*
3316          * Since child timelines are always assigned a TLI greater than their
3317          * immediate parent's TLI, we should never see TLI go backwards across
3318          * successive pages of a consistent WAL sequence.
3319          *
3320          * Of course this check should only be applied when advancing sequentially
3321          * across pages; therefore ReadRecord resets lastPageTLI to zero when
3322          * going to a random page.
3323          */
3324         if (hdr->xlp_tli < lastPageTLI)
3325         {
3326                 ereport(emode,
3327                                 (errmsg("out-of-sequence timeline ID %u (after %u) in log file %u, segment %u, offset %u",
3328                                                 hdr->xlp_tli, lastPageTLI,
3329                                                 readId, readSeg, readOff)));
3330                 return false;
3331         }
3332         lastPageTLI = hdr->xlp_tli;
3333         return true;
3334 }
3335
3336 /*
3337  * Try to read a timeline's history file.
3338  *
3339  * If successful, return the list of component TLIs (the given TLI followed by
3340  * its ancestor TLIs).  If we can't find the history file, assume that the
3341  * timeline has no parents, and return a list of just the specified timeline
3342  * ID.
3343  */
3344 static List *
3345 readTimeLineHistory(TimeLineID targetTLI)
3346 {
3347         List       *result;
3348         char            path[MAXPGPATH];
3349         char            histfname[MAXFNAMELEN];
3350         char            fline[MAXPGPATH];
3351         FILE       *fd;
3352
3353         if (InArchiveRecovery)
3354         {
3355                 TLHistoryFileName(histfname, targetTLI);
3356                 RestoreArchivedFile(path, histfname, "RECOVERYHISTORY", 0);
3357         }
3358         else
3359                 TLHistoryFilePath(path, targetTLI);
3360
3361         fd = AllocateFile(path, "r");
3362         if (fd == NULL)
3363         {
3364                 if (errno != ENOENT)
3365                         ereport(FATAL,
3366                                         (errcode_for_file_access(),
3367                                          errmsg("could not open file \"%s\": %m", path)));
3368                 /* Not there, so assume no parents */
3369                 return list_make1_int((int) targetTLI);
3370         }
3371
3372         result = NIL;
3373
3374         /*
3375          * Parse the file...
3376          */
3377         while (fgets(fline, MAXPGPATH, fd) != NULL)
3378         {
3379                 /* skip leading whitespace and check for # comment */
3380                 char       *ptr;
3381                 char       *endptr;
3382                 TimeLineID      tli;
3383
3384                 for (ptr = fline; *ptr; ptr++)
3385                 {
3386                         if (!isspace((unsigned char) *ptr))
3387                                 break;
3388                 }
3389                 if (*ptr == '\0' || *ptr == '#')
3390                         continue;
3391
3392                 /* expect a numeric timeline ID as first field of line */
3393                 tli = (TimeLineID) strtoul(ptr, &endptr, 0);
3394                 if (endptr == ptr)
3395                         ereport(FATAL,
3396                                         (errmsg("syntax error in history file: %s", fline),
3397                                          errhint("Expected a numeric timeline ID.")));
3398
3399                 if (result &&
3400                         tli <= (TimeLineID) linitial_int(result))
3401                         ereport(FATAL,
3402                                         (errmsg("invalid data in history file: %s", fline),
3403                                    errhint("Timeline IDs must be in increasing sequence.")));
3404
3405                 /* Build list with newest item first */
3406                 result = lcons_int((int) tli, result);
3407
3408                 /* we ignore the remainder of each line */
3409         }
3410
3411         FreeFile(fd);
3412
3413         if (result &&
3414                 targetTLI <= (TimeLineID) linitial_int(result))
3415                 ereport(FATAL,
3416                                 (errmsg("invalid data in history file \"%s\"", path),
3417                         errhint("Timeline IDs must be less than child timeline's ID.")));
3418
3419         result = lcons_int((int) targetTLI, result);
3420
3421         ereport(DEBUG3,
3422                         (errmsg_internal("history of timeline %u is %s",
3423                                                          targetTLI, nodeToString(result))));
3424
3425         return result;
3426 }
3427
3428 /*
3429  * Probe whether a timeline history file exists for the given timeline ID
3430  */
3431 static bool
3432 existsTimeLineHistory(TimeLineID probeTLI)
3433 {
3434         char            path[MAXPGPATH];
3435         char            histfname[MAXFNAMELEN];
3436         FILE       *fd;
3437
3438         if (InArchiveRecovery)
3439         {
3440                 TLHistoryFileName(histfname, probeTLI);
3441                 RestoreArchivedFile(path, histfname, "RECOVERYHISTORY", 0);
3442         }
3443         else
3444                 TLHistoryFilePath(path, probeTLI);
3445
3446         fd = AllocateFile(path, "r");
3447         if (fd != NULL)
3448         {
3449                 FreeFile(fd);
3450                 return true;
3451         }
3452         else
3453         {
3454                 if (errno != ENOENT)
3455                         ereport(FATAL,
3456                                         (errcode_for_file_access(),
3457                                          errmsg("could not open file \"%s\": %m", path)));
3458                 return false;
3459         }
3460 }
3461
3462 /*
3463  * Find the newest existing timeline, assuming that startTLI exists.
3464  *
3465  * Note: while this is somewhat heuristic, it does positively guarantee
3466  * that (result + 1) is not a known timeline, and therefore it should
3467  * be safe to assign that ID to a new timeline.
3468  */
3469 static TimeLineID
3470 findNewestTimeLine(TimeLineID startTLI)
3471 {
3472         TimeLineID      newestTLI;
3473         TimeLineID      probeTLI;
3474
3475         /*
3476          * The algorithm is just to probe for the existence of timeline history
3477          * files.  XXX is it useful to allow gaps in the sequence?
3478          */
3479         newestTLI = startTLI;
3480
3481         for (probeTLI = startTLI + 1;; probeTLI++)
3482         {
3483                 if (existsTimeLineHistory(probeTLI))
3484                 {
3485                         newestTLI = probeTLI;           /* probeTLI exists */
3486                 }
3487                 else
3488                 {
3489                         /* doesn't exist, assume we're done */
3490                         break;
3491                 }
3492         }
3493
3494         return newestTLI;
3495 }
3496
3497 /*
3498  * Create a new timeline history file.
3499  *
3500  *      newTLI: ID of the new timeline
3501  *      parentTLI: ID of its immediate parent
3502  *      endTLI et al: ID of the last used WAL file, for annotation purposes
3503  *
3504  * Currently this is only used during recovery, and so there are no locking
3505  * considerations.      But we should be just as tense as XLogFileInit to avoid
3506  * emplacing a bogus file.
3507  */
3508 static void
3509 writeTimeLineHistory(TimeLineID newTLI, TimeLineID parentTLI,
3510                                          TimeLineID endTLI, uint32 endLogId, uint32 endLogSeg)
3511 {
3512         char            path[MAXPGPATH];
3513         char            tmppath[MAXPGPATH];
3514         char            histfname[MAXFNAMELEN];
3515         char            xlogfname[MAXFNAMELEN];
3516         char            buffer[BLCKSZ];
3517         int                     srcfd;
3518         int                     fd;
3519         int                     nbytes;
3520
3521         Assert(newTLI > parentTLI); /* else bad selection of newTLI */
3522
3523         /*
3524          * Write into a temp file name.
3525          */
3526         snprintf(tmppath, MAXPGPATH, XLOGDIR "/xlogtemp.%d", (int) getpid());
3527
3528         unlink(tmppath);
3529
3530         /* do not use XLOG_SYNC_BIT here --- want to fsync only at end of fill */
3531         fd = BasicOpenFile(tmppath, O_RDWR | O_CREAT | O_EXCL,
3532                                            S_IRUSR | S_IWUSR);
3533         if (fd < 0)
3534                 ereport(ERROR,
3535                                 (errcode_for_file_access(),
3536                                  errmsg("could not create file \"%s\": %m", tmppath)));
3537
3538         /*
3539          * If a history file exists for the parent, copy it verbatim
3540          */
3541         if (InArchiveRecovery)
3542         {
3543                 TLHistoryFileName(histfname, parentTLI);
3544                 RestoreArchivedFile(path, histfname, "RECOVERYHISTORY", 0);
3545         }
3546         else
3547                 TLHistoryFilePath(path, parentTLI);
3548
3549         srcfd = BasicOpenFile(path, O_RDONLY, 0);
3550         if (srcfd < 0)
3551         {
3552                 if (errno != ENOENT)
3553                         ereport(ERROR,
3554                                         (errcode_for_file_access(),
3555                                          errmsg("could not open file \"%s\": %m", path)));
3556                 /* Not there, so assume parent has no parents */
3557         }
3558         else
3559         {
3560                 for (;;)
3561                 {
3562                         errno = 0;
3563                         nbytes = (int) read(srcfd, buffer, sizeof(buffer));
3564                         if (nbytes < 0 || errno != 0)
3565                                 ereport(ERROR,
3566                                                 (errcode_for_file_access(),
3567                                                  errmsg("could not read file \"%s\": %m", path)));
3568                         if (nbytes == 0)
3569                                 break;
3570                         errno = 0;
3571                         if ((int) write(fd, buffer, nbytes) != nbytes)
3572                         {
3573                                 int                     save_errno = errno;
3574
3575                                 /*
3576                                  * If we fail to make the file, delete it to release disk
3577                                  * space
3578                                  */
3579                                 unlink(tmppath);
3580
3581                                 /*
3582                                  * if write didn't set errno, assume problem is no disk space
3583                                  */
3584                                 errno = save_errno ? save_errno : ENOSPC;
3585
3586                                 ereport(ERROR,
3587                                                 (errcode_for_file_access(),
3588                                          errmsg("could not write to file \"%s\": %m", tmppath)));
3589                         }
3590                 }
3591                 close(srcfd);
3592         }
3593
3594         /*
3595          * Append one line with the details of this timeline split.
3596          *
3597          * If we did have a parent file, insert an extra newline just in case the
3598          * parent file failed to end with one.
3599          */
3600         XLogFileName(xlogfname, endTLI, endLogId, endLogSeg);
3601
3602         snprintf(buffer, sizeof(buffer),
3603                          "%s%u\t%s\t%s transaction %u at %s\n",
3604                          (srcfd < 0) ? "" : "\n",
3605                          parentTLI,
3606                          xlogfname,
3607                          recoveryStopAfter ? "after" : "before",
3608                          recoveryStopXid,
3609                          str_time(recoveryStopTime));
3610
3611         nbytes = strlen(buffer);
3612         errno = 0;
3613         if ((int) write(fd, buffer, nbytes) != nbytes)
3614         {
3615                 int                     save_errno = errno;
3616
3617                 /*
3618                  * If we fail to make the file, delete it to release disk space
3619                  */
3620                 unlink(tmppath);
3621                 /* if write didn't set errno, assume problem is no disk space */
3622                 errno = save_errno ? save_errno : ENOSPC;
3623
3624                 ereport(ERROR,
3625                                 (errcode_for_file_access(),
3626                                  errmsg("could not write to file \"%s\": %m", tmppath)));
3627         }
3628
3629         if (pg_fsync(fd) != 0)
3630                 ereport(ERROR,
3631                                 (errcode_for_file_access(),
3632                                  errmsg("could not fsync file \"%s\": %m", tmppath)));
3633
3634         if (close(fd))
3635                 ereport(ERROR,
3636                                 (errcode_for_file_access(),
3637                                  errmsg("could not close file \"%s\": %m", tmppath)));
3638
3639
3640         /*
3641          * Now move the completed history file into place with its final name.
3642          */
3643         TLHistoryFilePath(path, newTLI);
3644
3645         /*
3646          * Prefer link() to rename() here just to be really sure that we don't
3647          * overwrite an existing logfile.  However, there shouldn't be one, so
3648          * rename() is an acceptable substitute except for the truly paranoid.
3649          */
3650 #if HAVE_WORKING_LINK
3651         if (link(tmppath, path) < 0)
3652                 ereport(ERROR,
3653                                 (errcode_for_file_access(),
3654                                  errmsg("could not link file \"%s\" to \"%s\": %m",
3655                                                 tmppath, path)));
3656         unlink(tmppath);
3657 #else
3658         if (rename(tmppath, path) < 0)
3659                 ereport(ERROR,
3660                                 (errcode_for_file_access(),
3661                                  errmsg("could not rename file \"%s\" to \"%s\": %m",
3662                                                 tmppath, path)));
3663 #endif
3664
3665         /* The history file can be archived immediately. */
3666         TLHistoryFileName(histfname, newTLI);
3667         XLogArchiveNotify(histfname);
3668 }
3669
3670 /*
3671  * I/O routines for pg_control
3672  *
3673  * *ControlFile is a buffer in shared memory that holds an image of the
3674  * contents of pg_control.      WriteControlFile() initializes pg_control
3675  * given a preloaded buffer, ReadControlFile() loads the buffer from
3676  * the pg_control file (during postmaster or standalone-backend startup),
3677  * and UpdateControlFile() rewrites pg_control after we modify xlog state.
3678  *
3679  * For simplicity, WriteControlFile() initializes the fields of pg_control
3680  * that are related to checking backend/database compatibility, and
3681  * ReadControlFile() verifies they are correct.  We could split out the
3682  * I/O and compatibility-check functions, but there seems no need currently.
3683  */
3684 static void
3685 WriteControlFile(void)
3686 {
3687         int                     fd;
3688         char            buffer[PG_CONTROL_SIZE];                /* need not be aligned */
3689         char       *localeptr;
3690
3691         /*
3692          * Initialize version and compatibility-check fields
3693          */
3694         ControlFile->pg_control_version = PG_CONTROL_VERSION;
3695         ControlFile->catalog_version_no = CATALOG_VERSION_NO;
3696
3697         ControlFile->maxAlign = MAXIMUM_ALIGNOF;
3698         ControlFile->floatFormat = FLOATFORMAT_VALUE;
3699
3700         ControlFile->blcksz = BLCKSZ;
3701         ControlFile->relseg_size = RELSEG_SIZE;
3702         ControlFile->xlog_blcksz = XLOG_BLCKSZ;
3703         ControlFile->xlog_seg_size = XLOG_SEG_SIZE;
3704
3705         ControlFile->nameDataLen = NAMEDATALEN;
3706         ControlFile->indexMaxKeys = INDEX_MAX_KEYS;
3707
3708 #ifdef HAVE_INT64_TIMESTAMP
3709         ControlFile->enableIntTimes = TRUE;
3710 #else
3711         ControlFile->enableIntTimes = FALSE;
3712 #endif
3713
3714         ControlFile->localeBuflen = LOCALE_NAME_BUFLEN;
3715         localeptr = setlocale(LC_COLLATE, NULL);
3716         if (!localeptr)
3717                 ereport(PANIC,
3718                                 (errmsg("invalid LC_COLLATE setting")));
3719         StrNCpy(ControlFile->lc_collate, localeptr, LOCALE_NAME_BUFLEN);
3720         localeptr = setlocale(LC_CTYPE, NULL);
3721         if (!localeptr)
3722                 ereport(PANIC,
3723                                 (errmsg("invalid LC_CTYPE setting")));
3724         StrNCpy(ControlFile->lc_ctype, localeptr, LOCALE_NAME_BUFLEN);
3725
3726         /* Contents are protected with a CRC */
3727         INIT_CRC32(ControlFile->crc);
3728         COMP_CRC32(ControlFile->crc,
3729                            (char *) ControlFile,
3730                            offsetof(ControlFileData, crc));
3731         FIN_CRC32(ControlFile->crc);
3732
3733         /*
3734          * We write out PG_CONTROL_SIZE bytes into pg_control, zero-padding the
3735          * excess over sizeof(ControlFileData).  This reduces the odds of
3736          * premature-EOF errors when reading pg_control.  We'll still fail when we
3737          * check the contents of the file, but hopefully with a more specific
3738          * error than "couldn't read pg_control".
3739          */
3740         if (sizeof(ControlFileData) > PG_CONTROL_SIZE)
3741                 elog(PANIC, "sizeof(ControlFileData) is larger than PG_CONTROL_SIZE; fix either one");
3742
3743         memset(buffer, 0, PG_CONTROL_SIZE);
3744         memcpy(buffer, ControlFile, sizeof(ControlFileData));
3745
3746         fd = BasicOpenFile(XLOG_CONTROL_FILE,
3747                                            O_RDWR | O_CREAT | O_EXCL | PG_BINARY,
3748                                            S_IRUSR | S_IWUSR);
3749         if (fd < 0)
3750                 ereport(PANIC,
3751                                 (errcode_for_file_access(),
3752                                  errmsg("could not create control file \"%s\": %m",
3753                                                 XLOG_CONTROL_FILE)));
3754
3755         errno = 0;
3756         if (write(fd, buffer, PG_CONTROL_SIZE) != PG_CONTROL_SIZE)
3757         {
3758                 /* if write didn't set errno, assume problem is no disk space */
3759                 if (errno == 0)
3760                         errno = ENOSPC;
3761                 ereport(PANIC,
3762                                 (errcode_for_file_access(),
3763                                  errmsg("could not write to control file: %m")));
3764         }
3765
3766         if (pg_fsync(fd) != 0)
3767                 ereport(PANIC,
3768                                 (errcode_for_file_access(),
3769                                  errmsg("could not fsync control file: %m")));
3770
3771         if (close(fd))
3772                 ereport(PANIC,
3773                                 (errcode_for_file_access(),
3774                                  errmsg("could not close control file: %m")));
3775 }
3776
3777 static void
3778 ReadControlFile(void)
3779 {
3780         pg_crc32        crc;
3781         int                     fd;
3782
3783         /*
3784          * Read data...
3785          */
3786         fd = BasicOpenFile(XLOG_CONTROL_FILE,
3787                                            O_RDWR | PG_BINARY,
3788                                            S_IRUSR | S_IWUSR);
3789         if (fd < 0)
3790                 ereport(PANIC,
3791                                 (errcode_for_file_access(),
3792                                  errmsg("could not open control file \"%s\": %m",
3793                                                 XLOG_CONTROL_FILE)));
3794
3795         if (read(fd, ControlFile, sizeof(ControlFileData)) != sizeof(ControlFileData))
3796                 ereport(PANIC,
3797                                 (errcode_for_file_access(),
3798                                  errmsg("could not read from control file: %m")));
3799
3800         close(fd);
3801
3802         /*
3803          * Check for expected pg_control format version.  If this is wrong, the
3804          * CRC check will likely fail because we'll be checking the wrong number
3805          * of bytes.  Complaining about wrong version will probably be more
3806          * enlightening than complaining about wrong CRC.
3807          */
3808         if (ControlFile->pg_control_version != PG_CONTROL_VERSION)
3809                 ereport(FATAL,
3810                                 (errmsg("database files are incompatible with server"),
3811                                  errdetail("The database cluster was initialized with PG_CONTROL_VERSION %d,"
3812                                   " but the server was compiled with PG_CONTROL_VERSION %d.",
3813                                                 ControlFile->pg_control_version, PG_CONTROL_VERSION),
3814                                  errhint("It looks like you need to initdb.")));
3815         /* Now check the CRC. */
3816         INIT_CRC32(crc);
3817         COMP_CRC32(crc,
3818                            (char *) ControlFile,
3819                            offsetof(ControlFileData, crc));
3820         FIN_CRC32(crc);
3821
3822         if (!EQ_CRC32(crc, ControlFile->crc))
3823                 ereport(FATAL,
3824                                 (errmsg("incorrect checksum in control file")));
3825
3826         /*
3827          * Do compatibility checking immediately.  We do this here for 2 reasons:
3828          *
3829          * (1) if the database isn't compatible with the backend executable, we
3830          * want to abort before we can possibly do any damage;
3831          *
3832          * (2) this code is executed in the postmaster, so the setlocale() will
3833          * propagate to forked backends, which aren't going to read this file for
3834          * themselves.  (These locale settings are considered critical
3835          * compatibility items because they can affect sort order of indexes.)
3836          */
3837         if (ControlFile->catalog_version_no != CATALOG_VERSION_NO)
3838                 ereport(FATAL,
3839                                 (errmsg("database files are incompatible with server"),
3840                                  errdetail("The database cluster was initialized with CATALOG_VERSION_NO %d,"
3841                                   " but the server was compiled with CATALOG_VERSION_NO %d.",
3842                                                 ControlFile->catalog_version_no, CATALOG_VERSION_NO),
3843                                  errhint("It looks like you need to initdb.")));
3844         if (ControlFile->maxAlign != MAXIMUM_ALIGNOF)
3845                 ereport(FATAL,
3846                                 (errmsg("database files are incompatible with server"),
3847                    errdetail("The database cluster was initialized with MAXALIGN %d,"
3848                                          " but the server was compiled with MAXALIGN %d.",
3849                                          ControlFile->maxAlign, MAXIMUM_ALIGNOF),
3850                                  errhint("It looks like you need to initdb.")));
3851         if (ControlFile->floatFormat != FLOATFORMAT_VALUE)
3852                 ereport(FATAL,
3853                                 (errmsg("database files are incompatible with server"),
3854                                  errdetail("The database cluster appears to use a different floating-point number format than the server executable."),
3855                                  errhint("It looks like you need to initdb.")));
3856         if (ControlFile->blcksz != BLCKSZ)
3857                 ereport(FATAL,
3858                                 (errmsg("database files are incompatible with server"),
3859                          errdetail("The database cluster was initialized with BLCKSZ %d,"
3860                                            " but the server was compiled with BLCKSZ %d.",
3861                                            ControlFile->blcksz, BLCKSZ),
3862                                  errhint("It looks like you need to recompile or initdb.")));
3863         if (ControlFile->relseg_size != RELSEG_SIZE)
3864                 ereport(FATAL,
3865                                 (errmsg("database files are incompatible with server"),
3866                 errdetail("The database cluster was initialized with RELSEG_SIZE %d,"
3867                                   " but the server was compiled with RELSEG_SIZE %d.",
3868                                   ControlFile->relseg_size, RELSEG_SIZE),
3869                                  errhint("It looks like you need to recompile or initdb.")));
3870         if (ControlFile->xlog_blcksz != XLOG_BLCKSZ)
3871                 ereport(FATAL,
3872                                 (errmsg("database files are incompatible with server"),
3873                 errdetail("The database cluster was initialized with XLOG_BLCKSZ %d,"
3874                                   " but the server was compiled with XLOG_BLCKSZ %d.",
3875                                   ControlFile->xlog_blcksz, XLOG_BLCKSZ),
3876                                  errhint("It looks like you need to recompile or initdb.")));
3877         if (ControlFile->xlog_seg_size != XLOG_SEG_SIZE)
3878                 ereport(FATAL,
3879                                 (errmsg("database files are incompatible with server"),
3880                                  errdetail("The database cluster was initialized with XLOG_SEG_SIZE %d,"
3881                                            " but the server was compiled with XLOG_SEG_SIZE %d.",
3882                                                    ControlFile->xlog_seg_size, XLOG_SEG_SIZE),
3883                                  errhint("It looks like you need to recompile or initdb.")));
3884         if (ControlFile->nameDataLen != NAMEDATALEN)
3885                 ereport(FATAL,
3886                                 (errmsg("database files are incompatible with server"),
3887                 errdetail("The database cluster was initialized with NAMEDATALEN %d,"
3888                                   " but the server was compiled with NAMEDATALEN %d.",
3889                                   ControlFile->nameDataLen, NAMEDATALEN),
3890                                  errhint("It looks like you need to recompile or initdb.")));
3891         if (ControlFile->indexMaxKeys != INDEX_MAX_KEYS)
3892                 ereport(FATAL,
3893                                 (errmsg("database files are incompatible with server"),
3894                                  errdetail("The database cluster was initialized with INDEX_MAX_KEYS %d,"
3895                                           " but the server was compiled with INDEX_MAX_KEYS %d.",
3896                                                    ControlFile->indexMaxKeys, INDEX_MAX_KEYS),
3897                                  errhint("It looks like you need to recompile or initdb.")));
3898
3899 #ifdef HAVE_INT64_TIMESTAMP
3900         if (ControlFile->enableIntTimes != TRUE)
3901                 ereport(FATAL,
3902                                 (errmsg("database files are incompatible with server"),
3903                                  errdetail("The database cluster was initialized without HAVE_INT64_TIMESTAMP"
3904                                   " but the server was compiled with HAVE_INT64_TIMESTAMP."),
3905                                  errhint("It looks like you need to recompile or initdb.")));
3906 #else
3907         if (ControlFile->enableIntTimes != FALSE)
3908                 ereport(FATAL,
3909                                 (errmsg("database files are incompatible with server"),
3910                                  errdetail("The database cluster was initialized with HAVE_INT64_TIMESTAMP"
3911                            " but the server was compiled without HAVE_INT64_TIMESTAMP."),
3912                                  errhint("It looks like you need to recompile or initdb.")));
3913 #endif
3914
3915         if (ControlFile->localeBuflen != LOCALE_NAME_BUFLEN)
3916                 ereport(FATAL,
3917                                 (errmsg("database files are incompatible with server"),
3918                                  errdetail("The database cluster was initialized with LOCALE_NAME_BUFLEN %d,"
3919                                   " but the server was compiled with LOCALE_NAME_BUFLEN %d.",
3920                                                    ControlFile->localeBuflen, LOCALE_NAME_BUFLEN),
3921                                  errhint("It looks like you need to recompile or initdb.")));
3922         if (pg_perm_setlocale(LC_COLLATE, ControlFile->lc_collate) == NULL)
3923                 ereport(FATAL,
3924                         (errmsg("database files are incompatible with operating system"),
3925                          errdetail("The database cluster was initialized with LC_COLLATE \"%s\","
3926                                            " which is not recognized by setlocale().",
3927                                            ControlFile->lc_collate),
3928                          errhint("It looks like you need to initdb or install locale support.")));
3929         if (pg_perm_setlocale(LC_CTYPE, ControlFile->lc_ctype) == NULL)
3930                 ereport(FATAL,
3931                         (errmsg("database files are incompatible with operating system"),
3932                 errdetail("The database cluster was initialized with LC_CTYPE \"%s\","
3933                                   " which is not recognized by setlocale().",
3934                                   ControlFile->lc_ctype),
3935                          errhint("It looks like you need to initdb or install locale support.")));
3936
3937         /* Make the fixed locale settings visible as GUC variables, too */
3938         SetConfigOption("lc_collate", ControlFile->lc_collate,
3939                                         PGC_INTERNAL, PGC_S_OVERRIDE);
3940         SetConfigOption("lc_ctype", ControlFile->lc_ctype,
3941                                         PGC_INTERNAL, PGC_S_OVERRIDE);
3942 }
3943
3944 void
3945 UpdateControlFile(void)
3946 {
3947         int                     fd;
3948
3949         INIT_CRC32(ControlFile->crc);
3950         COMP_CRC32(ControlFile->crc,
3951                            (char *) ControlFile,
3952                            offsetof(ControlFileData, crc));
3953         FIN_CRC32(ControlFile->crc);
3954
3955         fd = BasicOpenFile(XLOG_CONTROL_FILE,
3956                                            O_RDWR | PG_BINARY,
3957                                            S_IRUSR | S_IWUSR);
3958         if (fd < 0)
3959                 ereport(PANIC,
3960                                 (errcode_for_file_access(),
3961                                  errmsg("could not open control file \"%s\": %m",
3962                                                 XLOG_CONTROL_FILE)));
3963
3964         errno = 0;
3965         if (write(fd, ControlFile, sizeof(ControlFileData)) != sizeof(ControlFileData))
3966         {
3967                 /* if write didn't set errno, assume problem is no disk space */
3968                 if (errno == 0)
3969                         errno = ENOSPC;
3970                 ereport(PANIC,
3971                                 (errcode_for_file_access(),
3972                                  errmsg("could not write to control file: %m")));
3973         }
3974
3975         if (pg_fsync(fd) != 0)
3976                 ereport(PANIC,
3977                                 (errcode_for_file_access(),
3978                                  errmsg("could not fsync control file: %m")));
3979
3980         if (close(fd))
3981                 ereport(PANIC,
3982                                 (errcode_for_file_access(),
3983                                  errmsg("could not close control file: %m")));
3984 }
3985
3986 /*
3987  * Initialization of shared memory for XLOG
3988  */
3989 Size
3990 XLOGShmemSize(void)
3991 {
3992         Size            size;
3993
3994         /* XLogCtl */
3995         size = sizeof(XLogCtlData);
3996         /* xlblocks array */
3997         size = add_size(size, mul_size(sizeof(XLogRecPtr), XLOGbuffers));
3998         /* extra alignment padding for XLOG I/O buffers */
3999         size = add_size(size, ALIGNOF_XLOG_BUFFER);
4000         /* and the buffers themselves */
4001         size = add_size(size, mul_size(XLOG_BLCKSZ, XLOGbuffers));
4002
4003         /*
4004          * Note: we don't count ControlFileData, it comes out of the "slop factor"
4005          * added by CreateSharedMemoryAndSemaphores.  This lets us use this
4006          * routine again below to compute the actual allocation size.
4007          */
4008
4009         return size;
4010 }
4011
4012 void
4013 XLOGShmemInit(void)
4014 {
4015         bool            foundCFile,
4016                                 foundXLog;
4017         char       *allocptr;
4018
4019         ControlFile = (ControlFileData *)
4020                 ShmemInitStruct("Control File", sizeof(ControlFileData), &foundCFile);
4021         XLogCtl = (XLogCtlData *)
4022                 ShmemInitStruct("XLOG Ctl", XLOGShmemSize(), &foundXLog);
4023
4024         if (foundCFile || foundXLog)
4025         {
4026                 /* both should be present or neither */
4027                 Assert(foundCFile && foundXLog);
4028                 return;
4029         }
4030
4031         memset(XLogCtl, 0, sizeof(XLogCtlData));
4032
4033         /*
4034          * Since XLogCtlData contains XLogRecPtr fields, its sizeof should be a
4035          * multiple of the alignment for same, so no extra alignment padding is
4036          * needed here.
4037          */
4038         allocptr = ((char *) XLogCtl) + sizeof(XLogCtlData);
4039         XLogCtl->xlblocks = (XLogRecPtr *) allocptr;
4040         memset(XLogCtl->xlblocks, 0, sizeof(XLogRecPtr) * XLOGbuffers);
4041         allocptr += sizeof(XLogRecPtr) * XLOGbuffers;
4042
4043         /*
4044          * Align the start of the page buffers to an ALIGNOF_XLOG_BUFFER boundary.
4045          */
4046         allocptr = (char *) TYPEALIGN(ALIGNOF_XLOG_BUFFER, allocptr);
4047         XLogCtl->pages = allocptr;
4048         memset(XLogCtl->pages, 0, (Size) XLOG_BLCKSZ * XLOGbuffers);
4049
4050         /*
4051          * Do basic initialization of XLogCtl shared data. (StartupXLOG will fill
4052          * in additional info.)
4053          */
4054         XLogCtl->XLogCacheByte = (Size) XLOG_BLCKSZ *XLOGbuffers;
4055
4056         XLogCtl->XLogCacheBlck = XLOGbuffers - 1;
4057         XLogCtl->Insert.currpage = (XLogPageHeader) (XLogCtl->pages);
4058         SpinLockInit(&XLogCtl->info_lck);
4059
4060         /*
4061          * If we are not in bootstrap mode, pg_control should already exist. Read
4062          * and validate it immediately (see comments in ReadControlFile() for the
4063          * reasons why).
4064          */
4065         if (!IsBootstrapProcessingMode())
4066                 ReadControlFile();
4067 }
4068
4069 /*
4070  * This func must be called ONCE on system install.  It creates pg_control
4071  * and the initial XLOG segment.
4072  */
4073 void
4074 BootStrapXLOG(void)
4075 {
4076         CheckPoint      checkPoint;
4077         char       *buffer;
4078         XLogPageHeader page;
4079         XLogLongPageHeader longpage;
4080         XLogRecord *record;
4081         bool            use_existent;
4082         uint64          sysidentifier;
4083         struct timeval tv;
4084         pg_crc32        crc;
4085
4086         /*
4087          * Select a hopefully-unique system identifier code for this installation.
4088          * We use the result of gettimeofday(), including the fractional seconds
4089          * field, as being about as unique as we can easily get.  (Think not to
4090          * use random(), since it hasn't been seeded and there's no portable way
4091          * to seed it other than the system clock value...)  The upper half of the
4092          * uint64 value is just the tv_sec part, while the lower half is the XOR
4093          * of tv_sec and tv_usec.  This is to ensure that we don't lose uniqueness
4094          * unnecessarily if "uint64" is really only 32 bits wide.  A person
4095          * knowing this encoding can determine the initialization time of the
4096          * installation, which could perhaps be useful sometimes.
4097          */
4098         gettimeofday(&tv, NULL);
4099         sysidentifier = ((uint64) tv.tv_sec) << 32;
4100         sysidentifier |= (uint32) (tv.tv_sec | tv.tv_usec);
4101
4102         /* First timeline ID is always 1 */
4103         ThisTimeLineID = 1;
4104
4105         /* page buffer must be aligned suitably for O_DIRECT */
4106         buffer = (char *) palloc(XLOG_BLCKSZ + ALIGNOF_XLOG_BUFFER);
4107         page = (XLogPageHeader) TYPEALIGN(ALIGNOF_XLOG_BUFFER, buffer);
4108         memset(page, 0, XLOG_BLCKSZ);
4109
4110         /* Set up information for the initial checkpoint record */
4111         checkPoint.redo.xlogid = 0;
4112         checkPoint.redo.xrecoff = SizeOfXLogLongPHD;
4113         checkPoint.undo = checkPoint.redo;
4114         checkPoint.ThisTimeLineID = ThisTimeLineID;
4115         checkPoint.nextXidEpoch = 0;
4116         checkPoint.nextXid = FirstNormalTransactionId;
4117         checkPoint.nextOid = FirstBootstrapObjectId;
4118         checkPoint.nextMulti = FirstMultiXactId;
4119         checkPoint.nextMultiOffset = 0;
4120         checkPoint.time = time(NULL);
4121
4122         ShmemVariableCache->nextXid = checkPoint.nextXid;
4123         ShmemVariableCache->nextOid = checkPoint.nextOid;
4124         ShmemVariableCache->oidCount = 0;
4125         MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset);
4126
4127         /* Set up the XLOG page header */
4128         page->xlp_magic = XLOG_PAGE_MAGIC;
4129         page->xlp_info = XLP_LONG_HEADER;
4130         page->xlp_tli = ThisTimeLineID;
4131         page->xlp_pageaddr.xlogid = 0;
4132         page->xlp_pageaddr.xrecoff = 0;
4133         longpage = (XLogLongPageHeader) page;
4134         longpage->xlp_sysid = sysidentifier;
4135         longpage->xlp_seg_size = XLogSegSize;
4136         longpage->xlp_xlog_blcksz = XLOG_BLCKSZ;
4137
4138         /* Insert the initial checkpoint record */
4139         record = (XLogRecord *) ((char *) page + SizeOfXLogLongPHD);
4140         record->xl_prev.xlogid = 0;
4141         record->xl_prev.xrecoff = 0;
4142         record->xl_xid = InvalidTransactionId;
4143         record->xl_tot_len = SizeOfXLogRecord + sizeof(checkPoint);
4144         record->xl_len = sizeof(checkPoint);
4145         record->xl_info = XLOG_CHECKPOINT_SHUTDOWN;
4146         record->xl_rmid = RM_XLOG_ID;
4147         memcpy(XLogRecGetData(record), &checkPoint, sizeof(checkPoint));
4148
4149         INIT_CRC32(crc);
4150         COMP_CRC32(crc, &checkPoint, sizeof(checkPoint));
4151         COMP_CRC32(crc, (char *) record + sizeof(pg_crc32),
4152                            SizeOfXLogRecord - sizeof(pg_crc32));
4153         FIN_CRC32(crc);
4154         record->xl_crc = crc;
4155
4156         /* Create first XLOG segment file */
4157         use_existent = false;
4158         openLogFile = XLogFileInit(0, 0, &use_existent, false);
4159
4160         /* Write the first page with the initial record */
4161         errno = 0;
4162         if (write(openLogFile, page, XLOG_BLCKSZ) != XLOG_BLCKSZ)
4163         {
4164                 /* if write didn't set errno, assume problem is no disk space */
4165                 if (errno == 0)
4166                         errno = ENOSPC;
4167                 ereport(PANIC,
4168                                 (errcode_for_file_access(),
4169                           errmsg("could not write bootstrap transaction log file: %m")));
4170         }
4171
4172         if (pg_fsync(openLogFile) != 0)
4173                 ereport(PANIC,
4174                                 (errcode_for_file_access(),
4175                           errmsg("could not fsync bootstrap transaction log file: %m")));
4176
4177         if (close(openLogFile))
4178                 ereport(PANIC,
4179                                 (errcode_for_file_access(),
4180                           errmsg("could not close bootstrap transaction log file: %m")));
4181
4182         openLogFile = -1;
4183
4184         /* Now create pg_control */
4185
4186         memset(ControlFile, 0, sizeof(ControlFileData));
4187         /* Initialize pg_control status fields */
4188         ControlFile->system_identifier = sysidentifier;
4189         ControlFile->state = DB_SHUTDOWNED;
4190         ControlFile->time = checkPoint.time;
4191         ControlFile->checkPoint = checkPoint.redo;
4192         ControlFile->checkPointCopy = checkPoint;
4193         /* some additional ControlFile fields are set in WriteControlFile() */
4194
4195         WriteControlFile();
4196
4197         /* Bootstrap the commit log, too */
4198         BootStrapCLOG();
4199         BootStrapSUBTRANS();
4200         BootStrapMultiXact();
4201
4202         pfree(buffer);
4203 }
4204
/*
 * Format a time_t as "YYYY-MM-DD HH:MM:SS TZ" in the local time zone.
 *
 * The result lives in a static buffer, so it is overwritten by the next
 * call and must not be freed by the caller.
 */
static char *
str_time(time_t tnow)
{
	static char timebuf[128];
	struct tm  *local_tm = localtime(&tnow);

	strftime(timebuf, sizeof(timebuf), "%Y-%m-%d %H:%M:%S %Z", local_tm);

	return timebuf;
}
4216
4217 /*
4218  * See if there is a recovery command file (recovery.conf), and if so
4219  * read in parameters for archive recovery.
4220  *
4221  * XXX longer term intention is to expand this to
4222  * cater for additional parameters and controls
4223  * possibly use a flex lexer similar to the GUC one
4224  */
4225 static void
4226 readRecoveryCommandFile(void)
4227 {
4228         FILE       *fd;
4229         char            cmdline[MAXPGPATH];
4230         TimeLineID      rtli = 0;
4231         bool            rtliGiven = false;
4232         bool            syntaxError = false;
4233
4234         fd = AllocateFile(RECOVERY_COMMAND_FILE, "r");
4235         if (fd == NULL)
4236         {
4237                 if (errno == ENOENT)
4238                         return;                         /* not there, so no archive recovery */
4239                 ereport(FATAL,
4240                                 (errcode_for_file_access(),
4241                                  errmsg("could not open recovery command file \"%s\": %m",
4242                                                 RECOVERY_COMMAND_FILE)));
4243         }
4244
4245         ereport(LOG,
4246                         (errmsg("starting archive recovery")));
4247
4248         /*
4249          * Parse the file...
4250          */
4251         while (fgets(cmdline, MAXPGPATH, fd) != NULL)
4252         {
4253                 /* skip leading whitespace and check for # comment */
4254                 char       *ptr;
4255                 char       *tok1;
4256                 char       *tok2;
4257
4258                 for (ptr = cmdline; *ptr; ptr++)
4259                 {
4260                         if (!isspace((unsigned char) *ptr))
4261                                 break;
4262                 }
4263                 if (*ptr == '\0' || *ptr == '#')
4264                         continue;
4265
4266                 /* identify the quoted parameter value */
4267                 tok1 = strtok(ptr, "'");
4268                 if (!tok1)
4269                 {
4270                         syntaxError = true;
4271                         break;
4272                 }
4273                 tok2 = strtok(NULL, "'");
4274                 if (!tok2)
4275                 {
4276                         syntaxError = true;
4277                         break;
4278                 }
4279                 /* reparse to get just the parameter name */
4280                 tok1 = strtok(ptr, " \t=");
4281                 if (!tok1)
4282                 {
4283                         syntaxError = true;
4284                         break;
4285                 }
4286
4287                 if (strcmp(tok1, "restore_command") == 0)
4288                 {
4289                         recoveryRestoreCommand = pstrdup(tok2);
4290                         ereport(LOG,
4291                                         (errmsg("restore_command = \"%s\"",
4292                                                         recoveryRestoreCommand)));
4293                 }
4294                 else if (strcmp(tok1, "recovery_target_timeline") == 0)
4295                 {
4296                         rtliGiven = true;
4297                         if (strcmp(tok2, "latest") == 0)
4298                                 rtli = 0;
4299                         else
4300                         {
4301                                 errno = 0;
4302                                 rtli = (TimeLineID) strtoul(tok2, NULL, 0);
4303                                 if (errno == EINVAL || errno == ERANGE)
4304                                         ereport(FATAL,
4305                                                         (errmsg("recovery_target_timeline is not a valid number: \"%s\"",
4306                                                                         tok2)));
4307                         }
4308                         if (rtli)
4309                                 ereport(LOG,
4310                                                 (errmsg("recovery_target_timeline = %u", rtli)));
4311                         else
4312                                 ereport(LOG,
4313                                                 (errmsg("recovery_target_timeline = latest")));
4314                 }
4315                 else if (strcmp(tok1, "recovery_target_xid") == 0)
4316                 {
4317                         errno = 0;
4318                         recoveryTargetXid = (TransactionId) strtoul(tok2, NULL, 0);
4319                         if (errno == EINVAL || errno == ERANGE)
4320                                 ereport(FATAL,
4321                                  (errmsg("recovery_target_xid is not a valid number: \"%s\"",
4322                                                  tok2)));
4323                         ereport(LOG,
4324                                         (errmsg("recovery_target_xid = %u",
4325                                                         recoveryTargetXid)));
4326                         recoveryTarget = true;
4327                         recoveryTargetExact = true;
4328                 }
4329                 else if (strcmp(tok1, "recovery_target_time") == 0)
4330                 {
4331                         /*
4332                          * if recovery_target_xid specified, then this overrides
4333                          * recovery_target_time
4334                          */
4335                         if (recoveryTargetExact)
4336                                 continue;
4337                         recoveryTarget = true;
4338                         recoveryTargetExact = false;
4339
4340                         /*
4341                          * Convert the time string given by the user to the time_t format.
4342                          * We use type abstime's input converter because we know abstime
4343                          * has the same representation as time_t.
4344                          */
4345                         recoveryTargetTime = (time_t)
4346                                 DatumGetAbsoluteTime(DirectFunctionCall1(abstimein,
4347                                                                                                          CStringGetDatum(tok2)));
4348                         ereport(LOG,
4349                                         (errmsg("recovery_target_time = %s",
4350                                                         DatumGetCString(DirectFunctionCall1(abstimeout,
4351                                 AbsoluteTimeGetDatum((AbsoluteTime) recoveryTargetTime))))));
4352                 }
4353                 else if (strcmp(tok1, "recovery_target_inclusive") == 0)
4354                 {
4355                         /*
4356                          * does nothing if a recovery_target is not also set
4357                          */
4358                         if (strcmp(tok2, "true") == 0)
4359                                 recoveryTargetInclusive = true;
4360                         else
4361                         {
4362                                 recoveryTargetInclusive = false;
4363                                 tok2 = "false";
4364                         }
4365                         ereport(LOG,
4366                                         (errmsg("recovery_target_inclusive = %s", tok2)));
4367                 }
4368                 else
4369                         ereport(FATAL,
4370                                         (errmsg("unrecognized recovery parameter \"%s\"",
4371                                                         tok1)));
4372         }
4373
4374         FreeFile(fd);
4375
4376         if (syntaxError)
4377                 ereport(FATAL,
4378                                 (errmsg("syntax error in recovery command file: %s",
4379                                                 cmdline),
4380                           errhint("Lines should have the format parameter = 'value'.")));
4381
4382         /* Check that required parameters were supplied */
4383         if (recoveryRestoreCommand == NULL)
4384                 ereport(FATAL,
4385                                 (errmsg("recovery command file \"%s\" did not specify restore_command",
4386                                                 RECOVERY_COMMAND_FILE)));
4387
4388         /* Enable fetching from archive recovery area */
4389         InArchiveRecovery = true;
4390
4391         /*
4392          * If user specified recovery_target_timeline, validate it or compute the
4393          * "latest" value.      We can't do this until after we've gotten the restore
4394          * command and set InArchiveRecovery, because we need to fetch timeline
4395          * history files from the archive.
4396          */
4397         if (rtliGiven)
4398         {
4399                 if (rtli)
4400                 {
4401                         /* Timeline 1 does not have a history file, all else should */
4402                         if (rtli != 1 && !existsTimeLineHistory(rtli))
4403                                 ereport(FATAL,
4404                                                 (errmsg("recovery_target_timeline %u does not exist",
4405                                                                 rtli)));
4406                         recoveryTargetTLI = rtli;
4407                 }
4408                 else
4409                 {
4410                         /* We start the "latest" search from pg_control's timeline */
4411                         recoveryTargetTLI = findNewestTimeLine(recoveryTargetTLI);
4412                 }
4413         }
4414 }
4415
4416 /*
4417  * Exit archive-recovery state
4418  */
4419 static void
4420 exitArchiveRecovery(TimeLineID endTLI, uint32 endLogId, uint32 endLogSeg)
4421 {
4422         char            recoveryPath[MAXPGPATH];
4423         char            xlogpath[MAXPGPATH];
4424
4425         /*
4426          * We are no longer in archive recovery state.
4427          */
4428         InArchiveRecovery = false;
4429
4430         /*
4431          * We should have the ending log segment currently open.  Verify, and then
4432          * close it (to avoid problems on Windows with trying to rename or delete
4433          * an open file).
4434          */
4435         Assert(readFile >= 0);
4436         Assert(readId == endLogId);
4437         Assert(readSeg == endLogSeg);
4438
4439         close(readFile);
4440         readFile = -1;
4441
4442         /*
4443          * If the segment was fetched from archival storage, we want to replace
4444          * the existing xlog segment (if any) with the archival version.  This is
4445          * because whatever is in XLOGDIR is very possibly older than what we have
4446          * from the archives, since it could have come from restoring a PGDATA
4447          * backup.      In any case, the archival version certainly is more
4448          * descriptive of what our current database state is, because that is what
4449          * we replayed from.
4450          *
4451          * Note that if we are establishing a new timeline, ThisTimeLineID is
4452          * already set to the new value, and so we will create a new file instead
4453          * of overwriting any existing file.
4454          */
4455         snprintf(recoveryPath, MAXPGPATH, XLOGDIR "/RECOVERYXLOG");
4456         XLogFilePath(xlogpath, ThisTimeLineID, endLogId, endLogSeg);
4457
4458         if (restoredFromArchive)
4459         {
4460                 ereport(DEBUG3,
4461                                 (errmsg_internal("moving last restored xlog to \"%s\"",
4462                                                                  xlogpath)));
4463                 unlink(xlogpath);               /* might or might not exist */
4464                 if (rename(recoveryPath, xlogpath) != 0)
4465                         ereport(FATAL,
4466                                         (errcode_for_file_access(),
4467                                          errmsg("could not rename file \"%s\" to \"%s\": %m",
4468                                                         recoveryPath, xlogpath)));
4469                 /* XXX might we need to fix permissions on the file? */
4470         }
4471         else
4472         {
4473                 /*
4474                  * If the latest segment is not archival, but there's still a
4475                  * RECOVERYXLOG laying about, get rid of it.
4476                  */
4477                 unlink(recoveryPath);   /* ignore any error */
4478
4479                 /*
4480                  * If we are establishing a new timeline, we have to copy data from
4481                  * the last WAL segment of the old timeline to create a starting WAL
4482                  * segment for the new timeline.
4483                  */
4484                 if (endTLI != ThisTimeLineID)
4485                         XLogFileCopy(endLogId, endLogSeg,
4486                                                  endTLI, endLogId, endLogSeg);
4487         }
4488
4489         /*
4490          * Let's just make real sure there are not .ready or .done flags posted
4491          * for the new segment.
4492          */
4493         XLogFileName(xlogpath, ThisTimeLineID, endLogId, endLogSeg);
4494         XLogArchiveCleanup(xlogpath);
4495
4496         /* Get rid of any remaining recovered timeline-history file, too */
4497         snprintf(recoveryPath, MAXPGPATH, XLOGDIR "/RECOVERYHISTORY");
4498         unlink(recoveryPath);           /* ignore any error */
4499
4500         /*
4501          * Rename the config file out of the way, so that we don't accidentally
4502          * re-enter archive recovery mode in a subsequent crash.
4503          */
4504         unlink(RECOVERY_COMMAND_DONE);
4505         if (rename(RECOVERY_COMMAND_FILE, RECOVERY_COMMAND_DONE) != 0)
4506                 ereport(FATAL,
4507                                 (errcode_for_file_access(),
4508                                  errmsg("could not rename file \"%s\" to \"%s\": %m",
4509                                                 RECOVERY_COMMAND_FILE, RECOVERY_COMMAND_DONE)));
4510
4511         ereport(LOG,
4512                         (errmsg("archive recovery complete")));
4513 }
4514
4515 /*
4516  * For point-in-time recovery, this function decides whether we want to
4517  * stop applying the XLOG at or after the current record.
4518  *
4519  * Returns TRUE if we are stopping, FALSE otherwise.  On TRUE return,
4520  * *includeThis is set TRUE if we should apply this record before stopping.
4521  * Also, some information is saved in recoveryStopXid et al for use in
4522  * annotating the new timeline's history file.
4523  */
4524 static bool
4525 recoveryStopsHere(XLogRecord *record, bool *includeThis)
4526 {
4527         bool            stopsHere;
4528         uint8           record_info;
4529         time_t          recordXtime;
4530
4531         /* Do we have a PITR target at all? */
4532         if (!recoveryTarget)
4533                 return false;
4534
4535         /* We only consider stopping at COMMIT or ABORT records */
4536         if (record->xl_rmid != RM_XACT_ID)
4537                 return false;
4538         record_info = record->xl_info & ~XLR_INFO_MASK;
4539         if (record_info == XLOG_XACT_COMMIT)
4540         {
4541                 xl_xact_commit *recordXactCommitData;
4542
4543                 recordXactCommitData = (xl_xact_commit *) XLogRecGetData(record);
4544                 recordXtime = recordXactCommitData->xtime;
4545         }
4546         else if (record_info == XLOG_XACT_ABORT)
4547         {
4548                 xl_xact_abort *recordXactAbortData;
4549
4550                 recordXactAbortData = (xl_xact_abort *) XLogRecGetData(record);
4551                 recordXtime = recordXactAbortData->xtime;
4552         }
4553         else
4554                 return false;
4555
4556         if (recoveryTargetExact)
4557         {
4558                 /*
4559                  * there can be only one transaction end record with this exact
4560                  * transactionid
4561                  *
4562                  * when testing for an xid, we MUST test for equality only, since
4563                  * transactions are numbered in the order they start, not the order
4564                  * they complete. A higher numbered xid will complete before you about
4565                  * 50% of the time...
4566                  */
4567                 stopsHere = (record->xl_xid == recoveryTargetXid);
4568                 if (stopsHere)
4569                         *includeThis = recoveryTargetInclusive;
4570         }
4571         else
4572         {
4573                 /*
4574                  * there can be many transactions that share the same commit time, so
4575                  * we stop after the last one, if we are inclusive, or stop at the
4576                  * first one if we are exclusive
4577                  */
4578                 if (recoveryTargetInclusive)
4579                         stopsHere = (recordXtime > recoveryTargetTime);
4580                 else
4581                         stopsHere = (recordXtime >= recoveryTargetTime);
4582                 if (stopsHere)
4583                         *includeThis = false;
4584         }
4585
4586         if (stopsHere)
4587         {
4588                 recoveryStopXid = record->xl_xid;
4589                 recoveryStopTime = recordXtime;
4590                 recoveryStopAfter = *includeThis;
4591
4592                 if (record_info == XLOG_XACT_COMMIT)
4593                 {
4594                         if (recoveryStopAfter)
4595                                 ereport(LOG,
4596                                                 (errmsg("recovery stopping after commit of transaction %u, time %s",
4597                                                           recoveryStopXid, str_time(recoveryStopTime))));
4598                         else
4599                                 ereport(LOG,
4600                                                 (errmsg("recovery stopping before commit of transaction %u, time %s",
4601                                                           recoveryStopXid, str_time(recoveryStopTime))));
4602                 }
4603                 else
4604                 {
4605                         if (recoveryStopAfter)
4606                                 ereport(LOG,
4607                                                 (errmsg("recovery stopping after abort of transaction %u, time %s",
4608                                                           recoveryStopXid, str_time(recoveryStopTime))));
4609                         else
4610                                 ereport(LOG,
4611                                                 (errmsg("recovery stopping before abort of transaction %u, time %s",
4612                                                           recoveryStopXid, str_time(recoveryStopTime))));
4613                 }
4614         }
4615
4616         return stopsHere;
4617 }
4618
4619 /*
4620  * This must be called ONCE during postmaster or standalone-backend startup
4621  */
4622 void
4623 StartupXLOG(void)
4624 {
4625         XLogCtlInsert *Insert;
4626         CheckPoint      checkPoint;
4627         bool            wasShutdown;
4628         bool            needNewTimeLine = false;
4629         bool            haveBackupLabel = false;
4630         XLogRecPtr      RecPtr,
4631                                 LastRec,
4632                                 checkPointLoc,
4633                                 minRecoveryLoc,
4634                                 EndOfLog;
4635         uint32          endLogId;
4636         uint32          endLogSeg;
4637         XLogRecord *record;
4638         uint32          freespace;
4639         TransactionId oldestActiveXID;
4640
4641         /*
4642          * Read control file and check XLOG status looks valid.
4643          *
4644          * Note: in most control paths, *ControlFile is already valid and we need
4645          * not do ReadControlFile() here, but might as well do it to be sure.
4646          */
4647         ReadControlFile();
4648
4649         if (ControlFile->state < DB_SHUTDOWNED ||
4650                 ControlFile->state > DB_IN_PRODUCTION ||
4651                 !XRecOffIsValid(ControlFile->checkPoint.xrecoff))
4652                 ereport(FATAL,
4653                                 (errmsg("control file contains invalid data")));
4654
4655         if (ControlFile->state == DB_SHUTDOWNED)
4656                 ereport(LOG,
4657                                 (errmsg("database system was shut down at %s",
4658                                                 str_time(ControlFile->time))));
4659         else if (ControlFile->state == DB_SHUTDOWNING)
4660                 ereport(LOG,
4661                                 (errmsg("database system shutdown was interrupted; last known up at %s",
4662                                                 str_time(ControlFile->time))));
4663         else if (ControlFile->state == DB_IN_CRASH_RECOVERY)
4664                 ereport(LOG,
4665                    (errmsg("database system was interrupted while in recovery at %s",
4666                                    str_time(ControlFile->time)),
4667                         errhint("This probably means that some data is corrupted and"
4668                                         " you will have to use the last backup for recovery.")));
4669         else if (ControlFile->state == DB_IN_ARCHIVE_RECOVERY)
4670                 ereport(LOG,
4671                                 (errmsg("database system was interrupted while in recovery at log time %s",
4672                                                 str_time(ControlFile->checkPointCopy.time)),
4673                                  errhint("If this has occurred more than once some data might be corrupted"
4674                                 " and you might need to choose an earlier recovery target.")));
4675         else if (ControlFile->state == DB_IN_PRODUCTION)
4676                 ereport(LOG,
4677                                 (errmsg("database system was interrupted; last known up at %s",
4678                                                 str_time(ControlFile->time))));
4679
4680         /* This is just to allow attaching to startup process with a debugger */
4681 #ifdef XLOG_REPLAY_DELAY
4682         if (ControlFile->state != DB_SHUTDOWNED)
4683                 pg_usleep(60000000L);
4684 #endif
4685
4686         /*
4687          * Initialize on the assumption we want to recover to the same timeline
4688          * that's active according to pg_control.
4689          */
4690         recoveryTargetTLI = ControlFile->checkPointCopy.ThisTimeLineID;
4691
4692         /*
4693          * Check for recovery control file, and if so set up state for offline
4694          * recovery
4695          */
4696         readRecoveryCommandFile();
4697
4698         /* Now we can determine the list of expected TLIs */
4699         expectedTLIs = readTimeLineHistory(recoveryTargetTLI);
4700
4701         /*
4702          * If pg_control's timeline is not in expectedTLIs, then we cannot
4703          * proceed: the backup is not part of the history of the requested
4704          * timeline.
4705          */
4706         if (!list_member_int(expectedTLIs,
4707                                                  (int) ControlFile->checkPointCopy.ThisTimeLineID))
4708                 ereport(FATAL,
4709                                 (errmsg("requested timeline %u is not a child of database system timeline %u",
4710                                                 recoveryTargetTLI,
4711                                                 ControlFile->checkPointCopy.ThisTimeLineID)));
4712
4713         if (read_backup_label(&checkPointLoc, &minRecoveryLoc))
4714         {
4715                 /*
4716                  * When a backup_label file is present, we want to roll forward from
4717                  * the checkpoint it identifies, rather than using pg_control.
4718                  */
4719                 record = ReadCheckpointRecord(checkPointLoc, 0);
4720                 if (record != NULL)
4721                 {
4722                         ereport(LOG,
4723                                         (errmsg("checkpoint record is at %X/%X",
4724                                                         checkPointLoc.xlogid, checkPointLoc.xrecoff)));
4725                         InRecovery = true;      /* force recovery even if SHUTDOWNED */
4726                 }
4727                 else
4728                 {
4729                         ereport(PANIC,
4730                                         (errmsg("could not locate required checkpoint record"),
4731                                          errhint("If you are not restoring from a backup, try removing the file \"%s/backup_label\".", DataDir)));
4732                 }
4733                 /* set flag to delete it later */
4734                 haveBackupLabel = true;
4735         }
4736         else
4737         {
4738                 /*
4739                  * Get the last valid checkpoint record.  If the latest one according
4740                  * to pg_control is broken, try the next-to-last one.
4741                  */
4742                 checkPointLoc = ControlFile->checkPoint;
4743                 record = ReadCheckpointRecord(checkPointLoc, 1);
4744                 if (record != NULL)
4745                 {
4746                         ereport(LOG,
4747                                         (errmsg("checkpoint record is at %X/%X",
4748                                                         checkPointLoc.xlogid, checkPointLoc.xrecoff)));
4749                 }
4750                 else
4751                 {
4752                         checkPointLoc = ControlFile->prevCheckPoint;
4753                         record = ReadCheckpointRecord(checkPointLoc, 2);
4754                         if (record != NULL)
4755                         {
4756                                 ereport(LOG,
4757                                                 (errmsg("using previous checkpoint record at %X/%X",
4758                                                           checkPointLoc.xlogid, checkPointLoc.xrecoff)));
4759                                 InRecovery = true;              /* force recovery even if SHUTDOWNED */
4760                         }
4761                         else
4762                                 ereport(PANIC,
4763                                          (errmsg("could not locate a valid checkpoint record")));
4764                 }
4765         }
4766
4767         LastRec = RecPtr = checkPointLoc;
4768         memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
4769         wasShutdown = (record->xl_info == XLOG_CHECKPOINT_SHUTDOWN);
4770
4771         ereport(LOG,
4772          (errmsg("redo record is at %X/%X; undo record is at %X/%X; shutdown %s",
4773                          checkPoint.redo.xlogid, checkPoint.redo.xrecoff,
4774                          checkPoint.undo.xlogid, checkPoint.undo.xrecoff,
4775                          wasShutdown ? "TRUE" : "FALSE")));
4776         ereport(LOG,
4777                         (errmsg("next transaction ID: %u/%u; next OID: %u",
4778                                         checkPoint.nextXidEpoch, checkPoint.nextXid,
4779                                         checkPoint.nextOid)));
4780         ereport(LOG,
4781                         (errmsg("next MultiXactId: %u; next MultiXactOffset: %u",
4782                                         checkPoint.nextMulti, checkPoint.nextMultiOffset)));
4783         if (!TransactionIdIsNormal(checkPoint.nextXid))
4784                 ereport(PANIC,
4785                                 (errmsg("invalid next transaction ID")));
4786
4787         ShmemVariableCache->nextXid = checkPoint.nextXid;
4788         ShmemVariableCache->nextOid = checkPoint.nextOid;
4789         ShmemVariableCache->oidCount = 0;
4790         MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset);
4791
4792         /*
4793          * We must replay WAL entries using the same TimeLineID they were created
4794          * under, so temporarily adopt the TLI indicated by the checkpoint (see
4795          * also xlog_redo()).
4796          */
4797         ThisTimeLineID = checkPoint.ThisTimeLineID;
4798
4799         RedoRecPtr = XLogCtl->Insert.RedoRecPtr = checkPoint.redo;
4800
4801         if (XLByteLT(RecPtr, checkPoint.redo))
4802                 ereport(PANIC,
4803                                 (errmsg("invalid redo in checkpoint record")));
4804         if (checkPoint.undo.xrecoff == 0)
4805                 checkPoint.undo = RecPtr;
4806
4807         /*
4808          * Check whether we need to force recovery from WAL.  If it appears to
4809          * have been a clean shutdown and we did not have a recovery.conf file,
4810          * then assume no recovery needed.
4811          */
4812         if (XLByteLT(checkPoint.undo, RecPtr) ||
4813                 XLByteLT(checkPoint.redo, RecPtr))
4814         {
4815                 if (wasShutdown)
4816                         ereport(PANIC,
4817                                 (errmsg("invalid redo/undo record in shutdown checkpoint")));
4818                 InRecovery = true;
4819         }
4820         else if (ControlFile->state != DB_SHUTDOWNED)
4821                 InRecovery = true;
4822         else if (InArchiveRecovery)
4823         {
4824                 /* force recovery due to presence of recovery.conf */
4825                 InRecovery = true;
4826         }
4827
4828         /* REDO */
4829         if (InRecovery)
4830         {
4831                 int                     rmid;
4832
4833                 /*
4834                  * Update pg_control to show that we are recovering and to show the
4835                  * selected checkpoint as the place we are starting from. We also mark
4836                  * pg_control with any minimum recovery stop point obtained from a
4837                  * backup history file.
4838                  */
4839                 if (InArchiveRecovery)
4840                 {
4841                         ereport(LOG,
4842                                         (errmsg("automatic recovery in progress")));
4843                         ControlFile->state = DB_IN_ARCHIVE_RECOVERY;
4844                 }
4845                 else
4846                 {
4847                         ereport(LOG,
4848                                         (errmsg("database system was not properly shut down; "
4849                                                         "automatic recovery in progress")));
4850                         ControlFile->state = DB_IN_CRASH_RECOVERY;
4851                 }
4852                 ControlFile->prevCheckPoint = ControlFile->checkPoint;
4853                 ControlFile->checkPoint = checkPointLoc;
4854                 ControlFile->checkPointCopy = checkPoint;
4855                 if (minRecoveryLoc.xlogid != 0 || minRecoveryLoc.xrecoff != 0)
4856                         ControlFile->minRecoveryPoint = minRecoveryLoc;
4857                 ControlFile->time = time(NULL);
4858                 UpdateControlFile();
4859
4860                 /*
4861                  * If there was a backup label file, it's done its job and the info
4862                  * has now been propagated into pg_control.  We must get rid of the
4863                  * label file so that if we crash during recovery, we'll pick up at
4864                  * the latest recovery restartpoint instead of going all the way back
4865                  * to the backup start point.  It seems prudent though to just rename
4866                  * the file out of the way rather than delete it completely.
4867                  */
4868                 if (haveBackupLabel)
4869                 {
4870                         unlink(BACKUP_LABEL_OLD);
4871                         if (rename(BACKUP_LABEL_FILE, BACKUP_LABEL_OLD) != 0)
4872                                 ereport(FATAL,
4873                                                 (errcode_for_file_access(),
4874                                                  errmsg("could not rename file \"%s\" to \"%s\": %m",
4875                                                                 BACKUP_LABEL_FILE, BACKUP_LABEL_OLD)));
4876                 }
4877
4878                 /* Start up the recovery environment */
4879                 XLogInitRelationCache();
4880
4881                 for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
4882                 {
4883                         if (RmgrTable[rmid].rm_startup != NULL)
4884                                 RmgrTable[rmid].rm_startup();
4885                 }
4886
4887                 /*
4888                  * Find the first record that logically follows the checkpoint --- it
4889                  * might physically precede it, though.
4890                  */
4891                 if (XLByteLT(checkPoint.redo, RecPtr))
4892                 {
4893                         /* back up to find the record */
4894                         record = ReadRecord(&(checkPoint.redo), PANIC);
4895                 }
4896                 else
4897                 {
4898                         /* just have to read next record after CheckPoint */
4899                         record = ReadRecord(NULL, LOG);
4900                 }
4901
4902                 if (record != NULL)
4903                 {
4904                         bool            recoveryContinue = true;
4905                         bool            recoveryApply = true;
4906                         ErrorContextCallback errcontext;
4907
4908                         InRedo = true;
4909                         ereport(LOG,
4910                                         (errmsg("redo starts at %X/%X",
4911                                                         ReadRecPtr.xlogid, ReadRecPtr.xrecoff)));
4912
4913                         /*
4914                          * main redo apply loop
4915                          */
4916                         do
4917                         {
4918 #ifdef WAL_DEBUG
4919                                 if (XLOG_DEBUG)
4920                                 {
4921                                         StringInfoData buf;
4922
4923                                         initStringInfo(&buf);
4924                                         appendStringInfo(&buf, "REDO @ %X/%X; LSN %X/%X: ",
4925                                                                          ReadRecPtr.xlogid, ReadRecPtr.xrecoff,
4926                                                                          EndRecPtr.xlogid, EndRecPtr.xrecoff);
4927                                         xlog_outrec(&buf, record);
4928                                         appendStringInfo(&buf, " - ");
4929                                         RmgrTable[record->xl_rmid].rm_desc(&buf,
4930                                                                                                            record->xl_info,
4931                                                                                                          XLogRecGetData(record));
4932                                         elog(LOG, "%s", buf.data);
4933                                         pfree(buf.data);
4934                                 }
4935 #endif
4936
4937                                 /*
4938                                  * Have we reached our recovery target?
4939                                  */
4940                                 if (recoveryStopsHere(record, &recoveryApply))
4941                                 {
4942                                         needNewTimeLine = true;         /* see below */
4943                                         recoveryContinue = false;
4944                                         if (!recoveryApply)
4945                                                 break;
4946                                 }
4947
4948                                 /* Setup error traceback support for ereport() */
4949                                 errcontext.callback = rm_redo_error_callback;
4950                                 errcontext.arg = (void *) record;
4951                                 errcontext.previous = error_context_stack;
4952                                 error_context_stack = &errcontext;
4953
4954                                 /* nextXid must be beyond record's xid */
4955                                 if (TransactionIdFollowsOrEquals(record->xl_xid,
4956                                                                                                  ShmemVariableCache->nextXid))
4957                                 {
4958                                         ShmemVariableCache->nextXid = record->xl_xid;
4959                                         TransactionIdAdvance(ShmemVariableCache->nextXid);
4960                                 }
4961
4962                                 if (record->xl_info & XLR_BKP_BLOCK_MASK)
4963                                         RestoreBkpBlocks(record, EndRecPtr);
4964
4965                                 RmgrTable[record->xl_rmid].rm_redo(EndRecPtr, record);
4966
4967                                 /* Pop the error context stack */
4968                                 error_context_stack = errcontext.previous;
4969
4970                                 LastRec = ReadRecPtr;
4971
4972                                 record = ReadRecord(NULL, LOG);
4973                         } while (record != NULL && recoveryContinue);
4974
4975                         /*
4976                          * end of main redo apply loop
4977                          */
4978
4979                         ereport(LOG,
4980                                         (errmsg("redo done at %X/%X",
4981                                                         ReadRecPtr.xlogid, ReadRecPtr.xrecoff)));
4982                         InRedo = false;
4983                 }
4984                 else
4985                 {
4986                         /* there are no WAL records following the checkpoint */
4987                         ereport(LOG,
4988                                         (errmsg("redo is not required")));
4989                 }
4990         }
4991
4992         /*
4993          * Re-fetch the last valid or last applied record, so we can identify the
4994          * exact endpoint of what we consider the valid portion of WAL.
4995          */
4996         record = ReadRecord(&LastRec, PANIC);
4997         EndOfLog = EndRecPtr;
4998         XLByteToPrevSeg(EndOfLog, endLogId, endLogSeg);
4999
5000         /*
5001          * Complain if we did not roll forward far enough to render the backup
5002          * dump consistent.
5003          */
5004         if (XLByteLT(EndOfLog, ControlFile->minRecoveryPoint))
5005         {
5006                 if (needNewTimeLine)    /* stopped because of stop request */
5007                         ereport(FATAL,
5008                                         (errmsg("requested recovery stop point is before end time of backup dump")));
5009                 else
5010                         /* ran off end of WAL */
5011                         ereport(FATAL,
5012                                         (errmsg("WAL ends before end time of backup dump")));
5013         }
5014
5015         /*
5016          * Consider whether we need to assign a new timeline ID.
5017          *
5018          * If we stopped short of the end of WAL during recovery, then we are
5019          * generating a new timeline and must assign it a unique new ID.
5020          * Otherwise, we can just extend the timeline we were in when we ran out
5021          * of WAL.
5022          */
5023         if (needNewTimeLine)
5024         {
5025                 ThisTimeLineID = findNewestTimeLine(recoveryTargetTLI) + 1;
5026                 ereport(LOG,
5027                                 (errmsg("selected new timeline ID: %u", ThisTimeLineID)));
5028                 writeTimeLineHistory(ThisTimeLineID, recoveryTargetTLI,
5029                                                          curFileTLI, endLogId, endLogSeg);
5030         }
5031
5032         /* Save the selected TimeLineID in shared memory, too */
5033         XLogCtl->ThisTimeLineID = ThisTimeLineID;
5034
5035         /*
5036          * We are now done reading the old WAL.  Turn off archive fetching if it
5037          * was active, and make a writable copy of the last WAL segment. (Note
5038          * that we also have a copy of the last block of the old WAL in readBuf;
5039          * we will use that below.)
5040          */
5041         if (InArchiveRecovery)
5042                 exitArchiveRecovery(curFileTLI, endLogId, endLogSeg);
5043
5044         /*
5045          * Prepare to write WAL starting at EndOfLog position, and init xlog
5046          * buffer cache using the block containing the last record from the
5047          * previous incarnation.
5048          */
5049         openLogId = endLogId;
5050         openLogSeg = endLogSeg;
5051         openLogFile = XLogFileOpen(openLogId, openLogSeg);
5052         openLogOff = 0;
5053         Insert = &XLogCtl->Insert;
5054         Insert->PrevRecord = LastRec;
5055         XLogCtl->xlblocks[0].xlogid = openLogId;
5056         XLogCtl->xlblocks[0].xrecoff =
5057                 ((EndOfLog.xrecoff - 1) / XLOG_BLCKSZ + 1) * XLOG_BLCKSZ;
5058
5059         /*
5060          * Tricky point here: readBuf contains the *last* block that the LastRec
5061          * record spans, not the one it starts in.      The last block is indeed the
5062          * one we want to use.
5063          */
5064         Assert(readOff == (XLogCtl->xlblocks[0].xrecoff - XLOG_BLCKSZ) % XLogSegSize);
5065         memcpy((char *) Insert->currpage, readBuf, XLOG_BLCKSZ);
5066         Insert->currpos = (char *) Insert->currpage +
5067                 (EndOfLog.xrecoff + XLOG_BLCKSZ - XLogCtl->xlblocks[0].xrecoff);
5068
5069         LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
5070
5071         XLogCtl->Write.LogwrtResult = LogwrtResult;
5072         Insert->LogwrtResult = LogwrtResult;
5073         XLogCtl->LogwrtResult = LogwrtResult;
5074
5075         XLogCtl->LogwrtRqst.Write = EndOfLog;
5076         XLogCtl->LogwrtRqst.Flush = EndOfLog;
5077
5078         freespace = INSERT_FREESPACE(Insert);
5079         if (freespace > 0)
5080         {
5081                 /* Make sure rest of page is zero */
5082                 MemSet(Insert->currpos, 0, freespace);
5083                 XLogCtl->Write.curridx = 0;
5084         }
5085         else
5086         {
5087                 /*
5088                  * Whenever Write.LogwrtResult points to exactly the end of a page,
5089                  * Write.curridx must point to the *next* page (see XLogWrite()).
5090                  *
5091                  * Note: it might seem we should do AdvanceXLInsertBuffer() here, but
5092                  * this is sufficient.  The first actual attempt to insert a log
5093                  * record will advance the insert state.
5094                  */
5095                 XLogCtl->Write.curridx = NextBufIdx(0);
5096         }
5097
5098         /* Pre-scan prepared transactions to find out the range of XIDs present */
5099         oldestActiveXID = PrescanPreparedTransactions();
5100
5101         if (InRecovery)
5102         {
5103                 int                     rmid;
5104
5105                 /*
5106                  * Allow resource managers to do any required cleanup.
5107                  */
5108                 for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
5109                 {
5110                         if (RmgrTable[rmid].rm_cleanup != NULL)
5111                                 RmgrTable[rmid].rm_cleanup();
5112                 }
5113
5114                 /*
5115                  * Check to see if the XLOG sequence contained any unresolved
5116                  * references to uninitialized pages.
5117                  */
5118                 XLogCheckInvalidPages();
5119
5120                 /*
5121                  * Reset pgstat data, because it may be invalid after recovery.
5122                  */
5123                 pgstat_reset_all();
5124
5125                 /*
5126                  * Perform a checkpoint to update all our recovery activity to disk.
5127                  *
5128                  * Note that we write a shutdown checkpoint rather than an on-line
5129                  * one. This is not particularly critical, but since we may be
5130                  * assigning a new TLI, using a shutdown checkpoint allows us to have
5131                  * the rule that TLI only changes in shutdown checkpoints, which
5132                  * allows some extra error checking in xlog_redo.
5133                  */
5134                 CreateCheckPoint(true, true);
5135
5136                 /*
5137                  * Close down recovery environment
5138                  */
5139                 XLogCloseRelationCache();
5140         }
5141
5142         /*
5143          * Preallocate additional log files, if wanted.
5144          */
5145         (void) PreallocXlogFiles(EndOfLog);
5146
5147         /*
5148          * Okay, we're officially UP.
5149          */
5150         InRecovery = false;
5151
5152         ControlFile->state = DB_IN_PRODUCTION;
5153         ControlFile->time = time(NULL);
5154         UpdateControlFile();
5155
5156         /* start the archive_timeout timer running */
5157         XLogCtl->Write.lastSegSwitchTime = ControlFile->time;
5158
5159         /* initialize shared-memory copy of latest checkpoint XID/epoch */
5160         XLogCtl->ckptXidEpoch = ControlFile->checkPointCopy.nextXidEpoch;
5161         XLogCtl->ckptXid = ControlFile->checkPointCopy.nextXid;
5162
5163         /* Start up the commit log and related stuff, too */
5164         StartupCLOG();
5165         StartupSUBTRANS(oldestActiveXID);
5166         StartupMultiXact();
5167
5168         /* Reload shared-memory state for prepared transactions */
5169         RecoverPreparedTransactions();
5170
5171         ereport(LOG,
5172                         (errmsg("database system is ready")));
5173
5174         /* Shut down readFile facility, free space */
5175         if (readFile >= 0)
5176         {
5177                 close(readFile);
5178                 readFile = -1;
5179         }
5180         if (readBuf)
5181         {
5182                 free(readBuf);
5183                 readBuf = NULL;
5184         }
5185         if (readRecordBuf)
5186         {
5187                 free(readRecordBuf);
5188                 readRecordBuf = NULL;
5189                 readRecordBufSize = 0;
5190         }
5191 }
5192
5193 /*
5194  * Subroutine to try to fetch and validate a prior checkpoint record.
5195  *
5196  * whichChkpt identifies the checkpoint (merely for reporting purposes).
5197  * 1 for "primary", 2 for "secondary", 0 for "other" (backup_label)
5198  */
5199 static XLogRecord *
5200 ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt)
5201 {
5202         XLogRecord *record;
5203
5204         if (!XRecOffIsValid(RecPtr.xrecoff))
5205         {
5206                 switch (whichChkpt)
5207                 {
5208                         case 1:
5209                                 ereport(LOG,
5210                                 (errmsg("invalid primary checkpoint link in control file")));
5211                                 break;
5212                         case 2:
5213                                 ereport(LOG,
5214                                                 (errmsg("invalid secondary checkpoint link in control file")));
5215                                 break;
5216                         default:
5217                                 ereport(LOG,
5218                                    (errmsg("invalid checkpoint link in backup_label file")));
5219                                 break;
5220                 }
5221                 return NULL;
5222         }
5223
5224         record = ReadRecord(&RecPtr, LOG);
5225
5226         if (record == NULL)
5227         {
5228                 switch (whichChkpt)
5229                 {
5230                         case 1:
5231                                 ereport(LOG,
5232                                                 (errmsg("invalid primary checkpoint record")));
5233                                 break;
5234                         case 2:
5235                                 ereport(LOG,
5236                                                 (errmsg("invalid secondary checkpoint record")));
5237                                 break;
5238                         default:
5239                                 ereport(LOG,
5240                                                 (errmsg("invalid checkpoint record")));
5241                                 break;
5242                 }
5243                 return NULL;
5244         }
5245         if (record->xl_rmid != RM_XLOG_ID)
5246         {
5247                 switch (whichChkpt)
5248                 {
5249                         case 1:
5250                                 ereport(LOG,
5251                                                 (errmsg("invalid resource manager ID in primary checkpoint record")));
5252                                 break;
5253                         case 2:
5254                                 ereport(LOG,
5255                                                 (errmsg("invalid resource manager ID in secondary checkpoint record")));
5256                                 break;
5257                         default:
5258                                 ereport(LOG,
5259                                 (errmsg("invalid resource manager ID in checkpoint record")));
5260                                 break;
5261                 }
5262                 return NULL;
5263         }
5264         if (record->xl_info != XLOG_CHECKPOINT_SHUTDOWN &&
5265                 record->xl_info != XLOG_CHECKPOINT_ONLINE)
5266         {
5267                 switch (whichChkpt)
5268                 {
5269                         case 1:
5270                                 ereport(LOG,
5271                                    (errmsg("invalid xl_info in primary checkpoint record")));
5272                                 break;
5273                         case 2:
5274                                 ereport(LOG,
5275                                  (errmsg("invalid xl_info in secondary checkpoint record")));
5276                                 break;
5277                         default:
5278                                 ereport(LOG,
5279                                                 (errmsg("invalid xl_info in checkpoint record")));
5280                                 break;
5281                 }
5282                 return NULL;
5283         }
5284         if (record->xl_len != sizeof(CheckPoint) ||
5285                 record->xl_tot_len != SizeOfXLogRecord + sizeof(CheckPoint))
5286         {
5287                 switch (whichChkpt)
5288                 {
5289                         case 1:
5290                                 ereport(LOG,
5291                                         (errmsg("invalid length of primary checkpoint record")));
5292                                 break;
5293                         case 2:
5294                                 ereport(LOG,
5295                                   (errmsg("invalid length of secondary checkpoint record")));
5296                                 break;
5297                         default:
5298                                 ereport(LOG,
5299                                                 (errmsg("invalid length of checkpoint record")));
5300                                 break;
5301                 }
5302                 return NULL;
5303         }
5304         return record;
5305 }
5306
5307 /*
5308  * This must be called during startup of a backend process, except that
5309  * it need not be called in a standalone backend (which does StartupXLOG
5310  * instead).  We need to initialize the local copies of ThisTimeLineID and
5311  * RedoRecPtr.
5312  *
5313  * Note: before Postgres 8.0, we went to some effort to keep the postmaster
5314  * process's copies of ThisTimeLineID and RedoRecPtr valid too.  This was
5315  * unnecessary however, since the postmaster itself never touches XLOG anyway.
5316  */
5317 void
5318 InitXLOGAccess(void)
5319 {
5320         /* ThisTimeLineID doesn't change so we need no lock to copy it */
5321         ThisTimeLineID = XLogCtl->ThisTimeLineID;
5322         /* Use GetRedoRecPtr to copy the RedoRecPtr safely */
5323         (void) GetRedoRecPtr();
5324 }
5325
5326 /*
5327  * Once spawned, a backend may update its local RedoRecPtr from
5328  * XLogCtl->Insert.RedoRecPtr; it must hold the insert lock or info_lck
5329  * to do so.  This is done in XLogInsert() or GetRedoRecPtr().
5330  */
5331 XLogRecPtr
5332 GetRedoRecPtr(void)
5333 {
5334         /* use volatile pointer to prevent code rearrangement */
5335         volatile XLogCtlData *xlogctl = XLogCtl;
5336
5337         SpinLockAcquire(&xlogctl->info_lck);
5338         Assert(XLByteLE(RedoRecPtr, xlogctl->Insert.RedoRecPtr));
5339         RedoRecPtr = xlogctl->Insert.RedoRecPtr;
5340         SpinLockRelease(&xlogctl->info_lck);
5341
5342         return RedoRecPtr;
5343 }
5344
5345 /*
5346  * Get the time of the last xlog segment switch
5347  */
5348 time_t
5349 GetLastSegSwitchTime(void)
5350 {
5351         time_t          result;
5352
5353         /* Need WALWriteLock, but shared lock is sufficient */
5354         LWLockAcquire(WALWriteLock, LW_SHARED);
5355         result = XLogCtl->Write.lastSegSwitchTime;
5356         LWLockRelease(WALWriteLock);
5357
5358         return result;
5359 }
5360
5361 /*
5362  * GetNextXidAndEpoch - get the current nextXid value and associated epoch
5363  *
5364  * This is exported for use by code that would like to have 64-bit XIDs.
5365  * We don't really support such things, but all XIDs within the system
5366  * can be presumed "close to" the result, and thus the epoch associated
5367  * with them can be determined.
5368  */
5369 void
5370 GetNextXidAndEpoch(TransactionId *xid, uint32 *epoch)
5371 {
5372         uint32          ckptXidEpoch;
5373         TransactionId ckptXid;
5374         TransactionId nextXid;
5375
5376         /* Must read checkpoint info first, else have race condition */
5377         {
5378                 /* use volatile pointer to prevent code rearrangement */
5379                 volatile XLogCtlData *xlogctl = XLogCtl;
5380
5381                 SpinLockAcquire(&xlogctl->info_lck);
5382                 ckptXidEpoch = xlogctl->ckptXidEpoch;
5383                 ckptXid = xlogctl->ckptXid;
5384                 SpinLockRelease(&xlogctl->info_lck);
5385         }
5386
5387         /* Now fetch current nextXid */
5388         nextXid = ReadNewTransactionId();
5389
5390         /*
5391          * nextXid is certainly logically later than ckptXid.  So if it's
5392          * numerically less, it must have wrapped into the next epoch.
5393          */
5394         if (nextXid < ckptXid)
5395                 ckptXidEpoch++;
5396
5397         *xid = nextXid;
5398         *epoch = ckptXidEpoch;
5399 }
5400
5401 /*
5402  * This must be called ONCE during postmaster or standalone-backend shutdown
5403  */
5404 void
5405 ShutdownXLOG(int code, Datum arg)
5406 {
5407         ereport(LOG,
5408                         (errmsg("shutting down")));
5409
5410         CreateCheckPoint(true, true);
5411         ShutdownCLOG();
5412         ShutdownSUBTRANS();
5413         ShutdownMultiXact();
5414
5415         ereport(LOG,
5416                         (errmsg("database system is shut down")));
5417 }
5418
5419 /*
5420  * Perform a checkpoint --- either during shutdown, or on-the-fly
5421  *
5422  * If force is true, we force a checkpoint regardless of whether any XLOG
5423  * activity has occurred since the last one.
5424  */
5425 void
5426 CreateCheckPoint(bool shutdown, bool force)
5427 {
5428         CheckPoint      checkPoint;
5429         XLogRecPtr      recptr;
5430         XLogCtlInsert *Insert = &XLogCtl->Insert;
5431         XLogRecData rdata;
5432         uint32          freespace;
5433         uint32          _logId;
5434         uint32          _logSeg;
5435         int                     nsegsadded = 0;
5436         int                     nsegsremoved = 0;
5437         int                     nsegsrecycled = 0;
5438
5439         /*
5440          * Acquire CheckpointLock to ensure only one checkpoint happens at a time.
5441          * (This is just pro forma, since in the present system structure there is
5442          * only one process that is allowed to issue checkpoints at any given
5443          * time.)
5444          */
5445         LWLockAcquire(CheckpointLock, LW_EXCLUSIVE);
5446
5447         /*
5448          * Use a critical section to force system panic if we have trouble.
5449          */
5450         START_CRIT_SECTION();
5451
5452         if (shutdown)
5453         {
5454                 ControlFile->state = DB_SHUTDOWNING;
5455                 ControlFile->time = time(NULL);
5456                 UpdateControlFile();
5457         }
5458
5459         MemSet(&checkPoint, 0, sizeof(checkPoint));
5460         checkPoint.ThisTimeLineID = ThisTimeLineID;
5461         checkPoint.time = time(NULL);
5462
5463         /*
5464          * We must hold CheckpointStartLock while determining the checkpoint REDO
5465          * pointer.  This ensures that any concurrent transaction commits will be
5466          * either not yet logged, or logged and recorded in pg_clog. See notes in
5467          * RecordTransactionCommit().
5468          */
5469         LWLockAcquire(CheckpointStartLock, LW_EXCLUSIVE);
5470
5471         /* And we need WALInsertLock too */
5472         LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
5473
5474         /*
5475          * If this isn't a shutdown or forced checkpoint, and we have not inserted
5476          * any XLOG records since the start of the last checkpoint, skip the
5477          * checkpoint.  The idea here is to avoid inserting duplicate checkpoints
5478          * when the system is idle. That wastes log space, and more importantly it
5479          * exposes us to possible loss of both current and previous checkpoint
5480          * records if the machine crashes just as we're writing the update.
5481          * (Perhaps it'd make even more sense to checkpoint only when the previous
5482          * checkpoint record is in a different xlog page?)
5483          *
5484          * We have to make two tests to determine that nothing has happened since
5485          * the start of the last checkpoint: current insertion point must match
5486          * the end of the last checkpoint record, and its redo pointer must point
5487          * to itself.
5488          */
5489         if (!shutdown && !force)
5490         {
5491                 XLogRecPtr      curInsert;
5492
5493                 INSERT_RECPTR(curInsert, Insert, Insert->curridx);
5494                 if (curInsert.xlogid == ControlFile->checkPoint.xlogid &&
5495                         curInsert.xrecoff == ControlFile->checkPoint.xrecoff +
5496                         MAXALIGN(SizeOfXLogRecord + sizeof(CheckPoint)) &&
5497                         ControlFile->checkPoint.xlogid ==
5498                         ControlFile->checkPointCopy.redo.xlogid &&
5499                         ControlFile->checkPoint.xrecoff ==
5500                         ControlFile->checkPointCopy.redo.xrecoff)
5501                 {
5502                         LWLockRelease(WALInsertLock);
5503                         LWLockRelease(CheckpointStartLock);
5504                         LWLockRelease(CheckpointLock);
5505                         END_CRIT_SECTION();
5506                         return;
5507                 }
5508         }
5509
5510         /*
5511          * Compute new REDO record ptr = location of next XLOG record.
5512          *
5513          * NB: this is NOT necessarily where the checkpoint record itself will be,
5514          * since other backends may insert more XLOG records while we're off doing
5515          * the buffer flush work.  Those XLOG records are logically after the
5516          * checkpoint, even though physically before it.  Got that?
5517          */
5518         freespace = INSERT_FREESPACE(Insert);
5519         if (freespace < SizeOfXLogRecord)
5520         {
5521                 (void) AdvanceXLInsertBuffer(false);
5522                 /* OK to ignore update return flag, since we will do flush anyway */
5523                 freespace = INSERT_FREESPACE(Insert);
5524         }
5525         INSERT_RECPTR(checkPoint.redo, Insert, Insert->curridx);
5526
5527         /*
5528          * Here we update the shared RedoRecPtr for future XLogInsert calls; this
5529          * must be done while holding the insert lock AND the info_lck.
5530          *
5531          * Note: if we fail to complete the checkpoint, RedoRecPtr will be left
5532          * pointing past where it really needs to point.  This is okay; the only
5533          * consequence is that XLogInsert might back up whole buffers that it
5534          * didn't really need to.  We can't postpone advancing RedoRecPtr because
5535          * XLogInserts that happen while we are dumping buffers must assume that
5536          * their buffer changes are not included in the checkpoint.
5537          */
5538         {
5539                 /* use volatile pointer to prevent code rearrangement */
5540                 volatile XLogCtlData *xlogctl = XLogCtl;
5541
5542                 SpinLockAcquire(&xlogctl->info_lck);
5543                 RedoRecPtr = xlogctl->Insert.RedoRecPtr = checkPoint.redo;
5544                 SpinLockRelease(&xlogctl->info_lck);
5545         }
5546
5547         /*
5548          * Now we can release insert lock and checkpoint start lock, allowing
5549          * other xacts to proceed even while we are flushing disk buffers.
5550          */
5551         LWLockRelease(WALInsertLock);
5552
5553         LWLockRelease(CheckpointStartLock);
5554
5555         if (!shutdown)
5556                 ereport(DEBUG2,
5557                                 (errmsg("checkpoint starting")));
5558
5559         /*
5560          * Get the other info we need for the checkpoint record.
5561          */
5562         LWLockAcquire(XidGenLock, LW_SHARED);
5563         checkPoint.nextXid = ShmemVariableCache->nextXid;
5564         LWLockRelease(XidGenLock);
5565
5566         /* Increase XID epoch if we've wrapped around since last checkpoint */
5567         checkPoint.nextXidEpoch = ControlFile->checkPointCopy.nextXidEpoch;
5568         if (checkPoint.nextXid < ControlFile->checkPointCopy.nextXid)
5569                 checkPoint.nextXidEpoch++;
5570
5571         LWLockAcquire(OidGenLock, LW_SHARED);
5572         checkPoint.nextOid = ShmemVariableCache->nextOid;
5573         if (!shutdown)
5574                 checkPoint.nextOid += ShmemVariableCache->oidCount;
5575         LWLockRelease(OidGenLock);
5576
5577         MultiXactGetCheckptMulti(shutdown,
5578                                                          &checkPoint.nextMulti,
5579                                                          &checkPoint.nextMultiOffset);
5580
5581         /*
5582          * Having constructed the checkpoint record, ensure all shmem disk buffers
5583          * and commit-log buffers are flushed to disk.
5584          *
5585          * This I/O could fail for various reasons.  If so, we will fail to
5586          * complete the checkpoint, but there is no reason to force a system
5587          * panic. Accordingly, exit critical section while doing it.
5588          */
5589         END_CRIT_SECTION();
5590
5591         CheckPointGuts(checkPoint.redo);
5592
5593         START_CRIT_SECTION();
5594
5595         /*
5596          * Now insert the checkpoint record into XLOG.
5597          */
5598         rdata.data = (char *) (&checkPoint);
5599         rdata.len = sizeof(checkPoint);
5600         rdata.buffer = InvalidBuffer;
5601         rdata.next = NULL;
5602
5603         recptr = XLogInsert(RM_XLOG_ID,
5604                                                 shutdown ? XLOG_CHECKPOINT_SHUTDOWN :
5605                                                 XLOG_CHECKPOINT_ONLINE,
5606                                                 &rdata);
5607
5608         XLogFlush(recptr);
5609
5610         /*
5611          * We now have ProcLastRecPtr = start of actual checkpoint record, recptr
5612          * = end of actual checkpoint record.
5613          */
5614         if (shutdown && !XLByteEQ(checkPoint.redo, ProcLastRecPtr))
5615                 ereport(PANIC,
5616                                 (errmsg("concurrent transaction log activity while database system is shutting down")));
5617
5618         /*
5619          * Select point at which we can truncate the log, which we base on the
5620          * prior checkpoint's earliest info.
5621          */
5622         XLByteToSeg(ControlFile->checkPointCopy.redo, _logId, _logSeg);
5623
5624         /*
5625          * Update the control file.
5626          */
5627         LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
5628         if (shutdown)
5629                 ControlFile->state = DB_SHUTDOWNED;
5630         ControlFile->prevCheckPoint = ControlFile->checkPoint;
5631         ControlFile->checkPoint = ProcLastRecPtr;
5632         ControlFile->checkPointCopy = checkPoint;
5633         ControlFile->time = time(NULL);
5634         UpdateControlFile();
5635         LWLockRelease(ControlFileLock);
5636
5637         /* Update shared-memory copy of checkpoint XID/epoch */
5638         {
5639                 /* use volatile pointer to prevent code rearrangement */
5640                 volatile XLogCtlData *xlogctl = XLogCtl;
5641
5642                 SpinLockAcquire(&xlogctl->info_lck);
5643                 xlogctl->ckptXidEpoch = checkPoint.nextXidEpoch;
5644                 xlogctl->ckptXid = checkPoint.nextXid;
5645                 SpinLockRelease(&xlogctl->info_lck);
5646         }
5647
5648         /*
5649          * We are now done with critical updates; no need for system panic if we
5650          * have trouble while fooling with offline log segments.
5651          */
5652         END_CRIT_SECTION();
5653
5654         /*
5655          * Delete offline log files (those no longer needed even for previous
5656          * checkpoint).
5657          */
5658         if (_logId || _logSeg)
5659         {
5660                 PrevLogSeg(_logId, _logSeg);
5661                 MoveOfflineLogs(_logId, _logSeg, recptr,
5662                                                 &nsegsremoved, &nsegsrecycled);
5663         }
5664
5665         /*
5666          * Make more log segments if needed.  (Do this after deleting offline log
5667          * segments, to avoid having peak disk space usage higher than necessary.)
5668          */
5669         if (!shutdown)
5670                 nsegsadded = PreallocXlogFiles(recptr);
5671
5672         /*
5673          * Truncate pg_subtrans if possible.  We can throw away all data before
5674          * the oldest XMIN of any running transaction.  No future transaction will
5675          * attempt to reference any pg_subtrans entry older than that (see Asserts
5676          * in subtrans.c).      During recovery, though, we mustn't do this because
5677          * StartupSUBTRANS hasn't been called yet.
5678          */
5679         if (!InRecovery)
5680                 TruncateSUBTRANS(GetOldestXmin(true, false));
5681
5682         if (!shutdown)
5683                 ereport(DEBUG2,
5684                                 (errmsg("checkpoint complete; %d transaction log file(s) added, %d removed, %d recycled",
5685                                                 nsegsadded, nsegsremoved, nsegsrecycled)));
5686
5687         LWLockRelease(CheckpointLock);
5688 }
5689
5690 /*
5691  * Flush all data in shared memory to disk, and fsync
5692  *
5693  * This is the common code shared between regular checkpoints and
5694  * recovery restartpoints.
      *
      * checkPointRedo is the REDO pointer of the checkpoint being taken (both
      * callers pass the checkpoint's "redo" field).  It is not examined here;
      * it is only handed on to CheckPointTwoPhase().
      *
      * The regular-checkpoint caller leaves its critical section around this
      * call, so that an I/O failure in here fails the checkpoint without
      * forcing a system panic.
5695  */
5696 static void
5697 CheckPointGuts(XLogRecPtr checkPointRedo)
5698 {
              /* Per-module checkpoint routines first: clog, subtrans, multixact */
5699         CheckPointCLOG();
5700         CheckPointSUBTRANS();
5701         CheckPointMultiXact();
5702         FlushBufferPool();                      /* performs all required fsyncs */
5703         /* We deliberately delay 2PC checkpointing as long as possible */
5704         CheckPointTwoPhase(checkPointRedo);
5705 }
5706
5707 /*
5708  * Set a recovery restart point if appropriate
5709  *
5710  * This is similar to CreateCheckpoint, but is used during WAL recovery
5711  * to establish a point from which recovery can roll forward without
5712  * replaying the entire recovery log.  This function is called each time
5713  * a checkpoint record is read from XLOG; it must determine whether a
5714  * restartpoint is needed or not.
5715  */
5716 static void
5717 RecoveryRestartPoint(const CheckPoint *checkPoint)
5718 {
5719         int                     elapsed_secs;
5720         int                     rmid;
5721
5722         /*
5723          * Do nothing if the elapsed time since the last restartpoint is less than
5724          * half of checkpoint_timeout.  (We use a value less than
5725          * checkpoint_timeout so that variations in the timing of checkpoints on
5726          * the master, or speed of transmission of WAL segments to a slave, won't
5727          * make the slave skip a restartpoint once it's synced with the master.)
5728          * Checking true elapsed time keeps us from doing restartpoints too often
5729          * while rapidly scanning large amounts of WAL.
5730          */
5731         elapsed_secs = time(NULL) - ControlFile->time;
5732         if (elapsed_secs < CheckPointTimeout / 2)
5733                 return;
5734
5735         /*
5736          * Is it safe to checkpoint?  We must ask each of the resource managers
5737          * whether they have any partial state information that might prevent a
5738          * correct restart from this point.  If so, we skip this opportunity, but
5739          * return at the next checkpoint record for another try.
5740          */
5741         for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
5742         {
5743                 if (RmgrTable[rmid].rm_safe_restartpoint != NULL)
5744                         if (!(RmgrTable[rmid].rm_safe_restartpoint()))
5745                                 return;
5746         }
5747
5748         /*
5749          * OK, force data out to disk
5750          */
5751         CheckPointGuts(checkPoint->redo);
5752
5753         /*
5754          * Update pg_control so that any subsequent crash will restart from this
5755          * checkpoint.  Note: ReadRecPtr gives the XLOG address of the checkpoint
5756          * record itself.
5757          */
5758         ControlFile->prevCheckPoint = ControlFile->checkPoint;
5759         ControlFile->checkPoint = ReadRecPtr;
5760         ControlFile->checkPointCopy = *checkPoint;
5761         ControlFile->time = time(NULL);
5762         UpdateControlFile();
5763
5764         ereport(DEBUG2,
5765                         (errmsg("recovery restart point at %X/%X",
5766                                         checkPoint->redo.xlogid, checkPoint->redo.xrecoff)));
5767 }
5768
5769 /*
5770  * Write a NEXTOID log record
5771  */
5772 void
5773 XLogPutNextOid(Oid nextOid)
5774 {
5775         XLogRecData rdata;
5776
5777         rdata.data = (char *) (&nextOid);
5778         rdata.len = sizeof(Oid);
5779         rdata.buffer = InvalidBuffer;
5780         rdata.next = NULL;
5781         (void) XLogInsert(RM_XLOG_ID, XLOG_NEXTOID, &rdata);
5782
5783         /*
5784          * We need not flush the NEXTOID record immediately, because any of the
5785          * just-allocated OIDs could only reach disk as part of a tuple insert or
5786          * update that would have its own XLOG record that must follow the NEXTOID
5787          * record.      Therefore, the standard buffer LSN interlock applied to those
5788          * records will ensure no such OID reaches disk before the NEXTOID record
5789          * does.
5790          *
5791          * Note, however, that the above statement only covers state "within" the
5792          * database.  When we use a generated OID as a file or directory name,
5793          * we are in a sense violating the basic WAL rule, because that filesystem
5794          * change may reach disk before the NEXTOID WAL record does.  The impact
5795          * of this is that if a database crash occurs immediately afterward,
5796          * we might after restart re-generate the same OID and find that it
5797          * conflicts with the leftover file or directory.  But since for safety's
5798          * sake we always loop until finding a nonconflicting filename, this poses
5799          * no real problem in practice. See pgsql-hackers discussion 27-Sep-2006.
5800          */
5801 }
5802
5803 /*
5804  * Write an XLOG SWITCH record.
5805  *
5806  * Here we just blindly issue an XLogInsert request for the record.
5807  * All the magic happens inside XLogInsert.
5808  *
5809  * The return value is either the end+1 address of the switch record,
5810  * or the end+1 address of the prior segment if we did not need to
5811  * write a switch record because we are already at segment start.
5812  */
5813 XLogRecPtr
5814 RequestXLogSwitch(void)
5815 {
5816         XLogRecPtr      RecPtr;
5817         XLogRecData rdata;
5818
5819         /* XLOG SWITCH, alone among xlog record types, has no data */
5820         rdata.buffer = InvalidBuffer;
5821         rdata.data = NULL;
5822         rdata.len = 0;
5823         rdata.next = NULL;
5824
5825         RecPtr = XLogInsert(RM_XLOG_ID, XLOG_SWITCH, &rdata);
5826
5827         return RecPtr;
5828 }
5829
5830 /*
5831  * XLOG resource manager's routines
5832  */
5833 void
5834 xlog_redo(XLogRecPtr lsn, XLogRecord *record)
5835 {
5836         uint8           info = record->xl_info & ~XLR_INFO_MASK;
5837
5838         if (info == XLOG_NEXTOID)
5839         {
5840                 Oid                     nextOid;
5841
5842                 memcpy(&nextOid, XLogRecGetData(record), sizeof(Oid));
5843                 if (ShmemVariableCache->nextOid < nextOid)
5844                 {
5845                         ShmemVariableCache->nextOid = nextOid;
5846                         ShmemVariableCache->oidCount = 0;
5847                 }
5848         }
5849         else if (info == XLOG_CHECKPOINT_SHUTDOWN)
5850         {
5851                 CheckPoint      checkPoint;
5852
5853                 memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
5854                 /* In a SHUTDOWN checkpoint, believe the counters exactly */
5855                 ShmemVariableCache->nextXid = checkPoint.nextXid;
5856                 ShmemVariableCache->nextOid = checkPoint.nextOid;
5857                 ShmemVariableCache->oidCount = 0;
5858                 MultiXactSetNextMXact(checkPoint.nextMulti,
5859                                                           checkPoint.nextMultiOffset);
5860
5861                 /* ControlFile->checkPointCopy always tracks the latest ckpt XID */
5862                 ControlFile->checkPointCopy.nextXidEpoch = checkPoint.nextXidEpoch;
5863                 ControlFile->checkPointCopy.nextXid = checkPoint.nextXid;
5864
5865                 /*
5866                  * TLI may change in a shutdown checkpoint, but it shouldn't decrease
5867                  */
5868                 if (checkPoint.ThisTimeLineID != ThisTimeLineID)
5869                 {
5870                         if (checkPoint.ThisTimeLineID < ThisTimeLineID ||
5871                                 !list_member_int(expectedTLIs,
5872                                                                  (int) checkPoint.ThisTimeLineID))
5873                                 ereport(PANIC,
5874                                                 (errmsg("unexpected timeline ID %u (after %u) in checkpoint record",
5875                                                                 checkPoint.ThisTimeLineID, ThisTimeLineID)));
5876                         /* Following WAL records should be run with new TLI */
5877                         ThisTimeLineID = checkPoint.ThisTimeLineID;
5878                 }
5879
5880                 RecoveryRestartPoint(&checkPoint);
5881         }
5882         else if (info == XLOG_CHECKPOINT_ONLINE)
5883         {
5884                 CheckPoint      checkPoint;
5885
5886                 memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
5887                 /* In an ONLINE checkpoint, treat the counters like NEXTOID */
5888                 if (TransactionIdPrecedes(ShmemVariableCache->nextXid,
5889                                                                   checkPoint.nextXid))
5890                         ShmemVariableCache->nextXid = checkPoint.nextXid;
5891                 if (ShmemVariableCache->nextOid < checkPoint.nextOid)
5892                 {
5893                         ShmemVariableCache->nextOid = checkPoint.nextOid;
5894                         ShmemVariableCache->oidCount = 0;
5895                 }
5896                 MultiXactAdvanceNextMXact(checkPoint.nextMulti,
5897                                                                   checkPoint.nextMultiOffset);
5898
5899                 /* ControlFile->checkPointCopy always tracks the latest ckpt XID */
5900                 ControlFile->checkPointCopy.nextXidEpoch = checkPoint.nextXidEpoch;
5901                 ControlFile->checkPointCopy.nextXid = checkPoint.nextXid;
5902
5903                 /* TLI should not change in an on-line checkpoint */
5904                 if (checkPoint.ThisTimeLineID != ThisTimeLineID)
5905                         ereport(PANIC,
5906                                         (errmsg("unexpected timeline ID %u (should be %u) in checkpoint record",
5907                                                         checkPoint.ThisTimeLineID, ThisTimeLineID)));
5908
5909                 RecoveryRestartPoint(&checkPoint);
5910         }
5911         else if (info == XLOG_SWITCH)
5912         {
5913                 /* nothing to do here */
5914         }
5915 }
5916
5917 void
5918 xlog_desc(StringInfo buf, uint8 xl_info, char *rec)
5919 {
5920         uint8           info = xl_info & ~XLR_INFO_MASK;
5921
5922         if (info == XLOG_CHECKPOINT_SHUTDOWN ||
5923                 info == XLOG_CHECKPOINT_ONLINE)
5924         {
5925                 CheckPoint *checkpoint = (CheckPoint *) rec;
5926
5927                 appendStringInfo(buf, "checkpoint: redo %X/%X; undo %X/%X; "
5928                                                  "tli %u; xid %u/%u; oid %u; multi %u; offset %u; %s",
5929                                                  checkpoint->redo.xlogid, checkpoint->redo.xrecoff,
5930                                                  checkpoint->undo.xlogid, checkpoint->undo.xrecoff,
5931                                                  checkpoint->ThisTimeLineID,
5932                                                  checkpoint->nextXidEpoch, checkpoint->nextXid,
5933                                                  checkpoint->nextOid,
5934                                                  checkpoint->nextMulti,
5935                                                  checkpoint->nextMultiOffset,
5936                                  (info == XLOG_CHECKPOINT_SHUTDOWN) ? "shutdown" : "online");
5937         }
5938         else if (info == XLOG_NEXTOID)
5939         {
5940                 Oid                     nextOid;
5941
5942                 memcpy(&nextOid, rec, sizeof(Oid));
5943                 appendStringInfo(buf, "nextOid: %u", nextOid);
5944         }
5945         else if (info == XLOG_SWITCH)
5946         {
5947                 appendStringInfo(buf, "xlog switch");
5948         }
5949         else
5950                 appendStringInfo(buf, "UNKNOWN");
5951 }
5952
#ifdef WAL_DEBUG

/*
 * Append a one-line summary of an XLOG record header to buf: the previous
 * record pointer, the xid, a "bkpbN" marker (N counted from 1) for each
 * backup-block flag set in xl_info, and the owning resource manager's name.
 */
static void
xlog_outrec(StringInfo buf, XLogRecord *record)
{
	int			blk = 0;

	appendStringInfo(buf, "prev %X/%X; xid %u",
					 record->xl_prev.xlogid, record->xl_prev.xrecoff,
					 record->xl_xid);

	while (blk < XLR_MAX_BKP_BLOCKS)
	{
		if (record->xl_info & XLR_SET_BKP_BLOCK(blk))
			appendStringInfo(buf, "; bkpb%d", blk + 1);
		blk++;
	}

	appendStringInfo(buf, ": %s", RmgrTable[record->xl_rmid].rm_name);
}
#endif   /* WAL_DEBUG */
5973
5974
5975 /*
5976  * GUC support
5977  */
5978 const char *
5979 assign_xlog_sync_method(const char *method, bool doit, GucSource source)
5980 {
5981         int                     new_sync_method;
5982         int                     new_sync_bit;
5983
5984         if (pg_strcasecmp(method, "fsync") == 0)
5985         {
5986                 new_sync_method = SYNC_METHOD_FSYNC;
5987                 new_sync_bit = 0;
5988         }
5989 #ifdef HAVE_FSYNC_WRITETHROUGH
5990         else if (pg_strcasecmp(method, "fsync_writethrough") == 0)
5991         {
5992                 new_sync_method = SYNC_METHOD_FSYNC_WRITETHROUGH;
5993                 new_sync_bit = 0;
5994         }
5995 #endif
5996 #ifdef HAVE_FDATASYNC
5997         else if (pg_strcasecmp(method, "fdatasync") == 0)
5998         {
5999                 new_sync_method = SYNC_METHOD_FDATASYNC;
6000                 new_sync_bit = 0;
6001         }
6002 #endif
6003 #ifdef OPEN_SYNC_FLAG
6004         else if (pg_strcasecmp(method, "open_sync") == 0)
6005         {
6006                 new_sync_method = SYNC_METHOD_OPEN;
6007                 new_sync_bit = OPEN_SYNC_FLAG;
6008         }
6009 #endif
6010 #ifdef OPEN_DATASYNC_FLAG
6011         else if (pg_strcasecmp(method, "open_datasync") == 0)
6012         {
6013                 new_sync_method = SYNC_METHOD_OPEN;
6014                 new_sync_bit = OPEN_DATASYNC_FLAG;
6015         }
6016 #endif
6017         else
6018                 return NULL;
6019
6020         if (!doit)
6021                 return method;
6022
6023         if (sync_method != new_sync_method || open_sync_bit != new_sync_bit)
6024         {
6025                 /*
6026                  * To ensure that no blocks escape unsynced, force an fsync on the
6027                  * currently open log segment (if any).  Also, if the open flag is
6028                  * changing, close the log file so it will be reopened (with new flag
6029                  * bit) at next use.
6030                  */
6031                 if (openLogFile >= 0)
6032                 {
6033                         if (pg_fsync(openLogFile) != 0)
6034                                 ereport(PANIC,
6035                                                 (errcode_for_file_access(),
6036                                                  errmsg("could not fsync log file %u, segment %u: %m",
6037                                                                 openLogId, openLogSeg)));
6038                         if (open_sync_bit != new_sync_bit)
6039                                 XLogFileClose();
6040                 }
6041                 sync_method = new_sync_method;
6042                 open_sync_bit = new_sync_bit;
6043         }
6044
6045         return method;
6046 }
6047
6048
6049 /*
6050  * Issue appropriate kind of fsync (if any) on the current XLOG output file
6051  */
6052 static void
6053 issue_xlog_fsync(void)
6054 {
6055         switch (sync_method)
6056         {
6057                 case SYNC_METHOD_FSYNC:
6058                         if (pg_fsync_no_writethrough(openLogFile) != 0)
6059                                 ereport(PANIC,
6060                                                 (errcode_for_file_access(),
6061                                                  errmsg("could not fsync log file %u, segment %u: %m",
6062                                                                 openLogId, openLogSeg)));
6063                         break;
6064 #ifdef HAVE_FSYNC_WRITETHROUGH
6065                 case SYNC_METHOD_FSYNC_WRITETHROUGH:
6066                         if (pg_fsync_writethrough(openLogFile) != 0)
6067                                 ereport(PANIC,
6068                                                 (errcode_for_file_access(),
6069                                                  errmsg("could not fsync write-through log file %u, segment %u: %m",
6070                                                                 openLogId, openLogSeg)));
6071                         break;
6072 #endif
6073 #ifdef HAVE_FDATASYNC
6074                 case SYNC_METHOD_FDATASYNC:
6075                         if (pg_fdatasync(openLogFile) != 0)
6076                                 ereport(PANIC,
6077                                                 (errcode_for_file_access(),
6078                                         errmsg("could not fdatasync log file %u, segment %u: %m",
6079                                                    openLogId, openLogSeg)));
6080                         break;
6081 #endif
6082                 case SYNC_METHOD_OPEN:
6083                         /* write synced it already */
6084                         break;
6085                 default:
6086                         elog(PANIC, "unrecognized wal_sync_method: %d", sync_method);
6087                         break;
6088         }
6089 }
6090
6091
6092 /*
6093  * pg_start_backup: set up for taking an on-line backup dump
6094  *
6095  * Essentially what this does is to create a backup label file in $PGDATA,
6096  * where it will be archived as part of the backup dump.  The label file
6097  * contains the user-supplied label string (typically this would be used
6098  * to tell where the backup dump will be stored) and the starting time and
6099  * starting WAL location for the dump.
6100  */
6101 Datum
6102 pg_start_backup(PG_FUNCTION_ARGS)
6103 {
6104         text       *backupid = PG_GETARG_TEXT_P(0);
6105         text       *result;
6106         char       *backupidstr;
6107         XLogRecPtr      checkpointloc;
6108         XLogRecPtr      startpoint;
6109         time_t          stamp_time;
6110         char            strfbuf[128];
6111         char            xlogfilename[MAXFNAMELEN];
6112         uint32          _logId;
6113         uint32          _logSeg;
6114         struct stat stat_buf;
6115         FILE       *fp;
6116
6117         if (!superuser())
6118                 ereport(ERROR,
6119                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
6120                                  (errmsg("must be superuser to run a backup"))));
6121
6122         if (!XLogArchivingActive())
6123                 ereport(ERROR,
6124                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
6125                                  (errmsg("WAL archiving is not active"),
6126                                   (errhint("archive_command must be defined before "
6127                                                    "online backups can be made safely.")))));
6128
6129         backupidstr = DatumGetCString(DirectFunctionCall1(textout,
6130                                                                                                  PointerGetDatum(backupid)));
6131
6132         /*
6133          * Mark backup active in shared memory.  We must do full-page WAL writes
6134          * during an on-line backup even if not doing so at other times, because
6135          * it's quite possible for the backup dump to obtain a "torn" (partially
6136          * written) copy of a database page if it reads the page concurrently with
6137          * our write to the same page.  This can be fixed as long as the first
6138          * write to the page in the WAL sequence is a full-page write. Hence, we
6139          * turn on forcePageWrites and then force a CHECKPOINT, to ensure there
6140          * are no dirty pages in shared memory that might get dumped while the
6141          * backup is in progress without having a corresponding WAL record.  (Once
6142          * the backup is complete, we need not force full-page writes anymore,
6143          * since we expect that any pages not modified during the backup interval
6144          * must have been correctly captured by the backup.)
6145          *
6146          * We must hold WALInsertLock to change the value of forcePageWrites, to
6147          * ensure adequate interlocking against XLogInsert().
6148          */
6149         LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
6150         if (XLogCtl->Insert.forcePageWrites)
6151         {
6152                 LWLockRelease(WALInsertLock);
6153                 ereport(ERROR,
6154                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
6155                                  errmsg("a backup is already in progress"),
6156                                  errhint("Run pg_stop_backup() and try again.")));
6157         }
6158         XLogCtl->Insert.forcePageWrites = true;
6159         LWLockRelease(WALInsertLock);
6160
6161         /* Use a TRY block to ensure we release forcePageWrites if fail below */
6162         PG_TRY();
6163         {
6164                 /*
6165                  * Force a CHECKPOINT.  Aside from being necessary to prevent torn
6166                  * page problems, this guarantees that two successive backup runs will
6167                  * have different checkpoint positions and hence different history
6168                  * file names, even if nothing happened in between.
6169                  */
6170                 RequestCheckpoint(true, false);
6171
6172                 /*
6173                  * Now we need to fetch the checkpoint record location, and also its
6174                  * REDO pointer.  The oldest point in WAL that would be needed to
6175                  * restore starting from the checkpoint is precisely the REDO pointer.
6176                  */
6177                 LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
6178                 checkpointloc = ControlFile->checkPoint;
6179                 startpoint = ControlFile->checkPointCopy.redo;
6180                 LWLockRelease(ControlFileLock);
6181
6182                 XLByteToSeg(startpoint, _logId, _logSeg);
6183                 XLogFileName(xlogfilename, ThisTimeLineID, _logId, _logSeg);
6184
6185                 /*
6186                  * We deliberately use strftime/localtime not the src/timezone
6187                  * functions, so that backup labels will consistently be recorded in
6188                  * the same timezone regardless of TimeZone setting.  This matches
6189                  * elog.c's practice.
6190                  */
6191                 stamp_time = time(NULL);
6192                 strftime(strfbuf, sizeof(strfbuf),
6193                                  "%Y-%m-%d %H:%M:%S %Z",
6194                                  localtime(&stamp_time));
6195
6196                 /*
6197                  * Check for existing backup label --- implies a backup is already
6198                  * running.  (XXX given that we checked forcePageWrites above, maybe
6199                  * it would be OK to just unlink any such label file?)
6200                  */
6201                 if (stat(BACKUP_LABEL_FILE, &stat_buf) != 0)
6202                 {
6203                         if (errno != ENOENT)
6204                                 ereport(ERROR,
6205                                                 (errcode_for_file_access(),
6206                                                  errmsg("could not stat file \"%s\": %m",
6207                                                                 BACKUP_LABEL_FILE)));
6208                 }
6209                 else
6210                         ereport(ERROR,
6211                                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
6212                                          errmsg("a backup is already in progress"),
6213                                          errhint("If you're sure there is no backup in progress, remove file \"%s\" and try again.",
6214                                                          BACKUP_LABEL_FILE)));
6215
6216                 /*
6217                  * Okay, write the file
6218                  */
6219                 fp = AllocateFile(BACKUP_LABEL_FILE, "w");
6220                 if (!fp)
6221                         ereport(ERROR,
6222                                         (errcode_for_file_access(),
6223                                          errmsg("could not create file \"%s\": %m",
6224                                                         BACKUP_LABEL_FILE)));
6225                 fprintf(fp, "START WAL LOCATION: %X/%X (file %s)\n",
6226                                 startpoint.xlogid, startpoint.xrecoff, xlogfilename);
6227                 fprintf(fp, "CHECKPOINT LOCATION: %X/%X\n",
6228                                 checkpointloc.xlogid, checkpointloc.xrecoff);
6229                 fprintf(fp, "START TIME: %s\n", strfbuf);
6230                 fprintf(fp, "LABEL: %s\n", backupidstr);
6231                 if (fflush(fp) || ferror(fp) || FreeFile(fp))
6232                         ereport(ERROR,
6233                                         (errcode_for_file_access(),
6234                                          errmsg("could not write file \"%s\": %m",
6235                                                         BACKUP_LABEL_FILE)));
6236         }
6237         PG_CATCH();
6238         {
6239                 /* Turn off forcePageWrites on failure */
6240                 LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
6241                 XLogCtl->Insert.forcePageWrites = false;
6242                 LWLockRelease(WALInsertLock);
6243
6244                 PG_RE_THROW();
6245         }
6246         PG_END_TRY();
6247
6248         /*
6249          * We're done.  As a convenience, return the starting WAL location.
6250          */
6251         snprintf(xlogfilename, sizeof(xlogfilename), "%X/%X",
6252                          startpoint.xlogid, startpoint.xrecoff);
6253         result = DatumGetTextP(DirectFunctionCall1(textin,
6254                                                                                          CStringGetDatum(xlogfilename)));
6255         PG_RETURN_TEXT_P(result);
6256 }
6257
6258 /*
6259  * pg_stop_backup: finish taking an on-line backup dump
6260  *
6261  * We remove the backup label file created by pg_start_backup, and instead
6262  * create a backup history file in pg_xlog (whence it will immediately be
6263  * archived).  The backup history file contains the same info found in
6264  * the label file, plus the backup-end time and WAL location.
6265  */
6266 Datum
6267 pg_stop_backup(PG_FUNCTION_ARGS)
6268 {
6269         text       *result;
6270         XLogRecPtr      startpoint;
6271         XLogRecPtr      stoppoint;
6272         time_t          stamp_time;
6273         char            strfbuf[128];
6274         char            histfilepath[MAXPGPATH];
6275         char            startxlogfilename[MAXFNAMELEN];
6276         char            stopxlogfilename[MAXFNAMELEN];
6277         uint32          _logId;
6278         uint32          _logSeg;
6279         FILE       *lfp;
6280         FILE       *fp;
6281         char            ch;
6282         int                     ich;
6283
6284         if (!superuser())
6285                 ereport(ERROR,
6286                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
6287                                  (errmsg("must be superuser to run a backup"))));
6288
6289         /*
6290          * OK to clear forcePageWrites
6291          */
6292         LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
6293         XLogCtl->Insert.forcePageWrites = false;
6294         LWLockRelease(WALInsertLock);
6295
6296         /*
6297          * Force a switch to a new xlog segment file, so that the backup is valid
6298          * as soon as archiver moves out the current segment file. We'll report
6299          * the end address of the XLOG SWITCH record as the backup stopping point.
6300          */
6301         stoppoint = RequestXLogSwitch();
6302
6303         XLByteToSeg(stoppoint, _logId, _logSeg);
6304         XLogFileName(stopxlogfilename, ThisTimeLineID, _logId, _logSeg);
6305
6306         /*
6307          * We deliberately use strftime/localtime not the src/timezone functions,
6308          * so that backup labels will consistently be recorded in the same
6309          * timezone regardless of TimeZone setting.  This matches elog.c's
6310          * practice.
6311          */
6312         stamp_time = time(NULL);
6313         strftime(strfbuf, sizeof(strfbuf),
6314                          "%Y-%m-%d %H:%M:%S %Z",
6315                          localtime(&stamp_time));
6316
6317         /*
6318          * Open the existing label file
6319          */
6320         lfp = AllocateFile(BACKUP_LABEL_FILE, "r");
6321         if (!lfp)
6322         {
6323                 if (errno != ENOENT)
6324                         ereport(ERROR,
6325                                         (errcode_for_file_access(),
6326                                          errmsg("could not read file \"%s\": %m",
6327                                                         BACKUP_LABEL_FILE)));
6328                 ereport(ERROR,
6329                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
6330                                  errmsg("a backup is not in progress")));
6331         }
6332
6333         /*
6334          * Read and parse the START WAL LOCATION line (this code is pretty crude,
6335          * but we are not expecting any variability in the file format).
6336          */
6337         if (fscanf(lfp, "START WAL LOCATION: %X/%X (file %24s)%c",
6338                            &startpoint.xlogid, &startpoint.xrecoff, startxlogfilename,
6339                            &ch) != 4 || ch != '\n')
6340                 ereport(ERROR,
6341                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
6342                                  errmsg("invalid data in file \"%s\"", BACKUP_LABEL_FILE)));
6343
6344         /*
6345          * Write the backup history file
6346          */
6347         XLByteToSeg(startpoint, _logId, _logSeg);
6348         BackupHistoryFilePath(histfilepath, ThisTimeLineID, _logId, _logSeg,
6349                                                   startpoint.xrecoff % XLogSegSize);
6350         fp = AllocateFile(histfilepath, "w");
6351         if (!fp)
6352                 ereport(ERROR,
6353                                 (errcode_for_file_access(),
6354                                  errmsg("could not create file \"%s\": %m",
6355                                                 histfilepath)));
6356         fprintf(fp, "START WAL LOCATION: %X/%X (file %s)\n",
6357                         startpoint.xlogid, startpoint.xrecoff, startxlogfilename);
6358         fprintf(fp, "STOP WAL LOCATION: %X/%X (file %s)\n",
6359                         stoppoint.xlogid, stoppoint.xrecoff, stopxlogfilename);
6360         /* transfer remaining lines from label to history file */
6361         while ((ich = fgetc(lfp)) != EOF)
6362                 fputc(ich, fp);
6363         fprintf(fp, "STOP TIME: %s\n", strfbuf);
6364         if (fflush(fp) || ferror(fp) || FreeFile(fp))
6365                 ereport(ERROR,
6366                                 (errcode_for_file_access(),
6367                                  errmsg("could not write file \"%s\": %m",
6368                                                 histfilepath)));
6369
6370         /*
6371          * Close and remove the backup label file
6372          */
6373         if (ferror(lfp) || FreeFile(lfp))
6374                 ereport(ERROR,
6375                                 (errcode_for_file_access(),
6376                                  errmsg("could not read file \"%s\": %m",
6377                                                 BACKUP_LABEL_FILE)));
6378         if (unlink(BACKUP_LABEL_FILE) != 0)
6379                 ereport(ERROR,
6380                                 (errcode_for_file_access(),
6381                                  errmsg("could not remove file \"%s\": %m",
6382                                                 BACKUP_LABEL_FILE)));
6383
6384         /*
6385          * Clean out any no-longer-needed history files.  As a side effect, this
6386          * will post a .ready file for the newly created history file, notifying
6387          * the archiver that history file may be archived immediately.
6388          */
6389         CleanupBackupHistory();
6390
6391         /*
6392          * We're done.  As a convenience, return the ending WAL location.
6393          */
6394         snprintf(stopxlogfilename, sizeof(stopxlogfilename), "%X/%X",
6395                          stoppoint.xlogid, stoppoint.xrecoff);
6396         result = DatumGetTextP(DirectFunctionCall1(textin,
6397                                                                                  CStringGetDatum(stopxlogfilename)));
6398         PG_RETURN_TEXT_P(result);
6399 }
6400
6401 /*
6402  * pg_switch_xlog: switch to next xlog file
6403  */
6404 Datum
6405 pg_switch_xlog(PG_FUNCTION_ARGS)
6406 {
6407         text       *result;
6408         XLogRecPtr      switchpoint;
6409         char            location[MAXFNAMELEN];
6410
6411         if (!superuser())
6412                 ereport(ERROR,
6413                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
6414                                  (errmsg("must be superuser to switch transaction log files"))));
6415
6416         switchpoint = RequestXLogSwitch();
6417
6418         /*
6419          * As a convenience, return the WAL location of the switch record
6420          */
6421         snprintf(location, sizeof(location), "%X/%X",
6422                          switchpoint.xlogid, switchpoint.xrecoff);
6423         result = DatumGetTextP(DirectFunctionCall1(textin,
6424                                                                                            CStringGetDatum(location)));
6425         PG_RETURN_TEXT_P(result);
6426 }
6427
6428 /*
6429  * Report the current WAL write location (same format as pg_start_backup etc)
6430  *
6431  * This is useful for determining how much of WAL is visible to an external
6432  * archiving process.  Note that the data before this point is written out
6433  * to the kernel, but is not necessarily synced to disk.
6434  */
6435 Datum
6436 pg_current_xlog_location(PG_FUNCTION_ARGS)
6437 {
6438         text       *result;
6439         char            location[MAXFNAMELEN];
6440
6441         /* Make sure we have an up-to-date local LogwrtResult */
6442         {
6443                 /* use volatile pointer to prevent code rearrangement */
6444                 volatile XLogCtlData *xlogctl = XLogCtl;
6445
6446                 SpinLockAcquire(&xlogctl->info_lck);
6447                 LogwrtResult = xlogctl->LogwrtResult;
6448                 SpinLockRelease(&xlogctl->info_lck);
6449         }
6450
6451         snprintf(location, sizeof(location), "%X/%X",
6452                          LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff);
6453
6454         result = DatumGetTextP(DirectFunctionCall1(textin,
6455                                                                                            CStringGetDatum(location)));
6456         PG_RETURN_TEXT_P(result);
6457 }
6458
6459 /*
6460  * Report the current WAL insert location (same format as pg_start_backup etc)
6461  *
6462  * This function is mostly for debugging purposes.
6463  */
6464 Datum
6465 pg_current_xlog_insert_location(PG_FUNCTION_ARGS)
6466 {
6467         text       *result;
6468         XLogCtlInsert *Insert = &XLogCtl->Insert;
6469         XLogRecPtr      current_recptr;
6470         char            location[MAXFNAMELEN];
6471
6472         /*
6473          * Get the current end-of-WAL position ... shared lock is sufficient
6474          */
6475         LWLockAcquire(WALInsertLock, LW_SHARED);
6476         INSERT_RECPTR(current_recptr, Insert, Insert->curridx);
6477         LWLockRelease(WALInsertLock);
6478
6479         snprintf(location, sizeof(location), "%X/%X",
6480                          current_recptr.xlogid, current_recptr.xrecoff);
6481
6482         result = DatumGetTextP(DirectFunctionCall1(textin,
6483                                                                                            CStringGetDatum(location)));
6484         PG_RETURN_TEXT_P(result);
6485 }
6486
6487 /*
6488  * Compute an xlog file name and decimal byte offset given a WAL location,
6489  * such as is returned by pg_stop_backup() or pg_xlog_switch().
6490  *
6491  * Note that a location exactly at a segment boundary is taken to be in
6492  * the previous segment.  This is usually the right thing, since the
6493  * expected usage is to determine which xlog file(s) are ready to archive.
6494  */
6495 Datum
6496 pg_xlogfile_name_offset(PG_FUNCTION_ARGS)
6497 {
6498         text       *location = PG_GETARG_TEXT_P(0);
6499         char       *locationstr;
6500         unsigned int uxlogid;
6501         unsigned int uxrecoff;
6502         uint32          xlogid;
6503         uint32          xlogseg;
6504         uint32          xrecoff;
6505         XLogRecPtr      locationpoint;
6506         char            xlogfilename[MAXFNAMELEN];
6507         Datum           values[2];
6508         bool            isnull[2];
6509         TupleDesc       resultTupleDesc;
6510         HeapTuple       resultHeapTuple;
6511         Datum           result;
6512
6513         /*
6514          * Read input and parse
6515          */
6516         locationstr = DatumGetCString(DirectFunctionCall1(textout,
6517                                                                                                  PointerGetDatum(location)));
6518
6519         if (sscanf(locationstr, "%X/%X", &uxlogid, &uxrecoff) != 2)
6520                 ereport(ERROR,
6521                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
6522                                  errmsg("could not parse transaction log location \"%s\"",
6523                                                 locationstr)));
6524
6525         locationpoint.xlogid = uxlogid;
6526         locationpoint.xrecoff = uxrecoff;
6527
6528         /*
6529          * Construct a tuple descriptor for the result row.  This must match this
6530          * function's pg_proc entry!
6531          */
6532         resultTupleDesc = CreateTemplateTupleDesc(2, false);
6533         TupleDescInitEntry(resultTupleDesc, (AttrNumber) 1, "file_name",
6534                                            TEXTOID, -1, 0);
6535         TupleDescInitEntry(resultTupleDesc, (AttrNumber) 2, "file_offset",
6536                                            INT4OID, -1, 0);
6537
6538         resultTupleDesc = BlessTupleDesc(resultTupleDesc);
6539
6540         /*
6541          * xlogfilename
6542          */
6543         XLByteToPrevSeg(locationpoint, xlogid, xlogseg);
6544         XLogFileName(xlogfilename, ThisTimeLineID, xlogid, xlogseg);
6545
6546         values[0] = DirectFunctionCall1(textin,
6547                                                                         CStringGetDatum(xlogfilename));
6548         isnull[0] = false;
6549
6550         /*
6551          * offset
6552          */
6553         xrecoff = locationpoint.xrecoff - xlogseg * XLogSegSize;
6554
6555         values[1] = UInt32GetDatum(xrecoff);
6556         isnull[1] = false;
6557
6558         /*
6559          * Tuple jam: Having first prepared your Datums, then squash together
6560          */
6561         resultHeapTuple = heap_form_tuple(resultTupleDesc, values, isnull);
6562
6563         result = HeapTupleGetDatum(resultHeapTuple);
6564
6565         PG_RETURN_DATUM(result);
6566 }
6567
6568 /*
6569  * Compute an xlog file name given a WAL location,
6570  * such as is returned by pg_stop_backup() or pg_xlog_switch().
6571  */
6572 Datum
6573 pg_xlogfile_name(PG_FUNCTION_ARGS)
6574 {
6575         text       *location = PG_GETARG_TEXT_P(0);
6576         text       *result;
6577         char       *locationstr;
6578         unsigned int uxlogid;
6579         unsigned int uxrecoff;
6580         uint32          xlogid;
6581         uint32          xlogseg;
6582         XLogRecPtr      locationpoint;
6583         char            xlogfilename[MAXFNAMELEN];
6584
6585         locationstr = DatumGetCString(DirectFunctionCall1(textout,
6586                                                                                                  PointerGetDatum(location)));
6587
6588         if (sscanf(locationstr, "%X/%X", &uxlogid, &uxrecoff) != 2)
6589                 ereport(ERROR,
6590                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
6591                                  errmsg("could not parse transaction log location \"%s\"",
6592                                                 locationstr)));
6593
6594         locationpoint.xlogid = uxlogid;
6595         locationpoint.xrecoff = uxrecoff;
6596
6597         XLByteToPrevSeg(locationpoint, xlogid, xlogseg);
6598         XLogFileName(xlogfilename, ThisTimeLineID, xlogid, xlogseg);
6599
6600         result = DatumGetTextP(DirectFunctionCall1(textin,
6601                                                                                          CStringGetDatum(xlogfilename)));
6602         PG_RETURN_TEXT_P(result);
6603 }
6604
6605 /*
6606  * read_backup_label: check to see if a backup_label file is present
6607  *
6608  * If we see a backup_label during recovery, we assume that we are recovering
6609  * from a backup dump file, and we therefore roll forward from the checkpoint
6610  * identified by the label file, NOT what pg_control says.      This avoids the
6611  * problem that pg_control might have been archived one or more checkpoints
6612  * later than the start of the dump, and so if we rely on it as the start
6613  * point, we will fail to restore a consistent database state.
6614  *
6615  * We also attempt to retrieve the corresponding backup history file.
6616  * If successful, set *minRecoveryLoc to constrain valid PITR stopping
6617  * points.
6618  *
6619  * Returns TRUE if a backup_label was found (and fills the checkpoint
6620  * location into *checkPointLoc); returns FALSE if not.
6621  */
6622 static bool
6623 read_backup_label(XLogRecPtr *checkPointLoc, XLogRecPtr *minRecoveryLoc)
6624 {
6625         XLogRecPtr      startpoint;
6626         XLogRecPtr      stoppoint;
6627         char            histfilename[MAXFNAMELEN];
6628         char            histfilepath[MAXPGPATH];
6629         char            startxlogfilename[MAXFNAMELEN];
6630         char            stopxlogfilename[MAXFNAMELEN];
6631         TimeLineID      tli;
6632         uint32          _logId;
6633         uint32          _logSeg;
6634         FILE       *lfp;
6635         FILE       *fp;
6636         char            ch;
6637
6638         /* Default is to not constrain recovery stop point */
6639         minRecoveryLoc->xlogid = 0;
6640         minRecoveryLoc->xrecoff = 0;
6641
6642         /*
6643          * See if label file is present
6644          */
6645         lfp = AllocateFile(BACKUP_LABEL_FILE, "r");
6646         if (!lfp)
6647         {
6648                 if (errno != ENOENT)
6649                         ereport(FATAL,
6650                                         (errcode_for_file_access(),
6651                                          errmsg("could not read file \"%s\": %m",
6652                                                         BACKUP_LABEL_FILE)));
6653                 return false;                   /* it's not there, all is fine */
6654         }
6655
6656         /*
6657          * Read and parse the START WAL LOCATION and CHECKPOINT lines (this code
6658          * is pretty crude, but we are not expecting any variability in the file
6659          * format).
6660          */
6661         if (fscanf(lfp, "START WAL LOCATION: %X/%X (file %08X%16s)%c",
6662                            &startpoint.xlogid, &startpoint.xrecoff, &tli,
6663                            startxlogfilename, &ch) != 5 || ch != '\n')
6664                 ereport(FATAL,
6665                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
6666                                  errmsg("invalid data in file \"%s\"", BACKUP_LABEL_FILE)));
6667         if (fscanf(lfp, "CHECKPOINT LOCATION: %X/%X%c",
6668                            &checkPointLoc->xlogid, &checkPointLoc->xrecoff,
6669                            &ch) != 3 || ch != '\n')
6670                 ereport(FATAL,
6671                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
6672                                  errmsg("invalid data in file \"%s\"", BACKUP_LABEL_FILE)));
6673         if (ferror(lfp) || FreeFile(lfp))
6674                 ereport(FATAL,
6675                                 (errcode_for_file_access(),
6676                                  errmsg("could not read file \"%s\": %m",
6677                                                 BACKUP_LABEL_FILE)));
6678
6679         /*
6680          * Try to retrieve the backup history file (no error if we can't)
6681          */
6682         XLByteToSeg(startpoint, _logId, _logSeg);
6683         BackupHistoryFileName(histfilename, tli, _logId, _logSeg,
6684                                                   startpoint.xrecoff % XLogSegSize);
6685
6686         if (InArchiveRecovery)
6687                 RestoreArchivedFile(histfilepath, histfilename, "RECOVERYHISTORY", 0);
6688         else
6689                 BackupHistoryFilePath(histfilepath, tli, _logId, _logSeg,
6690                                                           startpoint.xrecoff % XLogSegSize);
6691
6692         fp = AllocateFile(histfilepath, "r");
6693         if (fp)
6694         {
6695                 /*
6696                  * Parse history file to identify stop point.
6697                  */
6698                 if (fscanf(fp, "START WAL LOCATION: %X/%X (file %24s)%c",
6699                                    &startpoint.xlogid, &startpoint.xrecoff, startxlogfilename,
6700                                    &ch) != 4 || ch != '\n')
6701                         ereport(FATAL,
6702                                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
6703                                          errmsg("invalid data in file \"%s\"", histfilename)));
6704                 if (fscanf(fp, "STOP WAL LOCATION: %X/%X (file %24s)%c",
6705                                    &stoppoint.xlogid, &stoppoint.xrecoff, stopxlogfilename,
6706                                    &ch) != 4 || ch != '\n')
6707                         ereport(FATAL,
6708                                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
6709                                          errmsg("invalid data in file \"%s\"", histfilename)));
6710                 *minRecoveryLoc = stoppoint;
6711                 if (ferror(fp) || FreeFile(fp))
6712                         ereport(FATAL,
6713                                         (errcode_for_file_access(),
6714                                          errmsg("could not read file \"%s\": %m",
6715                                                         histfilepath)));
6716         }
6717
6718         return true;
6719 }
6720
6721 /*
6722  * Error context callback for errors occurring during rm_redo().
6723  */
6724 static void
6725 rm_redo_error_callback(void *arg)
6726 {
6727         XLogRecord *record = (XLogRecord *) arg;
6728         StringInfoData buf;
6729
6730         initStringInfo(&buf);
6731         RmgrTable[record->xl_rmid].rm_desc(&buf,
6732                                                                            record->xl_info,
6733                                                                            XLogRecGetData(record));
6734
6735         /* don't bother emitting empty description */
6736         if (buf.len > 0)
6737                 errcontext("xlog redo %s", buf.data);
6738
6739         pfree(buf.data);
6740 }