OSDN Git Service

Introduce the concept of relation forks. An smgr relation can now consist
[pg-rex/syncrep.git] / src / backend / access / transam / xlog.c
1 /*-------------------------------------------------------------------------
2  *
3  * xlog.c
4  *              PostgreSQL transaction log manager
5  *
6  *
7  * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.317 2008/08/11 11:05:10 heikki Exp $
11  *
12  *-------------------------------------------------------------------------
13  */
14
15 #include "postgres.h"
16
17 #include <ctype.h>
18 #include <signal.h>
19 #include <time.h>
20 #include <sys/stat.h>
21 #include <sys/time.h>
22 #include <sys/wait.h>
23 #include <unistd.h>
24
25 #include "access/clog.h"
26 #include "access/multixact.h"
27 #include "access/subtrans.h"
28 #include "access/transam.h"
29 #include "access/tuptoaster.h"
30 #include "access/twophase.h"
31 #include "access/xact.h"
32 #include "access/xlog_internal.h"
33 #include "access/xlogutils.h"
34 #include "catalog/catversion.h"
35 #include "catalog/pg_control.h"
36 #include "catalog/pg_type.h"
37 #include "funcapi.h"
38 #include "miscadmin.h"
39 #include "pgstat.h"
40 #include "postmaster/bgwriter.h"
41 #include "storage/bufmgr.h"
42 #include "storage/fd.h"
43 #include "storage/ipc.h"
44 #include "storage/pmsignal.h"
45 #include "storage/procarray.h"
46 #include "storage/smgr.h"
47 #include "storage/spin.h"
48 #include "utils/builtins.h"
49 #include "utils/pg_locale.h"
50 #include "utils/ps_status.h"
51
52
53 /* File path names (all relative to $PGDATA) */
54 #define BACKUP_LABEL_FILE               "backup_label"
55 #define BACKUP_LABEL_OLD                "backup_label.old"
56 #define RECOVERY_COMMAND_FILE   "recovery.conf"
57 #define RECOVERY_COMMAND_DONE   "recovery.done"
58
59
60 /* User-settable parameters */
61 int                     CheckPointSegments = 3;
62 int                     XLOGbuffers = 8;
63 int                     XLogArchiveTimeout = 0;
64 bool            XLogArchiveMode = false;
65 char       *XLogArchiveCommand = NULL;
66 bool            fullPageWrites = true;
67 bool            log_checkpoints = false;
68 int             sync_method = DEFAULT_SYNC_METHOD;
69
70 #ifdef WAL_DEBUG
71 bool            XLOG_DEBUG = false;
72 #endif
73
74 /*
75  * XLOGfileslop is the maximum number of preallocated future XLOG segments.
76  * When we are done with an old XLOG segment file, we will recycle it as a
77  * future XLOG segment as long as there aren't already XLOGfileslop future
78  * segments; else we'll delete it.  This could be made a separate GUC
79  * variable, but at present I think it's sufficient to hardwire it as
80  * 2*CheckPointSegments+1.  Under normal conditions, a checkpoint will free
81  * no more than 2*CheckPointSegments log segments, and we want to recycle all
82  * of them; the +1 allows boundary cases to happen without wasting a
83  * delete/create-segment cycle.
84  */
85 #define XLOGfileslop    (2*CheckPointSegments + 1)
86
87 /*
88  * GUC support
 *
 * Option table for the sync_method parameter: each entry pairs an accepted
 * name with its SYNC_METHOD_* code.  Entries for methods guarded by an
 * #ifdef are compiled in only when the corresponding symbol is defined.
 * The third field is false for every entry here; its meaning is defined by
 * struct config_enum_entry, which is declared outside this file.
89  */
90 const struct config_enum_entry sync_method_options[] = {
91         {"fsync", SYNC_METHOD_FSYNC, false},
92 #ifdef HAVE_FSYNC_WRITETHROUGH
93         {"fsync_writethrough", SYNC_METHOD_FSYNC_WRITETHROUGH, false},
94 #endif
95 #ifdef HAVE_FDATASYNC
96         {"fdatasync", SYNC_METHOD_FDATASYNC, false},
97 #endif
98 #ifdef OPEN_SYNC_FLAG
99         {"open_sync", SYNC_METHOD_OPEN, false},
100 #endif
101 #ifdef OPEN_DATASYNC_FLAG
102         {"open_datasync", SYNC_METHOD_OPEN_DSYNC, false},
103 #endif
104         {NULL, 0, false}        /* presumably the end-of-list marker -- confirm against the consumer */
105 };
106
107 /*
108  * Statistics for current checkpoint are collected in this global struct.
109  * Because only the background writer or a stand-alone backend can perform
110  * checkpoints, this will be unused in normal backends.
111  */
112 CheckpointStatsData CheckpointStats;
113
114 /*
115  * ThisTimeLineID will be the same in all backends --- it identifies the
116  * current WAL timeline for the database system.
117  */
118 TimeLineID      ThisTimeLineID = 0;
119
120 /* Are we doing recovery from XLOG? */
121 bool            InRecovery = false;
122
123 /* Are we recovering using offline XLOG archives? */
124 static bool InArchiveRecovery = false;
125
126 /* Was the last xlog file restored from archive, or local? */
127 static bool restoredFromArchive = false;
128
129 /* options taken from recovery.conf */
130 static char *recoveryRestoreCommand = NULL;
131 static bool recoveryTarget = false;
132 static bool recoveryTargetExact = false;
133 static bool recoveryTargetInclusive = true;
134 static bool recoveryLogRestartpoints = false;
135 static TransactionId recoveryTargetXid;
136 static TimestampTz recoveryTargetTime;
137 static TimestampTz recoveryLastXTime = 0;
138
139 /* if recoveryStopsHere returns true, it saves actual stop xid/time here */
140 static TransactionId recoveryStopXid;
141 static TimestampTz recoveryStopTime;
142 static bool recoveryStopAfter;
143
144 /*
145  * During normal operation, the only timeline we care about is ThisTimeLineID.
146  * During recovery, however, things are more complicated.  To simplify life
147  * for rmgr code, we keep ThisTimeLineID set to the "current" timeline as we
148  * scan through the WAL history (that is, it is the line that was active when
149  * the currently-scanned WAL record was generated).  We also need these
150  * timeline values:
151  *
152  * recoveryTargetTLI: the desired timeline that we want to end in.
153  *
154  * expectedTLIs: an integer list of recoveryTargetTLI and the TLIs of
155  * its known parents, newest first (so recoveryTargetTLI is always the
156  * first list member).  Only these TLIs are expected to be seen in the WAL
157  * segments we read, and indeed only these TLIs will be considered as
158  * candidate WAL files to open at all.
159  *
160  * curFileTLI: the TLI appearing in the name of the current input WAL file.
161  * (This is not necessarily the same as ThisTimeLineID, because we could
162  * be scanning data that was copied from an ancestor timeline when the current
163  * file was created.)  During a sequential scan we do not allow this value
164  * to decrease.
165  */
166 static TimeLineID recoveryTargetTLI;
167 static List *expectedTLIs;
168 static TimeLineID curFileTLI;
169
170 /*
171  * ProcLastRecPtr points to the start of the last XLOG record inserted by the
172  * current backend.  It is updated for all inserts.  XactLastRecEnd points to
173  * end+1 of the last record, and is reset when we end a top-level transaction,
174  * or start a new one; so it can be used to tell if the current transaction has
175  * created any XLOG records.
176  */
177 static XLogRecPtr ProcLastRecPtr = {0, 0};
178
179 XLogRecPtr      XactLastRecEnd = {0, 0};
180
181 /*
182  * RedoRecPtr is this backend's local copy of the REDO record pointer
183  * (which is almost but not quite the same as a pointer to the most recent
184  * CHECKPOINT record).  We update this from the shared-memory copy,
185  * XLogCtl->Insert.RedoRecPtr, whenever we can safely do so (ie, when we
186  * hold the Insert lock).  See XLogInsert for details.  We are also allowed
187  * to update from XLogCtl->Insert.RedoRecPtr if we hold the info_lck;
188  * see GetRedoRecPtr.  A freshly spawned backend obtains the value during
189  * InitXLOGAccess.
190  */
191 static XLogRecPtr RedoRecPtr;
192
193 /*----------
194  * Shared-memory data structures for XLOG control
195  *
196  * LogwrtRqst indicates a byte position that we need to write and/or fsync
197  * the log up to (all records before that point must be written or fsynced).
198  * LogwrtResult indicates the byte positions we have already written/fsynced.
199  * These structs are identical but are declared separately to indicate their
200  * slightly different functions.
201  *
202  * We do a lot of pushups to minimize the amount of access to lockable
203  * shared memory values.  There are actually three shared-memory copies of
204  * LogwrtResult, plus one unshared copy in each backend.  Here's how it works:
205  *              XLogCtl->LogwrtResult is protected by info_lck
206  *              XLogCtl->Write.LogwrtResult is protected by WALWriteLock
207  *              XLogCtl->Insert.LogwrtResult is protected by WALInsertLock
208  * One must hold the associated lock to read or write any of these, but
209  * of course no lock is needed to read/write the unshared LogwrtResult.
210  *
211  * XLogCtl->LogwrtResult and XLogCtl->Write.LogwrtResult are both "always
212  * right", since both are updated by a write or flush operation before
213  * it releases WALWriteLock.  The point of keeping XLogCtl->Write.LogwrtResult
214  * is that it can be examined/modified by code that already holds WALWriteLock
215  * without needing to grab info_lck as well.
216  *
217  * XLogCtl->Insert.LogwrtResult may lag behind the reality of the other two,
218  * but is updated when convenient.  Again, it exists for the convenience of
219  * code that is already holding WALInsertLock but not the other locks.
220  *
221  * The unshared LogwrtResult may lag behind any or all of these, and again
222  * is updated when convenient.
223  *
224  * The request bookkeeping is simpler: there is a shared XLogCtl->LogwrtRqst
225  * (protected by info_lck), but we don't need to cache any copies of it.
226  *
227  * Note that this all works because the request and result positions can only
228  * advance forward, never back up, and so we can easily determine which of two
229  * values is "more up to date".
230  *
231  * info_lck is only held long enough to read/update the protected variables,
232  * so it's a plain spinlock.  The other locks are held longer (potentially
233  * over I/O operations), so we use LWLocks for them.  These locks are:
234  *
235  * WALInsertLock: must be held to insert a record into the WAL buffers.
236  *
237  * WALWriteLock: must be held to write WAL buffers to disk (XLogWrite or
238  * XLogFlush).
239  *
240  * ControlFileLock: must be held to read/update control file or create
241  * new log file.
242  *
243  * CheckpointLock: must be held to do a checkpoint (ensures only one
244  * checkpointer at a time; currently, with all checkpoints done by the
245  * bgwriter, this is just pro forma).
246  *
247  *----------
248  */
249
/* Request positions: how far the log needs to be written and/or fsynced (see discussion above). */
250 typedef struct XLogwrtRqst
251 {
252         XLogRecPtr      Write;                  /* last byte + 1 to write out */
253         XLogRecPtr      Flush;                  /* last byte + 1 to flush */
254 } XLogwrtRqst;
255
/* Result positions: how far the log has already been written/fsynced (see discussion above). */
256 typedef struct XLogwrtResult
257 {
258         XLogRecPtr      Write;                  /* last byte + 1 written out */
259         XLogRecPtr      Flush;                  /* last byte + 1 flushed */
260 } XLogwrtResult;
261
262 /*
263  * Shared state data for XLogInsert.
 *
 * Embedded in XLogCtlData as the "Insert" member, which that struct marks
 * as protected by WALInsertLock.
264  */
265 typedef struct XLogCtlInsert
266 {
267         XLogwrtResult LogwrtResult; /* a recent (possibly lagging) value of LogwrtResult */
268         XLogRecPtr      PrevRecord;             /* start of previously-inserted record */
269         int                     curridx;                /* current block index in cache */
270         XLogPageHeader currpage;        /* points to header of block in cache */
271         char       *currpos;            /* current insertion point in cache */
272         XLogRecPtr      RedoRecPtr;             /* current redo point for insertions */
273         bool            forcePageWrites;        /* forcing full-page writes for PITR? */
274 } XLogCtlInsert;
275
276 /*
277  * Shared state data for XLogWrite/XLogFlush.
 *
 * Embedded in XLogCtlData as the "Write" member, which that struct marks
 * as protected by WALWriteLock.
278  */
279 typedef struct XLogCtlWrite
280 {
281         XLogwrtResult LogwrtResult; /* current value of LogwrtResult */
282         int                     curridx;                /* cache index of next block to write */
283         pg_time_t       lastSegSwitchTime;              /* time of last xlog segment switch */
284 } XLogCtlWrite;
285
286 /*
287  * Total shared-memory state for XLOG.
 *
 * Members are grouped by the lock that protects them; each group is
 * introduced by a "Protected by ..." comment.
288  */
289 typedef struct XLogCtlData
290 {
291         /* Protected by WALInsertLock: */
292         XLogCtlInsert Insert;
293
294         /* Protected by info_lck: */
295         XLogwrtRqst LogwrtRqst;
296         XLogwrtResult LogwrtResult;
297         uint32          ckptXidEpoch;   /* nextXID & epoch of latest checkpoint (this field: the epoch) */
298         TransactionId ckptXid;  /* ... and this field: the nextXID itself */
299         XLogRecPtr      asyncCommitLSN; /* LSN of newest async commit */
300
301         /* Protected by WALWriteLock: */
302         XLogCtlWrite Write;
303
304         /*
305          * These values do not change after startup, although the pointed-to pages
306          * and xlblocks values certainly do.  Permission to read/write the pages
307          * and xlblocks values depends on WALInsertLock and WALWriteLock.
308          */
309         char       *pages;                      /* buffers for unwritten XLOG pages */
310         XLogRecPtr *xlblocks;           /* per buffer: position of the page's 1st byte + XLOG_BLCKSZ */
311         int                     XLogCacheBlck;  /* highest allocated xlog buffer index */
312         TimeLineID      ThisTimeLineID; /* presumably the shared copy of the global ThisTimeLineID -- confirm */
313
314         slock_t         info_lck;               /* spinlock for the members marked "Protected by info_lck" above */
315 } XLogCtlData;
316
317 static XLogCtlData *XLogCtl = NULL;
318
319 /*
320  * We maintain an image of pg_control in shared memory.
321  */
322 static ControlFileData *ControlFile = NULL;
323
324 /*
325  * Macros for managing XLogInsert state.  In most cases, the calling routine
326  * has a local pointer to XLogCtl->Insert and/or a copy of its curridx field,
327  * so these are passed as parameters instead of being fetched via XLogCtl.
328  */
329
330 /* Free space remaining in the current xlog page buffer
 *
 * Computed as XLOG_BLCKSZ minus the distance from the start of the current
 * page (currpage) to the insertion point (currpos).  Insert must be a
 * pointer to an XLogCtlInsert.
 */
331 #define INSERT_FREESPACE(Insert)  \
332         (XLOG_BLCKSZ - ((Insert)->currpos - (char *) (Insert)->currpage))
333
334 /* Construct XLogRecPtr value for current insertion point
 *
 * xlogid is copied from XLogCtl->xlblocks[curridx]; xrecoff is that entry's
 * xrecoff (documented in XLogCtlData as the page's first-byte position plus
 * XLOG_BLCKSZ) minus the free space still left on the page.
 *
 * The macro expands to a comma expression that assigns both fields of
 * recptr.  recptr and curridx each appear twice in the expansion, so pass
 * side-effect-free arguments.
 */
