OSDN Git Service

Refactor XLogOpenRelation() and XLogReadBuffer() in preparation for relation
[pg-rex/syncrep.git] / src / backend / access / transam / xlog.c
1 /*-------------------------------------------------------------------------
2  *
3  * xlog.c
4  *              PostgreSQL transaction log manager
5  *
6  *
7  * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.314 2008/06/12 09:12:30 heikki Exp $
11  *
12  *-------------------------------------------------------------------------
13  */
14
15 #include "postgres.h"
16
17 #include <ctype.h>
18 #include <signal.h>
19 #include <time.h>
20 #include <sys/stat.h>
21 #include <sys/time.h>
22 #include <sys/wait.h>
23 #include <unistd.h>
24
25 #include "access/clog.h"
26 #include "access/multixact.h"
27 #include "access/subtrans.h"
28 #include "access/transam.h"
29 #include "access/tuptoaster.h"
30 #include "access/twophase.h"
31 #include "access/xact.h"
32 #include "access/xlog_internal.h"
33 #include "access/xlogutils.h"
34 #include "catalog/catversion.h"
35 #include "catalog/pg_control.h"
36 #include "catalog/pg_type.h"
37 #include "funcapi.h"
38 #include "miscadmin.h"
39 #include "pgstat.h"
40 #include "postmaster/bgwriter.h"
41 #include "storage/bufmgr.h"
42 #include "storage/fd.h"
43 #include "storage/ipc.h"
44 #include "storage/pmsignal.h"
45 #include "storage/procarray.h"
46 #include "storage/smgr.h"
47 #include "storage/spin.h"
48 #include "utils/builtins.h"
49 #include "utils/pg_locale.h"
50 #include "utils/ps_status.h"
51
52
53 /* File path names (all relative to $PGDATA) */
54 #define BACKUP_LABEL_FILE               "backup_label"
55 #define BACKUP_LABEL_OLD                "backup_label.old"
56 #define RECOVERY_COMMAND_FILE   "recovery.conf"
57 #define RECOVERY_COMMAND_DONE   "recovery.done"
58
59
60 /* User-settable parameters */
61 int                     CheckPointSegments = 3;
62 int                     XLOGbuffers = 8;
63 int                     XLogArchiveTimeout = 0;
64 bool            XLogArchiveMode = false;
65 char       *XLogArchiveCommand = NULL;
66 bool            fullPageWrites = true;
67 bool            log_checkpoints = false;
68 int             sync_method = DEFAULT_SYNC_METHOD;
69
70 #ifdef WAL_DEBUG
71 bool            XLOG_DEBUG = false;
72 #endif
73
74 /*
75  * XLOGfileslop is the maximum number of preallocated future XLOG segments.
76  * When we are done with an old XLOG segment file, we will recycle it as a
77  * future XLOG segment as long as there aren't already XLOGfileslop future
78  * segments; else we'll delete it.  This could be made a separate GUC
79  * variable, but at present I think it's sufficient to hardwire it as
80  * 2*CheckPointSegments+1.      Under normal conditions, a checkpoint will free
81  * no more than 2*CheckPointSegments log segments, and we want to recycle all
82  * of them; the +1 allows boundary cases to happen without wasting a
83  * delete/create-segment cycle.
84  */
85 #define XLOGfileslop    (2*CheckPointSegments + 1)
86
87 /*
88  * GUC support
 *
 * sync_method_options maps each accepted sync-method name to its
 * SYNC_METHOD_* code.  Entries wrapped in #ifdef are compiled in only when
 * the corresponding platform macro is defined, so the set of accepted names
 * varies by build; "fsync" is the only entry that is always present.  The
 * table is terminated by a {NULL, 0, false} entry.
 *
 * NOTE(review): presumably this is the option list for the sync_method
 * setting declared above.  The meaning of the third (boolean) field is
 * defined by struct config_enum_entry, which is not visible here -- confirm.
89  */
90 const struct config_enum_entry sync_method_options[] = {
91         {"fsync", SYNC_METHOD_FSYNC, false},
92 #ifdef HAVE_FSYNC_WRITETHROUGH
93         {"fsync_writethrough", SYNC_METHOD_FSYNC_WRITETHROUGH, false},
94 #endif
95 #ifdef HAVE_FDATASYNC
96         {"fdatasync", SYNC_METHOD_FDATASYNC, false},
97 #endif
98 #ifdef OPEN_SYNC_FLAG
99         {"open_sync", SYNC_METHOD_OPEN, false},
100 #endif
101 #ifdef OPEN_DATASYNC_FLAG
102         {"open_datasync", SYNC_METHOD_OPEN_DSYNC, false},
103 #endif
104         {NULL, 0, false}
105 };
106
107 /*
108  * Statistics for current checkpoint are collected in this global struct.
109  * Because only the background writer or a stand-alone backend can perform
110  * checkpoints, this will be unused in normal backends.
111  */
112 CheckpointStatsData CheckpointStats;
113
114 /*
115  * ThisTimeLineID will be same in all backends --- it identifies current
116  * WAL timeline for the database system.
117  */
118 TimeLineID      ThisTimeLineID = 0;
119
120 /* Are we doing recovery from XLOG? */
121 bool            InRecovery = false;
122
123 /* Are we recovering using offline XLOG archives? */
124 static bool InArchiveRecovery = false;
125
126 /* Was the last xlog file restored from archive, or local? */
127 static bool restoredFromArchive = false;
128
129 /* options taken from recovery.conf */
130 static char *recoveryRestoreCommand = NULL;
131 static bool recoveryTarget = false;
132 static bool recoveryTargetExact = false;
133 static bool recoveryTargetInclusive = true;
134 static bool recoveryLogRestartpoints = false;
135 static TransactionId recoveryTargetXid;
136 static TimestampTz recoveryTargetTime;
137 static TimestampTz recoveryLastXTime = 0;
138
139 /* if recoveryStopsHere returns true, it saves actual stop xid/time here */
140 static TransactionId recoveryStopXid;
141 static TimestampTz recoveryStopTime;
142 static bool recoveryStopAfter;
143
144 /*
145  * During normal operation, the only timeline we care about is ThisTimeLineID.
146  * During recovery, however, things are more complicated.  To simplify life
147  * for rmgr code, we keep ThisTimeLineID set to the "current" timeline as we
148  * scan through the WAL history (that is, it is the line that was active when
149  * the currently-scanned WAL record was generated).  We also need these
150  * timeline values:
151  *
152  * recoveryTargetTLI: the desired timeline that we want to end in.
153  *
154  * expectedTLIs: an integer list of recoveryTargetTLI and the TLIs of
155  * its known parents, newest first (so recoveryTargetTLI is always the
156  * first list member).  Only these TLIs are expected to be seen in the WAL
157  * segments we read, and indeed only these TLIs will be considered as
158  * candidate WAL files to open at all.
159  *
160  * curFileTLI: the TLI appearing in the name of the current input WAL file.
161  * (This is not necessarily the same as ThisTimeLineID, because we could
162  * be scanning data that was copied from an ancestor timeline when the current
163  * file was created.)  During a sequential scan we do not allow this value
164  * to decrease.
165  */
166 static TimeLineID recoveryTargetTLI;
167 static List *expectedTLIs;
168 static TimeLineID curFileTLI;
169
170 /*
171  * ProcLastRecPtr points to the start of the last XLOG record inserted by the
172  * current backend.  It is updated for all inserts.  XactLastRecEnd points to
173  * end+1 of the last record, and is reset when we end a top-level transaction,
174  * or start a new one; so it can be used to tell if the current transaction has
175  * created any XLOG records.
176  */
177 static XLogRecPtr ProcLastRecPtr = {0, 0};
178
179 XLogRecPtr      XactLastRecEnd = {0, 0};
180
181 /*
182  * RedoRecPtr is this backend's local copy of the REDO record pointer
183  * (which is almost but not quite the same as a pointer to the most recent
184  * CHECKPOINT record).  We update this from the shared-memory copy,
185  * XLogCtl->Insert.RedoRecPtr, whenever we can safely do so (ie, when we
186  * hold the Insert lock).  See XLogInsert for details.  We are also allowed
187  * to update from XLogCtl->Insert.RedoRecPtr if we hold the info_lck;
188  * see GetRedoRecPtr.  A freshly spawned backend obtains the value during
189  * InitXLOGAccess.
190  */
191 static XLogRecPtr RedoRecPtr;
192
193 /*----------
194  * Shared-memory data structures for XLOG control
195  *
196  * LogwrtRqst indicates a byte position that we need to write and/or fsync
197  * the log up to (all records before that point must be written or fsynced).
198  * LogwrtResult indicates the byte positions we have already written/fsynced.
199  * These structs are identical but are declared separately to indicate their
200  * slightly different functions.
201  *
202  * We do a lot of pushups to minimize the amount of access to lockable
203  * shared memory values.  There are actually three shared-memory copies of
204  * LogwrtResult, plus one unshared copy in each backend.  Here's how it works:
205  *              XLogCtl->LogwrtResult is protected by info_lck
206  *              XLogCtl->Write.LogwrtResult is protected by WALWriteLock
207  *              XLogCtl->Insert.LogwrtResult is protected by WALInsertLock
208  * One must hold the associated lock to read or write any of these, but
209  * of course no lock is needed to read/write the unshared LogwrtResult.
210  *
211  * XLogCtl->LogwrtResult and XLogCtl->Write.LogwrtResult are both "always
212  * right", since both are updated by a write or flush operation before
213  * it releases WALWriteLock.  The point of keeping XLogCtl->Write.LogwrtResult
214  * is that it can be examined/modified by code that already holds WALWriteLock
215  * without needing to grab info_lck as well.
216  *
217  * XLogCtl->Insert.LogwrtResult may lag behind the reality of the other two,
218  * but is updated when convenient.      Again, it exists for the convenience of
219  * code that is already holding WALInsertLock but not the other locks.
220  *
221  * The unshared LogwrtResult may lag behind any or all of these, and again
222  * is updated when convenient.
223  *
224  * The request bookkeeping is simpler: there is a shared XLogCtl->LogwrtRqst
225  * (protected by info_lck), but we don't need to cache any copies of it.
226  *
227  * Note that this all works because the request and result positions can only
228  * advance forward, never back up, and so we can easily determine which of two
229  * values is "more up to date".
230  *
231  * info_lck is only held long enough to read/update the protected variables,
232  * so it's a plain spinlock.  The other locks are held longer (potentially
233  * over I/O operations), so we use LWLocks for them.  These locks are:
234  *
235  * WALInsertLock: must be held to insert a record into the WAL buffers.
236  *
237  * WALWriteLock: must be held to write WAL buffers to disk (XLogWrite or
238  * XLogFlush).
239  *
240  * ControlFileLock: must be held to read/update control file or create
241  * new log file.
242  *
243  * CheckpointLock: must be held to do a checkpoint (ensures only one
244  * checkpointer at a time; currently, with all checkpoints done by the
245  * bgwriter, this is just pro forma).
246  *
247  *----------
248  */
249
/*
 * Write/flush request: the byte positions up to which the log needs to be
 * written out and/or flushed.  Has the same layout as XLogwrtResult; the two
 * are declared separately to mark their different roles (see the
 * shared-memory notes above).
 */
250 typedef struct XLogwrtRqst
251 {
252         XLogRecPtr      Write;                  /* last byte + 1 to write out */
253         XLogRecPtr      Flush;                  /* last byte + 1 to flush */
254 } XLogwrtRqst;
255
/*
 * Write/flush result: the byte positions up to which the log has already
 * been written out and flushed.  Has the same layout as XLogwrtRqst; the two
 * are declared separately to mark their different roles (see the
 * shared-memory notes above).
 */
256 typedef struct XLogwrtResult
257 {
258         XLogRecPtr      Write;                  /* last byte + 1 written out */
259         XLogRecPtr      Flush;                  /* last byte + 1 flushed */
260 } XLogwrtResult;
261
262 /*
263  * Shared state data for XLogInsert.
 *
 * One instance is embedded in XLogCtlData as the Insert member, where it is
 * protected by WALInsertLock.  currpage and currpos together describe the
 * insertion position within the current page buffer; INSERT_FREESPACE()
 * derives the remaining space on that page from them.
 *
 * XLogInsert reads forcePageWrites once before taking the lock and rechecks
 * it after acquiring WALInsertLock, since the value can change in between.
264  */
265 typedef struct XLogCtlInsert
266 {
267         XLogwrtResult LogwrtResult; /* a recent value of LogwrtResult */
268         XLogRecPtr      PrevRecord;             /* start of previously-inserted record */
269         int                     curridx;                /* current block index in cache */
270         XLogPageHeader currpage;        /* points to header of block in cache */
271         char       *currpos;            /* current insertion point in cache */
272         XLogRecPtr      RedoRecPtr;             /* current redo point for insertions */
273         bool            forcePageWrites;        /* forcing full-page writes for PITR? */
274 } XLogCtlInsert;
275
276 /*
277  * Shared state data for XLogWrite/XLogFlush.
 *
 * One instance is embedded in XLogCtlData as the Write member, where it is
 * protected by WALWriteLock.  Its LogwrtResult copy lets code that already
 * holds WALWriteLock examine or update the result positions without also
 * taking info_lck (see the shared-memory notes above).
278  */
279 typedef struct XLogCtlWrite
280 {
281         XLogwrtResult LogwrtResult; /* current value of LogwrtResult */
282         int                     curridx;                /* cache index of next block to write */
283         pg_time_t       lastSegSwitchTime;              /* time of last xlog segment switch */
284 } XLogCtlWrite;
285
286 /*
287  * Total shared-memory state for XLOG.
 *
 * Members are grouped by the lock that protects them: Insert by
 * WALInsertLock, the request/result/checkpoint-xid/async-commit fields by
 * the info_lck spinlock, and Write by WALWriteLock.  The trailing buffer
 * bookkeeping fields are set up at startup and not changed afterwards.
 * The file-level pointer XLogCtl refers to the single instance.
288  */
289 typedef struct XLogCtlData
290 {
291         /* Protected by WALInsertLock: */
292         XLogCtlInsert Insert;
293
294         /* Protected by info_lck: */
295         XLogwrtRqst LogwrtRqst;
296         XLogwrtResult LogwrtResult;
297         uint32          ckptXidEpoch;   /* nextXID & epoch of latest checkpoint */
298         TransactionId ckptXid;          /* (pairs with ckptXidEpoch above) */
299         XLogRecPtr      asyncCommitLSN; /* LSN of newest async commit */
300
301         /* Protected by WALWriteLock: */
302         XLogCtlWrite Write;
303
304         /*
305          * These values do not change after startup, although the pointed-to pages
306          * and xlblocks values certainly do.  Permission to read/write the pages
307          * and xlblocks values depends on WALInsertLock and WALWriteLock.
308          */
309         char       *pages;                      /* buffers for unwritten XLOG pages */
310         XLogRecPtr *xlblocks;           /* per buffer: position of the page's 1st
                                                                 * byte + XLOG_BLCKSZ, i.e. the page's end;
                                                                 * INSERT_RECPTR subtracts free space from it */
311         int                     XLogCacheBlck;  /* highest allocated xlog buffer index */
312         TimeLineID      ThisTimeLineID; /* NOTE(review): presumably the shared copy
                                                                 * of the global ThisTimeLineID -- confirm */
313
314         slock_t         info_lck;               /* locks shared variables shown above */
315 } XLogCtlData;
316
317 static XLogCtlData *XLogCtl = NULL;
318
319 /*
320  * We maintain an image of pg_control in shared memory.
321  */
322 static ControlFileData *ControlFile = NULL;
323
324 /*
325  * Macros for managing XLogInsert state.  In most cases, the calling routine
326  * has local copies of XLogCtl->Insert and/or XLogCtl->Insert->curridx,
327  * so these are passed as parameters instead of being fetched via XLogCtl.
328  */
329
330 /*
 * Free space remaining in the current xlog page buffer.
 *
 * Computed as XLOG_BLCKSZ minus the distance from the start of the page
 * (currpage) to the insertion point (currpos).  Insert is dereferenced with
 * "->", so it must be a pointer to a struct with currpos and currpage
 * members (XLogCtlInsert in this file).  Insert is evaluated twice, so do
 * not pass an expression with side effects.
 */
331 #define INSERT_FREESPACE(Insert)  \
332         (XLOG_BLCKSZ - ((Insert)->currpos - (char *) (Insert)->currpage))
333
334 /*
 * Construct XLogRecPtr value for current insertion point.
 *
 * Stores into recptr: xlogid is copied from XLogCtl->xlblocks[curridx], and
 * xrecoff is that entry's xrecoff minus INSERT_FREESPACE(Insert).  The
 * expansion is a single parenthesized comma expression.  recptr and curridx
 * each appear twice in the expansion (and Insert twice more via
 * INSERT_FREESPACE), so arguments must be free of side effects.
 *
 * NOTE(review): this assumes curridx is the buffer index of the page that
 * Insert currently points into -- confirm callers pass a matching pair.
 */
335 #define INSERT_RECPTR(recptr,Insert,curridx)  \
336         ( \
337           (recptr).xlogid = XLogCtl->xlblocks[curridx].xlogid, \
338           (recptr).xrecoff = \
339                 XLogCtl->xlblocks[curridx].xrecoff - INSERT_FREESPACE(Insert) \
340         )
341
/*
 * Index of the xlog buffer preceding idx, treating the buffers as a ring:
 * index 0 wraps around to XLogCtl->XLogCacheBlck (the highest allocated
 * buffer index).  idx is evaluated twice.
 */
342 #define PrevBufIdx(idx)         \
343                 (((idx) == 0) ? XLogCtl->XLogCacheBlck : ((idx) - 1))
344
/*
 * Index of the xlog buffer following idx, treating the buffers as a ring:
 * XLogCtl->XLogCacheBlck (the highest allocated buffer index) wraps around
 * to 0.  idx is evaluated twice.
 */
