OSDN Git Service

Add recovery_end_command option to recovery.conf. recovery_end_command
[pg-rex/syncrep.git] / src / backend / access / transam / xlog.c
index 53a4646..09b5075 100644 (file)
@@ -4,10 +4,10 @@
  *             PostgreSQL transaction log manager
  *
  *
- * Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.260 2007/01/05 22:19:23 momjian Exp $
+ * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.338 2009/05/14 20:31:09 heikki Exp $
  *
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
 
 #include <ctype.h>
-#include <fcntl.h>
 #include <signal.h>
 #include <time.h>
+#include <fcntl.h>
 #include <sys/stat.h>
 #include <sys/time.h>
 #include <sys/wait.h>
 #include <unistd.h>
 
 #include "access/clog.h"
-#include "access/heapam.h"
 #include "access/multixact.h"
 #include "access/subtrans.h"
 #include "access/transam.h"
+#include "access/tuptoaster.h"
 #include "access/twophase.h"
 #include "access/xact.h"
 #include "access/xlog_internal.h"
 #include "catalog/pg_control.h"
 #include "catalog/pg_type.h"
 #include "funcapi.h"
+#include "libpq/pqsignal.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "postmaster/bgwriter.h"
-#include "storage/bufpage.h"
+#include "storage/bufmgr.h"
 #include "storage/fd.h"
+#include "storage/ipc.h"
 #include "storage/pmsignal.h"
 #include "storage/procarray.h"
+#include "storage/smgr.h"
 #include "storage/spin.h"
 #include "utils/builtins.h"
-#include "utils/nabstime.h"
-#include "utils/pg_locale.h"
-
-
-/*
- *     Because O_DIRECT bypasses the kernel buffers, and because we never
- *     read those buffers except during crash recovery, it is a win to use
- *     it in all cases where we sync on each write().  We could allow O_DIRECT
- *     with fsync(), but because skipping the kernel buffer forces writes out
- *     quickly, it seems best just to use it for O_SYNC.  It is hard to imagine
- *     how fsync() could be a win for O_DIRECT compared to O_SYNC and O_DIRECT.
- *     Also, O_DIRECT is never enough to force data to the drives, it merely
- *     tries to bypass the kernel cache, so we still need O_SYNC or fsync().
- */
-#ifdef O_DIRECT
-#define PG_O_DIRECT                            O_DIRECT
-#else
-#define PG_O_DIRECT                            0
-#endif
-
-/*
- * This chunk of hackery attempts to determine which file sync methods
- * are available on the current platform, and to choose an appropriate
- * default method.     We assume that fsync() is always available, and that
- * configure determined whether fdatasync() is.
- */
-#if defined(O_SYNC)
-#define BARE_OPEN_SYNC_FLAG            O_SYNC
-#elif defined(O_FSYNC)
-#define BARE_OPEN_SYNC_FLAG            O_FSYNC
-#endif
-#ifdef BARE_OPEN_SYNC_FLAG
-#define OPEN_SYNC_FLAG                 (BARE_OPEN_SYNC_FLAG | PG_O_DIRECT)
-#endif
-
-#if defined(O_DSYNC)
-#if defined(OPEN_SYNC_FLAG)
-/* O_DSYNC is distinct? */
-#if O_DSYNC != BARE_OPEN_SYNC_FLAG
-#define OPEN_DATASYNC_FLAG             (O_DSYNC | PG_O_DIRECT)
-#endif
-#else                                                  /* !defined(OPEN_SYNC_FLAG) */
-/* Win32 only has O_DSYNC */
-#define OPEN_DATASYNC_FLAG             (O_DSYNC | PG_O_DIRECT)
-#endif
-#endif
-
-#if defined(OPEN_DATASYNC_FLAG)
-#define DEFAULT_SYNC_METHOD_STR "open_datasync"
-#define DEFAULT_SYNC_METHOD            SYNC_METHOD_OPEN
-#define DEFAULT_SYNC_FLAGBIT   OPEN_DATASYNC_FLAG
-#elif defined(HAVE_FDATASYNC)
-#define DEFAULT_SYNC_METHOD_STR "fdatasync"
-#define DEFAULT_SYNC_METHOD            SYNC_METHOD_FDATASYNC
-#define DEFAULT_SYNC_FLAGBIT   0
-#elif defined(HAVE_FSYNC_WRITETHROUGH_ONLY)
-#define DEFAULT_SYNC_METHOD_STR "fsync_writethrough"
-#define DEFAULT_SYNC_METHOD            SYNC_METHOD_FSYNC_WRITETHROUGH
-#define DEFAULT_SYNC_FLAGBIT   0
-#else
-#define DEFAULT_SYNC_METHOD_STR "fsync"
-#define DEFAULT_SYNC_METHOD            SYNC_METHOD_FSYNC
-#define DEFAULT_SYNC_FLAGBIT   0
-#endif
-
-
-/*
- * Limitation of buffer-alignment for direct IO depends on OS and filesystem,
- * but XLOG_BLCKSZ is assumed to be enough for it.
- */
-#ifdef O_DIRECT
-#define ALIGNOF_XLOG_BUFFER            XLOG_BLCKSZ
-#else
-#define ALIGNOF_XLOG_BUFFER            ALIGNOF_BUFFER
-#endif
+#include "utils/flatfiles.h"
+#include "utils/guc.h"
+#include "utils/ps_status.h"
+#include "pg_trace.h"
 
 
 /* File path names (all relative to $PGDATA) */
 int                    CheckPointSegments = 3;
 int                    XLOGbuffers = 8;
 int                    XLogArchiveTimeout = 0;
+bool           XLogArchiveMode = false;
 char      *XLogArchiveCommand = NULL;
-char      *XLOG_sync_method = NULL;
-const char     XLOG_sync_method_default[] = DEFAULT_SYNC_METHOD_STR;
 bool           fullPageWrites = true;
+bool           log_checkpoints = false;
+int            sync_method = DEFAULT_SYNC_METHOD;
 
 #ifdef WAL_DEBUG
 bool           XLOG_DEBUG = false;
 #endif
 
 /*
- * XLOGfileslop is used in the code as the allowed "fuzz" in the number of
- * preallocated XLOG segments --- we try to have at least XLOGfiles advance
- * segments but no more than XLOGfileslop segments.  This could
- * be made a separate GUC variable, but at present I think it's sufficient
- * to hardwire it as 2*CheckPointSegments+1.  Under normal conditions, a
- * checkpoint will free no more than 2*CheckPointSegments log segments, and
- * we want to recycle all of them; the +1 allows boundary cases to happen
- * without wasting a delete/create-segment cycle.
+ * XLOGfileslop is the maximum number of preallocated future XLOG segments.
+ * When we are done with an old XLOG segment file, we will recycle it as a
+ * future XLOG segment as long as there aren't already XLOGfileslop future
+ * segments; else we'll delete it.  This could be made a separate GUC
+ * variable, but at present I think it's sufficient to hardwire it as
+ * 2*CheckPointSegments+1.     Under normal conditions, a checkpoint will free
+ * no more than 2*CheckPointSegments log segments, and we want to recycle all
+ * of them; the +1 allows boundary cases to happen without wasting a
+ * delete/create-segment cycle.
  */
-
 #define XLOGfileslop   (2*CheckPointSegments + 1)
 
+/*
+ * GUC support
+ */
+const struct config_enum_entry sync_method_options[] = {
+       {"fsync", SYNC_METHOD_FSYNC, false},
+#ifdef HAVE_FSYNC_WRITETHROUGH
+       {"fsync_writethrough", SYNC_METHOD_FSYNC_WRITETHROUGH, false},
+#endif
+#ifdef HAVE_FDATASYNC
+       {"fdatasync", SYNC_METHOD_FDATASYNC, false},
+#endif
+#ifdef OPEN_SYNC_FLAG
+       {"open_sync", SYNC_METHOD_OPEN, false},
+#endif
+#ifdef OPEN_DATASYNC_FLAG
+       {"open_datasync", SYNC_METHOD_OPEN_DSYNC, false},
+#endif
+       {NULL, 0, false}
+};
 
-/* these are derived from XLOG_sync_method by assign_xlog_sync_method */
-int                    sync_method = DEFAULT_SYNC_METHOD;
-static int     open_sync_bit = DEFAULT_SYNC_FLAGBIT;
-
-#define XLOG_SYNC_BIT  (enableFsync ? open_sync_bit : 0)
-
+/*
+ * Statistics for current checkpoint are collected in this global struct.
+ * Because only the background writer or a stand-alone backend can perform
+ * checkpoints, this will be unused in normal backends.
+ */
+CheckpointStatsData CheckpointStats;
 
 /*
  * ThisTimeLineID will be same in all backends --- it identifies current
@@ -169,26 +121,43 @@ static int        open_sync_bit = DEFAULT_SYNC_FLAGBIT;
  */
 TimeLineID     ThisTimeLineID = 0;
 
-/* Are we doing recovery from XLOG? */
+/*
+ * Are we doing recovery from XLOG? 
+ *
+ * This is only ever true in the startup process, even if the system is still
+ * in recovery. Prior to 8.4, all activity during recovery were carried out
+ * by Startup process. This local variable continues to be used in functions
+ * that need to act differently when called from a redo function (e.g skip
+ * WAL logging). To check whether the system is in recovery regardless of what
+ * process you're running in, use RecoveryInProgress().
+ */
 bool           InRecovery = false;
 
 /* Are we recovering using offline XLOG archives? */
 static bool InArchiveRecovery = false;
 
+/*
+ * Local copy of SharedRecoveryInProgress variable. True actually means "not
+ * known, need to check the shared state"
+ */
+static bool LocalRecoveryInProgress = true;
+
 /* Was the last xlog file restored from archive, or local? */
 static bool restoredFromArchive = false;
 
 /* options taken from recovery.conf */
 static char *recoveryRestoreCommand = NULL;
+static char *recoveryEndCommand = NULL;
 static bool recoveryTarget = false;
 static bool recoveryTargetExact = false;
 static bool recoveryTargetInclusive = true;
 static TransactionId recoveryTargetXid;
-static time_t recoveryTargetTime;
+static TimestampTz recoveryTargetTime;
+static TimestampTz recoveryLastXTime = 0;
 
 /* if recoveryStopsHere returns true, it saves actual stop xid/time here */
 static TransactionId recoveryStopXid;
-static time_t recoveryStopTime;
+static TimestampTz recoveryStopTime;
 static bool recoveryStopAfter;
 
 /*
@@ -218,37 +187,15 @@ static List *expectedTLIs;
 static TimeLineID curFileTLI;
 
 /*
- * MyLastRecPtr points to the start of the last XLOG record inserted by the
- * current transaction.  If MyLastRecPtr.xrecoff == 0, then the current
- * xact hasn't yet inserted any transaction-controlled XLOG records.
- *
- * Note that XLOG records inserted outside transaction control are not
- * reflected into MyLastRecPtr.  They do, however, cause MyXactMadeXLogEntry
- * to be set true.     The latter can be used to test whether the current xact
- * made any loggable changes (including out-of-xact changes, such as
- * sequence updates).
- *
- * When we insert/update/delete a tuple in a temporary relation, we do not
- * make any XLOG record, since we don't care about recovering the state of
- * the temp rel after a crash. However, we will still need to remember
- * whether our transaction committed or aborted in that case.  So, we must
- * set MyXactMadeTempRelUpdate true to indicate that the XID will be of
- * interest later.
- */
-XLogRecPtr     MyLastRecPtr = {0, 0};
-
-bool           MyXactMadeXLogEntry = false;
-
-bool           MyXactMadeTempRelUpdate = false;
-
-/*
  * ProcLastRecPtr points to the start of the last XLOG record inserted by the
- * current backend.  It is updated for all inserts, transaction-controlled
- * or not.     ProcLastRecEnd is similar but points to end+1 of last record.
+ * current backend.  It is updated for all inserts.  XactLastRecEnd points to
+ * end+1 of the last record, and is reset when we end a top-level transaction,
+ * or start a new one; so it can be used to tell if the current transaction has
+ * created any XLOG records.
  */
 static XLogRecPtr ProcLastRecPtr = {0, 0};
 
-XLogRecPtr     ProcLastRecEnd = {0, 0};
+XLogRecPtr     XactLastRecEnd = {0, 0};
 
 /*
  * RedoRecPtr is this backend's local copy of the REDO record pointer
@@ -312,9 +259,8 @@ static XLogRecPtr RedoRecPtr;
  * ControlFileLock: must be held to read/update control file or create
  * new log file.
  *
- * CheckpointLock: must be held to do a checkpoint (ensures only one
- * checkpointer at a time; currently, with all checkpoints done by the
- * bgwriter, this is just pro forma).
+ * CheckpointLock: must be held to do a checkpoint or restartpoint (ensures
+ * only one checkpointer at a time)
  *
  *----------
  */
@@ -352,7 +298,7 @@ typedef struct XLogCtlWrite
 {
        XLogwrtResult LogwrtResult; /* current value of LogwrtResult */
        int                     curridx;                /* cache index of next block to write */
-       time_t          lastSegSwitchTime;              /* time of last xlog segment switch */
+       pg_time_t       lastSegSwitchTime;              /* time of last xlog segment switch */
 } XLogCtlWrite;
 
 /*
@@ -368,6 +314,7 @@ typedef struct XLogCtlData
        XLogwrtResult LogwrtResult;
        uint32          ckptXidEpoch;   /* nextXID & epoch of latest checkpoint */
        TransactionId ckptXid;
+       XLogRecPtr      asyncCommitLSN; /* LSN of newest async commit */
 
        /* Protected by WALWriteLock: */
        XLogCtlWrite Write;
@@ -379,10 +326,28 @@ typedef struct XLogCtlData
         */
        char       *pages;                      /* buffers for unwritten XLOG pages */
        XLogRecPtr *xlblocks;           /* 1st byte ptr-s + XLOG_BLCKSZ */
-       Size            XLogCacheByte;  /* # bytes in xlog buffers */
        int                     XLogCacheBlck;  /* highest allocated xlog buffer index */
        TimeLineID      ThisTimeLineID;
 
+       /*
+        * SharedRecoveryInProgress indicates if we're still in crash or archive
+        * recovery.  It's checked by RecoveryInProgress().
+        */
+       bool            SharedRecoveryInProgress;
+
+       /*
+        * During recovery, we keep a copy of the latest checkpoint record
+        * here.  Used by the background writer when it wants to create
+        * a restartpoint.
+        *
+        * Protected by info_lck.
+        */
+       XLogRecPtr      lastCheckPointRecPtr;
+       CheckPoint      lastCheckPoint;
+
+       /* end+1 of the last record replayed (or being replayed) */
+       XLogRecPtr      replayEndRecPtr;
+
        slock_t         info_lck;               /* locks shared variables shown above */
 } XLogCtlData;
 
@@ -457,19 +422,33 @@ static XLogRecPtr ReadRecPtr;     /* start of last record read */
 static XLogRecPtr EndRecPtr;   /* end+1 of last record read */
 static XLogRecord *nextRecord = NULL;
 static TimeLineID lastPageTLI = 0;
+static XLogRecPtr minRecoveryPoint; /* local copy of ControlFile->minRecoveryPoint */
+static bool    updateMinRecoveryPoint = true;
 
 static bool InRedo = false;
 
+/*
+ * Flag set by interrupt handlers for later service in the redo loop.
+ */
+static volatile sig_atomic_t got_SIGHUP = false;
+static volatile sig_atomic_t shutdown_requested = false;
+/*
+ * Flag set when executing a restore command, to tell SIGTERM signal handler
+ * that it's safe to just proc_exit.
+ */
+static volatile sig_atomic_t in_restore_command = false;
+
 
 static void XLogArchiveNotify(const char *xlog);
 static void XLogArchiveNotifySeg(uint32 log, uint32 seg);
 static bool XLogArchiveCheckDone(const char *xlog);
+static bool XLogArchiveIsBusy(const char *xlog);
 static void XLogArchiveCleanup(const char *xlog);
 static void readRecoveryCommandFile(void);
 static void exitArchiveRecovery(TimeLineID endTLI,
                                        uint32 endLogId, uint32 endLogSeg);
 static bool recoveryStopsHere(XLogRecord *record, bool *includeThis);
-static void CheckPointGuts(XLogRecPtr checkPointRedo);
+static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags);
 
 static bool XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites,
                                XLogRecPtr *lsn, BkpBlock *bkpb);
@@ -485,10 +464,12 @@ static int        XLogFileRead(uint32 log, uint32 seg, int emode);
 static void XLogFileClose(void);
 static bool RestoreArchivedFile(char *path, const char *xlogfname,
                                        const char *recovername, off_t expectedSize);
-static int     PreallocXlogFiles(XLogRecPtr endptr);
-static void MoveOfflineLogs(uint32 log, uint32 seg, XLogRecPtr endptr,
-                               int *nsegsremoved, int *nsegsrecycled);
+static void ExecuteRecoveryEndCommand(void);
+static void PreallocXlogFiles(XLogRecPtr endptr);
+static void RemoveOldXlogFiles(uint32 log, uint32 seg, XLogRecPtr endptr);
+static void ValidateXLOGDirectoryStructure(void);
 static void CleanupBackupHistory(void);
+static void UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force);
 static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, int emode);
 static bool ValidXLOGHeader(XLogPageHeader hdr, int emode);
 static XLogRecord *ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt);
@@ -500,15 +481,16 @@ static void writeTimeLineHistory(TimeLineID newTLI, TimeLineID parentTLI,
                                         uint32 endLogId, uint32 endLogSeg);
 static void WriteControlFile(void);
 static void ReadControlFile(void);
-static char *str_time(time_t tnow);
-static void issue_xlog_fsync(void);
-
+static char *str_time(pg_time_t tnow);
 #ifdef WAL_DEBUG
 static void xlog_outrec(StringInfo buf, XLogRecord *record);
 #endif
+static void issue_xlog_fsync(void);
+static void pg_start_backup_callback(int code, Datum arg);
 static bool read_backup_label(XLogRecPtr *checkPointLoc,
                                  XLogRecPtr *minRecoveryLoc);
 static void rm_redo_error_callback(void *arg);
+static int get_sync_bit(int method);
 
 
 /*
@@ -548,19 +530,19 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
        uint32          len,
                                write_len;
        unsigned        i;
-       XLogwrtRqst LogwrtRqst;
        bool            updrqst;
        bool            doPageWrites;
        bool            isLogSwitch = (rmid == RM_XLOG_ID && info == XLOG_SWITCH);
-       bool            no_tran = (rmid == RM_XLOG_ID);
 
+       /* cross-check on whether we should be here or not */
+       if (RecoveryInProgress())
+               elog(FATAL, "cannot make new WAL entries during recovery");
+
+       /* info's high bits are reserved for use by me */
        if (info & XLR_INFO_MASK)
-       {
-               if ((info & XLR_INFO_MASK) != XLOG_NO_TRAN)
-                       elog(PANIC, "invalid xlog info mask %02X", (info & XLR_INFO_MASK));
-               no_tran = true;
-               info &= ~XLR_INFO_MASK;
-       }
+               elog(PANIC, "invalid xlog info mask %02X", info);
+
+       TRACE_POSTGRESQL_XLOG_INSERT(rmid, info);
 
        /*
         * In bootstrap mode, we don't actually log anything but XLOG resources;
@@ -707,43 +689,6 @@ begin:;
 
        START_CRIT_SECTION();
 
-       /* update LogwrtResult before doing cache fill check */
-       {
-               /* use volatile pointer to prevent code rearrangement */
-               volatile XLogCtlData *xlogctl = XLogCtl;
-
-               SpinLockAcquire(&xlogctl->info_lck);
-               LogwrtRqst = xlogctl->LogwrtRqst;
-               LogwrtResult = xlogctl->LogwrtResult;
-               SpinLockRelease(&xlogctl->info_lck);
-       }
-
-       /*
-        * If cache is half filled then try to acquire write lock and do
-        * XLogWrite. Ignore any fractional blocks in performing this check.
-        */
-       LogwrtRqst.Write.xrecoff -= LogwrtRqst.Write.xrecoff % XLOG_BLCKSZ;
-       if (LogwrtRqst.Write.xlogid != LogwrtResult.Write.xlogid ||
-               (LogwrtRqst.Write.xrecoff >= LogwrtResult.Write.xrecoff +
-                XLogCtl->XLogCacheByte / 2))
-       {
-               if (LWLockConditionalAcquire(WALWriteLock, LW_EXCLUSIVE))
-               {
-                       /*
-                        * Since the amount of data we write here is completely optional
-                        * anyway, tell XLogWrite it can be "flexible" and stop at a
-                        * convenient boundary.  This allows writes triggered by this
-                        * mechanism to synchronize with the cache boundaries, so that in
-                        * a long transaction we'll basically dump alternating halves of
-                        * the buffer array.
-                        */
-                       LogwrtResult = XLogCtl->Write.LogwrtResult;
-                       if (XLByteLT(LogwrtResult.Write, LogwrtRqst.Write))
-                               XLogWrite(LogwrtRqst, true, false);
-                       LWLockRelease(WALWriteLock);
-               }
-       }
-
        /* Now wait to get insert lock */
        LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
 
