OSDN Git Service

Modify parallel pg_restore to track pending and ready items by means of
author Tom Lane <tgl@sss.pgh.pa.us>
Fri, 7 Aug 2009 22:48:34 +0000 (22:48 +0000)
committer Tom Lane <tgl@sss.pgh.pa.us>
Fri, 7 Aug 2009 22:48:34 +0000 (22:48 +0000)
two new lists, rather than repeatedly rescanning the main TOC list.
This avoids a potential O(N^2) slowdown, although you'd need a *lot*
of tables to make that really significant; and it might simplify future
improvements in the scheduling algorithm by making the set of ready
items more easily inspectable.  The original thought that it would
in itself result in a more efficient job dispatch order doesn't seem
to have been borne out in testing, but it seems worth doing anyway.

src/bin/pg_dump/pg_backup_archiver.c
src/bin/pg_dump/pg_backup_archiver.h

index 2ff282f..20bd3eb 100644 (file)
@@ -15,7 +15,7 @@
  *
  *
  * IDENTIFICATION
- *             $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_archiver.c,v 1.174 2009/08/04 21:56:08 tgl Exp $
+ *             $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_archiver.c,v 1.175 2009/08/07 22:48:34 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
 #define thandle HANDLE
 #endif
 
+/* Arguments needed for a worker child */
 typedef struct _restore_args
 {
        ArchiveHandle *AH;
        TocEntry   *te;
 } RestoreArgs;
 
+/* State for each parallel activity slot */
 typedef struct _parallel_slot
 {
        thandle         child_id;
@@ -117,11 +119,15 @@ static thandle spawn_restore(RestoreArgs *args);
 static thandle reap_child(ParallelSlot *slots, int n_slots, int *work_status);
 static bool work_in_progress(ParallelSlot *slots, int n_slots);
 static int     get_next_slot(ParallelSlot *slots, int n_slots);
+static void par_list_header_init(TocEntry *l);
+static void par_list_append(TocEntry *l, TocEntry *te);
+static void par_list_remove(TocEntry *te);
 static TocEntry *get_next_work_item(ArchiveHandle *AH,
-                                  TocEntry **first_unprocessed,
+                                  TocEntry *ready_list,
                                   ParallelSlot *slots, int n_slots);
 static parallel_restore_result parallel_restore(RestoreArgs *args);
-static void mark_work_done(ArchiveHandle *AH, thandle worker, int status,
+static void mark_work_done(ArchiveHandle *AH, TocEntry *ready_list,
+                          thandle worker, int status,
                           ParallelSlot *slots, int n_slots);
 static void fix_dependencies(ArchiveHandle *AH);
 static bool has_lock_conflicts(TocEntry *te1, TocEntry *te2);
@@ -129,7 +135,8 @@ static void repoint_table_dependencies(ArchiveHandle *AH,
                                                   DumpId tableId, DumpId tableDataId);
 static void identify_locking_dependencies(TocEntry *te,
                                                          TocEntry **tocsByDumpId);
-static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te);
+static void reduce_dependencies(ArchiveHandle *AH, TocEntry *te,
+                                                               TocEntry *ready_list);
 static void mark_create_done(ArchiveHandle *AH, TocEntry *te);
 static void inhibit_data_for_failed_table(ArchiveHandle *AH, TocEntry *te);
 static ArchiveHandle *CloneArchive(ArchiveHandle *AH);
@@ -3069,7 +3076,8 @@ restore_toc_entries_parallel(ArchiveHandle *AH)
        ParallelSlot *slots;
        int                     work_status;
        int                     next_slot;
-       TocEntry   *first_unprocessed = AH->toc->next;
+       TocEntry        pending_list;
+       TocEntry        ready_list;
        TocEntry   *next_work_item;
        thandle         ret_child;
        TocEntry   *te;
@@ -3091,8 +3099,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH)
         * faster in a single connection because we avoid all the connection and
         * setup overhead.
         */
