OSDN Git Service

Modify snapshot definition so that lazy vacuums are ignored by other
[pg-rex/syncrep.git] / src / backend / access / transam / xlog.c
1 /*-------------------------------------------------------------------------
2  *
3  * xlog.c
4  *              PostgreSQL transaction log manager
5  *
6  *
7  * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.245 2006/07/30 02:07:18 alvherre Exp $
11  *
12  *-------------------------------------------------------------------------
13  */
14
15 #include "postgres.h"
16
17 #include <ctype.h>
18 #include <fcntl.h>
19 #include <signal.h>
20 #include <time.h>
21 #include <unistd.h>
22 #include <sys/stat.h>
23 #include <sys/time.h>
24
25 #include "access/clog.h"
26 #include "access/multixact.h"
27 #include "access/subtrans.h"
28 #include "access/transam.h"
29 #include "access/twophase.h"
30 #include "access/xact.h"
31 #include "access/xlog_internal.h"
32 #include "access/xlogutils.h"
33 #include "catalog/catversion.h"
34 #include "catalog/pg_control.h"
35 #include "miscadmin.h"
36 #include "pgstat.h"
37 #include "postmaster/bgwriter.h"
38 #include "storage/bufpage.h"
39 #include "storage/fd.h"
40 #include "storage/pmsignal.h"
41 #include "storage/procarray.h"
42 #include "storage/spin.h"
43 #include "utils/builtins.h"
44 #include "utils/nabstime.h"
45 #include "utils/pg_locale.h"
46
47
48 /*
49  *      Because O_DIRECT bypasses the kernel buffers, and because we never
50  *      read those buffers except during crash recovery, it is a win to use
51  *      it in all cases where we sync on each write().  We could allow O_DIRECT
52  *      with fsync(), but because skipping the kernel buffer forces writes out
53  *      quickly, it seems best just to use it for O_SYNC.  It is hard to imagine
54  *      how O_DIRECT with fsync() could be a win compared to O_DIRECT with O_SYNC.
55  *      Also, O_DIRECT is never enough to force data to the drives; it merely
56  *      tries to bypass the kernel cache, so we still need O_SYNC or fsync().
57  */
58 #ifdef O_DIRECT
59 #define PG_O_DIRECT                             O_DIRECT
60 #else
61 #define PG_O_DIRECT                             0
62 #endif
63
64 /*
65  * This chunk of hackery attempts to determine which file sync methods
66  * are available on the current platform, and to choose an appropriate
67  * default method.  We assume that fsync() is always available, and that
68  * configure determined whether fdatasync() is.
69  */
70 #if defined(O_SYNC)
71 #define BARE_OPEN_SYNC_FLAG             O_SYNC
72 #elif defined(O_FSYNC)
73 #define BARE_OPEN_SYNC_FLAG             O_FSYNC
74 #endif
75 #ifdef BARE_OPEN_SYNC_FLAG
76 #define OPEN_SYNC_FLAG                  (BARE_OPEN_SYNC_FLAG | PG_O_DIRECT)
77 #endif
78
79 #if defined(O_DSYNC)
80 #if defined(OPEN_SYNC_FLAG)
81 /* O_DSYNC is distinct? */
82 #if O_DSYNC != BARE_OPEN_SYNC_FLAG
83 #define OPEN_DATASYNC_FLAG              (O_DSYNC | PG_O_DIRECT)
84 #endif
85 #else                                                   /* !defined(OPEN_SYNC_FLAG) */
86 /* Win32 only has O_DSYNC */
87 #define OPEN_DATASYNC_FLAG              (O_DSYNC | PG_O_DIRECT)
88 #endif
89 #endif
90
91 #if defined(OPEN_DATASYNC_FLAG)
92 #define DEFAULT_SYNC_METHOD_STR "open_datasync"
93 #define DEFAULT_SYNC_METHOD             SYNC_METHOD_OPEN
94 #define DEFAULT_SYNC_FLAGBIT    OPEN_DATASYNC_FLAG
95 #elif defined(HAVE_FDATASYNC)
96 #define DEFAULT_SYNC_METHOD_STR "fdatasync"
97 #define DEFAULT_SYNC_METHOD             SYNC_METHOD_FDATASYNC
98 #define DEFAULT_SYNC_FLAGBIT    0
99 #elif defined(HAVE_FSYNC_WRITETHROUGH_ONLY)
100 #define DEFAULT_SYNC_METHOD_STR "fsync_writethrough"
101 #define DEFAULT_SYNC_METHOD             SYNC_METHOD_FSYNC_WRITETHROUGH
102 #define DEFAULT_SYNC_FLAGBIT    0
103 #else
104 #define DEFAULT_SYNC_METHOD_STR "fsync"
105 #define DEFAULT_SYNC_METHOD             SYNC_METHOD_FSYNC
106 #define DEFAULT_SYNC_FLAGBIT    0
107 #endif
108
109
110 /*
111  * Limitation of buffer-alignment for direct IO depends on OS and filesystem,
112  * but XLOG_BLCKSZ is assumed to be enough for it.
113  */
114 #ifdef O_DIRECT
115 #define ALIGNOF_XLOG_BUFFER             XLOG_BLCKSZ
116 #else
117 #define ALIGNOF_XLOG_BUFFER             ALIGNOF_BUFFER
118 #endif
119
120
121 /* File path names (all relative to $PGDATA) */
122 #define BACKUP_LABEL_FILE               "backup_label"
123 #define RECOVERY_COMMAND_FILE   "recovery.conf"
124 #define RECOVERY_COMMAND_DONE   "recovery.done"
125
126
127 /* User-settable parameters */
128 int                     CheckPointSegments = 3;
129 int                     XLOGbuffers = 8;
130 char       *XLogArchiveCommand = NULL;
131 char       *XLOG_sync_method = NULL;
132 const char      XLOG_sync_method_default[] = DEFAULT_SYNC_METHOD_STR;
133 bool            fullPageWrites = true;
134
135 #ifdef WAL_DEBUG
136 bool            XLOG_DEBUG = false;
137 #endif
138
139 /*
140  * XLOGfileslop is used in the code as the allowed "fuzz" in the number of
141  * preallocated XLOG segments --- we try to have at least XLOGfiles advance
142  * segments but no more than XLOGfileslop segments.  This could
143  * be made a separate GUC variable, but at present I think it's sufficient
144  * to hardwire it as 2*CheckPointSegments+1.  Under normal conditions, a
145  * checkpoint will free no more than 2*CheckPointSegments log segments, and
146  * we want to recycle all of them; the +1 allows boundary cases to happen
147  * without wasting a delete/create-segment cycle.
148  */
149
150 #define XLOGfileslop    (2*CheckPointSegments + 1)
151
152
153 /* these are derived from XLOG_sync_method by assign_xlog_sync_method */
154 int                     sync_method = DEFAULT_SYNC_METHOD;
155 static int      open_sync_bit = DEFAULT_SYNC_FLAGBIT;
156
157 #define XLOG_SYNC_BIT  (enableFsync ? open_sync_bit : 0)
158
159
160 /*
161  * ThisTimeLineID will be same in all backends --- it identifies current
162  * WAL timeline for the database system.
163  */
164 TimeLineID      ThisTimeLineID = 0;
165
166 /* Are we doing recovery from XLOG? */
167 bool            InRecovery = false;
168
169 /* Are we recovering using offline XLOG archives? */
170 static bool InArchiveRecovery = false;
171
172 /* Was the last xlog file restored from archive, or local? */
173 static bool restoredFromArchive = false;
174
175 /* options taken from recovery.conf */
176 static char *recoveryRestoreCommand = NULL;
177 static bool recoveryTarget = false;
178 static bool recoveryTargetExact = false;
179 static bool recoveryTargetInclusive = true;
180 static TransactionId recoveryTargetXid;
181 static time_t recoveryTargetTime;
182
183 /* if recoveryStopsHere returns true, it saves actual stop xid/time here */
184 static TransactionId recoveryStopXid;
185 static time_t recoveryStopTime;
186 static bool recoveryStopAfter;
187
188 /* constraint set by read_backup_label */
189 static XLogRecPtr recoveryMinXlogOffset = {0, 0};
190
191 /*
192  * During normal operation, the only timeline we care about is ThisTimeLineID.
193  * During recovery, however, things are more complicated.  To simplify life
194  * for rmgr code, we keep ThisTimeLineID set to the "current" timeline as we
195  * scan through the WAL history (that is, it is the line that was active when
196  * the currently-scanned WAL record was generated).  We also need these
197  * timeline values:
198  *
199  * recoveryTargetTLI: the desired timeline that we want to end in.
200  *
201  * expectedTLIs: an integer list of recoveryTargetTLI and the TLIs of
202  * its known parents, newest first (so recoveryTargetTLI is always the
203  * first list member).  Only these TLIs are expected to be seen in the WAL
204  * segments we read, and indeed only these TLIs will be considered as
205  * candidate WAL files to open at all.
206  *
207  * curFileTLI: the TLI appearing in the name of the current input WAL file.
208  * (This is not necessarily the same as ThisTimeLineID, because we could
209  * be scanning data that was copied from an ancestor timeline when the current
210  * file was created.)  During a sequential scan we do not allow this value
211  * to decrease.
212  */
213 static TimeLineID recoveryTargetTLI;
214 static List *expectedTLIs;
215 static TimeLineID curFileTLI;
216
217 /*
218  * MyLastRecPtr points to the start of the last XLOG record inserted by the
219  * current transaction.  If MyLastRecPtr.xrecoff == 0, then the current
220  * xact hasn't yet inserted any transaction-controlled XLOG records.
221  *
222  * Note that XLOG records inserted outside transaction control are not
223  * reflected into MyLastRecPtr.  They do, however, cause MyXactMadeXLogEntry
224  * to be set true.  The latter can be used to test whether the current xact
225  * made any loggable changes (including out-of-xact changes, such as
226  * sequence updates).
227  *
228  * When we insert/update/delete a tuple in a temporary relation, we do not
229  * make any XLOG record, since we don't care about recovering the state of
230  * the temp rel after a crash.  However, we will still need to remember
231  * whether our transaction committed or aborted in that case.  So, we must
232  * set MyXactMadeTempRelUpdate true to indicate that the XID will be of
233  * interest later.
234  */
235 XLogRecPtr      MyLastRecPtr = {0, 0};
236
237 bool            MyXactMadeXLogEntry = false;
238
239 bool            MyXactMadeTempRelUpdate = false;
240
241 /*
242  * ProcLastRecPtr points to the start of the last XLOG record inserted by the
243  * current backend.  It is updated for all inserts, transaction-controlled
244  * or not.  ProcLastRecEnd is similar but points to end+1 of last record.
245  */
246 static XLogRecPtr ProcLastRecPtr = {0, 0};
247
248 XLogRecPtr      ProcLastRecEnd = {0, 0};
249
250 /*
251  * RedoRecPtr is this backend's local copy of the REDO record pointer
252  * (which is almost but not quite the same as a pointer to the most recent
253  * CHECKPOINT record).  We update this from the shared-memory copy,
254  * XLogCtl->Insert.RedoRecPtr, whenever we can safely do so (ie, when we
255  * hold the Insert lock).  See XLogInsert for details.  We are also allowed
256  * to update from XLogCtl->Insert.RedoRecPtr if we hold the info_lck;
257  * see GetRedoRecPtr.  A freshly spawned backend obtains the value during
258  * InitXLOGAccess.
259  */
260 static XLogRecPtr RedoRecPtr;
261
262 /*----------
263  * Shared-memory data structures for XLOG control
264  *
265  * LogwrtRqst indicates a byte position that we need to write and/or fsync
266  * the log up to (all records before that point must be written or fsynced).
267  * LogwrtResult indicates the byte positions we have already written/fsynced.
268  * These structs are identical but are declared separately to indicate their
269  * slightly different functions.
270  *
271  * We do a lot of pushups to minimize the amount of access to lockable
272  * shared memory values.  There are actually three shared-memory copies of
273  * LogwrtResult, plus one unshared copy in each backend.  Here's how it works:
274  *              XLogCtl->LogwrtResult is protected by info_lck
275  *              XLogCtl->Write.LogwrtResult is protected by WALWriteLock
276  *              XLogCtl->Insert.LogwrtResult is protected by WALInsertLock
277  * One must hold the associated lock to read or write any of these, but
278  * of course no lock is needed to read/write the unshared LogwrtResult.
279  *
280  * XLogCtl->LogwrtResult and XLogCtl->Write.LogwrtResult are both "always
281  * right", since both are updated by a write or flush operation before
282  * it releases WALWriteLock.  The point of keeping XLogCtl->Write.LogwrtResult
283  * is that it can be examined/modified by code that already holds WALWriteLock
284  * without needing to grab info_lck as well.
285  *
286  * XLogCtl->Insert.LogwrtResult may lag behind the reality of the other two,
287  * but is updated when convenient.  Again, it exists for the convenience of
288  * code that is already holding WALInsertLock but not the other locks.
289  *
290  * The unshared LogwrtResult may lag behind any or all of these, and again
291  * is updated when convenient.
292  *
293  * The request bookkeeping is simpler: there is a shared XLogCtl->LogwrtRqst
294  * (protected by info_lck), but we don't need to cache any copies of it.
295  *
296  * Note that this all works because the request and result positions can only
297  * advance forward, never back up, and so we can easily determine which of two
298  * values is "more up to date".
299  *
300  * info_lck is only held long enough to read/update the protected variables,
301  * so it's a plain spinlock.  The other locks are held longer (potentially
302  * over I/O operations), so we use LWLocks for them.  These locks are:
303  *
304  * WALInsertLock: must be held to insert a record into the WAL buffers.
305  *
306  * WALWriteLock: must be held to write WAL buffers to disk (XLogWrite or
307  * XLogFlush).
308  *
309  * ControlFileLock: must be held to read/update control file or create
310  * new log file.
311  *
312  * CheckpointLock: must be held to do a checkpoint (ensures only one
313  * checkpointer at a time; even though the postmaster won't launch
314  * parallel checkpoint processes, we need this because manual checkpoints
315  * could be launched simultaneously).
316  *
317  *----------
318  */
319 /* Request: byte positions up to which the log needs to be written and fsynced. */
320 typedef struct XLogwrtRqst
321 {
322         XLogRecPtr      Write;                  /* last byte + 1 to write out */
323         XLogRecPtr      Flush;                  /* last byte + 1 to flush */
324 } XLogwrtRqst;
325 /* Result: byte positions already written/fsynced; same layout as XLogwrtRqst. */
326 typedef struct XLogwrtResult
327 {
328         XLogRecPtr      Write;                  /* last byte + 1 written out */
329         XLogRecPtr      Flush;                  /* last byte + 1 flushed */
330 } XLogwrtResult;
331
332 /*
333  * Shared state data for XLogInsert.
334  */
335 typedef struct XLogCtlInsert
336 {
337         XLogwrtResult LogwrtResult; /* recent copy of LogwrtResult; may lag the other shared copies */
338         XLogRecPtr      PrevRecord;             /* start of previously-inserted record */
339         int                     curridx;                /* index of the current block in the buffer cache */
340         XLogPageHeader currpage;        /* points to header of the current block in cache */
341         char       *currpos;            /* current insertion point, inside the currpage block */
342         XLogRecPtr      RedoRecPtr;             /* current redo point for insertions; backends cache a local copy */
343         bool            forcePageWrites;        /* true forces full-page writes (PITR request) even if fullPageWrites is off */
344 } XLogCtlInsert;
345
346 /*
347  * Shared state data for XLogWrite/XLogFlush.
348  */
349 typedef struct XLogCtlWrite
350 {
351         XLogwrtResult LogwrtResult; /* current value of LogwrtResult; usable while holding only WALWriteLock */
352         int                     curridx;                /* cache index of next block to write */
353 } XLogCtlWrite;
354
355 /*
356  * Total shared-memory state for XLOG.
357  */
358 typedef struct XLogCtlData
359 {
360         /* Protected by WALInsertLock: */
361         XLogCtlInsert Insert;
362         /* Protected by info_lck: */
363         XLogwrtRqst LogwrtRqst;         /* positions requested to be written/flushed */
364         XLogwrtResult LogwrtResult;     /* positions already written/flushed */
365         /* Protected by WALWriteLock: */
366         XLogCtlWrite Write;
367
368         /*
369          * These values do not change after startup, although the pointed-to pages
370          * and xlblocks values certainly do.  Permission to read/write the pages
371          * and xlblocks values depends on WALInsertLock and WALWriteLock.
372          */
373         char       *pages;                      /* buffers for unwritten XLOG pages */
374         XLogRecPtr *xlblocks;           /* per buffer: XLOG position of the page's 1st byte + XLOG_BLCKSZ (end+1 of page) */
375         Size            XLogCacheByte;  /* # bytes in xlog buffers */
376         int                     XLogCacheBlck;  /* highest allocated xlog buffer index; buffer indexes wrap around here */
377         TimeLineID      ThisTimeLineID; /* NOTE(review): presumably the shared copy of the current WAL timeline ID -- confirm */
378
379         slock_t         info_lck;               /* spinlock protecting shared LogwrtRqst/LogwrtResult */
380 } XLogCtlData;
381
382 static XLogCtlData *XLogCtl = NULL;
383
384 /*
385  * We maintain an image of pg_control in shared memory.
386  */
387 static ControlFileData *ControlFile = NULL;
388
389 /*
390  * Macros for managing XLogInsert state.  In most cases, the calling routine
391  * has a pointer to XLogCtl->Insert and/or a copy of XLogCtl->Insert.curridx,
392  * so these are passed as parameters instead of being fetched via XLogCtl.
393  */
394
395 /* Bytes still free in the current xlog page buffer: XLOG_BLCKSZ minus the offset of currpos within currpage (Insert is a pointer) */
396 #define INSERT_FREESPACE(Insert)  \
397         (XLOG_BLCKSZ - ((Insert)->currpos - (char *) (Insert)->currpage))
398
399 /* Set recptr to the current insertion point: xlblocks[curridx] (end+1 of that page) minus the page's remaining free space */
400 #define INSERT_RECPTR(recptr,Insert,curridx)  \
401         ( \
402           (recptr).xlogid = XLogCtl->xlblocks[curridx].xlogid, \
403           (recptr).xrecoff = \
404                 XLogCtl->xlblocks[curridx].xrecoff - INSERT_FREESPACE(Insert) \
405         )
406 /* Buffer index before idx, wrapping from 0 to XLogCacheBlck (idx is evaluated twice) */
407 #define PrevBufIdx(idx)         \
408                 (((idx) == 0) ? XLogCtl->XLogCacheBlck : ((idx) - 1))
409 /* Buffer index after idx, wrapping from XLogCacheBlck to 0 (idx is evaluated twice) */
410 #define NextBufIdx(idx)         \
411                 (((idx) == XLogCtl->XLogCacheBlck) ? 0 : ((idx) + 1))
412
413 /*
414  * Private, possibly out-of-date copy of shared LogwrtResult.
415  * See discussion above.
416  */
417 static XLogwrtResult LogwrtResult = {{0, 0}, {0, 0}};
418
419 /*
420  * openLogFile is -1 or a kernel FD for an open log file segment.
421  * When it's open, openLogOff is the current seek offset in the file.
422  * openLogId/openLogSeg identify the segment.  These variables are only
423  * used to write the XLOG, and so will normally refer to the active segment.
424  */
425 static int      openLogFile = -1;
426 static uint32 openLogId = 0;
427 static uint32 openLogSeg = 0;
428 static uint32 openLogOff = 0;
429
430 /*
431  * These variables are used similarly to the ones above, but for reading
432  * the XLOG.  Note, however, that readOff generally represents the offset
433  * of the page just read, not the seek position of the FD itself, which
434  * will be just past that page.
435  */
436 static int      readFile = -1;
437 static uint32 readId = 0;
438 static uint32 readSeg = 0;
439 static uint32 readOff = 0;
440
441 /* Buffer for currently read page (XLOG_BLCKSZ bytes) */
442 static char *readBuf = NULL;
443
444 /* Buffer for current ReadRecord result (expandable) */
445 static char *readRecordBuf = NULL;
446 static uint32 readRecordBufSize = 0;
447
448 /* State information for XLOG reading */
449 static XLogRecPtr ReadRecPtr;   /* start of last record read */
450 static XLogRecPtr EndRecPtr;    /* end+1 of last record read */
451 static XLogRecord *nextRecord = NULL;
452 static TimeLineID lastPageTLI = 0;
453
454 static bool InRedo = false;
455
456
457 static void XLogArchiveNotify(const char *xlog);
458 static void XLogArchiveNotifySeg(uint32 log, uint32 seg);
459 static bool XLogArchiveCheckDone(const char *xlog);
460 static void XLogArchiveCleanup(const char *xlog);
461 static void readRecoveryCommandFile(void);
462 static void exitArchiveRecovery(TimeLineID endTLI,
463                                         uint32 endLogId, uint32 endLogSeg);
464 static bool recoveryStopsHere(XLogRecord *record, bool *includeThis);
465
466 static bool XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites,
467                                 XLogRecPtr *lsn, BkpBlock *bkpb);
468 static bool AdvanceXLInsertBuffer(void);
469 static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible);
470 static int XLogFileInit(uint32 log, uint32 seg,
471                          bool *use_existent, bool use_lock);
472 static bool InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath,
473                                            bool find_free, int *max_advance,
474                                            bool use_lock);
475 static int      XLogFileOpen(uint32 log, uint32 seg);
476 static int      XLogFileRead(uint32 log, uint32 seg, int emode);
477 static void     XLogFileClose(void);
478 static bool RestoreArchivedFile(char *path, const char *xlogfname,
479                                         const char *recovername, off_t expectedSize);
480 static int      PreallocXlogFiles(XLogRecPtr endptr);
481 static void MoveOfflineLogs(uint32 log, uint32 seg, XLogRecPtr endptr,
482                                 int *nsegsremoved, int *nsegsrecycled);
483 static void CleanupBackupHistory(void);
484 static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, int emode);
485 static bool ValidXLOGHeader(XLogPageHeader hdr, int emode);
486 static XLogRecord *ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt);
487 static List *readTimeLineHistory(TimeLineID targetTLI);
488 static bool existsTimeLineHistory(TimeLineID probeTLI);
489 static TimeLineID findNewestTimeLine(TimeLineID startTLI);
490 static void writeTimeLineHistory(TimeLineID newTLI, TimeLineID parentTLI,
491                                          TimeLineID endTLI,
492                                          uint32 endLogId, uint32 endLogSeg);
493 static void WriteControlFile(void);
494 static void ReadControlFile(void);
495 static char *str_time(time_t tnow);
496 static void issue_xlog_fsync(void);
497
498 #ifdef WAL_DEBUG
499 static void xlog_outrec(StringInfo buf, XLogRecord *record);
500 #endif
501 static bool read_backup_label(XLogRecPtr *checkPointLoc);
502 static void remove_backup_label(void);
503 static void rm_redo_error_callback(void *arg);
504
505
506 /*
507  * Insert an XLOG record having the specified RMID and info bytes,
508  * with the body of the record being the data chunk(s) described by
509  * the rdata chain (see xlog.h for notes about rdata).
510  *
511  * Returns XLOG pointer to end of record (beginning of next record).
512  * This can be used as LSN for data pages affected by the logged action.
513  * (LSN is the XLOG point up to which the XLOG must be flushed to disk
514  * before the data page can be written out.  This implements the basic
515  * WAL rule "write the log before the data".)
516  *
517  * NB: this routine feels free to scribble on the XLogRecData structs,
518  * though not on the data they reference.  This is OK since the XLogRecData
519  * structs are always just temporaries in the calling code.
520  */
521 XLogRecPtr
522 XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
523 {
524         XLogCtlInsert *Insert = &XLogCtl->Insert;
525         XLogRecord *record;
526         XLogContRecord *contrecord;
527         XLogRecPtr      RecPtr;
528         XLogRecPtr      WriteRqst;
529         uint32          freespace;
530         int                     curridx;
531         XLogRecData *rdt;
532         Buffer          dtbuf[XLR_MAX_BKP_BLOCKS];
533         bool            dtbuf_bkp[XLR_MAX_BKP_BLOCKS];
534         BkpBlock        dtbuf_xlg[XLR_MAX_BKP_BLOCKS];
535         XLogRecPtr      dtbuf_lsn[XLR_MAX_BKP_BLOCKS];
536         XLogRecData dtbuf_rdt1[XLR_MAX_BKP_BLOCKS];
537         XLogRecData dtbuf_rdt2[XLR_MAX_BKP_BLOCKS];
538         XLogRecData dtbuf_rdt3[XLR_MAX_BKP_BLOCKS];
539         pg_crc32        rdata_crc;
540         uint32          len,
541                                 write_len;
542         unsigned        i;
543         XLogwrtRqst LogwrtRqst;
544         bool            updrqst;
545         bool            doPageWrites;
546         bool            no_tran = (rmid == RM_XLOG_ID) ? true : false;
547
548         if (info & XLR_INFO_MASK)
549         {
550                 if ((info & XLR_INFO_MASK) != XLOG_NO_TRAN)
551                         elog(PANIC, "invalid xlog info mask %02X", (info & XLR_INFO_MASK));
552                 no_tran = true;
553                 info &= ~XLR_INFO_MASK;
554         }
555
556         /*
557          * In bootstrap mode, we don't actually log anything but XLOG resources;
558          * return a phony record pointer.
559          */
560         if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID)
561         {
562                 RecPtr.xlogid = 0;
563                 RecPtr.xrecoff = SizeOfXLogLongPHD;             /* start of 1st chkpt record */
564                 return RecPtr;
565         }
566
567         /*
568          * Here we scan the rdata chain, determine which buffers must be backed
569          * up, and compute the CRC values for the data.  Note that the record
570          * header isn't added into the CRC initially since we don't know the final
571          * length or info bits quite yet.  Thus, the CRC will represent the CRC of
572          * the whole record in the order "rdata, then backup blocks, then record
573          * header".
574          *
575          * We may have to loop back to here if a race condition is detected below.
576          * We could prevent the race by doing all this work while holding the
577          * insert lock, but it seems better to avoid doing CRC calculations while
578          * holding the lock.  This means we have to be careful about modifying the
579          * rdata chain until we know we aren't going to loop back again.  The only
580          * change we allow ourselves to make earlier is to set rdt->data = NULL in
581          * chain items we have decided we will have to back up the whole buffer
582          * for.  This is OK because we will certainly decide the same thing again
583          * for those items if we do it over; doing it here saves an extra pass
584          * over the chain later.
585          */
586 begin:;
587         for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
588         {
589                 dtbuf[i] = InvalidBuffer;
590                 dtbuf_bkp[i] = false;
591         }
592
593         /*
594          * Decide if we need to do full-page writes in this XLOG record: true if
595          * full_page_writes is on or we have a PITR request for it.  Since we
596          * don't yet have the insert lock, forcePageWrites could change under us,
597          * but we'll recheck it once we have the lock.
598          */
599         doPageWrites = fullPageWrites || Insert->forcePageWrites;
600
601         INIT_CRC32(rdata_crc);
602         len = 0;
603         for (rdt = rdata;;)
604         {
605                 if (rdt->buffer == InvalidBuffer)
606                 {
607                         /* Simple data, just include it */
608                         len += rdt->len;
609                         COMP_CRC32(rdata_crc, rdt->data, rdt->len);
610                 }
611                 else
612                 {
613                         /* Find info for buffer */
614                         for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
615                         {
616                                 if (rdt->buffer == dtbuf[i])
617                                 {
618                                         /* Buffer already referenced by earlier chain item */
619                                         if (dtbuf_bkp[i])
620                                                 rdt->data = NULL;
621                                         else if (rdt->data)
622                                         {
623                                                 len += rdt->len;
624                                                 COMP_CRC32(rdata_crc, rdt->data, rdt->len);
625                                         }
626                                         break;
627                                 }
628                                 if (dtbuf[i] == InvalidBuffer)
629                                 {
630                                         /* OK, put it in this slot */
631                                         dtbuf[i] = rdt->buffer;
632                                         if (XLogCheckBuffer(rdt, doPageWrites,
633                                                                                 &(dtbuf_lsn[i]), &(dtbuf_xlg[i])))
634                                         {
635                                                 dtbuf_bkp[i] = true;
636                                                 rdt->data = NULL;
637                                         }
638                                         else if (rdt->data)
639                                         {
640                                                 len += rdt->len;
641                                                 COMP_CRC32(rdata_crc, rdt->data, rdt->len);
642                                         }
643                                         break;
644                                 }
645                         }
646                         if (i >= XLR_MAX_BKP_BLOCKS)
647                                 elog(PANIC, "can backup at most %d blocks per xlog record",
648                                          XLR_MAX_BKP_BLOCKS);
649                 }
650                 /* Break out of loop when rdt points to last chain item */
651                 if (rdt->next == NULL)
652                         break;
653                 rdt = rdt->next;
654         }
655
656         /*
657          * Now add the backup block headers and data into the CRC
658          */
659         for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
660         {
661                 if (dtbuf_bkp[i])
662                 {
663                         BkpBlock   *bkpb = &(dtbuf_xlg[i]);
664                         char       *page;
665
666                         COMP_CRC32(rdata_crc,
667                                            (char *) bkpb,
668                                            sizeof(BkpBlock));
669                         page = (char *) BufferGetBlock(dtbuf[i]);
670                         if (bkpb->hole_length == 0)
671                         {
672                                 COMP_CRC32(rdata_crc,
673                                                    page,
674                                                    BLCKSZ);
675                         }
676                         else
677                         {
678                                 /* must skip the hole */
679                                 COMP_CRC32(rdata_crc,
680                                                    page,
681                                                    bkpb->hole_offset);
682                                 COMP_CRC32(rdata_crc,
683                                                    page + (bkpb->hole_offset + bkpb->hole_length),
684                                                    BLCKSZ - (bkpb->hole_offset + bkpb->hole_length));
685                         }
686                 }
687         }
688
689         /*
690          * NOTE: the test for len == 0 here is somewhat fishy, since in theory all
691          * of the rmgr data might have been suppressed in favor of backup blocks.
692          * Currently, all callers of XLogInsert provide at least some
693          * not-in-a-buffer data and so len == 0 should never happen, but that may
694          * not be true forever.  If you need to remove the len == 0 check, also
695          * remove the check for xl_len == 0 in ReadRecord, below.
696          */
697         if (len == 0)
698                 elog(PANIC, "invalid xlog record length %u", len);
699
700         START_CRIT_SECTION();
701
702         /* update LogwrtResult before doing cache fill check */
703         {
704                 /* use volatile pointer to prevent code rearrangement */
705                 volatile XLogCtlData *xlogctl = XLogCtl;
706
707                 SpinLockAcquire(&xlogctl->info_lck);
708                 LogwrtRqst = xlogctl->LogwrtRqst;
709                 LogwrtResult = xlogctl->LogwrtResult;
710                 SpinLockRelease(&xlogctl->info_lck);
711         }
712
713         /*
714          * If cache is half filled then try to acquire write lock and do
715          * XLogWrite. Ignore any fractional blocks in performing this check.
716          */
717         LogwrtRqst.Write.xrecoff -= LogwrtRqst.Write.xrecoff % XLOG_BLCKSZ;
718         if (LogwrtRqst.Write.xlogid != LogwrtResult.Write.xlogid ||
719                 (LogwrtRqst.Write.xrecoff >= LogwrtResult.Write.xrecoff +
720                  XLogCtl->XLogCacheByte / 2))
721         {
722                 if (LWLockConditionalAcquire(WALWriteLock, LW_EXCLUSIVE))
723                 {
724                         /*
725                          * Since the amount of data we write here is completely optional
726                          * anyway, tell XLogWrite it can be "flexible" and stop at a
727                          * convenient boundary.  This allows writes triggered by this
728                          * mechanism to synchronize with the cache boundaries, so that in
729                          * a long transaction we'll basically dump alternating halves of
730                          * the buffer array.
731                          */
732                         LogwrtResult = XLogCtl->Write.LogwrtResult;
733                         if (XLByteLT(LogwrtResult.Write, LogwrtRqst.Write))
734                                 XLogWrite(LogwrtRqst, true);
735                         LWLockRelease(WALWriteLock);
736                 }
737         }
738
739         /* Now wait to get insert lock */
740         LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
741
742         /*
743          * Check to see if my RedoRecPtr is out of date.  If so, may have to go
744          * back and recompute everything.  This can only happen just after a
745          * checkpoint, so it's better to be slow in this case and fast otherwise.
746          *
747          * If we aren't doing full-page writes then RedoRecPtr doesn't actually
748          * affect the contents of the XLOG record, so we'll update our local
749          * copy but not force a recomputation.
750          */
751         if (!XLByteEQ(RedoRecPtr, Insert->RedoRecPtr))
752         {
753                 Assert(XLByteLT(RedoRecPtr, Insert->RedoRecPtr));
754                 RedoRecPtr = Insert->RedoRecPtr;
755
756                 if (doPageWrites)
757                 {
758                         for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
759                         {
760                                 if (dtbuf[i] == InvalidBuffer)
761                                         continue;
762                                 if (dtbuf_bkp[i] == false &&
763                                         XLByteLE(dtbuf_lsn[i], RedoRecPtr))
764                                 {
765                                         /*
766                                          * Oops, this buffer now needs to be backed up, but we
767                                          * didn't think so above.  Start over.
768                                          */
769                                         LWLockRelease(WALInsertLock);
770                                         END_CRIT_SECTION();
771                                         goto begin;
772                                 }
773                         }
774                 }
775         }
776
777         /*
778          * Also check to see if forcePageWrites was just turned on; if we
779          * weren't already doing full-page writes then go back and recompute.
780          * (If it was just turned off, we could recompute the record without
781          * full pages, but we choose not to bother.)
782          */
783         if (Insert->forcePageWrites && !doPageWrites)
784         {
785                 /* Oops, must redo it with full-page data */
786                 LWLockRelease(WALInsertLock);
787                 END_CRIT_SECTION();
788                 goto begin;
789         }
790
791         /*
792          * Make additional rdata chain entries for the backup blocks, so that we
793          * don't need to special-case them in the write loop.  Note that we have
794          * now irrevocably changed the input rdata chain.  At the exit of this
795          * loop, write_len includes the backup block data.
796          *
797          * Also set the appropriate info bits to show which buffers were backed
798          * up. The i'th XLR_SET_BKP_BLOCK bit corresponds to the i'th distinct
799          * buffer value (ignoring InvalidBuffer) appearing in the rdata chain.
800          */
801         write_len = len;
802         for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
803         {
804                 BkpBlock   *bkpb;
805                 char       *page;
806
807                 if (!dtbuf_bkp[i])
808                         continue;
809
810                 info |= XLR_SET_BKP_BLOCK(i);
811
812                 bkpb = &(dtbuf_xlg[i]);
813                 page = (char *) BufferGetBlock(dtbuf[i]);
814
815                 rdt->next = &(dtbuf_rdt1[i]);
816                 rdt = rdt->next;
817
818                 rdt->data = (char *) bkpb;
819                 rdt->len = sizeof(BkpBlock);
820                 write_len += sizeof(BkpBlock);
821
822                 rdt->next = &(dtbuf_rdt2[i]);
823                 rdt = rdt->next;
824
825                 if (bkpb->hole_length == 0)
826                 {
827                         rdt->data = page;
828                         rdt->len = BLCKSZ;
829                         write_len += BLCKSZ;
830                         rdt->next = NULL;
831                 }
832                 else
833                 {
834                         /* must skip the hole */
835                         rdt->data = page;
836                         rdt->len = bkpb->hole_offset;
837                         write_len += bkpb->hole_offset;
838
839                         rdt->next = &(dtbuf_rdt3[i]);
840                         rdt = rdt->next;
841
842                         rdt->data = page + (bkpb->hole_offset + bkpb->hole_length);
843                         rdt->len = BLCKSZ - (bkpb->hole_offset + bkpb->hole_length);
844                         write_len += rdt->len;
845                         rdt->next = NULL;
846                 }
847         }
848
849         /*
850          * If there isn't enough space on the current XLOG page for a record
851          * header, advance to the next page (leaving the unused space as zeroes).
852          */
853         updrqst = false;
854         freespace = INSERT_FREESPACE(Insert);
855         if (freespace < SizeOfXLogRecord)
856         {
857                 updrqst = AdvanceXLInsertBuffer();
858                 freespace = INSERT_FREESPACE(Insert);
859         }
860
861         curridx = Insert->curridx;
862         record = (XLogRecord *) Insert->currpos;
863
864         /* Insert record header */
865
866         record->xl_prev = Insert->PrevRecord;
867         record->xl_xid = GetCurrentTransactionIdIfAny();
868         record->xl_tot_len = SizeOfXLogRecord + write_len;
869         record->xl_len = len;           /* doesn't include backup blocks */
870         record->xl_info = info;
871         record->xl_rmid = rmid;
872
873         /* Now we can finish computing the record's CRC */
874         COMP_CRC32(rdata_crc, (char *) record + sizeof(pg_crc32),
875                            SizeOfXLogRecord - sizeof(pg_crc32));
876         FIN_CRC32(rdata_crc);
877         record->xl_crc = rdata_crc;
878
879         /* Compute record's XLOG location */
880         INSERT_RECPTR(RecPtr, Insert, curridx);
881
882 #ifdef WAL_DEBUG
883         if (XLOG_DEBUG)
884         {
885                 StringInfoData  buf;
886
887                 initStringInfo(&buf);
888                 appendStringInfo(&buf, "INSERT @ %X/%X: ", 
889                                                         RecPtr.xlogid, RecPtr.xrecoff);
890                 xlog_outrec(&buf, record);
891                 if (rdata->data != NULL)
892                 {
893                         appendStringInfo(&buf, " - ");
894                         RmgrTable[record->xl_rmid].rm_desc(&buf, record->xl_info, rdata->data);
895                 }
896                 elog(LOG, "%s", buf.data);
897                 pfree(buf.data);
898         }
899 #endif
900
901         /* Record begin of record in appropriate places */
902         if (!no_tran)
903                 MyLastRecPtr = RecPtr;
904         ProcLastRecPtr = RecPtr;
905         Insert->PrevRecord = RecPtr;
906         MyXactMadeXLogEntry = true;
907
908         Insert->currpos += SizeOfXLogRecord;
909         freespace -= SizeOfXLogRecord;
910
911         /*
912          * Append the data, including backup blocks if any
913          */
914         while (write_len)
915         {
916                 while (rdata->data == NULL)
917                         rdata = rdata->next;
918
919                 if (freespace > 0)
920                 {
921                         if (rdata->len > freespace)
922                         {
923                                 memcpy(Insert->currpos, rdata->data, freespace);
924                                 rdata->data += freespace;
925                                 rdata->len -= freespace;
926                                 write_len -= freespace;
927                         }
928                         else
929                         {
930                                 memcpy(Insert->currpos, rdata->data, rdata->len);
931                                 freespace -= rdata->len;
932                                 write_len -= rdata->len;
933                                 Insert->currpos += rdata->len;
934                                 rdata = rdata->next;
935                                 continue;
936                         }
937                 }
938
939                 /* Use next buffer */
940                 updrqst = AdvanceXLInsertBuffer();
941                 curridx = Insert->curridx;
942                 /* Insert cont-record header */
943                 Insert->currpage->xlp_info |= XLP_FIRST_IS_CONTRECORD;
944                 contrecord = (XLogContRecord *) Insert->currpos;
945                 contrecord->xl_rem_len = write_len;
946                 Insert->currpos += SizeOfXLogContRecord;
947                 freespace = INSERT_FREESPACE(Insert);
948         }
949
950         /* Ensure next record will be properly aligned */
951         Insert->currpos = (char *) Insert->currpage +
952                 MAXALIGN(Insert->currpos - (char *) Insert->currpage);
953         freespace = INSERT_FREESPACE(Insert);
954
955         /*
956          * The recptr I return is the beginning of the *next* record. This will be
957          * stored as LSN for changed data pages...
958          */
959         INSERT_RECPTR(RecPtr, Insert, curridx);
960
961         /* Need to update shared LogwrtRqst if some block was filled up */
962         if (freespace < SizeOfXLogRecord)
963                 updrqst = true;                 /* curridx is filled and available for writing
964                                                                  * out */
965         else
966                 curridx = PrevBufIdx(curridx);
967         WriteRqst = XLogCtl->xlblocks[curridx];
968
969         LWLockRelease(WALInsertLock);
970
971         if (updrqst)
972         {
973                 /* use volatile pointer to prevent code rearrangement */
974                 volatile XLogCtlData *xlogctl = XLogCtl;
975
976                 SpinLockAcquire(&xlogctl->info_lck);
977                 /* advance global request to include new block(s) */
978                 if (XLByteLT(xlogctl->LogwrtRqst.Write, WriteRqst))
979                         xlogctl->LogwrtRqst.Write = WriteRqst;
980                 /* update local result copy while I have the chance */
981                 LogwrtResult = xlogctl->LogwrtResult;
982                 SpinLockRelease(&xlogctl->info_lck);
983         }
984
985         ProcLastRecEnd = RecPtr;
986
987         END_CRIT_SECTION();
988
989         return RecPtr;
990 }
991
992 /*
993  * Determine whether the buffer referenced by an XLogRecData item has to
994  * be backed up, and if so fill a BkpBlock struct for it.  In any case
995  * save the buffer's LSN at *lsn.
996  */
997 static bool
998 XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites,
999                                 XLogRecPtr *lsn, BkpBlock *bkpb)
1000 {
1001         PageHeader      page;
1002
1003         page = (PageHeader) BufferGetBlock(rdata->buffer);
1004
1005         /*
1006          * XXX We assume page LSN is first data on *every* page that can be passed
1007          * to XLogInsert, whether it otherwise has the standard page layout or
1008          * not.
1009          */
1010         *lsn = page->pd_lsn;
1011
1012         if (doPageWrites &&
1013                 XLByteLE(page->pd_lsn, RedoRecPtr))
1014         {
1015                 /*
1016                  * The page needs to be backed up, so set up *bkpb
1017                  */
1018                 bkpb->node = BufferGetFileNode(rdata->buffer);
1019                 bkpb->block = BufferGetBlockNumber(rdata->buffer);
1020
1021                 if (rdata->buffer_std)
1022                 {
1023                         /* Assume we can omit data between pd_lower and pd_upper */
1024                         uint16          lower = page->pd_lower;
1025                         uint16          upper = page->pd_upper;
1026
1027                         if (lower >= SizeOfPageHeaderData &&
1028                                 upper > lower &&
1029                                 upper <= BLCKSZ)
1030                         {
1031                                 bkpb->hole_offset = lower;
1032                                 bkpb->hole_length = upper - lower;
1033                         }
1034                         else
1035                         {
1036                                 /* No "hole" to compress out */
1037                                 bkpb->hole_offset = 0;
1038                                 bkpb->hole_length = 0;
1039                         }
1040                 }
1041                 else
1042                 {
1043                         /* Not a standard page header, don't try to eliminate "hole" */
1044                         bkpb->hole_offset = 0;
1045                         bkpb->hole_length = 0;
1046                 }
1047
1048                 return true;                    /* buffer requires backup */
1049         }
1050
1051         return false;                           /* buffer does not need to be backed up */
1052 }
1053
1054 /*
1055  * XLogArchiveNotify
1056  *
1057  * Create an archive notification file
1058  *
1059  * The name of the notification file is the message that will be picked up
1060  * by the archiver, e.g. we write 0000000100000001000000C6.ready
1061  * and the archiver then knows to archive XLOGDIR/0000000100000001000000C6,
1062  * then when complete, rename it to 0000000100000001000000C6.done
1063  */
1064 static void
1065 XLogArchiveNotify(const char *xlog)
1066 {
1067         char            archiveStatusPath[MAXPGPATH];
1068         FILE       *fd;
1069
1070         /* insert an otherwise empty file called <XLOG>.ready */
1071         StatusFilePath(archiveStatusPath, xlog, ".ready");
1072         fd = AllocateFile(archiveStatusPath, "w");
1073         if (fd == NULL)
1074         {
1075                 ereport(LOG,
1076                                 (errcode_for_file_access(),
1077                                  errmsg("could not create archive status file \"%s\": %m",
1078                                                 archiveStatusPath)));
1079                 return;
1080         }
1081         if (FreeFile(fd))
1082         {
1083                 ereport(LOG,
1084                                 (errcode_for_file_access(),
1085                                  errmsg("could not write archive status file \"%s\": %m",
1086                                                 archiveStatusPath)));
1087                 return;
1088         }
1089
1090         /* Notify archiver that it's got something to do */
1091         if (IsUnderPostmaster)
1092                 SendPostmasterSignal(PMSIGNAL_WAKEN_ARCHIVER);
1093 }
1094
1095 /*
1096  * Convenience routine to notify using log/seg representation of filename
1097  */
1098 static void
1099 XLogArchiveNotifySeg(uint32 log, uint32 seg)
1100 {
1101         char            xlog[MAXFNAMELEN];
1102
1103         XLogFileName(xlog, ThisTimeLineID, log, seg);
1104         XLogArchiveNotify(xlog);
1105 }
1106
1107 /*
1108  * XLogArchiveCheckDone
1109  *
1110  * This is called when we are ready to delete or recycle an old XLOG segment
1111  * file or backup history file.  If it is okay to delete it then return true.
1112  * If it is not time to delete it, make sure a .ready file exists, and return
1113  * false.
1114  *
1115  * If <XLOG>.done exists, then return true; else if <XLOG>.ready exists,
1116  * then return false; else create <XLOG>.ready and return false.
1117  *
1118  * The reason we do things this way is so that if the original attempt to
1119  * create <XLOG>.ready fails, we'll retry during subsequent checkpoints.
1120  */
1121 static bool
1122 XLogArchiveCheckDone(const char *xlog)
1123 {
1124         char            archiveStatusPath[MAXPGPATH];
1125         struct stat stat_buf;
1126
1127         /* Always deletable if archiving is off */
1128         if (!XLogArchivingActive())
1129                 return true;
1130
1131         /* First check for .done --- this means archiver is done with it */
1132         StatusFilePath(archiveStatusPath, xlog, ".done");
1133         if (stat(archiveStatusPath, &stat_buf) == 0)
1134                 return true;
1135
1136         /* check for .ready --- this means archiver is still busy with it */
1137         StatusFilePath(archiveStatusPath, xlog, ".ready");
1138         if (stat(archiveStatusPath, &stat_buf) == 0)
1139                 return false;
1140
1141         /* Race condition --- maybe archiver just finished, so recheck */
1142         StatusFilePath(archiveStatusPath, xlog, ".done");
1143         if (stat(archiveStatusPath, &stat_buf) == 0)
1144                 return true;
1145
1146         /* Retry creation of the .ready file */
1147         XLogArchiveNotify(xlog);
1148         return false;
1149 }
1150
1151 /*
1152  * XLogArchiveCleanup
1153  *
1154  * Cleanup archive notification file(s) for a particular xlog segment
1155  */
1156 static void
1157 XLogArchiveCleanup(const char *xlog)
1158 {
1159         char            archiveStatusPath[MAXPGPATH];
1160
1161         /* Remove the .done file */
1162         StatusFilePath(archiveStatusPath, xlog, ".done");
1163         unlink(archiveStatusPath);
1164         /* should we complain about failure? */
1165
1166         /* Remove the .ready file if present --- normally it shouldn't be */
1167         StatusFilePath(archiveStatusPath, xlog, ".ready");
1168         unlink(archiveStatusPath);
1169         /* should we complain about failure? */
1170 }
1171
1172 /*
1173  * Advance the Insert state to the next buffer page, writing out the next
1174  * buffer if it still contains unwritten data.
1175  *
1176  * The global LogwrtRqst.Write pointer needs to be advanced to include the
1177  * just-filled page.  If we can do this for free (without an extra lock),
1178  * we do so here.  Otherwise the caller must do it.  We return TRUE if the
1179  * request update still needs to be done, FALSE if we did it internally.
1180  *
1181  * Must be called with WALInsertLock held.
1182  */
1183 static bool
1184 AdvanceXLInsertBuffer(void)
1185 {
1186         XLogCtlInsert *Insert = &XLogCtl->Insert;
1187         XLogCtlWrite *Write = &XLogCtl->Write;
1188         int                     nextidx = NextBufIdx(Insert->curridx);
1189         bool            update_needed = true;
1190         XLogRecPtr      OldPageRqstPtr;
1191         XLogwrtRqst WriteRqst;
1192         XLogRecPtr      NewPageEndPtr;
1193         XLogPageHeader NewPage;
1194
1195         /* Use Insert->LogwrtResult copy if it's more fresh */
1196         if (XLByteLT(LogwrtResult.Write, Insert->LogwrtResult.Write))
1197                 LogwrtResult = Insert->LogwrtResult;
1198
1199         /*
1200          * Get ending-offset of the buffer page we need to replace (this may be
1201          * zero if the buffer hasn't been used yet).  Fall through if it's already
1202          * written out.
1203          */
1204         OldPageRqstPtr = XLogCtl->xlblocks[nextidx];
1205         if (!XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
1206         {
1207                 /* nope, got work to do... */
1208                 XLogRecPtr      FinishedPageRqstPtr;
1209
1210                 FinishedPageRqstPtr = XLogCtl->xlblocks[Insert->curridx];
1211
1212                 /* Before waiting, get info_lck and update LogwrtResult */
1213                 {
1214                         /* use volatile pointer to prevent code rearrangement */
1215                         volatile XLogCtlData *xlogctl = XLogCtl;
1216
1217                         SpinLockAcquire(&xlogctl->info_lck);
1218                         if (XLByteLT(xlogctl->LogwrtRqst.Write, FinishedPageRqstPtr))
1219                                 xlogctl->LogwrtRqst.Write = FinishedPageRqstPtr;
1220                         LogwrtResult = xlogctl->LogwrtResult;
1221                         SpinLockRelease(&xlogctl->info_lck);
1222                 }
1223
1224                 update_needed = false;  /* Did the shared-request update */
1225
1226                 if (XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
1227                 {
1228                         /* OK, someone wrote it already */
1229                         Insert->LogwrtResult = LogwrtResult;
1230                 }
1231                 else
1232                 {
1233                         /* Must acquire write lock */
1234                         LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
1235                         LogwrtResult = Write->LogwrtResult;
1236                         if (XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
1237                         {
1238                                 /* OK, someone wrote it already */
1239                                 LWLockRelease(WALWriteLock);
1240                                 Insert->LogwrtResult = LogwrtResult;
1241                         }
1242                         else
1243                         {
1244                                 /*
1245                                  * Have to write buffers while holding insert lock. This is
1246                                  * not good, so only write as much as we absolutely must.
1247                                  */
1248                                 WriteRqst.Write = OldPageRqstPtr;
1249                                 WriteRqst.Flush.xlogid = 0;
1250                                 WriteRqst.Flush.xrecoff = 0;
1251                                 XLogWrite(WriteRqst, false);
1252                                 LWLockRelease(WALWriteLock);
1253                                 Insert->LogwrtResult = LogwrtResult;
1254                         }
1255                 }
1256         }
1257
1258         /*
1259          * Now the next buffer slot is free and we can set it up to be the next
1260          * output page.
1261          */
1262         NewPageEndPtr = XLogCtl->xlblocks[Insert->curridx];
1263         if (NewPageEndPtr.xrecoff >= XLogFileSize)
1264         {
1265                 /* crossing a logid boundary */
1266                 NewPageEndPtr.xlogid += 1;
1267                 NewPageEndPtr.xrecoff = XLOG_BLCKSZ;
1268         }
1269         else
1270                 NewPageEndPtr.xrecoff += XLOG_BLCKSZ;
1271         XLogCtl->xlblocks[nextidx] = NewPageEndPtr;
1272         NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * (Size) XLOG_BLCKSZ);
1273
1274         Insert->curridx = nextidx;
1275         Insert->currpage = NewPage;
1276
1277         Insert->currpos = ((char *) NewPage) +SizeOfXLogShortPHD;
1278
1279         /*
1280          * Be sure to re-zero the buffer so that bytes beyond what we've written
1281          * will look like zeroes and not valid XLOG records...
1282          */
1283         MemSet((char *) NewPage, 0, XLOG_BLCKSZ);
1284
1285         /*
1286          * Fill the new page's header
1287          */
1288         NewPage   ->xlp_magic = XLOG_PAGE_MAGIC;
1289
1290         /* NewPage->xlp_info = 0; */    /* done by memset */
1291         NewPage   ->xlp_tli = ThisTimeLineID;
1292         NewPage   ->xlp_pageaddr.xlogid = NewPageEndPtr.xlogid;
1293         NewPage   ->xlp_pageaddr.xrecoff = NewPageEndPtr.xrecoff - XLOG_BLCKSZ;
1294
1295         /*
1296          * If first page of an XLOG segment file, make it a long header.
1297          */
1298         if ((NewPage->xlp_pageaddr.xrecoff % XLogSegSize) == 0)
1299         {
1300                 XLogLongPageHeader NewLongPage = (XLogLongPageHeader) NewPage;
1301
1302                 NewLongPage->xlp_sysid = ControlFile->system_identifier;
1303                 NewLongPage->xlp_seg_size = XLogSegSize;
1304                 NewLongPage->xlp_xlog_blcksz = XLOG_BLCKSZ;
1305                 NewPage   ->xlp_info |= XLP_LONG_HEADER;
1306
1307                 Insert->currpos = ((char *) NewPage) +SizeOfXLogLongPHD;
1308         }
1309
1310         return update_needed;
1311 }
1312
1313 /*
1314  * Write and/or fsync the log at least as far as WriteRqst indicates.
1315  *
1316  * If flexible == TRUE, we don't have to write as far as WriteRqst, but
1317  * may stop at any convenient boundary (such as a cache or logfile boundary).
1318  * This option allows us to avoid uselessly issuing multiple writes when a
1319  * single one would do.
1320  *
1321  * Must be called with WALWriteLock held.
1322  */
1323 static void
1324 XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
1325 {
1326         XLogCtlWrite *Write = &XLogCtl->Write;
1327         bool            ispartialpage;
1328         bool            finishing_seg;
1329         bool            use_existent;
1330         int                     curridx;
1331         int                     npages;
1332         int                     startidx;
1333         uint32          startoffset;
1334
1335         /* We should always be inside a critical section here */
1336         Assert(CritSectionCount > 0);
1337
1338         /*
1339          * Update local LogwrtResult (caller probably did this already, but...)
1340          */
1341         LogwrtResult = Write->LogwrtResult;
1342
1343         /*
1344          * Since successive pages in the xlog cache are consecutively allocated,
1345          * we can usually gather multiple pages together and issue just one
1346          * write() call.  npages is the number of pages we have determined can be
1347          * written together; startidx is the cache block index of the first one,
1348          * and startoffset is the file offset at which it should go. The latter
1349          * two variables are only valid when npages > 0, but we must initialize
1350          * all of them to keep the compiler quiet.
1351          */
1352         npages = 0;
1353         startidx = 0;
1354         startoffset = 0;
1355
1356         /*
1357          * Within the loop, curridx is the cache block index of the page to
1358          * consider writing.  We advance Write->curridx only after successfully
1359          * writing pages.  (Right now, this refinement is useless since we are
1360          * going to PANIC if any error occurs anyway; but someday it may come in
1361          * useful.)
1362          */
1363         curridx = Write->curridx;
1364
1365         while (XLByteLT(LogwrtResult.Write, WriteRqst.Write))
1366         {
1367                 /*
1368                  * Make sure we're not ahead of the insert process.  This could happen
1369                  * if we're passed a bogus WriteRqst.Write that is past the end of the
1370                  * last page that's been initialized by AdvanceXLInsertBuffer.
1371                  */
1372                 if (!XLByteLT(LogwrtResult.Write, XLogCtl->xlblocks[curridx]))
1373                         elog(PANIC, "xlog write request %X/%X is past end of log %X/%X",
1374                                  LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff,
1375                                  XLogCtl->xlblocks[curridx].xlogid,
1376                                  XLogCtl->xlblocks[curridx].xrecoff);
1377
1378                 /* Advance LogwrtResult.Write to end of current buffer page */
1379                 LogwrtResult.Write = XLogCtl->xlblocks[curridx];
1380                 ispartialpage = XLByteLT(WriteRqst.Write, LogwrtResult.Write);
1381
1382                 if (!XLByteInPrevSeg(LogwrtResult.Write, openLogId, openLogSeg))
1383                 {
1384                         /*
1385                          * Switch to new logfile segment.  We cannot have any pending
1386                          * pages here (since we dump what we have at segment end).
1387                          */
1388                         Assert(npages == 0);
1389                         if (openLogFile >= 0)
1390                                 XLogFileClose();
1391                         XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg);
1392
1393                         /* create/use new log file */
1394                         use_existent = true;
1395                         openLogFile = XLogFileInit(openLogId, openLogSeg,
1396                                                                            &use_existent, true);
1397                         openLogOff = 0;
1398
1399                         /* update pg_control, unless someone else already did */
1400                         LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
1401                         if (ControlFile->logId < openLogId ||
1402                                 (ControlFile->logId == openLogId &&
1403                                  ControlFile->logSeg < openLogSeg + 1))
1404                         {
1405                                 ControlFile->logId = openLogId;
1406                                 ControlFile->logSeg = openLogSeg + 1;
1407                                 ControlFile->time = time(NULL);
1408                                 UpdateControlFile();
1409
1410                                 /*
1411                                  * Signal bgwriter to start a checkpoint if it's been too long
1412                                  * since the last one.  (We look at local copy of RedoRecPtr
1413                                  * which might be a little out of date, but should be close
1414                                  * enough for this purpose.)
1415                                  *
1416                                  * A straight computation of segment number could overflow 32
1417                                  * bits.  Rather than assuming we have working 64-bit
1418                                  * arithmetic, we compare the highest-order bits separately,
1419                                  * and force a checkpoint immediately when they change.
1420                                  */
1421                                 if (IsUnderPostmaster)
1422                                 {
1423                                         uint32          old_segno,
1424                                                                 new_segno;
1425                                         uint32          old_highbits,
1426                                                                 new_highbits;
1427
1428                                         old_segno = (RedoRecPtr.xlogid % XLogSegSize) * XLogSegsPerFile +
1429                                                 (RedoRecPtr.xrecoff / XLogSegSize);
1430                                         old_highbits = RedoRecPtr.xlogid / XLogSegSize;
1431                                         new_segno = (openLogId % XLogSegSize) * XLogSegsPerFile +
1432                                                 openLogSeg;
1433                                         new_highbits = openLogId / XLogSegSize;
1434                                         if (new_highbits != old_highbits ||
1435                                                 new_segno >= old_segno + (uint32) CheckPointSegments)
1436                                         {
1437 #ifdef WAL_DEBUG
1438                                                 if (XLOG_DEBUG)
1439                                                         elog(LOG, "time for a checkpoint, signaling bgwriter");
1440 #endif
1441                                                 RequestCheckpoint(false, true);
1442                                         }
1443                                 }
1444                         }
1445                         LWLockRelease(ControlFileLock);
1446                 }
1447
1448                 /* Make sure we have the current logfile open */
1449                 if (openLogFile < 0)
1450                 {
1451                         XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg);
1452                         openLogFile = XLogFileOpen(openLogId, openLogSeg);
1453                         openLogOff = 0;
1454                 }
1455
1456                 /* Add current page to the set of pending pages-to-dump */
1457                 if (npages == 0)
1458                 {
1459                         /* first of group */
1460                         startidx = curridx;
1461                         startoffset = (LogwrtResult.Write.xrecoff - XLOG_BLCKSZ) % XLogSegSize;
1462                 }
1463                 npages++;
1464
1465                 /*
1466                  * Dump the set if this will be the last loop iteration, or if we are
1467                  * at the last page of the cache area (since the next page won't be
1468                  * contiguous in memory), or if we are at the end of the logfile
1469                  * segment.
1470                  */
1471                 finishing_seg = !ispartialpage &&
1472                         (startoffset + npages * XLOG_BLCKSZ) >= XLogSegSize;
1473
1474                 if (!XLByteLT(LogwrtResult.Write, WriteRqst.Write) ||
1475                         curridx == XLogCtl->XLogCacheBlck ||
1476                         finishing_seg)
1477                 {
1478                         char       *from;
1479                         Size            nbytes;
1480
1481                         /* Need to seek in the file? */
1482                         if (openLogOff != startoffset)
1483                         {
1484                                 if (lseek(openLogFile, (off_t) startoffset, SEEK_SET) < 0)
1485                                         ereport(PANIC,
1486                                                         (errcode_for_file_access(),
1487                                                          errmsg("could not seek in log file %u, "
1488                                                                         "segment %u to offset %u: %m",
1489                                                                         openLogId, openLogSeg, startoffset)));
1490                                 openLogOff = startoffset;
1491                         }
1492
1493                         /* OK to write the page(s) */
1494                         from = XLogCtl->pages + startidx * (Size) XLOG_BLCKSZ;
1495                         nbytes = npages * (Size) XLOG_BLCKSZ;
1496                         errno = 0;
1497                         if (write(openLogFile, from, nbytes) != nbytes)
1498                         {
1499                                 /* if write didn't set errno, assume no disk space */
1500                                 if (errno == 0)
1501                                         errno = ENOSPC;
1502                                 ereport(PANIC,
1503                                                 (errcode_for_file_access(),
1504                                                  errmsg("could not write to log file %u, segment %u "
1505                                                                 "at offset %u, length %lu: %m",
1506                                                                 openLogId, openLogSeg,
1507                                                                 openLogOff, (unsigned long) nbytes)));
1508                         }
1509
1510                         /* Update state for write */
1511                         openLogOff += nbytes;
1512                         Write->curridx = ispartialpage ? curridx : NextBufIdx(curridx);
1513                         npages = 0;
1514
1515                         /*
1516                          * If we just wrote the whole last page of a logfile segment,
1517                          * fsync the segment immediately.  This avoids having to go back
1518                          * and re-open prior segments when an fsync request comes along
1519                          * later. Doing it here ensures that one and only one backend will
1520                          * perform this fsync.
1521                          *
1522                          * This is also the right place to notify the Archiver that the
1523                          * segment is ready to copy to archival storage.
1524                          */
1525                         if (finishing_seg)
1526                         {
1527                                 issue_xlog_fsync();
1528                                 LogwrtResult.Flush = LogwrtResult.Write;                /* end of page */
1529
1530                                 if (XLogArchivingActive())
1531                                         XLogArchiveNotifySeg(openLogId, openLogSeg);
1532                         }
1533                 }
1534
1535                 if (ispartialpage)
1536                 {
1537                         /* Only asked to write a partial page */
1538                         LogwrtResult.Write = WriteRqst.Write;
1539                         break;
1540                 }
1541                 curridx = NextBufIdx(curridx);
1542
1543                 /* If flexible, break out of loop as soon as we wrote something */
1544                 if (flexible && npages == 0)
1545                         break;
1546         }
1547
1548         Assert(npages == 0);
1549         Assert(curridx == Write->curridx);
1550
1551         /*
1552          * If asked to flush, do so
1553          */
1554         if (XLByteLT(LogwrtResult.Flush, WriteRqst.Flush) &&
1555                 XLByteLT(LogwrtResult.Flush, LogwrtResult.Write))
1556         {
1557                 /*
1558                  * Could get here without iterating above loop, in which case we might
1559                  * have no open file or the wrong one.  However, we do not need to
1560                  * fsync more than one file.
1561                  */
1562                 if (sync_method != SYNC_METHOD_OPEN)
1563                 {
1564                         if (openLogFile >= 0 &&
1565                                 !XLByteInPrevSeg(LogwrtResult.Write, openLogId, openLogSeg))
1566                                 XLogFileClose();
1567                         if (openLogFile < 0)
1568                         {
1569                                 XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg);
1570                                 openLogFile = XLogFileOpen(openLogId, openLogSeg);
1571                                 openLogOff = 0;
1572                         }
1573                         issue_xlog_fsync();
1574                 }
1575                 LogwrtResult.Flush = LogwrtResult.Write;
1576         }
1577
1578         /*
1579          * Update shared-memory status
1580          *
1581          * We make sure that the shared 'request' values do not fall behind the
1582          * 'result' values.  This is not absolutely essential, but it saves some
1583          * code in a couple of places.
1584          */
1585         {
1586                 /* use volatile pointer to prevent code rearrangement */
1587                 volatile XLogCtlData *xlogctl = XLogCtl;
1588
1589                 SpinLockAcquire(&xlogctl->info_lck);
1590                 xlogctl->LogwrtResult = LogwrtResult;
1591                 if (XLByteLT(xlogctl->LogwrtRqst.Write, LogwrtResult.Write))
1592                         xlogctl->LogwrtRqst.Write = LogwrtResult.Write;
1593                 if (XLByteLT(xlogctl->LogwrtRqst.Flush, LogwrtResult.Flush))
1594                         xlogctl->LogwrtRqst.Flush = LogwrtResult.Flush;
1595                 SpinLockRelease(&xlogctl->info_lck);
1596         }
1597
1598         Write->LogwrtResult = LogwrtResult;
1599 }
1600
1601 /*
1602  * Ensure that all XLOG data through the given position is flushed to disk.
1603  *
1604  * NOTE: this differs from XLogWrite mainly in that the WALWriteLock is not
1605  * already held, and we try to avoid acquiring it if possible.
1606  */
1607 void
1608 XLogFlush(XLogRecPtr record)
1609 {
1610         XLogRecPtr      WriteRqstPtr;
1611         XLogwrtRqst WriteRqst;
1612
1613         /* Disabled during REDO */
1614         if (InRedo)
1615                 return;
1616
1617         /* Quick exit if already known flushed */
1618         if (XLByteLE(record, LogwrtResult.Flush))
1619                 return;
1620
1621 #ifdef WAL_DEBUG
1622         if (XLOG_DEBUG)
1623                 elog(LOG, "xlog flush request %X/%X; write %X/%X; flush %X/%X",
1624                          record.xlogid, record.xrecoff,
1625                          LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff,
1626                          LogwrtResult.Flush.xlogid, LogwrtResult.Flush.xrecoff);
1627 #endif
1628
1629         START_CRIT_SECTION();
1630
1631         /*
1632          * Since fsync is usually a horribly expensive operation, we try to
1633          * piggyback as much data as we can on each fsync: if we see any more data
1634          * entered into the xlog buffer, we'll write and fsync that too, so that
1635          * the final value of LogwrtResult.Flush is as large as possible. This
1636          * gives us some chance of avoiding another fsync immediately after.
1637          */
1638
1639         /* initialize to given target; may increase below */
1640         WriteRqstPtr = record;
1641
1642         /* read LogwrtResult and update local state */
1643         {
1644                 /* use volatile pointer to prevent code rearrangement */
1645                 volatile XLogCtlData *xlogctl = XLogCtl;
1646
1647                 SpinLockAcquire(&xlogctl->info_lck);
1648                 if (XLByteLT(WriteRqstPtr, xlogctl->LogwrtRqst.Write))
1649                         WriteRqstPtr = xlogctl->LogwrtRqst.Write;
1650                 LogwrtResult = xlogctl->LogwrtResult;
1651                 SpinLockRelease(&xlogctl->info_lck);
1652         }
1653
1654         /* done already? */
1655         if (!XLByteLE(record, LogwrtResult.Flush))
1656         {
1657                 /* now wait for the write lock */
1658                 LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
1659                 LogwrtResult = XLogCtl->Write.LogwrtResult;
1660                 if (!XLByteLE(record, LogwrtResult.Flush))
1661                 {
1662                         /* try to write/flush later additions to XLOG as well */
1663                         if (LWLockConditionalAcquire(WALInsertLock, LW_EXCLUSIVE))
1664                         {
1665                                 XLogCtlInsert *Insert = &XLogCtl->Insert;
1666                                 uint32          freespace = INSERT_FREESPACE(Insert);
1667
1668                                 if (freespace < SizeOfXLogRecord)               /* buffer is full */
1669                                         WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx];
1670                                 else
1671                                 {
1672                                         WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx];
1673                                         WriteRqstPtr.xrecoff -= freespace;
1674                                 }
1675                                 LWLockRelease(WALInsertLock);
1676                                 WriteRqst.Write = WriteRqstPtr;
1677                                 WriteRqst.Flush = WriteRqstPtr;
1678                         }
1679                         else
1680                         {
1681                                 WriteRqst.Write = WriteRqstPtr;
1682                                 WriteRqst.Flush = record;
1683                         }
1684                         XLogWrite(WriteRqst, false);
1685                 }
1686                 LWLockRelease(WALWriteLock);
1687         }
1688
1689         END_CRIT_SECTION();
1690
1691         /*
1692          * If we still haven't flushed to the request point then we have a
1693          * problem; most likely, the requested flush point is past end of XLOG.
1694          * This has been seen to occur when a disk page has a corrupted LSN.
1695          *
1696          * Formerly we treated this as a PANIC condition, but that hurts the
1697          * system's robustness rather than helping it: we do not want to take down
1698          * the whole system due to corruption on one data page.  In particular, if
1699          * the bad page is encountered again during recovery then we would be
1700          * unable to restart the database at all!  (This scenario has actually
1701          * happened in the field several times with 7.1 releases. Note that we
1702          * cannot get here while InRedo is true, but if the bad page is brought in
1703          * and marked dirty during recovery then CreateCheckPoint will try to
1704          * flush it at the end of recovery.)
1705          *
1706          * The current approach is to ERROR under normal conditions, but only
1707          * WARNING during recovery, so that the system can be brought up even if
1708          * there's a corrupt LSN.  Note that for calls from xact.c, the ERROR will
1709          * be promoted to PANIC since xact.c calls this routine inside a critical
1710          * section.  However, calls from bufmgr.c are not within critical sections
1711          * and so we will not force a restart for a bad LSN on a data page.
1712          */
1713         if (XLByteLT(LogwrtResult.Flush, record))
1714                 elog(InRecovery ? WARNING : ERROR,
1715                 "xlog flush request %X/%X is not satisfied --- flushed only to %X/%X",
1716                          record.xlogid, record.xrecoff,
1717                          LogwrtResult.Flush.xlogid, LogwrtResult.Flush.xrecoff);
1718 }
1719
1720 /*
1721  * Create a new XLOG file segment, or open a pre-existing one.
1722  *
1723  * log, seg: identify segment to be created/opened.
1724  *
1725  * *use_existent: if TRUE, OK to use a pre-existing file (else, any
1726  * pre-existing file will be deleted).  On return, TRUE if a pre-existing
1727  * file was used.
1728  *
1729  * use_lock: if TRUE, acquire ControlFileLock while moving file into
1730  * place.  This should be TRUE except during bootstrap log creation.  The
1731  * caller must *not* hold the lock at call.
1732  *
1733  * Returns FD of opened file.
1734  *
1735  * Note: errors here are ERROR not PANIC because we might or might not be
1736  * inside a critical section (eg, during checkpoint there is no reason to
1737  * take down the system on failure).  They will promote to PANIC if we are
1738  * in a critical section.
1739  */
1740 static int
1741 XLogFileInit(uint32 log, uint32 seg,
1742                          bool *use_existent, bool use_lock)
1743 {
1744         char            path[MAXPGPATH];
1745         char            tmppath[MAXPGPATH];
1746         char            zbuffer[XLOG_BLCKSZ];
1747         uint32          installed_log;
1748         uint32          installed_seg;
1749         int                     max_advance;
1750         int                     fd;
1751         int                     nbytes;
1752
1753         XLogFilePath(path, ThisTimeLineID, log, seg);
1754
1755         /*
1756          * Try to use existent file (checkpoint maker may have created it already)
1757          */
1758         if (*use_existent)
1759         {
1760                 fd = BasicOpenFile(path, O_RDWR | PG_BINARY | XLOG_SYNC_BIT,
1761                                                    S_IRUSR | S_IWUSR);
1762                 if (fd < 0)
1763                 {
1764                         if (errno != ENOENT)
1765                                 ereport(ERROR,
1766                                                 (errcode_for_file_access(),
1767                                                  errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
1768                                                                 path, log, seg)));
1769                 }
1770                 else
1771                         return fd;
1772         }
1773
1774         /*
1775          * Initialize an empty (all zeroes) segment.  NOTE: it is possible that
1776          * another process is doing the same thing.  If so, we will end up
1777          * pre-creating an extra log segment.  That seems OK, and better than
1778          * holding the lock throughout this lengthy process.
1779          */
1780         snprintf(tmppath, MAXPGPATH, XLOGDIR "/xlogtemp.%d", (int) getpid());
1781
1782         unlink(tmppath);
1783
1784         /* do not use XLOG_SYNC_BIT here --- want to fsync only at end of fill */
1785         fd = BasicOpenFile(tmppath, O_RDWR | O_CREAT | O_EXCL | PG_BINARY,
1786                                            S_IRUSR | S_IWUSR);
1787         if (fd < 0)
1788                 ereport(ERROR,
1789                                 (errcode_for_file_access(),
1790                                  errmsg("could not create file \"%s\": %m", tmppath)));
1791
1792         /*
1793          * Zero-fill the file.  We have to do this the hard way to ensure that all
1794          * the file space has really been allocated --- on platforms that allow
1795          * "holes" in files, just seeking to the end doesn't allocate intermediate
1796          * space.  This way, we know that we have all the space and (after the
1797          * fsync below) that all the indirect blocks are down on disk.  Therefore,
1798          * fdatasync(2) or O_DSYNC will be sufficient to sync future writes to the
1799          * log file.
1800          */
1801         MemSet(zbuffer, 0, sizeof(zbuffer));
1802         for (nbytes = 0; nbytes < XLogSegSize; nbytes += sizeof(zbuffer))
1803         {
1804                 errno = 0;
1805                 if ((int) write(fd, zbuffer, sizeof(zbuffer)) != (int) sizeof(zbuffer))
1806                 {
1807                         int                     save_errno = errno;
1808
1809                         /*
1810                          * If we fail to make the file, delete it to release disk space
1811                          */
1812                         unlink(tmppath);
1813                         /* if write didn't set errno, assume problem is no disk space */
1814                         errno = save_errno ? save_errno : ENOSPC;
1815
1816                         ereport(ERROR,
1817                                         (errcode_for_file_access(),
1818                                          errmsg("could not write to file \"%s\": %m", tmppath)));
1819                 }
1820         }
1821
1822         if (pg_fsync(fd) != 0)
1823                 ereport(ERROR,
1824                                 (errcode_for_file_access(),
1825                                  errmsg("could not fsync file \"%s\": %m", tmppath)));
1826
1827         if (close(fd))
1828                 ereport(ERROR,
1829                                 (errcode_for_file_access(),
1830                                  errmsg("could not close file \"%s\": %m", tmppath)));
1831
1832         /*
1833          * Now move the segment into place with its final name.
1834          *
1835          * If caller didn't want to use a pre-existing file, get rid of any
1836          * pre-existing file.  Otherwise, cope with possibility that someone else
1837          * has created the file while we were filling ours: if so, use ours to
1838          * pre-create a future log segment.
1839          */
1840         installed_log = log;
1841         installed_seg = seg;
1842         max_advance = XLOGfileslop;
1843         if (!InstallXLogFileSegment(&installed_log, &installed_seg, tmppath,
1844                                                                 *use_existent, &max_advance,
1845                                                                 use_lock))
1846         {
1847                 /* No need for any more future segments... */
1848                 unlink(tmppath);
1849         }
1850
1851         /* Set flag to tell caller there was no existent file */
1852         *use_existent = false;
1853
1854         /* Now open original target segment (might not be file I just made) */
1855         fd = BasicOpenFile(path, O_RDWR | PG_BINARY | XLOG_SYNC_BIT,
1856                                            S_IRUSR | S_IWUSR);
1857         if (fd < 0)
1858                 ereport(ERROR,
1859                                 (errcode_for_file_access(),
1860                    errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
1861                                   path, log, seg)));
1862
1863         return fd;
1864 }
1865
1866 /*
1867  * Create a new XLOG file segment by copying a pre-existing one.
1868  *
1869  * log, seg: identify segment to be created.
1870  *
1871  * srcTLI, srclog, srcseg: identify segment to be copied (could be from
1872  *              a different timeline)
1873  *
1874  * Currently this is only used during recovery, and so there are no locking
1875  * considerations.      But we should be just as tense as XLogFileInit to avoid
1876  * emplacing a bogus file.
1877  */
1878 static void
1879 XLogFileCopy(uint32 log, uint32 seg,
1880                          TimeLineID srcTLI, uint32 srclog, uint32 srcseg)
1881 {
1882         char            path[MAXPGPATH];
1883         char            tmppath[MAXPGPATH];
1884         char            buffer[XLOG_BLCKSZ];
1885         int                     srcfd;
1886         int                     fd;
1887         int                     nbytes;
1888
1889         /*
1890          * Open the source file
1891          */
1892         XLogFilePath(path, srcTLI, srclog, srcseg);
1893         srcfd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
1894         if (srcfd < 0)
1895                 ereport(ERROR,
1896                                 (errcode_for_file_access(),
1897                                  errmsg("could not open file \"%s\": %m", path)));
1898
1899         /*
1900          * Copy into a temp file name.
1901          */
1902         snprintf(tmppath, MAXPGPATH, XLOGDIR "/xlogtemp.%d", (int) getpid());
1903
1904         unlink(tmppath);
1905
1906         /* do not use XLOG_SYNC_BIT here --- want to fsync only at end of fill */
1907         fd = BasicOpenFile(tmppath, O_RDWR | O_CREAT | O_EXCL | PG_BINARY,
1908                                            S_IRUSR | S_IWUSR);
1909         if (fd < 0)
1910                 ereport(ERROR,
1911                                 (errcode_for_file_access(),
1912                                  errmsg("could not create file \"%s\": %m", tmppath)));
1913
1914         /*
1915          * Do the data copying.
1916          */
1917         for (nbytes = 0; nbytes < XLogSegSize; nbytes += sizeof(buffer))
1918         {
1919                 errno = 0;
1920                 if ((int) read(srcfd, buffer, sizeof(buffer)) != (int) sizeof(buffer))
1921                 {
1922                         if (errno != 0)
1923                                 ereport(ERROR,
1924                                                 (errcode_for_file_access(),
1925                                                  errmsg("could not read file \"%s\": %m", path)));
1926                         else
1927                                 ereport(ERROR,
1928                                                 (errmsg("not enough data in file \"%s\"", path)));
1929                 }
1930                 errno = 0;
1931                 if ((int) write(fd, buffer, sizeof(buffer)) != (int) sizeof(buffer))
1932                 {
1933                         int                     save_errno = errno;
1934
1935                         /*
1936                          * If we fail to make the file, delete it to release disk space
1937                          */
1938                         unlink(tmppath);
1939                         /* if write didn't set errno, assume problem is no disk space */
1940                         errno = save_errno ? save_errno : ENOSPC;
1941
1942                         ereport(ERROR,
1943                                         (errcode_for_file_access(),
1944                                          errmsg("could not write to file \"%s\": %m", tmppath)));
1945                 }
1946         }
1947
1948         if (pg_fsync(fd) != 0)
1949                 ereport(ERROR,
1950                                 (errcode_for_file_access(),
1951                                  errmsg("could not fsync file \"%s\": %m", tmppath)));
1952
1953         if (close(fd))
1954                 ereport(ERROR,
1955                                 (errcode_for_file_access(),
1956                                  errmsg("could not close file \"%s\": %m", tmppath)));
1957
1958         close(srcfd);
1959
1960         /*
1961          * Now move the segment into place with its final name.
1962          */
1963         if (!InstallXLogFileSegment(&log, &seg, tmppath, false, NULL, false))
1964                 elog(ERROR, "InstallXLogFileSegment should not have failed");
1965 }
1966
1967 /*
1968  * Install a new XLOG segment file as a current or future log segment.
1969  *
1970  * This is used both to install a newly-created segment (which has a temp
1971  * filename while it's being created) and to recycle an old segment.
1972  *
1973  * *log, *seg: identify segment to install as (or first possible target).
1974  * When find_free is TRUE, these are modified on return to indicate the
1975  * actual installation location or last segment searched.
1976  *
1977  * tmppath: initial name of file to install.  It will be renamed into place.
1978  *
1979  * find_free: if TRUE, install the new segment at the first empty log/seg
1980  * number at or after the passed numbers.  If FALSE, install the new segment
1981  * exactly where specified, deleting any existing segment file there.
1982  *
1983  * *max_advance: maximum number of log/seg slots to advance past the starting
1984  * point.  Fail if no free slot is found in this range.  On return, reduced
1985  * by the number of slots skipped over.  (Irrelevant, and may be NULL,
1986  * when find_free is FALSE.)
1987  *
1988  * use_lock: if TRUE, acquire ControlFileLock while moving file into
1989  * place.  This should be TRUE except during bootstrap log creation.  The
1990  * caller must *not* hold the lock at call.
1991  *
1992  * Returns TRUE if file installed, FALSE if not installed because of
1993  * exceeding max_advance limit.  (Any other kind of failure causes ereport().)
1994  */
1995 static bool
1996 InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath,
1997                                            bool find_free, int *max_advance,
1998                                            bool use_lock)
1999 {
2000         char            path[MAXPGPATH];
2001         struct stat stat_buf;
2002
2003         XLogFilePath(path, ThisTimeLineID, *log, *seg);
2004
2005         /*
2006          * We want to be sure that only one process does this at a time.
2007          */
2008         if (use_lock)
2009                 LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
2010
2011         if (!find_free)
2012         {
2013                 /* Force installation: get rid of any pre-existing segment file */
2014                 unlink(path);
2015         }
2016         else
2017         {
2018                 /* Find a free slot to put it in */
2019                 while (stat(path, &stat_buf) == 0)
2020                 {
2021                         if (*max_advance <= 0)
2022                         {
2023                                 /* Failed to find a free slot within specified range */
2024                                 if (use_lock)
2025                                         LWLockRelease(ControlFileLock);
2026                                 return false;
2027                         }
2028                         NextLogSeg(*log, *seg);
2029                         (*max_advance)--;
2030                         XLogFilePath(path, ThisTimeLineID, *log, *seg);
2031                 }
2032         }
2033
2034         /*
2035          * Prefer link() to rename() here just to be really sure that we don't
2036          * overwrite an existing logfile.  However, there shouldn't be one, so
2037          * rename() is an acceptable substitute except for the truly paranoid.
2038          */
2039 #if HAVE_WORKING_LINK
2040         if (link(tmppath, path) < 0)
2041                 ereport(ERROR,
2042                                 (errcode_for_file_access(),
2043                                  errmsg("could not link file \"%s\" to \"%s\" (initialization of log file %u, segment %u): %m",
2044                                                 tmppath, path, *log, *seg)));
2045         unlink(tmppath);
2046 #else
2047         if (rename(tmppath, path) < 0)
2048                 ereport(ERROR,
2049                                 (errcode_for_file_access(),
2050                                  errmsg("could not rename file \"%s\" to \"%s\" (initialization of log file %u, segment %u): %m",
2051                                                 tmppath, path, *log, *seg)));
2052 #endif
2053
2054         if (use_lock)
2055                 LWLockRelease(ControlFileLock);
2056
2057         return true;
2058 }
2059
2060 /*
2061  * Open a pre-existing logfile segment for writing.
2062  */
2063 static int
2064 XLogFileOpen(uint32 log, uint32 seg)
2065 {
2066         char            path[MAXPGPATH];
2067         int                     fd;
2068
2069         XLogFilePath(path, ThisTimeLineID, log, seg);
2070
2071         fd = BasicOpenFile(path, O_RDWR | PG_BINARY | XLOG_SYNC_BIT,
2072                                            S_IRUSR | S_IWUSR);
2073         if (fd < 0)
2074                 ereport(PANIC,
2075                                 (errcode_for_file_access(),
2076                    errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
2077                                   path, log, seg)));
2078
2079         return fd;
2080 }
2081
2082 /*
2083  * Open a logfile segment for reading (during recovery).
2084  */
2085 static int
2086 XLogFileRead(uint32 log, uint32 seg, int emode)
2087 {
2088         char            path[MAXPGPATH];
2089         char            xlogfname[MAXFNAMELEN];
2090         ListCell   *cell;
2091         int                     fd;
2092
2093         /*
2094          * Loop looking for a suitable timeline ID: we might need to read any of
2095          * the timelines listed in expectedTLIs.
2096          *
2097          * We expect curFileTLI on entry to be the TLI of the preceding file in
2098          * sequence, or 0 if there was no predecessor.  We do not allow curFileTLI
2099          * to go backwards; this prevents us from picking up the wrong file when a
2100          * parent timeline extends to higher segment numbers than the child we
2101          * want to read.
2102          */
2103         foreach(cell, expectedTLIs)
2104         {
2105                 TimeLineID      tli = (TimeLineID) lfirst_int(cell);
2106
2107                 if (tli < curFileTLI)
2108                         break;                          /* don't bother looking at too-old TLIs */
2109
2110                 if (InArchiveRecovery)
2111                 {
2112                         XLogFileName(xlogfname, tli, log, seg);
2113                         restoredFromArchive = RestoreArchivedFile(path, xlogfname,
2114                                                                                                           "RECOVERYXLOG",
2115                                                                                                           XLogSegSize);
2116                 }
2117                 else
2118                         XLogFilePath(path, tli, log, seg);
2119
2120                 fd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
2121                 if (fd >= 0)
2122                 {
2123                         /* Success! */
2124                         curFileTLI = tli;
2125                         return fd;
2126                 }
2127                 if (errno != ENOENT)    /* unexpected failure? */
2128                         ereport(PANIC,
2129                                         (errcode_for_file_access(),
2130                         errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
2131                                    path, log, seg)));
2132         }
2133
2134         /* Couldn't find it.  For simplicity, complain about front timeline */
2135         XLogFilePath(path, recoveryTargetTLI, log, seg);
2136         errno = ENOENT;
2137         ereport(emode,
2138                         (errcode_for_file_access(),
2139                    errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
2140                                   path, log, seg)));
2141         return -1;
2142 }
2143
2144 /*
2145  * Close the current logfile segment for writing.
2146  */
2147 static void
2148 XLogFileClose(void)
2149 {
2150         Assert(openLogFile >= 0);
2151
2152         /*
2153          * posix_fadvise is problematic on many platforms: on older x86 Linux
2154          * it just dumps core, and there are reports of problems on PPC platforms
2155          * as well.  The following is therefore disabled for the time being.
2156          * We could consider some kind of configure test to see if it's safe to
2157          * use, but since we lack hard evidence that there's any useful performance
2158          * gain to be had, spending time on that seems unprofitable for now.
2159          */
2160 #ifdef NOT_USED
2161
2162         /*
2163          * WAL segment files will not be re-read in normal operation, so we advise
2164          * OS to release any cached pages.  But do not do so if WAL archiving is
2165          * active, because archiver process could use the cache to read the WAL
2166          * segment.
2167          *
2168          * While O_DIRECT works for O_SYNC, posix_fadvise() works for fsync()
2169          * and O_SYNC, and some platforms only have posix_fadvise().
2170          */
2171 #if defined(HAVE_DECL_POSIX_FADVISE) && defined(POSIX_FADV_DONTNEED)
2172         if (!XLogArchivingActive())
2173                 posix_fadvise(openLogFile, 0, 0, POSIX_FADV_DONTNEED);
2174 #endif
2175
2176 #endif /* NOT_USED */
2177
2178         if (close(openLogFile))
2179                 ereport(PANIC,
2180                         (errcode_for_file_access(),
2181                         errmsg("could not close log file %u, segment %u: %m",
2182                                    openLogId, openLogSeg)));
2183         openLogFile = -1;
2184 }
2185
2186 /*
2187  * Attempt to retrieve the specified file from off-line archival storage.
2188  * If successful, fill "path" with its complete path (note that this will be
2189  * a temp file name that doesn't follow the normal naming convention), and
2190  * return TRUE.
2191  *
2192  * If not successful, fill "path" with the name of the normal on-line file
2193  * (which may or may not actually exist, but we'll try to use it), and return
2194  * FALSE.
2195  *
2196  * For fixed-size files, the caller may pass the expected size as an
2197  * additional crosscheck on successful recovery.  If the file size is not
2198  * known, set expectedSize = 0.
2199  */
2200 static bool
2201 RestoreArchivedFile(char *path, const char *xlogfname,
2202                                         const char *recovername, off_t expectedSize)
2203 {
2204         char            xlogpath[MAXPGPATH];
2205         char            xlogRestoreCmd[MAXPGPATH];
2206         char       *dp;
2207         char       *endp;
2208         const char *sp;
2209         int                     rc;
2210         struct stat stat_buf;
2211
2212         /*
2213          * When doing archive recovery, we always prefer an archived log file even
2214          * if a file of the same name exists in XLOGDIR.  The reason is that the
2215          * file in XLOGDIR could be an old, un-filled or partly-filled version
2216          * that was copied and restored as part of backing up $PGDATA.
2217          *
2218          * We could try to optimize this slightly by checking the local copy
2219          * lastchange timestamp against the archived copy, but we have no API to
2220          * do this, nor can we guarantee that the lastchange timestamp was
2221          * preserved correctly when we copied to archive. Our aim is robustness,
2222          * so we elect not to do this.
2223          *
2224          * If we cannot obtain the log file from the archive, however, we will try
2225          * to use the XLOGDIR file if it exists.  This is so that we can make use
2226          * of log segments that weren't yet transferred to the archive.
2227          *
2228          * Notice that we don't actually overwrite any files when we copy back
2229          * from archive because the recoveryRestoreCommand may inadvertently
2230          * restore inappropriate xlogs, or they may be corrupt, so we may wish to
2231          * fallback to the segments remaining in current XLOGDIR later. The
2232          * copy-from-archive filename is always the same, ensuring that we don't
2233          * run out of disk space on long recoveries.
2234          */
2235         snprintf(xlogpath, MAXPGPATH, XLOGDIR "/%s", recovername);
2236
2237         /*
2238          * Make sure there is no existing file named recovername.
2239          */
2240         if (stat(xlogpath, &stat_buf) != 0)
2241         {
2242                 if (errno != ENOENT)
2243                         ereport(FATAL,
2244                                         (errcode_for_file_access(),
2245                                          errmsg("could not stat file \"%s\": %m",
2246                                                         xlogpath)));
2247         }
2248         else
2249         {
2250                 if (unlink(xlogpath) != 0)
2251                         ereport(FATAL,
2252                                         (errcode_for_file_access(),
2253                                          errmsg("could not remove file \"%s\": %m",
2254                                                         xlogpath)));
2255         }
2256
2257         /*
2258          * construct the command to be executed
2259          */
2260         dp = xlogRestoreCmd;
2261         endp = xlogRestoreCmd + MAXPGPATH - 1;
2262         *endp = '\0';
2263
2264         for (sp = recoveryRestoreCommand; *sp; sp++)
2265         {
2266                 if (*sp == '%')
2267                 {
2268                         switch (sp[1])
2269                         {
2270                                 case 'p':
2271                                         /* %p: full path of target file */
2272                                         sp++;
2273                                         StrNCpy(dp, xlogpath, endp - dp);
2274                                         make_native_path(dp);
2275                                         dp += strlen(dp);
2276                                         break;
2277                                 case 'f':
2278                                         /* %f: filename of desired file */
2279                                         sp++;
2280                                         StrNCpy(dp, xlogfname, endp - dp);
2281                                         dp += strlen(dp);
2282                                         break;
2283                                 case '%':
2284                                         /* convert %% to a single % */
2285                                         sp++;
2286                                         if (dp < endp)
2287                                                 *dp++ = *sp;
2288                                         break;
2289                                 default:
2290                                         /* otherwise treat the % as not special */
2291                                         if (dp < endp)
2292                                                 *dp++ = *sp;
2293                                         break;
2294                         }
2295                 }
2296                 else
2297                 {
2298                         if (dp < endp)
2299                                 *dp++ = *sp;
2300                 }
2301         }
2302         *dp = '\0';
2303
2304         ereport(DEBUG3,
2305                         (errmsg_internal("executing restore command \"%s\"",
2306                                                          xlogRestoreCmd)));
2307
2308         /*
2309          * Copy xlog from archival storage to XLOGDIR
2310          */
2311         rc = system(xlogRestoreCmd);
2312         if (rc == 0)
2313         {
2314                 /*
2315                  * command apparently succeeded, but let's make sure the file is
2316                  * really there now and has the correct size.
2317                  *
2318                  * XXX I made wrong-size a fatal error to ensure the DBA would notice
2319                  * it, but is that too strong?  We could try to plow ahead with a
2320                  * local copy of the file ... but the problem is that there probably
2321                  * isn't one, and we'd incorrectly conclude we've reached the end of
2322                  * WAL and we're done recovering ...
2323                  */
2324                 if (stat(xlogpath, &stat_buf) == 0)
2325                 {
2326                         if (expectedSize > 0 && stat_buf.st_size != expectedSize)
2327                                 ereport(FATAL,
2328                                                 (errmsg("archive file \"%s\" has wrong size: %lu instead of %lu",
2329                                                                 xlogfname,
2330                                                                 (unsigned long) stat_buf.st_size,
2331                                                                 (unsigned long) expectedSize)));
2332                         else
2333                         {
2334                                 ereport(LOG,
2335                                                 (errmsg("restored log file \"%s\" from archive",
2336                                                                 xlogfname)));
2337                                 strcpy(path, xlogpath);
2338                                 return true;
2339                         }
2340                 }
2341                 else
2342                 {
2343                         /* stat failed */
2344                         if (errno != ENOENT)
2345                                 ereport(FATAL,
2346                                                 (errcode_for_file_access(),
2347                                                  errmsg("could not stat file \"%s\": %m",
2348                                                                 xlogpath)));
2349                 }
2350         }
2351
2352         /*
2353          * remember, we rollforward UNTIL the restore fails so failure here is
2354          * just part of the process... that makes it difficult to determine
2355          * whether the restore failed because there isn't an archive to restore,
2356          * or because the administrator has specified the restore program
2357          * incorrectly.  We have to assume the former.
2358          */
2359         ereport(DEBUG2,
2360                 (errmsg("could not restore file \"%s\" from archive: return code %d",
2361                                 xlogfname, rc)));
2362
2363         /*
2364          * if an archived file is not available, there might still be a version of
2365          * this file in XLOGDIR, so return that as the filename to open.
2366          *
2367          * In many recovery scenarios we expect this to fail also, but if so that
2368          * just means we've reached the end of WAL.
2369          */
2370         snprintf(path, MAXPGPATH, XLOGDIR "/%s", xlogfname);
2371         return false;
2372 }
2373
2374 /*
2375  * Preallocate log files beyond the specified log endpoint, according to
2376  * the XLOGfile user parameter.
2377  */
2378 static int
2379 PreallocXlogFiles(XLogRecPtr endptr)
2380 {
2381         int                     nsegsadded = 0;
2382         uint32          _logId;
2383         uint32          _logSeg;
2384         int                     lf;
2385         bool            use_existent;
2386
2387         XLByteToPrevSeg(endptr, _logId, _logSeg);
2388         if ((endptr.xrecoff - 1) % XLogSegSize >=
2389                 (uint32) (0.75 * XLogSegSize))
2390         {
2391                 NextLogSeg(_logId, _logSeg);
2392                 use_existent = true;
2393                 lf = XLogFileInit(_logId, _logSeg, &use_existent, true);
2394                 close(lf);
2395                 if (!use_existent)
2396                         nsegsadded++;
2397         }
2398         return nsegsadded;
2399 }
2400
2401 /*
2402  * Remove or move offline all log files older or equal to passed log/seg#
2403  *
2404  * endptr is current (or recent) end of xlog; this is used to determine
2405  * whether we want to recycle rather than delete no-longer-wanted log files.
2406  */
2407 static void
2408 MoveOfflineLogs(uint32 log, uint32 seg, XLogRecPtr endptr,
2409                                 int *nsegsremoved, int *nsegsrecycled)
2410 {
2411         uint32          endlogId;
2412         uint32          endlogSeg;
2413         int                     max_advance;
2414         DIR                *xldir;
2415         struct dirent *xlde;
2416         char            lastoff[MAXFNAMELEN];
2417         char            path[MAXPGPATH];
2418
2419         *nsegsremoved = 0;
2420         *nsegsrecycled = 0;
2421
2422         /*
2423          * Initialize info about where to try to recycle to.  We allow recycling
2424          * segments up to XLOGfileslop segments beyond the current XLOG location.
2425          */
2426         XLByteToPrevSeg(endptr, endlogId, endlogSeg);
2427         max_advance = XLOGfileslop;
2428
2429         xldir = AllocateDir(XLOGDIR);
2430         if (xldir == NULL)
2431                 ereport(ERROR,
2432                                 (errcode_for_file_access(),
2433                                  errmsg("could not open transaction log directory \"%s\": %m",
2434                                                 XLOGDIR)));
2435
2436         XLogFileName(lastoff, ThisTimeLineID, log, seg);
2437
2438         while ((xlde = ReadDir(xldir, XLOGDIR)) != NULL)
2439         {
2440                 /*
2441                  * We ignore the timeline part of the XLOG segment identifiers in
2442                  * deciding whether a segment is still needed.  This ensures that we
2443                  * won't prematurely remove a segment from a parent timeline. We could
2444                  * probably be a little more proactive about removing segments of
2445                  * non-parent timelines, but that would be a whole lot more
2446                  * complicated.
2447                  *
2448                  * We use the alphanumeric sorting property of the filenames to decide
2449                  * which ones are earlier than the lastoff segment.
2450                  */
2451                 if (strlen(xlde->d_name) == 24 &&
2452                         strspn(xlde->d_name, "0123456789ABCDEF") == 24 &&
2453                         strcmp(xlde->d_name + 8, lastoff + 8) <= 0)
2454                 {
2455                         if (XLogArchiveCheckDone(xlde->d_name))
2456                         {
2457                                 snprintf(path, MAXPGPATH, XLOGDIR "/%s", xlde->d_name);
2458
2459                                 /*
2460                                  * Before deleting the file, see if it can be recycled as a
2461                                  * future log segment.
2462                                  */
2463                                 if (InstallXLogFileSegment(&endlogId, &endlogSeg, path,
2464                                                                                    true, &max_advance,
2465                                                                                    true))
2466                                 {
2467                                         ereport(DEBUG2,
2468                                                         (errmsg("recycled transaction log file \"%s\"",
2469                                                                         xlde->d_name)));
2470                                         (*nsegsrecycled)++;
2471                                         /* Needn't recheck that slot on future iterations */
2472                                         if (max_advance > 0)
2473                                         {
2474                                                 NextLogSeg(endlogId, endlogSeg);
2475                                                 max_advance--;
2476                                         }
2477                                 }
2478                                 else
2479                                 {
2480                                         /* No need for any more future segments... */
2481                                         ereport(DEBUG2,
2482                                                         (errmsg("removing transaction log file \"%s\"",
2483                                                                         xlde->d_name)));
2484                                         unlink(path);
2485                                         (*nsegsremoved)++;
2486                                 }
2487
2488                                 XLogArchiveCleanup(xlde->d_name);
2489                         }
2490                 }
2491         }
2492
2493         FreeDir(xldir);
2494 }
2495
2496 /*
2497  * Remove previous backup history files.  This also retries creation of
2498  * .ready files for any backup history files for which XLogArchiveNotify
2499  * failed earlier.
2500  */
2501 static void
2502 CleanupBackupHistory(void)
2503 {
2504         DIR                *xldir;
2505         struct dirent *xlde;
2506         char            path[MAXPGPATH];
2507
2508         xldir = AllocateDir(XLOGDIR);
2509         if (xldir == NULL)
2510                 ereport(ERROR,
2511                                 (errcode_for_file_access(),
2512                                  errmsg("could not open transaction log directory \"%s\": %m",
2513                                                 XLOGDIR)));
2514
2515         while ((xlde = ReadDir(xldir, XLOGDIR)) != NULL)
2516         {
2517                 if (strlen(xlde->d_name) > 24 &&
2518                         strspn(xlde->d_name, "0123456789ABCDEF") == 24 &&
2519                         strcmp(xlde->d_name + strlen(xlde->d_name) - strlen(".backup"),
2520                                    ".backup") == 0)
2521                 {
2522                         if (XLogArchiveCheckDone(xlde->d_name))
2523                         {
2524                                 ereport(DEBUG2,
2525                                 (errmsg("removing transaction log backup history file \"%s\"",
2526                                                 xlde->d_name)));
2527                                 snprintf(path, MAXPGPATH, XLOGDIR "/%s", xlde->d_name);
2528                                 unlink(path);
2529                                 XLogArchiveCleanup(xlde->d_name);
2530                         }
2531                 }
2532         }
2533
2534         FreeDir(xldir);
2535 }
2536
2537 /*
2538  * Restore the backup blocks present in an XLOG record, if any.
2539  *
2540  * We assume all of the record has been read into memory at *record.
2541  *
2542  * Note: when a backup block is available in XLOG, we restore it
2543  * unconditionally, even if the page in the database appears newer.
2544  * This is to protect ourselves against database pages that were partially
2545  * or incorrectly written during a crash.  We assume that the XLOG data
2546  * must be good because it has passed a CRC check, while the database
2547  * page might not be.  This will force us to replay all subsequent
2548  * modifications of the page that appear in XLOG, rather than possibly
2549  * ignoring them as already applied, but that's not a huge drawback.
2550  */
2551 static void
2552 RestoreBkpBlocks(XLogRecord *record, XLogRecPtr lsn)
2553 {
2554         Relation        reln;
2555         Buffer          buffer;
2556         Page            page;
2557         BkpBlock        bkpb;
2558         char       *blk;
2559         int                     i;
2560
2561         blk = (char *) XLogRecGetData(record) + record->xl_len;
2562         for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
2563         {
2564                 if (!(record->xl_info & XLR_SET_BKP_BLOCK(i)))
2565                         continue;
2566
2567                 memcpy(&bkpb, blk, sizeof(BkpBlock));
2568                 blk += sizeof(BkpBlock);
2569
2570                 reln = XLogOpenRelation(bkpb.node);
2571                 buffer = XLogReadBuffer(reln, bkpb.block, true);
2572                 Assert(BufferIsValid(buffer));
2573                 page = (Page) BufferGetPage(buffer);
2574
2575                 if (bkpb.hole_length == 0)
2576                 {
2577                         memcpy((char *) page, blk, BLCKSZ);
2578                 }
2579                 else
2580                 {
2581                         /* must zero-fill the hole */
2582                         MemSet((char *) page, 0, BLCKSZ);
2583                         memcpy((char *) page, blk, bkpb.hole_offset);
2584                         memcpy((char *) page + (bkpb.hole_offset + bkpb.hole_length),
2585                                    blk + bkpb.hole_offset,
2586                                    BLCKSZ - (bkpb.hole_offset + bkpb.hole_length));
2587                 }
2588
2589                 PageSetLSN(page, lsn);
2590                 PageSetTLI(page, ThisTimeLineID);
2591                 MarkBufferDirty(buffer);
2592                 UnlockReleaseBuffer(buffer);
2593
2594                 blk += BLCKSZ - bkpb.hole_length;
2595         }
2596 }
2597
2598 /*
2599  * CRC-check an XLOG record.  We do not believe the contents of an XLOG
2600  * record (other than to the minimal extent of computing the amount of
2601  * data to read in) until we've checked the CRCs.
2602  *
2603  * We assume all of the record has been read into memory at *record.
2604  */
2605 static bool
2606 RecordIsValid(XLogRecord *record, XLogRecPtr recptr, int emode)
2607 {
2608         pg_crc32        crc;
2609         int                     i;
2610         uint32          len = record->xl_len;
2611         BkpBlock        bkpb;
2612         char       *blk;
2613
2614         /* First the rmgr data */
2615         INIT_CRC32(crc);
2616         COMP_CRC32(crc, XLogRecGetData(record), len);
2617
2618         /* Add in the backup blocks, if any */
2619         blk = (char *) XLogRecGetData(record) + len;
2620         for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
2621         {
2622                 uint32          blen;
2623
2624                 if (!(record->xl_info & XLR_SET_BKP_BLOCK(i)))
2625                         continue;
2626
2627                 memcpy(&bkpb, blk, sizeof(BkpBlock));
2628                 if (bkpb.hole_offset + bkpb.hole_length > BLCKSZ)
2629                 {
2630                         ereport(emode,
2631                                         (errmsg("incorrect hole size in record at %X/%X",
2632                                                         recptr.xlogid, recptr.xrecoff)));
2633                         return false;
2634                 }
2635                 blen = sizeof(BkpBlock) + BLCKSZ - bkpb.hole_length;
2636                 COMP_CRC32(crc, blk, blen);
2637                 blk += blen;
2638         }
2639
2640         /* Check that xl_tot_len agrees with our calculation */
2641         if (blk != (char *) record + record->xl_tot_len)
2642         {
2643                 ereport(emode,
2644                                 (errmsg("incorrect total length in record at %X/%X",
2645                                                 recptr.xlogid, recptr.xrecoff)));
2646                 return false;
2647         }
2648
2649         /* Finally include the record header */
2650         COMP_CRC32(crc, (char *) record + sizeof(pg_crc32),
2651                            SizeOfXLogRecord - sizeof(pg_crc32));
2652         FIN_CRC32(crc);
2653
2654         if (!EQ_CRC32(record->xl_crc, crc))
2655         {
2656                 ereport(emode,
2657                 (errmsg("incorrect resource manager data checksum in record at %X/%X",
2658                                 recptr.xlogid, recptr.xrecoff)));
2659                 return false;
2660         }
2661
2662         return true;
2663 }
2664
2665 /*
2666  * Attempt to read an XLOG record.
2667  *
2668  * If RecPtr is not NULL, try to read a record at that position.  Otherwise
2669  * try to read a record just after the last one previously read.
2670  *
2671  * If no valid record is available, returns NULL, or fails if emode is PANIC.
2672  * (emode must be either PANIC or LOG.)
2673  *
2674  * The record is copied into readRecordBuf, so that on successful return,
2675  * the returned record pointer always points there.
2676  */
2677 static XLogRecord *
2678 ReadRecord(XLogRecPtr *RecPtr, int emode)
2679 {
2680         XLogRecord *record;
2681         char       *buffer;
2682         XLogRecPtr      tmpRecPtr = EndRecPtr;
2683         bool            randAccess = false;
2684         uint32          len,
2685                                 total_len;
2686         uint32          targetPageOff;
2687         uint32          targetRecOff;
2688         uint32          pageHeaderSize;
2689
2690         if (readBuf == NULL)
2691         {
2692                 /*
2693                  * First time through, permanently allocate readBuf.  We do it this
2694                  * way, rather than just making a static array, for two reasons: (1)
2695                  * no need to waste the storage in most instantiations of the backend;
2696                  * (2) a static char array isn't guaranteed to have any particular
2697                  * alignment, whereas malloc() will provide MAXALIGN'd storage.
2698                  */
2699                 readBuf = (char *) malloc(XLOG_BLCKSZ);
2700                 Assert(readBuf != NULL);
2701         }
2702
2703         if (RecPtr == NULL)
2704         {
2705                 RecPtr = &tmpRecPtr;
2706                 /* fast case if next record is on same page */
2707                 if (nextRecord != NULL)
2708                 {
2709                         record = nextRecord;
2710                         goto got_record;
2711                 }
2712                 /* align old recptr to next page */
2713                 if (tmpRecPtr.xrecoff % XLOG_BLCKSZ != 0)
2714                         tmpRecPtr.xrecoff += (XLOG_BLCKSZ - tmpRecPtr.xrecoff % XLOG_BLCKSZ);
2715                 if (tmpRecPtr.xrecoff >= XLogFileSize)
2716                 {
2717                         (tmpRecPtr.xlogid)++;
2718                         tmpRecPtr.xrecoff = 0;
2719                 }
2720                 /* We will account for page header size below */
2721         }
2722         else
2723         {
2724                 if (!XRecOffIsValid(RecPtr->xrecoff))
2725                         ereport(PANIC,
2726                                         (errmsg("invalid record offset at %X/%X",
2727                                                         RecPtr->xlogid, RecPtr->xrecoff)));
2728
2729                 /*
2730                  * Since we are going to a random position in WAL, forget any prior
2731                  * state about what timeline we were in, and allow it to be any
2732                  * timeline in expectedTLIs.  We also set a flag to allow curFileTLI
2733                  * to go backwards (but we can't reset that variable right here, since
2734                  * we might not change files at all).
2735                  */
2736                 lastPageTLI = 0;                /* see comment in ValidXLOGHeader */
2737                 randAccess = true;              /* allow curFileTLI to go backwards too */
2738         }
2739
2740         if (readFile >= 0 && !XLByteInSeg(*RecPtr, readId, readSeg))
2741         {
2742                 close(readFile);
2743                 readFile = -1;
2744         }
2745         XLByteToSeg(*RecPtr, readId, readSeg);
2746         if (readFile < 0)
2747         {
2748                 /* Now it's okay to reset curFileTLI if random fetch */
2749                 if (randAccess)
2750                         curFileTLI = 0;
2751
2752                 readFile = XLogFileRead(readId, readSeg, emode);
2753                 if (readFile < 0)
2754                         goto next_record_is_invalid;
2755
2756                 /*
2757                  * Whenever switching to a new WAL segment, we read the first page of
2758                  * the file and validate its header, even if that's not where the
2759                  * target record is.  This is so that we can check the additional
2760                  * identification info that is present in the first page's "long"
2761                  * header.
2762                  */
2763                 readOff = 0;
2764                 if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
2765                 {
2766                         ereport(emode,
2767                                         (errcode_for_file_access(),
2768                                          errmsg("could not read from log file %u, segment %u, offset %u: %m",
2769                                                         readId, readSeg, readOff)));
2770                         goto next_record_is_invalid;
2771                 }
2772                 if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode))
2773                         goto next_record_is_invalid;
2774         }
2775
2776         targetPageOff = ((RecPtr->xrecoff % XLogSegSize) / XLOG_BLCKSZ) * XLOG_BLCKSZ;
2777         if (readOff != targetPageOff)
2778         {
2779                 readOff = targetPageOff;
2780                 if (lseek(readFile, (off_t) readOff, SEEK_SET) < 0)
2781                 {
2782                         ereport(emode,
2783                                         (errcode_for_file_access(),
2784                                          errmsg("could not seek in log file %u, segment %u to offset %u: %m",
2785                                                         readId, readSeg, readOff)));
2786                         goto next_record_is_invalid;
2787                 }
2788                 if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
2789                 {
2790                         ereport(emode,
2791                                         (errcode_for_file_access(),
2792                                          errmsg("could not read from log file %u, segment %u at offset %u: %m",
2793                                                         readId, readSeg, readOff)));
2794                         goto next_record_is_invalid;
2795                 }
2796                 if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode))
2797                         goto next_record_is_invalid;
2798         }
2799         pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
2800         targetRecOff = RecPtr->xrecoff % XLOG_BLCKSZ;
2801         if (targetRecOff == 0)
2802         {
2803                 /*
2804                  * Can only get here in the continuing-from-prev-page case, because
2805                  * XRecOffIsValid eliminated the zero-page-offset case otherwise. Need
2806                  * to skip over the new page's header.
2807                  */
2808                 tmpRecPtr.xrecoff += pageHeaderSize;
2809                 targetRecOff = pageHeaderSize;
2810         }
2811         else if (targetRecOff < pageHeaderSize)
2812         {
2813                 ereport(emode,
2814                                 (errmsg("invalid record offset at %X/%X",
2815                                                 RecPtr->xlogid, RecPtr->xrecoff)));
2816                 goto next_record_is_invalid;
2817         }
2818         if ((((XLogPageHeader) readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) &&
2819                 targetRecOff == pageHeaderSize)
2820         {
2821                 ereport(emode,
2822                                 (errmsg("contrecord is requested by %X/%X",
2823                                                 RecPtr->xlogid, RecPtr->xrecoff)));
2824                 goto next_record_is_invalid;
2825         }
2826         record = (XLogRecord *) ((char *) readBuf + RecPtr->xrecoff % XLOG_BLCKSZ);
2827
2828 got_record:;
2829
2830         /*
2831          * Currently, xl_len == 0 must be bad data, but that might not be true
2832          * forever.  See note in XLogInsert.
2833          */
2834         if (record->xl_len == 0)
2835         {
2836                 ereport(emode,
2837                                 (errmsg("record with zero length at %X/%X",
2838                                                 RecPtr->xlogid, RecPtr->xrecoff)));
2839                 goto next_record_is_invalid;
2840         }
2841         if (record->xl_tot_len < SizeOfXLogRecord + record->xl_len ||
2842                 record->xl_tot_len > SizeOfXLogRecord + record->xl_len +
2843                 XLR_MAX_BKP_BLOCKS * (sizeof(BkpBlock) + BLCKSZ))
2844         {
2845                 ereport(emode,
2846                                 (errmsg("invalid record length at %X/%X",
2847                                                 RecPtr->xlogid, RecPtr->xrecoff)));
2848                 goto next_record_is_invalid;
2849         }
2850         if (record->xl_rmid > RM_MAX_ID)
2851         {
2852                 ereport(emode,
2853                                 (errmsg("invalid resource manager ID %u at %X/%X",
2854                                                 record->xl_rmid, RecPtr->xlogid, RecPtr->xrecoff)));
2855                 goto next_record_is_invalid;
2856         }
2857         if (randAccess)
2858         {
2859                 /*
2860                  * We can't exactly verify the prev-link, but surely it should be less
2861                  * than the record's own address.
2862                  */
2863                 if (!XLByteLT(record->xl_prev, *RecPtr))
2864                 {
2865                         ereport(emode,
2866                                         (errmsg("record with incorrect prev-link %X/%X at %X/%X",
2867                                                         record->xl_prev.xlogid, record->xl_prev.xrecoff,
2868                                                         RecPtr->xlogid, RecPtr->xrecoff)));
2869                         goto next_record_is_invalid;
2870                 }
2871         }
2872         else
2873         {
2874                 /*
2875                  * Record's prev-link should exactly match our previous location. This
2876                  * check guards against torn WAL pages where a stale but valid-looking
2877                  * WAL record starts on a sector boundary.
2878                  */
2879                 if (!XLByteEQ(record->xl_prev, ReadRecPtr))
2880                 {
2881                         ereport(emode,
2882                                         (errmsg("record with incorrect prev-link %X/%X at %X/%X",
2883                                                         record->xl_prev.xlogid, record->xl_prev.xrecoff,
2884                                                         RecPtr->xlogid, RecPtr->xrecoff)));
2885                         goto next_record_is_invalid;
2886                 }
2887         }
2888
2889         /*
2890          * Allocate or enlarge readRecordBuf as needed.  To avoid useless small
2891          * increases, round its size to a multiple of XLOG_BLCKSZ, and make sure
2892          * it's at least 4*Max(BLCKSZ, XLOG_BLCKSZ) to start with.  (That is
2893          * enough for all "normal" records, but very large commit or abort records
2894          * might need more space.)
2895          */
2896         total_len = record->xl_tot_len;
2897         if (total_len > readRecordBufSize)
2898         {
2899                 uint32          newSize = total_len;
2900
2901                 newSize += XLOG_BLCKSZ - (newSize % XLOG_BLCKSZ);
2902                 newSize = Max(newSize, 4 * Max(BLCKSZ, XLOG_BLCKSZ));
2903                 if (readRecordBuf)
2904                         free(readRecordBuf);
2905                 readRecordBuf = (char *) malloc(newSize);
2906                 if (!readRecordBuf)
2907                 {
2908                         readRecordBufSize = 0;
2909                         /* We treat this as a "bogus data" condition */
2910                         ereport(emode,
2911                                         (errmsg("record length %u at %X/%X too long",
2912                                                         total_len, RecPtr->xlogid, RecPtr->xrecoff)));
2913                         goto next_record_is_invalid;
2914                 }
2915                 readRecordBufSize = newSize;
2916         }
2917
2918         buffer = readRecordBuf;
2919         nextRecord = NULL;
2920         len = XLOG_BLCKSZ - RecPtr->xrecoff % XLOG_BLCKSZ;
2921         if (total_len > len)
2922         {
2923                 /* Need to reassemble record */
2924                 XLogContRecord *contrecord;
2925                 uint32          gotlen = len;
2926
2927                 memcpy(buffer, record, len);
2928                 record = (XLogRecord *) buffer;
2929                 buffer += len;
2930                 for (;;)
2931                 {
2932                         readOff += XLOG_BLCKSZ;
2933                         if (readOff >= XLogSegSize)
2934                         {
2935                                 close(readFile);
2936                                 readFile = -1;
2937                                 NextLogSeg(readId, readSeg);
2938                                 readFile = XLogFileRead(readId, readSeg, emode);
2939                                 if (readFile < 0)
2940                                         goto next_record_is_invalid;
2941                                 readOff = 0;
2942                         }
2943                         if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
2944                         {
2945                                 ereport(emode,
2946                                                 (errcode_for_file_access(),
2947                                                  errmsg("could not read from log file %u, segment %u, offset %u: %m",
2948                                                                 readId, readSeg, readOff)));
2949                                 goto next_record_is_invalid;
2950                         }
2951                         if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode))
2952                                 goto next_record_is_invalid;
2953                         if (!(((XLogPageHeader) readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD))
2954                         {
2955                                 ereport(emode,
2956                                                 (errmsg("there is no contrecord flag in log file %u, segment %u, offset %u",
2957                                                                 readId, readSeg, readOff)));
2958                                 goto next_record_is_invalid;
2959                         }
2960                         pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
2961                         contrecord = (XLogContRecord *) ((char *) readBuf + pageHeaderSize);
2962                         if (contrecord->xl_rem_len == 0 ||
2963                                 total_len != (contrecord->xl_rem_len + gotlen))
2964                         {
2965                                 ereport(emode,
2966                                                 (errmsg("invalid contrecord length %u in log file %u, segment %u, offset %u",
2967                                                                 contrecord->xl_rem_len,
2968                                                                 readId, readSeg, readOff)));
2969                                 goto next_record_is_invalid;
2970                         }
2971                         len = XLOG_BLCKSZ - pageHeaderSize - SizeOfXLogContRecord;
2972                         if (contrecord->xl_rem_len > len)
2973                         {
2974                                 memcpy(buffer, (char *) contrecord + SizeOfXLogContRecord, len);
2975                                 gotlen += len;
2976                                 buffer += len;
2977                                 continue;
2978                         }
2979                         memcpy(buffer, (char *) contrecord + SizeOfXLogContRecord,
2980                                    contrecord->xl_rem_len);
2981                         break;
2982                 }
2983                 if (!RecordIsValid(record, *RecPtr, emode))
2984                         goto next_record_is_invalid;
2985                 pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
2986                 if (XLOG_BLCKSZ - SizeOfXLogRecord >= pageHeaderSize +
2987                         MAXALIGN(SizeOfXLogContRecord + contrecord->xl_rem_len))
2988                 {
2989                         nextRecord = (XLogRecord *) ((char *) contrecord +
2990                                         MAXALIGN(SizeOfXLogContRecord + contrecord->xl_rem_len));
2991                 }
2992                 EndRecPtr.xlogid = readId;
2993                 EndRecPtr.xrecoff = readSeg * XLogSegSize + readOff +
2994                         pageHeaderSize +
2995                         MAXALIGN(SizeOfXLogContRecord + contrecord->xl_rem_len);
2996                 ReadRecPtr = *RecPtr;
2997                 return record;
2998         }
2999
3000         /* Record does not cross a page boundary */
3001         if (!RecordIsValid(record, *RecPtr, emode))
3002                 goto next_record_is_invalid;
3003         if (XLOG_BLCKSZ - SizeOfXLogRecord >= RecPtr->xrecoff % XLOG_BLCKSZ +
3004                 MAXALIGN(total_len))
3005                 nextRecord = (XLogRecord *) ((char *) record + MAXALIGN(total_len));
3006         EndRecPtr.xlogid = RecPtr->xlogid;
3007         EndRecPtr.xrecoff = RecPtr->xrecoff + MAXALIGN(total_len);
3008         ReadRecPtr = *RecPtr;
3009         memcpy(buffer, record, total_len);
3010         return (XLogRecord *) buffer;
3011
3012 next_record_is_invalid:;
3013         close(readFile);
3014         readFile = -1;
3015         nextRecord = NULL;
3016         return NULL;
3017 }
3018
3019 /*
3020  * Check whether the xlog header of a page just read in looks valid.
3021  *
3022  * This is just a convenience subroutine to avoid duplicated code in
3023  * ReadRecord.  It's not intended for use from anywhere else.
3024  */
3025 static bool
3026 ValidXLOGHeader(XLogPageHeader hdr, int emode)
3027 {
3028         XLogRecPtr      recaddr;
3029
3030         if (hdr->xlp_magic != XLOG_PAGE_MAGIC)
3031         {
3032                 ereport(emode,
3033                                 (errmsg("invalid magic number %04X in log file %u, segment %u, offset %u",
3034                                                 hdr->xlp_magic, readId, readSeg, readOff)));
3035                 return false;
3036         }
3037         if ((hdr->xlp_info & ~XLP_ALL_FLAGS) != 0)
3038         {
3039                 ereport(emode,
3040                                 (errmsg("invalid info bits %04X in log file %u, segment %u, offset %u",
3041                                                 hdr->xlp_info, readId, readSeg, readOff)));
3042                 return false;
3043         }
3044         if (hdr->xlp_info & XLP_LONG_HEADER)
3045         {
3046                 XLogLongPageHeader longhdr = (XLogLongPageHeader) hdr;
3047
3048                 if (longhdr->xlp_sysid != ControlFile->system_identifier)
3049                 {
3050                         char            fhdrident_str[32];
3051                         char            sysident_str[32];
3052
3053                         /*
3054                          * Format sysids separately to keep platform-dependent format code
3055                          * out of the translatable message string.
3056                          */
3057                         snprintf(fhdrident_str, sizeof(fhdrident_str), UINT64_FORMAT,
3058                                          longhdr->xlp_sysid);
3059                         snprintf(sysident_str, sizeof(sysident_str), UINT64_FORMAT,
3060                                          ControlFile->system_identifier);
3061                         ereport(emode,
3062                                         (errmsg("WAL file is from different system"),
3063                                          errdetail("WAL file SYSID is %s, pg_control SYSID is %s",
3064                                                            fhdrident_str, sysident_str)));
3065                         return false;
3066                 }
3067                 if (longhdr->xlp_seg_size != XLogSegSize)
3068                 {
3069                         ereport(emode,
3070                                         (errmsg("WAL file is from different system"),
3071                                          errdetail("Incorrect XLOG_SEG_SIZE in page header.")));
3072                         return false;
3073                 }
3074                 if (longhdr->xlp_xlog_blcksz != XLOG_BLCKSZ)
3075                 {
3076                         ereport(emode,
3077                                         (errmsg("WAL file is from different system"),
3078                                          errdetail("Incorrect XLOG_BLCKSZ in page header.")));
3079                         return false;
3080                 }
3081         }
3082         else if (readOff == 0)
3083         {
3084                 /* hmm, first page of file doesn't have a long header? */
3085                 ereport(emode,
3086                                 (errmsg("invalid info bits %04X in log file %u, segment %u, offset %u",
3087                                                 hdr->xlp_info, readId, readSeg, readOff)));
3088                 return false;
3089         }
3090
3091         recaddr.xlogid = readId;
3092         recaddr.xrecoff = readSeg * XLogSegSize + readOff;
3093         if (!XLByteEQ(hdr->xlp_pageaddr, recaddr))
3094         {
3095                 ereport(emode,
3096                                 (errmsg("unexpected pageaddr %X/%X in log file %u, segment %u, offset %u",
3097                                                 hdr->xlp_pageaddr.xlogid, hdr->xlp_pageaddr.xrecoff,
3098                                                 readId, readSeg, readOff)));
3099                 return false;
3100         }
3101
3102         /*
3103          * Check page TLI is one of the expected values.
3104          */
3105         if (!list_member_int(expectedTLIs, (int) hdr->xlp_tli))
3106         {
3107                 ereport(emode,
3108                                 (errmsg("unexpected timeline ID %u in log file %u, segment %u, offset %u",
3109                                                 hdr->xlp_tli,
3110                                                 readId, readSeg, readOff)));
3111                 return false;
3112         }
3113
3114         /*
3115          * Since child timelines are always assigned a TLI greater than their
3116          * immediate parent's TLI, we should never see TLI go backwards across
3117          * successive pages of a consistent WAL sequence.
3118          *
3119          * Of course this check should only be applied when advancing sequentially
3120          * across pages; therefore ReadRecord resets lastPageTLI to zero when
3121          * going to a random page.
3122          */
3123         if (hdr->xlp_tli < lastPageTLI)
3124         {
3125                 ereport(emode,
3126                                 (errmsg("out-of-sequence timeline ID %u (after %u) in log file %u, segment %u, offset %u",
3127                                                 hdr->xlp_tli, lastPageTLI,
3128                                                 readId, readSeg, readOff)));
3129                 return false;
3130         }
3131         lastPageTLI = hdr->xlp_tli;
3132         return true;
3133 }
3134
3135 /*
3136  * Try to read a timeline's history file.
3137  *
3138  * If successful, return the list of component TLIs (the given TLI followed by
3139  * its ancestor TLIs).  If we can't find the history file, assume that the
3140  * timeline has no parents, and return a list of just the specified timeline
3141  * ID.
3142  */
3143 static List *
3144 readTimeLineHistory(TimeLineID targetTLI)
3145 {
3146         List       *result;
3147         char            path[MAXPGPATH];
3148         char            histfname[MAXFNAMELEN];
3149         char            fline[MAXPGPATH];
3150         FILE       *fd;
3151
3152         if (InArchiveRecovery)
3153         {
3154                 TLHistoryFileName(histfname, targetTLI);
3155                 RestoreArchivedFile(path, histfname, "RECOVERYHISTORY", 0);
3156         }
3157         else
3158                 TLHistoryFilePath(path, targetTLI);
3159
3160         fd = AllocateFile(path, "r");
3161         if (fd == NULL)
3162         {
3163                 if (errno != ENOENT)
3164                         ereport(FATAL,
3165                                         (errcode_for_file_access(),
3166                                          errmsg("could not open file \"%s\": %m", path)));
3167                 /* Not there, so assume no parents */
3168                 return list_make1_int((int) targetTLI);
3169         }
3170
3171         result = NIL;
3172
3173         /*
3174          * Parse the file...
3175          */
3176         while (fgets(fline, MAXPGPATH, fd) != NULL)
3177         {
3178                 /* skip leading whitespace and check for # comment */
3179                 char       *ptr;
3180                 char       *endptr;
3181                 TimeLineID      tli;
3182
3183                 for (ptr = fline; *ptr; ptr++)
3184                 {
3185                         if (!isspace((unsigned char) *ptr))
3186                                 break;
3187                 }
3188                 if (*ptr == '\0' || *ptr == '#')
3189                         continue;
3190
3191                 /* expect a numeric timeline ID as first field of line */
3192                 tli = (TimeLineID) strtoul(ptr, &endptr, 0);
3193                 if (endptr == ptr)
3194                         ereport(FATAL,
3195                                         (errmsg("syntax error in history file: %s", fline),
3196                                          errhint("Expected a numeric timeline ID.")));
3197
3198                 if (result &&
3199                         tli <= (TimeLineID) linitial_int(result))
3200                         ereport(FATAL,
3201                                         (errmsg("invalid data in history file: %s", fline),
3202                                    errhint("Timeline IDs must be in increasing sequence.")));
3203
3204                 /* Build list with newest item first */
3205                 result = lcons_int((int) tli, result);
3206
3207                 /* we ignore the remainder of each line */
3208         }
3209
3210         FreeFile(fd);
3211
3212         if (result &&
3213                 targetTLI <= (TimeLineID) linitial_int(result))
3214                 ereport(FATAL,
3215                                 (errmsg("invalid data in history file \"%s\"", path),
3216                         errhint("Timeline IDs must be less than child timeline's ID.")));
3217
3218         result = lcons_int((int) targetTLI, result);
3219
3220         ereport(DEBUG3,
3221                         (errmsg_internal("history of timeline %u is %s",
3222                                                          targetTLI, nodeToString(result))));
3223
3224         return result;
3225 }
3226
3227 /*
3228  * Probe whether a timeline history file exists for the given timeline ID
3229  */
3230 static bool
3231 existsTimeLineHistory(TimeLineID probeTLI)
3232 {
3233         char            path[MAXPGPATH];
3234         char            histfname[MAXFNAMELEN];
3235         FILE       *fd;
3236
3237         if (InArchiveRecovery)
3238         {
3239                 TLHistoryFileName(histfname, probeTLI);
3240                 RestoreArchivedFile(path, histfname, "RECOVERYHISTORY", 0);
3241         }
3242         else
3243                 TLHistoryFilePath(path, probeTLI);
3244
3245         fd = AllocateFile(path, "r");
3246         if (fd != NULL)
3247         {
3248                 FreeFile(fd);
3249                 return true;
3250         }
3251         else
3252         {
3253                 if (errno != ENOENT)
3254                         ereport(FATAL,
3255                                         (errcode_for_file_access(),
3256                                          errmsg("could not open file \"%s\": %m", path)));
3257                 return false;
3258         }
3259 }
3260
3261 /*
3262  * Find the newest existing timeline, assuming that startTLI exists.
3263  *
3264  * Note: while this is somewhat heuristic, it does positively guarantee
3265  * that (result + 1) is not a known timeline, and therefore it should
3266  * be safe to assign that ID to a new timeline.
3267  */
3268 static TimeLineID
3269 findNewestTimeLine(TimeLineID startTLI)
3270 {
3271         TimeLineID      newestTLI;
3272         TimeLineID      probeTLI;
3273
3274         /*
3275          * The algorithm is just to probe for the existence of timeline history
3276          * files.  XXX is it useful to allow gaps in the sequence?
3277          */
3278         newestTLI = startTLI;
3279
3280         for (probeTLI = startTLI + 1;; probeTLI++)
3281         {
3282                 if (existsTimeLineHistory(probeTLI))
3283                 {
3284                         newestTLI = probeTLI;           /* probeTLI exists */
3285                 }
3286                 else
3287                 {
3288                         /* doesn't exist, assume we're done */
3289                         break;
3290                 }
3291         }
3292
3293         return newestTLI;
3294 }
3295
3296 /*
3297  * Create a new timeline history file.
3298  *
3299  *      newTLI: ID of the new timeline
3300  *      parentTLI: ID of its immediate parent
3301  *      endTLI et al: ID of the last used WAL file, for annotation purposes
3302  *
3303  * Currently this is only used during recovery, and so there are no locking
3304  * considerations.      But we should be just as tense as XLogFileInit to avoid
3305  * emplacing a bogus file.
3306  */
3307 static void
3308 writeTimeLineHistory(TimeLineID newTLI, TimeLineID parentTLI,
3309                                          TimeLineID endTLI, uint32 endLogId, uint32 endLogSeg)
3310 {
3311         char            path[MAXPGPATH];
3312         char            tmppath[MAXPGPATH];
3313         char            histfname[MAXFNAMELEN];
3314         char            xlogfname[MAXFNAMELEN];
3315         char            buffer[BLCKSZ];
3316         int                     srcfd;
3317         int                     fd;
3318         int                     nbytes;
3319
3320         Assert(newTLI > parentTLI); /* else bad selection of newTLI */
3321
3322         /*
3323          * Write into a temp file name.
3324          */
3325         snprintf(tmppath, MAXPGPATH, XLOGDIR "/xlogtemp.%d", (int) getpid());
3326
3327         unlink(tmppath);
3328
3329         /* do not use XLOG_SYNC_BIT here --- want to fsync only at end of fill */
3330         fd = BasicOpenFile(tmppath, O_RDWR | O_CREAT | O_EXCL,
3331                                            S_IRUSR | S_IWUSR);
3332         if (fd < 0)
3333                 ereport(ERROR,
3334                                 (errcode_for_file_access(),
3335                                  errmsg("could not create file \"%s\": %m", tmppath)));
3336
3337         /*
3338          * If a history file exists for the parent, copy it verbatim
3339          */
3340         if (InArchiveRecovery)
3341         {
3342                 TLHistoryFileName(histfname, parentTLI);
3343                 RestoreArchivedFile(path, histfname, "RECOVERYHISTORY", 0);
3344         }
3345         else
3346                 TLHistoryFilePath(path, parentTLI);
3347
3348         srcfd = BasicOpenFile(path, O_RDONLY, 0);
3349         if (srcfd < 0)
3350         {
3351                 if (errno != ENOENT)
3352                         ereport(ERROR,
3353                                         (errcode_for_file_access(),
3354                                          errmsg("could not open file \"%s\": %m", path)));
3355                 /* Not there, so assume parent has no parents */
3356         }
3357         else
3358         {
3359                 for (;;)
3360                 {
3361                         errno = 0;
3362                         nbytes = (int) read(srcfd, buffer, sizeof(buffer));
3363                         if (nbytes < 0 || errno != 0)
3364                                 ereport(ERROR,
3365                                                 (errcode_for_file_access(),
3366                                                  errmsg("could not read file \"%s\": %m", path)));
3367                         if (nbytes == 0)
3368                                 break;
3369                         errno = 0;
3370                         if ((int) write(fd, buffer, nbytes) != nbytes)
3371                         {
3372                                 int                     save_errno = errno;
3373
3374                                 /*
3375                                  * If we fail to make the file, delete it to release disk
3376                                  * space
3377                                  */
3378                                 unlink(tmppath);
3379
3380                                 /*
3381                                  * if write didn't set errno, assume problem is no disk space
3382                                  */
3383                                 errno = save_errno ? save_errno : ENOSPC;
3384
3385                                 ereport(ERROR,
3386                                                 (errcode_for_file_access(),
3387                                          errmsg("could not write to file \"%s\": %m", tmppath)));
3388                         }
3389                 }
3390                 close(srcfd);
3391         }
3392
3393         /*
3394          * Append one line with the details of this timeline split.
3395          *
3396          * If we did have a parent file, insert an extra newline just in case the
3397          * parent file failed to end with one.
3398          */
3399         XLogFileName(xlogfname, endTLI, endLogId, endLogSeg);
3400
3401         snprintf(buffer, sizeof(buffer),
3402                          "%s%u\t%s\t%s transaction %u at %s\n",
3403                          (srcfd < 0) ? "" : "\n",
3404                          parentTLI,
3405                          xlogfname,
3406                          recoveryStopAfter ? "after" : "before",
3407                          recoveryStopXid,
3408                          str_time(recoveryStopTime));
3409
3410         nbytes = strlen(buffer);
3411         errno = 0;
3412         if ((int) write(fd, buffer, nbytes) != nbytes)
3413         {
3414                 int                     save_errno = errno;
3415
3416                 /*
3417                  * If we fail to make the file, delete it to release disk space
3418                  */
3419                 unlink(tmppath);
3420                 /* if write didn't set errno, assume problem is no disk space */
3421                 errno = save_errno ? save_errno : ENOSPC;
3422
3423                 ereport(ERROR,
3424                                 (errcode_for_file_access(),
3425                                  errmsg("could not write to file \"%s\": %m", tmppath)));
3426         }
3427
3428         if (pg_fsync(fd) != 0)
3429                 ereport(ERROR,
3430                                 (errcode_for_file_access(),
3431                                  errmsg("could not fsync file \"%s\": %m", tmppath)));
3432
3433         if (close(fd))
3434                 ereport(ERROR,
3435                                 (errcode_for_file_access(),
3436                                  errmsg("could not close file \"%s\": %m", tmppath)));
3437
3438
3439         /*
3440          * Now move the completed history file into place with its final name.
3441          */
3442         TLHistoryFilePath(path, newTLI);
3443
3444         /*
3445          * Prefer link() to rename() here just to be really sure that we don't
3446          * overwrite an existing logfile.  However, there shouldn't be one, so
3447          * rename() is an acceptable substitute except for the truly paranoid.
3448          */
3449 #if HAVE_WORKING_LINK
3450         if (link(tmppath, path) < 0)
3451                 ereport(ERROR,
3452                                 (errcode_for_file_access(),
3453                                  errmsg("could not link file \"%s\" to \"%s\": %m",
3454                                                 tmppath, path)));
3455         unlink(tmppath);
3456 #else
3457         if (rename(tmppath, path) < 0)
3458                 ereport(ERROR,
3459                                 (errcode_for_file_access(),
3460                                  errmsg("could not rename file \"%s\" to \"%s\": %m",
3461                                                 tmppath, path)));
3462 #endif
3463
3464         /* The history file can be archived immediately. */
3465         TLHistoryFileName(histfname, newTLI);
3466         XLogArchiveNotify(histfname);
3467 }
3468
3469 /*
3470  * I/O routines for pg_control
3471  *
3472  * *ControlFile is a buffer in shared memory that holds an image of the
3473  * contents of pg_control.      WriteControlFile() initializes pg_control
3474  * given a preloaded buffer, ReadControlFile() loads the buffer from
3475  * the pg_control file (during postmaster or standalone-backend startup),
3476  * and UpdateControlFile() rewrites pg_control after we modify xlog state.
3477  *
3478  * For simplicity, WriteControlFile() initializes the fields of pg_control
3479  * that are related to checking backend/database compatibility, and
3480  * ReadControlFile() verifies they are correct.  We could split out the
3481  * I/O and compatibility-check functions, but there seems no need currently.
3482  */
3483 static void
3484 WriteControlFile(void)
3485 {
3486         int                     fd;
3487         char            buffer[PG_CONTROL_SIZE]; /* need not be aligned */
3488         char       *localeptr;
3489
3490         /*
3491          * Initialize version and compatibility-check fields
3492          */
3493         ControlFile->pg_control_version = PG_CONTROL_VERSION;
3494         ControlFile->catalog_version_no = CATALOG_VERSION_NO;
3495
3496         ControlFile->maxAlign = MAXIMUM_ALIGNOF;
3497         ControlFile->floatFormat = FLOATFORMAT_VALUE;
3498
3499         ControlFile->blcksz = BLCKSZ;
3500         ControlFile->relseg_size = RELSEG_SIZE;
3501         ControlFile->xlog_blcksz = XLOG_BLCKSZ;
3502         ControlFile->xlog_seg_size = XLOG_SEG_SIZE;
3503
3504         ControlFile->nameDataLen = NAMEDATALEN;
3505         ControlFile->indexMaxKeys = INDEX_MAX_KEYS;
3506
3507 #ifdef HAVE_INT64_TIMESTAMP
3508         ControlFile->enableIntTimes = TRUE;
3509 #else
3510         ControlFile->enableIntTimes = FALSE;
3511 #endif
3512
3513         ControlFile->localeBuflen = LOCALE_NAME_BUFLEN;
3514         localeptr = setlocale(LC_COLLATE, NULL);
3515         if (!localeptr)
3516                 ereport(PANIC,
3517                                 (errmsg("invalid LC_COLLATE setting")));
3518         StrNCpy(ControlFile->lc_collate, localeptr, LOCALE_NAME_BUFLEN);
3519         localeptr = setlocale(LC_CTYPE, NULL);
3520         if (!localeptr)
3521                 ereport(PANIC,
3522                                 (errmsg("invalid LC_CTYPE setting")));
3523         StrNCpy(ControlFile->lc_ctype, localeptr, LOCALE_NAME_BUFLEN);
3524
3525         /* Contents are protected with a CRC */
3526         INIT_CRC32(ControlFile->crc);
3527         COMP_CRC32(ControlFile->crc,
3528                            (char *) ControlFile,
3529                            offsetof(ControlFileData, crc));
3530         FIN_CRC32(ControlFile->crc);
3531
3532         /*
3533          * We write out PG_CONTROL_SIZE bytes into pg_control, zero-padding the
3534          * excess over sizeof(ControlFileData).  This reduces the odds of
3535          * premature-EOF errors when reading pg_control.  We'll still fail when we
3536          * check the contents of the file, but hopefully with a more specific
3537          * error than "couldn't read pg_control".
3538          */
3539         if (sizeof(ControlFileData) > PG_CONTROL_SIZE)
3540                 elog(PANIC, "sizeof(ControlFileData) is larger than PG_CONTROL_SIZE; fix either one");
3541
3542         memset(buffer, 0, PG_CONTROL_SIZE);
3543         memcpy(buffer, ControlFile, sizeof(ControlFileData));
3544
3545         fd = BasicOpenFile(XLOG_CONTROL_FILE,
3546                                            O_RDWR | O_CREAT | O_EXCL | PG_BINARY,
3547                                            S_IRUSR | S_IWUSR);
3548         if (fd < 0)
3549                 ereport(PANIC,
3550                                 (errcode_for_file_access(),
3551                                  errmsg("could not create control file \"%s\": %m",
3552                                                 XLOG_CONTROL_FILE)));
3553
3554         errno = 0;
3555         if (write(fd, buffer, PG_CONTROL_SIZE) != PG_CONTROL_SIZE)
3556         {
3557                 /* if write didn't set errno, assume problem is no disk space */
3558                 if (errno == 0)
3559                         errno = ENOSPC;
3560                 ereport(PANIC,
3561                                 (errcode_for_file_access(),
3562                                  errmsg("could not write to control file: %m")));
3563         }
3564
3565         if (pg_fsync(fd) != 0)
3566                 ereport(PANIC,
3567                                 (errcode_for_file_access(),
3568                                  errmsg("could not fsync control file: %m")));
3569
3570         if (close(fd))
3571                 ereport(PANIC,
3572                                 (errcode_for_file_access(),
3573                                  errmsg("could not close control file: %m")));
3574 }
3575
3576 static void
3577 ReadControlFile(void)
3578 {
3579         pg_crc32        crc;
3580         int                     fd;
3581
3582         /*
3583          * Read data...
3584          */
3585         fd = BasicOpenFile(XLOG_CONTROL_FILE,
3586                                            O_RDWR | PG_BINARY,
3587                                            S_IRUSR | S_IWUSR);
3588         if (fd < 0)
3589                 ereport(PANIC,
3590                                 (errcode_for_file_access(),
3591                                  errmsg("could not open control file \"%s\": %m",
3592                                                 XLOG_CONTROL_FILE)));
3593
3594         if (read(fd, ControlFile, sizeof(ControlFileData)) != sizeof(ControlFileData))
3595                 ereport(PANIC,
3596                                 (errcode_for_file_access(),
3597                                  errmsg("could not read from control file: %m")));
3598
3599         close(fd);
3600
3601         /*
3602          * Check for expected pg_control format version.  If this is wrong, the
3603          * CRC check will likely fail because we'll be checking the wrong number
3604          * of bytes.  Complaining about wrong version will probably be more
3605          * enlightening than complaining about wrong CRC.
3606          */
3607         if (ControlFile->pg_control_version != PG_CONTROL_VERSION)
3608                 ereport(FATAL,
3609                                 (errmsg("database files are incompatible with server"),
3610                                  errdetail("The database cluster was initialized with PG_CONTROL_VERSION %d,"
3611                                   " but the server was compiled with PG_CONTROL_VERSION %d.",
3612                                                 ControlFile->pg_control_version, PG_CONTROL_VERSION),
3613                                  errhint("It looks like you need to initdb.")));
3614         /* Now check the CRC. */
3615         INIT_CRC32(crc);
3616         COMP_CRC32(crc,
3617                            (char *) ControlFile,
3618                            offsetof(ControlFileData, crc));
3619         FIN_CRC32(crc);
3620
3621         if (!EQ_CRC32(crc, ControlFile->crc))
3622                 ereport(FATAL,
3623                                 (errmsg("incorrect checksum in control file")));
3624
3625         /*
3626          * Do compatibility checking immediately.  We do this here for 2 reasons:
3627          *
3628          * (1) if the database isn't compatible with the backend executable, we
3629          * want to abort before we can possibly do any damage;
3630          *
3631          * (2) this code is executed in the postmaster, so the setlocale() will
3632          * propagate to forked backends, which aren't going to read this file for
3633          * themselves.  (These locale settings are considered critical
3634          * compatibility items because they can affect sort order of indexes.)
3635          */
3636         if (ControlFile->catalog_version_no != CATALOG_VERSION_NO)
3637                 ereport(FATAL,
3638                                 (errmsg("database files are incompatible with server"),
3639                                  errdetail("The database cluster was initialized with CATALOG_VERSION_NO %d,"
3640                                   " but the server was compiled with CATALOG_VERSION_NO %d.",
3641                                                 ControlFile->catalog_version_no, CATALOG_VERSION_NO),
3642                                  errhint("It looks like you need to initdb.")));
3643         if (ControlFile->maxAlign != MAXIMUM_ALIGNOF)
3644                 ereport(FATAL,
3645                                 (errmsg("database files are incompatible with server"),
3646                    errdetail("The database cluster was initialized with MAXALIGN %d,"
3647                                          " but the server was compiled with MAXALIGN %d.",
3648                                          ControlFile->maxAlign, MAXIMUM_ALIGNOF),
3649                                  errhint("It looks like you need to initdb.")));
3650         if (ControlFile->floatFormat != FLOATFORMAT_VALUE)
3651                 ereport(FATAL,
3652                                 (errmsg("database files are incompatible with server"),
3653                                  errdetail("The database cluster appears to use a different floating-point number format than the server executable."),
3654                                  errhint("It looks like you need to initdb.")));
3655         if (ControlFile->blcksz != BLCKSZ)
3656                 ereport(FATAL,
3657                                 (errmsg("database files are incompatible with server"),
3658                          errdetail("The database cluster was initialized with BLCKSZ %d,"
3659                                            " but the server was compiled with BLCKSZ %d.",
3660                                            ControlFile->blcksz, BLCKSZ),
3661                                  errhint("It looks like you need to recompile or initdb.")));
3662         if (ControlFile->relseg_size != RELSEG_SIZE)
3663                 ereport(FATAL,
3664                                 (errmsg("database files are incompatible with server"),
3665                 errdetail("The database cluster was initialized with RELSEG_SIZE %d,"
3666                                   " but the server was compiled with RELSEG_SIZE %d.",
3667                                   ControlFile->relseg_size, RELSEG_SIZE),
3668                                  errhint("It looks like you need to recompile or initdb.")));
3669         if (ControlFile->xlog_blcksz != XLOG_BLCKSZ)
3670                 ereport(FATAL,
3671                                 (errmsg("database files are incompatible with server"),
3672                          errdetail("The database cluster was initialized with XLOG_BLCKSZ %d,"
3673                                            " but the server was compiled with XLOG_BLCKSZ %d.",
3674                                            ControlFile->xlog_blcksz, XLOG_BLCKSZ),
3675                                  errhint("It looks like you need to recompile or initdb.")));
3676         if (ControlFile->xlog_seg_size != XLOG_SEG_SIZE)
3677                 ereport(FATAL,
3678                                 (errmsg("database files are incompatible with server"),
3679                                  errdetail("The database cluster was initialized with XLOG_SEG_SIZE %d,"
3680                                            " but the server was compiled with XLOG_SEG_SIZE %d.",
3681                                                    ControlFile->xlog_seg_size, XLOG_SEG_SIZE),
3682                                  errhint("It looks like you need to recompile or initdb.")));
3683         if (ControlFile->nameDataLen != NAMEDATALEN)
3684                 ereport(FATAL,
3685                                 (errmsg("database files are incompatible with server"),
3686                 errdetail("The database cluster was initialized with NAMEDATALEN %d,"
3687                                   " but the server was compiled with NAMEDATALEN %d.",
3688                                   ControlFile->nameDataLen, NAMEDATALEN),
3689                                  errhint("It looks like you need to recompile or initdb.")));
3690         if (ControlFile->indexMaxKeys != INDEX_MAX_KEYS)
3691                 ereport(FATAL,
3692                                 (errmsg("database files are incompatible with server"),
3693                                  errdetail("The database cluster was initialized with INDEX_MAX_KEYS %d,"
3694                                           " but the server was compiled with INDEX_MAX_KEYS %d.",
3695                                                    ControlFile->indexMaxKeys, INDEX_MAX_KEYS),
3696                                  errhint("It looks like you need to recompile or initdb.")));
3697
3698 #ifdef HAVE_INT64_TIMESTAMP
3699         if (ControlFile->enableIntTimes != TRUE)
3700                 ereport(FATAL,
3701                                 (errmsg("database files are incompatible with server"),
3702                                  errdetail("The database cluster was initialized without HAVE_INT64_TIMESTAMP"
3703                                   " but the server was compiled with HAVE_INT64_TIMESTAMP."),
3704                                  errhint("It looks like you need to recompile or initdb.")));
3705 #else
3706         if (ControlFile->enableIntTimes != FALSE)
3707                 ereport(FATAL,
3708                                 (errmsg("database files are incompatible with server"),
3709                                  errdetail("The database cluster was initialized with HAVE_INT64_TIMESTAMP"
3710                            " but the server was compiled without HAVE_INT64_TIMESTAMP."),
3711                                  errhint("It looks like you need to recompile or initdb.")));
3712 #endif
3713
3714         if (ControlFile->localeBuflen != LOCALE_NAME_BUFLEN)
3715                 ereport(FATAL,
3716                                 (errmsg("database files are incompatible with server"),
3717                                  errdetail("The database cluster was initialized with LOCALE_NAME_BUFLEN %d,"
3718                                   " but the server was compiled with LOCALE_NAME_BUFLEN %d.",
3719                                                    ControlFile->localeBuflen, LOCALE_NAME_BUFLEN),
3720                                  errhint("It looks like you need to recompile or initdb.")));
3721         if (pg_perm_setlocale(LC_COLLATE, ControlFile->lc_collate) == NULL)
3722                 ereport(FATAL,
3723                         (errmsg("database files are incompatible with operating system"),
3724                          errdetail("The database cluster was initialized with LC_COLLATE \"%s\","
3725                                            " which is not recognized by setlocale().",
3726                                            ControlFile->lc_collate),
3727                          errhint("It looks like you need to initdb or install locale support.")));
3728         if (pg_perm_setlocale(LC_CTYPE, ControlFile->lc_ctype) == NULL)
3729                 ereport(FATAL,
3730                         (errmsg("database files are incompatible with operating system"),
3731                 errdetail("The database cluster was initialized with LC_CTYPE \"%s\","
3732                                   " which is not recognized by setlocale().",
3733                                   ControlFile->lc_ctype),
3734                          errhint("It looks like you need to initdb or install locale support.")));
3735
3736         /* Make the fixed locale settings visible as GUC variables, too */
3737         SetConfigOption("lc_collate", ControlFile->lc_collate,
3738                                         PGC_INTERNAL, PGC_S_OVERRIDE);
3739         SetConfigOption("lc_ctype", ControlFile->lc_ctype,
3740                                         PGC_INTERNAL, PGC_S_OVERRIDE);
3741 }
3742
3743 void
3744 UpdateControlFile(void)
3745 {
3746         int                     fd;
3747
3748         INIT_CRC32(ControlFile->crc);
3749         COMP_CRC32(ControlFile->crc,
3750                            (char *) ControlFile,
3751                            offsetof(ControlFileData, crc));
3752         FIN_CRC32(ControlFile->crc);
3753
3754         fd = BasicOpenFile(XLOG_CONTROL_FILE,
3755                                            O_RDWR | PG_BINARY,
3756                                            S_IRUSR | S_IWUSR);
3757         if (fd < 0)
3758                 ereport(PANIC,
3759                                 (errcode_for_file_access(),
3760                                  errmsg("could not open control file \"%s\": %m",
3761                                                 XLOG_CONTROL_FILE)));
3762
3763         errno = 0;
3764         if (write(fd, ControlFile, sizeof(ControlFileData)) != sizeof(ControlFileData))
3765         {
3766                 /* if write didn't set errno, assume problem is no disk space */
3767                 if (errno == 0)
3768                         errno = ENOSPC;
3769                 ereport(PANIC,
3770                                 (errcode_for_file_access(),
3771                                  errmsg("could not write to control file: %m")));
3772         }
3773
3774         if (pg_fsync(fd) != 0)
3775                 ereport(PANIC,
3776                                 (errcode_for_file_access(),
3777                                  errmsg("could not fsync control file: %m")));
3778
3779         if (close(fd))
3780                 ereport(PANIC,
3781                                 (errcode_for_file_access(),
3782                                  errmsg("could not close control file: %m")));
3783 }
3784
3785 /*
3786  * Initialization of shared memory for XLOG
3787  */
3788 Size
3789 XLOGShmemSize(void)
3790 {
3791         Size            size;
3792
3793         /* XLogCtl */
3794         size = sizeof(XLogCtlData);
3795         /* xlblocks array */
3796         size = add_size(size, mul_size(sizeof(XLogRecPtr), XLOGbuffers));
3797         /* extra alignment padding for XLOG I/O buffers */
3798         size = add_size(size, ALIGNOF_XLOG_BUFFER);
3799         /* and the buffers themselves */
3800         size = add_size(size, mul_size(XLOG_BLCKSZ, XLOGbuffers));
3801
3802         /*
3803          * Note: we don't count ControlFileData, it comes out of the "slop factor"
3804          * added by CreateSharedMemoryAndSemaphores.  This lets us use this
3805          * routine again below to compute the actual allocation size.
3806          */
3807
3808         return size;
3809 }
3810
3811 void
3812 XLOGShmemInit(void)
3813 {
3814         bool            foundCFile,
3815                                 foundXLog;
3816         char       *allocptr;
3817
3818         ControlFile = (ControlFileData *)
3819                 ShmemInitStruct("Control File", sizeof(ControlFileData), &foundCFile);
3820         XLogCtl = (XLogCtlData *)
3821                 ShmemInitStruct("XLOG Ctl", XLOGShmemSize(), &foundXLog);
3822
3823         if (foundCFile || foundXLog)
3824         {
3825                 /* both should be present or neither */
3826                 Assert(foundCFile && foundXLog);
3827                 return;
3828         }
3829
3830         memset(XLogCtl, 0, sizeof(XLogCtlData));
3831
3832         /*
3833          * Since XLogCtlData contains XLogRecPtr fields, its sizeof should be a
3834          * multiple of the alignment for same, so no extra alignment padding is
3835          * needed here.
3836          */
3837         allocptr = ((char *) XLogCtl) + sizeof(XLogCtlData);
3838         XLogCtl->xlblocks = (XLogRecPtr *) allocptr;
3839         memset(XLogCtl->xlblocks, 0, sizeof(XLogRecPtr) * XLOGbuffers);
3840         allocptr += sizeof(XLogRecPtr) * XLOGbuffers;
3841
3842         /*
3843          * Align the start of the page buffers to an ALIGNOF_XLOG_BUFFER boundary.
3844          */
3845         allocptr = (char *) TYPEALIGN(ALIGNOF_XLOG_BUFFER, allocptr);
3846         XLogCtl->pages = allocptr;
3847         memset(XLogCtl->pages, 0, (Size) XLOG_BLCKSZ * XLOGbuffers);
3848
3849         /*
3850          * Do basic initialization of XLogCtl shared data. (StartupXLOG will fill
3851          * in additional info.)
3852          */
3853         XLogCtl->XLogCacheByte = (Size) XLOG_BLCKSZ * XLOGbuffers;
3854
3855         XLogCtl->XLogCacheBlck = XLOGbuffers - 1;
3856         XLogCtl->Insert.currpage = (XLogPageHeader) (XLogCtl->pages);
3857         SpinLockInit(&XLogCtl->info_lck);
3858
3859         /*
3860          * If we are not in bootstrap mode, pg_control should already exist. Read
3861          * and validate it immediately (see comments in ReadControlFile() for the
3862          * reasons why).
3863          */
3864         if (!IsBootstrapProcessingMode())
3865                 ReadControlFile();
3866 }
3867
3868 /*
3869  * This func must be called ONCE on system install.  It creates pg_control
3870  * and the initial XLOG segment.
3871  */
3872 void
3873 BootStrapXLOG(void)
3874 {
3875         CheckPoint      checkPoint;
3876         char       *buffer;
3877         XLogPageHeader page;
3878         XLogLongPageHeader longpage;
3879         XLogRecord *record;
3880         bool            use_existent;
3881         uint64          sysidentifier;
3882         struct timeval tv;
3883         pg_crc32        crc;
3884
3885         /*
3886          * Select a hopefully-unique system identifier code for this installation.
3887          * We use the result of gettimeofday(), including the fractional seconds
3888          * field, as being about as unique as we can easily get.  (Think not to
3889          * use random(), since it hasn't been seeded and there's no portable way
3890          * to seed it other than the system clock value...)  The upper half of the
3891          * uint64 value is just the tv_sec part, while the lower half is the XOR
3892          * of tv_sec and tv_usec.  This is to ensure that we don't lose uniqueness
3893          * unnecessarily if "uint64" is really only 32 bits wide.  A person
3894          * knowing this encoding can determine the initialization time of the
3895          * installation, which could perhaps be useful sometimes.
3896          */
3897         gettimeofday(&tv, NULL);
3898         sysidentifier = ((uint64) tv.tv_sec) << 32;
3899         sysidentifier |= (uint32) (tv.tv_sec | tv.tv_usec);
3900
3901         /* First timeline ID is always 1 */
3902         ThisTimeLineID = 1;
3903
3904         /* page buffer must be aligned suitably for O_DIRECT */
3905         buffer = (char *) palloc(XLOG_BLCKSZ + ALIGNOF_XLOG_BUFFER);
3906         page = (XLogPageHeader) TYPEALIGN(ALIGNOF_XLOG_BUFFER, buffer);
3907         memset(page, 0, XLOG_BLCKSZ);
3908
3909         /* Set up information for the initial checkpoint record */
3910         checkPoint.redo.xlogid = 0;
3911         checkPoint.redo.xrecoff = SizeOfXLogLongPHD;
3912         checkPoint.undo = checkPoint.redo;
3913         checkPoint.ThisTimeLineID = ThisTimeLineID;
3914         checkPoint.nextXid = FirstNormalTransactionId;
3915         checkPoint.nextOid = FirstBootstrapObjectId;
3916         checkPoint.nextMulti = FirstMultiXactId;
3917         checkPoint.nextMultiOffset = 0;
3918         checkPoint.time = time(NULL);
3919
3920         ShmemVariableCache->nextXid = checkPoint.nextXid;
3921         ShmemVariableCache->nextOid = checkPoint.nextOid;
3922         ShmemVariableCache->oidCount = 0;
3923         MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset);
3924
3925         /* Set up the XLOG page header */
3926         page->xlp_magic = XLOG_PAGE_MAGIC;
3927         page->xlp_info = XLP_LONG_HEADER;
3928         page->xlp_tli = ThisTimeLineID;
3929         page->xlp_pageaddr.xlogid = 0;
3930         page->xlp_pageaddr.xrecoff = 0;
3931         longpage = (XLogLongPageHeader) page;
3932         longpage->xlp_sysid = sysidentifier;
3933         longpage->xlp_seg_size = XLogSegSize;
3934         longpage->xlp_xlog_blcksz = XLOG_BLCKSZ;
3935
3936         /* Insert the initial checkpoint record */
3937         record = (XLogRecord *) ((char *) page + SizeOfXLogLongPHD);
3938         record->xl_prev.xlogid = 0;
3939         record->xl_prev.xrecoff = 0;
3940         record->xl_xid = InvalidTransactionId;
3941         record->xl_tot_len = SizeOfXLogRecord + sizeof(checkPoint);
3942         record->xl_len = sizeof(checkPoint);
3943         record->xl_info = XLOG_CHECKPOINT_SHUTDOWN;
3944         record->xl_rmid = RM_XLOG_ID;
3945         memcpy(XLogRecGetData(record), &checkPoint, sizeof(checkPoint));
3946
3947         INIT_CRC32(crc);
3948         COMP_CRC32(crc, &checkPoint, sizeof(checkPoint));
3949         COMP_CRC32(crc, (char *) record + sizeof(pg_crc32),
3950                            SizeOfXLogRecord - sizeof(pg_crc32));
3951         FIN_CRC32(crc);
3952         record->xl_crc = crc;
3953
3954         /* Create first XLOG segment file */
3955         use_existent = false;
3956         openLogFile = XLogFileInit(0, 0, &use_existent, false);
3957
3958         /* Write the first page with the initial record */
3959         errno = 0;
3960         if (write(openLogFile, page, XLOG_BLCKSZ) != XLOG_BLCKSZ)
3961         {
3962                 /* if write didn't set errno, assume problem is no disk space */
3963                 if (errno == 0)
3964                         errno = ENOSPC;
3965                 ereport(PANIC,
3966                                 (errcode_for_file_access(),
3967                           errmsg("could not write bootstrap transaction log file: %m")));
3968         }
3969
3970         if (pg_fsync(openLogFile) != 0)
3971                 ereport(PANIC,
3972                                 (errcode_for_file_access(),
3973                           errmsg("could not fsync bootstrap transaction log file: %m")));
3974
3975         if (close(openLogFile))
3976                 ereport(PANIC,
3977                                 (errcode_for_file_access(),
3978                           errmsg("could not close bootstrap transaction log file: %m")));
3979
3980         openLogFile = -1;
3981
3982         /* Now create pg_control */
3983
3984         memset(ControlFile, 0, sizeof(ControlFileData));
3985         /* Initialize pg_control status fields */
3986         ControlFile->system_identifier = sysidentifier;
3987         ControlFile->state = DB_SHUTDOWNED;
3988         ControlFile->time = checkPoint.time;
3989         ControlFile->logId = 0;
3990         ControlFile->logSeg = 1;
3991         ControlFile->checkPoint = checkPoint.redo;
3992         ControlFile->checkPointCopy = checkPoint;
3993         /* some additional ControlFile fields are set in WriteControlFile() */
3994
3995         WriteControlFile();
3996
3997         /* Bootstrap the commit log, too */
3998         BootStrapCLOG();
3999         BootStrapSUBTRANS();
4000         BootStrapMultiXact();
4001
4002         pfree(buffer);
4003 }
4004
/*
 * str_time -- format a time_t as "YYYY-MM-DD HH:MM:SS TZ" in local time.
 *
 * Returns a pointer to a static buffer, so each call overwrites the result
 * of the previous one; callers must use or copy the string before calling
 * again.
 *
 * If localtime() cannot represent tnow (it returns NULL), or the formatted
 * text does not fit in the buffer (strftime() returns 0), the buffer is set
 * to the placeholder "(invalid time)" instead of handing a NULL pointer to
 * strftime() or returning indeterminate buffer contents.
 */