335 #define INSERT_RECPTR(recptr,Insert,curridx)  \
336         ( \
337           (recptr).xlogid = XLogCtl->xlblocks[curridx].xlogid, \
338           (recptr).xrecoff = \
339                 XLogCtl->xlblocks[curridx].xrecoff - INSERT_FREESPACE(Insert) \
340         )
341
/* Index of the xlog buffer before idx, wrapping from 0 to XLogCtl->XLogCacheBlck (the highest index). */
342 #define PrevBufIdx(idx)         \
343                 (((idx) == 0) ? XLogCtl->XLogCacheBlck : ((idx) - 1))
344
/* Index of the xlog buffer after idx, wrapping from XLogCtl->XLogCacheBlck (the highest index) to 0. */
345 #define NextBufIdx(idx)         \
346                 (((idx) == XLogCtl->XLogCacheBlck) ? 0 : ((idx) + 1))
347
348 /*
349  * Private, possibly out-of-date copy of shared LogwrtResult.
350  * See discussion above.
351  */
352 static XLogwrtResult LogwrtResult = {{0, 0}, {0, 0}};
353
354 /*
355  * openLogFile is -1 or a kernel FD for an open log file segment.
356  * When it's open, openLogOff is the current seek offset in the file.
357  * openLogId/openLogSeg identify the segment.  These variables are only
358  * used to write the XLOG, and so will normally refer to the active segment.
359  */
360 static int      openLogFile = -1;
361 static uint32 openLogId = 0;
362 static uint32 openLogSeg = 0;
363 static uint32 openLogOff = 0;
364
365 /*
366  * These variables are used similarly to the ones above, but for reading
367  * the XLOG.  Note, however, that readOff generally represents the offset
368  * of the page just read, not the seek position of the FD itself, which
369  * will be just past that page.
370  */
371 static int      readFile = -1;
372 static uint32 readId = 0;
373 static uint32 readSeg = 0;
374 static uint32 readOff = 0;
375
376 /* Buffer for currently read page (XLOG_BLCKSZ bytes) */
377 static char *readBuf = NULL;
378
379 /* Buffer for current ReadRecord result (expandable) */
380 static char *readRecordBuf = NULL;
381 static uint32 readRecordBufSize = 0;
382
383 /* State information for XLOG reading */
384 static XLogRecPtr ReadRecPtr;   /* start of last record read */
385 static XLogRecPtr EndRecPtr;    /* end+1 of last record read */
386 static XLogRecord *nextRecord = NULL;
387 static TimeLineID lastPageTLI = 0;
388
389 static bool InRedo = false;
390
391
/* Prototypes for file-local (static) routines. */
392 static void XLogArchiveNotify(const char *xlog);
393 static void XLogArchiveNotifySeg(uint32 log, uint32 seg);
394 static bool XLogArchiveCheckDone(const char *xlog, bool create_if_missing);
395 static void XLogArchiveCleanup(const char *xlog);
396 static void readRecoveryCommandFile(void);
397 static void exitArchiveRecovery(TimeLineID endTLI,
398                                         uint32 endLogId, uint32 endLogSeg);
399 static bool recoveryStopsHere(XLogRecord *record, bool *includeThis);
400 static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags);
401
402 static bool XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites,
403                                 XLogRecPtr *lsn, BkpBlock *bkpb);
404 static bool AdvanceXLInsertBuffer(bool new_segment);
405 static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch);
406 static int XLogFileInit(uint32 log, uint32 seg,
407                          bool *use_existent, bool use_lock);
408 static bool InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath,
409                                            bool find_free, int *max_advance,
410                                            bool use_lock);
411 static int      XLogFileOpen(uint32 log, uint32 seg);
412 static int      XLogFileRead(uint32 log, uint32 seg, int emode);
413 static void XLogFileClose(void);
414 static bool RestoreArchivedFile(char *path, const char *xlogfname,
415                                         const char *recovername, off_t expectedSize);
416 static void PreallocXlogFiles(XLogRecPtr endptr);
417 static void RemoveOldXlogFiles(uint32 log, uint32 seg, XLogRecPtr endptr);
418 static void CleanupBackupHistory(void);
419 static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, int emode);
420 static bool ValidXLOGHeader(XLogPageHeader hdr, int emode);
421 static XLogRecord *ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt);
422 static List *readTimeLineHistory(TimeLineID targetTLI);
423 static bool existsTimeLineHistory(TimeLineID probeTLI);
424 static TimeLineID findNewestTimeLine(TimeLineID startTLI);
425 static void writeTimeLineHistory(TimeLineID newTLI, TimeLineID parentTLI,
426                                          TimeLineID endTLI,
427                                          uint32 endLogId, uint32 endLogSeg);
428 static void WriteControlFile(void);
429 static void ReadControlFile(void);
430 static char *str_time(pg_time_t tnow);
431 #ifdef WAL_DEBUG
432 static void xlog_outrec(StringInfo buf, XLogRecord *record);
433 #endif
434 static void issue_xlog_fsync(void);
435 static void pg_start_backup_callback(int code, Datum arg);
436 static bool read_backup_label(XLogRecPtr *checkPointLoc,
437                                   XLogRecPtr *minRecoveryLoc);
438 static void rm_redo_error_callback(void *arg);
439 static int get_sync_bit(int method);
440
441
442 /*
443  * Insert an XLOG record having the specified RMID and info bytes,
444  * with the body of the record being the data chunk(s) described by
445  * the rdata chain (see xlog.h for notes about rdata).
446  *
447  * Returns XLOG pointer to end of record (beginning of next record).
448  * This can be used as LSN for data pages affected by the logged action.
449  * (LSN is the XLOG point up to which the XLOG must be flushed to disk
450  * before the data page can be written out.  This implements the basic
451  * WAL rule "write the log before the data".)
452  *
453  * NB: this routine feels free to scribble on the XLogRecData structs,
454  * though not on the data they reference.  This is OK since the XLogRecData
455  * structs are always just temporaries in the calling code.
456  */
457 XLogRecPtr
458 XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
459 {
460         XLogCtlInsert *Insert = &XLogCtl->Insert;
461         XLogRecord *record;
462         XLogContRecord *contrecord;
463         XLogRecPtr      RecPtr;
464         XLogRecPtr      WriteRqst;
465         uint32          freespace;
466         int                     curridx;
467         XLogRecData *rdt;
468         Buffer          dtbuf[XLR_MAX_BKP_BLOCKS];
469         bool            dtbuf_bkp[XLR_MAX_BKP_BLOCKS];
470         BkpBlock        dtbuf_xlg[XLR_MAX_BKP_BLOCKS];
471         XLogRecPtr      dtbuf_lsn[XLR_MAX_BKP_BLOCKS];
472         XLogRecData dtbuf_rdt1[XLR_MAX_BKP_BLOCKS];
473         XLogRecData dtbuf_rdt2[XLR_MAX_BKP_BLOCKS];
474         XLogRecData dtbuf_rdt3[XLR_MAX_BKP_BLOCKS];
475         pg_crc32        rdata_crc;
476         uint32          len,
477                                 write_len;
478         unsigned        i;
479         bool            updrqst;
480         bool            doPageWrites;
481         bool            isLogSwitch = (rmid == RM_XLOG_ID && info == XLOG_SWITCH);
482
483         /* info's high bits are reserved for use by me */
484         if (info & XLR_INFO_MASK)
485                 elog(PANIC, "invalid xlog info mask %02X", info);
486
487         /*
488          * In bootstrap mode, we don't actually log anything but XLOG resources;
489          * return a phony record pointer.
490          */
491         if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID)
492         {
493                 RecPtr.xlogid = 0;
494                 RecPtr.xrecoff = SizeOfXLogLongPHD;             /* start of 1st chkpt record */
495                 return RecPtr;
496         }
497
498         /*
499          * Here we scan the rdata chain, determine which buffers must be backed
500          * up, and compute the CRC values for the data.  Note that the record
501          * header isn't added into the CRC initially since we don't know the final
502          * length or info bits quite yet.  Thus, the CRC will represent the CRC of
503          * the whole record in the order "rdata, then backup blocks, then record
504          * header".
505          *
506          * We may have to loop back to here if a race condition is detected below.
507          * We could prevent the race by doing all this work while holding the
508          * insert lock, but it seems better to avoid doing CRC calculations while
509          * holding the lock.  This means we have to be careful about modifying the
510          * rdata chain until we know we aren't going to loop back again.  The only
511          * change we allow ourselves to make earlier is to set rdt->data = NULL in
512          * chain items we have decided we will have to back up the whole buffer
513          * for.  This is OK because we will certainly decide the same thing again
514          * for those items if we do it over; doing it here saves an extra pass
515          * over the chain later.
516          */
517 begin:;
518         for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
519         {
520                 dtbuf[i] = InvalidBuffer;
521                 dtbuf_bkp[i] = false;
522         }
523
524         /*
525          * Decide if we need to do full-page writes in this XLOG record: true if
526          * full_page_writes is on or we have a PITR request for it.  Since we
527          * don't yet have the insert lock, forcePageWrites could change under us,
528          * but we'll recheck it once we have the lock.
529          */
530         doPageWrites = fullPageWrites || Insert->forcePageWrites;
531
532         INIT_CRC32(rdata_crc);
533         len = 0;
534         for (rdt = rdata;;)
535         {
536                 if (rdt->buffer == InvalidBuffer)
537                 {
538                         /* Simple data, just include it */
539                         len += rdt->len;
540                         COMP_CRC32(rdata_crc, rdt->data, rdt->len);
541                 }
542                 else
543                 {
544                         /* Find info for buffer */
545                         for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
546                         {
547                                 if (rdt->buffer == dtbuf[i])
548                                 {
549                                         /* Buffer already referenced by earlier chain item */
550                                         if (dtbuf_bkp[i])
551                                                 rdt->data = NULL;
552                                         else if (rdt->data)
553                                         {
554                                                 len += rdt->len;
555                                                 COMP_CRC32(rdata_crc, rdt->data, rdt->len);
556                                         }
557                                         break;
558                                 }
559                                 if (dtbuf[i] == InvalidBuffer)
560                                 {
561                                         /* OK, put it in this slot */
562                                         dtbuf[i] = rdt->buffer;
563                                         if (XLogCheckBuffer(rdt, doPageWrites,
564                                                                                 &(dtbuf_lsn[i]), &(dtbuf_xlg[i])))
565                                         {
566                                                 dtbuf_bkp[i] = true;
567                                                 rdt->data = NULL;
568                                         }
569                                         else if (rdt->data)
570                                         {
571                                                 len += rdt->len;
572                                                 COMP_CRC32(rdata_crc, rdt->data, rdt->len);
573                                         }
574                                         break;
575                                 }
576                         }
577                         if (i >= XLR_MAX_BKP_BLOCKS)
578                                 elog(PANIC, "can backup at most %d blocks per xlog record",
579                                          XLR_MAX_BKP_BLOCKS);
580                 }
581                 /* Break out of loop when rdt points to last chain item */
582                 if (rdt->next == NULL)
583                         break;
584                 rdt = rdt->next;
585         }
586
587         /*
588          * Now add the backup block headers and data into the CRC
589          */
590         for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
591         {
592                 if (dtbuf_bkp[i])
593                 {
594                         BkpBlock   *bkpb = &(dtbuf_xlg[i]);
595                         char       *page;
596
597                         COMP_CRC32(rdata_crc,
598                                            (char *) bkpb,
599                                            sizeof(BkpBlock));
600                         page = (char *) BufferGetBlock(dtbuf[i]);
601                         if (bkpb->hole_length == 0)
602                         {
603                                 COMP_CRC32(rdata_crc,
604                                                    page,
605                                                    BLCKSZ);
606                         }
607                         else
608                         {
609                                 /* must skip the hole */
610                                 COMP_CRC32(rdata_crc,
611                                                    page,
612                                                    bkpb->hole_offset);
613                                 COMP_CRC32(rdata_crc,
614                                                    page + (bkpb->hole_offset + bkpb->hole_length),
615                                                    BLCKSZ - (bkpb->hole_offset + bkpb->hole_length));
616                         }
617                 }
618         }
619
620         /*
621          * NOTE: We disallow len == 0 because it provides a useful bit of extra
622          * error checking in ReadRecord.  This means that all callers of
623          * XLogInsert must supply at least some not-in-a-buffer data.  However, we
624          * make an exception for XLOG SWITCH records because we don't want them to
625          * ever cross a segment boundary.
626          */
627         if (len == 0 && !isLogSwitch)
628                 elog(PANIC, "invalid xlog record length %u", len);
629
630         START_CRIT_SECTION();
631
632         /* Now wait to get insert lock */
633         LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
634
635         /*
636          * Check to see if my RedoRecPtr is out of date.  If so, may have to go
637          * back and recompute everything.  This can only happen just after a
638          * checkpoint, so it's better to be slow in this case and fast otherwise.
639          *
640          * If we aren't doing full-page writes then RedoRecPtr doesn't actually
641          * affect the contents of the XLOG record, so we'll update our local copy
642          * but not force a recomputation.
643          */
644         if (!XLByteEQ(RedoRecPtr, Insert->RedoRecPtr))
645         {
646                 Assert(XLByteLT(RedoRecPtr, Insert->RedoRecPtr));
647                 RedoRecPtr = Insert->RedoRecPtr;
648
649                 if (doPageWrites)
650                 {
651                         for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
652                         {
653                                 if (dtbuf[i] == InvalidBuffer)
654                                         continue;
655                                 if (dtbuf_bkp[i] == false &&
656                                         XLByteLE(dtbuf_lsn[i], RedoRecPtr))
657                                 {
658                                         /*
659                                          * Oops, this buffer now needs to be backed up, but we
660                                          * didn't think so above.  Start over.
661                                          */
662                                         LWLockRelease(WALInsertLock);
663                                         END_CRIT_SECTION();
664                                         goto begin;
665                                 }
666                         }
667                 }
668         }
669
670         /*
671          * Also check to see if forcePageWrites was just turned on; if we weren't
672          * already doing full-page writes then go back and recompute. (If it was
673          * just turned off, we could recompute the record without full pages, but
674          * we choose not to bother.)
675          */
676         if (Insert->forcePageWrites && !doPageWrites)
677         {
678                 /* Oops, must redo it with full-page data */
679                 LWLockRelease(WALInsertLock);
680                 END_CRIT_SECTION();
681                 goto begin;
682         }
683
684         /*
685          * Make additional rdata chain entries for the backup blocks, so that we
686          * don't need to special-case them in the write loop.  Note that we have
687          * now irrevocably changed the input rdata chain.  At the exit of this
688          * loop, write_len includes the backup block data.
689          *
690          * Also set the appropriate info bits to show which buffers were backed
691          * up. The i'th XLR_SET_BKP_BLOCK bit corresponds to the i'th distinct
692          * buffer value (ignoring InvalidBuffer) appearing in the rdata chain.
693          */
694         write_len = len;
695         for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
696         {
697                 BkpBlock   *bkpb;
698                 char       *page;
699
700                 if (!dtbuf_bkp[i])
701                         continue;
702
703                 info |= XLR_SET_BKP_BLOCK(i);
704
705                 bkpb = &(dtbuf_xlg[i]);
706                 page = (char *) BufferGetBlock(dtbuf[i]);
707
708                 rdt->next = &(dtbuf_rdt1[i]);
709                 rdt = rdt->next;
710
711                 rdt->data = (char *) bkpb;
712                 rdt->len = sizeof(BkpBlock);
713                 write_len += sizeof(BkpBlock);
714
715                 rdt->next = &(dtbuf_rdt2[i]);
716                 rdt = rdt->next;
717
718                 if (bkpb->hole_length == 0)
719                 {
720                         rdt->data = page;
721                         rdt->len = BLCKSZ;
722                         write_len += BLCKSZ;
723                         rdt->next = NULL;
724                 }
725                 else
726                 {
727                         /* must skip the hole */
728                         rdt->data = page;
729                         rdt->len = bkpb->hole_offset;
730                         write_len += bkpb->hole_offset;
731
732                         rdt->next = &(dtbuf_rdt3[i]);
733                         rdt = rdt->next;
734
735                         rdt->data = page + (bkpb->hole_offset + bkpb->hole_length);
736                         rdt->len = BLCKSZ - (bkpb->hole_offset + bkpb->hole_length);
737                         write_len += rdt->len;
738                         rdt->next = NULL;
739                 }
740         }
741
742         /*
743          * If we backed up any full blocks and online backup is not in progress,
744          * mark the backup blocks as removable.  This allows the WAL archiver to
745          * know whether it is safe to compress archived WAL data by transforming
746          * full-block records into the non-full-block format.
747          *
748          * Note: we could just set the flag whenever !forcePageWrites, but
749          * defining it like this leaves the info bit free for some potential other
750          * use in records without any backup blocks.
751          */
752         if ((info & XLR_BKP_BLOCK_MASK) && !Insert->forcePageWrites)
753                 info |= XLR_BKP_REMOVABLE;
754
755         /*
756          * If there isn't enough space on the current XLOG page for a record
757          * header, advance to the next page (leaving the unused space as zeroes).
758          */
759         updrqst = false;
760         freespace = INSERT_FREESPACE(Insert);
761         if (freespace < SizeOfXLogRecord)
762         {
763                 updrqst = AdvanceXLInsertBuffer(false);
764                 freespace = INSERT_FREESPACE(Insert);
765         }
766
767         /* Compute record's XLOG location */
768         curridx = Insert->curridx;
769         INSERT_RECPTR(RecPtr, Insert, curridx);
770
771         /*
772          * If the record is an XLOG_SWITCH, and we are exactly at the start of a
773          * segment, we need not insert it (and don't want to because we'd like
774          * consecutive switch requests to be no-ops).  Instead, make sure
775          * everything is written and flushed through the end of the prior segment,
776          * and return the prior segment's end address.
777          */
778         if (isLogSwitch &&
779                 (RecPtr.xrecoff % XLogSegSize) == SizeOfXLogLongPHD)
780         {
781                 /* We can release insert lock immediately */
782                 LWLockRelease(WALInsertLock);
783
784                 RecPtr.xrecoff -= SizeOfXLogLongPHD;
785                 if (RecPtr.xrecoff == 0)
786                 {
787                         /* crossing a logid boundary */
788                         RecPtr.xlogid -= 1;
789                         RecPtr.xrecoff = XLogFileSize;
790                 }
791
792                 LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
793                 LogwrtResult = XLogCtl->Write.LogwrtResult;
794                 if (!XLByteLE(RecPtr, LogwrtResult.Flush))
795                 {
796                         XLogwrtRqst FlushRqst;
797
798                         FlushRqst.Write = RecPtr;
799                         FlushRqst.Flush = RecPtr;
800                         XLogWrite(FlushRqst, false, false);
801                 }
802                 LWLockRelease(WALWriteLock);
803
804                 END_CRIT_SECTION();
805
806                 return RecPtr;
807         }
808
809         /* Insert record header */
810
811         record = (XLogRecord *) Insert->currpos;
812         record->xl_prev = Insert->PrevRecord;
813         record->xl_xid = GetCurrentTransactionIdIfAny();
814         record->xl_tot_len = SizeOfXLogRecord + write_len;
815         record->xl_len = len;           /* doesn't include backup blocks */
816         record->xl_info = info;
817         record->xl_rmid = rmid;
818
819         /* Now we can finish computing the record's CRC */
820         COMP_CRC32(rdata_crc, (char *) record + sizeof(pg_crc32),
821                            SizeOfXLogRecord - sizeof(pg_crc32));
822         FIN_CRC32(rdata_crc);
823         record->xl_crc = rdata_crc;
824
825 #ifdef WAL_DEBUG
826         if (XLOG_DEBUG)
827         {
828                 StringInfoData buf;
829
830                 initStringInfo(&buf);
831                 appendStringInfo(&buf, "INSERT @ %X/%X: ",
832                                                  RecPtr.xlogid, RecPtr.xrecoff);
833                 xlog_outrec(&buf, record);
834                 if (rdata->data != NULL)
835                 {
836                         appendStringInfo(&buf, " - ");
837                         RmgrTable[record->xl_rmid].rm_desc(&buf, record->xl_info, rdata->data);
838                 }
839                 elog(LOG, "%s", buf.data);
840                 pfree(buf.data);
841         }
842 #endif
843
844         /* Record begin of record in appropriate places */
845         ProcLastRecPtr = RecPtr;
846         Insert->PrevRecord = RecPtr;
847
848         Insert->currpos += SizeOfXLogRecord;
849         freespace -= SizeOfXLogRecord;
850
851         /*
852          * Append the data, including backup blocks if any
853          */
854         while (write_len)
855         {
856                 while (rdata->data == NULL)
857                         rdata = rdata->next;
858
859                 if (freespace > 0)
860                 {
861                         if (rdata->len > freespace)
862                         {
863                                 memcpy(Insert->currpos, rdata->data, freespace);
864                                 rdata->data += freespace;
865                                 rdata->len -= freespace;
866                                 write_len -= freespace;
867                         }
868                         else
869                         {
870                                 memcpy(Insert->currpos, rdata->data, rdata->len);
871                                 freespace -= rdata->len;
872                                 write_len -= rdata->len;
873                                 Insert->currpos += rdata->len;
874                                 rdata = rdata->next;
875                                 continue;
876                         }
877                 }
878
879                 /* Use next buffer */
880                 updrqst = AdvanceXLInsertBuffer(false);
881                 curridx = Insert->curridx;
882                 /* Insert cont-record header */
883                 Insert->currpage->xlp_info |= XLP_FIRST_IS_CONTRECORD;
884                 contrecord = (XLogContRecord *) Insert->currpos;
885                 contrecord->xl_rem_len = write_len;
886                 Insert->currpos += SizeOfXLogContRecord;
887                 freespace = INSERT_FREESPACE(Insert);
888         }
889
890         /* Ensure next record will be properly aligned */
891         Insert->currpos = (char *) Insert->currpage +
892                 MAXALIGN(Insert->currpos - (char *) Insert->currpage);
893         freespace = INSERT_FREESPACE(Insert);
894
895         /*
896          * The recptr I return is the beginning of the *next* record. This will be
897          * stored as LSN for changed data pages...
898          */
899         INSERT_RECPTR(RecPtr, Insert, curridx);
900
901         /*
902          * If the record is an XLOG_SWITCH, we must now write and flush all the
903          * existing data, and then forcibly advance to the start of the next
904          * segment.  It's not good to do this I/O while holding the insert lock,
905          * but there seems too much risk of confusion if we try to release the
906          * lock sooner.  Fortunately xlog switch needn't be a high-performance
907          * operation anyway...
908          */
909         if (isLogSwitch)
910         {
911                 XLogCtlWrite *Write = &XLogCtl->Write;
912                 XLogwrtRqst FlushRqst;
913                 XLogRecPtr      OldSegEnd;
914
915                 LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
916
917                 /*
918                  * Flush through the end of the page containing XLOG_SWITCH, and
919                  * perform end-of-segment actions (eg, notifying archiver).
920                  */
921                 WriteRqst = XLogCtl->xlblocks[curridx];
922                 FlushRqst.Write = WriteRqst;
923                 FlushRqst.Flush = WriteRqst;
924                 XLogWrite(FlushRqst, false, true);
925
926                 /* Set up the next buffer as first page of next segment */
927                 /* Note: AdvanceXLInsertBuffer cannot need to do I/O here */
928                 (void) AdvanceXLInsertBuffer(true);
929
930                 /* There should be no unwritten data */
931                 curridx = Insert->curridx;
932                 Assert(curridx == Write->curridx);
933
934                 /* Compute end address of old segment */
935                 OldSegEnd = XLogCtl->xlblocks[curridx];
936                 OldSegEnd.xrecoff -= XLOG_BLCKSZ;
937                 if (OldSegEnd.xrecoff == 0)
938                 {
939                         /* crossing a logid boundary */
940                         OldSegEnd.xlogid -= 1;
941                         OldSegEnd.xrecoff = XLogFileSize;
942                 }
943
944                 /* Make it look like we've written and synced all of old segment */
945                 LogwrtResult.Write = OldSegEnd;
946                 LogwrtResult.Flush = OldSegEnd;
947
948                 /*
949                  * Update shared-memory status --- this code should match XLogWrite
950                  */
951                 {
952                         /* use volatile pointer to prevent code rearrangement */
953                         volatile XLogCtlData *xlogctl = XLogCtl;
954
955                         SpinLockAcquire(&xlogctl->info_lck);
956                         xlogctl->LogwrtResult = LogwrtResult;
957                         if (XLByteLT(xlogctl->LogwrtRqst.Write, LogwrtResult.Write))
958                                 xlogctl->LogwrtRqst.Write = LogwrtResult.Write;
959                         if (XLByteLT(xlogctl->LogwrtRqst.Flush, LogwrtResult.Flush))
960                                 xlogctl->LogwrtRqst.Flush = LogwrtResult.Flush;
961                         SpinLockRelease(&xlogctl->info_lck);
962                 }
963
964                 Write->LogwrtResult = LogwrtResult;
965
966                 LWLockRelease(WALWriteLock);
967
968                 updrqst = false;                /* done already */
969         }
970         else
971         {
972                 /* normal case, ie not xlog switch */
973
974                 /* Need to update shared LogwrtRqst if some block was filled up */
975                 if (freespace < SizeOfXLogRecord)
976                 {
977                         /* curridx is filled and available for writing out */
978                         updrqst = true;
979                 }
980                 else
981                 {
982                         /* if updrqst already set, write through end of previous buf */
983                         curridx = PrevBufIdx(curridx);
984                 }
985                 WriteRqst = XLogCtl->xlblocks[curridx];
986         }
987
988         LWLockRelease(WALInsertLock);
989
990         if (updrqst)
991         {
992                 /* use volatile pointer to prevent code rearrangement */
993                 volatile XLogCtlData *xlogctl = XLogCtl;
994
995                 SpinLockAcquire(&xlogctl->info_lck);
996                 /* advance global request to include new block(s) */
997                 if (XLByteLT(xlogctl->LogwrtRqst.Write, WriteRqst))
998                         xlogctl->LogwrtRqst.Write = WriteRqst;
999                 /* update local result copy while I have the chance */
1000                 LogwrtResult = xlogctl->LogwrtResult;
1001                 SpinLockRelease(&xlogctl->info_lck);
1002         }
1003
1004         XactLastRecEnd = RecPtr;
1005
1006         END_CRIT_SECTION();
1007
1008         return RecPtr;
1009 }
1010
1011 /*
1012  * Determine whether the buffer referenced by an XLogRecData item has to
1013  * be backed up, and if so fill a BkpBlock struct for it.  In any case
1014  * save the buffer's LSN at *lsn.
1015  */
1016 static bool
1017 XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites,
1018                                 XLogRecPtr *lsn, BkpBlock *bkpb)
1019 {
1020         Page            page;
1021
1022         page = BufferGetPage(rdata->buffer);
1023
1024         /*
1025          * XXX We assume page LSN is first data on *every* page that can be passed
1026          * to XLogInsert, whether it otherwise has the standard page layout or
1027          * not.
1028          */
1029         *lsn = PageGetLSN(page);
1030
1031         if (doPageWrites &&
1032                 XLByteLE(PageGetLSN(page), RedoRecPtr))
1033         {
1034                 /*
1035                  * The page needs to be backed up, so set up *bkpb
1036                  */
1037                 BufferGetTag(rdata->buffer, &bkpb->node, &bkpb->fork, &bkpb->block);
1038
1039                 if (rdata->buffer_std)
1040                 {
1041                         /* Assume we can omit data between pd_lower and pd_upper */
1042                         uint16          lower = ((PageHeader) page)->pd_lower;
1043                         uint16          upper = ((PageHeader) page)->pd_upper;
1044
1045                         if (lower >= SizeOfPageHeaderData &&
1046                                 upper > lower &&
1047                                 upper <= BLCKSZ)
1048                         {
1049                                 bkpb->hole_offset = lower;
1050                                 bkpb->hole_length = upper - lower;
1051                         }
1052                         else
1053                         {
1054                                 /* No "hole" to compress out */
1055                                 bkpb->hole_offset = 0;
1056                                 bkpb->hole_length = 0;
1057                         }
1058                 }
1059                 else
1060                 {
1061                         /* Not a standard page header, don't try to eliminate "hole" */
1062                         bkpb->hole_offset = 0;
1063                         bkpb->hole_length = 0;
1064                 }
1065
1066                 return true;                    /* buffer requires backup */
1067         }
1068
1069         return false;                           /* buffer does not need to be backed up */
1070 }
1071
1072 /*
1073  * XLogArchiveNotify
1074  *
1075  * Create an archive notification file
1076  *
1077  * The name of the notification file is the message that will be picked up
1078  * by the archiver, e.g. we write 0000000100000001000000C6.ready
1079  * and the archiver then knows to archive XLOGDIR/0000000100000001000000C6,
1080  * then when complete, rename it to 0000000100000001000000C6.done
1081  */
1082 static void
1083 XLogArchiveNotify(const char *xlog)
1084 {
1085         char            archiveStatusPath[MAXPGPATH];
1086         FILE       *fd;
1087
1088         /* insert an otherwise empty file called <XLOG>.ready */
1089         StatusFilePath(archiveStatusPath, xlog, ".ready");
1090         fd = AllocateFile(archiveStatusPath, "w");
1091         if (fd == NULL)
1092         {
1093                 ereport(LOG,
1094                                 (errcode_for_file_access(),
1095                                  errmsg("could not create archive status file \"%s\": %m",
1096                                                 archiveStatusPath)));
1097                 return;
1098         }
1099         if (FreeFile(fd))
1100         {
1101                 ereport(LOG,
1102                                 (errcode_for_file_access(),
1103                                  errmsg("could not write archive status file \"%s\": %m",
1104                                                 archiveStatusPath)));
1105                 return;
1106         }
1107
1108         /* Notify archiver that it's got something to do */
1109         if (IsUnderPostmaster)
1110                 SendPostmasterSignal(PMSIGNAL_WAKEN_ARCHIVER);
1111 }
1112
1113 /*
1114  * Convenience routine to notify using log/seg representation of filename
1115  */
1116 static void
1117 XLogArchiveNotifySeg(uint32 log, uint32 seg)
1118 {
1119         char            xlog[MAXFNAMELEN];
1120
1121         XLogFileName(xlog, ThisTimeLineID, log, seg);
1122         XLogArchiveNotify(xlog);
1123 }
1124
1125 /*
1126  * XLogArchiveCheckDone
1127  *
1128  * This is called when we are ready to delete or recycle an old XLOG segment
1129  * file or backup history file.  If it is okay to delete it then return true.
1130  * If it is not time to delete it, make sure a .ready file exists, and return
1131  * false.
1132  *
1133  * If <XLOG>.done exists, then return true; else if <XLOG>.ready exists,
1134  * then return false; else create <XLOG>.ready and return false.
1135  *
1136  * The reason we do things this way is so that if the original attempt to
1137  * create <XLOG>.ready fails, we'll retry during subsequent checkpoints.
1138  */
1139 static bool
1140 XLogArchiveCheckDone(const char *xlog, bool create_if_missing)
1141 {
1142         char            archiveStatusPath[MAXPGPATH];
1143         struct stat stat_buf;
1144
1145         /* Always deletable if archiving is off */
1146         if (!XLogArchivingActive())
1147                 return true;
1148
1149         /* First check for .done --- this means archiver is done with it */
1150         StatusFilePath(archiveStatusPath, xlog, ".done");
1151         if (stat(archiveStatusPath, &stat_buf) == 0)
1152                 return true;
1153
1154         /* check for .ready --- this means archiver is still busy with it */
1155         StatusFilePath(archiveStatusPath, xlog, ".ready");
1156         if (stat(archiveStatusPath, &stat_buf) == 0)
1157                 return false;
1158
1159         /* Race condition --- maybe archiver just finished, so recheck */
1160         StatusFilePath(archiveStatusPath, xlog, ".done");
1161         if (stat(archiveStatusPath, &stat_buf) == 0)
1162                 return true;
1163
1164         /* Retry creation of the .ready file */
1165         if (create_if_missing)
1166                 XLogArchiveNotify(xlog);
1167
1168         return false;
1169 }
1170
1171 /*
1172  * XLogArchiveCleanup
1173  *
1174  * Cleanup archive notification file(s) for a particular xlog segment
1175  */
1176 static void
1177 XLogArchiveCleanup(const char *xlog)
1178 {
1179         char            archiveStatusPath[MAXPGPATH];
1180
1181         /* Remove the .done file */
1182         StatusFilePath(archiveStatusPath, xlog, ".done");
1183         unlink(archiveStatusPath);
1184         /* should we complain about failure? */
1185
1186         /* Remove the .ready file if present --- normally it shouldn't be */
1187         StatusFilePath(archiveStatusPath, xlog, ".ready");
1188         unlink(archiveStatusPath);
1189         /* should we complain about failure? */
1190 }
1191
1192 /*
1193  * Advance the Insert state to the next buffer page, writing out the next
1194  * buffer if it still contains unwritten data.
1195  *
1196  * If new_segment is TRUE then we set up the next buffer page as the first
1197  * page of the next xlog segment file, possibly but not usually the next
1198  * consecutive file page.
1199  *
1200  * The global LogwrtRqst.Write pointer needs to be advanced to include the
1201  * just-filled page.  If we can do this for free (without an extra lock),
1202  * we do so here.  Otherwise the caller must do it.  We return TRUE if the
1203  * request update still needs to be done, FALSE if we did it internally.
1204  *
1205  * Must be called with WALInsertLock held.
1206  */
1207 static bool
1208 AdvanceXLInsertBuffer(bool new_segment)
1209 {
1210         XLogCtlInsert *Insert = &XLogCtl->Insert;
1211         XLogCtlWrite *Write = &XLogCtl->Write;
1212         int                     nextidx = NextBufIdx(Insert->curridx);
1213         bool            update_needed = true;
1214         XLogRecPtr      OldPageRqstPtr;
1215         XLogwrtRqst WriteRqst;
1216         XLogRecPtr      NewPageEndPtr;
1217         XLogPageHeader NewPage;
1218
1219         /* Use Insert->LogwrtResult copy if it's more fresh */
1220         if (XLByteLT(LogwrtResult.Write, Insert->LogwrtResult.Write))
1221                 LogwrtResult = Insert->LogwrtResult;
1222
1223         /*
1224          * Get ending-offset of the buffer page we need to replace (this may be
1225          * zero if the buffer hasn't been used yet).  Fall through if it's already
1226          * written out.
1227          */
1228         OldPageRqstPtr = XLogCtl->xlblocks[nextidx];
1229         if (!XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
1230         {
1231                 /* nope, got work to do... */
1232                 XLogRecPtr      FinishedPageRqstPtr;
1233
1234                 FinishedPageRqstPtr = XLogCtl->xlblocks[Insert->curridx];
1235
1236                 /* Before waiting, get info_lck and update LogwrtResult */
1237                 {
1238                         /* use volatile pointer to prevent code rearrangement */
1239                         volatile XLogCtlData *xlogctl = XLogCtl;
1240
1241                         SpinLockAcquire(&xlogctl->info_lck);
1242                         if (XLByteLT(xlogctl->LogwrtRqst.Write, FinishedPageRqstPtr))
1243                                 xlogctl->LogwrtRqst.Write = FinishedPageRqstPtr;
1244                         LogwrtResult = xlogctl->LogwrtResult;
1245                         SpinLockRelease(&xlogctl->info_lck);
1246                 }
1247
1248                 update_needed = false;  /* Did the shared-request update */
1249
1250                 if (XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
1251                 {
1252                         /* OK, someone wrote it already */
1253                         Insert->LogwrtResult = LogwrtResult;
1254                 }
1255                 else
1256                 {
1257                         /* Must acquire write lock */
1258                         LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
1259                         LogwrtResult = Write->LogwrtResult;
1260                         if (XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
1261                         {
1262                                 /* OK, someone wrote it already */
1263                                 LWLockRelease(WALWriteLock);
1264                                 Insert->LogwrtResult = LogwrtResult;
1265                         }
1266                         else
1267                         {
1268                                 /*
1269                                  * Have to write buffers while holding insert lock. This is
1270                                  * not good, so only write as much as we absolutely must.
1271                                  */
1272                                 WriteRqst.Write = OldPageRqstPtr;
1273                                 WriteRqst.Flush.xlogid = 0;
1274                                 WriteRqst.Flush.xrecoff = 0;
1275                                 XLogWrite(WriteRqst, false, false);
1276                                 LWLockRelease(WALWriteLock);
1277                                 Insert->LogwrtResult = LogwrtResult;
1278                         }
1279                 }
1280         }
1281
1282         /*
1283          * Now the next buffer slot is free and we can set it up to be the next
1284          * output page.
1285          */
1286         NewPageEndPtr = XLogCtl->xlblocks[Insert->curridx];
1287
1288         if (new_segment)
1289         {
1290                 /* force it to a segment start point */
1291                 NewPageEndPtr.xrecoff += XLogSegSize - 1;
1292                 NewPageEndPtr.xrecoff -= NewPageEndPtr.xrecoff % XLogSegSize;
1293         }
1294
1295         if (NewPageEndPtr.xrecoff >= XLogFileSize)
1296         {
1297                 /* crossing a logid boundary */
1298                 NewPageEndPtr.xlogid += 1;
1299                 NewPageEndPtr.xrecoff = XLOG_BLCKSZ;
1300         }
1301         else
1302                 NewPageEndPtr.xrecoff += XLOG_BLCKSZ;
1303         XLogCtl->xlblocks[nextidx] = NewPageEndPtr;
1304         NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * (Size) XLOG_BLCKSZ);
1305
1306         Insert->curridx = nextidx;
1307         Insert->currpage = NewPage;
1308
1309         Insert->currpos = ((char *) NewPage) +SizeOfXLogShortPHD;
1310
1311         /*
1312          * Be sure to re-zero the buffer so that bytes beyond what we've written
1313          * will look like zeroes and not valid XLOG records...
1314          */
1315         MemSet((char *) NewPage, 0, XLOG_BLCKSZ);
1316
1317         /*
1318          * Fill the new page's header
1319          */
1320         NewPage   ->xlp_magic = XLOG_PAGE_MAGIC;
1321
1322         /* NewPage->xlp_info = 0; */    /* done by memset */
1323         NewPage   ->xlp_tli = ThisTimeLineID;
1324         NewPage   ->xlp_pageaddr.xlogid = NewPageEndPtr.xlogid;
1325         NewPage   ->xlp_pageaddr.xrecoff = NewPageEndPtr.xrecoff - XLOG_BLCKSZ;
1326
1327         /*
1328          * If first page of an XLOG segment file, make it a long header.
1329          */
1330         if ((NewPage->xlp_pageaddr.xrecoff % XLogSegSize) == 0)
1331         {
1332                 XLogLongPageHeader NewLongPage = (XLogLongPageHeader) NewPage;
1333
1334                 NewLongPage->xlp_sysid = ControlFile->system_identifier;
1335                 NewLongPage->xlp_seg_size = XLogSegSize;
1336                 NewLongPage->xlp_xlog_blcksz = XLOG_BLCKSZ;
1337                 NewPage   ->xlp_info |= XLP_LONG_HEADER;
1338
1339                 Insert->currpos = ((char *) NewPage) +SizeOfXLogLongPHD;
1340         }
1341
1342         return update_needed;
1343 }
1344
1345 /*
1346  * Check whether we've consumed enough xlog space that a checkpoint is needed.
1347  *
1348  * Caller must have just finished filling the open log file (so that
1349  * openLogId/openLogSeg are valid).  We measure the distance from RedoRecPtr
1350  * to the open log file and see if that exceeds CheckPointSegments.
1351  *
1352  * Note: it is caller's responsibility that RedoRecPtr is up-to-date.
1353  */
1354 static bool
1355 XLogCheckpointNeeded(void)
1356 {
1357         /*
1358          * A straight computation of segment number could overflow 32 bits. Rather
1359          * than assuming we have working 64-bit arithmetic, we compare the
1360          * highest-order bits separately, and force a checkpoint immediately when
1361          * they change.
1362          */
1363         uint32          old_segno,
1364                                 new_segno;
1365         uint32          old_highbits,
1366                                 new_highbits;
1367
1368         old_segno = (RedoRecPtr.xlogid % XLogSegSize) * XLogSegsPerFile +
1369                 (RedoRecPtr.xrecoff / XLogSegSize);
1370         old_highbits = RedoRecPtr.xlogid / XLogSegSize;
1371         new_segno = (openLogId % XLogSegSize) * XLogSegsPerFile + openLogSeg;
1372         new_highbits = openLogId / XLogSegSize;
1373         if (new_highbits != old_highbits ||
1374                 new_segno >= old_segno + (uint32) (CheckPointSegments - 1))
1375                 return true;
1376         return false;
1377 }
1378
1379 /*
1380  * Write and/or fsync the log at least as far as WriteRqst indicates.
1381  *
1382  * If flexible == TRUE, we don't have to write as far as WriteRqst, but
1383  * may stop at any convenient boundary (such as a cache or logfile boundary).
1384  * This option allows us to avoid uselessly issuing multiple writes when a
1385  * single one would do.
1386  *
1387  * If xlog_switch == TRUE, we are intending an xlog segment switch, so
1388  * perform end-of-segment actions after writing the last page, even if
1389  * it's not physically the end of its segment.  (NB: this will work properly
1390  * only if caller specifies WriteRqst == page-end and flexible == false,
1391  * and there is some data to write.)
1392  *
1393  * Must be called with WALWriteLock held.
1394  */
1395 static void
1396 XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
1397 {
1398         XLogCtlWrite *Write = &XLogCtl->Write;
1399         bool            ispartialpage;
1400         bool            last_iteration;
1401         bool            finishing_seg;
1402         bool            use_existent;
1403         int                     curridx;
1404         int                     npages;
1405         int                     startidx;
1406         uint32          startoffset;
1407
1408         /* We should always be inside a critical section here */
1409         Assert(CritSectionCount > 0);
1410
1411         /*
1412          * Update local LogwrtResult (caller probably did this already, but...)
1413          */
1414         LogwrtResult = Write->LogwrtResult;
1415
1416         /*
1417          * Since successive pages in the xlog cache are consecutively allocated,
1418          * we can usually gather multiple pages together and issue just one
1419          * write() call.  npages is the number of pages we have determined can be
1420          * written together; startidx is the cache block index of the first one,
1421          * and startoffset is the file offset at which it should go. The latter
1422          * two variables are only valid when npages > 0, but we must initialize
1423          * all of them to keep the compiler quiet.
1424          */
1425         npages = 0;
1426         startidx = 0;
1427         startoffset = 0;
1428
1429         /*
1430          * Within the loop, curridx is the cache block index of the page to
1431          * consider writing.  We advance Write->curridx only after successfully
1432          * writing pages.  (Right now, this refinement is useless since we are
1433          * going to PANIC if any error occurs anyway; but someday it may come in
1434          * useful.)
1435          */
1436         curridx = Write->curridx;
1437
1438         while (XLByteLT(LogwrtResult.Write, WriteRqst.Write))
1439         {
1440                 /*
1441                  * Make sure we're not ahead of the insert process.  This could happen
1442                  * if we're passed a bogus WriteRqst.Write that is past the end of the
1443                  * last page that's been initialized by AdvanceXLInsertBuffer.
1444                  */
1445                 if (!XLByteLT(LogwrtResult.Write, XLogCtl->xlblocks[curridx]))
1446                         elog(PANIC, "xlog write request %X/%X is past end of log %X/%X",
1447                                  LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff,
1448                                  XLogCtl->xlblocks[curridx].xlogid,
1449                                  XLogCtl->xlblocks[curridx].xrecoff);
1450
1451                 /* Advance LogwrtResult.Write to end of current buffer page */
1452                 LogwrtResult.Write = XLogCtl->xlblocks[curridx];
1453                 ispartialpage = XLByteLT(WriteRqst.Write, LogwrtResult.Write);
1454
1455                 if (!XLByteInPrevSeg(LogwrtResult.Write, openLogId, openLogSeg))
1456                 {
1457                         /*
1458                          * Switch to new logfile segment.  We cannot have any pending
1459                          * pages here (since we dump what we have at segment end).
1460                          */
1461                         Assert(npages == 0);
1462                         if (openLogFile >= 0)
1463                                 XLogFileClose();
1464                         XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg);
1465
1466                         /* create/use new log file */
1467                         use_existent = true;
1468                         openLogFile = XLogFileInit(openLogId, openLogSeg,
1469                                                                            &use_existent, true);
1470                         openLogOff = 0;
1471                 }
1472
1473                 /* Make sure we have the current logfile open */
1474                 if (openLogFile < 0)
1475                 {
1476                         XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg);
1477                         openLogFile = XLogFileOpen(openLogId, openLogSeg);
1478                         openLogOff = 0;
1479                 }
1480
1481                 /* Add current page to the set of pending pages-to-dump */
1482                 if (npages == 0)
1483                 {
1484                         /* first of group */
1485                         startidx = curridx;
1486                         startoffset = (LogwrtResult.Write.xrecoff - XLOG_BLCKSZ) % XLogSegSize;
1487                 }
1488                 npages++;
1489
1490                 /*
1491                  * Dump the set if this will be the last loop iteration, or if we are
1492                  * at the last page of the cache area (since the next page won't be
1493                  * contiguous in memory), or if we are at the end of the logfile
1494                  * segment.
1495                  */
1496                 last_iteration = !XLByteLT(LogwrtResult.Write, WriteRqst.Write);
1497
1498                 finishing_seg = !ispartialpage &&
1499                         (startoffset + npages * XLOG_BLCKSZ) >= XLogSegSize;
1500
1501                 if (last_iteration ||
1502                         curridx == XLogCtl->XLogCacheBlck ||
1503                         finishing_seg)
1504                 {
1505                         char       *from;
1506                         Size            nbytes;
1507
1508                         /* Need to seek in the file? */
1509                         if (openLogOff != startoffset)
1510                         {
1511                                 if (lseek(openLogFile, (off_t) startoffset, SEEK_SET) < 0)
1512                                         ereport(PANIC,
1513                                                         (errcode_for_file_access(),
1514                                                          errmsg("could not seek in log file %u, "
1515                                                                         "segment %u to offset %u: %m",
1516                                                                         openLogId, openLogSeg, startoffset)));
1517                                 openLogOff = startoffset;
1518                         }
1519
1520                         /* OK to write the page(s) */
1521                         from = XLogCtl->pages + startidx * (Size) XLOG_BLCKSZ;
1522                         nbytes = npages * (Size) XLOG_BLCKSZ;
1523                         errno = 0;
1524                         if (write(openLogFile, from, nbytes) != nbytes)
1525                         {
1526                                 /* if write didn't set errno, assume no disk space */
1527                                 if (errno == 0)
1528                                         errno = ENOSPC;
1529                                 ereport(PANIC,
1530                                                 (errcode_for_file_access(),
1531                                                  errmsg("could not write to log file %u, segment %u "
1532                                                                 "at offset %u, length %lu: %m",
1533                                                                 openLogId, openLogSeg,
1534                                                                 openLogOff, (unsigned long) nbytes)));
1535                         }
1536
1537                         /* Update state for write */
1538                         openLogOff += nbytes;
1539                         Write->curridx = ispartialpage ? curridx : NextBufIdx(curridx);
1540                         npages = 0;
1541
1542                         /*
1543                          * If we just wrote the whole last page of a logfile segment,
1544                          * fsync the segment immediately.  This avoids having to go back
1545                          * and re-open prior segments when an fsync request comes along
1546                          * later. Doing it here ensures that one and only one backend will
1547                          * perform this fsync.
1548                          *
1549                          * We also do this if this is the last page written for an xlog
1550                          * switch.
1551                          *
1552                          * This is also the right place to notify the Archiver that the
1553                          * segment is ready to copy to archival storage, and to update the
1554                          * timer for archive_timeout, and to signal for a checkpoint if
1555                          * too many logfile segments have been used since the last
1556                          * checkpoint.
1557                          */
1558                         if (finishing_seg || (xlog_switch && last_iteration))
1559                         {
1560                                 issue_xlog_fsync();
1561                                 LogwrtResult.Flush = LogwrtResult.Write;                /* end of page */
1562
1563                                 if (XLogArchivingActive())
1564                                         XLogArchiveNotifySeg(openLogId, openLogSeg);
1565
1566                                 Write->lastSegSwitchTime = (pg_time_t) time(NULL);
1567
1568                                 /*
1569                                  * Signal bgwriter to start a checkpoint if we've consumed too
1570                                  * much xlog since the last one.  For speed, we first check
1571                                  * using the local copy of RedoRecPtr, which might be out of
1572                                  * date; if it looks like a checkpoint is needed, forcibly
1573                                  * update RedoRecPtr and recheck.
1574                                  */
1575                                 if (IsUnderPostmaster &&
1576                                         XLogCheckpointNeeded())
1577                                 {
1578                                         (void) GetRedoRecPtr();
1579                                         if (XLogCheckpointNeeded())
1580                                                 RequestCheckpoint(CHECKPOINT_CAUSE_XLOG);
1581                                 }
1582                         }
1583                 }
1584
1585                 if (ispartialpage)
1586                 {
1587                         /* Only asked to write a partial page */
1588                         LogwrtResult.Write = WriteRqst.Write;
1589                         break;
1590                 }
1591                 curridx = NextBufIdx(curridx);
1592
1593                 /* If flexible, break out of loop as soon as we wrote something */
1594                 if (flexible && npages == 0)
1595                         break;
1596         }
1597
1598         Assert(npages == 0);
1599         Assert(curridx == Write->curridx);
1600
1601         /*
1602          * If asked to flush, do so
1603          */
1604         if (XLByteLT(LogwrtResult.Flush, WriteRqst.Flush) &&
1605                 XLByteLT(LogwrtResult.Flush, LogwrtResult.Write))
1606         {
1607                 /*
1608                  * Could get here without iterating above loop, in which case we might
1609                  * have no open file or the wrong one.  However, we do not need to
1610                  * fsync more than one file.
1611                  */
1612                 if (sync_method != SYNC_METHOD_OPEN &&
1613                         sync_method != SYNC_METHOD_OPEN_DSYNC)
1614                 {
1615                         if (openLogFile >= 0 &&
1616                                 !XLByteInPrevSeg(LogwrtResult.Write, openLogId, openLogSeg))
1617                                 XLogFileClose();
1618                         if (openLogFile < 0)
1619                         {
1620                                 XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg);
1621                                 openLogFile = XLogFileOpen(openLogId, openLogSeg);
1622                                 openLogOff = 0;
1623                         }
1624                         issue_xlog_fsync();
1625                 }
1626                 LogwrtResult.Flush = LogwrtResult.Write;
1627         }
1628
1629         /*
1630          * Update shared-memory status
1631          *
1632          * We make sure that the shared 'request' values do not fall behind the
1633          * 'result' values.  This is not absolutely essential, but it saves some
1634          * code in a couple of places.
1635          */
1636         {
1637                 /* use volatile pointer to prevent code rearrangement */
1638                 volatile XLogCtlData *xlogctl = XLogCtl;
1639
1640                 SpinLockAcquire(&xlogctl->info_lck);
1641                 xlogctl->LogwrtResult = LogwrtResult;
1642                 if (XLByteLT(xlogctl->LogwrtRqst.Write, LogwrtResult.Write))
1643                         xlogctl->LogwrtRqst.Write = LogwrtResult.Write;
1644                 if (XLByteLT(xlogctl->LogwrtRqst.Flush, LogwrtResult.Flush))
1645                         xlogctl->LogwrtRqst.Flush = LogwrtResult.Flush;
1646                 SpinLockRelease(&xlogctl->info_lck);
1647         }
1648
1649         Write->LogwrtResult = LogwrtResult;
1650 }
1651
1652 /*
1653  * Record the LSN for an asynchronous transaction commit.
1654  * (This should not be called for aborts, nor for synchronous commits.)
1655  */
1656 void
1657 XLogSetAsyncCommitLSN(XLogRecPtr asyncCommitLSN)
1658 {
1659         /* use volatile pointer to prevent code rearrangement */
1660         volatile XLogCtlData *xlogctl = XLogCtl;
1661
1662         SpinLockAcquire(&xlogctl->info_lck);
1663         if (XLByteLT(xlogctl->asyncCommitLSN, asyncCommitLSN))
1664                 xlogctl->asyncCommitLSN = asyncCommitLSN;
1665         SpinLockRelease(&xlogctl->info_lck);
1666 }
1667
1668 /*
1669  * Ensure that all XLOG data through the given position is flushed to disk.
1670  *
1671  * NOTE: this differs from XLogWrite mainly in that the WALWriteLock is not
1672  * already held, and we try to avoid acquiring it if possible.
1673  */
1674 void
1675 XLogFlush(XLogRecPtr record)
1676 {
1677         XLogRecPtr      WriteRqstPtr;
1678         XLogwrtRqst WriteRqst;
1679
1680         /* Disabled during REDO */
1681         if (InRedo)
1682                 return;
1683
1684         /* Quick exit if already known flushed */
1685         if (XLByteLE(record, LogwrtResult.Flush))
1686                 return;
1687
1688 #ifdef WAL_DEBUG
1689         if (XLOG_DEBUG)
1690                 elog(LOG, "xlog flush request %X/%X; write %X/%X; flush %X/%X",
1691                          record.xlogid, record.xrecoff,
1692                          LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff,
1693                          LogwrtResult.Flush.xlogid, LogwrtResult.Flush.xrecoff);
1694 #endif
1695
1696         START_CRIT_SECTION();
1697
1698         /*
1699          * Since fsync is usually a horribly expensive operation, we try to
1700          * piggyback as much data as we can on each fsync: if we see any more data
1701          * entered into the xlog buffer, we'll write and fsync that too, so that
1702          * the final value of LogwrtResult.Flush is as large as possible. This
1703          * gives us some chance of avoiding another fsync immediately after.
1704          */
1705
1706         /* initialize to given target; may increase below */
1707         WriteRqstPtr = record;
1708
1709         /* read LogwrtResult and update local state */
1710         {
1711                 /* use volatile pointer to prevent code rearrangement */
1712                 volatile XLogCtlData *xlogctl = XLogCtl;
1713
1714                 SpinLockAcquire(&xlogctl->info_lck);
1715                 if (XLByteLT(WriteRqstPtr, xlogctl->LogwrtRqst.Write))
1716                         WriteRqstPtr = xlogctl->LogwrtRqst.Write;
1717                 LogwrtResult = xlogctl->LogwrtResult;
1718                 SpinLockRelease(&xlogctl->info_lck);
1719         }
1720
1721         /* done already? */
1722         if (!XLByteLE(record, LogwrtResult.Flush))
1723         {
1724                 /* now wait for the write lock */
1725                 LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
1726                 LogwrtResult = XLogCtl->Write.LogwrtResult;
1727                 if (!XLByteLE(record, LogwrtResult.Flush))
1728                 {
1729                         /* try to write/flush later additions to XLOG as well */
1730                         if (LWLockConditionalAcquire(WALInsertLock, LW_EXCLUSIVE))
1731                         {
1732                                 XLogCtlInsert *Insert = &XLogCtl->Insert;
1733                                 uint32          freespace = INSERT_FREESPACE(Insert);
1734
1735                                 if (freespace < SizeOfXLogRecord)               /* buffer is full */
1736                                         WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx];
1737                                 else
1738                                 {
1739                                         WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx];
1740                                         WriteRqstPtr.xrecoff -= freespace;
1741                                 }
1742                                 LWLockRelease(WALInsertLock);
1743                                 WriteRqst.Write = WriteRqstPtr;
1744                                 WriteRqst.Flush = WriteRqstPtr;
1745                         }
1746                         else
1747                         {
1748                                 WriteRqst.Write = WriteRqstPtr;
1749                                 WriteRqst.Flush = record;
1750                         }
1751                         XLogWrite(WriteRqst, false, false);
1752                 }
1753                 LWLockRelease(WALWriteLock);
1754         }
1755
1756         END_CRIT_SECTION();
1757
1758         /*
1759          * If we still haven't flushed to the request point then we have a
1760          * problem; most likely, the requested flush point is past end of XLOG.
1761          * This has been seen to occur when a disk page has a corrupted LSN.
1762          *
1763          * Formerly we treated this as a PANIC condition, but that hurts the
1764          * system's robustness rather than helping it: we do not want to take down
1765          * the whole system due to corruption on one data page.  In particular, if
1766          * the bad page is encountered again during recovery then we would be
1767          * unable to restart the database at all!  (This scenario has actually
1768          * happened in the field several times with 7.1 releases. Note that we
1769          * cannot get here while InRedo is true, but if the bad page is brought in
1770          * and marked dirty during recovery then CreateCheckPoint will try to
1771          * flush it at the end of recovery.)
1772          *
1773          * The current approach is to ERROR under normal conditions, but only
1774          * WARNING during recovery, so that the system can be brought up even if
1775          * there's a corrupt LSN.  Note that for calls from xact.c, the ERROR will
1776          * be promoted to PANIC since xact.c calls this routine inside a critical
1777          * section.  However, calls from bufmgr.c are not within critical sections
1778          * and so we will not force a restart for a bad LSN on a data page.
1779          */
1780         if (XLByteLT(LogwrtResult.Flush, record))
1781                 elog(InRecovery ? WARNING : ERROR,
1782                 "xlog flush request %X/%X is not satisfied --- flushed only to %X/%X",
1783                          record.xlogid, record.xrecoff,
1784                          LogwrtResult.Flush.xlogid, LogwrtResult.Flush.xrecoff);
1785 }
1786
1787 /*
1788  * Flush xlog, but without specifying exactly where to flush to.
1789  *
1790  * We normally flush only completed blocks; but if there is nothing to do on
1791  * that basis, we check for unflushed async commits in the current incomplete
1792  * block, and flush through the latest one of those.  Thus, if async commits
1793  * are not being used, we will flush complete blocks only.      We can guarantee
1794  * that async commits reach disk after at most three cycles; normally only
1795  * one or two.  (We allow XLogWrite to write "flexibly", meaning it can stop
1796  * at the end of the buffer ring; this makes a difference only with very high
1797  * load or long wal_writer_delay, but imposes one extra cycle for the worst
1798  * case for async commits.)
1799  *
1800  * This routine is invoked periodically by the background walwriter process.
1801  */
1802 void
1803 XLogBackgroundFlush(void)
1804 {
1805         XLogRecPtr      WriteRqstPtr;
1806         bool            flexible = true;
1807
1808         /* read LogwrtResult and update local state */
1809         {
1810                 /* use volatile pointer to prevent code rearrangement */
1811                 volatile XLogCtlData *xlogctl = XLogCtl;
1812
1813                 SpinLockAcquire(&xlogctl->info_lck);
1814                 LogwrtResult = xlogctl->LogwrtResult;
1815                 WriteRqstPtr = xlogctl->LogwrtRqst.Write;
1816                 SpinLockRelease(&xlogctl->info_lck);
1817         }
1818
1819         /* back off to last completed page boundary */
1820         WriteRqstPtr.xrecoff -= WriteRqstPtr.xrecoff % XLOG_BLCKSZ;
1821
1822         /* if we have already flushed that far, consider async commit records */
1823         if (XLByteLE(WriteRqstPtr, LogwrtResult.Flush))
1824         {
1825                 /* use volatile pointer to prevent code rearrangement */
1826                 volatile XLogCtlData *xlogctl = XLogCtl;
1827
1828                 SpinLockAcquire(&xlogctl->info_lck);
1829                 WriteRqstPtr = xlogctl->asyncCommitLSN;
1830                 SpinLockRelease(&xlogctl->info_lck);
1831                 flexible = false;               /* ensure it all gets written */
1832         }
1833
1834         /* Done if already known flushed */
1835         if (XLByteLE(WriteRqstPtr, LogwrtResult.Flush))
1836                 return;
1837
1838 #ifdef WAL_DEBUG
1839         if (XLOG_DEBUG)
1840                 elog(LOG, "xlog bg flush request %X/%X; write %X/%X; flush %X/%X",
1841                          WriteRqstPtr.xlogid, WriteRqstPtr.xrecoff,
1842                          LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff,
1843                          LogwrtResult.Flush.xlogid, LogwrtResult.Flush.xrecoff);
1844 #endif
1845
1846         START_CRIT_SECTION();
1847
1848         /* now wait for the write lock */
1849         LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
1850         LogwrtResult = XLogCtl->Write.LogwrtResult;
1851         if (!XLByteLE(WriteRqstPtr, LogwrtResult.Flush))
1852         {
1853                 XLogwrtRqst WriteRqst;
1854
1855                 WriteRqst.Write = WriteRqstPtr;
1856                 WriteRqst.Flush = WriteRqstPtr;
1857                 XLogWrite(WriteRqst, flexible, false);
1858         }
1859         LWLockRelease(WALWriteLock);
1860
1861         END_CRIT_SECTION();
1862 }
1863
1864 /*
1865  * Flush any previous asynchronously-committed transactions' commit records.
1866  *
1867  * NOTE: it is unwise to assume that this provides any strong guarantees.
1868  * In particular, because of the inexact LSN bookkeeping used by clog.c,
1869  * we cannot assume that hint bits will be settable for these transactions.
1870  */
1871 void
1872 XLogAsyncCommitFlush(void)
1873 {
1874         XLogRecPtr      WriteRqstPtr;
1875
1876         /* use volatile pointer to prevent code rearrangement */
1877         volatile XLogCtlData *xlogctl = XLogCtl;
1878
1879         SpinLockAcquire(&xlogctl->info_lck);
1880         WriteRqstPtr = xlogctl->asyncCommitLSN;
1881         SpinLockRelease(&xlogctl->info_lck);
1882
1883         XLogFlush(WriteRqstPtr);
1884 }
1885
1886 /*
1887  * Test whether XLOG data has been flushed up to (at least) the given position.
1888  *
1889  * Returns true if a flush is still needed.  (It may be that someone else
1890  * is already in process of flushing that far, however.)
1891  */
1892 bool
1893 XLogNeedsFlush(XLogRecPtr record)
1894 {
1895         /* Quick exit if already known flushed */
1896         if (XLByteLE(record, LogwrtResult.Flush))
1897                 return false;
1898
1899         /* read LogwrtResult and update local state */
1900         {
1901                 /* use volatile pointer to prevent code rearrangement */
1902                 volatile XLogCtlData *xlogctl = XLogCtl;
1903
1904                 SpinLockAcquire(&xlogctl->info_lck);
1905                 LogwrtResult = xlogctl->LogwrtResult;
1906                 SpinLockRelease(&xlogctl->info_lck);
1907         }
1908
1909         /* check again */
1910         if (XLByteLE(record, LogwrtResult.Flush))
1911                 return false;
1912
1913         return true;
1914 }
1915
1916 /*
1917  * Create a new XLOG file segment, or open a pre-existing one.
1918  *
1919  * log, seg: identify segment to be created/opened.
1920  *
1921  * *use_existent: if TRUE, OK to use a pre-existing file (else, any
1922  * pre-existing file will be deleted).  On return, TRUE if a pre-existing
1923  * file was used.
1924  *
1925  * use_lock: if TRUE, acquire ControlFileLock while moving file into
1926  * place.  This should be TRUE except during bootstrap log creation.  The
1927  * caller must *not* hold the lock at call.
1928  *
1929  * Returns FD of opened file.
1930  *
1931  * Note: errors here are ERROR not PANIC because we might or might not be
1932  * inside a critical section (eg, during checkpoint there is no reason to
1933  * take down the system on failure).  They will promote to PANIC if we are
1934  * in a critical section.
1935  */
1936 static int
1937 XLogFileInit(uint32 log, uint32 seg,
1938                          bool *use_existent, bool use_lock)
1939 {
1940         char            path[MAXPGPATH];
1941         char            tmppath[MAXPGPATH];
1942         char       *zbuffer;
1943         uint32          installed_log;
1944         uint32          installed_seg;
1945         int                     max_advance;
1946         int                     fd;
1947         int                     nbytes;
1948
1949         XLogFilePath(path, ThisTimeLineID, log, seg);
1950
1951         /*
1952          * Try to use existent file (checkpoint maker may have created it already)
1953          */
1954         if (*use_existent)
1955         {
1956                 fd = BasicOpenFile(path, O_RDWR | PG_BINARY | get_sync_bit(sync_method),
1957                                                    S_IRUSR | S_IWUSR);
1958                 if (fd < 0)
1959                 {
1960                         if (errno != ENOENT)
1961                                 ereport(ERROR,
1962                                                 (errcode_for_file_access(),
1963                                                  errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
1964                                                                 path, log, seg)));
1965                 }
1966                 else
1967                         return fd;
1968         }
1969
1970         /*
1971          * Initialize an empty (all zeroes) segment.  NOTE: it is possible that
1972          * another process is doing the same thing.  If so, we will end up
1973          * pre-creating an extra log segment.  That seems OK, and better than
1974          * holding the lock throughout this lengthy process.
1975          */
1976         elog(DEBUG2, "creating and filling new WAL file");
1977
1978         snprintf(tmppath, MAXPGPATH, XLOGDIR "/xlogtemp.%d", (int) getpid());
1979
1980         unlink(tmppath);
1981
1982         /* do not use get_sync_bit() here --- want to fsync only at end of fill */
1983         fd = BasicOpenFile(tmppath, O_RDWR | O_CREAT | O_EXCL | PG_BINARY,
1984                                            S_IRUSR | S_IWUSR);
1985         if (fd < 0)
1986                 ereport(ERROR,
1987                                 (errcode_for_file_access(),
1988                                  errmsg("could not create file \"%s\": %m", tmppath)));
1989
1990         /*
1991          * Zero-fill the file.  We have to do this the hard way to ensure that all
1992          * the file space has really been allocated --- on platforms that allow
1993          * "holes" in files, just seeking to the end doesn't allocate intermediate
1994          * space.  This way, we know that we have all the space and (after the
1995          * fsync below) that all the indirect blocks are down on disk.  Therefore,
1996          * fdatasync(2) or O_DSYNC will be sufficient to sync future writes to the
1997          * log file.
1998          *
1999          * Note: palloc zbuffer, instead of just using a local char array, to
2000          * ensure it is reasonably well-aligned; this may save a few cycles
2001          * transferring data to the kernel.
2002          */
2003         zbuffer = (char *) palloc0(XLOG_BLCKSZ);
2004         for (nbytes = 0; nbytes < XLogSegSize; nbytes += XLOG_BLCKSZ)
2005         {
2006                 errno = 0;
2007                 if ((int) write(fd, zbuffer, XLOG_BLCKSZ) != (int) XLOG_BLCKSZ)
2008                 {
2009                         int                     save_errno = errno;
2010
2011                         /*
2012                          * If we fail to make the file, delete it to release disk space
2013                          */
2014                         unlink(tmppath);
2015                         /* if write didn't set errno, assume problem is no disk space */
2016                         errno = save_errno ? save_errno : ENOSPC;
2017
2018                         ereport(ERROR,
2019                                         (errcode_for_file_access(),
2020                                          errmsg("could not write to file \"%s\": %m", tmppath)));
2021                 }
2022         }
2023         pfree(zbuffer);
2024
2025         if (pg_fsync(fd) != 0)
2026                 ereport(ERROR,
2027                                 (errcode_for_file_access(),
2028                                  errmsg("could not fsync file \"%s\": %m", tmppath)));
2029
2030         if (close(fd))
2031                 ereport(ERROR,
2032                                 (errcode_for_file_access(),
2033                                  errmsg("could not close file \"%s\": %m", tmppath)));
2034
2035         /*
2036          * Now move the segment into place with its final name.
2037          *
2038          * If caller didn't want to use a pre-existing file, get rid of any
2039          * pre-existing file.  Otherwise, cope with possibility that someone else
2040          * has created the file while we were filling ours: if so, use ours to
2041          * pre-create a future log segment.
2042          */
2043         installed_log = log;
2044         installed_seg = seg;
2045         max_advance = XLOGfileslop;
2046         if (!InstallXLogFileSegment(&installed_log, &installed_seg, tmppath,
2047                                                                 *use_existent, &max_advance,
2048                                                                 use_lock))
2049         {
2050                 /* No need for any more future segments... */
2051                 unlink(tmppath);
2052         }
2053
2054         elog(DEBUG2, "done creating and filling new WAL file");
2055
2056         /* Set flag to tell caller there was no existent file */
2057         *use_existent = false;
2058
2059         /* Now open original target segment (might not be file I just made) */
2060         fd = BasicOpenFile(path, O_RDWR | PG_BINARY | get_sync_bit(sync_method),
2061                                            S_IRUSR | S_IWUSR);
2062         if (fd < 0)
2063                 ereport(ERROR,
2064                                 (errcode_for_file_access(),
2065                    errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
2066                                   path, log, seg)));
2067
2068         return fd;
2069 }
2070
2071 /*
2072  * Create a new XLOG file segment by copying a pre-existing one.
2073  *
2074  * log, seg: identify segment to be created.
2075  *
2076  * srcTLI, srclog, srcseg: identify segment to be copied (could be from
2077  *              a different timeline)
2078  *
2079  * Currently this is only used during recovery, and so there are no locking
2080  * considerations.      But we should be just as tense as XLogFileInit to avoid
2081  * emplacing a bogus file.
2082  */
2083 static void
2084 XLogFileCopy(uint32 log, uint32 seg,
2085                          TimeLineID srcTLI, uint32 srclog, uint32 srcseg)
2086 {
2087         char            path[MAXPGPATH];
2088         char            tmppath[MAXPGPATH];
2089         char            buffer[XLOG_BLCKSZ];
2090         int                     srcfd;
2091         int                     fd;
2092         int                     nbytes;
2093
2094         /*
2095          * Open the source file
2096          */
2097         XLogFilePath(path, srcTLI, srclog, srcseg);
2098         srcfd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
2099         if (srcfd < 0)
2100                 ereport(ERROR,
2101                                 (errcode_for_file_access(),
2102                                  errmsg("could not open file \"%s\": %m", path)));
2103
2104         /*
2105          * Copy into a temp file name.
2106          */
2107         snprintf(tmppath, MAXPGPATH, XLOGDIR "/xlogtemp.%d", (int) getpid());
2108
2109         unlink(tmppath);
2110
2111         /* do not use get_sync_bit() here --- want to fsync only at end of fill */
2112         fd = BasicOpenFile(tmppath, O_RDWR | O_CREAT | O_EXCL | PG_BINARY,
2113                                            S_IRUSR | S_IWUSR);
2114         if (fd < 0)
2115                 ereport(ERROR,
2116                                 (errcode_for_file_access(),
2117                                  errmsg("could not create file \"%s\": %m", tmppath)));
2118
2119         /*
2120          * Do the data copying.
2121          */
2122         for (nbytes = 0; nbytes < XLogSegSize; nbytes += sizeof(buffer))
2123         {
2124                 errno = 0;
2125                 if ((int) read(srcfd, buffer, sizeof(buffer)) != (int) sizeof(buffer))
2126                 {
2127                         if (errno != 0)
2128                                 ereport(ERROR,
2129                                                 (errcode_for_file_access(),
2130                                                  errmsg("could not read file \"%s\": %m", path)));
2131                         else
2132                                 ereport(ERROR,
2133                                                 (errmsg("not enough data in file \"%s\"", path)));
2134                 }
2135                 errno = 0;
2136                 if ((int) write(fd, buffer, sizeof(buffer)) != (int) sizeof(buffer))
2137                 {
2138                         int                     save_errno = errno;
2139
2140                         /*
2141                          * If we fail to make the file, delete it to release disk space
2142                          */
2143                         unlink(tmppath);
2144                         /* if write didn't set errno, assume problem is no disk space */
2145                         errno = save_errno ? save_errno : ENOSPC;
2146
2147                         ereport(ERROR,
2148                                         (errcode_for_file_access(),
2149                                          errmsg("could not write to file \"%s\": %m", tmppath)));
2150                 }
2151         }
2152
2153         if (pg_fsync(fd) != 0)
2154                 ereport(ERROR,
2155                                 (errcode_for_file_access(),
2156                                  errmsg("could not fsync file \"%s\": %m", tmppath)));
2157
2158         if (close(fd))
2159                 ereport(ERROR,
2160                                 (errcode_for_file_access(),
2161                                  errmsg("could not close file \"%s\": %m", tmppath)));
2162
2163         close(srcfd);
2164
2165         /*
2166          * Now move the segment into place with its final name.
2167          */
2168         if (!InstallXLogFileSegment(&log, &seg, tmppath, false, NULL, false))
2169                 elog(ERROR, "InstallXLogFileSegment should not have failed");
2170 }
2171
2172 /*
2173  * Install a new XLOG segment file as a current or future log segment.
2174  *
2175  * This is used both to install a newly-created segment (which has a temp
2176  * filename while it's being created) and to recycle an old segment.
2177  *
2178  * *log, *seg: identify segment to install as (or first possible target).
2179  * When find_free is TRUE, these are modified on return to indicate the
2180  * actual installation location or last segment searched.
2181  *
2182  * tmppath: initial name of file to install.  It will be renamed into place.
2183  *
2184  * find_free: if TRUE, install the new segment at the first empty log/seg
2185  * number at or after the passed numbers.  If FALSE, install the new segment
2186  * exactly where specified, deleting any existing segment file there.
2187  *
2188  * *max_advance: maximum number of log/seg slots to advance past the starting
2189  * point.  Fail if no free slot is found in this range.  On return, reduced
2190  * by the number of slots skipped over.  (Irrelevant, and may be NULL,
2191  * when find_free is FALSE.)
2192  *
2193  * use_lock: if TRUE, acquire ControlFileLock while moving file into
2194  * place.  This should be TRUE except during bootstrap log creation.  The
2195  * caller must *not* hold the lock at call.
2196  *
2197  * Returns TRUE if file installed, FALSE if not installed because of
2198  * exceeding max_advance limit.  On Windows, we also return FALSE if we
2199  * can't rename the file into place because someone's got it open.
2200  * (Any other kind of failure causes ereport().)
2201  */
2202 static bool
2203 InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath,
2204                                            bool find_free, int *max_advance,
2205                                            bool use_lock)
2206 {
2207         char            path[MAXPGPATH];
2208         struct stat stat_buf;
2209
2210         XLogFilePath(path, ThisTimeLineID, *log, *seg);
2211
2212         /*
2213          * We want to be sure that only one process does this at a time.
2214          */
2215         if (use_lock)
2216                 LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
2217
2218         if (!find_free)
2219         {
2220                 /* Force installation: get rid of any pre-existing segment file */
2221                 unlink(path);
2222         }
2223         else
2224         {
2225                 /* Find a free slot to put it in */
2226                 while (stat(path, &stat_buf) == 0)
2227                 {
2228                         if (*max_advance <= 0)
2229                         {
2230                                 /* Failed to find a free slot within specified range */
2231                                 if (use_lock)
2232                                         LWLockRelease(ControlFileLock);
2233                                 return false;
2234                         }
2235                         NextLogSeg(*log, *seg);
2236                         (*max_advance)--;
2237                         XLogFilePath(path, ThisTimeLineID, *log, *seg);
2238                 }
2239         }
2240
2241         /*
2242          * Prefer link() to rename() here just to be really sure that we don't
2243          * overwrite an existing logfile.  However, there shouldn't be one, so
2244          * rename() is an acceptable substitute except for the truly paranoid.
2245          */
2246 #if HAVE_WORKING_LINK
2247         if (link(tmppath, path) < 0)
2248                 ereport(ERROR,
2249                                 (errcode_for_file_access(),
2250                                  errmsg("could not link file \"%s\" to \"%s\" (initialization of log file %u, segment %u): %m",
2251                                                 tmppath, path, *log, *seg)));
2252         unlink(tmppath);
2253 #else
2254         if (rename(tmppath, path) < 0)
2255         {
2256 #ifdef WIN32
2257 #if !defined(__CYGWIN__)
2258                 if (GetLastError() == ERROR_ACCESS_DENIED)
2259 #else
2260                 if (errno == EACCES)
2261 #endif
2262                 {
2263                         if (use_lock)
2264                                 LWLockRelease(ControlFileLock);
2265                         return false;
2266                 }
2267 #endif   /* WIN32 */
2268
2269                 ereport(ERROR,
2270                                 (errcode_for_file_access(),
2271                                  errmsg("could not rename file \"%s\" to \"%s\" (initialization of log file %u, segment %u): %m",
2272                                                 tmppath, path, *log, *seg)));
2273         }
2274 #endif
2275
2276         if (use_lock)
2277                 LWLockRelease(ControlFileLock);
2278
2279         return true;
2280 }
2281
2282 /*
2283  * Open a pre-existing logfile segment for writing.
2284  */
2285 static int
2286 XLogFileOpen(uint32 log, uint32 seg)
2287 {
2288         char            path[MAXPGPATH];
2289         int                     fd;
2290
2291         XLogFilePath(path, ThisTimeLineID, log, seg);
2292
2293         fd = BasicOpenFile(path, O_RDWR | PG_BINARY | get_sync_bit(sync_method),
2294                                            S_IRUSR | S_IWUSR);
2295         if (fd < 0)
2296                 ereport(PANIC,
2297                                 (errcode_for_file_access(),
2298                    errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
2299                                   path, log, seg)));
2300
2301         return fd;
2302 }
2303
2304 /*
2305  * Open a logfile segment for reading (during recovery).
2306  */
2307 static int
2308 XLogFileRead(uint32 log, uint32 seg, int emode)
2309 {
2310         char            path[MAXPGPATH];
2311         char            xlogfname[MAXFNAMELEN];
2312         char            activitymsg[MAXFNAMELEN + 16];
2313         ListCell   *cell;
2314         int                     fd;
2315
2316         /*
2317          * Loop looking for a suitable timeline ID: we might need to read any of
2318          * the timelines listed in expectedTLIs.
2319          *
2320          * We expect curFileTLI on entry to be the TLI of the preceding file in
2321          * sequence, or 0 if there was no predecessor.  We do not allow curFileTLI
2322          * to go backwards; this prevents us from picking up the wrong file when a
2323          * parent timeline extends to higher segment numbers than the child we
2324          * want to read.
2325          */
2326         foreach(cell, expectedTLIs)
2327         {
2328                 TimeLineID      tli = (TimeLineID) lfirst_int(cell);
2329
2330                 if (tli < curFileTLI)
2331                         break;                          /* don't bother looking at too-old TLIs */
2332
2333                 XLogFileName(xlogfname, tli, log, seg);
2334
2335                 if (InArchiveRecovery)
2336                 {
2337                         /* Report recovery progress in PS display */
2338                         snprintf(activitymsg, sizeof(activitymsg), "waiting for %s",
2339                                          xlogfname);
2340                         set_ps_display(activitymsg, false);
2341
2342                         restoredFromArchive = RestoreArchivedFile(path, xlogfname,
2343                                                                                                           "RECOVERYXLOG",
2344                                                                                                           XLogSegSize);
2345                 }
2346                 else
2347                         XLogFilePath(path, tli, log, seg);
2348
2349                 fd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
2350                 if (fd >= 0)
2351                 {
2352                         /* Success! */
2353                         curFileTLI = tli;
2354
2355                         /* Report recovery progress in PS display */
2356                         snprintf(activitymsg, sizeof(activitymsg), "recovering %s",
2357                                          xlogfname);
2358                         set_ps_display(activitymsg, false);
2359
2360                         return fd;
2361                 }
2362                 if (errno != ENOENT)    /* unexpected failure? */
2363                         ereport(PANIC,
2364                                         (errcode_for_file_access(),
2365                         errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
2366                                    path, log, seg)));
2367         }
2368
2369         /* Couldn't find it.  For simplicity, complain about front timeline */
2370         XLogFilePath(path, recoveryTargetTLI, log, seg);
2371         errno = ENOENT;
2372         ereport(emode,
2373                         (errcode_for_file_access(),
2374                    errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
2375                                   path, log, seg)));
2376         return -1;
2377 }
2378
2379 /*
2380  * Close the current logfile segment for writing.
2381  */
2382 static void
2383 XLogFileClose(void)
2384 {
2385         Assert(openLogFile >= 0);
2386
2387         /*
2388          * posix_fadvise is problematic on many platforms: on older x86 Linux it
2389          * just dumps core, and there are reports of problems on PPC platforms as
2390          * well.  The following is therefore disabled for the time being. We could
2391          * consider some kind of configure test to see if it's safe to use, but
2392          * since we lack hard evidence that there's any useful performance gain to
2393          * be had, spending time on that seems unprofitable for now.
2394          */
2395 #ifdef NOT_USED
2396
2397         /*
2398          * WAL segment files will not be re-read in normal operation, so we advise
2399          * OS to release any cached pages.      But do not do so if WAL archiving is
2400          * active, because archiver process could use the cache to read the WAL
2401          * segment.
2402          *
2403          * While O_DIRECT works for O_SYNC, posix_fadvise() works for fsync() and
2404          * O_SYNC, and some platforms only have posix_fadvise().
2405          */
2406 #if defined(HAVE_DECL_POSIX_FADVISE) && defined(POSIX_FADV_DONTNEED)
2407         if (!XLogArchivingActive())
2408                 posix_fadvise(openLogFile, 0, 0, POSIX_FADV_DONTNEED);
2409 #endif
2410 #endif   /* NOT_USED */
2411
2412         if (close(openLogFile))
2413                 ereport(PANIC,
2414                                 (errcode_for_file_access(),
2415                                  errmsg("could not close log file %u, segment %u: %m",
2416                                                 openLogId, openLogSeg)));
2417         openLogFile = -1;
2418 }
2419
2420 /*
2421  * Attempt to retrieve the specified file from off-line archival storage.
2422  * If successful, fill "path" with its complete path (note that this will be
2423  * a temp file name that doesn't follow the normal naming convention), and
2424  * return TRUE.
2425  *
2426  * If not successful, fill "path" with the name of the normal on-line file
2427  * (which may or may not actually exist, but we'll try to use it), and return
2428  * FALSE.
2429  *
2430  * For fixed-size files, the caller may pass the expected size as an
2431  * additional crosscheck on successful recovery.  If the file size is not
2432  * known, set expectedSize = 0.
2433  */
2434 static bool
2435 RestoreArchivedFile(char *path, const char *xlogfname,
2436                                         const char *recovername, off_t expectedSize)
2437 {
2438         char            xlogpath[MAXPGPATH];
2439         char            xlogRestoreCmd[MAXPGPATH];
2440         char            lastRestartPointFname[MAXPGPATH];
2441         char       *dp;
2442         char       *endp;
2443         const char *sp;
2444         int                     rc;
2445         bool            signaled;
2446         struct stat stat_buf;
2447         uint32          restartLog;
2448         uint32          restartSeg;
2449
2450         /*
2451          * When doing archive recovery, we always prefer an archived log file even
2452          * if a file of the same name exists in XLOGDIR.  The reason is that the
2453          * file in XLOGDIR could be an old, un-filled or partly-filled version
2454          * that was copied and restored as part of backing up $PGDATA.
2455          *
2456          * We could try to optimize this slightly by checking the local copy
2457          * lastchange timestamp against the archived copy, but we have no API to
2458          * do this, nor can we guarantee that the lastchange timestamp was
2459          * preserved correctly when we copied to archive. Our aim is robustness,
2460          * so we elect not to do this.
2461          *
2462          * If we cannot obtain the log file from the archive, however, we will try
2463          * to use the XLOGDIR file if it exists.  This is so that we can make use
2464          * of log segments that weren't yet transferred to the archive.
2465          *
2466          * Notice that we don't actually overwrite any files when we copy back
2467          * from archive because the recoveryRestoreCommand may inadvertently
2468          * restore inappropriate xlogs, or they may be corrupt, so we may wish to
2469          * fallback to the segments remaining in current XLOGDIR later. The
2470          * copy-from-archive filename is always the same, ensuring that we don't
2471          * run out of disk space on long recoveries.
2472          */
2473         snprintf(xlogpath, MAXPGPATH, XLOGDIR "/%s", recovername);
2474
2475         /*
2476          * Make sure there is no existing file named recovername.
2477          */
2478         if (stat(xlogpath, &stat_buf) != 0)
2479         {
2480                 if (errno != ENOENT)
2481                         ereport(FATAL,
2482                                         (errcode_for_file_access(),
2483                                          errmsg("could not stat file \"%s\": %m",
2484                                                         xlogpath)));
2485         }
2486         else
2487         {
2488                 if (unlink(xlogpath) != 0)
2489                         ereport(FATAL,
2490                                         (errcode_for_file_access(),
2491                                          errmsg("could not remove file \"%s\": %m",
2492                                                         xlogpath)));
2493         }
2494
2495         /*
2496          * Calculate the archive file cutoff point for use during log shipping
2497          * replication. All files earlier than this point can be deleted
2498          * from the archive, though there is no requirement to do so.
2499          *
2500          * We initialise this with the filename of an InvalidXLogRecPtr, which
2501          * will prevent the deletion of any WAL files from the archive
2502          * because of the alphabetic sorting property of WAL filenames. 
2503          *
2504          * Once we have successfully located the redo pointer of the checkpoint
2505          * from which we start recovery we never request a file prior to the redo
2506          * pointer of the last restartpoint. When redo begins we know that we
2507          * have successfully located it, so there is no need for additional
2508          * status flags to signify the point when we can begin deleting WAL files
2509          * from the archive. 
2510          */
2511         if (InRedo)
2512         {
2513                 XLByteToSeg(ControlFile->checkPointCopy.redo,
2514                                         restartLog, restartSeg);
2515                 XLogFileName(lastRestartPointFname,
2516                                          ControlFile->checkPointCopy.ThisTimeLineID,
2517                                          restartLog, restartSeg);
2518                 /* we shouldn't need anything earlier than last restart point */
2519                 Assert(strcmp(lastRestartPointFname, xlogfname) <= 0);
2520         }
2521         else
2522                 XLogFileName(lastRestartPointFname, 0, 0, 0);
2523
2524         /*
2525          * construct the command to be executed
2526          */
2527         dp = xlogRestoreCmd;
2528         endp = xlogRestoreCmd + MAXPGPATH - 1;
2529         *endp = '\0';
2530
2531         for (sp = recoveryRestoreCommand; *sp; sp++)
2532         {
2533                 if (*sp == '%')
2534                 {
2535                         switch (sp[1])
2536                         {
2537                                 case 'p':
2538                                         /* %p: relative path of target file */
2539                                         sp++;
2540                                         StrNCpy(dp, xlogpath, endp - dp);
2541                                         make_native_path(dp);
2542                                         dp += strlen(dp);
2543                                         break;
2544                                 case 'f':
2545                                         /* %f: filename of desired file */
2546                                         sp++;
2547                                         StrNCpy(dp, xlogfname, endp - dp);
2548                                         dp += strlen(dp);
2549                                         break;
2550                                 case 'r':
2551                                         /* %r: filename of last restartpoint */
2552                                         sp++;
2553                                         StrNCpy(dp, lastRestartPointFname, endp - dp);
2554                                         dp += strlen(dp);
2555                                         break;
2556                                 case '%':
2557                                         /* convert %% to a single % */
2558                                         sp++;
2559                                         if (dp < endp)
2560                                                 *dp++ = *sp;
2561                                         break;
2562                                 default:
2563                                         /* otherwise treat the % as not special */
2564                                         if (dp < endp)
2565                                                 *dp++ = *sp;
2566                                         break;
2567                         }
2568                 }
2569                 else
2570                 {
2571                         if (dp < endp)
2572                                 *dp++ = *sp;
2573                 }
2574         }
2575         *dp = '\0';
2576
2577         ereport(DEBUG3,
2578                         (errmsg_internal("executing restore command \"%s\"",
2579                                                          xlogRestoreCmd)));
2580
2581         /*
2582          * Copy xlog from archival storage to XLOGDIR
2583          */
2584         rc = system(xlogRestoreCmd);
2585         if (rc == 0)
2586         {
2587                 /*
2588                  * command apparently succeeded, but let's make sure the file is
2589                  * really there now and has the correct size.
2590                  *
2591                  * XXX I made wrong-size a fatal error to ensure the DBA would notice
2592                  * it, but is that too strong?  We could try to plow ahead with a
2593                  * local copy of the file ... but the problem is that there probably
2594                  * isn't one, and we'd incorrectly conclude we've reached the end of
2595                  * WAL and we're done recovering ...
2596                  */
2597                 if (stat(xlogpath, &stat_buf) == 0)
2598                 {
2599                         if (expectedSize > 0 && stat_buf.st_size != expectedSize)
2600                                 ereport(FATAL,
2601                                                 (errmsg("archive file \"%s\" has wrong size: %lu instead of %lu",
2602                                                                 xlogfname,
2603                                                                 (unsigned long) stat_buf.st_size,
2604                                                                 (unsigned long) expectedSize)));
2605                         else
2606                         {
2607                                 ereport(LOG,
2608                                                 (errmsg("restored log file \"%s\" from archive",
2609                                                                 xlogfname)));
2610                                 strcpy(path, xlogpath);
2611                                 return true;
2612                         }
2613                 }
2614                 else
2615                 {
2616                         /* stat failed */
2617                         if (errno != ENOENT)
2618                                 ereport(FATAL,
2619                                                 (errcode_for_file_access(),
2620                                                  errmsg("could not stat file \"%s\": %m",
2621                                                                 xlogpath)));
2622                 }
2623         }
2624
2625         /*
2626          * Remember, we rollforward UNTIL the restore fails so failure here is
2627          * just part of the process... that makes it difficult to determine
2628          * whether the restore failed because there isn't an archive to restore,
2629          * or because the administrator has specified the restore program
2630          * incorrectly.  We have to assume the former.
2631          *
2632          * However, if the failure was due to any sort of signal, it's best to
2633          * punt and abort recovery.  (If we "return false" here, upper levels will
2634          * assume that recovery is complete and start up the database!) It's
2635          * essential to abort on child SIGINT and SIGQUIT, because per spec
2636          * system() ignores SIGINT and SIGQUIT while waiting; if we see one of
2637          * those it's a good bet we should have gotten it too.  Aborting on other
2638          * signals such as SIGTERM seems a good idea as well.
2639          *
2640          * Per the Single Unix Spec, shells report exit status > 128 when a called
2641          * command died on a signal.  Also, 126 and 127 are used to report
2642          * problems such as an unfindable command; treat those as fatal errors
2643          * too.
2644          */
2645         signaled = WIFSIGNALED(rc) || WEXITSTATUS(rc) > 125;
2646
2647         ereport(signaled ? FATAL : DEBUG2,
2648                 (errmsg("could not restore file \"%s\" from archive: return code %d",
2649                                 xlogfname, rc)));
2650
2651         /*
2652          * if an archived file is not available, there might still be a version of
2653          * this file in XLOGDIR, so return that as the filename to open.
2654          *
2655          * In many recovery scenarios we expect this to fail also, but if so that
2656          * just means we've reached the end of WAL.
2657          */
2658         snprintf(path, MAXPGPATH, XLOGDIR "/%s", xlogfname);
2659         return false;
2660 }
2661
2662 /*
2663  * Preallocate log files beyond the specified log endpoint.
2664  *
2665  * XXX this is currently extremely conservative, since it forces only one
2666  * future log segment to exist, and even that only if we are 75% done with
2667  * the current one.  This is only appropriate for very low-WAL-volume systems.
2668  * High-volume systems will be OK once they've built up a sufficient set of
2669  * recycled log segments, but the startup transient is likely to include
2670  * a lot of segment creations by foreground processes, which is not so good.
2671  */
2672 static void
2673 PreallocXlogFiles(XLogRecPtr endptr)
2674 {
2675         uint32          _logId;
2676         uint32          _logSeg;
2677         int                     lf;
2678         bool            use_existent;
2679
2680         XLByteToPrevSeg(endptr, _logId, _logSeg);
2681         if ((endptr.xrecoff - 1) % XLogSegSize >=
2682                 (uint32) (0.75 * XLogSegSize))
2683         {
2684                 NextLogSeg(_logId, _logSeg);
2685                 use_existent = true;
2686                 lf = XLogFileInit(_logId, _logSeg, &use_existent, true);
2687                 close(lf);
2688                 if (!use_existent)
2689                         CheckpointStats.ckpt_segs_added++;
2690         }
2691 }
2692
2693 /*
2694  * Recycle or remove all log files older or equal to passed log/seg#
2695  *
2696  * endptr is current (or recent) end of xlog; this is used to determine
2697  * whether we want to recycle rather than delete no-longer-wanted log files.
2698  */
2699 static void
2700 RemoveOldXlogFiles(uint32 log, uint32 seg, XLogRecPtr endptr)
2701 {
2702         uint32          endlogId;
2703         uint32          endlogSeg;
2704         int                     max_advance;
2705         DIR                *xldir;
2706         struct dirent *xlde;
2707         char            lastoff[MAXFNAMELEN];
2708         char            path[MAXPGPATH];
2709
2710         /*
2711          * Initialize info about where to try to recycle to.  We allow recycling
2712          * segments up to XLOGfileslop segments beyond the current XLOG location.
2713          */
2714         XLByteToPrevSeg(endptr, endlogId, endlogSeg);
2715         max_advance = XLOGfileslop;
2716
2717         xldir = AllocateDir(XLOGDIR);
2718         if (xldir == NULL)
2719                 ereport(ERROR,
2720                                 (errcode_for_file_access(),
2721                                  errmsg("could not open transaction log directory \"%s\": %m",
2722                                                 XLOGDIR)));
2723
2724         XLogFileName(lastoff, ThisTimeLineID, log, seg);
2725
2726         while ((xlde = ReadDir(xldir, XLOGDIR)) != NULL)
2727         {
2728                 /*
2729                  * We ignore the timeline part of the XLOG segment identifiers in
2730                  * deciding whether a segment is still needed.  This ensures that we
2731                  * won't prematurely remove a segment from a parent timeline. We could
2732                  * probably be a little more proactive about removing segments of
2733                  * non-parent timelines, but that would be a whole lot more
2734                  * complicated.
2735                  *
2736                  * We use the alphanumeric sorting property of the filenames to decide
2737                  * which ones are earlier than the lastoff segment.
2738                  */
2739                 if (strlen(xlde->d_name) == 24 &&
2740                         strspn(xlde->d_name, "0123456789ABCDEF") == 24 &&
2741                         strcmp(xlde->d_name + 8, lastoff + 8) <= 0)
2742                 {
2743                         if (XLogArchiveCheckDone(xlde->d_name, true))
2744                         {
2745                                 snprintf(path, MAXPGPATH, XLOGDIR "/%s", xlde->d_name);
2746
2747                                 /*
2748                                  * Before deleting the file, see if it can be recycled as a
2749                                  * future log segment.
2750                                  */
2751                                 if (InstallXLogFileSegment(&endlogId, &endlogSeg, path,
2752                                                                                    true, &max_advance,
2753                                                                                    true))
2754                                 {
2755                                         ereport(DEBUG2,
2756                                                         (errmsg("recycled transaction log file \"%s\"",
2757                                                                         xlde->d_name)));
2758                                         CheckpointStats.ckpt_segs_recycled++;
2759                                         /* Needn't recheck that slot on future iterations */
2760                                         if (max_advance > 0)
2761                                         {
2762                                                 NextLogSeg(endlogId, endlogSeg);
2763                                                 max_advance--;
2764                                         }
2765                                 }
2766                                 else
2767                                 {
2768                                         /* No need for any more future segments... */
2769                                         ereport(DEBUG2,
2770                                                         (errmsg("removing transaction log file \"%s\"",
2771                                                                         xlde->d_name)));
2772                                         unlink(path);
2773                                         CheckpointStats.ckpt_segs_removed++;
2774                                 }
2775
2776                                 XLogArchiveCleanup(xlde->d_name);
2777                         }
2778                 }
2779         }
2780
2781         FreeDir(xldir);
2782 }
2783
2784 /*
2785  * Remove previous backup history files.  This also retries creation of
2786  * .ready files for any backup history files for which XLogArchiveNotify
2787  * failed earlier.
      *
      * Every entry of XLOGDIR is examined.  An entry is treated as a backup
      * history file when its name is longer than 24 characters, starts with a
      * run of exactly 24 upper-case hex digits, and ends in ".backup".  Such a
      * file is unlinked, and XLogArchiveCleanup is called for it, only when
      * XLogArchiveCheckDone(name, true) returns true.
      *
      * NOTE(review): the .ready retry mentioned above is not visible in this
      * function; presumably it happens inside XLogArchiveCheckDone when its
      * second argument is true -- confirm against that function.
      *
      * Takes no arguments and returns nothing.  Failure to open the directory
      * is reported at ERROR level.
2788  */
2789 static void
2790 CleanupBackupHistory(void)
2791 {
2792         DIR                *xldir;
2793         struct dirent *xlde;
2794         char            path[MAXPGPATH];
2795
2796         xldir = AllocateDir(XLOGDIR);
2797         if (xldir == NULL)
2798                 ereport(ERROR,
2799                                 (errcode_for_file_access(),
2800                                  errmsg("could not open transaction log directory \"%s\": %m",
2801                                                 XLOGDIR)));
2802
2803         while ((xlde = ReadDir(xldir, XLOGDIR)) != NULL)
2804         {
                     /*
                      * The length test guarantees the name is longer than ".backup", so
                      * the pointer arithmetic in the strcmp() argument stays inside the
                      * name.  strspn() == 24 means the 25th character is not a hex digit.
                      */
2805                 if (strlen(xlde->d_name) > 24 &&
2806                         strspn(xlde->d_name, "0123456789ABCDEF") == 24 &&
2807                         strcmp(xlde->d_name + strlen(xlde->d_name) - strlen(".backup"),
2808                                    ".backup") == 0)
2809                 {
2810                         if (XLogArchiveCheckDone(xlde->d_name, true))
2811                         {
2812                                 ereport(DEBUG2,
2813                                 (errmsg("removing transaction log backup history file \"%s\"",
2814                                                 xlde->d_name)));
2815                                 snprintf(path, MAXPGPATH, XLOGDIR "/%s", xlde->d_name);
                                     /* the result of unlink() is not checked here */
2816                                 unlink(path);
2817                                 XLogArchiveCleanup(xlde->d_name);
2818                         }
2819                 }
2820         }
2821
2822         FreeDir(xldir);
2823 }
2824
2825 /*
2826  * Restore the backup blocks present in an XLOG record, if any.
2827  *
2828  * We assume all of the record has been read into memory at *record.
2829  *
2830  * Note: when a backup block is available in XLOG, we restore it
2831  * unconditionally, even if the page in the database appears newer.
2832  * This is to protect ourselves against database pages that were partially
2833  * or incorrectly written during a crash.  We assume that the XLOG data
2834  * must be good because it has passed a CRC check, while the database
2835  * page might not be.  This will force us to replay all subsequent
2836  * modifications of the page that appear in XLOG, rather than possibly
2837  * ignoring them as already applied, but that's not a huge drawback.
      *
      * record: the record whose backup blocks are to be applied.
      * lsn:    value stamped on every restored page with PageSetLSN.
      *
      * Layout walked here: the backup blocks start xl_len bytes after
      * XLogRecGetData(record).  Each one is a BkpBlock header followed by
      * BLCKSZ - hole_length bytes of page image (the "hole" is not stored).
      * Only the slots whose XLR_SET_BKP_BLOCK(i) bit is set in xl_info are
      * present.  The lengths found in the record are not re-checked here.
2838  */
2839 static void
2840 RestoreBkpBlocks(XLogRecord *record, XLogRecPtr lsn)
2841 {
2842         Buffer          buffer;
2843         Page            page;
2844         BkpBlock        bkpb;
2845         char       *blk;
2846         int                     i;
2847
             /* blk walks the record, starting just past the rmgr data */
2848         blk = (char *) XLogRecGetData(record) + record->xl_len;
2849         for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
2850         {
2851                 if (!(record->xl_info & XLR_SET_BKP_BLOCK(i)))
2852                         continue;
2853
                     /* copy the header into a local struct rather than reading it in place */
2854                 memcpy(&bkpb, blk, sizeof(BkpBlock));
2855                 blk += sizeof(BkpBlock);
2856
                     /*
                      * NOTE(review): the meaning of the final "true" argument is not
                      * visible here -- confirm against XLogReadBufferWithFork.  A bad
                      * buffer is caught only by the Assert below.
                      */
2857                 buffer = XLogReadBufferWithFork(bkpb.node, bkpb.fork, bkpb.block,
2858                                                                                 true);
2859                 Assert(BufferIsValid(buffer));
2860                 page = (Page) BufferGetPage(buffer);
2861
2862                 if (bkpb.hole_length == 0)
2863                 {
                             /* no hole: the stored image is a full BLCKSZ page */
2864                         memcpy((char *) page, blk, BLCKSZ);
2865                 }
2866                 else
2867                 {
2868                         /* must zero-fill the hole */
2869                         MemSet((char *) page, 0, BLCKSZ);
                             /* bytes before the hole ... */
2870                         memcpy((char *) page, blk, bkpb.hole_offset);
                             /* ... and bytes after it, which follow directly in the record */
2871                         memcpy((char *) page + (bkpb.hole_offset + bkpb.hole_length),
2872                                    blk + bkpb.hole_offset,
2873                                    BLCKSZ - (bkpb.hole_offset + bkpb.hole_length));
2874                 }
2875
2876                 PageSetLSN(page, lsn);
2877                 PageSetTLI(page, ThisTimeLineID);
2878                 MarkBufferDirty(buffer);
2879                 UnlockReleaseBuffer(buffer);
2880
                     /* step over the stored image to the next backup block header */
2881                 blk += BLCKSZ - bkpb.hole_length;
2882         }
2883 }
2884
2885 /*
2886  * CRC-check an XLOG record.  We do not believe the contents of an XLOG
2887  * record (other than to the minimal extent of computing the amount of
2888  * data to read in) until we've checked the CRCs.
2889  *
2890  * We assume all of the record has been read into memory at *record.
      *
      * "All of the record" means xl_tot_len bytes, so the walk over the rmgr
      * data and the backup blocks is bounded by that length.  The backup-block
      * flags in xl_info and the hole_length fields are themselves unverified
      * at this point; without the bound, a corrupt record could make us copy
      * headers and compute CRCs over memory beyond what was read in.
      *
      * record: the record to check.
      * recptr: its location, used only in the messages.
      * emode:  elevel used for every report.
      *
      * Returns true if the lengths are self-consistent and the CRC matches,
      * false (after reporting at emode) otherwise.
2891  */
2892 static bool
2893 RecordIsValid(XLogRecord *record, XLogRecPtr recptr, int emode)
2894 {
2895         pg_crc32        crc;
2896         int                     i;
2897         uint32          len = record->xl_len;
2898         BkpBlock        bkpb;
2899         char       *blk;
             uint32          remaining = record->xl_tot_len; /* bytes not yet accounted for */
2900
             /*
              * The header plus rmgr data must fit in xl_tot_len.  Written as two
              * comparisons so that no sum can wrap around.
              */
             if (remaining < SizeOfXLogRecord ||
                     remaining - SizeOfXLogRecord < len)
             {
                     ereport(emode,
                                     (errmsg("invalid record length at %X/%X",
                                                     recptr.xlogid, recptr.xrecoff)));
                     return false;
             }
             remaining -= SizeOfXLogRecord + len;

2901         /* First the rmgr data */
2902         INIT_CRC32(crc);
2903         COMP_CRC32(crc, XLogRecGetData(record), len);
2904
2905         /* Add in the backup blocks, if any */
2906         blk = (char *) XLogRecGetData(record) + len;
2907         for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
2908         {
2909                 uint32          blen;
2910
2911                 if (!(record->xl_info & XLR_SET_BKP_BLOCK(i)))
2912                         continue;
2913
                     /* the block header itself must lie inside the record */
                     if (remaining < sizeof(BkpBlock))
                     {
                             ereport(emode,
                                             (errmsg("invalid backup block size in record at %X/%X",
                                                             recptr.xlogid, recptr.xrecoff)));
                             return false;
                     }
2914                 memcpy(&bkpb, blk, sizeof(BkpBlock));
2915                 if (bkpb.hole_offset + bkpb.hole_length > BLCKSZ)
2916                 {
2917                         ereport(emode,
2918                                         (errmsg("incorrect hole size in record at %X/%X",
2919                                                         recptr.xlogid, recptr.xrecoff)));
2920                         return false;
2921                 }
                     /* header plus the page image with its hole removed */
2922                 blen = sizeof(BkpBlock) + BLCKSZ - bkpb.hole_length;
                     if (remaining < blen)
                     {
                             ereport(emode,
                                             (errmsg("invalid backup block size in record at %X/%X",
                                                             recptr.xlogid, recptr.xrecoff)));
                             return false;
                     }
                     remaining -= blen;
2923                 COMP_CRC32(crc, blk, blen);
2924                 blk += blen;
2925         }
2926
2927         /* Check that xl_tot_len agrees with our calculation */
2928         if (blk != (char *) record + record->xl_tot_len)
2929         {
2930                 ereport(emode,
2931                                 (errmsg("incorrect total length in record at %X/%X",
2932                                                 recptr.xlogid, recptr.xrecoff)));
2933                 return false;
2934         }
2935
2936         /* Finally include the record header */
             /* (the stored CRC occupies the first sizeof(pg_crc32) bytes and is skipped) */
2937         COMP_CRC32(crc, (char *) record + sizeof(pg_crc32),
2938                            SizeOfXLogRecord - sizeof(pg_crc32));
2939         FIN_CRC32(crc);
2940
2941         if (!EQ_CRC32(record->xl_crc, crc))
2942         {
2943                 ereport(emode,
2944                 (errmsg("incorrect resource manager data checksum in record at %X/%X",
2945                                 recptr.xlogid, recptr.xrecoff)));
2946                 return false;
2947         }
2948
2949         return true;
2950 }
2951
2952 /*
2953  * Attempt to read an XLOG record.
2954  *
2955  * If RecPtr is not NULL, try to read a record at that position.  Otherwise
2956  * try to read a record just after the last one previously read.
2957  *
2958  * If no valid record is available, returns NULL, or fails if emode is PANIC.
2959  * (emode must be either PANIC or LOG.)
2960  *
2961  * The record is copied into readRecordBuf, so that on successful return,
2962  * the returned record pointer always points there.
      *
      * Besides its arguments the function works on file-level state that is
      * not declared in it: readBuf, readFile, readId, readSeg and readOff (the
      * page buffer and the segment/page position it was filled from),
      * readRecordBuf and readRecordBufSize (the reassembly buffer), nextRecord,
      * ReadRecPtr and EndRecPtr (start and end of the last record returned),
      * lastPageTLI and curFileTLI.
      *
      * On success ReadRecPtr is set to the record's start and EndRecPtr to the
      * MAXALIGNed position just past it.  On any failure that goes through
      * next_record_is_invalid, the open segment file is closed, nextRecord is
      * cleared and NULL is returned.  An explicitly requested position that
      * fails XRecOffIsValid is reported at PANIC regardless of emode.
2963  */
2964 static XLogRecord *
2965 ReadRecord(XLogRecPtr *RecPtr, int emode)
2966 {
2967         XLogRecord *record;
2968         char       *buffer;
2969         XLogRecPtr      tmpRecPtr = EndRecPtr;
2970         bool            randAccess = false;
2971         uint32          len,
2972                                 total_len;
2973         uint32          targetPageOff;
2974         uint32          targetRecOff;
2975         uint32          pageHeaderSize;
2976
2977         if (readBuf == NULL)
2978         {
2979                 /*
2980                  * First time through, permanently allocate readBuf.  We do it this
2981                  * way, rather than just making a static array, for two reasons: (1)
2982                  * no need to waste the storage in most instantiations of the backend;
2983                  * (2) a static char array isn't guaranteed to have any particular
2984                  * alignment, whereas malloc() will provide MAXALIGN'd storage.
                      *
                      * NOTE(review): allocation failure is checked only by the Assert
                      * below -- confirm what happens in builds without assertions.
2985                  */
2986                 readBuf = (char *) malloc(XLOG_BLCKSZ);
2987                 Assert(readBuf != NULL);
2988         }
2989
2990         if (RecPtr == NULL)
2991         {
                     /* sequential read: continue from EndRecPtr, copied into tmpRecPtr above */
2992                 RecPtr = &tmpRecPtr;
2993                 /* fast case if next record is on same page */
2994                 if (nextRecord != NULL)
2995                 {
2996                         record = nextRecord;
2997                         goto got_record;
2998                 }
2999                 /* align old recptr to next page */
3000                 if (tmpRecPtr.xrecoff % XLOG_BLCKSZ != 0)
3001                         tmpRecPtr.xrecoff += (XLOG_BLCKSZ - tmpRecPtr.xrecoff % XLOG_BLCKSZ);
3002                 if (tmpRecPtr.xrecoff >= XLogFileSize)
3003                 {
3004                         (tmpRecPtr.xlogid)++;
3005                         tmpRecPtr.xrecoff = 0;
3006                 }
3007                 /* We will account for page header size below */
3008         }
3009         else
3010         {
3011                 if (!XRecOffIsValid(RecPtr->xrecoff))
3012                         ereport(PANIC,
3013                                         (errmsg("invalid record offset at %X/%X",
3014                                                         RecPtr->xlogid, RecPtr->xrecoff)));
3015
3016                 /*
3017                  * Since we are going to a random position in WAL, forget any prior
3018                  * state about what timeline we were in, and allow it to be any
3019                  * timeline in expectedTLIs.  We also set a flag to allow curFileTLI
3020                  * to go backwards (but we can't reset that variable right here, since
3021                  * we might not change files at all).
3022                  */
3023                 lastPageTLI = 0;                /* see comment in ValidXLOGHeader */
3024                 randAccess = true;              /* allow curFileTLI to go backwards too */
3025         }
3026
             /* drop the open segment file if the target lies in a different segment */
3027         if (readFile >= 0 && !XLByteInSeg(*RecPtr, readId, readSeg))
3028         {
3029                 close(readFile);
3030                 readFile = -1;
3031         }
             /* presumably stores the target's log id and segment number in readId/readSeg */
3032         XLByteToSeg(*RecPtr, readId, readSeg);
3033         if (readFile < 0)
3034         {
3035                 /* Now it's okay to reset curFileTLI if random fetch */
3036                 if (randAccess)
3037                         curFileTLI = 0;
3038
3039                 readFile = XLogFileRead(readId, readSeg, emode);
3040                 if (readFile < 0)
3041                         goto next_record_is_invalid;
3042
3043                 /*
3044                  * Whenever switching to a new WAL segment, we read the first page of
3045                  * the file and validate its header, even if that's not where the
3046                  * target record is.  This is so that we can check the additional
3047                  * identification info that is present in the first page's "long"
3048                  * header.
                      *
                      * NOTE(review): here and below, any read() that returns fewer than
                      * XLOG_BLCKSZ bytes -- including a short read -- is treated as an
                      * error.
3049                  */
3050                 readOff = 0;
3051                 if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
3052                 {
3053                         ereport(emode,
3054                                         (errcode_for_file_access(),
3055                                          errmsg("could not read from log file %u, segment %u, offset %u: %m",
3056                                                         readId, readSeg, readOff)));
3057                         goto next_record_is_invalid;
3058                 }
3059                 if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode))
3060                         goto next_record_is_invalid;
3061         }
3062
             /* offset, within the segment, of the page that holds the target position */
