OSDN Git Service

When telling the bgwriter that we need a checkpoint because too much xlog
[pg-rex/syncrep.git] / src / backend / access / transam / xlog.c
1 /*-------------------------------------------------------------------------
2  *
3  * xlog.c
4  *              PostgreSQL transaction log manager
5  *
6  *
7  * Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group
8  * Portions Copyright (c) 1994, Regents of the University of California
9  *
10  * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.286 2007/10/12 19:39:59 tgl Exp $
11  *
12  *-------------------------------------------------------------------------
13  */
14
15 #include "postgres.h"
16
17 #include <ctype.h>
18 #include <fcntl.h>
19 #include <signal.h>
20 #include <time.h>
21 #include <sys/stat.h>
22 #include <sys/time.h>
23 #include <sys/wait.h>
24 #include <unistd.h>
25
26 #include "access/clog.h"
27 #include "access/heapam.h"
28 #include "access/multixact.h"
29 #include "access/subtrans.h"
30 #include "access/transam.h"
31 #include "access/tuptoaster.h"
32 #include "access/twophase.h"
33 #include "access/xact.h"
34 #include "access/xlog_internal.h"
35 #include "access/xlogdefs.h"
36 #include "access/xlogutils.h"
37 #include "catalog/catversion.h"
38 #include "catalog/pg_control.h"
39 #include "catalog/pg_type.h"
40 #include "funcapi.h"
41 #include "miscadmin.h"
42 #include "pgstat.h"
43 #include "postmaster/bgwriter.h"
44 #include "storage/bufpage.h"
45 #include "storage/fd.h"
46 #include "storage/pmsignal.h"
47 #include "storage/procarray.h"
48 #include "storage/spin.h"
49 #include "utils/builtins.h"
50 #include "utils/pg_locale.h"
51 #include "utils/ps_status.h"
52
53
54 /* File path names (all relative to $PGDATA) */
55 #define BACKUP_LABEL_FILE               "backup_label"
56 #define BACKUP_LABEL_OLD                "backup_label.old"
57 #define RECOVERY_COMMAND_FILE   "recovery.conf"
58 #define RECOVERY_COMMAND_DONE   "recovery.done"
59
60
61 /*
 * User-settable parameters.
 *
 * Most are declared without "static", so they have external linkage; the
 * initializers are the values in effect until something assigns new ones.
 */
62 int                     CheckPointSegments = 3;	/* XLOGfileslop is derived from this */
63 int                     XLOGbuffers = 8;
64 int                     XLogArchiveTimeout = 0;
65 bool            XLogArchiveMode = false;
66 char       *XLogArchiveCommand = NULL;
67 char       *XLOG_sync_method = NULL;	/* sync_method and open_sync_bit are derived from this */
68 const char      XLOG_sync_method_default[] = DEFAULT_SYNC_METHOD_STR;	/* string form of the default sync method */
69 bool            fullPageWrites = true;	/* when true, XLogInsert does full-page writes (see doPageWrites) */
70 bool            log_checkpoints = false;
71
72 #ifdef WAL_DEBUG
73 bool            XLOG_DEBUG = false;
74 #endif
75
76 /*
77  * XLOGfileslop is the maximum number of preallocated future XLOG segments.
78  * When we are done with an old XLOG segment file, we will recycle it as a
79  * future XLOG segment as long as there aren't already XLOGfileslop future
80  * segments; else we'll delete it.  This could be made a separate GUC
81  * variable, but at present I think it's sufficient to hardwire it as
82  * 2*CheckPointSegments+1.  Under normal conditions, a checkpoint will free
83  * no more than 2*CheckPointSegments log segments, and we want to recycle all
84  * of them; the +1 allows boundary cases to happen without wasting a
85  * delete/create-segment cycle.
86  */
87 #define XLOGfileslop    (2*CheckPointSegments + 1)
88
89
90 /*
 * These are derived from XLOG_sync_method by assign_xlog_sync_method.
 * Until that runs they hold the compile-time defaults given here.
 */
91 int                     sync_method = DEFAULT_SYNC_METHOD;
92 static int      open_sync_bit = DEFAULT_SYNC_FLAGBIT;
93
/* Evaluates to open_sync_bit when enableFsync is true, otherwise to 0 */
94 #define XLOG_SYNC_BIT  (enableFsync ? open_sync_bit : 0)
95
96
97 /*
98  * Statistics for current checkpoint are collected in this global struct.
99  * Because only the background writer or a stand-alone backend can perform
100  * checkpoints, this will be unused in normal backends.
101  */
102 CheckpointStatsData CheckpointStats;
103
104 /*
105  * ThisTimeLineID will be same in all backends --- it identifies current
106  * WAL timeline for the database system.
107  */
108 TimeLineID      ThisTimeLineID = 0;
109
110 /* Are we doing recovery from XLOG? */
111 bool            InRecovery = false;
112
113 /* Are we recovering using offline XLOG archives? */
114 static bool InArchiveRecovery = false;
115
116 /* Was the last xlog file restored from archive, or local? */
117 static bool restoredFromArchive = false;
118
119 /* options taken from recovery.conf */
120 static char *recoveryRestoreCommand = NULL;
121 static bool recoveryTarget = false;
122 static bool recoveryTargetExact = false;
123 static bool recoveryTargetInclusive = true;
124 static bool recoveryLogRestartpoints = false;
125 static TransactionId recoveryTargetXid;
126 static TimestampTz recoveryTargetTime;
127 static TimestampTz recoveryLastXTime = 0;
128
129 /* if recoveryStopsHere returns true, it saves actual stop xid/time here */
130 static TransactionId recoveryStopXid;
131 static TimestampTz recoveryStopTime;
132 static bool recoveryStopAfter;
133
134 /*
135  * During normal operation, the only timeline we care about is ThisTimeLineID.
136  * During recovery, however, things are more complicated.  To simplify life
137  * for rmgr code, we keep ThisTimeLineID set to the "current" timeline as we
138  * scan through the WAL history (that is, it is the line that was active when
139  * the currently-scanned WAL record was generated).  We also need these
140  * timeline values:
141  *
142  * recoveryTargetTLI: the desired timeline that we want to end in.
143  *
144  * expectedTLIs: an integer list of recoveryTargetTLI and the TLIs of
145  * its known parents, newest first (so recoveryTargetTLI is always the
146  * first list member).  Only these TLIs are expected to be seen in the WAL
147  * segments we read, and indeed only these TLIs will be considered as
148  * candidate WAL files to open at all.
149  *
150  * curFileTLI: the TLI appearing in the name of the current input WAL file.
151  * (This is not necessarily the same as ThisTimeLineID, because we could
152  * be scanning data that was copied from an ancestor timeline when the current
153  * file was created.)  During a sequential scan we do not allow this value
154  * to decrease.
155  */
156 static TimeLineID recoveryTargetTLI;
157 static List *expectedTLIs;
158 static TimeLineID curFileTLI;
159
160 /*
161  * ProcLastRecPtr points to the start of the last XLOG record inserted by the
162  * current backend.  It is updated for all inserts.  XactLastRecEnd points to
163  * end+1 of the last record, and is reset when we end a top-level transaction,
164  * or start a new one; so it can be used to tell if the current transaction has
165  * created any XLOG records.
166  */
167 static XLogRecPtr ProcLastRecPtr = {0, 0};
168
169 XLogRecPtr      XactLastRecEnd = {0, 0};
170
171 /*
172  * RedoRecPtr is this backend's local copy of the REDO record pointer
173  * (which is almost but not quite the same as a pointer to the most recent
174  * CHECKPOINT record).  We update this from the shared-memory copy,
175  * XLogCtl->Insert.RedoRecPtr, whenever we can safely do so (ie, when we
176  * hold the Insert lock).  See XLogInsert for details.  We are also allowed
177  * to update from XLogCtl->Insert.RedoRecPtr if we hold the info_lck;
178  * see GetRedoRecPtr.  A freshly spawned backend obtains the value during
179  * InitXLOGAccess.
180  */
181 static XLogRecPtr RedoRecPtr;
182
183 /*----------
184  * Shared-memory data structures for XLOG control
185  *
186  * LogwrtRqst indicates a byte position that we need to write and/or fsync
187  * the log up to (all records before that point must be written or fsynced).
188  * LogwrtResult indicates the byte positions we have already written/fsynced.
189  * These structs are identical but are declared separately to indicate their
190  * slightly different functions.
191  *
192  * We do a lot of pushups to minimize the amount of access to lockable
193  * shared memory values.  There are actually three shared-memory copies of
194  * LogwrtResult, plus one unshared copy in each backend.  Here's how it works:
195  *              XLogCtl->LogwrtResult is protected by info_lck
196  *              XLogCtl->Write.LogwrtResult is protected by WALWriteLock
197  *              XLogCtl->Insert.LogwrtResult is protected by WALInsertLock
198  * One must hold the associated lock to read or write any of these, but
199  * of course no lock is needed to read/write the unshared LogwrtResult.
200  *
201  * XLogCtl->LogwrtResult and XLogCtl->Write.LogwrtResult are both "always
202  * right", since both are updated by a write or flush operation before
203  * it releases WALWriteLock.  The point of keeping XLogCtl->Write.LogwrtResult
204  * is that it can be examined/modified by code that already holds WALWriteLock
205  * without needing to grab info_lck as well.
206  *
207  * XLogCtl->Insert.LogwrtResult may lag behind the reality of the other two,
208  * but is updated when convenient.  Again, it exists for the convenience of
209  * code that is already holding WALInsertLock but not the other locks.
210  *
211  * The unshared LogwrtResult may lag behind any or all of these, and again
212  * is updated when convenient.
213  *
214  * The request bookkeeping is simpler: there is a shared XLogCtl->LogwrtRqst
215  * (protected by info_lck), but we don't need to cache any copies of it.
216  *
217  * Note that this all works because the request and result positions can only
218  * advance forward, never back up, and so we can easily determine which of two
219  * values is "more up to date".
220  *
221  * info_lck is only held long enough to read/update the protected variables,
222  * so it's a plain spinlock.  The other locks are held longer (potentially
223  * over I/O operations), so we use LWLocks for them.  These locks are:
224  *
225  * WALInsertLock: must be held to insert a record into the WAL buffers.
226  *
227  * WALWriteLock: must be held to write WAL buffers to disk (XLogWrite or
228  * XLogFlush).
229  *
230  * ControlFileLock: must be held to read/update control file or create
231  * new log file.
232  *
233  * CheckpointLock: must be held to do a checkpoint (ensures only one
234  * checkpointer at a time; currently, with all checkpoints done by the
235  * bgwriter, this is just pro forma).
236  *
237  *----------
238  */
239
/*
 * Write/flush request: byte positions the log needs to be written and/or
 * fsynced up to.  Same layout as XLogwrtResult, declared separately because
 * it serves a different purpose.
 */
240 typedef struct XLogwrtRqst
241 {
242         XLogRecPtr      Write;                  /* last byte + 1 to write out */
243         XLogRecPtr      Flush;                  /* last byte + 1 to flush (fsync) */
244 } XLogwrtRqst;
245
/*
 * Write/flush result: byte positions the log has already been written and
 * fsynced up to.  Same layout as XLogwrtRqst.
 */
246 typedef struct XLogwrtResult
247 {
248         XLogRecPtr      Write;                  /* last byte + 1 already written out */
249         XLogRecPtr      Flush;                  /* last byte + 1 already flushed (fsynced) */
250 } XLogwrtResult;
251
252 /*
253  * Shared state data for XLogInsert.
 *
 * Embedded in XLogCtlData as "Insert", where it is marked as protected by
 * WALInsertLock.
254  */
255 typedef struct XLogCtlInsert
256 {
257         XLogwrtResult LogwrtResult; /* a recent (possibly lagging) value of LogwrtResult */
258         XLogRecPtr      PrevRecord;             /* start of previously-inserted record */
259         int                     curridx;                /* current block index in cache */
260         XLogPageHeader currpage;        /* points to header of block in cache */
261         char       *currpos;            /* current insertion point in cache (within currpage) */
262         XLogRecPtr      RedoRecPtr;             /* current redo point for insertions; backends
								 * refresh their local RedoRecPtr from this */
263         bool            forcePageWrites;        /* forcing full-page writes for PITR? */
264 } XLogCtlInsert;
265
266 /*
267  * Shared state data for XLogWrite/XLogFlush.
 *
 * Embedded in XLogCtlData as "Write", where it is marked as protected by
 * WALWriteLock.
268  */
269 typedef struct XLogCtlWrite
270 {
271         XLogwrtResult LogwrtResult; /* current value of LogwrtResult */
272         int                     curridx;                /* cache index of next block to write */
273         time_t          lastSegSwitchTime;              /* time of last xlog segment switch */
274 } XLogCtlWrite;
275
276 /*
277  * Total shared-memory state for XLOG.
 *
 * Fields are grouped by the lock that guards them.  The file-level XLogCtl
 * pointer refers to an instance of this struct.
278  */
279 typedef struct XLogCtlData
280 {
281         /* Protected by WALInsertLock: */
282         XLogCtlInsert Insert;
283
284         /* Protected by info_lck: */
285         XLogwrtRqst LogwrtRqst;
286         XLogwrtResult LogwrtResult;
287         uint32          ckptXidEpoch;   /* nextXID & epoch of latest checkpoint: the epoch ... */
288         TransactionId ckptXid;		/* ... and the nextXID itself */
289         XLogRecPtr      asyncCommitLSN; /* LSN of newest async commit */
290
291         /* Protected by WALWriteLock: */
292         XLogCtlWrite Write;
293
294         /*
295          * These values do not change after startup, although the pointed-to pages
296          * and xlblocks values certainly do.  Permission to read/write the pages
297          * and xlblocks values depends on WALInsertLock and WALWriteLock.
298          */
299         char       *pages;                      /* buffers for unwritten XLOG pages */
300         XLogRecPtr *xlblocks;           /* per buffer: position of the page's 1st byte
								 * + XLOG_BLCKSZ */
301         Size            XLogCacheByte;  /* # bytes in xlog buffers */
302         int                     XLogCacheBlck;  /* highest allocated xlog buffer index */
303         TimeLineID      ThisTimeLineID;	/* timeline ID kept in shared memory */
304
305         slock_t         info_lck;               /* spinlock for the "Protected by info_lck"
								 * fields above */
306 } XLogCtlData;
307
308 static XLogCtlData *XLogCtl = NULL;
309
310 /*
311  * We maintain an image of pg_control in shared memory.
312  */
313 static ControlFileData *ControlFile = NULL;
314
315 /*
316  * Macros for managing XLogInsert state.  In most cases, the calling routine
317  * has a local pointer to XLogCtl->Insert and/or a copy of Insert->curridx,
318  * so these are passed as parameters instead of being fetched via XLogCtl.
319  */
320
321 /*
 * Free space, in bytes, remaining in the current xlog page buffer.
 * Insert is a pointer to an XLogCtlInsert; the result is XLOG_BLCKSZ minus
 * the distance of currpos from the start of currpage.
 */
322 #define INSERT_FREESPACE(Insert)  \
323         (XLOG_BLCKSZ - ((Insert)->currpos - (char *) (Insert)->currpage))
324
325 /*
 * Construct XLogRecPtr value for current insertion point.
 * Takes xlogid from xlblocks[curridx] unchanged, and sets xrecoff to that
 * entry's xrecoff minus the free space left in the current page.
 * recptr and curridx are each expanded twice, so pass expressions without
 * side effects.
 */
