OSDN Git Service

Add recovery_end_command option to recovery.conf. recovery_end_command
[pg-rex/syncrep.git] / src / backend / access / transam / xlog.c
index de8e682..09b5075 100644 (file)
@@ -4,10 +4,10 @@
  *             PostgreSQL transaction log manager
  *
  *
- * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.246 2006/08/06 03:53:44 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.338 2009/05/14 20:31:09 heikki Exp $
  *
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
 
 #include <ctype.h>
-#include <fcntl.h>
 #include <signal.h>
 #include <time.h>
-#include <unistd.h>
+#include <fcntl.h>
 #include <sys/stat.h>
 #include <sys/time.h>
+#include <sys/wait.h>
+#include <unistd.h>
 
 #include "access/clog.h"
 #include "access/multixact.h"
 #include "access/subtrans.h"
 #include "access/transam.h"
+#include "access/tuptoaster.h"
 #include "access/twophase.h"
 #include "access/xact.h"
 #include "access/xlog_internal.h"
 #include "access/xlogutils.h"
 #include "catalog/catversion.h"
 #include "catalog/pg_control.h"
+#include "catalog/pg_type.h"
+#include "funcapi.h"
+#include "libpq/pqsignal.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "postmaster/bgwriter.h"
-#include "storage/bufpage.h"
+#include "storage/bufmgr.h"
 #include "storage/fd.h"
+#include "storage/ipc.h"
 #include "storage/pmsignal.h"
 #include "storage/procarray.h"
+#include "storage/smgr.h"
 #include "storage/spin.h"
 #include "utils/builtins.h"
-#include "utils/nabstime.h"
-#include "utils/pg_locale.h"
-
-
-/*
- *     Because O_DIRECT bypasses the kernel buffers, and because we never
- *     read those buffers except during crash recovery, it is a win to use
- *     it in all cases where we sync on each write().  We could allow O_DIRECT
- *     with fsync(), but because skipping the kernel buffer forces writes out
- *     quickly, it seems best just to use it for O_SYNC.  It is hard to imagine
- *     how fsync() could be a win for O_DIRECT compared to O_SYNC and O_DIRECT.
- *     Also, O_DIRECT is never enough to force data to the drives, it merely
- *     tries to bypass the kernel cache, so we still need O_SYNC or fsync().
- */
-#ifdef O_DIRECT
-#define PG_O_DIRECT                            O_DIRECT
-#else
-#define PG_O_DIRECT                            0
-#endif
-
-/*
- * This chunk of hackery attempts to determine which file sync methods
- * are available on the current platform, and to choose an appropriate
- * default method.     We assume that fsync() is always available, and that
- * configure determined whether fdatasync() is.
- */
-#if defined(O_SYNC)
-#define BARE_OPEN_SYNC_FLAG            O_SYNC
-#elif defined(O_FSYNC)
-#define BARE_OPEN_SYNC_FLAG            O_FSYNC
-#endif
-#ifdef BARE_OPEN_SYNC_FLAG
-#define OPEN_SYNC_FLAG                 (BARE_OPEN_SYNC_FLAG | PG_O_DIRECT)
-#endif
-
-#if defined(O_DSYNC)
-#if defined(OPEN_SYNC_FLAG)
-/* O_DSYNC is distinct? */
-#if O_DSYNC != BARE_OPEN_SYNC_FLAG
-#define OPEN_DATASYNC_FLAG             (O_DSYNC | PG_O_DIRECT)
-#endif
-#else                                                  /* !defined(OPEN_SYNC_FLAG) */
-/* Win32 only has O_DSYNC */
-#define OPEN_DATASYNC_FLAG             (O_DSYNC | PG_O_DIRECT)
-#endif
-#endif
-
-#if defined(OPEN_DATASYNC_FLAG)
-#define DEFAULT_SYNC_METHOD_STR "open_datasync"
-#define DEFAULT_SYNC_METHOD            SYNC_METHOD_OPEN
-#define DEFAULT_SYNC_FLAGBIT   OPEN_DATASYNC_FLAG
-#elif defined(HAVE_FDATASYNC)
-#define DEFAULT_SYNC_METHOD_STR "fdatasync"
-#define DEFAULT_SYNC_METHOD            SYNC_METHOD_FDATASYNC
-#define DEFAULT_SYNC_FLAGBIT   0
-#elif defined(HAVE_FSYNC_WRITETHROUGH_ONLY)
-#define DEFAULT_SYNC_METHOD_STR "fsync_writethrough"
-#define DEFAULT_SYNC_METHOD            SYNC_METHOD_FSYNC_WRITETHROUGH
-#define DEFAULT_SYNC_FLAGBIT   0
-#else
-#define DEFAULT_SYNC_METHOD_STR "fsync"
-#define DEFAULT_SYNC_METHOD            SYNC_METHOD_FSYNC
-#define DEFAULT_SYNC_FLAGBIT   0
-#endif
-
-
-/*
- * Limitation of buffer-alignment for direct IO depends on OS and filesystem,
- * but XLOG_BLCKSZ is assumed to be enough for it.
- */
-#ifdef O_DIRECT
-#define ALIGNOF_XLOG_BUFFER            XLOG_BLCKSZ
-#else
-#define ALIGNOF_XLOG_BUFFER            ALIGNOF_BUFFER
-#endif
+#include "utils/flatfiles.h"
+#include "utils/guc.h"
+#include "utils/ps_status.h"
+#include "pg_trace.h"
 
 
 /* File path names (all relative to $PGDATA) */
 #define BACKUP_LABEL_FILE              "backup_label"
+#define BACKUP_LABEL_OLD               "backup_label.old"
 #define RECOVERY_COMMAND_FILE  "recovery.conf"
 #define RECOVERY_COMMAND_DONE  "recovery.done"
 
 /* User-settable parameters */
 int                    CheckPointSegments = 3;
 int                    XLOGbuffers = 8;
+int                    XLogArchiveTimeout = 0;
+bool           XLogArchiveMode = false;
 char      *XLogArchiveCommand = NULL;
-char      *XLOG_sync_method = NULL;
-const char     XLOG_sync_method_default[] = DEFAULT_SYNC_METHOD_STR;
 bool           fullPageWrites = true;
+bool           log_checkpoints = false;
+int            sync_method = DEFAULT_SYNC_METHOD;
 
 #ifdef WAL_DEBUG
 bool           XLOG_DEBUG = false;
 #endif
 
 /*
- * XLOGfileslop is used in the code as the allowed "fuzz" in the number of
- * preallocated XLOG segments --- we try to have at least XLOGfiles advance
- * segments but no more than XLOGfileslop segments.  This could
- * be made a separate GUC variable, but at present I think it's sufficient
- * to hardwire it as 2*CheckPointSegments+1.  Under normal conditions, a
- * checkpoint will free no more than 2*CheckPointSegments log segments, and
- * we want to recycle all of them; the +1 allows boundary cases to happen
- * without wasting a delete/create-segment cycle.
+ * XLOGfileslop is the maximum number of preallocated future XLOG segments.
+ * When we are done with an old XLOG segment file, we will recycle it as a
+ * future XLOG segment as long as there aren't already XLOGfileslop future
+ * segments; else we'll delete it.  This could be made a separate GUC
+ * variable, but at present I think it's sufficient to hardwire it as
+ * 2*CheckPointSegments+1.     Under normal conditions, a checkpoint will free
+ * no more than 2*CheckPointSegments log segments, and we want to recycle all
+ * of them; the +1 allows boundary cases to happen without wasting a
+ * delete/create-segment cycle.
  */
-
 #define XLOGfileslop   (2*CheckPointSegments + 1)
 
+/*
+ * GUC support
+ */
+const struct config_enum_entry sync_method_options[] = {
+       {"fsync", SYNC_METHOD_FSYNC, false},
+#ifdef HAVE_FSYNC_WRITETHROUGH
+       {"fsync_writethrough", SYNC_METHOD_FSYNC_WRITETHROUGH, false},
+#endif
+#ifdef HAVE_FDATASYNC
+       {"fdatasync", SYNC_METHOD_FDATASYNC, false},
+#endif
+#ifdef OPEN_SYNC_FLAG
+       {"open_sync", SYNC_METHOD_OPEN, false},
+#endif
+#ifdef OPEN_DATASYNC_FLAG
+       {"open_datasync", SYNC_METHOD_OPEN_DSYNC, false},
+#endif
+       {NULL, 0, false}
+};
 
-/* these are derived from XLOG_sync_method by assign_xlog_sync_method */
-int                    sync_method = DEFAULT_SYNC_METHOD;
-static int     open_sync_bit = DEFAULT_SYNC_FLAGBIT;
-
-#define XLOG_SYNC_BIT  (enableFsync ? open_sync_bit : 0)
-
+/*
+ * Statistics for current checkpoint are collected in this global struct.
+ * Because only the background writer or a stand-alone backend can perform
+ * checkpoints, this will be unused in normal backends.
+ */
+CheckpointStatsData CheckpointStats;
 
 /*
  * ThisTimeLineID will be same in all backends --- it identifies current
@@ -163,31 +121,45 @@ static int        open_sync_bit = DEFAULT_SYNC_FLAGBIT;
  */
 TimeLineID     ThisTimeLineID = 0;
 
-/* Are we doing recovery from XLOG? */
+/*
+ * Are we doing recovery from XLOG? 
+ *
+ * This is only ever true in the startup process, even if the system is still
+ * in recovery. Prior to 8.4, all activity during recovery were carried out
+ * by Startup process. This local variable continues to be used in functions
+ * that need to act differently when called from a redo function (e.g skip
+ * WAL logging). To check whether the system is in recovery regardless of what
+ * process you're running in, use RecoveryInProgress().
+ */
 bool           InRecovery = false;
 
 /* Are we recovering using offline XLOG archives? */
 static bool InArchiveRecovery = false;
 
+/*
+ * Local copy of SharedRecoveryInProgress variable. True actually means "not
+ * known, need to check the shared state"
+ */
+static bool LocalRecoveryInProgress = true;
+
 /* Was the last xlog file restored from archive, or local? */
 static bool restoredFromArchive = false;
 
 /* options taken from recovery.conf */
 static char *recoveryRestoreCommand = NULL;
+static char *recoveryEndCommand = NULL;
 static bool recoveryTarget = false;
 static bool recoveryTargetExact = false;
 static bool recoveryTargetInclusive = true;
 static TransactionId recoveryTargetXid;
-static time_t recoveryTargetTime;
+static TimestampTz recoveryTargetTime;
+static TimestampTz recoveryLastXTime = 0;
 
 /* if recoveryStopsHere returns true, it saves actual stop xid/time here */
 static TransactionId recoveryStopXid;
-static time_t recoveryStopTime;
+static TimestampTz recoveryStopTime;
 static bool recoveryStopAfter;
 
-/* constraint set by read_backup_label */
-static XLogRecPtr recoveryMinXlogOffset = {0, 0};
-
 /*
  * During normal operation, the only timeline we care about is ThisTimeLineID.
  * During recovery, however, things are more complicated.  To simplify life
@@ -215,37 +187,15 @@ static List *expectedTLIs;
 static TimeLineID curFileTLI;
 
 /*
- * MyLastRecPtr points to the start of the last XLOG record inserted by the
- * current transaction.  If MyLastRecPtr.xrecoff == 0, then the current
- * xact hasn't yet inserted any transaction-controlled XLOG records.
- *
- * Note that XLOG records inserted outside transaction control are not
- * reflected into MyLastRecPtr.  They do, however, cause MyXactMadeXLogEntry
- * to be set true.     The latter can be used to test whether the current xact
- * made any loggable changes (including out-of-xact changes, such as
- * sequence updates).
- *
- * When we insert/update/delete a tuple in a temporary relation, we do not
- * make any XLOG record, since we don't care about recovering the state of
- * the temp rel after a crash. However, we will still need to remember
- * whether our transaction committed or aborted in that case.  So, we must
- * set MyXactMadeTempRelUpdate true to indicate that the XID will be of
- * interest later.
- */
-XLogRecPtr     MyLastRecPtr = {0, 0};
-
-bool           MyXactMadeXLogEntry = false;
-
-bool           MyXactMadeTempRelUpdate = false;
-
-/*
  * ProcLastRecPtr points to the start of the last XLOG record inserted by the
- * current backend.  It is updated for all inserts, transaction-controlled
- * or not.     ProcLastRecEnd is similar but points to end+1 of last record.
+ * current backend.  It is updated for all inserts.  XactLastRecEnd points to
+ * end+1 of the last record, and is reset when we end a top-level transaction,
+ * or start a new one; so it can be used to tell if the current transaction has
+ * created any XLOG records.
  */
 static XLogRecPtr ProcLastRecPtr = {0, 0};
 
-XLogRecPtr     ProcLastRecEnd = {0, 0};
+XLogRecPtr     XactLastRecEnd = {0, 0};
 
 /*
  * RedoRecPtr is this backend's local copy of the REDO record pointer
@@ -309,11 +259,8 @@ static XLogRecPtr RedoRecPtr;
  * ControlFileLock: must be held to read/update control file or create
  * new log file.
  *
- * CheckpointLock: must be held to do a checkpoint (ensures only one
- * checkpointer at a time; even though the postmaster won't launch
- * parallel checkpoint processes, we need this because manual checkpoints
- * could be launched simultaneously).  XXX now that all checkpoints are
- * done by the bgwriter, isn't this lock redundant?
+ * CheckpointLock: must be held to do a checkpoint or restartpoint (ensures
+ * only one checkpointer at a time)
  *
  *----------
  */
@@ -351,6 +298,7 @@ typedef struct XLogCtlWrite
 {
        XLogwrtResult LogwrtResult; /* current value of LogwrtResult */
        int                     curridx;                /* cache index of next block to write */
+       pg_time_t       lastSegSwitchTime;              /* time of last xlog segment switch */
 } XLogCtlWrite;
 
 /*
@@ -360,9 +308,14 @@ typedef struct XLogCtlData
 {
        /* Protected by WALInsertLock: */
        XLogCtlInsert Insert;
+
        /* Protected by info_lck: */
        XLogwrtRqst LogwrtRqst;
        XLogwrtResult LogwrtResult;
+       uint32          ckptXidEpoch;   /* nextXID & epoch of latest checkpoint */
+       TransactionId ckptXid;
+       XLogRecPtr      asyncCommitLSN; /* LSN of newest async commit */
+
        /* Protected by WALWriteLock: */
        XLogCtlWrite Write;
 
@@ -373,11 +326,29 @@ typedef struct XLogCtlData
         */
        char       *pages;                      /* buffers for unwritten XLOG pages */
        XLogRecPtr *xlblocks;           /* 1st byte ptr-s + XLOG_BLCKSZ */
-       Size            XLogCacheByte;  /* # bytes in xlog buffers */
        int                     XLogCacheBlck;  /* highest allocated xlog buffer index */
        TimeLineID      ThisTimeLineID;
 
-       slock_t         info_lck;               /* locks shared LogwrtRqst/LogwrtResult */
+       /*
+        * SharedRecoveryInProgress indicates if we're still in crash or archive
+        * recovery.  It's checked by RecoveryInProgress().
+        */
+       bool            SharedRecoveryInProgress;
+
+       /*
+        * During recovery, we keep a copy of the latest checkpoint record
+        * here.  Used by the background writer when it wants to create
+        * a restartpoint.
+        *
+        * Protected by info_lck.
+        */
+       XLogRecPtr      lastCheckPointRecPtr;
+       CheckPoint      lastCheckPoint;
+
+       /* end+1 of the last record replayed (or being replayed) */
+       XLogRecPtr      replayEndRecPtr;
+
+       slock_t         info_lck;               /* locks shared variables shown above */
 } XLogCtlData;
 
 static XLogCtlData *XLogCtl = NULL;
@@ -451,18 +422,33 @@ static XLogRecPtr ReadRecPtr;     /* start of last record read */
 static XLogRecPtr EndRecPtr;   /* end+1 of last record read */
 static XLogRecord *nextRecord = NULL;
 static TimeLineID lastPageTLI = 0;
+static XLogRecPtr minRecoveryPoint; /* local copy of ControlFile->minRecoveryPoint */
+static bool    updateMinRecoveryPoint = true;
 
 static bool InRedo = false;
 
+/*
+ * Flag set by interrupt handlers for later service in the redo loop.
+ */
+static volatile sig_atomic_t got_SIGHUP = false;
+static volatile sig_atomic_t shutdown_requested = false;
+/*
+ * Flag set when executing a restore command, to tell SIGTERM signal handler
+ * that it's safe to just proc_exit.
+ */
+static volatile sig_atomic_t in_restore_command = false;
+
 
 static void XLogArchiveNotify(const char *xlog);
 static void XLogArchiveNotifySeg(uint32 log, uint32 seg);
 static bool XLogArchiveCheckDone(const char *xlog);
+static bool XLogArchiveIsBusy(const char *xlog);
 static void XLogArchiveCleanup(const char *xlog);
 static void readRecoveryCommandFile(void);
 static void exitArchiveRecovery(TimeLineID endTLI,
                                        uint32 endLogId, uint32 endLogSeg);
 static bool recoveryStopsHere(XLogRecord *record, bool *includeThis);
+static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags);
 
 static bool XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites,
                                XLogRecPtr *lsn, BkpBlock *bkpb);
@@ -475,13 +461,15 @@ static bool InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath,
                                           bool use_lock);
 static int     XLogFileOpen(uint32 log, uint32 seg);
 static int     XLogFileRead(uint32 log, uint32 seg, int emode);
-static void    XLogFileClose(void);
+static void XLogFileClose(void);
 static bool RestoreArchivedFile(char *path, const char *xlogfname,
                                        const char *recovername, off_t expectedSize);
-static int     PreallocXlogFiles(XLogRecPtr endptr);
-static void MoveOfflineLogs(uint32 log, uint32 seg, XLogRecPtr endptr,
-                               int *nsegsremoved, int *nsegsrecycled);
+static void ExecuteRecoveryEndCommand(void);
+static void PreallocXlogFiles(XLogRecPtr endptr);
+static void RemoveOldXlogFiles(uint32 log, uint32 seg, XLogRecPtr endptr);
+static void ValidateXLOGDirectoryStructure(void);
 static void CleanupBackupHistory(void);
+static void UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force);
 static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, int emode);
 static bool ValidXLOGHeader(XLogPageHeader hdr, int emode);
 static XLogRecord *ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt);
@@ -493,15 +481,16 @@ static void writeTimeLineHistory(TimeLineID newTLI, TimeLineID parentTLI,
                                         uint32 endLogId, uint32 endLogSeg);
 static void WriteControlFile(void);
 static void ReadControlFile(void);
-static char *str_time(time_t tnow);
-static void issue_xlog_fsync(void);
-
+static char *str_time(pg_time_t tnow);
 #ifdef WAL_DEBUG
 static void xlog_outrec(StringInfo buf, XLogRecord *record);
 #endif
-static bool read_backup_label(XLogRecPtr *checkPointLoc);
-static void remove_backup_label(void);
+static void issue_xlog_fsync(void);
+static void pg_start_backup_callback(int code, Datum arg);
+static bool read_backup_label(XLogRecPtr *checkPointLoc,
+                                 XLogRecPtr *minRecoveryLoc);
 static void rm_redo_error_callback(void *arg);
+static int get_sync_bit(int method);
 
 
 /*
@@ -541,19 +530,19 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
        uint32          len,
                                write_len;
        unsigned        i;
-       XLogwrtRqst LogwrtRqst;
        bool            updrqst;
        bool            doPageWrites;
        bool            isLogSwitch = (rmid == RM_XLOG_ID && info == XLOG_SWITCH);
-       bool            no_tran = (rmid == RM_XLOG_ID);
 
+       /* cross-check on whether we should be here or not */
+       if (RecoveryInProgress())
+               elog(FATAL, "cannot make new WAL entries during recovery");
+
+       /* info's high bits are reserved for use by me */
        if (info & XLR_INFO_MASK)
-       {
-               if ((info & XLR_INFO_MASK) != XLOG_NO_TRAN)
-                       elog(PANIC, "invalid xlog info mask %02X", (info & XLR_INFO_MASK));
-               no_tran = true;
-               info &= ~XLR_INFO_MASK;
-       }
+               elog(PANIC, "invalid xlog info mask %02X", info);
+
+       TRACE_POSTGRESQL_XLOG_INSERT(rmid, info);
 
        /*
         * In bootstrap mode, we don't actually log anything but XLOG resources;
@@ -691,52 +680,15 @@ begin:;
        /*
         * NOTE: We disallow len == 0 because it provides a useful bit of extra
         * error checking in ReadRecord.  This means that all callers of
-        * XLogInsert must supply at least some not-in-a-buffer data.  However,
-        * we make an exception for XLOG SWITCH records because we don't want
-        * them to ever cross a segment boundary.
+        * XLogInsert must supply at least some not-in-a-buffer data.  However, we
+        * make an exception for XLOG SWITCH records because we don't want them to
+        * ever cross a segment boundary.
         */
        if (len == 0 && !isLogSwitch)
                elog(PANIC, "invalid xlog record length %u", len);
 
        START_CRIT_SECTION();
 
-       /* update LogwrtResult before doing cache fill check */
-       {
-               /* use volatile pointer to prevent code rearrangement */
-               volatile XLogCtlData *xlogctl = XLogCtl;
-
-               SpinLockAcquire(&xlogctl->info_lck);
-               LogwrtRqst = xlogctl->LogwrtRqst;
-               LogwrtResult = xlogctl->LogwrtResult;
-               SpinLockRelease(&xlogctl->info_lck);
-       }
-
-       /*
-        * If cache is half filled then try to acquire write lock and do
-        * XLogWrite. Ignore any fractional blocks in performing this check.
-        */
-       LogwrtRqst.Write.xrecoff -= LogwrtRqst.Write.xrecoff % XLOG_BLCKSZ;
-       if (LogwrtRqst.Write.xlogid != LogwrtResult.Write.xlogid ||
-               (LogwrtRqst.Write.xrecoff >= LogwrtResult.Write.xrecoff +
-                XLogCtl->XLogCacheByte / 2))
-       {
-               if (LWLockConditionalAcquire(WALWriteLock, LW_EXCLUSIVE))
-               {
-                       /*
-                        * Since the amount of data we write here is completely optional
-                        * anyway, tell XLogWrite it can be "flexible" and stop at a
-                        * convenient boundary.  This allows writes triggered by this
-                        * mechanism to synchronize with the cache boundaries, so that in
-                        * a long transaction we'll basically dump alternating halves of
-                        * the buffer array.
-                        */
-                       LogwrtResult = XLogCtl->Write.LogwrtResult;
-                       if (XLByteLT(LogwrtResult.Write, LogwrtRqst.Write))
-                               XLogWrite(LogwrtRqst, true, false);
-                       LWLockRelease(WALWriteLock);
-               }
-       }
-
        /* Now wait to get insert lock */
        LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
 
@@ -746,8 +698,8 @@ begin:;
         * checkpoint, so it's better to be slow in this case and fast otherwise.
         *
         * If we aren't doing full-page writes then RedoRecPtr doesn't actually