3063         targetPageOff = ((RecPtr->xrecoff % XLogSegSize) / XLOG_BLCKSZ) * XLOG_BLCKSZ;
3064         if (readOff != targetPageOff)
3065         {
                     /* readBuf holds some other page: seek to the wanted one and load it */
3066                 readOff = targetPageOff;
3067                 if (lseek(readFile, (off_t) readOff, SEEK_SET) < 0)
3068                 {
3069                         ereport(emode,
3070                                         (errcode_for_file_access(),
3071                                          errmsg("could not seek in log file %u, segment %u to offset %u: %m",
3072                                                         readId, readSeg, readOff)));
3073                         goto next_record_is_invalid;
3074                 }
3075                 if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
3076                 {
3077                         ereport(emode,
3078                                         (errcode_for_file_access(),
3079                                          errmsg("could not read from log file %u, segment %u, offset %u: %m",
3080                                                         readId, readSeg, readOff)));
3081                         goto next_record_is_invalid;
3082                 }
3083                 if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode))
3084                         goto next_record_is_invalid;
3085         }
3086         pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
3087         targetRecOff = RecPtr->xrecoff % XLOG_BLCKSZ;
3088         if (targetRecOff == 0)
3089         {
3090                 /*
3091                  * Can only get here in the continuing-from-prev-page case, because
3092                  * XRecOffIsValid eliminated the zero-page-offset case otherwise. Need
3093                  * to skip over the new page's header.
3094                  */
3095                 tmpRecPtr.xrecoff += pageHeaderSize;
3096                 targetRecOff = pageHeaderSize;
3097         }
3098         else if (targetRecOff < pageHeaderSize)
3099         {
                     /* the requested position falls inside the page header */
3100                 ereport(emode,
3101                                 (errmsg("invalid record offset at %X/%X",
3102                                                 RecPtr->xlogid, RecPtr->xrecoff)));
3103                 goto next_record_is_invalid;
3104         }
             /*
              * If the page starts with the continuation of an earlier record, no
              * record can begin right after the page header.
              */