326 #define INSERT_RECPTR(recptr,Insert,curridx)  \
327         ( \
328           (recptr).xlogid = XLogCtl->xlblocks[curridx].xlogid, \
329           (recptr).xrecoff = \
330                 XLogCtl->xlblocks[curridx].xrecoff - INSERT_FREESPACE(Insert) \
331         )
332
/* Buffer index preceding idx, wrapping from 0 to XLogCtl->XLogCacheBlck */
333 #define PrevBufIdx(idx)         \
334                 (((idx) == 0) ? XLogCtl->XLogCacheBlck : ((idx) - 1))
335
/* Buffer index following idx, wrapping from XLogCtl->XLogCacheBlck to 0 */
336 #define NextBufIdx(idx)         \
337                 (((idx) == XLogCtl->XLogCacheBlck) ? 0 : ((idx) + 1))
338
339 /*
340  * Private, possibly out-of-date copy of shared LogwrtResult.
341  * See discussion above.
342  */
343 static XLogwrtResult LogwrtResult = {{0, 0}, {0, 0}};
344
345 /*
346  * openLogFile is -1 or a kernel FD for an open log file segment.
347  * When it's open, openLogOff is the current seek offset in the file.
348  * openLogId/openLogSeg identify the segment.  These variables are only
349  * used to write the XLOG, and so will normally refer to the active segment.
350  */
351 static int      openLogFile = -1;
352 static uint32 openLogId = 0;
353 static uint32 openLogSeg = 0;
354 static uint32 openLogOff = 0;
355
356 /*
357  * These variables are used similarly to the ones above, but for reading
358  * the XLOG.  Note, however, that readOff generally represents the offset
359  * of the page just read, not the seek position of the FD itself, which
360  * will be just past that page.
361  */
362 static int      readFile = -1;
363 static uint32 readId = 0;
364 static uint32 readSeg = 0;
365 static uint32 readOff = 0;
366
367 /* Buffer for currently read page (XLOG_BLCKSZ bytes) */
368 static char *readBuf = NULL;
369
370 /* Buffer for current ReadRecord result (expandable) */
371 static char *readRecordBuf = NULL;
372 static uint32 readRecordBufSize = 0;
373
374 /* State information for XLOG reading */
375 static XLogRecPtr ReadRecPtr;   /* start of last record read */
376 static XLogRecPtr EndRecPtr;    /* end+1 of last record read */
377 static XLogRecord *nextRecord = NULL;
378 static TimeLineID lastPageTLI = 0;
379
380 static bool InRedo = false;
381
382
383 static void XLogArchiveNotify(const char *xlog);
384 static void XLogArchiveNotifySeg(uint32 log, uint32 seg);
385 static bool XLogArchiveCheckDone(const char *xlog);
386 static void XLogArchiveCleanup(const char *xlog);
387 static void readRecoveryCommandFile(void);
388 static void exitArchiveRecovery(TimeLineID endTLI,
389                                         uint32 endLogId, uint32 endLogSeg);
390 static bool recoveryStopsHere(XLogRecord *record, bool *includeThis);
391 static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags);
392
393 static bool XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites,
394                                 XLogRecPtr *lsn, BkpBlock *bkpb);
395 static bool AdvanceXLInsertBuffer(bool new_segment);
396 static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch);
397 static int XLogFileInit(uint32 log, uint32 seg,
398                          bool *use_existent, bool use_lock);
399 static bool InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath,
400                                            bool find_free, int *max_advance,
401                                            bool use_lock);
402 static int      XLogFileOpen(uint32 log, uint32 seg);
403 static int      XLogFileRead(uint32 log, uint32 seg, int emode);
404 static void XLogFileClose(void);
405 static bool RestoreArchivedFile(char *path, const char *xlogfname,
406                                         const char *recovername, off_t expectedSize);
407 static void PreallocXlogFiles(XLogRecPtr endptr);
408 static void RemoveOldXlogFiles(uint32 log, uint32 seg, XLogRecPtr endptr);
409 static void CleanupBackupHistory(void);
410 static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, int emode);
411 static bool ValidXLOGHeader(XLogPageHeader hdr, int emode);
412 static XLogRecord *ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt);
413 static List *readTimeLineHistory(TimeLineID targetTLI);
414 static bool existsTimeLineHistory(TimeLineID probeTLI);
415 static TimeLineID findNewestTimeLine(TimeLineID startTLI);
416 static void writeTimeLineHistory(TimeLineID newTLI, TimeLineID parentTLI,
417                                          TimeLineID endTLI,
418                                          uint32 endLogId, uint32 endLogSeg);
419 static void WriteControlFile(void);
420 static void ReadControlFile(void);
421 static char *str_time(pg_time_t tnow);
422 static void issue_xlog_fsync(void);
423
424 #ifdef WAL_DEBUG
425 static void xlog_outrec(StringInfo buf, XLogRecord *record);
426 #endif
427 static bool read_backup_label(XLogRecPtr *checkPointLoc,
428                                   XLogRecPtr *minRecoveryLoc);
429 static void rm_redo_error_callback(void *arg);
430
431
432 /*
433  * Insert an XLOG record having the specified RMID and info bytes,
434  * with the body of the record being the data chunk(s) described by
435  * the rdata chain (see xlog.h for notes about rdata).
436  *
437  * Returns XLOG pointer to end of record (beginning of next record).
438  * This can be used as LSN for data pages affected by the logged action.
439  * (LSN is the XLOG point up to which the XLOG must be flushed to disk
440  * before the data page can be written out.  This implements the basic
441  * WAL rule "write the log before the data".)
442  *
443  * NB: this routine feels free to scribble on the XLogRecData structs,
444  * though not on the data they reference.  This is OK since the XLogRecData
445  * structs are always just temporaries in the calling code.
446  */
447 XLogRecPtr
448 XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
449 {
450         XLogCtlInsert *Insert = &XLogCtl->Insert;
451         XLogRecord *record;
452         XLogContRecord *contrecord;
453         XLogRecPtr      RecPtr;
454         XLogRecPtr      WriteRqst;
455         uint32          freespace;
456         int                     curridx;
457         XLogRecData *rdt;
458         Buffer          dtbuf[XLR_MAX_BKP_BLOCKS];
459         bool            dtbuf_bkp[XLR_MAX_BKP_BLOCKS];
460         BkpBlock        dtbuf_xlg[XLR_MAX_BKP_BLOCKS];
461         XLogRecPtr      dtbuf_lsn[XLR_MAX_BKP_BLOCKS];
462         XLogRecData dtbuf_rdt1[XLR_MAX_BKP_BLOCKS];
463         XLogRecData dtbuf_rdt2[XLR_MAX_BKP_BLOCKS];
464         XLogRecData dtbuf_rdt3[XLR_MAX_BKP_BLOCKS];
465         pg_crc32        rdata_crc;
466         uint32          len,
467                                 write_len;
468         unsigned        i;
469         bool            updrqst;
470         bool            doPageWrites;
471         bool            isLogSwitch = (rmid == RM_XLOG_ID && info == XLOG_SWITCH);
472
473         /* info's high bits are reserved for use by me */
474         if (info & XLR_INFO_MASK)
475                 elog(PANIC, "invalid xlog info mask %02X", info);
476
477         /*
478          * In bootstrap mode, we don't actually log anything but XLOG resources;
479          * return a phony record pointer.
480          */
481         if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID)
482         {
483                 RecPtr.xlogid = 0;
484                 RecPtr.xrecoff = SizeOfXLogLongPHD;             /* start of 1st chkpt record */
485                 return RecPtr;
486         }
487
488         /*
489          * Here we scan the rdata chain, determine which buffers must be backed
490          * up, and compute the CRC values for the data.  Note that the record
491          * header isn't added into the CRC initially since we don't know the final
492          * length or info bits quite yet.  Thus, the CRC will represent the CRC of
493          * the whole record in the order "rdata, then backup blocks, then record
494          * header".
495          *
496          * We may have to loop back to here if a race condition is detected below.
497          * We could prevent the race by doing all this work while holding the
498          * insert lock, but it seems better to avoid doing CRC calculations while
499          * holding the lock.  This means we have to be careful about modifying the
500          * rdata chain until we know we aren't going to loop back again.  The only
501          * change we allow ourselves to make earlier is to set rdt->data = NULL in
502          * chain items we have decided we will have to back up the whole buffer
503          * for.  This is OK because we will certainly decide the same thing again
504          * for those items if we do it over; doing it here saves an extra pass
505          * over the chain later.
506          */
507 begin:;
508         for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
509         {
510                 dtbuf[i] = InvalidBuffer;
511                 dtbuf_bkp[i] = false;
512         }
513
514         /*
515          * Decide if we need to do full-page writes in this XLOG record: true if
516          * full_page_writes is on or we have a PITR request for it.  Since we
517          * don't yet have the insert lock, forcePageWrites could change under us,
518          * but we'll recheck it once we have the lock.
519          */
520         doPageWrites = fullPageWrites || Insert->forcePageWrites;
521
522         INIT_CRC32(rdata_crc);
523         len = 0;
524         for (rdt = rdata;;)
525         {
526                 if (rdt->buffer == InvalidBuffer)
527                 {
528                         /* Simple data, just include it */
529                         len += rdt->len;
530                         COMP_CRC32(rdata_crc, rdt->data, rdt->len);
531                 }
532                 else
533                 {
534                         /* Find info for buffer */
535                         for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
536                         {
537                                 if (rdt->buffer == dtbuf[i])
538                                 {
539                                         /* Buffer already referenced by earlier chain item */
540                                         if (dtbuf_bkp[i])
541                                                 rdt->data = NULL;
542                                         else if (rdt->data)
543                                         {
544                                                 len += rdt->len;
545                                                 COMP_CRC32(rdata_crc, rdt->data, rdt->len);
546                                         }
547                                         break;
548                                 }
549                                 if (dtbuf[i] == InvalidBuffer)
550                                 {
551                                         /* OK, put it in this slot */
552                                         dtbuf[i] = rdt->buffer;
553                                         if (XLogCheckBuffer(rdt, doPageWrites,
554                                                                                 &(dtbuf_lsn[i]), &(dtbuf_xlg[i])))
555                                         {
556                                                 dtbuf_bkp[i] = true;
557                                                 rdt->data = NULL;
558                                         }
559                                         else if (rdt->data)
560                                         {
561                                                 len += rdt->len;
562                                                 COMP_CRC32(rdata_crc, rdt->data, rdt->len);
563                                         }
564                                         break;
565                                 }
566                         }
567                         if (i >= XLR_MAX_BKP_BLOCKS)
568                                 elog(PANIC, "can backup at most %d blocks per xlog record",
569                                          XLR_MAX_BKP_BLOCKS);
570                 }
571                 /* Break out of loop when rdt points to last chain item */
572                 if (rdt->next == NULL)
573                         break;
574                 rdt = rdt->next;
575         }
576
577         /*
578          * Now add the backup block headers and data into the CRC
579          */
580         for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
581         {
582                 if (dtbuf_bkp[i])
583                 {
584                         BkpBlock   *bkpb = &(dtbuf_xlg[i]);
585                         char       *page;
586
587                         COMP_CRC32(rdata_crc,
588                                            (char *) bkpb,
589                                            sizeof(BkpBlock));
590                         page = (char *) BufferGetBlock(dtbuf[i]);
591                         if (bkpb->hole_length == 0)
592                         {
593                                 COMP_CRC32(rdata_crc,
594                                                    page,
595                                                    BLCKSZ);
596                         }
597                         else
598                         {
599                                 /* must skip the hole */
600                                 COMP_CRC32(rdata_crc,
601                                                    page,
602                                                    bkpb->hole_offset);
603                                 COMP_CRC32(rdata_crc,
604                                                    page + (bkpb->hole_offset + bkpb->hole_length),
605                                                    BLCKSZ - (bkpb->hole_offset + bkpb->hole_length));
606                         }
607                 }
608         }
609
610         /*
611          * NOTE: We disallow len == 0 because it provides a useful bit of extra
612          * error checking in ReadRecord.  This means that all callers of
613          * XLogInsert must supply at least some not-in-a-buffer data.  However, we
614          * make an exception for XLOG SWITCH records because we don't want them to
615          * ever cross a segment boundary.
616          */
617         if (len == 0 && !isLogSwitch)
618                 elog(PANIC, "invalid xlog record length %u", len);
619
620         START_CRIT_SECTION();
621
622         /* Now wait to get insert lock */
623         LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
624
625         /*
626          * Check to see if my RedoRecPtr is out of date.  If so, may have to go
627          * back and recompute everything.  This can only happen just after a
628          * checkpoint, so it's better to be slow in this case and fast otherwise.
629          *
630          * If we aren't doing full-page writes then RedoRecPtr doesn't actually
631          * affect the contents of the XLOG record, so we'll update our local copy
632          * but not force a recomputation.
633          */
634         if (!XLByteEQ(RedoRecPtr, Insert->RedoRecPtr))
635         {
636                 Assert(XLByteLT(RedoRecPtr, Insert->RedoRecPtr));
637                 RedoRecPtr = Insert->RedoRecPtr;
638
639                 if (doPageWrites)
640                 {
641                         for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
642                         {
643                                 if (dtbuf[i] == InvalidBuffer)
644                                         continue;
645                                 if (dtbuf_bkp[i] == false &&
646                                         XLByteLE(dtbuf_lsn[i], RedoRecPtr))
647                                 {
648                                         /*
649                                          * Oops, this buffer now needs to be backed up, but we
650                                          * didn't think so above.  Start over.
651                                          */
652                                         LWLockRelease(WALInsertLock);
653                                         END_CRIT_SECTION();
654                                         goto begin;
655                                 }
656                         }
657                 }
658         }
659
660         /*
661          * Also check to see if forcePageWrites was just turned on; if we weren't
662          * already doing full-page writes then go back and recompute. (If it was
663          * just turned off, we could recompute the record without full pages, but
664          * we choose not to bother.)
665          */
666         if (Insert->forcePageWrites && !doPageWrites)
667         {
668                 /* Oops, must redo it with full-page data */
669                 LWLockRelease(WALInsertLock);
670                 END_CRIT_SECTION();
671                 goto begin;
672         }
673
674         /*
675          * Make additional rdata chain entries for the backup blocks, so that we
676          * don't need to special-case them in the write loop.  Note that we have
677          * now irrevocably changed the input rdata chain.  At the exit of this
678          * loop, write_len includes the backup block data.
679          *
680          * Also set the appropriate info bits to show which buffers were backed
681          * up. The i'th XLR_SET_BKP_BLOCK bit corresponds to the i'th distinct
682          * buffer value (ignoring InvalidBuffer) appearing in the rdata chain.
683          */
684         write_len = len;
685         for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
686         {
687                 BkpBlock   *bkpb;
688                 char       *page;
689
690                 if (!dtbuf_bkp[i])
691                         continue;
692
693                 info |= XLR_SET_BKP_BLOCK(i);
694
695                 bkpb = &(dtbuf_xlg[i]);
696                 page = (char *) BufferGetBlock(dtbuf[i]);
697
698                 rdt->next = &(dtbuf_rdt1[i]);
699                 rdt = rdt->next;
700
701                 rdt->data = (char *) bkpb;
702                 rdt->len = sizeof(BkpBlock);
703                 write_len += sizeof(BkpBlock);
704
705                 rdt->next = &(dtbuf_rdt2[i]);
706                 rdt = rdt->next;
707
708                 if (bkpb->hole_length == 0)
709                 {
710                         rdt->data = page;
711                         rdt->len = BLCKSZ;
712                         write_len += BLCKSZ;
713                         rdt->next = NULL;
714                 }
715                 else
716                 {
717                         /* must skip the hole */
718                         rdt->data = page;
719                         rdt->len = bkpb->hole_offset;
720                         write_len += bkpb->hole_offset;
721
722                         rdt->next = &(dtbuf_rdt3[i]);
723                         rdt = rdt->next;
724
725                         rdt->data = page + (bkpb->hole_offset + bkpb->hole_length);
726                         rdt->len = BLCKSZ - (bkpb->hole_offset + bkpb->hole_length);
727                         write_len += rdt->len;
728                         rdt->next = NULL;
729                 }
730         }
731
732         /*
733          * If we backed up any full blocks and online backup is not in progress,
734          * mark the backup blocks as removable.  This allows the WAL archiver to
735          * know whether it is safe to compress archived WAL data by transforming
736          * full-block records into the non-full-block format.
737          *
738          * Note: we could just set the flag whenever !forcePageWrites, but
739          * defining it like this leaves the info bit free for some potential
740          * other use in records without any backup blocks.
741          */
742         if ((info & XLR_BKP_BLOCK_MASK) && !Insert->forcePageWrites)
743                 info |= XLR_BKP_REMOVABLE;
744
745         /*
746          * If there isn't enough space on the current XLOG page for a record
747          * header, advance to the next page (leaving the unused space as zeroes).
748          */
749         updrqst = false;
750         freespace = INSERT_FREESPACE(Insert);
751         if (freespace < SizeOfXLogRecord)
752         {
753                 updrqst = AdvanceXLInsertBuffer(false);
754                 freespace = INSERT_FREESPACE(Insert);
755         }
756
757         /* Compute record's XLOG location */
758         curridx = Insert->curridx;
759         INSERT_RECPTR(RecPtr, Insert, curridx);
760
761         /*
762          * If the record is an XLOG_SWITCH, and we are exactly at the start of a
763          * segment, we need not insert it (and don't want to because we'd like
764          * consecutive switch requests to be no-ops).  Instead, make sure
765          * everything is written and flushed through the end of the prior segment,
766          * and return the prior segment's end address.
767          */
768         if (isLogSwitch &&
769                 (RecPtr.xrecoff % XLogSegSize) == SizeOfXLogLongPHD)
770         {
771                 /* We can release insert lock immediately */
772                 LWLockRelease(WALInsertLock);
773
774                 RecPtr.xrecoff -= SizeOfXLogLongPHD;
775                 if (RecPtr.xrecoff == 0)
776                 {
777                         /* crossing a logid boundary */
778                         RecPtr.xlogid -= 1;
779                         RecPtr.xrecoff = XLogFileSize;
780                 }
781
782                 LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
783                 LogwrtResult = XLogCtl->Write.LogwrtResult;
784                 if (!XLByteLE(RecPtr, LogwrtResult.Flush))
785                 {
786                         XLogwrtRqst FlushRqst;
787
788                         FlushRqst.Write = RecPtr;
789                         FlushRqst.Flush = RecPtr;
790                         XLogWrite(FlushRqst, false, false);
791                 }
792                 LWLockRelease(WALWriteLock);
793
794                 END_CRIT_SECTION();
795
796                 return RecPtr;
797         }
798
799         /* Insert record header */
800
801         record = (XLogRecord *) Insert->currpos;
802         record->xl_prev = Insert->PrevRecord;
803         record->xl_xid = GetCurrentTransactionIdIfAny();
804         record->xl_tot_len = SizeOfXLogRecord + write_len;
805         record->xl_len = len;           /* doesn't include backup blocks */
806         record->xl_info = info;
807         record->xl_rmid = rmid;
808
809         /* Now we can finish computing the record's CRC */
810         COMP_CRC32(rdata_crc, (char *) record + sizeof(pg_crc32),
811                            SizeOfXLogRecord - sizeof(pg_crc32));
812         FIN_CRC32(rdata_crc);
813         record->xl_crc = rdata_crc;
814
815 #ifdef WAL_DEBUG
816         if (XLOG_DEBUG)
817         {
818                 StringInfoData buf;
819
820                 initStringInfo(&buf);
821                 appendStringInfo(&buf, "INSERT @ %X/%X: ",
822                                                  RecPtr.xlogid, RecPtr.xrecoff);
823                 xlog_outrec(&buf, record);
824                 if (rdata->data != NULL)
825                 {
826                         appendStringInfo(&buf, " - ");
827                         RmgrTable[record->xl_rmid].rm_desc(&buf, record->xl_info, rdata->data);
828                 }
829                 elog(LOG, "%s", buf.data);
830                 pfree(buf.data);
831         }
832 #endif
833
834         /* Record begin of record in appropriate places */
835         ProcLastRecPtr = RecPtr;
836         Insert->PrevRecord = RecPtr;
837
838         Insert->currpos += SizeOfXLogRecord;
839         freespace -= SizeOfXLogRecord;
840
841         /*
842          * Append the data, including backup blocks if any
843          */
844         while (write_len)
845         {
846                 while (rdata->data == NULL)
847                         rdata = rdata->next;
848
849                 if (freespace > 0)
850                 {
851                         if (rdata->len > freespace)
852                         {
853                                 memcpy(Insert->currpos, rdata->data, freespace);
854                                 rdata->data += freespace;
855                                 rdata->len -= freespace;
856                                 write_len -= freespace;
857                         }
858                         else
859                         {
860                                 memcpy(Insert->currpos, rdata->data, rdata->len);
861                                 freespace -= rdata->len;
862                                 write_len -= rdata->len;
863                                 Insert->currpos += rdata->len;
864                                 rdata = rdata->next;
865                                 continue;
866                         }
867                 }
868
869                 /* Use next buffer */
870                 updrqst = AdvanceXLInsertBuffer(false);
871                 curridx = Insert->curridx;
872                 /* Insert cont-record header */
873                 Insert->currpage->xlp_info |= XLP_FIRST_IS_CONTRECORD;
874                 contrecord = (XLogContRecord *) Insert->currpos;
875                 contrecord->xl_rem_len = write_len;
876                 Insert->currpos += SizeOfXLogContRecord;
877                 freespace = INSERT_FREESPACE(Insert);
878         }
879
880         /* Ensure next record will be properly aligned */
881         Insert->currpos = (char *) Insert->currpage +
882                 MAXALIGN(Insert->currpos - (char *) Insert->currpage);
883         freespace = INSERT_FREESPACE(Insert);
884
885         /*
886          * The recptr I return is the beginning of the *next* record. This will be
887          * stored as LSN for changed data pages...
888          */
889         INSERT_RECPTR(RecPtr, Insert, curridx);
890
891         /*
892          * If the record is an XLOG_SWITCH, we must now write and flush all the
893          * existing data, and then forcibly advance to the start of the next
894          * segment.  It's not good to do this I/O while holding the insert lock,
895          * but there seems too much risk of confusion if we try to release the
896          * lock sooner.  Fortunately xlog switch needn't be a high-performance
897          * operation anyway...
898          */
899         if (isLogSwitch)
900         {
901                 XLogCtlWrite *Write = &XLogCtl->Write;
902                 XLogwrtRqst FlushRqst;
903                 XLogRecPtr      OldSegEnd;
904
905                 LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
906
907                 /*
908                  * Flush through the end of the page containing XLOG_SWITCH, and
909                  * perform end-of-segment actions (eg, notifying archiver).
910                  */
911                 WriteRqst = XLogCtl->xlblocks[curridx];
912                 FlushRqst.Write = WriteRqst;
913                 FlushRqst.Flush = WriteRqst;
914                 XLogWrite(FlushRqst, false, true);
915
916                 /* Set up the next buffer as first page of next segment */
917                 /* Note: AdvanceXLInsertBuffer cannot need to do I/O here */
918                 (void) AdvanceXLInsertBuffer(true);
919
920                 /* There should be no unwritten data */
921                 curridx = Insert->curridx;
922                 Assert(curridx == Write->curridx);
923
924                 /* Compute end address of old segment */
925                 OldSegEnd = XLogCtl->xlblocks[curridx];
926                 OldSegEnd.xrecoff -= XLOG_BLCKSZ;
927                 if (OldSegEnd.xrecoff == 0)
928                 {
929                         /* crossing a logid boundary */
930                         OldSegEnd.xlogid -= 1;
931                         OldSegEnd.xrecoff = XLogFileSize;
932                 }
933
934                 /* Make it look like we've written and synced all of old segment */
935                 LogwrtResult.Write = OldSegEnd;
936                 LogwrtResult.Flush = OldSegEnd;
937
938                 /*
939                  * Update shared-memory status --- this code should match XLogWrite
940                  */
941                 {
942                         /* use volatile pointer to prevent code rearrangement */
943                         volatile XLogCtlData *xlogctl = XLogCtl;
944
945                         SpinLockAcquire(&xlogctl->info_lck);
946                         xlogctl->LogwrtResult = LogwrtResult;
947                         if (XLByteLT(xlogctl->LogwrtRqst.Write, LogwrtResult.Write))
948                                 xlogctl->LogwrtRqst.Write = LogwrtResult.Write;
949                         if (XLByteLT(xlogctl->LogwrtRqst.Flush, LogwrtResult.Flush))
950                                 xlogctl->LogwrtRqst.Flush = LogwrtResult.Flush;
951                         SpinLockRelease(&xlogctl->info_lck);
952                 }
953
954                 Write->LogwrtResult = LogwrtResult;
955
956                 LWLockRelease(WALWriteLock);
957
958                 updrqst = false;                /* done already */
959         }
960         else
961         {
962                 /* normal case, ie not xlog switch */
963
964                 /* Need to update shared LogwrtRqst if some block was filled up */
965                 if (freespace < SizeOfXLogRecord)
966                 {
967                         /* curridx is filled and available for writing out */
968                         updrqst = true;
969                 }
970                 else
971                 {
972                         /* if updrqst already set, write through end of previous buf */
973                         curridx = PrevBufIdx(curridx);
974                 }
975                 WriteRqst = XLogCtl->xlblocks[curridx];
976         }
977
978         LWLockRelease(WALInsertLock);
979
980         if (updrqst)
981         {
982                 /* use volatile pointer to prevent code rearrangement */
983                 volatile XLogCtlData *xlogctl = XLogCtl;
984
985                 SpinLockAcquire(&xlogctl->info_lck);
986                 /* advance global request to include new block(s) */
987                 if (XLByteLT(xlogctl->LogwrtRqst.Write, WriteRqst))
988                         xlogctl->LogwrtRqst.Write = WriteRqst;
989                 /* update local result copy while I have the chance */
990                 LogwrtResult = xlogctl->LogwrtResult;
991                 SpinLockRelease(&xlogctl->info_lck);
992         }
993
994         XactLastRecEnd = RecPtr;
995
996         END_CRIT_SECTION();
997
998         return RecPtr;
999 }
1000
1001 /*
1002  * Determine whether the buffer referenced by an XLogRecData item has to
1003  * be backed up, and if so fill a BkpBlock struct for it.  In any case
1004  * save the buffer's LSN at *lsn.
1005  */
1006 static bool
1007 XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites,
1008                                 XLogRecPtr *lsn, BkpBlock *bkpb)
1009 {
1010         PageHeader      page;
1011
1012         page = (PageHeader) BufferGetBlock(rdata->buffer);
1013
1014         /*
1015          * XXX We assume page LSN is first data on *every* page that can be passed
1016          * to XLogInsert, whether it otherwise has the standard page layout or
1017          * not.
1018          */
1019         *lsn = page->pd_lsn;
1020
1021         if (doPageWrites &&
1022                 XLByteLE(page->pd_lsn, RedoRecPtr))
1023         {
1024                 /*
1025                  * The page needs to be backed up, so set up *bkpb
1026                  */
1027                 bkpb->node = BufferGetFileNode(rdata->buffer);
1028                 bkpb->block = BufferGetBlockNumber(rdata->buffer);
1029
1030                 if (rdata->buffer_std)
1031                 {
1032                         /* Assume we can omit data between pd_lower and pd_upper */
1033                         uint16          lower = page->pd_lower;
1034                         uint16          upper = page->pd_upper;
1035
1036                         if (lower >= SizeOfPageHeaderData &&
1037                                 upper > lower &&
1038                                 upper <= BLCKSZ)
1039                         {
1040                                 bkpb->hole_offset = lower;
1041                                 bkpb->hole_length = upper - lower;
1042                         }
1043                         else
1044                         {
1045                                 /* No "hole" to compress out */
1046                                 bkpb->hole_offset = 0;
1047                                 bkpb->hole_length = 0;
1048                         }
1049                 }
1050                 else
1051                 {
1052                         /* Not a standard page header, don't try to eliminate "hole" */
1053                         bkpb->hole_offset = 0;
1054                         bkpb->hole_length = 0;
1055                 }
1056
1057                 return true;                    /* buffer requires backup */
1058         }
1059
1060         return false;                           /* buffer does not need to be backed up */
1061 }
1062
1063 /*
1064  * XLogArchiveNotify
1065  *
1066  * Create an archive notification file
1067  *
1068  * The name of the notification file is the message that will be picked up
1069  * by the archiver, e.g. we write 0000000100000001000000C6.ready
1070  * and the archiver then knows to archive XLOGDIR/0000000100000001000000C6,
1071  * then when complete, rename it to 0000000100000001000000C6.done
1072  */
1073 static void
1074 XLogArchiveNotify(const char *xlog)
1075 {
1076         char            archiveStatusPath[MAXPGPATH];
1077         FILE       *fd;
1078
1079         /* insert an otherwise empty file called <XLOG>.ready */
1080         StatusFilePath(archiveStatusPath, xlog, ".ready");
1081         fd = AllocateFile(archiveStatusPath, "w");
1082         if (fd == NULL)
1083         {
1084                 ereport(LOG,
1085                                 (errcode_for_file_access(),
1086                                  errmsg("could not create archive status file \"%s\": %m",
1087                                                 archiveStatusPath)));
1088                 return;
1089         }
1090         if (FreeFile(fd))
1091         {
1092                 ereport(LOG,
1093                                 (errcode_for_file_access(),
1094                                  errmsg("could not write archive status file \"%s\": %m",
1095                                                 archiveStatusPath)));
1096                 return;
1097         }
1098
1099         /* Notify archiver that it's got something to do */
1100         if (IsUnderPostmaster)
1101                 SendPostmasterSignal(PMSIGNAL_WAKEN_ARCHIVER);
1102 }
1103
1104 /*
1105  * Convenience routine to notify using log/seg representation of filename
1106  */
1107 static void
1108 XLogArchiveNotifySeg(uint32 log, uint32 seg)
1109 {
1110         char            xlog[MAXFNAMELEN];
1111
1112         XLogFileName(xlog, ThisTimeLineID, log, seg);
1113         XLogArchiveNotify(xlog);
1114 }
1115
1116 /*
1117  * XLogArchiveCheckDone
1118  *
1119  * This is called when we are ready to delete or recycle an old XLOG segment
1120  * file or backup history file.  If it is okay to delete it then return true.
1121  * If it is not time to delete it, make sure a .ready file exists, and return
1122  * false.
1123  *
1124  * If <XLOG>.done exists, then return true; else if <XLOG>.ready exists,
1125  * then return false; else create <XLOG>.ready and return false.
1126  *
1127  * The reason we do things this way is so that if the original attempt to
1128  * create <XLOG>.ready fails, we'll retry during subsequent checkpoints.
1129  */
1130 static bool
1131 XLogArchiveCheckDone(const char *xlog)
1132 {
1133         char            archiveStatusPath[MAXPGPATH];
1134         struct stat stat_buf;
1135
1136         /* Always deletable if archiving is off */
1137         if (!XLogArchivingActive())
1138                 return true;
1139
1140         /* First check for .done --- this means archiver is done with it */
1141         StatusFilePath(archiveStatusPath, xlog, ".done");
1142         if (stat(archiveStatusPath, &stat_buf) == 0)
1143                 return true;
1144
1145         /* check for .ready --- this means archiver is still busy with it */
1146         StatusFilePath(archiveStatusPath, xlog, ".ready");
1147         if (stat(archiveStatusPath, &stat_buf) == 0)
1148                 return false;
1149
1150         /* Race condition --- maybe archiver just finished, so recheck */
1151         StatusFilePath(archiveStatusPath, xlog, ".done");
1152         if (stat(archiveStatusPath, &stat_buf) == 0)
1153                 return true;
1154
1155         /* Retry creation of the .ready file */
1156         XLogArchiveNotify(xlog);
1157         return false;
1158 }
1159
1160 /*
1161  * XLogArchiveCleanup
1162  *
1163  * Cleanup archive notification file(s) for a particular xlog segment
1164  */
1165 static void
1166 XLogArchiveCleanup(const char *xlog)
1167 {
1168         char            archiveStatusPath[MAXPGPATH];
1169
1170         /* Remove the .done file */
1171         StatusFilePath(archiveStatusPath, xlog, ".done");
1172         unlink(archiveStatusPath);
1173         /* should we complain about failure? */
1174
1175         /* Remove the .ready file if present --- normally it shouldn't be */
1176         StatusFilePath(archiveStatusPath, xlog, ".ready");
1177         unlink(archiveStatusPath);
1178         /* should we complain about failure? */
1179 }
1180
1181 /*
1182  * Advance the Insert state to the next buffer page, writing out the next
1183  * buffer if it still contains unwritten data.
1184  *
1185  * If new_segment is TRUE then we set up the next buffer page as the first
1186  * page of the next xlog segment file, possibly but not usually the next
1187  * consecutive file page.
1188  *
1189  * The global LogwrtRqst.Write pointer needs to be advanced to include the
1190  * just-filled page.  If we can do this for free (without an extra lock),
1191  * we do so here.  Otherwise the caller must do it.  We return TRUE if the
1192  * request update still needs to be done, FALSE if we did it internally.
1193  *
1194  * Must be called with WALInsertLock held.
1195  */
1196 static bool
1197 AdvanceXLInsertBuffer(bool new_segment)
1198 {
1199         XLogCtlInsert *Insert = &XLogCtl->Insert;
1200         XLogCtlWrite *Write = &XLogCtl->Write;
1201         int                     nextidx = NextBufIdx(Insert->curridx);
1202         bool            update_needed = true;
1203         XLogRecPtr      OldPageRqstPtr;
1204         XLogwrtRqst WriteRqst;
1205         XLogRecPtr      NewPageEndPtr;
1206         XLogPageHeader NewPage;
1207
1208         /* Use Insert->LogwrtResult copy if it's more fresh */
1209         if (XLByteLT(LogwrtResult.Write, Insert->LogwrtResult.Write))
1210                 LogwrtResult = Insert->LogwrtResult;
1211
1212         /*
1213          * Get ending-offset of the buffer page we need to replace (this may be
1214          * zero if the buffer hasn't been used yet).  Fall through if it's already
1215          * written out.
1216          */
1217         OldPageRqstPtr = XLogCtl->xlblocks[nextidx];
1218         if (!XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
1219         {
1220                 /* nope, got work to do... */
1221                 XLogRecPtr      FinishedPageRqstPtr;
1222
1223                 FinishedPageRqstPtr = XLogCtl->xlblocks[Insert->curridx];
1224
1225                 /* Before waiting, get info_lck and update LogwrtResult */
1226                 {
1227                         /* use volatile pointer to prevent code rearrangement */
1228                         volatile XLogCtlData *xlogctl = XLogCtl;
1229
1230                         SpinLockAcquire(&xlogctl->info_lck);
1231                         if (XLByteLT(xlogctl->LogwrtRqst.Write, FinishedPageRqstPtr))
1232                                 xlogctl->LogwrtRqst.Write = FinishedPageRqstPtr;
1233                         LogwrtResult = xlogctl->LogwrtResult;
1234                         SpinLockRelease(&xlogctl->info_lck);
1235                 }
1236
1237                 update_needed = false;  /* Did the shared-request update */
1238
1239                 if (XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
1240                 {
1241                         /* OK, someone wrote it already */
1242                         Insert->LogwrtResult = LogwrtResult;
1243                 }
1244                 else
1245                 {
1246                         /* Must acquire write lock */
1247                         LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
1248                         LogwrtResult = Write->LogwrtResult;
1249                         if (XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
1250                         {
1251                                 /* OK, someone wrote it already */
1252                                 LWLockRelease(WALWriteLock);
1253                                 Insert->LogwrtResult = LogwrtResult;
1254                         }
1255                         else
1256                         {
1257                                 /*
1258                                  * Have to write buffers while holding insert lock. This is
1259                                  * not good, so only write as much as we absolutely must.
1260                                  */
1261                                 WriteRqst.Write = OldPageRqstPtr;
1262                                 WriteRqst.Flush.xlogid = 0;
1263                                 WriteRqst.Flush.xrecoff = 0;
1264                                 XLogWrite(WriteRqst, false, false);
1265                                 LWLockRelease(WALWriteLock);
1266                                 Insert->LogwrtResult = LogwrtResult;
1267                         }
1268                 }
1269         }
1270
1271         /*
1272          * Now the next buffer slot is free and we can set it up to be the next
1273          * output page.
1274          */
1275         NewPageEndPtr = XLogCtl->xlblocks[Insert->curridx];
1276
1277         if (new_segment)
1278         {
1279                 /* force it to a segment start point */
1280                 NewPageEndPtr.xrecoff += XLogSegSize - 1;
1281                 NewPageEndPtr.xrecoff -= NewPageEndPtr.xrecoff % XLogSegSize;
1282         }
1283
1284         if (NewPageEndPtr.xrecoff >= XLogFileSize)
1285         {
1286                 /* crossing a logid boundary */
1287                 NewPageEndPtr.xlogid += 1;
1288                 NewPageEndPtr.xrecoff = XLOG_BLCKSZ;
1289         }
1290         else
1291                 NewPageEndPtr.xrecoff += XLOG_BLCKSZ;
1292         XLogCtl->xlblocks[nextidx] = NewPageEndPtr;
1293         NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * (Size) XLOG_BLCKSZ);
1294
1295         Insert->curridx = nextidx;
1296         Insert->currpage = NewPage;
1297
1298         Insert->currpos = ((char *) NewPage) +SizeOfXLogShortPHD;
1299
1300         /*
1301          * Be sure to re-zero the buffer so that bytes beyond what we've written
1302          * will look like zeroes and not valid XLOG records...
1303          */
1304         MemSet((char *) NewPage, 0, XLOG_BLCKSZ);
1305
1306         /*
1307          * Fill the new page's header
1308          */
1309         NewPage   ->xlp_magic = XLOG_PAGE_MAGIC;
1310
1311         /* NewPage->xlp_info = 0; */    /* done by memset */
1312         NewPage   ->xlp_tli = ThisTimeLineID;
1313         NewPage   ->xlp_pageaddr.xlogid = NewPageEndPtr.xlogid;
1314         NewPage   ->xlp_pageaddr.xrecoff = NewPageEndPtr.xrecoff - XLOG_BLCKSZ;
1315
1316         /*
1317          * If first page of an XLOG segment file, make it a long header.
1318          */
1319         if ((NewPage->xlp_pageaddr.xrecoff % XLogSegSize) == 0)
1320         {
1321                 XLogLongPageHeader NewLongPage = (XLogLongPageHeader) NewPage;
1322
1323                 NewLongPage->xlp_sysid = ControlFile->system_identifier;
1324                 NewLongPage->xlp_seg_size = XLogSegSize;
1325                 NewLongPage->xlp_xlog_blcksz = XLOG_BLCKSZ;
1326                 NewPage   ->xlp_info |= XLP_LONG_HEADER;
1327
1328                 Insert->currpos = ((char *) NewPage) +SizeOfXLogLongPHD;
1329         }
1330
1331         return update_needed;
1332 }
1333
1334 /*
1335  * Check whether we've consumed enough xlog space that a checkpoint is needed.
1336  *
1337  * Caller must have just finished filling the open log file (so that
1338  * openLogId/openLogSeg are valid).  We measure the distance from RedoRecPtr
1339  * to the open log file and see if that exceeds CheckPointSegments.
1340  *
1341  * Note: it is caller's responsibility that RedoRecPtr is up-to-date.
1342  */
1343 static bool
1344 XLogCheckpointNeeded(void)
1345 {
1346         /*
1347          * A straight computation of segment number could overflow 32
1348          * bits.  Rather than assuming we have working 64-bit
1349          * arithmetic, we compare the highest-order bits separately,
1350          * and force a checkpoint immediately when they change.
1351          */
1352         uint32          old_segno,
1353                                 new_segno;
1354         uint32          old_highbits,
1355                                 new_highbits;
1356
1357         old_segno = (RedoRecPtr.xlogid % XLogSegSize) * XLogSegsPerFile +
1358                 (RedoRecPtr.xrecoff / XLogSegSize);
1359         old_highbits = RedoRecPtr.xlogid / XLogSegSize;
1360         new_segno = (openLogId % XLogSegSize) * XLogSegsPerFile + openLogSeg;
1361         new_highbits = openLogId / XLogSegSize;
1362         if (new_highbits != old_highbits ||
1363                 new_segno >= old_segno + (uint32) (CheckPointSegments-1))
1364                 return true;
1365         return false;
1366 }
1367
1368 /*
1369  * Write and/or fsync the log at least as far as WriteRqst indicates.
1370  *
1371  * If flexible == TRUE, we don't have to write as far as WriteRqst, but
1372  * may stop at any convenient boundary (such as a cache or logfile boundary).
1373  * This option allows us to avoid uselessly issuing multiple writes when a
1374  * single one would do.
1375  *
1376  * If xlog_switch == TRUE, we are intending an xlog segment switch, so
1377  * perform end-of-segment actions after writing the last page, even if
1378  * it's not physically the end of its segment.  (NB: this will work properly
1379  * only if caller specifies WriteRqst == page-end and flexible == false,
1380  * and there is some data to write.)
1381  *
1382  * Must be called with WALWriteLock held.
1383  */
1384 static void
1385 XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
1386 {
1387         XLogCtlWrite *Write = &XLogCtl->Write;
1388         bool            ispartialpage;
1389         bool            last_iteration;
1390         bool            finishing_seg;
1391         bool            use_existent;
1392         int                     curridx;
1393         int                     npages;
1394         int                     startidx;
1395         uint32          startoffset;
1396
1397         /* We should always be inside a critical section here */
1398         Assert(CritSectionCount > 0);
1399
1400         /*
1401          * Update local LogwrtResult (caller probably did this already, but...)
1402          */
1403         LogwrtResult = Write->LogwrtResult;
1404
1405         /*
1406          * Since successive pages in the xlog cache are consecutively allocated,
1407          * we can usually gather multiple pages together and issue just one
1408          * write() call.  npages is the number of pages we have determined can be
1409          * written together; startidx is the cache block index of the first one,
1410          * and startoffset is the file offset at which it should go. The latter
1411          * two variables are only valid when npages > 0, but we must initialize
1412          * all of them to keep the compiler quiet.
1413          */
1414         npages = 0;
1415         startidx = 0;
1416         startoffset = 0;
1417
1418         /*
1419          * Within the loop, curridx is the cache block index of the page to
1420          * consider writing.  We advance Write->curridx only after successfully
1421          * writing pages.  (Right now, this refinement is useless since we are
1422          * going to PANIC if any error occurs anyway; but someday it may come in
1423          * useful.)
1424          */
1425         curridx = Write->curridx;
1426
1427         while (XLByteLT(LogwrtResult.Write, WriteRqst.Write))
1428         {
1429                 /*
1430                  * Make sure we're not ahead of the insert process.  This could happen
1431                  * if we're passed a bogus WriteRqst.Write that is past the end of the
1432                  * last page that's been initialized by AdvanceXLInsertBuffer.
1433                  */
1434                 if (!XLByteLT(LogwrtResult.Write, XLogCtl->xlblocks[curridx]))
1435                         elog(PANIC, "xlog write request %X/%X is past end of log %X/%X",
1436                                  LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff,
1437                                  XLogCtl->xlblocks[curridx].xlogid,
1438                                  XLogCtl->xlblocks[curridx].xrecoff);
1439
1440                 /* Advance LogwrtResult.Write to end of current buffer page */
1441                 LogwrtResult.Write = XLogCtl->xlblocks[curridx];
1442                 ispartialpage = XLByteLT(WriteRqst.Write, LogwrtResult.Write);
1443
1444                 if (!XLByteInPrevSeg(LogwrtResult.Write, openLogId, openLogSeg))
1445                 {
1446                         /*
1447                          * Switch to new logfile segment.  We cannot have any pending
1448                          * pages here (since we dump what we have at segment end).
1449                          */
1450                         Assert(npages == 0);
1451                         if (openLogFile >= 0)
1452                                 XLogFileClose();
1453                         XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg);
1454
1455                         /* create/use new log file */
1456                         use_existent = true;
1457                         openLogFile = XLogFileInit(openLogId, openLogSeg,
1458                                                                            &use_existent, true);
1459                         openLogOff = 0;
1460                 }
1461
1462                 /* Make sure we have the current logfile open */
1463                 if (openLogFile < 0)
1464                 {
1465                         XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg);
1466                         openLogFile = XLogFileOpen(openLogId, openLogSeg);
1467                         openLogOff = 0;
1468                 }
1469
1470                 /* Add current page to the set of pending pages-to-dump */
1471                 if (npages == 0)
1472                 {
1473                         /* first of group */
1474                         startidx = curridx;
1475                         startoffset = (LogwrtResult.Write.xrecoff - XLOG_BLCKSZ) % XLogSegSize;
1476                 }
1477                 npages++;
1478
1479                 /*
1480                  * Dump the set if this will be the last loop iteration, or if we are
1481                  * at the last page of the cache area (since the next page won't be
1482                  * contiguous in memory), or if we are at the end of the logfile
1483                  * segment.
1484                  */
1485                 last_iteration = !XLByteLT(LogwrtResult.Write, WriteRqst.Write);
1486
1487                 finishing_seg = !ispartialpage &&
1488                         (startoffset + npages * XLOG_BLCKSZ) >= XLogSegSize;
1489
1490                 if (last_iteration ||
1491                         curridx == XLogCtl->XLogCacheBlck ||
1492                         finishing_seg)
1493                 {
1494                         char       *from;
1495                         Size            nbytes;
1496
1497                         /* Need to seek in the file? */
1498                         if (openLogOff != startoffset)
1499                         {
1500                                 if (lseek(openLogFile, (off_t) startoffset, SEEK_SET) < 0)
1501                                         ereport(PANIC,
1502                                                         (errcode_for_file_access(),
1503                                                          errmsg("could not seek in log file %u, "
1504                                                                         "segment %u to offset %u: %m",
1505                                                                         openLogId, openLogSeg, startoffset)));
1506                                 openLogOff = startoffset;
1507                         }
1508
1509                         /* OK to write the page(s) */
1510                         from = XLogCtl->pages + startidx * (Size) XLOG_BLCKSZ;
1511                         nbytes = npages * (Size) XLOG_BLCKSZ;
1512                         errno = 0;
1513                         if (write(openLogFile, from, nbytes) != nbytes)
1514                         {
1515                                 /* if write didn't set errno, assume no disk space */
1516                                 if (errno == 0)
1517                                         errno = ENOSPC;
1518                                 ereport(PANIC,
1519                                                 (errcode_for_file_access(),
1520                                                  errmsg("could not write to log file %u, segment %u "
1521                                                                 "at offset %u, length %lu: %m",
1522                                                                 openLogId, openLogSeg,
1523                                                                 openLogOff, (unsigned long) nbytes)));
1524                         }
1525
1526                         /* Update state for write */
1527                         openLogOff += nbytes;
1528                         Write->curridx = ispartialpage ? curridx : NextBufIdx(curridx);
1529                         npages = 0;
1530
1531                         /*
1532                          * If we just wrote the whole last page of a logfile segment,
1533                          * fsync the segment immediately.  This avoids having to go back
1534                          * and re-open prior segments when an fsync request comes along
1535                          * later. Doing it here ensures that one and only one backend will
1536                          * perform this fsync.
1537                          *
1538                          * We also do this if this is the last page written for an xlog
1539                          * switch.
1540                          *
1541                          * This is also the right place to notify the Archiver that the
1542                          * segment is ready to copy to archival storage, and to update the
1543                          * timer for archive_timeout, and to signal for a checkpoint if
1544                          * too many logfile segments have been used since the last
1545                          * checkpoint.
1546                          */
1547                         if (finishing_seg || (xlog_switch && last_iteration))
1548                         {
1549                                 issue_xlog_fsync();
1550                                 LogwrtResult.Flush = LogwrtResult.Write;                /* end of page */
1551
1552                                 if (XLogArchivingActive())
1553                                         XLogArchiveNotifySeg(openLogId, openLogSeg);
1554
1555                                 Write->lastSegSwitchTime = time(NULL);
1556
1557                                 /*
1558                                  * Signal bgwriter to start a checkpoint if we've consumed too
1559                                  * much xlog since the last one.  For speed, we first check
1560                                  * using the local copy of RedoRecPtr, which might be
1561                                  * out of date; if it looks like a checkpoint is needed,
1562                                  * forcibly update RedoRecPtr and recheck.
1563                                  */
1564                                 if (IsUnderPostmaster &&
1565                                         XLogCheckpointNeeded())
1566                                 {
1567                                         (void) GetRedoRecPtr();
1568                                         if (XLogCheckpointNeeded())
1569                                                 RequestCheckpoint(CHECKPOINT_CAUSE_XLOG);
1570                                 }
1571                         }
1572                 }
1573
1574                 if (ispartialpage)
1575                 {
1576                         /* Only asked to write a partial page */
1577                         LogwrtResult.Write = WriteRqst.Write;
1578                         break;
1579                 }
1580                 curridx = NextBufIdx(curridx);
1581
1582                 /* If flexible, break out of loop as soon as we wrote something */
1583                 if (flexible && npages == 0)
1584                         break;
1585         }
1586
1587         Assert(npages == 0);
1588         Assert(curridx == Write->curridx);
1589
1590         /*
1591          * If asked to flush, do so
1592          */
1593         if (XLByteLT(LogwrtResult.Flush, WriteRqst.Flush) &&
1594                 XLByteLT(LogwrtResult.Flush, LogwrtResult.Write))
1595         {
1596                 /*
1597                  * Could get here without iterating above loop, in which case we might
1598                  * have no open file or the wrong one.  However, we do not need to
1599                  * fsync more than one file.
1600                  */
1601                 if (sync_method != SYNC_METHOD_OPEN)
1602                 {
1603                         if (openLogFile >= 0 &&
1604                                 !XLByteInPrevSeg(LogwrtResult.Write, openLogId, openLogSeg))
1605                                 XLogFileClose();
1606                         if (openLogFile < 0)
1607                         {
1608                                 XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg);
1609                                 openLogFile = XLogFileOpen(openLogId, openLogSeg);
1610                                 openLogOff = 0;
1611                         }
1612                         issue_xlog_fsync();
1613                 }
1614                 LogwrtResult.Flush = LogwrtResult.Write;
1615         }
1616
1617         /*
1618          * Update shared-memory status
1619          *
1620          * We make sure that the shared 'request' values do not fall behind the
1621          * 'result' values.  This is not absolutely essential, but it saves some
1622          * code in a couple of places.
1623          */
1624         {
1625                 /* use volatile pointer to prevent code rearrangement */
1626                 volatile XLogCtlData *xlogctl = XLogCtl;
1627
1628                 SpinLockAcquire(&xlogctl->info_lck);
1629                 xlogctl->LogwrtResult = LogwrtResult;
1630                 if (XLByteLT(xlogctl->LogwrtRqst.Write, LogwrtResult.Write))
1631                         xlogctl->LogwrtRqst.Write = LogwrtResult.Write;
1632                 if (XLByteLT(xlogctl->LogwrtRqst.Flush, LogwrtResult.Flush))
1633                         xlogctl->LogwrtRqst.Flush = LogwrtResult.Flush;
1634                 SpinLockRelease(&xlogctl->info_lck);
1635         }
1636
1637         Write->LogwrtResult = LogwrtResult;
1638 }
1639
1640 /*
1641  * Record the LSN for an asynchronous transaction commit.
1642  * (This should not be called for aborts, nor for synchronous commits.)
1643  */
1644 void
1645 XLogSetAsyncCommitLSN(XLogRecPtr asyncCommitLSN)
1646 {
1647         /* use volatile pointer to prevent code rearrangement */
1648         volatile XLogCtlData *xlogctl = XLogCtl;
1649
1650         SpinLockAcquire(&xlogctl->info_lck);
1651         if (XLByteLT(xlogctl->asyncCommitLSN, asyncCommitLSN))
1652                 xlogctl->asyncCommitLSN = asyncCommitLSN;
1653         SpinLockRelease(&xlogctl->info_lck);
1654 }
1655
1656 /*
1657  * Ensure that all XLOG data through the given position is flushed to disk.
1658  *
1659  * NOTE: this differs from XLogWrite mainly in that the WALWriteLock is not
1660  * already held, and we try to avoid acquiring it if possible.
1661  */
1662 void
1663 XLogFlush(XLogRecPtr record)
1664 {
1665         XLogRecPtr      WriteRqstPtr;
1666         XLogwrtRqst WriteRqst;
1667
1668         /* Disabled during REDO */
1669         if (InRedo)
1670                 return;
1671
1672         /* Quick exit if already known flushed */
1673         if (XLByteLE(record, LogwrtResult.Flush))
1674                 return;
1675
1676 #ifdef WAL_DEBUG
1677         if (XLOG_DEBUG)
1678                 elog(LOG, "xlog flush request %X/%X; write %X/%X; flush %X/%X",
1679                          record.xlogid, record.xrecoff,
1680                          LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff,
1681                          LogwrtResult.Flush.xlogid, LogwrtResult.Flush.xrecoff);
1682 #endif
1683
1684         START_CRIT_SECTION();
1685
1686         /*
1687          * Since fsync is usually a horribly expensive operation, we try to
1688          * piggyback as much data as we can on each fsync: if we see any more data
1689          * entered into the xlog buffer, we'll write and fsync that too, so that
1690          * the final value of LogwrtResult.Flush is as large as possible. This
1691          * gives us some chance of avoiding another fsync immediately after.
1692          */
1693
1694         /* initialize to given target; may increase below */
1695         WriteRqstPtr = record;
1696
1697         /* read LogwrtResult and update local state */
1698         {
1699                 /* use volatile pointer to prevent code rearrangement */
1700                 volatile XLogCtlData *xlogctl = XLogCtl;
1701
1702                 SpinLockAcquire(&xlogctl->info_lck);
1703                 if (XLByteLT(WriteRqstPtr, xlogctl->LogwrtRqst.Write))
1704                         WriteRqstPtr = xlogctl->LogwrtRqst.Write;
1705                 LogwrtResult = xlogctl->LogwrtResult;
1706                 SpinLockRelease(&xlogctl->info_lck);
1707         }
1708
1709         /* done already? */
1710         if (!XLByteLE(record, LogwrtResult.Flush))
1711         {
1712                 /* now wait for the write lock */
1713                 LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
1714                 LogwrtResult = XLogCtl->Write.LogwrtResult;
1715                 if (!XLByteLE(record, LogwrtResult.Flush))
1716                 {
1717                         /* try to write/flush later additions to XLOG as well */
1718                         if (LWLockConditionalAcquire(WALInsertLock, LW_EXCLUSIVE))
1719                         {
1720                                 XLogCtlInsert *Insert = &XLogCtl->Insert;
1721                                 uint32          freespace = INSERT_FREESPACE(Insert);
1722
1723                                 if (freespace < SizeOfXLogRecord)               /* buffer is full */
1724                                         WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx];
1725                                 else
1726                                 {
1727                                         WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx];
1728                                         WriteRqstPtr.xrecoff -= freespace;
1729                                 }
1730                                 LWLockRelease(WALInsertLock);
1731                                 WriteRqst.Write = WriteRqstPtr;
1732                                 WriteRqst.Flush = WriteRqstPtr;
1733                         }
1734                         else
1735                         {
1736                                 WriteRqst.Write = WriteRqstPtr;
1737                                 WriteRqst.Flush = record;
1738                         }
1739                         XLogWrite(WriteRqst, false, false);
1740                 }
1741                 LWLockRelease(WALWriteLock);
1742         }
1743
1744         END_CRIT_SECTION();
1745
1746         /*
1747          * If we still haven't flushed to the request point then we have a
1748          * problem; most likely, the requested flush point is past end of XLOG.
1749          * This has been seen to occur when a disk page has a corrupted LSN.
1750          *
1751          * Formerly we treated this as a PANIC condition, but that hurts the
1752          * system's robustness rather than helping it: we do not want to take down
1753          * the whole system due to corruption on one data page.  In particular, if
1754          * the bad page is encountered again during recovery then we would be
1755          * unable to restart the database at all!  (This scenario has actually
1756          * happened in the field several times with 7.1 releases. Note that we
1757          * cannot get here while InRedo is true, but if the bad page is brought in
1758          * and marked dirty during recovery then CreateCheckPoint will try to
1759          * flush it at the end of recovery.)
1760          *
1761          * The current approach is to ERROR under normal conditions, but only
1762          * WARNING during recovery, so that the system can be brought up even if
1763          * there's a corrupt LSN.  Note that for calls from xact.c, the ERROR will
1764          * be promoted to PANIC since xact.c calls this routine inside a critical
1765          * section.  However, calls from bufmgr.c are not within critical sections
1766          * and so we will not force a restart for a bad LSN on a data page.
1767          */
1768         if (XLByteLT(LogwrtResult.Flush, record))
1769                 elog(InRecovery ? WARNING : ERROR,
1770                 "xlog flush request %X/%X is not satisfied --- flushed only to %X/%X",
1771                          record.xlogid, record.xrecoff,
1772                          LogwrtResult.Flush.xlogid, LogwrtResult.Flush.xrecoff);
1773 }
1774
1775 /*
1776  * Flush xlog, but without specifying exactly where to flush to.
1777  *
1778  * We normally flush only completed blocks; but if there is nothing to do on
1779  * that basis, we check for unflushed async commits in the current incomplete
1780  * block, and flush through the latest one of those.  Thus, if async commits
1781  * are not being used, we will flush complete blocks only.  We can guarantee
1782  * that async commits reach disk after at most three cycles; normally only
1783  * one or two.  (We allow XLogWrite to write "flexibly", meaning it can stop
1784  * at the end of the buffer ring; this makes a difference only with very high
1785  * load or long wal_writer_delay, but imposes one extra cycle for the worst
1786  * case for async commits.)
1787  *
1788  * This routine is invoked periodically by the background walwriter process.
1789  */
1790 void
1791 XLogBackgroundFlush(void)
1792 {
1793         XLogRecPtr      WriteRqstPtr;
1794         bool            flexible = true;
1795
1796         /* read LogwrtResult and update local state */
1797         {
1798                 /* use volatile pointer to prevent code rearrangement */
1799                 volatile XLogCtlData *xlogctl = XLogCtl;
1800
1801                 SpinLockAcquire(&xlogctl->info_lck);
1802                 LogwrtResult = xlogctl->LogwrtResult;
1803                 WriteRqstPtr = xlogctl->LogwrtRqst.Write;
1804                 SpinLockRelease(&xlogctl->info_lck);
1805         }
1806
1807         /* back off to last completed page boundary */
1808         WriteRqstPtr.xrecoff -= WriteRqstPtr.xrecoff % XLOG_BLCKSZ;
1809
1810         /* if we have already flushed that far, consider async commit records */
1811         if (XLByteLE(WriteRqstPtr, LogwrtResult.Flush))
1812         {
1813                 /* use volatile pointer to prevent code rearrangement */
1814                 volatile XLogCtlData *xlogctl = XLogCtl;
1815
1816                 SpinLockAcquire(&xlogctl->info_lck);
1817                 WriteRqstPtr = xlogctl->asyncCommitLSN;
1818                 SpinLockRelease(&xlogctl->info_lck);
1819                 flexible = false;               /* ensure it all gets written */
1820         }
1821
1822         /* Done if already known flushed */
1823         if (XLByteLE(WriteRqstPtr, LogwrtResult.Flush))
1824                 return;
1825
1826 #ifdef WAL_DEBUG
1827         if (XLOG_DEBUG)
1828                 elog(LOG, "xlog bg flush request %X/%X; write %X/%X; flush %X/%X",
1829                          WriteRqstPtr.xlogid, WriteRqstPtr.xrecoff,
1830                          LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff,
1831                          LogwrtResult.Flush.xlogid, LogwrtResult.Flush.xrecoff);
1832 #endif
1833
1834         START_CRIT_SECTION();
1835
1836         /* now wait for the write lock */
1837         LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
1838         LogwrtResult = XLogCtl->Write.LogwrtResult;
1839         if (!XLByteLE(WriteRqstPtr, LogwrtResult.Flush))
1840         {
1841                 XLogwrtRqst WriteRqst;
1842
1843                 WriteRqst.Write = WriteRqstPtr;
1844                 WriteRqst.Flush = WriteRqstPtr;
1845                 XLogWrite(WriteRqst, flexible, false);
1846         }
1847         LWLockRelease(WALWriteLock);
1848
1849         END_CRIT_SECTION();
1850 }
1851
1852 /*
1853  * Flush any previous asynchronously-committed transactions' commit records.
1854  *
1855  * NOTE: it is unwise to assume that this provides any strong guarantees.
1856  * In particular, because of the inexact LSN bookkeeping used by clog.c,
1857  * we cannot assume that hint bits will be settable for these transactions.
1858  */
1859 void
1860 XLogAsyncCommitFlush(void)
1861 {
1862         XLogRecPtr      WriteRqstPtr;
1863         /* use volatile pointer to prevent code rearrangement */
1864         volatile XLogCtlData *xlogctl = XLogCtl;
1865
1866         SpinLockAcquire(&xlogctl->info_lck);
1867         WriteRqstPtr = xlogctl->asyncCommitLSN;
1868         SpinLockRelease(&xlogctl->info_lck);
1869
1870         XLogFlush(WriteRqstPtr);
1871 }
1872
1873 /*
1874  * Test whether XLOG data has been flushed up to (at least) the given position.
1875  *
1876  * Returns true if a flush is still needed.  (It may be that someone else
1877  * is already in process of flushing that far, however.)
1878  */
1879 bool
1880 XLogNeedsFlush(XLogRecPtr record)
1881 {
1882         /* Quick exit if already known flushed */
1883         if (XLByteLE(record, LogwrtResult.Flush))
1884                 return false;
1885
1886         /* read LogwrtResult and update local state */
1887         {
1888                 /* use volatile pointer to prevent code rearrangement */
1889                 volatile XLogCtlData *xlogctl = XLogCtl;
1890
1891                 SpinLockAcquire(&xlogctl->info_lck);
1892                 LogwrtResult = xlogctl->LogwrtResult;
1893                 SpinLockRelease(&xlogctl->info_lck);
1894         }
1895
1896         /* check again */
1897         if (XLByteLE(record, LogwrtResult.Flush))
1898                 return false;
1899
1900         return true;
1901 }
1902
1903 /*
1904  * Create a new XLOG file segment, or open a pre-existing one.
1905  *
1906  * log, seg: identify segment to be created/opened.
1907  *
1908  * *use_existent: if TRUE, OK to use a pre-existing file (else, any
1909  * pre-existing file will be deleted).  On return, TRUE if a pre-existing
1910  * file was used.
1911  *
1912  * use_lock: if TRUE, acquire ControlFileLock while moving file into
1913  * place.  This should be TRUE except during bootstrap log creation.  The
1914  * caller must *not* hold the lock at call.
1915  *
1916  * Returns FD of opened file.
1917  *
1918  * Note: errors here are ERROR not PANIC because we might or might not be
1919  * inside a critical section (eg, during checkpoint there is no reason to
1920  * take down the system on failure).  They will promote to PANIC if we are
1921  * in a critical section.
1922  */
1923 static int
1924 XLogFileInit(uint32 log, uint32 seg,
1925                          bool *use_existent, bool use_lock)
1926 {
1927         char            path[MAXPGPATH];
1928         char            tmppath[MAXPGPATH];
1929         char       *zbuffer;
1930         uint32          installed_log;
1931         uint32          installed_seg;
1932         int                     max_advance;
1933         int                     fd;
1934         int                     nbytes;
1935
1936         XLogFilePath(path, ThisTimeLineID, log, seg);
1937
1938         /*
1939          * Try to use existent file (checkpoint maker may have created it already)
1940          */
1941         if (*use_existent)
1942         {
1943                 fd = BasicOpenFile(path, O_RDWR | PG_BINARY | XLOG_SYNC_BIT,
1944                                                    S_IRUSR | S_IWUSR);
1945                 if (fd < 0)
1946                 {
1947                         if (errno != ENOENT)
1948                                 ereport(ERROR,
1949                                                 (errcode_for_file_access(),
1950                                                  errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
1951                                                                 path, log, seg)));
1952                 }
1953                 else
1954                         return fd;
1955         }
1956
1957         /*
1958          * Initialize an empty (all zeroes) segment.  NOTE: it is possible that
1959          * another process is doing the same thing.  If so, we will end up
1960          * pre-creating an extra log segment.  That seems OK, and better than
1961          * holding the lock throughout this lengthy process.
1962          */
1963         elog(DEBUG2, "creating and filling new WAL file");
1964
1965         snprintf(tmppath, MAXPGPATH, XLOGDIR "/xlogtemp.%d", (int) getpid());
1966
1967         unlink(tmppath);
1968
1969         /* do not use XLOG_SYNC_BIT here --- want to fsync only at end of fill */
1970         fd = BasicOpenFile(tmppath, O_RDWR | O_CREAT | O_EXCL | PG_BINARY,
1971                                            S_IRUSR | S_IWUSR);
1972         if (fd < 0)
1973                 ereport(ERROR,
1974                                 (errcode_for_file_access(),
1975                                  errmsg("could not create file \"%s\": %m", tmppath)));
1976
1977         /*
1978          * Zero-fill the file.  We have to do this the hard way to ensure that all
1979          * the file space has really been allocated --- on platforms that allow
1980          * "holes" in files, just seeking to the end doesn't allocate intermediate
1981          * space.  This way, we know that we have all the space and (after the
1982          * fsync below) that all the indirect blocks are down on disk.  Therefore,
1983          * fdatasync(2) or O_DSYNC will be sufficient to sync future writes to the
1984          * log file.
1985          *
1986          * Note: palloc zbuffer, instead of just using a local char array, to
1987          * ensure it is reasonably well-aligned; this may save a few cycles
1988          * transferring data to the kernel.
1989          */
1990         zbuffer = (char *) palloc0(XLOG_BLCKSZ);
1991         for (nbytes = 0; nbytes < XLogSegSize; nbytes += XLOG_BLCKSZ)
1992         {
1993                 errno = 0;
1994                 if ((int) write(fd, zbuffer, XLOG_BLCKSZ) != (int) XLOG_BLCKSZ)
1995                 {
1996                         int                     save_errno = errno;
1997
1998                         /*
1999                          * If we fail to make the file, delete it to release disk space
2000                          */
2001                         unlink(tmppath);
2002                         /* if write didn't set errno, assume problem is no disk space */
2003                         errno = save_errno ? save_errno : ENOSPC;
2004
2005                         ereport(ERROR,
2006                                         (errcode_for_file_access(),
2007                                          errmsg("could not write to file \"%s\": %m", tmppath)));
2008                 }
2009         }
2010         pfree(zbuffer);
2011
2012         if (pg_fsync(fd) != 0)
2013                 ereport(ERROR,
2014                                 (errcode_for_file_access(),
2015                                  errmsg("could not fsync file \"%s\": %m", tmppath)));
2016
2017         if (close(fd))
2018                 ereport(ERROR,
2019                                 (errcode_for_file_access(),
2020                                  errmsg("could not close file \"%s\": %m", tmppath)));
2021
2022         /*
2023          * Now move the segment into place with its final name.
2024          *
2025          * If caller didn't want to use a pre-existing file, get rid of any
2026          * pre-existing file.  Otherwise, cope with possibility that someone else
2027          * has created the file while we were filling ours: if so, use ours to
2028          * pre-create a future log segment.
2029          */
2030         installed_log = log;
2031         installed_seg = seg;
2032         max_advance = XLOGfileslop;
2033         if (!InstallXLogFileSegment(&installed_log, &installed_seg, tmppath,
2034                                                                 *use_existent, &max_advance,
2035                                                                 use_lock))
2036         {
2037                 /* No need for any more future segments... */
2038                 unlink(tmppath);
2039         }
2040
2041         elog(DEBUG2, "done creating and filling new WAL file");
2042
2043         /* Set flag to tell caller there was no existent file */
2044         *use_existent = false;
2045
2046         /* Now open original target segment (might not be file I just made) */
2047         fd = BasicOpenFile(path, O_RDWR | PG_BINARY | XLOG_SYNC_BIT,
2048                                            S_IRUSR | S_IWUSR);
2049         if (fd < 0)
2050                 ereport(ERROR,
2051                                 (errcode_for_file_access(),
2052                    errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
2053                                   path, log, seg)));
2054
2055         return fd;
2056 }
2057
2058 /*
2059  * Create a new XLOG file segment by copying a pre-existing one.
2060  *
2061  * log, seg: identify segment to be created.
2062  *
2063  * srcTLI, srclog, srcseg: identify segment to be copied (could be from
2064  *              a different timeline)
2065  *
2066  * Currently this is only used during recovery, and so there are no locking
2067  * considerations.      But we should be just as tense as XLogFileInit to avoid
2068  * emplacing a bogus file.
2069  */
2070 static void
2071 XLogFileCopy(uint32 log, uint32 seg,
2072                          TimeLineID srcTLI, uint32 srclog, uint32 srcseg)
2073 {
2074         char            path[MAXPGPATH];
2075         char            tmppath[MAXPGPATH];
2076         char            buffer[XLOG_BLCKSZ];
2077         int                     srcfd;
2078         int                     fd;
2079         int                     nbytes;
2080
2081         /*
2082          * Open the source file
2083          */
2084         XLogFilePath(path, srcTLI, srclog, srcseg);
2085         srcfd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
2086         if (srcfd < 0)
2087                 ereport(ERROR,
2088                                 (errcode_for_file_access(),
2089                                  errmsg("could not open file \"%s\": %m", path)));
2090
2091         /*
2092          * Copy into a temp file name.
2093          */
2094         snprintf(tmppath, MAXPGPATH, XLOGDIR "/xlogtemp.%d", (int) getpid());
2095
2096         unlink(tmppath);
2097
2098         /* do not use XLOG_SYNC_BIT here --- want to fsync only at end of fill */
2099         fd = BasicOpenFile(tmppath, O_RDWR | O_CREAT | O_EXCL | PG_BINARY,
2100                                            S_IRUSR | S_IWUSR);
2101         if (fd < 0)
2102                 ereport(ERROR,
2103                                 (errcode_for_file_access(),
2104                                  errmsg("could not create file \"%s\": %m", tmppath)));
2105
2106         /*
2107          * Do the data copying.
2108          */
2109         for (nbytes = 0; nbytes < XLogSegSize; nbytes += sizeof(buffer))
2110         {
2111                 errno = 0;
2112                 if ((int) read(srcfd, buffer, sizeof(buffer)) != (int) sizeof(buffer))
2113                 {
2114                         if (errno != 0)
2115                                 ereport(ERROR,
2116                                                 (errcode_for_file_access(),
2117                                                  errmsg("could not read file \"%s\": %m", path)));
2118                         else
2119                                 ereport(ERROR,
2120                                                 (errmsg("not enough data in file \"%s\"", path)));
2121                 }
2122                 errno = 0;
2123                 if ((int) write(fd, buffer, sizeof(buffer)) != (int) sizeof(buffer))
2124                 {
2125                         int                     save_errno = errno;
2126
2127                         /*
2128                          * If we fail to make the file, delete it to release disk space
2129                          */
2130                         unlink(tmppath);
2131                         /* if write didn't set errno, assume problem is no disk space */
2132                         errno = save_errno ? save_errno : ENOSPC;
2133
2134                         ereport(ERROR,
2135                                         (errcode_for_file_access(),
2136                                          errmsg("could not write to file \"%s\": %m", tmppath)));
2137                 }
2138         }
2139
2140         if (pg_fsync(fd) != 0)
2141                 ereport(ERROR,
2142                                 (errcode_for_file_access(),
2143                                  errmsg("could not fsync file \"%s\": %m", tmppath)));
2144
2145         if (close(fd))
2146                 ereport(ERROR,
2147                                 (errcode_for_file_access(),
2148                                  errmsg("could not close file \"%s\": %m", tmppath)));
2149
2150         close(srcfd);
2151
2152         /*
2153          * Now move the segment into place with its final name.
2154          */
2155         if (!InstallXLogFileSegment(&log, &seg, tmppath, false, NULL, false))
2156                 elog(ERROR, "InstallXLogFileSegment should not have failed");
2157 }
2158
2159 /*
2160  * Install a new XLOG segment file as a current or future log segment.
2161  *
2162  * This is used both to install a newly-created segment (which has a temp
2163  * filename while it's being created) and to recycle an old segment.
2164  *
2165  * *log, *seg: identify segment to install as (or first possible target).
2166  * When find_free is TRUE, these are modified on return to indicate the
2167  * actual installation location or last segment searched.
2168  *
2169  * tmppath: initial name of file to install.  It will be renamed into place.
2170  *
2171  * find_free: if TRUE, install the new segment at the first empty log/seg
2172  * number at or after the passed numbers.  If FALSE, install the new segment
2173  * exactly where specified, deleting any existing segment file there.
2174  *
2175  * *max_advance: maximum number of log/seg slots to advance past the starting
2176  * point.  Fail if no free slot is found in this range.  On return, reduced
2177  * by the number of slots skipped over.  (Irrelevant, and may be NULL,
2178  * when find_free is FALSE.)
2179  *
2180  * use_lock: if TRUE, acquire ControlFileLock while moving file into
2181  * place.  This should be TRUE except during bootstrap log creation.  The
2182  * caller must *not* hold the lock at call.
2183  *
2184  * Returns TRUE if file installed, FALSE if not installed because of
2185  * exceeding max_advance limit.  On Windows, we also return FALSE if we
2186  * can't rename the file into place because someone's got it open.
2187  * (Any other kind of failure causes ereport().)
2188  */
2189 static bool
2190 InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath,
2191                                            bool find_free, int *max_advance,
2192                                            bool use_lock)
2193 {
2194         char            path[MAXPGPATH];
2195         struct stat stat_buf;
2196
2197         XLogFilePath(path, ThisTimeLineID, *log, *seg);
2198
2199         /*
2200          * We want to be sure that only one process does this at a time.
2201          */
2202         if (use_lock)
2203                 LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
2204
2205         if (!find_free)
2206         {
2207                 /* Force installation: get rid of any pre-existing segment file */
2208                 unlink(path);
2209         }
2210         else
2211         {
2212                 /* Find a free slot to put it in */
2213                 while (stat(path, &stat_buf) == 0)
2214                 {
2215                         if (*max_advance <= 0)
2216                         {
2217                                 /* Failed to find a free slot within specified range */
2218                                 if (use_lock)
2219                                         LWLockRelease(ControlFileLock);
2220                                 return false;
2221                         }
2222                         NextLogSeg(*log, *seg);
2223                         (*max_advance)--;
2224                         XLogFilePath(path, ThisTimeLineID, *log, *seg);
2225                 }
2226         }
2227
2228         /*
2229          * Prefer link() to rename() here just to be really sure that we don't
2230          * overwrite an existing logfile.  However, there shouldn't be one, so
2231          * rename() is an acceptable substitute except for the truly paranoid.
2232          */
2233 #if HAVE_WORKING_LINK
2234         if (link(tmppath, path) < 0)
2235                 ereport(ERROR,
2236                                 (errcode_for_file_access(),
2237                                  errmsg("could not link file \"%s\" to \"%s\" (initialization of log file %u, segment %u): %m",
2238                                                 tmppath, path, *log, *seg)));
2239         unlink(tmppath);
2240 #else
2241         if (rename(tmppath, path) < 0)
2242         {
2243 #ifdef WIN32
2244 #if !defined(__CYGWIN__)
2245                 if (GetLastError() == ERROR_ACCESS_DENIED)
2246 #else
2247                 if (errno == EACCES)
2248 #endif
2249                 {
2250                         if (use_lock)
2251                                 LWLockRelease(ControlFileLock);
2252                         return false;
2253                 }
2254 #endif /* WIN32 */
2255
2256                 ereport(ERROR,
2257                                 (errcode_for_file_access(),
2258                                  errmsg("could not rename file \"%s\" to \"%s\" (initialization of log file %u, segment %u): %m",
2259                                                 tmppath, path, *log, *seg)));
2260         }
2261 #endif
2262
2263         if (use_lock)
2264                 LWLockRelease(ControlFileLock);
2265
2266         return true;
2267 }
2268
2269 /*
2270  * Open a pre-existing logfile segment for writing.
2271  */
2272 static int
2273 XLogFileOpen(uint32 log, uint32 seg)
2274 {
2275         char            path[MAXPGPATH];
2276         int                     fd;
2277
2278         XLogFilePath(path, ThisTimeLineID, log, seg);
2279
2280         fd = BasicOpenFile(path, O_RDWR | PG_BINARY | XLOG_SYNC_BIT,
2281                                            S_IRUSR | S_IWUSR);
2282         if (fd < 0)
2283                 ereport(PANIC,
2284                                 (errcode_for_file_access(),
2285                    errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
2286                                   path, log, seg)));
2287
2288         return fd;
2289 }
2290
2291 /*
2292  * Open a logfile segment for reading (during recovery).
2293  */
2294 static int
2295 XLogFileRead(uint32 log, uint32 seg, int emode)
2296 {
2297         char            path[MAXPGPATH];
2298         char            xlogfname[MAXFNAMELEN];
2299         char            activitymsg[MAXFNAMELEN + 16];
2300         ListCell   *cell;
2301         int                     fd;
2302
2303         /*
2304          * Loop looking for a suitable timeline ID: we might need to read any of
2305          * the timelines listed in expectedTLIs.
2306          *
2307          * We expect curFileTLI on entry to be the TLI of the preceding file in
2308          * sequence, or 0 if there was no predecessor.  We do not allow curFileTLI
2309          * to go backwards; this prevents us from picking up the wrong file when a
2310          * parent timeline extends to higher segment numbers than the child we
2311          * want to read.
2312          */
2313         foreach(cell, expectedTLIs)
2314         {
2315                 TimeLineID      tli = (TimeLineID) lfirst_int(cell);
2316
2317                 if (tli < curFileTLI)
2318                         break;                          /* don't bother looking at too-old TLIs */
2319
2320                 XLogFileName(xlogfname, tli, log, seg);
2321
2322                 if (InArchiveRecovery)
2323                 {
2324                         /* Report recovery progress in PS display */
2325                         snprintf(activitymsg, sizeof(activitymsg), "waiting for %s",
2326                                          xlogfname);
2327                         set_ps_display(activitymsg, false);
2328
2329                         restoredFromArchive = RestoreArchivedFile(path, xlogfname,
2330                                                                                                           "RECOVERYXLOG",
2331                                                                                                           XLogSegSize);
2332                 }
2333                 else
2334                         XLogFilePath(path, tli, log, seg);
2335
2336                 fd = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
2337                 if (fd >= 0)
2338                 {
2339                         /* Success! */
2340                         curFileTLI = tli;
2341
2342                         /* Report recovery progress in PS display */
2343                         snprintf(activitymsg, sizeof(activitymsg), "recovering %s",
2344                                          xlogfname);
2345                         set_ps_display(activitymsg, false);
2346
2347                         return fd;
2348                 }
2349                 if (errno != ENOENT)    /* unexpected failure? */
2350                         ereport(PANIC,
2351                                         (errcode_for_file_access(),
2352                         errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
2353                                    path, log, seg)));
2354         }
2355
2356         /* Couldn't find it.  For simplicity, complain about front timeline */
2357         XLogFilePath(path, recoveryTargetTLI, log, seg);
2358         errno = ENOENT;
2359         ereport(emode,
2360                         (errcode_for_file_access(),
2361                    errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
2362                                   path, log, seg)));
2363         return -1;
2364 }
2365
2366 /*
2367  * Close the current logfile segment for writing.
2368  */
2369 static void
2370 XLogFileClose(void)
2371 {
2372         Assert(openLogFile >= 0);
2373
2374         /*
2375          * posix_fadvise is problematic on many platforms: on older x86 Linux it
2376          * just dumps core, and there are reports of problems on PPC platforms as
2377          * well.  The following is therefore disabled for the time being. We could
2378          * consider some kind of configure test to see if it's safe to use, but
2379          * since we lack hard evidence that there's any useful performance gain to
2380          * be had, spending time on that seems unprofitable for now.
2381          */
2382 #ifdef NOT_USED
2383
2384         /*
2385          * WAL segment files will not be re-read in normal operation, so we advise
2386          * OS to release any cached pages.      But do not do so if WAL archiving is
2387          * active, because archiver process could use the cache to read the WAL
2388          * segment.
2389          *
2390          * While O_DIRECT works for O_SYNC, posix_fadvise() works for fsync() and
2391          * O_SYNC, and some platforms only have posix_fadvise().
2392          */
2393 #if defined(HAVE_DECL_POSIX_FADVISE) && defined(POSIX_FADV_DONTNEED)
2394         if (!XLogArchivingActive())
2395                 posix_fadvise(openLogFile, 0, 0, POSIX_FADV_DONTNEED);
2396 #endif
2397 #endif   /* NOT_USED */
2398
2399         if (close(openLogFile))
2400                 ereport(PANIC,
2401                                 (errcode_for_file_access(),
2402                                  errmsg("could not close log file %u, segment %u: %m",
2403                                                 openLogId, openLogSeg)));
2404         openLogFile = -1;
2405 }
2406
2407 /*
2408  * Attempt to retrieve the specified file from off-line archival storage.
2409  * If successful, fill "path" with its complete path (note that this will be
2410  * a temp file name that doesn't follow the normal naming convention), and
2411  * return TRUE.
2412  *
2413  * If not successful, fill "path" with the name of the normal on-line file
2414  * (which may or may not actually exist, but we'll try to use it), and return
2415  * FALSE.
2416  *
2417  * For fixed-size files, the caller may pass the expected size as an
2418  * additional crosscheck on successful recovery.  If the file size is not
2419  * known, set expectedSize = 0.
2420  */
2421 static bool
2422 RestoreArchivedFile(char *path, const char *xlogfname,
2423                                         const char *recovername, off_t expectedSize)
2424 {
2425         char            xlogpath[MAXPGPATH];
2426         char            xlogRestoreCmd[MAXPGPATH];
2427         char            lastRestartPointFname[MAXPGPATH];
2428         char       *dp;
2429         char       *endp;
2430         const char *sp;
2431         int                     rc;
2432         bool            signaled;
2433         struct stat stat_buf;
2434         uint32          restartLog;
2435         uint32          restartSeg;
2436
2437         /*
2438          * When doing archive recovery, we always prefer an archived log file even
2439          * if a file of the same name exists in XLOGDIR.  The reason is that the
2440          * file in XLOGDIR could be an old, un-filled or partly-filled version
2441          * that was copied and restored as part of backing up $PGDATA.
2442          *
2443          * We could try to optimize this slightly by checking the local copy
2444          * lastchange timestamp against the archived copy, but we have no API to
2445          * do this, nor can we guarantee that the lastchange timestamp was
2446          * preserved correctly when we copied to archive. Our aim is robustness,
2447          * so we elect not to do this.
2448          *
2449          * If we cannot obtain the log file from the archive, however, we will try
2450          * to use the XLOGDIR file if it exists.  This is so that we can make use
2451          * of log segments that weren't yet transferred to the archive.
2452          *
2453          * Notice that we don't actually overwrite any files when we copy back
2454          * from archive because the recoveryRestoreCommand may inadvertently
2455          * restore inappropriate xlogs, or they may be corrupt, so we may wish to
2456          * fallback to the segments remaining in current XLOGDIR later. The
2457          * copy-from-archive filename is always the same, ensuring that we don't
2458          * run out of disk space on long recoveries.
2459          */
2460         snprintf(xlogpath, MAXPGPATH, XLOGDIR "/%s", recovername);
2461
2462         /*
2463          * Make sure there is no existing file named recovername.
2464          */
2465         if (stat(xlogpath, &stat_buf) != 0)
2466         {
2467                 if (errno != ENOENT)
2468                         ereport(FATAL,
2469                                         (errcode_for_file_access(),
2470                                          errmsg("could not stat file \"%s\": %m",
2471                                                         xlogpath)));
2472         }
2473         else
2474         {
2475                 if (unlink(xlogpath) != 0)
2476                         ereport(FATAL,
2477                                         (errcode_for_file_access(),
2478                                          errmsg("could not remove file \"%s\": %m",
2479                                                         xlogpath)));
2480         }
2481
2482         /*
2483          * construct the command to be executed
2484          */
2485         dp = xlogRestoreCmd;
2486         endp = xlogRestoreCmd + MAXPGPATH - 1;
2487         *endp = '\0';
2488
2489         for (sp = recoveryRestoreCommand; *sp; sp++)
2490         {
2491                 if (*sp == '%')
2492                 {
2493                         switch (sp[1])
2494                         {
2495                                 case 'p':
2496                                         /* %p: relative path of target file */
2497                                         sp++;
2498                                         StrNCpy(dp, xlogpath, endp - dp);
2499                                         make_native_path(dp);
2500                                         dp += strlen(dp);
2501                                         break;
2502                                 case 'f':
2503                                         /* %f: filename of desired file */
2504                                         sp++;
2505                                         StrNCpy(dp, xlogfname, endp - dp);
2506                                         dp += strlen(dp);
2507                                         break;
2508                                 case 'r':
2509                                         /* %r: filename of last restartpoint */
2510                                         sp++;
2511                                         XLByteToSeg(ControlFile->checkPointCopy.redo,
2512                                                                 restartLog, restartSeg);
2513                                         XLogFileName(lastRestartPointFname, 
2514                                                                  ControlFile->checkPointCopy.ThisTimeLineID, 
2515                                                                  restartLog, restartSeg);
2516                                         StrNCpy(dp, lastRestartPointFname, endp - dp);
2517                                         dp += strlen(dp);
2518                                         break;
2519                                 case '%':
2520                                         /* convert %% to a single % */
2521                                         sp++;
2522                                         if (dp < endp)
2523                                                 *dp++ = *sp;
2524                                         break;
2525                                 default:
2526                                         /* otherwise treat the % as not special */
2527                                         if (dp < endp)
2528                                                 *dp++ = *sp;
2529                                         break;
2530                         }
2531                 }
2532                 else
2533                 {
2534                         if (dp < endp)
2535                                 *dp++ = *sp;
2536                 }
2537         }
2538         *dp = '\0';
2539
2540         ereport(DEBUG3,
2541                         (errmsg_internal("executing restore command \"%s\"",
2542                                                          xlogRestoreCmd)));
2543
2544         /*
2545          * Copy xlog from archival storage to XLOGDIR
2546          */
2547         rc = system(xlogRestoreCmd);
2548         if (rc == 0)
2549         {
2550                 /*
2551                  * command apparently succeeded, but let's make sure the file is
2552                  * really there now and has the correct size.
2553                  *
2554                  * XXX I made wrong-size a fatal error to ensure the DBA would notice
2555                  * it, but is that too strong?  We could try to plow ahead with a
2556                  * local copy of the file ... but the problem is that there probably
2557                  * isn't one, and we'd incorrectly conclude we've reached the end of
2558                  * WAL and we're done recovering ...
2559                  */
2560                 if (stat(xlogpath, &stat_buf) == 0)
2561                 {
2562                         if (expectedSize > 0 && stat_buf.st_size != expectedSize)
2563                                 ereport(FATAL,
2564                                                 (errmsg("archive file \"%s\" has wrong size: %lu instead of %lu",
2565                                                                 xlogfname,
2566                                                                 (unsigned long) stat_buf.st_size,
2567                                                                 (unsigned long) expectedSize)));
2568                         else
2569                         {
2570                                 ereport(LOG,
2571                                                 (errmsg("restored log file \"%s\" from archive",
2572                                                                 xlogfname)));
2573                                 strcpy(path, xlogpath);
2574                                 return true;
2575                         }
2576                 }
2577                 else
2578                 {
2579                         /* stat failed */
2580                         if (errno != ENOENT)
2581                                 ereport(FATAL,
2582                                                 (errcode_for_file_access(),
2583                                                  errmsg("could not stat file \"%s\": %m",
2584                                                                 xlogpath)));
2585                 }
2586         }
2587
2588         /*
2589          * Remember, we rollforward UNTIL the restore fails so failure here is
2590          * just part of the process... that makes it difficult to determine
2591          * whether the restore failed because there isn't an archive to restore,
2592          * or because the administrator has specified the restore program
2593          * incorrectly.  We have to assume the former.
2594          *
2595          * However, if the failure was due to any sort of signal, it's best to
2596          * punt and abort recovery.  (If we "return false" here, upper levels
2597          * will assume that recovery is complete and start up the database!)
2598          * It's essential to abort on child SIGINT and SIGQUIT, because per spec
2599          * system() ignores SIGINT and SIGQUIT while waiting; if we see one of
2600          * those it's a good bet we should have gotten it too.  Aborting on other
2601          * signals such as SIGTERM seems a good idea as well.
2602          *
2603          * Per the Single Unix Spec, shells report exit status > 128 when
2604          * a called command died on a signal.  Also, 126 and 127 are used to
2605          * report problems such as an unfindable command; treat those as fatal
2606          * errors too.
2607          */
2608         signaled = WIFSIGNALED(rc) || WEXITSTATUS(rc) > 125;
2609
2610         ereport(signaled ? FATAL : DEBUG2,
2611                 (errmsg("could not restore file \"%s\" from archive: return code %d",
2612                                 xlogfname, rc)));
2613
2614         /*
2615          * if an archived file is not available, there might still be a version of
2616          * this file in XLOGDIR, so return that as the filename to open.
2617          *
2618          * In many recovery scenarios we expect this to fail also, but if so that
2619          * just means we've reached the end of WAL.
2620          */
2621         snprintf(path, MAXPGPATH, XLOGDIR "/%s", xlogfname);
2622         return false;
2623 }
2624
2625 /*
2626  * Preallocate log files beyond the specified log endpoint.
2627  *
2628  * XXX this is currently extremely conservative, since it forces only one
2629  * future log segment to exist, and even that only if we are 75% done with
2630  * the current one.  This is only appropriate for very low-WAL-volume systems.
2631  * High-volume systems will be OK once they've built up a sufficient set of
2632  * recycled log segments, but the startup transient is likely to include
2633  * a lot of segment creations by foreground processes, which is not so good.
2634  */
2635 static void
2636 PreallocXlogFiles(XLogRecPtr endptr)
2637 {
2638         uint32          _logId;
2639         uint32          _logSeg;
2640         int                     lf;
2641         bool            use_existent;
2642
2643         XLByteToPrevSeg(endptr, _logId, _logSeg);
2644         if ((endptr.xrecoff - 1) % XLogSegSize >=
2645                 (uint32) (0.75 * XLogSegSize))
2646         {
2647                 NextLogSeg(_logId, _logSeg);
2648                 use_existent = true;
2649                 lf = XLogFileInit(_logId, _logSeg, &use_existent, true);
2650                 close(lf);
2651                 if (!use_existent)
2652                         CheckpointStats.ckpt_segs_added++;
2653         }
2654 }
2655
2656 /*
2657  * Recycle or remove all log files older or equal to passed log/seg#
2658  *
2659  * endptr is current (or recent) end of xlog; this is used to determine
2660  * whether we want to recycle rather than delete no-longer-wanted log files.
2661  */
2662 static void
2663 RemoveOldXlogFiles(uint32 log, uint32 seg, XLogRecPtr endptr)
2664 {
2665         uint32          endlogId;
2666         uint32          endlogSeg;
2667         int                     max_advance;
2668         DIR                *xldir;
2669         struct dirent *xlde;
2670         char            lastoff[MAXFNAMELEN];
2671         char            path[MAXPGPATH];
2672
2673         /*
2674          * Initialize info about where to try to recycle to.  We allow recycling
2675          * segments up to XLOGfileslop segments beyond the current XLOG location.
2676          */
2677         XLByteToPrevSeg(endptr, endlogId, endlogSeg);
2678         max_advance = XLOGfileslop;
2679
2680         xldir = AllocateDir(XLOGDIR);
2681         if (xldir == NULL)
2682                 ereport(ERROR,
2683                                 (errcode_for_file_access(),
2684                                  errmsg("could not open transaction log directory \"%s\": %m",
2685                                                 XLOGDIR)));
2686
2687         XLogFileName(lastoff, ThisTimeLineID, log, seg);
2688
2689         while ((xlde = ReadDir(xldir, XLOGDIR)) != NULL)
2690         {
2691                 /*
2692                  * We ignore the timeline part of the XLOG segment identifiers in
2693                  * deciding whether a segment is still needed.  This ensures that we
2694                  * won't prematurely remove a segment from a parent timeline. We could
2695                  * probably be a little more proactive about removing segments of
2696                  * non-parent timelines, but that would be a whole lot more
2697                  * complicated.
2698                  *
2699                  * We use the alphanumeric sorting property of the filenames to decide
2700                  * which ones are earlier than the lastoff segment.
2701                  */
2702                 if (strlen(xlde->d_name) == 24 &&
2703                         strspn(xlde->d_name, "0123456789ABCDEF") == 24 &&
2704                         strcmp(xlde->d_name + 8, lastoff + 8) <= 0)
2705                 {
2706                         if (XLogArchiveCheckDone(xlde->d_name))
2707                         {
2708                                 snprintf(path, MAXPGPATH, XLOGDIR "/%s", xlde->d_name);
2709
2710                                 /*
2711                                  * Before deleting the file, see if it can be recycled as a
2712                                  * future log segment.
2713                                  */
2714                                 if (InstallXLogFileSegment(&endlogId, &endlogSeg, path,
2715                                                                                    true, &max_advance,
2716                                                                                    true))
2717                                 {
2718                                         ereport(DEBUG2,
2719                                                         (errmsg("recycled transaction log file \"%s\"",
2720                                                                         xlde->d_name)));
2721                                         CheckpointStats.ckpt_segs_recycled++;
2722                                         /* Needn't recheck that slot on future iterations */
2723                                         if (max_advance > 0)
2724                                         {
2725                                                 NextLogSeg(endlogId, endlogSeg);
2726                                                 max_advance--;
2727                                         }
2728                                 }
2729                                 else
2730                                 {
2731                                         /* No need for any more future segments... */
2732                                         ereport(DEBUG2,
2733                                                         (errmsg("removing transaction log file \"%s\"",
2734                                                                         xlde->d_name)));
2735                                         unlink(path);
2736                                         CheckpointStats.ckpt_segs_removed++;
2737                                 }
2738
2739                                 XLogArchiveCleanup(xlde->d_name);
2740                         }
2741                 }
2742         }
2743
2744         FreeDir(xldir);
2745 }
2746
2747 /*
2748  * Remove previous backup history files.  This also retries creation of
2749  * .ready files for any backup history files for which XLogArchiveNotify
2750  * failed earlier.
2751  */
2752 static void
2753 CleanupBackupHistory(void)
2754 {
2755         DIR                *xldir;
2756         struct dirent *xlde;
2757         char            path[MAXPGPATH];
2758
2759         xldir = AllocateDir(XLOGDIR);
2760         if (xldir == NULL)
2761                 ereport(ERROR,
2762                                 (errcode_for_file_access(),
2763                                  errmsg("could not open transaction log directory \"%s\": %m",
2764                                                 XLOGDIR)));
2765
2766         while ((xlde = ReadDir(xldir, XLOGDIR)) != NULL)
2767         {
2768                 if (strlen(xlde->d_name) > 24 &&
2769                         strspn(xlde->d_name, "0123456789ABCDEF") == 24 &&
2770                         strcmp(xlde->d_name + strlen(xlde->d_name) - strlen(".backup"),
2771                                    ".backup") == 0)
2772                 {
2773                         if (XLogArchiveCheckDone(xlde->d_name))
2774                         {
2775                                 ereport(DEBUG2,
2776                                 (errmsg("removing transaction log backup history file \"%s\"",
2777                                                 xlde->d_name)));
2778                                 snprintf(path, MAXPGPATH, XLOGDIR "/%s", xlde->d_name);
2779                                 unlink(path);
2780                                 XLogArchiveCleanup(xlde->d_name);
2781                         }
2782                 }
2783         }
2784
2785         FreeDir(xldir);
2786 }
2787
2788 /*
2789  * Restore the backup blocks present in an XLOG record, if any.
2790  *
2791  * We assume all of the record has been read into memory at *record.
2792  *
2793  * Note: when a backup block is available in XLOG, we restore it
2794  * unconditionally, even if the page in the database appears newer.
2795  * This is to protect ourselves against database pages that were partially
2796  * or incorrectly written during a crash.  We assume that the XLOG data
2797  * must be good because it has passed a CRC check, while the database
2798  * page might not be.  This will force us to replay all subsequent
2799  * modifications of the page that appear in XLOG, rather than possibly
2800  * ignoring them as already applied, but that's not a huge drawback.
2801  */
2802 static void
2803 RestoreBkpBlocks(XLogRecord *record, XLogRecPtr lsn)
2804 {
2805         Relation        reln;
2806         Buffer          buffer;
2807         Page            page;
2808         BkpBlock        bkpb;
2809         char       *blk;
2810         int                     i;
2811
2812         blk = (char *) XLogRecGetData(record) + record->xl_len;
2813         for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
2814         {
2815                 if (!(record->xl_info & XLR_SET_BKP_BLOCK(i)))
2816                         continue;
2817
2818                 memcpy(&bkpb, blk, sizeof(BkpBlock));
2819                 blk += sizeof(BkpBlock);
2820
2821                 reln = XLogOpenRelation(bkpb.node);
2822                 buffer = XLogReadBuffer(reln, bkpb.block, true);
2823                 Assert(BufferIsValid(buffer));
2824                 page = (Page) BufferGetPage(buffer);
2825
2826                 if (bkpb.hole_length == 0)
2827                 {
2828                         memcpy((char *) page, blk, BLCKSZ);
2829                 }
2830                 else
2831                 {
2832                         /* must zero-fill the hole */
2833                         MemSet((char *) page, 0, BLCKSZ);
2834                         memcpy((char *) page, blk, bkpb.hole_offset);
2835                         memcpy((char *) page + (bkpb.hole_offset + bkpb.hole_length),
2836                                    blk + bkpb.hole_offset,
2837                                    BLCKSZ - (bkpb.hole_offset + bkpb.hole_length));
2838                 }
2839
2840                 PageSetLSN(page, lsn);
2841                 PageSetTLI(page, ThisTimeLineID);
2842                 MarkBufferDirty(buffer);
2843                 UnlockReleaseBuffer(buffer);
2844
2845                 blk += BLCKSZ - bkpb.hole_length;
2846         }
2847 }
2848
2849 /*
2850  * CRC-check an XLOG record.  We do not believe the contents of an XLOG
2851  * record (other than to the minimal extent of computing the amount of
2852  * data to read in) until we've checked the CRCs.
2853  *
2854  * We assume all of the record has been read into memory at *record.
2855  */
2856 static bool
2857 RecordIsValid(XLogRecord *record, XLogRecPtr recptr, int emode)
2858 {
2859         pg_crc32        crc;
2860         int                     i;
2861         uint32          len = record->xl_len;
2862         BkpBlock        bkpb;
2863         char       *blk;
2864
2865         /* First the rmgr data */
2866         INIT_CRC32(crc);
2867         COMP_CRC32(crc, XLogRecGetData(record), len);
2868
2869         /* Add in the backup blocks, if any */
2870         blk = (char *) XLogRecGetData(record) + len;
2871         for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
2872         {
2873                 uint32          blen;
2874
2875                 if (!(record->xl_info & XLR_SET_BKP_BLOCK(i)))
2876                         continue;
2877
2878                 memcpy(&bkpb, blk, sizeof(BkpBlock));
2879                 if (bkpb.hole_offset + bkpb.hole_length > BLCKSZ)
2880                 {
2881                         ereport(emode,
2882                                         (errmsg("incorrect hole size in record at %X/%X",
2883                                                         recptr.xlogid, recptr.xrecoff)));
2884                         return false;
2885                 }
2886                 blen = sizeof(BkpBlock) + BLCKSZ - bkpb.hole_length;
2887                 COMP_CRC32(crc, blk, blen);
2888                 blk += blen;
2889         }
2890
2891         /* Check that xl_tot_len agrees with our calculation */
2892         if (blk != (char *) record + record->xl_tot_len)
2893         {
2894                 ereport(emode,
2895                                 (errmsg("incorrect total length in record at %X/%X",
2896                                                 recptr.xlogid, recptr.xrecoff)));
2897                 return false;
2898         }
2899
2900         /* Finally include the record header */
2901         COMP_CRC32(crc, (char *) record + sizeof(pg_crc32),
2902                            SizeOfXLogRecord - sizeof(pg_crc32));
2903         FIN_CRC32(crc);
2904
2905         if (!EQ_CRC32(record->xl_crc, crc))
2906         {
2907                 ereport(emode,
2908                 (errmsg("incorrect resource manager data checksum in record at %X/%X",
2909                                 recptr.xlogid, recptr.xrecoff)));
2910                 return false;
2911         }
2912
2913         return true;
2914 }
2915
2916 /*
2917  * Attempt to read an XLOG record.
2918  *
2919  * If RecPtr is not NULL, try to read a record at that position.  Otherwise
2920  * try to read a record just after the last one previously read.
2921  *
2922  * If no valid record is available, returns NULL, or fails if emode is PANIC.
2923  * (emode must be either PANIC or LOG.)
2924  *
2925  * The record is copied into readRecordBuf, so that on successful return,
2926  * the returned record pointer always points there.
2927  */
2928 static XLogRecord *
2929 ReadRecord(XLogRecPtr *RecPtr, int emode)
2930 {
2931         XLogRecord *record;
2932         char       *buffer;
2933         XLogRecPtr      tmpRecPtr = EndRecPtr;
2934         bool            randAccess = false;
2935         uint32          len,
2936                                 total_len;
2937         uint32          targetPageOff;
2938         uint32          targetRecOff;
2939         uint32          pageHeaderSize;
2940
2941         if (readBuf == NULL)
2942         {
2943                 /*
2944                  * First time through, permanently allocate readBuf.  We do it this
2945                  * way, rather than just making a static array, for two reasons: (1)
2946                  * no need to waste the storage in most instantiations of the backend;
2947                  * (2) a static char array isn't guaranteed to have any particular
2948                  * alignment, whereas malloc() will provide MAXALIGN'd storage.
2949                  */
2950                 readBuf = (char *) malloc(XLOG_BLCKSZ);
2951                 Assert(readBuf != NULL);
2952         }
2953
2954         if (RecPtr == NULL)
2955         {
2956                 RecPtr = &tmpRecPtr;
2957                 /* fast case if next record is on same page */
2958                 if (nextRecord != NULL)
2959                 {
2960                         record = nextRecord;
2961                         goto got_record;
2962                 }
2963                 /* align old recptr to next page */
2964                 if (tmpRecPtr.xrecoff % XLOG_BLCKSZ != 0)
2965                         tmpRecPtr.xrecoff += (XLOG_BLCKSZ - tmpRecPtr.xrecoff % XLOG_BLCKSZ);
2966                 if (tmpRecPtr.xrecoff >= XLogFileSize)
2967                 {
2968                         (tmpRecPtr.xlogid)++;
2969                         tmpRecPtr.xrecoff = 0;
2970                 }
2971                 /* We will account for page header size below */
2972         }
2973         else
2974         {
2975                 if (!XRecOffIsValid(RecPtr->xrecoff))
2976                         ereport(PANIC,
2977                                         (errmsg("invalid record offset at %X/%X",
2978                                                         RecPtr->xlogid, RecPtr->xrecoff)));
2979
2980                 /*
2981                  * Since we are going to a random position in WAL, forget any prior
2982                  * state about what timeline we were in, and allow it to be any
2983                  * timeline in expectedTLIs.  We also set a flag to allow curFileTLI
2984                  * to go backwards (but we can't reset that variable right here, since
2985                  * we might not change files at all).
2986                  */
2987                 lastPageTLI = 0;                /* see comment in ValidXLOGHeader */
2988                 randAccess = true;              /* allow curFileTLI to go backwards too */
2989         }
2990
2991         if (readFile >= 0 && !XLByteInSeg(*RecPtr, readId, readSeg))
2992         {
2993                 close(readFile);
2994                 readFile = -1;
2995         }
2996         XLByteToSeg(*RecPtr, readId, readSeg);
2997         if (readFile < 0)
2998         {
2999                 /* Now it's okay to reset curFileTLI if random fetch */
3000                 if (randAccess)
3001                         curFileTLI = 0;
3002
3003                 readFile = XLogFileRead(readId, readSeg, emode);
3004                 if (readFile < 0)
3005                         goto next_record_is_invalid;
3006
3007                 /*
3008                  * Whenever switching to a new WAL segment, we read the first page of
3009                  * the file and validate its header, even if that's not where the
3010                  * target record is.  This is so that we can check the additional
3011                  * identification info that is present in the first page's "long"
3012                  * header.
3013                  */
3014                 readOff = 0;
3015                 if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
3016                 {
3017                         ereport(emode,
3018                                         (errcode_for_file_access(),
3019                                          errmsg("could not read from log file %u, segment %u, offset %u: %m",
3020                                                         readId, readSeg, readOff)));
3021                         goto next_record_is_invalid;
3022                 }
3023                 if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode))
3024                         goto next_record_is_invalid;
3025         }
3026
3027         targetPageOff = ((RecPtr->xrecoff % XLogSegSize) / XLOG_BLCKSZ) * XLOG_BLCKSZ;
3028         if (readOff != targetPageOff)
3029         {
3030                 readOff = targetPageOff;
3031                 if (lseek(readFile, (off_t) readOff, SEEK_SET) < 0)
3032                 {
3033                         ereport(emode,
3034                                         (errcode_for_file_access(),
3035                                          errmsg("could not seek in log file %u, segment %u to offset %u: %m",
3036                                                         readId, readSeg, readOff)));
3037                         goto next_record_is_invalid;
3038                 }
3039                 if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
3040                 {
3041                         ereport(emode,
3042                                         (errcode_for_file_access(),
3043                                          errmsg("could not read from log file %u, segment %u, offset %u: %m",
3044                                                         readId, readSeg, readOff)));
3045                         goto next_record_is_invalid;
3046                 }
3047                 if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode))
3048                         goto next_record_is_invalid;
3049         }
3050         pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
3051         targetRecOff = RecPtr->xrecoff % XLOG_BLCKSZ;
3052         if (targetRecOff == 0)
3053         {
3054                 /*
3055                  * Can only get here in the continuing-from-prev-page case, because
3056                  * XRecOffIsValid eliminated the zero-page-offset case otherwise. Need
3057                  * to skip over the new page's header.
3058                  */
3059                 tmpRecPtr.xrecoff += pageHeaderSize;
3060                 targetRecOff = pageHeaderSize;
3061         }
3062         else if (targetRecOff < pageHeaderSize)
3063         {
3064                 ereport(emode,
3065                                 (errmsg("invalid record offset at %X/%X",
3066                                                 RecPtr->xlogid, RecPtr->xrecoff)));
3067                 goto next_record_is_invalid;
3068         }
3069         if ((((XLogPageHeader) readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) &&
3070                 targetRecOff == pageHeaderSize)
3071         {
3072                 ereport(emode,
3073                                 (errmsg("contrecord is requested by %X/%X",
3074                                                 RecPtr->xlogid, RecPtr->xrecoff)));
3075                 goto next_record_is_invalid;
3076         }
3077         record = (XLogRecord *) ((char *) readBuf + RecPtr->xrecoff % XLOG_BLCKSZ);
3078
3079 got_record:;
3080
3081         /*
3082          * xl_len == 0 is bad data for everything except XLOG SWITCH, where it is
3083          * required.
3084          */
3085         if (record->xl_rmid == RM_XLOG_ID && record->xl_info == XLOG_SWITCH)
3086         {
3087                 if (record->xl_len != 0)
3088                 {
3089                         ereport(emode,
3090                                         (errmsg("invalid xlog switch record at %X/%X",
3091                                                         RecPtr->xlogid, RecPtr->xrecoff)));
3092                         goto next_record_is_invalid;
3093                 }
3094         }
3095         else if (record->xl_len == 0)
3096         {
3097                 ereport(emode,
3098                                 (errmsg("record with zero length at %X/%X",
3099                                                 RecPtr->xlogid, RecPtr->xrecoff)));
3100                 goto next_record_is_invalid;
3101         }
3102         if (record->xl_tot_len < SizeOfXLogRecord + record->xl_len ||
3103                 record->xl_tot_len > SizeOfXLogRecord + record->xl_len +
3104                 XLR_MAX_BKP_BLOCKS * (sizeof(BkpBlock) + BLCKSZ))
3105         {
3106                 ereport(emode,
3107                                 (errmsg("invalid record length at %X/%X",
3108                                                 RecPtr->xlogid, RecPtr->xrecoff)));
3109                 goto next_record_is_invalid;
3110         }
3111         if (record->xl_rmid > RM_MAX_ID)
3112         {
3113                 ereport(emode,
3114                                 (errmsg("invalid resource manager ID %u at %X/%X",
3115                                                 record->xl_rmid, RecPtr->xlogid, RecPtr->xrecoff)));
3116                 goto next_record_is_invalid;
3117         }
3118         if (randAccess)
3119         {
3120                 /*
3121                  * We can't exactly verify the prev-link, but surely it should be less
3122                  * than the record's own address.
3123                  */
3124                 if (!XLByteLT(record->xl_prev, *RecPtr))
3125                 {
3126                         ereport(emode,
3127                                         (errmsg("record with incorrect prev-link %X/%X at %X/%X",
3128                                                         record->xl_prev.xlogid, record->xl_prev.xrecoff,
3129                                                         RecPtr->xlogid, RecPtr->xrecoff)));
3130                         goto next_record_is_invalid;
3131                 }
3132         }
3133         else
3134         {
3135                 /*
3136                  * Record's prev-link should exactly match our previous location. This
3137                  * check guards against torn WAL pages where a stale but valid-looking
3138                  * WAL record starts on a sector boundary.
3139                  */
3140                 if (!XLByteEQ(record->xl_prev, ReadRecPtr))
3141                 {
3142                         ereport(emode,
3143                                         (errmsg("record with incorrect prev-link %X/%X at %X/%X",
3144                                                         record->xl_prev.xlogid, record->xl_prev.xrecoff,
3145                                                         RecPtr->xlogid, RecPtr->xrecoff)));
3146                         goto next_record_is_invalid;
3147                 }
3148         }
3149
3150         /*
3151          * Allocate or enlarge readRecordBuf as needed.  To avoid useless small
3152          * increases, round its size to a multiple of XLOG_BLCKSZ, and make sure
3153          * it's at least 4*Max(BLCKSZ, XLOG_BLCKSZ) to start with.  (That is
3154          * enough for all "normal" records, but very large commit or abort records
3155          * might need more space.)
3156          */
3157         total_len = record->xl_tot_len;
3158         if (total_len > readRecordBufSize)
3159         {
3160                 uint32          newSize = total_len;
3161
3162                 newSize += XLOG_BLCKSZ - (newSize % XLOG_BLCKSZ);
3163                 newSize = Max(newSize, 4 * Max(BLCKSZ, XLOG_BLCKSZ));
3164                 if (readRecordBuf)
3165                         free(readRecordBuf);
3166                 readRecordBuf = (char *) malloc(newSize);
3167                 if (!readRecordBuf)
3168                 {
3169                         readRecordBufSize = 0;
3170                         /* We treat this as a "bogus data" condition */
3171                         ereport(emode,
3172                                         (errmsg("record length %u at %X/%X too long",
3173                                                         total_len, RecPtr->xlogid, RecPtr->xrecoff)));
3174                         goto next_record_is_invalid;
3175                 }
3176                 readRecordBufSize = newSize;
3177         }
3178
3179         buffer = readRecordBuf;
3180         nextRecord = NULL;
3181         len = XLOG_BLCKSZ - RecPtr->xrecoff % XLOG_BLCKSZ;
3182         if (total_len > len)
3183         {
3184                 /* Need to reassemble record */
3185                 XLogContRecord *contrecord;
3186                 uint32          gotlen = len;
3187
3188                 memcpy(buffer, record, len);
3189                 record = (XLogRecord *) buffer;
3190                 buffer += len;
3191                 for (;;)
3192                 {
3193                         readOff += XLOG_BLCKSZ;
3194                         if (readOff >= XLogSegSize)
3195                         {
3196                                 close(readFile);
3197                                 readFile = -1;
3198                                 NextLogSeg(readId, readSeg);
3199                                 readFile = XLogFileRead(readId, readSeg, emode);
3200                                 if (readFile < 0)
3201                                         goto next_record_is_invalid;
3202                                 readOff = 0;
3203                         }
3204                         if (read(readFile, readBuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
3205                         {
3206                                 ereport(emode,
3207                                                 (errcode_for_file_access(),
3208                                                  errmsg("could not read from log file %u, segment %u, offset %u: %m",
3209                                                                 readId, readSeg, readOff)));
3210                                 goto next_record_is_invalid;
3211                         }
3212                         if (!ValidXLOGHeader((XLogPageHeader) readBuf, emode))
3213                                 goto next_record_is_invalid;
3214                         if (!(((XLogPageHeader) readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD))
3215                         {
3216                                 ereport(emode,
3217                                                 (errmsg("there is no contrecord flag in log file %u, segment %u, offset %u",
3218                                                                 readId, readSeg, readOff)));
3219                                 goto next_record_is_invalid;
3220                         }
3221                         pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
3222                         contrecord = (XLogContRecord *) ((char *) readBuf + pageHeaderSize);
3223                         if (contrecord->xl_rem_len == 0 ||
3224                                 total_len != (contrecord->xl_rem_len + gotlen))
3225                         {
3226                                 ereport(emode,
3227                                                 (errmsg("invalid contrecord length %u in log file %u, segment %u, offset %u",
3228                                                                 contrecord->xl_rem_len,
3229                                                                 readId, readSeg, readOff)));
3230                                 goto next_record_is_invalid;
3231                         }
3232                         len = XLOG_BLCKSZ - pageHeaderSize - SizeOfXLogContRecord;
3233                         if (contrecord->xl_rem_len > len)
3234                         {
3235                                 memcpy(buffer, (char *) contrecord + SizeOfXLogContRecord, len);
3236                                 gotlen += len;
3237                                 buffer += len;
3238                                 continue;
3239                         }
3240                         memcpy(buffer, (char *) contrecord + SizeOfXLogContRecord,
3241                                    contrecord->xl_rem_len);
3242                         break;
3243                 }
3244                 if (!RecordIsValid(record, *RecPtr, emode))
3245                         goto next_record_is_invalid;
3246                 pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) readBuf);
3247                 if (XLOG_BLCKSZ - SizeOfXLogRecord >= pageHeaderSize +
3248                         MAXALIGN(SizeOfXLogContRecord + contrecord->xl_rem_len))
3249                 {
3250                         nextRecord = (XLogRecord *) ((char *) contrecord +
3251                                         MAXALIGN(SizeOfXLogContRecord + contrecord->xl_rem_len));
3252                 }
3253                 EndRecPtr.xlogid = readId;
3254                 EndRecPtr.xrecoff = readSeg * XLogSegSize + readOff +
3255                         pageHeaderSize +
3256                         MAXALIGN(SizeOfXLogContRecord + contrecord->xl_rem_len);
3257                 ReadRecPtr = *RecPtr;
3258                 /* needn't worry about XLOG SWITCH, it can't cross page boundaries */
3259                 return record;
3260         }
3261
3262         /* Record does not cross a page boundary */
3263         if (!RecordIsValid(record, *RecPtr, emode))
3264                 goto next_record_is_invalid;
3265         if (XLOG_BLCKSZ - SizeOfXLogRecord >= RecPtr->xrecoff % XLOG_BLCKSZ +
3266                 MAXALIGN(total_len))
3267                 nextRecord = (XLogRecord *) ((char *) record + MAXALIGN(total_len));
3268         EndRecPtr.xlogid = RecPtr->xlogid;
3269         EndRecPtr.xrecoff = RecPtr->xrecoff + MAXALIGN(total_len);
3270         ReadRecPtr = *RecPtr;
3271         memcpy(buffer, record, total_len);
3272
3273         /*
3274          * Special processing if it's an XLOG SWITCH record
3275          */
3276         if (record->xl_rmid == RM_XLOG_ID && record->xl_info == XLOG_SWITCH)
3277         {
3278                 /* Pretend it extends to end of segment */
3279                 EndRecPtr.xrecoff += XLogSegSize - 1;
3280                 EndRecPtr.xrecoff -= EndRecPtr.xrecoff % XLogSegSize;
3281                 nextRecord = NULL;              /* definitely not on same page */
3282
3283                 /*
3284                  * Pretend that readBuf contains the last page of the segment. This is
3285                  * just to avoid Assert failure in StartupXLOG if XLOG ends with this
3286                  * segment.
3287                  */
3288                 readOff = XLogSegSize - XLOG_BLCKSZ;
3289         }
3290         return (XLogRecord *) buffer;
3291
3292 next_record_is_invalid:;
3293         close(readFile);
3294         readFile = -1;
3295         nextRecord = NULL;
3296         return NULL;
3297 }
3298
3299 /*
3300  * Check whether the xlog header of a page just read in looks valid.
3301  *
3302  * This is just a convenience subroutine to avoid duplicated code in
3303  * ReadRecord.  It's not intended for use from anywhere else.
3304  */
3305 static bool
3306 ValidXLOGHeader(XLogPageHeader hdr, int emode)
3307 {
3308         XLogRecPtr      recaddr;
3309
3310         if (hdr->xlp_magic != XLOG_PAGE_MAGIC)
3311         {
3312                 ereport(emode,
3313                                 (errmsg("invalid magic number %04X in log file %u, segment %u, offset %u",
3314                                                 hdr->xlp_magic, readId, readSeg, readOff)));
3315                 return false;
3316         }
3317         if ((hdr->xlp_info & ~XLP_ALL_FLAGS) != 0)
3318         {
3319                 ereport(emode,
3320                                 (errmsg("invalid info bits %04X in log file %u, segment %u, offset %u",
3321                                                 hdr->xlp_info, readId, readSeg, readOff)));
3322                 return false;
3323         }
3324         if (hdr->xlp_info & XLP_LONG_HEADER)
3325         {
3326                 XLogLongPageHeader longhdr = (XLogLongPageHeader) hdr;
3327
3328                 if (longhdr->xlp_sysid != ControlFile->system_identifier)
3329                 {
3330                         char            fhdrident_str[32];
3331                         char            sysident_str[32];
3332
3333                         /*
3334                          * Format sysids separately to keep platform-dependent format code
3335                          * out of the translatable message string.
3336                          */
3337                         snprintf(fhdrident_str, sizeof(fhdrident_str), UINT64_FORMAT,
3338                                          longhdr->xlp_sysid);
3339                         snprintf(sysident_str, sizeof(sysident_str), UINT64_FORMAT,
3340                                          ControlFile->system_identifier);
3341                         ereport(emode,
3342                                         (errmsg("WAL file is from different system"),
3343                                          errdetail("WAL file SYSID is %s, pg_control SYSID is %s",
3344                                                            fhdrident_str, sysident_str)));
3345                         return false;
3346                 }
3347                 if (longhdr->xlp_seg_size != XLogSegSize)
3348                 {
3349                         ereport(emode,
3350                                         (errmsg("WAL file is from different system"),
3351                                          errdetail("Incorrect XLOG_SEG_SIZE in page header.")));
3352                         return false;
3353                 }
3354                 if (longhdr->xlp_xlog_blcksz != XLOG_BLCKSZ)
3355                 {
3356                         ereport(emode,
3357                                         (errmsg("WAL file is from different system"),
3358                                          errdetail("Incorrect XLOG_BLCKSZ in page header.")));
3359                         return false;
3360                 }
3361         }
3362         else if (readOff == 0)
3363         {
3364                 /* hmm, first page of file doesn't have a long header? */
3365                 ereport(emode,
3366                                 (errmsg("invalid info bits %04X in log file %u, segment %u, offset %u",
3367                                                 hdr->xlp_info, readId, readSeg, readOff)));
3368                 return false;
3369         }
3370
3371         recaddr.xlogid = readId;
3372         recaddr.xrecoff = readSeg * XLogSegSize + readOff;
3373         if (!XLByteEQ(hdr->xlp_pageaddr, recaddr))
3374         {
3375                 ereport(emode,
3376                                 (errmsg("unexpected pageaddr %X/%X in log file %u, segment %u, offset %u",
3377                                                 hdr->xlp_pageaddr.xlogid, hdr->xlp_pageaddr.xrecoff,
3378                                                 readId, readSeg, readOff)));
3379                 return false;
3380         }
3381
3382         /*
3383          * Check page TLI is one of the expected values.
3384          */
3385         if (!list_member_int(expectedTLIs, (int) hdr->xlp_tli))
3386         {
3387                 ereport(emode,
3388                                 (errmsg("unexpected timeline ID %u in log file %u, segment %u, offset %u",
3389                                                 hdr->xlp_tli,
3390                                                 readId, readSeg, readOff)));
3391                 return false;
3392         }
3393
3394         /*
3395          * Since child timelines are always assigned a TLI greater than their
3396          * immediate parent's TLI, we should never see TLI go backwards across
3397          * successive pages of a consistent WAL sequence.
3398          *
3399          * Of course this check should only be applied when advancing sequentially
3400          * across pages; therefore ReadRecord resets lastPageTLI to zero when
3401          * going to a random page.
3402          */
3403         if (hdr->xlp_tli < lastPageTLI)
3404         {
3405                 ereport(emode,
3406                                 (errmsg("out-of-sequence timeline ID %u (after %u) in log file %u, segment %u, offset %u",
3407                                                 hdr->xlp_tli, lastPageTLI,
3408                                                 readId, readSeg, readOff)));
3409                 return false;
3410         }
3411         lastPageTLI = hdr->xlp_tli;
3412         return true;
3413 }
3414
3415 /*
3416  * Try to read a timeline's history file.
3417  *
3418  * If successful, return the list of component TLIs (the given TLI followed by
3419  * its ancestor TLIs).  If we can't find the history file, assume that the
3420  * timeline has no parents, and return a list of just the specified timeline
3421  * ID.
3422  */
3423 static List *
3424 readTimeLineHistory(TimeLineID targetTLI)
3425 {
3426         List       *result;
3427         char            path[MAXPGPATH];
3428         char            histfname[MAXFNAMELEN];
3429         char            fline[MAXPGPATH];
3430         FILE       *fd;
3431
3432         if (InArchiveRecovery)
3433         {
3434                 TLHistoryFileName(histfname, targetTLI);
3435                 RestoreArchivedFile(path, histfname, "RECOVERYHISTORY", 0);
3436         }
3437         else
3438                 TLHistoryFilePath(path, targetTLI);
3439
3440         fd = AllocateFile(path, "r");
3441         if (fd == NULL)
3442         {
3443                 if (errno != ENOENT)
3444                         ereport(FATAL,
3445                                         (errcode_for_file_access(),
3446                                          errmsg("could not open file \"%s\": %m", path)));
3447                 /* Not there, so assume no parents */
3448                 return list_make1_int((int) targetTLI);
3449         }
3450
3451         result = NIL;
3452
3453         /*
3454          * Parse the file...
3455          */
3456         while (fgets(fline, sizeof(fline), fd) != NULL)
3457         {
3458                 /* skip leading whitespace and check for # comment */
3459                 char       *ptr;
3460                 char       *endptr;
3461                 TimeLineID      tli;
3462
3463                 for (ptr = fline; *ptr; ptr++)
3464                 {
3465                         if (!isspace((unsigned char) *ptr))
3466                                 break;
3467                 }
3468                 if (*ptr == '\0' || *ptr == '#')
3469                         continue;
3470
3471                 /* expect a numeric timeline ID as first field of line */
3472                 tli = (TimeLineID) strtoul(ptr, &endptr, 0);
3473                 if (endptr == ptr)
3474                         ereport(FATAL,
3475                                         (errmsg("syntax error in history file: %s", fline),
3476                                          errhint("Expected a numeric timeline ID.")));
3477
3478                 if (result &&
3479                         tli <= (TimeLineID) linitial_int(result))
3480                         ereport(FATAL,
3481                                         (errmsg("invalid data in history file: %s", fline),
3482                                    errhint("Timeline IDs must be in increasing sequence.")));
3483
3484                 /* Build list with newest item first */
3485                 result = lcons_int((int) tli, result);
3486
3487                 /* we ignore the remainder of each line */
3488         }
3489
3490         FreeFile(fd);
3491
3492         if (result &&
3493                 targetTLI <= (TimeLineID) linitial_int(result))
3494                 ereport(FATAL,
3495                                 (errmsg("invalid data in history file \"%s\"", path),
3496                         errhint("Timeline IDs must be less than child timeline's ID.")));
3497
3498         result = lcons_int((int) targetTLI, result);
3499
3500         ereport(DEBUG3,
3501                         (errmsg_internal("history of timeline %u is %s",
3502                                                          targetTLI, nodeToString(result))));
3503
3504         return result;
3505 }
3506
3507 /*
3508  * Probe whether a timeline history file exists for the given timeline ID
3509  */
3510 static bool
3511 existsTimeLineHistory(TimeLineID probeTLI)
3512 {
3513         char            path[MAXPGPATH];
3514         char            histfname[MAXFNAMELEN];
3515         FILE       *fd;
3516
3517         if (InArchiveRecovery)
3518         {
3519                 TLHistoryFileName(histfname, probeTLI);
3520                 RestoreArchivedFile(path, histfname, "RECOVERYHISTORY", 0);
3521         }
3522         else
3523                 TLHistoryFilePath(path, probeTLI);
3524
3525         fd = AllocateFile(path, "r");
3526         if (fd != NULL)
3527         {
3528                 FreeFile(fd);
3529                 return true;
3530         }
3531         else
3532         {
3533                 if (errno != ENOENT)
3534                         ereport(FATAL,
3535                                         (errcode_for_file_access(),
3536                                          errmsg("could not open file \"%s\": %m", path)));
3537                 return false;
3538         }
3539 }
3540
3541 /*
3542  * Find the newest existing timeline, assuming that startTLI exists.
3543  *
3544  * Note: while this is somewhat heuristic, it does positively guarantee
3545  * that (result + 1) is not a known timeline, and therefore it should
3546  * be safe to assign that ID to a new timeline.
3547  */
3548 static TimeLineID
3549 findNewestTimeLine(TimeLineID startTLI)
3550 {
3551         TimeLineID      newestTLI;
3552         TimeLineID      probeTLI;
3553
3554         /*
3555          * The algorithm is just to probe for the existence of timeline history
3556          * files.  XXX is it useful to allow gaps in the sequence?
3557          */
3558         newestTLI = startTLI;
3559
3560         for (probeTLI = startTLI + 1;; probeTLI++)
3561         {
3562                 if (existsTimeLineHistory(probeTLI))
3563                 {
3564                         newestTLI = probeTLI;           /* probeTLI exists */
3565                 }
3566                 else
3567                 {
3568                         /* doesn't exist, assume we're done */
3569                         break;
3570                 }
3571         }
3572
3573         return newestTLI;
3574 }
3575
3576 /*
3577  * Create a new timeline history file.
3578  *
3579  *      newTLI: ID of the new timeline
3580  *      parentTLI: ID of its immediate parent
3581  *      endTLI et al: ID of the last used WAL file, for annotation purposes
3582  *
3583  * Currently this is only used during recovery, and so there are no locking
3584  * considerations.      But we should be just as tense as XLogFileInit to avoid
3585  * emplacing a bogus file.
3586  */
3587 static void
3588 writeTimeLineHistory(TimeLineID newTLI, TimeLineID parentTLI,
3589                                          TimeLineID endTLI, uint32 endLogId, uint32 endLogSeg)
3590 {
3591         char            path[MAXPGPATH];
3592         char            tmppath[MAXPGPATH];
3593         char            histfname[MAXFNAMELEN];
3594         char            xlogfname[MAXFNAMELEN];
3595         char            buffer[BLCKSZ];
3596         int                     srcfd;
3597         int                     fd;
3598         int                     nbytes;
3599
3600         Assert(newTLI > parentTLI); /* else bad selection of newTLI */
3601
3602         /*
3603          * Write into a temp file name.
3604          */
3605         snprintf(tmppath, MAXPGPATH, XLOGDIR "/xlogtemp.%d", (int) getpid());
3606
3607         unlink(tmppath);
3608
3609         /* do not use XLOG_SYNC_BIT here --- want to fsync only at end of fill */
3610         fd = BasicOpenFile(tmppath, O_RDWR | O_CREAT | O_EXCL,
3611                                            S_IRUSR | S_IWUSR);
3612         if (fd < 0)
3613                 ereport(ERROR,
3614                                 (errcode_for_file_access(),
3615                                  errmsg("could not create file \"%s\": %m", tmppath)));
3616
3617         /*
3618          * If a history file exists for the parent, copy it verbatim
3619          */
3620         if (InArchiveRecovery)
3621         {
3622                 TLHistoryFileName(histfname, parentTLI);
3623                 RestoreArchivedFile(path, histfname, "RECOVERYHISTORY", 0);
3624         }
3625         else
3626                 TLHistoryFilePath(path, parentTLI);
3627
3628         srcfd = BasicOpenFile(path, O_RDONLY, 0);
3629         if (srcfd < 0)
3630         {
3631                 if (errno != ENOENT)
3632                         ereport(ERROR,
3633                                         (errcode_for_file_access(),
3634                                          errmsg("could not open file \"%s\": %m", path)));
3635                 /* Not there, so assume parent has no parents */
3636         }
3637         else
3638         {
3639                 for (;;)
3640                 {
3641                         errno = 0;
3642                         nbytes = (int) read(srcfd, buffer, sizeof(buffer));
3643                         if (nbytes < 0 || errno != 0)
3644                                 ereport(ERROR,
3645                                                 (errcode_for_file_access(),
3646                                                  errmsg("could not read file \"%s\": %m", path)));
3647                         if (nbytes == 0)
3648                                 break;
3649                         errno = 0;
3650                         if ((int) write(fd, buffer, nbytes) != nbytes)
3651                         {
3652                                 int                     save_errno = errno;
3653
3654                                 /*
3655                                  * If we fail to make the file, delete it to release disk
3656                                  * space
3657                                  */
3658                                 unlink(tmppath);
3659
3660                                 /*
3661                                  * if write didn't set errno, assume problem is no disk space
3662                                  */
3663                                 errno = save_errno ? save_errno : ENOSPC;
3664
3665                                 ereport(ERROR,
3666                                                 (errcode_for_file_access(),
3667                                          errmsg("could not write to file \"%s\": %m", tmppath)));
3668                         }
3669                 }
3670                 close(srcfd);
3671         }
3672
3673         /*
3674          * Append one line with the details of this timeline split.
3675          *
3676          * If we did have a parent file, insert an extra newline just in case the
3677          * parent file failed to end with one.
3678          */
3679         XLogFileName(xlogfname, endTLI, endLogId, endLogSeg);
3680
3681         snprintf(buffer, sizeof(buffer),
3682                          "%s%u\t%s\t%s transaction %u at %s\n",
3683                          (srcfd < 0) ? "" : "\n",
3684                          parentTLI,
3685                          xlogfname,
3686                          recoveryStopAfter ? "after" : "before",
3687                          recoveryStopXid,
3688                          timestamptz_to_str(recoveryStopTime));
3689
3690         nbytes = strlen(buffer);
3691         errno = 0;
3692         if ((int) write(fd, buffer, nbytes) != nbytes)
3693         {
3694                 int                     save_errno = errno;
3695
3696                 /*
3697                  * If we fail to make the file, delete it to release disk space
3698                  */
3699                 unlink(tmppath);
3700                 /* if write didn't set errno, assume problem is no disk space */
3701                 errno = save_errno ? save_errno : ENOSPC;
3702
3703                 ereport(ERROR,
3704                                 (errcode_for_file_access(),
3705                                  errmsg("could not write to file \"%s\": %m", tmppath)));
3706         }
3707
3708         if (pg_fsync(fd) != 0)
3709                 ereport(ERROR,
3710                                 (errcode_for_file_access(),
3711                                  errmsg("could not fsync file \"%s\": %m", tmppath)));
3712
3713         if (close(fd))
3714                 ereport(ERROR,
3715                                 (errcode_for_file_access(),
3716                                  errmsg("could not close file \"%s\": %m", tmppath)));
3717
3718
3719         /*
3720          * Now move the completed history file into place with its final name.
3721          */
3722         TLHistoryFilePath(path, newTLI);
3723
3724         /*
3725          * Prefer link() to rename() here just to be really sure that we don't
3726          * overwrite an existing logfile.  However, there shouldn't be one, so
3727          * rename() is an acceptable substitute except for the truly paranoid.
3728          */
3729 #if HAVE_WORKING_LINK
3730         if (link(tmppath, path) < 0)
3731                 ereport(ERROR,
3732                                 (errcode_for_file_access(),
3733                                  errmsg("could not link file \"%s\" to \"%s\": %m",
3734                                                 tmppath, path)));
3735         unlink(tmppath);
3736 #else
3737         if (rename(tmppath, path) < 0)
3738                 ereport(ERROR,
3739                                 (errcode_for_file_access(),
3740                                  errmsg("could not rename file \"%s\" to \"%s\": %m",
3741                                                 tmppath, path)));
3742 #endif
3743
3744         /* The history file can be archived immediately. */
3745         TLHistoryFileName(histfname, newTLI);
3746         XLogArchiveNotify(histfname);
3747 }
3748
3749 /*
3750  * I/O routines for pg_control
3751  *
3752  * *ControlFile is a buffer in shared memory that holds an image of the
3753  * contents of pg_control.      WriteControlFile() initializes pg_control
3754  * given a preloaded buffer, ReadControlFile() loads the buffer from
3755  * the pg_control file (during postmaster or standalone-backend startup),
3756  * and UpdateControlFile() rewrites pg_control after we modify xlog state.
3757  *
3758  * For simplicity, WriteControlFile() initializes the fields of pg_control
3759  * that are related to checking backend/database compatibility, and
3760  * ReadControlFile() verifies they are correct.  We could split out the
3761  * I/O and compatibility-check functions, but there seems no need currently.
3762  */
3763 static void
3764 WriteControlFile(void)
3765 {
3766         int                     fd;
3767         char            buffer[PG_CONTROL_SIZE];                /* need not be aligned */
3768         char       *localeptr;
3769
3770         /*
3771          * Initialize version and compatibility-check fields
3772          */
3773         ControlFile->pg_control_version = PG_CONTROL_VERSION;
3774         ControlFile->catalog_version_no = CATALOG_VERSION_NO;
3775
3776         ControlFile->maxAlign = MAXIMUM_ALIGNOF;
3777         ControlFile->floatFormat = FLOATFORMAT_VALUE;
3778
3779         ControlFile->blcksz = BLCKSZ;
3780         ControlFile->relseg_size = RELSEG_SIZE;
3781         ControlFile->xlog_blcksz = XLOG_BLCKSZ;
3782         ControlFile->xlog_seg_size = XLOG_SEG_SIZE;
3783
3784         ControlFile->nameDataLen = NAMEDATALEN;
3785         ControlFile->indexMaxKeys = INDEX_MAX_KEYS;
3786
3787         ControlFile->toast_max_chunk_size = TOAST_MAX_CHUNK_SIZE;
3788
3789 #ifdef HAVE_INT64_TIMESTAMP
3790         ControlFile->enableIntTimes = TRUE;
3791 #else
3792         ControlFile->enableIntTimes = FALSE;
3793 #endif
3794
3795         ControlFile->localeBuflen = LOCALE_NAME_BUFLEN;
3796         localeptr = setlocale(LC_COLLATE, NULL);
3797         if (!localeptr)
3798                 ereport(PANIC,
3799                                 (errmsg("invalid LC_COLLATE setting")));
3800         StrNCpy(ControlFile->lc_collate, localeptr, LOCALE_NAME_BUFLEN);
3801         localeptr = setlocale(LC_CTYPE, NULL);
3802         if (!localeptr)
3803                 ereport(PANIC,
3804                                 (errmsg("invalid LC_CTYPE setting")));
3805         StrNCpy(ControlFile->lc_ctype, localeptr, LOCALE_NAME_BUFLEN);
3806
3807         /* Contents are protected with a CRC */
3808         INIT_CRC32(ControlFile->crc);
3809         COMP_CRC32(ControlFile->crc,
3810                            (char *) ControlFile,
3811                            offsetof(ControlFileData, crc));
3812         FIN_CRC32(ControlFile->crc);
3813
3814         /*
3815          * We write out PG_CONTROL_SIZE bytes into pg_control, zero-padding the
3816          * excess over sizeof(ControlFileData).  This reduces the odds of
3817          * premature-EOF errors when reading pg_control.  We'll still fail when we
3818          * check the contents of the file, but hopefully with a more specific
3819          * error than "couldn't read pg_control".
3820          */
3821         if (sizeof(ControlFileData) > PG_CONTROL_SIZE)
3822                 elog(PANIC, "sizeof(ControlFileData) is larger than PG_CONTROL_SIZE; fix either one");
3823
3824         memset(buffer, 0, PG_CONTROL_SIZE);
3825         memcpy(buffer, ControlFile, sizeof(ControlFileData));
3826
3827         fd = BasicOpenFile(XLOG_CONTROL_FILE,
3828                                            O_RDWR | O_CREAT | O_EXCL | PG_BINARY,
3829                                            S_IRUSR | S_IWUSR);
3830         if (fd < 0)
3831                 ereport(PANIC,
3832                                 (errcode_for_file_access(),
3833                                  errmsg("could not create control file \"%s\": %m",
3834                                                 XLOG_CONTROL_FILE)));
3835
3836         errno = 0;
3837         if (write(fd, buffer, PG_CONTROL_SIZE) != PG_CONTROL_SIZE)
3838         {
3839                 /* if write didn't set errno, assume problem is no disk space */
3840                 if (errno == 0)
3841                         errno = ENOSPC;
3842                 ereport(PANIC,
3843                                 (errcode_for_file_access(),
3844                                  errmsg("could not write to control file: %m")));
3845         }
3846
3847         if (pg_fsync(fd) != 0)
3848                 ereport(PANIC,
3849                                 (errcode_for_file_access(),
3850                                  errmsg("could not fsync control file: %m")));
3851
3852         if (close(fd))
3853                 ereport(PANIC,
3854                                 (errcode_for_file_access(),
3855                                  errmsg("could not close control file: %m")));
3856 }
3857
3858 static void
3859 ReadControlFile(void)
3860 {
3861         pg_crc32        crc;
3862         int                     fd;
3863
3864         /*
3865          * Read data...
3866          */
3867         fd = BasicOpenFile(XLOG_CONTROL_FILE,
3868                                            O_RDWR | PG_BINARY,
3869                                            S_IRUSR | S_IWUSR);
3870         if (fd < 0)
3871                 ereport(PANIC,
3872                                 (errcode_for_file_access(),
3873                                  errmsg("could not open control file \"%s\": %m",
3874                                                 XLOG_CONTROL_FILE)));
3875
3876         if (read(fd, ControlFile, sizeof(ControlFileData)) != sizeof(ControlFileData))
3877                 ereport(PANIC,
3878                                 (errcode_for_file_access(),
3879                                  errmsg("could not read from control file: %m")));
3880
3881         close(fd);
3882
3883         /*
3884          * Check for expected pg_control format version.  If this is wrong, the
3885          * CRC check will likely fail because we'll be checking the wrong number
3886          * of bytes.  Complaining about wrong version will probably be more
3887          * enlightening than complaining about wrong CRC.
3888          */
3889         if (ControlFile->pg_control_version != PG_CONTROL_VERSION)
3890                 ereport(FATAL,
3891                                 (errmsg("database files are incompatible with server"),
3892                                  errdetail("The database cluster was initialized with PG_CONTROL_VERSION %d,"
3893                                   " but the server was compiled with PG_CONTROL_VERSION %d.",
3894                                                 ControlFile->pg_control_version, PG_CONTROL_VERSION),
3895                                  errhint("It looks like you need to initdb.")));
3896         /* Now check the CRC. */
3897         INIT_CRC32(crc);
3898         COMP_CRC32(crc,
3899                            (char *) ControlFile,
3900                            offsetof(ControlFileData, crc));
3901         FIN_CRC32(crc);
3902
3903         if (!EQ_CRC32(crc, ControlFile->crc))
3904                 ereport(FATAL,
3905                                 (errmsg("incorrect checksum in control file")));
3906
3907         /*
3908          * Do compatibility checking immediately.  We do this here for 2 reasons:
3909          *
3910          * (1) if the database isn't compatible with the backend executable, we
3911          * want to abort before we can possibly do any damage;
3912          *
3913          * (2) this code is executed in the postmaster, so the setlocale() will
3914          * propagate to forked backends, which aren't going to read this file for
3915          * themselves.  (These locale settings are considered critical
3916          * compatibility items because they can affect sort order of indexes.)
3917          */
3918         if (ControlFile->catalog_version_no != CATALOG_VERSION_NO)
3919                 ereport(FATAL,
3920                                 (errmsg("database files are incompatible with server"),
3921                                  errdetail("The database cluster was initialized with CATALOG_VERSION_NO %d,"
3922                                   " but the server was compiled with CATALOG_VERSION_NO %d.",
3923                                                 ControlFile->catalog_version_no, CATALOG_VERSION_NO),
3924                                  errhint("It looks like you need to initdb.")));
3925         if (ControlFile->maxAlign != MAXIMUM_ALIGNOF)
3926                 ereport(FATAL,
3927                                 (errmsg("database files are incompatible with server"),
3928                    errdetail("The database cluster was initialized with MAXALIGN %d,"
3929                                          " but the server was compiled with MAXALIGN %d.",
3930                                          ControlFile->maxAlign, MAXIMUM_ALIGNOF),
3931                                  errhint("It looks like you need to initdb.")));
3932         if (ControlFile->floatFormat != FLOATFORMAT_VALUE)
3933                 ereport(FATAL,
3934                                 (errmsg("database files are incompatible with server"),
3935                                  errdetail("The database cluster appears to use a different floating-point number format than the server executable."),
3936                                  errhint("It looks like you need to initdb.")));
3937         if (ControlFile->blcksz != BLCKSZ)
3938                 ereport(FATAL,
3939                                 (errmsg("database files are incompatible with server"),
3940                          errdetail("The database cluster was initialized with BLCKSZ %d,"
3941                                            " but the server was compiled with BLCKSZ %d.",
3942                                            ControlFile->blcksz, BLCKSZ),
3943                                  errhint("It looks like you need to recompile or initdb.")));
3944         if (ControlFile->relseg_size != RELSEG_SIZE)
3945                 ereport(FATAL,
3946                                 (errmsg("database files are incompatible with server"),
3947                 errdetail("The database cluster was initialized with RELSEG_SIZE %d,"
3948                                   " but the server was compiled with RELSEG_SIZE %d.",
3949                                   ControlFile->relseg_size, RELSEG_SIZE),
3950                                  errhint("It looks like you need to recompile or initdb.")));
3951         if (ControlFile->xlog_blcksz != XLOG_BLCKSZ)
3952                 ereport(FATAL,
3953                                 (errmsg("database files are incompatible with server"),
3954                 errdetail("The database cluster was initialized with XLOG_BLCKSZ %d,"
3955                                   " but the server was compiled with XLOG_BLCKSZ %d.",
3956                                   ControlFile->xlog_blcksz, XLOG_BLCKSZ),
3957                                  errhint("It looks like you need to recompile or initdb.")));
3958         if (ControlFile->xlog_seg_size != XLOG_SEG_SIZE)
3959                 ereport(FATAL,
3960                                 (errmsg("database files are incompatible with server"),
3961                                  errdetail("The database cluster was initialized with XLOG_SEG_SIZE %d,"
3962                                            " but the server was compiled with XLOG_SEG_SIZE %d.",
3963                                                    ControlFile->xlog_seg_size, XLOG_SEG_SIZE),
3964                                  errhint("It looks like you need to recompile or initdb.")));
3965         if (ControlFile->nameDataLen != NAMEDATALEN)
3966                 ereport(FATAL,
3967                                 (errmsg("database files are incompatible with server"),
3968                 errdetail("The database cluster was initialized with NAMEDATALEN %d,"
3969                                   " but the server was compiled with NAMEDATALEN %d.",
3970                                   ControlFile->nameDataLen, NAMEDATALEN),
3971                                  errhint("It looks like you need to recompile or initdb.")));
3972         if (ControlFile->indexMaxKeys != INDEX_MAX_KEYS)
3973                 ereport(FATAL,
3974                                 (errmsg("database files are incompatible with server"),
3975                                  errdetail("The database cluster was initialized with INDEX_MAX_KEYS %d,"
3976                                           " but the server was compiled with INDEX_MAX_KEYS %d.",
3977                                                    ControlFile->indexMaxKeys, INDEX_MAX_KEYS),
3978                                  errhint("It looks like you need to recompile or initdb.")));
3979         if (ControlFile->toast_max_chunk_size != TOAST_MAX_CHUNK_SIZE)
3980                 ereport(FATAL,
3981                                 (errmsg("database files are incompatible with server"),
3982                                  errdetail("The database cluster was initialized with TOAST_MAX_CHUNK_SIZE %d,"
3983                                                    " but the server was compiled with TOAST_MAX_CHUNK_SIZE %d.",
3984                                                    ControlFile->toast_max_chunk_size, (int) TOAST_MAX_CHUNK_SIZE),
3985                                  errhint("It looks like you need to recompile or initdb.")));
3986
3987 #ifdef HAVE_INT64_TIMESTAMP
3988         if (ControlFile->enableIntTimes != TRUE)
3989                 ereport(FATAL,
3990                                 (errmsg("database files are incompatible with server"),
3991                                  errdetail("The database cluster was initialized without HAVE_INT64_TIMESTAMP"
3992                                   " but the server was compiled with HAVE_INT64_TIMESTAMP."),
3993                                  errhint("It looks like you need to recompile or initdb.")));
3994 #else
3995         if (ControlFile->enableIntTimes != FALSE)
3996                 ereport(FATAL,
3997                                 (errmsg("database files are incompatible with server"),
3998                                  errdetail("The database cluster was initialized with HAVE_INT64_TIMESTAMP"
3999                            " but the server was compiled without HAVE_INT64_TIMESTAMP."),
4000                                  errhint("It looks like you need to recompile or initdb.")));
4001 #endif
4002
4003         if (ControlFile->localeBuflen != LOCALE_NAME_BUFLEN)
4004                 ereport(FATAL,
4005                                 (errmsg("database files are incompatible with server"),
4006                                  errdetail("The database cluster was initialized with LOCALE_NAME_BUFLEN %d,"
4007                                   " but the server was compiled with LOCALE_NAME_BUFLEN %d.",
4008                                                    ControlFile->localeBuflen, LOCALE_NAME_BUFLEN),
4009                                  errhint("It looks like you need to recompile or initdb.")));
4010         if (pg_perm_setlocale(LC_COLLATE, ControlFile->lc_collate) == NULL)
4011                 ereport(FATAL,
4012                         (errmsg("database files are incompatible with operating system"),
4013                          errdetail("The database cluster was initialized with LC_COLLATE \"%s\","
4014                                            " which is not recognized by setlocale().",
4015                                            ControlFile->lc_collate),
4016                          errhint("It looks like you need to initdb or install locale support.")));
4017         if (pg_perm_setlocale(LC_CTYPE, ControlFile->lc_ctype) == NULL)
4018                 ereport(FATAL,
4019                         (errmsg("database files are incompatible with operating system"),
4020                 errdetail("The database cluster was initialized with LC_CTYPE \"%s\","
4021                                   " which is not recognized by setlocale().",
4022                                   ControlFile->lc_ctype),
4023                          errhint("It looks like you need to initdb or install locale support.")));
4024
4025         /* Make the fixed locale settings visible as GUC variables, too */
4026         SetConfigOption("lc_collate", ControlFile->lc_collate,
4027                                         PGC_INTERNAL, PGC_S_OVERRIDE);
4028         SetConfigOption("lc_ctype", ControlFile->lc_ctype,
4029                                         PGC_INTERNAL, PGC_S_OVERRIDE);
4030 }
4031
4032 void
4033 UpdateControlFile(void)
4034 {
4035         int                     fd;
4036
4037         INIT_CRC32(ControlFile->crc);
4038         COMP_CRC32(ControlFile->crc,
4039                            (char *) ControlFile,
4040                            offsetof(ControlFileData, crc));
4041         FIN_CRC32(ControlFile->crc);
4042
4043         fd = BasicOpenFile(XLOG_CONTROL_FILE,
4044                                            O_RDWR | PG_BINARY,
4045                                            S_IRUSR | S_IWUSR);
4046         if (fd < 0)
4047                 ereport(PANIC,
4048                                 (errcode_for_file_access(),
4049                                  errmsg("could not open control file \"%s\": %m",
4050                                                 XLOG_CONTROL_FILE)));
4051
4052         errno = 0;
4053         if (write(fd, ControlFile, sizeof(ControlFileData)) != sizeof(ControlFileData))
4054         {
4055                 /* if write didn't set errno, assume problem is no disk space */
4056                 if (errno == 0)
4057                         errno = ENOSPC;
4058                 ereport(PANIC,
4059                                 (errcode_for_file_access(),
4060                                  errmsg("could not write to control file: %m")));
4061         }
4062
4063         if (pg_fsync(fd) != 0)
4064                 ereport(PANIC,
4065                                 (errcode_for_file_access(),
4066                                  errmsg("could not fsync control file: %m")));
4067
4068         if (close(fd))
4069                 ereport(PANIC,
4070                                 (errcode_for_file_access(),
4071                                  errmsg("could not close control file: %m")));
4072 }
4073
4074 /*
4075  * Initialization of shared memory for XLOG
4076  */
4077 Size
4078 XLOGShmemSize(void)
4079 {
4080         Size            size;
4081
4082         /* XLogCtl */
4083         size = sizeof(XLogCtlData);
4084         /* xlblocks array */
4085         size = add_size(size, mul_size(sizeof(XLogRecPtr), XLOGbuffers));
4086         /* extra alignment padding for XLOG I/O buffers */
4087         size = add_size(size, ALIGNOF_XLOG_BUFFER);
4088         /* and the buffers themselves */
4089         size = add_size(size, mul_size(XLOG_BLCKSZ, XLOGbuffers));
4090
4091         /*
4092          * Note: we don't count ControlFileData, it comes out of the "slop factor"
4093          * added by CreateSharedMemoryAndSemaphores.  This lets us use this
4094          * routine again below to compute the actual allocation size.
4095          */
4096
4097         return size;
4098 }
4099
4100 void
4101 XLOGShmemInit(void)
4102 {
4103         bool            foundCFile,
4104                                 foundXLog;
4105         char       *allocptr;
4106
4107         ControlFile = (ControlFileData *)
4108                 ShmemInitStruct("Control File", sizeof(ControlFileData), &foundCFile);
4109         XLogCtl = (XLogCtlData *)
4110                 ShmemInitStruct("XLOG Ctl", XLOGShmemSize(), &foundXLog);
4111
4112         if (foundCFile || foundXLog)
4113         {
4114                 /* both should be present or neither */
4115                 Assert(foundCFile && foundXLog);
4116                 return;
4117         }
4118
4119         memset(XLogCtl, 0, sizeof(XLogCtlData));
4120
4121         /*
4122          * Since XLogCtlData contains XLogRecPtr fields, its sizeof should be a
4123          * multiple of the alignment for same, so no extra alignment padding is
4124          * needed here.
4125          */
4126         allocptr = ((char *) XLogCtl) + sizeof(XLogCtlData);
4127         XLogCtl->xlblocks = (XLogRecPtr *) allocptr;
4128         memset(XLogCtl->xlblocks, 0, sizeof(XLogRecPtr) * XLOGbuffers);
4129         allocptr += sizeof(XLogRecPtr) * XLOGbuffers;
4130
4131         /*
4132          * Align the start of the page buffers to an ALIGNOF_XLOG_BUFFER boundary.
4133          */
4134         allocptr = (char *) TYPEALIGN(ALIGNOF_XLOG_BUFFER, allocptr);
4135         XLogCtl->pages = allocptr;
4136         memset(XLogCtl->pages, 0, (Size) XLOG_BLCKSZ * XLOGbuffers);
4137
4138         /*
4139          * Do basic initialization of XLogCtl shared data. (StartupXLOG will fill
4140          * in additional info.)
4141          */
4142         XLogCtl->XLogCacheByte = (Size) XLOG_BLCKSZ *XLOGbuffers;
4143
4144         XLogCtl->XLogCacheBlck = XLOGbuffers - 1;
4145         XLogCtl->Insert.currpage = (XLogPageHeader) (XLogCtl->pages);
4146         SpinLockInit(&XLogCtl->info_lck);
4147
4148         /*
4149          * If we are not in bootstrap mode, pg_control should already exist. Read
4150          * and validate it immediately (see comments in ReadControlFile() for the
4151          * reasons why).
4152          */
4153         if (!IsBootstrapProcessingMode())
4154                 ReadControlFile();
4155 }
4156
4157 /*
4158  * This func must be called ONCE on system install.  It creates pg_control
4159  * and the initial XLOG segment.
4160  */
4161 void
4162 BootStrapXLOG(void)
4163 {
4164         CheckPoint      checkPoint;
4165         char       *buffer;
4166         XLogPageHeader page;
4167         XLogLongPageHeader longpage;
4168         XLogRecord *record;
4169         bool            use_existent;
4170         uint64          sysidentifier;
4171         struct timeval tv;
4172         pg_crc32        crc;
4173
4174         /*
4175          * Select a hopefully-unique system identifier code for this installation.
4176          * We use the result of gettimeofday(), including the fractional seconds
4177          * field, as being about as unique as we can easily get.  (Think not to
4178          * use random(), since it hasn't been seeded and there's no portable way
4179          * to seed it other than the system clock value...)  The upper half of the
4180          * uint64 value is just the tv_sec part, while the lower half is the XOR
4181          * of tv_sec and tv_usec.  This is to ensure that we don't lose uniqueness
4182          * unnecessarily if "uint64" is really only 32 bits wide.  A person
4183          * knowing this encoding can determine the initialization time of the
4184          * installation, which could perhaps be useful sometimes.
4185          */
4186         gettimeofday(&tv, NULL);
4187         sysidentifier = ((uint64) tv.tv_sec) << 32;
4188         sysidentifier |= (uint32) (tv.tv_sec | tv.tv_usec);
4189
4190         /* First timeline ID is always 1 */
4191         ThisTimeLineID = 1;
4192
4193         /* page buffer must be aligned suitably for O_DIRECT */
4194         buffer = (char *) palloc(XLOG_BLCKSZ + ALIGNOF_XLOG_BUFFER);
4195         page = (XLogPageHeader) TYPEALIGN(ALIGNOF_XLOG_BUFFER, buffer);
4196         memset(page, 0, XLOG_BLCKSZ);
4197
4198         /* Set up information for the initial checkpoint record */
4199         checkPoint.redo.xlogid = 0;
4200         checkPoint.redo.xrecoff = SizeOfXLogLongPHD;
4201         checkPoint.ThisTimeLineID = ThisTimeLineID;
4202         checkPoint.nextXidEpoch = 0;
4203         checkPoint.nextXid = FirstNormalTransactionId;
4204         checkPoint.nextOid = FirstBootstrapObjectId;
4205         checkPoint.nextMulti = FirstMultiXactId;
4206         checkPoint.nextMultiOffset = 0;
4207         checkPoint.time = time(NULL);
4208
4209         ShmemVariableCache->nextXid = checkPoint.nextXid;
4210         ShmemVariableCache->nextOid = checkPoint.nextOid;
4211         ShmemVariableCache->oidCount = 0;
4212         MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset);
4213
4214         /* Set up the XLOG page header */
4215         page->xlp_magic = XLOG_PAGE_MAGIC;
4216         page->xlp_info = XLP_LONG_HEADER;
4217         page->xlp_tli = ThisTimeLineID;
4218         page->xlp_pageaddr.xlogid = 0;
4219         page->xlp_pageaddr.xrecoff = 0;
4220         longpage = (XLogLongPageHeader) page;
4221         longpage->xlp_sysid = sysidentifier;
4222         longpage->xlp_seg_size = XLogSegSize;
4223         longpage->xlp_xlog_blcksz = XLOG_BLCKSZ;
4224
4225         /* Insert the initial checkpoint record */
4226         record = (XLogRecord *) ((char *) page + SizeOfXLogLongPHD);
4227         record->xl_prev.xlogid = 0;
4228         record->xl_prev.xrecoff = 0;
4229         record->xl_xid = InvalidTransactionId;
4230         record->xl_tot_len = SizeOfXLogRecord + sizeof(checkPoint);
4231         record->xl_len = sizeof(checkPoint);
4232         record->xl_info = XLOG_CHECKPOINT_SHUTDOWN;
4233         record->xl_rmid = RM_XLOG_ID;
4234         memcpy(XLogRecGetData(record), &checkPoint, sizeof(checkPoint));
4235
4236         INIT_CRC32(crc);
4237         COMP_CRC32(crc, &checkPoint, sizeof(checkPoint));
4238         COMP_CRC32(crc, (char *) record + sizeof(pg_crc32),
4239                            SizeOfXLogRecord - sizeof(pg_crc32));
4240         FIN_CRC32(crc);
4241         record->xl_crc = crc;
4242
4243         /* Create first XLOG segment file */
4244         use_existent = false;
4245         openLogFile = XLogFileInit(0, 0, &use_existent, false);
4246
4247         /* Write the first page with the initial record */
4248         errno = 0;
4249         if (write(openLogFile, page, XLOG_BLCKSZ) != XLOG_BLCKSZ)
4250         {
4251                 /* if write didn't set errno, assume problem is no disk space */
4252                 if (errno == 0)
4253                         errno = ENOSPC;
4254                 ereport(PANIC,
4255                                 (errcode_for_file_access(),
4256                           errmsg("could not write bootstrap transaction log file: %m")));
4257         }
4258
4259         if (pg_fsync(openLogFile) != 0)
4260                 ereport(PANIC,
4261                                 (errcode_for_file_access(),
4262                           errmsg("could not fsync bootstrap transaction log file: %m")));
4263
4264         if (close(openLogFile))
4265                 ereport(PANIC,
4266                                 (errcode_for_file_access(),
4267                           errmsg("could not close bootstrap transaction log file: %m")));
4268
4269         openLogFile = -1;
4270
4271         /* Now create pg_control */
4272
4273         memset(ControlFile, 0, sizeof(ControlFileData));
4274         /* Initialize pg_control status fields */
4275         ControlFile->system_identifier = sysidentifier;
4276         ControlFile->state = DB_SHUTDOWNED;
4277         ControlFile->time = checkPoint.time;
4278         ControlFile->checkPoint = checkPoint.redo;
4279         ControlFile->checkPointCopy = checkPoint;
4280         /* some additional ControlFile fields are set in WriteControlFile() */
4281
4282         WriteControlFile();
4283
4284         /* Bootstrap the commit log, too */
4285         BootStrapCLOG();
4286         BootStrapSUBTRANS();
4287         BootStrapMultiXact();
4288
4289         pfree(buffer);
4290 }
4291
4292 static char *
4293 str_time(pg_time_t tnow)
4294 {
4295         static char buf[128];
4296
4297         pg_strftime(buf, sizeof(buf),
4298                                 "%Y-%m-%d %H:%M:%S %Z",
4299                                 pg_localtime(&tnow, log_timezone));
4300
4301         return buf;
4302 }
4303
4304 /*
4305  * See if there is a recovery command file (recovery.conf), and if so
4306  * read in parameters for archive recovery.
4307  *
4308  * XXX longer term intention is to expand this to
4309  * cater for additional parameters and controls
4310  * possibly use a flex lexer similar to the GUC one
4311  */
4312 static void
4313 readRecoveryCommandFile(void)
4314 {
4315         FILE       *fd;
4316         char            cmdline[MAXPGPATH];
4317         TimeLineID      rtli = 0;
4318         bool            rtliGiven = false;
4319         bool            syntaxError = false;
4320
4321         fd = AllocateFile(RECOVERY_COMMAND_FILE, "r");
4322         if (fd == NULL)
4323         {
4324                 if (errno == ENOENT)
4325                         return;                         /* not there, so no archive recovery */
4326                 ereport(FATAL,
4327                                 (errcode_for_file_access(),
4328                                  errmsg("could not open recovery command file \"%s\": %m",
4329                                                 RECOVERY_COMMAND_FILE)));
4330         }
4331
4332         ereport(LOG,
4333                         (errmsg("starting archive recovery")));
4334
4335         /*
4336          * Parse the file...
4337          */
4338         while (fgets(cmdline, sizeof(cmdline), fd) != NULL)
4339         {
4340                 /* skip leading whitespace and check for # comment */
4341                 char       *ptr;
4342                 char       *tok1;
4343                 char       *tok2;
4344
4345                 for (ptr = cmdline; *ptr; ptr++)
4346                 {
4347                         if (!isspace((unsigned char) *ptr))
4348                                 break;
4349                 }
4350                 if (*ptr == '\0' || *ptr == '#')
4351                         continue;
4352
4353                 /* identify the quoted parameter value */
4354                 tok1 = strtok(ptr, "'");
4355                 if (!tok1)
4356                 {
4357                         syntaxError = true;
4358                         break;
4359                 }
4360                 tok2 = strtok(NULL, "'");
4361                 if (!tok2)
4362                 {
4363                         syntaxError = true;
4364                         break;
4365                 }
4366                 /* reparse to get just the parameter name */
4367                 tok1 = strtok(ptr, " \t=");
4368                 if (!tok1)
4369                 {
4370                         syntaxError = true;
4371                         break;
4372                 }
4373
4374                 if (strcmp(tok1, "restore_command") == 0)
4375                 {
4376                         recoveryRestoreCommand = pstrdup(tok2);
4377                         ereport(LOG,
4378                                         (errmsg("restore_command = \"%s\"",
4379                                                         recoveryRestoreCommand)));
4380                 }
4381                 else if (strcmp(tok1, "recovery_target_timeline") == 0)
4382                 {
4383                         rtliGiven = true;
4384                         if (strcmp(tok2, "latest") == 0)
4385                                 rtli = 0;
4386                         else
4387                         {
4388                                 errno = 0;
4389                                 rtli = (TimeLineID) strtoul(tok2, NULL, 0);
4390                                 if (errno == EINVAL || errno == ERANGE)
4391                                         ereport(FATAL,
4392                                                         (errmsg("recovery_target_timeline is not a valid number: \"%s\"",
4393                                                                         tok2)));
4394                         }
4395                         if (rtli)
4396                                 ereport(LOG,
4397                                                 (errmsg("recovery_target_timeline = %u", rtli)));
4398                         else
4399                                 ereport(LOG,
4400                                                 (errmsg("recovery_target_timeline = latest")));
4401                 }
4402                 else if (strcmp(tok1, "recovery_target_xid") == 0)
4403                 {
4404                         errno = 0;
4405                         recoveryTargetXid = (TransactionId) strtoul(tok2, NULL, 0);
4406                         if (errno == EINVAL || errno == ERANGE)
4407                                 ereport(FATAL,
4408                                  (errmsg("recovery_target_xid is not a valid number: \"%s\"",
4409                                                  tok2)));
4410                         ereport(LOG,
4411                                         (errmsg("recovery_target_xid = %u",
4412                                                         recoveryTargetXid)));
4413                         recoveryTarget = true;
4414                         recoveryTargetExact = true;
4415                 }
4416                 else if (strcmp(tok1, "recovery_target_time") == 0)
4417                 {
4418                         /*
4419                          * if recovery_target_xid specified, then this overrides
4420                          * recovery_target_time
4421                          */
4422                         if (recoveryTargetExact)
4423                                 continue;
4424                         recoveryTarget = true;
4425                         recoveryTargetExact = false;
4426
4427                         /*
4428                          * Convert the time string given by the user to TimestampTz form.
4429                          */
4430                         recoveryTargetTime =
4431                                 DatumGetTimestampTz(DirectFunctionCall3(timestamptz_in,
4432                                                                                                    CStringGetDatum(tok2),
4433                                                                                                 ObjectIdGetDatum(InvalidOid),
4434                                                                                                                 Int32GetDatum(-1)));
4435                         ereport(LOG,
4436                                         (errmsg("recovery_target_time = %s",
4437                                                         timestamptz_to_str(recoveryTargetTime))));
4438                 }
4439                 else if (strcmp(tok1, "recovery_target_inclusive") == 0)
4440                 {
4441                         /*
4442                          * does nothing if a recovery_target is not also set
4443                          */
4444                         if (strcmp(tok2, "true") == 0)
4445                                 recoveryTargetInclusive = true;
4446                         else
4447                         {
4448                                 recoveryTargetInclusive = false;
4449                                 tok2 = "false";
4450                         }
4451                         ereport(LOG,
4452                                         (errmsg("recovery_target_inclusive = %s", tok2)));
4453                 }
4454                 else if (strcmp(tok1, "log_restartpoints") == 0)
4455                 {
4456                         /*
4457                          * does nothing if a recovery_target is not also set
4458                          */
4459                         if (strcmp(tok2, "true") == 0)
4460                                 recoveryLogRestartpoints = true;
4461                         else
4462                         {
4463                                 recoveryLogRestartpoints = false;
4464                                 tok2 = "false";
4465                         }
4466                         ereport(LOG,
4467                                         (errmsg("log_restartpoints = %s", tok2)));
4468                 }
4469                 else
4470                         ereport(FATAL,
4471                                         (errmsg("unrecognized recovery parameter \"%s\"",
4472                                                         tok1)));
4473         }
4474
4475         FreeFile(fd);
4476
4477         if (syntaxError)
4478                 ereport(FATAL,
4479                                 (errmsg("syntax error in recovery command file: %s",
4480                                                 cmdline),
4481                           errhint("Lines should have the format parameter = 'value'.")));
4482
4483         /* Check that required parameters were supplied */
4484         if (recoveryRestoreCommand == NULL)
4485                 ereport(FATAL,
4486                                 (errmsg("recovery command file \"%s\" did not specify restore_command",
4487                                                 RECOVERY_COMMAND_FILE)));
4488
4489         /* Enable fetching from archive recovery area */
4490         InArchiveRecovery = true;
4491
4492         /*
4493          * If user specified recovery_target_timeline, validate it or compute the
4494          * "latest" value.      We can't do this until after we've gotten the restore
4495          * command and set InArchiveRecovery, because we need to fetch timeline
4496          * history files from the archive.
4497          */
4498         if (rtliGiven)
4499         {
4500                 if (rtli)
4501                 {
4502                         /* Timeline 1 does not have a history file, all else should */
4503                         if (rtli != 1 && !existsTimeLineHistory(rtli))
4504                                 ereport(FATAL,
4505                                                 (errmsg("recovery_target_timeline %u does not exist",
4506                                                                 rtli)));
4507                         recoveryTargetTLI = rtli;
4508                 }
4509                 else
4510                 {
4511                         /* We start the "latest" search from pg_control's timeline */
4512                         recoveryTargetTLI = findNewestTimeLine(recoveryTargetTLI);
4513                 }
4514         }
4515 }
4516
4517 /*
4518  * Exit archive-recovery state
4519  */
4520 static void
4521 exitArchiveRecovery(TimeLineID endTLI, uint32 endLogId, uint32 endLogSeg)
4522 {
4523         char            recoveryPath[MAXPGPATH];
4524         char            xlogpath[MAXPGPATH];
4525
4526         /*
4527          * We are no longer in archive recovery state.
4528          */
4529         InArchiveRecovery = false;
4530
4531         /*
4532          * We should have the ending log segment currently open.  Verify, and then
4533          * close it (to avoid problems on Windows with trying to rename or delete
4534          * an open file).
4535          */
4536         Assert(readFile >= 0);
4537         Assert(readId == endLogId);
4538         Assert(readSeg == endLogSeg);
4539
4540         close(readFile);
4541         readFile = -1;
4542
4543         /*
4544          * If the segment was fetched from archival storage, we want to replace
4545          * the existing xlog segment (if any) with the archival version.  This is
4546          * because whatever is in XLOGDIR is very possibly older than what we have
4547          * from the archives, since it could have come from restoring a PGDATA
4548          * backup.      In any case, the archival version certainly is more
4549          * descriptive of what our current database state is, because that is what
4550          * we replayed from.
4551          *
4552          * Note that if we are establishing a new timeline, ThisTimeLineID is
4553          * already set to the new value, and so we will create a new file instead
4554          * of overwriting any existing file.  (This is, in fact, always the case
4555          * at present.)
4556          */
4557         snprintf(recoveryPath, MAXPGPATH, XLOGDIR "/RECOVERYXLOG");
4558         XLogFilePath(xlogpath, ThisTimeLineID, endLogId, endLogSeg);
4559
4560         if (restoredFromArchive)
4561         {
4562                 ereport(DEBUG3,
4563                                 (errmsg_internal("moving last restored xlog to \"%s\"",
4564                                                                  xlogpath)));
4565                 unlink(xlogpath);               /* might or might not exist */
4566                 if (rename(recoveryPath, xlogpath) != 0)
4567                         ereport(FATAL,
4568                                         (errcode_for_file_access(),
4569                                          errmsg("could not rename file \"%s\" to \"%s\": %m",
4570                                                         recoveryPath, xlogpath)));
4571                 /* XXX might we need to fix permissions on the file? */
4572         }
4573         else
4574         {
4575                 /*
4576                  * If the latest segment is not archival, but there's still a
4577                  * RECOVERYXLOG laying about, get rid of it.
4578                  */
4579                 unlink(recoveryPath);   /* ignore any error */
4580
4581                 /*
4582                  * If we are establishing a new timeline, we have to copy data from
4583                  * the last WAL segment of the old timeline to create a starting WAL
4584                  * segment for the new timeline.
4585                  */
4586                 if (endTLI != ThisTimeLineID)
4587                         XLogFileCopy(endLogId, endLogSeg,
4588                                                  endTLI, endLogId, endLogSeg);
4589         }
4590
4591         /*
4592          * Let's just make real sure there are not .ready or .done flags posted
4593          * for the new segment.
4594          */
4595         XLogFileName(xlogpath, ThisTimeLineID, endLogId, endLogSeg);
4596         XLogArchiveCleanup(xlogpath);
4597
4598         /* Get rid of any remaining recovered timeline-history file, too */
4599         snprintf(recoveryPath, MAXPGPATH, XLOGDIR "/RECOVERYHISTORY");
4600         unlink(recoveryPath);           /* ignore any error */
4601
4602         /*
4603          * Rename the config file out of the way, so that we don't accidentally
4604          * re-enter archive recovery mode in a subsequent crash.
4605          */
4606         unlink(RECOVERY_COMMAND_DONE);
4607         if (rename(RECOVERY_COMMAND_FILE, RECOVERY_COMMAND_DONE) != 0)
4608                 ereport(FATAL,
4609                                 (errcode_for_file_access(),
4610                                  errmsg("could not rename file \"%s\" to \"%s\": %m",
4611                                                 RECOVERY_COMMAND_FILE, RECOVERY_COMMAND_DONE)));
4612
4613         ereport(LOG,
4614                         (errmsg("archive recovery complete")));
4615 }
4616
4617 /*
4618  * For point-in-time recovery, this function decides whether we want to
4619  * stop applying the XLOG at or after the current record.
4620  *
4621  * Returns TRUE if we are stopping, FALSE otherwise.  On TRUE return,
4622  * *includeThis is set TRUE if we should apply this record before stopping.
4623  * Also, some information is saved in recoveryStopXid et al for use in
4624  * annotating the new timeline's history file.
4625  */
4626 static bool
4627 recoveryStopsHere(XLogRecord *record, bool *includeThis)
4628 {
4629         bool            stopsHere;
4630         uint8           record_info;
4631         TimestampTz     recordXtime;
4632
4633         /* We only consider stopping at COMMIT or ABORT records */
4634         if (record->xl_rmid != RM_XACT_ID)
4635                 return false;
4636         record_info = record->xl_info & ~XLR_INFO_MASK;
4637         if (record_info == XLOG_XACT_COMMIT)
4638         {
4639                 xl_xact_commit *recordXactCommitData;
4640
4641                 recordXactCommitData = (xl_xact_commit *) XLogRecGetData(record);
4642                 recordXtime = recordXactCommitData->xact_time;
4643         }
4644         else if (record_info == XLOG_XACT_ABORT)
4645         {
4646                 xl_xact_abort *recordXactAbortData;
4647
4648                 recordXactAbortData = (xl_xact_abort *) XLogRecGetData(record);
4649                 recordXtime = recordXactAbortData->xact_time;
4650         }
4651         else
4652                 return false;
4653
4654         /* Remember the most recent COMMIT/ABORT time for logging purposes */
4655         recoveryLastXTime = recordXtime;
4656
4657         /* Do we have a PITR target at all? */
4658         if (!recoveryTarget)
4659                 return false;
4660
4661         if (recoveryTargetExact)
4662         {
4663                 /*
4664                  * there can be only one transaction end record with this exact
4665                  * transactionid
4666                  *
4667                  * when testing for an xid, we MUST test for equality only, since
4668                  * transactions are numbered in the order they start, not the order
4669                  * they complete. A higher numbered xid will complete before you about
4670                  * 50% of the time...
4671                  */
4672                 stopsHere = (record->xl_xid == recoveryTargetXid);
4673                 if (stopsHere)
4674                         *includeThis = recoveryTargetInclusive;
4675         }
4676         else
4677         {
4678                 /*
4679                  * there can be many transactions that share the same commit time, so
4680                  * we stop after the last one, if we are inclusive, or stop at the
4681                  * first one if we are exclusive
4682                  */
4683                 if (recoveryTargetInclusive)
4684                         stopsHere = (recordXtime > recoveryTargetTime);
4685                 else
4686                         stopsHere = (recordXtime >= recoveryTargetTime);
4687                 if (stopsHere)
4688                         *includeThis = false;
4689         }
4690
4691         if (stopsHere)
4692         {
4693                 recoveryStopXid = record->xl_xid;
4694                 recoveryStopTime = recordXtime;
4695                 recoveryStopAfter = *includeThis;
4696
4697                 if (record_info == XLOG_XACT_COMMIT)
4698                 {
4699                         if (recoveryStopAfter)
4700                                 ereport(LOG,
4701                                                 (errmsg("recovery stopping after commit of transaction %u, time %s",
4702                                                                 recoveryStopXid,
4703                                                                 timestamptz_to_str(recoveryStopTime))));
4704                         else
4705                                 ereport(LOG,
4706                                                 (errmsg("recovery stopping before commit of transaction %u, time %s",
4707                                                                 recoveryStopXid,
4708                                                                 timestamptz_to_str(recoveryStopTime))));
4709                 }
4710                 else
4711                 {
4712                         if (recoveryStopAfter)
4713                                 ereport(LOG,
4714                                                 (errmsg("recovery stopping after abort of transaction %u, time %s",
4715                                                                 recoveryStopXid,
4716                                                                 timestamptz_to_str(recoveryStopTime))));
4717                         else
4718                                 ereport(LOG,
4719                                                 (errmsg("recovery stopping before abort of transaction %u, time %s",
4720                                                                 recoveryStopXid,
4721                                                                 timestamptz_to_str(recoveryStopTime))));
4722                 }
4723         }
4724
4725         return stopsHere;
4726 }
4727
4728 /*
4729  * This must be called ONCE during postmaster or standalone-backend startup
4730  */
4731 void
4732 StartupXLOG(void)
4733 {
4734         XLogCtlInsert *Insert;
4735         CheckPoint      checkPoint;
4736         bool            wasShutdown;
4737         bool            reachedStopPoint = false;
4738         bool            haveBackupLabel = false;
4739         XLogRecPtr      RecPtr,
4740                                 LastRec,
4741                                 checkPointLoc,
4742                                 minRecoveryLoc,
4743                                 EndOfLog;
4744         uint32          endLogId;
4745         uint32          endLogSeg;
4746         XLogRecord *record;
4747         uint32          freespace;
4748         TransactionId oldestActiveXID;
4749
4750         /*
4751          * Read control file and check XLOG status looks valid.
4752          *
4753          * Note: in most control paths, *ControlFile is already valid and we need
4754          * not do ReadControlFile() here, but might as well do it to be sure.
4755          */
4756         ReadControlFile();
4757
4758         if (ControlFile->state < DB_SHUTDOWNED ||
4759                 ControlFile->state > DB_IN_PRODUCTION ||
4760                 !XRecOffIsValid(ControlFile->checkPoint.xrecoff))
4761                 ereport(FATAL,
4762                                 (errmsg("control file contains invalid data")));
4763
4764         if (ControlFile->state == DB_SHUTDOWNED)
4765                 ereport(LOG,
4766                                 (errmsg("database system was shut down at %s",
4767                                                 str_time(ControlFile->time))));
4768         else if (ControlFile->state == DB_SHUTDOWNING)
4769                 ereport(LOG,
4770                                 (errmsg("database system shutdown was interrupted; last known up at %s",
4771                                                 str_time(ControlFile->time))));
4772         else if (ControlFile->state == DB_IN_CRASH_RECOVERY)
4773                 ereport(LOG,
4774                    (errmsg("database system was interrupted while in recovery at %s",
4775                                    str_time(ControlFile->time)),
4776                         errhint("This probably means that some data is corrupted and"
4777                                         " you will have to use the last backup for recovery.")));
4778         else if (ControlFile->state == DB_IN_ARCHIVE_RECOVERY)
4779                 ereport(LOG,
4780                                 (errmsg("database system was interrupted while in recovery at log time %s",
4781                                                 str_time(ControlFile->checkPointCopy.time)),
4782                                  errhint("If this has occurred more than once some data might be corrupted"
4783                                 " and you might need to choose an earlier recovery target.")));
4784         else if (ControlFile->state == DB_IN_PRODUCTION)
4785                 ereport(LOG,
4786                                 (errmsg("database system was interrupted; last known up at %s",
4787                                                 str_time(ControlFile->time))));
4788
4789         /* This is just to allow attaching to startup process with a debugger */
4790 #ifdef XLOG_REPLAY_DELAY
4791         if (ControlFile->state != DB_SHUTDOWNED)
4792                 pg_usleep(60000000L);
4793 #endif
4794
4795         /*
4796          * Initialize on the assumption we want to recover to the same timeline
4797          * that's active according to pg_control.
4798          */
4799         recoveryTargetTLI = ControlFile->checkPointCopy.ThisTimeLineID;
4800
4801         /*
4802          * Check for recovery control file, and if so set up state for offline
4803          * recovery
4804          */
4805         readRecoveryCommandFile();
4806
4807         /* Now we can determine the list of expected TLIs */
4808         expectedTLIs = readTimeLineHistory(recoveryTargetTLI);
4809
4810         /*
4811          * If pg_control's timeline is not in expectedTLIs, then we cannot
4812          * proceed: the backup is not part of the history of the requested
4813          * timeline.
4814          */
4815         if (!list_member_int(expectedTLIs,
4816                                                  (int) ControlFile->checkPointCopy.ThisTimeLineID))
4817                 ereport(FATAL,
4818                                 (errmsg("requested timeline %u is not a child of database system timeline %u",
4819                                                 recoveryTargetTLI,
4820                                                 ControlFile->checkPointCopy.ThisTimeLineID)));
4821
4822         if (read_backup_label(&checkPointLoc, &minRecoveryLoc))
4823         {
4824                 /*
4825                  * When a backup_label file is present, we want to roll forward from
4826                  * the checkpoint it identifies, rather than using pg_control.
4827                  */
4828                 record = ReadCheckpointRecord(checkPointLoc, 0);
4829                 if (record != NULL)
4830                 {
4831                         ereport(DEBUG1,
4832                                         (errmsg("checkpoint record is at %X/%X",
4833                                                         checkPointLoc.xlogid, checkPointLoc.xrecoff)));
4834                         InRecovery = true;      /* force recovery even if SHUTDOWNED */
4835                 }
4836                 else
4837                 {
4838                         ereport(PANIC,
4839                                         (errmsg("could not locate required checkpoint record"),
4840                                          errhint("If you are not restoring from a backup, try removing the file \"%s/backup_label\".", DataDir)));
4841                 }
4842                 /* set flag to delete it later */
4843                 haveBackupLabel = true;
4844         }
4845         else
4846         {
4847                 /*
4848                  * Get the last valid checkpoint record.  If the latest one according
4849                  * to pg_control is broken, try the next-to-last one.
4850                  */
4851                 checkPointLoc = ControlFile->checkPoint;
4852                 record = ReadCheckpointRecord(checkPointLoc, 1);
4853                 if (record != NULL)
4854                 {
4855                         ereport(DEBUG1,
4856                                         (errmsg("checkpoint record is at %X/%X",
4857                                                         checkPointLoc.xlogid, checkPointLoc.xrecoff)));
4858                 }
4859                 else
4860                 {
4861                         checkPointLoc = ControlFile->prevCheckPoint;
4862                         record = ReadCheckpointRecord(checkPointLoc, 2);
4863                         if (record != NULL)
4864                         {
4865                                 ereport(LOG,
4866                                                 (errmsg("using previous checkpoint record at %X/%X",
4867                                                           checkPointLoc.xlogid, checkPointLoc.xrecoff)));
4868                                 InRecovery = true;              /* force recovery even if SHUTDOWNED */
4869                         }
4870                         else
4871                                 ereport(PANIC,
4872                                          (errmsg("could not locate a valid checkpoint record")));
4873                 }
4874         }
4875
4876         LastRec = RecPtr = checkPointLoc;
4877         memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
4878         wasShutdown = (record->xl_info == XLOG_CHECKPOINT_SHUTDOWN);
4879
4880         ereport(DEBUG1,
4881          (errmsg("redo record is at %X/%X; shutdown %s",
4882                          checkPoint.redo.xlogid, checkPoint.redo.xrecoff,
4883                          wasShutdown ? "TRUE" : "FALSE")));
4884         ereport(DEBUG1,
4885                         (errmsg("next transaction ID: %u/%u; next OID: %u",
4886                                         checkPoint.nextXidEpoch, checkPoint.nextXid,
4887                                         checkPoint.nextOid)));
4888         ereport(DEBUG1,
4889                         (errmsg("next MultiXactId: %u; next MultiXactOffset: %u",
4890                                         checkPoint.nextMulti, checkPoint.nextMultiOffset)));
4891         if (!TransactionIdIsNormal(checkPoint.nextXid))
4892                 ereport(PANIC,
4893                                 (errmsg("invalid next transaction ID")));
4894
4895         ShmemVariableCache->nextXid = checkPoint.nextXid;
4896         ShmemVariableCache->nextOid = checkPoint.nextOid;
4897         ShmemVariableCache->oidCount = 0;
4898         MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset);
4899
4900         /*
4901          * We must replay WAL entries using the same TimeLineID they were created
4902          * under, so temporarily adopt the TLI indicated by the checkpoint (see
4903          * also xlog_redo()).
4904          */
4905         ThisTimeLineID = checkPoint.ThisTimeLineID;
4906
4907         RedoRecPtr = XLogCtl->Insert.RedoRecPtr = checkPoint.redo;
4908
4909         if (XLByteLT(RecPtr, checkPoint.redo))
4910                 ereport(PANIC,
4911                                 (errmsg("invalid redo in checkpoint record")));
4912
4913         /*
4914          * Check whether we need to force recovery from WAL.  If it appears to
4915          * have been a clean shutdown and we did not have a recovery.conf file,
4916          * then assume no recovery needed.
4917          */
4918         if (XLByteLT(checkPoint.redo, RecPtr))
4919         {
4920                 if (wasShutdown)
4921                         ereport(PANIC,
4922                                 (errmsg("invalid redo record in shutdown checkpoint")));
4923                 InRecovery = true;
4924         }
4925         else if (ControlFile->state != DB_SHUTDOWNED)
4926                 InRecovery = true;
4927         else if (InArchiveRecovery)
4928         {
4929                 /* force recovery due to presence of recovery.conf */
4930                 InRecovery = true;
4931         }
4932
4933         /* REDO */
4934         if (InRecovery)
4935         {
4936                 int                     rmid;
4937
4938                 /*
4939                  * Update pg_control to show that we are recovering and to show the
4940                  * selected checkpoint as the place we are starting from. We also mark
4941                  * pg_control with any minimum recovery stop point obtained from a
4942                  * backup history file.
4943                  */
4944                 if (InArchiveRecovery)
4945                 {
4946                         ereport(LOG,
4947                                         (errmsg("automatic recovery in progress")));
4948                         ControlFile->state = DB_IN_ARCHIVE_RECOVERY;
4949                 }
4950                 else
4951                 {
4952                         ereport(LOG,
4953                                         (errmsg("database system was not properly shut down; "
4954                                                         "automatic recovery in progress")));
4955                         ControlFile->state = DB_IN_CRASH_RECOVERY;
4956                 }
4957                 ControlFile->prevCheckPoint = ControlFile->checkPoint;
4958                 ControlFile->checkPoint = checkPointLoc;
4959                 ControlFile->checkPointCopy = checkPoint;
4960                 if (minRecoveryLoc.xlogid != 0 || minRecoveryLoc.xrecoff != 0)
4961                         ControlFile->minRecoveryPoint = minRecoveryLoc;
4962                 ControlFile->time = time(NULL);
4963                 UpdateControlFile();
4964
4965                 /*
4966                  * If there was a backup label file, it's done its job and the info
4967                  * has now been propagated into pg_control.  We must get rid of the
4968                  * label file so that if we crash during recovery, we'll pick up at
4969                  * the latest recovery restartpoint instead of going all the way back
4970                  * to the backup start point.  It seems prudent though to just rename
4971                  * the file out of the way rather than delete it completely.
4972                  */
4973                 if (haveBackupLabel)
4974                 {
4975                         unlink(BACKUP_LABEL_OLD);
4976                         if (rename(BACKUP_LABEL_FILE, BACKUP_LABEL_OLD) != 0)
4977                                 ereport(FATAL,
4978                                                 (errcode_for_file_access(),
4979                                                  errmsg("could not rename file \"%s\" to \"%s\": %m",
4980                                                                 BACKUP_LABEL_FILE, BACKUP_LABEL_OLD)));
4981                 }
4982
4983                 /* Start up the recovery environment */
4984                 XLogInitRelationCache();
4985
4986                 for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
4987                 {
4988                         if (RmgrTable[rmid].rm_startup != NULL)
4989                                 RmgrTable[rmid].rm_startup();
4990                 }
4991
4992                 /*
4993                  * Find the first record that logically follows the checkpoint --- it
4994                  * might physically precede it, though.
4995                  */
4996                 if (XLByteLT(checkPoint.redo, RecPtr))
4997                 {
4998                         /* back up to find the record */
4999                         record = ReadRecord(&(checkPoint.redo), PANIC);
5000                 }
5001                 else
5002                 {
5003                         /* just have to read next record after CheckPoint */
5004                         record = ReadRecord(NULL, LOG);
5005                 }
5006
5007                 if (record != NULL)
5008                 {
5009                         bool            recoveryContinue = true;
5010                         bool            recoveryApply = true;
5011                         ErrorContextCallback errcontext;
5012
5013                         InRedo = true;
5014                         ereport(LOG,
5015                                         (errmsg("redo starts at %X/%X",
5016                                                         ReadRecPtr.xlogid, ReadRecPtr.xrecoff)));
5017
5018                         /*
5019                          * main redo apply loop
5020                          */
5021                         do
5022                         {
5023 #ifdef WAL_DEBUG
5024                                 if (XLOG_DEBUG)
5025                                 {
5026                                         StringInfoData buf;
5027
5028                                         initStringInfo(&buf);
5029                                         appendStringInfo(&buf, "REDO @ %X/%X; LSN %X/%X: ",
5030                                                                          ReadRecPtr.xlogid, ReadRecPtr.xrecoff,
5031                                                                          EndRecPtr.xlogid, EndRecPtr.xrecoff);
5032                                         xlog_outrec(&buf, record);
5033                                         appendStringInfo(&buf, " - ");
5034                                         RmgrTable[record->xl_rmid].rm_desc(&buf,
5035                                                                                                            record->xl_info,
5036                                                                                                          XLogRecGetData(record));
5037                                         elog(LOG, "%s", buf.data);
5038                                         pfree(buf.data);
5039                                 }
5040 #endif
5041
5042                                 /*
5043                                  * Have we reached our recovery target?
5044                                  */
5045                                 if (recoveryStopsHere(record, &recoveryApply))
5046                                 {
5047                                         reachedStopPoint = true;                /* see below */
5048                                         recoveryContinue = false;
5049                                         if (!recoveryApply)
5050                                                 break;
5051                                 }
5052
5053                                 /* Setup error traceback support for ereport() */
5054                                 errcontext.callback = rm_redo_error_callback;
5055                                 errcontext.arg = (void *) record;
5056                                 errcontext.previous = error_context_stack;
5057                                 error_context_stack = &errcontext;
5058
5059                                 /* nextXid must be beyond record's xid */
5060                                 if (TransactionIdFollowsOrEquals(record->xl_xid,
5061                                                                                                  ShmemVariableCache->nextXid))
5062                                 {
5063                                         ShmemVariableCache->nextXid = record->xl_xid;
5064                                         TransactionIdAdvance(ShmemVariableCache->nextXid);
5065                                 }
5066
5067                                 if (record->xl_info & XLR_BKP_BLOCK_MASK)
5068                                         RestoreBkpBlocks(record, EndRecPtr);
5069
5070                                 RmgrTable[record->xl_rmid].rm_redo(EndRecPtr, record);
5071
5072                                 /* Pop the error context stack */
5073                                 error_context_stack = errcontext.previous;
5074
5075                                 LastRec = ReadRecPtr;
5076
5077                                 record = ReadRecord(NULL, LOG);
5078                         } while (record != NULL && recoveryContinue);
5079
5080                         /*
5081                          * end of main redo apply loop
5082                          */
5083
5084                         ereport(LOG,
5085                                         (errmsg("redo done at %X/%X",
5086                                                         ReadRecPtr.xlogid, ReadRecPtr.xrecoff)));
5087                         if (recoveryLastXTime)
5088                                 ereport(LOG,
5089                                                 (errmsg("last completed transaction was at log time %s",
5090                                                                 timestamptz_to_str(recoveryLastXTime))));
5091                         InRedo = false;
5092                 }
5093                 else
5094                 {
5095                         /* there are no WAL records following the checkpoint */
5096                         ereport(LOG,
5097                                         (errmsg("redo is not required")));
5098                 }
5099         }
5100
5101         /*
5102          * Re-fetch the last valid or last applied record, so we can identify the
5103          * exact endpoint of what we consider the valid portion of WAL.
5104          */
5105         record = ReadRecord(&LastRec, PANIC);
5106         EndOfLog = EndRecPtr;
5107         XLByteToPrevSeg(EndOfLog, endLogId, endLogSeg);
5108
5109         /*
5110          * Complain if we did not roll forward far enough to render the backup
5111          * dump consistent.
5112          */
5113         if (XLByteLT(EndOfLog, ControlFile->minRecoveryPoint))
5114         {
5115                 if (reachedStopPoint)   /* stopped because of stop request */
5116                         ereport(FATAL,
5117                                         (errmsg("requested recovery stop point is before end time of backup dump")));
5118                 else                                    /* ran off end of WAL */
5119                         ereport(FATAL,
5120                                         (errmsg("WAL ends before end time of backup dump")));
5121         }
5122
5123         /*
5124          * Consider whether we need to assign a new timeline ID.
5125          *
5126          * If we are doing an archive recovery, we always assign a new ID.  This
5127          * handles a couple of issues.  If we stopped short of the end of WAL
5128          * during recovery, then we are clearly generating a new timeline and must
5129          * assign it a unique new ID.  Even if we ran to the end, modifying the
5130          * current last segment is problematic because it may result in trying
5131          * to overwrite an already-archived copy of that segment, and we encourage
5132          * DBAs to make their archive_commands reject that.  We can dodge the
5133          * problem by making the new active segment have a new timeline ID.
5134          *
5135          * In a normal crash recovery, we can just extend the timeline we were in.
5136          */
5137         if (InArchiveRecovery)
5138         {
5139                 ThisTimeLineID = findNewestTimeLine(recoveryTargetTLI) + 1;
5140                 ereport(LOG,
5141                                 (errmsg("selected new timeline ID: %u", ThisTimeLineID)));
5142                 writeTimeLineHistory(ThisTimeLineID, recoveryTargetTLI,
5143                                                          curFileTLI, endLogId, endLogSeg);
5144         }
5145
5146         /* Save the selected TimeLineID in shared memory, too */
5147         XLogCtl->ThisTimeLineID = ThisTimeLineID;
5148
5149         /*
5150          * We are now done reading the old WAL.  Turn off archive fetching if it
5151          * was active, and make a writable copy of the last WAL segment. (Note
5152          * that we also have a copy of the last block of the old WAL in readBuf;
5153          * we will use that below.)
5154          */
5155         if (InArchiveRecovery)
5156                 exitArchiveRecovery(curFileTLI, endLogId, endLogSeg);
5157
5158         /*
5159          * Prepare to write WAL starting at EndOfLog position, and init xlog
5160          * buffer cache using the block containing the last record from the
5161          * previous incarnation.
5162          */
5163         openLogId = endLogId;
5164         openLogSeg = endLogSeg;
5165         openLogFile = XLogFileOpen(openLogId, openLogSeg);
5166         openLogOff = 0;
5167         Insert = &XLogCtl->Insert;
5168         Insert->PrevRecord = LastRec;
5169         XLogCtl->xlblocks[0].xlogid = openLogId;
5170         XLogCtl->xlblocks[0].xrecoff =
5171                 ((EndOfLog.xrecoff - 1) / XLOG_BLCKSZ + 1) * XLOG_BLCKSZ;
5172
5173         /*
5174          * Tricky point here: readBuf contains the *last* block that the LastRec
5175          * record spans, not the one it starts in.      The last block is indeed the
5176          * one we want to use.
5177          */
5178         Assert(readOff == (XLogCtl->xlblocks[0].xrecoff - XLOG_BLCKSZ) % XLogSegSize);
5179         memcpy((char *) Insert->currpage, readBuf, XLOG_BLCKSZ);
5180         Insert->currpos = (char *) Insert->currpage +
5181                 (EndOfLog.xrecoff + XLOG_BLCKSZ - XLogCtl->xlblocks[0].xrecoff);
5182
5183         LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
5184
5185         XLogCtl->Write.LogwrtResult = LogwrtResult;
5186         Insert->LogwrtResult = LogwrtResult;
5187         XLogCtl->LogwrtResult = LogwrtResult;
5188
5189         XLogCtl->LogwrtRqst.Write = EndOfLog;
5190         XLogCtl->LogwrtRqst.Flush = EndOfLog;
5191
5192         freespace = INSERT_FREESPACE(Insert);
5193         if (freespace > 0)
5194         {
5195                 /* Make sure rest of page is zero */
5196                 MemSet(Insert->currpos, 0, freespace);
5197                 XLogCtl->Write.curridx = 0;
5198         }
5199         else
5200         {
5201                 /*
5202                  * Whenever Write.LogwrtResult points to exactly the end of a page,
5203                  * Write.curridx must point to the *next* page (see XLogWrite()).
5204                  *
5205                  * Note: it might seem we should do AdvanceXLInsertBuffer() here, but
5206                  * this is sufficient.  The first actual attempt to insert a log
5207                  * record will advance the insert state.
5208                  */
5209                 XLogCtl->Write.curridx = NextBufIdx(0);
5210         }
5211
5212         /* Pre-scan prepared transactions to find out the range of XIDs present */
5213         oldestActiveXID = PrescanPreparedTransactions();
5214
5215         if (InRecovery)
5216         {
5217                 int                     rmid;
5218
5219                 /*
5220                  * Allow resource managers to do any required cleanup.
5221                  */
5222                 for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
5223                 {
5224                         if (RmgrTable[rmid].rm_cleanup != NULL)
5225                                 RmgrTable[rmid].rm_cleanup();
5226                 }
5227
5228                 /*
5229                  * Check to see if the XLOG sequence contained any unresolved
5230                  * references to uninitialized pages.
5231                  */
5232                 XLogCheckInvalidPages();
5233
5234                 /*
5235                  * Reset pgstat data, because it may be invalid after recovery.
5236                  */
5237                 pgstat_reset_all();
5238
5239                 /*
5240                  * Perform a checkpoint to update all our recovery activity to disk.
5241                  *
5242                  * Note that we write a shutdown checkpoint rather than an on-line
5243                  * one. This is not particularly critical, but since we may be
5244                  * assigning a new TLI, using a shutdown checkpoint allows us to have
5245                  * the rule that TLI only changes in shutdown checkpoints, which
5246                  * allows some extra error checking in xlog_redo.
5247                  */
5248                 CreateCheckPoint(CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_IMMEDIATE);
5249
5250                 /*
5251                  * Close down recovery environment
5252                  */
5253                 XLogCloseRelationCache();
5254         }
5255
5256         /*
5257          * Preallocate additional log files, if wanted.
5258          */
5259         PreallocXlogFiles(EndOfLog);
5260
5261         /*
5262          * Okay, we're officially UP.
5263          */
5264         InRecovery = false;
5265
5266         ControlFile->state = DB_IN_PRODUCTION;
5267         ControlFile->time = time(NULL);
5268         UpdateControlFile();
5269
5270         /* start the archive_timeout timer running */
5271         XLogCtl->Write.lastSegSwitchTime = ControlFile->time;
5272
5273         /* initialize shared-memory copy of latest checkpoint XID/epoch */
5274         XLogCtl->ckptXidEpoch = ControlFile->checkPointCopy.nextXidEpoch;
5275         XLogCtl->ckptXid = ControlFile->checkPointCopy.nextXid;
5276
5277         /* also initialize latestCompletedXid, to nextXid - 1 */
5278         ShmemVariableCache->latestCompletedXid = ShmemVariableCache->nextXid;
5279         TransactionIdRetreat(ShmemVariableCache->latestCompletedXid);
5280
5281         /* Start up the commit log and related stuff, too */
5282         StartupCLOG();
5283         StartupSUBTRANS(oldestActiveXID);
5284         StartupMultiXact();
5285
5286         /* Reload shared-memory state for prepared transactions */
5287         RecoverPreparedTransactions();
5288
5289         /* Shut down readFile facility, free space */
5290         if (readFile >= 0)
5291         {
5292                 close(readFile);
5293                 readFile = -1;
5294         }
5295         if (readBuf)
5296         {
5297                 free(readBuf);
5298                 readBuf = NULL;
5299         }
5300         if (readRecordBuf)
5301         {
5302                 free(readRecordBuf);
5303                 readRecordBuf = NULL;
5304                 readRecordBufSize = 0;
5305         }
5306 }
5307
5308 /*
5309  * Subroutine to try to fetch and validate a prior checkpoint record.
5310  *
5311  * whichChkpt identifies the checkpoint (merely for reporting purposes).
5312  * 1 for "primary", 2 for "secondary", 0 for "other" (backup_label)
5313  */
5314 static XLogRecord *
5315 ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt)
5316 {
5317         XLogRecord *record;
5318
5319         if (!XRecOffIsValid(RecPtr.xrecoff))
5320         {
5321                 switch (whichChkpt)
5322                 {
5323                         case 1:
5324                                 ereport(LOG,
5325                                 (errmsg("invalid primary checkpoint link in control file")));
5326                                 break;
5327                         case 2:
5328                                 ereport(LOG,
5329                                                 (errmsg("invalid secondary checkpoint link in control file")));
5330                                 break;
5331                         default:
5332                                 ereport(LOG,
5333                                    (errmsg("invalid checkpoint link in backup_label file")));
5334                                 break;
5335                 }
5336                 return NULL;
5337         }
5338
5339         record = ReadRecord(&RecPtr, LOG);
5340
5341         if (record == NULL)
5342         {
5343                 switch (whichChkpt)
5344                 {
5345                         case 1:
5346                                 ereport(LOG,
5347                                                 (errmsg("invalid primary checkpoint record")));
5348                                 break;
5349                         case 2:
5350                                 ereport(LOG,
5351                                                 (errmsg("invalid secondary checkpoint record")));
5352                                 break;
5353                         default:
5354                                 ereport(LOG,
5355                                                 (errmsg("invalid checkpoint record")));
5356                                 break;
5357                 }
5358                 return NULL;
5359         }
5360         if (record->xl_rmid != RM_XLOG_ID)
5361         {
5362                 switch (whichChkpt)
5363                 {
5364                         case 1:
5365                                 ereport(LOG,
5366                                                 (errmsg("invalid resource manager ID in primary checkpoint record")));
5367                                 break;
5368                         case 2:
5369                                 ereport(LOG,
5370                                                 (errmsg("invalid resource manager ID in secondary checkpoint record")));
5371                                 break;
5372                         default:
5373                                 ereport(LOG,
5374                                 (errmsg("invalid resource manager ID in checkpoint record")));
5375                                 break;
5376                 }
5377                 return NULL;
5378         }
5379         if (record->xl_info != XLOG_CHECKPOINT_SHUTDOWN &&
5380                 record->xl_info != XLOG_CHECKPOINT_ONLINE)
5381         {
5382                 switch (whichChkpt)
5383                 {
5384                         case 1:
5385                                 ereport(LOG,
5386                                    (errmsg("invalid xl_info in primary checkpoint record")));
5387                                 break;
5388                         case 2:
5389                                 ereport(LOG,
5390                                  (errmsg("invalid xl_info in secondary checkpoint record")));
5391                                 break;
5392                         default:
5393                                 ereport(LOG,
5394                                                 (errmsg("invalid xl_info in checkpoint record")));
5395                                 break;
5396                 }
5397                 return NULL;
5398         }
5399         if (record->xl_len != sizeof(CheckPoint) ||
5400                 record->xl_tot_len != SizeOfXLogRecord + sizeof(CheckPoint))
5401         {
5402                 switch (whichChkpt)
5403                 {
5404                         case 1:
5405                                 ereport(LOG,
5406                                         (errmsg("invalid length of primary checkpoint record")));
5407                                 break;
5408                         case 2:
5409                                 ereport(LOG,
5410                                   (errmsg("invalid length of secondary checkpoint record")));
5411                                 break;
5412                         default:
5413                                 ereport(LOG,
5414                                                 (errmsg("invalid length of checkpoint record")));
5415                                 break;
5416                 }
5417                 return NULL;
5418         }
5419         return record;
5420 }
5421
5422 /*
5423  * This must be called during startup of a backend process, except that
5424  * it need not be called in a standalone backend (which does StartupXLOG
5425  * instead).  We need to initialize the local copies of ThisTimeLineID and
5426  * RedoRecPtr.
5427  *
5428  * Note: before Postgres 8.0, we went to some effort to keep the postmaster
5429  * process's copies of ThisTimeLineID and RedoRecPtr valid too.  This was
5430  * unnecessary however, since the postmaster itself never touches XLOG anyway.
5431  */
5432 void
5433 InitXLOGAccess(void)
5434 {
5435         /* ThisTimeLineID doesn't change so we need no lock to copy it */
5436         ThisTimeLineID = XLogCtl->ThisTimeLineID;
5437         /* Use GetRedoRecPtr to copy the RedoRecPtr safely */
5438         (void) GetRedoRecPtr();
5439 }
5440
5441 /*
5442  * Once spawned, a backend may update its local RedoRecPtr from
5443  * XLogCtl->Insert.RedoRecPtr; it must hold the insert lock or info_lck
5444  * to do so.  This is done in XLogInsert() or GetRedoRecPtr().
5445  */
5446 XLogRecPtr
5447 GetRedoRecPtr(void)
5448 {
5449         /* use volatile pointer to prevent code rearrangement */
5450         volatile XLogCtlData *xlogctl = XLogCtl;
5451
5452         SpinLockAcquire(&xlogctl->info_lck);
5453         Assert(XLByteLE(RedoRecPtr, xlogctl->Insert.RedoRecPtr));
5454         RedoRecPtr = xlogctl->Insert.RedoRecPtr;
5455         SpinLockRelease(&xlogctl->info_lck);
5456
5457         return RedoRecPtr;
5458 }
5459
5460 /*
5461  * GetInsertRecPtr -- Returns the current insert position.
5462  *
5463  * NOTE: The value *actually* returned is the position of the last full
5464  * xlog page. It lags behind the real insert position by at most 1 page.
5465  * For that, we don't need to acquire WALInsertLock which can be quite
5466  * heavily contended, and an approximation is enough for the current
5467  * usage of this function.
5468  */
5469 XLogRecPtr
5470 GetInsertRecPtr(void)
5471 {
5472         /* use volatile pointer to prevent code rearrangement */
5473         volatile XLogCtlData *xlogctl = XLogCtl;
5474         XLogRecPtr recptr;
5475
5476         SpinLockAcquire(&xlogctl->info_lck);
5477         recptr = xlogctl->LogwrtRqst.Write;
5478         SpinLockRelease(&xlogctl->info_lck);
5479
5480         return recptr;
5481 }
5482
5483 /*
5484  * Get the time of the last xlog segment switch
5485  */
5486 time_t
5487 GetLastSegSwitchTime(void)
5488 {
5489         time_t          result;
5490
5491         /* Need WALWriteLock, but shared lock is sufficient */
5492         LWLockAcquire(WALWriteLock, LW_SHARED);
5493         result = XLogCtl->Write.lastSegSwitchTime;
5494         LWLockRelease(WALWriteLock);
5495
5496         return result;
5497 }
5498
5499 /*
5500  * GetNextXidAndEpoch - get the current nextXid value and associated epoch
5501  *
5502  * This is exported for use by code that would like to have 64-bit XIDs.
5503  * We don't really support such things, but all XIDs within the system
5504  * can be presumed "close to" the result, and thus the epoch associated
5505  * with them can be determined.
5506  */
5507 void
5508 GetNextXidAndEpoch(TransactionId *xid, uint32 *epoch)
5509 {
5510         uint32          ckptXidEpoch;
5511         TransactionId ckptXid;
5512         TransactionId nextXid;
5513
5514         /* Must read checkpoint info first, else have race condition */
5515         {
5516                 /* use volatile pointer to prevent code rearrangement */
5517                 volatile XLogCtlData *xlogctl = XLogCtl;
5518
5519                 SpinLockAcquire(&xlogctl->info_lck);
5520                 ckptXidEpoch = xlogctl->ckptXidEpoch;
5521                 ckptXid = xlogctl->ckptXid;
5522                 SpinLockRelease(&xlogctl->info_lck);
5523         }
5524
5525         /* Now fetch current nextXid */
5526         nextXid = ReadNewTransactionId();
5527
5528         /*
5529          * nextXid is certainly logically later than ckptXid.  So if it's
5530          * numerically less, it must have wrapped into the next epoch.
5531          */
5532         if (nextXid < ckptXid)
5533                 ckptXidEpoch++;
5534
5535         *xid = nextXid;
5536         *epoch = ckptXidEpoch;
5537 }
5538
5539 /*
5540  * This must be called ONCE during postmaster or standalone-backend shutdown
5541  */
5542 void
5543 ShutdownXLOG(int code, Datum arg)
5544 {
5545         ereport(LOG,
5546                         (errmsg("shutting down")));
5547
5548         CreateCheckPoint(CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_IMMEDIATE);
5549         ShutdownCLOG();
5550         ShutdownSUBTRANS();
5551         ShutdownMultiXact();
5552
5553         ereport(LOG,
5554                         (errmsg("database system is shut down")));
5555 }
5556
5557 /*
5558  * Log start of a checkpoint.
5559  */
5560 static void
5561 LogCheckpointStart(int flags)
5562 {
5563         elog(LOG, "checkpoint starting:%s%s%s%s%s%s",
5564                  (flags & CHECKPOINT_IS_SHUTDOWN) ? " shutdown" : "",
5565                  (flags & CHECKPOINT_IMMEDIATE) ? " immediate" : "",
5566                  (flags & CHECKPOINT_FORCE) ? " force" : "",
5567                  (flags & CHECKPOINT_WAIT) ? " wait" : "",
5568                  (flags & CHECKPOINT_CAUSE_XLOG) ? " xlog" : "",
5569                  (flags & CHECKPOINT_CAUSE_TIME) ? " time" : "");
5570 }
5571
5572 /*
5573  * Log end of a checkpoint.
5574  */
5575 static void
5576 LogCheckpointEnd(void)
5577 {
5578         long    write_secs, sync_secs, total_secs;
5579         int             write_usecs, sync_usecs, total_usecs;
5580
5581         CheckpointStats.ckpt_end_t = GetCurrentTimestamp();
5582
5583         TimestampDifference(CheckpointStats.ckpt_start_t,
5584                                                 CheckpointStats.ckpt_end_t,
5585                                                 &total_secs, &total_usecs);
5586
5587         TimestampDifference(CheckpointStats.ckpt_write_t,
5588                                                 CheckpointStats.ckpt_sync_t,
5589                                                 &write_secs, &write_usecs);
5590
5591         TimestampDifference(CheckpointStats.ckpt_sync_t,
5592                                                 CheckpointStats.ckpt_sync_end_t,
5593                                                 &sync_secs, &sync_usecs);
5594
5595         elog(LOG, "checkpoint complete: wrote %d buffers (%.1f%%); "
5596                  "%d transaction log file(s) added, %d removed, %d recycled; "
5597                  "write=%ld.%03d s, sync=%ld.%03d s, total=%ld.%03d s",
5598                  CheckpointStats.ckpt_bufs_written,
5599                  (double) CheckpointStats.ckpt_bufs_written * 100 / NBuffers,
5600                  CheckpointStats.ckpt_segs_added,
5601                  CheckpointStats.ckpt_segs_removed,
5602                  CheckpointStats.ckpt_segs_recycled,
5603                  write_secs, write_usecs/1000,
5604                  sync_secs, sync_usecs/1000,
5605                  total_secs, total_usecs/1000);
5606 }
5607
/*
 * Perform a checkpoint --- either during shutdown, or on-the-fly
 *
 * flags is a bitwise OR of the following:
 *	CHECKPOINT_IS_SHUTDOWN: checkpoint is for database shutdown.
 *	CHECKPOINT_IMMEDIATE: finish the checkpoint ASAP,
 *		ignoring checkpoint_completion_target parameter.
 *	CHECKPOINT_FORCE: force a checkpoint even if no XLOG activity has occurred
 *		since the last one (implied by CHECKPOINT_IS_SHUTDOWN).
 *
 * Note: flags contains other bits, of interest here only for logging purposes.
 * In particular note that this routine is synchronous and does not pay
 * attention to CHECKPOINT_WAIT.
 *
 * Outline of what happens below, in order:
 *	1. Take CheckpointLock and reset CheckpointStats.
 *	2. Holding WALInsertLock, either decide to skip the checkpoint (and
 *	   return early), or compute the REDO pointer and publish it as the
 *	   shared RedoRecPtr.
 *	3. Wait for transactions that are in their commit critical sections.
 *	4. Collect nextXid, nextOid and multixact counters for the record.
 *	5. Flush data via CheckPointGuts(), outside the critical section.
 *	6. Insert and flush the checkpoint XLOG record.
 *	7. Update pg_control and the shared-memory checkpoint XID/epoch.
 *	8. Remove/recycle old segments, preallocate new ones (not at shutdown),
 *	   truncate pg_subtrans (not in recovery), log completion, and release
 *	   CheckpointLock.
 */