345 #define NextBufIdx(idx)         \
346                 (((idx) == XLogCtl->XLogCacheBlck) ? 0 : ((idx) + 1))
347
348 /*
349  * Private, possibly out-of-date copy of shared LogwrtResult.
350  * See discussion above.
351  */
352 static XLogwrtResult LogwrtResult = {{0, 0}, {0, 0}};
353
354 /*
355  * openLogFile is -1 or a kernel FD for an open log file segment.
356  * When it's open, openLogOff is the current seek offset in the file.
357  * openLogId/openLogSeg identify the segment.  These variables are only
358  * used to write the XLOG, and so will normally refer to the active segment.
359  */
360 static int      openLogFile = -1;
361 static uint32 openLogId = 0;
362 static uint32 openLogSeg = 0;
363 static uint32 openLogOff = 0;
364
365 /*
366  * These variables are used similarly to the ones above, but for reading
367  * the XLOG.  Note, however, that readOff generally represents the offset
368  * of the page just read, not the seek position of the FD itself, which
369  * will be just past that page.
370  */
371 static int      readFile = -1;
372 static uint32 readId = 0;
373 static uint32 readSeg = 0;
374 static uint32 readOff = 0;
375
376 /* Buffer for currently read page (XLOG_BLCKSZ bytes) */
377 static char *readBuf = NULL;
378
379 /* Buffer for current ReadRecord result (expandable) */
380 static char *readRecordBuf = NULL;
381 static uint32 readRecordBufSize = 0;
382
383 /* State information for XLOG reading */
384 static XLogRecPtr ReadRecPtr;   /* start of last record read */
385 static XLogRecPtr EndRecPtr;    /* end+1 of last record read */
386 static XLogRecord *nextRecord = NULL;
387 static TimeLineID lastPageTLI = 0;
388
389 static bool InRedo = false;
390
391
392 static void XLogArchiveNotify(const char *xlog);
393 static void XLogArchiveNotifySeg(uint32 log, uint32 seg);
394 static bool XLogArchiveCheckDone(const char *xlog, bool create_if_missing);
395 static void XLogArchiveCleanup(const char *xlog);
396 static void readRecoveryCommandFile(void);
397 static void exitArchiveRecovery(TimeLineID endTLI,
398                                         uint32 endLogId, uint32 endLogSeg);
399 static bool recoveryStopsHere(XLogRecord *record, bool *includeThis);
400 static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags);
401
402 static bool XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites,
403                                 XLogRecPtr *lsn, BkpBlock *bkpb);
404 static bool AdvanceXLInsertBuffer(bool new_segment);
405 static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch);
406 static int XLogFileInit(uint32 log, uint32 seg,
407                          bool *use_existent, bool use_lock);
408 static bool InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath,
409                                            bool find_free, int *max_advance,
410                                            bool use_lock);
411 static int      XLogFileOpen(uint32 log, uint32 seg);
412 static int      XLogFileRead(uint32 log, uint32 seg, int emode);
413 static void XLogFileClose(void);
414 static bool RestoreArchivedFile(char *path, const char *xlogfname,
415                                         const char *recovername, off_t expectedSize);
416 static void PreallocXlogFiles(XLogRecPtr endptr);
417 static void RemoveOldXlogFiles(uint32 log, uint32 seg, XLogRecPtr endptr);
418 static void CleanupBackupHistory(void);
419 static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, int emode);
420 static bool ValidXLOGHeader(XLogPageHeader hdr, int emode);
421 static XLogRecord *ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt);
422 static List *readTimeLineHistory(TimeLineID targetTLI);
423 static bool existsTimeLineHistory(TimeLineID probeTLI);
424 static TimeLineID findNewestTimeLine(TimeLineID startTLI);
425 static void writeTimeLineHistory(TimeLineID newTLI, TimeLineID parentTLI,
426                                          TimeLineID endTLI,
427                                          uint32 endLogId, uint32 endLogSeg);
428 static void WriteControlFile(void);
429 static void ReadControlFile(void);
430 static char *str_time(pg_time_t tnow);
431 #ifdef WAL_DEBUG
432 static void xlog_outrec(StringInfo buf, XLogRecord *record);
433 #endif
434 static void issue_xlog_fsync(void);
435 static void pg_start_backup_callback(int code, Datum arg);
436 static bool read_backup_label(XLogRecPtr *checkPointLoc,
437                                   XLogRecPtr *minRecoveryLoc);
438 static void rm_redo_error_callback(void *arg);
439 static int get_sync_bit(int method);
440
441
442 /*
443  * Insert an XLOG record having the specified RMID and info bytes,
444  * with the body of the record being the data chunk(s) described by
445  * the rdata chain (see xlog.h for notes about rdata).
446  *
447  * Returns XLOG pointer to end of record (beginning of next record).
448  * This can be used as LSN for data pages affected by the logged action.
449  * (LSN is the XLOG point up to which the XLOG must be flushed to disk
450  * before the data page can be written out.  This implements the basic
451  * WAL rule "write the log before the data".)
452  *
453  * NB: this routine feels free to scribble on the XLogRecData structs,
454  * though not on the data they reference.  This is OK since the XLogRecData
455  * structs are always just temporaries in the calling code.
456  */
457 XLogRecPtr
458 XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
459 {
460         XLogCtlInsert *Insert = &XLogCtl->Insert;
461         XLogRecord *record;
462         XLogContRecord *contrecord;
463         XLogRecPtr      RecPtr;
464         XLogRecPtr      WriteRqst;
465         uint32          freespace;
466         int                     curridx;
467         XLogRecData *rdt;
468         Buffer          dtbuf[XLR_MAX_BKP_BLOCKS];
469         bool            dtbuf_bkp[XLR_MAX_BKP_BLOCKS];
470         BkpBlock        dtbuf_xlg[XLR_MAX_BKP_BLOCKS];
471         XLogRecPtr      dtbuf_lsn[XLR_MAX_BKP_BLOCKS];
472         XLogRecData dtbuf_rdt1[XLR_MAX_BKP_BLOCKS];
473         XLogRecData dtbuf_rdt2[XLR_MAX_BKP_BLOCKS];
474         XLogRecData dtbuf_rdt3[XLR_MAX_BKP_BLOCKS];
475         pg_crc32        rdata_crc;
476         uint32          len,
477                                 write_len;
478         unsigned        i;
479         bool            updrqst;
480         bool            doPageWrites;
481         bool            isLogSwitch = (rmid == RM_XLOG_ID && info == XLOG_SWITCH);
482
483         /* info's high bits are reserved for use by me */
484         if (info & XLR_INFO_MASK)
485                 elog(PANIC, "invalid xlog info mask %02X", info);
486
487         /*
488          * In bootstrap mode, we don't actually log anything but XLOG resources;
489          * return a phony record pointer.
490          */
491         if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID)
492         {
493                 RecPtr.xlogid = 0;
494                 RecPtr.xrecoff = SizeOfXLogLongPHD;             /* start of 1st chkpt record */
495                 return RecPtr;
496         }
497
498         /*
499          * Here we scan the rdata chain, determine which buffers must be backed
500          * up, and compute the CRC values for the data.  Note that the record
501          * header isn't added into the CRC initially since we don't know the final
502          * length or info bits quite yet.  Thus, the CRC will represent the CRC of
503          * the whole record in the order "rdata, then backup blocks, then record
504          * header".
505          *
506          * We may have to loop back to here if a race condition is detected below.
507          * We could prevent the race by doing all this work while holding the
508          * insert lock, but it seems better to avoid doing CRC calculations while
509          * holding the lock.  This means we have to be careful about modifying the
510          * rdata chain until we know we aren't going to loop back again.  The only
511          * change we allow ourselves to make earlier is to set rdt->data = NULL in
512          * chain items we have decided we will have to back up the whole buffer
513          * for.  This is OK because we will certainly decide the same thing again
514          * for those items if we do it over; doing it here saves an extra pass
515          * over the chain later.
516          */
517 begin:;
518         for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
519         {
520                 dtbuf[i] = InvalidBuffer;
521                 dtbuf_bkp[i] = false;
522         }
523
524         /*
525          * Decide if we need to do full-page writes in this XLOG record: true if
526          * full_page_writes is on or we have a PITR request for it.  Since we
527          * don't yet have the insert lock, forcePageWrites could change under us,
528          * but we'll recheck it once we have the lock.
529          */
530         doPageWrites = fullPageWrites || Insert->forcePageWrites;
531
532         INIT_CRC32(rdata_crc);
533         len = 0;
534         for (rdt = rdata;;)
535         {
536                 if (rdt->buffer == InvalidBuffer)
537                 {
538                         /* Simple data, just include it */
539                         len += rdt->len;
540                         COMP_CRC32(rdata_crc, rdt->data, rdt->len);
541                 }
542                 else
543                 {
544                         /* Find info for buffer */
545                         for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
546                         {
547                                 if (rdt->buffer == dtbuf[i])
548                                 {
549                                         /* Buffer already referenced by earlier chain item */
550                                         if (dtbuf_bkp[i])
551                                                 rdt->data = NULL;
552                                         else if (rdt->data)
553                                         {
554                                                 len += rdt->len;
555                                                 COMP_CRC32(rdata_crc, rdt->data, rdt->len);
556                                         }
557                                         break;
558                                 }
559                                 if (dtbuf[i] == InvalidBuffer)
560                                 {
561                                         /* OK, put it in this slot */
562                                         dtbuf[i] = rdt->buffer;
563                                         if (XLogCheckBuffer(rdt, doPageWrites,
564                                                                                 &(dtbuf_lsn[i]), &(dtbuf_xlg[i])))
565                                         {
566                                                 dtbuf_bkp[i] = true;
567                                                 rdt->data = NULL;
568                                         }
569                                         else if (rdt->data)
570                                         {
571                                                 len += rdt->len;
572                                                 COMP_CRC32(rdata_crc, rdt->data, rdt->len);
573                                         }
574                                         break;
575                                 }
576                         }
577                         if (i >= XLR_MAX_BKP_BLOCKS)
578                                 elog(PANIC, "can backup at most %d blocks per xlog record",
579                                          XLR_MAX_BKP_BLOCKS);
580                 }
581                 /* Break out of loop when rdt points to last chain item */
582                 if (rdt->next == NULL)
583                         break;
584                 rdt = rdt->next;
585         }
586
587         /*
588          * Now add the backup block headers and data into the CRC
589          */
590         for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
591         {
592                 if (dtbuf_bkp[i])
593                 {
594                         BkpBlock   *bkpb = &(dtbuf_xlg[i]);
595                         char       *page;
596
597                         COMP_CRC32(rdata_crc,
598                                            (char *) bkpb,
599                                            sizeof(BkpBlock));
600                         page = (char *) BufferGetBlock(dtbuf[i]);
601                         if (bkpb->hole_length == 0)
602                         {
603                                 COMP_CRC32(rdata_crc,
604                                                    page,
605                                                    BLCKSZ);
606                         }
607                         else
608                         {
609                                 /* must skip the hole */
610                                 COMP_CRC32(rdata_crc,
611                                                    page,
612                                                    bkpb->hole_offset);
613                                 COMP_CRC32(rdata_crc,
614                                                    page + (bkpb->hole_offset + bkpb->hole_length),
615                                                    BLCKSZ - (bkpb->hole_offset + bkpb->hole_length));
616                         }
617                 }
618         }
619
620         /*
621          * NOTE: We disallow len == 0 because it provides a useful bit of extra
622          * error checking in ReadRecord.  This means that all callers of
623          * XLogInsert must supply at least some not-in-a-buffer data.  However, we
624          * make an exception for XLOG SWITCH records because we don't want them to
625          * ever cross a segment boundary.
626          */
627         if (len == 0 && !isLogSwitch)
628                 elog(PANIC, "invalid xlog record length %u", len);
629
630         START_CRIT_SECTION();
631
632         /* Now wait to get insert lock */
633         LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
634
635         /*
636          * Check to see if my RedoRecPtr is out of date.  If so, may have to go
637          * back and recompute everything.  This can only happen just after a
638          * checkpoint, so it's better to be slow in this case and fast otherwise.
639          *
640          * If we aren't doing full-page writes then RedoRecPtr doesn't actually
641          * affect the contents of the XLOG record, so we'll update our local copy
642          * but not force a recomputation.
643          */
644         if (!XLByteEQ(RedoRecPtr, Insert->RedoRecPtr))
645         {
646                 Assert(XLByteLT(RedoRecPtr, Insert->RedoRecPtr));
647                 RedoRecPtr = Insert->RedoRecPtr;
648
649                 if (doPageWrites)
650                 {
651                         for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
652                         {
653                                 if (dtbuf[i] == InvalidBuffer)
654                                         continue;
655                                 if (dtbuf_bkp[i] == false &&
656                                         XLByteLE(dtbuf_lsn[i], RedoRecPtr))
657                                 {
658                                         /*
659                                          * Oops, this buffer now needs to be backed up, but we
660                                          * didn't think so above.  Start over.
661                                          */
662                                         LWLockRelease(WALInsertLock);
663                                         END_CRIT_SECTION();
664                                         goto begin;
665                                 }
666                         }
667                 }
668         }
669
670         /*
671          * Also check to see if forcePageWrites was just turned on; if we weren't
672          * already doing full-page writes then go back and recompute. (If it was
673          * just turned off, we could recompute the record without full pages, but
674          * we choose not to bother.)
675          */
676         if (Insert->forcePageWrites && !doPageWrites)
677         {
678                 /* Oops, must redo it with full-page data */
679                 LWLockRelease(WALInsertLock);
680                 END_CRIT_SECTION();
681                 goto begin;
682         }
683
684         /*
685          * Make additional rdata chain entries for the backup blocks, so that we
686          * don't need to special-case them in the write loop.  Note that we have
687          * now irrevocably changed the input rdata chain.  At the exit of this
688          * loop, write_len includes the backup block data.
689          *
690          * Also set the appropriate info bits to show which buffers were backed
691          * up. The i'th XLR_SET_BKP_BLOCK bit corresponds to the i'th distinct
692          * buffer value (ignoring InvalidBuffer) appearing in the rdata chain.
693          */
694         write_len = len;
695         for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
696         {
697                 BkpBlock   *bkpb;
698                 char       *page;
699
700                 if (!dtbuf_bkp[i])
701                         continue;
702
703                 info |= XLR_SET_BKP_BLOCK(i);
704
705                 bkpb = &(dtbuf_xlg[i]);
706                 page = (char *) BufferGetBlock(dtbuf[i]);
707
708                 rdt->next = &(dtbuf_rdt1[i]);
709                 rdt = rdt->next;
710
711                 rdt->data = (char *) bkpb;
712                 rdt->len = sizeof(BkpBlock);
713                 write_len += sizeof(BkpBlock);
714
715                 rdt->next = &(dtbuf_rdt2[i]);
716                 rdt = rdt->next;
717
718                 if (bkpb->hole_length == 0)
719                 {
720                         rdt->data = page;
721                         rdt->len = BLCKSZ;
722                         write_len += BLCKSZ;
723                         rdt->next = NULL;
724                 }
725                 else
726                 {
727                         /* must skip the hole */
728                         rdt->data = page;
729                         rdt->len = bkpb->hole_offset;
730                         write_len += bkpb->hole_offset;
731
732                         rdt->next = &(dtbuf_rdt3[i]);
733                         rdt = rdt->next;
734
735                         rdt->data = page + (bkpb->hole_offset + bkpb->hole_length);
736                         rdt->len = BLCKSZ - (bkpb->hole_offset + bkpb->hole_length);
737                         write_len += rdt->len;
738                         rdt->next = NULL;
739                 }
740         }
741
742         /*
743          * If we backed up any full blocks and online backup is not in progress,
744          * mark the backup blocks as removable.  This allows the WAL archiver to
745          * know whether it is safe to compress archived WAL data by transforming
746          * full-block records into the non-full-block format.
747          *
748          * Note: we could just set the flag whenever !forcePageWrites, but
749          * defining it like this leaves the info bit free for some potential other
750          * use in records without any backup blocks.
751          */
752         if ((info & XLR_BKP_BLOCK_MASK) && !Insert->forcePageWrites)
753                 info |= XLR_BKP_REMOVABLE;
754
755         /*
756          * If there isn't enough space on the current XLOG page for a record
757          * header, advance to the next page (leaving the unused space as zeroes).
758          */
759         updrqst = false;
760         freespace = INSERT_FREESPACE(Insert);
761         if (freespace < SizeOfXLogRecord)
762         {
763                 updrqst = AdvanceXLInsertBuffer(false);
764                 freespace = INSERT_FREESPACE(Insert);
765         }
766
767         /* Compute record's XLOG location */
768         curridx = Insert->curridx;
769         INSERT_RECPTR(RecPtr, Insert, curridx);
770
771         /*
772          * If the record is an XLOG_SWITCH, and we are exactly at the start of a
773          * segment, we need not insert it (and don't want to because we'd like
774          * consecutive switch requests to be no-ops).  Instead, make sure
775          * everything is written and flushed through the end of the prior segment,
776          * and return the prior segment's end address.
777          */
778         if (isLogSwitch &&
779                 (RecPtr.xrecoff % XLogSegSize) == SizeOfXLogLongPHD)
780         {
781                 /* We can release insert lock immediately */
782                 LWLockRelease(WALInsertLock);
783
784                 RecPtr.xrecoff -= SizeOfXLogLongPHD;
785                 if (RecPtr.xrecoff == 0)
786                 {
787                         /* crossing a logid boundary */
788                         RecPtr.xlogid -= 1;
789                         RecPtr.xrecoff = XLogFileSize;
790                 }
791
792                 LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
793                 LogwrtResult = XLogCtl->Write.LogwrtResult;
794                 if (!XLByteLE(RecPtr, LogwrtResult.Flush))
795                 {
796                         XLogwrtRqst FlushRqst;
797
798                         FlushRqst.Write = RecPtr;
799                         FlushRqst.Flush = RecPtr;
800                         XLogWrite(FlushRqst, false, false);
801                 }
802                 LWLockRelease(WALWriteLock);
803
804                 END_CRIT_SECTION();
805
806                 return RecPtr;
807         }
808
809         /* Insert record header */
810
811         record = (XLogRecord *) Insert->currpos;
812         record->xl_prev = Insert->PrevRecord;
813         record->xl_xid = GetCurrentTransactionIdIfAny();
814         record->xl_tot_len = SizeOfXLogRecord + write_len;
815         record->xl_len = len;           /* doesn't include backup blocks */
816         record->xl_info = info;
817         record->xl_rmid = rmid;
818
819         /* Now we can finish computing the record's CRC */
820         COMP_CRC32(rdata_crc, (char *) record + sizeof(pg_crc32),
821                            SizeOfXLogRecord - sizeof(pg_crc32));
822         FIN_CRC32(rdata_crc);
823         record->xl_crc = rdata_crc;
824
825 #ifdef WAL_DEBUG
826         if (XLOG_DEBUG)
827         {
828                 StringInfoData buf;
829
830                 initStringInfo(&buf);
831                 appendStringInfo(&buf, "INSERT @ %X/%X: ",
832                                                  RecPtr.xlogid, RecPtr.xrecoff);
833                 xlog_outrec(&buf, record);
834                 if (rdata->data != NULL)
835                 {
836                         appendStringInfo(&buf, " - ");
837                         RmgrTable[record->xl_rmid].rm_desc(&buf, record->xl_info, rdata->data);
838                 }
839                 elog(LOG, "%s", buf.data);
840                 pfree(buf.data);
841         }
842 #endif
843
844         /* Record begin of record in appropriate places */
845         ProcLastRecPtr = RecPtr;
846         Insert->PrevRecord = RecPtr;
847
848         Insert->currpos += SizeOfXLogRecord;
849         freespace -= SizeOfXLogRecord;
850
851         /*
852          * Append the data, including backup blocks if any
853          */
854         while (write_len)
855         {
856                 while (rdata->data == NULL)
857                         rdata = rdata->next;
858
859                 if (freespace > 0)
860                 {
861                         if (rdata->len > freespace)
862                         {
863                                 memcpy(Insert->currpos, rdata->data, freespace);
864                                 rdata->data += freespace;
865                                 rdata->len -= freespace;
866                                 write_len -= freespace;
867                         }
868                         else
869                         {
870                                 memcpy(Insert->currpos, rdata->data, rdata->len);
871                                 freespace -= rdata->len;
872                                 write_len -= rdata->len;
873                                 Insert->currpos += rdata->len;
874                                 rdata = rdata->next;
875                                 continue;
876                         }
877                 }
878
879                 /* Use next buffer */
880                 updrqst = AdvanceXLInsertBuffer(false);
881                 curridx = Insert->curridx;
882                 /* Insert cont-record header */
883                 Insert->currpage->xlp_info |= XLP_FIRST_IS_CONTRECORD;
884                 contrecord = (XLogContRecord *) Insert->currpos;
885                 contrecord->xl_rem_len = write_len;
886                 Insert->currpos += SizeOfXLogContRecord;
887                 freespace = INSERT_FREESPACE(Insert);
888         }
889
890         /* Ensure next record will be properly aligned */
891         Insert->currpos = (char *) Insert->currpage +
892                 MAXALIGN(Insert->currpos - (char *) Insert->currpage);
893         freespace = INSERT_FREESPACE(Insert);
894
895         /*
896          * The recptr I return is the beginning of the *next* record. This will be
897          * stored as LSN for changed data pages...
898          */
899         INSERT_RECPTR(RecPtr, Insert, curridx);
900
901         /*
902          * If the record is an XLOG_SWITCH, we must now write and flush all the
903          * existing data, and then forcibly advance to the start of the next
904          * segment.  It's not good to do this I/O while holding the insert lock,
905          * but there seems too much risk of confusion if we try to release the
906          * lock sooner.  Fortunately xlog switch needn't be a high-performance
907          * operation anyway...
908          */
909         if (isLogSwitch)
910         {
911                 XLogCtlWrite *Write = &XLogCtl->Write;
912                 XLogwrtRqst FlushRqst;
913                 XLogRecPtr      OldSegEnd;
914
915                 LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
916
917                 /*
918                  * Flush through the end of the page containing XLOG_SWITCH, and
919                  * perform end-of-segment actions (eg, notifying archiver).
920                  */
921                 WriteRqst = XLogCtl->xlblocks[curridx];
922                 FlushRqst.Write = WriteRqst;
923                 FlushRqst.Flush = WriteRqst;
924                 XLogWrite(FlushRqst, false, true);
925
926                 /* Set up the next buffer as first page of next segment */
927                 /* Note: AdvanceXLInsertBuffer cannot need to do I/O here */
928                 (void) AdvanceXLInsertBuffer(true);
929
930                 /* There should be no unwritten data */
931                 curridx = Insert->curridx;
932                 Assert(curridx == Write->curridx);
933
934                 /* Compute end address of old segment */
935                 OldSegEnd = XLogCtl->xlblocks[curridx];
936                 OldSegEnd.xrecoff -= XLOG_BLCKSZ;
937                 if (OldSegEnd.xrecoff == 0)
938                 {
939                         /* crossing a logid boundary */
940                         OldSegEnd.xlogid -= 1;
941                         OldSegEnd.xrecoff = XLogFileSize;
942                 }
943
944                 /* Make it look like we've written and synced all of old segment */
945                 LogwrtResult.Write = OldSegEnd;
946                 LogwrtResult.Flush = OldSegEnd;
947
948                 /*
949                  * Update shared-memory status --- this code should match XLogWrite
950                  */
951                 {
952                         /* use volatile pointer to prevent code rearrangement */
953                         volatile XLogCtlData *xlogctl = XLogCtl;
954
955                         SpinLockAcquire(&xlogctl->info_lck);
956                         xlogctl->LogwrtResult = LogwrtResult;
957                         if (XLByteLT(xlogctl->LogwrtRqst.Write, LogwrtResult.Write))
958                                 xlogctl->LogwrtRqst.Write = LogwrtResult.Write;
959                         if (XLByteLT(xlogctl->LogwrtRqst.Flush, LogwrtResult.Flush))
960                                 xlogctl->LogwrtRqst.Flush = LogwrtResult.Flush;
961                         SpinLockRelease(&xlogctl->info_lck);
962                 }
963
964                 Write->LogwrtResult = LogwrtResult;
965
966                 LWLockRelease(WALWriteLock);
967
968                 updrqst = false;                /* done already */
969         }
970         else
971         {
972                 /* normal case, ie not xlog switch */
973
974                 /* Need to update shared LogwrtRqst if some block was filled up */
975                 if (freespace < SizeOfXLogRecord)
976                 {
977                         /* curridx is filled and available for writing out */
978                         updrqst = true;
979                 }
980                 else
981                 {
982                         /* if updrqst already set, write through end of previous buf */
983                         curridx = PrevBufIdx(curridx);
984                 }
985                 WriteRqst = XLogCtl->xlblocks[curridx];
986         }
987
988         LWLockRelease(WALInsertLock);
989
990         if (updrqst)
991         {
992                 /* use volatile pointer to prevent code rearrangement */
993                 volatile XLogCtlData *xlogctl = XLogCtl;
994
995                 SpinLockAcquire(&xlogctl->info_lck);
996                 /* advance global request to include new block(s) */
997                 if (XLByteLT(xlogctl->LogwrtRqst.Write, WriteRqst))
998                         xlogctl->LogwrtRqst.Write = WriteRqst;
999                 /* update local result copy while I have the chance */
1000                 LogwrtResult = xlogctl->LogwrtResult;
1001                 SpinLockRelease(&xlogctl->info_lck);
1002         }
1003
1004         XactLastRecEnd = RecPtr;
1005
1006         END_CRIT_SECTION();
1007
1008         return RecPtr;
1009 }
1010
1011 /*
1012  * Determine whether the buffer referenced by an XLogRecData item has to
1013  * be backed up, and if so fill a BkpBlock struct for it.  In any case
1014  * save the buffer's LSN at *lsn.
1015  */
1016 static bool
1017 XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites,
1018                                 XLogRecPtr *lsn, BkpBlock *bkpb)
1019 {
1020         PageHeader      page;
1021
1022         page = (PageHeader) BufferGetBlock(rdata->buffer);
1023
1024         /*
1025          * XXX We assume page LSN is first data on *every* page that can be passed
1026          * to XLogInsert, whether it otherwise has the standard page layout or
1027          * not.
1028          */
1029         *lsn = page->pd_lsn;
1030
1031         if (doPageWrites &&
1032                 XLByteLE(page->pd_lsn, RedoRecPtr))
1033         {
1034                 /*
1035                  * The page needs to be backed up, so set up *bkpb
1036                  */
1037                 bkpb->node = BufferGetFileNode(rdata->buffer);
1038                 bkpb->block = BufferGetBlockNumber(rdata->buffer);
1039
1040                 if (rdata->buffer_std)
1041                 {
1042                         /* Assume we can omit data between pd_lower and pd_upper */
1043                         uint16          lower = page->pd_lower;
1044                         uint16          upper = page->pd_upper;
1045
1046                         if (lower >= SizeOfPageHeaderData &&
1047                                 upper > lower &&
1048                                 upper <= BLCKSZ)
1049                         {
1050                                 bkpb->hole_offset = lower;
1051                                 bkpb->hole_length = upper - lower;
1052                         }
1053                         else
1054                         {
1055                                 /* No "hole" to compress out */
1056                                 bkpb->hole_offset = 0;
1057                                 bkpb->hole_length = 0;
1058                         }
1059                 }
1060                 else
1061                 {
1062                         /* Not a standard page header, don't try to eliminate "hole" */
1063                         bkpb->hole_offset = 0;
1064                         bkpb->hole_length = 0;
1065                 }
1066
1067                 return true;                    /* buffer requires backup */
1068         }
1069
1070         return false;                           /* buffer does not need to be backed up */
1071 }
1072
1073 /*
1074  * XLogArchiveNotify
1075  *
1076  * Create an archive notification file
1077  *
1078  * The name of the notification file is the message that will be picked up
1079  * by the archiver, e.g. we write 0000000100000001000000C6.ready
1080  * and the archiver then knows to archive XLOGDIR/0000000100000001000000C6,
1081  * then when complete, rename it to 0000000100000001000000C6.done
1082  */
1083 static void
1084 XLogArchiveNotify(const char *xlog)
1085 {
1086         char            archiveStatusPath[MAXPGPATH];
1087         FILE       *fd;
1088
1089         /* insert an otherwise empty file called <XLOG>.ready */
1090         StatusFilePath(archiveStatusPath, xlog, ".ready");
1091         fd = AllocateFile(archiveStatusPath, "w");
1092         if (fd == NULL)
1093         {
1094                 ereport(LOG,
1095                                 (errcode_for_file_access(),
1096                                  errmsg("could not create archive status file \"%s\": %m",
1097                                                 archiveStatusPath)));
1098                 return;
1099         }
1100         if (FreeFile(fd))
1101         {
1102                 ereport(LOG,
1103                                 (errcode_for_file_access(),
1104                                  errmsg("could not write archive status file \"%s\": %m",
1105                                                 archiveStatusPath)));
1106                 return;
1107         }
1108
1109         /* Notify archiver that it's got something to do */
1110         if (IsUnderPostmaster)
1111                 SendPostmasterSignal(PMSIGNAL_WAKEN_ARCHIVER);
1112 }
1113
1114 /*
1115  * Convenience routine to notify using log/seg representation of filename
1116  */
1117 static void
1118 XLogArchiveNotifySeg(uint32 log, uint32 seg)
1119 {
1120         char            xlog[MAXFNAMELEN];
1121
1122         XLogFileName(xlog, ThisTimeLineID, log, seg);
1123         XLogArchiveNotify(xlog);
1124 }
1125
1126 /*
1127  * XLogArchiveCheckDone
1128  *
1129  * This is called when we are ready to delete or recycle an old XLOG segment
1130  * file or backup history file.  If it is okay to delete it then return true.
1131  * If it is not time to delete it, make sure a .ready file exists, and return
1132  * false.
1133  *
1134  * If <XLOG>.done exists, then return true; else if <XLOG>.ready exists,
1135  * then return false; else create <XLOG>.ready and return false.
1136  *
1137  * The reason we do things this way is so that if the original attempt to
1138  * create <XLOG>.ready fails, we'll retry during subsequent checkpoints.
1139  */
1140 static bool
1141 XLogArchiveCheckDone(const char *xlog, bool create_if_missing)
1142 {
1143         char            archiveStatusPath[MAXPGPATH];
1144         struct stat stat_buf;
1145
1146         /* Always deletable if archiving is off */
1147         if (!XLogArchivingActive())
1148                 return true;
1149
1150         /* First check for .done --- this means archiver is done with it */
1151         StatusFilePath(archiveStatusPath, xlog, ".done");
1152         if (stat(archiveStatusPath, &stat_buf) == 0)
1153                 return true;
1154
1155         /* check for .ready --- this means archiver is still busy with it */
1156         StatusFilePath(archiveStatusPath, xlog, ".ready");
1157         if (stat(archiveStatusPath, &stat_buf) == 0)
1158                 return false;
1159
1160         /* Race condition --- maybe archiver just finished, so recheck */
1161         StatusFilePath(archiveStatusPath, xlog, ".done");
1162         if (stat(archiveStatusPath, &stat_buf) == 0)
1163                 return true;
1164
1165         /* Retry creation of the .ready file */
1166         if (create_if_missing)
1167                 XLogArchiveNotify(xlog);
1168
1169         return false;
1170 }
1171
1172 /*
1173  * XLogArchiveCleanup
1174  *
1175  * Cleanup archive notification file(s) for a particular xlog segment
1176  */
1177 static void
1178 XLogArchiveCleanup(const char *xlog)
1179 {
1180         char            archiveStatusPath[MAXPGPATH];
1181
1182         /* Remove the .done file */
1183         StatusFilePath(archiveStatusPath, xlog, ".done");
1184         unlink(archiveStatusPath);
1185         /* should we complain about failure? */
1186
1187         /* Remove the .ready file if present --- normally it shouldn't be */
1188         StatusFilePath(archiveStatusPath, xlog, ".ready");
1189         unlink(archiveStatusPath);
1190         /* should we complain about failure? */
1191 }
1192
1193 /*
1194  * Advance the Insert state to the next buffer page, writing out the next
1195  * buffer if it still contains unwritten data.
1196  *
1197  * If new_segment is TRUE then we set up the next buffer page as the first
1198  * page of the next xlog segment file, possibly but not usually the next
1199  * consecutive file page.
1200  *
1201  * The global LogwrtRqst.Write pointer needs to be advanced to include the
1202  * just-filled page.  If we can do this for free (without an extra lock),
1203  * we do so here.  Otherwise the caller must do it.  We return TRUE if the
1204  * request update still needs to be done, FALSE if we did it internally.
1205  *
1206  * Must be called with WALInsertLock held.
1207  */
1208 static bool
1209 AdvanceXLInsertBuffer(bool new_segment)
1210 {
1211         XLogCtlInsert *Insert = &XLogCtl->Insert;
1212         XLogCtlWrite *Write = &XLogCtl->Write;
1213         int                     nextidx = NextBufIdx(Insert->curridx);
1214         bool            update_needed = true;
1215         XLogRecPtr      OldPageRqstPtr;
1216         XLogwrtRqst WriteRqst;
1217         XLogRecPtr      NewPageEndPtr;
1218         XLogPageHeader NewPage;
1219
1220         /* Use Insert->LogwrtResult copy if it's more fresh */
1221         if (XLByteLT(LogwrtResult.Write, Insert->LogwrtResult.Write))
1222                 LogwrtResult = Insert->LogwrtResult;
1223
1224         /*
1225          * Get ending-offset of the buffer page we need to replace (this may be
1226          * zero if the buffer hasn't been used yet).  Fall through if it's already
1227          * written out.
1228          */
1229         OldPageRqstPtr = XLogCtl->xlblocks[nextidx];
1230         if (!XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
1231         {
1232                 /* nope, got work to do... */
1233                 XLogRecPtr      FinishedPageRqstPtr;
1234
1235                 FinishedPageRqstPtr = XLogCtl->xlblocks[Insert->curridx];
1236
1237                 /* Before waiting, get info_lck and update LogwrtResult */
1238                 {
1239                         /* use volatile pointer to prevent code rearrangement */
1240                         volatile XLogCtlData *xlogctl = XLogCtl;
1241
1242                         SpinLockAcquire(&xlogctl->info_lck);
1243                         if (XLByteLT(xlogctl->LogwrtRqst.Write, FinishedPageRqstPtr))
1244                                 xlogctl->LogwrtRqst.Write = FinishedPageRqstPtr;
1245                         LogwrtResult = xlogctl->LogwrtResult;
1246                         SpinLockRelease(&xlogctl->info_lck);
1247                 }
1248
1249                 update_needed = false;  /* Did the shared-request update */
1250
1251                 if (XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
1252                 {
1253                         /* OK, someone wrote it already */
1254                         Insert->LogwrtResult = LogwrtResult;
1255                 }
1256                 else
1257                 {
1258                         /* Must acquire write lock */
1259                         LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
1260                         LogwrtResult = Write->LogwrtResult;
1261                         if (XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
1262                         {
1263                                 /* OK, someone wrote it already */
1264                                 LWLockRelease(WALWriteLock);
1265                                 Insert->LogwrtResult = LogwrtResult;
1266                         }
1267                         else
1268                         {
1269                                 /*
1270                                  * Have to write buffers while holding insert lock. This is
1271                                  * not good, so only write as much as we absolutely must.
1272                                  */
1273                                 WriteRqst.Write = OldPageRqstPtr;
1274                                 WriteRqst.Flush.xlogid = 0;
1275                                 WriteRqst.Flush.xrecoff = 0;
1276                                 XLogWrite(WriteRqst, false, false);
1277                                 LWLockRelease(WALWriteLock);
1278                                 Insert->LogwrtResult = LogwrtResult;
1279                         }
1280                 }
1281         }
1282
1283         /*
1284          * Now the next buffer slot is free and we can set it up to be the next
1285          * output page.
1286          */
1287         NewPageEndPtr = XLogCtl->xlblocks[Insert->curridx];
1288
1289         if (new_segment)
1290         {
1291                 /* force it to a segment start point */
1292                 NewPageEndPtr.xrecoff += XLogSegSize - 1;
1293                 NewPageEndPtr.xrecoff -= NewPageEndPtr.xrecoff % XLogSegSize;
1294         }
1295
1296         if (NewPageEndPtr.xrecoff >= XLogFileSize)
1297         {
1298                 /* crossing a logid boundary */
1299                 NewPageEndPtr.xlogid += 1;
1300                 NewPageEndPtr.xrecoff = XLOG_BLCKSZ;
1301         }
1302         else
1303                 NewPageEndPtr.xrecoff += XLOG_BLCKSZ;
1304         XLogCtl->xlblocks[nextidx] = NewPageEndPtr;
1305         NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * (Size) XLOG_BLCKSZ);
1306
1307         Insert->curridx = nextidx;
1308         Insert->currpage = NewPage;
1309
1310         Insert->currpos = ((char *) NewPage) +SizeOfXLogShortPHD;
1311
1312         /*
1313          * Be sure to re-zero the buffer so that bytes beyond what we've written
1314          * will look like zeroes and not valid XLOG records...
1315          */
1316         MemSet((char *) NewPage, 0, XLOG_BLCKSZ);
1317
1318         /*
1319          * Fill the new page's header
1320          */
1321         NewPage   ->xlp_magic = XLOG_PAGE_MAGIC;
1322
1323         /* NewPage->xlp_info = 0; */    /* done by memset */
1324         NewPage   ->xlp_tli = ThisTimeLineID;
1325         NewPage   ->xlp_pageaddr.xlogid = NewPageEndPtr.xlogid;
1326         NewPage   ->xlp_pageaddr.xrecoff = NewPageEndPtr.xrecoff - XLOG_BLCKSZ;
1327
1328         /*
1329          * If first page of an XLOG segment file, make it a long header.
1330          */
1331         if ((NewPage->xlp_pageaddr.xrecoff % XLogSegSize) == 0)
1332         {
1333                 XLogLongPageHeader NewLongPage = (XLogLongPageHeader) NewPage;
1334
1335                 NewLongPage->xlp_sysid = ControlFile->system_identifier;
1336                 NewLongPage->xlp_seg_size = XLogSegSize;
1337                 NewLongPage->xlp_xlog_blcksz = XLOG_BLCKSZ;
1338                 NewPage   ->xlp_info |= XLP_LONG_HEADER;
1339
1340                 Insert->currpos = ((char *) NewPage) +SizeOfXLogLongPHD;
1341         }
1342
1343         return update_needed;
1344 }
1345
1346 /*
1347  * Check whether we've consumed enough xlog space that a checkpoint is needed.
1348  *
1349  * Caller must have just finished filling the open log file (so that
1350  * openLogId/openLogSeg are valid).  We measure the distance from RedoRecPtr
1351  * to the open log file and see if that exceeds CheckPointSegments.
1352  *
1353  * Note: it is caller's responsibility that RedoRecPtr is up-to-date.
1354  */
1355 static bool
1356 XLogCheckpointNeeded(void)
1357 {
1358         /*
1359          * A straight computation of segment number could overflow 32 bits. Rather
1360          * than assuming we have working 64-bit arithmetic, we compare the
1361          * highest-order bits separately, and force a checkpoint immediately when
1362          * they change.
1363          */
1364         uint32          old_segno,
1365                                 new_segno;
1366         uint32          old_highbits,
1367                                 new_highbits;
1368
1369         old_segno = (RedoRecPtr.xlogid % XLogSegSize) * XLogSegsPerFile +
1370                 (RedoRecPtr.xrecoff / XLogSegSize);
1371         old_highbits = RedoRecPtr.xlogid / XLogSegSize;
1372         new_segno = (openLogId % XLogSegSize) * XLogSegsPerFile + openLogSeg;
1373         new_highbits = openLogId / XLogSegSize;
1374         if (new_highbits != old_highbits ||
1375                 new_segno >= old_segno + (uint32) (CheckPointSegments - 1))
1376                 return true;
1377         return false;
1378 }
1379
1380 /*
1381  * Write and/or fsync the log at least as far as WriteRqst indicates.
1382  *
1383  * If flexible == TRUE, we don't have to write as far as WriteRqst, but
1384  * may stop at any convenient boundary (such as a cache or logfile boundary).
1385  * This option allows us to avoid uselessly issuing multiple writes when a
1386  * single one would do.
1387  *
1388  * If xlog_switch == TRUE, we are intending an xlog segment switch, so
1389  * perform end-of-segment actions after writing the last page, even if
1390  * it's not physically the end of its segment.  (NB: this will work properly
1391  * only if caller specifies WriteRqst == page-end and flexible == false,
1392  * and there is some data to write.)
1393  *
1394  * Must be called with WALWriteLock held.
1395  */
1396 static void
1397 XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
1398 {
1399         XLogCtlWrite *Write = &XLogCtl->Write;
1400         bool            ispartialpage;
1401         bool            last_iteration;
1402         bool            finishing_seg;
1403         bool            use_existent;
1404         int                     curridx;
1405         int                     npages;
1406         int                     startidx;
1407         uint32          startoffset;
1408
1409         /* We should always be inside a critical section here */
1410         Assert(CritSectionCount > 0);
1411
1412         /*
1413          * Update local LogwrtResult (caller probably did this already, but...)
1414          */
1415         LogwrtResult = Write->LogwrtResult;
1416
1417         /*
1418          * Since successive pages in the xlog cache are consecutively allocated,
1419          * we can usually gather multiple pages together and issue just one
1420          * write() call.  npages is the number of pages we have determined can be
1421          * written together; startidx is the cache block index of the first one,
1422          * and startoffset is the file offset at which it should go. The latter
1423          * two variables are only valid when npages > 0, but we must initialize
1424          * all of them to keep the compiler quiet.
1425          */
1426         npages = 0;
1427         startidx = 0;
1428         startoffset = 0;
1429
1430         /*
1431          * Within the loop, curridx is the cache block index of the page to
1432          * consider writing.  We advance Write->curridx only after successfully
1433          * writing pages.  (Right now, this refinement is useless since we are
1434          * going to PANIC if any error occurs anyway; but someday it may come in
1435          * useful.)
1436          */
1437         curridx = Write->curridx;
1438
1439         while (XLByteLT(LogwrtResult.Write, WriteRqst.Write))
1440         {
1441                 /*
1442                  * Make sure we're not ahead of the insert process.  This could happen
1443                  * if we're passed a bogus WriteRqst.Write that is past the end of the
1444                  * last page that's been initialized by AdvanceXLInsertBuffer.
1445                  */
1446                 if (!XLByteLT(LogwrtResult.Write, XLogCtl->xlblocks[curridx]))
1447                         elog(PANIC, "xlog write request %X/%X is past end of log %X/%X",
1448                                  LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff,
1449                                  XLogCtl->xlblocks[curridx].xlogid,
1450                                  XLogCtl->xlblocks[curridx].xrecoff);
1451
1452                 /* Advance LogwrtResult.Write to end of current buffer page */
1453                 LogwrtResult.Write = XLogCtl->xlblocks[curridx];
1454                 ispartialpage = XLByteLT(WriteRqst.Write, LogwrtResult.Write);
1455
1456                 if (!XLByteInPrevSeg(LogwrtResult.Write, openLogId, openLogSeg))
1457                 {
1458                         /*
1459                          * Switch to new logfile segment.  We cannot have any pending
1460                          * pages here (since we dump what we have at segment end).
1461                          */
1462                         Assert(npages == 0);
1463                         if (openLogFile >= 0)
1464                                 XLogFileClose();
1465                         XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg);
1466
1467                         /* create/use new log file */
1468                         use_existent = true;
1469                         openLogFile = XLogFileInit(openLogId, openLogSeg,
1470                                                                            &use_existent, true);
1471                         openLogOff = 0;
1472                 }
1473
1474                 /* Make sure we have the current logfile open */
1475                 if (openLogFile < 0)
1476                 {
1477                         XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg);
1478                         openLogFile = XLogFileOpen(openLogId, openLogSeg);
1479                         openLogOff = 0;
1480                 }
1481
1482                 /* Add current page to the set of pending pages-to-dump */
1483                 if (npages == 0)
1484                 {
1485                         /* first of group */
1486                         startidx = curridx;
1487                         startoffset = (LogwrtResult.Write.xrecoff - XLOG_BLCKSZ) % XLogSegSize;
1488                 }
1489                 npages++;
1490
1491                 /*
1492                  * Dump the set if this will be the last loop iteration, or if we are
1493                  * at the last page of the cache area (since the next page won't be
1494                  * contiguous in memory), or if we are at the end of the logfile
1495                  * segment.
1496                  */
1497                 last_iteration = !XLByteLT(LogwrtResult.Write, WriteRqst.Write);
1498
1499                 finishing_seg = !ispartialpage &&
1500                         (startoffset + npages * XLOG_BLCKSZ) >= XLogSegSize;
1501
1502                 if (last_iteration ||
1503                         curridx == XLogCtl->XLogCacheBlck ||
1504                         finishing_seg)
1505                 {
1506                         char       *from;
1507                         Size            nbytes;
1508
1509                         /* Need to seek in the file? */
1510                         if (openLogOff != startoffset)
1511                         {
1512                                 if (lseek(openLogFile, (off_t) startoffset, SEEK_SET) < 0)
1513                                         ereport(PANIC,
1514                                                         (errcode_for_file_access(),
1515                                                          errmsg("could not seek in log file %u, "
1516                                                                         "segment %u to offset %u: %m",
1517                                                                         openLogId, openLogSeg, startoffset)));
1518                                 openLogOff = startoffset;
1519                         }
1520
1521                         /* OK to write the page(s) */
1522                         from = XLogCtl->pages + startidx * (Size) XLOG_BLCKSZ;
1523                         nbytes = npages * (Size) XLOG_BLCKSZ;
1524                         errno = 0;
1525                         if (write(openLogFile, from, nbytes) != nbytes)
1526                         {
1527                                 /* if write didn't set errno, assume no disk space */
1528                                 if (errno == 0)
1529                                         errno = ENOSPC;
1530                                 ereport(PANIC,
1531                                                 (errcode_for_file_access(),
1532                                                  errmsg("could not write to log file %u, segment %u "
1533                                                                 "at offset %u, length %lu: %m",
1534                                                                 openLogId, openLogSeg,
1535                                                                 openLogOff, (unsigned long) nbytes)));
1536                         }
1537
1538                         /* Update state for write */
1539                         openLogOff += nbytes;
1540                         Write->curridx = ispartialpage ? curridx : NextBufIdx(curridx);
1541                         npages = 0;
1542
1543                         /*
1544                          * If we just wrote the whole last page of a logfile segment,
1545                          * fsync the segment immediately.  This avoids having to go back
1546                          * and re-open prior segments when an fsync request comes along
1547                          * later. Doing it here ensures that one and only one backend will
1548                          * perform this fsync.
1549                          *
1550                          * We also do this if this is the last page written for an xlog
1551                          * switch.
1552                          *
1553                          * This is also the right place to notify the Archiver that the
1554                          * segment is ready to copy to archival storage, and to update the
1555                          * timer for archive_timeout, and to signal for a checkpoint if
1556                          * too many logfile segments have been used since the last
1557                          * checkpoint.
1558                          */
1559                         if (finishing_seg || (xlog_switch && last_iteration))
1560                         {
1561                                 issue_xlog_fsync();
1562                                 LogwrtResult.Flush = LogwrtResult.Write;                /* end of page */
1563
1564                                 if (XLogArchivingActive())
1565                                         XLogArchiveNotifySeg(openLogId, openLogSeg);
1566
1567                                 Write->lastSegSwitchTime = (pg_time_t) time(NULL);
1568
1569                                 /*
1570                                  * Signal bgwriter to start a checkpoint if we've consumed too
1571                                  * much xlog since the last one.  For speed, we first check
1572                                  * using the local copy of RedoRecPtr, which might be out of
1573                                  * date; if it looks like a checkpoint is needed, forcibly
1574                                  * update RedoRecPtr and recheck.
1575                                  */
1576                                 if (IsUnderPostmaster &&
1577                                         XLogCheckpointNeeded())
1578                                 {
1579                                         (void) GetRedoRecPtr();
1580                                         if (XLogCheckpointNeeded())
1581                                                 RequestCheckpoint(CHECKPOINT_CAUSE_XLOG);
1582                                 }
1583                         }
1584                 }
1585
1586                 if (ispartialpage)
1587                 {
1588                         /* Only asked to write a partial page */
1589                         LogwrtResult.Write = WriteRqst.Write;
1590                         break;
1591                 }
1592                 curridx = NextBufIdx(curridx);
1593
1594                 /* If flexible, break out of loop as soon as we wrote something */
1595                 if (flexible && npages == 0)
1596                         break;
1597         }
1598
1599         Assert(npages == 0);
1600         Assert(curridx == Write->curridx);
1601
1602         /*
1603          * If asked to flush, do so
1604          */
1605         if (XLByteLT(LogwrtResult.Flush, WriteRqst.Flush) &&
1606                 XLByteLT(LogwrtResult.Flush, LogwrtResult.Write))
1607         {
1608                 /*
1609                  * Could get here without iterating above loop, in which case we might
1610                  * have no open file or the wrong one.  However, we do not need to
1611                  * fsync more than one file.
1612                  */
1613                 if (sync_method != SYNC_METHOD_OPEN &&
1614                         sync_method != SYNC_METHOD_OPEN_DSYNC)
1615                 {
1616                         if (openLogFile >= 0 &&
1617                                 !XLByteInPrevSeg(LogwrtResult.Write, openLogId, openLogSeg))
1618                                 XLogFileClose();
1619                         if (openLogFile < 0)
1620                         {
1621                                 XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg);
1622                                 openLogFile = XLogFileOpen(openLogId, openLogSeg);
1623                                 openLogOff = 0;
1624                         }
1625                         issue_xlog_fsync();
1626                 }
1627                 LogwrtResult.Flush = LogwrtResult.Write;
1628         }
1629
1630         /*
1631          * Update shared-memory status
1632          *
1633          * We make sure that the shared 'request' values do not fall behind the
1634          * 'result' values.  This is not absolutely essential, but it saves some
1635          * code in a couple of places.
1636          */
1637         {
1638                 /* use volatile pointer to prevent code rearrangement */
1639                 volatile XLogCtlData *xlogctl = XLogCtl;
1640
1641                 SpinLockAcquire(&xlogctl->info_lck);
1642                 xlogctl->LogwrtResult = LogwrtResult;
1643                 if (XLByteLT(xlogctl->LogwrtRqst.Write, LogwrtResult.Write))
1644                         xlogctl->LogwrtRqst.Write = LogwrtResult.Write;
1645                 if (XLByteLT(xlogctl->LogwrtRqst.Flush, LogwrtResult.Flush))
1646                         xlogctl->LogwrtRqst.Flush = LogwrtResult.Flush;
1647                 SpinLockRelease(&xlogctl->info_lck);
1648         }
1649
1650         Write->LogwrtResult = LogwrtResult;
1651 }
1652
1653 /*
1654  * Record the LSN for an asynchronous transaction commit.
1655  * (This should not be called for aborts, nor for synchronous commits.)
1656  */
1657 void
1658 XLogSetAsyncCommitLSN(XLogRecPtr asyncCommitLSN)
1659 {
1660         /* use volatile pointer to prevent code rearrangement */
1661         volatile XLogCtlData *xlogctl = XLogCtl;
1662
1663         SpinLockAcquire(&xlogctl->info_lck);
1664         if (XLByteLT(xlogctl->asyncCommitLSN, asyncCommitLSN))
1665                 xlogctl->asyncCommitLSN = asyncCommitLSN;
1666         SpinLockRelease(&xlogctl->info_lck);
1667 }
1668
1669 /*
1670  * Ensure that all XLOG data through the given position is flushed to disk.
1671  *
1672  * NOTE: this differs from XLogWrite mainly in that the WALWriteLock is not
1673  * already held, and we try to avoid acquiring it if possible.
1674  */
1675 void
1676 XLogFlush(XLogRecPtr record)
1677 {
1678         XLogRecPtr      WriteRqstPtr;
1679         XLogwrtRqst WriteRqst;
1680
1681         /* Disabled during REDO */
1682         if (InRedo)
1683                 return;
1684
1685         /* Quick exit if already known flushed */
1686         if (XLByteLE(record, LogwrtResult.Flush))
1687                 return;
1688
1689 #ifdef WAL_DEBUG
1690         if (XLOG_DEBUG)
1691                 elog(LOG, "xlog flush request %X/%X; write %X/%X; flush %X/%X",
1692                          record.xlogid, record.xrecoff,
1693                          LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff,
1694                          LogwrtResult.Flush.xlogid, LogwrtResult.Flush.xrecoff);
1695 #endif
1696
1697         START_CRIT_SECTION();
1698
1699         /*
1700          * Since fsync is usually a horribly expensive operation, we try to
1701          * piggyback as much data as we can on each fsync: if we see any more data
1702          * entered into the xlog buffer, we'll write and fsync that too, so that
1703          * the final value of LogwrtResult.Flush is as large as possible. This
1704          * gives us some chance of avoiding another fsync immediately after.
1705          */
1706
1707         /* initialize to given target; may increase below */
1708         WriteRqstPtr = record;
1709
1710         /* read LogwrtResult and update local state */
1711         {
1712                 /* use volatile pointer to prevent code rearrangement */
1713                 volatile XLogCtlData *xlogctl = XLogCtl;
1714
1715                 SpinLockAcquire(&xlogctl->info_lck);
1716                 if (XLByteLT(WriteRqstPtr, xlogctl->LogwrtRqst.Write))
1717                         WriteRqstPtr = xlogctl->LogwrtRqst.Write;
1718                 LogwrtResult = xlogctl->LogwrtResult;
1719                 SpinLockRelease(&xlogctl->info_lck);
1720         }
1721
1722         /* done already? */
1723         if (!XLByteLE(record, LogwrtResult.Flush))
1724         {
1725                 /* now wait for the write lock */
1726                 LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
1727                 LogwrtResult = XLogCtl->Write.LogwrtResult;
1728                 if (!XLByteLE(record, LogwrtResult.Flush))
1729                 {
1730                         /* try to write/flush later additions to XLOG as well */
1731                         if (LWLockConditionalAcquire(WALInsertLock, LW_EXCLUSIVE))
1732                         {
1733                                 XLogCtlInsert *Insert = &XLogCtl->Insert;
1734                                 uint32          freespace = INSERT_FREESPACE(Insert);
1735
1736                                 if (freespace < SizeOfXLogRecord)               /* buffer is full */
1737                                         WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx];
1738                                 else
1739                                 {
1740                                         WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx];
1741                                         WriteRqstPtr.xrecoff -= freespace;
1742                                 }
1743                                 LWLockRelease(WALInsertLock);
1744                                 WriteRqst.Write = WriteRqstPtr;
1745                                 WriteRqst.Flush = WriteRqstPtr;
1746                         }
1747                         else
1748                         {
1749                                 WriteRqst.Write = WriteRqstPtr;
1750                                 WriteRqst.Flush = record;
1751                         }
1752                         XLogWrite(WriteRqst, false, false);
1753                 }
1754                 LWLockRelease(WALWriteLock);
1755         }
1756
1757         END_CRIT_SECTION();
1758
1759         /*
1760          * If we still haven't flushed to the request point then we have a
1761          * problem; most likely, the requested flush point is past end of XLOG.
1762          * This has been seen to occur when a disk page has a corrupted LSN.
1763          *
1764          * Formerly we treated this as a PANIC condition, but that hurts the
1765          * system's robustness rather than helping it: we do not want to take down
1766          * the whole system due to corruption on one data page.  In particular, if
1767          * the bad page is encountered again during recovery then we would be
1768          * unable to restart the database at all!  (This scenario has actually
1769          * happened in the field several times with 7.1 releases. Note that we
1770          * cannot get here while InRedo is true, but if the bad page is brought in
1771          * and marked dirty during recovery then CreateCheckPoint will try to
1772          * flush it at the end of recovery.)
1773          *
1774          * The current approach is to ERROR under normal conditions, but only
1775          * WARNING during recovery, so that the system can be brought up even if
1776          * there's a corrupt LSN.  Note that for calls from xact.c, the ERROR will
1777          * be promoted to PANIC since xact.c calls this routine inside a critical
1778          * section.  However, calls from bufmgr.c are not within critical sections
1779          * and so we will not force a restart for a bad LSN on a data page.
1780          */
1781         if (XLByteLT(LogwrtResult.Flush, record))
1782                 elog(InRecovery ? WARNING : ERROR,
1783                 "xlog flush request %X/%X is not satisfied --- flushed only to %X/%X",
1784                          record.xlogid, record.xrecoff,
1785                          LogwrtResult.Flush.xlogid, LogwrtResult.Flush.xrecoff);
1786 }
1787
1788 /*
1789  * Flush xlog, but without specifying exactly where to flush to.
1790  *
1791  * We normally flush only completed blocks; but if there is nothing to do on
1792  * that basis, we check for unflushed async commits in the current incomplete
1793  * block, and flush through the latest one of those.  Thus, if async commits
1794  * are not being used, we will flush complete blocks only.      We can guarantee
1795  * that async commits reach disk after at most three cycles; normally only
1796  * one or two.  (We allow XLogWrite to write "flexibly", meaning it can stop
1797  * at the end of the buffer ring; this makes a difference only with very high
1798  * load or long wal_writer_delay, but imposes one extra cycle for the worst
1799  * case for async commits.)
1800  *
1801  * This routine is invoked periodically by the background walwriter process.
1802  */
1803 void
1804 XLogBackgroundFlush(void)
1805 {
1806         XLogRecPtr      WriteRqstPtr;
1807         bool            flexible = true;
1808
1809         /* read LogwrtResult and update local state */
1810         {
1811                 /* use volatile pointer to prevent code rearrangement */
1812                 volatile XLogCtlData *xlogctl = XLogCtl;
1813
1814                 SpinLockAcquire(&xlogctl->info_lck);
1815                 LogwrtResult = xlogctl->LogwrtResult;
1816                 WriteRqstPtr = xlogctl->LogwrtRqst.Write;
1817                 SpinLockRelease(&xlogctl->info_lck);
1818         }
1819
1820         /* back off to last completed page boundary */
1821         WriteRqstPtr.xrecoff -= WriteRqstPtr.xrecoff % XLOG_BLCKSZ;
1822
1823         /* if we have already flushed that far, consider async commit records */
1824         if (XLByteLE(WriteRqstPtr, LogwrtResult.Flush))
1825         {
1826                 /* use volatile pointer to prevent code rearrangement */
1827                 volatile XLogCtlData *xlogctl = XLogCtl;
1828
1829                 SpinLockAcquire(&xlogctl->info_lck);
1830                 WriteRqstPtr = xlogctl->asyncCommitLSN;
1831                 SpinLockRelease(&xlogctl->info_lck);
1832                 flexible = false;               /* ensure it all gets written */
1833         }
1834
1835         /* Done if already known flushed */
1836         if (XLByteLE(WriteRqstPtr, LogwrtResult.Flush))
1837                 return;
1838
1839 #ifdef WAL_DEBUG
1840         if (XLOG_DEBUG)
1841                 elog(LOG, "xlog bg flush request %X/%X; write %X/%X; flush %X/%X",
1842                          WriteRqstPtr.xlogid, WriteRqstPtr.xrecoff,
1843                          LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff,
1844                          LogwrtResult.Flush.xlogid, LogwrtResult.Flush.xrecoff);
1845 #endif
1846
1847         START_CRIT_SECTION();
1848
1849         /* now wait for the write lock */
1850         LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
1851         LogwrtResult = XLogCtl->Write.LogwrtResult;
1852         if (!XLByteLE(WriteRqstPtr, LogwrtResult.Flush))
1853         {
1854                 XLogwrtRqst WriteRqst;
1855
1856                 WriteRqst.Write = WriteRqstPtr;
1857                 WriteRqst.Flush = WriteRqstPtr;
1858                 XLogWrite(WriteRqst, flexible, false);
1859         }
1860         LWLockRelease(WALWriteLock);
1861
1862         END_CRIT_SECTION();
1863 }
1864
1865 /*
1866  * Flush any previous asynchronously-committed transactions' commit records.
1867  *
1868  * NOTE: it is unwise to assume that this provides any strong guarantees.
1869  * In particular, because of the inexact LSN bookkeeping used by clog.c,
1870  * we cannot assume that hint bits will be settable for these transactions.
1871  */
1872 void
1873 XLogAsyncCommitFlush(void)
1874 {
1875         XLogRecPtr      WriteRqstPtr;
1876
1877         /* use volatile pointer to prevent code rearrangement */
1878         volatile XLogCtlData *xlogctl = XLogCtl;
1879
1880         SpinLockAcquire(&xlogctl->info_lck);
1881         WriteRqstPtr = xlogctl->asyncCommitLSN;
1882         SpinLockRelease(&xlogctl->info_lck);
1883
1884         XLogFlush(WriteRqstPtr);
1885 }
1886
1887 /*
1888  * Test whether XLOG data has been flushed up to (at least) the given position.
1889  *
1890  * Returns true if a flush is still needed.  (It may be that someone else
1891  * is already in process of flushing that far, however.)
1892  */
1893 bool
1894 XLogNeedsFlush(XLogRecPtr record)
1895 {
1896         /* Quick exit if already known flushed */
1897         if (XLByteLE(record, LogwrtResult.Flush))
1898                 return false;
1899
1900         /* read LogwrtResult and update local state */
1901         {
1902                 /* use volatile pointer to prevent code rearrangement */
1903                 volatile XLogCtlData *xlogctl = XLogCtl;
1904
1905                 SpinLockAcquire(&xlogctl->info_lck);
1906                 LogwrtResult = xlogctl->LogwrtResult;
1907                 SpinLockRelease(&xlogctl->info_lck);
1908         }
1909
1910         /* check again */
1911         if (XLByteLE(record, LogwrtResult.Flush))
1912                 return false;
1913
1914         return true;
1915 }
1916
1917 /*
1918  * Create a new XLOG file segment, or open a pre-existing one.
1919  *
1920  * log, seg: identify segment to be created/opened.
1921  *
1922  * *use_existent: if TRUE, OK to use a pre-existing file (else, any
1923  * pre-existing file will be deleted).  On return, TRUE if a pre-existing
1924  * file was used.
1925  *
1926  * use_lock: if TRUE, acquire ControlFileLock while moving file into
1927  * place.  This should be TRUE except during bootstrap log creation.  The
1928  * caller must *not* hold the lock at call.
1929  *
1930  * Returns FD of opened file.
1931  *
1932  * Note: errors here are ERROR not PANIC because we might or might not be
1933  * inside a critical section (eg, during checkpoint there is no reason to
1934  * take down the system on failure).  They will promote to PANIC if we are
1935  * in a critical section.
1936  */
1937 static int
1938 XLogFileInit(uint32 log, uint32 seg,
1939                          bool *use_existent, bool use_lock)
1940 {
1941         char            path[MAXPGPATH];
1942         char            tmppath[MAXPGPATH];
1943         char       *zbuffer;
1944         uint32          installed_log;
1945         uint32          installed_seg;
1946         int                     max_advance;
1947         int                     fd;
1948         int                     nbytes;
1949
1950         XLogFilePath(path, ThisTimeLineID, log, seg);
1951
1952         /*
1953          * Try to use existent file (checkpoint maker may have created it already)
1954          */
1955         if (*use_existent)
1956         {
1957                 fd = BasicOpenFile(path, O_RDWR | PG_BINARY | get_sync_bit(sync_method),
1958                                                    S_IRUSR | S_IWUSR);
1959                 if (fd < 0)
1960                 {
1961                         if (errno != ENOENT)
1962                                 ereport(ERROR,
1963                                                 (errcode_for_file_access(),
1964                                                  errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
1965                                                                 path, log, seg)));
1966                 }
1967                 else
1968                         return fd;
1969         }
1970
1971         /*
1972          * Initialize an empty (all zeroes) segment.  NOTE: it is possible that
1973          * another process is doing the same thing.  If so, we will end up
1974          * pre-creating an extra log segment.  That seems OK, and better than
1975          * holding the lock throughout this lengthy process.
1976          */
1977         elog(DEBUG2, "creating and filling new WAL file");
1978
1979         snprintf(tmppath, MAXPGPATH, XLOGDIR "/xlogtemp.%d", (int) getpid());
1980
1981         unlink(tmppath);
1982
1983         /* do not use get_sync_bit() here --- want to fsync only at end of fill */
1984         fd = BasicOpenFile(tmppath, O_RDWR | O_CREAT | O_EXCL | PG_BINARY,
1985                                            S_IRUSR | S_IWUSR);
1986         if (fd < 0)
1987                 ereport(ERROR,
1988                                 (errcode_for_file_access(),
1989                                  errmsg("could not create file \"%s\": %m", tmppath)));
1990
1991         /*
1992          * Zero-fill the file.  We have to do this the hard way to ensure that all
1993          * the file space has really been allocated --- on platforms that allow
1994          * "holes" in files, just seeking to the end doesn't allocate intermediate
1995          * space.  This way, we know that we have all the space and (after the
1996          * fsync below) that all the indirect blocks are down on disk.  Therefore,
1997          * fdatasync(2) or O_DSYNC will be sufficient to sync future writes to the
1998          * log file.
1999          *
2000          * Note: palloc zbuffer, instead of just using a local char array, to
2001          * ensure it is reasonably well-aligned; this may save a few cycles
2002          * transferring data to the kernel.
2003          */
2004         zbuffer = (char *) palloc0(XLOG_BLCKSZ);
2005         for (nbytes = 0; nbytes < XLogSegSize; nbytes += XLOG_BLCKSZ)
2006         {
2007                 errno = 0;
2008                 if ((int) write(fd, zbuffer, XLOG_BLCKSZ) != (int) XLOG_BLCKSZ)
2009                 {
2010                         int                     save_errno = errno;
2011
2012                         /*
2013                          * If we fail to make the file, delete it to release disk space
2014                          */
2015                         unlink(tmppath);
2016                         /* if write didn't set errno, assume problem is no disk space */
2017                         errno = save_errno ? save_errno : ENOSPC;
2018
2019                         ereport(ERROR,
2020                                         (errcode_for_file_access(),
2021                                          errmsg("could not write to file \"%s\": %m", tmppath)));
2022                 }
2023         }
2024         pfree(zbuffer);
2025
2026         if (pg_fsync(fd) != 0)
2027                 ereport(ERROR,
2028                                 (errcode_for_file_access(),
2029                                  errmsg("could not fsync file \"%s\": %m", tmppath)));
2030
2031         if (close(fd))
2032                 ereport(ERROR,
2033                                 (errcode_for_file_access(),
2034                                  errmsg("could not close file \"%s\": %m", tmppath)));
2035
2036         /*
2037          * Now move the segment into place with its final name.
2038          *
2039          * If caller didn't want to use a pre-existing file, get rid of any
2040          * pre-existing file.  Otherwise, cope with possibility that someone else
2041          * has created the file while we were filling ours: if so, use ours to
2042          * pre-create a future log segment.
2043          */
2044         installed_log = log;
2045         installed_seg = seg;
2046         max_advance = XLOGfileslop;
2047         if (!InstallXLogFileSegment(&installed_log, &installed_seg, tmppath,
2048                                                                 *use_existent, &max_advance,
2049                                                                 use_lock))
2050         {
2051                 /* No need for any more future segments... */
2052                 unlink(tmppath);
2053         }
2054
2055         elog(DEBUG2, "done creating and filling new WAL file");
2056
2057         /* Set flag to tell caller there was no existent file */
2058         *use_existent = false;
2059
2060         /* Now open original target segment (might not be file I just made) */
2061         fd = BasicOpenFile(path, O_RDWR | PG_BINARY | get_sync_bit(sync_method),
2062                                            S_IRUSR | S_IWUSR);
2063         if (fd < 0)
2064                 ereport(ERROR,
2065                                 (errcode_for_file_access(),
2066                    errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
2067                                   path, log, seg)));
2068
2069         return fd;
2070 }
2071
2072 /*
2073  * Create a new XLOG file segment by copying a pre-existing one.
2074  *
2075  * log, seg: identify segment to be created.
2076  *
2077  * srcTLI, srclog, srcseg: identify segment to be copied (could be from
2078  *              a different timeline)
2079  *
2080  * Currently this is only used during recovery, and so there are no locking
2081  * considerations.      But we should be just as tense as XLogFileInit to avoid
2082  * emplacing a bogus file.
2083  */
2084 static void
2085 XLogFileCopy(uint32 log, uint32 seg,
2086                          TimeLineID srcTLI, uint32 srclog, uint32 srcseg)
2087 {
2088         char            path[MAXPGPATH];
2089         char            tmppath[MAXPGPATH];
2090         char            buffer[XLOG_BLCKSZ];
2091         int                     srcfd;
2092         int                     fd;
2093         int                     nbytes;
2094
2095         /*
2096          * Open the source file
2097          */
2098         XLogFilePath(path, srcTLI, srclog, srcseg);
2099         srcfd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
2100         if (srcfd < 0)
2101                 ereport(ERROR,
2102                                 (errcode_for_file_access(),
2103                                  errmsg("could not open file \"%s\": %m", path)));
2104
2105         /*
2106          * Copy into a temp file name.
2107          */
2108         snprintf(tmppath, MAXPGPATH, XLOGDIR "/xlogtemp.%d", (int) getpid());
2109
2110         unlink(tmppath);
2111
2112         /* do not use get_sync_bit() here --- want to fsync only at end of fill */
2113         fd = BasicOpenFile(tmppath, O_RDWR | O_CREAT | O_EXCL | PG_BINARY,
2114                                            S_IRUSR | S_IWUSR);
2115         if (fd < 0)
2116                 ereport(ERROR,
2117                                 (errcode_for_file_access(),
2118                                  errmsg("could not create file \"%s\": %m", tmppath)));
2119
2120         /*
2121          * Do the data copying.
2122          */
2123         for (nbytes = 0; nbytes < XLogSegSize; nbytes += sizeof(buffer))
2124         {
2125                 errno = 0;
2126                 if ((int) read(srcfd, buffer, sizeof(buffer)) != (int) sizeof(buffer))
2127                 {
2128                         if (errno != 0)
2129                                 ereport(ERROR,
2130                                                 (errcode_for_file_access(),
2131                                                  errmsg("could not read file \"%s\": %m", path)));
2132                         else
2133                                 ereport(ERROR,
2134                                                 (errmsg("not enough data in file \"%s\"", path)));
2135                 }
2136                 errno = 0;
2137                 if ((int) write(fd, buffer, sizeof(buffer)) != (int) sizeof(buffer))
2138                 {
2139                         int                     save_errno = errno;
2140
2141                         /*
2142                          * If we fail to make the file, delete it to release disk space
2143                          */
2144                         unlink(tmppath);
2145                         /* if write didn't set errno, assume problem is no disk space */
2146                         errno = save_errno ? save_errno : ENOSPC;
2147
2148                         ereport(ERROR,
2149                                         (errcode_for_file_access(),
2150                                          errmsg("could not write to file \"%s\": %m", tmppath)));
2151                 }
2152         }
2153
2154         if (pg_fsync(fd) != 0)
2155                 ereport(ERROR,
2156                                 (errcode_for_file_access(),
2157                                  errmsg("could not fsync file \"%s\": %m", tmppath)));
2158
2159         if (close(fd))
2160                 ereport(ERROR,
2161                                 (errcode_for_file_access(),
2162                                  errmsg("could not close file \"%s\": %m", tmppath)));
2163
2164         close(srcfd);
2165
2166         /*
2167          * Now move the segment into place with its final name.
2168          */
2169         if (!InstallXLogFileSegment(&log, &seg, tmppath, false, NULL, false))
2170                 elog(ERROR, "InstallXLogFileSegment should not have failed");
2171 }
2172
2173 /*
2174  * Install a new XLOG segment file as a current or future log segment.
2175  *
2176  * This is used both to install a newly-created segment (which has a temp
2177  * filename while it's being created) and to recycle an old segment.
2178  *
2179  * *log, *seg: identify segment to install as (or first possible target).
2180  * When find_free is TRUE, these are modified on return to indicate the
2181  * actual installation location or last segment searched.
2182  *
2183  * tmppath: initial name of file to install.  It will be renamed into place.
2184  *
2185  * find_free: if TRUE, install the new segment at the first empty log/seg
2186  * number at or after the passed numbers.  If FALSE, install the new segment
2187  * exactly where specified, deleting any existing segment file there.
2188  *
2189  * *max_advance: maximum number of log/seg slots to advance past the starting
2190  * point.  Fail if no free slot is found in this range.  On return, reduced
2191  * by the number of slots skipped over.  (Irrelevant, and may be NULL,
2192  * when find_free is FALSE.)
2193  *
2194  * use_lock: if TRUE, acquire ControlFileLock while moving file into
2195  * place.  This should be TRUE except during bootstrap log creation.  The
2196  * caller must *not* hold the lock at call.
2197  *
2198  * Returns TRUE if file installed, FALSE if not installed because of
2199  * exceeding max_advance limit.  On Windows, we also return FALSE if we
2200  * can't rename the file into place because someone's got it open.
2201  * (Any other kind of failure causes ereport().)
2202  */
2203 static bool
2204 InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath,
2205                                            bool find_free, int *max_advance,
2206                                            bool use_lock)
2207 {
2208         char            path[MAXPGPATH];
2209         struct stat stat_buf;
2210
2211         XLogFilePath(path, ThisTimeLineID, *log, *seg);
2212
2213         /*
2214          * We want to be sure that only one process does this at a time.
2215          */
2216         if (use_lock)
2217                 LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
2218
2219         if (!find_free)
2220         {
2221                 /* Force installation: get rid of any pre-existing segment file */
2222                 unlink(path);
2223         }
2224         else
2225         {
2226                 /* Find a free slot to put it in */
2227                 while (stat(path, &stat_buf) == 0)
2228                 {
2229                         if (*max_advance <= 0)
2230                         {
2231                                 /* Failed to find a free slot within specified range */
2232                                 if (use_lock)
2233                                         LWLockRelease(ControlFileLock);
2234                                 return false;
2235                         }
2236                         NextLogSeg(*log, *seg);
2237                         (*max_advance)--;
2238                         XLogFilePath(path, ThisTimeLineID, *log, *seg);
2239                 }
2240         }
2241
2242         /*
2243          * Prefer link() to rename() here just to be really sure that we don't
2244          * overwrite an existing logfile.  However, there shouldn't be one, so
2245          * rename() is an acceptable substitute except for the truly paranoid.
2246          */
2247 #if HAVE_WORKING_LINK
2248         if (link(tmppath, path) < 0)
2249                 ereport(ERROR,
2250                                 (errcode_for_file_access(),
2251                                  errmsg("could not link file \"%s\" to \"%s\" (initialization of log file %u, segment %u): %m",
2252                                                 tmppath, path, *log, *seg)));
2253         unlink(tmppath);
2254 #else
2255         if (rename(tmppath, path) < 0)
2256         {
2257 #ifdef WIN32
2258 #if !defined(__CYGWIN__)
2259                 if (GetLastError() == ERROR_ACCESS_DENIED)
2260 #else
2261                 if (errno == EACCES)
2262 #endif
2263                 {
2264                         if (use_lock)
2265                                 LWLockRelease(ControlFileLock);
2266                         return false;
2267                 }
2268 #endif   /* WIN32 */
2269
2270                 ereport(ERROR,
2271                                 (errcode_for_file_access(),
2272                                  errmsg("could not rename file \"%s\" to \"%s\" (initialization of log file %u, segment %u): %m",
2273                                                 tmppath, path, *log, *seg)));
2274         }
2275 #endif
2276
2277         if (use_lock)
2278                 LWLockRelease(ControlFileLock);
2279
2280         return true;
2281 }
2282
2283 /*
2284  * Open a pre-existing logfile segment for writing.
2285  */
2286 static int
2287 XLogFileOpen(uint32 log, uint32 seg)
2288 {
2289         char            path[MAXPGPATH];
2290         int                     fd;
2291
2292         XLogFilePath(path, ThisTimeLineID, log, seg);
2293
2294         fd = BasicOpenFile(path, O_RDWR | PG_BINARY | get_sync_bit(sync_method),
2295                                            S_IRUSR | S_IWUSR);
2296         if (fd < 0)
2297                 ereport(PANIC,
2298                                 (errcode_for_file_access(),
2299                    errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
2300                                   path, log, seg)));
2301
2302         return fd;
2303 }
2304
2305 /*
2306  * Open a logfile segment for reading (during recovery).
2307  */
2308 static int
2309 XLogFileRead(uint32 log, uint32 seg, int emode)
2310 {
2311         char            path[MAXPGPATH];
2312         char            xlogfname[MAXFNAMELEN];
2313         char            activitymsg[MAXFNAMELEN + 16];
2314         ListCell   *cell;
2315         int                     fd;
2316
2317         /*
2318          * Loop looking for a suitable timeline ID: we might need to read any of
2319          * the timelines listed in expectedTLIs.
2320          *
2321          * We expect curFileTLI on entry to be the TLI of the preceding file in
2322          * sequence, or 0 if there was no predecessor.  We do not allow curFileTLI
2323          * to go backwards; this prevents us from picking up the wrong file when a
2324          * parent timeline extends to higher segment numbers than the child we
2325          * want to read.
2326          */
2327         foreach(cell, expectedTLIs)
2328         {
2329                 TimeLineID      tli = (TimeLineID) lfirst_int(cell);
2330
2331                 if (tli < curFileTLI)
2332                         break;                          /* don't bother looking at too-old TLIs */
2333
2334                 XLogFileName(xlogfname, tli, log, seg);
2335
2336                 if (InArchiveRecovery)
2337                 {
2338                         /* Report recovery progress in PS display */
2339                         snprintf(activitymsg, sizeof(activitymsg), "waiting for %s",
2340                                          xlogfname);
2341                         set_ps_display(activitymsg, false);
2342
2343                         restoredFromArchive = RestoreArchivedFile(path, xlogfname,
2344                                                                                                           "RECOVERYXLOG",
2345                                                                                                           XLogSegSize);
2346                 }
2347                 else
2348                         XLogFilePath(path, tli, log, seg);
2349
2350                 fd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
2351                 if (fd >= 0)
2352                 {
2353                         /* Success! */
2354                         curFileTLI = tli;
2355
2356                         /* Report recovery progress in PS display */
2357                         snprintf(activitymsg, sizeof(activitymsg), "recovering %s",
2358                                          xlogfname);
2359                         set_ps_display(activitymsg, false);
2360
2361                         return fd;
2362                 }
2363                 if (errno != ENOENT)    /* unexpected failure? */
2364                         ereport(PANIC,
2365                                         (errcode_for_file_access(),
2366                         errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
2367                                    path, log, seg)));
2368         }
2369
2370         /* Couldn't find it.  For simplicity, complain about front timeline */
2371         XLogFilePath(path, recoveryTargetTLI, log, seg);
2372         errno = ENOENT;
2373         ereport(emode,
2374                         (errcode_for_file_access(),
2375                    errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
2376                                   path, log, seg)));
2377         return -1;
2378 }
2379
2380 /*
2381  * Close the current logfile segment for writing.
2382  */
2383 static void
2384 XLogFileClose(void)
2385 {
2386         Assert(openLogFile >= 0);
2387
2388         /*
2389          * posix_fadvise is problematic on many platforms: on older x86 Linux it
2390          * just dumps core, and there are reports of problems on PPC platforms as
2391          * well.  The following is therefore disabled for the time being. We could
2392          * consider some kind of configure test to see if it's safe to use, but
2393          * since we lack hard evidence that there's any useful performance gain to
2394          * be had, spending time on that seems unprofitable for now.
2395          */
2396 #ifdef NOT_USED
2397
2398         /*
2399          * WAL segment files will not be re-read in normal operation, so we advise
2400          * OS to release any cached pages.      But do not do so if WAL archiving is
2401          * active, because archiver process could use the cache to read the WAL
2402          * segment.
2403          *
2404          * While O_DIRECT works for O_SYNC, posix_fadvise() works for fsync() and
2405          * O_SYNC, and some platforms only have posix_fadvise().
2406          */
2407 #if defined(HAVE_DECL_POSIX_FADVISE) && defined(POSIX_FADV_DONTNEED)
2408         if (!XLogArchivingActive())
2409                 posix_fadvise(openLogFile, 0, 0, POSIX_FADV_DONTNEED);
2410 #endif
2411 #endif   /* NOT_USED */
2412
2413         if (close(openLogFile))
2414                 ereport(PANIC,
2415                                 (errcode_for_file_access(),
2416                                  errmsg("could not close log file %u, segment %u: %m",
2417                                                 openLogId, openLogSeg)));
2418         openLogFile = -1;
2419 }
2420
2421 /*
2422  * Attempt to retrieve the specified file from off-line archival storage.
2423  * If successful, fill "path" with its complete path (note that this will be
2424  * a temp file name that doesn't follow the normal naming convention), and
2425  * return TRUE.
2426  *
2427  * If not successful, fill "path" with the name of the normal on-line file
2428  * (which may or may not actually exist, but we'll try to use it), and return
2429  * FALSE.
2430  *
2431  * For fixed-size files, the caller may pass the expected size as an
2432  * additional crosscheck on successful recovery.  If the file size is not
2433  * known, set expectedSize = 0.
2434  */
2435 static bool
2436 RestoreArchivedFile(char *path, const char *xlogfname,
2437                                         const char *recovername, off_t expectedSize)
2438 {
2439         char            xlogpath[MAXPGPATH];
2440         char            xlogRestoreCmd[MAXPGPATH];
2441         char            lastRestartPointFname[MAXPGPATH];
2442         char       *dp;
2443         char       *endp;
2444         const char *sp;
2445         int                     rc;
2446         bool            signaled;
2447         struct stat stat_buf;
2448         uint32          restartLog;
2449         uint32          restartSeg;
2450
2451         /*
2452          * When doing archive recovery, we always prefer an archived log file even
2453          * if a file of the same name exists in XLOGDIR.  The reason is that the
2454          * file in XLOGDIR could be an old, un-filled or partly-filled version
2455          * that was copied and restored as part of backing up $PGDATA.
2456          *
2457          * We could try to optimize this slightly by checking the local copy
2458          * lastchange timestamp against the archived copy, but we have no API to
2459          * do this, nor can we guarantee that the lastchange timestamp was
2460          * preserved correctly when we copied to archive. Our aim is robustness,
2461          * so we elect not to do this.
2462          *
2463          * If we cannot obtain the log file from the archive, however, we will try
2464          * to use the XLOGDIR file if it exists.  This is so that we can make use
2465          * of log segments that weren't yet transferred to the archive.
2466          *
2467          * Notice that we don't actually overwrite any files when we copy back
2468          * from archive because the recoveryRestoreCommand may inadvertently
2469          * restore inappropriate xlogs, or they may be corrupt, so we may wish to
2470          * fallback to the segments remaining in current XLOGDIR later. The
2471          * copy-from-archive filename is always the same, ensuring that we don't
2472          * run out of disk space on long recoveries.
2473          */
2474         snprintf(xlogpath, MAXPGPATH, XLOGDIR "/%s", recovername);
2475
2476         /*
2477          * Make sure there is no existing file named recovername.
2478          */
2479         if (stat(xlogpath, &stat_buf) != 0)
2480         {
2481                 if (errno != ENOENT)
2482                         ereport(FATAL,
2483                                         (errcode_for_file_access(),
2484                                          errmsg("could not stat file \"%s\": %m",
2485                                                         xlogpath)));
2486         }
2487         else
2488         {
2489                 if (unlink(xlogpath) != 0)
2490                         ereport(FATAL,
2491                                         (errcode_for_file_access(),
2492                                          errmsg("could not remove file \"%s\": %m",
2493                                                         xlogpath)));
2494         }
2495
2496         /*
2497          * Calculate the archive file cutoff point for use during log shipping
2498          * replication. All files earlier than this point can be deleted
2499          * from the archive, though there is no requirement to do so.
2500          *
2501          * We initialise this with the filename of an InvalidXLogRecPtr, which
2502          * will prevent the deletion of any WAL files from the archive
2503          * because of the alphabetic sorting property of WAL filenames. 
2504          *
2505          * Once we have successfully located the redo pointer of the checkpoint
2506          * from which we start recovery we never request a file prior to the redo
2507          * pointer of the last restartpoint. When redo begins we know that we
2508          * have successfully located it, so there is no need for additional
2509          * status flags to signify the point when we can begin deleting WAL files
2510          * from the archive. 
2511          */
2512         if (InRedo)
2513         {
2514                 XLByteToSeg(ControlFile->checkPointCopy.redo,
2515                                         restartLog, restartSeg);
2516                 XLogFileName(lastRestartPointFname,
2517                                          ControlFile->checkPointCopy.ThisTimeLineID,
2518                                          restartLog, restartSeg);
2519                 /* we shouldn't need anything earlier than last restart point */
2520                 Assert(strcmp(lastRestartPointFname, xlogfname) <= 0);
2521         }
2522         else
2523                 XLogFileName(lastRestartPointFname, 0, 0, 0);
2524
2525         /*
2526          * construct the command to be executed
2527          */
2528         dp = xlogRestoreCmd;
2529         endp = xlogRestoreCmd + MAXPGPATH - 1;
2530         *endp = '\0';
2531
2532         for (sp = recoveryRestoreCommand; *sp; sp++)
2533         {
2534                 if (*sp == '%')
2535                 {
2536                         switch (sp[1])
2537                         {
2538                                 case 'p':
2539                                         /* %p: relative path of target file */
2540                                         sp++;
2541                                         StrNCpy(dp, xlogpath, endp - dp);
2542                                         make_native_path(dp);
2543                                         dp += strlen(dp);
2544                                         break;
2545                                 case 'f':
2546                                         /* %f: filename of desired file */
2547                                         sp++;
2548                                         StrNCpy(dp, xlogfname, endp - dp);
2549                                         dp += strlen(dp);
2550                                         break;
2551                                 case 'r':
2552                                         /* %r: filename of last restartpoint */
2553                                         sp++;
2554                                         StrNCpy(dp, lastRestartPointFname, endp - dp);
2555                                         dp += strlen(dp);
2556                                         break;
2557                                 case '%':
2558                                         /* convert %% to a single % */
2559                                         sp++;
2560                                         if (dp < endp)
2561                                                 *dp++ = *sp;
2562                                         break;
2563                                 default:
2564                                         /* otherwise treat the % as not special */
2565                                         if (dp < endp)
2566                                                 *dp++ = *sp;
2567                                         break;
2568                         }
2569                 }
2570                 else
2571                 {
2572                         if (dp < endp)
2573                                 *dp++ = *sp;
2574                 }
2575         }
2576         *dp = '\0';
2577
2578         ereport(DEBUG3,
2579                         (errmsg_internal("executing restore command \"%s\"",
2580                                                          xlogRestoreCmd)));
2581
2582         /*
2583          * Copy xlog from archival storage to XLOGDIR
2584          */
2585         rc = system(xlogRestoreCmd);
2586         if (rc == 0)
2587         {
2588                 /*
2589                  * command apparently succeeded, but let's make sure the file is
2590                  * really there now and has the correct size.
2591                  *
2592                  * XXX I made wrong-size a fatal error to ensure the DBA would notice
2593                  * it, but is that too strong?  We could try to plow ahead with a
2594                  * local copy of the file ... but the problem is that there probably
2595                  * isn't one, and we'd incorrectly conclude we've reached the end of
2596                  * WAL and we're done recovering ...
2597                  */
2598                 if (stat(xlogpath, &stat_buf) == 0)
2599                 {
2600                         if (expectedSize > 0 && stat_buf.st_size != expectedSize)
2601                                 ereport(FATAL,
2602                                                 (errmsg("archive file \"%s\" has wrong size: %lu instead of %lu",
2603                                                                 xlogfname,
2604                                                                 (unsigned long) stat_buf.st_size,
2605                                                                 (unsigned long) expectedSize)));
2606                         else
2607                         {
2608                                 ereport(LOG,
2609                                                 (errmsg("restored log file \"%s\" from archive",
2610                                                                 xlogfname)));
2611                                 strcpy(path, xlogpath);
2612                                 return true;
2613                         }
2614                 }
2615                 else
2616                 {
2617                         /* stat failed */
2618                         if (errno != ENOENT)
2619                                 ereport(FATAL,
2620                                                 (errcode_for_file_access(),
2621                                                  errmsg("could not stat file \"%s\": %m",
2622                                                                 xlogpath)));
2623                 }
2624         }
2625
2626         /*
2627          * Remember, we rollforward UNTIL the restore fails so failure here is
2628          * just part of the process... that makes it difficult to determine
2629          * whether the restore failed because there isn't an archive to restore,
2630          * or because the administrator has specified the restore program
2631          * incorrectly.  We have to assume the former.
2632          *
2633          * However, if the failure was due to any sort of signal, it's best to
2634          * punt and abort recovery.  (If we "return false" here, upper levels will
2635          * assume that recovery is complete and start up the database!) It's
2636          * essential to abort on child SIGINT and SIGQUIT, because per spec
2637          * system() ignores SIGINT and SIGQUIT while waiting; if we see one of
2638          * those it's a good bet we should have gotten it too.  Aborting on other
2639          * signals such as SIGTERM seems a good idea as well.
2640          *
2641          * Per the Single Unix Spec, shells report exit status > 128 when a called
2642          * command died on a signal.  Also, 126 and 127 are used to report
2643          * problems such as an unfindable command; treat those as fatal errors
2644          * too.
2645          */
2646         signaled = WIFSIGNALED(rc) || WEXITSTATUS(rc) > 125;
2647
2648         ereport(signaled ? FATAL : DEBUG2,
2649                 (errmsg("could not restore file \"%s\" from archive: return code %d",
2650                                 xlogfname, rc)));
2651
2652         /*
2653          * if an archived file is not available, there might still be a version of
2654          * this file in XLOGDIR, so return that as the filename to open.
2655          *
2656          * In many recovery scenarios we expect this to fail also, but if so that
2657          * just means we've reached the end of WAL.
2658          */
2659         snprintf(path, MAXPGPATH, XLOGDIR "/%s", xlogfname);
2660         return false;
2661 }
2662
2663 /*
2664  * Preallocate log files beyond the specified log endpoint.
2665  *
2666  * XXX this is currently extremely conservative, since it forces only one
2667  * future log segment to exist, and even that only if we are 75% done with
2668  * the current one.  This is only appropriate for very low-WAL-volume systems.
2669  * High-volume systems will be OK once they've built up a sufficient set of
2670  * recycled log segments, but the startup transient is likely to include
2671  * a lot of segment creations by foreground processes, which is not so good.
2672  */
2673 static void
2674 PreallocXlogFiles(XLogRecPtr endptr)
2675 {
2676         uint32          _logId;
2677         uint32          _logSeg;
2678         int                     lf;
2679         bool            use_existent;
2680
2681         XLByteToPrevSeg(endptr, _logId, _logSeg);
2682         if ((endptr.xrecoff - 1) % XLogSegSize >=
2683                 (uint32) (0.75 * XLogSegSize))
2684         {
2685                 NextLogSeg(_logId, _logSeg);
2686                 use_existent = true;
2687                 lf = XLogFileInit(_logId, _logSeg, &use_existent, true);
2688                 close(lf);
2689                 if (!use_existent)
2690                         CheckpointStats.ckpt_segs_added++;
2691         }
2692 }
2693
2694 /*
2695  * Recycle or remove all log files older or equal to passed log/seg#
2696  *
2697  * endptr is current (or recent) end of xlog; this is used to determine
2698  * whether we want to recycle rather than delete no-longer-wanted log files.
2699  */
2700 static void
2701 RemoveOldXlogFiles(uint32 log, uint32 seg, XLogRecPtr endptr)
2702 {
2703         uint32          endlogId;
2704         uint32          endlogSeg;
2705         int                     max_advance;
2706         DIR                *xldir;
2707         struct dirent *xlde;
2708         char            lastoff[MAXFNAMELEN];
2709         char            path[MAXPGPATH];
2710
2711         /*
2712          * Initialize info about where to try to recycle to.  We allow recycling
2713          * segments up to XLOGfileslop segments beyond the current XLOG location.
2714          */
2715         XLByteToPrevSeg(endptr, endlogId, endlogSeg);
2716         max_advance = XLOGfileslop;
2717
2718         xldir = AllocateDir(XLOGDIR);
2719         if (xldir == NULL)
2720                 ereport(ERROR,
2721                                 (errcode_for_file_access(),
2722                                  errmsg("could not open transaction log directory \"%s\": %m",
2723                                                 XLOGDIR)));
2724
2725         XLogFileName(lastoff, ThisTimeLineID, log, seg);
2726
2727         while ((xlde = ReadDir(xldir, XLOGDIR)) != NULL)
2728         {
2729                 /*
2730                  * We ignore the timeline part of the XLOG segment identifiers in
2731                  * deciding whether a segment is still needed.  This ensures that we
2732                  * won't prematurely remove a segment from a parent timeline. We could
2733                  * probably be a little more proactive about removing segments of
2734                  * non-parent timelines, but that would be a whole lot more
2735                  * complicated.
2736                  *
2737                  * We use the alphanumeric sorting property of the filenames to decide
2738                  * which ones are earlier than the lastoff segment.
2739                  */
2740                 if (strlen(xlde->d_name) == 24 &&
2741                         strspn(xlde->d_name, "0123456789ABCDEF") == 24 &&
2742                         strcmp(xlde->d_name + 8, lastoff + 8) <= 0)
2743                 {
2744                         if (XLogArchiveCheckDone(xlde->d_name, true))
2745                         {
2746                                 snprintf(path, MAXPGPATH, XLOGDIR "/%s", xlde->d_name);
2747
2748                                 /*
2749                                  * Before deleting the file, see if it can be recycled as a
2750                                  * future log segment.
2751                                  */
2752                                 if (InstallXLogFileSegment(&endlogId, &endlogSeg, path,
2753                                                                                    true, &max_advance,
2754                                                                                    true))
2755                                 {
2756                                         ereport(DEBUG2,
2757                                                         (errmsg("recycled transaction log file \"%s\"",
2758                                                                         xlde->d_name)));
2759                                         CheckpointStats.ckpt_segs_recycled++;
2760                                         /* Needn't recheck that slot on future iterations */
2761                                         if (max_advance > 0)
2762                                         {
2763                                                 NextLogSeg(endlogId, endlogSeg);
2764                                                 max_advance--;
2765                                         }
2766                                 }
2767                                 else
2768                                 {
2769                                         /* No need for any more future segments... */
2770                                         ereport(DEBUG2,
2771                                                         (errmsg("removing transaction log file \"%s\"",
2772                                                                         xlde->d_name)));
2773                                         unlink(path);
2774                                         CheckpointStats.ckpt_segs_removed++;
2775                                 }
2776
2777                                 XLogArchiveCleanup(xlde->d_name);
2778                         }
2779                 }
2780         }
2781
2782         FreeDir(xldir);
2783 }
2784
2785 /*
2786  * Remove previous backup history files.  This also retries creation of
2787  * .ready files for any backup history files for which XLogArchiveNotify
2788  * failed earlier.
2789  */
2790 static void
2791 CleanupBackupHistory(void)
2792 {
2793         DIR                *xldir;
2794         struct dirent *xlde;
2795         char            path[MAXPGPATH];
2796
2797         xldir = AllocateDir(XLOGDIR);
2798         if (xldir == NULL)
2799                 ereport(ERROR,
2800                                 (errcode_for_file_access(),
2801                                  errmsg("could not open transaction log directory \"%s\": %m",
2802                                                 XLOGDIR)));
2803
2804         while ((xlde = ReadDir(xldir, XLOGDIR)) != NULL)
2805         {
2806                 if (strlen(xlde->d_name) > 24 &&
2807                         strspn(xlde->d_name, "0123456789ABCDEF") == 24 &&
2808                         strcmp(xlde->d_name + strlen(xlde->d_name) - strlen(".backup"),
2809                                    ".backup") == 0)
2810                 {
2811                         if (XLogArchiveCheckDone(xlde->d_name, true))
2812                         {
2813                                 ereport(DEBUG2,
2814                                 (errmsg("removing transaction log backup history file \"%s\"",
2815                                                 xlde->d_name)));
2816                                 snprintf(path, MAXPGPATH, XLOGDIR "/%s", xlde->d_name);
2817                                 unlink(path);
2818                                 XLogArchiveCleanup(xlde->d_name);
2819                         }
2820                 }
2821         }
2822
2823         FreeDir(xldir);
2824 }
2825
2826 /*
2827  * Restore the backup blocks present in an XLOG record, if any.
2828  *
2829  * We assume all of the record has been read into memory at *record.
2830  *
2831  * Note: when a backup block is available in XLOG, we restore it
2832  * unconditionally, even if the page in the database appears newer.
2833  * This is to protect ourselves against database pages that were partially
2834  * or incorrectly written during a crash.  We assume that the XLOG data
2835  * must be good because it has passed a CRC check, while the database
2836  * page might not be.  This will force us to replay all subsequent
2837  * modifications of the page that appear in XLOG, rather than possibly
2838  * ignoring them as already applied, but that's not a huge drawback.
2839  */
2840 static void
2841 RestoreBkpBlocks(XLogRecord *record, XLogRecPtr lsn)
2842 {
2843         Buffer          buffer;
2844         Page            page;
2845         BkpBlock        bkpb;
2846         char       *blk;
2847         int                     i;
2848
2849         blk = (char *) XLogRecGetData(record) + record->xl_len;
2850         for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
2851         {
2852                 if (!(record->xl_info & XLR_SET_BKP_BLOCK(i)))
2853                         continue;
2854
2855                 memcpy(&bkpb, blk, sizeof(BkpBlock));
2856                 blk += sizeof(BkpBlock);
2857
2858                 buffer = XLogReadBuffer(bkpb.node, bkpb.block, true);
2859                 Assert(BufferIsValid(buffer));
2860                 page = (Page) BufferGetPage(buffer);
2861
2862                 if (bkpb.hole_length == 0)
2863                 {
2864                         memcpy((char *) page, blk, BLCKSZ);
2865                 }
2866                 else
2867                 {
2868                         /* must zero-fill the hole */
2869                         MemSet((char *) page, 0, BLCKSZ);
2870                         memcpy((char *) page, blk, bkpb.hole_offset);
2871                         memcpy((char *) page + (bkpb.hole_offset + bkpb.hole_length),
2872                                    blk + bkpb.hole_offset,
2873                                    BLCKSZ - (bkpb.hole_offset + bkpb.hole_length));
2874                 }
2875
2876                 PageSetLSN(page, lsn);
2877                 PageSetTLI(page, ThisTimeLineID);
2878                 MarkBufferDirty(buffer);
2879                 UnlockReleaseBuffer(buffer);
2880
2881                 blk += BLCKSZ - bkpb.hole_length;
2882         }
2883 }
2884
2885 /*
2886  * CRC-check an XLOG record.  We do not believe the contents of an XLOG
2887  * record (other than to the minimal extent of computing the amount of
2888  * data to read in) until we've checked the CRCs.
2889  *
2890  * We assume all of the record has been read into memory at *record.
2891  */
2892 static bool
2893 RecordIsValid(XLogRecord *record, XLogRecPtr recptr, int emode)
2894 {
2895         pg_crc32        crc;
2896         int                     i;
2897         uint32          len = record->xl_len;
2898         BkpBlock        bkpb;
2899         char       *blk;
2900
2901         /* First the rmgr data */
2902         INIT_CRC32(crc);
2903         COMP_CRC32(crc, XLogRecGetData(record), len);
2904
2905         /* Add in the backup blocks, if any */
2906         blk = (char *) XLogRecGetData(record) + len;
2907         for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
2908         {
2909                 uint32          blen;
2910
2911                 if (!(record->xl_info & XLR_SET_BKP_BLOCK(i)))
2912                         continue;
2913
2914                 memcpy(&bkpb, blk, sizeof(BkpBlock));
2915                 if (bkpb.hole_offset + bkpb.hole_length > BLCKSZ)
2916                 {
2917                         ereport(emode,
2918                                         (errmsg("incorrect hole size in record at %X/%X",
2919                                                         recptr.xlogid, recptr.xrecoff)));
2920                         return false;
2921                 }
2922                 blen = sizeof(BkpBlock) + BLCKSZ - bkpb.hole_length;
2923                 COMP_CRC32(crc, blk, blen);
2924                 blk += blen;
2925         }
2926
2927         /* Check that xl_tot_len agrees with our calculation */
2928         if (blk != (char *) record + record->xl_tot_len)
2929         {
2930                 ereport(emode,
2931                                 (errmsg("incorrect total length in record at %X/%X",
2932                                                 recptr.xlogid, recptr.xrecoff)));
2933                 return false;
2934         }
2935
2936         /* Finally include the record header */
2937         COMP_CRC32(crc, (char *) record + sizeof(pg_crc32),
2938                            SizeOfXLogRecord - sizeof(pg_crc32));
2939         FIN_CRC32(crc);
2940
2941         if (!EQ_CRC32(record->xl_crc, crc))
2942         {
2943                 ereport(emode,
2944                 (errmsg("incorrect resource manager data checksum in record at %X/%X",
2945                                 recptr.xlogid, recptr.xrecoff)));
2946                 return false;
2947         }
2948
2949         return true;
2950 }
2951
2952 /*
2953  * Attempt to read an XLOG record.
2954  *
2955  * If RecPtr is not NULL, try to read a record at that position.  Otherwise
2956  * try to read a record just after the last one previously read.
2957  *
2958  * If no valid record is available, returns NULL, or fails if emode is PANIC.
2959  * (emode must be either PANIC or LOG.)
2960  *
2961  * The record is copied into readRecordBuf, so that on successful return,
2962  * the returned record pointer always points there.
2963  */
2964 static XLogRecord *
2965 ReadRecord(XLogRecPtr *RecPtr, int emode)
2966 {
2967         XLogRecord *record;
2968         char       *buffer;
2969         XLogRecPtr      tmpRecPtr = EndRecPtr;
2970         bool            randAccess = false;
2971         uint32          len,
2972                                 total_len;
2973         uint32          targetPageOff;
2974         uint32          targetRecOff;
2975         uint32          pageHeaderSize;
2976
2977         if (readBuf == NULL)
2978         {
2979                 /*
2980                  * First time through, permanently allocate readBuf.  We do it this
2981                  * way, rather than just making a static array, for two reasons: (1)
2982                  * no need to waste the storage in most instantiations of the backend;
2983                  * (2) a static char array isn't guaranteed to have any particular
2984                  * alignment, whereas malloc() will provide MAXALIGN'd storage.
2985                  */
2986                 readBuf = (char *) malloc(XLOG_BLCKSZ);
2987                 Assert(readBuf != NULL);
2988         }
2989
2990         if (RecPtr == NULL)
2991         {
2992                 RecPtr = &tmpRecPtr;
2993                 /* fast case if next record is on same page */
2994                 if (nextRecord != NULL)
2995                 {
2996                         record = nextRecord;
2997                         goto got_record;
2998                 }
2999                 /* align old recptr to next page */
3000                 if (tmpRecPtr.xrecoff % XLOG_BLCKSZ != 0)
3001                         tmpRecPtr.xrecoff += (XLOG_BLCKSZ - tmpRecPtr.xrecoff % XLOG_BLCKSZ);
3002                 if (tmpRecPtr.xrecoff >= XLogFileSize)
3003                 {
3004                         (tmpRecPtr.xlogid)++;
3005                         tmpRecPtr.xrecoff = 0;
3006                 }
3007                 /* We will account for page header size below */
3008         }
3009         else
3010         {
3011                 if (!XRecOffIsValid(RecPtr->xrecoff))
3012                         ereport(PANIC,
3013                                         (errmsg("invalid record offset at %X/%X",
3014                                                         RecPtr->xlogid, RecPtr->xrecoff)));
3015
3016                 /*
3017                  * Since we are going to a random position in WAL, forget any prior
3018                  * state about what timeline we were in, and allow it to be any
3019                  * timeline in expectedTLIs.  We also set a flag to allow curFileTLI
3020                  * to go backwards (but we can't reset that variable right here, since
3021                  * we might not change files at all).
3022                  */
3023                 lastPageTLI = 0;                /* see comment in ValidXLOGHeader */
3024                 randAccess = true;              /* allow curFileTLI to go backwards too */
3025         }
3026
3027         if (readFile >= 0 && !XLByteInSeg(*RecPtr, readId, readSeg))
3028         {
3029                 close(readFile);
3030                 readFile = -1;
3031         }
3032         XLByteToSeg(*RecPtr, readId, readSeg);
3033         if (readFile < 0)
3034         {
3035                 /* Now it's okay to reset curFileTLI if random fetch */
3036                 if (randAccess)
3037                         curFileTLI = 0;
3038
3039                 readFile = XLogFileRead(readId, readSeg, emode);
3040                 if (readFile < 0)
3041                         goto next_record_is_invalid;
3042
3043                 /*
3044                  * Whenever switching to a new WAL segment, we read the first page of
3045                  * the file and validate its header, even if that's not where the
3046                  * target record is.  This is so that we can check the additional
3047                  * identification info that is present in the first page's "long"
3048                  * header.
3049                  */
3050                 readOff = 0;
3051                 if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
3052                 {
3053                         ereport(emode,
3054                                         (errcode_for_file_access(),
3055                                          errmsg("could not read from log file %u, segment %u, offset %u: %m",
3056                                                         readId, readSeg, readOff)));
3057                         goto next_record_is_invalid;
3058                 }
3059                 if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode))
3060                         goto next_record_is_invalid;
3061         }
3062
3063         targetPageOff = ((RecPtr->xrecoff % XLogSegSize) / XLOG_BLCKSZ) * XLOG_BLCKSZ;
3064         if (readOff != targetPageOff)
3065         {
3066                 readOff = targetPageOff;
3067                 if (lseek(readFile, (off_t) readOff, SEEK_SET) < 0)
3068                 {
3069                         ereport(emode,
3070                                         (errcode_for_file_access(),
3071                                          errmsg("could not seek in log file %u, segment %u to offset %u: %m",
3072                                                         readId, readSeg, readOff)));
3073                         goto next_record_is_invalid;
3074                 }
3075                 if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
3076                 {
3077                         ereport(emode,
3078                                         (errcode_for_file_access(),
3079                                          errmsg("could not read from log file %u, segment %u, offset %u: %m",
3080                                                         readId, readSeg, readOff)));
3081                         goto next_record_is_invalid;
3082                 }
3083                 if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode))
3084                         goto next_record_is_invalid;
3085         }
3086         pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
3087         targetRecOff = RecPtr->xrecoff % XLOG_BLCKSZ;
3088         if (targetRecOff == 0)
3089         {
3090                 /*
3091                  * Can only get here in the continuing-from-prev-page case, because
3092                  * XRecOffIsValid eliminated the zero-page-offset case otherwise. Need
3093                  * to skip over the new page's header.
3094                  */
3095                 tmpRecPtr.xrecoff += pageHeaderSize;
3096                 targetRecOff = pageHeaderSize;
3097         }
3098         else if (targetRecOff < pageHeaderSize)
3099         {
3100                 ereport(emode,
3101                                 (errmsg("invalid record offset at %X/%X",
3102                                                 RecPtr->xlogid, RecPtr->xrecoff)));
3103                 goto next_record_is_invalid;
3104         }
3105         if ((((XLogPageHeader) readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) &&
3106                 targetRecOff == pageHeaderSize)
3107         {
3108                 ereport(emode,
3109                                 (errmsg("contrecord is requested by %X/%X",
3110                                                 RecPtr->xlogid, RecPtr->xrecoff)));
3111                 goto next_record_is_invalid;
3112         }
3113         record = (XLogRecord *) ((char *) readBuf + RecPtr->xrecoff % XLOG_BLCKSZ);
3114
3115 got_record:;
3116
3117         /*
3118          * xl_len == 0 is bad data for everything except XLOG SWITCH, where it is
3119          * required.
3120          */
3121         if (record->xl_rmid == RM_XLOG_ID && record->xl_info == XLOG_SWITCH)
3122         {
3123                 if (record->xl_len != 0)
3124                 {
3125                         ereport(emode,
3126                                         (errmsg("invalid xlog switch record at %X/%X",
3127                                                         RecPtr->xlogid, RecPtr->xrecoff)));
3128                         goto next_record_is_invalid;
3129                 }
3130         }
3131         else if (record->xl_len == 0)
3132         {
3133                 ereport(emode,
3134                                 (errmsg("record with zero length at %X/%X",
3135                                                 RecPtr->xlogid, RecPtr->xrecoff)));
3136                 goto next_record_is_invalid;
3137         }
3138         if (record->xl_tot_len < SizeOfXLogRecord + record->xl_len ||
3139                 record->xl_tot_len > SizeOfXLogRecord + record->xl_len +
3140                 XLR_MAX_BKP_BLOCKS * (sizeof(BkpBlock) + BLCKSZ))
3141         {
3142                 ereport(emode,
3143                                 (errmsg("invalid record length at %X/%X",
3144                                                 RecPtr->xlogid, RecPtr->xrecoff)));
3145                 goto next_record_is_invalid;
3146         }
3147         if (record->xl_rmid > RM_MAX_ID)
3148         {
3149                 ereport(emode,
3150                                 (errmsg("invalid resource manager ID %u at %X/%X",
3151                                                 record->xl_rmid, RecPtr->xlogid, RecPtr->xrecoff)));
3152                 goto next_record_is_invalid;
3153         }
3154         if (randAccess)
3155         {
3156                 /*
3157                  * We can't exactly verify the prev-link, but surely it should be less
3158                  * than the record's own address.
3159                  */
3160                 if (!XLByteLT(record->xl_prev, *RecPtr))
3161                 {
3162                         ereport(emode,
3163                                         (errmsg("record with incorrect prev-link %X/%X at %X/%X",
3164                                                         record->xl_prev.xlogid, record->xl_prev.xrecoff,
3165                                                         RecPtr->xlogid, RecPtr->xrecoff)));
3166                         goto next_record_is_invalid;
3167                 }
3168         }
3169         else
3170         {
3171                 /*
3172                  * Record's prev-link should exactly match our previous location. This
3173                  * check guards against torn WAL pages where a stale but valid-looking
3174                  * WAL record starts on a sector boundary.
3175                  */
3176                 if (!XLByteEQ(record->xl_prev, ReadRecPtr))
3177                 {
3178                         ereport(emode,
3179                                         (errmsg("record with incorrect prev-link %X/%X at %X/%X",
3180                                                         record->xl_prev.xlogid, record->xl_prev.xrecoff,
3181                                                         RecPtr->xlogid, RecPtr->xrecoff)));
3182                         goto next_record_is_invalid;
3183                 }
3184         }
3185
3186         /*
3187          * Allocate or enlarge readRecordBuf as needed.  To avoid useless small
3188          * increases, round its size to a multiple of XLOG_BLCKSZ, and make sure
3189          * it's at least 4*Max(BLCKSZ, XLOG_BLCKSZ) to start with.  (That is
3190          * enough for all "normal" records, but very large commit or abort records
3191          * might need more space.)
3192          */
3193         total_len = record->xl_tot_len;
3194         if (total_len > readRecordBufSize)
3195         {
3196                 uint32          newSize = total_len;
3197
3198                 newSize += XLOG_BLCKSZ - (newSize % XLOG_BLCKSZ);
3199                 newSize = Max(newSize, 4 * Max(BLCKSZ, XLOG_BLCKSZ));
3200                 if (readRecordBuf)
3201                         free(readRecordBuf);
3202                 readRecordBuf = (char *) malloc(newSize);
3203                 if (!readRecordBuf)
3204                 {
3205                         readRecordBufSize = 0;
3206                         /* We treat this as a "bogus data" condition */
3207                         ereport(emode,
3208                                         (errmsg("record length %u at %X/%X too long",
3209                                                         total_len, RecPtr->xlogid, RecPtr->xrecoff)));
3210                         goto next_record_is_invalid;
3211                 }
3212                 readRecordBufSize = newSize;
3213         }
3214
3215         buffer = readRecordBuf;
3216         nextRecord = NULL;
3217         len = XLOG_BLCKSZ - RecPtr->xrecoff % XLOG_BLCKSZ;
3218         if (total_len > len)
3219         {
3220                 /* Need to reassemble record */
3221                 XLogContRecord *contrecord;
3222                 uint32          gotlen = len;
3223
3224                 memcpy(buffer, record, len);
3225                 record = (XLogRecord *) buffer;
3226                 buffer += len;
3227                 for (;;)
3228                 {
3229                         readOff += XLOG_BLCKSZ;
3230                         if (readOff >= XLogSegSize)
3231                         {
3232                                 close(readFile);
3233                                 readFile = -1;
3234                                 NextLogSeg(readId, readSeg);
3235                                 readFile = XLogFileRead(readId, readSeg, emode);
3236                                 if (readFile < 0)
3237                                         goto next_record_is_invalid;
3238                                 readOff = 0;
3239                         }
3240                         if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
3241                         {
3242                                 ereport(emode,
3243                                                 (errcode_for_file_access(),
3244                                                  errmsg("could not read from log file %u, segment %u, offset %u: %m",
3245                                                                 readId, readSeg, readOff)));
3246                                 goto next_record_is_invalid;
3247                         }
3248                         if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode))
3249                                 goto next_record_is_invalid;
3250                         if (!(((XLogPageHeader) readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD))
3251                         {
3252                                 ereport(emode,
3253                                                 (errmsg("there is no contrecord flag in log file %u, segment %u, offset %u",
3254                                                                 readId, readSeg, readOff)));
3255                                 goto next_record_is_invalid;
3256                         }
3257                         pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
3258                         contrecord = (XLogContRecord *) ((char *) readBuf + pageHeaderSize);
3259                         if (contrecord->xl_rem_len == 0 ||
3260                                 total_len != (contrecord->xl_rem_len + gotlen))
3261                         {
3262                                 ereport(emode,
3263                                                 (errmsg("invalid contrecord length %u in log file %u, segment %u, offset %u",
3264                                                                 contrecord->xl_rem_len,
3265                                                                 readId, readSeg, readOff)));
3266                                 goto next_record_is_invalid;
3267                         }
3268                         len = XLOG_BLCKSZ - pageHeaderSize - SizeOfXLogContRecord;
3269                         if (contrecord->xl_rem_len > len)
3270                         {
3271                                 memcpy(buffer, (char *) contrecord + SizeOfXLogContRecord, len);
3272                                 gotlen += len;
3273                                 buffer += len;
3274                                 continue;
3275                         }
3276                         memcpy(buffer, (char *) contrecord + SizeOfXLogContRecord,
3277                                    contrecord->xl_rem_len);
3278                         break;
3279                 }
3280                 if (!RecordIsValid(record, *RecPtr, emode))
3281                         goto next_record_is_invalid;
3282                 pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
3283                 if (XLOG_BLCKSZ - SizeOfXLogRecord >= pageHeaderSize +
3284                         MAXALIGN(SizeOfXLogContRecord + contrecord->xl_rem_len))
3285                 {
3286                         nextRecord = (XLogRecord *) ((char *) contrecord +
3287                                         MAXALIGN(SizeOfXLogContRecord + contrecord->xl_rem_len));
3288                 }
3289                 EndRecPtr.xlogid = readId;
3290                 EndRecPtr.xrecoff = readSeg * XLogSegSize + readOff +
3291                         pageHeaderSize +
3292                         MAXALIGN(SizeOfXLogContRecord + contrecord->xl_rem_len);
3293                 ReadRecPtr = *RecPtr;
3294                 /* needn't worry about XLOG SWITCH, it can't cross page boundaries */
3295                 return record;
3296         }
3297
3298         /* Record does not cross a page boundary */
3299         if (!RecordIsValid(record, *RecPtr, emode))
3300                 goto next_record_is_invalid;
3301         if (XLOG_BLCKSZ - SizeOfXLogRecord >= RecPtr->xrecoff % XLOG_BLCKSZ +
3302                 MAXALIGN(total_len))
3303                 nextRecord = (XLogRecord *) ((char *) record + MAXALIGN(total_len));
3304         EndRecPtr.xlogid = RecPtr->xlogid;
3305         EndRecPtr.xrecoff = RecPtr->xrecoff + MAXALIGN(total_len);
3306         ReadRecPtr = *RecPtr;
3307         memcpy(buffer, record, total_len);
3308
3309         /*
3310          * Special processing if it's an XLOG SWITCH record
3311          */
3312         if (record->xl_rmid == RM_XLOG_ID && record->xl_info == XLOG_SWITCH)
3313         {
3314                 /* Pretend it extends to end of segment */
3315                 EndRecPtr.xrecoff += XLogSegSize - 1;
3316                 EndRecPtr.xrecoff -= EndRecPtr.xrecoff % XLogSegSize;
3317                 nextRecord = NULL;              /* definitely not on same page */
3318
3319                 /*
3320                  * Pretend that readBuf contains the last page of the segment. This is
3321                  * just to avoid Assert failure in StartupXLOG if XLOG ends with this
3322                  * segment.
3323                  */
3324                 readOff = XLogSegSize - XLOG_BLCKSZ;
3325         }
3326         return (XLogRecord *) buffer;
3327
3328 next_record_is_invalid:;
3329         if (readFile >= 0)
3330         {
3331                 close(readFile);
3332                 readFile = -1;
3333         }
3334         nextRecord = NULL;
3335         return NULL;
3336 }
3337
3338 /*
3339  * Check whether the xlog header of a page just read in looks valid.
3340  *
3341  * This is just a convenience subroutine to avoid duplicated code in
3342  * ReadRecord.  It's not intended for use from anywhere else.
3343  */
3344 static bool
3345 ValidXLOGHeader(XLogPageHeader hdr, int emode)
3346 {
3347         XLogRecPtr      recaddr;
3348
3349         if (hdr->xlp_magic != XLOG_PAGE_MAGIC)
3350         {
3351                 ereport(emode,
3352                                 (errmsg("invalid magic number %04X in log file %u, segment %u, offset %u",
3353                                                 hdr->xlp_magic, readId, readSeg, readOff)));
3354                 return false;
3355         }
3356         if ((hdr->xlp_info & ~XLP_ALL_FLAGS) != 0)
3357         {
3358                 ereport(emode,
3359                                 (errmsg("invalid info bits %04X in log file %u, segment %u, offset %u",
3360                                                 hdr->xlp_info, readId, readSeg, readOff)));
3361                 return false;
3362         }
3363         if (hdr->xlp_info & XLP_LONG_HEADER)
3364         {
3365                 XLogLongPageHeader longhdr = (XLogLongPageHeader) hdr;
3366
3367                 if (longhdr->xlp_sysid != ControlFile->system_identifier)
3368                 {
3369                         char            fhdrident_str[32];
3370                         char            sysident_str[32];
3371
3372                         /*
3373                          * Format sysids separately to keep platform-dependent format code
3374                          * out of the translatable message string.
3375                          */
3376                         snprintf(fhdrident_str, sizeof(fhdrident_str), UINT64_FORMAT,
3377                                          longhdr->xlp_sysid);
3378                         snprintf(sysident_str, sizeof(sysident_str), UINT64_FORMAT,
3379                                          ControlFile->system_identifier);
3380                         ereport(emode,
3381                                         (errmsg("WAL file is from different system"),
3382                                          errdetail("WAL file SYSID is %s, pg_control SYSID is %s",
3383                                                            fhdrident_str, sysident_str)));
3384                         return false;
3385                 }
3386                 if (longhdr->xlp_seg_size != XLogSegSize)
3387                 {
3388                         ereport(emode,
3389                                         (errmsg("WAL file is from different system"),
3390                                          errdetail("Incorrect XLOG_SEG_SIZE in page header.")));
3391                         return false;
3392                 }
3393                 if (longhdr->xlp_xlog_blcksz != XLOG_BLCKSZ)
3394                 {
3395                         ereport(emode,
3396                                         (errmsg("WAL file is from different system"),
3397                                          errdetail("Incorrect XLOG_BLCKSZ in page header.")));
3398                         return false;
3399                 }
3400         }
3401         else if (readOff == 0)
3402         {
3403                 /* hmm, first page of file doesn't have a long header? */
3404                 ereport(emode,
3405                                 (errmsg("invalid info bits %04X in log file %u, segment %u, offset %u",
3406                                                 hdr->xlp_info, readId, readSeg, readOff)));
3407                 return false;
3408         }
3409
3410         recaddr.xlogid = readId;
3411         recaddr.xrecoff = readSeg * XLogSegSize + readOff;
3412         if (!XLByteEQ(hdr->xlp_pageaddr, recaddr))
3413         {
3414                 ereport(emode,
3415                                 (errmsg("unexpected pageaddr %X/%X in log file %u, segment %u, offset %u",
3416                                                 hdr->xlp_pageaddr.xlogid, hdr->xlp_pageaddr.xrecoff,
3417                                                 readId, readSeg, readOff)));
3418                 return false;
3419         }
3420
3421         /*
3422          * Check page TLI is one of the expected values.
3423          */
3424         if (!list_member_int(expectedTLIs, (int) hdr->xlp_tli))
3425         {
3426                 ereport(emode,
3427                                 (errmsg("unexpected timeline ID %u in log file %u, segment %u, offset %u",
3428                                                 hdr->xlp_tli,
3429                                                 readId, readSeg, readOff)));
3430                 return false;
3431         }
3432
3433         /*
3434          * Since child timelines are always assigned a TLI greater than their
3435          * immediate parent's TLI, we should never see TLI go backwards across
3436          * successive pages of a consistent WAL sequence.
3437          *
3438          * Of course this check should only be applied when advancing sequentially
3439          * across pages; therefore ReadRecord resets lastPageTLI to zero when
3440          * going to a random page.
3441          */
3442         if (hdr->xlp_tli < lastPageTLI)
3443         {
3444                 ereport(emode,
3445                                 (errmsg("out-of-sequence timeline ID %u (after %u) in log file %u, segment %u, offset %u",
3446                                                 hdr->xlp_tli, lastPageTLI,
3447                                                 readId, readSeg, readOff)));
3448                 return false;
3449         }
3450         lastPageTLI = hdr->xlp_tli;
3451         return true;
3452 }
3453
3454 /*
3455  * Load the ancestry of a timeline from its history file.
3456  *
3457  * The returned integer list starts with targetTLI itself and continues with
3458  * the TLIs found in the history file, most recent first.  If the history
3459  * file does not exist, the timeline is taken to have no ancestors and the
3460  * list holds only targetTLI.  Other problems are reported as FATAL.
3461  */
3462 static List *
3463 readTimeLineHistory(TimeLineID targetTLI)
3464 {
3465         List       *tlis = NIL;
3466         char            path[MAXPGPATH];
3467         char            histfname[MAXFNAMELEN];
3468         char            fline[MAXPGPATH];
3469         FILE       *histfile;
3470
3471         /* Work out where the file is: restored from archive, or the local copy */
3472         if (!InArchiveRecovery)
3473                 TLHistoryFilePath(path, targetTLI);
3474         else
3475         {
3476                 TLHistoryFileName(histfname, targetTLI);
3477                 RestoreArchivedFile(path, histfname, "RECOVERYHISTORY", 0);
3478         }
3479
3480         histfile = AllocateFile(path, "r");
3481         if (histfile == NULL)
3482         {
3483                 /* Only "no such file" is tolerated; it means there are no parents */
3484                 if (errno != ENOENT)
3485                         ereport(FATAL,
3486                                         (errcode_for_file_access(),
3487                                          errmsg("could not open file \"%s\": %m", path)));
3488                 return list_make1_int((int) targetTLI);
3489         }
3490
3491         /*
3492          * Walk the file one line at a time; each data line starts with a TLI.
3493          */
3494         while (fgets(fline, sizeof(fline), histfile) != NULL)
3495         {
3496                 char       *start = fline;
3497                 char       *stop;
3498                 TimeLineID      tli;
3499
3500                 /* step over leading whitespace */
3501                 while (*start != '\0' && isspace((unsigned char) *start))
3502                         start++;
3503
3504                 /* nothing to do for blank lines and # comments */
3505                 if (*start == '#' || *start == '\0')
3506                         continue;
3507
3508                 /* the first field has to be a number: the timeline ID */
3509                 tli = (TimeLineID) strtoul(start, &stop, 0);
3510                 if (stop == start)
3511                         ereport(FATAL,
3512                                         (errmsg("syntax error in history file: %s", fline),
3513                                          errhint("Expected a numeric timeline ID.")));
3514
3515                 /* entries must be strictly ascending from line to line */
3516                 if (tlis != NIL &&
3517                         tli <= (TimeLineID) linitial_int(tlis))
3518                         ereport(FATAL,
3519                                         (errmsg("invalid data in history file: %s", fline),
3520                                          errhint("Timeline IDs must be in increasing sequence.")));
3521
3522                 /* prepend, so the most recent timeline ends up at the head */
3523                 tlis = lcons_int((int) tli, tlis);
3524
3525                 /* whatever follows the TLI on the line is not examined */
3526         }
3527
3528         FreeFile(histfile);
3529
3530         /* The requested timeline must be newer than every ancestor listed */
3531         if (tlis != NIL && targetTLI <= (TimeLineID) linitial_int(tlis))
3532                 ereport(FATAL,
3533                                 (errmsg("invalid data in history file \"%s\"", path),
3534                                  errhint("Timeline IDs must be less than child timeline's ID.")));
3535
3536         /* targetTLI itself always leads the list */
3537         tlis = lcons_int((int) targetTLI, tlis);
3538
3539         ereport(DEBUG3,
3540                         (errmsg_internal("history of timeline %u is %s",
3541                                                          targetTLI, nodeToString(tlis))));
3542
3543         return tlis;
3544 }
3545
3546 /*
3547  * Report whether a history file can be opened for timeline probeTLI.
3548  * A missing file yields false; any other open failure is FATAL.
3549  */
3550 static bool
3551 existsTimeLineHistory(TimeLineID probeTLI)
3552 {
3553         char            path[MAXPGPATH];
3554         char            histfname[MAXFNAMELEN];
3555         FILE       *probe;
3556
3557         if (!InArchiveRecovery)
3558                 TLHistoryFilePath(path, probeTLI);
3559         else
3560         {
3561                 TLHistoryFileName(histfname, probeTLI);
3562                 RestoreArchivedFile(path, histfname, "RECOVERYHISTORY", 0);
3563         }
3564
3565         probe = AllocateFile(path, "r");
3566         if (probe == NULL)
3567         {
3568                 if (errno != ENOENT)
3569                         ereport(FATAL,
3570                                         (errcode_for_file_access(),
3571                                          errmsg("could not open file \"%s\": %m", path)));
3572                 return false;
3573         }
3574
3575         /* being able to open it is all we wanted to know */
3576         FreeFile(probe);
3577         return true;
3578 }
3579
3580 /*
3581  * Return the highest timeline ID reachable from startTLI by stepping
3582  * through consecutive IDs that each have a history file.  startTLI itself
3583  * is taken to exist and is never probed.
3584  *
3585  * Note: the search is a heuristic, but it does establish that no history
3586  * file was found for (result + 1), so that ID should be safe to hand out
3587  * to a new timeline.
3588  */
3589 static TimeLineID
3590 findNewestTimeLine(TimeLineID startTLI)
3591 {
3592         TimeLineID      newest = startTLI;
3593
3594         /*
3595          * Probe upward one ID at a time, asking existsTimeLineHistory() about
3596          * each.  The first ID without a history file ends the search, so any
3597          * timelines lying beyond a gap are not found.
3598          * XXX is it useful to allow gaps in the sequence?
3599          */
3600         for (;;)
3601         {
3602                 TimeLineID      candidate = newest + 1;
3603
3604                 /* no history file for the next ID: newest is our answer */
3605                 if (!existsTimeLineHistory(candidate))
3606                         break;
3607
3608                 /* candidate exists, so it becomes the newest known so far */
3609                 newest = candidate;
3610         }
3611
3612         return newest;
3613 }
3614
3615 /*
3616  * Create a new timeline history file.
3617  *
3618  *      newTLI: ID of the new timeline
3619  *      parentTLI: ID of its immediate parent
3620  *      endTLI et al: ID of the last used WAL file, for annotation purposes
3621  *
3622  * Currently this is only used during recovery, and so there are no locking
3623  * considerations.  But we should be just as careful as XLogFileInit to avoid
3624  * emplacing a bogus file.
      *
      * The file is assembled under a temporary name in XLOGDIR: the parent's
      * history file (if one can be opened) is copied verbatim, one line
      * describing this timeline split is appended, and the result is fsync'd
      * before being linked or renamed to its final name.  Last of all,
      * XLogArchiveNotify() is called for the new history file.  Failures are
      * reported with ereport(ERROR).
3625  */
3626 static void
3627 writeTimeLineHistory(TimeLineID newTLI, TimeLineID parentTLI,
3628                                          TimeLineID endTLI, uint32 endLogId, uint32 endLogSeg)
3629 {
3630         char            path[MAXPGPATH];
3631         char            tmppath[MAXPGPATH];
3632         char            histfname[MAXFNAMELEN];
3633         char            xlogfname[MAXFNAMELEN];
3634         char            buffer[BLCKSZ];
3635         int                     srcfd;
3636         int                     fd;
3637         int                     nbytes;
3638
3639         Assert(newTLI > parentTLI); /* else bad selection of newTLI */
3640
3641         /*
3642          * Write into a temp file name.
3643          */
3644         snprintf(tmppath, MAXPGPATH, XLOGDIR "/xlogtemp.%d", (int) getpid());
3645
             /* remove any leftover file of that name; the O_EXCL open below needs it gone */
3646         unlink(tmppath);
3647
3648         /* do not use get_sync_bit() here --- want to fsync only at end of fill */
3649         fd = BasicOpenFile(tmppath, O_RDWR | O_CREAT | O_EXCL,
3650                                            S_IRUSR | S_IWUSR);
3651         if (fd < 0)
3652                 ereport(ERROR,
3653                                 (errcode_for_file_access(),
3654                                  errmsg("could not create file \"%s\": %m", tmppath)));
3655
3656         /*
3657          * If a history file exists for the parent, copy it verbatim
3658          */
3659         if (InArchiveRecovery)
3660         {
3661                 TLHistoryFileName(histfname, parentTLI);
3662                 RestoreArchivedFile(path, histfname, "RECOVERYHISTORY", 0);
3663         }
3664         else
3665                 TLHistoryFilePath(path, parentTLI);
3666
3667         srcfd = BasicOpenFile(path, O_RDONLY, 0);
3668         if (srcfd < 0)
3669         {
3670                 if (errno != ENOENT)
3671                         ereport(ERROR,
3672                                         (errcode_for_file_access(),
3673                                          errmsg("could not open file \"%s\": %m", path)));
3674                 /* Not there, so assume parent has no parents */
3675         }
3676         else
3677         {
                     /* copy the parent file in buffer-sized pieces until read() returns 0 */
3678                 for (;;)
3679                 {
                             /* clear errno so the check below sees only what this read() set */
3680                         errno = 0;
3681                         nbytes = (int) read(srcfd, buffer, sizeof(buffer));
3682                         if (nbytes < 0 || errno != 0)
3683                                 ereport(ERROR,
3684                                                 (errcode_for_file_access(),
3685                                                  errmsg("could not read file \"%s\": %m", path)));
3686                         if (nbytes == 0)
3687                                 break;
3688                         errno = 0;
3689                         if ((int) write(fd, buffer, nbytes) != nbytes)
3690                         {
                                     /* remember write()'s errno; unlink() below may change it */
3691                                 int                     save_errno = errno;
3692
3693                                 /*
3694                                  * If we fail to make the file, delete it to release disk
3695                                  * space
3696                                  */
3697                                 unlink(tmppath);
3698
3699                                 /*
3700                                  * if write didn't set errno, assume problem is no disk space
3701                                  */
3702                                 errno = save_errno ? save_errno : ENOSPC;
3703
3704                                 ereport(ERROR,
3705                                                 (errcode_for_file_access(),
3706                                          errmsg("could not write to file \"%s\": %m", tmppath)));
3707                         }
3708                 }
3709                 close(srcfd);
3710         }
3711
3712         /*
3713          * Append one line with the details of this timeline split.
3714          *
3715          * If we did have a parent file, insert an extra newline just in case the
3716          * parent file failed to end with one.
              *
              * (srcfd keeps its non-negative value after close(), so testing it
              * below still tells us whether a parent file was copied.)
3717          */
3718         XLogFileName(xlogfname, endTLI, endLogId, endLogSeg);
3719
3720         snprintf(buffer, sizeof(buffer),
3721                          "%s%u\t%s\t%s transaction %u at %s\n",
3722                          (srcfd < 0) ? "" : "\n",
3723                          parentTLI,
3724                          xlogfname,
3725                          recoveryStopAfter ? "after" : "before",
3726                          recoveryStopXid,
3727                          timestamptz_to_str(recoveryStopTime));
3728
3729         nbytes = strlen(buffer);
3730         errno = 0;
3731         if ((int) write(fd, buffer, nbytes) != nbytes)
3732         {
3733                 int                     save_errno = errno;
3734
3735                 /*
3736                  * If we fail to make the file, delete it to release disk space
3737                  */
3738                 unlink(tmppath);
3739                 /* if write didn't set errno, assume problem is no disk space */
3740                 errno = save_errno ? save_errno : ENOSPC;
3741
3742                 ereport(ERROR,
3743                                 (errcode_for_file_access(),
3744                                  errmsg("could not write to file \"%s\": %m", tmppath)));
3745         }
3746
             /* flush the finished temp file to disk before it gets its final name */
3747         if (pg_fsync(fd) != 0)
3748                 ereport(ERROR,
3749                                 (errcode_for_file_access(),
3750                                  errmsg("could not fsync file \"%s\": %m", tmppath)));
3751
3752         if (close(fd))
3753                 ereport(ERROR,
3754                                 (errcode_for_file_access(),
3755                                  errmsg("could not close file \"%s\": %m", tmppath)));
3756
3757
3758         /*
3759          * Now move the completed history file into place with its final name.
3760          */
3761         TLHistoryFilePath(path, newTLI);
3762
3763         /*
3764          * Prefer link() to rename() here just to be really sure that we don't
3765          * overwrite an existing logfile.  However, there shouldn't be one, so
3766          * rename() is an acceptable substitute except for the truly paranoid.
3767          */
3768 #if HAVE_WORKING_LINK
3769         if (link(tmppath, path) < 0)
3770                 ereport(ERROR,
3771                                 (errcode_for_file_access(),
3772                                  errmsg("could not link file \"%s\" to \"%s\": %m",
3773                                                 tmppath, path)));
             /* the file now exists under its final name; drop the temporary one */
3774         unlink(tmppath);
3775 #else
3776         if (rename(tmppath, path) < 0)
3777                 ereport(ERROR,
3778                                 (errcode_for_file_access(),
3779                                  errmsg("could not rename file \"%s\" to \"%s\": %m",
3780                                                 tmppath, path)));
3781 #endif
3782
3783         /* The history file can be archived immediately. */
3784         TLHistoryFileName(histfname, newTLI);
3785         XLogArchiveNotify(histfname);
3786 }
3787
3788 /*
3789  * I/O routines for pg_control
3790  *
3791  * *ControlFile is a buffer in shared memory that holds an image of the
3792  * contents of pg_control.  WriteControlFile() initializes pg_control
3793  * given a preloaded buffer, ReadControlFile() loads the buffer from
3794  * the pg_control file (during postmaster or standalone-backend startup),
3795  * and UpdateControlFile() rewrites pg_control after we modify xlog state.
3796  *
3797  * For simplicity, WriteControlFile() initializes the fields of pg_control
3798  * that are related to checking backend/database compatibility, and
3799  * ReadControlFile() verifies they are correct.  We could split out the
3800  * I/O and compatibility-check functions, but there seems no need currently.
      *
      * WriteControlFile: fill in the version and compatibility fields of
      * *ControlFile from this build's compile-time settings and the current
      * LC_COLLATE/LC_CTYPE, compute the CRC, then create pg_control and write
      * a zero-padded image of PG_CONTROL_SIZE bytes.  The file is opened with
      * O_EXCL, so it must not exist yet.  Every failure is a PANIC.
3801  */
3802 static void
3803 WriteControlFile(void)
3804 {
3805         int                     fd;
3806         char            buffer[PG_CONTROL_SIZE];                /* need not be aligned */
3807         char       *localeptr;
3808
3809         /*
3810          * Initialize version and compatibility-check fields
3811          */
3812         ControlFile->pg_control_version = PG_CONTROL_VERSION;
3813         ControlFile->catalog_version_no = CATALOG_VERSION_NO;
3814
3815         ControlFile->maxAlign = MAXIMUM_ALIGNOF;
3816         ControlFile->floatFormat = FLOATFORMAT_VALUE;
3817
3818         ControlFile->blcksz = BLCKSZ;
3819         ControlFile->relseg_size = RELSEG_SIZE;
3820         ControlFile->xlog_blcksz = XLOG_BLCKSZ;
3821         ControlFile->xlog_seg_size = XLOG_SEG_SIZE;
3822
3823         ControlFile->nameDataLen = NAMEDATALEN;
3824         ControlFile->indexMaxKeys = INDEX_MAX_KEYS;
3825
3826         ControlFile->toast_max_chunk_size = TOAST_MAX_CHUNK_SIZE;
3827
3828 #ifdef HAVE_INT64_TIMESTAMP
3829         ControlFile->enableIntTimes = true;
3830 #else
3831         ControlFile->enableIntTimes = false;
3832 #endif
3833         ControlFile->float4ByVal = FLOAT4PASSBYVAL;
3834         ControlFile->float8ByVal = FLOAT8PASSBYVAL;
3835
             /* record the locale currently in effect; setlocale(cat, NULL) only queries it */
3836         ControlFile->localeBuflen = LOCALE_NAME_BUFLEN;
3837         localeptr = setlocale(LC_COLLATE, NULL);
3838         if (!localeptr)
3839                 ereport(PANIC,
3840                                 (errmsg("invalid LC_COLLATE setting")));
3841         StrNCpy(ControlFile->lc_collate, localeptr, LOCALE_NAME_BUFLEN);
3842         localeptr = setlocale(LC_CTYPE, NULL);
3843         if (!localeptr)
3844                 ereport(PANIC,
3845                                 (errmsg("invalid LC_CTYPE setting")));
3846         StrNCpy(ControlFile->lc_ctype, localeptr, LOCALE_NAME_BUFLEN);
3847
3848         /* Contents are protected with a CRC */
             /* (it covers the bytes that precede the crc field, so compute it last) */
3849         INIT_CRC32(ControlFile->crc);
3850         COMP_CRC32(ControlFile->crc,
3851                            (char *) ControlFile,
3852                            offsetof(ControlFileData, crc));
3853         FIN_CRC32(ControlFile->crc);
3854
3855         /*
3856          * We write out PG_CONTROL_SIZE bytes into pg_control, zero-padding the
3857          * excess over sizeof(ControlFileData).  This reduces the odds of
3858          * premature-EOF errors when reading pg_control.  We'll still fail when we
3859          * check the contents of the file, but hopefully with a more specific
3860          * error than "couldn't read pg_control".
3861          */
3862         if (sizeof(ControlFileData) > PG_CONTROL_SIZE)
3863                 elog(PANIC, "sizeof(ControlFileData) is larger than PG_CONTROL_SIZE; fix either one");
3864
3865         memset(buffer, 0, PG_CONTROL_SIZE);
3866         memcpy(buffer, ControlFile, sizeof(ControlFileData));
3867
             /* O_CREAT | O_EXCL: creation fails if a control file is already present */
3868         fd = BasicOpenFile(XLOG_CONTROL_FILE,
3869                                            O_RDWR | O_CREAT | O_EXCL | PG_BINARY,
3870                                            S_IRUSR | S_IWUSR);
3871         if (fd < 0)
3872                 ereport(PANIC,
3873                                 (errcode_for_file_access(),
3874                                  errmsg("could not create control file \"%s\": %m",
3875                                                 XLOG_CONTROL_FILE)));
3876
             /* clear errno so a short write that sets no error can be told apart below */
3877         errno = 0;
3878         if (write(fd, buffer, PG_CONTROL_SIZE) != PG_CONTROL_SIZE)
3879         {
3880                 /* if write didn't set errno, assume problem is no disk space */
3881                 if (errno == 0)
3882                         errno = ENOSPC;
3883                 ereport(PANIC,
3884                                 (errcode_for_file_access(),
3885                                  errmsg("could not write to control file: %m")));
3886         }
3887
3888         if (pg_fsync(fd) != 0)
3889                 ereport(PANIC,
3890                                 (errcode_for_file_access(),
3891                                  errmsg("could not fsync control file: %m")));
3892
3893         if (close(fd))
3894                 ereport(PANIC,
3895                                 (errcode_for_file_access(),
3896                                  errmsg("could not close control file: %m")));
3897 }
3898
3899 static void
3900 ReadControlFile(void)
3901 {
3902         pg_crc32        crc;
3903         int                     fd;
3904
3905         /*
3906          * Read data...
3907          */
3908         fd = BasicOpenFile(XLOG_CONTROL_FILE,
3909                                            O_RDWR | PG_BINARY,
3910                                            S_IRUSR | S_IWUSR);
3911         if (fd < 0)
3912                 ereport(PANIC,
3913                                 (errcode_for_file_access(),
3914                                  errmsg("could not open control file \"%s\": %m",
3915                                                 XLOG_CONTROL_FILE)));
3916
3917         if (read(fd, ControlFile, sizeof(ControlFileData)) != sizeof(ControlFileData))
3918                 ereport(PANIC,
3919                                 (errcode_for_file_access(),
3920                                  errmsg("could not read from control file: %m")));
3921
3922         close(fd);
3923
3924         /*
3925          * Check for expected pg_control format version.  If this is wrong, the
3926          * CRC check will likely fail because we'll be checking the wrong number
3927          * of bytes.  Complaining about wrong version will probably be more
3928          * enlightening than complaining about wrong CRC.
3929          */
3930
3931         if (ControlFile->pg_control_version != PG_CONTROL_VERSION && ControlFile->pg_control_version % 65536 == 0 && ControlFile->pg_control_version / 65536 != 0)
3932                 ereport(FATAL,
3933                                 (errmsg("database files are incompatible with server"),
3934                                  errdetail("The database cluster was initialized with PG_CONTROL_VERSION %d (0x%08x),"
3935                                                    " but the server was compiled with PG_CONTROL_VERSION %d (0x%08x).",
3936                                                    ControlFile->pg_control_version, ControlFile->pg_control_version,
3937                                                    PG_CONTROL_VERSION, PG_CONTROL_VERSION),
3938                                  errhint("This could be a problem of mismatched byte ordering.  It looks like you need to initdb.")));
3939
3940         if (ControlFile->pg_control_version != PG_CONTROL_VERSION)
3941                 ereport(FATAL,
3942                                 (errmsg("database files are incompatible with server"),
3943                                  errdetail("The database cluster was initialized with PG_CONTROL_VERSION %d,"
3944                                   " but the server was compiled with PG_CONTROL_VERSION %d.",
3945                                                 ControlFile->pg_control_version, PG_CONTROL_VERSION),
3946                                  errhint("It looks like you need to initdb.")));
3947
3948         /* Now check the CRC. */
3949         INIT_CRC32(crc);
3950         COMP_CRC32(crc,
3951                            (char *) ControlFile,
3952                            offsetof(ControlFileData, crc));
3953         FIN_CRC32(crc);
3954
3955         if (!EQ_CRC32(crc, ControlFile->crc))
3956                 ereport(FATAL,
3957                                 (errmsg("incorrect checksum in control file")));
3958
3959         /*
3960          * Do compatibility checking immediately.  We do this here for 2 reasons:
3961          *
3962          * (1) if the database isn't compatible with the backend executable, we
3963          * want to abort before we can possibly do any damage;
3964          *
3965          * (2) this code is executed in the postmaster, so the setlocale() will
3966          * propagate to forked backends, which aren't going to read this file for
3967          * themselves.  (These locale settings are considered critical
3968          * compatibility items because they can affect sort order of indexes.)
3969          */
3970         if (ControlFile->catalog_version_no != CATALOG_VERSION_NO)
3971                 ereport(FATAL,
3972                                 (errmsg("database files are incompatible with server"),
3973                                  errdetail("The database cluster was initialized with CATALOG_VERSION_NO %d,"
3974                                   " but the server was compiled with CATALOG_VERSION_NO %d.",
3975                                                 ControlFile->catalog_version_no, CATALOG_VERSION_NO),
3976                                  errhint("It looks like you need to initdb.")));
3977         if (ControlFile->maxAlign != MAXIMUM_ALIGNOF)
3978                 ereport(FATAL,
3979                                 (errmsg("database files are incompatible with server"),
3980                    errdetail("The database cluster was initialized with MAXALIGN %d,"
3981                                          " but the server was compiled with MAXALIGN %d.",
3982                                          ControlFile->maxAlign, MAXIMUM_ALIGNOF),
3983                                  errhint("It looks like you need to initdb.")));
3984         if (ControlFile->floatFormat != FLOATFORMAT_VALUE)
3985                 ereport(FATAL,
3986                                 (errmsg("database files are incompatible with server"),
3987                                  errdetail("The database cluster appears to use a different floating-point number format than the server executable."),
3988                                  errhint("It looks like you need to initdb.")));
3989         if (ControlFile->blcksz != BLCKSZ)
3990                 ereport(FATAL,
3991                                 (errmsg("database files are incompatible with server"),
3992                          errdetail("The database cluster was initialized with BLCKSZ %d,"
3993                                            " but the server was compiled with BLCKSZ %d.",
3994                                            ControlFile->blcksz, BLCKSZ),
3995                                  errhint("It looks like you need to recompile or initdb.")));
3996         if (ControlFile->relseg_size != RELSEG_SIZE)
3997                 ereport(FATAL,
3998                                 (errmsg("database files are incompatible with server"),
3999                 errdetail("The database cluster was initialized with RELSEG_SIZE %d,"
4000                                   " but the server was compiled with RELSEG_SIZE %d.",
4001                                   ControlFile->relseg_size, RELSEG_SIZE),
4002                                  errhint("It looks like you need to recompile or initdb.")));
4003         if (ControlFile->xlog_blcksz != XLOG_BLCKSZ)
4004                 ereport(FATAL,
4005                                 (errmsg("database files are incompatible with server"),
4006                 errdetail("The database cluster was initialized with XLOG_BLCKSZ %d,"
4007                                   " but the server was compiled with XLOG_BLCKSZ %d.",
4008                                   ControlFile->xlog_blcksz, XLOG_BLCKSZ),
4009                                  errhint("It looks like you need to recompile or initdb.")));
4010         if (ControlFile->xlog_seg_size != XLOG_SEG_SIZE)
4011                 ereport(FATAL,
4012                                 (errmsg("database files are incompatible with server"),
4013                                  errdetail("The database cluster was initialized with XLOG_SEG_SIZE %d,"
4014                                            " but the server was compiled with XLOG_SEG_SIZE %d.",
4015                                                    ControlFile->xlog_seg_size, XLOG_SEG_SIZE),
4016                                  errhint("It looks like you need to recompile or initdb.")));
4017         if (ControlFile->nameDataLen != NAMEDATALEN)
4018                 ereport(FATAL,
4019                                 (errmsg("database files are incompatible with server"),
4020                 errdetail("The database cluster was initialized with NAMEDATALEN %d,"
4021                                   " but the server was compiled with NAMEDATALEN %d.",
4022                                   ControlFile->nameDataLen, NAMEDATALEN),
4023                                  errhint("It looks like you need to recompile or initdb.")));
4024         if (ControlFile->indexMaxKeys != INDEX_MAX_KEYS)
4025                 ereport(FATAL,
4026                                 (errmsg("database files are incompatible with server"),
4027                                  errdetail("The database cluster was initialized with INDEX_MAX_KEYS %d,"
4028                                           " but the server was compiled with INDEX_MAX_KEYS %d.",
4029                                                    ControlFile->indexMaxKeys, INDEX_MAX_KEYS),
4030                                  errhint("It looks like you need to recompile or initdb.")));
4031         if (ControlFile->toast_max_chunk_size != TOAST_MAX_CHUNK_SIZE)
4032                 ereport(FATAL,
4033                                 (errmsg("database files are incompatible with server"),
4034                                  errdetail("The database cluster was initialized with TOAST_MAX_CHUNK_SIZE %d,"
4035                                 " but the server was compiled with TOAST_MAX_CHUNK_SIZE %d.",
4036                           ControlFile->toast_max_chunk_size, (int) TOAST_MAX_CHUNK_SIZE),
4037                                  errhint("It looks like you need to recompile or initdb.")));
4038
4039 #ifdef HAVE_INT64_TIMESTAMP
4040         if (ControlFile->enableIntTimes != true)
4041                 ereport(FATAL,
4042                                 (errmsg("database files are incompatible with server"),
4043                                  errdetail("The database cluster was initialized without HAVE_INT64_TIMESTAMP"
4044                                   " but the server was compiled with HAVE_INT64_TIMESTAMP."),
4045                                  errhint("It looks like you need to recompile or initdb.")));
4046 #else
4047         if (ControlFile->enableIntTimes != false)
4048                 ereport(FATAL,
4049                                 (errmsg("database files are incompatible with server"),
4050                                  errdetail("The database cluster was initialized with HAVE_INT64_TIMESTAMP"
4051                            " but the server was compiled without HAVE_INT64_TIMESTAMP."),
4052                                  errhint("It looks like you need to recompile or initdb.")));
4053 #endif
4054
4055 #ifdef USE_FLOAT4_BYVAL
4056         if (ControlFile->float4ByVal != true)
4057                 ereport(FATAL,
4058                                 (errmsg("database files are incompatible with server"),
4059                                  errdetail("The database cluster was initialized without USE_FLOAT4_BYVAL"
4060                                                    " but the server was compiled with USE_FLOAT4_BYVAL."),
4061                                  errhint("It looks like you need to recompile or initdb.")));
4062 #else
4063         if (ControlFile->float4ByVal != false)
4064                 ereport(FATAL,
4065                                 (errmsg("database files are incompatible with server"),
4066                                  errdetail("The database cluster was initialized with USE_FLOAT4_BYVAL"
4067                                                    " but the server was compiled without USE_FLOAT4_BYVAL."),
4068                                  errhint("It looks like you need to recompile or initdb.")));
4069 #endif
4070
4071 #ifdef USE_FLOAT8_BYVAL
4072         if (ControlFile->float8ByVal != true)
4073                 ereport(FATAL,
4074                                 (errmsg("database files are incompatible with server"),
4075                                  errdetail("The database cluster was initialized without USE_FLOAT8_BYVAL"
4076                                                    " but the server was compiled with USE_FLOAT8_BYVAL."),
4077                                  errhint("It looks like you need to recompile or initdb.")));
4078 #else
4079         if (ControlFile->float8ByVal != false)
4080                 ereport(FATAL,
4081                                 (errmsg("database files are incompatible with server"),
4082                                  errdetail("The database cluster was initialized with USE_FLOAT8_BYVAL"
4083                                                    " but the server was compiled without USE_FLOAT8_BYVAL."),
4084                                  errhint("It looks like you need to recompile or initdb.")));
4085 #endif
4086
4087         if (ControlFile->localeBuflen != LOCALE_NAME_BUFLEN)
4088                 ereport(FATAL,
4089                                 (errmsg("database files are incompatible with server"),
4090                                  errdetail("The database cluster was initialized with LOCALE_NAME_BUFLEN %d,"
4091                                   " but the server was compiled with LOCALE_NAME_BUFLEN %d.",
4092                                                    ControlFile->localeBuflen, LOCALE_NAME_BUFLEN),
4093                                  errhint("It looks like you need to recompile or initdb.")));
4094         if (pg_perm_setlocale(LC_COLLATE, ControlFile->lc_collate) == NULL)
4095                 ereport(FATAL,
4096                         (errmsg("database files are incompatible with operating system"),
4097                          errdetail("The database cluster was initialized with LC_COLLATE \"%s\","
4098                                            " which is not recognized by setlocale().",
4099                                            ControlFile->lc_collate),
4100                          errhint("It looks like you need to initdb or install locale support.")));
4101         if (pg_perm_setlocale(LC_CTYPE, ControlFile->lc_ctype) == NULL)
4102                 ereport(FATAL,
4103                         (errmsg("database files are incompatible with operating system"),
4104                 errdetail("The database cluster was initialized with LC_CTYPE \"%s\","
4105                                   " which is not recognized by setlocale().",
4106                                   ControlFile->lc_ctype),
4107                          errhint("It looks like you need to initdb or install locale support.")));
4108
4109         /* Make the fixed locale settings visible as GUC variables, too */
4110         SetConfigOption("lc_collate", ControlFile->lc_collate,
4111                                         PGC_INTERNAL, PGC_S_OVERRIDE);
4112         SetConfigOption("lc_ctype", ControlFile->lc_ctype,
4113                                         PGC_INTERNAL, PGC_S_OVERRIDE);
4114 }
4115
4116 void
4117 UpdateControlFile(void)
4118 {
4119         int                     fd;
4120
4121         INIT_CRC32(ControlFile->crc);
4122         COMP_CRC32(ControlFile->crc,
4123                            (char *) ControlFile,
4124                            offsetof(ControlFileData, crc));
4125         FIN_CRC32(ControlFile->crc);
4126
4127         fd = BasicOpenFile(XLOG_CONTROL_FILE,
4128                                            O_RDWR | PG_BINARY,
4129                                            S_IRUSR | S_IWUSR);
4130         if (fd < 0)
4131                 ereport(PANIC,
4132                                 (errcode_for_file_access(),
4133                                  errmsg("could not open control file \"%s\": %m",
4134                                                 XLOG_CONTROL_FILE)));
4135
4136         errno = 0;
4137         if (write(fd, ControlFile, sizeof(ControlFileData)) != sizeof(ControlFileData))
4138         {
4139                 /* if write didn't set errno, assume problem is no disk space */
4140                 if (errno == 0)
4141                         errno = ENOSPC;
4142                 ereport(PANIC,
4143                                 (errcode_for_file_access(),
4144                                  errmsg("could not write to control file: %m")));
4145         }
4146
4147         if (pg_fsync(fd) != 0)
4148                 ereport(PANIC,
4149                                 (errcode_for_file_access(),
4150                                  errmsg("could not fsync control file: %m")));
4151
4152         if (close(fd))
4153                 ereport(PANIC,
4154                                 (errcode_for_file_access(),
4155                                  errmsg("could not close control file: %m")));
4156 }
4157
4158 /*
4159  * Initialization of shared memory for XLOG
4160  */
4161 Size
4162 XLOGShmemSize(void)
4163 {
4164         Size            size;
4165
4166         /* XLogCtl */
4167         size = sizeof(XLogCtlData);
4168         /* xlblocks array */
4169         size = add_size(size, mul_size(sizeof(XLogRecPtr), XLOGbuffers));
4170         /* extra alignment padding for XLOG I/O buffers */
4171         size = add_size(size, ALIGNOF_XLOG_BUFFER);
4172         /* and the buffers themselves */
4173         size = add_size(size, mul_size(XLOG_BLCKSZ, XLOGbuffers));
4174
4175         /*
4176          * Note: we don't count ControlFileData, it comes out of the "slop factor"
4177          * added by CreateSharedMemoryAndSemaphores.  This lets us use this
4178          * routine again below to compute the actual allocation size.
4179          */
4180
4181         return size;
4182 }
4183
4184 void
4185 XLOGShmemInit(void)
4186 {
4187         bool            foundCFile,
4188                                 foundXLog;
4189         char       *allocptr;
4190
4191         ControlFile = (ControlFileData *)
4192                 ShmemInitStruct("Control File", sizeof(ControlFileData), &foundCFile);
4193         XLogCtl = (XLogCtlData *)
4194                 ShmemInitStruct("XLOG Ctl", XLOGShmemSize(), &foundXLog);
4195
4196         if (foundCFile || foundXLog)
4197         {
4198                 /* both should be present or neither */
4199                 Assert(foundCFile && foundXLog);
4200                 return;
4201         }
4202
4203         memset(XLogCtl, 0, sizeof(XLogCtlData));
4204
4205         /*
4206          * Since XLogCtlData contains XLogRecPtr fields, its sizeof should be a
4207          * multiple of the alignment for same, so no extra alignment padding is
4208          * needed here.
4209          */
4210         allocptr = ((char *) XLogCtl) + sizeof(XLogCtlData);
4211         XLogCtl->xlblocks = (XLogRecPtr *) allocptr;
4212         memset(XLogCtl->xlblocks, 0, sizeof(XLogRecPtr) * XLOGbuffers);
4213         allocptr += sizeof(XLogRecPtr) * XLOGbuffers;
4214
4215         /*
4216          * Align the start of the page buffers to an ALIGNOF_XLOG_BUFFER boundary.
4217          */
4218         allocptr = (char *) TYPEALIGN(ALIGNOF_XLOG_BUFFER, allocptr);
4219         XLogCtl->pages = allocptr;
4220         memset(XLogCtl->pages, 0, (Size) XLOG_BLCKSZ * XLOGbuffers);
4221
4222         /*
4223          * Do basic initialization of XLogCtl shared data. (StartupXLOG will fill
4224          * in additional info.)
4225          */
4226         XLogCtl->XLogCacheBlck = XLOGbuffers - 1;
4227         XLogCtl->Insert.currpage = (XLogPageHeader) (XLogCtl->pages);
4228         SpinLockInit(&XLogCtl->info_lck);
4229
4230         /*
4231          * If we are not in bootstrap mode, pg_control should already exist. Read
4232          * and validate it immediately (see comments in ReadControlFile() for the
4233          * reasons why).
4234          */
4235         if (!IsBootstrapProcessingMode())
4236                 ReadControlFile();
4237 }
4238
4239 /*
4240  * This func must be called ONCE on system install.  It creates pg_control
4241  * and the initial XLOG segment.
4242  */
4243 void
4244 BootStrapXLOG(void)
4245 {
4246         CheckPoint      checkPoint;
4247         char       *buffer;
4248         XLogPageHeader page;
4249         XLogLongPageHeader longpage;
4250         XLogRecord *record;
4251         bool            use_existent;
4252         uint64          sysidentifier;
4253         struct timeval tv;
4254         pg_crc32        crc;
4255
4256         /*
4257          * Select a hopefully-unique system identifier code for this installation.
4258          * We use the result of gettimeofday(), including the fractional seconds
4259          * field, as being about as unique as we can easily get.  (Think not to
4260          * use random(), since it hasn't been seeded and there's no portable way
4261          * to seed it other than the system clock value...)  The upper half of the
4262          * uint64 value is just the tv_sec part, while the lower half is the XOR
4263          * of tv_sec and tv_usec.  This is to ensure that we don't lose uniqueness
4264          * unnecessarily if "uint64" is really only 32 bits wide.  A person
4265          * knowing this encoding can determine the initialization time of the
4266          * installation, which could perhaps be useful sometimes.
4267          */
4268         gettimeofday(&tv, NULL);
4269         sysidentifier = ((uint64) tv.tv_sec) << 32;
4270         sysidentifier |= (uint32) (tv.tv_sec | tv.tv_usec);
4271
4272         /* First timeline ID is always 1 */
4273         ThisTimeLineID = 1;
4274
4275         /* page buffer must be aligned suitably for O_DIRECT */
4276         buffer = (char *) palloc(XLOG_BLCKSZ + ALIGNOF_XLOG_BUFFER);
4277         page = (XLogPageHeader) TYPEALIGN(ALIGNOF_XLOG_BUFFER, buffer);
4278         memset(page, 0, XLOG_BLCKSZ);
4279
4280         /* Set up information for the initial checkpoint record */
4281         checkPoint.redo.xlogid = 0;
4282         checkPoint.redo.xrecoff = SizeOfXLogLongPHD;
4283         checkPoint.ThisTimeLineID = ThisTimeLineID;
4284         checkPoint.nextXidEpoch = 0;
4285         checkPoint.nextXid = FirstNormalTransactionId;
4286         checkPoint.nextOid = FirstBootstrapObjectId;
4287         checkPoint.nextMulti = FirstMultiXactId;
4288         checkPoint.nextMultiOffset = 0;
4289         checkPoint.time = (pg_time_t) time(NULL);
4290
4291         ShmemVariableCache->nextXid = checkPoint.nextXid;
4292         ShmemVariableCache->nextOid = checkPoint.nextOid;
4293         ShmemVariableCache->oidCount = 0;
4294         MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset);
4295
4296         /* Set up the XLOG page header */
4297         page->xlp_magic = XLOG_PAGE_MAGIC;
4298         page->xlp_info = XLP_LONG_HEADER;
4299         page->xlp_tli = ThisTimeLineID;
4300         page->xlp_pageaddr.xlogid = 0;
4301         page->xlp_pageaddr.xrecoff = 0;
4302         longpage = (XLogLongPageHeader) page;
4303         longpage->xlp_sysid = sysidentifier;
4304         longpage->xlp_seg_size = XLogSegSize;
4305         longpage->xlp_xlog_blcksz = XLOG_BLCKSZ;
4306
4307         /* Insert the initial checkpoint record */
4308         record = (XLogRecord *) ((char *) page + SizeOfXLogLongPHD);
4309         record->xl_prev.xlogid = 0;
4310         record->xl_prev.xrecoff = 0;
4311         record->xl_xid = InvalidTransactionId;
4312         record->xl_tot_len = SizeOfXLogRecord + sizeof(checkPoint);
4313         record->xl_len = sizeof(checkPoint);
4314         record->xl_info = XLOG_CHECKPOINT_SHUTDOWN;
4315         record->xl_rmid = RM_XLOG_ID;
4316         memcpy(XLogRecGetData(record), &checkPoint, sizeof(checkPoint));
4317
4318         INIT_CRC32(crc);
4319         COMP_CRC32(crc, &checkPoint, sizeof(checkPoint));
4320         COMP_CRC32(crc, (char *) record + sizeof(pg_crc32),
4321                            SizeOfXLogRecord - sizeof(pg_crc32));
4322         FIN_CRC32(crc);
4323         record->xl_crc = crc;
4324
4325         /* Create first XLOG segment file */
4326         use_existent = false;
4327         openLogFile = XLogFileInit(0, 0, &use_existent, false);
4328
4329         /* Write the first page with the initial record */
4330         errno = 0;
4331         if (write(openLogFile, page, XLOG_BLCKSZ) != XLOG_BLCKSZ)
4332         {
4333                 /* if write didn't set errno, assume problem is no disk space */
4334                 if (errno == 0)
4335                         errno = ENOSPC;
4336                 ereport(PANIC,
4337                                 (errcode_for_file_access(),
4338                           errmsg("could not write bootstrap transaction log file: %m")));
4339         }
4340
4341         if (pg_fsync(openLogFile) != 0)
4342                 ereport(PANIC,
4343                                 (errcode_for_file_access(),
4344                           errmsg("could not fsync bootstrap transaction log file: %m")));
4345
4346         if (close(openLogFile))
4347                 ereport(PANIC,
4348                                 (errcode_for_file_access(),
4349                           errmsg("could not close bootstrap transaction log file: %m")));
4350
4351         openLogFile = -1;
4352
4353         /* Now create pg_control */
4354
4355         memset(ControlFile, 0, sizeof(ControlFileData));
4356         /* Initialize pg_control status fields */
4357         ControlFile->system_identifier = sysidentifier;
4358         ControlFile->state = DB_SHUTDOWNED;
4359         ControlFile->time = checkPoint.time;
4360         ControlFile->checkPoint = checkPoint.redo;
4361         ControlFile->checkPointCopy = checkPoint;
4362         /* some additional ControlFile fields are set in WriteControlFile() */
4363
4364         WriteControlFile();
4365
4366         /* Bootstrap the commit log, too */
4367         BootStrapCLOG();
4368         BootStrapSUBTRANS();
4369         BootStrapMultiXact();
4370
4371         pfree(buffer);
4372 }
4373
4374 static char *
4375 str_time(pg_time_t tnow)
4376 {
4377         static char buf[128];
4378
4379         pg_strftime(buf, sizeof(buf),
4380                                 "%Y-%m-%d %H:%M:%S %Z",
4381                                 pg_localtime(&tnow, log_timezone));
4382
4383         return buf;
4384 }
4385
4386 /*
4387  * See if there is a recovery command file (recovery.conf), and if so
4388  * read in parameters for archive recovery.
4389  *
4390  * XXX longer term intention is to expand this to
4391  * cater for additional parameters and controls
4392  * possibly use a flex lexer similar to the GUC one
4393  */
4394 static void
4395 readRecoveryCommandFile(void)
4396 {
4397         FILE       *fd;
4398         char            cmdline[MAXPGPATH];
4399         TimeLineID      rtli = 0;
4400         bool            rtliGiven = false;
4401         bool            syntaxError = false;
4402
4403         fd = AllocateFile(RECOVERY_COMMAND_FILE, "r");
4404         if (fd == NULL)
4405         {
4406                 if (errno == ENOENT)
4407                         return;                         /* not there, so no archive recovery */
4408                 ereport(FATAL,
4409                                 (errcode_for_file_access(),
4410                                  errmsg("could not open recovery command file \"%s\": %m",
4411                                                 RECOVERY_COMMAND_FILE)));
4412         }
4413
4414         ereport(LOG,
4415                         (errmsg("starting archive recovery")));
4416
4417         /*
4418          * Parse the file...
4419          */
4420         while (fgets(cmdline, sizeof(cmdline), fd) != NULL)
4421         {
4422                 /* skip leading whitespace and check for # comment */
4423                 char       *ptr;
4424                 char       *tok1;
4425                 char       *tok2;
4426
4427                 for (ptr = cmdline; *ptr; ptr++)
4428                 {
4429                         if (!isspace((unsigned char) *ptr))
4430                                 break;
4431                 }
4432                 if (*ptr == '\0' || *ptr == '#')
4433                         continue;
4434
4435                 /* identify the quoted parameter value */
4436                 tok1 = strtok(ptr, "'");
4437                 if (!tok1)
4438                 {
4439                         syntaxError = true;
4440                         break;
4441                 }
4442                 tok2 = strtok(NULL, "'");
4443                 if (!tok2)
4444                 {
4445                         syntaxError = true;
4446                         break;
4447                 }
4448                 /* reparse to get just the parameter name */
4449                 tok1 = strtok(ptr, " \t=");
4450                 if (!tok1)
4451                 {
4452                         syntaxError = true;
4453                         break;
4454                 }
4455
4456                 if (strcmp(tok1, "restore_command") == 0)
4457                 {
4458                         recoveryRestoreCommand = pstrdup(tok2);
4459                         ereport(LOG,
4460                                         (errmsg("restore_command = '%s'",
4461                                                         recoveryRestoreCommand)));
4462                 }
4463                 else if (strcmp(tok1, "recovery_target_timeline") == 0)
4464                 {
4465                         rtliGiven = true;
4466                         if (strcmp(tok2, "latest") == 0)
4467                                 rtli = 0;
4468                         else
4469                         {
4470                                 errno = 0;
4471                                 rtli = (TimeLineID) strtoul(tok2, NULL, 0);
4472                                 if (errno == EINVAL || errno == ERANGE)
4473                                         ereport(FATAL,
4474                                                         (errmsg("recovery_target_timeline is not a valid number: \"%s\"",
4475                                                                         tok2)));
4476                         }
4477                         if (rtli)
4478                                 ereport(LOG,
4479                                                 (errmsg("recovery_target_timeline = %u", rtli)));
4480                         else
4481                                 ereport(LOG,
4482                                                 (errmsg("recovery_target_timeline = latest")));
4483                 }
4484                 else if (strcmp(tok1, "recovery_target_xid") == 0)
4485                 {
4486                         errno = 0;
4487                         recoveryTargetXid = (TransactionId) strtoul(tok2, NULL, 0);
4488                         if (errno == EINVAL || errno == ERANGE)
4489                                 ereport(FATAL,
4490                                  (errmsg("recovery_target_xid is not a valid number: \"%s\"",
4491                                                  tok2)));
4492                         ereport(LOG,
4493                                         (errmsg("recovery_target_xid = %u",
4494                                                         recoveryTargetXid)));
4495                         recoveryTarget = true;
4496                         recoveryTargetExact = true;
4497                 }
4498                 else if (strcmp(tok1, "recovery_target_time") == 0)
4499                 {
4500                         /*
4501                          * if recovery_target_xid specified, then this overrides
4502                          * recovery_target_time
4503                          */
4504                         if (recoveryTargetExact)
4505                                 continue;
4506                         recoveryTarget = true;
4507                         recoveryTargetExact = false;
4508
4509                         /*
4510                          * Convert the time string given by the user to TimestampTz form.
4511                          */
4512                         recoveryTargetTime =
4513                                 DatumGetTimestampTz(DirectFunctionCall3(timestamptz_in,
4514                                                                                                                 CStringGetDatum(tok2),
4515                                                                                                 ObjectIdGetDatum(InvalidOid),
4516                                                                                                                 Int32GetDatum(-1)));
4517                         ereport(LOG,
4518                                         (errmsg("recovery_target_time = '%s'",
4519                                                         timestamptz_to_str(recoveryTargetTime))));
4520                 }
4521                 else if (strcmp(tok1, "recovery_target_inclusive") == 0)
4522                 {
4523                         /*
4524                          * does nothing if a recovery_target is not also set
4525                          */
4526                         if (strcmp(tok2, "true") == 0)
4527                                 recoveryTargetInclusive = true;
4528                         else
4529                         {
4530                                 recoveryTargetInclusive = false;
4531                                 tok2 = "false";
4532                         }
4533                         ereport(LOG,
4534                                         (errmsg("recovery_target_inclusive = %s", tok2)));
4535                 }
4536                 else if (strcmp(tok1, "log_restartpoints") == 0)
4537                 {
4538                         /*
4539                          * does nothing if a recovery_target is not also set
4540                          */
4541                         if (strcmp(tok2, "true") == 0)
4542                                 recoveryLogRestartpoints = true;
4543                         else
4544                         {
4545                                 recoveryLogRestartpoints = false;
4546                                 tok2 = "false";
4547                         }
4548                         ereport(LOG,
4549                                         (errmsg("log_restartpoints = %s", tok2)));
4550                 }
4551                 else
4552                         ereport(FATAL,
4553                                         (errmsg("unrecognized recovery parameter \"%s\"",
4554                                                         tok1)));
4555         }
4556
4557         FreeFile(fd);
4558
4559         if (syntaxError)
4560                 ereport(FATAL,
4561                                 (errmsg("syntax error in recovery command file: %s",
4562                                                 cmdline),
4563                           errhint("Lines should have the format parameter = 'value'.")));
4564
4565         /* Check that required parameters were supplied */
4566         if (recoveryRestoreCommand == NULL)
4567                 ereport(FATAL,
4568                                 (errmsg("recovery command file \"%s\" did not specify restore_command",
4569                                                 RECOVERY_COMMAND_FILE)));
4570
4571         /* Enable fetching from archive recovery area */
4572         InArchiveRecovery = true;
4573
4574         /*
4575          * If user specified recovery_target_timeline, validate it or compute the
4576          * "latest" value.      We can't do this until after we've gotten the restore
4577          * command and set InArchiveRecovery, because we need to fetch timeline
4578          * history files from the archive.
4579          */
4580         if (rtliGiven)
4581         {
4582                 if (rtli)
4583                 {
4584                         /* Timeline 1 does not have a history file, all else should */
4585                         if (rtli != 1 && !existsTimeLineHistory(rtli))
4586                                 ereport(FATAL,
4587                                                 (errmsg("recovery target timeline %u does not exist",
4588                                                                 rtli)));
4589                         recoveryTargetTLI = rtli;
4590                 }
4591                 else
4592                 {
4593                         /* We start the "latest" search from pg_control's timeline */
4594                         recoveryTargetTLI = findNewestTimeLine(recoveryTargetTLI);
4595                 }
4596         }
4597 }
4598
4599 /*
4600  * Exit archive-recovery state
4601  */
4602 static void
4603 exitArchiveRecovery(TimeLineID endTLI, uint32 endLogId, uint32 endLogSeg)
4604 {
4605         char            recoveryPath[MAXPGPATH];
4606         char            xlogpath[MAXPGPATH];
4607
4608         /*
4609          * We are no longer in archive recovery state.
4610          */
4611         InArchiveRecovery = false;
4612
4613         /*
4614          * We should have the ending log segment currently open.  Verify, and then
4615          * close it (to avoid problems on Windows with trying to rename or delete
4616          * an open file).
4617          */
4618         Assert(readFile >= 0);
4619         Assert(readId == endLogId);
4620         Assert(readSeg == endLogSeg);
4621
4622         close(readFile);
4623         readFile = -1;
4624
4625         /*
4626          * If the segment was fetched from archival storage, we want to replace
4627          * the existing xlog segment (if any) with the archival version.  This is
4628          * because whatever is in XLOGDIR is very possibly older than what we have
4629          * from the archives, since it could have come from restoring a PGDATA
4630          * backup.      In any case, the archival version certainly is more
4631          * descriptive of what our current database state is, because that is what
4632          * we replayed from.
4633          *
4634          * Note that if we are establishing a new timeline, ThisTimeLineID is
4635          * already set to the new value, and so we will create a new file instead
4636          * of overwriting any existing file.  (This is, in fact, always the case
4637          * at present.)
4638          */
4639         snprintf(recoveryPath, MAXPGPATH, XLOGDIR "/RECOVERYXLOG");
4640         XLogFilePath(xlogpath, ThisTimeLineID, endLogId, endLogSeg);
4641
4642         if (restoredFromArchive)
4643         {
4644                 ereport(DEBUG3,
4645                                 (errmsg_internal("moving last restored xlog to \"%s\"",
4646                                                                  xlogpath)));
4647                 unlink(xlogpath);               /* might or might not exist */
4648                 if (rename(recoveryPath, xlogpath) != 0)
4649                         ereport(FATAL,
4650                                         (errcode_for_file_access(),
4651                                          errmsg("could not rename file \"%s\" to \"%s\": %m",
4652                                                         recoveryPath, xlogpath)));
4653                 /* XXX might we need to fix permissions on the file? */
4654         }
4655         else
4656         {
4657                 /*
4658                  * If the latest segment is not archival, but there's still a
4659                  * RECOVERYXLOG laying about, get rid of it.
4660                  */
4661                 unlink(recoveryPath);   /* ignore any error */
4662
4663                 /*
4664                  * If we are establishing a new timeline, we have to copy data from
4665                  * the last WAL segment of the old timeline to create a starting WAL
4666                  * segment for the new timeline.
4667                  */
4668                 if (endTLI != ThisTimeLineID)
4669                         XLogFileCopy(endLogId, endLogSeg,
4670                                                  endTLI, endLogId, endLogSeg);
4671         }
4672
4673         /*
4674          * Let's just make real sure there are not .ready or .done flags posted
4675          * for the new segment.
4676          */
4677         XLogFileName(xlogpath, ThisTimeLineID, endLogId, endLogSeg);
4678         XLogArchiveCleanup(xlogpath);
4679
4680         /* Get rid of any remaining recovered timeline-history file, too */
4681         snprintf(recoveryPath, MAXPGPATH, XLOGDIR "/RECOVERYHISTORY");
4682         unlink(recoveryPath);           /* ignore any error */
4683
4684         /*
4685          * Rename the config file out of the way, so that we don't accidentally
4686          * re-enter archive recovery mode in a subsequent crash.
4687          */
4688         unlink(RECOVERY_COMMAND_DONE);
4689         if (rename(RECOVERY_COMMAND_FILE, RECOVERY_COMMAND_DONE) != 0)
4690                 ereport(FATAL,
4691                                 (errcode_for_file_access(),
4692                                  errmsg("could not rename file \"%s\" to \"%s\": %m",
4693                                                 RECOVERY_COMMAND_FILE, RECOVERY_COMMAND_DONE)));
4694
4695         ereport(LOG,
4696                         (errmsg("archive recovery complete")));
4697 }
4698
4699 /*
4700  * For point-in-time recovery, this function decides whether we want to
4701  * stop applying the XLOG at or after the current record.
4702  *
4703  * Returns TRUE if we are stopping, FALSE otherwise.  On TRUE return,
4704  * *includeThis is set TRUE if we should apply this record before stopping.
4705  * Also, some information is saved in recoveryStopXid et al for use in
4706  * annotating the new timeline's history file.
4707  */
4708 static bool
4709 recoveryStopsHere(XLogRecord *record, bool *includeThis)
4710 {
4711         bool            stopsHere;
4712         uint8           record_info;
4713         TimestampTz recordXtime;
4714
4715         /* We only consider stopping at COMMIT or ABORT records */
4716         if (record->xl_rmid != RM_XACT_ID)
4717                 return false;
4718         record_info = record->xl_info & ~XLR_INFO_MASK;
4719         if (record_info == XLOG_XACT_COMMIT)
4720         {
4721                 xl_xact_commit *recordXactCommitData;
4722
4723                 recordXactCommitData = (xl_xact_commit *) XLogRecGetData(record);
4724                 recordXtime = recordXactCommitData->xact_time;
4725         }
4726         else if (record_info == XLOG_XACT_ABORT)
4727         {
4728                 xl_xact_abort *recordXactAbortData;
4729
4730                 recordXactAbortData = (xl_xact_abort *) XLogRecGetData(record);
4731                 recordXtime = recordXactAbortData->xact_time;
4732         }
4733         else
4734                 return false;
4735
4736         /* Remember the most recent COMMIT/ABORT time for logging purposes */
4737         recoveryLastXTime = recordXtime;
4738
4739         /* Do we have a PITR target at all? */
4740         if (!recoveryTarget)
4741                 return false;
4742
4743         if (recoveryTargetExact)
4744         {
4745                 /*
4746                  * there can be only one transaction end record with this exact
4747                  * transactionid
4748                  *
4749                  * when testing for an xid, we MUST test for equality only, since
4750                  * transactions are numbered in the order they start, not the order
4751                  * they complete. A higher numbered xid will complete before you about
4752                  * 50% of the time...
4753                  */
4754                 stopsHere = (record->xl_xid == recoveryTargetXid);
4755                 if (stopsHere)
4756                         *includeThis = recoveryTargetInclusive;
4757         }
4758         else
4759         {
4760                 /*
4761                  * there can be many transactions that share the same commit time, so
4762                  * we stop after the last one, if we are inclusive, or stop at the
4763                  * first one if we are exclusive
4764                  */
4765                 if (recoveryTargetInclusive)
4766                         stopsHere = (recordXtime > recoveryTargetTime);
4767                 else
4768                         stopsHere = (recordXtime >= recoveryTargetTime);
4769                 if (stopsHere)
4770                         *includeThis = false;
4771         }
4772
4773         if (stopsHere)
4774         {
4775                 recoveryStopXid = record->xl_xid;
4776                 recoveryStopTime = recordXtime;
4777                 recoveryStopAfter = *includeThis;
4778
4779                 if (record_info == XLOG_XACT_COMMIT)
4780                 {
4781                         if (recoveryStopAfter)
4782                                 ereport(LOG,
4783                                                 (errmsg("recovery stopping after commit of transaction %u, time %s",
4784                                                                 recoveryStopXid,
4785                                                                 timestamptz_to_str(recoveryStopTime))));
4786                         else
4787                                 ereport(LOG,
4788                                                 (errmsg("recovery stopping before commit of transaction %u, time %s",
4789                                                                 recoveryStopXid,
4790                                                                 timestamptz_to_str(recoveryStopTime))));
4791                 }
4792                 else
4793                 {
4794                         if (recoveryStopAfter)
4795                                 ereport(LOG,
4796                                                 (errmsg("recovery stopping after abort of transaction %u, time %s",
4797                                                                 recoveryStopXid,
4798                                                                 timestamptz_to_str(recoveryStopTime))));
4799                         else
4800                                 ereport(LOG,
4801                                                 (errmsg("recovery stopping before abort of transaction %u, time %s",
4802                                                                 recoveryStopXid,
4803                                                                 timestamptz_to_str(recoveryStopTime))));
4804                 }
4805         }
4806
4807         return stopsHere;
4808 }
4809
4810 /*
4811  * This must be called ONCE during postmaster or standalone-backend startup
      *
      * StartupXLOG brings the WAL subsystem from its on-disk state to a point
      * where new WAL can be inserted.  In order, it:
      *
      *  1. reads pg_control, rejects an out-of-range state or an invalid
      *     checkpoint offset, and logs how the previous run ended;
      *  2. picks the recovery target timeline, processes the recovery command
      *     file and builds the list of expected timelines;
      *  3. locates the starting checkpoint record, from backup_label when one
      *     is present, otherwise from pg_control (primary, then previous);
      *  4. seeds nextXid, nextOid and the MultiXact counters from that
      *     checkpoint and decides whether WAL replay is required (InRecovery);
      *  5. if it is, updates pg_control, moves backup_label out of the way,
      *     runs each resource manager's rm_startup hook and replays records
      *     until ReadRecord() returns NULL or recoveryStopsHere() says stop;
      *  6. re-reads the last valid/applied record to fix EndOfLog, verifies
      *     that replay got at least as far as ControlFile->minRecoveryPoint,
      *     and after archive recovery switches to a new timeline;
      *  7. primes the WAL insert/write state from the last block read;
      *  8. after recovery, runs rm_cleanup hooks, checks for invalid pages,
      *     resets pgstat data and writes a shutdown checkpoint;
      *  9. marks pg_control DB_IN_PRODUCTION, starts CLOG, SUBTRANS and
      *     MultiXact, reloads prepared transactions and releases the
      *     read buffers.
      *
      * Takes no arguments and returns nothing; unrecoverable conditions are
      * reported through ereport() at FATAL or PANIC level.
4812  */
4813 void
4814 StartupXLOG(void)
4815 {
4816         XLogCtlInsert *Insert;
4817         CheckPoint      checkPoint;
4818         bool            wasShutdown;    /* starting checkpoint is a shutdown checkpoint */
4819         bool            reachedStopPoint = false;       /* replay ended at the recovery target */
4820         bool            haveBackupLabel = false;        /* backup_label was read; rename it later */
4821         XLogRecPtr      RecPtr,
4822                                 LastRec,
4823                                 checkPointLoc,
4824                                 minRecoveryLoc,
4825                                 EndOfLog;
4826         uint32          endLogId;
4827         uint32          endLogSeg;
4828         XLogRecord *record;
4829         uint32          freespace;
4830         TransactionId oldestActiveXID;
4831
4832         /*
4833          * Read control file and check XLOG status looks valid.
4834          *
4835          * Note: in most control paths, *ControlFile is already valid and we need
4836          * not do ReadControlFile() here, but might as well do it to be sure.
4837          */
4838         ReadControlFile();
4839
             /*
              * Accept only states from DB_SHUTDOWNED through DB_IN_PRODUCTION; the
              * range test relies on the numeric order of the state values
              * (NOTE(review): presumably an enum in catalog/pg_control.h -- confirm).
              */
4840         if (ControlFile->state < DB_SHUTDOWNED ||
4841                 ControlFile->state > DB_IN_PRODUCTION ||
4842                 !XRecOffIsValid(ControlFile->checkPoint.xrecoff))
4843                 ereport(FATAL,
4844                                 (errmsg("control file contains invalid data")));
4845
             /* Log how the previous incarnation ended, one message per state. */
4846         if (ControlFile->state == DB_SHUTDOWNED)
4847                 ereport(LOG,
4848                                 (errmsg("database system was shut down at %s",
4849                                                 str_time(ControlFile->time))));
4850         else if (ControlFile->state == DB_SHUTDOWNING)
4851                 ereport(LOG,
4852                                 (errmsg("database system shutdown was interrupted; last known up at %s",
4853                                                 str_time(ControlFile->time))));
4854         else if (ControlFile->state == DB_IN_CRASH_RECOVERY)
4855                 ereport(LOG,
4856                    (errmsg("database system was interrupted while in recovery at %s",
4857                                    str_time(ControlFile->time)),
4858                         errhint("This probably means that some data is corrupted and"
4859                                         " you will have to use the last backup for recovery.")));
4860         else if (ControlFile->state == DB_IN_ARCHIVE_RECOVERY)
4861                 ereport(LOG,
4862                                 (errmsg("database system was interrupted while in recovery at log time %s",
4863                                                 str_time(ControlFile->checkPointCopy.time)),
4864                                  errhint("If this has occurred more than once some data might be corrupted"
4865                           " and you might need to choose an earlier recovery target.")));
4866         else if (ControlFile->state == DB_IN_PRODUCTION)
4867                 ereport(LOG,
4868                           (errmsg("database system was interrupted; last known up at %s",
4869                                           str_time(ControlFile->time))));
4870
4871         /* This is just to allow attaching to startup process with a debugger */
4872 #ifdef XLOG_REPLAY_DELAY
             /* Only pauses when the last shutdown was not clean (60 s, assuming pg_usleep takes microseconds). */
4873         if (ControlFile->state != DB_SHUTDOWNED)
4874                 pg_usleep(60000000L);
4875 #endif
4876
4877         /*
4878          * Initialize on the assumption we want to recover to the same timeline
4879          * that's active according to pg_control.
4880          */
4881         recoveryTargetTLI = ControlFile->checkPointCopy.ThisTimeLineID;
4882
4883         /*
4884          * Check for recovery control file, and if so set up state for offline
4885          * recovery
4886          */
4887         readRecoveryCommandFile();
4888
4889         /* Now we can determine the list of expected TLIs */
4890         expectedTLIs = readTimeLineHistory(recoveryTargetTLI);
4891
4892         /*
4893          * If pg_control's timeline is not in expectedTLIs, then we cannot
4894          * proceed: the backup is not part of the history of the requested
4895          * timeline.
4896          */
4897         if (!list_member_int(expectedTLIs,
4898                                                  (int) ControlFile->checkPointCopy.ThisTimeLineID))
4899                 ereport(FATAL,
4900                                 (errmsg("requested timeline %u is not a child of database system timeline %u",
4901                                                 recoveryTargetTLI,
4902                                                 ControlFile->checkPointCopy.ThisTimeLineID)));
4903
             /*
              * Choose the checkpoint to start from.  read_backup_label() is the only
              * place in this function that assigns minRecoveryLoc.
              * NOTE(review): presumably it zeroes minRecoveryLoc even when it returns
              * false; otherwise the non-zero test made on it during the REDO setup
              * would read an indeterminate value -- confirm against
              * read_backup_label().
              */
4904         if (read_backup_label(&checkPointLoc, &minRecoveryLoc))
4905         {
4906                 /*
4907                  * When a backup_label file is present, we want to roll forward from
4908                  * the checkpoint it identifies, rather than using pg_control.
4909                  */
                     /* second argument 0: failures are reported as a backup_label checkpoint */
4910                 record = ReadCheckpointRecord(checkPointLoc, 0);
4911                 if (record != NULL)
4912                 {
4913                         ereport(DEBUG1,
4914                                         (errmsg("checkpoint record is at %X/%X",
4915                                                         checkPointLoc.xlogid, checkPointLoc.xrecoff)));
4916                         InRecovery = true;      /* force recovery even if SHUTDOWNED */
4917                 }
4918                 else
4919                 {
4920                         ereport(PANIC,
4921                                         (errmsg("could not locate required checkpoint record"),
4922                                          errhint("If you are not restoring from a backup, try removing the file \"%s/backup_label\".", DataDir)));
4923                 }
4924                 /* set flag to delete it later */
4925                 haveBackupLabel = true;
4926         }
4927         else
4928         {
4929                 /*
4930                  * Get the last valid checkpoint record.  If the latest one according
4931                  * to pg_control is broken, try the next-to-last one.
4932                  */
4933                 checkPointLoc = ControlFile->checkPoint;
                     /* second argument 1: failures are reported as the primary checkpoint */
4934                 record = ReadCheckpointRecord(checkPointLoc, 1);
4935                 if (record != NULL)
4936                 {
4937                         ereport(DEBUG1,
4938                                         (errmsg("checkpoint record is at %X/%X",
4939                                                         checkPointLoc.xlogid, checkPointLoc.xrecoff)));
4940                 }
4941                 else
4942                 {
4943                         checkPointLoc = ControlFile->prevCheckPoint;
                             /* second argument 2: failures are reported as the secondary checkpoint */
4944                         record = ReadCheckpointRecord(checkPointLoc, 2);
4945                         if (record != NULL)
4946                         {
4947                                 ereport(LOG,
4948                                                 (errmsg("using previous checkpoint record at %X/%X",
4949                                                           checkPointLoc.xlogid, checkPointLoc.xrecoff)));
4950                                 InRecovery = true;              /* force recovery even if SHUTDOWNED */
4951                         }
4952                         else
4953                                 ereport(PANIC,
4954                                          (errmsg("could not locate a valid checkpoint record")));
4955                 }
4956         }
4957
             /*
              * Both pointers start at the chosen checkpoint record.  LastRec is
              * advanced as records are replayed and is re-read after the REDO phase;
              * RecPtr is not assigned again and is only compared against the
              * checkpoint's redo pointer below.
              */
4958         LastRec = RecPtr = checkPointLoc;
             /* Take a private copy of the checkpoint payload out of the record. */
4959         memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
4960         wasShutdown = (record->xl_info == XLOG_CHECKPOINT_SHUTDOWN);
4961
4962         ereport(DEBUG1,
4963                         (errmsg("redo record is at %X/%X; shutdown %s",
4964                                         checkPoint.redo.xlogid, checkPoint.redo.xrecoff,
4965                                         wasShutdown ? "TRUE" : "FALSE")));
4966         ereport(DEBUG1,
4967                         (errmsg("next transaction ID: %u/%u; next OID: %u",
4968                                         checkPoint.nextXidEpoch, checkPoint.nextXid,
4969                                         checkPoint.nextOid)));
4970         ereport(DEBUG1,
4971                         (errmsg("next MultiXactId: %u; next MultiXactOffset: %u",
4972                                         checkPoint.nextMulti, checkPoint.nextMultiOffset)));
4973         if (!TransactionIdIsNormal(checkPoint.nextXid))
4974                 ereport(PANIC,
4975                                 (errmsg("invalid next transaction ID")));
4976
             /* Seed the shared XID/OID/MultiXact counters from the checkpoint. */
4977         ShmemVariableCache->nextXid = checkPoint.nextXid;
4978         ShmemVariableCache->nextOid = checkPoint.nextOid;
4979         ShmemVariableCache->oidCount = 0;
4980         MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset);
4981
4982         /*
4983          * We must replay WAL entries using the same TimeLineID they were created
4984          * under, so temporarily adopt the TLI indicated by the checkpoint (see
4985          * also xlog_redo()).
4986          */
4987         ThisTimeLineID = checkPoint.ThisTimeLineID;
4988
4989         RedoRecPtr = XLogCtl->Insert.RedoRecPtr = checkPoint.redo;
4990
             /* The redo pointer must not lie beyond the checkpoint record itself. */