3105         if ((((XLogPageHeader) readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) &&
3106                 targetRecOff == pageHeaderSize)
3107         {
3108                 ereport(emode,
3109                                 (errmsg("contrecord is requested by %X/%X",
3110                                                 RecPtr->xlogid, RecPtr->xrecoff)));
3111                 goto next_record_is_invalid;
3112         }
             /* from here on the record header is examined in place, inside readBuf */
3113         record = (XLogRecord *) ((char *) readBuf + RecPtr->xrecoff % XLOG_BLCKSZ);
3114
3115 got_record:;
3116
3117         /*
3118          * xl_len == 0 is bad data for everything except XLOG SWITCH, where it is
3119          * required.
3120          */
3121         if (record->xl_rmid == RM_XLOG_ID && record->xl_info == XLOG_SWITCH)
3122         {
3123                 if (record->xl_len != 0)
3124                 {
3125                         ereport(emode,
3126                                         (errmsg("invalid xlog switch record at %X/%X",
3127                                                         RecPtr->xlogid, RecPtr->xrecoff)));
3128                         goto next_record_is_invalid;
3129                 }
3130         }
3131         else if (record->xl_len == 0)
3132         {
3133                 ereport(emode,
3134                                 (errmsg("record with zero length at %X/%X",
3135                                                 RecPtr->xlogid, RecPtr->xrecoff)));
3136                 goto next_record_is_invalid;
3137         }
             /*
              * xl_tot_len must cover at least the header plus rmgr data, and at most
              * that plus XLR_MAX_BKP_BLOCKS full-size backup blocks.
              */
3138         if (record->xl_tot_len < SizeOfXLogRecord + record->xl_len ||
3139                 record->xl_tot_len > SizeOfXLogRecord + record->xl_len +
3140                 XLR_MAX_BKP_BLOCKS * (sizeof(BkpBlock) + BLCKSZ))
3141         {
3142                 ereport(emode,
3143                                 (errmsg("invalid record length at %X/%X",
3144                                                 RecPtr->xlogid, RecPtr->xrecoff)));
3145                 goto next_record_is_invalid;
3146         }
3147         if (record->xl_rmid > RM_MAX_ID)
3148         {
3149                 ereport(emode,
3150                                 (errmsg("invalid resource manager ID %u at %X/%X",
3151                                                 record->xl_rmid, RecPtr->xlogid, RecPtr->xrecoff)));
3152                 goto next_record_is_invalid;
3153         }
3154         if (randAccess)
3155         {
3156                 /*
3157                  * We can't exactly verify the prev-link, but surely it should be less
3158                  * than the record's own address.
3159                  */
3160                 if (!XLByteLT(record->xl_prev, *RecPtr))
3161                 {
3162                         ereport(emode,
3163                                         (errmsg("record with incorrect prev-link %X/%X at %X/%X",
3164                                                         record->xl_prev.xlogid, record->xl_prev.xrecoff,
3165                                                         RecPtr->xlogid, RecPtr->xrecoff)));
3166                         goto next_record_is_invalid;
3167                 }
3168         }
3169         else
3170         {
3171                 /*
3172                  * Record's prev-link should exactly match our previous location. This
3173                  * check guards against torn WAL pages where a stale but valid-looking
3174                  * WAL record starts on a sector boundary.
3175                  */
3176                 if (!XLByteEQ(record->xl_prev, ReadRecPtr))
3177                 {
3178                         ereport(emode,
3179                                         (errmsg("record with incorrect prev-link %X/%X at %X/%X",
3180                                                         record->xl_prev.xlogid, record->xl_prev.xrecoff,
3181                                                         RecPtr->xlogid, RecPtr->xrecoff)));
3182                         goto next_record_is_invalid;
3183                 }
3184         }
3185
3186         /*
3187          * Allocate or enlarge readRecordBuf as needed.  To avoid useless small
3188          * increases, round its size to a multiple of XLOG_BLCKSZ, and make sure
3189          * it's at least 4*Max(BLCKSZ, XLOG_BLCKSZ) to start with.  (That is
3190          * enough for all "normal" records, but very large commit or abort records
3191          * might need more space.)
              *
              * The old buffer is freed rather than realloc'd: its contents are not
              * used again after this point.
3192          */
3193         total_len = record->xl_tot_len;
3194         if (total_len > readRecordBufSize)
3195         {
3196                 uint32          newSize = total_len;
3197
3198                 newSize += XLOG_BLCKSZ - (newSize % XLOG_BLCKSZ);
3199                 newSize = Max(newSize, 4 * Max(BLCKSZ, XLOG_BLCKSZ));
3200                 if (readRecordBuf)
3201                         free(readRecordBuf);
3202                 readRecordBuf = (char *) malloc(newSize);
3203                 if (!readRecordBuf)
3204                 {
3205                         readRecordBufSize = 0;
3206                         /* We treat this as a "bogus data" condition */
3207                         ereport(emode,
3208                                         (errmsg("record length %u at %X/%X too long",
3209                                                         total_len, RecPtr->xlogid, RecPtr->xrecoff)));
3210                         goto next_record_is_invalid;
3211                 }
3212                 readRecordBufSize = newSize;
3213         }
3214
3215         buffer = readRecordBuf;
3216         nextRecord = NULL;
             /* bytes available on the current page, counted from the record's start */
3217         len = XLOG_BLCKSZ - RecPtr->xrecoff % XLOG_BLCKSZ;
3218         if (total_len > len)
3219         {
3220                 /* Need to reassemble record */
3221                 XLogContRecord *contrecord;
                     /* gotlen: number of bytes of the record copied so far */
3222                 uint32          gotlen = len;
3223
3224                 memcpy(buffer, record, len);
3225                 record = (XLogRecord *) buffer;
3226                 buffer += len;
                     /*
                      * Each pass loads the next page (moving on to the next segment file
                      * when the current one is exhausted), checks its header and its
                      * continuation record, and appends that page's share of the data.
                      */
3227                 for (;;)
3228                 {
3229                         readOff += XLOG_BLCKSZ;
3230                         if (readOff >= XLogSegSize)
3231                         {
3232                                 close(readFile);
3233                                 readFile = -1;
3234                                 NextLogSeg(readId, readSeg);
3235                                 readFile = XLogFileRead(readId, readSeg, emode);
3236                                 if (readFile < 0)
3237                                         goto next_record_is_invalid;
3238                                 readOff = 0;
3239                         }
3240                         if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
3241                         {
3242                                 ereport(emode,
3243                                                 (errcode_for_file_access(),
3244                                                  errmsg("could not read from log file %u, segment %u, offset %u: %m",
3245                                                                 readId, readSeg, readOff)));
3246                                 goto next_record_is_invalid;
3247                         }
3248                         if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode))
3249                                 goto next_record_is_invalid;
3250                         if (!(((XLogPageHeader) readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD))
3251                         {
3252                                 ereport(emode,
3253                                                 (errmsg("there is no contrecord flag in log file %u, segment %u, offset %u",
3254                                                                 readId, readSeg, readOff)));
3255                                 goto next_record_is_invalid;
3256                         }
3257                         pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
3258                         contrecord = (XLogContRecord *) ((char *) readBuf + pageHeaderSize);
                             /* xl_rem_len must equal exactly the bytes still outstanding */
3259                         if (contrecord->xl_rem_len == 0 ||
3260                                 total_len != (contrecord->xl_rem_len + gotlen))
3261                         {
3262                                 ereport(emode,
3263                                                 (errmsg("invalid contrecord length %u in log file %u, segment %u, offset %u",
3264                                                                 contrecord->xl_rem_len,
3265                                                                 readId, readSeg, readOff)));
3266                                 goto next_record_is_invalid;
3267                         }
                             /* len: payload capacity of this page after both headers */
3268                         len = XLOG_BLCKSZ - pageHeaderSize - SizeOfXLogContRecord;
3269                         if (contrecord->xl_rem_len > len)
3270                         {
                                     /* the record fills this page entirely and continues further */
3271                                 memcpy(buffer, (char *) contrecord + SizeOfXLogContRecord, len);
3272                                 gotlen += len;
3273                                 buffer += len;
3274                                 continue;
3275                         }
                             /* last piece: the record ends on this page */
3276                         memcpy(buffer, (char *) contrecord + SizeOfXLogContRecord,
3277                                    contrecord->xl_rem_len);
3278                         break;
3279                 }
3280                 if (!RecordIsValid(record, *RecPtr, emode))
3281                         goto next_record_is_invalid;
3282                 pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
                     /*
                      * Remember a same-page successor only if at least SizeOfXLogRecord
                      * bytes remain on the page after the MAXALIGNed tail of this record.
                      */
3283                 if (XLOG_BLCKSZ - SizeOfXLogRecord >= pageHeaderSize +
3284                         MAXALIGN(SizeOfXLogContRecord + contrecord->xl_rem_len))
3285                 {
3286                         nextRecord = (XLogRecord *) ((char *) contrecord +
3287                                         MAXALIGN(SizeOfXLogContRecord + contrecord->xl_rem_len));
3288                 }
                     /* the end position is derived from the page the record finished on */
3289                 EndRecPtr.xlogid = readId;
3290                 EndRecPtr.xrecoff = readSeg * XLogSegSize + readOff +
3291                         pageHeaderSize +
3292                         MAXALIGN(SizeOfXLogContRecord + contrecord->xl_rem_len);
3293                 ReadRecPtr = *RecPtr;
3294                 /* needn't worry about XLOG SWITCH, it can't cross page boundaries */
3295                 return record;
3296         }
3297
3298         /* Record does not cross a page boundary */
             /* it is validated where it sits and only then copied to readRecordBuf */
3299         if (!RecordIsValid(record, *RecPtr, emode))
3300                 goto next_record_is_invalid;
3301         if (XLOG_BLCKSZ - SizeOfXLogRecord >= RecPtr->xrecoff % XLOG_BLCKSZ +
3302                 MAXALIGN(total_len))
3303                 nextRecord = (XLogRecord *) ((char *) record + MAXALIGN(total_len));
3304         EndRecPtr.xlogid = RecPtr->xlogid;
3305         EndRecPtr.xrecoff = RecPtr->xrecoff + MAXALIGN(total_len);
3306         ReadRecPtr = *RecPtr;
3307         memcpy(buffer, record, total_len);
3308
3309         /*
3310          * Special processing if it's an XLOG SWITCH record
3311          */
3312         if (record->xl_rmid == RM_XLOG_ID && record->xl_info == XLOG_SWITCH)
3313         {
3314                 /* Pretend it extends to end of segment */
                     /* (the two statements round xrecoff up to a multiple of XLogSegSize) */
3315                 EndRecPtr.xrecoff += XLogSegSize - 1;
3316                 EndRecPtr.xrecoff -= EndRecPtr.xrecoff % XLogSegSize;
3317                 nextRecord = NULL;              /* definitely not on same page */
3318
3319                 /*
3320                  * Pretend that readBuf contains the last page of the segment. This is
3321                  * just to avoid Assert failure in StartupXLOG if XLOG ends with this
3322                  * segment.
3323                  */
3324                 readOff = XLogSegSize - XLOG_BLCKSZ;
3325         }
3326         return (XLogRecord *) buffer;
3327
             /* common failure exit: forget the open file and any cached next record */
