OSDN Git Service

Introduce compact WAL record for the common case of commit (non-DDL).
authorSimon Riggs <simon@2ndQuadrant.com>
Tue, 28 Jun 2011 21:58:17 +0000 (22:58 +0100)
committerSimon Riggs <simon@2ndQuadrant.com>
Tue, 28 Jun 2011 21:58:17 +0000 (22:58 +0100)
XLOG_XACT_COMMIT_COMPACT leaves out invalidation messages and relfilenodes,
saving considerable space for the vast majority of transaction commits.
XLOG_XACT_COMMIT keeps same definition as XLOG_PAGE_MAGIC 0xD067 and earlier.

Leonardo Francalanci and Simon Riggs

src/backend/access/transam/xact.c
src/backend/access/transam/xlog.c
src/include/access/xact.h
src/include/access/xlog_internal.h

index 2ca1c14..7f01a83 100644 (file)
@@ -962,26 +962,10 @@ RecordTransactionCommit(void)
                /*
                 * Begin commit critical section and insert the commit XLOG record.
                 */
-               XLogRecData rdata[4];
-               int                     lastrdata = 0;
-               xl_xact_commit xlrec;
-
                /* Tell bufmgr and smgr to prepare for commit */
                BufmgrCommit();
 
                /*
-                * Set flags required for recovery processing of commits.
-                */
-               xlrec.xinfo = 0;
-               if (RelcacheInitFileInval)
-                       xlrec.xinfo |= XACT_COMPLETION_UPDATE_RELCACHE_FILE;
-               if (forceSyncCommit)
-                       xlrec.xinfo |= XACT_COMPLETION_FORCE_SYNC_COMMIT;
-
-               xlrec.dbId = MyDatabaseId;
-               xlrec.tsId = MyDatabaseTableSpace;
-
-               /*
                 * Mark ourselves as within our "commit critical section".      This
                 * forces any concurrent checkpoint to wait until we've updated
                 * pg_clog.  Without this, it is possible for the checkpoint to set
@@ -1002,43 +986,88 @@ RecordTransactionCommit(void)
                MyProc->inCommit = true;
 
                SetCurrentTransactionStopTimestamp();
-               xlrec.xact_time = xactStopTimestamp;
-               xlrec.nrels = nrels;
-               xlrec.nsubxacts = nchildren;
-               xlrec.nmsgs = nmsgs;
-               rdata[0].data = (char *) (&xlrec);
-               rdata[0].len = MinSizeOfXactCommit;
-               rdata[0].buffer = InvalidBuffer;
-               /* dump rels to delete */
-               if (nrels > 0)
-               {
-                       rdata[0].next = &(rdata[1]);
-                       rdata[1].data = (char *) rels;
-                       rdata[1].len = nrels * sizeof(RelFileNode);
-                       rdata[1].buffer = InvalidBuffer;
-                       lastrdata = 1;
-               }
-               /* dump committed child Xids */
-               if (nchildren > 0)
+
+               /*
+                * Do we need the long commit record? If not, use the compact format.
+                */
+               if (nrels > 0 || nmsgs > 0 || RelcacheInitFileInval || forceSyncCommit)
                {
-                       rdata[lastrdata].next = &(rdata[2]);
-                       rdata[2].data = (char *) children;
-                       rdata[2].len = nchildren * sizeof(TransactionId);
-                       rdata[2].buffer = InvalidBuffer;
-                       lastrdata = 2;
+                       XLogRecData rdata[4];
+                       int                     lastrdata = 0;
+                       xl_xact_commit xlrec;
+                       /*
+                        * Set flags required for recovery processing of commits.
+                        */
+                       xlrec.xinfo = 0;
+                       if (RelcacheInitFileInval)
+                               xlrec.xinfo |= XACT_COMPLETION_UPDATE_RELCACHE_FILE;
+                       if (forceSyncCommit)
+                               xlrec.xinfo |= XACT_COMPLETION_FORCE_SYNC_COMMIT;
+
+                       xlrec.dbId = MyDatabaseId;
+                       xlrec.tsId = MyDatabaseTableSpace;
+
+                       xlrec.xact_time = xactStopTimestamp;
+                       xlrec.nrels = nrels;
+                       xlrec.nsubxacts = nchildren;
+                       xlrec.nmsgs = nmsgs;
+                       rdata[0].data = (char *) (&xlrec);
+                       rdata[0].len = MinSizeOfXactCommit;
+                       rdata[0].buffer = InvalidBuffer;
+                       /* dump rels to delete */
+                       if (nrels > 0)
+                       {
+                               rdata[0].next = &(rdata[1]);
+                               rdata[1].data = (char *) rels;
+                               rdata[1].len = nrels * sizeof(RelFileNode);
+                               rdata[1].buffer = InvalidBuffer;
+                               lastrdata = 1;
+                       }
+                       /* dump committed child Xids */
+                       if (nchildren > 0)
+                       {
+                               rdata[lastrdata].next = &(rdata[2]);
+                               rdata[2].data = (char *) children;
+                               rdata[2].len = nchildren * sizeof(TransactionId);
+                               rdata[2].buffer = InvalidBuffer;
+                               lastrdata = 2;
+                       }
+                       /* dump shared cache invalidation messages */
+                       if (nmsgs > 0)
+                       {
+                               rdata[lastrdata].next = &(rdata[3]);
+                               rdata[3].data = (char *) invalMessages;
+                               rdata[3].len = nmsgs * sizeof(SharedInvalidationMessage);
+                               rdata[3].buffer = InvalidBuffer;
+                               lastrdata = 3;
+                       }
+                       rdata[lastrdata].next = NULL;
+
+                       (void) XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT, rdata);
                }