4991         if (XLByteLT(RecPtr, checkPoint.redo))
4992                 ereport(PANIC,
4993                                 (errmsg("invalid redo in checkpoint record")));
4994
4995         /*
4996          * Check whether we need to force recovery from WAL.  If it appears to
4997          * have been a clean shutdown and we did not have a recovery.conf file,
4998          * then assume no recovery needed.
4999          */
5000         if (XLByteLT(checkPoint.redo, RecPtr))
5001         {
                     /* a redo pointer before the checkpoint record is rejected for a shutdown checkpoint */
5002                 if (wasShutdown)
5003                         ereport(PANIC,
5004                                         (errmsg("invalid redo record in shutdown checkpoint")));
5005                 InRecovery = true;
5006         }
5007         else if (ControlFile->state != DB_SHUTDOWNED)
5008                 InRecovery = true;
5009         else if (InArchiveRecovery)
5010         {
5011                 /* force recovery due to presence of recovery.conf */
5012                 InRecovery = true;
5013         }
5014
5015         /* REDO */
5016         if (InRecovery)
5017         {
5018                 int                     rmid;
5019
5020                 /*
5021                  * Update pg_control to show that we are recovering and to show the
5022                  * selected checkpoint as the place we are starting from. We also mark
5023                  * pg_control with any minimum recovery stop point obtained from a
5024                  * backup history file.
5025                  */
5026                 if (InArchiveRecovery)
5027                 {
5028                         ereport(LOG,
5029                                         (errmsg("automatic recovery in progress")));
5030                         ControlFile->state = DB_IN_ARCHIVE_RECOVERY;
5031                 }
5032                 else
5033                 {
5034                         ereport(LOG,
5035                                         (errmsg("database system was not properly shut down; "
5036                                                         "automatic recovery in progress")));
5037                         ControlFile->state = DB_IN_CRASH_RECOVERY;
5038                 }
                     /* Shift the old primary checkpoint into the previous-checkpoint slot. */
5039                 ControlFile->prevCheckPoint = ControlFile->checkPoint;
5040                 ControlFile->checkPoint = checkPointLoc;
5041                 ControlFile->checkPointCopy = checkPoint;
                     /* An all-zero minRecoveryLoc leaves the stored minRecoveryPoint untouched. */
5042                 if (minRecoveryLoc.xlogid != 0 || minRecoveryLoc.xrecoff != 0)
5043                         ControlFile->minRecoveryPoint = minRecoveryLoc;
5044                 ControlFile->time = (pg_time_t) time(NULL);
5045                 UpdateControlFile();
5046
5047                 /*
5048                  * If there was a backup label file, it's done its job and the info
5049                  * has now been propagated into pg_control.  We must get rid of the
5050                  * label file so that if we crash during recovery, we'll pick up at
5051                  * the latest recovery restartpoint instead of going all the way back
5052                  * to the backup start point.  It seems prudent though to just rename
5053                  * the file out of the way rather than delete it completely.
5054                  */
5055                 if (haveBackupLabel)
5056                 {
                             /*
                              * The unlink() result is not checked.  NOTE(review): presumably
                              * because a leftover backup_label.old usually does not exist --
                              * confirm.  A failing rename() is fatal.
                              */
5057                         unlink(BACKUP_LABEL_OLD);
5058                         if (rename(BACKUP_LABEL_FILE, BACKUP_LABEL_OLD) != 0)
5059                                 ereport(FATAL,
5060                                                 (errcode_for_file_access(),
5061                                                  errmsg("could not rename file \"%s\" to \"%s\": %m",
5062                                                                 BACKUP_LABEL_FILE, BACKUP_LABEL_OLD)));
5063                 }
5064
5065                 /* Initialize resource managers */
5066                 for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
5067                 {
                             /* the hook is optional; entries without one are skipped */
5068                         if (RmgrTable[rmid].rm_startup != NULL)
5069                                 RmgrTable[rmid].rm_startup();
5070                 }
5071
5072                 /*
5073                  * Find the first record that logically follows the checkpoint --- it
5074                  * might physically precede it, though.
5075                  */
                     /*
                      * NOTE(review): the second ReadRecord() argument looks like the
                      * error level used when no valid record can be read -- PANIC for
                      * the redo record, which has to exist, and LOG when simply reading
                      * on past the checkpoint, where NULL is handled below.  Confirm
                      * against ReadRecord().
                      */
5076                 if (XLByteLT(checkPoint.redo, RecPtr))
5077                 {
5078                         /* back up to find the record */
5079                         record = ReadRecord(&(checkPoint.redo), PANIC);
5080                 }
5081                 else
5082                 {
5083                         /* just have to read next record after CheckPoint */
5084                         record = ReadRecord(NULL, LOG);
5085                 }
5086
5087                 if (record != NULL)
5088                 {
5089                         bool            recoveryContinue = true;
5090                         bool            recoveryApply = true;
5091                         ErrorContextCallback errcontext;
5092
5093                         InRedo = true;
5094                         ereport(LOG,
5095                                         (errmsg("redo starts at %X/%X",
5096                                                         ReadRecPtr.xlogid, ReadRecPtr.xrecoff)));
5097
5098                         /*
5099                          * main redo apply loop
5100                          */
5101                         do
5102                         {
5103 #ifdef WAL_DEBUG
5104                                 if (XLOG_DEBUG)
5105                                 {
5106                                         StringInfoData buf;
5107
5108                                         initStringInfo(&buf);
5109                                         appendStringInfo(&buf, "REDO @ %X/%X; LSN %X/%X: ",
5110                                                                          ReadRecPtr.xlogid, ReadRecPtr.xrecoff,
5111                                                                          EndRecPtr.xlogid, EndRecPtr.xrecoff);
5112                                         xlog_outrec(&buf, record);
5113                                         appendStringInfo(&buf, " - ");
5114                                         RmgrTable[record->xl_rmid].rm_desc(&buf,
5115                                                                                                            record->xl_info,
5116                                                                                                          XLogRecGetData(record));
5117                                         elog(LOG, "%s", buf.data);
5118                                         pfree(buf.data);
5119                                 }
5120 #endif
5121
5122                                 /*
5123                                  * Have we reached our recovery target?
5124                                  */
5125                                 if (recoveryStopsHere(record, &recoveryApply))
5126                                 {
                                             /*
                                              * recoveryApply says whether this record is still to be
                                              * applied.  If not, leave at once, before LastRec is
                                              * advanced, so LastRec keeps naming the last record that
                                              * was actually replayed.  Otherwise apply it and let the
                                              * loop condition end the loop.
                                              */
5127                                         reachedStopPoint = true;        /* see below */
5128                                         recoveryContinue = false;
5129                                         if (!recoveryApply)
5130                                                 break;
5131                                 }
5132
5133                                 /* Setup error traceback support for ereport() */
5134                                 errcontext.callback = rm_redo_error_callback;
5135                                 errcontext.arg = (void *) record;
5136                                 errcontext.previous = error_context_stack;
5137                                 error_context_stack = &errcontext;
5138
5139                                 /* nextXid must be beyond record's xid */
5140                                 if (TransactionIdFollowsOrEquals(record->xl_xid,
5141                                                                                                  ShmemVariableCache->nextXid))
5142                                 {
5143                                         ShmemVariableCache->nextXid = record->xl_xid;
5144                                         TransactionIdAdvance(ShmemVariableCache->nextXid);
5145                                 }
5146
                                     /* If any backup-block flag bits are set, restore those blocks first. */
5147                                 if (record->xl_info & XLR_BKP_BLOCK_MASK)
5148                                         RestoreBkpBlocks(record, EndRecPtr);
5149
                                     /* Dispatch to the redo routine of the record's resource manager. */
5150                                 RmgrTable[record->xl_rmid].rm_redo(EndRecPtr, record);
5151
5152                                 /* Pop the error context stack */
5153                                 error_context_stack = errcontext.previous;
5154
                                     /* Remember the location of the record just applied. */
5155                                 LastRec = ReadRecPtr;
5156
5157                                 record = ReadRecord(NULL, LOG);
5158                         } while (record != NULL && recoveryContinue);
5159
5160                         /*
5161                          * end of main redo apply loop
5162                          */
5163
5164                         ereport(LOG,
5165                                         (errmsg("redo done at %X/%X",
5166                                                         ReadRecPtr.xlogid, ReadRecPtr.xrecoff)));
                             /* a zero recoveryLastXTime suppresses the message */
5167                         if (recoveryLastXTime)
5168                                 ereport(LOG,
5169                                          (errmsg("last completed transaction was at log time %s",
5170                                                          timestamptz_to_str(recoveryLastXTime))));
5171                         InRedo = false;
5172                 }
5173                 else
5174                 {
5175                         /* there are no WAL records following the checkpoint */
5176                         ereport(LOG,
5177                                         (errmsg("redo is not required")));
5178                 }
5179         }
5180
5181         /*
5182          * Re-fetch the last valid or last applied record, so we can identify the
5183          * exact endpoint of what we consider the valid portion of WAL.
5184          */
             /*
              * If nothing was replayed, LastRec still points at the starting
              * checkpoint record.  EndRecPtr is picked up right after the read.
              * NOTE(review): XLByteToPrevSeg presumably yields the log id/segment
              * holding the byte just before EndOfLog -- confirm against its
              * definition.
              */