@@ -855,6 +800,19 @@ begin:;
        }
 
        /*
+        * If we backed up any full blocks and online backup is not in progress,
+        * mark the backup blocks as removable.  This allows the WAL archiver to
+        * know whether it is safe to compress archived WAL data by transforming
+        * full-block records into the non-full-block format.
+        *
+        * Note: we could just set the flag whenever !forcePageWrites, but
+        * defining it like this leaves the info bit free for some potential other
+        * use in records without any backup blocks.
+        */
+       if ((info & XLR_BKP_BLOCK_MASK) && !Insert->forcePageWrites)
+               info |= XLR_BKP_REMOVABLE;
+
+       /*
         * If there isn't enough space on the current XLOG page for a record
         * header, advance to the next page (leaving the unused space as zeroes).
         */
@@ -944,11 +902,8 @@ begin:;
 #endif
 
        /* Record begin of record in appropriate places */
-       if (!no_tran)
-               MyLastRecPtr = RecPtr;
        ProcLastRecPtr = RecPtr;
        Insert->PrevRecord = RecPtr;
-       MyXactMadeXLogEntry = true;
 
        Insert->currpos += SizeOfXLogRecord;
        freespace -= SizeOfXLogRecord;
@@ -1017,6 +972,8 @@ begin:;
                XLogwrtRqst FlushRqst;
                XLogRecPtr      OldSegEnd;
 
+               TRACE_POSTGRESQL_XLOG_SWITCH();
+
                LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
 
                /*
@@ -1106,7 +1063,7 @@ begin:;
                SpinLockRelease(&xlogctl->info_lck);
        }
 
-       ProcLastRecEnd = RecPtr;
+       XactLastRecEnd = RecPtr;
 
        END_CRIT_SECTION();
 
@@ -1122,31 +1079,30 @@ static bool
 XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites,
                                XLogRecPtr *lsn, BkpBlock *bkpb)
 {
-       PageHeader      page;
+       Page            page;
 
-       page = (PageHeader) BufferGetBlock(rdata->buffer);
+       page = BufferGetPage(rdata->buffer);
 
        /*
         * XXX We assume page LSN is first data on *every* page that can be passed
         * to XLogInsert, whether it otherwise has the standard page layout or
         * not.
         */
-       *lsn = page->pd_lsn;
+       *lsn = PageGetLSN(page);
 
        if (doPageWrites &&
-               XLByteLE(page->pd_lsn, RedoRecPtr))
+               XLByteLE(PageGetLSN(page), RedoRecPtr))
        {
                /*
                 * The page needs to be backed up, so set up *bkpb
                 */
-               bkpb->node = BufferGetFileNode(rdata->buffer);
-               bkpb->block = BufferGetBlockNumber(rdata->buffer);
+               BufferGetTag(rdata->buffer, &bkpb->node, &bkpb->fork, &bkpb->block);
 
                if (rdata->buffer_std)
                {
                        /* Assume we can omit data between pd_lower and pd_upper */
-                       uint16          lower = page->pd_lower;
-                       uint16          upper = page->pd_upper;
+                       uint16          lower = ((PageHeader) page)->pd_lower;
+                       uint16          upper = ((PageHeader) page)->pd_upper;
 
                        if (lower >= SizeOfPageHeaderData &&
                                upper > lower &&
@@ -1273,6 +1229,50 @@ XLogArchiveCheckDone(const char *xlog)
 }
 
 /*
+ * XLogArchiveIsBusy
+ *
+ * Check to see if an XLOG segment file is still unarchived.
+ * This is almost but not quite the inverse of XLogArchiveCheckDone: in
+ * the first place we aren't chartered to recreate the .ready file, and
+ * in the second place we should consider that if the file is already gone
+ * then it's not busy.  (This check is needed to handle the race condition
+ * that a checkpoint already deleted the no-longer-needed file.)
+ */
+static bool
+XLogArchiveIsBusy(const char *xlog)
+{
+       char            archiveStatusPath[MAXPGPATH];
+       struct stat stat_buf;
+
+       /* First check for .done --- this means archiver is done with it */
+       StatusFilePath(archiveStatusPath, xlog, ".done");
+       if (stat(archiveStatusPath, &stat_buf) == 0)
+               return false;
+
+       /* check for .ready --- this means archiver is still busy with it */
+       StatusFilePath(archiveStatusPath, xlog, ".ready");
+       if (stat(archiveStatusPath, &stat_buf) == 0)
+               return true;
+
+       /* Race condition --- maybe archiver just finished, so recheck */
+       StatusFilePath(archiveStatusPath, xlog, ".done");
+       if (stat(archiveStatusPath, &stat_buf) == 0)
+               return false;
+
+       /*
+        * Check to see if the WAL file has been removed by checkpoint,
+        * which implies it has already been archived, and explains why we
+        * can't see a status file for it.
+        */
+       snprintf(archiveStatusPath, MAXPGPATH, XLOGDIR "/%s", xlog);
+       if (stat(archiveStatusPath, &stat_buf) != 0 &&
+               errno == ENOENT)
+               return false;
+
+       return true;
+}
+
+/*
  * XLogArchiveCleanup
  *
  * Cleanup archive notification file(s) for a particular xlog segment
@@ -1373,12 +1373,14 @@ AdvanceXLInsertBuffer(bool new_segment)
                                 * Have to write buffers while holding insert lock. This is
                                 * not good, so only write as much as we absolutely must.
                                 */
+                               TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_START();
                                WriteRqst.Write = OldPageRqstPtr;
                                WriteRqst.Flush.xlogid = 0;
                                WriteRqst.Flush.xrecoff = 0;
                                XLogWrite(WriteRqst, false, false);
                                LWLockRelease(WALWriteLock);
                                Insert->LogwrtResult = LogwrtResult;
+                               TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_DONE();
                        }
                }
        }
@@ -1447,6 +1449,40 @@ AdvanceXLInsertBuffer(bool new_segment)
 }
 
 /*
+ * Check whether we've consumed enough xlog space that a checkpoint is needed.
+ *
+ * Caller must have just finished filling the open log file (so that
+ * openLogId/openLogSeg are valid).  We measure the distance from RedoRecPtr
+ * to the open log file and see if that exceeds CheckPointSegments.
+ *
+ * Note: it is caller's responsibility that RedoRecPtr is up-to-date.
+ */
+static bool
+XLogCheckpointNeeded(void)
+{
+       /*
+        * A straight computation of segment number could overflow 32 bits. Rather
+        * than assuming we have working 64-bit arithmetic, we compare the
+        * highest-order bits separately, and force a checkpoint immediately when
+        * they change.
+        */
+       uint32          old_segno,
+                               new_segno;
+       uint32          old_highbits,
+                               new_highbits;
+
+       old_segno = (RedoRecPtr.xlogid % XLogSegSize) * XLogSegsPerFile +
+               (RedoRecPtr.xrecoff / XLogSegSize);
+       old_highbits = RedoRecPtr.xlogid / XLogSegSize;
+       new_segno = (openLogId % XLogSegSize) * XLogSegsPerFile + openLogSeg;
+       new_highbits = openLogId / XLogSegSize;
+       if (new_highbits != old_highbits ||
+               new_segno >= old_segno + (uint32) (CheckPointSegments - 1))
+               return true;
+       return false;
+}
+
+/*
  * Write and/or fsync the log at least as far as WriteRqst indicates.
  *
  * If flexible == TRUE, we don't have to write as far as WriteRqst, but
@@ -1633,41 +1669,21 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
                                if (XLogArchivingActive())
                                        XLogArchiveNotifySeg(openLogId, openLogSeg);
 
-                               Write->lastSegSwitchTime = time(NULL);
+                               Write->lastSegSwitchTime = (pg_time_t) time(NULL);
 
                                /*
-                                * Signal bgwriter to start a checkpoint if it's been too long
-                                * since the last one.  (We look at local copy of RedoRecPtr
-                                * which might be a little out of date, but should be close
-                                * enough for this purpose.)
-                                *
-                                * A straight computation of segment number could overflow 32
-                                * bits.  Rather than assuming we have working 64-bit
-                                * arithmetic, we compare the highest-order bits separately,
-                                * and force a checkpoint immediately when they change.
+                                * Signal bgwriter to start a checkpoint if we've consumed too
+                                * much xlog since the last one.  For speed, we first check
+                                * using the local copy of RedoRecPtr, which might be out of
+                                * date; if it looks like a checkpoint is needed, forcibly
+                                * update RedoRecPtr and recheck.
                                 */
-                               if (IsUnderPostmaster)
+                               if (IsUnderPostmaster &&
+                                       XLogCheckpointNeeded())
                                {
-                                       uint32          old_segno,
-                                                               new_segno;
-                                       uint32          old_highbits,
-                                                               new_highbits;
-
-                                       old_segno = (RedoRecPtr.xlogid % XLogSegSize) * XLogSegsPerFile +
-                                               (RedoRecPtr.xrecoff / XLogSegSize);
-                                       old_highbits = RedoRecPtr.xlogid / XLogSegSize;
-                                       new_segno = (openLogId % XLogSegSize) * XLogSegsPerFile +
-                                               openLogSeg;
-                                       new_highbits = openLogId / XLogSegSize;
-                                       if (new_highbits != old_highbits ||
-                                               new_segno >= old_segno + (uint32) (CheckPointSegments-1))
-                                       {
-#ifdef WAL_DEBUG
-                                               if (XLOG_DEBUG)
-                                                       elog(LOG, "time for a checkpoint, signaling bgwriter");
-#endif
-                                               RequestCheckpoint(false, true);
-                                       }
+                                       (void) GetRedoRecPtr();
+                                       if (XLogCheckpointNeeded())
+                                               RequestCheckpoint(CHECKPOINT_CAUSE_XLOG);
                                }
                        }
                }
@@ -1699,7 +1715,8 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
                 * have no open file or the wrong one.  However, we do not need to
                 * fsync more than one file.
                 */
-               if (sync_method != SYNC_METHOD_OPEN)
+               if (sync_method != SYNC_METHOD_OPEN &&
+                       sync_method != SYNC_METHOD_OPEN_DSYNC)
                {
                        if (openLogFile >= 0 &&
                                !XLByteInPrevSeg(LogwrtResult.Write, openLogId, openLogSeg))
@@ -1739,6 +1756,79 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
 }
 
 /*
+ * Record the LSN for an asynchronous transaction commit.
+ * (This should not be called for aborts, nor for synchronous commits.)
+ */
+void
+XLogSetAsyncCommitLSN(XLogRecPtr asyncCommitLSN)
+{
+       /* use volatile pointer to prevent code rearrangement */
+       volatile XLogCtlData *xlogctl = XLogCtl;
+
+       SpinLockAcquire(&xlogctl->info_lck);
+       if (XLByteLT(xlogctl->asyncCommitLSN, asyncCommitLSN))
+               xlogctl->asyncCommitLSN = asyncCommitLSN;
+       SpinLockRelease(&xlogctl->info_lck);
+}
+
+/*
+ * Advance minRecoveryPoint in control file.
+ *
+ * If we crash during recovery, we must reach this point again before the
+ * database is consistent. 
+ * 
+ * If 'force' is true, 'lsn' argument is ignored. Otherwise, minRecoveryPoint
+ * is is only updated if it's not already greater than or equal to 'lsn'.
+ */
+static void
+UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force)
+{
+       /* Quick check using our local copy of the variable */
+       if (!updateMinRecoveryPoint || (!force && XLByteLE(lsn, minRecoveryPoint)))
+               return;
+
+       LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
+
+       /* update local copy */
+       minRecoveryPoint = ControlFile->minRecoveryPoint;
+
+       /*
+        * An invalid minRecoveryPoint means that we need to recover all the WAL,
+        * ie. crash recovery. Don't update the control file in that case.
+        */
+       if (minRecoveryPoint.xlogid == 0 && minRecoveryPoint.xrecoff == 0)
+               updateMinRecoveryPoint = false;
+       else if (force || XLByteLT(minRecoveryPoint, lsn))
+       {
+               /* use volatile pointer to prevent code rearrangement */
+               volatile XLogCtlData *xlogctl = XLogCtl;
+               XLogRecPtr newMinRecoveryPoint;
+
+               /*
+                * To avoid having to update the control file too often, we update it
+                * all the way to the last record being replayed, even though 'lsn'
+                * would suffice for correctness.
+                */
+               SpinLockAcquire(&xlogctl->info_lck);
+               newMinRecoveryPoint = xlogctl->replayEndRecPtr;
+               SpinLockRelease(&xlogctl->info_lck);
+
+               /* update control file */
+               if (XLByteLT(ControlFile->minRecoveryPoint, newMinRecoveryPoint))
+               {
+                       ControlFile->minRecoveryPoint = newMinRecoveryPoint;
+                       UpdateControlFile();
+                       minRecoveryPoint = newMinRecoveryPoint;
+
+                       ereport(DEBUG2,
+                                       (errmsg("updated min recovery point to %X/%X",
+                                               minRecoveryPoint.xlogid, minRecoveryPoint.xrecoff)));
+               }
+       }
+       LWLockRelease(ControlFileLock);
+}
+
+/*
  * Ensure that all XLOG data through the given position is flushed to disk.
  *
  * NOTE: this differs from XLogWrite mainly in that the WALWriteLock is not
@@ -1750,9 +1840,15 @@ XLogFlush(XLogRecPtr record)
        XLogRecPtr      WriteRqstPtr;
        XLogwrtRqst WriteRqst;
 
-       /* Disabled during REDO */
-       if (InRedo)
+       /*
+        * During REDO, we don't try to flush the WAL, but update minRecoveryPoint
+        * instead.
+        */
+       if (RecoveryInProgress())
+       {
+               UpdateMinRecoveryPoint(record, false);
                return;
+       }
 
        /* Quick exit if already known flushed */
        if (XLByteLE(record, LogwrtResult.Flush))
@@ -1839,9 +1935,9 @@ XLogFlush(XLogRecPtr record)
         * the bad page is encountered again during recovery then we would be
         * unable to restart the database at all!  (This scenario has actually
         * happened in the field several times with 7.1 releases. Note that we
-        * cannot get here while InRedo is true, but if the bad page is brought in
-        * and marked dirty during recovery then CreateCheckPoint will try to
-        * flush it at the end of recovery.)
+        * cannot get here while RecoveryInProgress(), but if the bad page is
+        * brought in and marked dirty during recovery then if a checkpoint were
+        * performed at the end of recovery it will try to flush it.
         *
         * The current approach is to ERROR under normal conditions, but only
         * WARNING during recovery, so that the system can be brought up even if
@@ -1858,74 +1954,217 @@ XLogFlush(XLogRecPtr record)
 }
 
 /*
- * Create a new XLOG file segment, or open a pre-existing one.
- *
- * log, seg: identify segment to be created/opened.
+ * Flush xlog, but without specifying exactly where to flush to.
  *
- * *use_existent: if TRUE, OK to use a pre-existing file (else, any
- * pre-existing file will be deleted). On return, TRUE if a pre-existing
- * file was used.
- *
- * use_lock: if TRUE, acquire ControlFileLock while moving file into
- * place.  This should be TRUE except during bootstrap log creation.  The
- * caller must *not* hold the lock at call.
- *
- * Returns FD of opened file.
+ * We normally flush only completed blocks; but if there is nothing to do on
+ * that basis, we check for unflushed async commits in the current incomplete
+ * block, and flush through the latest one of those.  Thus, if async commits
+ * are not being used, we will flush complete blocks only.     We can guarantee
+ * that async commits reach disk after at most three cycles; normally only
+ * one or two. (We allow XLogWrite to write "flexibly", meaning it can stop
+ * at the end of the buffer ring; this makes a difference only with very high
+ * load or long wal_writer_delay, but imposes one extra cycle for the worst
+ * case for async commits.)
  *
- * Note: errors here are ERROR not PANIC because we might or might not be
- * inside a critical section (eg, during checkpoint there is no reason to
- * take down the system on failure).  They will promote to PANIC if we are
- * in a critical section.
+ * This routine is invoked periodically by the background walwriter process.
  */
-static int
-XLogFileInit(uint32 log, uint32 seg,
-                        bool *use_existent, bool use_lock)
+void
+XLogBackgroundFlush(void)
 {
-       char            path[MAXPGPATH];
-       char            tmppath[MAXPGPATH];
-       char            zbuffer[XLOG_BLCKSZ];
-       uint32          installed_log;
-       uint32          installed_seg;
-       int                     max_advance;
-       int                     fd;
-       int                     nbytes;
+       XLogRecPtr      WriteRqstPtr;
+       bool            flexible = true;
 
-       XLogFilePath(path, ThisTimeLineID, log, seg);
+       /* XLOG doesn't need flushing during recovery */
+       if (RecoveryInProgress())
+               return;
 
-       /*
-        * Try to use existent file (checkpoint maker may have created it already)
-        */
-       if (*use_existent)
+       /* read LogwrtResult and update local state */
        {
-               fd = BasicOpenFile(path, O_RDWR | PG_BINARY | XLOG_SYNC_BIT,
-                                                  S_IRUSR | S_IWUSR);
-               if (fd < 0)
-               {
-                       if (errno != ENOENT)
-                               ereport(ERROR,
-                                               (errcode_for_file_access(),
-                                                errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
-                                                               path, log, seg)));
-               }
-               else
-                       return fd;
+               /* use volatile pointer to prevent code rearrangement */
+               volatile XLogCtlData *xlogctl = XLogCtl;
+
+               SpinLockAcquire(&xlogctl->info_lck);
+               LogwrtResult = xlogctl->LogwrtResult;
+               WriteRqstPtr = xlogctl->LogwrtRqst.Write;
+               SpinLockRelease(&xlogctl->info_lck);
        }
 
-       /*
-        * Initialize an empty (all zeroes) segment.  NOTE: it is possible that
-        * another process is doing the same thing.  If so, we will end up
-        * pre-creating an extra log segment.  That seems OK, and better than
-        * holding the lock throughout this lengthy process.
-        */
-       snprintf(tmppath, MAXPGPATH, XLOGDIR "/xlogtemp.%d", (int) getpid());
+       /* back off to last completed page boundary */
+       WriteRqstPtr.xrecoff -= WriteRqstPtr.xrecoff % XLOG_BLCKSZ;
 
