OSDN Git Service

Merge branch 'pgrex90-base' into pgrex90
authorMasaoFujii <masao.fujii@gmail.com>
Mon, 13 Dec 2010 09:12:21 +0000 (18:12 +0900)
committerMasaoFujii <masao.fujii@gmail.com>
Mon, 13 Dec 2010 09:12:21 +0000 (18:12 +0900)
Conflicts:
doc/src/sgml/protocol.sgml
src/backend/replication/walsender.c
src/interfaces/libpq/fe-exec.c
src/interfaces/libpq/fe-protocol2.c
src/interfaces/libpq/fe-protocol3.c
src/interfaces/libpq/libpq-int.h

15 files changed:
doc/src/sgml/libpq.sgml
doc/src/sgml/protocol.sgml
src/backend/access/heap/heapam.c
src/backend/access/heap/pruneheap.c
src/backend/access/nbtree/nbtxlog.c
src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
src/backend/replication/walsender.c
src/backend/utils/sort/tuplestore.c
src/bin/pg_dump/pg_backup_archiver.c
src/bin/pg_dump/pg_backup_archiver.h
src/interfaces/libpq/fe-exec.c
src/interfaces/libpq/fe-protocol2.c
src/interfaces/libpq/fe-protocol3.c
src/interfaces/libpq/libpq-fe.h
src/interfaces/libpq/libpq-int.h

index 7854998..203af28 100644 (file)
@@ -2084,6 +2084,16 @@ ExecStatusType PQresultStatus(const PGresult *res);
           </listitem>
          </varlistentry>
 
+         <varlistentry id="libpq-pgres-copy-both">
+          <term><literal>PGRES_COPY_BOTH</literal></term>
+          <listitem>
+           <para>
+            Copy In/Out (to and from server) data transfer started.  This is
+            currently used only for streaming replication.
+           </para>
+          </listitem>
+         </varlistentry>
+
          <varlistentry id="libpq-pgres-bad-response">
           <term><literal>PGRES_BAD_RESPONSE</literal></term>
           <listitem>
index fe32116..048761f 100644 (file)
    </para>
 
    <para>
-    The CopyInResponse and CopyOutResponse messages include fields that
-    inform the frontend of the number of columns per row and the format
-    codes being used for each column.  (As of the present implementation,
-    all columns in a given <command>COPY</> operation will use the same
-    format, but the message design does not assume this.)
+    There is another Copy-related mode called Copy-both, which allows
+    high-speed bulk data transfer to <emphasis>and</> from the server.
+    Copy-both mode is initiated when a backend in walsender mode
+    executes a <command>START_REPLICATION</command> statement.  The
+    backend sends a CopyBothResponse message to the frontend.  Both
+    the backend and the frontend may then send CopyData messages
+    until the connection is terminated.  See <xref
+    linkend="protocol-replication">.
    </para>
+
+   <para>
+    The CopyInResponse, CopyOutResponse and CopyBothResponse messages
+    include fields that inform the frontend of the number of columns
+    per row and the format codes being used for each column.  (As of
+    the present implementation, all columns in a given <command>COPY</>
+    operation will use the same format, but the message design does not
+    assume this.)
+   </para>
+
   </sect2>
 
   <sect2 id="protocol-async">
@@ -1344,7 +1357,7 @@ The commands accepted in walsender mode are:
       WAL position <replaceable>XXX</>/<replaceable>XXX</>.
       The server can reply with an error, e.g. if the requested section of WAL
       has already been recycled. On success, server responds with a
-      CopyXLogResponse message, and then starts to stream WAL to the frontend.
+      CopyBothResponse message, and then starts to stream WAL to the frontend.
       WAL will continue to be streamed until the connection is broken;
       no further commands will be accepted.
      </para>
@@ -2731,7 +2744,7 @@ CopyOutResponse (B)
 
 <varlistentry>
 <term>
-CopyXLogResponse (B)
+CopyBothResponse (B)
 </term>
 <listitem>
 <para>
@@ -2743,7 +2756,7 @@ CopyXLogResponse (B)
 </term>
 <listitem>
 <para>
-                Identifies the message as a Start Copy XLog response.
+                Identifies the message as a Start Copy Both response.
                 This message is used only for Streaming Replication.
 </para>
 </listitem>
@@ -2758,6 +2771,43 @@ CopyXLogResponse (B)
 </para>
 </listitem>
 </varlistentry>