5185         record = ReadRecord(&LastRec, PANIC);
5186         EndOfLog = EndRecPtr;
5187         XLByteToPrevSeg(EndOfLog, endLogId, endLogSeg);
5188
5189         /*
5190          * Complain if we did not roll forward far enough to render the backup
5191          * dump consistent.
5192          */
5193         if (XLByteLT(EndOfLog, ControlFile->minRecoveryPoint))
5194         {
5195                 if (reachedStopPoint)   /* stopped because of stop request */
5196                         ereport(FATAL,
5197                                         (errmsg("requested recovery stop point is before end time of backup dump")));
5198                 else    /* ran off end of WAL */
5199                         ereport(FATAL,
5200                                         (errmsg("WAL ends before end time of backup dump")));
5201         }
5202
5203         /*
5204          * Consider whether we need to assign a new timeline ID.
5205          *
5206          * If we are doing an archive recovery, we always assign a new ID.      This
5207          * handles a couple of issues.  If we stopped short of the end of WAL
5208          * during recovery, then we are clearly generating a new timeline and must
5209          * assign it a unique new ID.  Even if we ran to the end, modifying the
5210          * current last segment is problematic because it may result in trying to
5211          * overwrite an already-archived copy of that segment, and we encourage
5212          * DBAs to make their archive_commands reject that.  We can dodge the
5213          * problem by making the new active segment have a new timeline ID.
5214          *
5215          * In a normal crash recovery, we can just extend the timeline we were in.
5216          */
5217         if (InArchiveRecovery)
5218         {
                     /* new ID = one more than what findNewestTimeLine() returns for the target */
5219                 ThisTimeLineID = findNewestTimeLine(recoveryTargetTLI) + 1;
5220                 ereport(LOG,
5221                                 (errmsg("selected new timeline ID: %u", ThisTimeLineID)));
5222                 writeTimeLineHistory(ThisTimeLineID, recoveryTargetTLI,
5223                                                          curFileTLI, endLogId, endLogSeg);
5224         }
5225
5226         /* Save the selected TimeLineID in shared memory, too */
5227         XLogCtl->ThisTimeLineID = ThisTimeLineID;
5228
5229         /*
5230          * We are now done reading the old WAL.  Turn off archive fetching if it
5231          * was active, and make a writable copy of the last WAL segment. (Note
5232          * that we also have a copy of the last block of the old WAL in readBuf;
5233          * we will use that below.)
5234          */
5235         if (InArchiveRecovery)
5236                 exitArchiveRecovery(curFileTLI, endLogId, endLogSeg);
5237
5238         /*
5239          * Prepare to write WAL starting at EndOfLog position, and init xlog
5240          * buffer cache using the block containing the last record from the
5241          * previous incarnation.
5242          */
5243         openLogId = endLogId;
5244         openLogSeg = endLogSeg;
5245         openLogFile = XLogFileOpen(openLogId, openLogSeg);
5246         openLogOff = 0;
5247         Insert = &XLogCtl->Insert;
5248         Insert->PrevRecord = LastRec;
5249         XLogCtl->xlblocks[0].xlogid = openLogId;
             /*
              * Round EndOfLog.xrecoff up to a multiple of XLOG_BLCKSZ; a value that
              * is already an exact multiple is kept as is.
              */