-        * affect the contents of the XLOG record, so we'll update our local
-        * copy but not force a recomputation.
+        * affect the contents of the XLOG record, so we'll update our local copy
+        * but not force a recomputation.
         */
        if (!XLByteEQ(RedoRecPtr, Insert->RedoRecPtr))
        {
@@ -776,10 +728,10 @@ begin:;
        }
 
        /*
-        * Also check to see if forcePageWrites was just turned on; if we
-        * weren't already doing full-page writes then go back and recompute.
-        * (If it was just turned off, we could recompute the record without
-        * full pages, but we choose not to bother.)
+        * Also check to see if forcePageWrites was just turned on; if we weren't
+        * already doing full-page writes then go back and recompute. (If it was
+        * just turned off, we could recompute the record without full pages, but
+        * we choose not to bother.)
         */
        if (Insert->forcePageWrites && !doPageWrites)
        {
@@ -848,6 +800,19 @@ begin:;
        }
 
        /*
+        * If we backed up any full blocks and online backup is not in progress,
+        * mark the backup blocks as removable.  This allows the WAL archiver to
+        * know whether it is safe to compress archived WAL data by transforming
+        * full-block records into the non-full-block format.
+        *
+        * Note: we could just set the flag whenever !forcePageWrites, but
+        * defining it like this leaves the info bit free for some potential other
+        * use in records without any backup blocks.
+        */
+       if ((info & XLR_BKP_BLOCK_MASK) && !Insert->forcePageWrites)
+               info |= XLR_BKP_REMOVABLE;
+
+       /*
         * If there isn't enough space on the current XLOG page for a record
         * header, advance to the next page (leaving the unused space as zeroes).
         */
@@ -864,11 +829,11 @@ begin:;
        INSERT_RECPTR(RecPtr, Insert, curridx);
 
        /*
-        * If the record is an XLOG_SWITCH, and we are exactly at the start
-        * of a segment, we need not insert it (and don't want to because
-        * we'd like consecutive switch requests to be no-ops).  Instead,
-        * make sure everything is written and flushed through the end of
-        * the prior segment, and return the prior segment's end address.
+        * If the record is an XLOG_SWITCH, and we are exactly at the start of a
+        * segment, we need not insert it (and don't want to because we'd like
+        * consecutive switch requests to be no-ops).  Instead, make sure
+        * everything is written and flushed through the end of the prior segment,
+        * and return the prior segment's end address.
         */
        if (isLogSwitch &&
                (RecPtr.xrecoff % XLogSegSize) == SizeOfXLogLongPHD)
@@ -920,7 +885,7 @@ begin:;
 #ifdef WAL_DEBUG
        if (XLOG_DEBUG)
        {
-               StringInfoData  buf;
+               StringInfoData buf;
 
                initStringInfo(&buf);
                appendStringInfo(&buf, "INSERT @ %X/%X: ",
@@ -937,11 +902,8 @@ begin:;
 #endif
 
        /* Record begin of record in appropriate places */
-       if (!no_tran)
-               MyLastRecPtr = RecPtr;
        ProcLastRecPtr = RecPtr;
        Insert->PrevRecord = RecPtr;
-       MyXactMadeXLogEntry = true;
 
        Insert->currpos += SizeOfXLogRecord;
        freespace -= SizeOfXLogRecord;
@@ -1010,11 +972,13 @@ begin:;
                XLogwrtRqst FlushRqst;
                XLogRecPtr      OldSegEnd;
 
+               TRACE_POSTGRESQL_XLOG_SWITCH();
+
                LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
 
                /*
-                * Flush through the end of the page containing XLOG_SWITCH,
-                * and perform end-of-segment actions (eg, notifying archiver).
+                * Flush through the end of the page containing XLOG_SWITCH, and
+                * perform end-of-segment actions (eg, notifying archiver).
                 */
                WriteRqst = XLogCtl->xlblocks[curridx];
                FlushRqst.Write = WriteRqst;
@@ -1099,7 +1063,7 @@ begin:;
                SpinLockRelease(&xlogctl->info_lck);
        }
 
-       ProcLastRecEnd = RecPtr;
+       XactLastRecEnd = RecPtr;
 
        END_CRIT_SECTION();
 
@@ -1115,31 +1079,30 @@ static bool
 XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites,
                                XLogRecPtr *lsn, BkpBlock *bkpb)
 {
-       PageHeader      page;
+       Page            page;
 
-       page = (PageHeader) BufferGetBlock(rdata->buffer);
+       page = BufferGetPage(rdata->buffer);
 
        /*
         * XXX We assume page LSN is first data on *every* page that can be passed
         * to XLogInsert, whether it otherwise has the standard page layout or
         * not.
         */
-       *lsn = page->pd_lsn;
+       *lsn = PageGetLSN(page);
 
        if (doPageWrites &&
-               XLByteLE(page->pd_lsn, RedoRecPtr))
+               XLByteLE(PageGetLSN(page), RedoRecPtr))
        {
                /*
                 * The page needs to be backed up, so set up *bkpb
                 */
-               bkpb->node = BufferGetFileNode(rdata->buffer);
-               bkpb->block = BufferGetBlockNumber(rdata->buffer);
+               BufferGetTag(rdata->buffer, &bkpb->node, &bkpb->fork, &bkpb->block);
 
                if (rdata->buffer_std)
                {
                        /* Assume we can omit data between pd_lower and pd_upper */
-                       uint16          lower = page->pd_lower;
-                       uint16          upper = page->pd_upper;
+                       uint16          lower = ((PageHeader) page)->pd_lower;
+                       uint16          upper = ((PageHeader) page)->pd_upper;
 
                        if (lower >= SizeOfPageHeaderData &&
                                upper > lower &&
@@ -1266,6 +1229,50 @@ XLogArchiveCheckDone(const char *xlog)
 }
 
 /*
+ * XLogArchiveIsBusy
+ *
+ * Check to see if an XLOG segment file is still unarchived.
+ * This is almost but not quite the inverse of XLogArchiveCheckDone: in
+ * the first place we aren't chartered to recreate the .ready file, and
+ * in the second place we should consider that if the file is already gone
+ * then it's not busy.  (This check is needed to handle the race condition
+ * that a checkpoint already deleted the no-longer-needed file.)
+ */
+static bool
+XLogArchiveIsBusy(const char *xlog)
+{
+       char            archiveStatusPath[MAXPGPATH];
+       struct stat stat_buf;
+
+       /* First check for .done --- this means archiver is done with it */
+       StatusFilePath(archiveStatusPath, xlog, ".done");
+       if (stat(archiveStatusPath, &stat_buf) == 0)
+               return false;
+
+       /* check for .ready --- this means archiver is still busy with it */
+       StatusFilePath(archiveStatusPath, xlog, ".ready");
+       if (stat(archiveStatusPath, &stat_buf) == 0)
+               return true;
+
+       /* Race condition --- maybe archiver just finished, so recheck */
+       StatusFilePath(archiveStatusPath, xlog, ".done");
+       if (stat(archiveStatusPath, &stat_buf) == 0)
+               return false;
+
+       /*
+        * Check to see if the WAL file has been removed by checkpoint,
+        * which implies it has already been archived, and explains why we
+        * can't see a status file for it.
+        */
+       snprintf(archiveStatusPath, MAXPGPATH, XLOGDIR "/%s", xlog);
+       if (stat(archiveStatusPath, &stat_buf) != 0 &&
+               errno == ENOENT)
+               return false;
+
+       return true;
+}
+
+/*
  * XLogArchiveCleanup
  *
  * Cleanup archive notification file(s) for a particular xlog segment
@@ -1366,12 +1373,14 @@ AdvanceXLInsertBuffer(bool new_segment)
                                 * Have to write buffers while holding insert lock. This is
                                 * not good, so only write as much as we absolutely must.
                                 */
+                               TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_START();
                                WriteRqst.Write = OldPageRqstPtr;
                                WriteRqst.Flush.xlogid = 0;
                                WriteRqst.Flush.xrecoff = 0;
                                XLogWrite(WriteRqst, false, false);
                                LWLockRelease(WALWriteLock);
                                Insert->LogwrtResult = LogwrtResult;
+                               TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_DONE();
                        }
                }
        }
@@ -1440,6 +1449,40 @@ AdvanceXLInsertBuffer(bool new_segment)
 }
 
 /*
+ * Check whether we've consumed enough xlog space that a checkpoint is needed.
+ *
+ * Caller must have just finished filling the open log file (so that
+ * openLogId/openLogSeg are valid).  We measure the distance from RedoRecPtr
+ * to the open log file and see if that exceeds CheckPointSegments.
+ *
+ * Note: it is caller's responsibility that RedoRecPtr is up-to-date.
+ */
+static bool
+XLogCheckpointNeeded(void)
+{
+       /*
+        * A straight computation of segment number could overflow 32 bits. Rather
+        * than assuming we have working 64-bit arithmetic, we compare the
+        * highest-order bits separately, and force a checkpoint immediately when
+        * they change.
+        */
+       uint32          old_segno,
+                               new_segno;
+       uint32          old_highbits,
+                               new_highbits;
+
+       old_segno = (RedoRecPtr.xlogid % XLogSegSize) * XLogSegsPerFile +
+               (RedoRecPtr.xrecoff / XLogSegSize);
+       old_highbits = RedoRecPtr.xlogid / XLogSegSize;
+       new_segno = (openLogId % XLogSegSize) * XLogSegsPerFile + openLogSeg;
+       new_highbits = openLogId / XLogSegSize;
+       if (new_highbits != old_highbits ||
+               new_segno >= old_segno + (uint32) (CheckPointSegments - 1))
+               return true;
+       return false;
+}
+
+/*
  * Write and/or fsync the log at least as far as WriteRqst indicates.
  *
  * If flexible == TRUE, we don't have to write as far as WriteRqst, but
@@ -1531,54 +1574,6 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
                        openLogFile = XLogFileInit(openLogId, openLogSeg,
                                                                           &use_existent, true);
                        openLogOff = 0;
-
-                       /* update pg_control, unless someone else already did */
-                       LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
-                       if (ControlFile->logId < openLogId ||
-                               (ControlFile->logId == openLogId &&
-                                ControlFile->logSeg < openLogSeg + 1))
-                       {
-                               ControlFile->logId = openLogId;
-                               ControlFile->logSeg = openLogSeg + 1;
-                               ControlFile->time = time(NULL);
-                               UpdateControlFile();
-
-                               /*
-                                * Signal bgwriter to start a checkpoint if it's been too long
-                                * since the last one.  (We look at local copy of RedoRecPtr
-                                * which might be a little out of date, but should be close
-                                * enough for this purpose.)
-                                *
-                                * A straight computation of segment number could overflow 32
-                                * bits.  Rather than assuming we have working 64-bit
-                                * arithmetic, we compare the highest-order bits separately,
-                                * and force a checkpoint immediately when they change.
-                                */
-                               if (IsUnderPostmaster)
-                               {
-                                       uint32          old_segno,
-                                                               new_segno;
-                                       uint32          old_highbits,
-                                                               new_highbits;
-
-                                       old_segno = (RedoRecPtr.xlogid % XLogSegSize) * XLogSegsPerFile +
-                                               (RedoRecPtr.xrecoff / XLogSegSize);
-                                       old_highbits = RedoRecPtr.xlogid / XLogSegSize;
-                                       new_segno = (openLogId % XLogSegSize) * XLogSegsPerFile +
-                                               openLogSeg;
-                                       new_highbits = openLogId / XLogSegSize;
-                                       if (new_highbits != old_highbits ||
-                                               new_segno >= old_segno + (uint32) CheckPointSegments)
-                                       {
-#ifdef WAL_DEBUG
-                                               if (XLOG_DEBUG)
-                                                       elog(LOG, "time for a checkpoint, signaling bgwriter");
-#endif
-                                               RequestCheckpoint(false, true);
-                                       }
-                               }
-                       }
-                       LWLockRelease(ControlFileLock);
                }
 
                /* Make sure we have the current logfile open */
@@ -1661,7 +1656,10 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
                         * switch.
                         *
                         * This is also the right place to notify the Archiver that the
-                        * segment is ready to copy to archival storage.
+                        * segment is ready to copy to archival storage, and to update the
+                        * timer for archive_timeout, and to signal for a checkpoint if
+                        * too many logfile segments have been used since the last
+                        * checkpoint.
                         */
                        if (finishing_seg || (xlog_switch && last_iteration))
                        {
@@ -1670,6 +1668,23 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
 
                                if (XLogArchivingActive())
                                        XLogArchiveNotifySeg(openLogId, openLogSeg);
+
+                               Write->lastSegSwitchTime = (pg_time_t) time(NULL);
+
+                               /*
+                                * Signal bgwriter to start a checkpoint if we've consumed too
+                                * much xlog since the last one.  For speed, we first check
+                                * using the local copy of RedoRecPtr, which might be out of
+                                * date; if it looks like a checkpoint is needed, forcibly
+                                * update RedoRecPtr and recheck.
+                                */
+                               if (IsUnderPostmaster &&
+                                       XLogCheckpointNeeded())
+                               {
+                                       (void) GetRedoRecPtr();
+                                       if (XLogCheckpointNeeded())
+                                               RequestCheckpoint(CHECKPOINT_CAUSE_XLOG);
+                               }
                        }
                }
 
@@ -1700,7 +1715,8 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
                 * have no open file or the wrong one.  However, we do not need to
                 * fsync more than one file.
                 */
-               if (sync_method != SYNC_METHOD_OPEN)
+               if (sync_method != SYNC_METHOD_OPEN &&
+                       sync_method != SYNC_METHOD_OPEN_DSYNC)
                {
                        if (openLogFile >= 0 &&
                                !XLByteInPrevSeg(LogwrtResult.Write, openLogId, openLogSeg))
@@ -1740,6 +1756,79 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
 }
 
 /*
+ * Record the LSN for an asynchronous transaction commit.
+ * (This should not be called for aborts, nor for synchronous commits.)
+ */
+void
+XLogSetAsyncCommitLSN(XLogRecPtr asyncCommitLSN)
+{
+       /* use volatile pointer to prevent code rearrangement */
+       volatile XLogCtlData *xlogctl = XLogCtl;
+
+       SpinLockAcquire(&xlogctl->info_lck);
+       if (XLByteLT(xlogctl->asyncCommitLSN, asyncCommitLSN))
+               xlogctl->asyncCommitLSN = asyncCommitLSN;
+       SpinLockRelease(&xlogctl->info_lck);
+}
+
+/*
+ * Advance minRecoveryPoint in control file.
+ *
+ * If we crash during recovery, we must reach this point again before the
+ * database is consistent. 
+ * 
+ * If 'force' is true, 'lsn' argument is ignored. Otherwise, minRecoveryPoint
+ * is is only updated if it's not already greater than or equal to 'lsn'.
+ */
+static void
+UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force)
+{
+       /* Quick check using our local copy of the variable */
+       if (!updateMinRecoveryPoint || (!force && XLByteLE(lsn, minRecoveryPoint)))
+               return;
+
+       LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
+
+       /* update local copy */
+       minRecoveryPoint = ControlFile->minRecoveryPoint;
+
+       /*
+        * An invalid minRecoveryPoint means that we need to recover all the WAL,
+        * ie. crash recovery. Don't update the control file in that case.
+        */
+       if (minRecoveryPoint.xlogid == 0 && minRecoveryPoint.xrecoff == 0)
+               updateMinRecoveryPoint = false;
+       else if (force || XLByteLT(minRecoveryPoint, lsn))
+       {
+               /* use volatile pointer to prevent code rearrangement */
+               volatile XLogCtlData *xlogctl = XLogCtl;
+               XLogRecPtr newMinRecoveryPoint;
+
+               /*
+                * To avoid having to update the control file too often, we update it
+                * all the way to the last record being replayed, even though 'lsn'
+                * would suffice for correctness.
+                */
+               SpinLockAcquire(&xlogctl->info_lck);
+               newMinRecoveryPoint = xlogctl->replayEndRecPtr;
+               SpinLockRelease(&xlogctl->info_lck);
+
+               /* update control file */
+               if (XLByteLT(ControlFile->minRecoveryPoint, newMinRecoveryPoint))
+               {
+                       ControlFile->minRecoveryPoint = newMinRecoveryPoint;
+                       UpdateControlFile();
+                       minRecoveryPoint = newMinRecoveryPoint;
+
+                       ereport(DEBUG2,
+                                       (errmsg("updated min recovery point to %X/%X",
+                                               minRecoveryPoint.xlogid, minRecoveryPoint.xrecoff)));
+               }
+       }
+       LWLockRelease(ControlFileLock);
+}
+
+/*
  * Ensure that all XLOG data through the given position is flushed to disk.
  *
  * NOTE: this differs from XLogWrite mainly in that the WALWriteLock is not
@@ -1751,9 +1840,15 @@ XLogFlush(XLogRecPtr record)
        XLogRecPtr      WriteRqstPtr;
        XLogwrtRqst WriteRqst;
 
-       /* Disabled during REDO */
-       if (InRedo)
+       /*
+        * During REDO, we don't try to flush the WAL, but update minRecoveryPoint
+        * instead.
+        */
+       if (RecoveryInProgress())
+       {
+               UpdateMinRecoveryPoint(record, false);
                return;
+       }
 
        /* Quick exit if already known flushed */
        if (XLByteLE(record, LogwrtResult.Flush))
@@ -1840,9 +1935,9 @@ XLogFlush(XLogRecPtr record)
         * the bad page is encountered again during recovery then we would be
         * unable to restart the database at all!  (This scenario has actually
         * happened in the field several times with 7.1 releases. Note that we
-        * cannot get here while InRedo is true, but if the bad page is brought in
-        * and marked dirty during recovery then CreateCheckPoint will try to
-        * flush it at the end of recovery.)
+        * cannot get here while RecoveryInProgress(), but if the bad page is
+        * brought in and marked dirty during recovery then if a checkpoint were
+        * performed at the end of recovery it will try to flush it.
         *
         * The current approach is to ERROR under normal conditions, but only
         * WARNING during recovery, so that the system can be brought up even if
@@ -1859,58 +1954,199 @@ XLogFlush(XLogRecPtr record)
 }
 
 /*
- * Create a new XLOG file segment, or open a pre-existing one.
- *
- * log, seg: identify segment to be created/opened.
- *
- * *use_existent: if TRUE, OK to use a pre-existing file (else, any
- * pre-existing file will be deleted). On return, TRUE if a pre-existing
- * file was used.
+ * Flush xlog, but without specifying exactly where to flush to.
  *
- * use_lock: if TRUE, acquire ControlFileLock while moving file into
- * place.  This should be TRUE except during bootstrap log creation.  The
- * caller must *not* hold the lock at call.
- *
- * Returns FD of opened file.
+ * We normally flush only completed blocks; but if there is nothing to do on
+ * that basis, we check for unflushed async commits in the current incomplete
+ * block, and flush through the latest one of those.  Thus, if async commits
+ * are not being used, we will flush complete blocks only.     We can guarantee
+ * that async commits reach disk after at most three cycles; normally only
+ * one or two. (We allow XLogWrite to write "flexibly", meaning it can stop
+ * at the end of the buffer ring; this makes a difference only with very high
+ * load or long wal_writer_delay, but imposes one extra cycle for the worst
+ * case for async commits.)
  *
- * Note: errors here are ERROR not PANIC because we might or might not be
- * inside a critical section (eg, during checkpoint there is no reason to
- * take down the system on failure).  They will promote to PANIC if we are
- * in a critical section.
+ * This routine is invoked periodically by the background walwriter process.
  */
-static int
-XLogFileInit(uint32 log, uint32 seg,
-                        bool *use_existent, bool use_lock)
+void
+XLogBackgroundFlush(void)
 {
-       char            path[MAXPGPATH];
-       char            tmppath[MAXPGPATH];
-       char            zbuffer[XLOG_BLCKSZ];
-       uint32          installed_log;
-       uint32          installed_seg;
-       int                     max_advance;
-       int                     fd;
-       int                     nbytes;
+       XLogRecPtr      WriteRqstPtr;
+       bool            flexible = true;
 
-       XLogFilePath(path, ThisTimeLineID, log, seg);
+       /* XLOG doesn't need flushing during recovery */
+       if (RecoveryInProgress())
+               return;
 
-       /*
-        * Try to use existent file (checkpoint maker may have created it already)
-        */
-       if (*use_existent)
+       /* read LogwrtResult and update local state */
        {
-               fd = BasicOpenFile(path, O_RDWR | PG_BINARY | XLOG_SYNC_BIT,
-                                                  S_IRUSR | S_IWUSR);
-               if (fd < 0)
-               {
-                       if (errno != ENOENT)
-                               ereport(ERROR,
-                                               (errcode_for_file_access(),
-                                                errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
-                                                               path, log, seg)));
-               }
-               else
-                       return fd;
-       }
+               /* use volatile pointer to prevent code rearrangement */
+               volatile XLogCtlData *xlogctl = XLogCtl;
+
+               SpinLockAcquire(&xlogctl->info_lck);
+               LogwrtResult = xlogctl->LogwrtResult;
+               WriteRqstPtr = xlogctl->LogwrtRqst.Write;
+               SpinLockRelease(&xlogctl->info_lck);
+       }
+
+       /* back off to last completed page boundary */
+       WriteRqstPtr.xrecoff -= WriteRqstPtr.xrecoff % XLOG_BLCKSZ;
+
+       /* if we have already flushed that far, consider async commit records */
+       if (XLByteLE(WriteRqstPtr, LogwrtResult.Flush))
+       {
+               /* use volatile pointer to prevent code rearrangement */
+               volatile XLogCtlData *xlogctl = XLogCtl;
+
+               SpinLockAcquire(&xlogctl->info_lck);
+               WriteRqstPtr = xlogctl->asyncCommitLSN;
+               SpinLockRelease(&xlogctl->info_lck);
+               flexible = false;               /* ensure it all gets written */
+       }
+
+       /* Done if already known flushed */
+       if (XLByteLE(WriteRqstPtr, LogwrtResult.Flush))
+               return;
+
+#ifdef WAL_DEBUG
+       if (XLOG_DEBUG)
+               elog(LOG, "xlog bg flush request %X/%X; write %X/%X; flush %X/%X",
+                        WriteRqstPtr.xlogid, WriteRqstPtr.xrecoff,
+                        LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff,
+                        LogwrtResult.Flush.xlogid, LogwrtResult.Flush.xrecoff);
+#endif
+
+       START_CRIT_SECTION();
+
+       /* now wait for the write lock */
+       LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
+       LogwrtResult = XLogCtl->Write.LogwrtResult;
+       if (!XLByteLE(WriteRqstPtr, LogwrtResult.Flush))
+       {
+               XLogwrtRqst WriteRqst;
+
+               WriteRqst.Write = WriteRqstPtr;
+               WriteRqst.Flush = WriteRqstPtr;
+               XLogWrite(WriteRqst, flexible, false);
+       }
+       LWLockRelease(WALWriteLock);
+
+       END_CRIT_SECTION();
+}
+
+/*
+ * Flush any previous asynchronously-committed transactions' commit records.
+ *
+ * NOTE: it is unwise to assume that this provides any strong guarantees.
+ * In particular, because of the inexact LSN bookkeeping used by clog.c,
+ * we cannot assume that hint bits will be settable for these transactions.
+ */
+void
+XLogAsyncCommitFlush(void)
+{
+       XLogRecPtr      WriteRqstPtr;
+
+       /* use volatile pointer to prevent code rearrangement */
+       volatile XLogCtlData *xlogctl = XLogCtl;
+
+       /* There's no asynchronously committed transactions during recovery */
+       if (RecoveryInProgress())
+               return;
+
+       SpinLockAcquire(&xlogctl->info_lck);
+       WriteRqstPtr = xlogctl->asyncCommitLSN;
+       SpinLockRelease(&xlogctl->info_lck);
+
+       XLogFlush(WriteRqstPtr);
+}
+
+/*
+ * Test whether XLOG data has been flushed up to (at least) the given position.
+ *
+ * Returns true if a flush is still needed.  (It may be that someone else
+ * is already in process of flushing that far, however.)
+ */
+bool
+XLogNeedsFlush(XLogRecPtr record)
+{
+       /* XLOG doesn't need flushing during recovery */
+       if (RecoveryInProgress())
+               return false;
+
+       /* Quick exit if already known flushed */
+       if (XLByteLE(record, LogwrtResult.Flush))
+               return false;
+
+       /* read LogwrtResult and update local state */
+       {
+               /* use volatile pointer to prevent code rearrangement */
+               volatile XLogCtlData *xlogctl = XLogCtl;
+
+               SpinLockAcquire(&xlogctl->info_lck);
+               LogwrtResult = xlogctl->LogwrtResult;
+               SpinLockRelease(&xlogctl->info_lck);
+       }
+
+       /* check again */
+       if (XLByteLE(record, LogwrtResult.Flush))
+               return false;
+
+       return true;
+}
+
+/*
+ * Create a new XLOG file segment, or open a pre-existing one.
+ *
+ * log, seg: identify segment to be created/opened.
+ *
+ * *use_existent: if TRUE, OK to use a pre-existing file (else, any
+ * pre-existing file will be deleted). On return, TRUE if a pre-existing
+ * file was used.
+ *
+ * use_lock: if TRUE, acquire ControlFileLock while moving file into
+ * place.  This should be TRUE except during bootstrap log creation.  The
+ * caller must *not* hold the lock at call.
+ *
+ * Returns FD of opened file.
+ *
+ * Note: errors here are ERROR not PANIC because we might or might not be
+ * inside a critical section (eg, during checkpoint there is no reason to
+ * take down the system on failure).  They will promote to PANIC if we are
+ * in a critical section.
+ */
+static int
+XLogFileInit(uint32 log, uint32 seg,
+                        bool *use_existent, bool use_lock)
+{
+       char            path[MAXPGPATH];
+       char            tmppath[MAXPGPATH];
+       char       *zbuffer;
+       uint32          installed_log;
+       uint32          installed_seg;
+       int                     max_advance;
+       int                     fd;
+       int                     nbytes;
+
+       XLogFilePath(path, ThisTimeLineID, log, seg);
+
+       /*
+        * Try to use existent file (checkpoint maker may have created it already)
+        */
+       if (*use_existent)
+       {
+               fd = BasicOpenFile(path, O_RDWR | PG_BINARY | get_sync_bit(sync_method),
+                                                  S_IRUSR | S_IWUSR);
+               if (fd < 0)
+               {
+                       if (errno != ENOENT)
+                               ereport(ERROR,
+                                               (errcode_for_file_access(),
+                                                errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
+                                                               path, log, seg)));
+               }
+               else
+                       return fd;
+       }
 
        /*
         * Initialize an empty (all zeroes) segment.  NOTE: it is possible that
@@ -1918,11 +2154,13 @@ XLogFileInit(uint32 log, uint32 seg,
         * pre-creating an extra log segment.  That seems OK, and better than
         * holding the lock throughout this lengthy process.
         */