3328 next_record_is_invalid:;
3329         if (readFile >= 0)
3330         {
3331                 close(readFile);
3332                 readFile = -1;
3333         }
3334         nextRecord = NULL;
3335         return NULL;
3336 }
3337
3338 /*
3339  * Check whether the xlog header of a page just read in looks valid.
3340  *
3341  * This is just a convenience subroutine to avoid duplicated code in
3342  * ReadRecord.  It's not intended for use from anywhere else.
      *
      * hdr:   header of the page to check.
      * emode: elevel used for every report.
      *
      * The checks run in this order and the first failure is reported and
      * returns false: magic number; unknown bits in xlp_info; for a long
      * header, the system identifier, XLogSegSize and XLOG_BLCKSZ recorded in
      * it; a short header on the page at offset 0; the page address against
      * the position computed from readId/readSeg/readOff; membership of the
      * page's timeline ID in expectedTLIs; and finally that the timeline ID is
      * not lower than lastPageTLI.
      *
      * Returns true when every check passes; only then is lastPageTLI updated
      * to this page's timeline ID.  The position used in the checks and the
      * messages comes from the file-level readId, readSeg and readOff, not
      * from an argument.
3343  */
3344 static bool
3345 ValidXLOGHeader(XLogPageHeader hdr, int emode)
3346 {
3347         XLogRecPtr      recaddr;
3348
3349         if (hdr->xlp_magic != XLOG_PAGE_MAGIC)
3350         {
3351                 ereport(emode,
3352                                 (errmsg("invalid magic number %04X in log file %u, segment %u, offset %u",
3353                                                 hdr->xlp_magic, readId, readSeg, readOff)));
3354                 return false;
3355         }
             /* any flag bit outside XLP_ALL_FLAGS is rejected */
3356         if ((hdr->xlp_info & ~XLP_ALL_FLAGS) != 0)
3357         {
3358                 ereport(emode,
3359                                 (errmsg("invalid info bits %04X in log file %u, segment %u, offset %u",
3360                                                 hdr->xlp_info, readId, readSeg, readOff)));
3361                 return false;
3362         }
3363         if (hdr->xlp_info & XLP_LONG_HEADER)
3364         {
                     /* a long header carries extra identification fields to cross-check */
3365                 XLogLongPageHeader longhdr = (XLogLongPageHeader) hdr;
3366
3367                 if (longhdr->xlp_sysid != ControlFile->system_identifier)
3368                 {
3369                         char            fhdrident_str[32];
3370                         char            sysident_str[32];
3371
3372                         /*
3373                          * Format sysids separately to keep platform-dependent format code
3374                          * out of the translatable message string.
3375                          */
3376                         snprintf(fhdrident_str, sizeof(fhdrident_str), UINT64_FORMAT,
3377                                          longhdr->xlp_sysid);
3378                         snprintf(sysident_str, sizeof(sysident_str), UINT64_FORMAT,
3379                                          ControlFile->system_identifier);
3380                         ereport(emode,
3381                                         (errmsg("WAL file is from different system"),
3382                                          errdetail("WAL file SYSID is %s, pg_control SYSID is %s",
3383                                                            fhdrident_str, sysident_str)));
3384                         return false;
3385                 }
3386                 if (longhdr->xlp_seg_size != XLogSegSize)
3387                 {
3388                         ereport(emode,
3389                                         (errmsg("WAL file is from different system"),
3390                                          errdetail("Incorrect XLOG_SEG_SIZE in page header.")));
3391                         return false;
3392                 }
3393                 if (longhdr->xlp_xlog_blcksz != XLOG_BLCKSZ)
3394                 {
3395                         ereport(emode,
3396                                         (errmsg("WAL file is from different system"),
3397                                          errdetail("Incorrect XLOG_BLCKSZ in page header.")));
3398                         return false;
3399                 }
3400         }
3401         else if (readOff == 0)
3402         {
3403                 /* hmm, first page of file doesn't have a long header? */
                     /* (reported with the same "invalid info bits" message as above) */
3404                 ereport(emode,
3405                                 (errmsg("invalid info bits %04X in log file %u, segment %u, offset %u",
3406                                                 hdr->xlp_info, readId, readSeg, readOff)));
3407                 return false;
3408         }
3409
             /* the page must claim the address it was actually read from */
3410         recaddr.xlogid = readId;
3411         recaddr.xrecoff = readSeg * XLogSegSize + readOff;
3412         if (!XLByteEQ(hdr->xlp_pageaddr, recaddr))
3413         {
3414                 ereport(emode,
3415                                 (errmsg("unexpected pageaddr %X/%X in log file %u, segment %u, offset %u",
3416                                                 hdr->xlp_pageaddr.xlogid, hdr->xlp_pageaddr.xrecoff,
3417                                                 readId, readSeg, readOff)));
3418                 return false;
3419         }
3420
3421         /*
3422          * Check page TLI is one of the expected values.
3423          */
3424         if (!list_member_int(expectedTLIs, (int) hdr->xlp_tli))
3425         {
3426                 ereport(emode,
3427                                 (errmsg("unexpected timeline ID %u in log file %u, segment %u, offset %u",
3428                                                 hdr->xlp_tli,
3429                                                 readId, readSeg, readOff)));
3430                 return false;
3431         }
3432
3433         /*
3434          * Since child timelines are always assigned a TLI greater than their
3435          * immediate parent's TLI, we should never see TLI go backwards across
3436          * successive pages of a consistent WAL sequence.
3437          *
3438          * Of course this check should only be applied when advancing sequentially
3439          * across pages; therefore ReadRecord resets lastPageTLI to zero when
3440          * going to a random page.
3441          */
3442         if (hdr->xlp_tli < lastPageTLI)
3443         {
3444                 ereport(emode,
3445                                 (errmsg("out-of-sequence timeline ID %u (after %u) in log file %u, segment %u, offset %u",
3446                                                 hdr->xlp_tli, lastPageTLI,
3447                                                 readId, readSeg, readOff)));
3448                 return false;
3449         }
             /* remembered only once every check above has passed */
3450         lastPageTLI = hdr->xlp_tli;
3451         return true;
3452 }
3453
3454 /*
3455  * Try to read a timeline's history file.
3456  *
3457  * If successful, return the list of component TLIs (the given TLI followed by
3458  * its ancestor TLIs).  If we can't find the history file, assume that the
3459  * timeline has no parents, and return a list of just the specified timeline
3460  * ID.
3461  */
3462 static List *
3463 readTimeLineHistory(TimeLineID targetTLI)
3464 {
3465         List       *result;
3466         char            path[MAXPGPATH];
3467         char            histfname[MAXFNAMELEN];
3468         char            fline[MAXPGPATH];
3469         FILE       *fd;
3470
3471         if (InArchiveRecovery)
3472         {
3473                 TLHistoryFileName(histfname, targetTLI);
3474                 RestoreArchivedFile(path, histfname, "RECOVERYHISTORY", 0);
3475         }
3476         else
3477                 TLHistoryFilePath(path, targetTLI);
3478
3479         fd = AllocateFile(path, "r");
3480         if (fd == NULL)
3481         {
3482                 if (errno != ENOENT)
3483                         ereport(FATAL,
3484                                         (errcode_for_file_access(),
3485                                          errmsg("could not open file \"%s\": %m", path)));
3486                 /* Not there, so assume no parents */
3487                 return list_make1_int((int) targetTLI);
3488         }
3489
3490         result = NIL;
3491
3492         /*
3493          * Parse the file...
3494          */
3495         while (fgets(fline, sizeof(fline), fd) != NULL)
3496         {
3497                 /* skip leading whitespace and check for # comment */
3498                 char       *ptr;
3499                 char       *endptr;
3500                 TimeLineID      tli;
3501
3502                 for (ptr = fline; *ptr; ptr++)
3503                 {
3504                         if (!isspace((unsigned char) *ptr))
3505                                 break;
3506                 }
3507                 if (*ptr == '\0' || *ptr == '#')
3508                         continue;
3509
3510                 /* expect a numeric timeline ID as first field of line */
3511                 tli = (TimeLineID) strtoul(ptr, &endptr, 0);
3512                 if (endptr == ptr)
3513                         ereport(FATAL,
3514                                         (errmsg("syntax error in history file: %s", fline),
3515                                          errhint("Expected a numeric timeline ID.")));
3516
3517                 if (result &&
3518                         tli <= (TimeLineID) linitial_int(result))
3519                         ereport(FATAL,
3520                                         (errmsg("invalid data in history file: %s", fline),
3521                                    errhint("Timeline IDs must be in increasing sequence.")));
3522
3523                 /* Build list with newest item first */
3524                 result = lcons_int((int) tli, result);
3525
3526                 /* we ignore the remainder of each line */
3527         }
3528
3529         FreeFile(fd);
3530
3531         if (result &&
3532                 targetTLI <= (TimeLineID) linitial_int(result))
3533                 ereport(FATAL,
3534                                 (errmsg("invalid data in history file \"%s\"", path),
3535                         errhint("Timeline IDs must be less than child timeline's ID.")));
3536
3537         result = lcons_int((int) targetTLI, result);
3538
3539         ereport(DEBUG3,
3540                         (errmsg_internal("history of timeline %u is %s",
3541                                                          targetTLI, nodeToString(result))));
3542
3543         return result;
3544 }
3545
3546 /*
3547  * Probe whether a timeline history file exists for the given timeline ID
3548  */
3549 static bool
3550 existsTimeLineHistory(TimeLineID probeTLI)
3551 {
3552         char            path[MAXPGPATH];
3553         char            histfname[MAXFNAMELEN];
3554         FILE       *fd;
3555
3556         if (InArchiveRecovery)
3557         {
3558                 TLHistoryFileName(histfname, probeTLI);
3559                 RestoreArchivedFile(path, histfname, "RECOVERYHISTORY", 0);
3560         }
3561         else
3562                 TLHistoryFilePath(path, probeTLI);
3563
3564         fd = AllocateFile(path, "r");
3565         if (fd != NULL)
3566         {
3567                 FreeFile(fd);
3568                 return true;
3569         }
3570         else
3571         {
3572                 if (errno != ENOENT)
3573                         ereport(FATAL,
3574                                         (errcode_for_file_access(),
3575                                          errmsg("could not open file \"%s\": %m", path)));
3576                 return false;
3577         }
3578 }
3579
3580 /*
3581  * Find the newest existing timeline, assuming that startTLI exists.
3582  *
3583  * Note: while this is somewhat heuristic, it does positively guarantee
3584  * that (result + 1) is not a known timeline, and therefore it should
3585  * be safe to assign that ID to a new timeline.
3586  */
3587 static TimeLineID
3588 findNewestTimeLine(TimeLineID startTLI)
3589 {
3590         TimeLineID      newestTLI;
3591         TimeLineID      probeTLI;
3592
3593         /*
3594          * The algorithm is just to probe for the existence of timeline history
3595          * files.  XXX is it useful to allow gaps in the sequence?
3596          */
3597         newestTLI = startTLI;
3598
3599         for (probeTLI = startTLI + 1;; probeTLI++)
3600         {
3601                 if (existsTimeLineHistory(probeTLI))
3602                 {
3603                         newestTLI = probeTLI;           /* probeTLI exists */
3604                 }
3605                 else
3606                 {
3607                         /* doesn't exist, assume we're done */
3608                         break;
3609                 }
3610         }
3611
3612         return newestTLI;
3613 }
3614
3615 /*
3616  * Create a new timeline history file.
3617  *
3618  *      newTLI: ID of the new timeline
3619  *      parentTLI: ID of its immediate parent
3620  *      endTLI et al: ID of the last used WAL file, for annotation purposes
3621  *
3622  * Currently this is only used during recovery, and so there are no locking
3623  * considerations.      But we should be just as tense as XLogFileInit to avoid
3624  * emplacing a bogus file.
3625  */
3626 static void
3627 writeTimeLineHistory(TimeLineID newTLI, TimeLineID parentTLI,
3628                                          TimeLineID endTLI, uint32 endLogId, uint32 endLogSeg)
3629 {
3630         char            path[MAXPGPATH];
3631         char            tmppath[MAXPGPATH];
3632         char            histfname[MAXFNAMELEN];
3633         char            xlogfname[MAXFNAMELEN];
3634         char            buffer[BLCKSZ];
3635         int                     srcfd;
3636         int                     fd;
3637         int                     nbytes;
3638
3639         Assert(newTLI > parentTLI); /* else bad selection of newTLI */
3640
3641         /*
3642          * Write into a temp file name.
3643          */
3644         snprintf(tmppath, MAXPGPATH, XLOGDIR "/xlogtemp.%d", (int) getpid());
3645
3646         unlink(tmppath);
3647
3648         /* do not use get_sync_bit() here --- want to fsync only at end of fill */
3649         fd = BasicOpenFile(tmppath, O_RDWR | O_CREAT | O_EXCL,
3650                                            S_IRUSR | S_IWUSR);
3651         if (fd < 0)
3652                 ereport(ERROR,
3653                                 (errcode_for_file_access(),
3654                                  errmsg("could not create file \"%s\": %m", tmppath)));
3655
3656         /*
3657          * If a history file exists for the parent, copy it verbatim
3658          */
3659         if (InArchiveRecovery)
3660         {
3661                 TLHistoryFileName(histfname, parentTLI);
3662                 RestoreArchivedFile(path, histfname, "RECOVERYHISTORY", 0);
3663         }
3664         else
3665                 TLHistoryFilePath(path, parentTLI);
3666
3667         srcfd = BasicOpenFile(path, O_RDONLY, 0);
3668         if (srcfd < 0)
3669         {
3670                 if (errno != ENOENT)
3671                         ereport(ERROR,
3672                                         (errcode_for_file_access(),
3673                                          errmsg("could not open file \"%s\": %m", path)));
3674                 /* Not there, so assume parent has no parents */
3675         }
3676         else
3677         {
3678                 for (;;)
3679                 {
3680                         errno = 0;
3681                         nbytes = (int) read(srcfd, buffer, sizeof(buffer));
3682                         if (nbytes < 0 || errno != 0)
3683                                 ereport(ERROR,
3684                                                 (errcode_for_file_access(),
3685                                                  errmsg("could not read file \"%s\": %m", path)));
3686                         if (nbytes == 0)
3687                                 break;
3688                         errno = 0;
3689                         if ((int) write(fd, buffer, nbytes) != nbytes)
3690                         {
3691                                 int                     save_errno = errno;
3692
3693                                 /*
3694                                  * If we fail to make the file, delete it to release disk
3695                                  * space
3696                                  */
3697                                 unlink(tmppath);
3698
3699                                 /*
3700                                  * if write didn't set errno, assume problem is no disk space
3701                                  */
3702                                 errno = save_errno ? save_errno : ENOSPC;
3703
3704                                 ereport(ERROR,
3705                                                 (errcode_for_file_access(),
3706                                          errmsg("could not write to file \"%s\": %m", tmppath)));
3707                         }
3708                 }
3709                 close(srcfd);
3710         }
3711
3712         /*
3713          * Append one line with the details of this timeline split.
3714          *
3715          * If we did have a parent file, insert an extra newline just in case the
3716          * parent file failed to end with one.
3717          */
3718         XLogFileName(xlogfname, endTLI, endLogId, endLogSeg);
3719
3720         snprintf(buffer, sizeof(buffer),
3721                          "%s%u\t%s\t%s transaction %u at %s\n",
3722                          (srcfd < 0) ? "" : "\n",
3723                          parentTLI,
3724                          xlogfname,
3725                          recoveryStopAfter ? "after" : "before",
3726                          recoveryStopXid,
3727                          timestamptz_to_str(recoveryStopTime));
3728
3729         nbytes = strlen(buffer);
3730         errno = 0;
3731         if ((int) write(fd, buffer, nbytes) != nbytes)
3732         {
3733                 int                     save_errno = errno;
3734
3735                 /*
3736                  * If we fail to make the file, delete it to release disk space
3737                  */
3738                 unlink(tmppath);
3739                 /* if write didn't set errno, assume problem is no disk space */
3740                 errno = save_errno ? save_errno : ENOSPC;
3741
3742                 ereport(ERROR,
3743                                 (errcode_for_file_access(),
3744                                  errmsg("could not write to file \"%s\": %m", tmppath)));
3745         }
3746
3747         if (pg_fsync(fd) != 0)
3748                 ereport(ERROR,
3749                                 (errcode_for_file_access(),
3750                                  errmsg("could not fsync file \"%s\": %m", tmppath)));
3751
3752         if (close(fd))
3753                 ereport(ERROR,
3754                                 (errcode_for_file_access(),
3755                                  errmsg("could not close file \"%s\": %m", tmppath)));
3756
3757
3758         /*
3759          * Now move the completed history file into place with its final name.
3760          */
3761         TLHistoryFilePath(path, newTLI);
3762
3763         /*
3764          * Prefer link() to rename() here just to be really sure that we don't
3765          * overwrite an existing logfile.  However, there shouldn't be one, so
3766          * rename() is an acceptable substitute except for the truly paranoid.
3767          */
3768 #if HAVE_WORKING_LINK
3769         if (link(tmppath, path) < 0)
3770                 ereport(ERROR,
3771                                 (errcode_for_file_access(),
3772                                  errmsg("could not link file \"%s\" to \"%s\": %m",
3773                                                 tmppath, path)));
3774         unlink(tmppath);
3775 #else
3776         if (rename(tmppath, path) < 0)
3777                 ereport(ERROR,
3778                                 (errcode_for_file_access(),
3779                                  errmsg("could not rename file \"%s\" to \"%s\": %m",
3780                                                 tmppath, path)));
3781 #endif
3782
3783         /* The history file can be archived immediately. */
3784         TLHistoryFileName(histfname, newTLI);
3785         XLogArchiveNotify(histfname);
3786 }
3787
3788 /*
3789  * I/O routines for pg_control
3790  *
3791  * *ControlFile is a buffer in shared memory that holds an image of the
3792  * contents of pg_control.      WriteControlFile() initializes pg_control
3793  * given a preloaded buffer, ReadControlFile() loads the buffer from
3794  * the pg_control file (during postmaster or standalone-backend startup),
3795  * and UpdateControlFile() rewrites pg_control after we modify xlog state.
3796  *
3797  * For simplicity, WriteControlFile() initializes the fields of pg_control
3798  * that are related to checking backend/database compatibility, and
3799  * ReadControlFile() verifies they are correct.  We could split out the
3800  * I/O and compatibility-check functions, but there seems no need currently.
3801  */
3802 static void
3803 WriteControlFile(void)
3804 {
3805         int                     fd;
3806         char            buffer[PG_CONTROL_SIZE];                /* need not be aligned */
3807         char       *localeptr;
3808
3809         /*
3810          * Initialize version and compatibility-check fields
3811          */
3812         ControlFile->pg_control_version = PG_CONTROL_VERSION;
3813         ControlFile->catalog_version_no = CATALOG_VERSION_NO;
3814
3815         ControlFile->maxAlign = MAXIMUM_ALIGNOF;
3816         ControlFile->floatFormat = FLOATFORMAT_VALUE;
3817
3818         ControlFile->blcksz = BLCKSZ;
3819         ControlFile->relseg_size = RELSEG_SIZE;
3820         ControlFile->xlog_blcksz = XLOG_BLCKSZ;
3821         ControlFile->xlog_seg_size = XLOG_SEG_SIZE;
3822
3823         ControlFile->nameDataLen = NAMEDATALEN;
3824         ControlFile->indexMaxKeys = INDEX_MAX_KEYS;
3825
3826         ControlFile->toast_max_chunk_size = TOAST_MAX_CHUNK_SIZE;
3827
3828 #ifdef HAVE_INT64_TIMESTAMP
3829         ControlFile->enableIntTimes = true;
3830 #else
3831         ControlFile->enableIntTimes = false;
3832 #endif
3833         ControlFile->float4ByVal = FLOAT4PASSBYVAL;
3834         ControlFile->float8ByVal = FLOAT8PASSBYVAL;
3835
3836         ControlFile->localeBuflen = LOCALE_NAME_BUFLEN;
3837         localeptr = setlocale(LC_COLLATE, NULL);
3838         if (!localeptr)
3839                 ereport(PANIC,
3840                                 (errmsg("invalid LC_COLLATE setting")));
3841         StrNCpy(ControlFile->lc_collate, localeptr, LOCALE_NAME_BUFLEN);
3842         localeptr = setlocale(LC_CTYPE, NULL);
3843         if (!localeptr)
3844                 ereport(PANIC,
3845                                 (errmsg("invalid LC_CTYPE setting")));
3846         StrNCpy(ControlFile->lc_ctype, localeptr, LOCALE_NAME_BUFLEN);
3847
3848         /* Contents are protected with a CRC */
3849         INIT_CRC32(ControlFile->crc);
3850         COMP_CRC32(ControlFile->crc,
3851                            (char *) ControlFile,
3852                            offsetof(ControlFileData, crc));
3853         FIN_CRC32(ControlFile->crc);
3854
3855         /*
3856          * We write out PG_CONTROL_SIZE bytes into pg_control, zero-padding the
3857          * excess over sizeof(ControlFileData).  This reduces the odds of
3858          * premature-EOF errors when reading pg_control.  We'll still fail when we
3859          * check the contents of the file, but hopefully with a more specific
3860          * error than "couldn't read pg_control".
3861          */
3862         if (sizeof(ControlFileData) > PG_CONTROL_SIZE)
3863                 elog(PANIC, "sizeof(ControlFileData) is larger than PG_CONTROL_SIZE; fix either one");
3864
3865         memset(buffer, 0, PG_CONTROL_SIZE);
3866         memcpy(buffer, ControlFile, sizeof(ControlFileData));
3867
3868         fd = BasicOpenFile(XLOG_CONTROL_FILE,
3869                                            O_RDWR | O_CREAT | O_EXCL | PG_BINARY,
3870                                            S_IRUSR | S_IWUSR);
3871         if (fd < 0)
3872                 ereport(PANIC,
3873                                 (errcode_for_file_access(),
3874                                  errmsg("could not create control file \"%s\": %m",
3875                                                 XLOG_CONTROL_FILE)));
3876
3877         errno = 0;
3878         if (write(fd, buffer, PG_CONTROL_SIZE) != PG_CONTROL_SIZE)
3879         {
3880                 /* if write didn't set errno, assume problem is no disk space */
3881                 if (errno == 0)
3882                         errno = ENOSPC;
3883                 ereport(PANIC,
3884                                 (errcode_for_file_access(),
3885                                  errmsg("could not write to control file: %m")));
3886         }
3887
3888         if (pg_fsync(fd) != 0)
3889                 ereport(PANIC,
3890                                 (errcode_for_file_access(),
3891                                  errmsg("could not fsync control file: %m")));
3892
3893         if (close(fd))
3894                 ereport(PANIC,
3895                                 (errcode_for_file_access(),
3896                                  errmsg("could not close control file: %m")));
3897 }
3898
3899 static void
3900 ReadControlFile(void)
3901 {
3902         pg_crc32        crc;
3903         int                     fd;
3904
3905         /*
3906          * Read data...
3907          */
3908         fd = BasicOpenFile(XLOG_CONTROL_FILE,
3909                                            O_RDWR | PG_BINARY,
3910                                            S_IRUSR | S_IWUSR);
3911         if (fd < 0)
3912                 ereport(PANIC,
3913                                 (errcode_for_file_access(),
3914                                  errmsg("could not open control file \"%s\": %m",
3915                                                 XLOG_CONTROL_FILE)));
3916
3917         if (read(fd, ControlFile, sizeof(ControlFileData)) != sizeof(ControlFileData))
3918                 ereport(PANIC,
3919                                 (errcode_for_file_access(),
3920                                  errmsg("could not read from control file: %m")));
3921
3922         close(fd);
3923
3924         /*
3925          * Check for expected pg_control format version.  If this is wrong, the
3926          * CRC check will likely fail because we'll be checking the wrong number
3927          * of bytes.  Complaining about wrong version will probably be more
3928          * enlightening than complaining about wrong CRC.
3929          */
3930
3931         if (ControlFile->pg_control_version != PG_CONTROL_VERSION && ControlFile->pg_control_version % 65536 == 0 && ControlFile->pg_control_version / 65536 != 0)
3932                 ereport(FATAL,
3933                                 (errmsg("database files are incompatible with server"),
3934                                  errdetail("The database cluster was initialized with PG_CONTROL_VERSION %d (0x%08x),"
3935                                                    " but the server was compiled with PG_CONTROL_VERSION %d (0x%08x).",
3936                                                    ControlFile->pg_control_version, ControlFile->pg_control_version,
3937                                                    PG_CONTROL_VERSION, PG_CONTROL_VERSION),
3938                                  errhint("This could be a problem of mismatched byte ordering.  It looks like you need to initdb.")));
3939
3940         if (ControlFile->pg_control_version != PG_CONTROL_VERSION)
3941                 ereport(FATAL,
3942                                 (errmsg("database files are incompatible with server"),
3943                                  errdetail("The database cluster was initialized with PG_CONTROL_VERSION %d,"
3944                                   " but the server was compiled with PG_CONTROL_VERSION %d.",
3945                                                 ControlFile->pg_control_version, PG_CONTROL_VERSION),
3946                                  errhint("It looks like you need to initdb.")));
3947
3948         /* Now check the CRC. */
3949         INIT_CRC32(crc);
3950         COMP_CRC32(crc,
3951                            (char *) ControlFile,
3952                            offsetof(ControlFileData, crc));
3953         FIN_CRC32(crc);
3954
3955         if (!EQ_CRC32(crc, ControlFile->crc))
3956                 ereport(FATAL,
3957                                 (errmsg("incorrect checksum in control file")));
3958
3959         /*
3960          * Do compatibility checking immediately.  We do this here for 2 reasons:
3961          *
3962          * (1) if the database isn't compatible with the backend executable, we
3963          * want to abort before we can possibly do any damage;
3964          *
3965          * (2) this code is executed in the postmaster, so the setlocale() will
3966          * propagate to forked backends, which aren't going to read this file for
3967          * themselves.  (These locale settings are considered critical
3968          * compatibility items because they can affect sort order of indexes.)
3969          */
3970         if (ControlFile->catalog_version_no != CATALOG_VERSION_NO)
3971                 ereport(FATAL,
3972                                 (errmsg("database files are incompatible with server"),
3973                                  errdetail("The database cluster was initialized with CATALOG_VERSION_NO %d,"
3974                                   " but the server was compiled with CATALOG_VERSION_NO %d.",
3975                                                 ControlFile->catalog_version_no, CATALOG_VERSION_NO),
3976                                  errhint("It looks like you need to initdb.")));
3977         if (ControlFile->maxAlign != MAXIMUM_ALIGNOF)
3978                 ereport(FATAL,
3979                                 (errmsg("database files are incompatible with server"),
3980                    errdetail("The database cluster was initialized with MAXALIGN %d,"
3981                                          " but the server was compiled with MAXALIGN %d.",
3982                                          ControlFile->maxAlign, MAXIMUM_ALIGNOF),
3983                                  errhint("It looks like you need to initdb.")));
3984         if (ControlFile->floatFormat != FLOATFORMAT_VALUE)
3985                 ereport(FATAL,
3986                                 (errmsg("database files are incompatible with server"),
3987                                  errdetail("The database cluster appears to use a different floating-point number format than the server executable."),
3988                                  errhint("It looks like you need to initdb.")));
3989         if (ControlFile->blcksz != BLCKSZ)
3990                 ereport(FATAL,
3991                                 (errmsg("database files are incompatible with server"),
3992                          errdetail("The database cluster was initialized with BLCKSZ %d,"
3993                                            " but the server was compiled with BLCKSZ %d.",
3994                                            ControlFile->blcksz, BLCKSZ),
3995                                  errhint("It looks like you need to recompile or initdb.")));
3996         if (ControlFile->relseg_size != RELSEG_SIZE)
3997                 ereport(FATAL,
3998                                 (errmsg("database files are incompatible with server"),
3999                 errdetail("The database cluster was initialized with RELSEG_SIZE %d,"
4000                                   " but the server was compiled with RELSEG_SIZE %d.",
4001                                   ControlFile->relseg_size, RELSEG_SIZE),
4002                                  errhint("It looks like you need to recompile or initdb.")));
4003         if (ControlFile->xlog_blcksz != XLOG_BLCKSZ)
4004                 ereport(FATAL,
4005                                 (errmsg("database files are incompatible with server"),
4006                 errdetail("The database cluster was initialized with XLOG_BLCKSZ %d,"
4007                                   " but the server was compiled with XLOG_BLCKSZ %d.",
4008                                   ControlFile->xlog_blcksz, XLOG_BLCKSZ),
4009                                  errhint("It looks like you need to recompile or initdb.")));
4010         if (ControlFile->xlog_seg_size != XLOG_SEG_SIZE)
4011                 ereport(FATAL,
4012                                 (errmsg("database files are incompatible with server"),
4013                                  errdetail("The database cluster was initialized with XLOG_SEG_SIZE %d,"
4014                                            " but the server was compiled with XLOG_SEG_SIZE %d.",
4015                                                    ControlFile->xlog_seg_size, XLOG_SEG_SIZE),
4016                                  errhint("It looks like you need to recompile or initdb.")));
4017         if (ControlFile->nameDataLen != NAMEDATALEN)
4018                 ereport(FATAL,
4019                                 (errmsg("database files are incompatible with server"),
4020                 errdetail("The database cluster was initialized with NAMEDATALEN %d,"
4021                                   " but the server was compiled with NAMEDATALEN %d.",
4022                                   ControlFile->nameDataLen, NAMEDATALEN),
4023                                  errhint("It looks like you need to recompile or initdb.")));
4024         if (ControlFile->indexMaxKeys != INDEX_MAX_KEYS)
4025                 ereport(FATAL,
4026                                 (errmsg("database files are incompatible with server"),
4027                                  errdetail("The database cluster was initialized with INDEX_MAX_KEYS %d,"
4028                                           " but the server was compiled with INDEX_MAX_KEYS %d.",
4029                                                    ControlFile->indexMaxKeys, INDEX_MAX_KEYS),
4030                                  errhint("It looks like you need to recompile or initdb.")));
4031         if (ControlFile->toast_max_chunk_size != TOAST_MAX_CHUNK_SIZE)
4032                 ereport(FATAL,
4033                                 (errmsg("database files are incompatible with server"),
4034                                  errdetail("The database cluster was initialized with TOAST_MAX_CHUNK_SIZE %d,"
4035                                 " but the server was compiled with TOAST_MAX_CHUNK_SIZE %d.",
4036                           ControlFile->toast_max_chunk_size, (int) TOAST_MAX_CHUNK_SIZE),
4037                                  errhint("It looks like you need to recompile or initdb.")));
4038
4039 #ifdef HAVE_INT64_TIMESTAMP
4040         if (ControlFile->enableIntTimes != true)
4041                 ereport(FATAL,
4042                                 (errmsg("database files are incompatible with server"),
4043                                  errdetail("The database cluster was initialized without HAVE_INT64_TIMESTAMP"
4044                                   " but the server was compiled with HAVE_INT64_TIMESTAMP."),
4045                                  errhint("It looks like you need to recompile or initdb.")));
4046 #else
4047         if (ControlFile->enableIntTimes != false)
4048                 ereport(FATAL,
4049                                 (errmsg("database files are incompatible with server"),
4050                                  errdetail("The database cluster was initialized with HAVE_INT64_TIMESTAMP"
4051                            " but the server was compiled without HAVE_INT64_TIMESTAMP."),
4052                                  errhint("It looks like you need to recompile or initdb.")));
4053 #endif
4054
4055 #ifdef USE_FLOAT4_BYVAL
4056         if (ControlFile->float4ByVal != true)
4057                 ereport(FATAL,
4058                                 (errmsg("database files are incompatible with server"),
4059                                  errdetail("The database cluster was initialized without USE_FLOAT4_BYVAL"
4060                                                    " but the server was compiled with USE_FLOAT4_BYVAL."),
4061                                  errhint("It looks like you need to recompile or initdb.")));
4062 #else
4063         if (ControlFile->float4ByVal != false)
4064                 ereport(FATAL,
4065                                 (errmsg("database files are incompatible with server"),
4066                                  errdetail("The database cluster was initialized with USE_FLOAT4_BYVAL"
4067                                                    " but the server was compiled without USE_FLOAT4_BYVAL."),
4068                                  errhint("It looks like you need to recompile or initdb.")));
4069 #endif
4070
4071 #ifdef USE_FLOAT8_BYVAL
4072         if (ControlFile->float8ByVal != true)
4073                 ereport(FATAL,
4074                                 (errmsg("database files are incompatible with server"),
4075                                  errdetail("The database cluster was initialized without USE_FLOAT8_BYVAL"
4076                                                    " but the server was compiled with USE_FLOAT8_BYVAL."),
4077                                  errhint("It looks like you need to recompile or initdb.")));
4078 #else
4079         if (ControlFile->float8ByVal != false)
4080                 ereport(FATAL,
4081                                 (errmsg("database files are incompatible with server"),
4082                                  errdetail("The database cluster was initialized with USE_FLOAT8_BYVAL"
4083                                                    " but the server was compiled without USE_FLOAT8_BYVAL."),
4084                                  errhint("It looks like you need to recompile or initdb.")));
4085 #endif
4086
4087         if (ControlFile->localeBuflen != LOCALE_NAME_BUFLEN)
4088                 ereport(FATAL,
4089                                 (errmsg("database files are incompatible with server"),
4090                                  errdetail("The database cluster was initialized with LOCALE_NAME_BUFLEN %d,"
4091                                   " but the server was compiled with LOCALE_NAME_BUFLEN %d.",
4092                                                    ControlFile->localeBuflen, LOCALE_NAME_BUFLEN),
4093                                  errhint("It looks like you need to recompile or initdb.")));
4094         if (pg_perm_setlocale(LC_COLLATE, ControlFile->lc_collate) == NULL)
4095                 ereport(FATAL,
4096                         (errmsg("database files are incompatible with operating system"),
4097                          errdetail("The database cluster was initialized with LC_COLLATE \"%s\","
4098                                            " which is not recognized by setlocale().",
4099                                            ControlFile->lc_collate),
4100                          errhint("It looks like you need to initdb or install locale support.")));
4101         if (pg_perm_setlocale(LC_CTYPE, ControlFile->lc_ctype) == NULL)
4102                 ereport(FATAL,
4103                         (errmsg("database files are incompatible with operating system"),
4104                 errdetail("The database cluster was initialized with LC_CTYPE \"%s\","
4105                                   " which is not recognized by setlocale().",
4106                                   ControlFile->lc_ctype),
4107                          errhint("It looks like you need to initdb or install locale support.")));
4108
4109         /* Make the fixed locale settings visible as GUC variables, too */
4110         SetConfigOption("lc_collate", ControlFile->lc_collate,
4111                                         PGC_INTERNAL, PGC_S_OVERRIDE);
4112         SetConfigOption("lc_ctype", ControlFile->lc_ctype,
4113                                         PGC_INTERNAL, PGC_S_OVERRIDE);
4114 }
4115
4116 void
4117 UpdateControlFile(void)
4118 {
4119         int                     fd;
4120
4121         INIT_CRC32(ControlFile->crc);
4122         COMP_CRC32(ControlFile->crc,
4123                            (char *) ControlFile,
4124                            offsetof(ControlFileData, crc));
4125         FIN_CRC32(ControlFile->crc);
4126
4127         fd = BasicOpenFile(XLOG_CONTROL_FILE,
4128                                            O_RDWR | PG_BINARY,
4129                                            S_IRUSR | S_IWUSR);
4130         if (fd < 0)
4131                 ereport(PANIC,
4132                                 (errcode_for_file_access(),
4133                                  errmsg("could not open control file \"%s\": %m",
4134                                                 XLOG_CONTROL_FILE)));
4135
4136         errno = 0;
4137         if (write(fd, ControlFile, sizeof(ControlFileData)) != sizeof(ControlFileData))
4138         {
4139                 /* if write didn't set errno, assume problem is no disk space */
4140                 if (errno == 0)
4141                         errno = ENOSPC;
4142                 ereport(PANIC,
4143                                 (errcode_for_file_access(),
4144                                  errmsg("could not write to control file: %m")));
4145         }
4146
4147         if (pg_fsync(fd) != 0)
4148                 ereport(PANIC,
4149                                 (errcode_for_file_access(),
4150                                  errmsg("could not fsync control file: %m")));
4151
4152         if (close(fd))
4153                 ereport(PANIC,
4154                                 (errcode_for_file_access(),
4155                                  errmsg("could not close control file: %m")));
4156 }
4157
4158 /*
4159  * Initialization of shared memory for XLOG
4160  */
4161 Size
4162 XLOGShmemSize(void)
4163 {
4164         Size            size;
4165
4166         /* XLogCtl */
4167         size = sizeof(XLogCtlData);
4168         /* xlblocks array */
4169         size = add_size(size, mul_size(sizeof(XLogRecPtr), XLOGbuffers));
4170         /* extra alignment padding for XLOG I/O buffers */
4171         size = add_size(size, ALIGNOF_XLOG_BUFFER);
4172         /* and the buffers themselves */
4173         size = add_size(size, mul_size(XLOG_BLCKSZ, XLOGbuffers));
4174
4175         /*
4176          * Note: we don't count ControlFileData, it comes out of the "slop factor"
4177          * added by CreateSharedMemoryAndSemaphores.  This lets us use this
4178          * routine again below to compute the actual allocation size.
4179          */
4180
4181         return size;
4182 }
4183
4184 void
4185 XLOGShmemInit(void)
4186 {
4187         bool            foundCFile,
4188                                 foundXLog;
4189         char       *allocptr;
4190
4191         ControlFile = (ControlFileData *)
4192                 ShmemInitStruct("Control File", sizeof(ControlFileData), &foundCFile);
4193         XLogCtl = (XLogCtlData *)
4194                 ShmemInitStruct("XLOG Ctl", XLOGShmemSize(), &foundXLog);
4195
4196         if (foundCFile || foundXLog)
4197         {
4198                 /* both should be present or neither */
4199                 Assert(foundCFile && foundXLog);
4200                 return;
4201         }
4202
4203         memset(XLogCtl, 0, sizeof(XLogCtlData));
4204
4205         /*
4206          * Since XLogCtlData contains XLogRecPtr fields, its sizeof should be a
4207          * multiple of the alignment for same, so no extra alignment padding is
4208          * needed here.
4209          */
4210         allocptr = ((char *) XLogCtl) + sizeof(XLogCtlData);
4211         XLogCtl->xlblocks = (XLogRecPtr *) allocptr;
4212         memset(XLogCtl->xlblocks, 0, sizeof(XLogRecPtr) * XLOGbuffers);
4213         allocptr += sizeof(XLogRecPtr) * XLOGbuffers;
4214
4215         /*
4216          * Align the start of the page buffers to an ALIGNOF_XLOG_BUFFER boundary.
4217          */
4218         allocptr = (char *) TYPEALIGN(ALIGNOF_XLOG_BUFFER, allocptr);
4219         XLogCtl->pages = allocptr;
4220         memset(XLogCtl->pages, 0, (Size) XLOG_BLCKSZ * XLOGbuffers);
4221
4222         /*
4223          * Do basic initialization of XLogCtl shared data. (StartupXLOG will fill
4224          * in additional info.)
4225          */
4226         XLogCtl->XLogCacheBlck = XLOGbuffers - 1;
4227         XLogCtl->Insert.currpage = (XLogPageHeader) (XLogCtl->pages);
4228         SpinLockInit(&XLogCtl->info_lck);
4229
4230         /*
4231          * If we are not in bootstrap mode, pg_control should already exist. Read
4232          * and validate it immediately (see comments in ReadControlFile() for the
4233          * reasons why).
4234          */
4235         if (!IsBootstrapProcessingMode())
4236                 ReadControlFile();
4237 }
4238
4239 /*
4240  * This func must be called ONCE on system install.  It creates pg_control
4241  * and the initial XLOG segment.
4242  */
4243 void
4244 BootStrapXLOG(void)
4245 {
4246         CheckPoint      checkPoint;
4247         char       *buffer;
4248         XLogPageHeader page;
4249         XLogLongPageHeader longpage;
4250         XLogRecord *record;
4251         bool            use_existent;
4252         uint64          sysidentifier;
4253         struct timeval tv;
4254         pg_crc32        crc;
4255
4256         /*
4257          * Select a hopefully-unique system identifier code for this installation.
4258          * We use the result of gettimeofday(), including the fractional seconds
4259          * field, as being about as unique as we can easily get.  (Think not to
4260          * use random(), since it hasn't been seeded and there's no portable way
4261          * to seed it other than the system clock value...)  The upper half of the
4262          * uint64 value is just the tv_sec part, while the lower half is the XOR
4263          * of tv_sec and tv_usec.  This is to ensure that we don't lose uniqueness
4264          * unnecessarily if "uint64" is really only 32 bits wide.  A person
4265          * knowing this encoding can determine the initialization time of the
4266          * installation, which could perhaps be useful sometimes.
4267          */
4268         gettimeofday(&tv, NULL);
4269         sysidentifier = ((uint64) tv.tv_sec) << 32;
4270         sysidentifier |= (uint32) (tv.tv_sec | tv.tv_usec);
4271
4272         /* First timeline ID is always 1 */
4273         ThisTimeLineID = 1;
4274
4275         /* page buffer must be aligned suitably for O_DIRECT */
4276         buffer = (char *) palloc(XLOG_BLCKSZ + ALIGNOF_XLOG_BUFFER);
4277         page = (XLogPageHeader) TYPEALIGN(ALIGNOF_XLOG_BUFFER, buffer);
4278         memset(page, 0, XLOG_BLCKSZ);
4279
4280         /* Set up information for the initial checkpoint record */
4281         checkPoint.redo.xlogid = 0;
4282         checkPoint.redo.xrecoff = SizeOfXLogLongPHD;
4283         checkPoint.ThisTimeLineID = ThisTimeLineID;
4284         checkPoint.nextXidEpoch = 0;
4285         checkPoint.nextXid = FirstNormalTransactionId;
4286         checkPoint.nextOid = FirstBootstrapObjectId;
4287         checkPoint.nextMulti = FirstMultiXactId;
4288         checkPoint.nextMultiOffset = 0;
4289         checkPoint.time = (pg_time_t) time(NULL);
4290
4291         ShmemVariableCache->nextXid = checkPoint.nextXid;
4292         ShmemVariableCache->nextOid = checkPoint.nextOid;
4293         ShmemVariableCache->oidCount = 0;
4294         MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset);
4295
4296         /* Set up the XLOG page header */
4297         page->xlp_magic = XLOG_PAGE_MAGIC;
4298         page->xlp_info = XLP_LONG_HEADER;
4299         page->xlp_tli = ThisTimeLineID;
4300         page->xlp_pageaddr.xlogid = 0;
4301         page->xlp_pageaddr.xrecoff = 0;
4302         longpage = (XLogLongPageHeader) page;
4303         longpage->xlp_sysid = sysidentifier;
4304         longpage->xlp_seg_size = XLogSegSize;
4305         longpage->xlp_xlog_blcksz = XLOG_BLCKSZ;
4306
4307         /* Insert the initial checkpoint record */
4308         record = (XLogRecord *) ((char *) page + SizeOfXLogLongPHD);
4309         record->xl_prev.xlogid = 0;
4310         record->xl_prev.xrecoff = 0;
4311         record->xl_xid = InvalidTransactionId;
4312         record->xl_tot_len = SizeOfXLogRecord + sizeof(checkPoint);
4313         record->xl_len = sizeof(checkPoint);
4314         record->xl_info = XLOG_CHECKPOINT_SHUTDOWN;
4315         record->xl_rmid = RM_XLOG_ID;
4316         memcpy(XLogRecGetData(record), &checkPoint, sizeof(checkPoint));
4317
4318         INIT_CRC32(crc);
4319         COMP_CRC32(crc, &checkPoint, sizeof(checkPoint));
4320         COMP_CRC32(crc, (char *) record + sizeof(pg_crc32),
4321                            SizeOfXLogRecord - sizeof(pg_crc32));
4322         FIN_CRC32(crc);
4323         record->xl_crc = crc;
4324
4325         /* Create first XLOG segment file */
4326         use_existent = false;
4327         openLogFile = XLogFileInit(0, 0, &use_existent, false);
4328
4329         /* Write the first page with the initial record */
4330         errno = 0;
4331         if (write(openLogFile, page, XLOG_BLCKSZ) != XLOG_BLCKSZ)
4332         {
4333                 /* if write didn't set errno, assume problem is no disk space */
4334                 if (errno == 0)
4335                         errno = ENOSPC;
4336                 ereport(PANIC,
4337                                 (errcode_for_file_access(),
4338                           errmsg("could not write bootstrap transaction log file: %m")));
4339         }
4340
4341         if (pg_fsync(openLogFile) != 0)
4342                 ereport(PANIC,
4343                                 (errcode_for_file_access(),
4344                           errmsg("could not fsync bootstrap transaction log file: %m")));
4345
4346         if (close(openLogFile))
4347                 ereport(PANIC,
4348                                 (errcode_for_file_access(),
4349                           errmsg("could not close bootstrap transaction log file: %m")));
4350
4351         openLogFile = -1;
4352
4353         /* Now create pg_control */
4354
4355         memset(ControlFile, 0, sizeof(ControlFileData));
4356         /* Initialize pg_control status fields */
4357         ControlFile->system_identifier = sysidentifier;
4358         ControlFile->state = DB_SHUTDOWNED;
4359         ControlFile->time = checkPoint.time;
4360         ControlFile->checkPoint = checkPoint.redo;
4361         ControlFile->checkPointCopy = checkPoint;
4362         /* some additional ControlFile fields are set in WriteControlFile() */
4363
4364         WriteControlFile();
4365
4366         /* Bootstrap the commit log, too */
4367         BootStrapCLOG();
4368         BootStrapSUBTRANS();
4369         BootStrapMultiXact();
4370
4371         pfree(buffer);
4372 }
4373
4374 static char *
4375 str_time(pg_time_t tnow)
4376 {
4377         static char buf[128];
4378
4379         pg_strftime(buf, sizeof(buf),
4380                                 "%Y-%m-%d %H:%M:%S %Z",
4381                                 pg_localtime(&tnow, log_timezone));
4382
4383         return buf;
4384 }
4385
4386 /*
4387  * See if there is a recovery command file (recovery.conf), and if so
4388  * read in parameters for archive recovery.
4389  *
4390  * XXX longer term intention is to expand this to
4391  * cater for additional parameters and controls
4392  * possibly use a flex lexer similar to the GUC one
4393  */
4394 static void
4395 readRecoveryCommandFile(void)
4396 {
4397         FILE       *fd;
4398         char            cmdline[MAXPGPATH];
4399         TimeLineID      rtli = 0;
4400         bool            rtliGiven = false;
4401         bool            syntaxError = false;
4402
4403         fd = AllocateFile(RECOVERY_COMMAND_FILE, "r");
4404         if (fd == NULL)
4405         {
4406                 if (errno == ENOENT)
4407                         return;                         /* not there, so no archive recovery */
4408                 ereport(FATAL,
4409                                 (errcode_for_file_access(),
4410                                  errmsg("could not open recovery command file \"%s\": %m",
4411                                                 RECOVERY_COMMAND_FILE)));
4412         }
4413
4414         ereport(LOG,
4415                         (errmsg("starting archive recovery")));
4416
4417         /*
4418          * Parse the file...
4419          */
4420         while (fgets(cmdline, sizeof(cmdline), fd) != NULL)
4421         {
4422                 /* skip leading whitespace and check for # comment */
4423                 char       *ptr;
4424                 char       *tok1;
4425                 char       *tok2;
4426
4427                 for (ptr = cmdline; *ptr; ptr++)
4428                 {
4429                         if (!isspace((unsigned char) *ptr))
4430                                 break;
4431                 }
4432                 if (*ptr == '\0' || *ptr == '#')
4433                         continue;
4434
4435                 /* identify the quoted parameter value */
4436                 tok1 = strtok(ptr, "'");
4437                 if (!tok1)
4438                 {
4439                         syntaxError = true;
4440                         break;
4441                 }
4442                 tok2 = strtok(NULL, "'");
4443                 if (!tok2)
4444                 {
4445                         syntaxError = true;
4446                         break;
4447                 }
4448                 /* reparse to get just the parameter name */
4449                 tok1 = strtok(ptr, " \t=");
4450                 if (!tok1)
4451                 {
4452                         syntaxError = true;
4453                         break;
4454                 }
4455
4456                 if (strcmp(tok1, "restore_command") == 0)
4457                 {
4458                         recoveryRestoreCommand = pstrdup(tok2);
4459                         ereport(LOG,
4460                                         (errmsg("restore_command = '%s'",
4461                                                         recoveryRestoreCommand)));
4462                 }
4463                 else if (strcmp(tok1, "recovery_target_timeline") == 0)
4464                 {
4465                         rtliGiven = true;
4466                         if (strcmp(tok2, "latest") == 0)
4467                                 rtli = 0;
4468                         else
4469                         {
4470                                 errno = 0;
4471                                 rtli = (TimeLineID) strtoul(tok2, NULL, 0);
4472                                 if (errno == EINVAL || errno == ERANGE)
4473                                         ereport(FATAL,
4474                                                         (errmsg("recovery_target_timeline is not a valid number: \"%s\"",
4475                                                                         tok2)));
4476                         }
4477                         if (rtli)
4478                                 ereport(LOG,
4479                                                 (errmsg("recovery_target_timeline = %u", rtli)));
4480                         else
4481                                 ereport(LOG,
4482                                                 (errmsg("recovery_target_timeline = latest")));
4483                 }
4484                 else if (strcmp(tok1, "recovery_target_xid") == 0)
4485                 {
4486                         errno = 0;
4487                         recoveryTargetXid = (TransactionId) strtoul(tok2, NULL, 0);
4488                         if (errno == EINVAL || errno == ERANGE)
4489                                 ereport(FATAL,
4490                                  (errmsg("recovery_target_xid is not a valid number: \"%s\"",
4491                                                  tok2)));
4492                         ereport(LOG,
4493                                         (errmsg("recovery_target_xid = %u",
4494                                                         recoveryTargetXid)));
4495                         recoveryTarget = true;
4496                         recoveryTargetExact = true;
4497                 }
4498                 else if (strcmp(tok1, "recovery_target_time") == 0)
4499                 {
4500                         /*
4501                          * if recovery_target_xid specified, then this overrides
4502                          * recovery_target_time
4503                          */
4504                         if (recoveryTargetExact)
4505                                 continue;
4506                         recoveryTarget = true;
4507                         recoveryTargetExact = false;
4508
4509                         /*
4510                          * Convert the time string given by the user to TimestampTz form.
4511                          */
4512                         recoveryTargetTime =
4513                                 DatumGetTimestampTz(DirectFunctionCall3(timestamptz_in,
4514                                                                                                                 CStringGetDatum(tok2),
4515                                                                                                 ObjectIdGetDatum(InvalidOid),
4516                                                                                                                 Int32GetDatum(-1)));
4517                         ereport(LOG,
4518                                         (errmsg("recovery_target_time = '%s'",
4519                                                         timestamptz_to_str(recoveryTargetTime))));
4520                 }
4521                 else if (strcmp(tok1, "recovery_target_inclusive") == 0)
4522                 {
4523                         /*
4524                          * does nothing if a recovery_target is not also set
4525                          */
4526                         if (!parse_bool(tok2, &recoveryTargetInclusive))
4527                                   ereport(ERROR,
4528                                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
4529                                           errmsg("parameter \"recovery_target_inclusive\" requires a Boolean value")));
4530                         ereport(LOG,
4531                                         (errmsg("recovery_target_inclusive = %s", tok2)));
4532                 }
4533                 else if (strcmp(tok1, "log_restartpoints") == 0)
4534                 {
4535                         /*
4536                          * does nothing if a recovery_target is not also set
4537                          */
4538                         if (!parse_bool(tok2, &recoveryLogRestartpoints))
4539                                   ereport(ERROR,
4540                                                         (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
4541                                           errmsg("parameter \"log_restartpoints\" requires a Boolean value")));
4542                         ereport(LOG,
4543                                         (errmsg("log_restartpoints = %s", tok2)));
4544                 }
4545                 else
4546                         ereport(FATAL,
4547                                         (errmsg("unrecognized recovery parameter \"%s\"",
4548                                                         tok1)));
4549         }
4550
4551         FreeFile(fd);
4552
4553         if (syntaxError)
4554                 ereport(FATAL,
4555                                 (errmsg("syntax error in recovery command file: %s",
4556                                                 cmdline),
4557                           errhint("Lines should have the format parameter = 'value'.")));
4558
4559         /* Check that required parameters were supplied */
4560         if (recoveryRestoreCommand == NULL)
4561                 ereport(FATAL,
4562                                 (errmsg("recovery command file \"%s\" did not specify restore_command",
4563                                                 RECOVERY_COMMAND_FILE)));
4564
4565         /* Enable fetching from archive recovery area */
4566         InArchiveRecovery = true;
4567
4568         /*
4569          * If user specified recovery_target_timeline, validate it or compute the
4570          * "latest" value.      We can't do this until after we've gotten the restore
4571          * command and set InArchiveRecovery, because we need to fetch timeline
4572          * history files from the archive.
4573          */
4574         if (rtliGiven)
4575         {
4576                 if (rtli)
4577                 {
4578                         /* Timeline 1 does not have a history file, all else should */
4579                         if (rtli != 1 && !existsTimeLineHistory(rtli))
4580                                 ereport(FATAL,
4581                                                 (errmsg("recovery target timeline %u does not exist",
4582                                                                 rtli)));
4583                         recoveryTargetTLI = rtli;
4584                 }
4585                 else
4586                 {
4587                         /* We start the "latest" search from pg_control's timeline */
4588                         recoveryTargetTLI = findNewestTimeLine(recoveryTargetTLI);
4589                 }
4590         }
4591 }
4592
4593 /*
4594  * Exit archive-recovery state
4595  */
4596 static void
4597 exitArchiveRecovery(TimeLineID endTLI, uint32 endLogId, uint32 endLogSeg)
4598 {
4599         char            recoveryPath[MAXPGPATH];
4600         char            xlogpath[MAXPGPATH];
4601
4602         /*
4603          * We are no longer in archive recovery state.
4604          */
4605         InArchiveRecovery = false;
4606
4607         /*
4608          * We should have the ending log segment currently open.  Verify, and then
4609          * close it (to avoid problems on Windows with trying to rename or delete
4610          * an open file).
4611          */
4612         Assert(readFile >= 0);
4613         Assert(readId == endLogId);
4614         Assert(readSeg == endLogSeg);
4615
4616         close(readFile);
4617         readFile = -1;
4618
4619         /*
4620          * If the segment was fetched from archival storage, we want to replace
4621          * the existing xlog segment (if any) with the archival version.  This is
4622          * because whatever is in XLOGDIR is very possibly older than what we have
4623          * from the archives, since it could have come from restoring a PGDATA
4624          * backup.      In any case, the archival version certainly is more
4625          * descriptive of what our current database state is, because that is what
4626          * we replayed from.
4627          *
4628          * Note that if we are establishing a new timeline, ThisTimeLineID is
4629          * already set to the new value, and so we will create a new file instead
4630          * of overwriting any existing file.  (This is, in fact, always the case
4631          * at present.)
4632          */
4633         snprintf(recoveryPath, MAXPGPATH, XLOGDIR "/RECOVERYXLOG");
4634         XLogFilePath(xlogpath, ThisTimeLineID, endLogId, endLogSeg);
4635
4636         if (restoredFromArchive)
4637         {
4638                 ereport(DEBUG3,
4639                                 (errmsg_internal("moving last restored xlog to \"%s\"",
4640                                                                  xlogpath)));
4641                 unlink(xlogpath);               /* might or might not exist */
4642                 if (rename(recoveryPath, xlogpath) != 0)
4643                         ereport(FATAL,
4644                                         (errcode_for_file_access(),
4645                                          errmsg("could not rename file \"%s\" to \"%s\": %m",
4646                                                         recoveryPath, xlogpath)));
4647                 /* XXX might we need to fix permissions on the file? */
4648         }
4649         else
4650         {
4651                 /*
4652                  * If the latest segment is not archival, but there's still a
4653                  * RECOVERYXLOG laying about, get rid of it.
4654                  */
4655                 unlink(recoveryPath);   /* ignore any error */
4656
4657                 /*
4658                  * If we are establishing a new timeline, we have to copy data from
4659                  * the last WAL segment of the old timeline to create a starting WAL
4660                  * segment for the new timeline.
4661                  */
4662                 if (endTLI != ThisTimeLineID)
4663                         XLogFileCopy(endLogId, endLogSeg,
4664                                                  endTLI, endLogId, endLogSeg);
4665         }
4666
4667         /*
4668          * Let's just make real sure there are not .ready or .done flags posted
4669          * for the new segment.
4670          */
4671         XLogFileName(xlogpath, ThisTimeLineID, endLogId, endLogSeg);
4672         XLogArchiveCleanup(xlogpath);
4673
4674         /* Get rid of any remaining recovered timeline-history file, too */
4675         snprintf(recoveryPath, MAXPGPATH, XLOGDIR "/RECOVERYHISTORY");
4676         unlink(recoveryPath);           /* ignore any error */
4677
4678         /*
4679          * Rename the config file out of the way, so that we don't accidentally
4680          * re-enter archive recovery mode in a subsequent crash.
4681          */
4682         unlink(RECOVERY_COMMAND_DONE);
4683         if (rename(RECOVERY_COMMAND_FILE, RECOVERY_COMMAND_DONE) != 0)
4684                 ereport(FATAL,
4685                                 (errcode_for_file_access(),
4686                                  errmsg("could not rename file \"%s\" to \"%s\": %m",
4687                                                 RECOVERY_COMMAND_FILE, RECOVERY_COMMAND_DONE)));
4688
4689         ereport(LOG,
4690                         (errmsg("archive recovery complete")));
4691 }
4692
4693 /*
4694  * For point-in-time recovery, this function decides whether we want to
4695  * stop applying the XLOG at or after the current record.
4696  *
4697  * Returns TRUE if we are stopping, FALSE otherwise.  On TRUE return,
4698  * *includeThis is set TRUE if we should apply this record before stopping.
4699  * Also, some information is saved in recoveryStopXid et al for use in
4700  * annotating the new timeline's history file.
4701  */
4702 static bool
4703 recoveryStopsHere(XLogRecord *record, bool *includeThis)
4704 {
4705         bool            stopsHere;
4706         uint8           record_info;
4707         TimestampTz recordXtime;
4708
4709         /* We only consider stopping at COMMIT or ABORT records */
4710         if (record->xl_rmid != RM_XACT_ID)
4711                 return false;
4712         record_info = record->xl_info & ~XLR_INFO_MASK;
4713         if (record_info == XLOG_XACT_COMMIT)
4714         {
4715                 xl_xact_commit *recordXactCommitData;
4716
4717                 recordXactCommitData = (xl_xact_commit *) XLogRecGetData(record);
4718                 recordXtime = recordXactCommitData->xact_time;
4719         }
4720         else if (record_info == XLOG_XACT_ABORT)
4721         {
4722                 xl_xact_abort *recordXactAbortData;
4723
4724                 recordXactAbortData = (xl_xact_abort *) XLogRecGetData(record);
4725                 recordXtime = recordXactAbortData->xact_time;
4726         }
4727         else
4728                 return false;
4729
4730         /* Remember the most recent COMMIT/ABORT time for logging purposes */
4731         recoveryLastXTime = recordXtime;
4732
4733         /* Do we have a PITR target at all? */
4734         if (!recoveryTarget)
4735                 return false;
4736
4737         if (recoveryTargetExact)
4738         {
4739                 /*
4740                  * there can be only one transaction end record with this exact
4741                  * transactionid
4742                  *
4743                  * when testing for an xid, we MUST test for equality only, since
4744                  * transactions are numbered in the order they start, not the order
4745                  * they complete. A higher numbered xid will complete before you about
4746                  * 50% of the time...
4747                  */
4748                 stopsHere = (record->xl_xid == recoveryTargetXid);
4749                 if (stopsHere)
4750                         *includeThis = recoveryTargetInclusive;
4751         }
4752         else
4753         {
4754                 /*
4755                  * there can be many transactions that share the same commit time, so
4756                  * we stop after the last one, if we are inclusive, or stop at the
4757                  * first one if we are exclusive
4758                  */
4759                 if (recoveryTargetInclusive)
4760                         stopsHere = (recordXtime > recoveryTargetTime);
4761                 else
4762                         stopsHere = (recordXtime >= recoveryTargetTime);
4763                 if (stopsHere)
4764                         *includeThis = false;
4765         }
4766
4767         if (stopsHere)
4768         {
4769                 recoveryStopXid = record->xl_xid;
4770                 recoveryStopTime = recordXtime;
4771                 recoveryStopAfter = *includeThis;
4772
4773                 if (record_info == XLOG_XACT_COMMIT)
4774                 {
4775                         if (recoveryStopAfter)
4776                                 ereport(LOG,
4777                                                 (errmsg("recovery stopping after commit of transaction %u, time %s",
4778                                                                 recoveryStopXid,
4779                                                                 timestamptz_to_str(recoveryStopTime))));
4780                         else
4781                                 ereport(LOG,
4782                                                 (errmsg("recovery stopping before commit of transaction %u, time %s",
4783                                                                 recoveryStopXid,
4784                                                                 timestamptz_to_str(recoveryStopTime))));
4785                 }
4786                 else
4787                 {
4788                         if (recoveryStopAfter)
4789                                 ereport(LOG,
4790                                                 (errmsg("recovery stopping after abort of transaction %u, time %s",
4791                                                                 recoveryStopXid,
4792                                                                 timestamptz_to_str(recoveryStopTime))));
4793                         else
4794                                 ereport(LOG,
4795                                                 (errmsg("recovery stopping before abort of transaction %u, time %s",
4796                                                                 recoveryStopXid,
4797                                                                 timestamptz_to_str(recoveryStopTime))));
4798                 }
4799         }
4800
4801         return stopsHere;
4802 }
4803
4804 /*
4805  * This must be called ONCE during postmaster or standalone-backend startup
      *
      * In order, the body:
      *   1. reads pg_control and rejects an out-of-range state or an invalid
      *      checkpoint link;
      *   2. initializes recoveryTargetTLI from pg_control, processes the
      *      recovery command file, and loads expectedTLIs;
      *   3. locates the starting checkpoint record, from backup_label if one
      *      is read, else from pg_control's primary and then previous
      *      checkpoint link;
      *   4. seeds the next-XID / next-OID / MultiXact state from that
      *      checkpoint and decides whether redo is needed (InRecovery);
      *   5. if so, updates pg_control, renames backup_label out of the way,
      *      and replays WAL records until ReadRecord() returns NULL or
      *      recoveryStopsHere() reports the recovery target;
      *   6. re-reads the last record to determine EndOfLog, selects a new
      *      timeline when InArchiveRecovery, and primes the WAL insert/write
      *      state at EndOfLog;
      *   7. after recovery, runs resource-manager cleanup and writes a
      *      shutdown checkpoint;
      *   8. marks pg_control DB_IN_PRODUCTION, starts CLOG/SUBTRANS/MultiXact,
      *      recovers prepared transactions, and releases the read buffers.
      *
      * Takes no arguments and returns nothing; failures are reported through
      * ereport() at FATAL or PANIC level.
4806  */
4807 void
4808 StartupXLOG(void)
4809 {
4810         XLogCtlInsert *Insert;
4811         CheckPoint      checkPoint;
4812         bool            wasShutdown;    /* starting checkpoint is a shutdown checkpoint */
4813         bool            reachedStopPoint = false;       /* recoveryStopsHere() fired */
4814         bool            haveBackupLabel = false;        /* read_backup_label() succeeded */
4815         XLogRecPtr      RecPtr,
4816                                 LastRec,
4817                                 checkPointLoc,
4818                                 minRecoveryLoc,
4819                                 EndOfLog;
4820         uint32          endLogId;
4821         uint32          endLogSeg;
4822         XLogRecord *record;
4823         uint32          freespace;
4824         TransactionId oldestActiveXID;
4825
4826         /*
4827          * Read control file and check XLOG status looks valid.
4828          *
4829          * Note: in most control paths, *ControlFile is already valid and we need
4830          * not do ReadControlFile() here, but might as well do it to be sure.
              *
              * A state outside the DB_SHUTDOWNED .. DB_IN_PRODUCTION range, or a
              * checkpoint offset that fails XRecOffIsValid(), is reported as
              * FATAL below.
4831          */
4832         ReadControlFile();
4833
4834         if (ControlFile->state < DB_SHUTDOWNED ||
4835                 ControlFile->state > DB_IN_PRODUCTION ||
4836                 !XRecOffIsValid(ControlFile->checkPoint.xrecoff))
4837                 ereport(FATAL,
4838                                 (errmsg("control file contains invalid data")));
4839
             /* Log how the previous incarnation ended, according to the recorded state */
4840         if (ControlFile->state == DB_SHUTDOWNED)
4841                 ereport(LOG,
4842                                 (errmsg("database system was shut down at %s",
4843                                                 str_time(ControlFile->time))));
4844         else if (ControlFile->state == DB_SHUTDOWNING)
4845                 ereport(LOG,
4846                                 (errmsg("database system shutdown was interrupted; last known up at %s",
4847                                                 str_time(ControlFile->time))));
4848         else if (ControlFile->state == DB_IN_CRASH_RECOVERY)
4849                 ereport(LOG,
4850                    (errmsg("database system was interrupted while in recovery at %s",
4851                                    str_time(ControlFile->time)),
4852                         errhint("This probably means that some data is corrupted and"
4853                                         " you will have to use the last backup for recovery.")));
4854         else if (ControlFile->state == DB_IN_ARCHIVE_RECOVERY)
4855                 ereport(LOG,
4856                                 (errmsg("database system was interrupted while in recovery at log time %s",
4857                                                 str_time(ControlFile->checkPointCopy.time)),
4858                                  errhint("If this has occurred more than once some data might be corrupted"
4859                           " and you might need to choose an earlier recovery target.")));
4860         else if (ControlFile->state == DB_IN_PRODUCTION)
4861                 ereport(LOG,
4862                           (errmsg("database system was interrupted; last known up at %s",
4863                                           str_time(ControlFile->time))));
4864
4865         /* This is just to allow attaching to startup process with a debugger */
             /* NOTE(review): 60000000L is presumably microseconds, i.e. 60 s -- confirm */
4866 #ifdef XLOG_REPLAY_DELAY
4867         if (ControlFile->state != DB_SHUTDOWNED)
4868                 pg_usleep(60000000L);
4869 #endif
4870
4871         /*
4872          * Initialize on the assumption we want to recover to the same timeline
4873          * that's active according to pg_control.
4874          */
4875         recoveryTargetTLI = ControlFile->checkPointCopy.ThisTimeLineID;
4876
4877         /*
4878          * Check for recovery control file, and if so set up state for offline
4879          * recovery
4880          */
4881         readRecoveryCommandFile();
4882
4883         /* Now we can determine the list of expected TLIs */
4884         expectedTLIs = readTimeLineHistory(recoveryTargetTLI);
4885
4886         /*
4887          * If pg_control's timeline is not in expectedTLIs, then we cannot
4888          * proceed: the backup is not part of the history of the requested
4889          * timeline.
4890          */
4891         if (!list_member_int(expectedTLIs,
4892                                                  (int) ControlFile->checkPointCopy.ThisTimeLineID))
4893                 ereport(FATAL,
4894                                 (errmsg("requested timeline %u is not a child of database system timeline %u",
4895                                                 recoveryTargetTLI,
4896                                                 ControlFile->checkPointCopy.ThisTimeLineID)));
4897
4898         if (read_backup_label(&checkPointLoc, &minRecoveryLoc))
4899         {
4900                 /*
4901                  * When a backup_label file is present, we want to roll forward from
4902                  * the checkpoint it identifies, rather than using pg_control.
                      *
                      * The second argument (0) only tells ReadCheckpointRecord() how to
                      * word its log messages: 0 means the location did not come from
                      * pg_control.  There is no fallback checkpoint on this path, so a
                      * NULL result is a PANIC.
4903                  */
4904                 record = ReadCheckpointRecord(checkPointLoc, 0);
4905                 if (record != NULL)
4906                 {
4907                         ereport(DEBUG1,
4908                                         (errmsg("checkpoint record is at %X/%X",
4909                                                         checkPointLoc.xlogid, checkPointLoc.xrecoff)));
4910                         InRecovery = true;      /* force recovery even if SHUTDOWNED */
4911                 }
4912                 else
4913                 {
4914                         ereport(PANIC,
4915                                         (errmsg("could not locate required checkpoint record"),
4916                                          errhint("If you are not restoring from a backup, try removing the file \"%s/backup_label\".", DataDir)));
4917                 }
4918                 /* set flag to delete it later */
4919                 haveBackupLabel = true;
4920         }
4921         else
4922         {
4923                 /*
4924                  * Get the last valid checkpoint record.  If the latest one according
4925                  * to pg_control is broken, try the next-to-last one.
                      *
                      * Falling back to the previous checkpoint forces InRecovery; if
                      * neither record can be read we PANIC.
4926                  */
4927                 checkPointLoc = ControlFile->checkPoint;
4928                 record = ReadCheckpointRecord(checkPointLoc, 1);        /* 1 = "primary" */
4929                 if (record != NULL)
4930                 {
4931                         ereport(DEBUG1,
4932                                         (errmsg("checkpoint record is at %X/%X",
4933                                                         checkPointLoc.xlogid, checkPointLoc.xrecoff)));
4934                 }
4935                 else
4936                 {
4937                         checkPointLoc = ControlFile->prevCheckPoint;
4938                         record = ReadCheckpointRecord(checkPointLoc, 2);        /* 2 = "secondary" */
4939                         if (record != NULL)
4940                         {
4941                                 ereport(LOG,
4942                                                 (errmsg("using previous checkpoint record at %X/%X",
4943                                                           checkPointLoc.xlogid, checkPointLoc.xrecoff)));
4944                                 InRecovery = true;              /* force recovery even if SHUTDOWNED */
4945                         }
4946                         else
4947                                 ereport(PANIC,
4948                                          (errmsg("could not locate a valid checkpoint record")));
4949                 }
4950         }
4951
             /*
              * Start both position pointers at the chosen checkpoint and take a
              * local copy of its payload.  LastRec only moves forward inside the
              * redo loop below, so it stays at checkPointLoc if nothing is replayed.
              */
4952         LastRec = RecPtr = checkPointLoc;
4953         memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
4954         wasShutdown = (record->xl_info == XLOG_CHECKPOINT_SHUTDOWN);
4955
4956         ereport(DEBUG1,
4957                         (errmsg("redo record is at %X/%X; shutdown %s",
4958                                         checkPoint.redo.xlogid, checkPoint.redo.xrecoff,
4959                                         wasShutdown ? "TRUE" : "FALSE")));
4960         ereport(DEBUG1,
4961                         (errmsg("next transaction ID: %u/%u; next OID: %u",
4962                                         checkPoint.nextXidEpoch, checkPoint.nextXid,
4963                                         checkPoint.nextOid)));
4964         ereport(DEBUG1,
4965                         (errmsg("next MultiXactId: %u; next MultiXactOffset: %u",
4966                                         checkPoint.nextMulti, checkPoint.nextMultiOffset)));
4967         if (!TransactionIdIsNormal(checkPoint.nextXid))
4968                 ereport(PANIC,
4969                                 (errmsg("invalid next transaction ID")));
4970
             /* Seed the shared XID/OID counters and MultiXact state from the checkpoint */
4971         ShmemVariableCache->nextXid = checkPoint.nextXid;
4972         ShmemVariableCache->nextOid = checkPoint.nextOid;
4973         ShmemVariableCache->oidCount = 0;
4974         MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset);
4975
4976         /*
4977          * We must replay WAL entries using the same TimeLineID they were created
4978          * under, so temporarily adopt the TLI indicated by the checkpoint (see
4979          * also xlog_redo()).
4980          */
4981         ThisTimeLineID = checkPoint.ThisTimeLineID;
4982
4983         RedoRecPtr = XLogCtl->Insert.RedoRecPtr = checkPoint.redo;
4984
             /* The redo pointer must not lie beyond the checkpoint record itself */