-       unlink(tmppath);
+       /* if we have already flushed that far, consider async commit records */
+       if (XLByteLE(WriteRqstPtr, LogwrtResult.Flush))
+       {
+               /* use volatile pointer to prevent code rearrangement */
+               volatile XLogCtlData *xlogctl = XLogCtl;
 
-       /* do not use XLOG_SYNC_BIT here --- want to fsync only at end of fill */
-       fd = BasicOpenFile(tmppath, O_RDWR | O_CREAT | O_EXCL | PG_BINARY,
-                                          S_IRUSR | S_IWUSR);
-       if (fd < 0)
-               ereport(ERROR,
+               SpinLockAcquire(&xlogctl->info_lck);
+               WriteRqstPtr = xlogctl->asyncCommitLSN;
+               SpinLockRelease(&xlogctl->info_lck);
+               flexible = false;               /* ensure it all gets written */
+       }
+
+       /* Done if already known flushed */
+       if (XLByteLE(WriteRqstPtr, LogwrtResult.Flush))
+               return;
+
+#ifdef WAL_DEBUG
+       if (XLOG_DEBUG)
+               elog(LOG, "xlog bg flush request %X/%X; write %X/%X; flush %X/%X",
+                        WriteRqstPtr.xlogid, WriteRqstPtr.xrecoff,
+                        LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff,
+                        LogwrtResult.Flush.xlogid, LogwrtResult.Flush.xrecoff);
+#endif
+
+       START_CRIT_SECTION();
+
+       /* now wait for the write lock */
+       LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
+       LogwrtResult = XLogCtl->Write.LogwrtResult;
+       if (!XLByteLE(WriteRqstPtr, LogwrtResult.Flush))
+       {
+               XLogwrtRqst WriteRqst;
+
+               WriteRqst.Write = WriteRqstPtr;
+               WriteRqst.Flush = WriteRqstPtr;
+               XLogWrite(WriteRqst, flexible, false);
+       }
+       LWLockRelease(WALWriteLock);
+
+       END_CRIT_SECTION();
+}
+
+/*
+ * Flush any previous asynchronously-committed transactions' commit records.
+ *
+ * NOTE: it is unwise to assume that this provides any strong guarantees.
+ * In particular, because of the inexact LSN bookkeeping used by clog.c,
+ * we cannot assume that hint bits will be settable for these transactions.
+ */
+void
+XLogAsyncCommitFlush(void)
+{
+       XLogRecPtr      WriteRqstPtr;
+
+       /* use volatile pointer to prevent code rearrangement */
+       volatile XLogCtlData *xlogctl = XLogCtl;
+
+       /* There's no asynchronously committed transactions during recovery */
+       if (RecoveryInProgress())
+               return;
+
+       SpinLockAcquire(&xlogctl->info_lck);
+       WriteRqstPtr = xlogctl->asyncCommitLSN;
+       SpinLockRelease(&xlogctl->info_lck);
+
+       XLogFlush(WriteRqstPtr);
+}
+
+/*
+ * Test whether XLOG data has been flushed up to (at least) the given position.
+ *
+ * Returns true if a flush is still needed.  (It may be that someone else
+ * is already in process of flushing that far, however.)
+ */
+bool
+XLogNeedsFlush(XLogRecPtr record)
+{
+       /* XLOG doesn't need flushing during recovery */
+       if (RecoveryInProgress())
+               return false;
+
+       /* Quick exit if already known flushed */
+       if (XLByteLE(record, LogwrtResult.Flush))
+               return false;
+
+       /* read LogwrtResult and update local state */
+       {
+               /* use volatile pointer to prevent code rearrangement */
+               volatile XLogCtlData *xlogctl = XLogCtl;
+
+               SpinLockAcquire(&xlogctl->info_lck);
+               LogwrtResult = xlogctl->LogwrtResult;
+               SpinLockRelease(&xlogctl->info_lck);
+       }
+
+       /* check again */
+       if (XLByteLE(record, LogwrtResult.Flush))
+               return false;
+
+       return true;
+}
+
+/*
+ * Create a new XLOG file segment, or open a pre-existing one.
+ *
+ * log, seg: identify segment to be created/opened.
+ *
+ * *use_existent: if TRUE, OK to use a pre-existing file (else, any
+ * pre-existing file will be deleted). On return, TRUE if a pre-existing
+ * file was used.
+ *
+ * use_lock: if TRUE, acquire ControlFileLock while moving file into
+ * place.  This should be TRUE except during bootstrap log creation.  The
+ * caller must *not* hold the lock at call.
+ *
+ * Returns FD of opened file.
+ *
+ * Note: errors here are ERROR not PANIC because we might or might not be
+ * inside a critical section (eg, during checkpoint there is no reason to
+ * take down the system on failure).  They will promote to PANIC if we are
+ * in a critical section.
+ */
+static int
+XLogFileInit(uint32 log, uint32 seg,
+                        bool *use_existent, bool use_lock)
+{
+       char            path[MAXPGPATH];
+       char            tmppath[MAXPGPATH];
+       char       *zbuffer;
+       uint32          installed_log;
+       uint32          installed_seg;
+       int                     max_advance;
+       int                     fd;
+       int                     nbytes;
+
+       XLogFilePath(path, ThisTimeLineID, log, seg);
+
+       /*
+        * Try to use existent file (checkpoint maker may have created it already)
+        */
+       if (*use_existent)
+       {
+               fd = BasicOpenFile(path, O_RDWR | PG_BINARY | get_sync_bit(sync_method),
+                                                  S_IRUSR | S_IWUSR);
+               if (fd < 0)
+               {
+                       if (errno != ENOENT)
+                               ereport(ERROR,
+                                               (errcode_for_file_access(),
+                                                errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
+                                                               path, log, seg)));
+               }
+               else
+                       return fd;
+       }
+
+       /*
+        * Initialize an empty (all zeroes) segment.  NOTE: it is possible that
+        * another process is doing the same thing.  If so, we will end up
+        * pre-creating an extra log segment.  That seems OK, and better than
+        * holding the lock throughout this lengthy process.
+        */
+       elog(DEBUG2, "creating and filling new WAL file");
+
+       snprintf(tmppath, MAXPGPATH, XLOGDIR "/xlogtemp.%d", (int) getpid());
+
+       unlink(tmppath);
+
+       /* do not use get_sync_bit() here --- want to fsync only at end of fill */
+       fd = BasicOpenFile(tmppath, O_RDWR | O_CREAT | O_EXCL | PG_BINARY,
+                                          S_IRUSR | S_IWUSR);
+       if (fd < 0)
+               ereport(ERROR,
                                (errcode_for_file_access(),
                                 errmsg("could not create file \"%s\": %m", tmppath)));
 
@@ -1937,12 +2176,16 @@ XLogFileInit(uint32 log, uint32 seg,
         * fsync below) that all the indirect blocks are down on disk.  Therefore,
         * fdatasync(2) or O_DSYNC will be sufficient to sync future writes to the
         * log file.
+        *
+        * Note: palloc zbuffer, instead of just using a local char array, to
+        * ensure it is reasonably well-aligned; this may save a few cycles
+        * transferring data to the kernel.
         */
-       MemSet(zbuffer, 0, sizeof(zbuffer));
-       for (nbytes = 0; nbytes < XLogSegSize; nbytes += sizeof(zbuffer))
+       zbuffer = (char *) palloc0(XLOG_BLCKSZ);
+       for (nbytes = 0; nbytes < XLogSegSize; nbytes += XLOG_BLCKSZ)
        {
                errno = 0;
-               if ((int) write(fd, zbuffer, sizeof(zbuffer)) != (int) sizeof(zbuffer))
+               if ((int) write(fd, zbuffer, XLOG_BLCKSZ) != (int) XLOG_BLCKSZ)
                {
                        int                     save_errno = errno;
 
@@ -1958,6 +2201,7 @@ XLogFileInit(uint32 log, uint32 seg,
                                         errmsg("could not write to file \"%s\": %m", tmppath)));
                }
        }