+       elog(DEBUG2, "creating and filling new WAL file");
+
        snprintf(tmppath, MAXPGPATH, XLOGDIR "/xlogtemp.%d", (int) getpid());
 
        unlink(tmppath);
 
-       /* do not use XLOG_SYNC_BIT here --- want to fsync only at end of fill */
+       /* do not use get_sync_bit() here --- want to fsync only at end of fill */
        fd = BasicOpenFile(tmppath, O_RDWR | O_CREAT | O_EXCL | PG_BINARY,
                                           S_IRUSR | S_IWUSR);
        if (fd < 0)
@@ -1938,12 +2176,16 @@ XLogFileInit(uint32 log, uint32 seg,
         * fsync below) that all the indirect blocks are down on disk.  Therefore,
         * fdatasync(2) or O_DSYNC will be sufficient to sync future writes to the
         * log file.
+        *
+        * Note: palloc zbuffer, instead of just using a local char array, to
+        * ensure it is reasonably well-aligned; this may save a few cycles
+        * transferring data to the kernel.
         */
-       MemSet(zbuffer, 0, sizeof(zbuffer));
-       for (nbytes = 0; nbytes < XLogSegSize; nbytes += sizeof(zbuffer))
+       zbuffer = (char *) palloc0(XLOG_BLCKSZ);
+       for (nbytes = 0; nbytes < XLogSegSize; nbytes += XLOG_BLCKSZ)
        {
                errno = 0;
-               if ((int) write(fd, zbuffer, sizeof(zbuffer)) != (int) sizeof(zbuffer))
+               if ((int) write(fd, zbuffer, XLOG_BLCKSZ) != (int) XLOG_BLCKSZ)
                {
                        int                     save_errno = errno;
 
@@ -1959,6 +2201,7 @@ XLogFileInit(uint32 log, uint32 seg,
                                         errmsg("could not write to file \"%s\": %m", tmppath)));
                }
        }