4985         if (XLByteLT(RecPtr, checkPoint.redo))
4986                 ereport(PANIC,
4987                                 (errmsg("invalid redo in checkpoint record")));
4988
4989         /*
4990          * Check whether we need to force recovery from WAL.  If it appears to
4991          * have been a clean shutdown and we did not have a recovery.conf file,
4992          * then assume no recovery needed.
              *
              * A redo pointer strictly before the checkpoint record always forces
              * recovery, and is rejected outright for a shutdown checkpoint.
4993          */
4994         if (XLByteLT(checkPoint.redo, RecPtr))
4995         {
4996                 if (wasShutdown)
4997                         ereport(PANIC,
4998                                         (errmsg("invalid redo record in shutdown checkpoint")));
4999                 InRecovery = true;
5000         }
5001         else if (ControlFile->state != DB_SHUTDOWNED)
5002                 InRecovery = true;
5003         else if (InArchiveRecovery)
5004         {
5005                 /* force recovery due to presence of recovery.conf */
5006                 InRecovery = true;
5007         }
5008
5009         /* REDO */
5010         if (InRecovery)
5011         {
5012                 int                     rmid;
5013
5014                 /*
5015                  * Update pg_control to show that we are recovering and to show the
5016                  * selected checkpoint as the place we are starting from. We also mark
5017                  * pg_control with any minimum recovery stop point obtained from a
5018                  * backup history file.
                      *
                      * The old primary checkpoint link is demoted to prevCheckPoint
                      * before the selected one is installed.  minRecoveryPoint is only
                      * overwritten when minRecoveryLoc is not all-zero.
5019                  */
5020                 if (InArchiveRecovery)
5021                 {
5022                         ereport(LOG,
5023                                         (errmsg("automatic recovery in progress")));
5024                         ControlFile->state = DB_IN_ARCHIVE_RECOVERY;
5025                 }
5026                 else
5027                 {
5028                         ereport(LOG,
5029                                         (errmsg("database system was not properly shut down; "
5030                                                         "automatic recovery in progress")));
5031                         ControlFile->state = DB_IN_CRASH_RECOVERY;
5032                 }
5033                 ControlFile->prevCheckPoint = ControlFile->checkPoint;
5034                 ControlFile->checkPoint = checkPointLoc;
5035                 ControlFile->checkPointCopy = checkPoint;
5036                 if (minRecoveryLoc.xlogid != 0 || minRecoveryLoc.xrecoff != 0)
5037                         ControlFile->minRecoveryPoint = minRecoveryLoc;
5038                 ControlFile->time = (pg_time_t) time(NULL);
5039                 UpdateControlFile();
5040
5041                 /*
5042                  * If there was a backup label file, it's done its job and the info
5043                  * has now been propagated into pg_control.  We must get rid of the
5044                  * label file so that if we crash during recovery, we'll pick up at
5045                  * the latest recovery restartpoint instead of going all the way back
5046                  * to the backup start point.  It seems prudent though to just rename
5047                  * the file out of the way rather than delete it completely.
                      *
                      * The unlink() result is not checked; only a failed rename() is
                      * reported (as FATAL).
5048                  */
5049                 if (haveBackupLabel)
5050                 {
5051                         unlink(BACKUP_LABEL_OLD);
5052                         if (rename(BACKUP_LABEL_FILE, BACKUP_LABEL_OLD) != 0)
5053                                 ereport(FATAL,
5054                                                 (errcode_for_file_access(),
5055                                                  errmsg("could not rename file \"%s\" to \"%s\": %m",
5056                                                                 BACKUP_LABEL_FILE, BACKUP_LABEL_OLD)));
5057                 }
5058
5059                 /* Initialize resource managers */
5060                 for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
5061                 {
5062                         if (RmgrTable[rmid].rm_startup != NULL)
5063                                 RmgrTable[rmid].rm_startup();
5064                 }
5065
5066                 /*
5067                  * Find the first record that logically follows the checkpoint --- it
5068                  * might physically precede it, though.
                      *
                      * Reading at checkPoint.redo uses PANIC; reading the record after
                      * the checkpoint uses LOG, and a NULL result there simply leads to
                      * the "redo is not required" branch below.
5069                  */
5070                 if (XLByteLT(checkPoint.redo, RecPtr))
5071                 {
5072                         /* back up to find the record */
5073                         record = ReadRecord(&(checkPoint.redo), PANIC);
5074                 }
5075                 else
5076                 {
5077                         /* just have to read next record after CheckPoint */
5078                         record = ReadRecord(NULL, LOG);
5079                 }
5080
5081                 if (record != NULL)
5082                 {
5083                         bool            recoveryContinue = true;
5084                         bool            recoveryApply = true;
5085                         ErrorContextCallback errcontext;
5086
5087                         InRedo = true;
5088                         ereport(LOG,
5089                                         (errmsg("redo starts at %X/%X",
5090                                                         ReadRecPtr.xlogid, ReadRecPtr.xrecoff)));
5091
5092                         /*
5093                          * main redo apply loop
                              *
                              * Each iteration handles the record already in hand and then
                              * fetches the next one; the loop ends when ReadRecord() returns
                              * NULL or the recovery target has been reached.
5094                          */
5095                         do
5096                         {
5097 #ifdef WAL_DEBUG
5098                                 if (XLOG_DEBUG)
5099                                 {
5100                                         StringInfoData buf;
5101
5102                                         initStringInfo(&buf);
5103                                         appendStringInfo(&buf, "REDO @ %X/%X; LSN %X/%X: ",
5104                                                                          ReadRecPtr.xlogid, ReadRecPtr.xrecoff,
5105                                                                          EndRecPtr.xlogid, EndRecPtr.xrecoff);
5106                                         xlog_outrec(&buf, record);
5107                                         appendStringInfo(&buf, " - ");
5108                                         RmgrTable[record->xl_rmid].rm_desc(&buf,
5109                                                                                                            record->xl_info,
5110                                                                                                          XLogRecGetData(record));
5111                                         elog(LOG, "%s", buf.data);
5112                                         pfree(buf.data);
5113                                 }
5114 #endif
5115
5116                                 /*
5117                                  * Have we reached our recovery target?
                                      *
                                      * recoveryApply says whether the record at the stop point is
                                      * itself replayed: if not, leave the loop immediately; if so,
                                      * replay it and let recoveryContinue end the loop afterwards.
5118                                  */
5119                                 if (recoveryStopsHere(record, &recoveryApply))
5120                                 {
5121                                         reachedStopPoint = true;        /* see below */
5122                                         recoveryContinue = false;
5123                                         if (!recoveryApply)
5124                                                 break;
5125                                 }
5126
5127                                 /* Setup error traceback support for ereport() */
                                     /*
                                      * NOTE(review): the local named errcontext looks like it shadows
                                      * the error-reporting function of the same name inside this
                                      * block -- confirm, and consider renaming in a code change.
                                      */
5128                                 errcontext.callback = rm_redo_error_callback;
5129                                 errcontext.arg = (void *) record;
5130                                 errcontext.previous = error_context_stack;
5131                                 error_context_stack = &errcontext;
5132
5133                                 /* nextXid must be beyond record's xid */
5134                                 if (TransactionIdFollowsOrEquals(record->xl_xid,
5135                                                                                                  ShmemVariableCache->nextXid))
5136                                 {
5137                                         ShmemVariableCache->nextXid = record->xl_xid;
5138                                         TransactionIdAdvance(ShmemVariableCache->nextXid);
5139                                 }
5140
                                     /* Restore backup blocks first, if the record's info bits flag any */
5141                                 if (record->xl_info & XLR_BKP_BLOCK_MASK)
5142                                         RestoreBkpBlocks(record, EndRecPtr);
5143
                                     /* Dispatch to the owning resource manager's redo routine */
5144                                 RmgrTable[record->xl_rmid].rm_redo(EndRecPtr, record);
5145
5146                                 /* Pop the error context stack */
5147                                 error_context_stack = errcontext.previous;
5148
                                     /* Remember the record just replayed, before fetching the next */
5149                                 LastRec = ReadRecPtr;
5150
5151                                 record = ReadRecord(NULL, LOG);
5152                         } while (record != NULL && recoveryContinue);
5153
5154                         /*
5155                          * end of main redo apply loop
5156                          */
5157
5158                         ereport(LOG,
5159                                         (errmsg("redo done at %X/%X",
5160                                                         ReadRecPtr.xlogid, ReadRecPtr.xrecoff)));
5161                         if (recoveryLastXTime)
5162                                 ereport(LOG,
5163                                          (errmsg("last completed transaction was at log time %s",
5164                                                          timestamptz_to_str(recoveryLastXTime))));
5165                         InRedo = false;
5166                 }
5167                 else
5168                 {
5169                         /* there are no WAL records following the checkpoint */
5170                         ereport(LOG,
5171                                         (errmsg("redo is not required")));
5172                 }
5173         }
5174
5175         /*
5176          * Re-fetch the last valid or last applied record, so we can identify the
5177          * exact endpoint of what we consider the valid portion of WAL.
              *
              * LastRec is still checkPointLoc if no record was replayed above.
              * EndOfLog is taken from EndRecPtr as left by this ReadRecord() call.
5178          */
5179         record = ReadRecord(&LastRec, PANIC);
5180         EndOfLog = EndRecPtr;
5181         XLByteToPrevSeg(EndOfLog, endLogId, endLogSeg);
5182
5183         /*
5184          * Complain if we did not roll forward far enough to render the backup
5185          * dump consistent.
              *
              * reachedStopPoint only selects which of the two FATAL messages is
              * used.
5186          */
5187         if (XLByteLT(EndOfLog, ControlFile->minRecoveryPoint))
5188         {
5189                 if (reachedStopPoint)   /* stopped because of stop request */
5190                         ereport(FATAL,
5191                                         (errmsg("requested recovery stop point is before end time of backup dump")));
5192                 else    /* ran off end of WAL */
5193                         ereport(FATAL,
5194                                         (errmsg("WAL ends before end time of backup dump")));
5195         }
5196
5197         /*
5198          * Consider whether we need to assign a new timeline ID.
5199          *
5200          * If we are doing an archive recovery, we always assign a new ID.      This
5201          * handles a couple of issues.  If we stopped short of the end of WAL
5202          * during recovery, then we are clearly generating a new timeline and must
5203          * assign it a unique new ID.  Even if we ran to the end, modifying the
5204          * current last segment is problematic because it may result in trying to
5205          * overwrite an already-archived copy of that segment, and we encourage
5206          * DBAs to make their archive_commands reject that.  We can dodge the
5207          * problem by making the new active segment have a new timeline ID.
5208          *
5209          * In a normal crash recovery, we can just extend the timeline we were in.
5210          */
5211         if (InArchiveRecovery)
5212         {
5213                 ThisTimeLineID = findNewestTimeLine(recoveryTargetTLI) + 1;
5214                 ereport(LOG,
5215                                 (errmsg("selected new timeline ID: %u", ThisTimeLineID)));
5216                 writeTimeLineHistory(ThisTimeLineID, recoveryTargetTLI,
5217                                                          curFileTLI, endLogId, endLogSeg);
5218         }
5219
5220         /* Save the selected TimeLineID in shared memory, too */
5221         XLogCtl->ThisTimeLineID = ThisTimeLineID;
5222
5223         /*
5224          * We are now done reading the old WAL.  Turn off archive fetching if it
5225          * was active, and make a writable copy of the last WAL segment. (Note
5226          * that we also have a copy of the last block of the old WAL in readBuf;
5227          * we will use that below.)
5228          */
5229         if (InArchiveRecovery)
5230                 exitArchiveRecovery(curFileTLI, endLogId, endLogSeg);
5231
5232         /*
5233          * Prepare to write WAL starting at EndOfLog position, and init xlog
5234          * buffer cache using the block containing the last record from the
5235          * previous incarnation.
              *
              * xlblocks[0].xrecoff is set to EndOfLog.xrecoff rounded up to a
              * multiple of XLOG_BLCKSZ (left unchanged when it already is one).
5236          */
5237         openLogId = endLogId;
5238         openLogSeg = endLogSeg;
5239         openLogFile = XLogFileOpen(openLogId, openLogSeg);
5240         openLogOff = 0;
5241         Insert = &XLogCtl->Insert;
5242         Insert->PrevRecord = LastRec;
5243         XLogCtl->xlblocks[0].xlogid = openLogId;
5244         XLogCtl->xlblocks[0].xrecoff =
5245                 ((EndOfLog.xrecoff - 1) / XLOG_BLCKSZ + 1) * XLOG_BLCKSZ;
5246
5247         /*
5248          * Tricky point here: readBuf contains the *last* block that the LastRec
5249          * record spans, not the one it starts in.      The last block is indeed the
5250          * one we want to use.
              *
              * currpos is placed at EndOfLog's offset within that block; the
              * expression yields a full XLOG_BLCKSZ when EndOfLog sits exactly on
              * a block boundary.
5251          */
5252         Assert(readOff == (XLogCtl->xlblocks[0].xrecoff - XLOG_BLCKSZ) % XLogSegSize);
5253         memcpy((char *) Insert->currpage, readBuf, XLOG_BLCKSZ);
5254         Insert->currpos = (char *) Insert->currpage +
5255                 (EndOfLog.xrecoff + XLOG_BLCKSZ - XLogCtl->xlblocks[0].xrecoff);
5256
             /* Everything up to EndOfLog counts as already written and flushed */