5250         XLogCtl->xlblocks[0].xrecoff =
5251                 ((EndOfLog.xrecoff - 1) / XLOG_BLCKSZ + 1) * XLOG_BLCKSZ;
5252
5253         /*
5254          * Tricky point here: readBuf contains the *last* block that the LastRec
5255          * record spans, not the one it starts in.      The last block is indeed the
5256          * one we want to use.
5257          */
5258         Assert(readOff == (XLogCtl->xlblocks[0].xrecoff - XLOG_BLCKSZ) % XLogSegSize);
5259         memcpy((char *) Insert->currpage, readBuf, XLOG_BLCKSZ);
             /*
              * currpos = page start + distance of EndOfLog from the start of the
              * block that ends at xlblocks[0].xrecoff.  When EndOfLog sits exactly on
              * that block end, the distance is XLOG_BLCKSZ, i.e. the page is full.
              */
5260         Insert->currpos = (char *) Insert->currpage +
5261                 (EndOfLog.xrecoff + XLOG_BLCKSZ - XLogCtl->xlblocks[0].xrecoff);
5262
             /* Record EndOfLog as both the write and the flush position, in every copy. */
5263         LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
5264
5265         XLogCtl->Write.LogwrtResult = LogwrtResult;
5266         Insert->LogwrtResult = LogwrtResult;
5267         XLogCtl->LogwrtResult = LogwrtResult;
5268
5269         XLogCtl->LogwrtRqst.Write = EndOfLog;
5270         XLogCtl->LogwrtRqst.Flush = EndOfLog;
5271
             /* NOTE(review): INSERT_FREESPACE presumably gives the bytes left on the current page -- confirm. */