-               /* dump shared cache invalidation messages */
-               if (nmsgs > 0)
+               else
                {
-                       rdata[lastrdata].next = &(rdata[3]);
-                       rdata[3].data = (char *) invalMessages;
-                       rdata[3].len = nmsgs * sizeof(SharedInvalidationMessage);
-                       rdata[3].buffer = InvalidBuffer;
-                       lastrdata = 3;
-               }
-               rdata[lastrdata].next = NULL;
+                       XLogRecData rdata[2];
+                       int                     lastrdata = 0;
+                       xl_xact_commit_compact  xlrec;
+                       xlrec.xact_time = xactStopTimestamp;
+                       xlrec.nsubxacts = nchildren;
+                       rdata[0].data = (char *) (&xlrec);
+                       rdata[0].len = MinSizeOfXactCommitCompact;
+                       rdata[0].buffer = InvalidBuffer;
+                       /* dump committed child Xids */
+                       if (nchildren > 0)
+                       {
+                               rdata[0].next = &(rdata[1]);
+                               rdata[1].data = (char *) children;
+                               rdata[1].len = nchildren * sizeof(TransactionId);
+                               rdata[1].buffer = InvalidBuffer;
+                               lastrdata = 1;
+                       }
+                       rdata[lastrdata].next = NULL;
 
-               (void) XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT, rdata);
+                       (void) XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT_COMPACT, rdata);
+               }
        }
 
        /*
@@ -4441,19 +4470,17 @@ xactGetCommittedChildren(TransactionId **ptr)
  * actions for which the order of execution is critical.
  */
 static void