+<varlistentry>
+<term>
+        Int8
+</term>
+<listitem>
+<para>
+                0 indicates the overall <command>COPY</command> format
+                is textual (rows separated by newlines, columns
+                separated by separator characters, etc). 1 indicates
+                the overall copy format is binary (similar to DataRow
+                format). See <xref linkend="sql-copy"> for more information.
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+        Int16
+</term>
+<listitem>
+<para>
+                The number of columns in the data to be copied
+                (denoted <replaceable>N</> below).
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+        Int16[<replaceable>N</>]
+</term>
+<listitem>
+<para>
+                The format codes to be used for each column.
+                Each must presently be zero (text) or one (binary).
+                All must be zero if the overall copy format is textual.
+</para>
+</listitem>
+</varlistentry>
 </variablelist>
 
 </para>
index 48a387e..1c93400 100644 (file)
@@ -3776,8 +3776,11 @@ heap_restrpos(HeapScanDesc scan)
 }
 
 /*
- * If 'tuple' contains any XID greater than latestRemovedXid, update
- * latestRemovedXid to the greatest one found.
+ * If 'tuple' contains any visible XID greater than latestRemovedXid,
+ * ratchet forwards latestRemovedXid to the greatest one found.
+ * This is used as the basis for generating Hot Standby conflicts, so
+ * if a tuple was never visible then removing it should not conflict
+ * with queries.
  */
 void
 HeapTupleHeaderAdvanceLatestRemovedXid(HeapTupleHeader tuple,
@@ -3793,13 +3796,25 @@ HeapTupleHeaderAdvanceLatestRemovedXid(HeapTupleHeader tuple,
                        *latestRemovedXid = xvac;
        }
 
-       if (TransactionIdPrecedes(*latestRemovedXid, xmax))
-               *latestRemovedXid = xmax;
-
-       if (TransactionIdPrecedes(*latestRemovedXid, xmin))
-               *latestRemovedXid = xmin;
+       /*
+        * Ignore tuples inserted by an aborted transaction or
+        * if the tuple was updated/deleted by the inserting transaction.
+        *
+        * Look for a committed hint bit, or if no xmin bit is set, check clog.
+        * This needs to work on both master and standby, where it is used
+        * to assess btree delete records.
+        */
+       if ((tuple->t_infomask & HEAP_XMIN_COMMITTED) ||
+               (!(tuple->t_infomask & HEAP_XMIN_COMMITTED) &&
+                !(tuple->t_infomask & HEAP_XMIN_INVALID) &&
+                TransactionIdDidCommit(xmin)))
+       {
+               if (xmax != xmin &&
+                       TransactionIdFollows(xmax, *latestRemovedXid))
+                               *latestRemovedXid = xmax;
+       }
 
-       Assert(TransactionIdIsValid(*latestRemovedXid));
+       /* *latestRemovedXid may still be invalid at end */
 }
 
 /*
index 3332e08..6d72bb2 100644 (file)
@@ -237,7 +237,6 @@ heap_page_prune(Relation relation, Buffer buffer, TransactionId OldestXmin,
                {
                        XLogRecPtr      recptr;
 
-                       Assert(TransactionIdIsValid(prstate.latestRemovedXid));
                        recptr = log_heap_clean(relation, buffer,
                                                                        prstate.redirected, prstate.nredirected,
                                                                        prstate.nowdead, prstate.ndead,
index 3261483..740986d 100644 (file)
@@ -580,7 +580,6 @@ btree_xlog_delete_get_latestRemovedXid(XLogRecord *record)
        BlockNumber hblkno;
        OffsetNumber hoffnum;
        TransactionId latestRemovedXid = InvalidTransactionId;
-       TransactionId htupxid = InvalidTransactionId;
        int                     i;
 
        /*
@@ -646,24 +645,16 @@ btree_xlog_delete_get_latestRemovedXid(XLogRecord *record)
                }
 
                /*
-                * If the heap item has storage, then read the header. Some LP_DEAD
-                * items may not be accessible, so we ignore them.
+                * If the heap item has storage, then read the header and use that to
+                * set latestRemovedXid.
+                *
+                * Some LP_DEAD items may not be accessible, so we ignore them.
                 */
                if (ItemIdHasStorage(hitemid))
                {
                        htuphdr = (HeapTupleHeader) PageGetItem(hpage, hitemid);
 
-                       /*
-                        * Get the heap tuple's xmin/xmax and ratchet up the
-                        * latestRemovedXid. No need to consider xvac values here.
-                        */
-                       htupxid = HeapTupleHeaderGetXmin(htuphdr);
-                       if (TransactionIdFollows(htupxid, latestRemovedXid))
-                               latestRemovedXid = htupxid;
-
-                       htupxid = HeapTupleHeaderGetXmax(htuphdr);
-                       if (TransactionIdFollows(htupxid, latestRemovedXid))
-                               latestRemovedXid = htupxid;
+                       HeapTupleHeaderAdvanceLatestRemovedXid(htuphdr, &latestRemovedXid);
                }
                else if (ItemIdIsDead(hitemid))
                {
index e4cee68..0d8c278 100644 (file)
@@ -160,7 +160,7 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
                         startpoint.xlogid, startpoint.xrecoff,
                         GetConfigOption("replication_mode", false));
        res = libpqrcv_PQexec(cmd);