+       pfree(zbuffer);
 
        if (pg_fsync(fd) != 0)
                ereport(ERROR,
@@ -1989,11 +2232,13 @@ XLogFileInit(uint32 log, uint32 seg,
                unlink(tmppath);
        }
 
+       elog(DEBUG2, "done creating and filling new WAL file");
+
        /* Set flag to tell caller there was no existent file */
        *use_existent = false;
 
        /* Now open original target segment (might not be file I just made) */
-       fd = BasicOpenFile(path, O_RDWR | PG_BINARY | XLOG_SYNC_BIT,
+       fd = BasicOpenFile(path, O_RDWR | PG_BINARY | get_sync_bit(sync_method),
                                           S_IRUSR | S_IWUSR);
        if (fd < 0)
                ereport(ERROR,
@@ -2044,7 +2289,7 @@ XLogFileCopy(uint32 log, uint32 seg,
 
        unlink(tmppath);
 
-       /* do not use XLOG_SYNC_BIT here --- want to fsync only at end of fill */
+       /* do not use get_sync_bit() here --- want to fsync only at end of fill */
        fd = BasicOpenFile(tmppath, O_RDWR | O_CREAT | O_EXCL | PG_BINARY,
                                           S_IRUSR | S_IWUSR);
        if (fd < 0)
@@ -2131,7 +2376,9 @@ XLogFileCopy(uint32 log, uint32 seg,
  * caller must *not* hold the lock at call.
  *
  * Returns TRUE if file installed, FALSE if not installed because of
- * exceeding max_advance limit.  (Any other kind of failure causes ereport().)
+ * exceeding max_advance limit.  On Windows, we also return FALSE if we
+ * can't rename the file into place because someone's got it open.
+ * (Any other kind of failure causes ereport().)
  */
 static bool
 InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath,
@@ -2186,10 +2433,25 @@ InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath,
        unlink(tmppath);
 #else
        if (rename(tmppath, path) < 0)
+       {
+#ifdef WIN32
+#if !defined(__CYGWIN__)
+               if (GetLastError() == ERROR_ACCESS_DENIED)
+#else
+               if (errno == EACCES)
+#endif
+               {
+                       if (use_lock)
+                               LWLockRelease(ControlFileLock);
+                       return false;
+               }
+#endif   /* WIN32 */
+
                ereport(ERROR,
                                (errcode_for_file_access(),
                                 errmsg("could not rename file \"%s\" to \"%s\" (initialization of log file %u, segment %u): %m",
                                                tmppath, path, *log, *seg)));
+       }
 #endif
 
        if (use_lock)
@@ -2209,7 +2471,7 @@ XLogFileOpen(uint32 log, uint32 seg)
 
        XLogFilePath(path, ThisTimeLineID, log, seg);
 
-       fd = BasicOpenFile(path, O_RDWR | PG_BINARY | XLOG_SYNC_BIT,
+       fd = BasicOpenFile(path, O_RDWR | PG_BINARY | get_sync_bit(sync_method),
                                           S_IRUSR | S_IWUSR);
        if (fd < 0)
                ereport(PANIC,
@@ -2228,6 +2490,7 @@ XLogFileRead(uint32 log, uint32 seg, int emode)
 {
        char            path[MAXPGPATH];
        char            xlogfname[MAXFNAMELEN];
+       char            activitymsg[MAXFNAMELEN + 16];
        ListCell   *cell;
        int                     fd;
 
@@ -2248,9 +2511,15 @@ XLogFileRead(uint32 log, uint32 seg, int emode)
                if (tli < curFileTLI)
                        break;                          /* don't bother looking at too-old TLIs */
 
+               XLogFileName(xlogfname, tli, log, seg);
+
                if (InArchiveRecovery)
                {
-                       XLogFileName(xlogfname, tli, log, seg);
+                       /* Report recovery progress in PS display */
+                       snprintf(activitymsg, sizeof(activitymsg), "waiting for %s",
+                                        xlogfname);
+                       set_ps_display(activitymsg, false);
+
                        restoredFromArchive = RestoreArchivedFile(path, xlogfname,
                                                                                                          "RECOVERYXLOG",
                                                                                                          XLogSegSize);
@@ -2263,6 +2532,12 @@ XLogFileRead(uint32 log, uint32 seg, int emode)
                {
                        /* Success! */
                        curFileTLI = tli;
+
+                       /* Report recovery progress in PS display */
+                       snprintf(activitymsg, sizeof(activitymsg), "recovering %s",
+                                        xlogfname);
+                       set_ps_display(activitymsg, false);
+
                        return fd;
                }
                if (errno != ENOENT)    /* unexpected failure? */
@@ -2291,36 +2566,23 @@ XLogFileClose(void)
        Assert(openLogFile >= 0);
 
        /*
-        * posix_fadvise is problematic on many platforms: on older x86 Linux
-        * it just dumps core, and there are reports of problems on PPC platforms
-        * as well.  The following is therefore disabled for the time being.
-        * We could consider some kind of configure test to see if it's safe to
-        * use, but since we lack hard evidence that there's any useful performance
-        * gain to be had, spending time on that seems unprofitable for now.
-        */
-#ifdef NOT_USED
-
-       /*
         * WAL segment files will not be re-read in normal operation, so we advise
-        * OS to release any cached pages.  But do not do so if WAL archiving is
-        * active, because archiver process could use the cache to read the WAL
-        * segment.
-        *
-        * While O_DIRECT works for O_SYNC, posix_fadvise() works for fsync()
-        * and O_SYNC, and some platforms only have posix_fadvise().
+        * the OS to release any cached pages.  But do not do so if WAL archiving
+        * is active, because archiver process could use the cache to read the WAL
+        * segment.  Also, don't bother with it if we are using O_DIRECT, since
+        * the kernel is presumably not caching in that case.
         */
-#if defined(HAVE_DECL_POSIX_FADVISE) && defined(POSIX_FADV_DONTNEED)
-       if (!XLogArchivingActive())
-               posix_fadvise(openLogFile, 0, 0, POSIX_FADV_DONTNEED);
+#if defined(USE_POSIX_FADVISE) && defined(POSIX_FADV_DONTNEED)
+       if (!XLogArchivingActive() &&
+               (get_sync_bit(sync_method) & PG_O_DIRECT) == 0)
+               (void) posix_fadvise(openLogFile, 0, 0, POSIX_FADV_DONTNEED);
 #endif
 
-#endif /* NOT_USED */
-
        if (close(openLogFile))
                ereport(PANIC,
-                       (errcode_for_file_access(),
-                       errmsg("could not close log file %u, segment %u: %m",
-                                  openLogId, openLogSeg)));
+                               (errcode_for_file_access(),
+                                errmsg("could not close log file %u, segment %u: %m",
+                                               openLogId, openLogSeg)));
        openLogFile = -1;
 }
 
@@ -2344,11 +2606,15 @@ RestoreArchivedFile(char *path, const char *xlogfname,
 {
        char            xlogpath[MAXPGPATH];
        char            xlogRestoreCmd[MAXPGPATH];
+       char            lastRestartPointFname[MAXPGPATH];
        char       *dp;
        char       *endp;
        const char *sp;
        int                     rc;
+       bool            signaled;
        struct stat stat_buf;
+       uint32          restartLog;
+       uint32          restartSeg;
 
        /*
         * When doing archive recovery, we always prefer an archived log file even
@@ -2396,6 +2662,35 @@ RestoreArchivedFile(char *path, const char *xlogfname,
        }
 
        /*
+        * Calculate the archive file cutoff point for use during log shipping
+        * replication. All files earlier than this point can be deleted
+        * from the archive, though there is no requirement to do so.
+        *
+        * We initialise this with the filename of an InvalidXLogRecPtr, which
+        * will prevent the deletion of any WAL files from the archive
+        * because of the alphabetic sorting property of WAL filenames.
+        *
+        * Once we have successfully located the redo pointer of the checkpoint
+        * from which we start recovery we never request a file prior to the redo
+        * pointer of the last restartpoint. When redo begins we know that we
+        * have successfully located it, so there is no need for additional
+        * status flags to signify the point when we can begin deleting WAL files
+        * from the archive.
+        */
+       if (InRedo)
+       {
+               XLByteToSeg(ControlFile->checkPointCopy.redo,
+                                       restartLog, restartSeg);
+               XLogFileName(lastRestartPointFname,
+                                        ControlFile->checkPointCopy.ThisTimeLineID,
+                                        restartLog, restartSeg);
+               /* we shouldn't need anything earlier than last restart point */
+               Assert(strcmp(lastRestartPointFname, xlogfname) <= 0);
+       }
+       else
+               XLogFileName(lastRestartPointFname, 0, 0, 0);
+
+       /*
         * construct the command to be executed
         */
        dp = xlogRestoreCmd;
@@ -2409,7 +2704,7 @@ RestoreArchivedFile(char *path, const char *xlogfname,
                        switch (sp[1])
                        {
                                case 'p':
-                                       /* %p: full path of target file */
+                                       /* %p: relative path of target file */
                                        sp++;
                                        StrNCpy(dp, xlogpath, endp - dp);
                                        make_native_path(dp);
@@ -2421,6 +2716,12 @@ RestoreArchivedFile(char *path, const char *xlogfname,
                                        StrNCpy(dp, xlogfname, endp - dp);
                                        dp += strlen(dp);
                                        break;
+                               case 'r':
+                                       /* %r: filename of last restartpoint */
+                                       sp++;
+                                       StrNCpy(dp, lastRestartPointFname, endp - dp);
+                                       dp += strlen(dp);
+                                       break;
                                case '%':
                                        /* convert %% to a single % */
                                        sp++;
@@ -2447,9 +2748,22 @@ RestoreArchivedFile(char *path, const char *xlogfname,
                                                         xlogRestoreCmd)));
 
        /*
+        * Set in_restore_command to tell the signal handler that we should exit
+        * right away on SIGTERM. We know that we're in a safe point to do that.
+        * Check if we had already received the signal, so that we don't miss a
+        * shutdown request received just before this.
+        */
+       in_restore_command = true;
+       if (shutdown_requested)
+               proc_exit(1);
+
+       /*
         * Copy xlog from archival storage to XLOGDIR
         */
        rc = system(xlogRestoreCmd);
+
+       in_restore_command = false;
+
        if (rc == 0)
        {
                /*
@@ -2491,13 +2805,38 @@ RestoreArchivedFile(char *path, const char *xlogfname,
        }
 
        /*
-        * remember, we rollforward UNTIL the restore fails so failure here is
+        * Remember, we rollforward UNTIL the restore fails so failure here is
         * just part of the process... that makes it difficult to determine
         * whether the restore failed because there isn't an archive to restore,
         * or because the administrator has specified the restore program
         * incorrectly.  We have to assume the former.
+        *
+        * However, if the failure was due to any sort of signal, it's best to
+        * punt and abort recovery.  (If we "return false" here, upper levels will
+        * assume that recovery is complete and start up the database!) It's
+        * essential to abort on child SIGINT and SIGQUIT, because per spec
+        * system() ignores SIGINT and SIGQUIT while waiting; if we see one of
+        * those it's a good bet we should have gotten it too.
+        *
+        * On SIGTERM, assume we have received a fast shutdown request, and exit
+        * cleanly. It's pure chance whether we receive the SIGTERM first, or the
+        * child process. If we receive it first, the signal handler will call
+        * proc_exit, otherwise we do it here. If we or the child process
+        * received SIGTERM for any other reason than a fast shutdown request,
+        * postmaster will perform an immediate shutdown when it sees us exiting
+        * unexpectedly.
+        *
+        * Per the Single Unix Spec, shells report exit status > 128 when a called
+        * command died on a signal.  Also, 126 and 127 are used to report
+        * problems such as an unfindable command; treat those as fatal errors
+        * too.
         */
-       ereport(DEBUG2,
+       if (WTERMSIG(rc) == SIGTERM)
+               proc_exit(1);
+
+       signaled = WIFSIGNALED(rc) || WEXITSTATUS(rc) > 125;
+
+       ereport(signaled ? FATAL : DEBUG2,
                (errmsg("could not restore file \"%s\" from archive: return code %d",
                                xlogfname, rc)));
 
@@ -2513,13 +2852,126 @@ RestoreArchivedFile(char *path, const char *xlogfname,
 }
 
 /*
- * Preallocate log files beyond the specified log endpoint, according to
- * the XLOGfile user parameter.
+ * Attempt to execute the recovery_end_command.
  */
-static int
+static void
+ExecuteRecoveryEndCommand(void)
+{
+       char            xlogRecoveryEndCmd[MAXPGPATH];
+       char            lastRestartPointFname[MAXPGPATH];
+       char       *dp;
+       char       *endp;
+       const char *sp;
+       int                     rc;
+       bool            signaled;
+       uint32          restartLog;
+       uint32          restartSeg;
+
+       Assert(recoveryEndCommand);
+
+       /*
+        * Calculate the archive file cutoff point for use during log shipping
+        * replication. All files earlier than this point can be deleted
+        * from the archive, though there is no requirement to do so.
+        *
+        * We initialise this with the filename of an InvalidXLogRecPtr, which
+        * will prevent the deletion of any WAL files from the archive
+        * because of the alphabetic sorting property of WAL filenames. 
+        *
+        * Once we have successfully located the redo pointer of the checkpoint
+        * from which we start recovery we never request a file prior to the redo
+        * pointer of the last restartpoint. When redo begins we know that we
+        * have successfully located it, so there is no need for additional
+        * status flags to signify the point when we can begin deleting WAL files
+        * from the archive. 
+        */
+       if (InRedo)
+       {
+               XLByteToSeg(ControlFile->checkPointCopy.redo,
+                                       restartLog, restartSeg);
+               XLogFileName(lastRestartPointFname,
+                                        ControlFile->checkPointCopy.ThisTimeLineID,
+                                        restartLog, restartSeg);
+       }
+       else
+               XLogFileName(lastRestartPointFname, 0, 0, 0);
+
+       /*
+        * construct the command to be executed
+        */
+       dp = xlogRecoveryEndCmd;
+       endp = xlogRecoveryEndCmd + MAXPGPATH - 1;
+       *endp = '\0';
+
+       for (sp = recoveryEndCommand; *sp; sp++)
+       {
+               if (*sp == '%')
+               {
+                       switch (sp[1])
+                       {
+                               case 'r':
+                                       /* %r: filename of last restartpoint */
+                                       sp++;
+                                       StrNCpy(dp, lastRestartPointFname, endp - dp);
+                                       dp += strlen(dp);
+                                       break;
+                               case '%':
+                                       /* convert %% to a single % */
+                                       sp++;
+                                       if (dp < endp)
+                                               *dp++ = *sp;
+                                       break;
+                               default:
+                                       /* otherwise treat the % as not special */
+                                       if (dp < endp)
+                                               *dp++ = *sp;
+                                       break;
+                       }
+               }
+               else
+               {
+                       if (dp < endp)
+                               *dp++ = *sp;
+               }
+       }
+       *dp = '\0';
+
+       ereport(DEBUG3,
+                       (errmsg_internal("executing recovery end command \"%s\"",
+                                                        xlogRecoveryEndCmd)));
+
+       /*
+        * Copy xlog from archival storage to XLOGDIR
+        */
+       rc = system(xlogRecoveryEndCmd);
+       if (rc != 0)
+       {
+               /*
+                * If the failure was due to any sort of signal, it's best to punt and
+                * abort recovery. See also detailed comments on signals in 
+                * RestoreArchivedFile().
+                */
+               signaled = WIFSIGNALED(rc) || WEXITSTATUS(rc) > 125;
+
+               ereport(signaled ? FATAL : WARNING,
+                               (errmsg("recovery_end_command \"%s\": return code %d",
+                                                               xlogRecoveryEndCmd, rc)));
+       }
+}
+
+/*
+ * Preallocate log files beyond the specified log endpoint.
+ *
+ * XXX this is currently extremely conservative, since it forces only one
+ * future log segment to exist, and even that only if we are 75% done with
+ * the current one.  This is only appropriate for very low-WAL-volume systems.
+ * High-volume systems will be OK once they've built up a sufficient set of
+ * recycled log segments, but the startup transient is likely to include
+ * a lot of segment creations by foreground processes, which is not so good.
+ */
+static void
 PreallocXlogFiles(XLogRecPtr endptr)
 {
-       int                     nsegsadded = 0;
        uint32          _logId;
        uint32          _logSeg;
        int                     lf;
@@ -2534,20 +2986,18 @@ PreallocXlogFiles(XLogRecPtr endptr)
                lf = XLogFileInit(_logId, _logSeg, &use_existent, true);
                close(lf);
                if (!use_existent)
-                       nsegsadded++;
+                       CheckpointStats.ckpt_segs_added++;
        }
-       return nsegsadded;
 }
 
 /*
- * Remove or move offline all log files older or equal to passed log/seg#
+ * Recycle or remove all log files older or equal to passed log/seg#
  *
  * endptr is current (or recent) end of xlog; this is used to determine
  * whether we want to recycle rather than delete no-longer-wanted log files.
  */
 static void
-MoveOfflineLogs(uint32 log, uint32 seg, XLogRecPtr endptr,
-                               int *nsegsremoved, int *nsegsrecycled)
+RemoveOldXlogFiles(uint32 log, uint32 seg, XLogRecPtr endptr)
 {
        uint32          endlogId;
        uint32          endlogSeg;
@@ -2557,9 +3007,6 @@ MoveOfflineLogs(uint32 log, uint32 seg, XLogRecPtr endptr,
        char            lastoff[MAXFNAMELEN];
        char            path[MAXPGPATH];
 
-       *nsegsremoved = 0;
-       *nsegsrecycled = 0;
-
        /*
         * Initialize info about where to try to recycle to.  We allow recycling
         * segments up to XLOGfileslop segments beyond the current XLOG location.
@@ -2608,7 +3055,7 @@ MoveOfflineLogs(uint32 log, uint32 seg, XLogRecPtr endptr,
                                        ereport(DEBUG2,
                                                        (errmsg("recycled transaction log file \"%s\"",
                                                                        xlde->d_name)));
-                                       (*nsegsrecycled)++;
+                                       CheckpointStats.ckpt_segs_recycled++;
                                        /* Needn't recheck that slot on future iterations */
                                        if (max_advance > 0)
                                        {
@@ -2623,7 +3070,7 @@ MoveOfflineLogs(uint32 log, uint32 seg, XLogRecPtr endptr,
                                                        (errmsg("removing transaction log file \"%s\"",
                                                                        xlde->d_name)));
                                        unlink(path);
-                                       (*nsegsremoved)++;
+                                       CheckpointStats.ckpt_segs_removed++;
                                }
 
                                XLogArchiveCleanup(xlde->d_name);
@@ -2635,25 +3082,72 @@ MoveOfflineLogs(uint32 log, uint32 seg, XLogRecPtr endptr,
 }
 
 /*
- * Remove previous backup history files.  This also retries creation of
- * .ready files for any backup history files for which XLogArchiveNotify
- * failed earlier.
+ * Verify whether pg_xlog and pg_xlog/archive_status exist.
+ * If the latter does not exist, recreate it.
+ *
+ * It is not the goal of this function to verify the contents of these
+ * directories, but to help in cases where someone has performed a cluster
+ * copy for PITR purposes but omitted pg_xlog from the copy.
+ *
+ * We could also recreate pg_xlog if it doesn't exist, but a deliberate
+ * policy decision was made not to.  It is fairly common for pg_xlog to be
+ * a symlink, and if that was the DBA's intent then automatically making a
+ * plain directory would result in degraded performance with no notice.
  */
 static void
-CleanupBackupHistory(void)
+ValidateXLOGDirectoryStructure(void)
 {
-       DIR                *xldir;
-       struct dirent *xlde;
        char            path[MAXPGPATH];
+       struct stat     stat_buf;
 
-       xldir = AllocateDir(XLOGDIR);
-       if (xldir == NULL)
-               ereport(ERROR,
-                               (errcode_for_file_access(),
-                                errmsg("could not open transaction log directory \"%s\": %m",
+       /* Check for pg_xlog; if it doesn't exist, error out */
+       if (stat(XLOGDIR, &stat_buf) != 0 ||
+               !S_ISDIR(stat_buf.st_mode))
+               ereport(FATAL, 
+                               (errmsg("required WAL directory \"%s\" does not exist",
                                                XLOGDIR)));
 
-       while ((xlde = ReadDir(xldir, XLOGDIR)) != NULL)
+       /* Check for archive_status */
+       snprintf(path, MAXPGPATH, XLOGDIR "/archive_status");
+       if (stat(path, &stat_buf) == 0)
+       {
+               /* Check for weird cases where it exists but isn't a directory */
+               if (!S_ISDIR(stat_buf.st_mode))
+                       ereport(FATAL, 
+                                       (errmsg("required WAL directory \"%s\" does not exist",
+                                                       path)));
+       }
+       else
+       {
+               ereport(LOG,
+                               (errmsg("creating missing WAL directory \"%s\"", path)));
+               if (mkdir(path, 0700) < 0)
+                       ereport(FATAL, 
+                                       (errmsg("could not create missing directory \"%s\": %m",
+                                                       path)));
+       }
+}
+
+/*
+ * Remove previous backup history files.  This also retries creation of
+ * .ready files for any backup history files for which XLogArchiveNotify
+ * failed earlier.
+ */
+static void
+CleanupBackupHistory(void)
+{
+       DIR                *xldir;
+       struct dirent *xlde;
+       char            path[MAXPGPATH];
+
+       xldir = AllocateDir(XLOGDIR);
+       if (xldir == NULL)
+               ereport(ERROR,
+                               (errcode_for_file_access(),
+                                errmsg("could not open transaction log directory \"%s\": %m",
+                                               XLOGDIR)));
+
+       while ((xlde = ReadDir(xldir, XLOGDIR)) != NULL)
        {
                if (strlen(xlde->d_name) > 24 &&
                        strspn(xlde->d_name, "0123456789ABCDEF") == 24 &&
@@ -2688,17 +3182,25 @@ CleanupBackupHistory(void)
  * page might not be.  This will force us to replay all subsequent
  * modifications of the page that appear in XLOG, rather than possibly
  * ignoring them as already applied, but that's not a huge drawback.
+ *
+ * If 'cleanup' is true, a cleanup lock is used when restoring blocks.
+ * Otherwise, a normal exclusive lock is used.  At the moment, that's just
+ * pro forma, because there can't be any regular backends in the system
+ * during recovery.  The 'cleanup' argument applies to all backup blocks
+ * in the WAL record, that suffices for now.
  */
-static void
-RestoreBkpBlocks(XLogRecord *record, XLogRecPtr lsn)
+void
+RestoreBkpBlocks(XLogRecPtr lsn, XLogRecord *record, bool cleanup)
 {
-       Relation        reln;
        Buffer          buffer;
        Page            page;
        BkpBlock        bkpb;
        char       *blk;
        int                     i;
 
+       if (!(record->xl_info & XLR_BKP_BLOCK_MASK))
+               return;
+
        blk = (char *) XLogRecGetData(record) + record->xl_len;
        for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
        {
@@ -2708,9 +3210,14 @@ RestoreBkpBlocks(XLogRecord *record, XLogRecPtr lsn)
                memcpy(&bkpb, blk, sizeof(BkpBlock));
                blk += sizeof(BkpBlock);
 
-               reln = XLogOpenRelation(bkpb.node);
-               buffer = XLogReadBuffer(reln, bkpb.block, true);
+               buffer = XLogReadBufferExtended(bkpb.node, bkpb.fork, bkpb.block,
+                                                                               RBM_ZERO);
                Assert(BufferIsValid(buffer));
+               if (cleanup)
+                       LockBufferForCleanup(buffer);
+               else
+                       LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+
                page = (Page) BufferGetPage(buffer);
 
                if (bkpb.hole_length == 0)
@@ -2930,7 +3437,7 @@ ReadRecord(XLogRecPtr *RecPtr, int emode)
                {
                        ereport(emode,
                                        (errcode_for_file_access(),
-                                        errmsg("could not read from log file %u, segment %u at offset %u: %m",
+                                        errmsg("could not read from log file %u, segment %u, offset %u: %m",
                                                        readId, readSeg, readOff)));
                        goto next_record_is_invalid;
                }
@@ -2969,8 +3476,8 @@ ReadRecord(XLogRecPtr *RecPtr, int emode)
 got_record:;
 
        /*
-        * xl_len == 0 is bad data for everything except XLOG SWITCH, where
-        * it is required.
+        * xl_len == 0 is bad data for everything except XLOG SWITCH, where it is
+        * required.
         */
        if (record->xl_rmid == RM_XLOG_ID && record->xl_info == XLOG_SWITCH)
        {
@@ -3159,6 +3666,7 @@ got_record:;
        EndRecPtr.xrecoff = RecPtr->xrecoff + MAXALIGN(total_len);
        ReadRecPtr = *RecPtr;
        memcpy(buffer, record, total_len);
+
        /*
         * Special processing if it's an XLOG SWITCH record
         */
@@ -3168,18 +3676,22 @@ got_record:;
                EndRecPtr.xrecoff += XLogSegSize - 1;
                EndRecPtr.xrecoff -= EndRecPtr.xrecoff % XLogSegSize;
                nextRecord = NULL;              /* definitely not on same page */
+
                /*
-                * Pretend that readBuf contains the last page of the segment.
-                * This is just to avoid Assert failure in StartupXLOG if XLOG
-                * ends with this segment.
+                * Pretend that readBuf contains the last page of the segment. This is
+                * just to avoid Assert failure in StartupXLOG if XLOG ends with this
+                * segment.
                 */
                readOff = XLogSegSize - XLOG_BLCKSZ;
        }
        return (XLogRecord *) buffer;
 
 next_record_is_invalid:;
-       close(readFile);
-       readFile = -1;
+       if (readFile >= 0)
+       {
+               close(readFile);
+               readFile = -1;
+       }
        nextRecord = NULL;
        return NULL;
 }
@@ -3341,7 +3853,7 @@ readTimeLineHistory(TimeLineID targetTLI)
        /*
         * Parse the file...
         */
-       while (fgets(fline, MAXPGPATH, fd) != NULL)
+       while (fgets(fline, sizeof(fline), fd) != NULL)
        {
                /* skip leading whitespace and check for # comment */
                char       *ptr;
@@ -3494,7 +4006,7 @@ writeTimeLineHistory(TimeLineID newTLI, TimeLineID parentTLI,
 
        unlink(tmppath);
 
-       /* do not use XLOG_SYNC_BIT here --- want to fsync only at end of fill */
+       /* do not use get_sync_bit() here --- want to fsync only at end of fill */
        fd = BasicOpenFile(tmppath, O_RDWR | O_CREAT | O_EXCL,
                                           S_IRUSR | S_IWUSR);
        if (fd < 0)
@@ -3573,7 +4085,7 @@ writeTimeLineHistory(TimeLineID newTLI, TimeLineID parentTLI,
                         xlogfname,
                         recoveryStopAfter ? "after" : "before",
                         recoveryStopXid,
-                        str_time(recoveryStopTime));
+                        timestamptz_to_str(recoveryStopTime));
 
        nbytes = strlen(buffer);
        errno = 0;
@@ -3652,8 +4164,7 @@ static void
 WriteControlFile(void)
 {
        int                     fd;
-       char            buffer[PG_CONTROL_SIZE]; /* need not be aligned */
-       char       *localeptr;
+       char            buffer[PG_CONTROL_SIZE];                /* need not be aligned */
 
        /*
         * Initialize version and compatibility-check fields
@@ -3672,23 +4183,15 @@ WriteControlFile(void)
        ControlFile->nameDataLen = NAMEDATALEN;
        ControlFile->indexMaxKeys = INDEX_MAX_KEYS;
 
+       ControlFile->toast_max_chunk_size = TOAST_MAX_CHUNK_SIZE;
+
 #ifdef HAVE_INT64_TIMESTAMP
-       ControlFile->enableIntTimes = TRUE;
+       ControlFile->enableIntTimes = true;
 #else
-       ControlFile->enableIntTimes = FALSE;
+       ControlFile->enableIntTimes = false;
 #endif
-
-       ControlFile->localeBuflen = LOCALE_NAME_BUFLEN;
-       localeptr = setlocale(LC_COLLATE, NULL);
-       if (!localeptr)
-               ereport(PANIC,
-                               (errmsg("invalid LC_COLLATE setting")));
-       StrNCpy(ControlFile->lc_collate, localeptr, LOCALE_NAME_BUFLEN);
-       localeptr = setlocale(LC_CTYPE, NULL);
-       if (!localeptr)
-               ereport(PANIC,
-                               (errmsg("invalid LC_CTYPE setting")));
-       StrNCpy(ControlFile->lc_ctype, localeptr, LOCALE_NAME_BUFLEN);
+       ControlFile->float4ByVal = FLOAT4PASSBYVAL;
+       ControlFile->float8ByVal = FLOAT8PASSBYVAL;
 
        /* Contents are protected with a CRC */
        INIT_CRC32(ControlFile->crc);
@@ -3772,6 +4275,16 @@ ReadControlFile(void)
         * of bytes.  Complaining about wrong version will probably be more
         * enlightening than complaining about wrong CRC.
         */
+
+       if (ControlFile->pg_control_version != PG_CONTROL_VERSION && ControlFile->pg_control_version % 65536 == 0 && ControlFile->pg_control_version / 65536 != 0)
+               ereport(FATAL,
+                               (errmsg("database files are incompatible with server"),
+                                errdetail("The database cluster was initialized with PG_CONTROL_VERSION %d (0x%08x),"
+                                                  " but the server was compiled with PG_CONTROL_VERSION %d (0x%08x).",
+                                                  ControlFile->pg_control_version, ControlFile->pg_control_version,
+                                                  PG_CONTROL_VERSION, PG_CONTROL_VERSION),
+                                errhint("This could be a problem of mismatched byte ordering.  It looks like you need to initdb.")));
+
        if (ControlFile->pg_control_version != PG_CONTROL_VERSION)
                ereport(FATAL,
                                (errmsg("database files are incompatible with server"),
@@ -3779,6 +4292,7 @@ ReadControlFile(void)
                                  " but the server was compiled with PG_CONTROL_VERSION %d.",
                                                ControlFile->pg_control_version, PG_CONTROL_VERSION),
                                 errhint("It looks like you need to initdb.")));
+
        /* Now check the CRC. */
        INIT_CRC32(crc);
        COMP_CRC32(crc,
@@ -3791,15 +4305,9 @@ ReadControlFile(void)
                                (errmsg("incorrect checksum in control file")));
 
        /*
-        * Do compatibility checking immediately.  We do this here for 2 reasons:
-        *
-        * (1) if the database isn't compatible with the backend executable, we
-        * want to abort before we can possibly do any damage;
-        *
-        * (2) this code is executed in the postmaster, so the setlocale() will
-        * propagate to forked backends, which aren't going to read this file for
-        * themselves.  (These locale settings are considered critical
-        * compatibility items because they can affect sort order of indexes.)
+        * Do compatibility checking immediately.  If the database isn't
+        * compatible with the backend executable, we want to abort before we
+        * can possibly do any damage.
         */
        if (ControlFile->catalog_version_no != CATALOG_VERSION_NO)
                ereport(FATAL,
@@ -3837,9 +4345,9 @@ ReadControlFile(void)
        if (ControlFile->xlog_blcksz != XLOG_BLCKSZ)
                ereport(FATAL,
                                (errmsg("database files are incompatible with server"),
-                        errdetail("The database cluster was initialized with XLOG_BLCKSZ %d,"
-                                          " but the server was compiled with XLOG_BLCKSZ %d.",
-                                          ControlFile->xlog_blcksz, XLOG_BLCKSZ),
+               errdetail("The database cluster was initialized with XLOG_BLCKSZ %d,"
+                                 " but the server was compiled with XLOG_BLCKSZ %d.",
+                                 ControlFile->xlog_blcksz, XLOG_BLCKSZ),
                                 errhint("It looks like you need to recompile or initdb.")));
        if (ControlFile->xlog_seg_size != XLOG_SEG_SIZE)
                ereport(FATAL,
@@ -3862,16 +4370,23 @@ ReadControlFile(void)
                                          " but the server was compiled with INDEX_MAX_KEYS %d.",
                                                   ControlFile->indexMaxKeys, INDEX_MAX_KEYS),
                                 errhint("It looks like you need to recompile or initdb.")));
+       if (ControlFile->toast_max_chunk_size != TOAST_MAX_CHUNK_SIZE)
+               ereport(FATAL,
+                               (errmsg("database files are incompatible with server"),
+                                errdetail("The database cluster was initialized with TOAST_MAX_CHUNK_SIZE %d,"
+                               " but the server was compiled with TOAST_MAX_CHUNK_SIZE %d.",
+                         ControlFile->toast_max_chunk_size, (int) TOAST_MAX_CHUNK_SIZE),
+                                errhint("It looks like you need to recompile or initdb.")));
 
 #ifdef HAVE_INT64_TIMESTAMP
-       if (ControlFile->enableIntTimes != TRUE)
+       if (ControlFile->enableIntTimes != true)
                ereport(FATAL,
                                (errmsg("database files are incompatible with server"),
                                 errdetail("The database cluster was initialized without HAVE_INT64_TIMESTAMP"
                                  " but the server was compiled with HAVE_INT64_TIMESTAMP."),
                                 errhint("It looks like you need to recompile or initdb.")));
 #else
-       if (ControlFile->enableIntTimes != FALSE)
+       if (ControlFile->enableIntTimes != false)
                ereport(FATAL,
                                (errmsg("database files are incompatible with server"),
                                 errdetail("The database cluster was initialized with HAVE_INT64_TIMESTAMP"
@@ -3879,33 +4394,37 @@ ReadControlFile(void)
                                 errhint("It looks like you need to recompile or initdb.")));
 #endif
 
-       if (ControlFile->localeBuflen != LOCALE_NAME_BUFLEN)
+#ifdef USE_FLOAT4_BYVAL
+       if (ControlFile->float4ByVal != true)
                ereport(FATAL,
                                (errmsg("database files are incompatible with server"),
-                                errdetail("The database cluster was initialized with LOCALE_NAME_BUFLEN %d,"
-                                 " but the server was compiled with LOCALE_NAME_BUFLEN %d.",
-                                                  ControlFile->localeBuflen, LOCALE_NAME_BUFLEN),
+                                errdetail("The database cluster was initialized without USE_FLOAT4_BYVAL"
+                                                  " but the server was compiled with USE_FLOAT4_BYVAL."),
                                 errhint("It looks like you need to recompile or initdb.")));
-       if (pg_perm_setlocale(LC_COLLATE, ControlFile->lc_collate) == NULL)
+#else
+       if (ControlFile->float4ByVal != false)
+               ereport(FATAL,
+                               (errmsg("database files are incompatible with server"),
+                                errdetail("The database cluster was initialized with USE_FLOAT4_BYVAL"
+                                                  " but the server was compiled without USE_FLOAT4_BYVAL."),
+                                errhint("It looks like you need to recompile or initdb.")));
+#endif
+
+#ifdef USE_FLOAT8_BYVAL
+       if (ControlFile->float8ByVal != true)
                ereport(FATAL,
-                       (errmsg("database files are incompatible with operating system"),
-                        errdetail("The database cluster was initialized with LC_COLLATE \"%s\","
-                                          " which is not recognized by setlocale().",
-                                          ControlFile->lc_collate),
-                        errhint("It looks like you need to initdb or install locale support.")));
-       if (pg_perm_setlocale(LC_CTYPE, ControlFile->lc_ctype) == NULL)
+                               (errmsg("database files are incompatible with server"),
+                                errdetail("The database cluster was initialized without USE_FLOAT8_BYVAL"
+                                                  " but the server was compiled with USE_FLOAT8_BYVAL."),
+                                errhint("It looks like you need to recompile or initdb.")));
+#else
+       if (ControlFile->float8ByVal != false)
                ereport(FATAL,
-                       (errmsg("database files are incompatible with operating system"),
-               errdetail("The database cluster was initialized with LC_CTYPE \"%s\","
-                                 " which is not recognized by setlocale().",
-                                 ControlFile->lc_ctype),
-                        errhint("It looks like you need to initdb or install locale support.")));
-
-       /* Make the fixed locale settings visible as GUC variables, too */
-       SetConfigOption("lc_collate", ControlFile->lc_collate,
-                                       PGC_INTERNAL, PGC_S_OVERRIDE);
-       SetConfigOption("lc_ctype", ControlFile->lc_ctype,
-                                       PGC_INTERNAL, PGC_S_OVERRIDE);
+                               (errmsg("database files are incompatible with server"),
+                                errdetail("The database cluster was initialized with USE_FLOAT8_BYVAL"
+                                                  " but the server was compiled without USE_FLOAT8_BYVAL."),
+                                errhint("It looks like you need to recompile or initdb.")));
+#endif
 }
 
 void
@@ -4018,8 +4537,6 @@ XLOGShmemInit(void)
         * Do basic initialization of XLogCtl shared data. (StartupXLOG will fill
         * in additional info.)
         */
-       XLogCtl->XLogCacheByte = (Size) XLOG_BLCKSZ * XLOGbuffers;
-
        XLogCtl->XLogCacheBlck = XLOGbuffers - 1;
        XLogCtl->Insert.currpage = (XLogPageHeader) (XLogCtl->pages);
        SpinLockInit(&XLogCtl->info_lck);
@@ -4077,13 +4594,13 @@ BootStrapXLOG(void)
        /* Set up information for the initial checkpoint record */
        checkPoint.redo.xlogid = 0;
        checkPoint.redo.xrecoff = SizeOfXLogLongPHD;
-       checkPoint.undo = checkPoint.redo;
        checkPoint.ThisTimeLineID = ThisTimeLineID;
+       checkPoint.nextXidEpoch = 0;
        checkPoint.nextXid = FirstNormalTransactionId;
        checkPoint.nextOid = FirstBootstrapObjectId;
        checkPoint.nextMulti = FirstMultiXactId;
        checkPoint.nextMultiOffset = 0;
-       checkPoint.time = time(NULL);
+       checkPoint.time = (pg_time_t) time(NULL);
 
        ShmemVariableCache->nextXid = checkPoint.nextXid;
        ShmemVariableCache->nextOid = checkPoint.nextOid;
@@ -4154,8 +4671,6 @@ BootStrapXLOG(void)
        ControlFile->system_identifier = sysidentifier;
        ControlFile->state = DB_SHUTDOWNED;
        ControlFile->time = checkPoint.time;
-       ControlFile->logId = 0;
-       ControlFile->logSeg = 1;
        ControlFile->checkPoint = checkPoint.redo;
        ControlFile->checkPointCopy = checkPoint;
        /* some additional ControlFile fields are set in WriteControlFile() */
@@ -4171,13 +4686,13 @@ BootStrapXLOG(void)
 }
 
 static char *
-str_time(time_t tnow)
+str_time(pg_time_t tnow)
 {
        static char buf[128];
 
-       strftime(buf, sizeof(buf),
-                        "%Y-%m-%d %H:%M:%S %Z",
-                        localtime(&tnow));
+       pg_strftime(buf, sizeof(buf),
+                               "%Y-%m-%d %H:%M:%S %Z",
+                               pg_localtime(&tnow, log_timezone));
 
        return buf;
 }
@@ -4216,7 +4731,7 @@ readRecoveryCommandFile(void)
        /*
         * Parse the file...
         */
-       while (fgets(cmdline, MAXPGPATH, fd) != NULL)
+       while (fgets(cmdline, sizeof(cmdline), fd) != NULL)
        {
                /* skip leading whitespace and check for # comment */
                char       *ptr;
@@ -4256,9 +4771,16 @@ readRecoveryCommandFile(void)
                {
                        recoveryRestoreCommand = pstrdup(tok2);
                        ereport(LOG,
-                                       (errmsg("restore_command = \"%s\"",
+                                       (errmsg("restore_command = '%s'",
                                                        recoveryRestoreCommand)));
                }
+               else if (strcmp(tok1, "recovery_end_command") == 0)
+               {
+                       recoveryEndCommand = pstrdup(tok2);
+                       ereport(LOG,
+                                       (errmsg("recovery_end_command = '%s'",
+                                                       recoveryEndCommand)));
+               }
                else if (strcmp(tok1, "recovery_target_timeline") == 0)
                {
                        rtliGiven = true;
@@ -4306,30 +4828,26 @@ readRecoveryCommandFile(void)
                        recoveryTargetExact = false;
 
                        /*
-                        * Convert the time string given by the user to the time_t format.
-                        * We use type abstime's input converter because we know abstime
-                        * has the same representation as time_t.
+                        * Convert the time string given by the user to TimestampTz form.
                         */
-                       recoveryTargetTime = (time_t)
-                               DatumGetAbsoluteTime(DirectFunctionCall1(abstimein,
-                                                                                                        CStringGetDatum(tok2)));
+                       recoveryTargetTime =
+                               DatumGetTimestampTz(DirectFunctionCall3(timestamptz_in,
+                                                                                                               CStringGetDatum(tok2),
+                                                                                               ObjectIdGetDatum(InvalidOid),
+                                                                                                               Int32GetDatum(-1)));
                        ereport(LOG,
-                                       (errmsg("recovery_target_time = %s",
-                                                       DatumGetCString(DirectFunctionCall1(abstimeout,
-                               AbsoluteTimeGetDatum((AbsoluteTime) recoveryTargetTime))))));
+                                       (errmsg("recovery_target_time = '%s'",
+                                                       timestamptz_to_str(recoveryTargetTime))));
                }
                else if (strcmp(tok1, "recovery_target_inclusive") == 0)
                {
                        /*
                         * does nothing if a recovery_target is not also set
                         */
-                       if (strcmp(tok2, "true") == 0)
-                               recoveryTargetInclusive = true;
-                       else
-                       {
-                               recoveryTargetInclusive = false;
-                               tok2 = "false";
-                       }
+                       if (!parse_bool(tok2, &recoveryTargetInclusive))
+                                 ereport(ERROR,
+                                                       (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                                         errmsg("parameter \"recovery_target_inclusive\" requires a Boolean value")));
                        ereport(LOG,
                                        (errmsg("recovery_target_inclusive = %s", tok2)));
                }
@@ -4369,7 +4887,7 @@ readRecoveryCommandFile(void)
                        /* Timeline 1 does not have a history file, all else should */
                        if (rtli != 1 && !existsTimeLineHistory(rtli))
                                ereport(FATAL,
-                                               (errmsg("recovery_target_timeline %u does not exist",
+                                               (errmsg("recovery target timeline %u does not exist",
                                                                rtli)));
                        recoveryTargetTLI = rtli;
                }
@@ -4418,7 +4936,8 @@ exitArchiveRecovery(TimeLineID endTLI, uint32 endLogId, uint32 endLogSeg)
         *
         * Note that if we are establishing a new timeline, ThisTimeLineID is
         * already set to the new value, and so we will create a new file instead
-        * of overwriting any existing file.
+        * of overwriting any existing file.  (This is, in fact, always the case
+        * at present.)
         */
        snprintf(recoveryPath, MAXPGPATH, XLOGDIR "/RECOVERYXLOG");
        XLogFilePath(xlogpath, ThisTimeLineID, endLogId, endLogSeg);
@@ -4448,10 +4967,22 @@ exitArchiveRecovery(TimeLineID endTLI, uint32 endLogId, uint32 endLogSeg)
                 * If we are establishing a new timeline, we have to copy data from
                 * the last WAL segment of the old timeline to create a starting WAL
                 * segment for the new timeline.
+                *
+                * Notify the archiver that the last WAL segment of the old timeline
+                * is ready to copy to archival storage. Otherwise, it is not archived
+                * for a while.
                 */
                if (endTLI != ThisTimeLineID)
+               {
                        XLogFileCopy(endLogId, endLogSeg,
                                                 endTLI, endLogId, endLogSeg);
+
+                       if (XLogArchivingActive())
+                       {
+                               XLogFileName(xlogpath, endTLI, endLogId, endLogSeg);
+                               XLogArchiveNotify(xlogpath);
+                       }
+               }
        }
 
        /*
@@ -4486,6 +5017,9 @@ exitArchiveRecovery(TimeLineID endTLI, uint32 endLogId, uint32 endLogSeg)
  *
  * Returns TRUE if we are stopping, FALSE otherwise.  On TRUE return,
  * *includeThis is set TRUE if we should apply this record before stopping.
+ *
+ * We also track the timestamp of the latest applied COMMIT/ABORT record
+ * in recoveryLastXTime, for logging purposes.
  * Also, some information is saved in recoveryStopXid et al for use in
  * annotating the new timeline's history file.
  */
@@ -4494,11 +5028,7 @@ recoveryStopsHere(XLogRecord *record, bool *includeThis)
 {
        bool            stopsHere;
        uint8           record_info;
-       time_t          recordXtime;
-
-       /* Do we have a PITR target at all? */
-       if (!recoveryTarget)
-               return false;
+       TimestampTz recordXtime;
 
        /* We only consider stopping at COMMIT or ABORT records */
        if (record->xl_rmid != RM_XACT_ID)
@@ -4509,18 +5039,25 @@ recoveryStopsHere(XLogRecord *record, bool *includeThis)
                xl_xact_commit *recordXactCommitData;
 
                recordXactCommitData = (xl_xact_commit *) XLogRecGetData(record);
-               recordXtime = recordXactCommitData->xtime;
+               recordXtime = recordXactCommitData->xact_time;
        }
        else if (record_info == XLOG_XACT_ABORT)
        {
                xl_xact_abort *recordXactAbortData;
 
                recordXactAbortData = (xl_xact_abort *) XLogRecGetData(record);
-               recordXtime = recordXactAbortData->xtime;
+               recordXtime = recordXactAbortData->xact_time;
        }
        else
                return false;
 
+       /* Do we have a PITR target at all? */
+       if (!recoveryTarget)
+       {
+               recoveryLastXTime = recordXtime;
+               return false;
+       }
+
        if (recoveryTargetExact)
        {
                /*
@@ -4562,24 +5099,33 @@ recoveryStopsHere(XLogRecord *record, bool *includeThis)
                        if (recoveryStopAfter)
                                ereport(LOG,
                                                (errmsg("recovery stopping after commit of transaction %u, time %s",
-                                                         recoveryStopXid, str_time(recoveryStopTime))));
+                                                               recoveryStopXid,
+                                                               timestamptz_to_str(recoveryStopTime))));
                        else
                                ereport(LOG,
                                                (errmsg("recovery stopping before commit of transaction %u, time %s",
-                                                         recoveryStopXid, str_time(recoveryStopTime))));
+                                                               recoveryStopXid,
+                                                               timestamptz_to_str(recoveryStopTime))));
                }
                else
                {
                        if (recoveryStopAfter)
                                ereport(LOG,
                                                (errmsg("recovery stopping after abort of transaction %u, time %s",
-                                                         recoveryStopXid, str_time(recoveryStopTime))));
+                                                               recoveryStopXid,
+                                                               timestamptz_to_str(recoveryStopTime))));
                        else
                                ereport(LOG,
                                                (errmsg("recovery stopping before abort of transaction %u, time %s",
-                                                         recoveryStopXid, str_time(recoveryStopTime))));
+                                                               recoveryStopXid,
+                                                               timestamptz_to_str(recoveryStopTime))));
                }
+
+               if (recoveryStopAfter)
+                       recoveryLastXTime = recordXtime;
        }
+       else
+               recoveryLastXTime = recordXtime;
 
        return stopsHere;
 }
@@ -4593,10 +5139,12 @@ StartupXLOG(void)
        XLogCtlInsert *Insert;
        CheckPoint      checkPoint;
        bool            wasShutdown;
-       bool            needNewTimeLine = false;
+       bool            reachedStopPoint = false;
+       bool            haveBackupLabel = false;
        XLogRecPtr      RecPtr,
                                LastRec,
                                checkPointLoc,
+                               backupStopLoc,
                                EndOfLog;
        uint32          endLogId;
        uint32          endLogSeg;
@@ -4604,7 +5152,7 @@ StartupXLOG(void)
        uint32          freespace;
        TransactionId oldestActiveXID;
 
-       CritSectionCount++;
+       XLogCtl->SharedRecoveryInProgress = true;
 
        /*
         * Read control file and check XLOG status looks valid.
@@ -4614,8 +5162,7 @@ StartupXLOG(void)
         */
        ReadControlFile();
 
-       if (ControlFile->logSeg == 0 ||
-               ControlFile->state < DB_SHUTDOWNED ||
+       if (ControlFile->state < DB_SHUTDOWNED ||
                ControlFile->state > DB_IN_PRODUCTION ||
                !XRecOffIsValid(ControlFile->checkPoint.xrecoff))
                ereport(FATAL,
@@ -4627,18 +5174,24 @@ StartupXLOG(void)
                                                str_time(ControlFile->time))));
        else if (ControlFile->state == DB_SHUTDOWNING)
                ereport(LOG,
-                               (errmsg("database system shutdown was interrupted at %s",
+                               (errmsg("database system shutdown was interrupted; last known up at %s",
                                                str_time(ControlFile->time))));
-       else if (ControlFile->state == DB_IN_RECOVERY)
+       else if (ControlFile->state == DB_IN_CRASH_RECOVERY)
                ereport(LOG,
                   (errmsg("database system was interrupted while in recovery at %s",
                                   str_time(ControlFile->time)),
                        errhint("This probably means that some data is corrupted and"
                                        " you will have to use the last backup for recovery.")));
+       else if (ControlFile->state == DB_IN_ARCHIVE_RECOVERY)
+               ereport(LOG,
+                               (errmsg("database system was interrupted while in recovery at log time %s",
+                                               str_time(ControlFile->checkPointCopy.time)),
+                                errhint("If this has occurred more than once some data might be corrupted"
+                         " and you might need to choose an earlier recovery target.")));
        else if (ControlFile->state == DB_IN_PRODUCTION)
                ereport(LOG,
-                               (errmsg("database system was interrupted at %s",
-                                               str_time(ControlFile->time))));
+                         (errmsg("database system was interrupted; last known up at %s",
+                                         str_time(ControlFile->time))));
 
        /* This is just to allow attaching to startup process with a debugger */
 #ifdef XLOG_REPLAY_DELAY
@@ -4647,6 +5200,13 @@ StartupXLOG(void)
 #endif
 
        /*
+        * Verify that pg_xlog and pg_xlog/archive_status exist.  In cases where
+        * someone has performed a copy for PITR, these directories may have
+        * been excluded and need to be re-created.
+        */
+       ValidateXLOGDirectoryStructure();
+
+       /*
         * Initialize on the assumption we want to recover to the same timeline
         * that's active according to pg_control.
         */
@@ -4673,7 +5233,7 @@ StartupXLOG(void)
                                                recoveryTargetTLI,
                                                ControlFile->checkPointCopy.ThisTimeLineID)));
 
-       if (read_backup_label(&checkPointLoc))
+       if (read_backup_label(&checkPointLoc, &backupStopLoc))
        {
                /*
                 * When a backup_label file is present, we want to roll forward from
@@ -4682,7 +5242,7 @@ StartupXLOG(void)
                record = ReadCheckpointRecord(checkPointLoc, 0);
                if (record != NULL)
                {
-                       ereport(LOG,
+                       ereport(DEBUG1,
                                        (errmsg("checkpoint record is at %X/%X",
                                                        checkPointLoc.xlogid, checkPointLoc.xrecoff)));
                        InRecovery = true;      /* force recovery even if SHUTDOWNED */
@@ -4693,6 +5253,8 @@ StartupXLOG(void)
                                        (errmsg("could not locate required checkpoint record"),
                                         errhint("If you are not restoring from a backup, try removing the file \"%s/backup_label\".", DataDir)));
                }
+               /* set flag to delete it later */
+               haveBackupLabel = true;
        }
        else
        {
@@ -4704,7 +5266,7 @@ StartupXLOG(void)
                record = ReadCheckpointRecord(checkPointLoc, 1);
                if (record != NULL)
                {
-                       ereport(LOG,
+                       ereport(DEBUG1,
                                        (errmsg("checkpoint record is at %X/%X",
                                                        checkPointLoc.xlogid, checkPointLoc.xrecoff)));
                }
@@ -4729,15 +5291,15 @@ StartupXLOG(void)
        memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
        wasShutdown = (record->xl_info == XLOG_CHECKPOINT_SHUTDOWN);
 
-       ereport(LOG,
-        (errmsg("redo record is at %X/%X; undo record is at %X/%X; shutdown %s",
-                        checkPoint.redo.xlogid, checkPoint.redo.xrecoff,
-                        checkPoint.undo.xlogid, checkPoint.undo.xrecoff,
-                        wasShutdown ? "TRUE" : "FALSE")));
-       ereport(LOG,
-                       (errmsg("next transaction ID: %u; next OID: %u",
-                                       checkPoint.nextXid, checkPoint.nextOid)));
-       ereport(LOG,
+       ereport(DEBUG1,
+                       (errmsg("redo record is at %X/%X; shutdown %s",
+                                       checkPoint.redo.xlogid, checkPoint.redo.xrecoff,
+                                       wasShutdown ? "TRUE" : "FALSE")));
+       ereport(DEBUG1,
+                       (errmsg("next transaction ID: %u/%u; next OID: %u",
+                                       checkPoint.nextXidEpoch, checkPoint.nextXid,
+                                       checkPoint.nextOid)));
+       ereport(DEBUG1,
                        (errmsg("next MultiXactId: %u; next MultiXactOffset: %u",
                                        checkPoint.nextMulti, checkPoint.nextMultiOffset)));
        if (!TransactionIdIsNormal(checkPoint.nextXid))
@@ -4761,20 +5323,17 @@ StartupXLOG(void)
        if (XLByteLT(RecPtr, checkPoint.redo))
                ereport(PANIC,
                                (errmsg("invalid redo in checkpoint record")));
-       if (checkPoint.undo.xrecoff == 0)
-               checkPoint.undo = RecPtr;
 
        /*
         * Check whether we need to force recovery from WAL.  If it appears to
         * have been a clean shutdown and we did not have a recovery.conf file,
         * then assume no recovery needed.
         */
-       if (XLByteLT(checkPoint.undo, RecPtr) ||
-               XLByteLT(checkPoint.redo, RecPtr))
+       if (XLByteLT(checkPoint.redo, RecPtr))
        {
                if (wasShutdown)
                        ereport(PANIC,
-                               (errmsg("invalid redo/undo record in shutdown checkpoint")));
+                                       (errmsg("invalid redo record in shutdown checkpoint")));
                InRecovery = true;
        }
        else if (ControlFile->state != DB_SHUTDOWNED)
@@ -4790,20 +5349,64 @@ StartupXLOG(void)
        {
                int                     rmid;
 
+               /*
+                * Update pg_control to show that we are recovering and to show the
+                * selected checkpoint as the place we are starting from. We also mark
+                * pg_control with any minimum recovery stop point obtained from a
+                * backup history file.
+                */
                if (InArchiveRecovery)
+               {
                        ereport(LOG,
                                        (errmsg("automatic recovery in progress")));
+                       ControlFile->state = DB_IN_ARCHIVE_RECOVERY;
+               }
                else
+               {
                        ereport(LOG,
                                        (errmsg("database system was not properly shut down; "
                                                        "automatic recovery in progress")));
-               ControlFile->state = DB_IN_RECOVERY;
-               ControlFile->time = time(NULL);
+                       ControlFile->state = DB_IN_CRASH_RECOVERY;
+               }
+               ControlFile->prevCheckPoint = ControlFile->checkPoint;
+               ControlFile->checkPoint = checkPointLoc;
+               ControlFile->checkPointCopy = checkPoint;
+               if (backupStopLoc.xlogid != 0 || backupStopLoc.xrecoff != 0)
+               {
+                       if (XLByteLT(ControlFile->minRecoveryPoint, backupStopLoc))
+                               ControlFile->minRecoveryPoint = backupStopLoc;
+               }
+               ControlFile->time = (pg_time_t) time(NULL);
+               /* No need to hold ControlFileLock yet, we aren't up far enough */
                UpdateControlFile();
 
-               /* Start up the recovery environment */
-               XLogInitRelationCache();
+               /* update our local copy of minRecoveryPoint */
+               minRecoveryPoint = ControlFile->minRecoveryPoint;
+
+               /*
+                * Reset pgstat data, because it may be invalid after recovery.
+                */
+               pgstat_reset_all();
+
+               /*
+                * If there was a backup label file, it's done its job and the info
+                * has now been propagated into pg_control.  We must get rid of the
+                * label file so that if we crash during recovery, we'll pick up at
+                * the latest recovery restartpoint instead of going all the way back
+                * to the backup start point.  It seems prudent though to just rename
+                * the file out of the way rather than delete it completely.
+                */
+               if (haveBackupLabel)
+               {
+                       unlink(BACKUP_LABEL_OLD);
+                       if (rename(BACKUP_LABEL_FILE, BACKUP_LABEL_OLD) != 0)
+                               ereport(FATAL,
+                                               (errcode_for_file_access(),
+                                                errmsg("could not rename file \"%s\" to \"%s\": %m",
+                                                               BACKUP_LABEL_FILE, BACKUP_LABEL_OLD)));
+               }
 
+               /* Initialize resource managers */
                for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
                {
                        if (RmgrTable[rmid].rm_startup != NULL)
@@ -4829,12 +5432,41 @@ StartupXLOG(void)
                {
                        bool            recoveryContinue = true;
                        bool            recoveryApply = true;
-                       ErrorContextCallback    errcontext;
+                       bool            reachedMinRecoveryPoint = false;
+                       ErrorContextCallback errcontext;
+                       /* use volatile pointer to prevent code rearrangement */
+                       volatile XLogCtlData *xlogctl = XLogCtl;
+
+                       /* Update shared replayEndRecPtr */
+                       SpinLockAcquire(&xlogctl->info_lck);
+                       xlogctl->replayEndRecPtr = ReadRecPtr;
+                       SpinLockRelease(&xlogctl->info_lck);
 
                        InRedo = true;
-                       ereport(LOG,
-                                       (errmsg("redo starts at %X/%X",
-                                                       ReadRecPtr.xlogid, ReadRecPtr.xrecoff)));
+
+                       if (minRecoveryPoint.xlogid == 0 && minRecoveryPoint.xrecoff == 0)
+                               ereport(LOG,
+                                               (errmsg("redo starts at %X/%X",
+                                                               ReadRecPtr.xlogid, ReadRecPtr.xrecoff)));
+                       else
+                               ereport(LOG,
+                                               (errmsg("redo starts at %X/%X, consistency will be reached at %X/%X",
+                                               ReadRecPtr.xlogid, ReadRecPtr.xrecoff,
+                                               minRecoveryPoint.xlogid, minRecoveryPoint.xrecoff)));
+
+                       /*
+                        * Let postmaster know we've started redo now, so that it can
+                        * launch bgwriter to perform restartpoints.  We don't bother
+                        * during crash recovery as restartpoints can only be performed
+                        * during archive recovery.  And we'd like to keep crash recovery
+                        * simple, to avoid introducing bugs that could you from
+                        * recovering after crash.
+                        *
+                        * After this point, we can no longer assume that we're the only
+                        * process in addition to postmaster!
+                        */
+                       if (InArchiveRecovery && IsUnderPostmaster)
+                               SendPostmasterSignal(PMSIGNAL_RECOVERY_STARTED);
 
                        /*
                         * main redo apply loop
@@ -4844,28 +5476,61 @@ StartupXLOG(void)
 #ifdef WAL_DEBUG
                                if (XLOG_DEBUG)
                                {
-                                       StringInfoData  buf;
+                                       StringInfoData buf;
 
                                        initStringInfo(&buf);
                                        appendStringInfo(&buf, "REDO @ %X/%X; LSN %X/%X: ",
-                                                       ReadRecPtr.xlogid, ReadRecPtr.xrecoff,
-                                                       EndRecPtr.xlogid, EndRecPtr.xrecoff);
+                                                                        ReadRecPtr.xlogid, ReadRecPtr.xrecoff,
+                                                                        EndRecPtr.xlogid, EndRecPtr.xrecoff);
                                        xlog_outrec(&buf, record);
                                        appendStringInfo(&buf, " - ");
                                        RmgrTable[record->xl_rmid].rm_desc(&buf,
                                                                                                           record->xl_info,
-                                                                                                          XLogRecGetData(record));
+                                                                                                        XLogRecGetData(record));
                                        elog(LOG, "%s", buf.data);
                                        pfree(buf.data);
                                }
 #endif
 
                                /*
+                                * Check if we were requested to re-read config file.
+                                */
+                               if (got_SIGHUP)
+                               {
+                                       got_SIGHUP = false;
+                                       ProcessConfigFile(PGC_SIGHUP);
+                               }
+
+                               /*
+                                * Check if we were requested to exit without finishing
+                                * recovery.
+                                */
+                               if (shutdown_requested)
+                                       proc_exit(1);
+
+                               /*
+                                * Have we reached our safe starting point? If so, we can
+                                * tell postmaster that the database is consistent now.
+                                */
+                               if (!reachedMinRecoveryPoint && 
+                                        XLByteLE(minRecoveryPoint, EndRecPtr))
+                               {
+                                       reachedMinRecoveryPoint = true;
+                                       if (InArchiveRecovery)
+                                       {
+                                               ereport(LOG,
+                                                               (errmsg("consistent recovery state reached")));
+                                               if (IsUnderPostmaster)
+                                                       SendPostmasterSignal(PMSIGNAL_RECOVERY_CONSISTENT);
+                                       }
+                               }
+
+                               /*
                                 * Have we reached our recovery target?
                                 */
                                if (recoveryStopsHere(record, &recoveryApply))
                                {
-                                       needNewTimeLine = true;         /* see below */
+                                       reachedStopPoint = true;        /* see below */
                                        recoveryContinue = false;
                                        if (!recoveryApply)
                                                break;
@@ -4885,8 +5550,14 @@ StartupXLOG(void)
                                        TransactionIdAdvance(ShmemVariableCache->nextXid);
                                }
 
-                               if (record->xl_info & XLR_BKP_BLOCK_MASK)
-                                       RestoreBkpBlocks(record, EndRecPtr);
+                               /*
+                                * Update shared replayEndRecPtr before replaying this
+                                * record, so that XLogFlush will update minRecoveryPoint
+                                * correctly.
+                                */
+                               SpinLockAcquire(&xlogctl->info_lck);
+                               xlogctl->replayEndRecPtr = EndRecPtr;
+                               SpinLockRelease(&xlogctl->info_lck);
 
                                RmgrTable[record->xl_rmid].rm_redo(EndRecPtr, record);
 
@@ -4905,6 +5576,10 @@ StartupXLOG(void)
                        ereport(LOG,
                                        (errmsg("redo done at %X/%X",
                                                        ReadRecPtr.xlogid, ReadRecPtr.xrecoff)));
+                       if (recoveryLastXTime)
+                               ereport(LOG,
+                                        (errmsg("last completed transaction was at log time %s",
+                                                        timestamptz_to_str(recoveryLastXTime))));
                        InRedo = false;
                }
                else
@@ -4927,26 +5602,31 @@ StartupXLOG(void)
         * Complain if we did not roll forward far enough to render the backup
         * dump consistent.
         */
-       if (XLByteLT(EndOfLog, recoveryMinXlogOffset))
+       if (InRecovery && XLByteLT(EndOfLog, minRecoveryPoint))
        {
-               if (needNewTimeLine)    /* stopped because of stop request */
+               if (reachedStopPoint)   /* stopped because of stop request */
                        ereport(FATAL,
-                                       (errmsg("requested recovery stop point is before end time of backup dump")));
-               else
-                       /* ran off end of WAL */
+                                       (errmsg("requested recovery stop point is before consistent recovery point")));
+               else    /* ran off end of WAL */
                        ereport(FATAL,
-                                       (errmsg("WAL ends before end time of backup dump")));
+                                       (errmsg("WAL ends before consistent recovery point")));
        }
 
        /*
         * Consider whether we need to assign a new timeline ID.
         *
-        * If we stopped short of the end of WAL during recovery, then we are
-        * generating a new timeline and must assign it a unique new ID.
-        * Otherwise, we can just extend the timeline we were in when we ran out
-        * of WAL.
+        * If we are doing an archive recovery, we always assign a new ID.      This
+        * handles a couple of issues.  If we stopped short of the end of WAL
+        * during recovery, then we are clearly generating a new timeline and must
+        * assign it a unique new ID.  Even if we ran to the end, modifying the
+        * current last segment is problematic because it may result in trying to
+        * overwrite an already-archived copy of that segment, and we encourage
+        * DBAs to make their archive_commands reject that.  We can dodge the
+        * problem by making the new active segment have a new timeline ID.
+        *
+        * In a normal crash recovery, we can just extend the timeline we were in.
         */
-       if (needNewTimeLine)
+       if (InArchiveRecovery)
        {
                ThisTimeLineID = findNewestTimeLine(recoveryTargetTLI) + 1;
                ereport(LOG,
@@ -4976,8 +5656,6 @@ StartupXLOG(void)
        openLogSeg = endLogSeg;
        openLogFile = XLogFileOpen(openLogId, openLogSeg);
        openLogOff = 0;
-       ControlFile->logId = openLogId;
-       ControlFile->logSeg = openLogSeg + 1;
        Insert = &XLogCtl->Insert;
        Insert->PrevRecord = LastRec;
        XLogCtl->xlblocks[0].xlogid = openLogId;
@@ -5026,6 +5704,12 @@ StartupXLOG(void)
        /* Pre-scan prepared transactions to find out the range of XIDs present */
        oldestActiveXID = PrescanPreparedTransactions();
 
+       /*
+        * Allow writing WAL for us, so that we can create a checkpoint record.
+        * But not yet for other backends!
+        */
+       LocalRecoveryInProgress = false;
+
        if (InRecovery)
        {
                int                     rmid;
@@ -5046,54 +5730,46 @@ StartupXLOG(void)
                XLogCheckInvalidPages();
 
                /*
-                * Reset pgstat data, because it may be invalid after recovery.
-                */
-               pgstat_reset_all();
-
-               /*
-                * Perform a new checkpoint to update our recovery activity to disk.
+                * Perform a checkpoint to update all our recovery activity to disk.
                 *
                 * Note that we write a shutdown checkpoint rather than an on-line
                 * one. This is not particularly critical, but since we may be
                 * assigning a new TLI, using a shutdown checkpoint allows us to have
                 * the rule that TLI only changes in shutdown checkpoints, which
                 * allows some extra error checking in xlog_redo.
-                *
-                * In case we had to use the secondary checkpoint, make sure that it
-                * will still be shown as the secondary checkpoint after this
-                * CreateCheckPoint operation; we don't want the broken primary
-                * checkpoint to become prevCheckPoint...
-                */
-               if (XLByteEQ(checkPointLoc, ControlFile->prevCheckPoint))
-                       ControlFile->checkPoint = checkPointLoc;
-
-               CreateCheckPoint(true, true);
-
-               /*
-                * Close down recovery environment
                 */
-               XLogCloseRelationCache();
+               CreateCheckPoint(CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_IMMEDIATE);
 
-               /*
-                * Now that we've checkpointed the recovery, it's safe to flush old
-                * backup_label, if present.
-                */
-               remove_backup_label();
+               if (recoveryEndCommand)
+                       ExecuteRecoveryEndCommand();
        }
 
        /*
         * Preallocate additional log files, if wanted.
         */
-       (void) PreallocXlogFiles(EndOfLog);
+       PreallocXlogFiles(EndOfLog);
 
        /*
         * Okay, we're officially UP.
         */
        InRecovery = false;
 
+       LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
        ControlFile->state = DB_IN_PRODUCTION;
-       ControlFile->time = time(NULL);
+       ControlFile->time = (pg_time_t) time(NULL);
        UpdateControlFile();
+       LWLockRelease(ControlFileLock);
+
+       /* start the archive_timeout timer running */
+       XLogCtl->Write.lastSegSwitchTime = (pg_time_t) time(NULL);
+
+       /* initialize shared-memory copy of latest checkpoint XID/epoch */
+       XLogCtl->ckptXidEpoch = ControlFile->checkPointCopy.nextXidEpoch;
+       XLogCtl->ckptXid = ControlFile->checkPointCopy.nextXid;
+
+       /* also initialize latestCompletedXid, to nextXid - 1 */
+       ShmemVariableCache->latestCompletedXid = ShmemVariableCache->nextXid;
+       TransactionIdRetreat(ShmemVariableCache->latestCompletedXid);
 
        /* Start up the commit log and related stuff, too */
        StartupCLOG();
@@ -5103,10 +5779,6 @@ StartupXLOG(void)
        /* Reload shared-memory state for prepared transactions */
        RecoverPreparedTransactions();
 
-       ereport(LOG,
-                       (errmsg("database system is ready")));
-       CritSectionCount--;
-
        /* Shut down readFile facility, free space */
        if (readFile >= 0)
        {
@@ -5124,6 +5796,45 @@ StartupXLOG(void)
                readRecordBuf = NULL;
                readRecordBufSize = 0;
        }
+
+       /*
+        * All done. Allow others to write WAL.
+        */
+       XLogCtl->SharedRecoveryInProgress = false;
+}
+
+/*
+ * Is the system still in recovery?
+ *
+ * As a side-effect, we initialize the local TimeLineID and RedoRecPtr
+ * variables the first time we see that recovery is finished.
+ */
+bool
+RecoveryInProgress(void)
+{
+       /*
+        * We check shared state each time only until we leave recovery mode.
+        * We can't re-enter recovery, so we rely on the local state variable
+        * after that.
+        */
+       if (!LocalRecoveryInProgress)
+               return false;
+       else
+       {
+               /* use volatile pointer to prevent code rearrangement */
+               volatile XLogCtlData *xlogctl = XLogCtl;
+
+               LocalRecoveryInProgress = xlogctl->SharedRecoveryInProgress;
+
+               /*
+                * Initialize TimeLineID and RedoRecPtr the first time we see that
+                * recovery is finished.
+                */
+               if (!LocalRecoveryInProgress)
+                       InitXLOGAccess();
+
+               return LocalRecoveryInProgress;
+       }
 }
 
 /*
@@ -5255,6 +5966,8 @@ InitXLOGAccess(void)
 {
        /* ThisTimeLineID doesn't change so we need no lock to copy it */
        ThisTimeLineID = XLogCtl->ThisTimeLineID;
+       Assert(ThisTimeLineID != 0);
+
        /* Use GetRedoRecPtr to copy the RedoRecPtr safely */
        (void) GetRedoRecPtr();
 }
@@ -5279,33 +5992,82 @@ GetRedoRecPtr(void)
 }
 
 /*
- * GetRecentNextXid - get the nextXid value saved by the most recent checkpoint
+ * GetInsertRecPtr -- Returns the current insert position.
  *
- * This is currently used only by the autovacuum daemon.  To check for
- * impending XID wraparound, autovac needs an approximate idea of the current
- * XID counter, and it needs it before choosing which DB to attach to, hence
- * before it sets up a PGPROC, hence before it can take any LWLocks.  But it
- * has attached to shared memory, and so we can let it reach into the shared
- * ControlFile structure and pull out the last checkpoint nextXID.
- *
- * Since we don't take any sort of lock, we have to assume that reading a
- * TransactionId is atomic ... but that assumption is made elsewhere, too,
- * and in any case the worst possible consequence of a bogus result is that
- * autovac issues an unnecessary database-wide VACUUM.
- *
- * Note: we could also choose to read ShmemVariableCache->nextXid in an
- * unlocked fashion, thus getting a more up-to-date result; but since that
- * changes far more frequently than the controlfile checkpoint copy, it would
- * pose a far higher risk of bogus result if we did have a nonatomic-read
- * problem.
+ * NOTE: The value *actually* returned is the position of the last full
+ * xlog page. It lags behind the real insert position by at most 1 page.
+ * For that, we don't need to acquire WALInsertLock which can be quite
+ * heavily contended, and an approximation is enough for the current
+ * usage of this function.
+ */
+XLogRecPtr
+GetInsertRecPtr(void)
+{
+       /* use volatile pointer to prevent code rearrangement */
+       volatile XLogCtlData *xlogctl = XLogCtl;
+       XLogRecPtr      recptr;
+
+       SpinLockAcquire(&xlogctl->info_lck);
+       recptr = xlogctl->LogwrtRqst.Write;
+       SpinLockRelease(&xlogctl->info_lck);
+
+       return recptr;
+}
+
+/*
+ * Get the time of the last xlog segment switch
+ */
+pg_time_t
+GetLastSegSwitchTime(void)
+{
+       pg_time_t       result;
+
+       /* Need WALWriteLock, but shared lock is sufficient */
+       LWLockAcquire(WALWriteLock, LW_SHARED);
+       result = XLogCtl->Write.lastSegSwitchTime;
+       LWLockRelease(WALWriteLock);
+
+       return result;
+}
+
+/*
+ * GetNextXidAndEpoch - get the current nextXid value and associated epoch
  *
- * A (theoretically) completely safe answer is to read the actual pg_control
- * file into local process memory, but that certainly seems like overkill.
+ * This is exported for use by code that would like to have 64-bit XIDs.
+ * We don't really support such things, but all XIDs within the system
+ * can be presumed "close to" the result, and thus the epoch associated
+ * with them can be determined.
  */
-TransactionId
-GetRecentNextXid(void)
+void
+GetNextXidAndEpoch(TransactionId *xid, uint32 *epoch)
 {
-       return ControlFile->checkPointCopy.nextXid;
+       uint32          ckptXidEpoch;
+       TransactionId ckptXid;
+       TransactionId nextXid;
+
+       /* Must read checkpoint info first, else have race condition */
+       {
+               /* use volatile pointer to prevent code rearrangement */
+               volatile XLogCtlData *xlogctl = XLogCtl;
+
+               SpinLockAcquire(&xlogctl->info_lck);
+               ckptXidEpoch = xlogctl->ckptXidEpoch;
+               ckptXid = xlogctl->ckptXid;
+               SpinLockRelease(&xlogctl->info_lck);
+       }
+
+       /* Now fetch current nextXid */
+       nextXid = ReadNewTransactionId();
+
+       /*
+        * nextXid is certainly logically later than ckptXid.  So if it's
+        * numerically less, it must have wrapped into the next epoch.
+        */
+       if (nextXid < ckptXid)
+               ckptXidEpoch++;
+
+       *xid = nextXid;
+       *epoch = ckptXidEpoch;
 }
 
 /*
@@ -5317,26 +6079,111 @@ ShutdownXLOG(int code, Datum arg)
        ereport(LOG,
                        (errmsg("shutting down")));
 
-       CritSectionCount++;
-       CreateCheckPoint(true, true);
+       if (RecoveryInProgress())
+               CreateRestartPoint(CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_IMMEDIATE);
+       else
+               CreateCheckPoint(CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_IMMEDIATE);
        ShutdownCLOG();
        ShutdownSUBTRANS();
        ShutdownMultiXact();
-       CritSectionCount--;
 
        ereport(LOG,
                        (errmsg("database system is shut down")));
 }
 
 /*
+ * Log start of a checkpoint.
+ */
+static void
+LogCheckpointStart(int flags, bool restartpoint)
+{
+       char *msg;
+
+       /*
+        * XXX: This is hopelessly untranslatable. We could call gettext_noop
+        * for the main message, but what about all the flags?
+        */
+       if (restartpoint)
+               msg = "restartpoint starting:%s%s%s%s%s%s";
+       else
+               msg = "checkpoint starting:%s%s%s%s%s%s";
+
+       elog(LOG, msg,
+                (flags & CHECKPOINT_IS_SHUTDOWN) ? " shutdown" : "",
+                (flags & CHECKPOINT_IMMEDIATE) ? " immediate" : "",
+                (flags & CHECKPOINT_FORCE) ? " force" : "",
+                (flags & CHECKPOINT_WAIT) ? " wait" : "",
+                (flags & CHECKPOINT_CAUSE_XLOG) ? " xlog" : "",
+                (flags & CHECKPOINT_CAUSE_TIME) ? " time" : "");
+}
+
+/*
+ * Log end of a checkpoint.
+ */
+static void
+LogCheckpointEnd(bool restartpoint)
+{
+       long            write_secs,
+                               sync_secs,
+                               total_secs;
+       int                     write_usecs,
+                               sync_usecs,
+                               total_usecs;
+
+       CheckpointStats.ckpt_end_t = GetCurrentTimestamp();
+
+       TimestampDifference(CheckpointStats.ckpt_start_t,
+                                               CheckpointStats.ckpt_end_t,
+                                               &total_secs, &total_usecs);
+
+       TimestampDifference(CheckpointStats.ckpt_write_t,
+                                               CheckpointStats.ckpt_sync_t,
+                                               &write_secs, &write_usecs);
+
+       TimestampDifference(CheckpointStats.ckpt_sync_t,
+                                               CheckpointStats.ckpt_sync_end_t,
+                                               &sync_secs, &sync_usecs);
+
+       if (restartpoint)
+               elog(LOG, "restartpoint complete: wrote %d buffers (%.1f%%); "
+                        "write=%ld.%03d s, sync=%ld.%03d s, total=%ld.%03d s",
+                        CheckpointStats.ckpt_bufs_written,
+                        (double) CheckpointStats.ckpt_bufs_written * 100 / NBuffers,
+                        write_secs, write_usecs / 1000,
+                        sync_secs, sync_usecs / 1000,
+                        total_secs, total_usecs / 1000);
+       else
+               elog(LOG, "checkpoint complete: wrote %d buffers (%.1f%%); "
+                        "%d transaction log file(s) added, %d removed, %d recycled; "
+                        "write=%ld.%03d s, sync=%ld.%03d s, total=%ld.%03d s",
+                        CheckpointStats.ckpt_bufs_written,
+                        (double) CheckpointStats.ckpt_bufs_written * 100 / NBuffers,
+                        CheckpointStats.ckpt_segs_added,
+                        CheckpointStats.ckpt_segs_removed,
+                        CheckpointStats.ckpt_segs_recycled,
+                        write_secs, write_usecs / 1000,
+                        sync_secs, sync_usecs / 1000,
+                        total_secs, total_usecs / 1000);
+}
+
+/*
  * Perform a checkpoint --- either during shutdown, or on-the-fly
  *
- * If force is true, we force a checkpoint regardless of whether any XLOG
- * activity has occurred since the last one.
+ * flags is a bitwise OR of the following:
+ *     CHECKPOINT_IS_SHUTDOWN: checkpoint is for database shutdown.
+ *     CHECKPOINT_IMMEDIATE: finish the checkpoint ASAP,
+ *             ignoring checkpoint_completion_target parameter.
+ *     CHECKPOINT_FORCE: force a checkpoint even if no XLOG activity has occured
+ *             since the last one (implied by CHECKPOINT_IS_SHUTDOWN).
+ *
+ * Note: flags contains other bits, of interest here only for logging purposes.
+ * In particular note that this routine is synchronous and does not pay
+ * attention to CHECKPOINT_WAIT.
  */
 void
-CreateCheckPoint(bool shutdown, bool force)
+CreateCheckPoint(int flags)
 {
+       bool            shutdown = (flags & CHECKPOINT_IS_SHUTDOWN) != 0;
        CheckPoint      checkPoint;
        XLogRecPtr      recptr;
        XLogCtlInsert *Insert = &XLogCtl->Insert;
@@ -5344,17 +6191,46 @@ CreateCheckPoint(bool shutdown, bool force)
        uint32          freespace;
        uint32          _logId;
        uint32          _logSeg;
-       int                     nsegsadded = 0;
-       int                     nsegsremoved = 0;
-       int                     nsegsrecycled = 0;
+       TransactionId *inCommitXids;
+       int                     nInCommit;
+
+       /* shouldn't happen */
+       if (RecoveryInProgress())
+               elog(ERROR, "can't create a checkpoint during recovery");
 
        /*
         * Acquire CheckpointLock to ensure only one checkpoint happens at a time.
-        * (This is just pro forma, since in the present system structure there is
-        * only one process that is allowed to issue checkpoints at any given
-        * time.)
+        * During normal operation, bgwriter is the only process that creates
+        * checkpoints, but at the end of archive recovery, the bgwriter can be
+        * busy creating a restartpoint while the startup process tries to perform
+        * the startup checkpoint.
         */
-       LWLockAcquire(CheckpointLock, LW_EXCLUSIVE);
+       if (!LWLockConditionalAcquire(CheckpointLock, LW_EXCLUSIVE))
+       {
+               Assert(InRecovery);
+
+               /*
+                * A restartpoint is in progress. Wait until it finishes. This can
+                * cause an extra restartpoint to be performed, but that's OK because
+                * we're just about to perform a checkpoint anyway. Flushing the
+                * buffers in this restartpoint can take some time, but that time is
+                * saved from the upcoming checkpoint so the net effect is zero.
+                */
+               ereport(DEBUG2, (errmsg("hurrying in-progress restartpoint")));
+               RequestCheckpoint(CHECKPOINT_IMMEDIATE | CHECKPOINT_WAIT);
+
+               LWLockAcquire(CheckpointLock, LW_EXCLUSIVE);
+       }
+
+       /*
+        * Prepare to accumulate statistics.
+        *
+        * Note: because it is possible for log_checkpoints to change while a
+        * checkpoint proceeds, we always accumulate stats, even if
+        * log_checkpoints is currently off.
+        */
+       MemSet(&CheckpointStats, 0, sizeof(CheckpointStats));
+       CheckpointStats.ckpt_start_t = GetCurrentTimestamp();
 
        /*
         * Use a critical section to force system panic if we have trouble.
@@ -5363,24 +6239,29 @@ CreateCheckPoint(bool shutdown, bool force)
 
        if (shutdown)
        {
+               LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
                ControlFile->state = DB_SHUTDOWNING;
-               ControlFile->time = time(NULL);
+               ControlFile->time = (pg_time_t) time(NULL);
                UpdateControlFile();
+               LWLockRelease(ControlFileLock);
        }
 
+       /*
+        * Let smgr prepare for checkpoint; this has to happen before we determine
+        * the REDO pointer.  Note that smgr must not do anything that'd have to
+        * be undone if we decide no checkpoint is needed.
+        */
+       smgrpreckpt();
+
+       /* Begin filling in the checkpoint WAL record */
        MemSet(&checkPoint, 0, sizeof(checkPoint));
        checkPoint.ThisTimeLineID = ThisTimeLineID;
-       checkPoint.time = time(NULL);
+       checkPoint.time = (pg_time_t) time(NULL);
 
        /*
-        * We must hold CheckpointStartLock while determining the checkpoint REDO
-        * pointer.  This ensures that any concurrent transaction commits will be
-        * either not yet logged, or logged and recorded in pg_clog. See notes in
-        * RecordTransactionCommit().
+        * We must hold WALInsertLock while examining insert state to determine
+        * the checkpoint REDO pointer.
         */
-       LWLockAcquire(CheckpointStartLock, LW_EXCLUSIVE);
-
-       /* And we need WALInsertLock too */
        LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
 
        /*
@@ -5398,7 +6279,7 @@ CreateCheckPoint(bool shutdown, bool force)
         * the end of the last checkpoint record, and its redo pointer must point
         * to itself.
         */
-       if (!shutdown && !force)
+       if ((flags & (CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_FORCE)) == 0)
        {
                XLogRecPtr      curInsert;
 
@@ -5412,7 +6293,6 @@ CreateCheckPoint(bool shutdown, bool force)
                        ControlFile->checkPointCopy.redo.xrecoff)
                {
                        LWLockRelease(WALInsertLock);
-                       LWLockRelease(CheckpointStartLock);
                        LWLockRelease(CheckpointLock);
                        END_CRIT_SECTION();
                        return;
@@ -5457,12 +6337,53 @@ CreateCheckPoint(bool shutdown, bool force)
        }
 
        /*
-        * Now we can release insert lock and checkpoint start lock, allowing
-        * other xacts to proceed even while we are flushing disk buffers.
+        * Now we can release WAL insert lock, allowing other xacts to proceed
+        * while we are flushing disk buffers.
         */
        LWLockRelease(WALInsertLock);
 
-       LWLockRelease(CheckpointStartLock);
+       /*
+        * If enabled, log checkpoint start.  We postpone this until now so as not
+        * to log anything if we decided to skip the checkpoint.
+        */
+       if (log_checkpoints)
+               LogCheckpointStart(flags, false);
+
+       TRACE_POSTGRESQL_CHECKPOINT_START(flags);
+
+       /*
+        * Before flushing data, we must wait for any transactions that are
+        * currently in their commit critical sections.  If an xact inserted its
+        * commit record into XLOG just before the REDO point, then a crash
+        * restart from the REDO point would not replay that record, which means
+        * that our flushing had better include the xact's update of pg_clog.  So
+        * we wait till he's out of his commit critical section before proceeding.
+        * See notes in RecordTransactionCommit().
+        *
+        * Because we've already released WALInsertLock, this test is a bit fuzzy:
+        * it is possible that we will wait for xacts we didn't really need to
+        * wait for.  But the delay should be short and it seems better to make
+        * checkpoint take a bit longer than to hold locks longer than necessary.
+        * (In fact, the whole reason we have this issue is that xact.c does
+        * commit record XLOG insertion and clog update as two separate steps
+        * protected by different locks, but again that seems best on grounds of
+        * minimizing lock contention.)
+        *
+        * A transaction that has not yet set inCommit when we look cannot be at
+        * risk, since he's not inserted his commit record yet; and one that's
+        * already cleared it is not at risk either, since he's done fixing clog
+        * and we will correctly flush the update below.  So we cannot miss any
+        * xacts we need to wait for.
+        */
+       nInCommit = GetTransactionsInCommit(&inCommitXids);
+       if (nInCommit > 0)
+       {
+               do
+               {
+                       pg_usleep(10000L);      /* wait for 10 msec */
+               } while (HaveTransactionsInCommit(inCommitXids, nInCommit));
+       }
+       pfree(inCommitXids);
 
        /*
         * Get the other info we need for the checkpoint record.
@@ -5471,6 +6392,11 @@ CreateCheckPoint(bool shutdown, bool force)
        checkPoint.nextXid = ShmemVariableCache->nextXid;
        LWLockRelease(XidGenLock);
 
+       /* Increase XID epoch if we've wrapped around since last checkpoint */
+       checkPoint.nextXidEpoch = ControlFile->checkPointCopy.nextXidEpoch;
+       if (checkPoint.nextXid < ControlFile->checkPointCopy.nextXid)
+               checkPoint.nextXidEpoch++;
+
        LWLockAcquire(OidGenLock, LW_SHARED);
        checkPoint.nextOid = ShmemVariableCache->nextOid;
        if (!shutdown)
@@ -5487,23 +6413,11 @@ CreateCheckPoint(bool shutdown, bool force)
         *
         * This I/O could fail for various reasons.  If so, we will fail to
         * complete the checkpoint, but there is no reason to force a system
-        * panic. Accordingly, exit critical section while doing it.  (If we are
-        * doing a shutdown checkpoint, we probably *should* panic --- but that
-        * will happen anyway because we'll still be inside the critical section
-        * established by ShutdownXLOG.)
+        * panic. Accordingly, exit critical section while doing it.
         */
        END_CRIT_SECTION();
 
-       if (!shutdown)
-               ereport(DEBUG2,
-                               (errmsg("checkpoint starting")));
-
-       CheckPointCLOG();
-       CheckPointSUBTRANS();
-       CheckPointMultiXact();
-       FlushBufferPool();
-       /* We deliberately delay 2PC checkpointing as long as possible */
-       CheckPointTwoPhase(checkPoint.redo);
+       CheckPointGuts(checkPoint.redo, flags);
 
        START_CRIT_SECTION();
 
@@ -5545,50 +6459,244 @@ CreateCheckPoint(bool shutdown, bool force)
        ControlFile->prevCheckPoint = ControlFile->checkPoint;
        ControlFile->checkPoint = ProcLastRecPtr;
        ControlFile->checkPointCopy = checkPoint;
-       ControlFile->time = time(NULL);
+       ControlFile->time = (pg_time_t) time(NULL);
        UpdateControlFile();
        LWLockRelease(ControlFileLock);
 
+       /* Update shared-memory copy of checkpoint XID/epoch */
+       {
+               /* use volatile pointer to prevent code rearrangement */
+               volatile XLogCtlData *xlogctl = XLogCtl;
+
+               SpinLockAcquire(&xlogctl->info_lck);
+               xlogctl->ckptXidEpoch = checkPoint.nextXidEpoch;
+               xlogctl->ckptXid = checkPoint.nextXid;
+               SpinLockRelease(&xlogctl->info_lck);
+       }
+
+       /*
+        * We are now done with critical updates; no need for system panic if we
+        * have trouble while fooling with old log segments.
+        */
+       END_CRIT_SECTION();
+
+       /*
+        * Let smgr do post-checkpoint cleanup (eg, deleting old files).
+        */
+       smgrpostckpt();
+
+       /*
+        * Delete old log files (those no longer needed even for previous
+        * checkpoint).
+        */
+       if (_logId || _logSeg)
+       {
+               PrevLogSeg(_logId, _logSeg);
+               RemoveOldXlogFiles(_logId, _logSeg, recptr);
+       }
+
+       /*
+        * Make more log segments if needed.  (Do this after recycling old log
+        * segments, since that may supply some of the needed files.)
+        */
+       if (!shutdown)
+               PreallocXlogFiles(recptr);
+
+       /*
+        * Truncate pg_subtrans if possible.  We can throw away all data before
+        * the oldest XMIN of any running transaction.  No future transaction will
+        * attempt to reference any pg_subtrans entry older than that (see Asserts
+        * in subtrans.c).      During recovery, though, we mustn't do this because
+        * StartupSUBTRANS hasn't been called yet.
+        */
+       if (!InRecovery)
+               TruncateSUBTRANS(GetOldestXmin(true, false));
+
+       /* All real work is done, but log before releasing lock. */
+       if (log_checkpoints)
+               LogCheckpointEnd(false);
+
+       TRACE_POSTGRESQL_CHECKPOINT_DONE(CheckpointStats.ckpt_bufs_written,
+                                                                        NBuffers,
+                                                                        CheckpointStats.ckpt_segs_added,
+                                                                        CheckpointStats.ckpt_segs_removed,
+                                                                        CheckpointStats.ckpt_segs_recycled);
+
+       LWLockRelease(CheckpointLock);
+}
+
+/*
+ * Flush all data in shared memory to disk, and fsync
+ *
+ * This is the common code shared between regular checkpoints and
+ * recovery restartpoints.
+ */
+static void
+CheckPointGuts(XLogRecPtr checkPointRedo, int flags)
+{
+       CheckPointCLOG();
+       CheckPointSUBTRANS();
+       CheckPointMultiXact();
+       CheckPointBuffers(flags);       /* performs all required fsyncs */
+       /* We deliberately delay 2PC checkpointing as long as possible */
+       CheckPointTwoPhase(checkPointRedo);
+}
+
+/*
+ * This is used during WAL recovery to establish a point from which recovery
+ * can roll forward without replaying the entire recovery log.  This function
+ * is called each time a checkpoint record is read from XLOG. It is stored
+ * in shared memory, so that it can be used as a restartpoint later on.
+ */
+static void
+RecoveryRestartPoint(const CheckPoint *checkPoint)
+{
+       int                     rmid;
+       /* use volatile pointer to prevent code rearrangement */
+       volatile XLogCtlData *xlogctl = XLogCtl;
+
+       /*
+        * Is it safe to checkpoint?  We must ask each of the resource managers
+        * whether they have any partial state information that might prevent a
+        * correct restart from this point.  If so, we skip this opportunity, but
+        * return at the next checkpoint record for another try.
+        */
+       for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
+       {
+               if (RmgrTable[rmid].rm_safe_restartpoint != NULL)
+                       if (!(RmgrTable[rmid].rm_safe_restartpoint()))
+                       {
+                               elog(DEBUG2, "RM %d not safe to record restart point at %X/%X",
+                                        rmid,
+                                        checkPoint->redo.xlogid,
+                                        checkPoint->redo.xrecoff);
+                               return;
+                       }
+       }
+
+       /*
+        * Copy the checkpoint record to shared memory, so that bgwriter can
+        * use it the next time it wants to perform a restartpoint.
+        */
+       SpinLockAcquire(&xlogctl->info_lck);
+       XLogCtl->lastCheckPointRecPtr = ReadRecPtr;
+       memcpy(&XLogCtl->lastCheckPoint, checkPoint, sizeof(CheckPoint));
+       SpinLockRelease(&xlogctl->info_lck);
+}
+
+/*
+ * This is similar to CreateCheckPoint, but is used during WAL recovery
+ * to establish a point from which recovery can roll forward without
+ * replaying the entire recovery log.
+ *
+ * Returns true if a new restartpoint was established. We can only establish
+ * a restartpoint if we have replayed a checkpoint record since last
+ * restartpoint.
+ */
+bool
+CreateRestartPoint(int flags)
+{
+       XLogRecPtr lastCheckPointRecPtr;
+       CheckPoint lastCheckPoint;
+       /* use volatile pointer to prevent code rearrangement */
+       volatile XLogCtlData *xlogctl = XLogCtl;
+
        /*
-        * We are now done with critical updates; no need for system panic if we
-        * have trouble while fooling with offline log segments.
+        * Acquire CheckpointLock to ensure only one restartpoint or checkpoint
+        * happens at a time.
         */
-       END_CRIT_SECTION();
+       LWLockAcquire(CheckpointLock, LW_EXCLUSIVE);
+
+       /* Get the a local copy of the last checkpoint record. */
+       SpinLockAcquire(&xlogctl->info_lck);
+       lastCheckPointRecPtr = xlogctl->lastCheckPointRecPtr;
+       memcpy(&lastCheckPoint, &XLogCtl->lastCheckPoint, sizeof(CheckPoint));
+       SpinLockRelease(&xlogctl->info_lck);
+
+       /* 
+        * Check that we're still in recovery mode. It's ok if we exit recovery
+        * mode after this check, the restart point is valid anyway.
+        */
+       if (!RecoveryInProgress())
+       {
+               ereport(DEBUG2,
+                               (errmsg("skipping restartpoint, recovery has already ended")));
+               LWLockRelease(CheckpointLock);
+               return false;
+       }
 
        /*
-        * Delete offline log files (those no longer needed even for previous
-        * checkpoint).
+        * If the last checkpoint record we've replayed is already our last
+        * restartpoint, we can't perform a new restart point. We still update
+        * minRecoveryPoint in that case, so that if this is a shutdown restart
+        * point, we won't start up earlier than before. That's not strictly
+        * necessary, but when we get hot standby capability, it would be rather
+        * weird if the database opened up for read-only connections at a
+        * point-in-time before the last shutdown. Such time travel is still
+        * possible in case of immediate shutdown, though.
+        *
+        * We don't explicitly advance minRecoveryPoint when we do create a
+        * restartpoint. It's assumed that flushing the buffers will do that
+        * as a side-effect.
         */
-       if (_logId || _logSeg)
+       if (XLogRecPtrIsInvalid(lastCheckPointRecPtr) ||
+               XLByteLE(lastCheckPoint.redo, ControlFile->checkPointCopy.redo))
        {
-               PrevLogSeg(_logId, _logSeg);
-               MoveOfflineLogs(_logId, _logSeg, recptr,
-                                               &nsegsremoved, &nsegsrecycled);
+               XLogRecPtr InvalidXLogRecPtr = {0, 0};
+               ereport(DEBUG2,
+                               (errmsg("skipping restartpoint, already performed at %X/%X",
+                                               lastCheckPoint.redo.xlogid, lastCheckPoint.redo.xrecoff)));
+
+               UpdateMinRecoveryPoint(InvalidXLogRecPtr, true);
+               LWLockRelease(CheckpointLock);
+               return false;
+       }
+
+       if (log_checkpoints)
+       {
+               /*
+                * Prepare to accumulate statistics.
+                */
+               MemSet(&CheckpointStats, 0, sizeof(CheckpointStats));
+               CheckpointStats.ckpt_start_t = GetCurrentTimestamp();
+
+               LogCheckpointStart(flags, true);
        }
 
+       CheckPointGuts(lastCheckPoint.redo, flags);
+
        /*
-        * Make more log segments if needed.  (Do this after deleting offline log
-        * segments, to avoid having peak disk space usage higher than necessary.)
+        * Update pg_control, using current time
         */
-       if (!shutdown)
-               nsegsadded = PreallocXlogFiles(recptr);
+       LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
+       ControlFile->prevCheckPoint = ControlFile->checkPoint;
+       ControlFile->checkPoint = lastCheckPointRecPtr;
+       ControlFile->checkPointCopy = lastCheckPoint;
+       ControlFile->time = (pg_time_t) time(NULL);
+       UpdateControlFile();
+       LWLockRelease(ControlFileLock);
 
        /*
-        * Truncate pg_subtrans if possible.  We can throw away all data before
-        * the oldest XMIN of any running transaction.  No future transaction will
-        * attempt to reference any pg_subtrans entry older than that (see Asserts
-        * in subtrans.c).      During recovery, though, we mustn't do this because
-        * StartupSUBTRANS hasn't been called yet.
+        * Currently, there is no need to truncate pg_subtrans during recovery.
+        * If we did do that, we will need to have called StartupSUBTRANS()
+        * already and then TruncateSUBTRANS() would go here.
         */
-       if (!InRecovery)
-               TruncateSUBTRANS(GetOldestXmin(true, false));
 
-       if (!shutdown)
-               ereport(DEBUG2,
-                               (errmsg("checkpoint complete; %d transaction log file(s) added, %d removed, %d recycled",
-                                               nsegsadded, nsegsremoved, nsegsrecycled)));
+       /* All real work is done, but log before releasing lock. */
+       if (log_checkpoints)
+               LogCheckpointEnd(true);
+
+       ereport((log_checkpoints ? LOG : DEBUG2),
+                       (errmsg("recovery restart point at %X/%X",
+                                       lastCheckPoint.redo.xlogid, lastCheckPoint.redo.xrecoff)));
+
+       if (recoveryLastXTime)
+               ereport((log_checkpoints ? LOG : DEBUG2),
+                       (errmsg("last completed transaction was at log time %s",
+                                       timestamptz_to_str(recoveryLastXTime))));
 
        LWLockRelease(CheckpointLock);
+       return true;
 }
 
 /*
@@ -5612,6 +6720,16 @@ XLogPutNextOid(Oid nextOid)
         * record.      Therefore, the standard buffer LSN interlock applied to those
         * records will ensure no such OID reaches disk before the NEXTOID record
         * does.
+        *
+        * Note, however, that the above statement only covers state "within" the
+        * database.  When we use a generated OID as a file or directory name, we
+        * are in a sense violating the basic WAL rule, because that filesystem
+        * change may reach disk before the NEXTOID WAL record does.  The impact
+        * of this is that if a database crash occurs immediately afterward, we
+        * might after restart re-generate the same OID and find that it conflicts
+        * with the leftover file or directory.  But since for safety's sake we
+        * always loop until finding a nonconflicting filename, this poses no real
+        * problem in practice. See pgsql-hackers discussion 27-Sep-2006.
         */
 }
 
@@ -5625,7 +6743,7 @@ XLogPutNextOid(Oid nextOid)
  * or the end+1 address of the prior segment if we did not need to
  * write a switch record because we are already at segment start.
  */
-static XLogRecPtr
+XLogRecPtr
 RequestXLogSwitch(void)
 {
        XLogRecPtr      RecPtr;
@@ -5644,12 +6762,18 @@ RequestXLogSwitch(void)
 
 /*
  * XLOG resource manager's routines
+ *
+ * Definitions of info values are in include/catalog/pg_control.h, though
+ * not all records types are related to control file processing.
  */
 void
 xlog_redo(XLogRecPtr lsn, XLogRecord *record)
 {
        uint8           info = record->xl_info & ~XLR_INFO_MASK;
 
+       /* Backup blocks are not used in xlog records */
+       Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
+
        if (info == XLOG_NEXTOID)
        {
                Oid                     nextOid;
@@ -5673,6 +6797,10 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
                MultiXactSetNextMXact(checkPoint.nextMulti,
                                                          checkPoint.nextMultiOffset);
 
+               /* ControlFile->checkPointCopy always tracks the latest ckpt XID */
+               ControlFile->checkPointCopy.nextXidEpoch = checkPoint.nextXidEpoch;
+               ControlFile->checkPointCopy.nextXid = checkPoint.nextXid;
+
                /*
                 * TLI may change in a shutdown checkpoint, but it shouldn't decrease
                 */
@@ -5683,10 +6811,12 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
                                                                 (int) checkPoint.ThisTimeLineID))
                                ereport(PANIC,
                                                (errmsg("unexpected timeline ID %u (after %u) in checkpoint record",
-                                                               checkPoint.ThisTimeLineID, ThisTimeLineID)));
-                       /* Following WAL records should be run with new TLI */
-                       ThisTimeLineID = checkPoint.ThisTimeLineID;
+                               checkPoint.ThisTimeLineID, ThisTimeLineID)));
+           /* Following WAL records should be run with new TLI */
+           ThisTimeLineID = checkPoint.ThisTimeLineID;
                }
+
+               RecoveryRestartPoint(&checkPoint);
        }
        else if (info == XLOG_CHECKPOINT_ONLINE)
        {
@@ -5704,11 +6834,22 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
                }
                MultiXactAdvanceNextMXact(checkPoint.nextMulti,
                                                                  checkPoint.nextMultiOffset);
+
+               /* ControlFile->checkPointCopy always tracks the latest ckpt XID */
+               ControlFile->checkPointCopy.nextXidEpoch = checkPoint.nextXidEpoch;
+               ControlFile->checkPointCopy.nextXid = checkPoint.nextXid;
+
                /* TLI should not change in an on-line checkpoint */
                if (checkPoint.ThisTimeLineID != ThisTimeLineID)
                        ereport(PANIC,
                                        (errmsg("unexpected timeline ID %u (should be %u) in checkpoint record",
                                                        checkPoint.ThisTimeLineID, ThisTimeLineID)));
+
+               RecoveryRestartPoint(&checkPoint);
+       }
+       else if (info == XLOG_NOOP)
+       {
+               /* nothing to do here */
        }
        else if (info == XLOG_SWITCH)
        {
@@ -5719,22 +6860,26 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
 void
 xlog_desc(StringInfo buf, uint8 xl_info, char *rec)
 {
-       uint8                   info = xl_info & ~XLR_INFO_MASK;
+       uint8           info = xl_info & ~XLR_INFO_MASK;
 
        if (info == XLOG_CHECKPOINT_SHUTDOWN ||
                info == XLOG_CHECKPOINT_ONLINE)
        {
                CheckPoint *checkpoint = (CheckPoint *) rec;
 
-               appendStringInfo(buf, "checkpoint: redo %X/%X; undo %X/%X; "
-                               "tli %u; xid %u; oid %u; multi %u; offset %u; %s",
-                               checkpoint->redo.xlogid, checkpoint->redo.xrecoff,
-                               checkpoint->undo.xlogid, checkpoint->undo.xrecoff,
-                               checkpoint->ThisTimeLineID, checkpoint->nextXid,
-                               checkpoint->nextOid,
-                               checkpoint->nextMulti,
-                               checkpoint->nextMultiOffset,
-                               (info == XLOG_CHECKPOINT_SHUTDOWN) ? "shutdown" : "online");
+               appendStringInfo(buf, "checkpoint: redo %X/%X; "
+                                                "tli %u; xid %u/%u; oid %u; multi %u; offset %u; %s",
+                                                checkpoint->redo.xlogid, checkpoint->redo.xrecoff,
+                                                checkpoint->ThisTimeLineID,
+                                                checkpoint->nextXidEpoch, checkpoint->nextXid,
+                                                checkpoint->nextOid,
+                                                checkpoint->nextMulti,
+                                                checkpoint->nextMultiOffset,
+                                (info == XLOG_CHECKPOINT_SHUTDOWN) ? "shutdown" : "online");
+       }
+       else if (info == XLOG_NOOP)
+       {
+               appendStringInfo(buf, "xlog no-op");
        }
        else if (info == XLOG_NEXTOID)
        {
@@ -5765,7 +6910,7 @@ xlog_outrec(StringInfo buf, XLogRecord *record)
        for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
        {
                if (record->xl_info & XLR_SET_BKP_BLOCK(i))
-                       appendStringInfo(buf, "; bkpb%d", i+1);
+                       appendStringInfo(buf, "; bkpb%d", i + 1);
        }
 
        appendStringInfo(buf, ": %s", RmgrTable[record->xl_rmid].rm_name);
@@ -5774,54 +6919,53 @@ xlog_outrec(StringInfo buf, XLogRecord *record)
 
 
 /*
- * GUC support
+ * Return the (possible) sync flag used for opening a file, depending on the
+ * value of the GUC wal_sync_method.
  */
-const char *
-assign_xlog_sync_method(const char *method, bool doit, GucSource source)
+static int
+get_sync_bit(int method)
 {
-       int                     new_sync_method;
-       int                     new_sync_bit;
+       /* If fsync is disabled, never open in sync mode */
+       if (!enableFsync)
+               return 0;
 
-       if (pg_strcasecmp(method, "fsync") == 0)
-       {
-               new_sync_method = SYNC_METHOD_FSYNC;
-               new_sync_bit = 0;
-       }
-#ifdef HAVE_FSYNC_WRITETHROUGH
-       else if (pg_strcasecmp(method, "fsync_writethrough") == 0)
+       switch (method)
        {
-               new_sync_method = SYNC_METHOD_FSYNC_WRITETHROUGH;
-               new_sync_bit = 0;
-       }
-#endif
-#ifdef HAVE_FDATASYNC
-       else if (pg_strcasecmp(method, "fdatasync") == 0)
-       {
-               new_sync_method = SYNC_METHOD_FDATASYNC;
-               new_sync_bit = 0;
-       }
-#endif
+               /*
+                * enum values for all sync options are defined even if they are not
+                * supported on the current platform.  But if not, they are not
+                * included in the enum option array, and therefore will never be seen
+                * here.
+                */
+               case SYNC_METHOD_FSYNC:
+               case SYNC_METHOD_FSYNC_WRITETHROUGH:
+               case SYNC_METHOD_FDATASYNC:
+                       return 0;
 #ifdef OPEN_SYNC_FLAG
-       else if (pg_strcasecmp(method, "open_sync") == 0)
-       {
-               new_sync_method = SYNC_METHOD_OPEN;
-               new_sync_bit = OPEN_SYNC_FLAG;
-       }
+               case SYNC_METHOD_OPEN:
+                       return OPEN_SYNC_FLAG;
 #endif
 #ifdef OPEN_DATASYNC_FLAG
-       else if (pg_strcasecmp(method, "open_datasync") == 0)
-       {
-               new_sync_method = SYNC_METHOD_OPEN;
-               new_sync_bit = OPEN_DATASYNC_FLAG;
-       }
+               case SYNC_METHOD_OPEN_DSYNC:
+                       return OPEN_DATASYNC_FLAG;
 #endif
-       else
-               return NULL;
+               default:
+                       /* can't happen (unless we are out of sync with option array) */
+                       elog(ERROR, "unrecognized wal_sync_method: %d", method);
+                       return 0; /* silence warning */
+       }
+}
 
+/*
+ * GUC support
+ */
+bool
+assign_xlog_sync_method(int new_sync_method, bool doit, GucSource source)
+{
        if (!doit)
-               return method;
+               return true;
 
-       if (sync_method != new_sync_method || open_sync_bit != new_sync_bit)
+       if (sync_method != new_sync_method)
        {
                /*
                 * To ensure that no blocks escape unsynced, force an fsync on the
@@ -5836,14 +6980,12 @@ assign_xlog_sync_method(const char *method, bool doit, GucSource source)
                                                (errcode_for_file_access(),
                                                 errmsg("could not fsync log file %u, segment %u: %m",
                                                                openLogId, openLogSeg)));
-                       if (open_sync_bit != new_sync_bit)
+                       if (get_sync_bit(sync_method) != get_sync_bit(new_sync_method))
                                XLogFileClose();
                }
-               sync_method = new_sync_method;
-               open_sync_bit = new_sync_bit;
        }
 
-       return method;
+       return true;
 }
 
 
@@ -5881,6 +7023,7 @@ issue_xlog_fsync(void)
                        break;
 #endif
                case SYNC_METHOD_OPEN:
+               case SYNC_METHOD_OPEN_DSYNC:
                        /* write synced it already */
                        break;
                default:
@@ -5903,11 +7046,11 @@ Datum
 pg_start_backup(PG_FUNCTION_ARGS)
 {
        text       *backupid = PG_GETARG_TEXT_P(0);
-       text       *result;
+       bool            fast = PG_GETARG_BOOL(1);
        char       *backupidstr;
        XLogRecPtr      checkpointloc;
        XLogRecPtr      startpoint;
-       time_t          stamp_time;
+       pg_time_t       stamp_time;
        char            strfbuf[128];
        char            xlogfilename[MAXFNAMELEN];
        uint32          _logId;
@@ -5918,34 +7061,39 @@ pg_start_backup(PG_FUNCTION_ARGS)
        if (!superuser())
                ereport(ERROR,
                                (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
-                                (errmsg("must be superuser to run a backup"))));
+                                errmsg("must be superuser to run a backup")));
 
        if (!XLogArchivingActive())
                ereport(ERROR,
                                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-                                (errmsg("WAL archiving is not active"),
-                                 (errhint("archive_command must be defined before "
-                                                  "online backups can be made safely.")))));
+                                errmsg("WAL archiving is not active"),
+                                errhint("archive_mode must be enabled at server start.")));
 
-       backupidstr = DatumGetCString(DirectFunctionCall1(textout,
-                                                                                                PointerGetDatum(backupid)));
+       if (!XLogArchiveCommandSet())
+               ereport(ERROR,
+                               (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                errmsg("WAL archiving is not active"),
+                                errhint("archive_command must be defined before "
+                                                "online backups can be made safely.")));
+
+       backupidstr = text_to_cstring(backupid);
 
        /*
         * Mark backup active in shared memory.  We must do full-page WAL writes
         * during an on-line backup even if not doing so at other times, because
         * it's quite possible for the backup dump to obtain a "torn" (partially
-        * written) copy of a database page if it reads the page concurrently
-        * with our write to the same page.  This can be fixed as long as the
-        * first write to the page in the WAL sequence is a full-page write.
-        * Hence, we turn on forcePageWrites and then force a CHECKPOINT, to
-        * ensure there are no dirty pages in shared memory that might get
-        * dumped while the backup is in progress without having a corresponding
-        * WAL record.  (Once the backup is complete, we need not force full-page
-        * writes anymore, since we expect that any pages not modified during
-        * the backup interval must have been correctly captured by the backup.)
+        * written) copy of a database page if it reads the page concurrently with
+        * our write to the same page.  This can be fixed as long as the first
+        * write to the page in the WAL sequence is a full-page write. Hence, we
+        * turn on forcePageWrites and then force a CHECKPOINT, to ensure there
+        * are no dirty pages in shared memory that might get dumped while the
+        * backup is in progress without having a corresponding WAL record.  (Once
+        * the backup is complete, we need not force full-page writes anymore,
+        * since we expect that any pages not modified during the backup interval
+        * must have been correctly captured by the backup.)
         *
-        * We must hold WALInsertLock to change the value of forcePageWrites,
-        * to ensure adequate interlocking against XLogInsert().
+        * We must hold WALInsertLock to change the value of forcePageWrites, to
+        * ensure adequate interlocking against XLogInsert().
         */
        LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
        if (XLogCtl->Insert.forcePageWrites)
@@ -5959,16 +7107,33 @@ pg_start_backup(PG_FUNCTION_ARGS)
        XLogCtl->Insert.forcePageWrites = true;
        LWLockRelease(WALInsertLock);
 
-       /* Use a TRY block to ensure we release forcePageWrites if fail below */
-       PG_TRY();
+       /*
+        * Force an XLOG file switch before the checkpoint, to ensure that the WAL
+        * segment the checkpoint is written to doesn't contain pages with old
+        * timeline IDs. That would otherwise happen if you called
+        * pg_start_backup() right after restoring from a PITR archive: the first
+        * WAL segment containing the startup checkpoint has pages in the
+        * beginning with the old timeline ID. That can cause trouble at recovery:
+        * we won't have a history file covering the old timeline if pg_xlog
+        * directory was not included in the base backup and the WAL archive was
+        * cleared too before starting the backup.
+        */
+       RequestXLogSwitch();
+
+       /* Ensure we release forcePageWrites if fail below */
+       PG_ENSURE_ERROR_CLEANUP(pg_start_backup_callback, (Datum) 0);
        {
                /*
-                * Force a CHECKPOINT.  Aside from being necessary to prevent torn
+                * Force a CHECKPOINT.  Aside from being necessary to prevent torn
                 * page problems, this guarantees that two successive backup runs will
                 * have different checkpoint positions and hence different history
                 * file names, even if nothing happened in between.
+                *
+                * We use CHECKPOINT_IMMEDIATE only if requested by user (via
+                * passing fast = true).  Otherwise this can take awhile.
                 */
-               RequestCheckpoint(true, false);
+               RequestCheckpoint(CHECKPOINT_FORCE | CHECKPOINT_WAIT |
+                                                 (fast ? CHECKPOINT_IMMEDIATE : 0));
 
                /*
                 * Now we need to fetch the checkpoint record location, and also its
@@ -5983,16 +7148,11 @@ pg_start_backup(PG_FUNCTION_ARGS)
                XLByteToSeg(startpoint, _logId, _logSeg);
                XLogFileName(xlogfilename, ThisTimeLineID, _logId, _logSeg);
 
-               /*
-                * We deliberately use strftime/localtime not the src/timezone
-                * functions, so that backup labels will consistently be recorded in
-                * the same timezone regardless of TimeZone setting.  This matches
-                * elog.c's practice.
-                */
-               stamp_time = time(NULL);
-               strftime(strfbuf, sizeof(strfbuf),
-                                "%Y-%m-%d %H:%M:%S %Z",
-                                localtime(&stamp_time));
+               /* Use the log timezone here, not the session timezone */
+               stamp_time = (pg_time_t) time(NULL);
+               pg_strftime(strfbuf, sizeof(strfbuf),
+                                       "%Y-%m-%d %H:%M:%S %Z",
+                                       pg_localtime(&stamp_time, log_timezone));
 
                /*
                 * Check for existing backup label --- implies a backup is already
@@ -6035,25 +7195,24 @@ pg_start_backup(PG_FUNCTION_ARGS)
                                         errmsg("could not write file \"%s\": %m",
                                                        BACKUP_LABEL_FILE)));
        }
-       PG_CATCH();
-       {
-               /* Turn off forcePageWrites on failure */
-               LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
-               XLogCtl->Insert.forcePageWrites = false;
-               LWLockRelease(WALInsertLock);
-
-               PG_RE_THROW();
-       }
-       PG_END_TRY();
+       PG_END_ENSURE_ERROR_CLEANUP(pg_start_backup_callback, (Datum) 0);
 
        /*
         * We're done.  As a convenience, return the starting WAL location.
         */
        snprintf(xlogfilename, sizeof(xlogfilename), "%X/%X",
                         startpoint.xlogid, startpoint.xrecoff);
-       result = DatumGetTextP(DirectFunctionCall1(textin,
-                                                                                        CStringGetDatum(xlogfilename)));
-       PG_RETURN_TEXT_P(result);
+       PG_RETURN_TEXT_P(cstring_to_text(xlogfilename));
+}
+
+/* Error cleanup callback for pg_start_backup */
+static void
+pg_start_backup_callback(int code, Datum arg)
+{
+       /* Turn off forcePageWrites on failure */
+       LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
+       XLogCtl->Insert.forcePageWrites = false;
+       LWLockRelease(WALInsertLock);
 }
 
 /*
@@ -6063,30 +7222,40 @@ pg_start_backup(PG_FUNCTION_ARGS)
  * create a backup history file in pg_xlog (whence it will immediately be
  * archived).  The backup history file contains the same info found in
  * the label file, plus the backup-end time and WAL location.
+ * Note: different from CancelBackup which just cancels online backup mode.
  */
 Datum
 pg_stop_backup(PG_FUNCTION_ARGS)
 {
-       text       *result;
        XLogRecPtr      startpoint;
        XLogRecPtr      stoppoint;
-       time_t          stamp_time;
+       pg_time_t       stamp_time;
        char            strfbuf[128];
        char            histfilepath[MAXPGPATH];
        char            startxlogfilename[MAXFNAMELEN];
        char            stopxlogfilename[MAXFNAMELEN];
+       char            lastxlogfilename[MAXFNAMELEN];
+       char            histfilename[MAXFNAMELEN];
        uint32          _logId;
        uint32          _logSeg;
        FILE       *lfp;
        FILE       *fp;
        char            ch;
        int                     ich;
+       int                     seconds_before_warning;
+       int                     waits = 0;
 
        if (!superuser())
                ereport(ERROR,
                                (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
                                 (errmsg("must be superuser to run a backup"))));
 
+       if (!XLogArchivingActive())
+               ereport(ERROR,
+                               (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                errmsg("WAL archiving is not active"),
+                                errhint("archive_mode must be enabled at server start.")));
+
        /*
         * OK to clear forcePageWrites
         */
@@ -6095,26 +7264,20 @@ pg_stop_backup(PG_FUNCTION_ARGS)
        LWLockRelease(WALInsertLock);
 
        /*
-        * Force a switch to a new xlog segment file, so that the backup
-        * is valid as soon as archiver moves out the current segment file.
-        * We'll report the end address of the XLOG SWITCH record as the backup
-        * stopping point.
+        * Force a switch to a new xlog segment file, so that the backup is valid
+        * as soon as archiver moves out the current segment file. We'll report
+        * the end address of the XLOG SWITCH record as the backup stopping point.
         */
        stoppoint = RequestXLogSwitch();
 
        XLByteToSeg(stoppoint, _logId, _logSeg);
        XLogFileName(stopxlogfilename, ThisTimeLineID, _logId, _logSeg);
 
-       /*
-        * We deliberately use strftime/localtime not the src/timezone functions,
-        * so that backup labels will consistently be recorded in the same
-        * timezone regardless of TimeZone setting.  This matches elog.c's
-        * practice.
-        */
-       stamp_time = time(NULL);
-       strftime(strfbuf, sizeof(strfbuf),
-                        "%Y-%m-%d %H:%M:%S %Z",
-                        localtime(&stamp_time));
+       /* Use the log timezone here, not the session timezone */
+       stamp_time = (pg_time_t) time(NULL);
+       pg_strftime(strfbuf, sizeof(strfbuf),
+                               "%Y-%m-%d %H:%M:%S %Z",
+                               pg_localtime(&stamp_time, log_timezone));
 
        /*
         * Open the existing label file
@@ -6184,20 +7347,54 @@ pg_stop_backup(PG_FUNCTION_ARGS)
                                                BACKUP_LABEL_FILE)));
 
        /*
-        * Clean out any no-longer-needed history files.  As a side effect,
-        * this will post a .ready file for the newly created history file,
-        * notifying the archiver that history file may be archived immediately.
+        * Clean out any no-longer-needed history files.  As a side effect, this
+        * will post a .ready file for the newly created history file, notifying
+        * the archiver that history file may be archived immediately.
         */
        CleanupBackupHistory();
 
        /*
+        * Wait until both the last WAL file filled during backup and the history
+        * file have been archived.  We assume that the alphabetic sorting
+        * property of the WAL files ensures any earlier WAL files are safely
+        * archived as well.
+        *
+        * We wait forever, since archive_command is supposed to work and
+        * we assume the admin wanted his backup to work completely. If you
+        * don't wish to wait, you can set statement_timeout.
+        */
+       XLByteToPrevSeg(stoppoint, _logId, _logSeg);
+       XLogFileName(lastxlogfilename, ThisTimeLineID, _logId, _logSeg);
+
+       XLByteToSeg(startpoint, _logId, _logSeg);
+       BackupHistoryFileName(histfilename, ThisTimeLineID, _logId, _logSeg,
+                                                 startpoint.xrecoff % XLogSegSize);
+
+       seconds_before_warning = 60;
+       waits = 0;
+
+       while (XLogArchiveIsBusy(lastxlogfilename) ||
+                  XLogArchiveIsBusy(histfilename))
+       {
+               CHECK_FOR_INTERRUPTS();
+
+               pg_usleep(1000000L);
+
+               if (++waits >= seconds_before_warning)
+               {
+                       seconds_before_warning *= 2;     /* This wraps in >10 years... */
+                       ereport(WARNING,
+                                       (errmsg("pg_stop_backup still waiting for archive to complete (%d seconds elapsed)",
+                                                       waits)));
+               }
+       }
+
+       /*
         * We're done.  As a convenience, return the ending WAL location.
         */
        snprintf(stopxlogfilename, sizeof(stopxlogfilename), "%X/%X",
                         stoppoint.xlogid, stoppoint.xrecoff);
-       result = DatumGetTextP(DirectFunctionCall1(textin,
-                                                                                CStringGetDatum(stopxlogfilename)));
-       PG_RETURN_TEXT_P(result);
+       PG_RETURN_TEXT_P(cstring_to_text(stopxlogfilename));
 }
 
 /*
@@ -6206,14 +7403,13 @@ pg_stop_backup(PG_FUNCTION_ARGS)
 Datum
 pg_switch_xlog(PG_FUNCTION_ARGS)
 {
-       text       *result;
-       XLogRecPtr switchpoint;
+       XLogRecPtr      switchpoint;
        char            location[MAXFNAMELEN];
 
        if (!superuser())
                ereport(ERROR,
                                (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
-                                (errmsg("must be superuser to switch xlog files"))));
+                        (errmsg("must be superuser to switch transaction log files"))));
 
        switchpoint = RequestXLogSwitch();
 
@@ -6222,18 +7418,44 @@ pg_switch_xlog(PG_FUNCTION_ARGS)
         */
        snprintf(location, sizeof(location), "%X/%X",
                         switchpoint.xlogid, switchpoint.xrecoff);
-       result = DatumGetTextP(DirectFunctionCall1(textin,
-                                                                                          CStringGetDatum(location)));
-       PG_RETURN_TEXT_P(result);
+       PG_RETURN_TEXT_P(cstring_to_text(location));
 }
 
 /*
- * Report the current WAL location (same format as pg_start_backup etc)
+ * Report the current WAL write location (same format as pg_start_backup etc)
+ *
+ * This is useful for determining how much of WAL is visible to an external
+ * archiving process.  Note that the data before this point is written out
+ * to the kernel, but is not necessarily synced to disk.
  */
 Datum
 pg_current_xlog_location(PG_FUNCTION_ARGS)
 {
-       text       *result;
+       char            location[MAXFNAMELEN];
+
+       /* Make sure we have an up-to-date local LogwrtResult */
+       {
+               /* use volatile pointer to prevent code rearrangement */
+               volatile XLogCtlData *xlogctl = XLogCtl;
+
+               SpinLockAcquire(&xlogctl->info_lck);
+               LogwrtResult = xlogctl->LogwrtResult;
+               SpinLockRelease(&xlogctl->info_lck);
+       }
+
+       snprintf(location, sizeof(location), "%X/%X",
+                        LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff);
+       PG_RETURN_TEXT_P(cstring_to_text(location));
+}
+
+/*
+ * Report the current WAL insert location (same format as pg_start_backup etc)
+ *
+ * This function is mostly for debugging purposes.
+ */
+Datum
+pg_current_xlog_insert_location(PG_FUNCTION_ARGS)
+{
        XLogCtlInsert *Insert = &XLogCtl->Insert;
        XLogRecPtr      current_recptr;
        char            location[MAXFNAMELEN];
@@ -6247,10 +7469,7 @@ pg_current_xlog_location(PG_FUNCTION_ARGS)
 
        snprintf(location, sizeof(location), "%X/%X",
                         current_recptr.xlogid, current_recptr.xrecoff);
-
-       result = DatumGetTextP(DirectFunctionCall1(textin,
-                                                                                          CStringGetDatum(location)));
-       PG_RETURN_TEXT_P(result);
+       PG_RETURN_TEXT_P(cstring_to_text(location));
 }
 
 /*
@@ -6265,7 +7484,6 @@ Datum
 pg_xlogfile_name_offset(PG_FUNCTION_ARGS)
 {
        text       *location = PG_GETARG_TEXT_P(0);
-       text       *result;
        char       *locationstr;
        unsigned int uxlogid;
        unsigned int uxrecoff;
@@ -6274,31 +7492,63 @@ pg_xlogfile_name_offset(PG_FUNCTION_ARGS)
        uint32          xrecoff;
        XLogRecPtr      locationpoint;
        char            xlogfilename[MAXFNAMELEN];
+       Datum           values[2];
+       bool            isnull[2];
+       TupleDesc       resultTupleDesc;
+       HeapTuple       resultHeapTuple;
+       Datum           result;
 
-       locationstr = DatumGetCString(DirectFunctionCall1(textout,
-                                                                                               PointerGetDatum(location)));
+       /*
+        * Read input and parse
+        */
+       locationstr = text_to_cstring(location);
 
        if (sscanf(locationstr, "%X/%X", &uxlogid, &uxrecoff) != 2)
                ereport(ERROR,
                                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-                                errmsg("could not parse xlog location \"%s\"",
+                                errmsg("could not parse transaction log location \"%s\"",
                                                locationstr)));
 
        locationpoint.xlogid = uxlogid;
        locationpoint.xrecoff = uxrecoff;
 
+       /*
+        * Construct a tuple descriptor for the result row.  This must match this
+        * function's pg_proc entry!
+        */
+       resultTupleDesc = CreateTemplateTupleDesc(2, false);
+       TupleDescInitEntry(resultTupleDesc, (AttrNumber) 1, "file_name",
+                                          TEXTOID, -1, 0);
+       TupleDescInitEntry(resultTupleDesc, (AttrNumber) 2, "file_offset",
+                                          INT4OID, -1, 0);
+
+       resultTupleDesc = BlessTupleDesc(resultTupleDesc);
+
+       /*
+        * xlogfilename
+        */
        XLByteToPrevSeg(locationpoint, xlogid, xlogseg);
        XLogFileName(xlogfilename, ThisTimeLineID, xlogid, xlogseg);
 
+       values[0] = CStringGetTextDatum(xlogfilename);
+       isnull[0] = false;
+
+       /*
+        * offset
+        */
        xrecoff = locationpoint.xrecoff - xlogseg * XLogSegSize;
-       snprintf(xlogfilename + strlen(xlogfilename),
-                        sizeof(xlogfilename) - strlen(xlogfilename),
-                        " %u",
-                        (unsigned int) xrecoff);
-
-       result = DatumGetTextP(DirectFunctionCall1(textin,
-                                                                                          CStringGetDatum(xlogfilename)));
-       PG_RETURN_TEXT_P(result);
+
+       values[1] = UInt32GetDatum(xrecoff);
+       isnull[1] = false;
+
+       /*
+        * Tuple jam: Having first prepared your Datums, then squash together
+        */
+       resultHeapTuple = heap_form_tuple(resultTupleDesc, values, isnull);
+
+       result = HeapTupleGetDatum(resultHeapTuple);
+
+       PG_RETURN_DATUM(result);
 }
 
 /*
@@ -6309,7 +7559,6 @@ Datum
 pg_xlogfile_name(PG_FUNCTION_ARGS)
 {
        text       *location = PG_GETARG_TEXT_P(0);
-       text       *result;
        char       *locationstr;
        unsigned int uxlogid;
        unsigned int uxrecoff;
@@ -6318,13 +7567,12 @@ pg_xlogfile_name(PG_FUNCTION_ARGS)
        XLogRecPtr      locationpoint;
        char            xlogfilename[MAXFNAMELEN];
 
-       locationstr = DatumGetCString(DirectFunctionCall1(textout,
-                                                                                               PointerGetDatum(location)));
+       locationstr = text_to_cstring(location);
 
        if (sscanf(locationstr, "%X/%X", &uxlogid, &uxrecoff) != 2)
                ereport(ERROR,
                                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-                                errmsg("could not parse xlog location \"%s\"",
+                                errmsg("could not parse transaction log location \"%s\"",
                                                locationstr)));
 
        locationpoint.xlogid = uxlogid;
@@ -6333,9 +7581,7 @@ pg_xlogfile_name(PG_FUNCTION_ARGS)
        XLByteToPrevSeg(locationpoint, xlogid, xlogseg);
        XLogFileName(xlogfilename, ThisTimeLineID, xlogid, xlogseg);
 
-       result = DatumGetTextP(DirectFunctionCall1(textin,
-                                                                                          CStringGetDatum(xlogfilename)));
-       PG_RETURN_TEXT_P(result);
+       PG_RETURN_TEXT_P(cstring_to_text(xlogfilename));
 }
 
 /*
@@ -6349,14 +7595,14 @@ pg_xlogfile_name(PG_FUNCTION_ARGS)
  * point, we will fail to restore a consistent database state.
  *
  * We also attempt to retrieve the corresponding backup history file.
- * If successful, set recoveryMinXlogOffset to constrain valid PITR stopping
+ * If successful, set *minRecoveryLoc to constrain valid PITR stopping
  * points.
  *
  * Returns TRUE if a backup_label was found (and fills the checkpoint
  * location into *checkPointLoc); returns FALSE if not.
  */
 static bool
-read_backup_label(XLogRecPtr *checkPointLoc)
+read_backup_label(XLogRecPtr *checkPointLoc, XLogRecPtr *minRecoveryLoc)
 {
        XLogRecPtr      startpoint;
        XLogRecPtr      stoppoint;
@@ -6371,6 +7617,10 @@ read_backup_label(XLogRecPtr *checkPointLoc)
        FILE       *fp;
        char            ch;
 
+       /* Default is to not constrain recovery stop point */
+       minRecoveryLoc->xlogid = 0;
+       minRecoveryLoc->xrecoff = 0;
+
        /*
         * See if label file is present
         */
@@ -6439,7 +7689,7 @@ read_backup_label(XLogRecPtr *checkPointLoc)
                        ereport(FATAL,
                                        (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                         errmsg("invalid data in file \"%s\"", histfilename)));
-               recoveryMinXlogOffset = stoppoint;
+               *minRecoveryLoc = stoppoint;
                if (ferror(fp) || FreeFile(fp))
                        ereport(FATAL,
                                        (errcode_for_file_access(),
@@ -6451,31 +7701,13 @@ read_backup_label(XLogRecPtr *checkPointLoc)
 }
 
 /*
- * remove_backup_label: remove any extant backup_label after successful
- * recovery.  Once we have completed the end-of-recovery checkpoint there
- * is no reason to have to replay from the start point indicated by the
- * label (and indeed we'll probably have removed/recycled the needed WAL
- * segments), so remove the label to prevent trouble in later crash recoveries.
- */
-static void
-remove_backup_label(void)
-{
-       if (unlink(BACKUP_LABEL_FILE) != 0)
-               if (errno != ENOENT)
-                       ereport(FATAL,
-                                       (errcode_for_file_access(),
-                                        errmsg("could not remove file \"%s\": %m",
-                                                       BACKUP_LABEL_FILE)));
-}
-
-/*
  * Error context callback for errors occurring during rm_redo().
  */
 static void
 rm_redo_error_callback(void *arg)
 {
-       XLogRecord              *record = (XLogRecord *) arg;
-       StringInfoData   buf;
+       XLogRecord *record = (XLogRecord *) arg;
+       StringInfoData buf;
 
        initStringInfo(&buf);
        RmgrTable[record->xl_rmid].rm_desc(&buf,
@@ -6488,3 +7720,149 @@ rm_redo_error_callback(void *arg)
 
        pfree(buf.data);
 }
+
+/*
+ * BackupInProgress: check if online backup mode is active
+ *
+ * This is done by checking for existence of the "backup_label" file.
+ */
+bool
+BackupInProgress(void)
+{
+       struct stat stat_buf;
+
+       return (stat(BACKUP_LABEL_FILE, &stat_buf) == 0);
+}
+
+/*
+ * CancelBackup: rename the "backup_label" file to cancel backup mode
+ *
+ * If the "backup_label" file exists, it will be renamed to "backup_label.old".
+ * Note that this will render an online backup in progress useless.
+ * To correctly finish an online backup, pg_stop_backup must be called.
+ */
+void
+CancelBackup(void)
+{
+       struct stat stat_buf;
+
+       /* if the file is not there, return */
+       if (stat(BACKUP_LABEL_FILE, &stat_buf) < 0)
+               return;
+
+       /* remove leftover file from previously cancelled backup if it exists */
+       unlink(BACKUP_LABEL_OLD);
+
+       if (rename(BACKUP_LABEL_FILE, BACKUP_LABEL_OLD) == 0)
+       {
+               ereport(LOG,
+                               (errmsg("online backup mode cancelled"),
+                                errdetail("\"%s\" was renamed to \"%s\".",
+                                               BACKUP_LABEL_FILE, BACKUP_LABEL_OLD)));
+       }
+       else
+       {
+               ereport(WARNING,
+                               (errcode_for_file_access(),
+                                errmsg("online backup mode was not cancelled"),
+                                errdetail("Could not rename \"%s\" to \"%s\": %m.",
+                                               BACKUP_LABEL_FILE, BACKUP_LABEL_OLD)));
+       }
+}
+
+/* ------------------------------------------------------
+ *  Startup Process main entry point and signal handlers
+ * ------------------------------------------------------
+ */
+
+/*
+ * startupproc_quickdie() occurs when signalled SIGQUIT by the postmaster.
+ *
+ * Some backend has bought the farm,
+ * so we need to stop what we're doing and exit.
+ */
+static void
+startupproc_quickdie(SIGNAL_ARGS)
+{
+       PG_SETMASK(&BlockSig);
+
+       /*
+        * DO NOT proc_exit() -- we're here because shared memory may be
+        * corrupted, so we don't want to try to clean up our transaction. Just
+        * nail the windows shut and get out of town.
+        *
+        * Note we do exit(2) not exit(0).      This is to force the postmaster into a
+        * system reset cycle if some idiot DBA sends a manual SIGQUIT to a random
+        * backend.  This is necessary precisely because we don't clean up our
+        * shared memory state.
+        */
+       exit(2);
+}
+
+
+/* SIGHUP: set flag to re-read config file at next convenient time */
+static void
+StartupProcSigHupHandler(SIGNAL_ARGS)
+{
+       got_SIGHUP = true;
+}
+
+/* SIGTERM: set flag to abort redo and exit */
+static void
+StartupProcShutdownHandler(SIGNAL_ARGS)
+{
+       if (in_restore_command)
+               proc_exit(1);
+       else
+               shutdown_requested = true;
+}
+
+/* Main entry point for startup process */
+void
+StartupProcessMain(void)
+{
+       /*
+        * If possible, make this process a group leader, so that the postmaster
+        * can signal any child processes too.
+        */
+#ifdef HAVE_SETSID
+       if (setsid() < 0)
+               elog(FATAL, "setsid() failed: %m");
+#endif
+
+       /*
+        * Properly accept or ignore signals the postmaster might send us
+        */
+       pqsignal(SIGHUP, StartupProcSigHupHandler);      /* reload config file */
+       pqsignal(SIGINT, SIG_IGN);                                      /* ignore query cancel */
+       pqsignal(SIGTERM, StartupProcShutdownHandler); /* request shutdown */
+       pqsignal(SIGQUIT, startupproc_quickdie);                /* hard crash time */
+       pqsignal(SIGALRM, SIG_IGN);
+       pqsignal(SIGPIPE, SIG_IGN);
+       pqsignal(SIGUSR1, SIG_IGN);
+       pqsignal(SIGUSR2, SIG_IGN);
+
+       /*
+        * Reset some signals that are accepted by postmaster but not here
+        */
+       pqsignal(SIGCHLD, SIG_DFL);
+       pqsignal(SIGTTIN, SIG_DFL);
+       pqsignal(SIGTTOU, SIG_DFL);
+       pqsignal(SIGCONT, SIG_DFL);
+       pqsignal(SIGWINCH, SIG_DFL);
+
+       /*
+        * Unblock signals (they were blocked when the postmaster forked us)
+        */
+       PG_SETMASK(&UnBlockSig);
+
+       StartupXLOG();  
+
+       BuildFlatFiles(false);
+
+       /*
+        * Exit normally. Exit code 0 tells postmaster that we completed
+        * recovery successfully.
+        */
+       proc_exit(0);
+}