+       pfree(zbuffer);
 
        if (pg_fsync(fd) != 0)
                ereport(ERROR,
@@ -1988,11 +2232,13 @@ XLogFileInit(uint32 log, uint32 seg,
                unlink(tmppath);
        }
 
+       elog(DEBUG2, "done creating and filling new WAL file");
+
        /* Set flag to tell caller there was no existent file */
        *use_existent = false;
 
        /* Now open original target segment (might not be file I just made) */
-       fd = BasicOpenFile(path, O_RDWR | PG_BINARY | XLOG_SYNC_BIT,
+       fd = BasicOpenFile(path, O_RDWR | PG_BINARY | get_sync_bit(sync_method),
                                           S_IRUSR | S_IWUSR);
        if (fd < 0)
                ereport(ERROR,
@@ -2043,7 +2289,7 @@ XLogFileCopy(uint32 log, uint32 seg,
 
        unlink(tmppath);
 
-       /* do not use XLOG_SYNC_BIT here --- want to fsync only at end of fill */
+       /* do not use get_sync_bit() here --- want to fsync only at end of fill */
        fd = BasicOpenFile(tmppath, O_RDWR | O_CREAT | O_EXCL | PG_BINARY,
                                           S_IRUSR | S_IWUSR);
        if (fd < 0)
@@ -2199,7 +2445,7 @@ InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath,
                                LWLockRelease(ControlFileLock);
                        return false;
                }
-#endif /* WIN32 */
+#endif   /* WIN32 */
 
                ereport(ERROR,
                                (errcode_for_file_access(),
@@ -2225,7 +2471,7 @@ XLogFileOpen(uint32 log, uint32 seg)
 
        XLogFilePath(path, ThisTimeLineID, log, seg);
 
-       fd = BasicOpenFile(path, O_RDWR | PG_BINARY | XLOG_SYNC_BIT,
+       fd = BasicOpenFile(path, O_RDWR | PG_BINARY | get_sync_bit(sync_method),
                                           S_IRUSR | S_IWUSR);
        if (fd < 0)
                ereport(PANIC,
@@ -2244,6 +2490,7 @@ XLogFileRead(uint32 log, uint32 seg, int emode)
 {
        char            path[MAXPGPATH];
        char            xlogfname[MAXFNAMELEN];
+       char            activitymsg[MAXFNAMELEN + 16];
        ListCell   *cell;
        int                     fd;
 
@@ -2264,9 +2511,15 @@ XLogFileRead(uint32 log, uint32 seg, int emode)
                if (tli < curFileTLI)
                        break;                          /* don't bother looking at too-old TLIs */
 
+               XLogFileName(xlogfname, tli, log, seg);
+
                if (InArchiveRecovery)
                {
-                       XLogFileName(xlogfname, tli, log, seg);
+                       /* Report recovery progress in PS display */
+                       snprintf(activitymsg, sizeof(activitymsg), "waiting for %s",
+                                        xlogfname);
+                       set_ps_display(activitymsg, false);
+
                        restoredFromArchive = RestoreArchivedFile(path, xlogfname,
                                                                                                          "RECOVERYXLOG",
                                                                                                          XLogSegSize);
@@ -2279,6 +2532,12 @@ XLogFileRead(uint32 log, uint32 seg, int emode)
                {
                        /* Success! */
                        curFileTLI = tli;
+
+                       /* Report recovery progress in PS display */
+                       snprintf(activitymsg, sizeof(activitymsg), "recovering %s",
+                                        xlogfname);
+                       set_ps_display(activitymsg, false);
+
                        return fd;
                }
                if (errno != ENOENT)    /* unexpected failure? */
@@ -2307,29 +2566,17 @@ XLogFileClose(void)
        Assert(openLogFile >= 0);
 
        /*
-        * posix_fadvise is problematic on many platforms: on older x86 Linux it
-        * just dumps core, and there are reports of problems on PPC platforms as
-        * well.  The following is therefore disabled for the time being. We could
-        * consider some kind of configure test to see if it's safe to use, but
-        * since we lack hard evidence that there's any useful performance gain to
-        * be had, spending time on that seems unprofitable for now.
-        */
-#ifdef NOT_USED
-
-       /*
         * WAL segment files will not be re-read in normal operation, so we advise
-        * OS to release any cached pages.      But do not do so if WAL archiving is
-        * active, because archiver process could use the cache to read the WAL
-        * segment.
-        *
-        * While O_DIRECT works for O_SYNC, posix_fadvise() works for fsync() and
-        * O_SYNC, and some platforms only have posix_fadvise().
-        */
-#if defined(HAVE_DECL_POSIX_FADVISE) && defined(POSIX_FADV_DONTNEED)
-       if (!XLogArchivingActive())
-               posix_fadvise(openLogFile, 0, 0, POSIX_FADV_DONTNEED);
+        * the OS to release any cached pages.  But do not do so if WAL archiving
+        * is active, because archiver process could use the cache to read the WAL
+        * segment.  Also, don't bother with it if we are using O_DIRECT, since
+        * the kernel is presumably not caching in that case.
+        */
+#if defined(USE_POSIX_FADVISE) && defined(POSIX_FADV_DONTNEED)
+       if (!XLogArchivingActive() &&
+               (get_sync_bit(sync_method) & PG_O_DIRECT) == 0)
+               (void) posix_fadvise(openLogFile, 0, 0, POSIX_FADV_DONTNEED);
 #endif
-#endif   /* NOT_USED */
 
        if (close(openLogFile))
                ereport(PANIC,
@@ -2359,12 +2606,15 @@ RestoreArchivedFile(char *path, const char *xlogfname,
 {
        char            xlogpath[MAXPGPATH];
        char            xlogRestoreCmd[MAXPGPATH];
+       char            lastRestartPointFname[MAXPGPATH];
        char       *dp;
        char       *endp;
        const char *sp;
        int                     rc;
        bool            signaled;
        struct stat stat_buf;
+       uint32          restartLog;
+       uint32          restartSeg;
 
        /*
         * When doing archive recovery, we always prefer an archived log file even
@@ -2412,6 +2662,35 @@ RestoreArchivedFile(char *path, const char *xlogfname,
        }
 
        /*
+        * Calculate the archive file cutoff point for use during log shipping
+        * replication. All files earlier than this point can be deleted
+        * from the archive, though there is no requirement to do so.
+        *
+        * We initialise this with the filename of an InvalidXLogRecPtr, which
+        * will prevent the deletion of any WAL files from the archive
+        * because of the alphabetic sorting property of WAL filenames.
+        *
+        * Once we have successfully located the redo pointer of the checkpoint
+        * from which we start recovery we never request a file prior to the redo
+        * pointer of the last restartpoint. When redo begins we know that we
+        * have successfully located it, so there is no need for additional
+        * status flags to signify the point when we can begin deleting WAL files
+        * from the archive.
+        */
+       if (InRedo)
+       {
+               XLByteToSeg(ControlFile->checkPointCopy.redo,
+                                       restartLog, restartSeg);
+               XLogFileName(lastRestartPointFname,
+                                        ControlFile->checkPointCopy.ThisTimeLineID,
+                                        restartLog, restartSeg);
+               /* we shouldn't need anything earlier than last restart point */
+               Assert(strcmp(lastRestartPointFname, xlogfname) <= 0);
+       }
+       else
+               XLogFileName(lastRestartPointFname, 0, 0, 0);
+
+       /*
         * construct the command to be executed
         */
        dp = xlogRestoreCmd;
@@ -2437,6 +2716,12 @@ RestoreArchivedFile(char *path, const char *xlogfname,
                                        StrNCpy(dp, xlogfname, endp - dp);
                                        dp += strlen(dp);
                                        break;
+                               case 'r':
+                                       /* %r: filename of last restartpoint */
+                                       sp++;
+                                       StrNCpy(dp, lastRestartPointFname, endp - dp);
+                                       dp += strlen(dp);
+                                       break;
                                case '%':
                                        /* convert %% to a single % */
                                        sp++;
@@ -2463,9 +2748,22 @@ RestoreArchivedFile(char *path, const char *xlogfname,
                                                         xlogRestoreCmd)));
 
        /*
+        * Set in_restore_command to tell the signal handler that we should exit
+        * right away on SIGTERM. We know that we're in a safe point to do that.
+        * Check if we had already received the signal, so that we don't miss a
+        * shutdown request received just before this.
+        */
+       in_restore_command = true;
+       if (shutdown_requested)
+               proc_exit(1);
+
+       /*
         * Copy xlog from archival storage to XLOGDIR
         */
        rc = system(xlogRestoreCmd);
+
+       in_restore_command = false;
+
        if (rc == 0)
        {
                /*
@@ -2514,18 +2812,28 @@ RestoreArchivedFile(char *path, const char *xlogfname,
         * incorrectly.  We have to assume the former.
         *
         * However, if the failure was due to any sort of signal, it's best to
-        * punt and abort recovery.  (If we "return false" here, upper levels
-        * will assume that recovery is complete and start up the database!)
-        * It's essential to abort on child SIGINT and SIGQUIT, because per spec
+        * punt and abort recovery.  (If we "return false" here, upper levels will
+        * assume that recovery is complete and start up the database!) It's
+        * essential to abort on child SIGINT and SIGQUIT, because per spec
         * system() ignores SIGINT and SIGQUIT while waiting; if we see one of
-        * those it's a good bet we should have gotten it too.  Aborting on other
-        * signals such as SIGTERM seems a good idea as well.
+        * those it's a good bet we should have gotten it too.
+        *
+        * On SIGTERM, assume we have received a fast shutdown request, and exit
+        * cleanly. It's pure chance whether we receive the SIGTERM first, or the
+        * child process. If we receive it first, the signal handler will call
+        * proc_exit, otherwise we do it here. If we or the child process
+        * received SIGTERM for any other reason than a fast shutdown request,
+        * postmaster will perform an immediate shutdown when it sees us exiting
+        * unexpectedly.
         *
-        * Per the Single Unix Spec, shells report exit status > 128 when
-        * a called command died on a signal.  Also, 126 and 127 are used to
-        * report problems such as an unfindable command; treat those as fatal
-        * errors too.
+        * Per the Single Unix Spec, shells report exit status > 128 when a called
+        * command died on a signal.  Also, 126 and 127 are used to report
+        * problems such as an unfindable command; treat those as fatal errors
+        * too.
         */
+       if (WTERMSIG(rc) == SIGTERM)
+               proc_exit(1);
+
        signaled = WIFSIGNALED(rc) || WEXITSTATUS(rc) > 125;
 
        ereport(signaled ? FATAL : DEBUG2,
@@ -2544,13 +2852,126 @@ RestoreArchivedFile(char *path, const char *xlogfname,
 }
 
 /*
- * Preallocate log files beyond the specified log endpoint, according to
- * the XLOGfile user parameter.
+ * Attempt to execute the recovery_end_command.
  */
-static int
+static void
+ExecuteRecoveryEndCommand(void)
+{
+       char            xlogRecoveryEndCmd[MAXPGPATH];
+       char            lastRestartPointFname[MAXPGPATH];
+       char       *dp;
+       char       *endp;
+       const char *sp;
+       int                     rc;
+       bool            signaled;
+       uint32          restartLog;
+       uint32          restartSeg;
+
+       Assert(recoveryEndCommand);
+
+       /*
+        * Calculate the archive file cutoff point for use during log shipping
+        * replication. All files earlier than this point can be deleted
+        * from the archive, though there is no requirement to do so.
+        *
+        * We initialise this with the filename of an InvalidXLogRecPtr, which
+        * will prevent the deletion of any WAL files from the archive
+        * because of the alphabetic sorting property of WAL filenames. 
+        *
+        * Once we have successfully located the redo pointer of the checkpoint
+        * from which we start recovery we never request a file prior to the redo
+        * pointer of the last restartpoint. When redo begins we know that we
+        * have successfully located it, so there is no need for additional
+        * status flags to signify the point when we can begin deleting WAL files
+        * from the archive. 
+        */
+       if (InRedo)
+       {
+               XLByteToSeg(ControlFile->checkPointCopy.redo,
+                                       restartLog, restartSeg);
+               XLogFileName(lastRestartPointFname,
+                                        ControlFile->checkPointCopy.ThisTimeLineID,
+                                        restartLog, restartSeg);
+       }
+       else
+               XLogFileName(lastRestartPointFname, 0, 0, 0);
+
+       /*
+        * construct the command to be executed
+        */
+       dp = xlogRecoveryEndCmd;
+       endp = xlogRecoveryEndCmd + MAXPGPATH - 1;
+       *endp = '\0';
+
+       for (sp = recoveryEndCommand; *sp; sp++)
+       {
+               if (*sp == '%')
+               {
+                       switch (sp[1])
+                       {
+                               case 'r':
+                                       /* %r: filename of last restartpoint */
+                                       sp++;
+                                       StrNCpy(dp, lastRestartPointFname, endp - dp);
+                                       dp += strlen(dp);
+                                       break;
+                               case '%':
+                                       /* convert %% to a single % */
+                                       sp++;
+                                       if (dp < endp)
+                                               *dp++ = *sp;
+                                       break;
+                               default:
+                                       /* otherwise treat the % as not special */
+                                       if (dp < endp)
+                                               *dp++ = *sp;
+                                       break;
+                       }
+               }
+               else
+               {
+                       if (dp < endp)
+                               *dp++ = *sp;
+               }
+       }
+       *dp = '\0';
+
+       ereport(DEBUG3,
+                       (errmsg_internal("executing recovery end command \"%s\"",
+                                                        xlogRecoveryEndCmd)));
+
+       /*
+        * Copy xlog from archival storage to XLOGDIR
+        */
+       rc = system(xlogRecoveryEndCmd);
+       if (rc != 0)
+       {
+               /*
+                * If the failure was due to any sort of signal, it's best to punt and
+                * abort recovery. See also detailed comments on signals in 
+                * RestoreArchivedFile().
+                */
+               signaled = WIFSIGNALED(rc) || WEXITSTATUS(rc) > 125;
+
+               ereport(signaled ? FATAL : WARNING,
+                               (errmsg("recovery_end_command \"%s\": return code %d",
+                                                               xlogRecoveryEndCmd, rc)));
+       }
+}
+
+/*
+ * Preallocate log files beyond the specified log endpoint.
+ *
+ * XXX this is currently extremely conservative, since it forces only one
+ * future log segment to exist, and even that only if we are 75% done with
+ * the current one.  This is only appropriate for very low-WAL-volume systems.
+ * High-volume systems will be OK once they've built up a sufficient set of
+ * recycled log segments, but the startup transient is likely to include
+ * a lot of segment creations by foreground processes, which is not so good.
+ */
+static void
 PreallocXlogFiles(XLogRecPtr endptr)
 {
-       int                     nsegsadded = 0;
        uint32          _logId;
        uint32          _logSeg;
        int                     lf;
@@ -2565,20 +2986,18 @@ PreallocXlogFiles(XLogRecPtr endptr)
                lf = XLogFileInit(_logId, _logSeg, &use_existent, true);
                close(lf);
                if (!use_existent)
-                       nsegsadded++;
+                       CheckpointStats.ckpt_segs_added++;
        }
-       return nsegsadded;
 }
 
 /*
- * Remove or move offline all log files older or equal to passed log/seg#
+ * Recycle or remove all log files older or equal to passed log/seg#
  *
  * endptr is current (or recent) end of xlog; this is used to determine
  * whether we want to recycle rather than delete no-longer-wanted log files.
  */
 static void
-MoveOfflineLogs(uint32 log, uint32 seg, XLogRecPtr endptr,
-                               int *nsegsremoved, int *nsegsrecycled)
+RemoveOldXlogFiles(uint32 log, uint32 seg, XLogRecPtr endptr)
 {
        uint32          endlogId;
        uint32          endlogSeg;
@@ -2588,9 +3007,6 @@ MoveOfflineLogs(uint32 log, uint32 seg, XLogRecPtr endptr,
        char            lastoff[MAXFNAMELEN];
        char            path[MAXPGPATH];
 
-       *nsegsremoved = 0;
-       *nsegsrecycled = 0;
-
        /*
         * Initialize info about where to try to recycle to.  We allow recycling
         * segments up to XLOGfileslop segments beyond the current XLOG location.
@@ -2639,7 +3055,7 @@ MoveOfflineLogs(uint32 log, uint32 seg, XLogRecPtr endptr,
                                        ereport(DEBUG2,
                                                        (errmsg("recycled transaction log file \"%s\"",
                                                                        xlde->d_name)));
-                                       (*nsegsrecycled)++;
+                                       CheckpointStats.ckpt_segs_recycled++;
                                        /* Needn't recheck that slot on future iterations */
                                        if (max_advance > 0)
                                        {
@@ -2654,7 +3070,7 @@ MoveOfflineLogs(uint32 log, uint32 seg, XLogRecPtr endptr,
                                                        (errmsg("removing transaction log file \"%s\"",
                                                                        xlde->d_name)));
                                        unlink(path);
-                                       (*nsegsremoved)++;
+                                       CheckpointStats.ckpt_segs_removed++;
                                }
 
                                XLogArchiveCleanup(xlde->d_name);
@@ -2666,6 +3082,53 @@ MoveOfflineLogs(uint32 log, uint32 seg, XLogRecPtr endptr,
 }
 
 /*
+ * Verify whether pg_xlog and pg_xlog/archive_status exist.
+ * If the latter does not exist, recreate it.
+ *
+ * It is not the goal of this function to verify the contents of these
+ * directories, but to help in cases where someone has performed a cluster
+ * copy for PITR purposes but omitted pg_xlog from the copy.
+ *
+ * We could also recreate pg_xlog if it doesn't exist, but a deliberate
+ * policy decision was made not to.  It is fairly common for pg_xlog to be
+ * a symlink, and if that was the DBA's intent then automatically making a
+ * plain directory would result in degraded performance with no notice.
+ */
+static void
+ValidateXLOGDirectoryStructure(void)
+{
+       char            path[MAXPGPATH];
+       struct stat     stat_buf;
+
+       /* Check for pg_xlog; if it doesn't exist, error out */
+       if (stat(XLOGDIR, &stat_buf) != 0 ||
+               !S_ISDIR(stat_buf.st_mode))
+               ereport(FATAL, 
+                               (errmsg("required WAL directory \"%s\" does not exist",
+                                               XLOGDIR)));
+
+       /* Check for archive_status */
+       snprintf(path, MAXPGPATH, XLOGDIR "/archive_status");
+       if (stat(path, &stat_buf) == 0)
+       {
+               /* Check for weird cases where it exists but isn't a directory */
+               if (!S_ISDIR(stat_buf.st_mode))
+                       ereport(FATAL, 
+                                       (errmsg("required WAL directory \"%s\" does not exist",
+                                                       path)));
+       }
+       else
+       {
+               ereport(LOG,
+                               (errmsg("creating missing WAL directory \"%s\"", path)));
+               if (mkdir(path, 0700) < 0)
+                       ereport(FATAL, 
+                                       (errmsg("could not create missing directory \"%s\": %m",
+                                                       path)));
+       }
+}
+
+/*
  * Remove previous backup history files.  This also retries creation of
  * .ready files for any backup history files for which XLogArchiveNotify
  * failed earlier.
@@ -2719,17 +3182,25 @@ CleanupBackupHistory(void)
  * page might not be.  This will force us to replay all subsequent
  * modifications of the page that appear in XLOG, rather than possibly
  * ignoring them as already applied, but that's not a huge drawback.
+ *
+ * If 'cleanup' is true, a cleanup lock is used when restoring blocks.
+ * Otherwise, a normal exclusive lock is used.  At the moment, that's just
+ * pro forma, because there can't be any regular backends in the system
+ * during recovery.  The 'cleanup' argument applies to all backup blocks
+ * in the WAL record, that suffices for now.
  */
-static void
-RestoreBkpBlocks(XLogRecord *record, XLogRecPtr lsn)
+void
+RestoreBkpBlocks(XLogRecPtr lsn, XLogRecord *record, bool cleanup)
 {
-       Relation        reln;
        Buffer          buffer;
        Page            page;
        BkpBlock        bkpb;
        char       *blk;
        int                     i;
 
+       if (!(record->xl_info & XLR_BKP_BLOCK_MASK))
+               return;
+
        blk = (char *) XLogRecGetData(record) + record->xl_len;
        for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
        {
@@ -2739,9 +3210,14 @@ RestoreBkpBlocks(XLogRecord *record, XLogRecPtr lsn)
                memcpy(&bkpb, blk, sizeof(BkpBlock));
                blk += sizeof(BkpBlock);
 
-               reln = XLogOpenRelation(bkpb.node);
-               buffer = XLogReadBuffer(reln, bkpb.block, true);
+               buffer = XLogReadBufferExtended(bkpb.node, bkpb.fork, bkpb.block,
+                                                                               RBM_ZERO);
                Assert(BufferIsValid(buffer));
+               if (cleanup)
+                       LockBufferForCleanup(buffer);
+               else
+                       LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+
                page = (Page) BufferGetPage(buffer);
 
                if (bkpb.hole_length == 0)
@@ -2961,7 +3437,7 @@ ReadRecord(XLogRecPtr *RecPtr, int emode)
                {
                        ereport(emode,
                                        (errcode_for_file_access(),
-                                        errmsg("could not read from log file %u, segment %u at offset %u: %m",
+                                        errmsg("could not read from log file %u, segment %u, offset %u: %m",
                                                        readId, readSeg, readOff)));
                        goto next_record_is_invalid;
                }
@@ -3211,8 +3687,11 @@ got_record:;
        return (XLogRecord *) buffer;
 
 next_record_is_invalid:;
-       close(readFile);
-       readFile = -1;
+       if (readFile >= 0)
+       {
+               close(readFile);
+               readFile = -1;
+       }
        nextRecord = NULL;
        return NULL;
 }
@@ -3374,7 +3853,7 @@ readTimeLineHistory(TimeLineID targetTLI)
        /*
         * Parse the file...
         */
-       while (fgets(fline, MAXPGPATH, fd) != NULL)
+       while (fgets(fline, sizeof(fline), fd) != NULL)
        {
                /* skip leading whitespace and check for # comment */
                char       *ptr;
@@ -3527,7 +4006,7 @@ writeTimeLineHistory(TimeLineID newTLI, TimeLineID parentTLI,
 
        unlink(tmppath);
 
-       /* do not use XLOG_SYNC_BIT here --- want to fsync only at end of fill */
+       /* do not use get_sync_bit() here --- want to fsync only at end of fill */
        fd = BasicOpenFile(tmppath, O_RDWR | O_CREAT | O_EXCL,
                                           S_IRUSR | S_IWUSR);
        if (fd < 0)
@@ -3606,7 +4085,7 @@ writeTimeLineHistory(TimeLineID newTLI, TimeLineID parentTLI,
                         xlogfname,
                         recoveryStopAfter ? "after" : "before",
                         recoveryStopXid,
-                        str_time(recoveryStopTime));
+                        timestamptz_to_str(recoveryStopTime));
 
        nbytes = strlen(buffer);
        errno = 0;
@@ -3686,7 +4165,6 @@ WriteControlFile(void)
 {
        int                     fd;
        char            buffer[PG_CONTROL_SIZE];                /* need not be aligned */
-       char       *localeptr;
 
        /*
         * Initialize version and compatibility-check fields
@@ -3705,23 +4183,15 @@ WriteControlFile(void)
        ControlFile->nameDataLen = NAMEDATALEN;
        ControlFile->indexMaxKeys = INDEX_MAX_KEYS;
 
+       ControlFile->toast_max_chunk_size = TOAST_MAX_CHUNK_SIZE;
+
 #ifdef HAVE_INT64_TIMESTAMP
-       ControlFile->enableIntTimes = TRUE;
+       ControlFile->enableIntTimes = true;
 #else
-       ControlFile->enableIntTimes = FALSE;
+       ControlFile->enableIntTimes = false;
 #endif
-
-       ControlFile->localeBuflen = LOCALE_NAME_BUFLEN;
-       localeptr = setlocale(LC_COLLATE, NULL);
-       if (!localeptr)
-               ereport(PANIC,
-                               (errmsg("invalid LC_COLLATE setting")));
-       StrNCpy(ControlFile->lc_collate, localeptr, LOCALE_NAME_BUFLEN);
-       localeptr = setlocale(LC_CTYPE, NULL);
-       if (!localeptr)
-               ereport(PANIC,
-                               (errmsg("invalid LC_CTYPE setting")));
-       StrNCpy(ControlFile->lc_ctype, localeptr, LOCALE_NAME_BUFLEN);
+       ControlFile->float4ByVal = FLOAT4PASSBYVAL;
+       ControlFile->float8ByVal = FLOAT8PASSBYVAL;
 
        /* Contents are protected with a CRC */
        INIT_CRC32(ControlFile->crc);
@@ -3805,6 +4275,16 @@ ReadControlFile(void)
         * of bytes.  Complaining about wrong version will probably be more
         * enlightening than complaining about wrong CRC.
         */
+
+       if (ControlFile->pg_control_version != PG_CONTROL_VERSION && ControlFile->pg_control_version % 65536 == 0 && ControlFile->pg_control_version / 65536 != 0)
+               ereport(FATAL,
+                               (errmsg("database files are incompatible with server"),
+                                errdetail("The database cluster was initialized with PG_CONTROL_VERSION %d (0x%08x),"
+                                                  " but the server was compiled with PG_CONTROL_VERSION %d (0x%08x).",
+                                                  ControlFile->pg_control_version, ControlFile->pg_control_version,
+                                                  PG_CONTROL_VERSION, PG_CONTROL_VERSION),
+                                errhint("This could be a problem of mismatched byte ordering.  It looks like you need to initdb.")));
+
        if (ControlFile->pg_control_version != PG_CONTROL_VERSION)
                ereport(FATAL,
                                (errmsg("database files are incompatible with server"),
@@ -3812,6 +4292,7 @@ ReadControlFile(void)
                                  " but the server was compiled with PG_CONTROL_VERSION %d.",
                                                ControlFile->pg_control_version, PG_CONTROL_VERSION),
                                 errhint("It looks like you need to initdb.")));
+
        /* Now check the CRC. */
        INIT_CRC32(crc);
        COMP_CRC32(crc,
@@ -3824,15 +4305,9 @@ ReadControlFile(void)
                                (errmsg("incorrect checksum in control file")));
 
        /*
-        * Do compatibility checking immediately.  We do this here for 2 reasons:
-        *
-        * (1) if the database isn't compatible with the backend executable, we
-        * want to abort before we can possibly do any damage;
-        *
-        * (2) this code is executed in the postmaster, so the setlocale() will
-        * propagate to forked backends, which aren't going to read this file for
-        * themselves.  (These locale settings are considered critical
-        * compatibility items because they can affect sort order of indexes.)
+        * Do compatibility checking immediately.  If the database isn't
+        * compatible with the backend executable, we want to abort before we
+        * can possibly do any damage.
         */
        if (ControlFile->catalog_version_no != CATALOG_VERSION_NO)
                ereport(FATAL,
@@ -3895,16 +4370,23 @@ ReadControlFile(void)
                                          " but the server was compiled with INDEX_MAX_KEYS %d.",
                                                   ControlFile->indexMaxKeys, INDEX_MAX_KEYS),
                                 errhint("It looks like you need to recompile or initdb.")));
+       if (ControlFile->toast_max_chunk_size != TOAST_MAX_CHUNK_SIZE)
+               ereport(FATAL,
+                               (errmsg("database files are incompatible with server"),
+                                errdetail("The database cluster was initialized with TOAST_MAX_CHUNK_SIZE %d,"
+                               " but the server was compiled with TOAST_MAX_CHUNK_SIZE %d.",
+                         ControlFile->toast_max_chunk_size, (int) TOAST_MAX_CHUNK_SIZE),
+                                errhint("It looks like you need to recompile or initdb.")));
 
 #ifdef HAVE_INT64_TIMESTAMP
-       if (ControlFile->enableIntTimes != TRUE)
+       if (ControlFile->enableIntTimes != true)
                ereport(FATAL,
                                (errmsg("database files are incompatible with server"),
                                 errdetail("The database cluster was initialized without HAVE_INT64_TIMESTAMP"
                                  " but the server was compiled with HAVE_INT64_TIMESTAMP."),
                                 errhint("It looks like you need to recompile or initdb.")));
 #else
-       if (ControlFile->enableIntTimes != FALSE)
+       if (ControlFile->enableIntTimes != false)
                ereport(FATAL,
                                (errmsg("database files are incompatible with server"),
                                 errdetail("The database cluster was initialized with HAVE_INT64_TIMESTAMP"
@@ -3912,33 +4394,37 @@ ReadControlFile(void)
                                 errhint("It looks like you need to recompile or initdb.")));
 #endif
 
-       if (ControlFile->localeBuflen != LOCALE_NAME_BUFLEN)
+#ifdef USE_FLOAT4_BYVAL
+       if (ControlFile->float4ByVal != true)
+               ereport(FATAL,
+                               (errmsg("database files are incompatible with server"),
+                                errdetail("The database cluster was initialized without USE_FLOAT4_BYVAL"
+                                                  " but the server was compiled with USE_FLOAT4_BYVAL."),
+                                errhint("It looks like you need to recompile or initdb.")));
+#else
+       if (ControlFile->float4ByVal != false)
                ereport(FATAL,
                                (errmsg("database files are incompatible with server"),
-                                errdetail("The database cluster was initialized with LOCALE_NAME_BUFLEN %d,"
-                                 " but the server was compiled with LOCALE_NAME_BUFLEN %d.",
-                                                  ControlFile->localeBuflen, LOCALE_NAME_BUFLEN),
+                                errdetail("The database cluster was initialized with USE_FLOAT4_BYVAL"
+                                                  " but the server was compiled without USE_FLOAT4_BYVAL."),
                                 errhint("It looks like you need to recompile or initdb.")));
-       if (pg_perm_setlocale(LC_COLLATE, ControlFile->lc_collate) == NULL)
+#endif
+
+#ifdef USE_FLOAT8_BYVAL
+       if (ControlFile->float8ByVal != true)
                ereport(FATAL,
-                       (errmsg("database files are incompatible with operating system"),
-                        errdetail("The database cluster was initialized with LC_COLLATE \"%s\","
-                                          " which is not recognized by setlocale().",
-                                          ControlFile->lc_collate),
-                        errhint("It looks like you need to initdb or install locale support.")));
-       if (pg_perm_setlocale(LC_CTYPE, ControlFile->lc_ctype) == NULL)
+                               (errmsg("database files are incompatible with server"),
+                                errdetail("The database cluster was initialized without USE_FLOAT8_BYVAL"
+                                                  " but the server was compiled with USE_FLOAT8_BYVAL."),
+                                errhint("It looks like you need to recompile or initdb.")));
+#else
+       if (ControlFile->float8ByVal != false)
                ereport(FATAL,
-                       (errmsg("database files are incompatible with operating system"),
-               errdetail("The database cluster was initialized with LC_CTYPE \"%s\","
-                                 " which is not recognized by setlocale().",
-                                 ControlFile->lc_ctype),
-                        errhint("It looks like you need to initdb or install locale support.")));
-
-       /* Make the fixed locale settings visible as GUC variables, too */
-       SetConfigOption("lc_collate", ControlFile->lc_collate,
-                                       PGC_INTERNAL, PGC_S_OVERRIDE);
-       SetConfigOption("lc_ctype", ControlFile->lc_ctype,
-                                       PGC_INTERNAL, PGC_S_OVERRIDE);
+                               (errmsg("database files are incompatible with server"),
+                                errdetail("The database cluster was initialized with USE_FLOAT8_BYVAL"
+                                                  " but the server was compiled without USE_FLOAT8_BYVAL."),
+                                errhint("It looks like you need to recompile or initdb.")));
+#endif
 }
 
 void
@@ -4051,8 +4537,6 @@ XLOGShmemInit(void)
         * Do basic initialization of XLogCtl shared data. (StartupXLOG will fill
         * in additional info.)
         */
-       XLogCtl->XLogCacheByte = (Size) XLOG_BLCKSZ *XLOGbuffers;
-
        XLogCtl->XLogCacheBlck = XLOGbuffers - 1;
        XLogCtl->Insert.currpage = (XLogPageHeader) (XLogCtl->pages);
        SpinLockInit(&XLogCtl->info_lck);
@@ -4110,14 +4594,13 @@ BootStrapXLOG(void)
        /* Set up information for the initial checkpoint record */
        checkPoint.redo.xlogid = 0;
        checkPoint.redo.xrecoff = SizeOfXLogLongPHD;
-       checkPoint.undo = checkPoint.redo;
        checkPoint.ThisTimeLineID = ThisTimeLineID;
        checkPoint.nextXidEpoch = 0;
        checkPoint.nextXid = FirstNormalTransactionId;
        checkPoint.nextOid = FirstBootstrapObjectId;
        checkPoint.nextMulti = FirstMultiXactId;
        checkPoint.nextMultiOffset = 0;
-       checkPoint.time = time(NULL);
+       checkPoint.time = (pg_time_t) time(NULL);
 
        ShmemVariableCache->nextXid = checkPoint.nextXid;
        ShmemVariableCache->nextOid = checkPoint.nextOid;
@@ -4203,13 +4686,13 @@ BootStrapXLOG(void)
 }
 
 static char *
-str_time(time_t tnow)
+str_time(pg_time_t tnow)
 {
        static char buf[128];
 
-       strftime(buf, sizeof(buf),
-                        "%Y-%m-%d %H:%M:%S %Z",
-                        localtime(&tnow));
+       pg_strftime(buf, sizeof(buf),
+                               "%Y-%m-%d %H:%M:%S %Z",
+                               pg_localtime(&tnow, log_timezone));
 
        return buf;
 }
@@ -4248,7 +4731,7 @@ readRecoveryCommandFile(void)
        /*
         * Parse the file...
         */
-       while (fgets(cmdline, MAXPGPATH, fd) != NULL)
+       while (fgets(cmdline, sizeof(cmdline), fd) != NULL)
        {
                /* skip leading whitespace and check for # comment */
                char       *ptr;
@@ -4288,9 +4771,16 @@ readRecoveryCommandFile(void)
                {
                        recoveryRestoreCommand = pstrdup(tok2);
                        ereport(LOG,
-                                       (errmsg("restore_command = \"%s\"",
+                                       (errmsg("restore_command = '%s'",
                                                        recoveryRestoreCommand)));
                }
+               else if (strcmp(tok1, "recovery_end_command") == 0)
+               {
+                       recoveryEndCommand = pstrdup(tok2);
+                       ereport(LOG,
+                                       (errmsg("recovery_end_command = '%s'",
+                                                       recoveryEndCommand)));
+               }
                else if (strcmp(tok1, "recovery_target_timeline") == 0)
                {
                        rtliGiven = true;
@@ -4338,30 +4828,26 @@ readRecoveryCommandFile(void)
                        recoveryTargetExact = false;
 
                        /*
-                        * Convert the time string given by the user to the time_t format.
-                        * We use type abstime's input converter because we know abstime
-                        * has the same representation as time_t.
+                        * Convert the time string given by the user to TimestampTz form.
                         */
-                       recoveryTargetTime = (time_t)
-                               DatumGetAbsoluteTime(DirectFunctionCall1(abstimein,
-                                                                                                        CStringGetDatum(tok2)));
+                       recoveryTargetTime =
+                               DatumGetTimestampTz(DirectFunctionCall3(timestamptz_in,
+                                                                                                               CStringGetDatum(tok2),
+                                                                                               ObjectIdGetDatum(InvalidOid),
+                                                                                                               Int32GetDatum(-1)));
                        ereport(LOG,
-                                       (errmsg("recovery_target_time = %s",
-                                                       DatumGetCString(DirectFunctionCall1(abstimeout,
-                               AbsoluteTimeGetDatum((AbsoluteTime) recoveryTargetTime))))));
+                                       (errmsg("recovery_target_time = '%s'",
+                                                       timestamptz_to_str(recoveryTargetTime))));
                }
                else if (strcmp(tok1, "recovery_target_inclusive") == 0)
                {
                        /*
                         * does nothing if a recovery_target is not also set
                         */
-                       if (strcmp(tok2, "true") == 0)
-                               recoveryTargetInclusive = true;
-                       else
-                       {
-                               recoveryTargetInclusive = false;
-                               tok2 = "false";
-                       }
+                       if (!parse_bool(tok2, &recoveryTargetInclusive))
+                                 ereport(ERROR,
+                                                       (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                                         errmsg("parameter \"recovery_target_inclusive\" requires a Boolean value")));
                        ereport(LOG,
                                        (errmsg("recovery_target_inclusive = %s", tok2)));
                }
@@ -4401,7 +4887,7 @@ readRecoveryCommandFile(void)
                        /* Timeline 1 does not have a history file, all else should */
                        if (rtli != 1 && !existsTimeLineHistory(rtli))
                                ereport(FATAL,
-                                               (errmsg("recovery_target_timeline %u does not exist",
+                                               (errmsg("recovery target timeline %u does not exist",
                                                                rtli)));
                        recoveryTargetTLI = rtli;
                }
@@ -4450,7 +4936,8 @@ exitArchiveRecovery(TimeLineID endTLI, uint32 endLogId, uint32 endLogSeg)
         *
         * Note that if we are establishing a new timeline, ThisTimeLineID is
         * already set to the new value, and so we will create a new file instead
-        * of overwriting any existing file.
+        * of overwriting any existing file.  (This is, in fact, always the case
+        * at present.)
         */
        snprintf(recoveryPath, MAXPGPATH, XLOGDIR "/RECOVERYXLOG");
        XLogFilePath(xlogpath, ThisTimeLineID, endLogId, endLogSeg);
@@ -4480,10 +4967,22 @@ exitArchiveRecovery(TimeLineID endTLI, uint32 endLogId, uint32 endLogSeg)
                 * If we are establishing a new timeline, we have to copy data from
                 * the last WAL segment of the old timeline to create a starting WAL
                 * segment for the new timeline.
+                *
+                * Notify the archiver that the last WAL segment of the old timeline
+                * is ready to copy to archival storage. Otherwise, it is not archived
+                * for a while.
                 */
                if (endTLI != ThisTimeLineID)
+               {
                        XLogFileCopy(endLogId, endLogSeg,
                                                 endTLI, endLogId, endLogSeg);
+
+                       if (XLogArchivingActive())
+                       {
+                               XLogFileName(xlogpath, endTLI, endLogId, endLogSeg);
+                               XLogArchiveNotify(xlogpath);
+                       }
+               }
        }
 
        /*
@@ -4518,6 +5017,9 @@ exitArchiveRecovery(TimeLineID endTLI, uint32 endLogId, uint32 endLogSeg)
  *
  * Returns TRUE if we are stopping, FALSE otherwise.  On TRUE return,
  * *includeThis is set TRUE if we should apply this record before stopping.
+ *
+ * We also track the timestamp of the latest applied COMMIT/ABORT record
+ * in recoveryLastXTime, for logging purposes.
  * Also, some information is saved in recoveryStopXid et al for use in
  * annotating the new timeline's history file.
  */
@@ -4526,11 +5028,7 @@ recoveryStopsHere(XLogRecord *record, bool *includeThis)
 {
        bool            stopsHere;
        uint8           record_info;
-       time_t          recordXtime;
-
-       /* Do we have a PITR target at all? */
-       if (!recoveryTarget)
-               return false;
+       TimestampTz recordXtime;
 
        /* We only consider stopping at COMMIT or ABORT records */
        if (record->xl_rmid != RM_XACT_ID)
@@ -4541,18 +5039,25 @@ recoveryStopsHere(XLogRecord *record, bool *includeThis)
                xl_xact_commit *recordXactCommitData;
 
                recordXactCommitData = (xl_xact_commit *) XLogRecGetData(record);
-               recordXtime = recordXactCommitData->xtime;
+               recordXtime = recordXactCommitData->xact_time;
        }
        else if (record_info == XLOG_XACT_ABORT)
        {
                xl_xact_abort *recordXactAbortData;
 
                recordXactAbortData = (xl_xact_abort *) XLogRecGetData(record);
-               recordXtime = recordXactAbortData->xtime;
+               recordXtime = recordXactAbortData->xact_time;
        }
        else
                return false;
 
+       /* Do we have a PITR target at all? */
+       if (!recoveryTarget)
+       {
+               recoveryLastXTime = recordXtime;
+               return false;
+       }
+
        if (recoveryTargetExact)
        {
                /*
@@ -4594,24 +5099,33 @@ recoveryStopsHere(XLogRecord *record, bool *includeThis)
                        if (recoveryStopAfter)
                                ereport(LOG,
                                                (errmsg("recovery stopping after commit of transaction %u, time %s",
-                                                         recoveryStopXid, str_time(recoveryStopTime))));
+                                                               recoveryStopXid,
+                                                               timestamptz_to_str(recoveryStopTime))));
                        else
                                ereport(LOG,
                                                (errmsg("recovery stopping before commit of transaction %u, time %s",
-                                                         recoveryStopXid, str_time(recoveryStopTime))));
+                                                               recoveryStopXid,
+                                                               timestamptz_to_str(recoveryStopTime))));
                }
                else
                {
                        if (recoveryStopAfter)
                                ereport(LOG,
                                                (errmsg("recovery stopping after abort of transaction %u, time %s",
-                                                         recoveryStopXid, str_time(recoveryStopTime))));
+                                                               recoveryStopXid,
+                                                               timestamptz_to_str(recoveryStopTime))));
                        else
                                ereport(LOG,
                                                (errmsg("recovery stopping before abort of transaction %u, time %s",
-                                                         recoveryStopXid, str_time(recoveryStopTime))));
+                                                               recoveryStopXid,
+                                                               timestamptz_to_str(recoveryStopTime))));
                }
+
+               if (recoveryStopAfter)
+                       recoveryLastXTime = recordXtime;
        }
+       else
+               recoveryLastXTime = recordXtime;
 
        return stopsHere;
 }
@@ -4625,12 +5139,12 @@ StartupXLOG(void)
        XLogCtlInsert *Insert;
        CheckPoint      checkPoint;
        bool            wasShutdown;
-       bool            needNewTimeLine = false;
+       bool            reachedStopPoint = false;
        bool            haveBackupLabel = false;
        XLogRecPtr      RecPtr,
                                LastRec,
                                checkPointLoc,
-                               minRecoveryLoc,
+                               backupStopLoc,
                                EndOfLog;
        uint32          endLogId;
        uint32          endLogSeg;
@@ -4638,6 +5152,8 @@ StartupXLOG(void)
        uint32          freespace;
        TransactionId oldestActiveXID;
 
+       XLogCtl->SharedRecoveryInProgress = true;
+
        /*
         * Read control file and check XLOG status looks valid.
         *
@@ -4670,12 +5186,12 @@ StartupXLOG(void)
                ereport(LOG,
                                (errmsg("database system was interrupted while in recovery at log time %s",
                                                str_time(ControlFile->checkPointCopy.time)),
-                                errhint("If this has occurred more than once some data may be corrupted"
-                               " and you may need to choose an earlier recovery target.")));
+                                errhint("If this has occurred more than once some data might be corrupted"
+                         " and you might need to choose an earlier recovery target.")));
        else if (ControlFile->state == DB_IN_PRODUCTION)
                ereport(LOG,
-                               (errmsg("database system was interrupted; last known up at %s",
-                                               str_time(ControlFile->time))));
+                         (errmsg("database system was interrupted; last known up at %s",
+                                         str_time(ControlFile->time))));
 
        /* This is just to allow attaching to startup process with a debugger */
 #ifdef XLOG_REPLAY_DELAY
@@ -4684,6 +5200,13 @@ StartupXLOG(void)
 #endif
 
        /*
+        * Verify that pg_xlog and pg_xlog/archive_status exist.  In cases where
+        * someone has performed a copy for PITR, these directories may have
+        * been excluded and need to be re-created.
+        */
+       ValidateXLOGDirectoryStructure();
+
+       /*
         * Initialize on the assumption we want to recover to the same timeline
         * that's active according to pg_control.
         */
@@ -4710,7 +5233,7 @@ StartupXLOG(void)
                                                recoveryTargetTLI,
                                                ControlFile->checkPointCopy.ThisTimeLineID)));
 
-       if (read_backup_label(&checkPointLoc, &minRecoveryLoc))
+       if (read_backup_label(&checkPointLoc, &backupStopLoc))
        {
                /*
                 * When a backup_label file is present, we want to roll forward from
@@ -4719,7 +5242,7 @@ StartupXLOG(void)
                record = ReadCheckpointRecord(checkPointLoc, 0);
                if (record != NULL)
                {
-                       ereport(LOG,
+                       ereport(DEBUG1,
                                        (errmsg("checkpoint record is at %X/%X",
                                                        checkPointLoc.xlogid, checkPointLoc.xrecoff)));
                        InRecovery = true;      /* force recovery even if SHUTDOWNED */
@@ -4743,7 +5266,7 @@ StartupXLOG(void)
                record = ReadCheckpointRecord(checkPointLoc, 1);
                if (record != NULL)
                {
-                       ereport(LOG,
+                       ereport(DEBUG1,
                                        (errmsg("checkpoint record is at %X/%X",
                                                        checkPointLoc.xlogid, checkPointLoc.xrecoff)));
                }
@@ -4768,16 +5291,15 @@ StartupXLOG(void)
        memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
        wasShutdown = (record->xl_info == XLOG_CHECKPOINT_SHUTDOWN);
 
-       ereport(LOG,
-        (errmsg("redo record is at %X/%X; undo record is at %X/%X; shutdown %s",
-                        checkPoint.redo.xlogid, checkPoint.redo.xrecoff,
-                        checkPoint.undo.xlogid, checkPoint.undo.xrecoff,
-                        wasShutdown ? "TRUE" : "FALSE")));
-       ereport(LOG,
+       ereport(DEBUG1,
+                       (errmsg("redo record is at %X/%X; shutdown %s",
+                                       checkPoint.redo.xlogid, checkPoint.redo.xrecoff,
+                                       wasShutdown ? "TRUE" : "FALSE")));
+       ereport(DEBUG1,
                        (errmsg("next transaction ID: %u/%u; next OID: %u",
                                        checkPoint.nextXidEpoch, checkPoint.nextXid,
                                        checkPoint.nextOid)));
-       ereport(LOG,
+       ereport(DEBUG1,
                        (errmsg("next MultiXactId: %u; next MultiXactOffset: %u",
                                        checkPoint.nextMulti, checkPoint.nextMultiOffset)));
        if (!TransactionIdIsNormal(checkPoint.nextXid))
@@ -4801,20 +5323,17 @@ StartupXLOG(void)
        if (XLByteLT(RecPtr, checkPoint.redo))
                ereport(PANIC,
                                (errmsg("invalid redo in checkpoint record")));
-       if (checkPoint.undo.xrecoff == 0)
-               checkPoint.undo = RecPtr;
 
        /*
         * Check whether we need to force recovery from WAL.  If it appears to
         * have been a clean shutdown and we did not have a recovery.conf file,
         * then assume no recovery needed.
         */
-       if (XLByteLT(checkPoint.undo, RecPtr) ||
-               XLByteLT(checkPoint.redo, RecPtr))
+       if (XLByteLT(checkPoint.redo, RecPtr))
        {
                if (wasShutdown)
                        ereport(PANIC,
-                               (errmsg("invalid redo/undo record in shutdown checkpoint")));
+                                       (errmsg("invalid redo record in shutdown checkpoint")));
                InRecovery = true;
        }
        else if (ControlFile->state != DB_SHUTDOWNED)
@@ -4852,11 +5371,23 @@ StartupXLOG(void)
                ControlFile->prevCheckPoint = ControlFile->checkPoint;
                ControlFile->checkPoint = checkPointLoc;
                ControlFile->checkPointCopy = checkPoint;
-               if (minRecoveryLoc.xlogid != 0 || minRecoveryLoc.xrecoff != 0)
-                       ControlFile->minRecoveryPoint = minRecoveryLoc;
-               ControlFile->time = time(NULL);
+               if (backupStopLoc.xlogid != 0 || backupStopLoc.xrecoff != 0)
+               {
+                       if (XLByteLT(ControlFile->minRecoveryPoint, backupStopLoc))
+                               ControlFile->minRecoveryPoint = backupStopLoc;
+               }
+               ControlFile->time = (pg_time_t) time(NULL);
+               /* No need to hold ControlFileLock yet, we aren't up far enough */
                UpdateControlFile();
 
+               /* update our local copy of minRecoveryPoint */
+               minRecoveryPoint = ControlFile->minRecoveryPoint;
+
+               /*
+                * Reset pgstat data, because it may be invalid after recovery.
+                */
+               pgstat_reset_all();
+
                /*
                 * If there was a backup label file, it's done its job and the info
                 * has now been propagated into pg_control.  We must get rid of the
@@ -4875,9 +5406,7 @@ StartupXLOG(void)
                                                                BACKUP_LABEL_FILE, BACKUP_LABEL_OLD)));
                }
 
-               /* Start up the recovery environment */
-               XLogInitRelationCache();
-
+               /* Initialize resource managers */
                for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
                {
                        if (RmgrTable[rmid].rm_startup != NULL)
@@ -4903,12 +5432,41 @@ StartupXLOG(void)
                {
                        bool            recoveryContinue = true;
                        bool            recoveryApply = true;
+                       bool            reachedMinRecoveryPoint = false;
                        ErrorContextCallback errcontext;
+                       /* use volatile pointer to prevent code rearrangement */
+                       volatile XLogCtlData *xlogctl = XLogCtl;
+
+                       /* Update shared replayEndRecPtr */
+                       SpinLockAcquire(&xlogctl->info_lck);
+                       xlogctl->replayEndRecPtr = ReadRecPtr;
+                       SpinLockRelease(&xlogctl->info_lck);
 
                        InRedo = true;
-                       ereport(LOG,
-                                       (errmsg("redo starts at %X/%X",
-                                                       ReadRecPtr.xlogid, ReadRecPtr.xrecoff)));
+
+                       if (minRecoveryPoint.xlogid == 0 && minRecoveryPoint.xrecoff == 0)
+                               ereport(LOG,
+                                               (errmsg("redo starts at %X/%X",
+                                                               ReadRecPtr.xlogid, ReadRecPtr.xrecoff)));
+                       else
+                               ereport(LOG,
+                                               (errmsg("redo starts at %X/%X, consistency will be reached at %X/%X",
+                                               ReadRecPtr.xlogid, ReadRecPtr.xrecoff,
+                                               minRecoveryPoint.xlogid, minRecoveryPoint.xrecoff)));
+
+                       /*
+                        * Let postmaster know we've started redo now, so that it can
+                        * launch bgwriter to perform restartpoints.  We don't bother
+                        * during crash recovery as restartpoints can only be performed
+                        * during archive recovery.  And we'd like to keep crash recovery
+                        * simple, to avoid introducing bugs that could you from
+                        * recovering after crash.
+                        *
+                        * After this point, we can no longer assume that we're the only
+                        * process in addition to postmaster!
+                        */
+                       if (InArchiveRecovery && IsUnderPostmaster)
+                               SendPostmasterSignal(PMSIGNAL_RECOVERY_STARTED);
 
                        /*
                         * main redo apply loop
@@ -4935,11 +5493,44 @@ StartupXLOG(void)
 #endif
 
                                /*
+                                * Check if we were requested to re-read config file.
+                                */
+                               if (got_SIGHUP)
+                               {
+                                       got_SIGHUP = false;
+                                       ProcessConfigFile(PGC_SIGHUP);
+                               }
+
+                               /*
+                                * Check if we were requested to exit without finishing
+                                * recovery.
+                                */
+                               if (shutdown_requested)
+                                       proc_exit(1);
+
+                               /*
+                                * Have we reached our safe starting point? If so, we can
+                                * tell postmaster that the database is consistent now.
+                                */
+                               if (!reachedMinRecoveryPoint && 
+                                        XLByteLE(minRecoveryPoint, EndRecPtr))
+                               {
+                                       reachedMinRecoveryPoint = true;
+                                       if (InArchiveRecovery)
+                                       {
+                                               ereport(LOG,
+                                                               (errmsg("consistent recovery state reached")));
+                                               if (IsUnderPostmaster)
+                                                       SendPostmasterSignal(PMSIGNAL_RECOVERY_CONSISTENT);
+                                       }
+                               }
+
+                               /*
                                 * Have we reached our recovery target?
                                 */
                                if (recoveryStopsHere(record, &recoveryApply))
                                {
-                                       needNewTimeLine = true;         /* see below */
+                                       reachedStopPoint = true;        /* see below */
                                        recoveryContinue = false;
                                        if (!recoveryApply)
                                                break;
@@ -4959,8 +5550,14 @@ StartupXLOG(void)
                                        TransactionIdAdvance(ShmemVariableCache->nextXid);
                                }
 
-                               if (record->xl_info & XLR_BKP_BLOCK_MASK)
-                                       RestoreBkpBlocks(record, EndRecPtr);
+                               /*
+                                * Update shared replayEndRecPtr before replaying this
+                                * record, so that XLogFlush will update minRecoveryPoint
+                                * correctly.
+                                */
+                               SpinLockAcquire(&xlogctl->info_lck);
+                               xlogctl->replayEndRecPtr = EndRecPtr;
+                               SpinLockRelease(&xlogctl->info_lck);
 
                                RmgrTable[record->xl_rmid].rm_redo(EndRecPtr, record);
 
@@ -4979,6 +5576,10 @@ StartupXLOG(void)
                        ereport(LOG,
                                        (errmsg("redo done at %X/%X",
                                                        ReadRecPtr.xlogid, ReadRecPtr.xrecoff)));
+                       if (recoveryLastXTime)
+                               ereport(LOG,
+                                        (errmsg("last completed transaction was at log time %s",
+                                                        timestamptz_to_str(recoveryLastXTime))));
                        InRedo = false;
                }
                else
@@ -5001,26 +5602,31 @@ StartupXLOG(void)
         * Complain if we did not roll forward far enough to render the backup
         * dump consistent.
         */
-       if (XLByteLT(EndOfLog, ControlFile->minRecoveryPoint))
+       if (InRecovery && XLByteLT(EndOfLog, minRecoveryPoint))
        {
-               if (needNewTimeLine)    /* stopped because of stop request */
+               if (reachedStopPoint)   /* stopped because of stop request */
                        ereport(FATAL,
-                                       (errmsg("requested recovery stop point is before end time of backup dump")));
-               else
-                       /* ran off end of WAL */
+                                       (errmsg("requested recovery stop point is before consistent recovery point")));
+               else    /* ran off end of WAL */
                        ereport(FATAL,
-                                       (errmsg("WAL ends before end time of backup dump")));
+                                       (errmsg("WAL ends before consistent recovery point")));
        }
 
        /*
         * Consider whether we need to assign a new timeline ID.
         *
-        * If we stopped short of the end of WAL during recovery, then we are
-        * generating a new timeline and must assign it a unique new ID.
-        * Otherwise, we can just extend the timeline we were in when we ran out
-        * of WAL.
+        * If we are doing an archive recovery, we always assign a new ID.      This
+        * handles a couple of issues.  If we stopped short of the end of WAL
+        * during recovery, then we are clearly generating a new timeline and must
+        * assign it a unique new ID.  Even if we ran to the end, modifying the
+        * current last segment is problematic because it may result in trying to
+        * overwrite an already-archived copy of that segment, and we encourage
+        * DBAs to make their archive_commands reject that.  We can dodge the
+        * problem by making the new active segment have a new timeline ID.
+        *
+        * In a normal crash recovery, we can just extend the timeline we were in.
         */
-       if (needNewTimeLine)
+       if (InArchiveRecovery)
        {
                ThisTimeLineID = findNewestTimeLine(recoveryTargetTLI) + 1;
                ereport(LOG,
@@ -5098,6 +5704,12 @@ StartupXLOG(void)
        /* Pre-scan prepared transactions to find out the range of XIDs present */
        oldestActiveXID = PrescanPreparedTransactions();
 
+       /*
+        * Allow writing WAL for us, so that we can create a checkpoint record.
+        * But not yet for other backends!
+        */
+       LocalRecoveryInProgress = false;
+
        if (InRecovery)
        {
                int                     rmid;
@@ -5118,11 +5730,6 @@ StartupXLOG(void)
                XLogCheckInvalidPages();
 
                /*
-                * Reset pgstat data, because it may be invalid after recovery.
-                */
-               pgstat_reset_all();
-
-               /*
                 * Perform a checkpoint to update all our recovery activity to disk.
                 *
                 * Note that we write a shutdown checkpoint rather than an on-line
@@ -5131,35 +5738,39 @@ StartupXLOG(void)
                 * the rule that TLI only changes in shutdown checkpoints, which
                 * allows some extra error checking in xlog_redo.
                 */
-               CreateCheckPoint(true, true);
+               CreateCheckPoint(CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_IMMEDIATE);
 
-               /*
-                * Close down recovery environment
-                */
-               XLogCloseRelationCache();
+               if (recoveryEndCommand)
+                       ExecuteRecoveryEndCommand();
        }
 
        /*
         * Preallocate additional log files, if wanted.
         */
-       (void) PreallocXlogFiles(EndOfLog);
+       PreallocXlogFiles(EndOfLog);
 
        /*
         * Okay, we're officially UP.
         */
        InRecovery = false;
 
+       LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
        ControlFile->state = DB_IN_PRODUCTION;
-       ControlFile->time = time(NULL);
+       ControlFile->time = (pg_time_t) time(NULL);
        UpdateControlFile();
+       LWLockRelease(ControlFileLock);
 
        /* start the archive_timeout timer running */
-       XLogCtl->Write.lastSegSwitchTime = ControlFile->time;
+       XLogCtl->Write.lastSegSwitchTime = (pg_time_t) time(NULL);
 
        /* initialize shared-memory copy of latest checkpoint XID/epoch */
        XLogCtl->ckptXidEpoch = ControlFile->checkPointCopy.nextXidEpoch;
        XLogCtl->ckptXid = ControlFile->checkPointCopy.nextXid;
 
+       /* also initialize latestCompletedXid, to nextXid - 1 */
+       ShmemVariableCache->latestCompletedXid = ShmemVariableCache->nextXid;
+       TransactionIdRetreat(ShmemVariableCache->latestCompletedXid);
+
        /* Start up the commit log and related stuff, too */
        StartupCLOG();
        StartupSUBTRANS(oldestActiveXID);
@@ -5168,9 +5779,6 @@ StartupXLOG(void)
        /* Reload shared-memory state for prepared transactions */
        RecoverPreparedTransactions();
 
-       ereport(LOG,
-                       (errmsg("database system is ready")));
-
        /* Shut down readFile facility, free space */
        if (readFile >= 0)
        {
@@ -5188,6 +5796,45 @@ StartupXLOG(void)
                readRecordBuf = NULL;
                readRecordBufSize = 0;
        }
+
+       /*
+        * All done. Allow others to write WAL.
+        */
+       XLogCtl->SharedRecoveryInProgress = false;
+}
+
+/*
+ * Is the system still in recovery?
+ *
+ * As a side-effect, we initialize the local TimeLineID and RedoRecPtr
+ * variables the first time we see that recovery is finished.
+ */
+bool
+RecoveryInProgress(void)
+{
+       /*
+        * We check shared state each time only until we leave recovery mode.
+        * We can't re-enter recovery, so we rely on the local state variable
+        * after that.
+        */
+       if (!LocalRecoveryInProgress)
+               return false;
+       else
+       {
+               /* use volatile pointer to prevent code rearrangement */
+               volatile XLogCtlData *xlogctl = XLogCtl;
+
+               LocalRecoveryInProgress = xlogctl->SharedRecoveryInProgress;
+
+               /*
+                * Initialize TimeLineID and RedoRecPtr the first time we see that
+                * recovery is finished.
+                */
+               if (!LocalRecoveryInProgress)
+                       InitXLOGAccess();
+
+               return LocalRecoveryInProgress;
+       }
 }
 
 /*
@@ -5319,6 +5966,8 @@ InitXLOGAccess(void)
 {
        /* ThisTimeLineID doesn't change so we need no lock to copy it */
        ThisTimeLineID = XLogCtl->ThisTimeLineID;
+       Assert(ThisTimeLineID != 0);
+
        /* Use GetRedoRecPtr to copy the RedoRecPtr safely */
        (void) GetRedoRecPtr();
 }
@@ -5343,12 +5992,35 @@ GetRedoRecPtr(void)
 }
 
 /*
+ * GetInsertRecPtr -- Returns the current insert position.
+ *
+ * NOTE: The value *actually* returned is the position of the last full
+ * xlog page. It lags behind the real insert position by at most 1 page.
+ * For that, we don't need to acquire WALInsertLock which can be quite
+ * heavily contended, and an approximation is enough for the current
+ * usage of this function.
+ */
+XLogRecPtr
+GetInsertRecPtr(void)
+{
+       /* use volatile pointer to prevent code rearrangement */
+       volatile XLogCtlData *xlogctl = XLogCtl;
+       XLogRecPtr      recptr;
+
+       SpinLockAcquire(&xlogctl->info_lck);
+       recptr = xlogctl->LogwrtRqst.Write;
+       SpinLockRelease(&xlogctl->info_lck);
+
+       return recptr;
+}
+
+/*
  * Get the time of the last xlog segment switch
  */
-time_t
+pg_time_t
 GetLastSegSwitchTime(void)
 {
-       time_t          result;
+       pg_time_t       result;
 
        /* Need WALWriteLock, but shared lock is sufficient */
        LWLockAcquire(WALWriteLock, LW_SHARED);
@@ -5407,7 +6079,10 @@ ShutdownXLOG(int code, Datum arg)
        ereport(LOG,
                        (errmsg("shutting down")));
 
-       CreateCheckPoint(true, true);
+       if (RecoveryInProgress())
+               CreateRestartPoint(CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_IMMEDIATE);
+       else
+               CreateCheckPoint(CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_IMMEDIATE);
        ShutdownCLOG();
        ShutdownSUBTRANS();
        ShutdownMultiXact();
@@ -5417,14 +6092,98 @@ ShutdownXLOG(int code, Datum arg)
 }
 
 /*
+ * Log start of a checkpoint.
+ */
+static void
+LogCheckpointStart(int flags, bool restartpoint)
+{
+       char *msg;
+
+       /*
+        * XXX: This is hopelessly untranslatable. We could call gettext_noop
+        * for the main message, but what about all the flags?
+        */
+       if (restartpoint)
+               msg = "restartpoint starting:%s%s%s%s%s%s";
+       else
+               msg = "checkpoint starting:%s%s%s%s%s%s";
+
+       elog(LOG, msg,
+                (flags & CHECKPOINT_IS_SHUTDOWN) ? " shutdown" : "",
+                (flags & CHECKPOINT_IMMEDIATE) ? " immediate" : "",
+                (flags & CHECKPOINT_FORCE) ? " force" : "",
+                (flags & CHECKPOINT_WAIT) ? " wait" : "",
+                (flags & CHECKPOINT_CAUSE_XLOG) ? " xlog" : "",
+                (flags & CHECKPOINT_CAUSE_TIME) ? " time" : "");
+}
+
+/*
+ * Log end of a checkpoint.
+ */
+static void
+LogCheckpointEnd(bool restartpoint)
+{
+       long            write_secs,
+                               sync_secs,
+                               total_secs;
+       int                     write_usecs,
+                               sync_usecs,
+                               total_usecs;
+
+       CheckpointStats.ckpt_end_t = GetCurrentTimestamp();
+
+       TimestampDifference(CheckpointStats.ckpt_start_t,
+                                               CheckpointStats.ckpt_end_t,
+                                               &total_secs, &total_usecs);
+
+       TimestampDifference(CheckpointStats.ckpt_write_t,
+                                               CheckpointStats.ckpt_sync_t,
+                                               &write_secs, &write_usecs);
+
+       TimestampDifference(CheckpointStats.ckpt_sync_t,
+                                               CheckpointStats.ckpt_sync_end_t,
+                                               &sync_secs, &sync_usecs);
+
+       if (restartpoint)
+               elog(LOG, "restartpoint complete: wrote %d buffers (%.1f%%); "
+                        "write=%ld.%03d s, sync=%ld.%03d s, total=%ld.%03d s",
+                        CheckpointStats.ckpt_bufs_written,
+                        (double) CheckpointStats.ckpt_bufs_written * 100 / NBuffers,
+                        write_secs, write_usecs / 1000,
+                        sync_secs, sync_usecs / 1000,
+                        total_secs, total_usecs / 1000);
+       else
+               elog(LOG, "checkpoint complete: wrote %d buffers (%.1f%%); "
+                        "%d transaction log file(s) added, %d removed, %d recycled; "
+                        "write=%ld.%03d s, sync=%ld.%03d s, total=%ld.%03d s",
+                        CheckpointStats.ckpt_bufs_written,
+                        (double) CheckpointStats.ckpt_bufs_written * 100 / NBuffers,
+                        CheckpointStats.ckpt_segs_added,
+                        CheckpointStats.ckpt_segs_removed,
+                        CheckpointStats.ckpt_segs_recycled,
+                        write_secs, write_usecs / 1000,
+                        sync_secs, sync_usecs / 1000,
+                        total_secs, total_usecs / 1000);
+}
+
+/*
  * Perform a checkpoint --- either during shutdown, or on-the-fly
  *
- * If force is true, we force a checkpoint regardless of whether any XLOG
- * activity has occurred since the last one.
+ * flags is a bitwise OR of the following:
+ *     CHECKPOINT_IS_SHUTDOWN: checkpoint is for database shutdown.
+ *     CHECKPOINT_IMMEDIATE: finish the checkpoint ASAP,
+ *             ignoring checkpoint_completion_target parameter.
+ *     CHECKPOINT_FORCE: force a checkpoint even if no XLOG activity has occured
+ *             since the last one (implied by CHECKPOINT_IS_SHUTDOWN).
+ *
+ * Note: flags contains other bits, of interest here only for logging purposes.
+ * In particular note that this routine is synchronous and does not pay
+ * attention to CHECKPOINT_WAIT.
  */
 void
-CreateCheckPoint(bool shutdown, bool force)
+CreateCheckPoint(int flags)
 {
+       bool            shutdown = (flags & CHECKPOINT_IS_SHUTDOWN) != 0;
        CheckPoint      checkPoint;
        XLogRecPtr      recptr;
        XLogCtlInsert *Insert = &XLogCtl->Insert;
@@ -5432,17 +6191,46 @@ CreateCheckPoint(bool shutdown, bool force)
        uint32          freespace;
        uint32          _logId;
        uint32          _logSeg;
-       int                     nsegsadded = 0;
-       int                     nsegsremoved = 0;
-       int                     nsegsrecycled = 0;
+       TransactionId *inCommitXids;
+       int                     nInCommit;
+
+       /* shouldn't happen */
+       if (RecoveryInProgress())
+               elog(ERROR, "can't create a checkpoint during recovery");
 
        /*
         * Acquire CheckpointLock to ensure only one checkpoint happens at a time.
-        * (This is just pro forma, since in the present system structure there is
-        * only one process that is allowed to issue checkpoints at any given
-        * time.)
+        * During normal operation, bgwriter is the only process that creates
+        * checkpoints, but at the end of archive recovery, the bgwriter can be
+        * busy creating a restartpoint while the startup process tries to perform
+        * the startup checkpoint.
         */
-       LWLockAcquire(CheckpointLock, LW_EXCLUSIVE);
+       if (!LWLockConditionalAcquire(CheckpointLock, LW_EXCLUSIVE))
+       {
+               Assert(InRecovery);
+
+               /*
+                * A restartpoint is in progress. Wait until it finishes. This can
+                * cause an extra restartpoint to be performed, but that's OK because
+                * we're just about to perform a checkpoint anyway. Flushing the
+                * buffers in this restartpoint can take some time, but that time is
+                * saved from the upcoming checkpoint so the net effect is zero.
+                */
+               ereport(DEBUG2, (errmsg("hurrying in-progress restartpoint")));
+               RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_WAIT);
+
+               LWLockAcquire(CheckpointLock, LW_EXCLUSIVE);
+       }
+
+       /*
+        * Prepare to accumulate statistics.
+        *
+        * Note: because it is possible for log_checkpoints to change while a
+        * checkpoint proceeds, we always accumulate stats, even if
+        * log_checkpoints is currently off.
+        */
+       MemSet(&CheckpointStats, 0, sizeof(CheckpointStats));
+       CheckpointStats.ckpt_start_t = GetCurrentTimestamp();
 
        /*
         * Use a critical section to force system panic if we have trouble.
@@ -5451,24 +6239,29 @@ CreateCheckPoint(bool shutdown, bool force)
 
        if (shutdown)
        {
+               LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
                ControlFile->state = DB_SHUTDOWNING;
-               ControlFile->time = time(NULL);
+               ControlFile->time = (pg_time_t) time(NULL);
                UpdateControlFile();
+               LWLockRelease(ControlFileLock);
        }
 
+       /*
+        * Let smgr prepare for checkpoint; this has to happen before we determine
+        * the REDO pointer.  Note that smgr must not do anything that'd have to
+        * be undone if we decide no checkpoint is needed.
+        */
+       smgrpreckpt();
+
+       /* Begin filling in the checkpoint WAL record */
        MemSet(&checkPoint, 0, sizeof(checkPoint));
        checkPoint.ThisTimeLineID = ThisTimeLineID;
-       checkPoint.time = time(NULL);
+       checkPoint.time = (pg_time_t) time(NULL);
 
        /*
-        * We must hold CheckpointStartLock while determining the checkpoint REDO
-        * pointer.  This ensures that any concurrent transaction commits will be
-        * either not yet logged, or logged and recorded in pg_clog. See notes in
-        * RecordTransactionCommit().
+        * We must hold WALInsertLock while examining insert state to determine
+        * the checkpoint REDO pointer.
         */
-       LWLockAcquire(CheckpointStartLock, LW_EXCLUSIVE);
-
-       /* And we need WALInsertLock too */
        LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
 
        /*
@@ -5486,7 +6279,7 @@ CreateCheckPoint(bool shutdown, bool force)
         * the end of the last checkpoint record, and its redo pointer must point
         * to itself.
         */
-       if (!shutdown && !force)
+       if ((flags & (CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_FORCE)) == 0)
        {
                XLogRecPtr      curInsert;
 
@@ -5500,7 +6293,6 @@ CreateCheckPoint(bool shutdown, bool force)
                        ControlFile->checkPointCopy.redo.xrecoff)
                {
                        LWLockRelease(WALInsertLock);
-                       LWLockRelease(CheckpointStartLock);
                        LWLockRelease(CheckpointLock);
                        END_CRIT_SECTION();
                        return;
@@ -5545,16 +6337,53 @@ CreateCheckPoint(bool shutdown, bool force)
        }
 
        /*
-        * Now we can release insert lock and checkpoint start lock, allowing
-        * other xacts to proceed even while we are flushing disk buffers.
+        * Now we can release WAL insert lock, allowing other xacts to proceed
+        * while we are flushing disk buffers.
         */
        LWLockRelease(WALInsertLock);
 
-       LWLockRelease(CheckpointStartLock);
+       /*
+        * If enabled, log checkpoint start.  We postpone this until now so as not
+        * to log anything if we decided to skip the checkpoint.
+        */
+       if (log_checkpoints)
+               LogCheckpointStart(flags, false);
+
+       TRACE_POSTGRESQL_CHECKPOINT_START(flags);
 
-       if (!shutdown)
-               ereport(DEBUG2,
-                               (errmsg("checkpoint starting")));
+       /*
+        * Before flushing data, we must wait for any transactions that are
+        * currently in their commit critical sections.  If an xact inserted its
+        * commit record into XLOG just before the REDO point, then a crash
+        * restart from the REDO point would not replay that record, which means
+        * that our flushing had better include the xact's update of pg_clog.  So
+        * we wait till he's out of his commit critical section before proceeding.
+        * See notes in RecordTransactionCommit().
+        *
+        * Because we've already released WALInsertLock, this test is a bit fuzzy:
+        * it is possible that we will wait for xacts we didn't really need to
+        * wait for.  But the delay should be short and it seems better to make
+        * checkpoint take a bit longer than to hold locks longer than necessary.
+        * (In fact, the whole reason we have this issue is that xact.c does
+        * commit record XLOG insertion and clog update as two separate steps
+        * protected by different locks, but again that seems best on grounds of
+        * minimizing lock contention.)
+        *
+        * A transaction that has not yet set inCommit when we look cannot be at
+        * risk, since he's not inserted his commit record yet; and one that's
+        * already cleared it is not at risk either, since he's done fixing clog
+        * and we will correctly flush the update below.  So we cannot miss any
+        * xacts we need to wait for.
+        */
+       nInCommit = GetTransactionsInCommit(&inCommitXids);
+       if (nInCommit > 0)
+       {
+               do
+               {
+                       pg_usleep(10000L);      /* wait for 10 msec */
+               } while (HaveTransactionsInCommit(inCommitXids, nInCommit));
+       }
+       pfree(inCommitXids);
 
        /*
         * Get the other info we need for the checkpoint record.
@@ -5588,7 +6417,7 @@ CreateCheckPoint(bool shutdown, bool force)
         */
        END_CRIT_SECTION();
 
-       CheckPointGuts(checkPoint.redo);
+       CheckPointGuts(checkPoint.redo, flags);
 
        START_CRIT_SECTION();
 
@@ -5630,7 +6459,7 @@ CreateCheckPoint(bool shutdown, bool force)
        ControlFile->prevCheckPoint = ControlFile->checkPoint;
        ControlFile->checkPoint = ProcLastRecPtr;
        ControlFile->checkPointCopy = checkPoint;
-       ControlFile->time = time(NULL);
+       ControlFile->time = (pg_time_t) time(NULL);
        UpdateControlFile();
        LWLockRelease(ControlFileLock);
 
@@ -5647,27 +6476,31 @@ CreateCheckPoint(bool shutdown, bool force)
 
        /*
         * We are now done with critical updates; no need for system panic if we
-        * have trouble while fooling with offline log segments.
+        * have trouble while fooling with old log segments.
         */
        END_CRIT_SECTION();
 
        /*
-        * Delete offline log files (those no longer needed even for previous
+        * Let smgr do post-checkpoint cleanup (eg, deleting old files).
+        */
+       smgrpostckpt();
+
+       /*
+        * Delete old log files (those no longer needed even for previous
         * checkpoint).
         */
        if (_logId || _logSeg)
        {
                PrevLogSeg(_logId, _logSeg);
-               MoveOfflineLogs(_logId, _logSeg, recptr,
-                                               &nsegsremoved, &nsegsrecycled);
+               RemoveOldXlogFiles(_logId, _logSeg, recptr);
        }
 
        /*
-        * Make more log segments if needed.  (Do this after deleting offline log
-        * segments, to avoid having peak disk space usage higher than necessary.)
+        * Make more log segments if needed.  (Do this after recycling old log
+        * segments, since that may supply some of the needed files.)
         */
        if (!shutdown)
-               nsegsadded = PreallocXlogFiles(recptr);
+               PreallocXlogFiles(recptr);
 
        /*
         * Truncate pg_subtrans if possible.  We can throw away all data before
@@ -5679,10 +6512,15 @@ CreateCheckPoint(bool shutdown, bool force)
        if (!InRecovery)
                TruncateSUBTRANS(GetOldestXmin(true, false));
 
-       if (!shutdown)
-               ereport(DEBUG2,
-                               (errmsg("checkpoint complete; %d transaction log file(s) added, %d removed, %d recycled",
-                                               nsegsadded, nsegsremoved, nsegsrecycled)));
+       /* All real work is done, but log before releasing lock. */
+       if (log_checkpoints)
+               LogCheckpointEnd(false);
+
+       TRACE_POSTGRESQL_CHECKPOINT_DONE(CheckpointStats.ckpt_bufs_written,
+                                                                        NBuffers,
+                                                                        CheckpointStats.ckpt_segs_added,
+                                                                        CheckpointStats.ckpt_segs_removed,
+                                                                        CheckpointStats.ckpt_segs_recycled);
 
        LWLockRelease(CheckpointLock);
 }
@@ -5694,43 +6532,28 @@ CreateCheckPoint(bool shutdown, bool force)
  * recovery restartpoints.
  */
 static void
-CheckPointGuts(XLogRecPtr checkPointRedo)
+CheckPointGuts(XLogRecPtr checkPointRedo, int flags)
 {
        CheckPointCLOG();
        CheckPointSUBTRANS();
        CheckPointMultiXact();
-       FlushBufferPool();                      /* performs all required fsyncs */
+       CheckPointBuffers(flags);       /* performs all required fsyncs */
        /* We deliberately delay 2PC checkpointing as long as possible */
        CheckPointTwoPhase(checkPointRedo);
 }
 
 /*
- * Set a recovery restart point if appropriate
- *
- * This is similar to CreateCheckpoint, but is used during WAL recovery
- * to establish a point from which recovery can roll forward without
- * replaying the entire recovery log.  This function is called each time
- * a checkpoint record is read from XLOG; it must determine whether a
- * restartpoint is needed or not.
+ * This is used during WAL recovery to establish a point from which recovery
+ * can roll forward without replaying the entire recovery log.  This function
+ * is called each time a checkpoint record is read from XLOG. It is stored
+ * in shared memory, so that it can be used as a restartpoint later on.
  */
 static void
 RecoveryRestartPoint(const CheckPoint *checkPoint)
 {
-       int                     elapsed_secs;
        int                     rmid;
-
-       /*
-        * Do nothing if the elapsed time since the last restartpoint is less than
-        * half of checkpoint_timeout.  (We use a value less than
-        * checkpoint_timeout so that variations in the timing of checkpoints on
-        * the master, or speed of transmission of WAL segments to a slave, won't
-        * make the slave skip a restartpoint once it's synced with the master.)
-        * Checking true elapsed time keeps us from doing restartpoints too often
-        * while rapidly scanning large amounts of WAL.
-        */
-       elapsed_secs = time(NULL) - ControlFile->time;
-       if (elapsed_secs < CheckPointTimeout / 2)
-               return;
+       /* use volatile pointer to prevent code rearrangement */
+       volatile XLogCtlData *xlogctl = XLogCtl;
 
        /*
         * Is it safe to checkpoint?  We must ask each of the resource managers
@@ -5742,28 +6565,138 @@ RecoveryRestartPoint(const CheckPoint *checkPoint)
        {
                if (RmgrTable[rmid].rm_safe_restartpoint != NULL)
                        if (!(RmgrTable[rmid].rm_safe_restartpoint()))
+                       {
+                               elog(DEBUG2, "RM %d not safe to record restart point at %X/%X",
+                                        rmid,
+                                        checkPoint->redo.xlogid,
+                                        checkPoint->redo.xrecoff);
                                return;
+                       }
+       }
+
+       /*
+        * Copy the checkpoint record to shared memory, so that bgwriter can
+        * use it the next time it wants to perform a restartpoint.
+        */
+       SpinLockAcquire(&xlogctl->info_lck);
+       XLogCtl->lastCheckPointRecPtr = ReadRecPtr;
+       memcpy(&XLogCtl->lastCheckPoint, checkPoint, sizeof(CheckPoint));
+       SpinLockRelease(&xlogctl->info_lck);
+}
+
+/*
+ * This is similar to CreateCheckPoint, but is used during WAL recovery
+ * to establish a point from which recovery can roll forward without
+ * replaying the entire recovery log.
+ *
+ * Returns true if a new restartpoint was established. We can only establish
+ * a restartpoint if we have replayed a checkpoint record since last
+ * restartpoint.
+ */
+bool
+CreateRestartPoint(int flags)
+{
+       XLogRecPtr lastCheckPointRecPtr;
+       CheckPoint lastCheckPoint;
+       /* use volatile pointer to prevent code rearrangement */
+       volatile XLogCtlData *xlogctl = XLogCtl;
+
+       /*
+        * Acquire CheckpointLock to ensure only one restartpoint or checkpoint
+        * happens at a time.
+        */
+       LWLockAcquire(CheckpointLock, LW_EXCLUSIVE);
+
+       /* Get the a local copy of the last checkpoint record. */
+       SpinLockAcquire(&xlogctl->info_lck);
+       lastCheckPointRecPtr = xlogctl->lastCheckPointRecPtr;
+       memcpy(&lastCheckPoint, &XLogCtl->lastCheckPoint, sizeof(CheckPoint));
+       SpinLockRelease(&xlogctl->info_lck);
+
+       /* 
+        * Check that we're still in recovery mode. It's ok if we exit recovery
+        * mode after this check, the restart point is valid anyway.
+        */
+       if (!RecoveryInProgress())
+       {
+               ereport(DEBUG2,
+                               (errmsg("skipping restartpoint, recovery has already ended")));
+               LWLockRelease(CheckpointLock);
+               return false;
        }
 
        /*
-        * OK, force data out to disk
+        * If the last checkpoint record we've replayed is already our last
+        * restartpoint, we can't perform a new restart point. We still update
+        * minRecoveryPoint in that case, so that if this is a shutdown restart
+        * point, we won't start up earlier than before. That's not strictly
+        * necessary, but when we get hot standby capability, it would be rather
+        * weird if the database opened up for read-only connections at a
+        * point-in-time before the last shutdown. Such time travel is still
+        * possible in case of immediate shutdown, though.
+        *
+        * We don't explicitly advance minRecoveryPoint when we do create a
+        * restartpoint. It's assumed that flushing the buffers will do that
+        * as a side-effect.
         */
-       CheckPointGuts(checkPoint->redo);
+       if (XLogRecPtrIsInvalid(lastCheckPointRecPtr) ||
+               XLByteLE(lastCheckPoint.redo, ControlFile->checkPointCopy.redo))
+       {
+               XLogRecPtr InvalidXLogRecPtr = {0, 0};
+               ereport(DEBUG2,
+                               (errmsg("skipping restartpoint, already performed at %X/%X",
+                                               lastCheckPoint.redo.xlogid, lastCheckPoint.redo.xrecoff)));
+
+               UpdateMinRecoveryPoint(InvalidXLogRecPtr, true);
+               LWLockRelease(CheckpointLock);
+               return false;
+       }
+
+       if (log_checkpoints)
+       {
+               /*
+                * Prepare to accumulate statistics.
+                */
+               MemSet(&CheckpointStats, 0, sizeof(CheckpointStats));
+               CheckpointStats.ckpt_start_t = GetCurrentTimestamp();
+
+               LogCheckpointStart(flags, true);
+       }
+
+       CheckPointGuts(lastCheckPoint.redo, flags);
 
        /*
-        * Update pg_control so that any subsequent crash will restart from this
-        * checkpoint.  Note: ReadRecPtr gives the XLOG address of the checkpoint
-        * record itself.
+        * Update pg_control, using current time
         */
+       LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
        ControlFile->prevCheckPoint = ControlFile->checkPoint;
-       ControlFile->checkPoint = ReadRecPtr;
-       ControlFile->checkPointCopy = *checkPoint;
-       ControlFile->time = time(NULL);
+       ControlFile->checkPoint = lastCheckPointRecPtr;
+       ControlFile->checkPointCopy = lastCheckPoint;
+       ControlFile->time = (pg_time_t) time(NULL);
        UpdateControlFile();
+       LWLockRelease(ControlFileLock);
+
+       /*
+        * Currently, there is no need to truncate pg_subtrans during recovery.
+        * If we did do that, we will need to have called StartupSUBTRANS()
+        * already and then TruncateSUBTRANS() would go here.
+        */
 
-       ereport(DEBUG2,
+       /* All real work is done, but log before releasing lock. */
+       if (log_checkpoints)
+               LogCheckpointEnd(true);
+
+       ereport((log_checkpoints ? LOG : DEBUG2),
                        (errmsg("recovery restart point at %X/%X",
-                                       checkPoint->redo.xlogid, checkPoint->redo.xrecoff)));
+                                       lastCheckPoint.redo.xlogid, lastCheckPoint.redo.xrecoff)));
+
+       if (recoveryLastXTime)
+               ereport((log_checkpoints ? LOG : DEBUG2),
+                       (errmsg("last completed transaction was at log time %s",
+                                       timestamptz_to_str(recoveryLastXTime))));
+
+       LWLockRelease(CheckpointLock);
+       return true;
 }
 
 /*
@@ -5789,14 +6722,14 @@ XLogPutNextOid(Oid nextOid)
         * does.
         *
         * Note, however, that the above statement only covers state "within" the
-        * database.  When we use a generated OID as a file or directory name,
-        * we are in a sense violating the basic WAL rule, because that filesystem
+        * database.  When we use a generated OID as a file or directory name, we
+        * are in a sense violating the basic WAL rule, because that filesystem
         * change may reach disk before the NEXTOID WAL record does.  The impact
-        * of this is that if a database crash occurs immediately afterward,
-        * we might after restart re-generate the same OID and find that it
-        * conflicts with the leftover file or directory.  But since for safety's
-        * sake we always loop until finding a nonconflicting filename, this poses
-        * no real problem in practice. See pgsql-hackers discussion 27-Sep-2006.
+        * of this is that if a database crash occurs immediately afterward, we
+        * might after restart re-generate the same OID and find that it conflicts
+        * with the leftover file or directory.  But since for safety's sake we
+        * always loop until finding a nonconflicting filename, this poses no real
+        * problem in practice. See pgsql-hackers discussion 27-Sep-2006.
         */
 }
 
@@ -5829,12 +6762,18 @@ RequestXLogSwitch(void)
 
 /*
  * XLOG resource manager's routines
+ *
+ * Definitions of info values are in include/catalog/pg_control.h, though
+ * not all records types are related to control file processing.
  */
 void
 xlog_redo(XLogRecPtr lsn, XLogRecord *record)
 {
        uint8           info = record->xl_info & ~XLR_INFO_MASK;
 
+       /* Backup blocks are not used in xlog records */
+       Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
+
        if (info == XLOG_NEXTOID)
        {
                Oid                     nextOid;
@@ -5872,9 +6811,9 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
                                                                 (int) checkPoint.ThisTimeLineID))
                                ereport(PANIC,
                                                (errmsg("unexpected timeline ID %u (after %u) in checkpoint record",
-                                                               checkPoint.ThisTimeLineID, ThisTimeLineID)));
-                       /* Following WAL records should be run with new TLI */
-                       ThisTimeLineID = checkPoint.ThisTimeLineID;
+                               checkPoint.ThisTimeLineID, ThisTimeLineID)));
+           /* Following WAL records should be run with new TLI */
+           ThisTimeLineID = checkPoint.ThisTimeLineID;
                }
 
                RecoveryRestartPoint(&checkPoint);
@@ -5908,6 +6847,10 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
 
                RecoveryRestartPoint(&checkPoint);
        }
+       else if (info == XLOG_NOOP)
+       {
+               /* nothing to do here */
+       }
        else if (info == XLOG_SWITCH)
        {
                /* nothing to do here */
@@ -5924,10 +6867,9 @@ xlog_desc(StringInfo buf, uint8 xl_info, char *rec)
        {
                CheckPoint *checkpoint = (CheckPoint *) rec;
 
-               appendStringInfo(buf, "checkpoint: redo %X/%X; undo %X/%X; "
+               appendStringInfo(buf, "checkpoint: redo %X/%X; "
                                                 "tli %u; xid %u/%u; oid %u; multi %u; offset %u; %s",
                                                 checkpoint->redo.xlogid, checkpoint->redo.xrecoff,
-                                                checkpoint->undo.xlogid, checkpoint->undo.xrecoff,
                                                 checkpoint->ThisTimeLineID,
                                                 checkpoint->nextXidEpoch, checkpoint->nextXid,
                                                 checkpoint->nextOid,
@@ -5935,6 +6877,10 @@ xlog_desc(StringInfo buf, uint8 xl_info, char *rec)
                                                 checkpoint->nextMultiOffset,
                                 (info == XLOG_CHECKPOINT_SHUTDOWN) ? "shutdown" : "online");
        }
+       else if (info == XLOG_NOOP)
+       {
+               appendStringInfo(buf, "xlog no-op");
+       }
        else if (info == XLOG_NEXTOID)
        {
                Oid                     nextOid;
@@ -5973,54 +6919,53 @@ xlog_outrec(StringInfo buf, XLogRecord *record)
 
 
 /*
- * GUC support
+ * Return the (possible) sync flag used for opening a file, depending on the
+ * value of the GUC wal_sync_method.
  */
-const char *
-assign_xlog_sync_method(const char *method, bool doit, GucSource source)
+static int
+get_sync_bit(int method)
 {
-       int                     new_sync_method;
-       int                     new_sync_bit;
+       /* If fsync is disabled, never open in sync mode */
+       if (!enableFsync)
+               return 0;
 
-       if (pg_strcasecmp(method, "fsync") == 0)
-       {
-               new_sync_method = SYNC_METHOD_FSYNC;
-               new_sync_bit = 0;
-       }
-#ifdef HAVE_FSYNC_WRITETHROUGH
-       else if (pg_strcasecmp(method, "fsync_writethrough") == 0)
-       {
-               new_sync_method = SYNC_METHOD_FSYNC_WRITETHROUGH;
-               new_sync_bit = 0;
-       }
-#endif
-#ifdef HAVE_FDATASYNC
-       else if (pg_strcasecmp(method, "fdatasync") == 0)
+       switch (method)
        {
-               new_sync_method = SYNC_METHOD_FDATASYNC;
-               new_sync_bit = 0;
-       }
-#endif
+               /*
+                * enum values for all sync options are defined even if they are not
+                * supported on the current platform.  But if not, they are not
+                * included in the enum option array, and therefore will never be seen
+                * here.
+                */
+               case SYNC_METHOD_FSYNC:
+               case SYNC_METHOD_FSYNC_WRITETHROUGH:
+               case SYNC_METHOD_FDATASYNC:
+                       return 0;
 #ifdef OPEN_SYNC_FLAG
-       else if (pg_strcasecmp(method, "open_sync") == 0)
-       {
-               new_sync_method = SYNC_METHOD_OPEN;
-               new_sync_bit = OPEN_SYNC_FLAG;
-       }
+               case SYNC_METHOD_OPEN:
+                       return OPEN_SYNC_FLAG;
 #endif
 #ifdef OPEN_DATASYNC_FLAG
-       else if (pg_strcasecmp(method, "open_datasync") == 0)
-       {
-               new_sync_method = SYNC_METHOD_OPEN;
-               new_sync_bit = OPEN_DATASYNC_FLAG;
-       }
+               case SYNC_METHOD_OPEN_DSYNC:
+                       return OPEN_DATASYNC_FLAG;
 #endif
-       else
-               return NULL;
+               default:
+                       /* can't happen (unless we are out of sync with option array) */
+                       elog(ERROR, "unrecognized wal_sync_method: %d", method);
+                       return 0; /* silence warning */
+       }
+}
 
+/*
+ * GUC support
+ */
+bool
+assign_xlog_sync_method(int new_sync_method, bool doit, GucSource source)
+{
        if (!doit)
-               return method;
+               return true;
 
-       if (sync_method != new_sync_method || open_sync_bit != new_sync_bit)
+       if (sync_method != new_sync_method)
        {
                /*
                 * To ensure that no blocks escape unsynced, force an fsync on the
@@ -6035,14 +6980,12 @@ assign_xlog_sync_method(const char *method, bool doit, GucSource source)
                                                (errcode_for_file_access(),
                                                 errmsg("could not fsync log file %u, segment %u: %m",
                                                                openLogId, openLogSeg)));
-                       if (open_sync_bit != new_sync_bit)
+                       if (get_sync_bit(sync_method) != get_sync_bit(new_sync_method))
                                XLogFileClose();
                }
-               sync_method = new_sync_method;
-               open_sync_bit = new_sync_bit;
        }
 
-       return method;
+       return true;
 }
 
 
@@ -6080,6 +7023,7 @@ issue_xlog_fsync(void)
                        break;
 #endif
                case SYNC_METHOD_OPEN:
+               case SYNC_METHOD_OPEN_DSYNC:
                        /* write synced it already */
                        break;
                default:
@@ -6102,11 +7046,11 @@ Datum
 pg_start_backup(PG_FUNCTION_ARGS)
 {
        text       *backupid = PG_GETARG_TEXT_P(0);
-       text       *result;
+       bool            fast = PG_GETARG_BOOL(1);
        char       *backupidstr;
        XLogRecPtr      checkpointloc;
        XLogRecPtr      startpoint;
-       time_t          stamp_time;
+       pg_time_t       stamp_time;
        char            strfbuf[128];
        char            xlogfilename[MAXFNAMELEN];
        uint32          _logId;
@@ -6117,17 +7061,22 @@ pg_start_backup(PG_FUNCTION_ARGS)
        if (!superuser())
                ereport(ERROR,
                                (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
-                                (errmsg("must be superuser to run a backup"))));
+                                errmsg("must be superuser to run a backup")));
 
        if (!XLogArchivingActive())
                ereport(ERROR,
                                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-                                (errmsg("WAL archiving is not active"),
-                                 (errhint("archive_command must be defined before "
-                                                  "online backups can be made safely.")))));
+                                errmsg("WAL archiving is not active"),
+                                errhint("archive_mode must be enabled at server start.")));
+
+       if (!XLogArchiveCommandSet())
+               ereport(ERROR,
+                               (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                errmsg("WAL archiving is not active"),
+                                errhint("archive_command must be defined before "
+                                                "online backups can be made safely.")));
 
-       backupidstr = DatumGetCString(DirectFunctionCall1(textout,
-                                                                                                PointerGetDatum(backupid)));
+       backupidstr = text_to_cstring(backupid);
 
        /*
         * Mark backup active in shared memory.  We must do full-page WAL writes
@@ -6158,16 +7107,33 @@ pg_start_backup(PG_FUNCTION_ARGS)
        XLogCtl->Insert.forcePageWrites = true;
        LWLockRelease(WALInsertLock);
 
-       /* Use a TRY block to ensure we release forcePageWrites if fail below */
-       PG_TRY();
+       /*
+        * Force an XLOG file switch before the checkpoint, to ensure that the WAL
+        * segment the checkpoint is written to doesn't contain pages with old
+        * timeline IDs. That would otherwise happen if you called
+        * pg_start_backup() right after restoring from a PITR archive: the first
+        * WAL segment containing the startup checkpoint has pages in the
+        * beginning with the old timeline ID. That can cause trouble at recovery:
+        * we won't have a history file covering the old timeline if pg_xlog
+        * directory was not included in the base backup and the WAL archive was
+        * cleared too before starting the backup.
+        */
+       RequestXLogSwitch();
+
+       /* Ensure we release forcePageWrites if fail below */
+       PG_ENSURE_ERROR_CLEANUP(pg_start_backup_callback, (Datum) 0);
        {
                /*
                 * Force a CHECKPOINT.  Aside from being necessary to prevent torn
                 * page problems, this guarantees that two successive backup runs will
                 * have different checkpoint positions and hence different history
                 * file names, even if nothing happened in between.
+                *
+                * We use CHECKPOINT_IMMEDIATE only if requested by user (via
+                * passing fast = true).  Otherwise this can take awhile.
                 */
-               RequestCheckpoint(true, false);
+               RequestCheckpoint(CHECKPOINT_FORCE | CHECKPOINT_WAIT |
+                                                 (fast ? CHECKPOINT_IMMEDIATE : 0));
 
                /*
                 * Now we need to fetch the checkpoint record location, and also its
@@ -6182,16 +7148,11 @@ pg_start_backup(PG_FUNCTION_ARGS)
                XLByteToSeg(startpoint, _logId, _logSeg);
                XLogFileName(xlogfilename, ThisTimeLineID, _logId, _logSeg);
 
-               /*
-                * We deliberately use strftime/localtime not the src/timezone
-                * functions, so that backup labels will consistently be recorded in
-                * the same timezone regardless of TimeZone setting.  This matches
-                * elog.c's practice.
-                */
-               stamp_time = time(NULL);
-               strftime(strfbuf, sizeof(strfbuf),
-                                "%Y-%m-%d %H:%M:%S %Z",
-                                localtime(&stamp_time));
+               /* Use the log timezone here, not the session timezone */
+               stamp_time = (pg_time_t) time(NULL);
+               pg_strftime(strfbuf, sizeof(strfbuf),
+                                       "%Y-%m-%d %H:%M:%S %Z",
+                                       pg_localtime(&stamp_time, log_timezone));
 
                /*
                 * Check for existing backup label --- implies a backup is already
@@ -6234,25 +7195,24 @@ pg_start_backup(PG_FUNCTION_ARGS)
                                         errmsg("could not write file \"%s\": %m",
                                                        BACKUP_LABEL_FILE)));
        }
-       PG_CATCH();
-       {
-               /* Turn off forcePageWrites on failure */
-               LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
-               XLogCtl->Insert.forcePageWrites = false;
-               LWLockRelease(WALInsertLock);
-
-               PG_RE_THROW();
-       }
-       PG_END_TRY();
+       PG_END_ENSURE_ERROR_CLEANUP(pg_start_backup_callback, (Datum) 0);
 
        /*
         * We're done.  As a convenience, return the starting WAL location.
         */
        snprintf(xlogfilename, sizeof(xlogfilename), "%X/%X",
                         startpoint.xlogid, startpoint.xrecoff);
-       result = DatumGetTextP(DirectFunctionCall1(textin,
-                                                                                        CStringGetDatum(xlogfilename)));
-       PG_RETURN_TEXT_P(result);
+       PG_RETURN_TEXT_P(cstring_to_text(xlogfilename));
+}
+
+/* Error cleanup callback for pg_start_backup */
+static void
+pg_start_backup_callback(int code, Datum arg)
+{
+       /* Turn off forcePageWrites on failure */
+       LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
+       XLogCtl->Insert.forcePageWrites = false;
+       LWLockRelease(WALInsertLock);
 }
 
 /*
@@ -6262,30 +7222,40 @@ pg_start_backup(PG_FUNCTION_ARGS)
  * create a backup history file in pg_xlog (whence it will immediately be
  * archived).  The backup history file contains the same info found in
  * the label file, plus the backup-end time and WAL location.
+ * Note: different from CancelBackup which just cancels online backup mode.
  */
 Datum
 pg_stop_backup(PG_FUNCTION_ARGS)
 {
-       text       *result;
        XLogRecPtr      startpoint;
        XLogRecPtr      stoppoint;
-       time_t          stamp_time;
+       pg_time_t       stamp_time;
        char            strfbuf[128];
        char            histfilepath[MAXPGPATH];
        char            startxlogfilename[MAXFNAMELEN];
        char            stopxlogfilename[MAXFNAMELEN];
+       char            lastxlogfilename[MAXFNAMELEN];
+       char            histfilename[MAXFNAMELEN];
        uint32          _logId;
        uint32          _logSeg;
        FILE       *lfp;
        FILE       *fp;
        char            ch;
        int                     ich;
+       int                     seconds_before_warning;
+       int                     waits = 0;
 
        if (!superuser())
                ereport(ERROR,
                                (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
                                 (errmsg("must be superuser to run a backup"))));
 
+       if (!XLogArchivingActive())
+               ereport(ERROR,
+                               (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                errmsg("WAL archiving is not active"),
+                                errhint("archive_mode must be enabled at server start.")));
+
        /*
         * OK to clear forcePageWrites
         */
@@ -6303,16 +7273,11 @@ pg_stop_backup(PG_FUNCTION_ARGS)
        XLByteToSeg(stoppoint, _logId, _logSeg);
        XLogFileName(stopxlogfilename, ThisTimeLineID, _logId, _logSeg);
 
-       /*
-        * We deliberately use strftime/localtime not the src/timezone functions,
-        * so that backup labels will consistently be recorded in the same
-        * timezone regardless of TimeZone setting.  This matches elog.c's
-        * practice.
-        */
-       stamp_time = time(NULL);
-       strftime(strfbuf, sizeof(strfbuf),
-                        "%Y-%m-%d %H:%M:%S %Z",
-                        localtime(&stamp_time));
+       /* Use the log timezone here, not the session timezone */
+       stamp_time = (pg_time_t) time(NULL);
+       pg_strftime(strfbuf, sizeof(strfbuf),
+                               "%Y-%m-%d %H:%M:%S %Z",
+                               pg_localtime(&stamp_time, log_timezone));
 
        /*
         * Open the existing label file
@@ -6389,13 +7354,47 @@ pg_stop_backup(PG_FUNCTION_ARGS)
        CleanupBackupHistory();
 
        /*
+        * Wait until both the last WAL file filled during backup and the history
+        * file have been archived.  We assume that the alphabetic sorting
+        * property of the WAL files ensures any earlier WAL files are safely
+        * archived as well.
+        *
+        * We wait forever, since archive_command is supposed to work and
+        * we assume the admin wanted his backup to work completely. If you
+        * don't wish to wait, you can set statement_timeout.
+        */
+       XLByteToPrevSeg(stoppoint, _logId, _logSeg);
+       XLogFileName(lastxlogfilename, ThisTimeLineID, _logId, _logSeg);
+
+       XLByteToSeg(startpoint, _logId, _logSeg);
+       BackupHistoryFileName(histfilename, ThisTimeLineID, _logId, _logSeg,
+                                                 startpoint.xrecoff % XLogSegSize);
+
+       seconds_before_warning = 60;
+       waits = 0;
+
+       while (XLogArchiveIsBusy(lastxlogfilename) ||
+                  XLogArchiveIsBusy(histfilename))
+       {
+               CHECK_FOR_INTERRUPTS();
+
+               pg_usleep(1000000L);
+
+               if (++waits >= seconds_before_warning)
+               {
+                       seconds_before_warning *= 2;     /* This wraps in >10 years... */
+                       ereport(WARNING,
+                                       (errmsg("pg_stop_backup still waiting for archive to complete (%d seconds elapsed)",
+                                                       waits)));
+               }
+       }
+
+       /*
         * We're done.  As a convenience, return the ending WAL location.
         */
        snprintf(stopxlogfilename, sizeof(stopxlogfilename), "%X/%X",
                         stoppoint.xlogid, stoppoint.xrecoff);
-       result = DatumGetTextP(DirectFunctionCall1(textin,
-                                                                                CStringGetDatum(stopxlogfilename)));
-       PG_RETURN_TEXT_P(result);
+       PG_RETURN_TEXT_P(cstring_to_text(stopxlogfilename));
 }
 
 /*
@@ -6404,14 +7403,13 @@ pg_stop_backup(PG_FUNCTION_ARGS)
 Datum
 pg_switch_xlog(PG_FUNCTION_ARGS)
 {
-       text       *result;
        XLogRecPtr      switchpoint;
        char            location[MAXFNAMELEN];
 
        if (!superuser())
                ereport(ERROR,
                                (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
-                                (errmsg("must be superuser to switch transaction log files"))));
+                        (errmsg("must be superuser to switch transaction log files"))));
 
        switchpoint = RequestXLogSwitch();
 
@@ -6420,9 +7418,7 @@ pg_switch_xlog(PG_FUNCTION_ARGS)
         */
        snprintf(location, sizeof(location), "%X/%X",
                         switchpoint.xlogid, switchpoint.xrecoff);
-       result = DatumGetTextP(DirectFunctionCall1(textin,
-                                                                                          CStringGetDatum(location)));
-       PG_RETURN_TEXT_P(result);
+       PG_RETURN_TEXT_P(cstring_to_text(location));
 }
 
 /*
@@ -6435,7 +7431,6 @@ pg_switch_xlog(PG_FUNCTION_ARGS)
 Datum
 pg_current_xlog_location(PG_FUNCTION_ARGS)
 {
-       text       *result;
        char            location[MAXFNAMELEN];
 
        /* Make sure we have an up-to-date local LogwrtResult */
@@ -6450,10 +7445,7 @@ pg_current_xlog_location(PG_FUNCTION_ARGS)
 
        snprintf(location, sizeof(location), "%X/%X",
                         LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff);
-
-       result = DatumGetTextP(DirectFunctionCall1(textin,
-                                                                                          CStringGetDatum(location)));
-       PG_RETURN_TEXT_P(result);
+       PG_RETURN_TEXT_P(cstring_to_text(location));
 }
 
 /*
@@ -6464,7 +7456,6 @@ pg_current_xlog_location(PG_FUNCTION_ARGS)
 Datum
 pg_current_xlog_insert_location(PG_FUNCTION_ARGS)
 {
-       text       *result;
        XLogCtlInsert *Insert = &XLogCtl->Insert;
        XLogRecPtr      current_recptr;
        char            location[MAXFNAMELEN];
@@ -6478,10 +7469,7 @@ pg_current_xlog_insert_location(PG_FUNCTION_ARGS)
 
        snprintf(location, sizeof(location), "%X/%X",
                         current_recptr.xlogid, current_recptr.xrecoff);
-
-       result = DatumGetTextP(DirectFunctionCall1(textin,
-                                                                                          CStringGetDatum(location)));
-       PG_RETURN_TEXT_P(result);
+       PG_RETURN_TEXT_P(cstring_to_text(location));
 }
 
 /*
@@ -6513,8 +7501,7 @@ pg_xlogfile_name_offset(PG_FUNCTION_ARGS)
        /*
         * Read input and parse
         */
-       locationstr = DatumGetCString(DirectFunctionCall1(textout,
-                                                                                                PointerGetDatum(location)));
+       locationstr = text_to_cstring(location);
 
        if (sscanf(locationstr, "%X/%X", &uxlogid, &uxrecoff) != 2)
                ereport(ERROR,
@@ -6543,8 +7530,7 @@ pg_xlogfile_name_offset(PG_FUNCTION_ARGS)
        XLByteToPrevSeg(locationpoint, xlogid, xlogseg);
        XLogFileName(xlogfilename, ThisTimeLineID, xlogid, xlogseg);
 
-       values[0] = DirectFunctionCall1(textin,
-                                                                       CStringGetDatum(xlogfilename));
+       values[0] = CStringGetTextDatum(xlogfilename);
        isnull[0] = false;
 
        /*
@@ -6573,7 +7559,6 @@ Datum
 pg_xlogfile_name(PG_FUNCTION_ARGS)
 {
        text       *location = PG_GETARG_TEXT_P(0);
-       text       *result;
        char       *locationstr;
        unsigned int uxlogid;
        unsigned int uxrecoff;
@@ -6582,8 +7567,7 @@ pg_xlogfile_name(PG_FUNCTION_ARGS)
        XLogRecPtr      locationpoint;
        char            xlogfilename[MAXFNAMELEN];
 
-       locationstr = DatumGetCString(DirectFunctionCall1(textout,
-                                                                                                PointerGetDatum(location)));
+       locationstr = text_to_cstring(location);
 
        if (sscanf(locationstr, "%X/%X", &uxlogid, &uxrecoff) != 2)
                ereport(ERROR,
@@ -6597,9 +7581,7 @@ pg_xlogfile_name(PG_FUNCTION_ARGS)
        XLByteToPrevSeg(locationpoint, xlogid, xlogseg);
        XLogFileName(xlogfilename, ThisTimeLineID, xlogid, xlogseg);
 
-       result = DatumGetTextP(DirectFunctionCall1(textin,
-                                                                                        CStringGetDatum(xlogfilename)));
-       PG_RETURN_TEXT_P(result);
+       PG_RETURN_TEXT_P(cstring_to_text(xlogfilename));
 }
 
 /*
@@ -6738,3 +7720,149 @@ rm_redo_error_callback(void *arg)
 
        pfree(buf.data);
 }
+
+/*
+ * BackupInProgress: check if online backup mode is active
+ *
+ * This is done by checking for existence of the "backup_label" file.
+ */
+bool
+BackupInProgress(void)
+{
+       struct stat stat_buf;
+
+       return (stat(BACKUP_LABEL_FILE, &stat_buf) == 0);
+}
+
+/*
+ * CancelBackup: rename the "backup_label" file to cancel backup mode
+ *
+ * If the "backup_label" file exists, it will be renamed to "backup_label.old".
+ * Note that this will render an online backup in progress useless.
+ * To correctly finish an online backup, pg_stop_backup must be called.
+ */
+void
+CancelBackup(void)
+{
+       struct stat stat_buf;
+
+       /* if the file is not there, return */
+       if (stat(BACKUP_LABEL_FILE, &stat_buf) < 0)
+               return;
+
+       /* remove leftover file from previously cancelled backup if it exists */
+       unlink(BACKUP_LABEL_OLD);
+
+       if (rename(BACKUP_LABEL_FILE, BACKUP_LABEL_OLD) == 0)
+       {
+               ereport(LOG,
+                               (errmsg("online backup mode cancelled"),
+                                errdetail("\"%s\" was renamed to \"%s\".",
+                                               BACKUP_LABEL_FILE, BACKUP_LABEL_OLD)));
+       }
+       else
+       {
+               ereport(WARNING,
+                               (errcode_for_file_access(),
+                                errmsg("online backup mode was not cancelled"),
+                                errdetail("Could not rename \"%s\" to \"%s\": %m.",
+                                               BACKUP_LABEL_FILE, BACKUP_LABEL_OLD)));
+       }
+}
+
+/* ------------------------------------------------------
+ *  Startup Process main entry point and signal handlers
+ * ------------------------------------------------------
+ */
+
+/*
+ * startupproc_quickdie() occurs when signalled SIGQUIT by the postmaster.
+ *
+ * Some backend has bought the farm,
+ * so we need to stop what we're doing and exit.
+ */
+static void
+startupproc_quickdie(SIGNAL_ARGS)
+{
+       PG_SETMASK(&BlockSig);
+
+       /*
+        * DO NOT proc_exit() -- we're here because shared memory may be
+        * corrupted, so we don't want to try to clean up our transaction. Just
+        * nail the windows shut and get out of town.
+        *
+        * Note we do exit(2) not exit(0).      This is to force the postmaster into a
+        * system reset cycle if some idiot DBA sends a manual SIGQUIT to a random
+        * backend.  This is necessary precisely because we don't clean up our
+        * shared memory state.
+        */
+       exit(2);
+}
+
+
+/* SIGHUP: set flag to re-read config file at next convenient time */
+static void
+StartupProcSigHupHandler(SIGNAL_ARGS)
+{
+       got_SIGHUP = true;
+}
+
+/* SIGTERM: set flag to abort redo and exit */
+static void
+StartupProcShutdownHandler(SIGNAL_ARGS)
+{
+       if (in_restore_command)
+               proc_exit(1);
+       else
+               shutdown_requested = true;
+}
+
+/* Main entry point for startup process */
+void
+StartupProcessMain(void)
+{
+       /*
+        * If possible, make this process a group leader, so that the postmaster
+        * can signal any child processes too.
+        */
+#ifdef HAVE_SETSID
+       if (setsid() < 0)
+               elog(FATAL, "setsid() failed: %m");
+#endif
+
+       /*
+        * Properly accept or ignore signals the postmaster might send us
+        */
+       pqsignal(SIGHUP, StartupProcSigHupHandler);      /* reload config file */
+       pqsignal(SIGINT, SIG_IGN);                                      /* ignore query cancel */
+       pqsignal(SIGTERM, StartupProcShutdownHandler); /* request shutdown */
+       pqsignal(SIGQUIT, startupproc_quickdie);                /* hard crash time */
+       pqsignal(SIGALRM, SIG_IGN);
+       pqsignal(SIGPIPE, SIG_IGN);
+       pqsignal(SIGUSR1, SIG_IGN);
+       pqsignal(SIGUSR2, SIG_IGN);
+
+       /*
+        * Reset some signals that are accepted by postmaster but not here
+        */
+       pqsignal(SIGCHLD, SIG_DFL);
+       pqsignal(SIGTTIN, SIG_DFL);
+       pqsignal(SIGTTOU, SIG_DFL);
+       pqsignal(SIGCONT, SIG_DFL);
+       pqsignal(SIGWINCH, SIG_DFL);
+
+       /*
+        * Unblock signals (they were blocked when the postmaster forked us)
+        */
+       PG_SETMASK(&UnBlockSig);
+
+       StartupXLOG();  
+
+       BuildFlatFiles(false);
+
+       /*
+        * Exit normally. Exit code 0 tells postmaster that we completed
+        * recovery successfully.
+        */
+       proc_exit(0);
+}