-       if (PQresultStatus(res) != PGRES_COPY_OUT)
+       if (PQresultStatus(res) != PGRES_COPY_BOTH)
        {
                PQclear(res);
                ereport(ERROR,
@@ -306,6 +306,7 @@ libpqrcv_PQexec(const char *query)
 
                if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
                        PQresultStatus(lastResult) == PGRES_COPY_OUT ||
+                       PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
                        PQstatus(streamConn) == CONNECTION_BAD)
                        break;
        }
index fa96810..a41365f 100644 (file)
@@ -352,8 +352,10 @@ WalSndHandshake(void)
                                                        SpinLockRelease(&walsnd->mutex);
                                                }
 
-                                               /* Send a CopyXLogResponse message, and start streaming */
+                                               /* Send a CopyBothResponse message, and start streaming */
                                                pq_beginmessage(&buf, 'W');
+                                               pq_sendbyte(&buf, 0);
+                                               pq_sendint(&buf, 0, 2);
                                                pq_endmessage(&buf);
                                                pq_flush();
 
index b752d67..b67d78c 100644 (file)
@@ -145,8 +145,15 @@ struct Tuplestorestate
        /*
         * This array holds pointers to tuples in memory if we are in state INMEM.
         * In states WRITEFILE and READFILE it's not used.
+        *
+        * When memtupdeleted > 0, the first memtupdeleted pointers are already
+        * released due to a tuplestore_trim() operation, but we haven't expended
+        * the effort to slide the remaining pointers down.  These unused pointers
+        * are set to NULL to catch any invalid accesses.  Note that memtupcount
+        * includes the deleted pointers.
         */
        void      **memtuples;          /* array of pointers to palloc'd tuples */
+       int                     memtupdeleted;  /* the first N slots are currently unused */
        int                     memtupcount;    /* number of tuples currently present */
        int                     memtupsize;             /* allocated length of memtuples array */
 
@@ -252,6 +259,7 @@ tuplestore_begin_common(int eflags, bool interXact, int maxKBytes)
        state->context = CurrentMemoryContext;
        state->resowner = CurrentResourceOwner;
 
+       state->memtupdeleted = 0;
        state->memtupcount = 0;
        state->memtupsize = 1024;       /* initial guess */
        state->memtuples = (void **) palloc(state->memtupsize * sizeof(void *));