5272         freespace = INSERT_FREESPACE(Insert);
5273         if (freespace > 0)
5274         {
5275                 /* Make sure rest of page is zero */
5276                 MemSet(Insert->currpos, 0, freespace);
5277                 XLogCtl->Write.curridx = 0;
5278         }
5279         else
5280         {
5281                 /*
5282                  * Whenever Write.LogwrtResult points to exactly the end of a page,
5283                  * Write.curridx must point to the *next* page (see XLogWrite()).
5284                  *
5285                  * Note: it might seem we should do AdvanceXLInsertBuffer() here, but
5286                  * this is sufficient.  The first actual attempt to insert a log
5287                  * record will advance the insert state.
5288                  */
5289                 XLogCtl->Write.curridx = NextBufIdx(0);
5290         }
5291
5292         /* Pre-scan prepared transactions to find out the range of XIDs present */
             /* the result is handed to StartupSUBTRANS() further down */
5293         oldestActiveXID = PrescanPreparedTransactions();
5294
5295         if (InRecovery)
5296         {
5297                 int                     rmid;
5298
5299                 /*
5300                  * Allow resource managers to do any required cleanup.
5301                  */
5302                 for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
5303                 {
                             /* the hook is optional; entries without one are skipped */
5304                         if (RmgrTable[rmid].rm_cleanup != NULL)
5305                                 RmgrTable[rmid].rm_cleanup();
5306                 }
5307
5308                 /*
5309                  * Check to see if the XLOG sequence contained any unresolved
5310                  * references to uninitialized pages.
5311                  */
5312                 XLogCheckInvalidPages();
5313
5314                 /*
5315                  * Reset pgstat data, because it may be invalid after recovery.
5316                  */
5317                 pgstat_reset_all();
5318
5319                 /*
5320                  * Perform a checkpoint to update all our recovery activity to disk.
5321                  *
5322                  * Note that we write a shutdown checkpoint rather than an on-line
5323                  * one. This is not particularly critical, but since we may be
5324                  * assigning a new TLI, using a shutdown checkpoint allows us to have
5325                  * the rule that TLI only changes in shutdown checkpoints, which
5326                  * allows some extra error checking in xlog_redo.
5327                  */
5328                 CreateCheckPoint(CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_IMMEDIATE);
5329         }
5330
5331         /*
5332          * Preallocate additional log files, if wanted.
5333          */
5334         PreallocXlogFiles(EndOfLog);
5335
5336         /*
5337          * Okay, we're officially UP.
5338          */
5339         InRecovery = false;
5340
             /* Persist the new state and timestamp in pg_control. */