void
CreateCheckPoint(int flags)
{
	bool		shutdown = (flags & CHECKPOINT_IS_SHUTDOWN) != 0;
	CheckPoint	checkPoint;
	XLogRecPtr	recptr;
	XLogCtlInsert *Insert = &XLogCtl->Insert;
	XLogRecData rdata;
	uint32		freespace;
	uint32		_logId;
	uint32		_logSeg;
	TransactionId *inCommitXids;
	int			nInCommit;

	/*
	 * Acquire CheckpointLock to ensure only one checkpoint happens at a time.
	 * (This is just pro forma, since in the present system structure there is
	 * only one process that is allowed to issue checkpoints at any given
	 * time.)
	 */
	LWLockAcquire(CheckpointLock, LW_EXCLUSIVE);

	/*
	 * Prepare to accumulate statistics.
	 *
	 * Note: because it is possible for log_checkpoints to change while a
	 * checkpoint proceeds, we always accumulate stats, even if
	 * log_checkpoints is currently off.
	 */
	MemSet(&CheckpointStats, 0, sizeof(CheckpointStats));
	CheckpointStats.ckpt_start_t = GetCurrentTimestamp();

	/*
	 * Use a critical section to force system panic if we have trouble.
	 */
	START_CRIT_SECTION();

	/* For a shutdown checkpoint, record in pg_control that shutdown began */
	if (shutdown)
	{
		ControlFile->state = DB_SHUTDOWNING;
		ControlFile->time = time(NULL);
		UpdateControlFile();
	}

	/* Start building the checkpoint record contents from a zeroed struct */
	MemSet(&checkPoint, 0, sizeof(checkPoint));
	checkPoint.ThisTimeLineID = ThisTimeLineID;
	checkPoint.time = time(NULL);

	/*
	 * We must hold WALInsertLock while examining insert state to determine
	 * the checkpoint REDO pointer.
	 */
	LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);

	/*
	 * If this isn't a shutdown or forced checkpoint, and we have not inserted
	 * any XLOG records since the start of the last checkpoint, skip the
	 * checkpoint.  The idea here is to avoid inserting duplicate checkpoints
	 * when the system is idle. That wastes log space, and more importantly it
	 * exposes us to possible loss of both current and previous checkpoint
	 * records if the machine crashes just as we're writing the update.
	 * (Perhaps it'd make even more sense to checkpoint only when the previous
	 * checkpoint record is in a different xlog page?)
	 *
	 * We have to make two tests to determine that nothing has happened since
	 * the start of the last checkpoint: current insertion point must match
	 * the end of the last checkpoint record, and its redo pointer must point
	 * to itself.
	 */
	if ((flags & (CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_FORCE)) == 0)
	{
		XLogRecPtr	curInsert;

		INSERT_RECPTR(curInsert, Insert, Insert->curridx);
		if (curInsert.xlogid == ControlFile->checkPoint.xlogid &&
			curInsert.xrecoff == ControlFile->checkPoint.xrecoff +
			MAXALIGN(SizeOfXLogRecord + sizeof(CheckPoint)) &&
			ControlFile->checkPoint.xlogid ==
			ControlFile->checkPointCopy.redo.xlogid &&
			ControlFile->checkPoint.xrecoff ==
			ControlFile->checkPointCopy.redo.xrecoff)
		{
			/*
			 * Nothing to do: undo everything acquired so far (both locks
			 * and the critical section) and return without logging.
			 */
			LWLockRelease(WALInsertLock);
			LWLockRelease(CheckpointLock);
			END_CRIT_SECTION();
			return;
		}
	}

	/*
	 * Compute new REDO record ptr = location of next XLOG record.
	 *
	 * NB: this is NOT necessarily where the checkpoint record itself will be,
	 * since other backends may insert more XLOG records while we're off doing
	 * the buffer flush work.  Those XLOG records are logically after the
	 * checkpoint, even though physically before it.  Got that?
	 */
	freespace = INSERT_FREESPACE(Insert);
	if (freespace < SizeOfXLogRecord)
	{
		(void) AdvanceXLInsertBuffer(false);
		/* OK to ignore update return flag, since we will do flush anyway */
		/* NOTE(review): freespace is not read again after this assignment */
		freespace = INSERT_FREESPACE(Insert);
	}
	INSERT_RECPTR(checkPoint.redo, Insert, Insert->curridx);

	/*
	 * Here we update the shared RedoRecPtr for future XLogInsert calls; this
	 * must be done while holding the insert lock AND the info_lck.
	 *
	 * Note: if we fail to complete the checkpoint, RedoRecPtr will be left
	 * pointing past where it really needs to point.  This is okay; the only
	 * consequence is that XLogInsert might back up whole buffers that it
	 * didn't really need to.  We can't postpone advancing RedoRecPtr because
	 * XLogInserts that happen while we are dumping buffers must assume that
	 * their buffer changes are not included in the checkpoint.
	 */
	{
		/* use volatile pointer to prevent code rearrangement */
		volatile XLogCtlData *xlogctl = XLogCtl;

		SpinLockAcquire(&xlogctl->info_lck);
		RedoRecPtr = xlogctl->Insert.RedoRecPtr = checkPoint.redo;
		SpinLockRelease(&xlogctl->info_lck);
	}

	/*
	 * Now we can release WAL insert lock, allowing other xacts to proceed
	 * while we are flushing disk buffers.
	 */
	LWLockRelease(WALInsertLock);

	/*
	 * If enabled, log checkpoint start.  We postpone this until now
	 * so as not to log anything if we decided to skip the checkpoint.
	 */
	if (log_checkpoints)
		LogCheckpointStart(flags);

	/*
	 * Before flushing data, we must wait for any transactions that are
	 * currently in their commit critical sections.  If an xact inserted its
	 * commit record into XLOG just before the REDO point, then a crash
	 * restart from the REDO point would not replay that record, which means
	 * that our flushing had better include the xact's update of pg_clog.  So
	 * we wait till he's out of his commit critical section before proceeding.
	 * See notes in RecordTransactionCommit().
	 *
	 * Because we've already released WALInsertLock, this test is a bit fuzzy:
	 * it is possible that we will wait for xacts we didn't really need to
	 * wait for.  But the delay should be short and it seems better to make
	 * checkpoint take a bit longer than to hold locks longer than necessary.
	 * (In fact, the whole reason we have this issue is that xact.c does
	 * commit record XLOG insertion and clog update as two separate steps
	 * protected by different locks, but again that seems best on grounds
	 * of minimizing lock contention.)
	 *
	 * A transaction that has not yet set inCommit when we look cannot be
	 * at risk, since he's not inserted his commit record yet; and one that's
	 * already cleared it is not at risk either, since he's done fixing clog
	 * and we will correctly flush the update below.  So we cannot miss any
	 * xacts we need to wait for.
	 */
	nInCommit = GetTransactionsInCommit(&inCommitXids);
	if (nInCommit > 0)
	{
		/* poll until none of the XIDs collected above is still in commit */
		do {
			pg_usleep(10000L);				/* wait for 10 msec */
		} while (HaveTransactionsInCommit(inCommitXids, nInCommit));
	}
	pfree(inCommitXids);

	/*
	 * Get the other info we need for the checkpoint record.
	 */
	LWLockAcquire(XidGenLock, LW_SHARED);
	checkPoint.nextXid = ShmemVariableCache->nextXid;
	LWLockRelease(XidGenLock);

	/* Increase XID epoch if we've wrapped around since last checkpoint */
	checkPoint.nextXidEpoch = ControlFile->checkPointCopy.nextXidEpoch;
	if (checkPoint.nextXid < ControlFile->checkPointCopy.nextXid)
		checkPoint.nextXidEpoch++;

	/* For a non-shutdown checkpoint, oidCount is added on top of nextOid */
	LWLockAcquire(OidGenLock, LW_SHARED);
	checkPoint.nextOid = ShmemVariableCache->nextOid;
	if (!shutdown)
		checkPoint.nextOid += ShmemVariableCache->oidCount;
	LWLockRelease(OidGenLock);

	MultiXactGetCheckptMulti(shutdown,
							 &checkPoint.nextMulti,
							 &checkPoint.nextMultiOffset);

	/*
	 * Having constructed the checkpoint record, ensure all shmem disk buffers
	 * and commit-log buffers are flushed to disk.
	 *
	 * This I/O could fail for various reasons.  If so, we will fail to
	 * complete the checkpoint, but there is no reason to force a system
	 * panic. Accordingly, exit critical section while doing it.
	 */
	END_CRIT_SECTION();

	CheckPointGuts(checkPoint.redo, flags);

	START_CRIT_SECTION();

	/*
	 * Now insert the checkpoint record into XLOG.
	 */
	rdata.data = (char *) (&checkPoint);
	rdata.len = sizeof(checkPoint);
	rdata.buffer = InvalidBuffer;
	rdata.next = NULL;

	recptr = XLogInsert(RM_XLOG_ID,
						shutdown ? XLOG_CHECKPOINT_SHUTDOWN :
						XLOG_CHECKPOINT_ONLINE,
						&rdata);

	XLogFlush(recptr);

	/*
	 * We now have ProcLastRecPtr = start of actual checkpoint record, recptr
	 * = end of actual checkpoint record.
	 *
	 * At shutdown the record must sit exactly at the REDO pointer computed
	 * earlier; anything else means XLOG was written in between.
	 */
	if (shutdown && !XLByteEQ(checkPoint.redo, ProcLastRecPtr))
		ereport(PANIC,
				(errmsg("concurrent transaction log activity while database system is shutting down")));

	/*
	 * Select point at which we can truncate the log, which we base on the
	 * prior checkpoint's earliest info.
	 *
	 * This has to be read now: checkPointCopy is overwritten with the new
	 * checkpoint just below.
	 */
	XLByteToSeg(ControlFile->checkPointCopy.redo, _logId, _logSeg);

	/*
	 * Update the control file.
	 */
	LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
	if (shutdown)
		ControlFile->state = DB_SHUTDOWNED;
	ControlFile->prevCheckPoint = ControlFile->checkPoint;
	ControlFile->checkPoint = ProcLastRecPtr;
	ControlFile->checkPointCopy = checkPoint;
	ControlFile->time = time(NULL);
	UpdateControlFile();
	LWLockRelease(ControlFileLock);

	/* Update shared-memory copy of checkpoint XID/epoch */
	{
		/* use volatile pointer to prevent code rearrangement */
		volatile XLogCtlData *xlogctl = XLogCtl;

		SpinLockAcquire(&xlogctl->info_lck);
		xlogctl->ckptXidEpoch = checkPoint.nextXidEpoch;
		xlogctl->ckptXid = checkPoint.nextXid;
		SpinLockRelease(&xlogctl->info_lck);
	}

	/*
	 * We are now done with critical updates; no need for system panic if we
	 * have trouble while fooling with old log segments.
	 */
	END_CRIT_SECTION();

	/*
	 * Delete old log files (those no longer needed even for previous
	 * checkpoint).
	 */
	if (_logId || _logSeg)
	{
		PrevLogSeg(_logId, _logSeg);
		RemoveOldXlogFiles(_logId, _logSeg, recptr);
	}

	/*
	 * Make more log segments if needed.  (Do this after recycling old log
	 * segments, since that may supply some of the needed files.)
	 */
	if (!shutdown)
		PreallocXlogFiles(recptr);

	/*
	 * Truncate pg_subtrans if possible.  We can throw away all data before
	 * the oldest XMIN of any running transaction.  No future transaction will
	 * attempt to reference any pg_subtrans entry older than that (see Asserts
	 * in subtrans.c).	During recovery, though, we mustn't do this because
	 * StartupSUBTRANS hasn't been called yet.
	 */
	if (!InRecovery)
		TruncateSUBTRANS(GetOldestXmin(true, false));

	/* All real work is done, but log before releasing lock. */
	if (log_checkpoints)
		LogCheckpointEnd();

	LWLockRelease(CheckpointLock);
}
5922
/*
 * Flush all data in shared memory to disk, and fsync
 *
 * This is the common code shared between regular checkpoints and
 * recovery restartpoints.
 *
 * checkPointRedo is the checkpoint's REDO pointer; it is handed on to
 * CheckPointTwoPhase.  flags is the CHECKPOINT_* bitmask; it is handed on
 * to CheckPointBuffers.
 */