@@ -401,7 +409,7 @@ tuplestore_clear(Tuplestorestate *state)
        state->myfile = NULL;
        if (state->memtuples)
        {
-               for (i = 0; i < state->memtupcount; i++)
+               for (i = state->memtupdeleted; i < state->memtupcount; i++)
                {
                        FREEMEM(state, GetMemoryChunkSpace(state->memtuples[i]));
                        pfree(state->memtuples[i]);
@@ -409,6 +417,7 @@ tuplestore_clear(Tuplestorestate *state)
        }
        state->status = TSS_INMEM;
        state->truncated = false;
+       state->memtupdeleted = 0;
        state->memtupcount = 0;
        readptr = state->readptrs;
        for (i = 0; i < state->readptrcount; readptr++, i++)
@@ -432,7 +441,7 @@ tuplestore_end(Tuplestorestate *state)
                BufFileClose(state->myfile);
        if (state->memtuples)
        {
-               for (i = 0; i < state->memtupcount; i++)
+               for (i = state->memtupdeleted; i < state->memtupcount; i++)
                        pfree(state->memtuples[i]);
                pfree(state->memtuples);
        }
@@ -774,14 +783,14 @@ tuplestore_gettuple(Tuplestorestate *state, bool forward,
                                }
                                else
                                {
-                                       if (readptr->current <= 0)
+                                       if (readptr->current <= state->memtupdeleted)
                                        {
                                                Assert(!state->truncated);
                                                return NULL;
                                        }
                                        readptr->current--; /* last returned tuple */
                                }
-                               if (readptr->current <= 0)
+                               if (readptr->current <= state->memtupdeleted)
                                {
                                        Assert(!state->truncated);
                                        return NULL;
@@ -969,7 +978,7 @@ dumptuples(Tuplestorestate *state)
 {
        int                     i;
 
-       for (i = 0;; i++)
+       for (i = state->memtupdeleted;; i++)
        {
                TSReadPointer *readptr = state->readptrs;
                int                     j;
@@ -984,6 +993,7 @@ dumptuples(Tuplestorestate *state)
                        break;
                WRITETUP(state, state->memtuples[i]);
        }
+       state->memtupdeleted = 0;
        state->memtupcount = 0;
 }
 
@@ -1153,24 +1163,36 @@ tuplestore_trim(Tuplestorestate *state)
        nremove = oldest - 1;
        if (nremove <= 0)
                return;                                 /* nothing to do */
+
+       Assert(nremove >= state->memtupdeleted);
        Assert(nremove <= state->memtupcount);
 
        /* Release no-longer-needed tuples */
-       for (i = 0; i < nremove; i++)
+       for (i = state->memtupdeleted; i < nremove; i++)
        {
                FREEMEM(state, GetMemoryChunkSpace(state->memtuples[i]));
                pfree(state->memtuples[i]);
+               state->memtuples[i] = NULL;
        }
+       state->memtupdeleted = nremove;
+
+       /* mark tuplestore as truncated (used for Assert crosschecks only) */
+       state->truncated = true;
+
+       /*
+        * If nremove is less than 1/8th memtupcount, just stop here, leaving the
+        * "deleted" slots as NULL.  This prevents us from expending O(N^2) time
+        * repeatedly memmove-ing a large pointer array.  The worst case space
+        * wastage is pretty small, since it's just pointers and not whole tuples.
+        */
+       if (nremove < state->memtupcount / 8)
+               return;
 
        /*
-        * Slide the array down and readjust pointers.  This may look pretty
-        * stupid, but we expect that there will usually not be very many
-        * tuple-pointers to move, so this isn't that expensive; and it keeps a
-        * lot of other logic simple.
+        * Slide the array down and readjust pointers.
         *
-        * In fact, in the current usage for merge joins, it's demonstrable that
-        * there will always be exactly one non-removed tuple; so optimize that
-        * case.
+        * In mergejoin's current usage, it's demonstrable that there will always
+        * be exactly one non-removed tuple; so optimize that case.
         */
        if (nremove + 1 == state->memtupcount)
                state->memtuples[0] = state->memtuples[nremove];
@@ -1178,15 +1200,13 @@ tuplestore_trim(Tuplestorestate *state)
                memmove(state->memtuples, state->memtuples + nremove,
                                (state->memtupcount - nremove) * sizeof(void *));
 
+       state->memtupdeleted = 0;
        state->memtupcount -= nremove;
        for (i = 0; i < state->readptrcount; i++)
        {
                if (!state->readptrs[i].eof_reached)
                        state->readptrs[i].current -= nremove;
        }
-
-       /* mark tuplestore as truncated (used for Assert crosschecks only) */
-       state->truncated = true;
 }
 
 /*
index be5339e..6528f4d 100644 (file)
@@ -79,6 +79,10 @@ const char *progname;
 
 static const char *modulename = gettext_noop("archiver");
 
+/* index array created by fix_dependencies -- only used in parallel restore */
+static TocEntry          **tocsByDumpId;                       /* index by dumpId - 1 */
+static DumpId          maxDumpId;                              /* length of above array */
+
 
 static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt,
                 const int compression, ArchiveMode mode);
@@ -134,9 +138,7 @@ static void fix_dependencies(ArchiveHandle *AH);
 static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2);
 static void repoint_table_dependencies(ArchiveHandle *AH,
                                                   DumpId tableId, DumpId tableDataId);