5341         ControlFile->state = DB_IN_PRODUCTION;
5342         ControlFile->time = (pg_time_t) time(NULL);
5343         UpdateControlFile();
5344
5345         /* start the archive_timeout timer running */
             /* uses the timestamp just stored in ControlFile->time */
5346         XLogCtl->Write.lastSegSwitchTime = ControlFile->time;
5347
5348         /* initialize shared-memory copy of latest checkpoint XID/epoch */
5349         XLogCtl->ckptXidEpoch = ControlFile->checkPointCopy.nextXidEpoch;
5350         XLogCtl->ckptXid = ControlFile->checkPointCopy.nextXid;
5351
5352         /* also initialize latestCompletedXid, to nextXid - 1 */
5353         ShmemVariableCache->latestCompletedXid = ShmemVariableCache->nextXid;
5354         TransactionIdRetreat(ShmemVariableCache->latestCompletedXid);
5355
5356         /* Start up the commit log and related stuff, too */
5357         StartupCLOG();
5358         StartupSUBTRANS(oldestActiveXID);
5359         StartupMultiXact();
5360
5361         /* Reload shared-memory state for prepared transactions */
5362         RecoverPreparedTransactions();
5363
5364         /* Shut down readFile facility, free space */
             /* each resource is reset after release so its guard is false from then on */
5365         if (readFile >= 0)
5366         {
5367                 close(readFile);
5368                 readFile = -1;
5369         }
5370         if (readBuf)
5371         {
5372                 free(readBuf);
5373                 readBuf = NULL;
5374         }
5375         if (readRecordBuf)
5376         {
5377                 free(readRecordBuf);
5378                 readRecordBuf = NULL;
5379                 readRecordBufSize = 0;
5380         }
5381 }
5382
5383 /*
5384  * Subroutine to try to fetch and validate a prior checkpoint record.
5385  *
5386  * whichChkpt identifies the checkpoint (merely for reporting purposes).
5387  * 1 for "primary", 2 for "secondary", 0 for "other" (backup_label)
5388  */
5389 static XLogRecord *
5390 ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt)
5391 {
5392         XLogRecord *record;
5393
5394         if (!XRecOffIsValid(RecPtr.xrecoff))
5395         {
5396                 switch (whichChkpt)
5397                 {
5398                         case 1:
5399                                 ereport(LOG,
5400                                 (errmsg("invalid primary checkpoint link in control file")));
5401                                 break;
5402                         case 2:
5403                                 ereport(LOG,
5404                                                 (errmsg("invalid secondary checkpoint link in control file")));
5405                                 break;
5406                         default:
5407                                 ereport(LOG,
5408                                    (errmsg("invalid checkpoint link in backup_label file")));
5409                                 break;
5410                 }
5411                 return NULL;
5412         }
5413
5414         record = ReadRecord(&RecPtr, LOG);
5415
5416         if (record == NULL)
5417         {
5418                 switch (whichChkpt)
5419                 {
5420                         case 1:
5421                                 ereport(LOG,
5422                                                 (errmsg("invalid primary checkpoint record")));
5423                                 break;
5424                         case 2:
5425                                 ereport(LOG,
5426                                                 (errmsg("invalid secondary checkpoint record")));
5427                                 break;
5428                         default:
5429                                 ereport(LOG,
5430                                                 (errmsg("invalid checkpoint record")));
5431                                 break;
5432                 }
5433                 return NULL;
5434         }
5435         if (record->xl_rmid != RM_XLOG_ID)
5436         {
5437                 switch (whichChkpt)
5438                 {
5439                         case 1:
5440                                 ereport(LOG,
5441                                                 (errmsg("invalid resource manager ID in primary checkpoint record")));
5442                                 break;
5443                         case 2:
5444                                 ereport(LOG,
5445                                                 (errmsg("invalid resource manager ID in secondary checkpoint record")));
5446                                 break;
5447                         default:
5448                                 ereport(LOG,
5449                                 (errmsg("invalid resource manager ID in checkpoint record")));
5450                                 break;
5451                 }
5452                 return NULL;
5453         }
5454         if (record->xl_info != XLOG_CHECKPOINT_SHUTDOWN &&
5455                 record->xl_info != XLOG_CHECKPOINT_ONLINE)
5456         {
5457                 switch (whichChkpt)
5458                 {
5459                         case 1:
5460                                 ereport(LOG,
5461                                    (errmsg("invalid xl_info in primary checkpoint record")));
5462                                 break;
5463                         case 2:
5464                                 ereport(LOG,
5465                                  (errmsg("invalid xl_info in secondary checkpoint record")));
5466                                 break;
5467                         default:
5468                                 ereport(LOG,
5469                                                 (errmsg("invalid xl_info in checkpoint record")));
5470                                 break;
5471                 }
5472                 return NULL;
5473         }
5474         if (record->xl_len != sizeof(CheckPoint) ||
5475                 record->xl_tot_len != SizeOfXLogRecord + sizeof(CheckPoint))
5476         {
5477                 switch (whichChkpt)
5478                 {
5479                         case 1:
5480                                 ereport(LOG,
5481                                         (errmsg("invalid length of primary checkpoint record")));
5482                                 break;
5483                         case 2:
5484                                 ereport(LOG,
5485                                   (errmsg("invalid length of secondary checkpoint record")));
5486                                 break;
5487                         default:
5488                                 ereport(LOG,
5489                                                 (errmsg("invalid length of checkpoint record")));
5490                                 break;
5491                 }
5492                 return NULL;
5493         }
5494         return record;
5495 }
5496
5497 /*
5498  * This must be called during startup of a backend process, except that
5499  * it need not be called in a standalone backend (which does StartupXLOG
5500  * instead).  We need to initialize the local copies of ThisTimeLineID and
5501  * RedoRecPtr.
5502  *
5503  * Note: before Postgres 8.0, we went to some effort to keep the postmaster
5504  * process's copies of ThisTimeLineID and RedoRecPtr valid too.  This was
5505  * unnecessary however, since the postmaster itself never touches XLOG anyway.
5506  */
5507 void
5508 InitXLOGAccess(void)
5509 {
5510         /* ThisTimeLineID doesn't change so we need no lock to copy it */
5511         ThisTimeLineID = XLogCtl->ThisTimeLineID;
5512         /* Use GetRedoRecPtr to copy the RedoRecPtr safely */
5513         (void) GetRedoRecPtr();
5514 }
5515
5516 /*
5517  * Once spawned, a backend may update its local RedoRecPtr from
5518  * XLogCtl->Insert.RedoRecPtr; it must hold the insert lock or info_lck
5519  * to do so.  This is done in XLogInsert() or GetRedoRecPtr().
5520  */
5521 XLogRecPtr
5522 GetRedoRecPtr(void)
5523 {
5524         /* use volatile pointer to prevent code rearrangement */
5525         volatile XLogCtlData *xlogctl = XLogCtl;
5526
5527         SpinLockAcquire(&xlogctl->info_lck);
5528         Assert(XLByteLE(RedoRecPtr, xlogctl->Insert.RedoRecPtr));
5529         RedoRecPtr = xlogctl->Insert.RedoRecPtr;
5530         SpinLockRelease(&xlogctl->info_lck);
5531
5532         return RedoRecPtr;
5533 }
5534
5535 /*
5536  * GetInsertRecPtr -- Returns the current insert position.
5537  *
5538  * NOTE: The value *actually* returned is the position of the last full
5539  * xlog page. It lags behind the real insert position by at most 1 page.
5540  * For that, we don't need to acquire WALInsertLock which can be quite
5541  * heavily contended, and an approximation is enough for the current
5542  * usage of this function.
5543  */
5544 XLogRecPtr
5545 GetInsertRecPtr(void)
5546 {
5547         /* use volatile pointer to prevent code rearrangement */
5548         volatile XLogCtlData *xlogctl = XLogCtl;
5549         XLogRecPtr      recptr;
5550
5551         SpinLockAcquire(&xlogctl->info_lck);
5552         recptr = xlogctl->LogwrtRqst.Write;
5553         SpinLockRelease(&xlogctl->info_lck);
5554
5555         return recptr;
5556 }
5557
5558 /*
5559  * Get the time of the last xlog segment switch
5560  */
5561 pg_time_t
5562 GetLastSegSwitchTime(void)
5563 {
5564         pg_time_t       result;
5565
5566         /* Need WALWriteLock, but shared lock is sufficient */
5567         LWLockAcquire(WALWriteLock, LW_SHARED);
5568         result = XLogCtl->Write.lastSegSwitchTime;
5569         LWLockRelease(WALWriteLock);
5570
5571         return result;
5572 }
5573
5574 /*
5575  * GetNextXidAndEpoch - get the current nextXid value and associated epoch
5576  *
5577  * This is exported for use by code that would like to have 64-bit XIDs.
5578  * We don't really support such things, but all XIDs within the system
5579  * can be presumed "close to" the result, and thus the epoch associated
5580  * with them can be determined.
5581  */
5582 void
5583 GetNextXidAndEpoch(TransactionId *xid, uint32 *epoch)
5584 {
5585         uint32          ckptXidEpoch;
5586         TransactionId ckptXid;
5587         TransactionId nextXid;
5588
5589         /* Must read checkpoint info first, else have race condition */
5590         {
5591                 /* use volatile pointer to prevent code rearrangement */
5592                 volatile XLogCtlData *xlogctl = XLogCtl;
5593
5594                 SpinLockAcquire(&xlogctl->info_lck);
5595                 ckptXidEpoch = xlogctl->ckptXidEpoch;
5596                 ckptXid = xlogctl->ckptXid;
5597                 SpinLockRelease(&xlogctl->info_lck);
5598         }
5599
5600         /* Now fetch current nextXid */
5601         nextXid = ReadNewTransactionId();
5602
5603         /*
5604          * nextXid is certainly logically later than ckptXid.  So if it's
5605          * numerically less, it must have wrapped into the next epoch.
5606          */
5607         if (nextXid < ckptXid)
5608                 ckptXidEpoch++;
5609
5610         *xid = nextXid;
5611         *epoch = ckptXidEpoch;
5612 }
5613
5614 /*
5615  * This must be called ONCE during postmaster or standalone-backend shutdown
5616  */
5617 void
5618 ShutdownXLOG(int code, Datum arg)
5619 {
5620         ereport(LOG,
5621                         (errmsg("shutting down")));
5622
5623         CreateCheckPoint(CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_IMMEDIATE);
5624         ShutdownCLOG();
5625         ShutdownSUBTRANS();
5626         ShutdownMultiXact();
5627
5628         ereport(LOG,
5629                         (errmsg("database system is shut down")));
5630 }
5631
5632 /*
5633  * Log start of a checkpoint.
5634  */
5635 static void
5636 LogCheckpointStart(int flags)
5637 {
5638         elog(LOG, "checkpoint starting:%s%s%s%s%s%s",
5639                  (flags & CHECKPOINT_IS_SHUTDOWN) ? " shutdown" : "",
5640                  (flags & CHECKPOINT_IMMEDIATE) ? " immediate" : "",
5641                  (flags & CHECKPOINT_FORCE) ? " force" : "",
5642                  (flags & CHECKPOINT_WAIT) ? " wait" : "",
5643                  (flags & CHECKPOINT_CAUSE_XLOG) ? " xlog" : "",
5644                  (flags & CHECKPOINT_CAUSE_TIME) ? " time" : "");
5645 }
5646
5647 /*
5648  * Log end of a checkpoint.
5649  */
5650 static void
5651 LogCheckpointEnd(void)
5652 {
5653         long            write_secs,
5654                                 sync_secs,
5655                                 total_secs;
5656         int                     write_usecs,
5657                                 sync_usecs,
5658                                 total_usecs;
5659
5660         CheckpointStats.ckpt_end_t = GetCurrentTimestamp();
5661
5662         TimestampDifference(CheckpointStats.ckpt_start_t,
5663                                                 CheckpointStats.ckpt_end_t,
5664                                                 &total_secs, &total_usecs);
5665
5666         TimestampDifference(CheckpointStats.ckpt_write_t,
5667                                                 CheckpointStats.ckpt_sync_t,
5668                                                 &write_secs, &write_usecs);
5669
5670         TimestampDifference(CheckpointStats.ckpt_sync_t,
5671                                                 CheckpointStats.ckpt_sync_end_t,
5672                                                 &sync_secs, &sync_usecs);
5673
5674         elog(LOG, "checkpoint complete: wrote %d buffers (%.1f%%); "
5675                  "%d transaction log file(s) added, %d removed, %d recycled; "
5676                  "write=%ld.%03d s, sync=%ld.%03d s, total=%ld.%03d s",
5677                  CheckpointStats.ckpt_bufs_written,
5678                  (double) CheckpointStats.ckpt_bufs_written * 100 / NBuffers,
5679                  CheckpointStats.ckpt_segs_added,
5680                  CheckpointStats.ckpt_segs_removed,
5681                  CheckpointStats.ckpt_segs_recycled,
5682                  write_secs, write_usecs / 1000,
5683                  sync_secs, sync_usecs / 1000,
5684                  total_secs, total_usecs / 1000);
5685 }
5686
5687 /*
5688  * Perform a checkpoint --- either during shutdown, or on-the-fly
5689  *
5690  * flags is a bitwise OR of the following:
5691  *      CHECKPOINT_IS_SHUTDOWN: checkpoint is for database shutdown.
5692  *      CHECKPOINT_IMMEDIATE: finish the checkpoint ASAP,
5693  *              ignoring checkpoint_completion_target parameter.
5694  *      CHECKPOINT_FORCE: force a checkpoint even if no XLOG activity has occured
5695  *              since the last one (implied by CHECKPOINT_IS_SHUTDOWN).
5696  *
5697  * Note: flags contains other bits, of interest here only for logging purposes.
5698  * In particular note that this routine is synchronous and does not pay
5699  * attention to CHECKPOINT_WAIT.
5700  */
5701 void
5702 CreateCheckPoint(int flags)
5703 {
5704         bool            shutdown = (flags & CHECKPOINT_IS_SHUTDOWN) != 0;
5705         CheckPoint      checkPoint;
5706         XLogRecPtr      recptr;
5707         XLogCtlInsert *Insert = &XLogCtl->Insert;
5708         XLogRecData rdata;
5709         uint32          freespace;
5710         uint32          _logId;
5711         uint32          _logSeg;
5712         TransactionId *inCommitXids;
5713         int                     nInCommit;
5714
5715         /*
5716          * Acquire CheckpointLock to ensure only one checkpoint happens at a time.
5717          * (This is just pro forma, since in the present system structure there is
5718          * only one process that is allowed to issue checkpoints at any given
5719          * time.)
5720          */
5721         LWLockAcquire(CheckpointLock, LW_EXCLUSIVE);
5722
5723         /*
5724          * Prepare to accumulate statistics.
5725          *
5726          * Note: because it is possible for log_checkpoints to change while a
5727          * checkpoint proceeds, we always accumulate stats, even if
5728          * log_checkpoints is currently off.
5729          */
5730         MemSet(&CheckpointStats, 0, sizeof(CheckpointStats));
5731         CheckpointStats.ckpt_start_t = GetCurrentTimestamp();
5732
5733         /*
5734          * Use a critical section to force system panic if we have trouble.
5735          */
5736         START_CRIT_SECTION();
5737
5738         if (shutdown)
5739         {
5740                 ControlFile->state = DB_SHUTDOWNING;
5741                 ControlFile->time = (pg_time_t) time(NULL);
5742                 UpdateControlFile();
5743         }
5744
5745         /*
5746          * Let smgr prepare for checkpoint; this has to happen before we determine
5747          * the REDO pointer.  Note that smgr must not do anything that'd have to
5748          * be undone if we decide no checkpoint is needed.
5749          */
5750         smgrpreckpt();
5751
5752         /* Begin filling in the checkpoint WAL record */
5753         MemSet(&checkPoint, 0, sizeof(checkPoint));
5754         checkPoint.ThisTimeLineID = ThisTimeLineID;
5755         checkPoint.time = (pg_time_t) time(NULL);
5756
5757         /*
5758          * We must hold WALInsertLock while examining insert state to determine
5759          * the checkpoint REDO pointer.
5760          */
5761         LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
5762
5763         /*
5764          * If this isn't a shutdown or forced checkpoint, and we have not inserted
5765          * any XLOG records since the start of the last checkpoint, skip the
5766          * checkpoint.  The idea here is to avoid inserting duplicate checkpoints
5767          * when the system is idle. That wastes log space, and more importantly it
5768          * exposes us to possible loss of both current and previous checkpoint
5769          * records if the machine crashes just as we're writing the update.
5770          * (Perhaps it'd make even more sense to checkpoint only when the previous
5771          * checkpoint record is in a different xlog page?)
5772          *
5773          * We have to make two tests to determine that nothing has happened since
5774          * the start of the last checkpoint: current insertion point must match
5775          * the end of the last checkpoint record, and its redo pointer must point
5776          * to itself.
5777          */
5778         if ((flags & (CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_FORCE)) == 0)
5779         {
5780                 XLogRecPtr      curInsert;
5781
5782                 INSERT_RECPTR(curInsert, Insert, Insert->curridx);
5783                 if (curInsert.xlogid == ControlFile->checkPoint.xlogid &&
5784                         curInsert.xrecoff == ControlFile->checkPoint.xrecoff +
5785                         MAXALIGN(SizeOfXLogRecord + sizeof(CheckPoint)) &&
5786                         ControlFile->checkPoint.xlogid ==
5787                         ControlFile->checkPointCopy.redo.xlogid &&
5788                         ControlFile->checkPoint.xrecoff ==
5789                         ControlFile->checkPointCopy.redo.xrecoff)
5790                 {
5791                         LWLockRelease(WALInsertLock);
5792                         LWLockRelease(CheckpointLock);
5793                         END_CRIT_SECTION();
5794                         return;
5795                 }
5796         }
5797
5798         /*
5799          * Compute new REDO record ptr = location of next XLOG record.
5800          *
5801          * NB: this is NOT necessarily where the checkpoint record itself will be,
5802          * since other backends may insert more XLOG records while we're off doing
5803          * the buffer flush work.  Those XLOG records are logically after the
5804          * checkpoint, even though physically before it.  Got that?
5805          */
5806         freespace = INSERT_FREESPACE(Insert);
5807         if (freespace < SizeOfXLogRecord)
5808         {
5809                 (void) AdvanceXLInsertBuffer(false);
5810                 /* OK to ignore update return flag, since we will do flush anyway */
5811                 freespace = INSERT_FREESPACE(Insert);
5812         }
5813         INSERT_RECPTR(checkPoint.redo, Insert, Insert->curridx);
5814
5815         /*
5816          * Here we update the shared RedoRecPtr for future XLogInsert calls; this
5817          * must be done while holding the insert lock AND the info_lck.
5818          *
5819          * Note: if we fail to complete the checkpoint, RedoRecPtr will be left
5820          * pointing past where it really needs to point.  This is okay; the only
5821          * consequence is that XLogInsert might back up whole buffers that it
5822          * didn't really need to.  We can't postpone advancing RedoRecPtr because
5823          * XLogInserts that happen while we are dumping buffers must assume that
5824          * their buffer changes are not included in the checkpoint.
5825          */
5826         {
5827                 /* use volatile pointer to prevent code rearrangement */
5828                 volatile XLogCtlData *xlogctl = XLogCtl;
5829
5830                 SpinLockAcquire(&xlogctl->info_lck);
5831                 RedoRecPtr = xlogctl->Insert.RedoRecPtr = checkPoint.redo;
5832                 SpinLockRelease(&xlogctl->info_lck);
5833         }
5834
5835         /*
5836          * Now we can release WAL insert lock, allowing other xacts to proceed
5837          * while we are flushing disk buffers.
5838          */
5839         LWLockRelease(WALInsertLock);
5840
5841         /*
5842          * If enabled, log checkpoint start.  We postpone this until now so as not
5843          * to log anything if we decided to skip the checkpoint.
5844          */
5845         if (log_checkpoints)
5846                 LogCheckpointStart(flags);
5847
5848         /*
5849          * Before flushing data, we must wait for any transactions that are
5850          * currently in their commit critical sections.  If an xact inserted its
5851          * commit record into XLOG just before the REDO point, then a crash
5852          * restart from the REDO point would not replay that record, which means
5853          * that our flushing had better include the xact's update of pg_clog.  So
5854          * we wait till he's out of his commit critical section before proceeding.
5855          * See notes in RecordTransactionCommit().
5856          *
5857          * Because we've already released WALInsertLock, this test is a bit fuzzy:
5858          * it is possible that we will wait for xacts we didn't really need to
5859          * wait for.  But the delay should be short and it seems better to make
5860          * checkpoint take a bit longer than to hold locks longer than necessary.
5861          * (In fact, the whole reason we have this issue is that xact.c does
5862          * commit record XLOG insertion and clog update as two separate steps
5863          * protected by different locks, but again that seems best on grounds of
5864          * minimizing lock contention.)
5865          *
5866          * A transaction that has not yet set inCommit when we look cannot be at
5867          * risk, since he's not inserted his commit record yet; and one that's
5868          * already cleared it is not at risk either, since he's done fixing clog
5869          * and we will correctly flush the update below.  So we cannot miss any
5870          * xacts we need to wait for.
5871          */
5872         nInCommit = GetTransactionsInCommit(&inCommitXids);
5873         if (nInCommit > 0)
5874         {
5875                 do
5876                 {
5877                         pg_usleep(10000L);      /* wait for 10 msec */
5878                 } while (HaveTransactionsInCommit(inCommitXids, nInCommit));
5879         }
5880         pfree(inCommitXids);
5881
5882         /*
5883          * Get the other info we need for the checkpoint record.
5884          */
5885         LWLockAcquire(XidGenLock, LW_SHARED);
5886         checkPoint.nextXid = ShmemVariableCache->nextXid;
5887         LWLockRelease(XidGenLock);
5888
5889         /* Increase XID epoch if we've wrapped around since last checkpoint */
5890         checkPoint.nextXidEpoch = ControlFile->checkPointCopy.nextXidEpoch;
5891         if (checkPoint.nextXid < ControlFile->checkPointCopy.nextXid)
5892                 checkPoint.nextXidEpoch++;
5893
5894         LWLockAcquire(OidGenLock, LW_SHARED);
5895         checkPoint.nextOid = ShmemVariableCache->nextOid;
5896         if (!shutdown)
5897                 checkPoint.nextOid += ShmemVariableCache->oidCount;
5898         LWLockRelease(OidGenLock);
5899
5900         MultiXactGetCheckptMulti(shutdown,
5901                                                          &checkPoint.nextMulti,
5902                                                          &checkPoint.nextMultiOffset);
5903
5904         /*
5905          * Having constructed the checkpoint record, ensure all shmem disk buffers
5906          * and commit-log buffers are flushed to disk.
5907          *
5908          * This I/O could fail for various reasons.  If so, we will fail to
5909          * complete the checkpoint, but there is no reason to force a system
5910          * panic. Accordingly, exit critical section while doing it.
5911          */
5912         END_CRIT_SECTION();
5913
5914         CheckPointGuts(checkPoint.redo, flags);
5915
5916         START_CRIT_SECTION();
5917
5918         /*
5919          * Now insert the checkpoint record into XLOG.
5920          */
5921         rdata.data = (char *) (&checkPoint);
5922         rdata.len = sizeof(checkPoint);
5923         rdata.buffer = InvalidBuffer;
5924         rdata.next = NULL;
5925
5926         recptr = XLogInsert(RM_XLOG_ID,
5927                                                 shutdown ? XLOG_CHECKPOINT_SHUTDOWN :
5928                                                 XLOG_CHECKPOINT_ONLINE,
5929                                                 &rdata);
5930
5931         XLogFlush(recptr);
5932
5933         /*
5934          * We now have ProcLastRecPtr = start of actual checkpoint record, recptr
5935          * = end of actual checkpoint record.
5936          */
5937         if (shutdown && !XLByteEQ(checkPoint.redo, ProcLastRecPtr))
5938                 ereport(PANIC,
5939                                 (errmsg("concurrent transaction log activity while database system is shutting down")));
5940
5941         /*
5942          * Select point at which we can truncate the log, which we base on the
5943          * prior checkpoint's earliest info.
5944          */
5945         XLByteToSeg(ControlFile->checkPointCopy.redo, _logId, _logSeg);
5946
5947         /*
5948          * Update the control file.
5949          */
5950         LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
5951         if (shutdown)
5952                 ControlFile->state = DB_SHUTDOWNED;
5953         ControlFile->prevCheckPoint = ControlFile->checkPoint;
5954         ControlFile->checkPoint = ProcLastRecPtr;
5955         ControlFile->checkPointCopy = checkPoint;
5956         ControlFile->time = (pg_time_t) time(NULL);
5957         UpdateControlFile();
5958         LWLockRelease(ControlFileLock);
5959
5960         /* Update shared-memory copy of checkpoint XID/epoch */
5961         {
5962                 /* use volatile pointer to prevent code rearrangement */
5963                 volatile XLogCtlData *xlogctl = XLogCtl;
5964
5965                 SpinLockAcquire(&xlogctl->info_lck);
5966                 xlogctl->ckptXidEpoch = checkPoint.nextXidEpoch;
5967                 xlogctl->ckptXid = checkPoint.nextXid;
5968                 SpinLockRelease(&xlogctl->info_lck);
5969         }
5970
5971         /*
5972          * We are now done with critical updates; no need for system panic if we
5973          * have trouble while fooling with old log segments.
5974          */
5975         END_CRIT_SECTION();
5976
5977         /*
5978          * Let smgr do post-checkpoint cleanup (eg, deleting old files).
5979          */
5980         smgrpostckpt();
5981
5982         /*
5983          * Delete old log files (those no longer needed even for previous
5984          * checkpoint).
5985          */
5986         if (_logId || _logSeg)
5987         {
5988                 PrevLogSeg(_logId, _logSeg);
5989                 RemoveOldXlogFiles(_logId, _logSeg, recptr);
5990         }
5991
5992         /*
5993          * Make more log segments if needed.  (Do this after recycling old log
5994          * segments, since that may supply some of the needed files.)
5995          */
5996         if (!shutdown)
5997                 PreallocXlogFiles(recptr);
5998
5999         /*
6000          * Truncate pg_subtrans if possible.  We can throw away all data before
6001          * the oldest XMIN of any running transaction.  No future transaction will
6002          * attempt to reference any pg_subtrans entry older than that (see Asserts
6003          * in subtrans.c).      During recovery, though, we mustn't do this because
6004          * StartupSUBTRANS hasn't been called yet.
6005          */
6006         if (!InRecovery)
6007                 TruncateSUBTRANS(GetOldestXmin(true, false));
6008
6009         /* All real work is done, but log before releasing lock. */
6010         if (log_checkpoints)
6011                 LogCheckpointEnd();
6012
6013         LWLockRelease(CheckpointLock);
6014 }
6015
6016 /*
6017  * Flush all data in shared memory to disk, and fsync
6018  *
6019  * This is the common code shared between regular checkpoints and
6020  * recovery restartpoints.
6021  */
6022 static void
6023 CheckPointGuts(XLogRecPtr checkPointRedo, int flags)
6024 {
6025         CheckPointCLOG();
6026         CheckPointSUBTRANS();
6027         CheckPointMultiXact();
6028         CheckPointBuffers(flags);       /* performs all required fsyncs */
6029         /* We deliberately delay 2PC checkpointing as long as possible */
6030         CheckPointTwoPhase(checkPointRedo);
6031 }
6032
6033 /*
6034  * Set a recovery restart point if appropriate
6035  *
6036  * This is similar to CreateCheckPoint, but is used during WAL recovery
6037  * to establish a point from which recovery can roll forward without
6038  * replaying the entire recovery log.  This function is called each time
6039  * a checkpoint record is read from XLOG; it must determine whether a
6040  * restartpoint is needed or not.
6041  */
6042 static void
6043 RecoveryRestartPoint(const CheckPoint *checkPoint)
6044 {
6045         int                     elapsed_secs;
6046         int                     rmid;
6047
6048         /*
6049          * Do nothing if the elapsed time since the last restartpoint is less than
6050          * half of checkpoint_timeout.  (We use a value less than
6051          * checkpoint_timeout so that variations in the timing of checkpoints on
6052          * the master, or speed of transmission of WAL segments to a slave, won't
6053          * make the slave skip a restartpoint once it's synced with the master.)
6054          * Checking true elapsed time keeps us from doing restartpoints too often
6055          * while rapidly scanning large amounts of WAL.
6056          */
6057         elapsed_secs = (pg_time_t) time(NULL) - ControlFile->time;
6058         if (elapsed_secs < CheckPointTimeout / 2)
6059                 return;
6060
6061         /*
6062          * Is it safe to checkpoint?  We must ask each of the resource managers
6063          * whether they have any partial state information that might prevent a
6064          * correct restart from this point.  If so, we skip this opportunity, but
6065          * return at the next checkpoint record for another try.
6066          */
6067         for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
6068         {
6069                 if (RmgrTable[rmid].rm_safe_restartpoint != NULL)
6070                         if (!(RmgrTable[rmid].rm_safe_restartpoint()))
6071                         {
6072                                 elog(DEBUG2, "RM %d not safe to record restart point at %X/%X",
6073                                          rmid,
6074                                          checkPoint->redo.xlogid,
6075                                          checkPoint->redo.xrecoff);
6076                                 return;
6077                         }
6078         }
6079
6080         /*
6081          * OK, force data out to disk
6082          */
6083         CheckPointGuts(checkPoint->redo, CHECKPOINT_IMMEDIATE);
6084
6085         /*
6086          * Update pg_control so that any subsequent crash will restart from this
6087          * checkpoint.  Note: ReadRecPtr gives the XLOG address of the checkpoint
6088          * record itself.
6089          */
6090         ControlFile->prevCheckPoint = ControlFile->checkPoint;
6091         ControlFile->checkPoint = ReadRecPtr;
6092         ControlFile->checkPointCopy = *checkPoint;
6093         ControlFile->time = (pg_time_t) time(NULL);
6094         UpdateControlFile();
6095
6096         ereport((recoveryLogRestartpoints ? LOG : DEBUG2),
6097                         (errmsg("recovery restart point at %X/%X",
6098                                         checkPoint->redo.xlogid, checkPoint->redo.xrecoff)));
6099         if (recoveryLastXTime)
6100                 ereport((recoveryLogRestartpoints ? LOG : DEBUG2),
6101                                 (errmsg("last completed transaction was at log time %s",
6102                                                 timestamptz_to_str(recoveryLastXTime))));
6103 }
6104
6105 /*
6106  * Write a NEXTOID log record
6107  */
6108 void
6109 XLogPutNextOid(Oid nextOid)
6110 {
6111         XLogRecData rdata;
6112
6113         rdata.data = (char *) (&nextOid);
6114         rdata.len = sizeof(Oid);
6115         rdata.buffer = InvalidBuffer;
6116         rdata.next = NULL;
6117         (void) XLogInsert(RM_XLOG_ID, XLOG_NEXTOID, &rdata);
6118
6119         /*
6120          * We need not flush the NEXTOID record immediately, because any of the
6121          * just-allocated OIDs could only reach disk as part of a tuple insert or
6122          * update that would have its own XLOG record that must follow the NEXTOID
6123          * record.      Therefore, the standard buffer LSN interlock applied to those
6124          * records will ensure no such OID reaches disk before the NEXTOID record
6125          * does.
6126          *
6127          * Note, however, that the above statement only covers state "within" the
6128          * database.  When we use a generated OID as a file or directory name, we
6129          * are in a sense violating the basic WAL rule, because that filesystem
6130          * change may reach disk before the NEXTOID WAL record does.  The impact
6131          * of this is that if a database crash occurs immediately afterward, we
6132          * might after restart re-generate the same OID and find that it conflicts
6133          * with the leftover file or directory.  But since for safety's sake we
6134          * always loop until finding a nonconflicting filename, this poses no real
6135          * problem in practice. See pgsql-hackers discussion 27-Sep-2006.
6136          */
6137 }
6138
6139 /*
6140  * Write an XLOG SWITCH record.
6141  *
6142  * Here we just blindly issue an XLogInsert request for the record.
6143  * All the magic happens inside XLogInsert.
6144  *
6145  * The return value is either the end+1 address of the switch record,
6146  * or the end+1 address of the prior segment if we did not need to
6147  * write a switch record because we are already at segment start.
6148  */
6149 XLogRecPtr
6150 RequestXLogSwitch(void)
6151 {
6152         XLogRecPtr      RecPtr;
6153         XLogRecData rdata;
6154
6155         /* XLOG SWITCH, alone among xlog record types, has no data */
6156         rdata.buffer = InvalidBuffer;
6157         rdata.data = NULL;
6158         rdata.len = 0;
6159         rdata.next = NULL;
6160
6161         RecPtr = XLogInsert(RM_XLOG_ID, XLOG_SWITCH, &rdata);
6162
6163         return RecPtr;
6164 }
6165
6166 /*
6167  * XLOG resource manager's routines
6168  */
6169 void
6170 xlog_redo(XLogRecPtr lsn, XLogRecord *record)
6171 {
6172         uint8           info = record->xl_info & ~XLR_INFO_MASK;
6173
6174         if (info == XLOG_NEXTOID)
6175         {
6176                 Oid                     nextOid;
6177
6178                 memcpy(&nextOid, XLogRecGetData(record), sizeof(Oid));
6179                 if (ShmemVariableCache->nextOid < nextOid)
6180                 {
6181                         ShmemVariableCache->nextOid = nextOid;
6182                         ShmemVariableCache->oidCount = 0;
6183                 }
6184         }
6185         else if (info == XLOG_CHECKPOINT_SHUTDOWN)
6186         {
6187                 CheckPoint      checkPoint;
6188
6189                 memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
6190                 /* In a SHUTDOWN checkpoint, believe the counters exactly */
6191                 ShmemVariableCache->nextXid = checkPoint.nextXid;
6192                 ShmemVariableCache->nextOid = checkPoint.nextOid;
6193                 ShmemVariableCache->oidCount = 0;
6194                 MultiXactSetNextMXact(checkPoint.nextMulti,
6195                                                           checkPoint.nextMultiOffset);
6196
6197                 /* ControlFile->checkPointCopy always tracks the latest ckpt XID */
6198                 ControlFile->checkPointCopy.nextXidEpoch = checkPoint.nextXidEpoch;
6199                 ControlFile->checkPointCopy.nextXid = checkPoint.nextXid;
6200
6201                 /*
6202                  * TLI may change in a shutdown checkpoint, but it shouldn't decrease
6203                  */
6204                 if (checkPoint.ThisTimeLineID != ThisTimeLineID)
6205                 {
6206                         if (checkPoint.ThisTimeLineID < ThisTimeLineID ||
6207                                 !list_member_int(expectedTLIs,
6208                                                                  (int) checkPoint.ThisTimeLineID))
6209                                 ereport(PANIC,
6210                                                 (errmsg("unexpected timeline ID %u (after %u) in checkpoint record",
6211                                                                 checkPoint.ThisTimeLineID, ThisTimeLineID)));
6212                         /* Following WAL records should be run with new TLI */
6213                         ThisTimeLineID = checkPoint.ThisTimeLineID;
6214                 }
6215
6216                 RecoveryRestartPoint(&checkPoint);
6217         }
6218         else if (info == XLOG_CHECKPOINT_ONLINE)
6219         {
6220                 CheckPoint      checkPoint;
6221
6222                 memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
6223                 /* In an ONLINE checkpoint, treat the counters like NEXTOID */
6224                 if (TransactionIdPrecedes(ShmemVariableCache->nextXid,
6225                                                                   checkPoint.nextXid))
6226                         ShmemVariableCache->nextXid = checkPoint.nextXid;
6227                 if (ShmemVariableCache->nextOid < checkPoint.nextOid)
6228                 {
6229                         ShmemVariableCache->nextOid = checkPoint.nextOid;
6230                         ShmemVariableCache->oidCount = 0;
6231                 }
6232                 MultiXactAdvanceNextMXact(checkPoint.nextMulti,
6233                                                                   checkPoint.nextMultiOffset);
6234
6235                 /* ControlFile->checkPointCopy always tracks the latest ckpt XID */
6236                 ControlFile->checkPointCopy.nextXidEpoch = checkPoint.nextXidEpoch;
6237                 ControlFile->checkPointCopy.nextXid = checkPoint.nextXid;
6238
6239                 /* TLI should not change in an on-line checkpoint */
6240                 if (checkPoint.ThisTimeLineID != ThisTimeLineID)
6241                         ereport(PANIC,
6242                                         (errmsg("unexpected timeline ID %u (should be %u) in checkpoint record",
6243                                                         checkPoint.ThisTimeLineID, ThisTimeLineID)));
6244
6245                 RecoveryRestartPoint(&checkPoint);
6246         }
6247         else if (info == XLOG_NOOP)
6248         {
6249                 /* nothing to do here */
6250         }
6251         else if (info == XLOG_SWITCH)
6252         {
6253                 /* nothing to do here */
6254         }
6255 }
6256
6257 void
6258 xlog_desc(StringInfo buf, uint8 xl_info, char *rec)
6259 {
6260         uint8           info = xl_info & ~XLR_INFO_MASK;
6261
6262         if (info == XLOG_CHECKPOINT_SHUTDOWN ||
6263                 info == XLOG_CHECKPOINT_ONLINE)
6264         {
6265                 CheckPoint *checkpoint = (CheckPoint *) rec;
6266
6267                 appendStringInfo(buf, "checkpoint: redo %X/%X; "
6268                                                  "tli %u; xid %u/%u; oid %u; multi %u; offset %u; %s",
6269                                                  checkpoint->redo.xlogid, checkpoint->redo.xrecoff,
6270                                                  checkpoint->ThisTimeLineID,
6271                                                  checkpoint->nextXidEpoch, checkpoint->nextXid,
6272                                                  checkpoint->nextOid,
6273                                                  checkpoint->nextMulti,
6274                                                  checkpoint->nextMultiOffset,
6275                                  (info == XLOG_CHECKPOINT_SHUTDOWN) ? "shutdown" : "online");
6276         }
6277         else if (info == XLOG_NOOP)
6278         {
6279                 appendStringInfo(buf, "xlog no-op");
6280         }
6281         else if (info == XLOG_NEXTOID)
6282         {
6283                 Oid                     nextOid;
6284
6285                 memcpy(&nextOid, rec, sizeof(Oid));
6286                 appendStringInfo(buf, "nextOid: %u", nextOid);
6287         }
6288         else if (info == XLOG_SWITCH)
6289         {
6290                 appendStringInfo(buf, "xlog switch");
6291         }
6292         else
6293                 appendStringInfo(buf, "UNKNOWN");
6294 }
6295
#ifdef WAL_DEBUG