static void
CheckPointGuts(XLogRecPtr checkPointRedo, int flags)
{
	/* First the transaction-status modules: clog, subtrans, multixact */
	CheckPointCLOG();
	CheckPointSUBTRANS();
	CheckPointMultiXact();
	CheckPointBuffers(flags);		/* performs all required fsyncs */
	/* We deliberately delay 2PC checkpointing as long as possible */
	CheckPointTwoPhase(checkPointRedo);
}
5939
5940 /*
5941  * Set a recovery restart point if appropriate
5942  *
5943  * This is similar to CreateCheckPoint, but is used during WAL recovery
5944  * to establish a point from which recovery can roll forward without
5945  * replaying the entire recovery log.  This function is called each time
5946  * a checkpoint record is read from XLOG; it must determine whether a
5947  * restartpoint is needed or not.
5948  */
5949 static void
5950 RecoveryRestartPoint(const CheckPoint *checkPoint)
5951 {
5952         int                     elapsed_secs;
5953         int                     rmid;
5954
5955         /*
5956          * Do nothing if the elapsed time since the last restartpoint is less than
5957          * half of checkpoint_timeout.  (We use a value less than
5958          * checkpoint_timeout so that variations in the timing of checkpoints on
5959          * the master, or speed of transmission of WAL segments to a slave, won't
5960          * make the slave skip a restartpoint once it's synced with the master.)
5961          * Checking true elapsed time keeps us from doing restartpoints too often
5962          * while rapidly scanning large amounts of WAL.
5963          */
5964         elapsed_secs = time(NULL) - ControlFile->time;
5965         if (elapsed_secs < CheckPointTimeout / 2)
5966                 return;
5967
5968         /*
5969          * Is it safe to checkpoint?  We must ask each of the resource managers
5970          * whether they have any partial state information that might prevent a
5971          * correct restart from this point.  If so, we skip this opportunity, but
5972          * return at the next checkpoint record for another try.
5973          */
5974         for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
5975         {
5976                 if (RmgrTable[rmid].rm_safe_restartpoint != NULL)
5977                         if (!(RmgrTable[rmid].rm_safe_restartpoint()))
5978                         {
5979                                 elog(DEBUG2, "RM %d not safe to record restart point at %X/%X",
5980                                          rmid,
5981                                          checkPoint->redo.xlogid,
5982                                          checkPoint->redo.xrecoff);
5983                                 return;
5984                         }
5985         }
5986
5987         /*
5988          * OK, force data out to disk
5989          */
5990         CheckPointGuts(checkPoint->redo, CHECKPOINT_IMMEDIATE);
5991
5992         /*
5993          * Update pg_control so that any subsequent crash will restart from this
5994          * checkpoint.  Note: ReadRecPtr gives the XLOG address of the checkpoint
5995          * record itself.
5996          */
5997         ControlFile->prevCheckPoint = ControlFile->checkPoint;
5998         ControlFile->checkPoint = ReadRecPtr;
5999         ControlFile->checkPointCopy = *checkPoint;
6000         ControlFile->time = time(NULL);
6001         UpdateControlFile();
6002
6003         ereport((recoveryLogRestartpoints ? LOG : DEBUG2),
6004                         (errmsg("recovery restart point at %X/%X",
6005                                         checkPoint->redo.xlogid, checkPoint->redo.xrecoff)));
6006         if (recoveryLastXTime)
6007                 ereport((recoveryLogRestartpoints ? LOG : DEBUG2),
6008                                 (errmsg("last completed transaction was at log time %s",
6009                                                 timestamptz_to_str(recoveryLastXTime))));
6010 }
6011
6012 /*
6013  * Write a NEXTOID log record
6014  */
6015 void
6016 XLogPutNextOid(Oid nextOid)
6017 {
6018         XLogRecData rdata;
6019
6020         rdata.data = (char *) (&nextOid);
6021         rdata.len = sizeof(Oid);
6022         rdata.buffer = InvalidBuffer;
6023         rdata.next = NULL;
6024         (void) XLogInsert(RM_XLOG_ID, XLOG_NEXTOID, &rdata);
6025
6026         /*
6027          * We need not flush the NEXTOID record immediately, because any of the
6028          * just-allocated OIDs could only reach disk as part of a tuple insert or
6029          * update that would have its own XLOG record that must follow the NEXTOID
6030          * record.      Therefore, the standard buffer LSN interlock applied to those
6031          * records will ensure no such OID reaches disk before the NEXTOID record
6032          * does.
6033          *
6034          * Note, however, that the above statement only covers state "within" the
6035          * database.  When we use a generated OID as a file or directory name,
6036          * we are in a sense violating the basic WAL rule, because that filesystem
6037          * change may reach disk before the NEXTOID WAL record does.  The impact
6038          * of this is that if a database crash occurs immediately afterward,
6039          * we might after restart re-generate the same OID and find that it
6040          * conflicts with the leftover file or directory.  But since for safety's
6041          * sake we always loop until finding a nonconflicting filename, this poses
6042          * no real problem in practice. See pgsql-hackers discussion 27-Sep-2006.
6043          */
6044 }
6045
6046 /*
6047  * Write an XLOG SWITCH record.
6048  *
6049  * Here we just blindly issue an XLogInsert request for the record.
6050  * All the magic happens inside XLogInsert.
6051  *
6052  * The return value is either the end+1 address of the switch record,
6053  * or the end+1 address of the prior segment if we did not need to
6054  * write a switch record because we are already at segment start.
6055  */
6056 XLogRecPtr
6057 RequestXLogSwitch(void)
6058 {
6059         XLogRecPtr      RecPtr;
6060         XLogRecData rdata;
6061
6062         /* XLOG SWITCH, alone among xlog record types, has no data */
6063         rdata.buffer = InvalidBuffer;
6064         rdata.data = NULL;
6065         rdata.len = 0;
6066         rdata.next = NULL;
6067
6068         RecPtr = XLogInsert(RM_XLOG_ID, XLOG_SWITCH, &rdata);
6069
6070         return RecPtr;
6071 }
6072
6073 /*
6074  * XLOG resource manager's routines
6075  */
6076 void
6077 xlog_redo(XLogRecPtr lsn, XLogRecord *record)
6078 {
6079         uint8           info = record->xl_info & ~XLR_INFO_MASK;
6080
6081         if (info == XLOG_NEXTOID)
6082         {
6083                 Oid                     nextOid;
6084
6085                 memcpy(&nextOid, XLogRecGetData(record), sizeof(Oid));
6086                 if (ShmemVariableCache->nextOid < nextOid)
6087                 {
6088                         ShmemVariableCache->nextOid = nextOid;
6089                         ShmemVariableCache->oidCount = 0;
6090                 }
6091         }
6092         else if (info == XLOG_CHECKPOINT_SHUTDOWN)
6093         {
6094                 CheckPoint      checkPoint;
6095
6096                 memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
6097                 /* In a SHUTDOWN checkpoint, believe the counters exactly */
6098                 ShmemVariableCache->nextXid = checkPoint.nextXid;
6099                 ShmemVariableCache->nextOid = checkPoint.nextOid;
6100                 ShmemVariableCache->oidCount = 0;
6101                 MultiXactSetNextMXact(checkPoint.nextMulti,
6102                                                           checkPoint.nextMultiOffset);
6103
6104                 /* ControlFile->checkPointCopy always tracks the latest ckpt XID */
6105                 ControlFile->checkPointCopy.nextXidEpoch = checkPoint.nextXidEpoch;
6106                 ControlFile->checkPointCopy.nextXid = checkPoint.nextXid;
6107
6108                 /*
6109                  * TLI may change in a shutdown checkpoint, but it shouldn't decrease
6110                  */
6111                 if (checkPoint.ThisTimeLineID != ThisTimeLineID)
6112                 {
6113                         if (checkPoint.ThisTimeLineID < ThisTimeLineID ||
6114                                 !list_member_int(expectedTLIs,
6115                                                                  (int) checkPoint.ThisTimeLineID))
6116                                 ereport(PANIC,
6117                                                 (errmsg("unexpected timeline ID %u (after %u) in checkpoint record",
6118                                                                 checkPoint.ThisTimeLineID, ThisTimeLineID)));
6119                         /* Following WAL records should be run with new TLI */
6120                         ThisTimeLineID = checkPoint.ThisTimeLineID;
6121                 }
6122
6123                 RecoveryRestartPoint(&checkPoint);
6124         }
6125         else if (info == XLOG_CHECKPOINT_ONLINE)
6126         {
6127                 CheckPoint      checkPoint;
6128
6129                 memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
6130                 /* In an ONLINE checkpoint, treat the counters like NEXTOID */
6131                 if (TransactionIdPrecedes(ShmemVariableCache->nextXid,
6132                                                                   checkPoint.nextXid))
6133                         ShmemVariableCache->nextXid = checkPoint.nextXid;
6134                 if (ShmemVariableCache->nextOid < checkPoint.nextOid)
6135                 {
6136                         ShmemVariableCache->nextOid = checkPoint.nextOid;
6137                         ShmemVariableCache->oidCount = 0;
6138                 }
6139                 MultiXactAdvanceNextMXact(checkPoint.nextMulti,
6140                                                                   checkPoint.nextMultiOffset);
6141
6142                 /* ControlFile->checkPointCopy always tracks the latest ckpt XID */
6143                 ControlFile->checkPointCopy.nextXidEpoch = checkPoint.nextXidEpoch;
6144                 ControlFile->checkPointCopy.nextXid = checkPoint.nextXid;
6145
6146                 /* TLI should not change in an on-line checkpoint */
6147                 if (checkPoint.ThisTimeLineID != ThisTimeLineID)
6148                         ereport(PANIC,
6149                                         (errmsg("unexpected timeline ID %u (should be %u) in checkpoint record",
6150                                                         checkPoint.ThisTimeLineID, ThisTimeLineID)));
6151
6152                 RecoveryRestartPoint(&checkPoint);
6153         }
6154         else if (info == XLOG_NOOP)
6155         {
6156                 /* nothing to do here */
6157         }
6158         else if (info == XLOG_SWITCH)
6159         {
6160                 /* nothing to do here */
6161         }
6162 }
6163
6164 void
6165 xlog_desc(StringInfo buf, uint8 xl_info, char *rec)
6166 {
6167         uint8           info = xl_info & ~XLR_INFO_MASK;
6168
6169         if (info == XLOG_CHECKPOINT_SHUTDOWN ||
6170                 info == XLOG_CHECKPOINT_ONLINE)
6171         {
6172                 CheckPoint *checkpoint = (CheckPoint *) rec;
6173
6174                 appendStringInfo(buf, "checkpoint: redo %X/%X; "
6175                                                  "tli %u; xid %u/%u; oid %u; multi %u; offset %u; %s",
6176                                                  checkpoint->redo.xlogid, checkpoint->redo.xrecoff,
6177                                                  checkpoint->ThisTimeLineID,
6178                                                  checkpoint->nextXidEpoch, checkpoint->nextXid,
6179                                                  checkpoint->nextOid,
6180                                                  checkpoint->nextMulti,
6181                                                  checkpoint->nextMultiOffset,
6182                                  (info == XLOG_CHECKPOINT_SHUTDOWN) ? "shutdown" : "online");
6183         }
6184         else if (info == XLOG_NOOP)
6185         {
6186                 appendStringInfo(buf, "xlog no-op");
6187         }
6188         else if (info == XLOG_NEXTOID)
6189         {
6190                 Oid                     nextOid;
6191
6192                 memcpy(&nextOid, rec, sizeof(Oid));
6193                 appendStringInfo(buf, "nextOid: %u", nextOid);
6194         }
6195         else if (info == XLOG_SWITCH)
6196         {
6197                 appendStringInfo(buf, "xlog switch");
6198         }
6199         else
6200                 appendStringInfo(buf, "UNKNOWN");
6201 }
6202
6203 #ifdef WAL_DEBUG
6204
6205 static void
6206 xlog_outrec(StringInfo buf, XLogRecord *record)
6207 {
6208         int                     i;
6209
6210         appendStringInfo(buf, "prev %X/%X; xid %u",
6211                                          record->xl_prev.xlogid, record->xl_prev.xrecoff,
6212                                          record->xl_xid);
6213
6214         for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
6215         {
6216                 if (record->xl_info & XLR_SET_BKP_BLOCK(i))
6217                         appendStringInfo(buf, "; bkpb%d", i + 1);
6218         }
6219
6220         appendStringInfo(buf, ": %s", RmgrTable[record->xl_rmid].rm_name);
6221 }
6222 #endif   /* WAL_DEBUG */
6223
6224
6225 /*
6226  * GUC support
6227  */
6228 const char *
6229 assign_xlog_sync_method(const char *method, bool doit, GucSource source)
6230 {
6231         int                     new_sync_method;
6232         int                     new_sync_bit;
6233
6234         if (pg_strcasecmp(method, "fsync") == 0)
6235         {
6236                 new_sync_method = SYNC_METHOD_FSYNC;
6237                 new_sync_bit = 0;
6238         }
6239 #ifdef HAVE_FSYNC_WRITETHROUGH
6240         else if (pg_strcasecmp(method, "fsync_writethrough") == 0)
6241         {
6242                 new_sync_method = SYNC_METHOD_FSYNC_WRITETHROUGH;
6243                 new_sync_bit = 0;
6244         }
6245 #endif
6246 #ifdef HAVE_FDATASYNC
6247         else if (pg_strcasecmp(method, "fdatasync") == 0)
6248         {
6249                 new_sync_method = SYNC_METHOD_FDATASYNC;
6250                 new_sync_bit = 0;
6251         }
6252 #endif
6253 #ifdef OPEN_SYNC_FLAG
6254         else if (pg_strcasecmp(method, "open_sync") == 0)
6255         {
6256                 new_sync_method = SYNC_METHOD_OPEN;
6257                 new_sync_bit = OPEN_SYNC_FLAG;
6258         }
6259 #endif
6260 #ifdef OPEN_DATASYNC_FLAG
6261         else if (pg_strcasecmp(method, "open_datasync") == 0)
6262         {
6263                 new_sync_method = SYNC_METHOD_OPEN;
6264                 new_sync_bit = OPEN_DATASYNC_FLAG;
6265         }
6266 #endif
6267         else
6268                 return NULL;
6269
6270         if (!doit)
6271                 return method;
6272
6273         if (sync_method != new_sync_method || open_sync_bit != new_sync_bit)
6274         {
6275                 /*
6276                  * To ensure that no blocks escape unsynced, force an fsync on the
6277                  * currently open log segment (if any).  Also, if the open flag is
6278                  * changing, close the log file so it will be reopened (with new flag
6279                  * bit) at next use.
6280                  */
6281                 if (openLogFile >= 0)
6282                 {
6283                         if (pg_fsync(openLogFile) != 0)
6284                                 ereport(PANIC,
6285                                                 (errcode_for_file_access(),
6286                                                  errmsg("could not fsync log file %u, segment %u: %m",
6287                                                                 openLogId, openLogSeg)));
6288                         if (open_sync_bit != new_sync_bit)
6289                                 XLogFileClose();
6290                 }
6291                 sync_method = new_sync_method;
6292                 open_sync_bit = new_sync_bit;
6293         }
6294
6295         return method;
6296 }
6297
6298
6299 /*
6300  * Issue appropriate kind of fsync (if any) on the current XLOG output file
6301  */
6302 static void
6303 issue_xlog_fsync(void)
6304 {
6305         switch (sync_method)
6306         {
6307                 case SYNC_METHOD_FSYNC:
6308                         if (pg_fsync_no_writethrough(openLogFile) != 0)
6309                                 ereport(PANIC,
6310                                                 (errcode_for_file_access(),
6311                                                  errmsg("could not fsync log file %u, segment %u: %m",
6312                                                                 openLogId, openLogSeg)));
6313                         break;
6314 #ifdef HAVE_FSYNC_WRITETHROUGH
6315                 case SYNC_METHOD_FSYNC_WRITETHROUGH:
6316                         if (pg_fsync_writethrough(openLogFile) != 0)
6317                                 ereport(PANIC,
6318                                                 (errcode_for_file_access(),
6319                                                  errmsg("could not fsync write-through log file %u, segment %u: %m",
6320                                                                 openLogId, openLogSeg)));
6321                         break;
6322 #endif
6323 #ifdef HAVE_FDATASYNC
6324                 case SYNC_METHOD_FDATASYNC:
6325                         if (pg_fdatasync(openLogFile) != 0)
6326                                 ereport(PANIC,
6327                                                 (errcode_for_file_access(),
6328                                         errmsg("could not fdatasync log file %u, segment %u: %m",
6329                                                    openLogId, openLogSeg)));
6330                         break;
6331 #endif
6332                 case SYNC_METHOD_OPEN:
6333                         /* write synced it already */
6334                         break;
6335                 default:
6336                         elog(PANIC, "unrecognized wal_sync_method: %d", sync_method);
6337                         break;
6338         }
6339 }
6340
6341
6342 /*
6343  * pg_start_backup: set up for taking an on-line backup dump
6344  *
6345  * Essentially what this does is to create a backup label file in $PGDATA,
6346  * where it will be archived as part of the backup dump.  The label file
6347  * contains the user-supplied label string (typically this would be used
6348  * to tell where the backup dump will be stored) and the starting time and
6349  * starting WAL location for the dump.
6350  */
6351 Datum
6352 pg_start_backup(PG_FUNCTION_ARGS)
6353 {
6354         text       *backupid = PG_GETARG_TEXT_P(0);
6355         text       *result;
6356         char       *backupidstr;
6357         XLogRecPtr      checkpointloc;
6358         XLogRecPtr      startpoint;
6359         pg_time_t       stamp_time;
6360         char            strfbuf[128];
6361         char            xlogfilename[MAXFNAMELEN];
6362         uint32          _logId;
6363         uint32          _logSeg;
6364         struct stat stat_buf;
6365         FILE       *fp;
6366
6367         if (!superuser())
6368                 ereport(ERROR,
6369                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
6370                                  errmsg("must be superuser to run a backup")));
6371
6372         if (!XLogArchivingActive())
6373                 ereport(ERROR,
6374                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
6375                                  errmsg("WAL archiving is not active"),
6376                                  errhint("archive_mode must be enabled at server start.")));
6377
6378         if (!XLogArchiveCommandSet())
6379                 ereport(ERROR,
6380                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
6381                                  errmsg("WAL archiving is not active"),
6382                                  errhint("archive_command must be defined before "
6383                                                  "online backups can be made safely.")));
6384
6385         backupidstr = DatumGetCString(DirectFunctionCall1(textout,
6386                                                                                                  PointerGetDatum(backupid)));
6387
6388         /*
6389          * Mark backup active in shared memory.  We must do full-page WAL writes
6390          * during an on-line backup even if not doing so at other times, because
6391          * it's quite possible for the backup dump to obtain a "torn" (partially
6392          * written) copy of a database page if it reads the page concurrently with
6393          * our write to the same page.  This can be fixed as long as the first
6394          * write to the page in the WAL sequence is a full-page write. Hence, we
6395          * turn on forcePageWrites and then force a CHECKPOINT, to ensure there
6396          * are no dirty pages in shared memory that might get dumped while the
6397          * backup is in progress without having a corresponding WAL record.  (Once
6398          * the backup is complete, we need not force full-page writes anymore,
6399          * since we expect that any pages not modified during the backup interval
6400          * must have been correctly captured by the backup.)
6401          *
6402          * We must hold WALInsertLock to change the value of forcePageWrites, to
6403          * ensure adequate interlocking against XLogInsert().
6404          */
6405         LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
6406         if (XLogCtl->Insert.forcePageWrites)
6407         {
6408                 LWLockRelease(WALInsertLock);
6409                 ereport(ERROR,
6410                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
6411                                  errmsg("a backup is already in progress"),
6412                                  errhint("Run pg_stop_backup() and try again.")));
6413         }
6414         XLogCtl->Insert.forcePageWrites = true;
6415         LWLockRelease(WALInsertLock);
6416
6417         /* Use a TRY block to ensure we release forcePageWrites if fail below */
6418         PG_TRY();
6419         {
6420                 /*
6421                  * Force a CHECKPOINT.  Aside from being necessary to prevent torn
6422                  * page problems, this guarantees that two successive backup runs will
6423                  * have different checkpoint positions and hence different history
6424                  * file names, even if nothing happened in between.
6425                  *
6426                  * We don't use CHECKPOINT_IMMEDIATE, hence this can take awhile.
6427                  */
6428                 RequestCheckpoint(CHECKPOINT_FORCE | CHECKPOINT_WAIT);
6429
6430                 /*
6431                  * Now we need to fetch the checkpoint record location, and also its
6432                  * REDO pointer.  The oldest point in WAL that would be needed to
6433                  * restore starting from the checkpoint is precisely the REDO pointer.
6434                  */
6435                 LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
6436                 checkpointloc = ControlFile->checkPoint;
6437                 startpoint = ControlFile->checkPointCopy.redo;
6438                 LWLockRelease(ControlFileLock);
6439
6440                 XLByteToSeg(startpoint, _logId, _logSeg);
6441                 XLogFileName(xlogfilename, ThisTimeLineID, _logId, _logSeg);
6442
6443                 /* Use the log timezone here, not the session timezone */
6444                 stamp_time = (pg_time_t) time(NULL);
6445                 pg_strftime(strfbuf, sizeof(strfbuf),
6446                                         "%Y-%m-%d %H:%M:%S %Z",
6447                                         pg_localtime(&stamp_time, log_timezone));
6448
6449                 /*
6450                  * Check for existing backup label --- implies a backup is already
6451                  * running.  (XXX given that we checked forcePageWrites above, maybe
6452                  * it would be OK to just unlink any such label file?)
6453                  */
6454                 if (stat(BACKUP_LABEL_FILE, &stat_buf) != 0)
6455                 {
6456                         if (errno != ENOENT)
6457                                 ereport(ERROR,
6458                                                 (errcode_for_file_access(),
6459                                                  errmsg("could not stat file \"%s\": %m",
6460                                                                 BACKUP_LABEL_FILE)));
6461                 }
6462                 else
6463                         ereport(ERROR,
6464                                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
6465                                          errmsg("a backup is already in progress"),
6466                                          errhint("If you're sure there is no backup in progress, remove file \"%s\" and try again.",
6467                                                          BACKUP_LABEL_FILE)));
6468
6469                 /*
6470                  * Okay, write the file
6471                  */
6472                 fp = AllocateFile(BACKUP_LABEL_FILE, "w");
6473                 if (!fp)
6474                         ereport(ERROR,
6475                                         (errcode_for_file_access(),
6476                                          errmsg("could not create file \"%s\": %m",
6477                                                         BACKUP_LABEL_FILE)));
6478                 fprintf(fp, "START WAL LOCATION: %X/%X (file %s)\n",
6479                                 startpoint.xlogid, startpoint.xrecoff, xlogfilename);
6480                 fprintf(fp, "CHECKPOINT LOCATION: %X/%X\n",
6481                                 checkpointloc.xlogid, checkpointloc.xrecoff);
6482                 fprintf(fp, "START TIME: %s\n", strfbuf);
6483                 fprintf(fp, "LABEL: %s\n", backupidstr);
6484                 if (fflush(fp) || ferror(fp) || FreeFile(fp))
6485                         ereport(ERROR,
6486                                         (errcode_for_file_access(),
6487                                          errmsg("could not write file \"%s\": %m",
6488                                                         BACKUP_LABEL_FILE)));
6489         }
6490         PG_CATCH();
6491         {
6492                 /* Turn off forcePageWrites on failure */
6493                 LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
6494                 XLogCtl->Insert.forcePageWrites = false;
6495                 LWLockRelease(WALInsertLock);
6496
6497                 PG_RE_THROW();
6498         }
6499         PG_END_TRY();
6500
6501         /*
6502          * We're done.  As a convenience, return the starting WAL location.
6503          */
6504         snprintf(xlogfilename, sizeof(xlogfilename), "%X/%X",
6505                          startpoint.xlogid, startpoint.xrecoff);
6506         result = DatumGetTextP(DirectFunctionCall1(textin,
6507                                                                                          CStringGetDatum(xlogfilename)));
6508         PG_RETURN_TEXT_P(result);
6509 }
6510
6511 /*
6512  * pg_stop_backup: finish taking an on-line backup dump
6513  *
6514  * We remove the backup label file created by pg_start_backup, and instead
6515  * create a backup history file in pg_xlog (whence it will immediately be
6516  * archived).  The backup history file contains the same info found in
6517  * the label file, plus the backup-end time and WAL location.
6518  */
6519 Datum
6520 pg_stop_backup(PG_FUNCTION_ARGS)
6521 {
6522         text       *result;
6523         XLogRecPtr      startpoint;
6524         XLogRecPtr      stoppoint;
6525         pg_time_t       stamp_time;
6526         char            strfbuf[128];
6527         char            histfilepath[MAXPGPATH];
6528         char            startxlogfilename[MAXFNAMELEN];
6529         char            stopxlogfilename[MAXFNAMELEN];
6530         uint32          _logId;
6531         uint32          _logSeg;
6532         FILE       *lfp;
6533         FILE       *fp;
6534         char            ch;
6535         int                     ich;
6536
6537         if (!superuser())
6538                 ereport(ERROR,
6539                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
6540                                  (errmsg("must be superuser to run a backup"))));
6541
6542         /*
6543          * OK to clear forcePageWrites
6544          */
6545         LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
6546         XLogCtl->Insert.forcePageWrites = false;
6547         LWLockRelease(WALInsertLock);
6548
6549         /*
6550          * Force a switch to a new xlog segment file, so that the backup is valid
6551          * as soon as archiver moves out the current segment file. We'll report
6552          * the end address of the XLOG SWITCH record as the backup stopping point.
6553          */
6554         stoppoint = RequestXLogSwitch();
6555
6556         XLByteToSeg(stoppoint, _logId, _logSeg);
6557         XLogFileName(stopxlogfilename, ThisTimeLineID, _logId, _logSeg);
6558
6559         /* Use the log timezone here, not the session timezone */
6560         stamp_time = (pg_time_t) time(NULL);
6561         pg_strftime(strfbuf, sizeof(strfbuf),
6562                                 "%Y-%m-%d %H:%M:%S %Z",
6563                                 pg_localtime(&stamp_time, log_timezone));
6564
6565         /*
6566          * Open the existing label file
6567          */
6568         lfp = AllocateFile(BACKUP_LABEL_FILE, "r");
6569         if (!lfp)
6570         {
6571                 if (errno != ENOENT)
6572                         ereport(ERROR,
6573                                         (errcode_for_file_access(),
6574                                          errmsg("could not read file \"%s\": %m",
6575                                                         BACKUP_LABEL_FILE)));
6576                 ereport(ERROR,
6577                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
6578                                  errmsg("a backup is not in progress")));
6579         }
6580
6581         /*
6582          * Read and parse the START WAL LOCATION line (this code is pretty crude,
6583          * but we are not expecting any variability in the file format).
6584          */
6585         if (fscanf(lfp, "START WAL LOCATION: %X/%X (file %24s)%c",
6586                            &startpoint.xlogid, &startpoint.xrecoff, startxlogfilename,
6587                            &ch) != 4 || ch != '\n')
6588                 ereport(ERROR,
6589                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
6590                                  errmsg("invalid data in file \"%s\"", BACKUP_LABEL_FILE)));
6591
6592         /*
6593          * Write the backup history file
6594          */
6595         XLByteToSeg(startpoint, _logId, _logSeg);
6596         BackupHistoryFilePath(histfilepath, ThisTimeLineID, _logId, _logSeg,
6597                                                   startpoint.xrecoff % XLogSegSize);
6598         fp = AllocateFile(histfilepath, "w");
6599         if (!fp)
6600                 ereport(ERROR,
6601                                 (errcode_for_file_access(),
6602                                  errmsg("could not create file \"%s\": %m",
6603                                                 histfilepath)));
6604         fprintf(fp, "START WAL LOCATION: %X/%X (file %s)\n",
6605                         startpoint.xlogid, startpoint.xrecoff, startxlogfilename);
6606         fprintf(fp, "STOP WAL LOCATION: %X/%X (file %s)\n",
6607                         stoppoint.xlogid, stoppoint.xrecoff, stopxlogfilename);
6608         /* transfer remaining lines from label to history file */
6609         while ((ich = fgetc(lfp)) != EOF)
6610                 fputc(ich, fp);
6611         fprintf(fp, "STOP TIME: %s\n", strfbuf);
6612         if (fflush(fp) || ferror(fp) || FreeFile(fp))
6613                 ereport(ERROR,
6614                                 (errcode_for_file_access(),
6615                                  errmsg("could not write file \"%s\": %m",
6616                                                 histfilepath)));
6617
6618         /*
6619          * Close and remove the backup label file
6620          */
6621         if (ferror(lfp) || FreeFile(lfp))
6622                 ereport(ERROR,
6623                                 (errcode_for_file_access(),
6624                                  errmsg("could not read file \"%s\": %m",
6625                                                 BACKUP_LABEL_FILE)));
6626         if (unlink(BACKUP_LABEL_FILE) != 0)
6627                 ereport(ERROR,
6628                                 (errcode_for_file_access(),
6629                                  errmsg("could not remove file \"%s\": %m",
6630                                                 BACKUP_LABEL_FILE)));
6631
6632         /*
6633          * Clean out any no-longer-needed history files.  As a side effect, this
6634          * will post a .ready file for the newly created history file, notifying
6635          * the archiver that history file may be archived immediately.
6636          */
6637         CleanupBackupHistory();
6638
6639         /*
6640          * We're done.  As a convenience, return the ending WAL location.
6641          */
6642         snprintf(stopxlogfilename, sizeof(stopxlogfilename), "%X/%X",
6643                          stoppoint.xlogid, stoppoint.xrecoff);
6644         result = DatumGetTextP(DirectFunctionCall1(textin,
6645                                                                                  CStringGetDatum(stopxlogfilename)));
6646         PG_RETURN_TEXT_P(result);
6647 }
6648
6649 /*
6650  * pg_switch_xlog: switch to next xlog file
6651  */
6652 Datum
6653 pg_switch_xlog(PG_FUNCTION_ARGS)
6654 {
6655         text       *result;
6656         XLogRecPtr      switchpoint;
6657         char            location[MAXFNAMELEN];
6658
6659         if (!superuser())
6660                 ereport(ERROR,
6661                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
6662                                  (errmsg("must be superuser to switch transaction log files"))));
6663
6664         switchpoint = RequestXLogSwitch();
6665
6666         /*
6667          * As a convenience, return the WAL location of the switch record
6668          */
6669         snprintf(location, sizeof(location), "%X/%X",
6670                          switchpoint.xlogid, switchpoint.xrecoff);
6671         result = DatumGetTextP(DirectFunctionCall1(textin,
6672                                                                                            CStringGetDatum(location)));
6673         PG_RETURN_TEXT_P(result);
6674 }
6675
6676 /*
6677  * Report the current WAL write location (same format as pg_start_backup etc)
6678  *
6679  * This is useful for determining how much of WAL is visible to an external
6680  * archiving process.  Note that the data before this point is written out
6681  * to the kernel, but is not necessarily synced to disk.
6682  */
6683 Datum
6684 pg_current_xlog_location(PG_FUNCTION_ARGS)
6685 {
6686         text       *result;
6687         char            location[MAXFNAMELEN];
6688
6689         /* Make sure we have an up-to-date local LogwrtResult */
6690         {
6691                 /* use volatile pointer to prevent code rearrangement */
6692                 volatile XLogCtlData *xlogctl = XLogCtl;
6693
6694                 SpinLockAcquire(&xlogctl->info_lck);
6695                 LogwrtResult = xlogctl->LogwrtResult;
6696                 SpinLockRelease(&xlogctl->info_lck);
6697         }
6698
6699         snprintf(location, sizeof(location), "%X/%X",
6700                          LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff);
6701
6702         result = DatumGetTextP(DirectFunctionCall1(textin,
6703                                                                                            CStringGetDatum(location)));
6704         PG_RETURN_TEXT_P(result);
6705 }
6706
6707 /*
6708  * Report the current WAL insert location (same format as pg_start_backup etc)
6709  *
6710  * This function is mostly for debugging purposes.
6711  */
6712 Datum
6713 pg_current_xlog_insert_location(PG_FUNCTION_ARGS)
6714 {
6715         text       *result;
6716         XLogCtlInsert *Insert = &XLogCtl->Insert;
6717         XLogRecPtr      current_recptr;
6718         char            location[MAXFNAMELEN];
6719
6720         /*
6721          * Get the current end-of-WAL position ... shared lock is sufficient
6722          */
6723         LWLockAcquire(WALInsertLock, LW_SHARED);
6724         INSERT_RECPTR(current_recptr, Insert, Insert->curridx);
6725         LWLockRelease(WALInsertLock);
6726
6727         snprintf(location, sizeof(location), "%X/%X",
6728                          current_recptr.xlogid, current_recptr.xrecoff);
6729
6730         result = DatumGetTextP(DirectFunctionCall1(textin,
6731                                                                                            CStringGetDatum(location)));
6732         PG_RETURN_TEXT_P(result);
6733 }
6734
6735 /*
6736  * Compute an xlog file name and decimal byte offset given a WAL location,
6737  * such as is returned by pg_stop_backup() or pg_xlog_switch().
6738  *
6739  * Note that a location exactly at a segment boundary is taken to be in
6740  * the previous segment.  This is usually the right thing, since the
6741  * expected usage is to determine which xlog file(s) are ready to archive.
6742  */
6743 Datum
6744 pg_xlogfile_name_offset(PG_FUNCTION_ARGS)
6745 {
6746         text       *location = PG_GETARG_TEXT_P(0);
6747         char       *locationstr;
6748         unsigned int uxlogid;
6749         unsigned int uxrecoff;
6750         uint32          xlogid;
6751         uint32          xlogseg;
6752         uint32          xrecoff;
6753         XLogRecPtr      locationpoint;
6754         char            xlogfilename[MAXFNAMELEN];
6755         Datum           values[2];
6756         bool            isnull[2];
6757         TupleDesc       resultTupleDesc;
6758         HeapTuple       resultHeapTuple;
6759         Datum           result;
6760
6761         /*
6762          * Read input and parse
6763          */
6764         locationstr = DatumGetCString(DirectFunctionCall1(textout,
6765                                                                                                  PointerGetDatum(location)));
6766
6767         if (sscanf(locationstr, "%X/%X", &uxlogid, &uxrecoff) != 2)
6768                 ereport(ERROR,
6769                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
6770                                  errmsg("could not parse transaction log location \"%s\"",
6771                                                 locationstr)));
6772
6773         locationpoint.xlogid = uxlogid;
6774         locationpoint.xrecoff = uxrecoff;
6775
6776         /*
6777          * Construct a tuple descriptor for the result row.  This must match this
6778          * function's pg_proc entry!
6779          */
6780         resultTupleDesc = CreateTemplateTupleDesc(2, false);
6781         TupleDescInitEntry(resultTupleDesc, (AttrNumber) 1, "file_name",
6782                                            TEXTOID, -1, 0);
6783         TupleDescInitEntry(resultTupleDesc, (AttrNumber) 2, "file_offset",
6784                                            INT4OID, -1, 0);
6785
6786         resultTupleDesc = BlessTupleDesc(resultTupleDesc);
6787
6788         /*
6789          * xlogfilename
6790          */
6791         XLByteToPrevSeg(locationpoint, xlogid, xlogseg);
6792         XLogFileName(xlogfilename, ThisTimeLineID, xlogid, xlogseg);
6793
6794         values[0] = DirectFunctionCall1(textin,
6795                                                                         CStringGetDatum(xlogfilename));
6796         isnull[0] = false;
6797
6798         /*
6799          * offset
6800          */
6801         xrecoff = locationpoint.xrecoff - xlogseg * XLogSegSize;
6802
6803         values[1] = UInt32GetDatum(xrecoff);
6804         isnull[1] = false;
6805
6806         /*
6807          * Tuple jam: Having first prepared your Datums, then squash together
6808          */
6809         resultHeapTuple = heap_form_tuple(resultTupleDesc, values, isnull);
6810
6811         result = HeapTupleGetDatum(resultHeapTuple);
6812
6813         PG_RETURN_DATUM(result);
6814 }
6815
6816 /*
6817  * Compute an xlog file name given a WAL location,
6818  * such as is returned by pg_stop_backup() or pg_xlog_switch().
6819  */
6820 Datum
6821 pg_xlogfile_name(PG_FUNCTION_ARGS)
6822 {
6823         text       *location = PG_GETARG_TEXT_P(0);
6824         text       *result;
6825         char       *locationstr;
6826         unsigned int uxlogid;
6827         unsigned int uxrecoff;
6828         uint32          xlogid;
6829         uint32          xlogseg;
6830         XLogRecPtr      locationpoint;
6831         char            xlogfilename[MAXFNAMELEN];
6832
6833         locationstr = DatumGetCString(DirectFunctionCall1(textout,
6834                                                                                                  PointerGetDatum(location)));
6835
6836         if (sscanf(locationstr, "%X/%X", &uxlogid, &uxrecoff) != 2)
6837                 ereport(ERROR,
6838                                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
6839                                  errmsg("could not parse transaction log location \"%s\"",
6840                                                 locationstr)));
6841
6842         locationpoint.xlogid = uxlogid;
6843         locationpoint.xrecoff = uxrecoff;
6844
6845         XLByteToPrevSeg(locationpoint, xlogid, xlogseg);
6846         XLogFileName(xlogfilename, ThisTimeLineID, xlogid, xlogseg);
6847
6848         result = DatumGetTextP(DirectFunctionCall1(textin,
6849                                                                                          CStringGetDatum(xlogfilename)));
6850         PG_RETURN_TEXT_P(result);
6851 }
6852
6853 /*
6854  * read_backup_label: check to see if a backup_label file is present
6855  *
6856  * If we see a backup_label during recovery, we assume that we are recovering
6857  * from a backup dump file, and we therefore roll forward from the checkpoint
6858  * identified by the label file, NOT what pg_control says.      This avoids the
6859  * problem that pg_control might have been archived one or more checkpoints
6860  * later than the start of the dump, and so if we rely on it as the start
6861  * point, we will fail to restore a consistent database state.
6862  *
6863  * We also attempt to retrieve the corresponding backup history file.
6864  * If successful, set *minRecoveryLoc to constrain valid PITR stopping
6865  * points.
6866  *
6867  * Returns TRUE if a backup_label was found (and fills the checkpoint
6868  * location into *checkPointLoc); returns FALSE if not.
6869  */
6870 static bool
6871 read_backup_label(XLogRecPtr *checkPointLoc, XLogRecPtr *minRecoveryLoc)
6872 {
6873         XLogRecPtr      startpoint;
6874         XLogRecPtr      stoppoint;
6875         char            histfilename[MAXFNAMELEN];
6876         char            histfilepath[MAXPGPATH];
6877         char            startxlogfilename[MAXFNAMELEN];
6878         char            stopxlogfilename[MAXFNAMELEN];
6879         TimeLineID      tli;
6880         uint32          _logId;
6881         uint32          _logSeg;
6882         FILE       *lfp;
6883         FILE       *fp;
6884         char            ch;
6885
6886         /* Default is to not constrain recovery stop point */
6887         minRecoveryLoc->xlogid = 0;
6888         minRecoveryLoc->xrecoff = 0;
6889
6890         /*
6891          * See if label file is present
6892          */
6893         lfp = AllocateFile(BACKUP_LABEL_FILE, "r");
6894         if (!lfp)
6895         {
6896                 if (errno != ENOENT)
6897                         ereport(FATAL,
6898                                         (errcode_for_file_access(),
6899                                          errmsg("could not read file \"%s\": %m",
6900                                                         BACKUP_LABEL_FILE)));
6901                 return false;                   /* it's not there, all is fine */
6902         }
6903
6904         /*
6905          * Read and parse the START WAL LOCATION and CHECKPOINT lines (this code
6906          * is pretty crude, but we are not expecting any variability in the file
6907          * format).
6908          */
6909         if (fscanf(lfp, "START WAL LOCATION: %X/%X (file %08X%16s)%c",
6910                            &startpoint.xlogid, &startpoint.xrecoff, &tli,
6911                            startxlogfilename, &ch) != 5 || ch != '\n')
6912                 ereport(FATAL,
6913                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
6914                                  errmsg("invalid data in file \"%s\"", BACKUP_LABEL_FILE)));
6915         if (fscanf(lfp, "CHECKPOINT LOCATION: %X/%X%c",
6916                            &checkPointLoc->xlogid, &checkPointLoc->xrecoff,
6917                            &ch) != 3 || ch != '\n')
6918                 ereport(FATAL,
6919                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
6920                                  errmsg("invalid data in file \"%s\"", BACKUP_LABEL_FILE)));
6921         if (ferror(lfp) || FreeFile(lfp))
6922                 ereport(FATAL,
6923                                 (errcode_for_file_access(),
6924                                  errmsg("could not read file \"%s\": %m",
6925                                                 BACKUP_LABEL_FILE)));
6926
6927         /*
6928          * Try to retrieve the backup history file (no error if we can't)
6929          */
6930         XLByteToSeg(startpoint, _logId, _logSeg);
6931         BackupHistoryFileName(histfilename, tli, _logId, _logSeg,
6932                                                   startpoint.xrecoff % XLogSegSize);
6933
6934         if (InArchiveRecovery)
6935                 RestoreArchivedFile(histfilepath, histfilename, "RECOVERYHISTORY", 0);
6936         else
6937                 BackupHistoryFilePath(histfilepath, tli, _logId, _logSeg,
6938                                                           startpoint.xrecoff % XLogSegSize);
6939
6940         fp = AllocateFile(histfilepath, "r");
6941         if (fp)
6942         {
6943                 /*
6944                  * Parse history file to identify stop point.
6945                  */
6946                 if (fscanf(fp, "START WAL LOCATION: %X/%X (file %24s)%c",
6947                                    &startpoint.xlogid, &startpoint.xrecoff, startxlogfilename,
6948                                    &ch) != 4 || ch != '\n')
6949                         ereport(FATAL,
6950                                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
6951                                          errmsg("invalid data in file \"%s\"", histfilename)));
6952                 if (fscanf(fp, "STOP WAL LOCATION: %X/%X (file %24s)%c",
6953                                    &stoppoint.xlogid, &stoppoint.xrecoff, stopxlogfilename,
6954                                    &ch) != 4 || ch != '\n')
6955                         ereport(FATAL,
6956                                         (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
6957                                          errmsg("invalid data in file \"%s\"", histfilename)));
6958                 *minRecoveryLoc = stoppoint;
6959                 if (ferror(fp) || FreeFile(fp))
6960                         ereport(FATAL,
6961                                         (errcode_for_file_access(),
6962                                          errmsg("could not read file \"%s\": %m",
6963                                                         histfilepath)));
6964         }
6965
6966         return true;
6967 }
6968
6969 /*
6970  * Error context callback for errors occurring during rm_redo().
6971  */
6972 static void
6973 rm_redo_error_callback(void *arg)
6974 {
6975         XLogRecord *record = (XLogRecord *) arg;
6976         StringInfoData buf;
6977
6978         initStringInfo(&buf);
6979         RmgrTable[record->xl_rmid].rm_desc(&buf,
6980                                                                            record->xl_info,
6981                                                                            XLogRecGetData(record));
6982
6983         /* don't bother emitting empty description */
6984         if (buf.len > 0)
6985                 errcontext("xlog redo %s", buf.data);
6986
6987         pfree(buf.data);
6988 }