5257         LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
5258
5259         XLogCtl->Write.LogwrtResult = LogwrtResult;
5260         Insert->LogwrtResult = LogwrtResult;
5261         XLogCtl->LogwrtResult = LogwrtResult;
5262
5263         XLogCtl->LogwrtRqst.Write = EndOfLog;
5264         XLogCtl->LogwrtRqst.Flush = EndOfLog;
5265
5266         freespace = INSERT_FREESPACE(Insert);
5267         if (freespace > 0)
5268         {
5269                 /* Make sure rest of page is zero */
5270                 MemSet(Insert->currpos, 0, freespace);
5271                 XLogCtl->Write.curridx = 0;
5272         }
5273         else
5274         {
5275                 /*
5276                  * Whenever Write.LogwrtResult points to exactly the end of a page,
5277                  * Write.curridx must point to the *next* page (see XLogWrite()).
5278                  *
5279                  * Note: it might seem we should do AdvanceXLInsertBuffer() here, but
5280                  * this is sufficient.  The first actual attempt to insert a log
5281                  * record will advance the insert state.
5282                  */
5283                 XLogCtl->Write.curridx = NextBufIdx(0);
5284         }
5285
5286         /* Pre-scan prepared transactions to find out the range of XIDs present */
5287         oldestActiveXID = PrescanPreparedTransactions();       /* passed to StartupSUBTRANS() below */
5288
5289         if (InRecovery)
5290         {
5291                 int                     rmid;
5292
5293                 /*
5294                  * Allow resource managers to do any required cleanup.
5295                  */
5296                 for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
5297                 {
5298                         if (RmgrTable[rmid].rm_cleanup != NULL)
5299                                 RmgrTable[rmid].rm_cleanup();
5300                 }
5301
5302                 /*
5303                  * Check to see if the XLOG sequence contained any unresolved
5304                  * references to uninitialized pages.
5305                  */
5306                 XLogCheckInvalidPages();
5307
5308                 /*
5309                  * Reset pgstat data, because it may be invalid after recovery.
5310                  */
5311                 pgstat_reset_all();
5312
5313                 /*
5314                  * Perform a checkpoint to update all our recovery activity to disk.
5315                  *
5316                  * Note that we write a shutdown checkpoint rather than an on-line
5317                  * one. This is not particularly critical, but since we may be
5318                  * assigning a new TLI, using a shutdown checkpoint allows us to have
5319                  * the rule that TLI only changes in shutdown checkpoints, which
5320                  * allows some extra error checking in xlog_redo.
5321                  */
5322                 CreateCheckPoint(CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_IMMEDIATE);
5323         }
5324
5325         /*
5326          * Preallocate additional log files, if wanted.
5327          */
5328         PreallocXlogFiles(EndOfLog);
5329
5330         /*
5331          * Okay, we're officially UP.
              *
              * Clear InRecovery and record DB_IN_PRODUCTION, with the current
              * time, in pg_control.
5332          */
5333         InRecovery = false;
5334
5335         ControlFile->state = DB_IN_PRODUCTION;
5336         ControlFile->time = (pg_time_t) time(NULL);
5337         UpdateControlFile();
5338
5339         /* start the archive_timeout timer running */
5340         XLogCtl->Write.lastSegSwitchTime = ControlFile->time;
5341
5342         /* initialize shared-memory copy of latest checkpoint XID/epoch */
5343         XLogCtl->ckptXidEpoch = ControlFile->checkPointCopy.nextXidEpoch;
5344         XLogCtl->ckptXid = ControlFile->checkPointCopy.nextXid;
5345
5346         /* also initialize latestCompletedXid, to nextXid - 1 */
5347         ShmemVariableCache->latestCompletedXid = ShmemVariableCache->nextXid;
5348         TransactionIdRetreat(ShmemVariableCache->latestCompletedXid);
5349
5350         /* Start up the commit log and related stuff, too */
5351         StartupCLOG();
5352         StartupSUBTRANS(oldestActiveXID);
5353         StartupMultiXact();
5354
5355         /* Reload shared-memory state for prepared transactions */
5356         RecoverPreparedTransactions();
5357
5358         /* Shut down readFile facility, free space */
             /* Each resource is reset after release so the state reads as "not in use" */
5359         if (readFile >= 0)
5360         {
5361                 close(readFile);
5362                 readFile = -1;
5363         }
5364         if (readBuf)
5365         {
5366                 free(readBuf);
5367                 readBuf = NULL;
5368         }
5369         if (readRecordBuf)
5370         {
5371                 free(readRecordBuf);
5372                 readRecordBuf = NULL;
5373                 readRecordBufSize = 0;
5374         }
5375 }
5376
5377 /*
5378  * Subroutine to try to fetch and validate a prior checkpoint record.
5379  *
5380  * whichChkpt identifies the checkpoint (merely for reporting purposes).
5381  * 1 for "primary", 2 for "secondary", 0 for "other" (backup_label)
5382  */
5383 static XLogRecord *
5384 ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt)
5385 {
5386         XLogRecord *record;
5387
5388         if (!XRecOffIsValid(RecPtr.xrecoff))
5389         {
5390                 switch (whichChkpt)
5391                 {
5392                         case 1:
5393                                 ereport(LOG,
5394                                 (errmsg("invalid primary checkpoint link in control file")));
5395                                 break;
5396                         case 2:
5397                                 ereport(LOG,
5398                                                 (errmsg("invalid secondary checkpoint link in control file")));
5399                                 break;
5400                         default:
5401                                 ereport(LOG,
5402                                    (errmsg("invalid checkpoint link in backup_label file")));
5403                                 break;
5404                 }
5405                 return NULL;
5406         }
5407
5408         record = ReadRecord(&RecPtr, LOG);
5409
5410         if (record == NULL)
5411         {
5412                 switch (whichChkpt)
5413                 {
5414                         case 1:
5415                                 ereport(LOG,
5416                                                 (errmsg("invalid primary checkpoint record")));
5417                                 break;
5418                         case 2:
5419                                 ereport(LOG,
5420                                                 (errmsg("invalid secondary checkpoint record")));
5421                                 break;
5422                         default:
5423                                 ereport(LOG,
5424                                                 (errmsg("invalid checkpoint record")));
5425                                 break;
5426                 }
5427                 return NULL;
5428         }
5429         if (record->xl_rmid != RM_XLOG_ID)
5430         {
5431                 switch (whichChkpt)
5432                 {
5433                         case 1:
5434                                 ereport(LOG,
5435                                                 (errmsg("invalid resource manager ID in primary checkpoint record")));
5436                                 break;
5437                         case 2:
5438                                 ereport(LOG,
5439                                                 (errmsg("invalid resource manager ID in secondary checkpoint record")));
5440                                 break;
5441                         default:
5442                                 ereport(LOG,
5443                                 (errmsg("invalid resource manager ID in checkpoint record")));
5444                                 break;
5445                 }
5446                 return NULL;
5447         }
5448         if (record->xl_info != XLOG_CHECKPOINT_SHUTDOWN &&
5449                 record->xl_info != XLOG_CHECKPOINT_ONLINE)
5450         {
5451                 switch (whichChkpt)
5452                 {
5453                         case 1:
5454                                 ereport(LOG,
5455                                    (errmsg("invalid xl_info in primary checkpoint record")));
5456                                 break;
5457                         case 2:
5458                                 ereport(LOG,
5459                                  (errmsg("invalid xl_info in secondary checkpoint record")));
5460                                 break;
5461                         default:
5462                                 ereport(LOG,
5463                                                 (errmsg("invalid xl_info in checkpoint record")));
5464                                 break;
5465                 }
5466                 return NULL;
5467         }
5468         if (record->xl_len != sizeof(CheckPoint) ||
5469                 record->xl_tot_len != SizeOfXLogRecord + sizeof(CheckPoint))
5470         {
5471                 switch (whichChkpt)
5472                 {
5473                         case 1:
5474                                 ereport(LOG,
5475                                         (errmsg("invalid length of primary checkpoint record")));
5476                                 break;
5477                         case 2:
5478                                 ereport(LOG,
5479                                   (errmsg("invalid length of secondary checkpoint record")));
5480                                 break;
5481                         default:
5482                                 ereport(LOG,
5483                                                 (errmsg("invalid length of checkpoint record")));
5484                                 break;
5485                 }
5486                 return NULL;
5487         }
5488         return record;
5489 }
5490
5491 /*
5492  * This must be called during startup of a backend process, except that
5493  * it need not be called in a standalone backend (which does StartupXLOG
5494  * instead).  We need to initialize the local copies of ThisTimeLineID and
5495  * RedoRecPtr.
5496  *
5497  * Note: before Postgres 8.0, we went to some effort to keep the postmaster
5498  * process's copies of ThisTimeLineID and RedoRecPtr valid too.  This was
5499  * unnecessary however, since the postmaster itself never touches XLOG anyway.
5500  */
5501 void
5502 InitXLOGAccess(void)
5503 {
5504         /* ThisTimeLineID doesn't change so we need no lock to copy it */
5505         ThisTimeLineID = XLogCtl->ThisTimeLineID;
5506         /* Use GetRedoRecPtr to copy the RedoRecPtr safely */
5507         (void) GetRedoRecPtr();
5508 }
5509
5510 /*
5511  * Once spawned, a backend may update its local RedoRecPtr from
5512  * XLogCtl->Insert.RedoRecPtr; it must hold the insert lock or info_lck
5513  * to do so.  This is done in XLogInsert() or GetRedoRecPtr().
5514  */
5515 XLogRecPtr
5516 GetRedoRecPtr(void)
5517 {
5518         /* use volatile pointer to prevent code rearrangement */
5519         volatile XLogCtlData *xlogctl = XLogCtl;
5520
5521         SpinLockAcquire(&xlogctl->info_lck);
5522         Assert(XLByteLE(RedoRecPtr, xlogctl->Insert.RedoRecPtr));
5523         RedoRecPtr = xlogctl->Insert.RedoRecPtr;
5524         SpinLockRelease(&xlogctl->info_lck);
5525
5526         return RedoRecPtr;
5527 }
5528
5529 /*
5530  * GetInsertRecPtr -- Returns the current insert position.
5531  *
5532  * NOTE: The value *actually* returned is the position of the last full
5533  * xlog page. It lags behind the real insert position by at most 1 page.
5534  * For that, we don't need to acquire WALInsertLock which can be quite
5535  * heavily contended, and an approximation is enough for the current
5536  * usage of this function.
5537  */
5538 XLogRecPtr
5539 GetInsertRecPtr(void)
5540 {
5541         /* use volatile pointer to prevent code rearrangement */
5542         volatile XLogCtlData *xlogctl = XLogCtl;
5543         XLogRecPtr      recptr;
5544
5545         SpinLockAcquire(&xlogctl->info_lck);
5546         recptr = xlogctl->LogwrtRqst.Write;
5547         SpinLockRelease(&xlogctl->info_lck);
5548
5549         return recptr;
5550 }
5551
5552 /*
5553  * Get the time of the last xlog segment switch
5554  */
5555 pg_time_t
5556 GetLastSegSwitchTime(void)
5557 {
5558         pg_time_t       result;
5559
5560         /* Need WALWriteLock, but shared lock is sufficient */
5561         LWLockAcquire(WALWriteLock, LW_SHARED);
5562         result = XLogCtl->Write.lastSegSwitchTime;
5563         LWLockRelease(WALWriteLock);
5564
5565         return result;
5566 }
5567
5568 /*
5569  * GetNextXidAndEpoch - get the current nextXid value and associated epoch
5570  *
5571  * This is exported for use by code that would like to have 64-bit XIDs.
5572  * We don't really support such things, but all XIDs within the system
5573  * can be presumed "close to" the result, and thus the epoch associated
5574  * with them can be determined.
5575  */
5576 void
5577 GetNextXidAndEpoch(TransactionId *xid, uint32 *epoch)
5578 {
5579         uint32          ckptXidEpoch;
5580         TransactionId ckptXid;
5581         TransactionId nextXid;
5582
5583         /* Must read checkpoint info first, else have race condition */
5584         {
5585                 /* use volatile pointer to prevent code rearrangement */
5586                 volatile XLogCtlData *xlogctl = XLogCtl;
5587
5588                 SpinLockAcquire(&xlogctl->info_lck);
5589                 ckptXidEpoch = xlogctl->ckptXidEpoch;
5590                 ckptXid = xlogctl->ckptXid;
5591                 SpinLockRelease(&xlogctl->info_lck);
5592         }
5593
5594         /* Now fetch current nextXid */
5595         nextXid = ReadNewTransactionId();
5596
5597         /*
5598          * nextXid is certainly logically later than ckptXid.  So if it's
5599          * numerically less, it must have wrapped into the next epoch.
5600          */
5601         if (nextXid < ckptXid)
5602                 ckptXidEpoch++;
5603
5604         *xid = nextXid;
5605         *epoch = ckptXidEpoch;
5606 }
5607
5608 /*
5609  * This must be called ONCE during postmaster or standalone-backend shutdown
5610  */
5611 void
5612 ShutdownXLOG(int code, Datum arg)
5613 {
5614         ereport(LOG,
5615                         (errmsg("shutting down")));
5616
5617         CreateCheckPoint(CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_IMMEDIATE);
5618         ShutdownCLOG();
5619         ShutdownSUBTRANS();
5620         ShutdownMultiXact();
5621
5622         ereport(LOG,
5623                         (errmsg("database system is shut down")));
5624 }
5625
5626 /*
5627  * Log start of a checkpoint.
5628  */
5629 static void
5630 LogCheckpointStart(int flags)
5631 {
5632         elog(LOG, "checkpoint starting:%s%s%s%s%s%s",
5633                  (flags & CHECKPOINT_IS_SHUTDOWN) ? " shutdown" : "",
5634                  (flags & CHECKPOINT_IMMEDIATE) ? " immediate" : "",
5635                  (flags & CHECKPOINT_FORCE) ? " force" : "",
5636                  (flags & CHECKPOINT_WAIT) ? " wait" : "",
5637                  (flags & CHECKPOINT_CAUSE_XLOG) ? " xlog" : "",
5638                  (flags & CHECKPOINT_CAUSE_TIME) ? " time" : "");
5639 }
5640
5641 /*
5642  * Log end of a checkpoint.
5643  */
5644 static void
5645 LogCheckpointEnd(void)
5646 {
5647         long            write_secs,
5648                                 sync_secs,
5649                                 total_secs;
5650         int                     write_usecs,
5651                                 sync_usecs,
5652                                 total_usecs;
5653
5654         CheckpointStats.ckpt_end_t = GetCurrentTimestamp();
5655
5656         TimestampDifference(CheckpointStats.ckpt_start_t,
5657                                                 CheckpointStats.ckpt_end_t,
5658                                                 &total_secs, &total_usecs);
5659
5660         TimestampDifference(CheckpointStats.ckpt_write_t,
5661                                                 CheckpointStats.ckpt_sync_t,
5662                                                 &write_secs, &write_usecs);
5663
5664         TimestampDifference(CheckpointStats.ckpt_sync_t,
5665                                                 CheckpointStats.ckpt_sync_end_t,
5666                                                 &sync_secs, &sync_usecs);
5667
5668         elog(LOG, "checkpoint complete: wrote %d buffers (%.1f%%); "
5669                  "%d transaction log file(s) added, %d removed, %d recycled; "
5670                  "write=%ld.%03d s, sync=%ld.%03d s, total=%ld.%03d s",
5671                  CheckpointStats.ckpt_bufs_written,
5672                  (double) CheckpointStats.ckpt_bufs_written * 100 / NBuffers,
5673                  CheckpointStats.ckpt_segs_added,
5674                  CheckpointStats.ckpt_segs_removed,
5675                  CheckpointStats.ckpt_segs_recycled,
5676                  write_secs, write_usecs / 1000,
5677                  sync_secs, sync_usecs / 1000,
5678                  total_secs, total_usecs / 1000);
5679 }
5680
5681 /*
5682  * Perform a checkpoint --- either during shutdown, or on-the-fly
5683  *
5684  * flags is a bitwise OR of the following:
5685  *      CHECKPOINT_IS_SHUTDOWN: checkpoint is for database shutdown.
5686  *      CHECKPOINT_IMMEDIATE: finish the checkpoint ASAP,
5687  *              ignoring checkpoint_completion_target parameter.
5688  *      CHECKPOINT_FORCE: force a checkpoint even if no XLOG activity has occured
5689  *              since the last one (implied by CHECKPOINT_IS_SHUTDOWN).
5690  *
5691  * Note: flags contains other bits, of interest here only for logging purposes.
5692  * In particular note that this routine is synchronous and does not pay
5693  * attention to CHECKPOINT_WAIT.
5694  */
5695 void
5696 CreateCheckPoint(int flags)
5697 {
5698         bool            shutdown = (flags & CHECKPOINT_IS_SHUTDOWN) != 0;
5699         CheckPoint      checkPoint;
5700         XLogRecPtr      recptr;
5701         XLogCtlInsert *Insert = &XLogCtl->Insert;
5702         XLogRecData rdata;
5703         uint32          freespace;
5704         uint32          _logId;
5705         uint32          _logSeg;
5706         TransactionId *inCommitXids;
5707         int                     nInCommit;
5708
5709         /*
5710          * Acquire CheckpointLock to ensure only one checkpoint happens at a time.
5711          * (This is just pro forma, since in the present system structure there is
5712          * only one process that is allowed to issue checkpoints at any given
5713          * time.)
5714          */
5715         LWLockAcquire(CheckpointLock, LW_EXCLUSIVE);
5716
5717         /*
5718          * Prepare to accumulate statistics.
5719          *
5720          * Note: because it is possible for log_checkpoints to change while a
5721          * checkpoint proceeds, we always accumulate stats, even if
5722          * log_checkpoints is currently off.
5723          */
5724         MemSet(&CheckpointStats, 0, sizeof(CheckpointStats));
5725         CheckpointStats.ckpt_start_t = GetCurrentTimestamp();
5726
5727         /*
5728          * Use a critical section to force system panic if we have trouble.
5729          */
5730         START_CRIT_SECTION();
5731
5732         if (shutdown)
5733         {
5734                 ControlFile->state = DB_SHUTDOWNING;
5735                 ControlFile->time = (pg_time_t) time(NULL);
5736                 UpdateControlFile();
5737         }
5738
5739         /*
5740          * Let smgr prepare for checkpoint; this has to happen before we determine
5741          * the REDO pointer.  Note that smgr must not do anything that'd have to
5742          * be undone if we decide no checkpoint is needed.
5743          */
5744         smgrpreckpt();
5745
5746         /* Begin filling in the checkpoint WAL record */
5747         MemSet(&checkPoint, 0, sizeof(checkPoint));
5748         checkPoint.ThisTimeLineID = ThisTimeLineID;
5749         checkPoint.time = (pg_time_t) time(NULL);
5750
5751         /*
5752          * We must hold WALInsertLock while examining insert state to determine
5753          * the checkpoint REDO pointer.
5754          */
5755         LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
5756
5757         /*
5758          * If this isn't a shutdown or forced checkpoint, and we have not inserted
5759          * any XLOG records since the start of the last checkpoint, skip the
5760          * checkpoint.  The idea here is to avoid inserting duplicate checkpoints
5761          * when the system is idle. That wastes log space, and more importantly it
5762          * exposes us to possible loss of both current and previous checkpoint
5763          * records if the machine crashes just as we're writing the update.
5764          * (Perhaps it'd make even more sense to checkpoint only when the previous
5765          * checkpoint record is in a different xlog page?)
5766          *
5767          * We have to make two tests to determine that nothing has happened since
5768          * the start of the last checkpoint: current insertion point must match
5769          * the end of the last checkpoint record, and its redo pointer must point
5770          * to itself.
5771          */
5772         if ((flags & (CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_FORCE)) == 0)
5773         {
5774                 XLogRecPtr      curInsert;
5775
5776                 INSERT_RECPTR(curInsert, Insert, Insert->curridx);
5777                 if (curInsert.xlogid == ControlFile->checkPoint.xlogid &&
5778                         curInsert.xrecoff == ControlFile->checkPoint.xrecoff +
5779                         MAXALIGN(SizeOfXLogRecord + sizeof(CheckPoint)) &&
5780                         ControlFile->checkPoint.xlogid ==
5781                         ControlFile->checkPointCopy.redo.xlogid &&
5782                         ControlFile->checkPoint.xrecoff ==
5783                         ControlFile->checkPointCopy.redo.xrecoff)
5784                 {
5785                         LWLockRelease(WALInsertLock);
5786                         LWLockRelease(CheckpointLock);
5787                         END_CRIT_SECTION();
5788                         return;
5789                 }
5790         }
5791
5792         /*
5793          * Compute new REDO record ptr = location of next XLOG record.
5794          *
5795          * NB: this is NOT necessarily where the checkpoint record itself will be,
5796          * since other backends may insert more XLOG records while we're off doing
5797          * the buffer flush work.  Those XLOG records are logically after the
5798          * checkpoint, even though physically before it.  Got that?
5799          */
5800         freespace = INSERT_FREESPACE(Insert);
5801         if (freespace < SizeOfXLogRecord)
5802         {
5803                 (void) AdvanceXLInsertBuffer(false);
5804                 /* OK to ignore update return flag, since we will do flush anyway */
5805                 freespace = INSERT_FREESPACE(Insert);
5806         }
5807         INSERT_RECPTR(checkPoint.redo, Insert, Insert->curridx);
5808
5809         /*
5810          * Here we update the shared RedoRecPtr for future XLogInsert calls; this
5811          * must be done while holding the insert lock AND the info_lck.
5812          *
5813          * Note: if we fail to complete the checkpoint, RedoRecPtr will be left
5814          * pointing past where it really needs to point.  This is okay; the only
5815          * consequence is that XLogInsert might back up whole buffers that it
5816          * didn't really need to.  We can't postpone advancing RedoRecPtr because
5817          * XLogInserts that happen while we are dumping buffers must assume that
5818          * their buffer changes are not included in the checkpoint.
5819          */
5820         {
5821                 /* use volatile pointer to prevent code rearrangement */
5822                 volatile XLogCtlData *xlogctl = XLogCtl;
5823
5824                 SpinLockAcquire(&xlogctl->info_lck);
5825                 RedoRecPtr = xlogctl->Insert.RedoRecPtr = checkPoint.redo;
5826                 SpinLockRelease(&xlogctl->info_lck);
5827         }
5828
5829         /*
5830          * Now we can release WAL insert lock, allowing other xacts to proceed
5831          * while we are flushing disk buffers.
5832          */
5833         LWLockRelease(WALInsertLock);
5834
5835         /*
5836          * If enabled, log checkpoint start.  We postpone this until now so as not
5837          * to log anything if we decided to skip the checkpoint.
5838          */
5839         if (log_checkpoints)
5840                 LogCheckpointStart(flags);
5841
5842         /*
5843          * Before flushing data, we must wait for any transactions that are
5844          * currently in their commit critical sections.  If an xact inserted its
5845          * commit record into XLOG just before the REDO point, then a crash
5846          * restart from the REDO point would not replay that record, which means
5847          * that our flushing had better include the xact's update of pg_clog.  So
5848          * we wait till he's out of his commit critical section before proceeding.
5849          * See notes in RecordTransactionCommit().
5850          *
5851          * Because we've already released WALInsertLock, this test is a bit fuzzy:
5852          * it is possible that we will wait for xacts we didn't really need to
5853          * wait for.  But the delay should be short and it seems better to make
5854          * checkpoint take a bit longer than to hold locks longer than necessary.
5855          * (In fact, the whole reason we have this issue is that xact.c does
5856          * commit record XLOG insertion and clog update as two separate steps
5857          * protected by different locks, but again that seems best on grounds of
5858          * minimizing lock contention.)
5859          *
5860          * A transaction that has not yet set inCommit when we look cannot be at
5861          * risk, since he's not inserted his commit record yet; and one that's
5862          * already cleared it is not at risk either, since he's done fixing clog
5863          * and we will correctly flush the update below.  So we cannot miss any
5864          * xacts we need to wait for.
5865          */
5866         nInCommit = GetTransactionsInCommit(&inCommitXids);
5867         if (nInCommit > 0)
5868         {
5869                 do
5870                 {
5871                         pg_usleep(10000L);      /* wait for 10 msec */
5872                 } while (HaveTransactionsInCommit(inCommitXids, nInCommit));
5873         }
5874         pfree(inCommitXids);
5875
5876         /*
5877          * Get the other info we need for the checkpoint record.
5878          */
5879         LWLockAcquire(XidGenLock, LW_SHARED);
5880         checkPoint.nextXid = ShmemVariableCache->nextXid;
5881         LWLockRelease(XidGenLock);
5882
5883         /* Increase XID epoch if we've wrapped around since last checkpoint */
5884         checkPoint.nextXidEpoch = ControlFile->checkPointCopy.nextXidEpoch;
5885         if (checkPoint.nextXid < ControlFile->checkPointCopy.nextXid)
5886                 checkPoint.nextXidEpoch++;
5887
5888         LWLockAcquire(OidGenLock, LW_SHARED);
5889         checkPoint.nextOid = ShmemVariableCache->nextOid;
5890         if (!shutdown)
5891                 checkPoint.nextOid += ShmemVariableCache->oidCount;
5892         LWLockRelease(OidGenLock);
5893
5894         MultiXactGetCheckptMulti(shutdown,
5895                                                          &checkPoint.nextMulti,
5896                                                          &checkPoint.nextMultiOffset);
5897
5898         /*
5899          * Having constructed the checkpoint record, ensure all shmem disk buffers
5900          * and commit-log buffers are flushed to disk.
5901          *
5902          * This I/O could fail for various reasons.  If so, we will fail to
5903          * complete the checkpoint, but there is no reason to force a system
5904          * panic. Accordingly, exit critical section while doing it.
5905          */
5906         END_CRIT_SECTION();
5907
5908         CheckPointGuts(checkPoint.redo, flags);
5909
5910         START_CRIT_SECTION();
5911
5912         /*
5913          * Now insert the checkpoint record into XLOG.
5914          */
5915         rdata.data = (char *) (&checkPoint);
5916         rdata.len = sizeof(checkPoint);
5917         rdata.buffer = InvalidBuffer;
5918         rdata.next = NULL;
5919
5920         recptr = XLogInsert(RM_XLOG_ID,
5921                                                 shutdown ? XLOG_CHECKPOINT_SHUTDOWN :
5922                                                 XLOG_CHECKPOINT_ONLINE,
5923                                                 &rdata);
5924
5925         XLogFlush(recptr);
5926
5927         /*
5928          * We now have ProcLastRecPtr = start of actual checkpoint record, recptr
5929          * = end of actual checkpoint record.
5930          */
5931         if (shutdown && !XLByteEQ(checkPoint.redo, ProcLastRecPtr))
5932                 ereport(PANIC,
5933                                 (errmsg("concurrent transaction log activity while database system is shutting down")));
5934
5935         /*
5936          * Select point at which we can truncate the log, which we base on the
5937          * prior checkpoint's earliest info.
5938          */
5939         XLByteToSeg(ControlFile->checkPointCopy.redo, _logId, _logSeg);
5940
5941         /*
5942          * Update the control file.
5943          */
5944         LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
5945         if (shutdown)
5946                 ControlFile->state = DB_SHUTDOWNED;
5947         ControlFile->prevCheckPoint = ControlFile->checkPoint;
5948         ControlFile->checkPoint = ProcLastRecPtr;
5949         ControlFile->checkPointCopy = checkPoint;
5950         ControlFile->time = (pg_time_t) time(NULL);
5951         UpdateControlFile();
5952         LWLockRelease(ControlFileLock);
5953
5954         /* Update shared-memory copy of checkpoint XID/epoch */
5955         {
5956                 /* use volatile pointer to prevent code rearrangement */
5957                 volatile XLogCtlData *xlogctl = XLogCtl;
5958
5959                 SpinLockAcquire(&xlogctl->info_lck);
5960                 xlogctl->ckptXidEpoch = checkPoint.nextXidEpoch;
5961                 xlogctl->ckptXid = checkPoint.nextXid;
5962                 SpinLockRelease(&xlogctl->info_lck);
5963         }
5964
5965         /*
5966          * We are now done with critical updates; no need for system panic if we
5967          * have trouble while fooling with old log segments.
5968          */
5969         END_CRIT_SECTION();
5970
5971         /*
5972          * Let smgr do post-checkpoint cleanup (eg, deleting old files).
5973          */
5974         smgrpostckpt();
5975
5976         /*
5977          * Delete old log files (those no longer needed even for previous
5978          * checkpoint).
5979          */
5980         if (_logId || _logSeg)
5981         {
5982                 PrevLogSeg(_logId, _logSeg);
5983                 RemoveOldXlogFiles(_logId, _logSeg, recptr);
5984         }
5985
5986         /*
5987          * Make more log segments if needed.  (Do this after recycling old log
5988          * segments, since that may supply some of the needed files.)
5989          */
5990         if (!shutdown)
5991                 PreallocXlogFiles(recptr);
5992
5993         /*
5994          * Truncate pg_subtrans if possible.  We can throw away all data before
5995          * the oldest XMIN of any running transaction.  No future transaction will
5996          * attempt to reference any pg_subtrans entry older than that (see Asserts
5997          * in subtrans.c).      During recovery, though, we mustn't do this because
5998          * StartupSUBTRANS hasn't been called yet.
5999          */
6000         if (!InRecovery)
6001                 TruncateSUBTRANS(GetOldestXmin(true, false));
6002
6003         /* All real work is done, but log before releasing lock. */
6004         if (log_checkpoints)
6005                 LogCheckpointEnd();
6006
6007         LWLockRelease(CheckpointLock);
6008 }
6009
6010 /*
6011  * Flush all data in shared memory to disk, and fsync
6012  *
6013  * This is the common code shared between regular checkpoints and
6014  * recovery restartpoints.
6015  */
6016 static void
6017 CheckPointGuts(XLogRecPtr checkPointRedo, int flags)
6018 {
6019         CheckPointCLOG();
6020         CheckPointSUBTRANS();
6021         CheckPointMultiXact();
6022         CheckPointBuffers(flags);       /* performs all required fsyncs */
6023         /* We deliberately delay 2PC checkpointing as long as possible */
6024         CheckPointTwoPhase(checkPointRedo);
6025 }
6026
6027 /*
6028  * Set a recovery restart point if appropriate
6029  *
6030  * This is similar to CreateCheckPoint, but is used during WAL recovery
6031  * to establish a point from which recovery can roll forward without
6032  * replaying the entire recovery log.  This function is called each time
6033  * a checkpoint record is read from XLOG; it must determine whether a
6034  * restartpoint is needed or not.
6035  */
6036 static void
6037 RecoveryRestartPoint(const CheckPoint *checkPoint)
6038 {
6039         int                     elapsed_secs;
6040         int                     rmid;
6041
6042         /*
6043          * Do nothing if the elapsed time since the last restartpoint is less than
6044          * half of checkpoint_timeout.  (We use a value less than
6045          * checkpoint_timeout so that variations in the timing of checkpoints on
6046          * the master, or speed of transmission of WAL segments to a slave, won't
6047          * make the slave skip a restartpoint once it's synced with the master.)
6048          * Checking true elapsed time keeps us from doing restartpoints too often
6049          * while rapidly scanning large amounts of WAL.
6050          */
6051         elapsed_secs = (pg_time_t) time(NULL) - ControlFile->time;
6052         if (elapsed_secs < CheckPointTimeout / 2)
6053                 return;
6054
6055         /*
6056          * Is it safe to checkpoint?  We must ask each of the resource managers
6057          * whether they have any partial state information that might prevent a
6058          * correct restart from this point.  If so, we skip this opportunity, but
6059          * return at the next checkpoint record for another try.
6060          */
6061         for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
6062         {
6063                 if (RmgrTable[rmid].rm_safe_restartpoint != NULL)
6064                         if (!(RmgrTable[rmid].rm_safe_restartpoint()))
6065                         {
6066                                 elog(DEBUG2, "RM %d not safe to record restart point at %X/%X",
6067                                          rmid,
6068                                          checkPoint->redo.xlogid,
6069                                          checkPoint->redo.xrecoff);
6070                                 return;
6071                         }
6072         }
6073
6074         /*
6075          * OK, force data out to disk
6076          */
6077         CheckPointGuts(checkPoint->redo, CHECKPOINT_IMMEDIATE);
6078
6079         /*
6080          * Update pg_control so that any subsequent crash will restart from this
6081          * checkpoint.  Note: ReadRecPtr gives the XLOG address of the checkpoint
6082          * record itself.
6083          */
6084         ControlFile->prevCheckPoint = ControlFile->checkPoint;
6085         ControlFile->checkPoint = ReadRecPtr;
6086         ControlFile->checkPointCopy = *checkPoint;
6087         ControlFile->time = (pg_time_t) time(NULL);
6088         UpdateControlFile();
6089
6090         ereport((recoveryLogRestartpoints ? LOG : DEBUG2),
6091                         (errmsg("recovery restart point at %X/%X",
6092                                         checkPoint->redo.xlogid, checkPoint->redo.xrecoff)));
6093         if (recoveryLastXTime)
6094                 ereport((recoveryLogRestartpoints ? LOG : DEBUG2),
6095                                 (errmsg("last completed transaction was at log time %s",
6096                                                 timestamptz_to_str(recoveryLastXTime))));
6097 }
6098
6099 /*
6100  * Write a NEXTOID log record
6101  */
6102 void
6103 XLogPutNextOid(Oid nextOid)
6104 {
6105         XLogRecData rdata;
6106
6107         rdata.data = (char *) (&nextOid);
6108         rdata.len = sizeof(Oid);
6109         rdata.buffer = InvalidBuffer;
6110         rdata.next = NULL;
6111         (void) XLogInsert(RM_XLOG_ID, XLOG_NEXTOID, &rdata);
6112
6113         /*
6114          * We need not flush the NEXTOID record immediately, because any of the
6115          * just-allocated OIDs could only reach disk as part of a tuple insert or
6116          * update that would have its own XLOG record that must follow the NEXTOID
6117          * record.      Therefore, the standard buffer LSN interlock applied to those
6118          * records will ensure no such OID reaches disk before the NEXTOID record
6119          * does.
6120          *
6121          * Note, however, that the above statement only covers state "within" the
6122          * database.  When we use a generated OID as a file or directory name, we
6123          * are in a sense violating the basic WAL rule, because that filesystem
6124          * change may reach disk before the NEXTOID WAL record does.  The impact
6125          * of this is that if a database crash occurs immediately afterward, we
6126          * might after restart re-generate the same OID and find that it conflicts
6127          * with the leftover file or directory.  But since for safety's sake we
6128          * always loop until finding a nonconflicting filename, this poses no real
6129          * problem in practice. See pgsql-hackers discussion 27-Sep-2006.
6130          */
6131 }
6132
6133 /*
6134  * Write an XLOG SWITCH record.
6135  *
6136  * Here we just blindly issue an XLogInsert request for the record.
6137  * All the magic happens inside XLogInsert.
6138  *
6139  * The return value is either the end+1 address of the switch record,
6140  * or the end+1 address of the prior segment if we did not need to
6141  * write a switch record because we are already at segment start.
6142  */
6143 XLogRecPtr
6144 RequestXLogSwitch(void)
6145 {
6146         XLogRecPtr      RecPtr;
6147         XLogRecData rdata;
6148
6149         /* XLOG SWITCH, alone among xlog record types, has no data */
6150         rdata.buffer = InvalidBuffer;
6151         rdata.data = NULL;
6152         rdata.len = 0;
6153         rdata.next = NULL;
6154
6155         RecPtr = XLogInsert(RM_XLOG_ID, XLOG_SWITCH, &rdata);
6156
6157         return RecPtr;
6158 }
6159
6160 /*
6161  * XLOG resource manager's routines
6162  */
6163 void
6164 xlog_redo(XLogRecPtr lsn, XLogRecord *record)
6165 {
6166         uint8           info = record->xl_info & ~XLR_INFO_MASK;
6167
6168         if (info == XLOG_NEXTOID)
6169         {
6170                 Oid                     nextOid;
6171
6172                 memcpy(&nextOid, XLogRecGetData(record), sizeof(Oid));
6173                 if (ShmemVariableCache->nextOid < nextOid)
6174                 {
6175                         ShmemVariableCache->nextOid = nextOid;
6176                         ShmemVariableCache->oidCount = 0;
6177                 }
6178         }
6179         else if (info == XLOG_CHECKPOINT_SHUTDOWN)
6180         {
6181                 CheckPoint      checkPoint;
6182
6183                 memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
6184                 /* In a SHUTDOWN checkpoint, believe the counters exactly */
6185                 ShmemVariableCache->nextXid = checkPoint.nextXid;
6186                 ShmemVariableCache->nextOid = checkPoint.nextOid;
6187                 ShmemVariableCache->oidCount = 0;
6188                 MultiXactSetNextMXact(checkPoint.nextMulti,
6189                                                           checkPoint.nextMultiOffset);
6190
6191                 /* ControlFile->checkPointCopy always tracks the latest ckpt XID */
6192                 ControlFile->checkPointCopy.nextXidEpoch = checkPoint.nextXidEpoch;
6193                 ControlFile->checkPointCopy.nextXid = checkPoint.nextXid;
6194
6195                 /*
6196                  * TLI may change in a shutdown checkpoint, but it shouldn't decrease
6197                  */
6198                 if (checkPoint.ThisTimeLineID != ThisTimeLineID)
6199                 {
6200                         if (checkPoint.ThisTimeLineID < ThisTimeLineID ||
6201                                 !list_member_int(expectedTLIs,
6202                                                                  (int) checkPoint.ThisTimeLineID))
6203                                 ereport(PANIC,
6204                                                 (errmsg("unexpected timeline ID %u (after %u) in checkpoint record",
6205                                                                 checkPoint.ThisTimeLineID, ThisTimeLineID)));
6206                         /* Following WAL records should be run with new TLI */
6207                         ThisTimeLineID = checkPoint.ThisTimeLineID;
6208                 }
6209
6210                 RecoveryRestartPoint(&checkPoint);
6211         }
6212         else if (info == XLOG_CHECKPOINT_ONLINE)
6213         {
6214                 CheckPoint      checkPoint;
6215
6216                 memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
6217                 /* In an ONLINE checkpoint, treat the counters like NEXTOID */
6218                 if (TransactionIdPrecedes(ShmemVariableCache->nextXid,
6219                                                                   checkPoint.nextXid))
6220                         ShmemVariableCache->nextXid = checkPoint.nextXid;
6221                 if (ShmemVariableCache->nextOid < checkPoint.nextOid)
6222                 {
6223                         ShmemVariableCache->nextOid = checkPoint.nextOid;
6224                         ShmemVariableCache->oidCount = 0;
6225                 }
6226                 MultiXactAdvanceNextMXact(checkPoint.nextMulti,
6227                                                                   checkPoint.nextMultiOffset);
6228
6229                 /* ControlFile->checkPointCopy always tracks the latest ckpt XID */
6230                 ControlFile->checkPointCopy.nextXidEpoch = checkPoint.nextXidEpoch;
6231                 ControlFile->checkPointCopy.nextXid = checkPoint.nextXid;
6232
6233                 /* TLI should not change in an on-line checkpoint */
6234                 if (checkPoint.ThisTimeLineID != ThisTimeLineID)
6235                         ereport(PANIC,
6236                                         (errmsg("unexpected timeline ID %u (should be %u) in checkpoint record",
6237                                                         checkPoint.ThisTimeLineID, ThisTimeLineID)));
6238
6239                 RecoveryRestartPoint(&checkPoint);
6240         }
6241         else if (info == XLOG_NOOP)
6242         {
6243                 /* nothing to do here */
6244         }
6245         else if (info == XLOG_SWITCH)
6246         {
6247                 /* nothing to do here */
6248         }
6249 }
6250
6251 void
6252 xlog_desc(StringInfo buf, uint8 xl_info, char *rec)
6253 {
6254         uint8           info = xl_info & ~XLR_INFO_MASK;
6255
6256         if (info == XLOG_CHECKPOINT_SHUTDOWN ||
6257                 info == XLOG_CHECKPOINT_ONLINE)
6258         {
6259                 CheckPoint *checkpoint = (CheckPoint *) rec;
6260
6261                 appendStringInfo(buf, "checkpoint: redo %X/%X; "
6262                                                  "tli %u; xid %u/%u; oid %u; multi %u; offset %u; %s",
6263                                                  checkpoint->redo.xlogid, checkpoint->redo.xrecoff,
6264                                                  checkpoint->ThisTimeLineID,
6265                                                  checkpoint->nextXidEpoch, checkpoint->nextXid,
6266                                                  checkpoint->nextOid,
6267                                                  checkpoint->nextMulti,
6268                                                  checkpoint->nextMultiOffset,
6269                                  (info == XLOG_CHECKPOINT_SHUTDOWN) ? "shutdown" : "online");
6270         }
6271         else if (info == XLOG_NOOP)
6272         {
6273                 appendStringInfo(buf, "xlog no-op");
6274         }
6275         else if (info == XLOG_NEXTOID)
6276         {
6277                 Oid                     nextOid;
6278
6279                 memcpy(&nextOid, rec, sizeof(Oid));
6280                 appendStringInfo(buf, "nextOid: %u", nextOid);
6281         }
6282         else if (info == XLOG_SWITCH)
6283         {
6284                 appendStringInfo(buf, "xlog switch");
6285         }
6286         else
6287                 appendStringInfo(buf, "UNKNOWN");
6288 }
6289
#ifdef WAL_DEBUG