/*
 * xlog_outrec: append a one-line summary of a WAL record header to buf.
 *
 * The summary shows the previous-record pointer and the xid, a "bkpbN"
 * marker (N counted from 1) for each backup-block bit set in xl_info, and
 * finally the name of the owning resource manager.
 */
static void
xlog_outrec(StringInfo buf, XLogRecord *record)
{
	int			blk = 0;

	appendStringInfo(buf, "prev %X/%X; xid %u",
					 record->xl_prev.xlogid, record->xl_prev.xrecoff,
					 record->xl_xid);

	while (blk < XLR_MAX_BKP_BLOCKS)
	{
		if (record->xl_info & XLR_SET_BKP_BLOCK(blk))
			appendStringInfo(buf, "; bkpb%d", blk + 1);
		blk++;
	}

	appendStringInfo(buf, ": %s", RmgrTable[record->xl_rmid].rm_name);
}
#endif   /* WAL_DEBUG */
6316
6317
6318 /*
6319  * Return the (possible) sync flag used for opening a file, depending on the
6320  * value of the GUC wal_sync_method.
6321  */
6322 static int
6323 get_sync_bit(int method)
6324 {
6325         /* If fsync is disabled, never open in sync mode */
6326         if (!enableFsync)
6327                 return 0;
6328
6329         switch (method)
6330         {
6331                 /*
6332                  * enum values for all sync options are defined even if they are not
6333                  * supported on the current platform.  But if not, they are not
6334                  * included in the enum option array, and therefore will never be seen
6335                  * here.
6336                  */
6337                 case SYNC_METHOD_FSYNC:
6338                 case SYNC_METHOD_FSYNC_WRITETHROUGH:
6339                 case SYNC_METHOD_FDATASYNC:
6340                         return 0;
6341 #ifdef OPEN_SYNC_FLAG
6342                 case SYNC_METHOD_OPEN:
6343                         return OPEN_SYNC_FLAG;
6344 #endif
6345 #ifdef OPEN_DATASYNC_FLAG
6346                 case SYNC_METHOD_OPEN_DSYNC:
6347                         return OPEN_DATASYNC_FLAG;
6348 #endif
6349                 default:
6350                         /* can't happen (unless we are out of sync with option array) */
6351                         elog(ERROR, "unrecognized wal_sync_method: %d", method);
6352                         return 0; /* silence warning */
6353         }
6354 }
6355
6356 /*
6357  * GUC support
6358  */
6359 bool
6360 assign_xlog_sync_method(int new_sync_method, bool doit, GucSource source)
6361 {
6362         if (!doit)
6363                 return true;
6364
6365         if (sync_method != new_sync_method)
6366         {
6367                 /*
6368                  * To ensure that no blocks escape unsynced, force an fsync on the
6369                  * currently open log segment (if any).  Also, if the open flag is
6370                  * changing, close the log file so it will be reopened (with new flag
6371                  * bit) at next use.
6372                  */
6373                 if (openLogFile >= 0)
6374                 {
6375                         if (pg_fsync(openLogFile) != 0)
6376                                 ereport(PANIC,
6377                                                 (errcode_for_file_access(),
6378                                                  errmsg("could not fsync log file %u, segment %u: %m",
6379                                                                 openLogId, openLogSeg)));
6380                         if (get_sync_bit(sync_method) != get_sync_bit(new_sync_method))
6381                                 XLogFileClose();
6382                 }
6383         }
6384
6385         return true;
6386 }
6387
6388
6389 /*
6390  * Issue appropriate kind of fsync (if any) on the current XLOG output file
6391  */
6392 static void
6393 issue_xlog_fsync(void)
6394 {
6395         switch (sync_method)
6396         {
6397                 case SYNC_METHOD_FSYNC:
6398                         if (pg_fsync_no_writethrough(openLogFile) != 0)
6399                                 ereport(PANIC,
6400                                                 (errcode_for_file_access(),
6401                                                  errmsg("could not fsync log file %u, segment %u: %m",
6402                                                                 openLogId, openLogSeg)));
6403                         break;
6404 #ifdef HAVE_FSYNC_WRITETHROUGH
6405                 case SYNC_METHOD_FSYNC_WRITETHROUGH:
6406                         if (pg_fsync_writethrough(openLogFile) != 0)
6407                                 ereport(PANIC,
6408                                                 (errcode_for_file_access(),
6409                                                  errmsg("could not fsync write-through log file %u, segment %u: %m",
6410                                                                 openLogId, openLogSeg)));
6411                         break;
6412 #endif
6413 #ifdef HAVE_FDATASYNC
6414                 case SYNC_METHOD_FDATASYNC:
6415                         if (pg_fdatasync(openLogFile) != 0)
6416                                 ereport(PANIC,
6417                                                 (errcode_for_file_access(),
6418                                         errmsg("could not fdatasync log file %u, segment %u: %m",
6419                                                    openLogId, openLogSeg)));
6420                         break;
6421 #endif
6422                 case SYNC_METHOD_OPEN:
6423                 case SYNC_METHOD_OPEN_DSYNC:
6424                         /* write synced it already */
6425                         break;
6426                 default:
6427                         elog(PANIC, "unrecognized wal_sync_method: %d", sync_method);
6428                         break;
6429         }
6430 }
6431
6432
6433 /*
6434  * pg_start_backup: set up for taking an on-line backup dump
6435  *
6436  * Essentially what this does is to create a backup label file in $PGDATA,
6437  * where it will be archived as part of the backup dump.  The label file
6438  * contains the user-supplied label string (typically this would be used
6439  * to tell where the backup dump will be stored) and the starting time and
6440  * starting WAL location for the dump.
6441  */
6442 Datum
6443 pg_start_backup(PG_FUNCTION_ARGS)
6444 {
6445         text       *backupid = PG_GETARG_TEXT_P(0);
6446         char       *backupidstr;
6447         XLogRecPtr      checkpointloc;
6448         XLogRecPtr      startpoint;
6449         pg_time_t       stamp_time;
6450         char            strfbuf[128];
6451         char            xlogfilename[MAXFNAMELEN];
6452         uint32          _logId;
6453         uint32          _logSeg;
6454         struct stat stat_buf;
6455         FILE       *fp;
6456
6457         if (!superuser())
6458                 ereport(ERROR,
6459                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
6460                                  errmsg("must be superuser to run a backup")));
6461
6462         if (!XLogArchivingActive())
6463                 ereport(ERROR,
6464                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
6465                                  errmsg("WAL archiving is not active"),
6466                                  errhint("archive_mode must be enabled at server start.")));
6467
6468         if (!XLogArchiveCommandSet())
6469                 ereport(ERROR,
6470                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
6471                                  errmsg("WAL archiving is not active"),
6472                                  errhint("archive_command must be defined before "
6473                                                  "online backups can be made safely.")));
6474
6475         backupidstr = text_to_cstring(backupid);
6476
6477         /*
6478          * Mark backup active in shared memory.  We must do full-page WAL writes
6479          * during an on-line backup even if not doing so at other times, because
6480          * it's quite possible for the backup dump to obtain a "torn" (partially
6481          * written) copy of a database page if it reads the page concurrently with
6482          * our write to the same page.  This can be fixed as long as the first
6483          * write to the page in the WAL sequence is a full-page write. Hence, we
6484          * turn on forcePageWrites and then force a CHECKPOINT, to ensure there
6485          * are no dirty pages in shared memory that might get dumped while the
6486          * backup is in progress without having a corresponding WAL record.  (Once
6487          * the backup is complete, we need not force full-page writes anymore,
6488          * since we expect that any pages not modified during the backup interval
6489          * must have been correctly captured by the backup.)
6490          *
6491          * We must hold WALInsertLock to change the value of forcePageWrites, to
6492          * ensure adequate interlocking against XLogInsert().
6493          */
6494         LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
6495         if (XLogCtl->Insert.forcePageWrites)
6496         {
6497                 LWLockRelease(WALInsertLock);
6498                 ereport(ERROR,
6499                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
6500                                  errmsg("a backup is already in progress"),
6501                                  errhint("Run pg_stop_backup() and try again.")));
6502         }
6503         XLogCtl->Insert.forcePageWrites = true;
6504         LWLockRelease(WALInsertLock);
6505
6506         /* Ensure we release forcePageWrites if fail below */
6507         PG_ENSURE_ERROR_CLEANUP(pg_start_backup_callback, (Datum) 0);
6508         {
6509                 /*
6510                  * Force a CHECKPOINT.  Aside from being necessary to prevent torn
6511                  * page problems, this guarantees that two successive backup runs will
6512                  * have different checkpoint positions and hence different history
6513                  * file names, even if nothing happened in between.
6514                  *
6515                  * We don't use CHECKPOINT_IMMEDIATE, hence this can take awhile.
6516                  */
6517                 RequestCheckpoint(CHECKPOINT_FORCE | CHECKPOINT_WAIT);
6518
6519                 /*
6520                  * Now we need to fetch the checkpoint record location, and also its
6521                  * REDO pointer.  The oldest point in WAL that would be needed to
6522                  * restore starting from the checkpoint is precisely the REDO pointer.
6523                  */
6524                 LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
6525                 checkpointloc = ControlFile->checkPoint;
6526                 startpoint = ControlFile->checkPointCopy.redo;
6527                 LWLockRelease(ControlFileLock);
6528
6529                 XLByteToSeg(startpoint, _logId, _logSeg);
6530                 XLogFileName(xlogfilename, ThisTimeLineID, _logId, _logSeg);
6531
6532                 /* Use the log timezone here, not the session timezone */
6533                 stamp_time = (pg_time_t) time(NULL);
6534                 pg_strftime(strfbuf, sizeof(strfbuf),
6535                                         "%Y-%m-%d %H:%M:%S %Z",
6536                                         pg_localtime(&stamp_time, log_timezone));
6537
6538                 /*
6539                  * Check for existing backup label --- implies a backup is already
6540                  * running.  (XXX given that we checked forcePageWrites above, maybe
6541                  * it would be OK to just unlink any such label file?)
6542                  */
6543                 if (stat(BACKUP_LABEL_FILE, &stat_buf) != 0)
6544                 {
6545                         if (errno != ENOENT)
6546                                 ereport(ERROR,
6547                                                 (errcode_for_file_access(),
6548                                                  errmsg("could not stat file \"%s\": %m",
6549                                                                 BACKUP_LABEL_FILE)));
6550                 }
6551                 else
6552                         ereport(ERROR,
6553                                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
6554                                          errmsg("a backup is already in progress"),
6555                                          errhint("If you're sure there is no backup in progress, remove file \"%s\" and try again.",
6556                                                          BACKUP_LABEL_FILE)));
6557
6558                 /*
6559                  * Okay, write the file
6560                  */
6561                 fp = AllocateFile(BACKUP_LABEL_FILE, "w");
6562                 if (!fp)
6563                         ereport(ERROR,
6564                                         (errcode_for_file_access(),
6565                                          errmsg("could not create file \"%s\": %m",
6566                                                         BACKUP_LABEL_FILE)));
6567                 fprintf(fp, "START WAL LOCATION: %X/%X (file %s)\n",
6568                                 startpoint.xlogid, startpoint.xrecoff, xlogfilename);
6569                 fprintf(fp, "CHECKPOINT LOCATION: %X/%X\n",
6570                                 checkpointloc.xlogid, checkpointloc.xrecoff);
6571                 fprintf(fp, "START TIME: %s\n", strfbuf);
6572                 fprintf(fp, "LABEL: %s\n", backupidstr);
6573                 if (fflush(fp) || ferror(fp) || FreeFile(fp))
6574                         ereport(ERROR,
6575                                         (errcode_for_file_access(),
6576                                          errmsg("could not write file \"%s\": %m",
6577                                                         BACKUP_LABEL_FILE)));
6578         }
6579         PG_END_ENSURE_ERROR_CLEANUP(pg_start_backup_callback, (Datum) 0);
6580
6581         /*
6582          * We're done.  As a convenience, return the starting WAL location.
6583          */
6584         snprintf(xlogfilename, sizeof(xlogfilename), "%X/%X",
6585                          startpoint.xlogid, startpoint.xrecoff);
6586         PG_RETURN_TEXT_P(cstring_to_text(xlogfilename));
6587 }
6588
6589 /* Error cleanup callback for pg_start_backup */
6590 static void
6591 pg_start_backup_callback(int code, Datum arg)
6592 {
6593         /* Turn off forcePageWrites on failure */
6594         LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
6595         XLogCtl->Insert.forcePageWrites = false;
6596         LWLockRelease(WALInsertLock);
6597 }
6598
6599 /*
6600  * pg_stop_backup: finish taking an on-line backup dump
6601  *
6602  * We remove the backup label file created by pg_start_backup, and instead
6603  * create a backup history file in pg_xlog (whence it will immediately be
6604  * archived).  The backup history file contains the same info found in
6605  * the label file, plus the backup-end time and WAL location.
6606  * Note: different from CancelBackup which just cancels online backup mode.
6607  */
6608 Datum
6609 pg_stop_backup(PG_FUNCTION_ARGS)
6610 {
6611         XLogRecPtr      startpoint;
6612         XLogRecPtr      stoppoint;
6613         pg_time_t       stamp_time;
6614         char            strfbuf[128];
6615         char            histfilepath[MAXPGPATH];
6616         char            startxlogfilename[MAXFNAMELEN];
6617         char            stopxlogfilename[MAXFNAMELEN];
6618         uint32          _logId;
6619         uint32          _logSeg;
6620         FILE       *lfp;
6621         FILE       *fp;
6622         char            ch;
6623         int                     ich;
6624         int                     seconds_before_warning;
6625         int                     waits = 0;
6626
6627         if (!superuser())
6628                 ereport(ERROR,
6629                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
6630                                  (errmsg("must be superuser to run a backup"))));
6631
6632         /*
6633          * OK to clear forcePageWrites
6634          */
6635         LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
6636         XLogCtl->Insert.forcePageWrites = false;
6637         LWLockRelease(WALInsertLock);
6638
6639         /*
6640          * Force a switch to a new xlog segment file, so that the backup is valid
6641          * as soon as archiver moves out the current segment file. We'll report
6642          * the end address of the XLOG SWITCH record as the backup stopping point.
6643          */
6644         stoppoint = RequestXLogSwitch();
6645
6646         XLByteToSeg(stoppoint, _logId, _logSeg);
6647         XLogFileName(stopxlogfilename, ThisTimeLineID, _logId, _logSeg);
6648
6649         /* Use the log timezone here, not the session timezone */
6650         stamp_time = (pg_time_t) time(NULL);
6651         pg_strftime(strfbuf, sizeof(strfbuf),
6652                                 "%Y-%m-%d %H:%M:%S %Z",
6653                                 pg_localtime(&stamp_time, log_timezone));
6654
6655         /*
6656          * Open the existing label file
6657          */
6658         lfp = AllocateFile(BACKUP_LABEL_FILE, "r");
6659         if (!lfp)
6660         {
6661                 if (errno != ENOENT)
6662                         ereport(ERROR,
6663                                         (errcode_for_file_access(),
6664                                          errmsg("could not read file \"%s\": %m",
6665                                                         BACKUP_LABEL_FILE)));
6666                 ereport(ERROR,
6667                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
6668                                  errmsg("a backup is not in progress")));
6669         }
6670
6671         /*
6672          * Read and parse the START WAL LOCATION line (this code is pretty crude,
6673          * but we are not expecting any variability in the file format).
6674          */
6675         if (fscanf(lfp, "START WAL LOCATION: %X/%X (file %24s)%c",
6676                            &startpoint.xlogid, &startpoint.xrecoff, startxlogfilename,
6677                            &ch) != 4 || ch != '\n')
6678                 ereport(ERROR,
6679                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
6680                                  errmsg("invalid data in file \"%s\"", BACKUP_LABEL_FILE)));
6681
6682         /*
6683          * Write the backup history file
6684          */
6685         XLByteToSeg(startpoint, _logId, _logSeg);
6686         BackupHistoryFilePath(histfilepath, ThisTimeLineID, _logId, _logSeg,
6687                                                   startpoint.xrecoff % XLogSegSize);
6688         fp = AllocateFile(histfilepath, "w");
6689         if (!fp)
6690                 ereport(ERROR,
6691                                 (errcode_for_file_access(),
6692                                  errmsg("could not create file \"%s\": %m",
6693                                                 histfilepath)));
6694         fprintf(fp, "START WAL LOCATION: %X/%X (file %s)\n",
6695                         startpoint.xlogid, startpoint.xrecoff, startxlogfilename);
6696         fprintf(fp, "STOP WAL LOCATION: %X/%X (file %s)\n",
6697                         stoppoint.xlogid, stoppoint.xrecoff, stopxlogfilename);
6698         /* transfer remaining lines from label to history file */
6699         while ((ich = fgetc(lfp)) != EOF)
6700                 fputc(ich, fp);
6701         fprintf(fp, "STOP TIME: %s\n", strfbuf);
6702         if (fflush(fp) || ferror(fp) || FreeFile(fp))
6703                 ereport(ERROR,
6704                                 (errcode_for_file_access(),
6705                                  errmsg("could not write file \"%s\": %m",
6706                                                 histfilepath)));
6707
6708         /*
6709          * Close and remove the backup label file
6710          */
6711         if (ferror(lfp) || FreeFile(lfp))
6712                 ereport(ERROR,
6713                                 (errcode_for_file_access(),
6714                                  errmsg("could not read file \"%s\": %m",
6715                                                 BACKUP_LABEL_FILE)));
6716         if (unlink(BACKUP_LABEL_FILE) != 0)
6717                 ereport(ERROR,
6718                                 (errcode_for_file_access(),
6719                                  errmsg("could not remove file \"%s\": %m",
6720                                                 BACKUP_LABEL_FILE)));
6721
6722         /*
6723          * Clean out any no-longer-needed history files.  As a side effect, this
6724          * will post a .ready file for the newly created history file, notifying
6725          * the archiver that history file may be archived immediately.
6726          */
6727         CleanupBackupHistory();
6728
6729         /*
6730          * Wait until the history file has been archived. We assume that the 
6731          * alphabetic sorting property of the WAL files ensures the last WAL
6732          * file is guaranteed archived by the time the history file is archived.
6733          *
6734          * We wait forever, since archive_command is supposed to work and
6735          * we assume the admin wanted his backup to work completely. If you 
6736          * don't wish to wait, you can SET statement_timeout = xx;
6737          *
6738          * If the status file is missing, we assume that is because it was
6739          * set to .ready before we slept, then while asleep it has been set
6740          * to .done and then removed by a concurrent checkpoint.
6741          */
6742         BackupHistoryFileName(histfilepath, ThisTimeLineID, _logId, _logSeg,
6743                                                   startpoint.xrecoff % XLogSegSize);
6744
6745         seconds_before_warning = 60;
6746         waits = 0;
6747
6748         while (!XLogArchiveCheckDone(histfilepath, false))
6749         {
6750                 CHECK_FOR_INTERRUPTS();
6751
6752                 pg_usleep(1000000L);
6753
6754                 if (++waits >= seconds_before_warning)
6755                 {
6756                         seconds_before_warning *= 2;     /* This wraps in >10 years... */
6757                         elog(WARNING, "pg_stop_backup() waiting for archive to complete " 
6758                                                         "(%d seconds delay)", waits);
6759                 }
6760         }
6761
6762         /*
6763          * We're done.  As a convenience, return the ending WAL location.
6764          */
6765         snprintf(stopxlogfilename, sizeof(stopxlogfilename), "%X/%X",
6766                          stoppoint.xlogid, stoppoint.xrecoff);
6767         PG_RETURN_TEXT_P(cstring_to_text(stopxlogfilename));
6768 }
6769
6770 /*
6771  * pg_switch_xlog: switch to next xlog file
6772  */
6773 Datum
6774 pg_switch_xlog(PG_FUNCTION_ARGS)
6775 {
6776         XLogRecPtr      switchpoint;
6777         char            location[MAXFNAMELEN];
6778
6779         if (!superuser())
6780                 ereport(ERROR,
6781                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
6782                          (errmsg("must be superuser to switch transaction log files"))));
6783
6784         switchpoint = RequestXLogSwitch();
6785
6786         /*
6787          * As a convenience, return the WAL location of the switch record
6788          */
6789         snprintf(location, sizeof(location), "%X/%X",
6790                          switchpoint.xlogid, switchpoint.xrecoff);
6791         PG_RETURN_TEXT_P(cstring_to_text(location));
6792 }
6793
6794 /*
6795  * Report the current WAL write location (same format as pg_start_backup etc)
6796  *
6797  * This is useful for determining how much of WAL is visible to an external
6798  * archiving process.  Note that the data before this point is written out
6799  * to the kernel, but is not necessarily synced to disk.
6800  */
6801 Datum
6802 pg_current_xlog_location(PG_FUNCTION_ARGS)
6803 {
6804         char            location[MAXFNAMELEN];
6805
6806         /* Make sure we have an up-to-date local LogwrtResult */
6807         {
6808                 /* use volatile pointer to prevent code rearrangement */
6809                 volatile XLogCtlData *xlogctl = XLogCtl;
6810
6811                 SpinLockAcquire(&xlogctl->info_lck);
6812                 LogwrtResult = xlogctl->LogwrtResult;
6813                 SpinLockRelease(&xlogctl->info_lck);
6814         }
6815
6816         snprintf(location, sizeof(location), "%X/%X",
6817                          LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff);
6818         PG_RETURN_TEXT_P(cstring_to_text(location));
6819 }
6820
6821 /*
6822  * Report the current WAL insert location (same format as pg_start_backup etc)
6823  *
6824  * This function is mostly for debugging purposes.
6825  */
6826 Datum
6827 pg_current_xlog_insert_location(PG_FUNCTION_ARGS)
6828 {
6829         XLogCtlInsert *Insert = &XLogCtl->Insert;
6830         XLogRecPtr      current_recptr;
6831         char            location[MAXFNAMELEN];
6832
6833         /*
6834          * Get the current end-of-WAL position ... shared lock is sufficient
6835          */
6836         LWLockAcquire(WALInsertLock, LW_SHARED);
6837         INSERT_RECPTR(current_recptr, Insert, Insert->curridx);
6838         LWLockRelease(WALInsertLock);
6839
6840         snprintf(location, sizeof(location), "%X/%X",
6841                          current_recptr.xlogid, current_recptr.xrecoff);
6842         PG_RETURN_TEXT_P(cstring_to_text(location));
6843 }
6844
6845 /*
6846  * Compute an xlog file name and decimal byte offset given a WAL location,
6847  * such as is returned by pg_stop_backup() or pg_switch_xlog().
6848  *
6849  * Note that a location exactly at a segment boundary is taken to be in
6850  * the previous segment.  This is usually the right thing, since the
6851  * expected usage is to determine which xlog file(s) are ready to archive.
6852  */
6853 Datum
6854 pg_xlogfile_name_offset(PG_FUNCTION_ARGS)
6855 {
6856         text       *location = PG_GETARG_TEXT_P(0);
6857         char       *locationstr;
6858         unsigned int uxlogid;
6859         unsigned int uxrecoff;
6860         uint32          xlogid;
6861         uint32          xlogseg;
6862         uint32          xrecoff;
6863         XLogRecPtr      locationpoint;
6864         char            xlogfilename[MAXFNAMELEN];
6865         Datum           values[2];
6866         bool            isnull[2];
6867         TupleDesc       resultTupleDesc;
6868         HeapTuple       resultHeapTuple;
6869         Datum           result;
6870
6871         /*
6872          * Read input and parse
6873          */
6874         locationstr = text_to_cstring(location);
6875
6876         if (sscanf(locationstr, "%X/%X", &uxlogid, &uxrecoff) != 2)
6877                 ereport(ERROR,
6878                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
6879                                  errmsg("could not parse transaction log location \"%s\"",
6880                                                 locationstr)));
6881
6882         locationpoint.xlogid = uxlogid;
6883         locationpoint.xrecoff = uxrecoff;
6884
6885         /*
6886          * Construct a tuple descriptor for the result row.  This must match this
6887          * function's pg_proc entry!
6888          */
6889         resultTupleDesc = CreateTemplateTupleDesc(2, false);
6890         TupleDescInitEntry(resultTupleDesc, (AttrNumber) 1, "file_name",
6891                                            TEXTOID, -1, 0);
6892         TupleDescInitEntry(resultTupleDesc, (AttrNumber) 2, "file_offset",
6893                                            INT4OID, -1, 0);
6894
6895         resultTupleDesc = BlessTupleDesc(resultTupleDesc);
6896
6897         /*
6898          * xlogfilename
6899          */
6900         XLByteToPrevSeg(locationpoint, xlogid, xlogseg);
6901         XLogFileName(xlogfilename, ThisTimeLineID, xlogid, xlogseg);
6902
6903         values[0] = CStringGetTextDatum(xlogfilename);
6904         isnull[0] = false;
6905
6906         /*
6907          * offset
6908          */
6909         xrecoff = locationpoint.xrecoff - xlogseg * XLogSegSize;
6910
6911         values[1] = UInt32GetDatum(xrecoff);
6912         isnull[1] = false;
6913
6914         /*
6915          * Tuple jam: Having first prepared your Datums, then squash together
6916          */
6917         resultHeapTuple = heap_form_tuple(resultTupleDesc, values, isnull);
6918
6919         result = HeapTupleGetDatum(resultHeapTuple);
6920
6921         PG_RETURN_DATUM(result);
6922 }
6923
6924 /*
6925  * Compute an xlog file name given a WAL location,
6926  * such as is returned by pg_stop_backup() or pg_xlog_switch().
6927  */
6928 Datum
6929 pg_xlogfile_name(PG_FUNCTION_ARGS)
6930 {
6931         text       *location = PG_GETARG_TEXT_P(0);
6932         char       *locationstr;
6933         unsigned int uxlogid;
6934         unsigned int uxrecoff;
6935         uint32          xlogid;
6936         uint32          xlogseg;
6937         XLogRecPtr      locationpoint;
6938         char            xlogfilename[MAXFNAMELEN];
6939
6940         locationstr = text_to_cstring(location);
6941
6942         if (sscanf(locationstr, "%X/%X", &uxlogid, &uxrecoff) != 2)
6943                 ereport(ERROR,
6944                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
6945                                  errmsg("could not parse transaction log location \"%s\"",
6946                                                 locationstr)));
6947
6948         locationpoint.xlogid = uxlogid;
6949         locationpoint.xrecoff = uxrecoff;
6950
6951         XLByteToPrevSeg(locationpoint, xlogid, xlogseg);
6952         XLogFileName(xlogfilename, ThisTimeLineID, xlogid, xlogseg);
6953
6954         PG_RETURN_TEXT_P(cstring_to_text(xlogfilename));
6955 }
6956
6957 /*
6958  * read_backup_label: check to see if a backup_label file is present
6959  *
6960  * If we see a backup_label during recovery, we assume that we are recovering
6961  * from a backup dump file, and we therefore roll forward from the checkpoint
6962  * identified by the label file, NOT what pg_control says.      This avoids the
6963  * problem that pg_control might have been archived one or more checkpoints
6964  * later than the start of the dump, and so if we rely on it as the start
6965  * point, we will fail to restore a consistent database state.
6966  *
6967  * We also attempt to retrieve the corresponding backup history file.
6968  * If successful, set *minRecoveryLoc to constrain valid PITR stopping
6969  * points.
6970  *
6971  * Returns TRUE if a backup_label was found (and fills the checkpoint
6972  * location into *checkPointLoc); returns FALSE if not.
6973  */
6974 static bool
6975 read_backup_label(XLogRecPtr *checkPointLoc, XLogRecPtr *minRecoveryLoc)
6976 {
6977         XLogRecPtr      startpoint;
6978         XLogRecPtr      stoppoint;
6979         char            histfilename[MAXFNAMELEN];
6980         char            histfilepath[MAXPGPATH];
6981         char            startxlogfilename[MAXFNAMELEN];
6982         char            stopxlogfilename[MAXFNAMELEN];
6983         TimeLineID      tli;
6984         uint32          _logId;
6985         uint32          _logSeg;
6986         FILE       *lfp;
6987         FILE       *fp;
6988         char            ch;
6989
6990         /* Default is to not constrain recovery stop point */
6991         minRecoveryLoc->xlogid = 0;
6992         minRecoveryLoc->xrecoff = 0;
6993
6994         /*
6995          * See if label file is present
6996          */
6997         lfp = AllocateFile(BACKUP_LABEL_FILE, "r");
6998         if (!lfp)
6999         {
7000                 if (errno != ENOENT)
7001                         ereport(FATAL,
7002                                         (errcode_for_file_access(),
7003                                          errmsg("could not read file \"%s\": %m",
7004                                                         BACKUP_LABEL_FILE)));
7005                 return false;                   /* it's not there, all is fine */
7006         }
7007
7008         /*
7009          * Read and parse the START WAL LOCATION and CHECKPOINT lines (this code
7010          * is pretty crude, but we are not expecting any variability in the file
7011          * format).
7012          */
7013         if (fscanf(lfp, "START WAL LOCATION: %X/%X (file %08X%16s)%c",
7014                            &startpoint.xlogid, &startpoint.xrecoff, &tli,
7015                            startxlogfilename, &ch) != 5 || ch != '\n')
7016                 ereport(FATAL,
7017                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
7018                                  errmsg("invalid data in file \"%s\"", BACKUP_LABEL_FILE)));
7019         if (fscanf(lfp, "CHECKPOINT LOCATION: %X/%X%c",
7020                            &checkPointLoc->xlogid, &checkPointLoc->xrecoff,
7021                            &ch) != 3 || ch != '\n')
7022                 ereport(FATAL,
7023                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
7024                                  errmsg("invalid data in file \"%s\"", BACKUP_LABEL_FILE)));
7025         if (ferror(lfp) || FreeFile(lfp))
7026                 ereport(FATAL,
7027                                 (errcode_for_file_access(),
7028                                  errmsg("could not read file \"%s\": %m",
7029                                                 BACKUP_LABEL_FILE)));
7030
7031         /*
7032          * Try to retrieve the backup history file (no error if we can't)
7033          */
7034         XLByteToSeg(startpoint, _logId, _logSeg);
7035         BackupHistoryFileName(histfilename, tli, _logId, _logSeg,
7036                                                   startpoint.xrecoff % XLogSegSize);
7037
7038         if (InArchiveRecovery)
7039                 RestoreArchivedFile(histfilepath, histfilename, "RECOVERYHISTORY", 0);
7040         else
7041                 BackupHistoryFilePath(histfilepath, tli, _logId, _logSeg,
7042                                                           startpoint.xrecoff % XLogSegSize);
7043
7044         fp = AllocateFile(histfilepath, "r");
7045         if (fp)
7046         {
7047                 /*
7048                  * Parse history file to identify stop point.
7049                  */
7050                 if (fscanf(fp, "START WAL LOCATION: %X/%X (file %24s)%c",
7051                                    &startpoint.xlogid, &startpoint.xrecoff, startxlogfilename,
7052                                    &ch) != 4 || ch != '\n')
7053                         ereport(FATAL,
7054                                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
7055                                          errmsg("invalid data in file \"%s\"", histfilename)));
7056                 if (fscanf(fp, "STOP WAL LOCATION: %X/%X (file %24s)%c",
7057                                    &stoppoint.xlogid, &stoppoint.xrecoff, stopxlogfilename,
7058                                    &ch) != 4 || ch != '\n')
7059                         ereport(FATAL,
7060                                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
7061                                          errmsg("invalid data in file \"%s\"", histfilename)));
7062                 *minRecoveryLoc = stoppoint;
7063                 if (ferror(fp) || FreeFile(fp))
7064                         ereport(FATAL,
7065                                         (errcode_for_file_access(),
7066                                          errmsg("could not read file \"%s\": %m",
7067                                                         histfilepath)));
7068         }
7069
7070         return true;
7071 }
7072
7073 /*
7074  * Error context callback for errors occurring during rm_redo().
7075  */
7076 static void
7077 rm_redo_error_callback(void *arg)
7078 {
7079         XLogRecord *record = (XLogRecord *) arg;
7080         StringInfoData buf;
7081
7082         initStringInfo(&buf);
7083         RmgrTable[record->xl_rmid].rm_desc(&buf,
7084                                                                            record->xl_info,
7085                                                                            XLogRecGetData(record));
7086
7087         /* don't bother emitting empty description */
7088         if (buf.len > 0)
7089                 errcontext("xlog redo %s", buf.data);
7090
7091         pfree(buf.data);
7092 }
7093
7094 /*
7095  * BackupInProgress: check if online backup mode is active
7096  *
7097  * This is done by checking for existence of the "backup_label" file.
7098  */
7099 bool
7100 BackupInProgress(void)
7101 {
7102         struct stat stat_buf;
7103
7104         return (stat(BACKUP_LABEL_FILE, &stat_buf) == 0);
7105 }
7106
7107 /*
7108  * CancelBackup: rename the "backup_label" file to cancel backup mode
7109  *
7110  * If the "backup_label" file exists, it will be renamed to "backup_label.old".
7111  * Note that this will render an online backup in progress useless.
7112  * To correctly finish an online backup, pg_stop_backup must be called.
7113  */
7114 void
7115 CancelBackup(void)
7116 {
7117         struct stat stat_buf;
7118
7119         /* if the file is not there, return */
7120         if (stat(BACKUP_LABEL_FILE, &stat_buf) < 0)
7121                 return;
7122
7123         /* remove leftover file from previously cancelled backup if it exists */
7124         unlink(BACKUP_LABEL_OLD);
7125
7126         if (rename(BACKUP_LABEL_FILE, BACKUP_LABEL_OLD) == 0)
7127         {
7128                 ereport(LOG,
7129                                 (errmsg("online backup mode cancelled"),
7130                                  errdetail("\"%s\" was renamed to \"%s\".",
7131                                                 BACKUP_LABEL_FILE, BACKUP_LABEL_OLD)));
7132         }
7133         else
7134         {
7135                 ereport(WARNING,
7136                                 (errcode_for_file_access(),
7137                                  errmsg("online backup mode was not cancelled"),
7138                                  errdetail("Could not rename \"%s\" to \"%s\": %m.",
7139                                                 BACKUP_LABEL_FILE, BACKUP_LABEL_OLD)));
7140         }
7141 }
7142