-       while ((next_work_item = get_next_work_item(AH, &first_unprocessed,
-                                                                                               NULL, 0)) != NULL)
+       for (next_work_item = AH->toc->next; next_work_item != AH->toc; next_work_item = next_work_item->next)
        {
                if (next_work_item->section == SECTION_DATA ||
                        next_work_item->section == SECTION_POST_DATA)
@@ -3104,8 +3111,8 @@ restore_toc_entries_parallel(ArchiveHandle *AH)
 
                (void) restore_toc_entry(AH, next_work_item, ropt, false);
 
-               next_work_item->restored = true;
-               reduce_dependencies(AH, next_work_item);
+               /* there should be no touch of ready_list here, so pass NULL */
+               reduce_dependencies(AH, next_work_item, NULL);
        }
 
        /*
@@ -3129,6 +3136,25 @@ restore_toc_entries_parallel(ArchiveHandle *AH)
        AH->currWithOids = -1;
 
        /*
+        * Initialize the lists of pending and ready items.  After this setup,
+        * the pending list is everything that needs to be done but is blocked
+        * by one or more dependencies, while the ready list contains items that
+        * have no remaining dependencies.  Note: we don't yet filter out entries
+        * that aren't going to be restored.  They might participate in
+        * dependency chains connecting entries that should be restored, so we
+        * treat them as live until we actually process them.
+        */
+       par_list_header_init(&pending_list);
+       par_list_header_init(&ready_list);
+       for (; next_work_item != AH->toc; next_work_item = next_work_item->next)
+       {
+               if (next_work_item->depCount > 0)
+                       par_list_append(&pending_list, next_work_item);
+               else
+                       par_list_append(&ready_list, next_work_item);
+       }
+
+       /*
         * main parent loop
         *
         * Keep going until there is no worker still running AND there is no work
@@ -3137,7 +3163,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH)
 
        ahlog(AH, 1, "entering main parallel loop\n");
 
-       while ((next_work_item = get_next_work_item(AH, &first_unprocessed,
+       while ((next_work_item = get_next_work_item(AH, &ready_list,
                                                                                                slots, n_slots)) != NULL ||
                   work_in_progress(slots, n_slots))
        {
@@ -3153,8 +3179,8 @@ restore_toc_entries_parallel(ArchiveHandle *AH)
                                          next_work_item->dumpId,
                                          next_work_item->desc, next_work_item->tag);
 
-                               next_work_item->restored = true;
-                               reduce_dependencies(AH, next_work_item);
+                               par_list_remove(next_work_item);
+                               reduce_dependencies(AH, next_work_item, &ready_list);
 
                                continue;
                        }
@@ -3169,7 +3195,7 @@ restore_toc_entries_parallel(ArchiveHandle *AH)
                                          next_work_item->dumpId,
                                          next_work_item->desc, next_work_item->tag);
 
-                               next_work_item->restored = true;
+                               par_list_remove(next_work_item);
 
                                /* this memory is dealloced in mark_work_done() */
                                args = malloc(sizeof(RestoreArgs));
@@ -3196,7 +3222,8 @@ restore_toc_entries_parallel(ArchiveHandle *AH)
 
                if (WIFEXITED(work_status))
                {
-                       mark_work_done(AH, ret_child, WEXITSTATUS(work_status),
+                       mark_work_done(AH, &ready_list,
+                                                  ret_child, WEXITSTATUS(work_status),
                                                   slots, n_slots);
                }
                else
@@ -3222,14 +3249,11 @@ restore_toc_entries_parallel(ArchiveHandle *AH)
         * dependencies, or some other pathological condition. If so, do it in the
         * single parent connection.
         */
-       for (te = AH->toc->next; te != AH->toc; te = te->next)
+       for (te = pending_list.par_next; te != &pending_list; te = te->par_next)
        {
-               if (!te->restored)
-               {
-                       ahlog(AH, 1, "processing missed item %d %s %s\n",
-                                 te->dumpId, te->desc, te->tag);
-                       (void) restore_toc_entry(AH, te, ropt, false);
-               }
+               ahlog(AH, 1, "processing missed item %d %s %s\n",
+                         te->dumpId, te->desc, te->tag);
+               (void) restore_toc_entry(AH, te, ropt, false);
        }
 
        /* The ACLs will be handled back in RestoreArchive. */
@@ -3372,25 +3396,57 @@ has_lock_conflicts(TocEntry *te1, TocEntry *te2)
 }
 
 
+/*
+ * Initialize the header of a parallel-processing list.
+ *
+ * These are circular lists with a dummy TocEntry as header, just like the
+ * main TOC list; but we use separate list links so that an entry can be in
+ * the main TOC list as well as in a parallel-processing list.
+ */
+static void
+par_list_header_init(TocEntry *l)
+{
+       l->par_prev = l->par_next = l;
+}
+
+/* Append te to the end of the parallel-processing list headed by l */
+static void
+par_list_append(TocEntry *l, TocEntry *te)
+{
+       te->par_prev = l->par_prev;
+       l->par_prev->par_next = te;
+       l->par_prev = te;
+       te->par_next = l;
+}
+
+/* Remove te from whatever parallel-processing list it's in */
+static void
+par_list_remove(TocEntry *te)
+{
+       te->par_prev->par_next = te->par_next;
+       te->par_next->par_prev = te->par_prev;
+       te->par_prev = NULL;
+       te->par_next = NULL;
+}
+
 
 /*
  * Find the next work item (if any) that is capable of being run now.
  *
  * To qualify, the item must have no remaining dependencies
- * and no requirement for locks that is incompatible with
- * items currently running.
+ * and no requirements for locks that are incompatible with
+ * items currently running.  Items in the ready_list are known to have
+ * no remaining dependencies, but we have to check for lock conflicts.
  *
- * first_unprocessed is state data that tracks the location of the first
- * TocEntry that's not marked 'restored'.  This avoids O(N^2) search time
- * with long TOC lists.  (Even though the constant is pretty small, it'd
- * get us eventually.)
+ * Note that the returned item has *not* been removed from ready_list.
+ * The caller must do that after successfully dispatching the item.
  *
  * pref_non_data is for an alternative selection algorithm that gives
  * preference to non-data items if there is already a data load running.
  * It is currently disabled.
  */
 static TocEntry *