-xact_redo_commit(xl_xact_commit *xlrec, TransactionId xid, XLogRecPtr lsn)
+xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn,
+                                       TransactionId *sub_xids, int nsubxacts,
+                                       SharedInvalidationMessage *inval_msgs, int nmsgs,
+                                       RelFileNode *xnodes, int nrels,
+                                       Oid dbId, Oid tsId,
+                                       uint32 xinfo)
 {
-       TransactionId *sub_xids;
-       SharedInvalidationMessage *inval_msgs;
        TransactionId max_xid;
        int                     i;
 
-       /* subxid array follows relfilenodes */
-       sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
-       /* invalidation messages array follows subxids */
-       inval_msgs = (SharedInvalidationMessage *) &(sub_xids[xlrec->nsubxacts]);
-
-       max_xid = TransactionIdLatest(xid, xlrec->nsubxacts, sub_xids);
+       max_xid = TransactionIdLatest(xid, nsubxacts, sub_xids);
 
        /*
         * Make sure nextXid is beyond any XID mentioned in the record.
@@ -4476,7 +4503,7 @@ xact_redo_commit(xl_xact_commit *xlrec, TransactionId xid, XLogRecPtr lsn)
                /*
                 * Mark the transaction committed in pg_clog.
                 */
-               TransactionIdCommitTree(xid, xlrec->nsubxacts, sub_xids);
+               TransactionIdCommitTree(xid, nsubxacts, sub_xids);
        }
        else
        {
@@ -4500,41 +4527,41 @@ xact_redo_commit(xl_xact_commit *xlrec, TransactionId xid, XLogRecPtr lsn)
                 * bits set on changes made by transactions that haven't yet
                 * recovered. It's unlikely but it's good to be safe.
                 */
-               TransactionIdAsyncCommitTree(xid, xlrec->nsubxacts, sub_xids, lsn);
+               TransactionIdAsyncCommitTree(xid, nsubxacts, sub_xids, lsn);
 
                /*
                 * We must mark clog before we update the ProcArray.
                 */
-               ExpireTreeKnownAssignedTransactionIds(xid, xlrec->nsubxacts, sub_xids, max_xid);
+               ExpireTreeKnownAssignedTransactionIds(xid, nsubxacts, sub_xids, max_xid);
 
                /*
                 * Send any cache invalidations attached to the commit. We must
                 * maintain the same order of invalidation then release locks as
                 * occurs in CommitTransaction().
                 */
-               ProcessCommittedInvalidationMessages(inval_msgs, xlrec->nmsgs,
-                                                                 XactCompletionRelcacheInitFileInval(xlrec),
-                                                                                        xlrec->dbId, xlrec->tsId);
+               ProcessCommittedInvalidationMessages(inval_msgs, nmsgs,
+                                                                 XactCompletionRelcacheInitFileInval(xinfo),
+                                                                                        dbId, tsId);
 
                /*
                 * Release locks, if any. We do this for both two phase and normal one
                 * phase transactions. In effect we are ignoring the prepare phase and
                 * just going straight to lock release.
                 */
-               StandbyReleaseLockTree(xid, xlrec->nsubxacts, sub_xids);
+               StandbyReleaseLockTree(xid, nsubxacts, sub_xids);
        }
 
        /* Make sure files supposed to be dropped are dropped */
-       for (i = 0; i < xlrec->nrels; i++)
+       for (i = 0; i < nrels; i++)
        {
-               SMgrRelation srel = smgropen(xlrec->xnodes[i], InvalidBackendId);
+               SMgrRelation srel = smgropen(xnodes[i], InvalidBackendId);
                ForkNumber      fork;
 
                for (fork = 0; fork <= MAX_FORKNUM; fork++)
                {
                        if (smgrexists(srel, fork))
                        {
-                               XLogDropRelation(xlrec->xnodes[i], fork);
+                               XLogDropRelation(xnodes[i], fork);
                                smgrdounlink(srel, fork, true);
                        }
                }
@@ -4553,8 +4580,46 @@ xact_redo_commit(xl_xact_commit *xlrec, TransactionId xid, XLogRecPtr lsn)
         * to reduce that problem window, for any user that requested
         * ForceSyncCommit().
         */
-       if (XactCompletionForceSyncCommit(xlrec))
+       if (XactCompletionForceSyncCommit(xinfo))
                XLogFlush(lsn);
+
+}
+/*
+ * Utility function to call xact_redo_commit_internal after breaking down xlrec
+ */
+static void
+xact_redo_commit(xl_xact_commit *xlrec,
+                                                       TransactionId xid, XLogRecPtr lsn)
+{
+       TransactionId *subxacts;
+       SharedInvalidationMessage *inval_msgs;
+
+       /* subxid array follows relfilenodes */
+       subxacts = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
+       /* invalidation messages array follows subxids */
+       inval_msgs = (SharedInvalidationMessage *) &(subxacts[xlrec->nsubxacts]);
+
+       xact_redo_commit_internal(xid, lsn, subxacts, xlrec->nsubxacts,
+                                                               inval_msgs, xlrec->nmsgs,
+                                                               xlrec->xnodes, xlrec->nrels,
+                                                               xlrec->dbId,
+                                                               xlrec->tsId,
+                                                               xlrec->xinfo);
+}
+
+/*
+ * Utility function to call xact_redo_commit_internal  for compact form of message.
+ */
+static void
+xact_redo_commit_compact(xl_xact_commit_compact *xlrec,
+                                                       TransactionId xid, XLogRecPtr lsn)
+{
+       xact_redo_commit_internal(xid, lsn, xlrec->subxacts, xlrec->nsubxacts,
+                                                               NULL, 0,                /* inval msgs */
+                                                               NULL, 0,                /* relfilenodes */
+                                                               InvalidOid,             /* dbId */
+                                                               InvalidOid,             /* tsId */
+                                                               0);                             /* xinfo */
 }
 
 /*
@@ -4655,7 +4720,13 @@ xact_redo(XLogRecPtr lsn, XLogRecord *record)
        /* Backup blocks are not used in xact records */
        Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
 