/*
 * xlog_outrec
 *		Append a short summary of a WAL record header to buf: the previous
 *		record's location, the xid, a marker for every backup block flagged
 *		in xl_info, and the name of the owning resource manager.
 */
static void
xlog_outrec(StringInfo buf, XLogRecord *record)
{
	int			blkno;

	appendStringInfo(buf, "prev %X/%X; xid %u",
					 record->xl_prev.xlogid, record->xl_prev.xrecoff,
					 record->xl_xid);

	/* backup blocks are reported with 1-based numbers */
	for (blkno = 1; blkno <= XLR_MAX_BKP_BLOCKS; blkno++)
	{
		if ((record->xl_info & XLR_SET_BKP_BLOCK(blkno - 1)) != 0)
			appendStringInfo(buf, "; bkpb%d", blkno);
	}

	appendStringInfo(buf, ": %s", RmgrTable[record->xl_rmid].rm_name);
}
#endif   /* WAL_DEBUG */
6310
6311
6312 /*
6313  * Return the (possible) sync flag used for opening a file, depending on the
6314  * value of the GUC wal_sync_method.
6315  */
6316 static int
6317 get_sync_bit(int method)
6318 {
6319         /* If fsync is disabled, never open in sync mode */
6320         if (!enableFsync)
6321                 return 0;
6322
6323         switch (method)
6324         {
6325                 /*
6326                  * enum values for all sync options are defined even if they are not
6327                  * supported on the current platform.  But if not, they are not
6328                  * included in the enum option array, and therefore will never be seen
6329                  * here.
6330                  */
6331                 case SYNC_METHOD_FSYNC:
6332                 case SYNC_METHOD_FSYNC_WRITETHROUGH:
6333                 case SYNC_METHOD_FDATASYNC:
6334                         return 0;
6335 #ifdef OPEN_SYNC_FLAG
6336                 case SYNC_METHOD_OPEN:
6337                         return OPEN_SYNC_FLAG;
6338 #endif
6339 #ifdef OPEN_DATASYNC_FLAG
6340                 case SYNC_METHOD_OPEN_DSYNC:
6341                         return OPEN_DATASYNC_FLAG;
6342 #endif
6343                 default:
6344                         /* can't happen (unless we are out of sync with option array) */
6345                         elog(ERROR, "unrecognized wal_sync_method: %d", method);
6346                         return 0; /* silence warning */
6347         }
6348 }
6349
6350 /*
6351  * GUC support
6352  */
6353 bool
6354 assign_xlog_sync_method(int new_sync_method, bool doit, GucSource source)
6355 {
6356         if (!doit)
6357                 return true;
6358
6359         if (sync_method != new_sync_method)
6360         {
6361                 /*
6362                  * To ensure that no blocks escape unsynced, force an fsync on the
6363                  * currently open log segment (if any).  Also, if the open flag is
6364                  * changing, close the log file so it will be reopened (with new flag
6365                  * bit) at next use.
6366                  */
6367                 if (openLogFile >= 0)
6368                 {
6369                         if (pg_fsync(openLogFile) != 0)
6370                                 ereport(PANIC,
6371                                                 (errcode_for_file_access(),
6372                                                  errmsg("could not fsync log file %u, segment %u: %m",
6373                                                                 openLogId, openLogSeg)));
6374                         if (get_sync_bit(sync_method) != get_sync_bit(new_sync_method))
6375                                 XLogFileClose();
6376                 }
6377         }
6378
6379         return true;
6380 }
6381
6382
6383 /*
6384  * Issue appropriate kind of fsync (if any) on the current XLOG output file
6385  */
6386 static void
6387 issue_xlog_fsync(void)
6388 {
6389         switch (sync_method)
6390         {
6391                 case SYNC_METHOD_FSYNC:
6392                         if (pg_fsync_no_writethrough(openLogFile) != 0)
6393                                 ereport(PANIC,
6394                                                 (errcode_for_file_access(),
6395                                                  errmsg("could not fsync log file %u, segment %u: %m",
6396                                                                 openLogId, openLogSeg)));
6397                         break;
6398 #ifdef HAVE_FSYNC_WRITETHROUGH
6399                 case SYNC_METHOD_FSYNC_WRITETHROUGH:
6400                         if (pg_fsync_writethrough(openLogFile) != 0)
6401                                 ereport(PANIC,
6402                                                 (errcode_for_file_access(),
6403                                                  errmsg("could not fsync write-through log file %u, segment %u: %m",
6404                                                                 openLogId, openLogSeg)));
6405                         break;
6406 #endif
6407 #ifdef HAVE_FDATASYNC
6408                 case SYNC_METHOD_FDATASYNC:
6409                         if (pg_fdatasync(openLogFile) != 0)
6410                                 ereport(PANIC,
6411                                                 (errcode_for_file_access(),
6412                                         errmsg("could not fdatasync log file %u, segment %u: %m",
6413                                                    openLogId, openLogSeg)));
6414                         break;
6415 #endif
6416                 case SYNC_METHOD_OPEN:
6417                 case SYNC_METHOD_OPEN_DSYNC:
6418                         /* write synced it already */
6419                         break;
6420                 default:
6421                         elog(PANIC, "unrecognized wal_sync_method: %d", sync_method);
6422                         break;
6423         }
6424 }
6425
6426
6427 /*
6428  * pg_start_backup: set up for taking an on-line backup dump
6429  *
6430  * Essentially what this does is to create a backup label file in $PGDATA,
6431  * where it will be archived as part of the backup dump.  The label file
6432  * contains the user-supplied label string (typically this would be used
6433  * to tell where the backup dump will be stored) and the starting time and
6434  * starting WAL location for the dump.
6435  */
6436 Datum
6437 pg_start_backup(PG_FUNCTION_ARGS)
6438 {
6439         text       *backupid = PG_GETARG_TEXT_P(0);
6440         char       *backupidstr;
6441         XLogRecPtr      checkpointloc;
6442         XLogRecPtr      startpoint;
6443         pg_time_t       stamp_time;
6444         char            strfbuf[128];
6445         char            xlogfilename[MAXFNAMELEN];
6446         uint32          _logId;
6447         uint32          _logSeg;
6448         struct stat stat_buf;
6449         FILE       *fp;
6450
6451         if (!superuser())
6452                 ereport(ERROR,
6453                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
6454                                  errmsg("must be superuser to run a backup")));
6455
6456         if (!XLogArchivingActive())
6457                 ereport(ERROR,
6458                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
6459                                  errmsg("WAL archiving is not active"),
6460                                  errhint("archive_mode must be enabled at server start.")));
6461
6462         if (!XLogArchiveCommandSet())
6463                 ereport(ERROR,
6464                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
6465                                  errmsg("WAL archiving is not active"),
6466                                  errhint("archive_command must be defined before "
6467                                                  "online backups can be made safely.")));
6468
6469         backupidstr = text_to_cstring(backupid);
6470
6471         /*
6472          * Mark backup active in shared memory.  We must do full-page WAL writes
6473          * during an on-line backup even if not doing so at other times, because
6474          * it's quite possible for the backup dump to obtain a "torn" (partially
6475          * written) copy of a database page if it reads the page concurrently with
6476          * our write to the same page.  This can be fixed as long as the first
6477          * write to the page in the WAL sequence is a full-page write. Hence, we
6478          * turn on forcePageWrites and then force a CHECKPOINT, to ensure there
6479          * are no dirty pages in shared memory that might get dumped while the
6480          * backup is in progress without having a corresponding WAL record.  (Once
6481          * the backup is complete, we need not force full-page writes anymore,
6482          * since we expect that any pages not modified during the backup interval
6483          * must have been correctly captured by the backup.)
6484          *
6485          * We must hold WALInsertLock to change the value of forcePageWrites, to
6486          * ensure adequate interlocking against XLogInsert().
6487          */
6488         LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
6489         if (XLogCtl->Insert.forcePageWrites)
6490         {
6491                 LWLockRelease(WALInsertLock);
6492                 ereport(ERROR,
6493                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
6494                                  errmsg("a backup is already in progress"),
6495                                  errhint("Run pg_stop_backup() and try again.")));
6496         }
6497         XLogCtl->Insert.forcePageWrites = true;
6498         LWLockRelease(WALInsertLock);
6499
6500         /* Ensure we release forcePageWrites if fail below */
6501         PG_ENSURE_ERROR_CLEANUP(pg_start_backup_callback, (Datum) 0);
6502         {
6503                 /*
6504                  * Force a CHECKPOINT.  Aside from being necessary to prevent torn
6505                  * page problems, this guarantees that two successive backup runs will
6506                  * have different checkpoint positions and hence different history
6507                  * file names, even if nothing happened in between.
6508                  *
6509                  * We don't use CHECKPOINT_IMMEDIATE, hence this can take awhile.
6510                  */
6511                 RequestCheckpoint(CHECKPOINT_FORCE | CHECKPOINT_WAIT);
6512
6513                 /*
6514                  * Now we need to fetch the checkpoint record location, and also its
6515                  * REDO pointer.  The oldest point in WAL that would be needed to
6516                  * restore starting from the checkpoint is precisely the REDO pointer.
6517                  */
6518                 LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
6519                 checkpointloc = ControlFile->checkPoint;
6520                 startpoint = ControlFile->checkPointCopy.redo;
6521                 LWLockRelease(ControlFileLock);
6522
6523                 XLByteToSeg(startpoint, _logId, _logSeg);
6524                 XLogFileName(xlogfilename, ThisTimeLineID, _logId, _logSeg);
6525
6526                 /* Use the log timezone here, not the session timezone */
6527                 stamp_time = (pg_time_t) time(NULL);
6528                 pg_strftime(strfbuf, sizeof(strfbuf),
6529                                         "%Y-%m-%d %H:%M:%S %Z",
6530                                         pg_localtime(&stamp_time, log_timezone));
6531
6532                 /*
6533                  * Check for existing backup label --- implies a backup is already
6534                  * running.  (XXX given that we checked forcePageWrites above, maybe
6535                  * it would be OK to just unlink any such label file?)
6536                  */
6537                 if (stat(BACKUP_LABEL_FILE, &stat_buf) != 0)
6538                 {
6539                         if (errno != ENOENT)
6540                                 ereport(ERROR,
6541                                                 (errcode_for_file_access(),
6542                                                  errmsg("could not stat file \"%s\": %m",
6543                                                                 BACKUP_LABEL_FILE)));
6544                 }
6545                 else
6546                         ereport(ERROR,
6547                                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
6548                                          errmsg("a backup is already in progress"),
6549                                          errhint("If you're sure there is no backup in progress, remove file \"%s\" and try again.",
6550                                                          BACKUP_LABEL_FILE)));
6551
6552                 /*
6553                  * Okay, write the file
6554                  */
6555                 fp = AllocateFile(BACKUP_LABEL_FILE, "w");
6556                 if (!fp)
6557                         ereport(ERROR,
6558                                         (errcode_for_file_access(),
6559                                          errmsg("could not create file \"%s\": %m",
6560                                                         BACKUP_LABEL_FILE)));
6561                 fprintf(fp, "START WAL LOCATION: %X/%X (file %s)\n",
6562                                 startpoint.xlogid, startpoint.xrecoff, xlogfilename);
6563                 fprintf(fp, "CHECKPOINT LOCATION: %X/%X\n",
6564                                 checkpointloc.xlogid, checkpointloc.xrecoff);
6565                 fprintf(fp, "START TIME: %s\n", strfbuf);
6566                 fprintf(fp, "LABEL: %s\n", backupidstr);
6567                 if (fflush(fp) || ferror(fp) || FreeFile(fp))
6568                         ereport(ERROR,
6569                                         (errcode_for_file_access(),
6570                                          errmsg("could not write file \"%s\": %m",
6571                                                         BACKUP_LABEL_FILE)));
6572         }
6573         PG_END_ENSURE_ERROR_CLEANUP(pg_start_backup_callback, (Datum) 0);
6574
6575         /*
6576          * We're done.  As a convenience, return the starting WAL location.
6577          */
6578         snprintf(xlogfilename, sizeof(xlogfilename), "%X/%X",
6579                          startpoint.xlogid, startpoint.xrecoff);
6580         PG_RETURN_TEXT_P(cstring_to_text(xlogfilename));
6581 }
6582
6583 /* Error cleanup callback for pg_start_backup */
6584 static void
6585 pg_start_backup_callback(int code, Datum arg)
6586 {
6587         /* Turn off forcePageWrites on failure */
6588         LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
6589         XLogCtl->Insert.forcePageWrites = false;
6590         LWLockRelease(WALInsertLock);
6591 }
6592
6593 /*
6594  * pg_stop_backup: finish taking an on-line backup dump
6595  *
6596  * We remove the backup label file created by pg_start_backup, and instead
6597  * create a backup history file in pg_xlog (whence it will immediately be
6598  * archived).  The backup history file contains the same info found in
6599  * the label file, plus the backup-end time and WAL location.
6600  * Note: different from CancelBackup which just cancels online backup mode.
6601  */
6602 Datum
6603 pg_stop_backup(PG_FUNCTION_ARGS)
6604 {
6605         XLogRecPtr      startpoint;
6606         XLogRecPtr      stoppoint;
6607         pg_time_t       stamp_time;
6608         char            strfbuf[128];
6609         char            histfilepath[MAXPGPATH];
6610         char            startxlogfilename[MAXFNAMELEN];
6611         char            stopxlogfilename[MAXFNAMELEN];
6612         uint32          _logId;
6613         uint32          _logSeg;
6614         FILE       *lfp;
6615         FILE       *fp;
6616         char            ch;
6617         int                     ich;
6618         int                     seconds_before_warning;
6619         int                     waits = 0;
6620
6621         if (!superuser())
6622                 ereport(ERROR,
6623                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
6624                                  (errmsg("must be superuser to run a backup"))));
6625
6626         /*
6627          * OK to clear forcePageWrites
6628          */
6629         LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
6630         XLogCtl->Insert.forcePageWrites = false;
6631         LWLockRelease(WALInsertLock);
6632
6633         /*
6634          * Force a switch to a new xlog segment file, so that the backup is valid
6635          * as soon as archiver moves out the current segment file. We'll report
6636          * the end address of the XLOG SWITCH record as the backup stopping point.
6637          */
6638         stoppoint = RequestXLogSwitch();
6639
6640         XLByteToSeg(stoppoint, _logId, _logSeg);
6641         XLogFileName(stopxlogfilename, ThisTimeLineID, _logId, _logSeg);
6642
6643         /* Use the log timezone here, not the session timezone */
6644         stamp_time = (pg_time_t) time(NULL);
6645         pg_strftime(strfbuf, sizeof(strfbuf),
6646                                 "%Y-%m-%d %H:%M:%S %Z",
6647                                 pg_localtime(&stamp_time, log_timezone));
6648
6649         /*
6650          * Open the existing label file
6651          */
6652         lfp = AllocateFile(BACKUP_LABEL_FILE, "r");
6653         if (!lfp)
6654         {
6655                 if (errno != ENOENT)
6656                         ereport(ERROR,
6657                                         (errcode_for_file_access(),
6658                                          errmsg("could not read file \"%s\": %m",
6659                                                         BACKUP_LABEL_FILE)));
6660                 ereport(ERROR,
6661                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
6662                                  errmsg("a backup is not in progress")));
6663         }
6664
6665         /*
6666          * Read and parse the START WAL LOCATION line (this code is pretty crude,
6667          * but we are not expecting any variability in the file format).
6668          */
6669         if (fscanf(lfp, "START WAL LOCATION: %X/%X (file %24s)%c",
6670                            &startpoint.xlogid, &startpoint.xrecoff, startxlogfilename,
6671                            &ch) != 4 || ch != '\n')
6672                 ereport(ERROR,
6673                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
6674                                  errmsg("invalid data in file \"%s\"", BACKUP_LABEL_FILE)));
6675
6676         /*
6677          * Write the backup history file
6678          */
6679         XLByteToSeg(startpoint, _logId, _logSeg);
6680         BackupHistoryFilePath(histfilepath, ThisTimeLineID, _logId, _logSeg,
6681                                                   startpoint.xrecoff % XLogSegSize);
6682         fp = AllocateFile(histfilepath, "w");
6683         if (!fp)
6684                 ereport(ERROR,
6685                                 (errcode_for_file_access(),
6686                                  errmsg("could not create file \"%s\": %m",
6687                                                 histfilepath)));
6688         fprintf(fp, "START WAL LOCATION: %X/%X (file %s)\n",
6689                         startpoint.xlogid, startpoint.xrecoff, startxlogfilename);
6690         fprintf(fp, "STOP WAL LOCATION: %X/%X (file %s)\n",
6691                         stoppoint.xlogid, stoppoint.xrecoff, stopxlogfilename);
6692         /* transfer remaining lines from label to history file */
6693         while ((ich = fgetc(lfp)) != EOF)
6694                 fputc(ich, fp);
6695         fprintf(fp, "STOP TIME: %s\n", strfbuf);
6696         if (fflush(fp) || ferror(fp) || FreeFile(fp))
6697                 ereport(ERROR,
6698                                 (errcode_for_file_access(),
6699                                  errmsg("could not write file \"%s\": %m",
6700                                                 histfilepath)));
6701
6702         /*
6703          * Close and remove the backup label file
6704          */
6705         if (ferror(lfp) || FreeFile(lfp))
6706                 ereport(ERROR,
6707                                 (errcode_for_file_access(),
6708                                  errmsg("could not read file \"%s\": %m",
6709                                                 BACKUP_LABEL_FILE)));
6710         if (unlink(BACKUP_LABEL_FILE) != 0)
6711                 ereport(ERROR,
6712                                 (errcode_for_file_access(),
6713                                  errmsg("could not remove file \"%s\": %m",
6714                                                 BACKUP_LABEL_FILE)));
6715
6716         /*
6717          * Clean out any no-longer-needed history files.  As a side effect, this
6718          * will post a .ready file for the newly created history file, notifying
6719          * the archiver that history file may be archived immediately.
6720          */
6721         CleanupBackupHistory();
6722
6723         /*
6724          * Wait until the history file has been archived. We assume that the 
6725          * alphabetic sorting property of the WAL files ensures the last WAL
6726          * file is guaranteed archived by the time the history file is archived.
6727          *
6728          * We wait forever, since archive_command is supposed to work and
6729          * we assume the admin wanted his backup to work completely. If you 
6730          * don't wish to wait, you can SET statement_timeout = xx;
6731          *
6732          * If the status file is missing, we assume that is because it was
6733          * set to .ready before we slept, then while asleep it has been set
6734          * to .done and then removed by a concurrent checkpoint.
6735          */
6736         BackupHistoryFileName(histfilepath, ThisTimeLineID, _logId, _logSeg,
6737                                                   startpoint.xrecoff % XLogSegSize);
6738
6739         seconds_before_warning = 60;
6740         waits = 0;
6741
6742         while (!XLogArchiveCheckDone(histfilepath, false))
6743         {
6744                 CHECK_FOR_INTERRUPTS();
6745
6746                 pg_usleep(1000000L);
6747
6748                 if (++waits >= seconds_before_warning)
6749                 {
6750                         seconds_before_warning *= 2;     /* This wraps in >10 years... */
6751                         elog(WARNING, "pg_stop_backup() waiting for archive to complete " 
6752                                                         "(%d seconds delay)", waits);
6753                 }
6754         }
6755
6756         /*
6757          * We're done.  As a convenience, return the ending WAL location.
6758          */
6759         snprintf(stopxlogfilename, sizeof(stopxlogfilename), "%X/%X",
6760                          stoppoint.xlogid, stoppoint.xrecoff);
6761         PG_RETURN_TEXT_P(cstring_to_text(stopxlogfilename));
6762 }
6763
6764 /*
6765  * pg_switch_xlog: switch to next xlog file
6766  */
6767 Datum
6768 pg_switch_xlog(PG_FUNCTION_ARGS)
6769 {
6770         XLogRecPtr      switchpoint;
6771         char            location[MAXFNAMELEN];
6772
6773         if (!superuser())
6774                 ereport(ERROR,
6775                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
6776                          (errmsg("must be superuser to switch transaction log files"))));
6777
6778         switchpoint = RequestXLogSwitch();
6779
6780         /*
6781          * As a convenience, return the WAL location of the switch record
6782          */
6783         snprintf(location, sizeof(location), "%X/%X",
6784                          switchpoint.xlogid, switchpoint.xrecoff);
6785         PG_RETURN_TEXT_P(cstring_to_text(location));
6786 }
6787
6788 /*
6789  * Report the current WAL write location (same format as pg_start_backup etc)
6790  *
6791  * This is useful for determining how much of WAL is visible to an external
6792  * archiving process.  Note that the data before this point is written out
6793  * to the kernel, but is not necessarily synced to disk.
6794  */
6795 Datum
6796 pg_current_xlog_location(PG_FUNCTION_ARGS)
6797 {
6798         char            location[MAXFNAMELEN];
6799
6800         /* Make sure we have an up-to-date local LogwrtResult */
6801         {
6802                 /* use volatile pointer to prevent code rearrangement */
6803                 volatile XLogCtlData *xlogctl = XLogCtl;
6804
6805                 SpinLockAcquire(&xlogctl->info_lck);
6806                 LogwrtResult = xlogctl->LogwrtResult;
6807                 SpinLockRelease(&xlogctl->info_lck);
6808         }
6809
6810         snprintf(location, sizeof(location), "%X/%X",
6811                          LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff);
6812         PG_RETURN_TEXT_P(cstring_to_text(location));
6813 }
6814
6815 /*
6816  * Report the current WAL insert location (same format as pg_start_backup etc)
6817  *
6818  * This function is mostly for debugging purposes.
6819  */
6820 Datum
6821 pg_current_xlog_insert_location(PG_FUNCTION_ARGS)
6822 {
6823         XLogCtlInsert *Insert = &XLogCtl->Insert;
6824         XLogRecPtr      current_recptr;
6825         char            location[MAXFNAMELEN];
6826
6827         /*
6828          * Get the current end-of-WAL position ... shared lock is sufficient
6829          */
6830         LWLockAcquire(WALInsertLock, LW_SHARED);
6831         INSERT_RECPTR(current_recptr, Insert, Insert->curridx);
6832         LWLockRelease(WALInsertLock);
6833
6834         snprintf(location, sizeof(location), "%X/%X",
6835                          current_recptr.xlogid, current_recptr.xrecoff);
6836         PG_RETURN_TEXT_P(cstring_to_text(location));
6837 }
6838
6839 /*
6840  * Compute an xlog file name and decimal byte offset given a WAL location,
6841  * such as is returned by pg_stop_backup() or pg_switch_xlog().
6842  *
6843  * Note that a location exactly at a segment boundary is taken to be in
6844  * the previous segment.  This is usually the right thing, since the
6845  * expected usage is to determine which xlog file(s) are ready to archive.
6846  */
6847 Datum
6848 pg_xlogfile_name_offset(PG_FUNCTION_ARGS)
6849 {
6850         text       *location = PG_GETARG_TEXT_P(0);
6851         char       *locationstr;
6852         unsigned int uxlogid;
6853         unsigned int uxrecoff;
6854         uint32          xlogid;
6855         uint32          xlogseg;
6856         uint32          xrecoff;
6857         XLogRecPtr      locationpoint;
6858         char            xlogfilename[MAXFNAMELEN];
6859         Datum           values[2];
6860         bool            isnull[2];
6861         TupleDesc       resultTupleDesc;
6862         HeapTuple       resultHeapTuple;
6863         Datum           result;
6864
6865         /*
6866          * Read input and parse
6867          */
6868         locationstr = text_to_cstring(location);
6869
6870         if (sscanf(locationstr, "%X/%X", &uxlogid, &uxrecoff) != 2)
6871                 ereport(ERROR,
6872                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
6873                                  errmsg("could not parse transaction log location \"%s\"",
6874                                                 locationstr)));
6875
6876         locationpoint.xlogid = uxlogid;
6877         locationpoint.xrecoff = uxrecoff;
6878
6879         /*
6880          * Construct a tuple descriptor for the result row.  This must match this
6881          * function's pg_proc entry!
6882          */
6883         resultTupleDesc = CreateTemplateTupleDesc(2, false);
6884         TupleDescInitEntry(resultTupleDesc, (AttrNumber) 1, "file_name",
6885                                            TEXTOID, -1, 0);
6886         TupleDescInitEntry(resultTupleDesc, (AttrNumber) 2, "file_offset",
6887                                            INT4OID, -1, 0);
6888
6889         resultTupleDesc = BlessTupleDesc(resultTupleDesc);
6890
6891         /*
6892          * xlogfilename
6893          */
6894         XLByteToPrevSeg(locationpoint, xlogid, xlogseg);
6895         XLogFileName(xlogfilename, ThisTimeLineID, xlogid, xlogseg);
6896
6897         values[0] = CStringGetTextDatum(xlogfilename);
6898         isnull[0] = false;
6899
6900         /*
6901          * offset
6902          */
6903         xrecoff = locationpoint.xrecoff - xlogseg * XLogSegSize;
6904
6905         values[1] = UInt32GetDatum(xrecoff);
6906         isnull[1] = false;
6907
6908         /*
6909          * Tuple jam: Having first prepared your Datums, then squash together
6910          */
6911         resultHeapTuple = heap_form_tuple(resultTupleDesc, values, isnull);
6912
6913         result = HeapTupleGetDatum(resultHeapTuple);
6914
6915         PG_RETURN_DATUM(result);
6916 }
6917
6918 /*
6919  * Compute an xlog file name given a WAL location,
6920  * such as is returned by pg_stop_backup() or pg_xlog_switch().
6921  */
6922 Datum
6923 pg_xlogfile_name(PG_FUNCTION_ARGS)
6924 {
6925         text       *location = PG_GETARG_TEXT_P(0);
6926         char       *locationstr;
6927         unsigned int uxlogid;
6928         unsigned int uxrecoff;
6929         uint32          xlogid;
6930         uint32          xlogseg;
6931         XLogRecPtr      locationpoint;
6932         char            xlogfilename[MAXFNAMELEN];
6933
6934         locationstr = text_to_cstring(location);
6935
6936         if (sscanf(locationstr, "%X/%X", &uxlogid, &uxrecoff) != 2)
6937                 ereport(ERROR,
6938                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
6939                                  errmsg("could not parse transaction log location \"%s\"",
6940                                                 locationstr)));
6941
6942         locationpoint.xlogid = uxlogid;
6943         locationpoint.xrecoff = uxrecoff;
6944
6945         XLByteToPrevSeg(locationpoint, xlogid, xlogseg);
6946         XLogFileName(xlogfilename, ThisTimeLineID, xlogid, xlogseg);
6947
6948         PG_RETURN_TEXT_P(cstring_to_text(xlogfilename));
6949 }
6950
6951 /*
6952  * read_backup_label: check to see if a backup_label file is present
6953  *
6954  * If we see a backup_label during recovery, we assume that we are recovering
6955  * from a backup dump file, and we therefore roll forward from the checkpoint
6956  * identified by the label file, NOT what pg_control says.      This avoids the
6957  * problem that pg_control might have been archived one or more checkpoints
6958  * later than the start of the dump, and so if we rely on it as the start
6959  * point, we will fail to restore a consistent database state.
6960  *
6961  * We also attempt to retrieve the corresponding backup history file.
6962  * If successful, set *minRecoveryLoc to constrain valid PITR stopping
6963  * points.
6964  *
6965  * Returns TRUE if a backup_label was found (and fills the checkpoint
6966  * location into *checkPointLoc); returns FALSE if not.
6967  */
6968 static bool
6969 read_backup_label(XLogRecPtr *checkPointLoc, XLogRecPtr *minRecoveryLoc)
6970 {
6971         XLogRecPtr      startpoint;
6972         XLogRecPtr      stoppoint;
6973         char            histfilename[MAXFNAMELEN];
6974         char            histfilepath[MAXPGPATH];
6975         char            startxlogfilename[MAXFNAMELEN];
6976         char            stopxlogfilename[MAXFNAMELEN];
6977         TimeLineID      tli;
6978         uint32          _logId;
6979         uint32          _logSeg;
6980         FILE       *lfp;
6981         FILE       *fp;
6982         char            ch;
6983
6984         /* Default is to not constrain recovery stop point */
6985         minRecoveryLoc->xlogid = 0;
6986         minRecoveryLoc->xrecoff = 0;
6987
6988         /*
6989          * See if label file is present
6990          */
6991         lfp = AllocateFile(BACKUP_LABEL_FILE, "r");
6992         if (!lfp)
6993         {
6994                 if (errno != ENOENT)
6995                         ereport(FATAL,
6996                                         (errcode_for_file_access(),
6997                                          errmsg("could not read file \"%s\": %m",
6998                                                         BACKUP_LABEL_FILE)));
6999                 return false;                   /* it's not there, all is fine */
7000         }
7001
7002         /*
7003          * Read and parse the START WAL LOCATION and CHECKPOINT lines (this code
7004          * is pretty crude, but we are not expecting any variability in the file
7005          * format).
7006          */
7007         if (fscanf(lfp, "START WAL LOCATION: %X/%X (file %08X%16s)%c",
7008                            &startpoint.xlogid, &startpoint.xrecoff, &tli,
7009                            startxlogfilename, &ch) != 5 || ch != '\n')
7010                 ereport(FATAL,
7011                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
7012                                  errmsg("invalid data in file \"%s\"", BACKUP_LABEL_FILE)));
7013         if (fscanf(lfp, "CHECKPOINT LOCATION: %X/%X%c",
7014                            &checkPointLoc->xlogid, &checkPointLoc->xrecoff,
7015                            &ch) != 3 || ch != '\n')
7016                 ereport(FATAL,
7017                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
7018                                  errmsg("invalid data in file \"%s\"", BACKUP_LABEL_FILE)));
7019         if (ferror(lfp) || FreeFile(lfp))
7020                 ereport(FATAL,
7021                                 (errcode_for_file_access(),
7022                                  errmsg("could not read file \"%s\": %m",
7023                                                 BACKUP_LABEL_FILE)));
7024
7025         /*
7026          * Try to retrieve the backup history file (no error if we can't)
7027          */
7028         XLByteToSeg(startpoint, _logId, _logSeg);
7029         BackupHistoryFileName(histfilename, tli, _logId, _logSeg,
7030                                                   startpoint.xrecoff % XLogSegSize);
7031
7032         if (InArchiveRecovery)
7033                 RestoreArchivedFile(histfilepath, histfilename, "RECOVERYHISTORY", 0);
7034         else
7035                 BackupHistoryFilePath(histfilepath, tli, _logId, _logSeg,
7036                                                           startpoint.xrecoff % XLogSegSize);
7037
7038         fp = AllocateFile(histfilepath, "r");
7039         if (fp)
7040         {
7041                 /*
7042                  * Parse history file to identify stop point.
7043                  */
7044                 if (fscanf(fp, "START WAL LOCATION: %X/%X (file %24s)%c",
7045                                    &startpoint.xlogid, &startpoint.xrecoff, startxlogfilename,
7046                                    &ch) != 4 || ch != '\n')
7047                         ereport(FATAL,
7048                                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
7049                                          errmsg("invalid data in file \"%s\"", histfilename)));
7050                 if (fscanf(fp, "STOP WAL LOCATION: %X/%X (file %24s)%c",
7051                                    &stoppoint.xlogid, &stoppoint.xrecoff, stopxlogfilename,
7052                                    &ch) != 4 || ch != '\n')
7053                         ereport(FATAL,
7054                                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
7055                                          errmsg("invalid data in file \"%s\"", histfilename)));
7056                 *minRecoveryLoc = stoppoint;
7057                 if (ferror(fp) || FreeFile(fp))
7058                         ereport(FATAL,
7059                                         (errcode_for_file_access(),
7060                                          errmsg("could not read file \"%s\": %m",
7061                                                         histfilepath)));
7062         }
7063
7064         return true;
7065 }
7066
7067 /*
7068  * Error context callback for errors occurring during rm_redo().
7069  */
7070 static void
7071 rm_redo_error_callback(void *arg)
7072 {
7073         XLogRecord *record = (XLogRecord *) arg;
7074         StringInfoData buf;
7075
7076         initStringInfo(&buf);
7077         RmgrTable[record->xl_rmid].rm_desc(&buf,
7078                                                                            record->xl_info,
7079                                                                            XLogRecGetData(record));
7080
7081         /* don't bother emitting empty description */
7082         if (buf.len > 0)
7083                 errcontext("xlog redo %s", buf.data);
7084
7085         pfree(buf.data);
7086 }
7087
7088 /*
7089  * BackupInProgress: check if online backup mode is active
7090  *
7091  * This is done by checking for existence of the "backup_label" file.
7092  */
7093 bool
7094 BackupInProgress(void)
7095 {
7096         struct stat stat_buf;
7097
7098         return (stat(BACKUP_LABEL_FILE, &stat_buf) == 0);
7099 }
7100
7101 /*
7102  * CancelBackup: rename the "backup_label" file to cancel backup mode
7103  *
7104  * If the "backup_label" file exists, it will be renamed to "backup_label.old".
7105  * Note that this will render an online backup in progress useless.
7106  * To correctly finish an online backup, pg_stop_backup must be called.
7107  */
7108 void
7109 CancelBackup(void)
7110 {
7111         struct stat stat_buf;
7112
7113         /* if the file is not there, return */
7114         if (stat(BACKUP_LABEL_FILE, &stat_buf) < 0)
7115                 return;
7116
7117         /* remove leftover file from previously cancelled backup if it exists */
7118         unlink(BACKUP_LABEL_OLD);
7119
7120         if (rename(BACKUP_LABEL_FILE, BACKUP_LABEL_OLD) == 0)
7121         {
7122                 ereport(LOG,
7123                                 (errmsg("online backup mode cancelled"),
7124                                  errdetail("\"%s\" was renamed to \"%s\".",
7125                                                 BACKUP_LABEL_FILE, BACKUP_LABEL_OLD)));
7126         }
7127         else
7128         {
7129                 ereport(WARNING,
7130                                 (errcode_for_file_access(),
7131                                  errmsg("online backup mode was not cancelled"),
7132                                  errdetail("Could not rename \"%s\" to \"%s\": %m.",
7133                                                 BACKUP_LABEL_FILE, BACKUP_LABEL_OLD)));
7134         }
7135 }
7136