-static void identify_locking_dependencies(TocEntry *te,
-                                                         TocEntry **tocsByDumpId,
-                                                         DumpId maxDumpId);
+static void identify_locking_dependencies(TocEntry *te);
 static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
                                        TocEntry *ready_list);
 static void mark_create_done(ArchiveHandle *AH, TocEntry *te);
@@ -3737,7 +3739,12 @@ mark_work_done(ArchiveHandle *AH, TocEntry *ready_list,
 /*
  * Process the dependency information into a form useful for parallel restore.
  *
- * We set up depCount fields that are the number of as-yet-unprocessed
+ * This function takes care of fixing up some missing or badly designed
+ * dependencies, and then prepares subsidiary data structures that will be
+ * used in the main parallel-restore logic, including:
+ * 1. We build the tocsByDumpId[] index array.
+ * 2. We build the revDeps[] arrays of incoming dependency dumpIds.
+ * 3. We set up depCount fields that are the number of as-yet-unprocessed
  * dependencies for each TOC entry.
  *
  * We also identify locking dependencies so that we can avoid trying to
@@ -3746,22 +3753,20 @@ mark_work_done(ArchiveHandle *AH, TocEntry *ready_list,
 static void
 fix_dependencies(ArchiveHandle *AH)
 {
-       TocEntry  **tocsByDumpId;
        TocEntry   *te;
-       DumpId          maxDumpId;
        int                     i;
 
        /*
-        * For some of the steps here, it is convenient to have an array that
-        * indexes the TOC entries by dump ID, rather than searching the TOC list
-        * repeatedly.  Entries for dump IDs not present in the TOC will be NULL.
+        * It is convenient to have an array that indexes the TOC entries by dump
+        * ID, rather than searching the TOC list repeatedly.  Entries for dump
+        * IDs not present in the TOC will be NULL.
         *
         * NOTE: because maxDumpId is just the highest dump ID defined in the
         * archive, there might be dependencies for IDs > maxDumpId.  All uses of
         * this array must guard against out-of-range dependency numbers.
         *
-        * Also, initialize the depCount fields, and make sure all the TOC items
-        * are marked as not being in any parallel-processing list.
+        * Also, initialize the depCount/revDeps/nRevDeps fields, and make sure
+        * the TOC items are marked as not being in any parallel-processing list.
         */
        maxDumpId = AH->maxDumpId;
        tocsByDumpId = (TocEntry **) calloc(maxDumpId, sizeof(TocEntry *));
@@ -3769,6 +3774,8 @@ fix_dependencies(ArchiveHandle *AH)
        {
                tocsByDumpId[te->dumpId - 1] = te;
                te->depCount = te->nDeps;
+               te->revDeps = NULL;
+               te->nRevDeps = 0;
                te->par_prev = NULL;
                te->par_next = NULL;
        }
@@ -3786,6 +3793,9 @@ fix_dependencies(ArchiveHandle *AH)
         * TABLE, if possible.  However, if the dependency isn't in the archive
         * then just assume it was a TABLE; this is to cover cases where the table
         * was suppressed but we have the data and some dependent post-data items.
+        *
+        * XXX this is O(N^2) if there are a lot of tables.  We ought to fix
+        * pg_dump to produce correctly-linked dependencies in the first place.
         */
        for (te = AH->toc->next; te != AH->toc; te = te->next)
        {
@@ -3832,8 +3842,14 @@ fix_dependencies(ArchiveHandle *AH)
        }
 
        /*
-        * It is possible that the dependencies list items that are not in the
-        * archive at all.      Subtract such items from the depCounts.
+        * At this point we start to build the revDeps reverse-dependency arrays,
+        * so all changes of dependencies must be complete.
+        */
+
+       /*
+        * Count the incoming dependencies for each item.  Also, it is possible
+        * that an entry's dependencies refer to items that are not in the
+        * archive at all.  Subtract such items from the depCounts.
         */
        for (te = AH->toc->next; te != AH->toc; te = te->next)
        {
@@ -3841,22 +3857,52 @@ fix_dependencies(ArchiveHandle *AH)
                {
                        DumpId          depid = te->dependencies[i];
 
-                       if (depid > maxDumpId || tocsByDumpId[depid - 1] == NULL)
+                       if (depid <= maxDumpId && tocsByDumpId[depid - 1] != NULL)
+                               tocsByDumpId[depid - 1]->nRevDeps++;
+                       else
                                te->depCount--;
                }
        }
 
        /*
+        * Allocate space for revDeps[] arrays, and reset nRevDeps so we can
+        * use it as a counter below.
+        */
+       for (te = AH->toc->next; te != AH->toc; te = te->next)
+       {
+               if (te->nRevDeps > 0)
+                       te->revDeps = (DumpId *) malloc(te->nRevDeps * sizeof(DumpId));
+               te->nRevDeps = 0;
+       }
+
+       /*
+        * Build the revDeps[] arrays of incoming-dependency dumpIds.  This
+        * had better agree with the loops above.
+        */
+       for (te = AH->toc->next; te != AH->toc; te = te->next)
+       {
+               for (i = 0; i < te->nDeps; i++)
+               {
+                       DumpId          depid = te->dependencies[i];
+
+                       if (depid <= maxDumpId && tocsByDumpId[depid - 1] != NULL)
+                       {
+                               TocEntry   *otherte = tocsByDumpId[depid - 1];
+
+                               otherte->revDeps[otherte->nRevDeps++] = te->dumpId;
+                       }
+               }
+       }
+
+       /*
         * Lastly, work out the locking dependencies.
         */
        for (te = AH->toc->next; te != AH->toc; te = te->next)
        {
                te->lockDeps = NULL;
                te->nLockDeps = 0;
-               identify_locking_dependencies(te, tocsByDumpId, maxDumpId);
+               identify_locking_dependencies(te);
        }
-
-       free(tocsByDumpId);
 }
 
 /*
@@ -3890,13 +3936,9 @@ repoint_table_dependencies(ArchiveHandle *AH,
  * Identify which objects we'll need exclusive lock on in order to restore
  * the given TOC entry (*other* than the one identified by the TOC entry
  * itself).  Record their dump IDs in the entry's lockDeps[] array.
- * tocsByDumpId[] is a convenience array (of size maxDumpId) to avoid
- * searching the TOC for each dependency.
  */
 static void