-       if (info == XLOG_XACT_COMMIT)
+       if (info == XLOG_XACT_COMMIT_COMPACT)
+       {
+               xl_xact_commit_compact *xlrec = (xl_xact_commit_compact *) XLogRecGetData(record);
+
+               xact_redo_commit_compact(xlrec, record->xl_xid, lsn);
+       }
+       else if (info == XLOG_XACT_COMMIT)
        {
                xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record);
 
@@ -4703,9 +4774,9 @@ static void
 xact_desc_commit(StringInfo buf, xl_xact_commit *xlrec)
 {
        int                     i;
-       TransactionId *xacts;
+       TransactionId *subxacts;
 
-       xacts = (TransactionId *) &xlrec->xnodes[xlrec->nrels];
+       subxacts = (TransactionId *) &xlrec->xnodes[xlrec->nrels];
 
        appendStringInfoString(buf, timestamptz_to_str(xlrec->xact_time));
 
@@ -4724,15 +4795,15 @@ xact_desc_commit(StringInfo buf, xl_xact_commit *xlrec)
        {
                appendStringInfo(buf, "; subxacts:");
                for (i = 0; i < xlrec->nsubxacts; i++)
-                       appendStringInfo(buf, " %u", xacts[i]);
+                       appendStringInfo(buf, " %u", subxacts[i]);
        }
        if (xlrec->nmsgs > 0)
        {
                SharedInvalidationMessage *msgs;
 
-               msgs = (SharedInvalidationMessage *) &xacts[xlrec->nsubxacts];
+               msgs = (SharedInvalidationMessage *) &subxacts[xlrec->nsubxacts];
 
-               if (XactCompletionRelcacheInitFileInval(xlrec))
+               if (XactCompletionRelcacheInitFileInval(xlrec->xinfo))
                        appendStringInfo(buf, "; relcache init file inval dbid %u tsid %u",
                                                         xlrec->dbId, xlrec->tsId);
 
@@ -4759,6 +4830,21 @@ xact_desc_commit(StringInfo buf, xl_xact_commit *xlrec)
 }
 
 static void
+xact_desc_commit_compact(StringInfo buf, xl_xact_commit_compact *xlrec)
+{
+       int                     i;
+
+       appendStringInfoString(buf, timestamptz_to_str(xlrec->xact_time));
+
+       if (xlrec->nsubxacts > 0)
+       {
+               appendStringInfo(buf, "; subxacts:");
+               for (i = 0; i < xlrec->nsubxacts; i++)
+                       appendStringInfo(buf, " %u", xlrec->subxacts[i]);
+       }
+}
+
+static void
 xact_desc_abort(StringInfo buf, xl_xact_abort *xlrec)
 {
        int                     i;
@@ -4802,7 +4888,14 @@ xact_desc(StringInfo buf, uint8 xl_info, char *rec)
 {
        uint8           info = xl_info & ~XLR_INFO_MASK;
 
-       if (info == XLOG_XACT_COMMIT)
+       if (info == XLOG_XACT_COMMIT_COMPACT)
+       {
+               xl_xact_commit_compact *xlrec = (xl_xact_commit_compact *) rec;
+
+               appendStringInfo(buf, "commit: ");
+               xact_desc_commit_compact(buf, xlrec);
+       }
+       else if (info == XLOG_XACT_COMMIT)
        {
                xl_xact_commit *xlrec = (xl_xact_commit *) rec;
 
index 4952d22..178a458 100644 (file)
@@ -5593,7 +5593,14 @@ recoveryStopsHere(XLogRecord *record, bool *includeThis)
        if (record->xl_rmid != RM_XACT_ID && record->xl_rmid != RM_XLOG_ID)
                return false;
        record_info = record->xl_info & ~XLR_INFO_MASK;
-       if (record->xl_rmid == RM_XACT_ID && record_info == XLOG_XACT_COMMIT)
+       if (record->xl_rmid == RM_XACT_ID && record_info == XLOG_XACT_COMMIT_COMPACT)
+       {
+               xl_xact_commit_compact *recordXactCommitData;
+
+               recordXactCommitData = (xl_xact_commit_compact *) XLogRecGetData(record);
+               recordXtime = recordXactCommitData->xact_time;
+       }
+       else if (record->xl_rmid == RM_XACT_ID && record_info == XLOG_XACT_COMMIT)
        {
                xl_xact_commit *recordXactCommitData;
 
@@ -5680,7 +5687,7 @@ recoveryStopsHere(XLogRecord *record, bool *includeThis)
                recoveryStopTime = recordXtime;
                recoveryStopAfter = *includeThis;
 
-               if (record_info == XLOG_XACT_COMMIT)
+               if (record_info == XLOG_XACT_COMMIT_COMPACT || record_info == XLOG_XACT_COMMIT)
                {
                        if (recoveryStopAfter)
                                ereport(LOG,
index cb440d4..c585752 100644 (file)
@@ -106,6 +106,7 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid,
 #define XLOG_XACT_COMMIT_PREPARED      0x30
 #define XLOG_XACT_ABORT_PREPARED       0x40
 #define XLOG_XACT_ASSIGNMENT           0x50
+#define XLOG_XACT_COMMIT_COMPACT       0x60
 
 typedef struct xl_xact_assignment
 {
@@ -116,6 +117,16 @@ typedef struct xl_xact_assignment
 
 #define MinSizeOfXactAssignment offsetof(xl_xact_assignment, xsub)
 
+typedef struct xl_xact_commit_compact
+{
+       TimestampTz xact_time;          /* time of commit */
+       int                     nsubxacts;              /* number of subtransaction XIDs */
+       /* ARRAY OF COMMITTED SUBTRANSACTION XIDs FOLLOWS */
+       TransactionId subxacts[1];      /* VARIABLE LENGTH ARRAY */
+} xl_xact_commit_compact;
+
+#define MinSizeOfXactCommitCompact offsetof(xl_xact_commit_compact, subxacts)
+
 typedef struct xl_xact_commit
 {
        TimestampTz xact_time;          /* time of commit */
@@ -145,8 +156,8 @@ typedef struct xl_xact_commit
 #define XACT_COMPLETION_FORCE_SYNC_COMMIT              0x02
 
 /* Access macros for above flags */
-#define XactCompletionRelcacheInitFileInval(xlrec)     ((xlrec)->xinfo & XACT_COMPLETION_UPDATE_RELCACHE_FILE)
-#define XactCompletionForceSyncCommit(xlrec)           ((xlrec)->xinfo & XACT_COMPLETION_FORCE_SYNC_COMMIT)
+#define XactCompletionRelcacheInitFileInval(xinfo)     (xinfo & XACT_COMPLETION_UPDATE_RELCACHE_FILE)
+#define XactCompletionForceSyncCommit(xinfo)           (xinfo & XACT_COMPLETION_FORCE_SYNC_COMMIT)
 
 typedef struct xl_xact_abort
 {
index 34316ff..4eaa243 100644 (file)
@@ -71,7 +71,7 @@ typedef struct XLogContRecord
 /*
  * Each page of XLOG file has a header like this:
  */
-#define XLOG_PAGE_MAGIC 0xD067 /* can be used as WAL version indicator */
+#define XLOG_PAGE_MAGIC 0xD068 /* can be used as WAL version indicator */
 
 typedef struct XLogPageHeaderData
 {