static char *
str_time(time_t tnow)
{
	static char buf[128];
	struct tm  *tm = localtime(&tnow);

	if (tm == NULL ||
		strftime(buf, sizeof(buf), "%Y-%m-%d %H:%M:%S %Z", tm) == 0)
		snprintf(buf, sizeof(buf), "(invalid time)");

	return buf;
}
4016
4017 /*
4018  * See if there is a recovery command file (recovery.conf), and if so
4019  * read in parameters for archive recovery.
4020  *
4021  * XXX longer term intention is to expand this to
4022  * cater for additional parameters and controls
4023  * possibly use a flex lexer similar to the GUC one
4024  */
4025 static void
4026 readRecoveryCommandFile(void)
4027 {
4028         FILE       *fd;
4029         char            cmdline[MAXPGPATH];
4030         TimeLineID      rtli = 0;
4031         bool            rtliGiven = false;
4032         bool            syntaxError = false;
4033
4034         fd = AllocateFile(RECOVERY_COMMAND_FILE, "r");
4035         if (fd == NULL)
4036         {
4037                 if (errno == ENOENT)
4038                         return;                         /* not there, so no archive recovery */
4039                 ereport(FATAL,
4040                                 (errcode_for_file_access(),
4041                                  errmsg("could not open recovery command file \"%s\": %m",
4042                                                 RECOVERY_COMMAND_FILE)));
4043         }
4044
4045         ereport(LOG,
4046                         (errmsg("starting archive recovery")));
4047
4048         /*
4049          * Parse the file...
4050          */
4051         while (fgets(cmdline, MAXPGPATH, fd) != NULL)
4052         {
4053                 /* skip leading whitespace and check for # comment */
4054                 char       *ptr;
4055                 char       *tok1;
4056                 char       *tok2;
4057
4058                 for (ptr = cmdline; *ptr; ptr++)
4059                 {
4060                         if (!isspace((unsigned char) *ptr))
4061                                 break;
4062                 }
4063                 if (*ptr == '\0' || *ptr == '#')
4064                         continue;
4065
4066                 /* identify the quoted parameter value */
4067                 tok1 = strtok(ptr, "'");
4068                 if (!tok1)
4069                 {
4070                         syntaxError = true;
4071                         break;
4072                 }
4073                 tok2 = strtok(NULL, "'");
4074                 if (!tok2)
4075                 {
4076                         syntaxError = true;
4077                         break;
4078                 }
4079                 /* reparse to get just the parameter name */
4080                 tok1 = strtok(ptr, " \t=");
4081                 if (!tok1)
4082                 {
4083                         syntaxError = true;
4084                         break;
4085                 }
4086
4087                 if (strcmp(tok1, "restore_command") == 0)
4088                 {
4089                         recoveryRestoreCommand = pstrdup(tok2);
4090                         ereport(LOG,
4091                                         (errmsg("restore_command = \"%s\"",
4092                                                         recoveryRestoreCommand)));
4093                 }
4094                 else if (strcmp(tok1, "recovery_target_timeline") == 0)
4095                 {
4096                         rtliGiven = true;
4097                         if (strcmp(tok2, "latest") == 0)
4098                                 rtli = 0;
4099                         else
4100                         {
4101                                 errno = 0;
4102                                 rtli = (TimeLineID) strtoul(tok2, NULL, 0);
4103                                 if (errno == EINVAL || errno == ERANGE)
4104                                         ereport(FATAL,
4105                                                         (errmsg("recovery_target_timeline is not a valid number: \"%s\"",
4106                                                                         tok2)));
4107                         }
4108                         if (rtli)
4109                                 ereport(LOG,
4110                                                 (errmsg("recovery_target_timeline = %u", rtli)));
4111                         else
4112                                 ereport(LOG,
4113                                                 (errmsg("recovery_target_timeline = latest")));
4114                 }
4115                 else if (strcmp(tok1, "recovery_target_xid") == 0)
4116                 {
4117                         errno = 0;
4118                         recoveryTargetXid = (TransactionId) strtoul(tok2, NULL, 0);
4119                         if (errno == EINVAL || errno == ERANGE)
4120                                 ereport(FATAL,
4121                                  (errmsg("recovery_target_xid is not a valid number: \"%s\"",
4122                                                  tok2)));
4123                         ereport(LOG,
4124                                         (errmsg("recovery_target_xid = %u",
4125                                                         recoveryTargetXid)));
4126                         recoveryTarget = true;
4127                         recoveryTargetExact = true;
4128                 }
4129                 else if (strcmp(tok1, "recovery_target_time") == 0)
4130                 {
4131                         /*
4132                          * if recovery_target_xid specified, then this overrides
4133                          * recovery_target_time
4134                          */
4135                         if (recoveryTargetExact)
4136                                 continue;
4137                         recoveryTarget = true;
4138                         recoveryTargetExact = false;
4139
4140                         /*
4141                          * Convert the time string given by the user to the time_t format.
4142                          * We use type abstime's input converter because we know abstime
4143                          * has the same representation as time_t.
4144                          */
4145                         recoveryTargetTime = (time_t)
4146                                 DatumGetAbsoluteTime(DirectFunctionCall1(abstimein,
4147                                                                                                          CStringGetDatum(tok2)));
4148                         ereport(LOG,
4149                                         (errmsg("recovery_target_time = %s",
4150                                                         DatumGetCString(DirectFunctionCall1(abstimeout,
4151                                 AbsoluteTimeGetDatum((AbsoluteTime) recoveryTargetTime))))));
4152                 }
4153                 else if (strcmp(tok1, "recovery_target_inclusive") == 0)
4154                 {
4155                         /*
4156                          * does nothing if a recovery_target is not also set
4157                          */
4158                         if (strcmp(tok2, "true") == 0)
4159                                 recoveryTargetInclusive = true;
4160                         else
4161                         {
4162                                 recoveryTargetInclusive = false;
4163                                 tok2 = "false";
4164                         }
4165                         ereport(LOG,
4166                                         (errmsg("recovery_target_inclusive = %s", tok2)));
4167                 }
4168                 else
4169                         ereport(FATAL,
4170                                         (errmsg("unrecognized recovery parameter \"%s\"",
4171                                                         tok1)));
4172         }
4173
4174         FreeFile(fd);
4175
4176         if (syntaxError)
4177                 ereport(FATAL,
4178                                 (errmsg("syntax error in recovery command file: %s",
4179                                                 cmdline),
4180                           errhint("Lines should have the format parameter = 'value'.")));
4181
4182         /* Check that required parameters were supplied */
4183         if (recoveryRestoreCommand == NULL)
4184                 ereport(FATAL,
4185                                 (errmsg("recovery command file \"%s\" did not specify restore_command",
4186                                                 RECOVERY_COMMAND_FILE)));
4187
4188         /* Enable fetching from archive recovery area */
4189         InArchiveRecovery = true;
4190
4191         /*
4192          * If user specified recovery_target_timeline, validate it or compute the
4193          * "latest" value.      We can't do this until after we've gotten the restore
4194          * command and set InArchiveRecovery, because we need to fetch timeline
4195          * history files from the archive.
4196          */
4197         if (rtliGiven)
4198         {
4199                 if (rtli)
4200                 {
4201                         /* Timeline 1 does not have a history file, all else should */
4202                         if (rtli != 1 && !existsTimeLineHistory(rtli))
4203                                 ereport(FATAL,
4204                                                 (errmsg("recovery_target_timeline %u does not exist",
4205                                                                 rtli)));
4206                         recoveryTargetTLI = rtli;
4207                 }
4208                 else
4209                 {
4210                         /* We start the "latest" search from pg_control's timeline */
4211                         recoveryTargetTLI = findNewestTimeLine(recoveryTargetTLI);
4212                 }
4213         }
4214 }
4215
4216 /*
4217  * Exit archive-recovery state
4218  */
4219 static void
4220 exitArchiveRecovery(TimeLineID endTLI, uint32 endLogId, uint32 endLogSeg)
4221 {
4222         char            recoveryPath[MAXPGPATH];
4223         char            xlogpath[MAXPGPATH];
4224
4225         /*
4226          * We are no longer in archive recovery state.
4227          */
4228         InArchiveRecovery = false;
4229
4230         /*
4231          * We should have the ending log segment currently open.  Verify, and then
4232          * close it (to avoid problems on Windows with trying to rename or delete
4233          * an open file).
4234          */
4235         Assert(readFile >= 0);
4236         Assert(readId == endLogId);
4237         Assert(readSeg == endLogSeg);
4238
4239         close(readFile);
4240         readFile = -1;
4241
4242         /*
4243          * If the segment was fetched from archival storage, we want to replace
4244          * the existing xlog segment (if any) with the archival version.  This is
4245          * because whatever is in XLOGDIR is very possibly older than what we have
4246          * from the archives, since it could have come from restoring a PGDATA
4247          * backup.      In any case, the archival version certainly is more
4248          * descriptive of what our current database state is, because that is what
4249          * we replayed from.
4250          *
4251          * Note that if we are establishing a new timeline, ThisTimeLineID is
4252          * already set to the new value, and so we will create a new file instead
4253          * of overwriting any existing file.
4254          */
4255         snprintf(recoveryPath, MAXPGPATH, XLOGDIR "/RECOVERYXLOG");
4256         XLogFilePath(xlogpath, ThisTimeLineID, endLogId, endLogSeg);
4257
4258         if (restoredFromArchive)
4259         {
4260                 ereport(DEBUG3,
4261                                 (errmsg_internal("moving last restored xlog to \"%s\"",
4262                                                                  xlogpath)));
4263                 unlink(xlogpath);               /* might or might not exist */
4264                 if (rename(recoveryPath, xlogpath) != 0)
4265                         ereport(FATAL,
4266                                         (errcode_for_file_access(),
4267                                          errmsg("could not rename file \"%s\" to \"%s\": %m",
4268                                                         recoveryPath, xlogpath)));
4269                 /* XXX might we need to fix permissions on the file? */
4270         }
4271         else
4272         {
4273                 /*
4274                  * If the latest segment is not archival, but there's still a
4275                  * RECOVERYXLOG laying about, get rid of it.
4276                  */
4277                 unlink(recoveryPath);   /* ignore any error */
4278
4279                 /*
4280                  * If we are establishing a new timeline, we have to copy data from
4281                  * the last WAL segment of the old timeline to create a starting WAL
4282                  * segment for the new timeline.
4283                  */
4284                 if (endTLI != ThisTimeLineID)
4285                         XLogFileCopy(endLogId, endLogSeg,
4286                                                  endTLI, endLogId, endLogSeg);
4287         }
4288
4289         /*
4290          * Let's just make real sure there are not .ready or .done flags posted
4291          * for the new segment.
4292          */
4293         XLogFileName(xlogpath, ThisTimeLineID, endLogId, endLogSeg);
4294         XLogArchiveCleanup(xlogpath);
4295
4296         /* Get rid of any remaining recovered timeline-history file, too */
4297         snprintf(recoveryPath, MAXPGPATH, XLOGDIR "/RECOVERYHISTORY");
4298         unlink(recoveryPath);           /* ignore any error */
4299
4300         /*
4301          * Rename the config file out of the way, so that we don't accidentally
4302          * re-enter archive recovery mode in a subsequent crash.
4303          */
4304         unlink(RECOVERY_COMMAND_DONE);
4305         if (rename(RECOVERY_COMMAND_FILE, RECOVERY_COMMAND_DONE) != 0)
4306                 ereport(FATAL,
4307                                 (errcode_for_file_access(),
4308                                  errmsg("could not rename file \"%s\" to \"%s\": %m",
4309                                                 RECOVERY_COMMAND_FILE, RECOVERY_COMMAND_DONE)));
4310
4311         ereport(LOG,
4312                         (errmsg("archive recovery complete")));
4313 }
4314
4315 /*
4316  * For point-in-time recovery, this function decides whether we want to
4317  * stop applying the XLOG at or after the current record.
4318  *
4319  * Returns TRUE if we are stopping, FALSE otherwise.  On TRUE return,
4320  * *includeThis is set TRUE if we should apply this record before stopping.
4321  * Also, some information is saved in recoveryStopXid et al for use in
4322  * annotating the new timeline's history file.
4323  */
4324 static bool
4325 recoveryStopsHere(XLogRecord *record, bool *includeThis)
4326 {
4327         bool            stopsHere;
4328         uint8           record_info;
4329         time_t          recordXtime;
4330
4331         /* Do we have a PITR target at all? */
4332         if (!recoveryTarget)
4333                 return false;
4334
4335         /* We only consider stopping at COMMIT or ABORT records */
4336         if (record->xl_rmid != RM_XACT_ID)
4337                 return false;
4338         record_info = record->xl_info & ~XLR_INFO_MASK;
4339         if (record_info == XLOG_XACT_COMMIT)
4340         {
4341                 xl_xact_commit *recordXactCommitData;
4342
4343                 recordXactCommitData = (xl_xact_commit *) XLogRecGetData(record);
4344                 recordXtime = recordXactCommitData->xtime;
4345         }
4346         else if (record_info == XLOG_XACT_ABORT)
4347         {
4348                 xl_xact_abort *recordXactAbortData;
4349
4350                 recordXactAbortData = (xl_xact_abort *) XLogRecGetData(record);
4351                 recordXtime = recordXactAbortData->xtime;
4352         }
4353         else
4354                 return false;
4355
4356         if (recoveryTargetExact)
4357         {
4358                 /*
4359                  * there can be only one transaction end record with this exact
4360                  * transactionid
4361                  *
4362                  * when testing for an xid, we MUST test for equality only, since
4363                  * transactions are numbered in the order they start, not the order
4364                  * they complete. A higher numbered xid will complete before you about
4365                  * 50% of the time...
4366                  */
4367                 stopsHere = (record->xl_xid == recoveryTargetXid);
4368                 if (stopsHere)
4369                         *includeThis = recoveryTargetInclusive;
4370         }
4371         else
4372         {
4373                 /*
4374                  * there can be many transactions that share the same commit time, so
4375                  * we stop after the last one, if we are inclusive, or stop at the
4376                  * first one if we are exclusive
4377                  */
4378                 if (recoveryTargetInclusive)
4379                         stopsHere = (recordXtime > recoveryTargetTime);
4380                 else
4381                         stopsHere = (recordXtime >= recoveryTargetTime);
4382                 if (stopsHere)
4383                         *includeThis = false;
4384         }
4385
4386         if (stopsHere)
4387         {
4388                 recoveryStopXid = record->xl_xid;
4389                 recoveryStopTime = recordXtime;
4390                 recoveryStopAfter = *includeThis;
4391
4392                 if (record_info == XLOG_XACT_COMMIT)
4393                 {
4394                         if (recoveryStopAfter)
4395                                 ereport(LOG,
4396                                                 (errmsg("recovery stopping after commit of transaction %u, time %s",
4397                                                           recoveryStopXid, str_time(recoveryStopTime))));
4398                         else
4399                                 ereport(LOG,
4400                                                 (errmsg("recovery stopping before commit of transaction %u, time %s",
4401                                                           recoveryStopXid, str_time(recoveryStopTime))));
4402                 }
4403                 else
4404                 {
4405                         if (recoveryStopAfter)
4406                                 ereport(LOG,
4407                                                 (errmsg("recovery stopping after abort of transaction %u, time %s",
4408                                                           recoveryStopXid, str_time(recoveryStopTime))));
4409                         else
4410                                 ereport(LOG,
4411                                                 (errmsg("recovery stopping before abort of transaction %u, time %s",
4412                                                           recoveryStopXid, str_time(recoveryStopTime))));
4413                 }
4414         }
4415
4416         return stopsHere;
4417 }
4418
4419 /*
4420  * This must be called ONCE during postmaster or standalone-backend startup
4421  */
4422 void
4423 StartupXLOG(void)
4424 {
4425         XLogCtlInsert *Insert;
4426         CheckPoint      checkPoint;
4427         bool            wasShutdown;
4428         bool            needNewTimeLine = false;
4429         XLogRecPtr      RecPtr,
4430                                 LastRec,
4431                                 checkPointLoc,
4432                                 EndOfLog;
4433         uint32          endLogId;
4434         uint32          endLogSeg;
4435         XLogRecord *record;
4436         uint32          freespace;
4437         TransactionId oldestActiveXID;
4438
4439         CritSectionCount++;
4440
4441         /*
4442          * Read control file and check XLOG status looks valid.
4443          *
4444          * Note: in most control paths, *ControlFile is already valid and we need
4445          * not do ReadControlFile() here, but might as well do it to be sure.
4446          */
4447         ReadControlFile();
4448
4449         if (ControlFile->logSeg == 0 ||
4450                 ControlFile->state < DB_SHUTDOWNED ||
4451                 ControlFile->state > DB_IN_PRODUCTION ||
4452                 !XRecOffIsValid(ControlFile->checkPoint.xrecoff))
4453                 ereport(FATAL,
4454                                 (errmsg("control file contains invalid data")));
4455
4456         if (ControlFile->state == DB_SHUTDOWNED)
4457                 ereport(LOG,
4458                                 (errmsg("database system was shut down at %s",
4459                                                 str_time(ControlFile->time))));
4460         else if (ControlFile->state == DB_SHUTDOWNING)
4461                 ereport(LOG,
4462                                 (errmsg("database system shutdown was interrupted at %s",
4463                                                 str_time(ControlFile->time))));
4464         else if (ControlFile->state == DB_IN_RECOVERY)
4465                 ereport(LOG,
4466                    (errmsg("database system was interrupted while in recovery at %s",
4467                                    str_time(ControlFile->time)),
4468                         errhint("This probably means that some data is corrupted and"
4469                                         " you will have to use the last backup for recovery.")));
4470         else if (ControlFile->state == DB_IN_PRODUCTION)
4471                 ereport(LOG,
4472                                 (errmsg("database system was interrupted at %s",
4473                                                 str_time(ControlFile->time))));
4474
4475         /* This is just to allow attaching to startup process with a debugger */
4476 #ifdef XLOG_REPLAY_DELAY
4477         if (ControlFile->state != DB_SHUTDOWNED)
4478                 pg_usleep(60000000L);
4479 #endif
4480
4481         /*
4482          * Initialize on the assumption we want to recover to the same timeline
4483          * that's active according to pg_control.
4484          */
4485         recoveryTargetTLI = ControlFile->checkPointCopy.ThisTimeLineID;
4486
4487         /*
4488          * Check for recovery control file, and if so set up state for offline
4489          * recovery
4490          */
4491         readRecoveryCommandFile();
4492
4493         /* Now we can determine the list of expected TLIs */
4494         expectedTLIs = readTimeLineHistory(recoveryTargetTLI);
4495
4496         /*
4497          * If pg_control's timeline is not in expectedTLIs, then we cannot
4498          * proceed: the backup is not part of the history of the requested
4499          * timeline.
4500          */
4501         if (!list_member_int(expectedTLIs,
4502                                                  (int) ControlFile->checkPointCopy.ThisTimeLineID))
4503                 ereport(FATAL,
4504                                 (errmsg("requested timeline %u is not a child of database system timeline %u",
4505                                                 recoveryTargetTLI,
4506                                                 ControlFile->checkPointCopy.ThisTimeLineID)));
4507
4508         if (read_backup_label(&checkPointLoc))
4509         {
4510                 /*
4511                  * When a backup_label file is present, we want to roll forward from
4512                  * the checkpoint it identifies, rather than using pg_control.
4513                  */
4514                 record = ReadCheckpointRecord(checkPointLoc, 0);
4515                 if (record != NULL)
4516                 {
4517                         ereport(LOG,
4518                                         (errmsg("checkpoint record is at %X/%X",
4519                                                         checkPointLoc.xlogid, checkPointLoc.xrecoff)));
4520                         InRecovery = true;      /* force recovery even if SHUTDOWNED */
4521                 }
4522                 else
4523                 {
4524                         ereport(PANIC,
4525                                         (errmsg("could not locate required checkpoint record"),
4526                                          errhint("If you are not restoring from a backup, try removing the file \"%s/backup_label\".", DataDir)));
4527                 }
4528         }
4529         else
4530         {
4531                 /*
4532                  * Get the last valid checkpoint record.  If the latest one according
4533                  * to pg_control is broken, try the next-to-last one.
4534                  */
4535                 checkPointLoc = ControlFile->checkPoint;
4536                 record = ReadCheckpointRecord(checkPointLoc, 1);
4537                 if (record != NULL)
4538                 {
4539                         ereport(LOG,
4540                                         (errmsg("checkpoint record is at %X/%X",
4541                                                         checkPointLoc.xlogid, checkPointLoc.xrecoff)));
4542                 }
4543                 else
4544                 {
4545                         checkPointLoc = ControlFile->prevCheckPoint;
4546                         record = ReadCheckpointRecord(checkPointLoc, 2);
4547                         if (record != NULL)
4548                         {
4549                                 ereport(LOG,
4550                                                 (errmsg("using previous checkpoint record at %X/%X",
4551                                                           checkPointLoc.xlogid, checkPointLoc.xrecoff)));
4552                                 InRecovery = true;              /* force recovery even if SHUTDOWNED */
4553                         }
4554                         else
4555                                 ereport(PANIC,
4556                                          (errmsg("could not locate a valid checkpoint record")));
4557                 }
4558         }
4559
4560         LastRec = RecPtr = checkPointLoc;
4561         memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
4562         wasShutdown = (record->xl_info == XLOG_CHECKPOINT_SHUTDOWN);
4563
4564         ereport(LOG,
4565          (errmsg("redo record is at %X/%X; undo record is at %X/%X; shutdown %s",
4566                          checkPoint.redo.xlogid, checkPoint.redo.xrecoff,
4567                          checkPoint.undo.xlogid, checkPoint.undo.xrecoff,
4568                          wasShutdown ? "TRUE" : "FALSE")));
4569         ereport(LOG,
4570                         (errmsg("next transaction ID: %u; next OID: %u",
4571                                         checkPoint.nextXid, checkPoint.nextOid)));
4572         ereport(LOG,
4573                         (errmsg("next MultiXactId: %u; next MultiXactOffset: %u",
4574                                         checkPoint.nextMulti, checkPoint.nextMultiOffset)));
4575         if (!TransactionIdIsNormal(checkPoint.nextXid))
4576                 ereport(PANIC,
4577                                 (errmsg("invalid next transaction ID")));
4578
4579         ShmemVariableCache->nextXid = checkPoint.nextXid;
4580         ShmemVariableCache->nextOid = checkPoint.nextOid;
4581         ShmemVariableCache->oidCount = 0;
4582         MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset);
4583
4584         /*
4585          * We must replay WAL entries using the same TimeLineID they were created
4586          * under, so temporarily adopt the TLI indicated by the checkpoint (see
4587          * also xlog_redo()).
4588          */
4589         ThisTimeLineID = checkPoint.ThisTimeLineID;
4590
4591         RedoRecPtr = XLogCtl->Insert.RedoRecPtr = checkPoint.redo;
4592
4593         if (XLByteLT(RecPtr, checkPoint.redo))
4594                 ereport(PANIC,
4595                                 (errmsg("invalid redo in checkpoint record")));
4596         if (checkPoint.undo.xrecoff == 0)
4597                 checkPoint.undo = RecPtr;
4598
4599         /*
4600          * Check whether we need to force recovery from WAL.  If it appears to
4601          * have been a clean shutdown and we did not have a recovery.conf file,
4602          * then assume no recovery needed.
4603          */
4604         if (XLByteLT(checkPoint.undo, RecPtr) ||
4605                 XLByteLT(checkPoint.redo, RecPtr))
4606         {
4607                 if (wasShutdown)
4608                         ereport(PANIC,
4609                                 (errmsg("invalid redo/undo record in shutdown checkpoint")));
4610                 InRecovery = true;
4611         }
4612         else if (ControlFile->state != DB_SHUTDOWNED)
4613                 InRecovery = true;
4614         else if (InArchiveRecovery)
4615         {
4616                 /* force recovery due to presence of recovery.conf */
4617                 InRecovery = true;
4618         }
4619
4620         /* REDO */
4621         if (InRecovery)
4622         {
4623                 int                     rmid;
4624
4625                 if (InArchiveRecovery)
4626                         ereport(LOG,
4627                                         (errmsg("automatic recovery in progress")));
4628                 else
4629                         ereport(LOG,
4630                                         (errmsg("database system was not properly shut down; "
4631                                                         "automatic recovery in progress")));
4632                 ControlFile->state = DB_IN_RECOVERY;
4633                 ControlFile->time = time(NULL);
4634                 UpdateControlFile();
4635
4636                 /* Start up the recovery environment */
4637                 XLogInitRelationCache();
4638
4639                 for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
4640                 {
4641                         if (RmgrTable[rmid].rm_startup != NULL)
4642                                 RmgrTable[rmid].rm_startup();
4643                 }
4644
4645                 /*
4646                  * Find the first record that logically follows the checkpoint --- it
4647                  * might physically precede it, though.
4648                  */
4649                 if (XLByteLT(checkPoint.redo, RecPtr))
4650                 {
4651                         /* back up to find the record */
4652                         record = ReadRecord(&(checkPoint.redo), PANIC);
4653                 }
4654                 else
4655                 {
4656                         /* just have to read next record after CheckPoint */
4657                         record = ReadRecord(NULL, LOG);
4658                 }
4659
4660                 if (record != NULL)
4661                 {
4662                         bool            recoveryContinue = true;
4663                         bool            recoveryApply = true;
4664                         ErrorContextCallback    errcontext;
4665
4666                         InRedo = true;
4667                         ereport(LOG,
4668                                         (errmsg("redo starts at %X/%X",
4669                                                         ReadRecPtr.xlogid, ReadRecPtr.xrecoff)));
4670
4671                         /*
4672                          * main redo apply loop
4673                          */
4674                         do
4675                         {
4676 #ifdef WAL_DEBUG
4677                                 if (XLOG_DEBUG)
4678                                 {
4679                                         StringInfoData  buf;
4680
4681                                         initStringInfo(&buf);
4682                                         appendStringInfo(&buf, "REDO @ %X/%X; LSN %X/%X: ",
4683                                                         ReadRecPtr.xlogid, ReadRecPtr.xrecoff,
4684                                                         EndRecPtr.xlogid, EndRecPtr.xrecoff);
4685                                         xlog_outrec(&buf, record);
4686                                         appendStringInfo(&buf, " - ");
4687                                         RmgrTable[record->xl_rmid].rm_desc(&buf,
4688                                                                                                            record->xl_info,
4689                                                                                                            XLogRecGetData(record));
4690                                         elog(LOG, "%s", buf.data);
4691                                         pfree(buf.data);
4692                                 }
4693 #endif
4694
4695                                 /*
4696                                  * Have we reached our recovery target?
4697                                  */
4698                                 if (recoveryStopsHere(record, &recoveryApply))
4699                                 {
4700                                         needNewTimeLine = true;         /* see below */
4701                                         recoveryContinue = false;
4702                                         if (!recoveryApply)
4703                                                 break;
4704                                 }
4705
4706                                 /* Setup error traceback support for ereport() */
4707                                 errcontext.callback = rm_redo_error_callback;
4708                                 errcontext.arg = (void *) record;
4709                                 errcontext.previous = error_context_stack;
4710                                 error_context_stack = &errcontext;
4711
4712                                 /* nextXid must be beyond record's xid */
4713                                 if (TransactionIdFollowsOrEquals(record->xl_xid,
4714                                                                                                  ShmemVariableCache->nextXid))
4715                                 {
4716                                         ShmemVariableCache->nextXid = record->xl_xid;
4717                                         TransactionIdAdvance(ShmemVariableCache->nextXid);
4718                                 }
4719
4720                                 if (record->xl_info & XLR_BKP_BLOCK_MASK)
4721                                         RestoreBkpBlocks(record, EndRecPtr);
4722
4723                                 RmgrTable[record->xl_rmid].rm_redo(EndRecPtr, record);
4724
4725                                 /* Pop the error context stack */
4726                                 error_context_stack = errcontext.previous;
4727
4728                                 LastRec = ReadRecPtr;
4729
4730                                 record = ReadRecord(NULL, LOG);
4731                         } while (record != NULL && recoveryContinue);
4732
4733                         /*
4734                          * end of main redo apply loop
4735                          */
4736
4737                         ereport(LOG,
4738                                         (errmsg("redo done at %X/%X",
4739                                                         ReadRecPtr.xlogid, ReadRecPtr.xrecoff)));
4740                         InRedo = false;
4741                 }
4742                 else
4743                 {
4744                         /* there are no WAL records following the checkpoint */
4745                         ereport(LOG,
4746                                         (errmsg("redo is not required")));
4747                 }
4748         }
4749
4750         /*
4751          * Re-fetch the last valid or last applied record, so we can identify the
4752          * exact endpoint of what we consider the valid portion of WAL.
4753          */
4754         record = ReadRecord(&LastRec, PANIC);
4755         EndOfLog = EndRecPtr;
4756         XLByteToPrevSeg(EndOfLog, endLogId, endLogSeg);
4757
4758         /*
4759          * Complain if we did not roll forward far enough to render the backup
4760          * dump consistent.
4761          */
4762         if (XLByteLT(EndOfLog, recoveryMinXlogOffset))
4763         {
4764                 if (needNewTimeLine)    /* stopped because of stop request */
4765                         ereport(FATAL,
4766                                         (errmsg("requested recovery stop point is before end time of backup dump")));
4767                 else
4768                         /* ran off end of WAL */
4769                         ereport(FATAL,
4770                                         (errmsg("WAL ends before end time of backup dump")));
4771         }
4772
4773         /*
4774          * Consider whether we need to assign a new timeline ID.
4775          *
4776          * If we stopped short of the end of WAL during recovery, then we are
4777          * generating a new timeline and must assign it a unique new ID.
4778          * Otherwise, we can just extend the timeline we were in when we ran out
4779          * of WAL.
4780          */
4781         if (needNewTimeLine)
4782         {
4783                 ThisTimeLineID = findNewestTimeLine(recoveryTargetTLI) + 1;
4784                 ereport(LOG,
4785                                 (errmsg("selected new timeline ID: %u", ThisTimeLineID)));
4786                 writeTimeLineHistory(ThisTimeLineID, recoveryTargetTLI,
4787                                                          curFileTLI, endLogId, endLogSeg);
4788         }
4789
4790         /* Save the selected TimeLineID in shared memory, too */
4791         XLogCtl->ThisTimeLineID = ThisTimeLineID;
4792
4793         /*
4794          * We are now done reading the old WAL.  Turn off archive fetching if it
4795          * was active, and make a writable copy of the last WAL segment. (Note
4796          * that we also have a copy of the last block of the old WAL in readBuf;
4797          * we will use that below.)
4798          */
4799         if (InArchiveRecovery)
4800                 exitArchiveRecovery(curFileTLI, endLogId, endLogSeg);
4801
4802         /*
4803          * Prepare to write WAL starting at EndOfLog position, and init xlog
4804          * buffer cache using the block containing the last record from the
4805          * previous incarnation.
4806          */
4807         openLogId = endLogId;
4808         openLogSeg = endLogSeg;
4809         openLogFile = XLogFileOpen(openLogId, openLogSeg);
4810         openLogOff = 0;
4811         ControlFile->logId = openLogId;
4812         ControlFile->logSeg = openLogSeg + 1;
4813         Insert = &XLogCtl->Insert;
4814         Insert->PrevRecord = LastRec;
4815         XLogCtl->xlblocks[0].xlogid = openLogId;
4816         XLogCtl->xlblocks[0].xrecoff =
4817                 ((EndOfLog.xrecoff - 1) / XLOG_BLCKSZ + 1) * XLOG_BLCKSZ;
4818
4819         /*
4820          * Tricky point here: readBuf contains the *last* block that the LastRec
4821          * record spans, not the one it starts in.      The last block is indeed the
4822          * one we want to use.
4823          */
4824         Assert(readOff == (XLogCtl->xlblocks[0].xrecoff - XLOG_BLCKSZ) % XLogSegSize);
4825         memcpy((char *) Insert->currpage, readBuf, XLOG_BLCKSZ);
4826         Insert->currpos = (char *) Insert->currpage +
4827                 (EndOfLog.xrecoff + XLOG_BLCKSZ - XLogCtl->xlblocks[0].xrecoff);
4828
4829         LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
4830
4831         XLogCtl->Write.LogwrtResult = LogwrtResult;
4832         Insert->LogwrtResult = LogwrtResult;
4833         XLogCtl->LogwrtResult = LogwrtResult;
4834
4835         XLogCtl->LogwrtRqst.Write = EndOfLog;
4836         XLogCtl->LogwrtRqst.Flush = EndOfLog;
4837
4838         freespace = INSERT_FREESPACE(Insert);
4839         if (freespace > 0)
4840         {
4841                 /* Make sure rest of page is zero */
4842                 MemSet(Insert->currpos, 0, freespace);
4843                 XLogCtl->Write.curridx = 0;
4844         }
4845         else
4846         {
4847                 /*
4848                  * Whenever Write.LogwrtResult points to exactly the end of a page,
4849                  * Write.curridx must point to the *next* page (see XLogWrite()).
4850                  *
4851                  * Note: it might seem we should do AdvanceXLInsertBuffer() here, but
4852                  * this is sufficient.  The first actual attempt to insert a log
4853                  * record will advance the insert state.
4854                  */
4855                 XLogCtl->Write.curridx = NextBufIdx(0);
4856         }
4857
4858         /* Pre-scan prepared transactions to find out the range of XIDs present */
4859         oldestActiveXID = PrescanPreparedTransactions();
4860
4861         if (InRecovery)
4862         {
4863                 int                     rmid;
4864
4865                 /*
4866                  * Allow resource managers to do any required cleanup.
4867                  */
4868                 for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
4869                 {
4870                         if (RmgrTable[rmid].rm_cleanup != NULL)
4871                                 RmgrTable[rmid].rm_cleanup();
4872                 }
4873
4874                 /*
4875                  * Check to see if the XLOG sequence contained any unresolved
4876                  * references to uninitialized pages.
4877                  */
4878                 XLogCheckInvalidPages();
4879
4880                 /*
4881                  * Reset pgstat data, because it may be invalid after recovery.
4882                  */
4883                 pgstat_reset_all();
4884
4885                 /*
4886                  * Perform a new checkpoint to update our recovery activity to disk.
4887                  *
4888                  * Note that we write a shutdown checkpoint rather than an on-line
4889                  * one. This is not particularly critical, but since we may be
4890                  * assigning a new TLI, using a shutdown checkpoint allows us to have
4891                  * the rule that TLI only changes in shutdown checkpoints, which
4892                  * allows some extra error checking in xlog_redo.
4893                  *
4894                  * In case we had to use the secondary checkpoint, make sure that it
4895                  * will still be shown as the secondary checkpoint after this
4896                  * CreateCheckPoint operation; we don't want the broken primary
4897                  * checkpoint to become prevCheckPoint...
4898                  */
4899                 if (XLByteEQ(checkPointLoc, ControlFile->prevCheckPoint))
4900                         ControlFile->checkPoint = checkPointLoc;
4901
4902                 CreateCheckPoint(true, true);
4903
4904                 /*
4905                  * Close down recovery environment
4906                  */
4907                 XLogCloseRelationCache();
4908
4909                 /*
4910                  * Now that we've checkpointed the recovery, it's safe to flush old
4911                  * backup_label, if present.
4912                  */
4913                 remove_backup_label();
4914         }
4915
4916         /*
4917          * Preallocate additional log files, if wanted.
4918          */
4919         (void) PreallocXlogFiles(EndOfLog);
4920
4921         /*
4922          * Okay, we're officially UP.
4923          */
4924         InRecovery = false;
4925
4926         ControlFile->state = DB_IN_PRODUCTION;
4927         ControlFile->time = time(NULL);
4928         UpdateControlFile();
4929
4930         /* Start up the commit log and related stuff, too */
4931         StartupCLOG();
4932         StartupSUBTRANS(oldestActiveXID);
4933         StartupMultiXact();
4934
4935         /* Reload shared-memory state for prepared transactions */
4936         RecoverPreparedTransactions();
4937
4938         ereport(LOG,
4939                         (errmsg("database system is ready")));
4940         CritSectionCount--;
4941
4942         /* Shut down readFile facility, free space */
4943         if (readFile >= 0)
4944         {
4945                 close(readFile);
4946                 readFile = -1;
4947         }
4948         if (readBuf)
4949         {
4950                 free(readBuf);
4951                 readBuf = NULL;
4952         }
4953         if (readRecordBuf)
4954         {
4955                 free(readRecordBuf);
4956                 readRecordBuf = NULL;
4957                 readRecordBufSize = 0;
4958         }
4959 }
4960
4961 /*
4962  * Subroutine to try to fetch and validate a prior checkpoint record.
4963  *
4964  * whichChkpt identifies the checkpoint (merely for reporting purposes).
4965  * 1 for "primary", 2 for "secondary", 0 for "other" (backup_label)
4966  */
4967 static XLogRecord *
4968 ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt)
4969 {
4970         XLogRecord *record;
4971
4972         if (!XRecOffIsValid(RecPtr.xrecoff))
4973         {
4974                 switch (whichChkpt)
4975                 {
4976                         case 1:
4977                                 ereport(LOG,
4978                                 (errmsg("invalid primary checkpoint link in control file")));
4979                                 break;
4980                         case 2:
4981                                 ereport(LOG,
4982                                                 (errmsg("invalid secondary checkpoint link in control file")));
4983                                 break;
4984                         default:
4985                                 ereport(LOG,
4986                                    (errmsg("invalid checkpoint link in backup_label file")));
4987                                 break;
4988                 }
4989                 return NULL;
4990         }
4991
4992         record = ReadRecord(&RecPtr, LOG);
4993
4994         if (record == NULL)
4995         {
4996                 switch (whichChkpt)
4997                 {
4998                         case 1:
4999                                 ereport(LOG,
5000                                                 (errmsg("invalid primary checkpoint record")));
5001                                 break;
5002                         case 2:
5003                                 ereport(LOG,
5004                                                 (errmsg("invalid secondary checkpoint record")));
5005                                 break;
5006                         default:
5007                                 ereport(LOG,
5008                                                 (errmsg("invalid checkpoint record")));
5009                                 break;
5010                 }
5011                 return NULL;
5012         }
5013         if (record->xl_rmid != RM_XLOG_ID)
5014         {
5015                 switch (whichChkpt)
5016                 {
5017                         case 1:
5018                                 ereport(LOG,
5019                                                 (errmsg("invalid resource manager ID in primary checkpoint record")));
5020                                 break;
5021                         case 2:
5022                                 ereport(LOG,
5023                                                 (errmsg("invalid resource manager ID in secondary checkpoint record")));
5024                                 break;
5025                         default:
5026                                 ereport(LOG,
5027                                 (errmsg("invalid resource manager ID in checkpoint record")));
5028                                 break;
5029                 }
5030                 return NULL;
5031         }
5032         if (record->xl_info != XLOG_CHECKPOINT_SHUTDOWN &&
5033                 record->xl_info != XLOG_CHECKPOINT_ONLINE)
5034         {
5035                 switch (whichChkpt)
5036                 {
5037                         case 1:
5038                                 ereport(LOG,
5039                                    (errmsg("invalid xl_info in primary checkpoint record")));
5040                                 break;
5041                         case 2:
5042                                 ereport(LOG,
5043                                  (errmsg("invalid xl_info in secondary checkpoint record")));
5044                                 break;
5045                         default:
5046                                 ereport(LOG,
5047                                                 (errmsg("invalid xl_info in checkpoint record")));
5048                                 break;
5049                 }
5050                 return NULL;
5051         }
5052         if (record->xl_len != sizeof(CheckPoint) ||
5053                 record->xl_tot_len != SizeOfXLogRecord + sizeof(CheckPoint))
5054         {
5055                 switch (whichChkpt)
5056                 {
5057                         case 1:
5058                                 ereport(LOG,
5059                                         (errmsg("invalid length of primary checkpoint record")));
5060                                 break;
5061                         case 2:
5062                                 ereport(LOG,
5063                                   (errmsg("invalid length of secondary checkpoint record")));
5064                                 break;
5065                         default:
5066                                 ereport(LOG,
5067                                                 (errmsg("invalid length of checkpoint record")));
5068                                 break;
5069                 }
5070                 return NULL;
5071         }
5072         return record;
5073 }
5074
5075 /*
5076  * This must be called during startup of a backend process, except that
5077  * it need not be called in a standalone backend (which does StartupXLOG
5078  * instead).  We need to initialize the local copies of ThisTimeLineID and
5079  * RedoRecPtr.
5080  *
5081  * Note: before Postgres 8.0, we went to some effort to keep the postmaster
5082  * process's copies of ThisTimeLineID and RedoRecPtr valid too.  This was
5083  * unnecessary however, since the postmaster itself never touches XLOG anyway.
5084  */
5085 void
5086 InitXLOGAccess(void)
5087 {
5088         /* ThisTimeLineID doesn't change so we need no lock to copy it */
5089         ThisTimeLineID = XLogCtl->ThisTimeLineID;
5090         /* Use GetRedoRecPtr to copy the RedoRecPtr safely */
5091         (void) GetRedoRecPtr();
5092 }
5093
5094 /*
5095  * Once spawned, a backend may update its local RedoRecPtr from
5096  * XLogCtl->Insert.RedoRecPtr; it must hold the insert lock or info_lck
5097  * to do so.  This is done in XLogInsert() or GetRedoRecPtr().
5098  */
5099 XLogRecPtr
5100 GetRedoRecPtr(void)
5101 {
5102         /* use volatile pointer to prevent code rearrangement */
5103         volatile XLogCtlData *xlogctl = XLogCtl;
5104
5105         SpinLockAcquire(&xlogctl->info_lck);
5106         Assert(XLByteLE(RedoRecPtr, xlogctl->Insert.RedoRecPtr));
5107         RedoRecPtr = xlogctl->Insert.RedoRecPtr;
5108         SpinLockRelease(&xlogctl->info_lck);
5109
5110         return RedoRecPtr;
5111 }
5112
5113 /*
5114  * GetRecentNextXid - get the nextXid value saved by the most recent checkpoint
5115  *
5116  * This is currently used only by the autovacuum daemon.  To check for
5117  * impending XID wraparound, autovac needs an approximate idea of the current
5118  * XID counter, and it needs it before choosing which DB to attach to, hence
5119  * before it sets up a PGPROC, hence before it can take any LWLocks.  But it
5120  * has attached to shared memory, and so we can let it reach into the shared
5121  * ControlFile structure and pull out the last checkpoint nextXID.
5122  *
5123  * Since we don't take any sort of lock, we have to assume that reading a
5124  * TransactionId is atomic ... but that assumption is made elsewhere, too,
5125  * and in any case the worst possible consequence of a bogus result is that
5126  * autovac issues an unnecessary database-wide VACUUM.
5127  *
5128  * Note: we could also choose to read ShmemVariableCache->nextXid in an
5129  * unlocked fashion, thus getting a more up-to-date result; but since that
5130  * changes far more frequently than the controlfile checkpoint copy, it would
5131  * pose a far higher risk of bogus result if we did have a nonatomic-read
5132  * problem.
5133  *
5134  * A (theoretically) completely safe answer is to read the actual pg_control
5135  * file into local process memory, but that certainly seems like overkill.
5136  */
5137 TransactionId
5138 GetRecentNextXid(void)
5139 {
5140         return ControlFile->checkPointCopy.nextXid;
5141 }
5142
5143 /*
5144  * This must be called ONCE during postmaster or standalone-backend shutdown
5145  */
5146 void
5147 ShutdownXLOG(int code, Datum arg)
5148 {
5149         ereport(LOG,
5150                         (errmsg("shutting down")));
5151
5152         CritSectionCount++;
5153         CreateCheckPoint(true, true);
5154         ShutdownCLOG();
5155         ShutdownSUBTRANS();
5156         ShutdownMultiXact();
5157         CritSectionCount--;
5158
5159         ereport(LOG,
5160                         (errmsg("database system is shut down")));
5161 }
5162
5163 /*
5164  * Perform a checkpoint --- either during shutdown, or on-the-fly
5165  *
5166  * If force is true, we force a checkpoint regardless of whether any XLOG
5167  * activity has occurred since the last one.
5168  */
5169 void
5170 CreateCheckPoint(bool shutdown, bool force)
5171 {
5172         CheckPoint      checkPoint;
5173         XLogRecPtr      recptr;
5174         XLogCtlInsert *Insert = &XLogCtl->Insert;
5175         XLogRecData rdata;
5176         uint32          freespace;
5177         uint32          _logId;
5178         uint32          _logSeg;
5179         int                     nsegsadded = 0;
5180         int                     nsegsremoved = 0;
5181         int                     nsegsrecycled = 0;
5182
5183         /*
5184          * Acquire CheckpointLock to ensure only one checkpoint happens at a time.
5185          * (This is just pro forma, since in the present system structure there is
5186          * only one process that is allowed to issue checkpoints at any given
5187          * time.)
5188          */
5189         LWLockAcquire(CheckpointLock, LW_EXCLUSIVE);
5190
5191         /*
5192          * Use a critical section to force system panic if we have trouble.
5193          */
5194         START_CRIT_SECTION();
5195
5196         if (shutdown)
5197         {
5198                 ControlFile->state = DB_SHUTDOWNING;
5199                 ControlFile->time = time(NULL);
5200                 UpdateControlFile();
5201         }
5202
5203         MemSet(&checkPoint, 0, sizeof(checkPoint));
5204         checkPoint.ThisTimeLineID = ThisTimeLineID;
5205         checkPoint.time = time(NULL);
5206
5207         /*
5208          * We must hold CheckpointStartLock while determining the checkpoint REDO
5209          * pointer.  This ensures that any concurrent transaction commits will be
5210          * either not yet logged, or logged and recorded in pg_clog. See notes in
5211          * RecordTransactionCommit().
5212          */
5213         LWLockAcquire(CheckpointStartLock, LW_EXCLUSIVE);
5214
5215         /* And we need WALInsertLock too */
5216         LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
5217
5218         /*
5219          * If this isn't a shutdown or forced checkpoint, and we have not inserted
5220          * any XLOG records since the start of the last checkpoint, skip the
5221          * checkpoint.  The idea here is to avoid inserting duplicate checkpoints
5222          * when the system is idle. That wastes log space, and more importantly it
5223          * exposes us to possible loss of both current and previous checkpoint
5224          * records if the machine crashes just as we're writing the update.
5225          * (Perhaps it'd make even more sense to checkpoint only when the previous
5226          * checkpoint record is in a different xlog page?)
5227          *
5228          * We have to make two tests to determine that nothing has happened since
5229          * the start of the last checkpoint: current insertion point must match
5230          * the end of the last checkpoint record, and its redo pointer must point
5231          * to itself.
5232          */
5233         if (!shutdown && !force)
5234         {
5235                 XLogRecPtr      curInsert;
5236
5237                 INSERT_RECPTR(curInsert, Insert, Insert->curridx);
5238                 if (curInsert.xlogid == ControlFile->checkPoint.xlogid &&
5239                         curInsert.xrecoff == ControlFile->checkPoint.xrecoff +
5240                         MAXALIGN(SizeOfXLogRecord + sizeof(CheckPoint)) &&
5241                         ControlFile->checkPoint.xlogid ==
5242                         ControlFile->checkPointCopy.redo.xlogid &&
5243                         ControlFile->checkPoint.xrecoff ==
5244                         ControlFile->checkPointCopy.redo.xrecoff)
5245                 {
5246                         LWLockRelease(WALInsertLock);
5247                         LWLockRelease(CheckpointStartLock);
5248                         LWLockRelease(CheckpointLock);
5249                         END_CRIT_SECTION();
5250                         return;
5251                 }
5252         }
5253
5254         /*
5255          * Compute new REDO record ptr = location of next XLOG record.
5256          *
5257          * NB: this is NOT necessarily where the checkpoint record itself will be,
5258          * since other backends may insert more XLOG records while we're off doing
5259          * the buffer flush work.  Those XLOG records are logically after the
5260          * checkpoint, even though physically before it.  Got that?
5261          */
5262         freespace = INSERT_FREESPACE(Insert);
5263         if (freespace < SizeOfXLogRecord)
5264         {
5265                 (void) AdvanceXLInsertBuffer();
5266                 /* OK to ignore update return flag, since we will do flush anyway */
5267                 freespace = INSERT_FREESPACE(Insert);
5268         }
5269         INSERT_RECPTR(checkPoint.redo, Insert, Insert->curridx);
5270
5271         /*
5272          * Here we update the shared RedoRecPtr for future XLogInsert calls; this
5273          * must be done while holding the insert lock AND the info_lck.
5274          *
5275          * Note: if we fail to complete the checkpoint, RedoRecPtr will be left
5276          * pointing past where it really needs to point.  This is okay; the only
5277          * consequence is that XLogInsert might back up whole buffers that it
5278          * didn't really need to.  We can't postpone advancing RedoRecPtr because
5279          * XLogInserts that happen while we are dumping buffers must assume that
5280          * their buffer changes are not included in the checkpoint.
5281          */
5282         {
5283                 /* use volatile pointer to prevent code rearrangement */
5284                 volatile XLogCtlData *xlogctl = XLogCtl;
5285
5286                 SpinLockAcquire(&xlogctl->info_lck);
5287                 RedoRecPtr = xlogctl->Insert.RedoRecPtr = checkPoint.redo;
5288                 SpinLockRelease(&xlogctl->info_lck);
5289         }
5290
5291         /*
5292          * Now we can release insert lock and checkpoint start lock, allowing
5293          * other xacts to proceed even while we are flushing disk buffers.
5294          */
5295         LWLockRelease(WALInsertLock);
5296
5297         LWLockRelease(CheckpointStartLock);
5298
5299         /*
5300          * Get the other info we need for the checkpoint record.
5301          */
5302         LWLockAcquire(XidGenLock, LW_SHARED);
5303         checkPoint.nextXid = ShmemVariableCache->nextXid;
5304         LWLockRelease(XidGenLock);
5305
5306         LWLockAcquire(OidGenLock, LW_SHARED);
5307         checkPoint.nextOid = ShmemVariableCache->nextOid;
5308         if (!shutdown)
5309                 checkPoint.nextOid += ShmemVariableCache->oidCount;
5310         LWLockRelease(OidGenLock);
5311
5312         MultiXactGetCheckptMulti(shutdown,
5313                                                          &checkPoint.nextMulti,
5314                                                          &checkPoint.nextMultiOffset);
5315
5316         /*
5317          * Having constructed the checkpoint record, ensure all shmem disk buffers
5318          * and commit-log buffers are flushed to disk.
5319          *
5320          * This I/O could fail for various reasons.  If so, we will fail to
5321          * complete the checkpoint, but there is no reason to force a system
5322          * panic. Accordingly, exit critical section while doing it.  (If we are
5323          * doing a shutdown checkpoint, we probably *should* panic --- but that
5324          * will happen anyway because we'll still be inside the critical section
5325          * established by ShutdownXLOG.)
5326          */
5327         END_CRIT_SECTION();
5328
5329         if (!shutdown)
5330                 ereport(DEBUG2,
5331                                 (errmsg("checkpoint starting")));
5332
5333         CheckPointCLOG();
5334         CheckPointSUBTRANS();
5335         CheckPointMultiXact();
5336         FlushBufferPool();
5337         /* We deliberately delay 2PC checkpointing as long as possible */
5338         CheckPointTwoPhase(checkPoint.redo);
5339
5340         START_CRIT_SECTION();
5341
5342         /*
5343          * Now insert the checkpoint record into XLOG.
5344          */
5345         rdata.data = (char *) (&checkPoint);
5346         rdata.len = sizeof(checkPoint);
5347         rdata.buffer = InvalidBuffer;
5348         rdata.next = NULL;
5349
5350         recptr = XLogInsert(RM_XLOG_ID,
5351                                                 shutdown ? XLOG_CHECKPOINT_SHUTDOWN :
5352                                                 XLOG_CHECKPOINT_ONLINE,
5353                                                 &rdata);
5354
5355         XLogFlush(recptr);
5356
5357         /*
5358          * We now have ProcLastRecPtr = start of actual checkpoint record, recptr
5359          * = end of actual checkpoint record.
5360          */
5361         if (shutdown && !XLByteEQ(checkPoint.redo, ProcLastRecPtr))
5362                 ereport(PANIC,
5363                                 (errmsg("concurrent transaction log activity while database system is shutting down")));
5364
5365         /*
5366          * Select point at which we can truncate the log, which we base on the
5367          * prior checkpoint's earliest info.
5368          */
5369         XLByteToSeg(ControlFile->checkPointCopy.redo, _logId, _logSeg);
5370
5371         /*
5372          * Update the control file.
5373          */
5374         LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
5375         if (shutdown)
5376                 ControlFile->state = DB_SHUTDOWNED;
5377         ControlFile->prevCheckPoint = ControlFile->checkPoint;
5378         ControlFile->checkPoint = ProcLastRecPtr;
5379         ControlFile->checkPointCopy = checkPoint;
5380         ControlFile->time = time(NULL);
5381         UpdateControlFile();
5382         LWLockRelease(ControlFileLock);
5383
5384         /*
5385          * We are now done with critical updates; no need for system panic if we
5386          * have trouble while fooling with offline log segments.
5387          */
5388         END_CRIT_SECTION();
5389
5390         /*
5391          * Delete offline log files (those no longer needed even for previous
5392          * checkpoint).
5393          */
5394         if (_logId || _logSeg)
5395         {
5396                 PrevLogSeg(_logId, _logSeg);
5397                 MoveOfflineLogs(_logId, _logSeg, recptr,
5398                                                 &nsegsremoved, &nsegsrecycled);
5399         }
5400
5401         /*
5402          * Make more log segments if needed.  (Do this after deleting offline log
5403          * segments, to avoid having peak disk space usage higher than necessary.)
5404          */
5405         if (!shutdown)
5406                 nsegsadded = PreallocXlogFiles(recptr);
5407
5408         /*
5409          * Truncate pg_subtrans if possible.  We can throw away all data before
5410          * the oldest XMIN of any running transaction.  No future transaction will
5411          * attempt to reference any pg_subtrans entry older than that (see Asserts
5412          * in subtrans.c).      During recovery, though, we mustn't do this because
5413          * StartupSUBTRANS hasn't been called yet.
5414          */
5415         if (!InRecovery)
5416                 TruncateSUBTRANS(GetOldestXmin(true, false));
5417
5418         if (!shutdown)
5419                 ereport(DEBUG2,
5420                                 (errmsg("checkpoint complete; %d transaction log file(s) added, %d removed, %d recycled",
5421                                                 nsegsadded, nsegsremoved, nsegsrecycled)));
5422
5423         LWLockRelease(CheckpointLock);
5424 }
5425
5426 /*
5427  * Write a NEXTOID log record
5428  */
5429 void
5430 XLogPutNextOid(Oid nextOid)
5431 {
5432         XLogRecData rdata;
5433
5434         rdata.data = (char *) (&nextOid);
5435         rdata.len = sizeof(Oid);
5436         rdata.buffer = InvalidBuffer;
5437         rdata.next = NULL;
5438         (void) XLogInsert(RM_XLOG_ID, XLOG_NEXTOID, &rdata);
5439
5440         /*
5441          * We need not flush the NEXTOID record immediately, because any of the
5442          * just-allocated OIDs could only reach disk as part of a tuple insert or
5443          * update that would have its own XLOG record that must follow the NEXTOID
5444          * record.      Therefore, the standard buffer LSN interlock applied to those
5445          * records will ensure no such OID reaches disk before the NEXTOID record
5446          * does.
5447          */
5448 }
5449
5450 /*
5451  * XLOG resource manager's routines
5452  */
5453 void
5454 xlog_redo(XLogRecPtr lsn, XLogRecord *record)
5455 {
5456         uint8           info = record->xl_info & ~XLR_INFO_MASK;
5457
5458         if (info == XLOG_NEXTOID)
5459         {
5460                 Oid                     nextOid;
5461
5462                 memcpy(&nextOid, XLogRecGetData(record), sizeof(Oid));
5463                 if (ShmemVariableCache->nextOid < nextOid)
5464                 {
5465                         ShmemVariableCache->nextOid = nextOid;
5466                         ShmemVariableCache->oidCount = 0;
5467                 }
5468         }
5469         else if (info == XLOG_CHECKPOINT_SHUTDOWN)
5470         {
5471                 CheckPoint      checkPoint;
5472
5473                 memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
5474                 /* In a SHUTDOWN checkpoint, believe the counters exactly */
5475                 ShmemVariableCache->nextXid = checkPoint.nextXid;
5476                 ShmemVariableCache->nextOid = checkPoint.nextOid;
5477                 ShmemVariableCache->oidCount = 0;
5478                 MultiXactSetNextMXact(checkPoint.nextMulti,
5479                                                           checkPoint.nextMultiOffset);
5480
5481                 /*
5482                  * TLI may change in a shutdown checkpoint, but it shouldn't decrease
5483                  */
5484                 if (checkPoint.ThisTimeLineID != ThisTimeLineID)
5485                 {
5486                         if (checkPoint.ThisTimeLineID < ThisTimeLineID ||
5487                                 !list_member_int(expectedTLIs,
5488                                                                  (int) checkPoint.ThisTimeLineID))
5489                                 ereport(PANIC,
5490                                                 (errmsg("unexpected timeline ID %u (after %u) in checkpoint record",
5491                                                                 checkPoint.ThisTimeLineID, ThisTimeLineID)));
5492                         /* Following WAL records should be run with new TLI */
5493                         ThisTimeLineID = checkPoint.ThisTimeLineID;
5494                 }
5495         }
5496         else if (info == XLOG_CHECKPOINT_ONLINE)
5497         {
5498                 CheckPoint      checkPoint;
5499
5500                 memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
5501                 /* In an ONLINE checkpoint, treat the counters like NEXTOID */
5502                 if (TransactionIdPrecedes(ShmemVariableCache->nextXid,
5503                                                                   checkPoint.nextXid))
5504                         ShmemVariableCache->nextXid = checkPoint.nextXid;
5505                 if (ShmemVariableCache->nextOid < checkPoint.nextOid)
5506                 {
5507                         ShmemVariableCache->nextOid = checkPoint.nextOid;
5508                         ShmemVariableCache->oidCount = 0;
5509                 }
5510                 MultiXactAdvanceNextMXact(checkPoint.nextMulti,
5511                                                                   checkPoint.nextMultiOffset);
5512                 /* TLI should not change in an on-line checkpoint */
5513                 if (checkPoint.ThisTimeLineID != ThisTimeLineID)
5514                         ereport(PANIC,
5515                                         (errmsg("unexpected timeline ID %u (should be %u) in checkpoint record",
5516                                                         checkPoint.ThisTimeLineID, ThisTimeLineID)));
5517         }
5518 }
5519
5520 void
5521 xlog_desc(StringInfo buf, uint8 xl_info, char *rec)
5522 {
5523         uint8                   info = xl_info & ~XLR_INFO_MASK;
5524
5525         if (info == XLOG_CHECKPOINT_SHUTDOWN ||
5526                 info == XLOG_CHECKPOINT_ONLINE)
5527         {
5528                 CheckPoint *checkpoint = (CheckPoint *) rec;
5529
5530                 appendStringInfo(buf, "checkpoint: redo %X/%X; undo %X/%X; "
5531                                 "tli %u; xid %u; oid %u; multi %u; offset %u; %s",
5532                                 checkpoint->redo.xlogid, checkpoint->redo.xrecoff,
5533                                 checkpoint->undo.xlogid, checkpoint->undo.xrecoff,
5534                                 checkpoint->ThisTimeLineID, checkpoint->nextXid,
5535                                 checkpoint->nextOid,
5536                                 checkpoint->nextMulti,
5537                                 checkpoint->nextMultiOffset,
5538                                 (info == XLOG_CHECKPOINT_SHUTDOWN) ? "shutdown" : "online");
5539         }
5540         else if (info == XLOG_NEXTOID)
5541         {
5542                 Oid                     nextOid;
5543
5544                 memcpy(&nextOid, rec, sizeof(Oid));
5545                 appendStringInfo(buf, "nextOid: %u", nextOid);
5546         }
5547         else
5548                 appendStringInfo(buf, "UNKNOWN");
5549 }
5550
#ifdef WAL_DEBUG