-get_next_work_item(ArchiveHandle *AH, TocEntry **first_unprocessed,
+get_next_work_item(ArchiveHandle *AH, TocEntry *ready_list,
                                   ParallelSlot *slots, int n_slots)
 {
        bool            pref_non_data = false;  /* or get from AH->ropt */
@@ -3415,26 +3471,12 @@ get_next_work_item(ArchiveHandle *AH, TocEntry **first_unprocessed,
        }
 
        /*
-        * Advance first_unprocessed if possible.
-        */
-       for (te = *first_unprocessed; te != AH->toc; te = te->next)
-       {
-               if (!te->restored)
-                       break;
-       }
-       *first_unprocessed = te;
-
-       /*
-        * Search from first_unprocessed until we find an available item.
+        * Search the ready_list until we find a suitable item.
         */
-       for (; te != AH->toc; te = te->next)
+       for (te = ready_list->par_next; te != ready_list; te = te->par_next)
        {
                bool            conflicts = false;
 
-               /* Ignore if already done or still waiting on dependencies */
-               if (te->restored || te->depCount > 0)
-                       continue;
-
                /*
                 * Check to see if the item would need exclusive lock on something
                 * that a currently running item also needs lock on, or vice versa. If
@@ -3546,7 +3588,8 @@ parallel_restore(RestoreArgs *args)
  * update status, and reduce the dependency count of any dependent items.
  */
 static void
-mark_work_done(ArchiveHandle *AH, thandle worker, int status,
+mark_work_done(ArchiveHandle *AH, TocEntry *ready_list,
+                          thandle worker, int status,
                           ParallelSlot *slots, int n_slots)
 {
        TocEntry   *te = NULL;
@@ -3585,7 +3628,7 @@ mark_work_done(ArchiveHandle *AH, thandle worker, int status,
                die_horribly(AH, modulename, "worker process failed: exit code %d\n",
                                         status);
 
-       reduce_dependencies(AH, te);
+       reduce_dependencies(AH, te, ready_list);
 }
 
 
@@ -3610,13 +3653,16 @@ fix_dependencies(ArchiveHandle *AH)
         * indexes the TOC entries by dump ID, rather than searching the TOC list
         * repeatedly.  Entries for dump IDs not present in the TOC will be NULL.
         *
-        * Also, initialize the depCount fields.
+        * Also, initialize the depCount fields, and make sure all the TOC items
+        * are marked as not being in any parallel-processing list.
         */
        tocsByDumpId = (TocEntry **) calloc(AH->maxDumpId, sizeof(TocEntry *));
        for (te = AH->toc->next; te != AH->toc; te = te->next)
        {
                tocsByDumpId[te->dumpId - 1] = te;
                te->depCount = te->nDeps;
+               te->par_prev = NULL;
+               te->par_next = NULL;
        }
 
        /*
@@ -3785,10 +3831,11 @@ identify_locking_dependencies(TocEntry *te, TocEntry **tocsByDumpId)
 
 /*
  * Remove the specified TOC entry from the depCounts of items that depend on
- * it, thereby possibly making them ready-to-run.
+ * it, thereby possibly making them ready-to-run.  Any pending item that
+ * becomes ready should be moved to the ready list.
  */
 static void
-reduce_dependencies(ArchiveHandle *AH, TocEntry *te)
+reduce_dependencies(ArchiveHandle *AH, TocEntry *te, TocEntry *ready_list)
 {
        DumpId          target = te->dumpId;
        int                     i;
@@ -3805,7 +3852,16 @@ reduce_dependencies(ArchiveHandle *AH, TocEntry *te)
                for (i = 0; i < te->nDeps; i++)
                {
                        if (te->dependencies[i] == target)
+                       {
                                te->depCount--;
+                               if (te->depCount == 0 && te->par_prev != NULL)
+                               {
+                                       /* It must be in the pending list, so remove it ... */
+                                       par_list_remove(te);
+                                       /* ... and add to ready_list */
+                                       par_list_append(ready_list, te);
+                               }
+                       }
                }
        }
 }
index 710dec0..fa6db40 100644 (file)
@@ -17,7 +17,7 @@
  *
  *
  * IDENTIFICATION
- *             $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_archiver.h,v 1.81 2009/08/04 21:56:09 tgl Exp $
+ *             $PostgreSQL: pgsql/src/bin/pg_dump/pg_backup_archiver.h,v 1.82 2009/08/07 22:48:34 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -314,7 +314,8 @@ typedef struct _tocEntry
        void       *formatData;         /* TOC Entry data specific to file format */
 
        /* working state (needed only for parallel restore) */
-       bool            restored;               /* item is in progress or done */
+       struct _tocEntry *par_prev;     /* list links for pending/ready items; */
+       struct _tocEntry *par_next;     /* these are NULL if not in either list */
        bool            created;                /* set for DATA member if TABLE was created */
        int                     depCount;               /* number of dependencies not yet restored */
        DumpId     *lockDeps;           /* dumpIds of objects this one needs lock on */