-identify_locking_dependencies(TocEntry *te,
-                                                         TocEntry **tocsByDumpId,
-                                                         DumpId maxDumpId)
+identify_locking_dependencies(TocEntry *te)
 {
        DumpId     *lockids;
        int                     nlockids;
@@ -3950,31 +3992,21 @@ identify_locking_dependencies(TocEntry *te,
 static void
 reduce_dependencies(ArchiveHandle *AH, TocEntry *te, TocEntry *ready_list)
 {
-       DumpId          target = te->dumpId;
        int                     i;
 
-       ahlog(AH, 2, "reducing dependencies for %d\n", target);
+       ahlog(AH, 2, "reducing dependencies for %d\n", te->dumpId);
 
-       /*
-        * We must examine all entries, not only the ones after the target item,
-        * because if the user used a -L switch then the original dependency-
-        * respecting order has been destroyed by SortTocFromFile.
-        */
-       for (te = AH->toc->next; te != AH->toc; te = te->next)
+       for (i = 0; i < te->nRevDeps; i++)
        {
-               for (i = 0; i < te->nDeps; i++)
+               TocEntry   *otherte = tocsByDumpId[te->revDeps[i] - 1];
+
+               otherte->depCount--;
+               if (otherte->depCount == 0 && otherte->par_prev != NULL)
                {
-                       if (te->dependencies[i] == target)
-                       {
-                               te->depCount--;
-                               if (te->depCount == 0 && te->par_prev != NULL)
-                               {
-                                       /* It must be in the pending list, so remove it ... */
-                                       par_list_remove(te);
-                                       /* ... and add to ready_list */
-                                       par_list_append(ready_list, te);
-                               }
-                       }
+                       /* It must be in the pending list, so remove it ... */
+                       par_list_remove(otherte);
+                       /* ... and add to ready_list */
+                       par_list_append(ready_list, otherte);
                }
        }
 }
index 0a135ee..9f826b6 100644 (file)
@@ -321,6 +321,8 @@ typedef struct _tocEntry
        struct _tocEntry *par_next; /* these are NULL if not in either list */
        bool            created;                /* set for DATA member if TABLE was created */
        int                     depCount;               /* number of dependencies not yet restored */
+       DumpId     *revDeps;            /* dumpIds of objects depending on this one */
+       int                     nRevDeps;               /* number of such dependencies */
        DumpId     *lockDeps;           /* dumpIds of objects this one needs lock on */
        int                     nLockDeps;              /* number of such dependencies */
 } TocEntry;
index fe34525..344bd40 100644 (file)
@@ -35,6 +35,7 @@ char     *const pgresStatus[] = {
        "PGRES_TUPLES_OK",
        "PGRES_COPY_OUT",
        "PGRES_COPY_IN",
+       "PGRES_COPY_BOTH",
        "PGRES_BAD_RESPONSE",
        "PGRES_NONFATAL_ERROR",
        "PGRES_FATAL_ERROR"
@@ -174,6 +175,7 @@ PQmakeEmptyPGresult(PGconn *conn, ExecStatusType status)
                        case PGRES_TUPLES_OK:
                        case PGRES_COPY_OUT:
                        case PGRES_COPY_IN:
+                       case PGRES_COPY_BOTH:
                                /* non-error cases */
                                break;
                        default:
@@ -1592,6 +1594,12 @@ PQgetResult(PGconn *conn)
                        else
                                res = PQmakeEmptyPGresult(conn, PGRES_COPY_OUT);
                        break;
+               case PGASYNC_COPY_BOTH:
+                       if (conn->result && conn->result->resultStatus == PGRES_COPY_BOTH)
+                               res = pqPrepareAsyncResult(conn);
+                       else
+                               res = PQmakeEmptyPGresult(conn, PGRES_COPY_BOTH);
+                       break;
                default:
                        printfPQExpBuffer(&conn->errorMessage,
                                                          libpq_gettext("unexpected asyncStatus: %d\n"),
@@ -1776,6 +1784,13 @@ PQexecStart(PGconn *conn)
                                return false;
                        }
                }
+               else if (resultStatus == PGRES_COPY_BOTH)
+               {
+                       /* We don't allow PQexec during COPY BOTH */
+                       printfPQExpBuffer(&conn->errorMessage,
+                        libpq_gettext("PQexec not allowed during COPY BOTH\n"));
+                       return false;                   
+               }
                /* check for loss of connection, too */
                if (conn->status == CONNECTION_BAD)
                        return false;
@@ -1799,7 +1814,7 @@ PQexecFinish(PGconn *conn)
         * than one --- but merge error messages if we get more than one error
         * result.
         *
-        * We have to stop if we see copy in/out, however. We will resume parsing
+        * We have to stop if we see copy in/out/both, however. We will resume parsing
         * after application performs the data transfer.
         *
         * Also stop if the connection is lost (else we'll loop infinitely).
@@ -1828,6 +1843,7 @@ PQexecFinish(PGconn *conn)
                lastResult = result;
                if (result->resultStatus == PGRES_COPY_IN ||
                        result->resultStatus == PGRES_COPY_OUT ||
+                       result->resultStatus == PGRES_COPY_BOTH ||
                        conn->status == CONNECTION_BAD)
                        break;
        }
@@ -2001,7 +2017,7 @@ PQnotifies(PGconn *conn)
 }
 
 /*
- * PQputCopyData - send some data to the backend during COPY IN or COPY XLOG
+ * PQputCopyData - send some data to the backend during COPY IN or COPY BOTH
  *
  * Returns 1 if successful, 0 if data could not be sent (only possible
  * in nonblock mode), or -1 if an error occurs.
@@ -2012,7 +2028,7 @@ PQputCopyData(PGconn *conn, const char *buffer, int nbytes)
        if (!conn)
                return -1;
        if (conn->asyncStatus != PGASYNC_COPY_IN &&
-               conn->asyncStatus != PGASYNC_COPY_XLOG)
+               conn->asyncStatus != PGASYNC_COPY_BOTH)
        {
                printfPQExpBuffer(&conn->errorMessage,
                                                  libpq_gettext("no COPY in progress\n"));
@@ -2150,7 +2166,7 @@ PQputCopyEnd(PGconn *conn, const char *errormsg)
 
 /*
  * PQgetCopyData - read a row of data from the backend during COPY OUT
- * or COPY XLOG
+ * or COPY BOTH
  *
  * If successful, sets *buffer to point to a malloc'd row of data, and
  * returns row length (always > 0) as result.
@@ -2165,7 +2181,7 @@ PQgetCopyData(PGconn *conn, char **buffer, int async)
        if (!conn)
                return -2;
        if (conn->asyncStatus != PGASYNC_COPY_OUT &&
-               conn->asyncStatus != PGASYNC_COPY_XLOG)
+               conn->asyncStatus != PGASYNC_COPY_BOTH)
        {
                printfPQExpBuffer(&conn->errorMessage,
                                                  libpq_gettext("no COPY in progress\n"));
index c83ef12..6587a2e 100644 (file)
@@ -542,7 +542,7 @@ pqParseInput2(PGconn *conn)
                                        conn->asyncStatus = PGASYNC_COPY_OUT;
                                        break;
                                        /*
-                                        * Don't need to process CopyXLogResponse here because
+                                        * Don't need to process CopyBothResponse here because
                                         * it never arrives from the server during protocol 2.0.
                                         */
                                default:
index 207377d..c599edc 100644 (file)
@@ -358,16 +358,10 @@ pqParseInput3(PGconn *conn)
                                        conn->asyncStatus = PGASYNC_COPY_OUT;
                                        conn->copy_already_done = 0;
                                        break;
-                               case 'W':               /* Start Copy XLog */
-                                       /*
-                                        * We don't need to use getCopyStart here since CopyXLogResponse
-                                        * specifies neither the copy format nor the number of columns in
-                                        * the Copy data. They should be always zero.
-                                        */
-                                       conn->result = PQmakeEmptyPGresult(conn, PGRES_COPY_OUT);
-                                       if (!conn->result)
+                               case 'W':               /* Start Copy Both */
+                                       if (getCopyStart(conn, PGRES_COPY_BOTH))
                                                return;
-                                       conn->asyncStatus = PGASYNC_COPY_XLOG;
+                                       conn->asyncStatus = PGASYNC_COPY_BOTH;
                                        conn->copy_already_done = 0;
                                        break;
                                case 'd':               /* Copy Data */
@@ -1204,7 +1198,8 @@ getNotify(PGconn *conn)
 }
 
 /*
- * getCopyStart - process CopyInResponse or CopyOutResponse message
+ * getCopyStart - process CopyInResponse, CopyOutResponse or
+ * CopyBothResponse message
  *
  * parseInput already read the message type and length.
  */