/*
 * xlog_outrec: append a one-line summary of a WAL record header to buf.
 *
 * The summary holds the previous-record pointer, the xid, a "bkpbN" tag
 * (N counted from 1) for every backup-block bit set in xl_info, and
 * finally the name of the record's resource manager.
 */
static void
xlog_outrec(StringInfo buf, XLogRecord *record)
{
	int			blk;

	appendStringInfo(buf, "prev %X/%X; xid %u",
					 record->xl_prev.xlogid, record->xl_prev.xrecoff,
					 record->xl_xid);

	for (blk = 0; blk < XLR_MAX_BKP_BLOCKS; blk++)
	{
		/* skip slots whose backup-block flag is clear */
		if (!(record->xl_info & XLR_SET_BKP_BLOCK(blk)))
			continue;

		appendStringInfo(buf, "; bkpb%d", blk + 1);
	}

	appendStringInfo(buf, ": %s", RmgrTable[record->xl_rmid].rm_name);
}
#endif   /* WAL_DEBUG */
5571
5572
5573 /*
5574  * GUC support
5575  */
5576 const char *
5577 assign_xlog_sync_method(const char *method, bool doit, GucSource source)
5578 {
5579         int                     new_sync_method;
5580         int                     new_sync_bit;
5581
5582         if (pg_strcasecmp(method, "fsync") == 0)
5583         {
5584                 new_sync_method = SYNC_METHOD_FSYNC;
5585                 new_sync_bit = 0;
5586         }
5587 #ifdef HAVE_FSYNC_WRITETHROUGH
5588         else if (pg_strcasecmp(method, "fsync_writethrough") == 0)
5589         {
5590                 new_sync_method = SYNC_METHOD_FSYNC_WRITETHROUGH;
5591                 new_sync_bit = 0;
5592         }
5593 #endif
5594 #ifdef HAVE_FDATASYNC
5595         else if (pg_strcasecmp(method, "fdatasync") == 0)
5596         {
5597                 new_sync_method = SYNC_METHOD_FDATASYNC;
5598                 new_sync_bit = 0;
5599         }
5600 #endif
5601 #ifdef OPEN_SYNC_FLAG
5602         else if (pg_strcasecmp(method, "open_sync") == 0)
5603         {
5604                 new_sync_method = SYNC_METHOD_OPEN;
5605                 new_sync_bit = OPEN_SYNC_FLAG;
5606         }
5607 #endif
5608 #ifdef OPEN_DATASYNC_FLAG
5609         else if (pg_strcasecmp(method, "open_datasync") == 0)
5610         {
5611                 new_sync_method = SYNC_METHOD_OPEN;
5612                 new_sync_bit = OPEN_DATASYNC_FLAG;
5613         }
5614 #endif
5615         else
5616                 return NULL;
5617
5618         if (!doit)
5619                 return method;
5620
5621         if (sync_method != new_sync_method || open_sync_bit != new_sync_bit)
5622         {
5623                 /*
5624                  * To ensure that no blocks escape unsynced, force an fsync on the
5625                  * currently open log segment (if any).  Also, if the open flag is
5626                  * changing, close the log file so it will be reopened (with new flag
5627                  * bit) at next use.
5628                  */
5629                 if (openLogFile >= 0)
5630                 {
5631                         if (pg_fsync(openLogFile) != 0)
5632                                 ereport(PANIC,
5633                                                 (errcode_for_file_access(),
5634                                                  errmsg("could not fsync log file %u, segment %u: %m",
5635                                                                 openLogId, openLogSeg)));
5636                         if (open_sync_bit != new_sync_bit)
5637                                 XLogFileClose();
5638                 }
5639                 sync_method = new_sync_method;
5640                 open_sync_bit = new_sync_bit;
5641         }
5642
5643         return method;
5644 }
5645
5646
5647 /*
5648  * Issue appropriate kind of fsync (if any) on the current XLOG output file
5649  */
5650 static void
5651 issue_xlog_fsync(void)
5652 {
5653         switch (sync_method)
5654         {
5655                 case SYNC_METHOD_FSYNC:
5656                         if (pg_fsync_no_writethrough(openLogFile) != 0)
5657                                 ereport(PANIC,
5658                                                 (errcode_for_file_access(),
5659                                                  errmsg("could not fsync log file %u, segment %u: %m",
5660                                                                 openLogId, openLogSeg)));
5661                         break;
5662 #ifdef HAVE_FSYNC_WRITETHROUGH
5663                 case SYNC_METHOD_FSYNC_WRITETHROUGH:
5664                         if (pg_fsync_writethrough(openLogFile) != 0)
5665                                 ereport(PANIC,
5666                                                 (errcode_for_file_access(),
5667                                                  errmsg("could not fsync write-through log file %u, segment %u: %m",
5668                                                                 openLogId, openLogSeg)));
5669                         break;
5670 #endif
5671 #ifdef HAVE_FDATASYNC
5672                 case SYNC_METHOD_FDATASYNC:
5673                         if (pg_fdatasync(openLogFile) != 0)
5674                                 ereport(PANIC,
5675                                                 (errcode_for_file_access(),
5676                                         errmsg("could not fdatasync log file %u, segment %u: %m",
5677                                                    openLogId, openLogSeg)));
5678                         break;
5679 #endif
5680                 case SYNC_METHOD_OPEN:
5681                         /* write synced it already */
5682                         break;
5683                 default:
5684                         elog(PANIC, "unrecognized wal_sync_method: %d", sync_method);
5685                         break;
5686         }
5687 }
5688
5689
5690 /*
5691  * pg_start_backup: set up for taking an on-line backup dump
5692  *
5693  * Essentially what this does is to create a backup label file in $PGDATA,
5694  * where it will be archived as part of the backup dump.  The label file
5695  * contains the user-supplied label string (typically this would be used
5696  * to tell where the backup dump will be stored) and the starting time and
5697  * starting WAL offset for the dump.
5698  */
5699 Datum
5700 pg_start_backup(PG_FUNCTION_ARGS)
5701 {
5702         text       *backupid = PG_GETARG_TEXT_P(0);
5703         text       *result;
5704         char       *backupidstr;
5705         XLogRecPtr      checkpointloc;
5706         XLogRecPtr      startpoint;
5707         time_t          stamp_time;
5708         char            strfbuf[128];
5709         char            xlogfilename[MAXFNAMELEN];
5710         uint32          _logId;
5711         uint32          _logSeg;
5712         struct stat stat_buf;
5713         FILE       *fp;
5714
5715         if (!superuser())
5716                 ereport(ERROR,
5717                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
5718                                  (errmsg("must be superuser to run a backup"))));
5719
5720         if (!XLogArchivingActive())
5721                 ereport(ERROR,
5722                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
5723                                  (errmsg("WAL archiving is not active"),
5724                                   (errhint("archive_command must be defined before "
5725                                                    "online backups can be made safely.")))));
5726
5727         backupidstr = DatumGetCString(DirectFunctionCall1(textout,
5728                                                                                                  PointerGetDatum(backupid)));
5729
5730         /*
5731          * Mark backup active in shared memory.  We must do full-page WAL writes
5732          * during an on-line backup even if not doing so at other times, because
5733          * it's quite possible for the backup dump to obtain a "torn" (partially
5734          * written) copy of a database page if it reads the page concurrently
5735          * with our write to the same page.  This can be fixed as long as the
5736          * first write to the page in the WAL sequence is a full-page write.
5737          * Hence, we turn on forcePageWrites and then force a CHECKPOINT, to
5738          * ensure there are no dirty pages in shared memory that might get
5739          * dumped while the backup is in progress without having a corresponding
5740          * WAL record.  (Once the backup is complete, we need not force full-page
5741          * writes anymore, since we expect that any pages not modified during
5742          * the backup interval must have been correctly captured by the backup.)
5743          *
5744          * We must hold WALInsertLock to change the value of forcePageWrites,
5745          * to ensure adequate interlocking against XLogInsert().
5746          */
5747         LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
5748         if (XLogCtl->Insert.forcePageWrites)
5749         {
5750                 LWLockRelease(WALInsertLock);
5751                 ereport(ERROR,
5752                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
5753                                  errmsg("a backup is already in progress"),
5754                                  errhint("Run pg_stop_backup() and try again.")));
5755         }
5756         XLogCtl->Insert.forcePageWrites = true;
5757         LWLockRelease(WALInsertLock);
5758
5759         /* Use a TRY block to ensure we release forcePageWrites if fail below */
5760         PG_TRY();
5761         {
5762                 /*
5763                  * Force a CHECKPOINT.  Aside from being necessary to prevent torn
5764                  * page problems, this guarantees that two successive backup runs will
5765                  * have different checkpoint positions and hence different history
5766                  * file names, even if nothing happened in between.
5767                  */
5768                 RequestCheckpoint(true, false);
5769
5770                 /*
5771                  * Now we need to fetch the checkpoint record location, and also its
5772                  * REDO pointer.  The oldest point in WAL that would be needed to
5773                  * restore starting from the checkpoint is precisely the REDO pointer.
5774                  */
5775                 LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
5776                 checkpointloc = ControlFile->checkPoint;
5777                 startpoint = ControlFile->checkPointCopy.redo;
5778                 LWLockRelease(ControlFileLock);
5779
5780                 XLByteToSeg(startpoint, _logId, _logSeg);
5781                 XLogFileName(xlogfilename, ThisTimeLineID, _logId, _logSeg);
5782
5783                 /*
5784                  * We deliberately use strftime/localtime not the src/timezone
5785                  * functions, so that backup labels will consistently be recorded in
5786                  * the same timezone regardless of TimeZone setting.  This matches
5787                  * elog.c's practice.
5788                  */
5789                 stamp_time = time(NULL);
5790                 strftime(strfbuf, sizeof(strfbuf),
5791                                  "%Y-%m-%d %H:%M:%S %Z",
5792                                  localtime(&stamp_time));
5793
5794                 /*
5795                  * Check for existing backup label --- implies a backup is already
5796                  * running.  (XXX given that we checked forcePageWrites above, maybe
5797                  * it would be OK to just unlink any such label file?)
5798                  */
5799                 if (stat(BACKUP_LABEL_FILE, &stat_buf) != 0)
5800                 {
5801                         if (errno != ENOENT)
5802                                 ereport(ERROR,
5803                                                 (errcode_for_file_access(),
5804                                                  errmsg("could not stat file \"%s\": %m",
5805                                                                 BACKUP_LABEL_FILE)));
5806                 }
5807                 else
5808                         ereport(ERROR,
5809                                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
5810                                          errmsg("a backup is already in progress"),
5811                                          errhint("If you're sure there is no backup in progress, remove file \"%s\" and try again.",
5812                                                          BACKUP_LABEL_FILE)));
5813
5814                 /*
5815                  * Okay, write the file
5816                  */
5817                 fp = AllocateFile(BACKUP_LABEL_FILE, "w");
5818                 if (!fp)
5819                         ereport(ERROR,
5820                                         (errcode_for_file_access(),
5821                                          errmsg("could not create file \"%s\": %m",
5822                                                         BACKUP_LABEL_FILE)));
5823                 fprintf(fp, "START WAL LOCATION: %X/%X (file %s)\n",
5824                                 startpoint.xlogid, startpoint.xrecoff, xlogfilename);
5825                 fprintf(fp, "CHECKPOINT LOCATION: %X/%X\n",
5826                                 checkpointloc.xlogid, checkpointloc.xrecoff);
5827                 fprintf(fp, "START TIME: %s\n", strfbuf);
5828                 fprintf(fp, "LABEL: %s\n", backupidstr);
5829                 if (fflush(fp) || ferror(fp) || FreeFile(fp))
5830                         ereport(ERROR,
5831                                         (errcode_for_file_access(),
5832                                          errmsg("could not write file \"%s\": %m",
5833                                                         BACKUP_LABEL_FILE)));
5834         }
5835         PG_CATCH();
5836         {
5837                 /* Turn off forcePageWrites on failure */
5838                 LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
5839                 XLogCtl->Insert.forcePageWrites = false;
5840                 LWLockRelease(WALInsertLock);
5841
5842                 PG_RE_THROW();
5843         }
5844         PG_END_TRY();
5845
5846         /*
5847          * We're done.  As a convenience, return the starting WAL offset.
5848          */
5849         snprintf(xlogfilename, sizeof(xlogfilename), "%X/%X",
5850                          startpoint.xlogid, startpoint.xrecoff);
5851         result = DatumGetTextP(DirectFunctionCall1(textin,
5852                                                                                          CStringGetDatum(xlogfilename)));
5853         PG_RETURN_TEXT_P(result);
5854 }
5855
5856 /*
5857  * pg_stop_backup: finish taking an on-line backup dump
5858  *
5859  * We remove the backup label file created by pg_start_backup, and instead
5860  * create a backup history file in pg_xlog (whence it will immediately be
5861  * archived).  The backup history file contains the same info found in
5862  * the label file, plus the backup-end time and WAL offset.
5863  */
5864 Datum
5865 pg_stop_backup(PG_FUNCTION_ARGS)
5866 {
5867         text       *result;
5868         XLogCtlInsert *Insert = &XLogCtl->Insert;
5869         XLogRecPtr      startpoint;
5870         XLogRecPtr      stoppoint;
5871         time_t          stamp_time;
5872         char            strfbuf[128];
5873         char            histfilepath[MAXPGPATH];
5874         char            startxlogfilename[MAXFNAMELEN];
5875         char            stopxlogfilename[MAXFNAMELEN];
5876         uint32          _logId;
5877         uint32          _logSeg;
5878         FILE       *lfp;
5879         FILE       *fp;
5880         char            ch;
5881         int                     ich;
5882
5883         if (!superuser())
5884                 ereport(ERROR,
5885                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
5886                                  (errmsg("must be superuser to run a backup"))));
5887
5888         /*
5889          * Get the current end-of-WAL position; it will be unsafe to use this dump
5890          * to restore to a point in advance of this time.  We can also clear
5891          * forcePageWrites here.
5892          */
5893         LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
5894         INSERT_RECPTR(stoppoint, Insert, Insert->curridx);
5895         XLogCtl->Insert.forcePageWrites = false;
5896         LWLockRelease(WALInsertLock);
5897
5898         XLByteToSeg(stoppoint, _logId, _logSeg);
5899         XLogFileName(stopxlogfilename, ThisTimeLineID, _logId, _logSeg);
5900
5901         /*
5902          * We deliberately use strftime/localtime not the src/timezone functions,
5903          * so that backup labels will consistently be recorded in the same
5904          * timezone regardless of TimeZone setting.  This matches elog.c's
5905          * practice.
5906          */
5907         stamp_time = time(NULL);
5908         strftime(strfbuf, sizeof(strfbuf),
5909                          "%Y-%m-%d %H:%M:%S %Z",
5910                          localtime(&stamp_time));
5911
5912         /*
5913          * Open the existing label file
5914          */
5915         lfp = AllocateFile(BACKUP_LABEL_FILE, "r");
5916         if (!lfp)
5917         {
5918                 if (errno != ENOENT)
5919                         ereport(ERROR,
5920                                         (errcode_for_file_access(),
5921                                          errmsg("could not read file \"%s\": %m",
5922                                                         BACKUP_LABEL_FILE)));
5923                 ereport(ERROR,
5924                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
5925                                  errmsg("a backup is not in progress")));
5926         }
5927
5928         /*
5929          * Read and parse the START WAL LOCATION line (this code is pretty crude,
5930          * but we are not expecting any variability in the file format).
5931          */
5932         if (fscanf(lfp, "START WAL LOCATION: %X/%X (file %24s)%c",
5933                            &startpoint.xlogid, &startpoint.xrecoff, startxlogfilename,
5934                            &ch) != 4 || ch != '\n')
5935                 ereport(ERROR,
5936                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
5937                                  errmsg("invalid data in file \"%s\"", BACKUP_LABEL_FILE)));
5938
5939         /*
5940          * Write the backup history file
5941          */
5942         XLByteToSeg(startpoint, _logId, _logSeg);
5943         BackupHistoryFilePath(histfilepath, ThisTimeLineID, _logId, _logSeg,
5944                                                   startpoint.xrecoff % XLogSegSize);
5945         fp = AllocateFile(histfilepath, "w");
5946         if (!fp)
5947                 ereport(ERROR,
5948                                 (errcode_for_file_access(),
5949                                  errmsg("could not create file \"%s\": %m",
5950                                                 histfilepath)));
5951         fprintf(fp, "START WAL LOCATION: %X/%X (file %s)\n",
5952                         startpoint.xlogid, startpoint.xrecoff, startxlogfilename);
5953         fprintf(fp, "STOP WAL LOCATION: %X/%X (file %s)\n",
5954                         stoppoint.xlogid, stoppoint.xrecoff, stopxlogfilename);
5955         /* transfer remaining lines from label to history file */
5956         while ((ich = fgetc(lfp)) != EOF)
5957                 fputc(ich, fp);
5958         fprintf(fp, "STOP TIME: %s\n", strfbuf);
5959         if (fflush(fp) || ferror(fp) || FreeFile(fp))
5960                 ereport(ERROR,
5961                                 (errcode_for_file_access(),
5962                                  errmsg("could not write file \"%s\": %m",
5963                                                 histfilepath)));
5964
5965         /*
5966          * Close and remove the backup label file
5967          */
5968         if (ferror(lfp) || FreeFile(lfp))
5969                 ereport(ERROR,
5970                                 (errcode_for_file_access(),
5971                                  errmsg("could not read file \"%s\": %m",
5972                                                 BACKUP_LABEL_FILE)));
5973         if (unlink(BACKUP_LABEL_FILE) != 0)
5974                 ereport(ERROR,
5975                                 (errcode_for_file_access(),
5976                                  errmsg("could not remove file \"%s\": %m",
5977                                                 BACKUP_LABEL_FILE)));
5978
5979         /*
5980          * Clean out any no-longer-needed history files.  As a side effect,
5981          * this will post a .ready file for the newly created history file,
5982          * notifying the archiver that history file may be archived immediately.
5983          */
5984         CleanupBackupHistory();
5985
5986         /*
5987          * We're done.  As a convenience, return the ending WAL offset.
5988          */
5989         snprintf(stopxlogfilename, sizeof(stopxlogfilename), "%X/%X",
5990                          stoppoint.xlogid, stoppoint.xrecoff);
5991         result = DatumGetTextP(DirectFunctionCall1(textin,
5992                                                                                  CStringGetDatum(stopxlogfilename)));
5993         PG_RETURN_TEXT_P(result);
5994 }
5995
5996 /*
5997  * read_backup_label: check to see if a backup_label file is present
5998  *
5999  * If we see a backup_label during recovery, we assume that we are recovering
6000  * from a backup dump file, and we therefore roll forward from the checkpoint
6001  * identified by the label file, NOT what pg_control says.      This avoids the
6002  * problem that pg_control might have been archived one or more checkpoints
6003  * later than the start of the dump, and so if we rely on it as the start
6004  * point, we will fail to restore a consistent database state.
6005  *
6006  * We also attempt to retrieve the corresponding backup history file.
6007  * If successful, set recoveryMinXlogOffset to constrain valid PITR stopping
6008  * points.
6009  *
6010  * Returns TRUE if a backup_label was found (and fills the checkpoint
6011  * location into *checkPointLoc); returns FALSE if not.
6012  */
6013 static bool
6014 read_backup_label(XLogRecPtr *checkPointLoc)
6015 {
6016         XLogRecPtr      startpoint;
6017         XLogRecPtr      stoppoint;
6018         char            histfilename[MAXFNAMELEN];
6019         char            histfilepath[MAXPGPATH];
6020         char            startxlogfilename[MAXFNAMELEN];
6021         char            stopxlogfilename[MAXFNAMELEN];
6022         TimeLineID      tli;
6023         uint32          _logId;
6024         uint32          _logSeg;
6025         FILE       *lfp;
6026         FILE       *fp;
6027         char            ch;
6028
6029         /*
6030          * See if label file is present
6031          */
6032         lfp = AllocateFile(BACKUP_LABEL_FILE, "r");
6033         if (!lfp)
6034         {
6035                 if (errno != ENOENT)
6036                         ereport(FATAL,
6037                                         (errcode_for_file_access(),
6038                                          errmsg("could not read file \"%s\": %m",
6039                                                         BACKUP_LABEL_FILE)));
6040                 return false;                   /* it's not there, all is fine */
6041         }
6042
6043         /*
6044          * Read and parse the START WAL LOCATION and CHECKPOINT lines (this code
6045          * is pretty crude, but we are not expecting any variability in the file
6046          * format).
6047          */
6048         if (fscanf(lfp, "START WAL LOCATION: %X/%X (file %08X%16s)%c",
6049                            &startpoint.xlogid, &startpoint.xrecoff, &tli,
6050                            startxlogfilename, &ch) != 5 || ch != '\n')
6051                 ereport(FATAL,
6052                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
6053                                  errmsg("invalid data in file \"%s\"", BACKUP_LABEL_FILE)));
6054         if (fscanf(lfp, "CHECKPOINT LOCATION: %X/%X%c",
6055                            &checkPointLoc->xlogid, &checkPointLoc->xrecoff,
6056                            &ch) != 3 || ch != '\n')
6057                 ereport(FATAL,
6058                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
6059                                  errmsg("invalid data in file \"%s\"", BACKUP_LABEL_FILE)));
6060         if (ferror(lfp) || FreeFile(lfp))
6061                 ereport(FATAL,
6062                                 (errcode_for_file_access(),
6063                                  errmsg("could not read file \"%s\": %m",
6064                                                 BACKUP_LABEL_FILE)));
6065
6066         /*
6067          * Try to retrieve the backup history file (no error if we can't)
6068          */
6069         XLByteToSeg(startpoint, _logId, _logSeg);
6070         BackupHistoryFileName(histfilename, tli, _logId, _logSeg,
6071                                                   startpoint.xrecoff % XLogSegSize);
6072
6073         if (InArchiveRecovery)
6074                 RestoreArchivedFile(histfilepath, histfilename, "RECOVERYHISTORY", 0);
6075         else
6076                 BackupHistoryFilePath(histfilepath, tli, _logId, _logSeg,
6077                                                           startpoint.xrecoff % XLogSegSize);
6078
6079         fp = AllocateFile(histfilepath, "r");
6080         if (fp)
6081         {
6082                 /*
6083                  * Parse history file to identify stop point.
6084                  */
6085                 if (fscanf(fp, "START WAL LOCATION: %X/%X (file %24s)%c",
6086                                    &startpoint.xlogid, &startpoint.xrecoff, startxlogfilename,
6087                                    &ch) != 4 || ch != '\n')
6088                         ereport(FATAL,
6089                                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
6090                                          errmsg("invalid data in file \"%s\"", histfilename)));
6091                 if (fscanf(fp, "STOP WAL LOCATION: %X/%X (file %24s)%c",
6092                                    &stoppoint.xlogid, &stoppoint.xrecoff, stopxlogfilename,
6093                                    &ch) != 4 || ch != '\n')
6094                         ereport(FATAL,
6095                                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
6096                                          errmsg("invalid data in file \"%s\"", histfilename)));
6097                 recoveryMinXlogOffset = stoppoint;
6098                 if (ferror(fp) || FreeFile(fp))
6099                         ereport(FATAL,
6100                                         (errcode_for_file_access(),
6101                                          errmsg("could not read file \"%s\": %m",
6102                                                         histfilepath)));
6103         }
6104
6105         return true;
6106 }
6107
6108 /*
6109  * remove_backup_label: remove any extant backup_label after successful
6110  * recovery.  Once we have completed the end-of-recovery checkpoint there
6111  * is no reason to have to replay from the start point indicated by the
6112  * label (and indeed we'll probably have removed/recycled the needed WAL
6113  * segments), so remove the label to prevent trouble in later crash recoveries.
6114  */
6115 static void
6116 remove_backup_label(void)
6117 {
6118         if (unlink(BACKUP_LABEL_FILE) != 0)
6119                 if (errno != ENOENT)
6120                         ereport(FATAL,
6121                                         (errcode_for_file_access(),
6122                                          errmsg("could not remove file \"%s\": %m",
6123                                                         BACKUP_LABEL_FILE)));
6124 }
6125
6126 /*
6127  * Error context callback for errors occurring during rm_redo().
6128  */
6129 static void
6130 rm_redo_error_callback(void *arg)
6131 {
6132         XLogRecord              *record = (XLogRecord *) arg;
6133         StringInfoData   buf;
6134
6135         initStringInfo(&buf);
6136         RmgrTable[record->xl_rmid].rm_desc(&buf, 
6137                                                                            record->xl_info, 
6138                                                                            XLogRecGetData(record));
6139
6140         /* don't bother emitting empty description */
6141         if (buf.len > 0)
6142                 errcontext("xlog redo %s", buf.data);
6143
6144         pfree(buf.data);
6145 }