@@ -1375,6 +1370,7 @@ getCopyDataMessage(PGconn *conn)
 
 /*
  * PQgetCopyData - read a row of data from the backend during COPY OUT
+ * or COPY BOTH
  *
  * If successful, sets *buffer to point to a malloc'd row of data, and
  * returns row length (always > 0) as result.
@@ -1398,10 +1394,10 @@ pqGetCopyData3(PGconn *conn, char **buffer, int async)
                if (msgLength < 0)
                {
                        /*
-                        * On end-of-copy, exit COPY_OUT mode and let caller read status
-                        * with PQgetResult().  The normal case is that it's Copy Done,
-                        * but we let parseInput read that.  If error, we expect the state
-                        * was already changed.
+                        * On end-of-copy, exit COPY_OUT or COPY_BOTH mode and let caller
+                        * read status with PQgetResult().  The normal case is that it's
+                        * Copy Done, but we let parseInput read that.  If error, we expect
+                        * the state was already changed.
                         */
                        if (msgLength == -1)
                                conn->asyncStatus = PGASYNC_BUSY;
index f32b2d3..b70abae 100644 (file)
@@ -85,6 +85,7 @@ typedef enum
                                                                 * contains the result tuples */
        PGRES_COPY_OUT,                         /* Copy Out data transfer in progress */
        PGRES_COPY_IN,                          /* Copy In data transfer in progress */
+       PGRES_COPY_BOTH,                        /* Copy In/Out data transfer in progress */
        PGRES_BAD_RESPONSE,                     /* an unexpected response was recv'd from the
                                                                 * backend */
        PGRES_NONFATAL_ERROR,           /* notice or warning message */
index 26ac03b..b854ae9 100644 (file)
@@ -219,7 +219,7 @@ typedef enum
        PGASYNC_READY,                          /* result ready for PQgetResult */
        PGASYNC_COPY_IN,                        /* Copy In data transfer in progress */
        PGASYNC_COPY_OUT,                       /* Copy Out data transfer in progress */
-       PGASYNC_COPY_XLOG                       /* Copy XLog data transfer in progress */
+       PGASYNC_COPY_BOTH                       /* Copy In/Out data transfer in progress */
 } PGAsyncStatusType;
 
 /* PGQueryClass tracks which query protocol we are now executing */