OSDN Git Service

Initial implementation of concurrent VACUUM. Ifdef'd out for the moment,
authorTom Lane <tgl@sss.pgh.pa.us>
Fri, 13 Jul 2001 22:55:59 +0000 (22:55 +0000)
committerTom Lane <tgl@sss.pgh.pa.us>
Fri, 13 Jul 2001 22:55:59 +0000 (22:55 +0000)
because index locking issues are not handled correctly yet.  Need to go
work on the index AMs next.

src/backend/commands/Makefile
src/backend/commands/vacuum.c
src/backend/commands/vacuumlazy.c [new file with mode: 0644]
src/include/commands/vacuum.h

index 6ae11f1..b27e6d7 100644 (file)
@@ -4,7 +4,7 @@
 #    Makefile for commands
 #
 # IDENTIFICATION
-#    $Header: /cvsroot/pgsql/src/backend/commands/Makefile,v 1.26 2000/08/31 16:09:53 petere Exp $
+#    $Header: /cvsroot/pgsql/src/backend/commands/Makefile,v 1.27 2001/07/13 22:55:59 tgl Exp $
 #
 #-------------------------------------------------------------------------
 
@@ -13,7 +13,7 @@ top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
 OBJS = async.o creatinh.o command.o comment.o copy.o indexcmds.o define.o \
-       remove.o rename.o vacuum.o analyze.o view.o cluster.o \
+       remove.o rename.o vacuum.o vacuumlazy.o analyze.o view.o cluster.o \
        explain.o sequence.o trigger.o user.o proclang.o \
        dbcommands.o variable.o
 
index 34bc1e9..f41bb66 100644 (file)
@@ -1,41 +1,39 @@
 /*-------------------------------------------------------------------------
  *
  * vacuum.c
- *       the postgres vacuum cleaner
+ *       The postgres vacuum cleaner.
+ *
+ * This file includes the "full" version of VACUUM, as well as control code
+ * used by all three of full VACUUM, lazy VACUUM, and ANALYZE.  See
+ * vacuumlazy.c and analyze.c for the rest of the code for the latter two.
+ *
  *
  * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.203 2001/07/12 04:11:13 tgl Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/commands/vacuum.c,v 1.204 2001/07/13 22:55:59 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
 
-#include <fcntl.h>
 #include <unistd.h>
-#include <sys/types.h>
-#include <sys/file.h>
-#include <sys/stat.h>
 
 #include "access/genam.h"
 #include "access/heapam.h"
 #include "access/xlog.h"
 #include "catalog/catalog.h"
 #include "catalog/catname.h"
-#include "catalog/index.h"
 #include "catalog/pg_index.h"
 #include "commands/vacuum.h"
 #include "executor/executor.h"
 #include "miscadmin.h"
-#include "nodes/execnodes.h"
 #include "storage/freespace.h"
 #include "storage/sinval.h"
 #include "storage/smgr.h"
 #include "tcop/pquery.h"
-#include "tcop/tcopprot.h"
 #include "utils/acl.h"
 #include "utils/builtins.h"
 #include "utils/fmgroids.h"
@@ -123,7 +121,7 @@ static void scan_heap(VRelStats *vacrelstats, Relation onerel,
                                          VacPageList vacuum_pages, VacPageList fraged_pages);
 static void repair_frag(VRelStats *vacrelstats, Relation onerel,
                                                VacPageList vacuum_pages, VacPageList fraged_pages,
-                                               int nindices, Relation *Irel);
+                                               int nindexes, Relation *Irel);
 static void vacuum_heap(VRelStats *vacrelstats, Relation onerel,
                                                VacPageList vacpagelist);
 static void vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage);
@@ -135,8 +133,6 @@ static void vac_update_fsm(Relation onerel, VacPageList fraged_pages,
                                                   BlockNumber rel_pages);
 static VacPage copy_vac_page(VacPage vacpage);
 static void vpage_insert(VacPageList vacpagelist, VacPage vpnew);
-static void get_indices(Relation relation, int *nindices, Relation **Irel);
-static void close_indices(int nindices, Relation *Irel);
 static bool is_partial_index(Relation indrel);
 static void *vac_bsearch(const void *key, const void *base,
                                                 size_t nelem, size_t size,
@@ -455,14 +451,6 @@ vac_update_relstats(Oid relid, BlockNumber num_pages, double num_tuples,
  */
 
 
-/* XXX Temporary placeholder */
-static void
-lazy_vacuum_rel(Relation onerel)
-{
-       full_vacuum_rel(onerel);
-}
-
-
 /*
  *     vacuum_rel() -- vacuum one heap relation
  *
@@ -554,11 +542,17 @@ vacuum_rel(Oid relid, VacuumStmt *vacstmt)
 
        /*
         * Do the actual work --- either FULL or "lazy" vacuum
+        *
+        * XXX for the moment, lazy vac not supported unless CONCURRENT_VACUUM
         */
+#ifdef CONCURRENT_VACUUM
        if (vacstmt->full)
                full_vacuum_rel(onerel);
        else
-               lazy_vacuum_rel(onerel);
+               lazy_vacuum_rel(onerel, vacstmt);
+#else
+       full_vacuum_rel(onerel);
+#endif
 
        /* all done with this class, but hold lock until commit */
        heap_close(onerel, NoLock);
@@ -596,7 +590,7 @@ vacuum_rel(Oid relid, VacuumStmt *vacstmt)
 /*
  *     full_vacuum_rel() -- perform FULL VACUUM for one heap relation
  *
- *             This routine vacuums a single heap, cleans out its indices, and
+ *             This routine vacuums a single heap, cleans out its indexes, and
  *             updates its num_pages and num_tuples statistics.
  *
  *             At entry, we have already established a transaction and opened
@@ -606,11 +600,11 @@ static void
 full_vacuum_rel(Relation onerel)
 {
        VacPageListData vacuum_pages;           /* List of pages to vacuum and/or
-                                                                                * clean indices */
+                                                                                * clean indexes */
        VacPageListData fraged_pages;           /* List of pages with space enough
                                                                                 * for re-using */
        Relation   *Irel;
-       int32           nindices,
+       int                     nindexes,
                                i;
        VRelStats  *vacrelstats;
        bool            reindex = false;
@@ -633,15 +627,13 @@ full_vacuum_rel(Relation onerel)
        vacuum_pages.num_pages = fraged_pages.num_pages = 0;
        scan_heap(vacrelstats, onerel, &vacuum_pages, &fraged_pages);
 
-       /* Now open all indices of the relation */
-       nindices = 0;
-       Irel = (Relation *) NULL;
-       get_indices(onerel, &nindices, &Irel);
+       /* Now open all indexes of the relation */
+       vac_open_indexes(onerel, &nindexes, &Irel);
        if (!Irel)
                reindex = false;
        else if (!RelationGetForm(onerel)->relhasindex)
                reindex = true;
-       if (nindices > 0)
+       if (nindexes > 0)
                vacrelstats->hasindex = true;
 
 #ifdef NOT_USED
@@ -651,7 +643,7 @@ full_vacuum_rel(Relation onerel)
         */
        if (reindex)
        {
-               close_indices(nindices, Irel);
+               vac_close_indexes(nindexes, Irel);
                Irel = (Relation *) NULL;
                activate_indexes_of_a_table(RelationGetRelid(onerel), false);
        }
@@ -662,14 +654,14 @@ full_vacuum_rel(Relation onerel)
        {
                if (vacuum_pages.num_pages > 0)
                {
-                       for (i = 0; i < nindices; i++)
+                       for (i = 0; i < nindexes; i++)
                                vacuum_index(&vacuum_pages, Irel[i],
                                                         vacrelstats->rel_tuples, 0);
                }
                else
                {
-                       /* just scan indices to update statistic */
-                       for (i = 0; i < nindices; i++)
+                       /* just scan indexes to update statistic */
+                       for (i = 0; i < nindexes; i++)
                                scan_index(Irel[i], vacrelstats->rel_tuples);
                }
        }
@@ -678,12 +670,12 @@ full_vacuum_rel(Relation onerel)
        {
                /* Try to shrink heap */
                repair_frag(vacrelstats, onerel, &vacuum_pages, &fraged_pages,
-                                       nindices, Irel);
-               close_indices(nindices, Irel);
+                                       nindexes, Irel);
+               vac_close_indexes(nindexes, Irel);
        }
        else
        {
-               close_indices(nindices, Irel);
+               vac_close_indexes(nindexes, Irel);
                if (vacuum_pages.num_pages > 0)
                {
                        /* Clean pages from vacuum_pages list */
@@ -835,7 +827,7 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
                        itemid = PageGetItemId(page, offnum);
 
                        /*
-                        * Collect un-used items too - it's possible to have indices
+                        * Collect un-used items too - it's possible to have indexes
                         * pointing here after crash.
                         */
                        if (!ItemIdIsUsed(itemid))
@@ -944,7 +936,7 @@ scan_heap(VRelStats *vacrelstats, Relation onerel,
                                }
 
                                /* mark it unused on the temp page */
-                               lpp = &(((PageHeader) tempPage)->pd_linp[offnum - 1]);
+                               lpp = PageGetItemId(tempPage, offnum);
                                lpp->lp_flags &= ~LP_USED;
 
                                vacpage->offsets[vacpage->offsets_free++] = offnum;
@@ -1073,8 +1065,8 @@ Re-using: Free/Avail. Space %.0f/%.0f; EndEmpty/Avail. Pages %u/%u. %s",
  *     repair_frag() -- try to repair relation's fragmentation
  *
  *             This routine marks dead tuples as unused and tries re-use dead space
- *             by moving tuples (and inserting indices if needed). It constructs
- *             Nvacpagelist list of free-ed pages (moved tuples) and clean indices
+ *             by moving tuples (and inserting indexes if needed). It constructs
+ *             Nvacpagelist list of free-ed pages (moved tuples) and clean indexes
  *             for them after committing (in hack-manner - without losing locks
  *             and freeing memory!) current transaction. It truncates relation
  *             if some end-blocks are gone away.
@@ -1082,7 +1074,7 @@ Re-using: Free/Avail. Space %.0f/%.0f; EndEmpty/Avail. Pages %u/%u. %s",
 static void
 repair_frag(VRelStats *vacrelstats, Relation onerel,
                        VacPageList vacuum_pages, VacPageList fraged_pages,
-                       int nindices, Relation *Irel)
+                       int nindexes, Relation *Irel)
 {
        TransactionId myXID;
        CommandId       myCID;
@@ -1884,7 +1876,7 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                 * relation.  Ideally we should do Commit/StartTransactionCommand
                 * here, relying on the session-level table lock to protect our
                 * exclusive access to the relation.  However, that would require
-                * a lot of extra code to close and re-open the relation, indices,
+                * a lot of extra code to close and re-open the relation, indexes,
                 * etc.  For now, a quick hack: record status of current
                 * transaction as committed, and continue.
                 */
@@ -1985,7 +1977,7 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
 
        if (Nvacpagelist.num_pages > 0)
        {
-               /* vacuum indices again if needed */
+               /* vacuum indexes again if needed */
                if (Irel != (Relation *) NULL)
                {
                        VacPage    *vpleft,
@@ -2002,7 +1994,7 @@ repair_frag(VRelStats *vacrelstats, Relation onerel,
                                *vpright = vpsave;
                        }
                        Assert(keep_tuples >= 0);
-                       for (i = 0; i < nindices; i++)
+                       for (i = 0; i < nindexes; i++)
                                vacuum_index(&Nvacpagelist, Irel[i],
                                                         vacrelstats->rel_tuples, keep_tuples);
                }
@@ -2175,7 +2167,7 @@ vacuum_page(Relation onerel, Buffer buffer, VacPage vacpage)
        START_CRIT_SECTION();
        for (i = 0; i < vacpage->offsets_free; i++)
        {
-               itemid = &(((PageHeader) page)->pd_linp[vacpage->offsets[i] - 1]);
+               itemid = PageGetItemId(page, vacpage->offsets[i]);
                itemid->lp_flags &= ~LP_USED;
        }
        uncnt = PageRepairFragmentation(page, unused);
@@ -2244,9 +2236,9 @@ scan_index(Relation indrel, double num_tuples)
  *
  *             Vpl is the VacPageList of the heap we're currently vacuuming.
  *             It's locked. Indrel is an index relation on the vacuumed heap.
- *             We don't set locks on the index relation here, since the indexed
- *             access methods support locking at different granularities.
- *             We let them handle it.
+ *
+ *             We don't bother to set locks on the index relation here, since
+ *             the parent table is exclusive-locked already.
  *
  *             Finally, we arrange to update the index relation's statistics in
  *             pg_class.
@@ -2555,8 +2547,8 @@ vac_cmp_vtlinks(const void *left, const void *right)
 }
 
 
-static void
-get_indices(Relation relation, int *nindices, Relation **Irel)
+void
+vac_open_indexes(Relation relation, int *nindexes, Relation **Irel)
 {
        List       *indexoidlist,
                           *indexoidscan;
@@ -2564,10 +2556,10 @@ get_indices(Relation relation, int *nindices, Relation **Irel)
 
        indexoidlist = RelationGetIndexList(relation);
 
-       *nindices = length(indexoidlist);
+       *nindexes = length(indexoidlist);
 
-       if (*nindices > 0)
-               *Irel = (Relation *) palloc(*nindices * sizeof(Relation));
+       if (*nindexes > 0)
+               *Irel = (Relation *) palloc(*nindexes * sizeof(Relation));
        else
                *Irel = NULL;
 
@@ -2584,14 +2576,14 @@ get_indices(Relation relation, int *nindices, Relation **Irel)
 }
 
 
-static void
-close_indices(int nindices, Relation *Irel)
+void
+vac_close_indexes(int nindexes, Relation *Irel)
 {
        if (Irel == (Relation *) NULL)
                return;
 
-       while (nindices--)
-               index_close(Irel[nindices]);
+       while (nindexes--)
+               index_close(Irel[nindexes]);
        pfree(Irel);
 }
 
@@ -2621,22 +2613,20 @@ is_partial_index(Relation indrel)
 static bool
 enough_space(VacPage vacpage, Size len)
 {
-
        len = MAXALIGN(len);
 
        if (len > vacpage->free)
                return false;
 
-       if (vacpage->offsets_used < vacpage->offsets_free)      /* there are free
-                                                                                                                * itemid(s) */
-               return true;                    /* and len <= free_space */
+       /* if there are free itemid(s) and len <= free_space... */
+       if (vacpage->offsets_used < vacpage->offsets_free)
+               return true;
 
-       /* ok. noff_usd >= noff_free and so we'll have to allocate new itemid */
-       if (len + MAXALIGN(sizeof(ItemIdData)) <= vacpage->free)
+       /* noff_used >= noff_free and so we'll have to allocate new itemid */
+       if (len + sizeof(ItemIdData) <= vacpage->free)
                return true;
 
        return false;
-
 }
 
 
diff --git a/src/backend/commands/vacuumlazy.c b/src/backend/commands/vacuumlazy.c
new file mode 100644 (file)
index 0000000..07529fe
--- /dev/null
@@ -0,0 +1,1026 @@
+/*-------------------------------------------------------------------------
+ *
+ * vacuumlazy.c
+ *       Concurrent ("lazy") vacuuming.
+ *
+ *
+ * The major space usage for LAZY VACUUM is storage for the array of dead
+ * tuple TIDs, with the next biggest need being storage for per-disk-page
+ * free space info.  We want to ensure we can vacuum even the very largest
+ * relations with finite memory space usage.  To do that, we set upper bounds
+ * on the number of tuples and pages we will keep track of at once.
+ *
+ * We are willing to use at most SortMem memory space to keep track of
+ * dead tuples.  We initially allocate an array of TIDs of that size.
+ * If the array threatens to overflow, we suspend the heap scan phase
+ * and perform a pass of index cleanup and page compaction, then resume
+ * the heap scan with an empty TID array.
+ *
+ * We can limit the storage for page free space to MaxFSMPages entries,
+ * since that's the most the free space map will be willing to remember
+ * anyway.  If the relation has fewer than that many pages with free space,
+ * life is easy: just build an array of per-page info.  If it has more,
+ * we store the free space info as a heap ordered by amount of free space,
+ * so that we can discard the pages with least free space to ensure we never
+ * have more than MaxFSMPages entries in all.  The surviving page entries
+ * are passed to the free space map at conclusion of the scan.
+ *
+ *
+ * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *       $Header: /cvsroot/pgsql/src/backend/commands/vacuumlazy.c,v 1.1 2001/07/13 22:55:59 tgl Exp $
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/genam.h"
+#include "access/heapam.h"
+#include "access/xlog.h"
+#include "commands/vacuum.h"
+#include "miscadmin.h"
+#include "storage/freespace.h"
+#include "storage/sinval.h"
+#include "storage/smgr.h"
+
+
+/*
+ * Space/time tradeoff parameters: do these need to be user-tunable?
+ *
+ * A page with less than PAGE_SPACE_THRESHOLD free space will be forgotten
+ * immediately, and not even passed to the free space map.  Removing the
+ * uselessly small entries early saves cycles, and in particular reduces
+ * the amount of time we spend holding the FSM spinlock when we finally call
+ * MultiRecordFreeSpace.  Since the FSM will ignore pages below its own
+ * runtime threshold anyway, there's no point in making this really small.
+ * XXX Is it worth trying to measure average tuple size, and using that to
+ * set the threshold?  Problem is we don't know average tuple size very
+ * accurately for the first few pages...
+ *
+ * To consider truncating the relation, we want there to be at least
+ * relsize / REL_TRUNCATE_FRACTION potentially-freeable pages.
+ */
+#define PAGE_SPACE_THRESHOLD   ((Size) (BLCKSZ / 32))
+
+#define REL_TRUNCATE_FRACTION  16
+
+/* MAX_TUPLES_PER_PAGE can be a conservative upper limit */
+#define MAX_TUPLES_PER_PAGE            ((int) (BLCKSZ / sizeof(HeapTupleHeaderData)))
+
+
+typedef struct LVRelStats
+{
+       /* Overall statistics about rel */
+       BlockNumber     rel_pages;
+       double          rel_tuples;
+       BlockNumber     nonempty_pages;         /* actually, last nonempty page + 1 */
+       /* List of TIDs of tuples we intend to delete */
+       /* NB: this list is ordered by TID address */
+       int                     num_dead_tuples;        /* current # of entries */
+       int                     max_dead_tuples;        /* # slots allocated in array */
+       ItemPointer     dead_tuples;            /* array of ItemPointerData */
+       /* Array or heap of per-page info about free space */
+       /* We use a simple array until it fills up, then convert to heap */
+       bool            fs_is_heap;                     /* are we using heap organization? */
+       int                     num_free_pages;         /* current # of entries */
+       int                     max_free_pages;         /* # slots allocated in arrays */
+       BlockNumber *free_pages;                /* array or heap of block numbers */
+       Size       *free_spaceavail;    /* array or heap of available space */
+} LVRelStats;
+
+
+static int     MESSAGE_LEVEL;          /* message level */
+
+static TransactionId XmaxRecent;
+
+
+/* non-export function prototypes */
+static void lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
+                                                  Relation *Irel, int nindexes);
+static void lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats);
+static void lazy_vacuum_index(Relation indrel, LVRelStats *vacrelstats);
+static int     lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
+                                                        int tupindex, LVRelStats *vacrelstats);
+static void lazy_truncate_heap(Relation onerel, LVRelStats *vacrelstats);
+static BlockNumber count_nondeletable_pages(Relation onerel,
+                                                                                       LVRelStats *vacrelstats);
+static void lazy_space_alloc(LVRelStats *vacrelstats, BlockNumber relblocks);
+static void lazy_record_dead_tuple(LVRelStats *vacrelstats,
+                                                                  ItemPointer itemptr);
+static void lazy_record_free_space(LVRelStats *vacrelstats,
+                                                                  BlockNumber page, Size avail);
+static bool lazy_tid_reaped(ItemPointer itemptr, LVRelStats *vacrelstats);
+static void lazy_update_fsm(Relation onerel, LVRelStats *vacrelstats);
+static int     vac_cmp_itemptr(const void *left, const void *right);
+
+
+/*
+ *     lazy_vacuum_rel() -- perform LAZY VACUUM for one heap relation
+ *
+ *             This routine vacuums a single heap, cleans out its indexes, and
+ *             updates its num_pages and num_tuples statistics.
+ *
+ *             At entry, we have already established a transaction and opened
+ *             and locked the relation.
+ */
+void
+lazy_vacuum_rel(Relation onerel, VacuumStmt *vacstmt)
+{
+       LVRelStats *vacrelstats;
+       Relation   *Irel;
+       int                     nindexes;
+       bool            hasindex;
+       BlockNumber     possibly_freeable;
+
+       /* initialize */
+       if (vacstmt->verbose)
+               MESSAGE_LEVEL = NOTICE;
+       else
+               MESSAGE_LEVEL = DEBUG;
+
+       GetXmaxRecent(&XmaxRecent);
+
+       vacrelstats = (LVRelStats *) palloc(sizeof(LVRelStats));
+       MemSet(vacrelstats, 0, sizeof(LVRelStats));
+
+       /* Open all indexes of the relation */
+       vac_open_indexes(onerel, &nindexes, &Irel);
+       hasindex = (nindexes > 0);
+
+       /* Do the vacuuming */
+       lazy_scan_heap(onerel, vacrelstats, Irel, nindexes);
+
+       /* Done with indexes */
+       vac_close_indexes(nindexes, Irel);
+
+       /*
+        * Optionally truncate the relation.
+        *
+        * Don't even think about it unless we have a shot at releasing a
+        * goodly number of pages.  Otherwise, the time taken isn't worth it.
+        */
+       possibly_freeable = vacrelstats->rel_pages - vacrelstats->nonempty_pages;
+       if (possibly_freeable > vacrelstats->rel_pages / REL_TRUNCATE_FRACTION)
+               lazy_truncate_heap(onerel, vacrelstats);
+
+       /* Update shared free space map with final free space info */
+       lazy_update_fsm(onerel, vacrelstats);
+
+       /* Update statistics in pg_class */
+       vac_update_relstats(RelationGetRelid(onerel), vacrelstats->rel_pages,
+                                               vacrelstats->rel_tuples, hasindex);
+}
+
+
+/*
+ *     lazy_scan_heap() -- scan an open heap relation
+ *
+ *             This routine sets commit status bits, builds lists of dead tuples
+ *             and pages with free space, and calculates statistics on the number
+ *             of live tuples in the heap.  When done, or when we run low on space
+ *             for dead-tuple TIDs, invoke vacuuming of indexes and heap.
+ */
+static void
+lazy_scan_heap(Relation onerel, LVRelStats *vacrelstats,
+                          Relation *Irel, int nindexes)
+{
+       BlockNumber nblocks,
+                               blkno;
+       HeapTupleData tuple;
+       char       *relname;
+       BlockNumber     empty_pages,
+                               changed_pages;
+       double          num_tuples,
+                               tups_vacuumed,
+                               nkeep,
+                               nunused;
+       int                     i;
+       VacRUsage       ru0;
+
+       vac_init_rusage(&ru0);
+
+       relname = RelationGetRelationName(onerel);
+       elog(MESSAGE_LEVEL, "--Relation %s--", relname);
+
+       empty_pages = changed_pages = 0;
+       num_tuples = tups_vacuumed = nkeep = nunused = 0;
+
+       nblocks = RelationGetNumberOfBlocks(onerel);
+       vacrelstats->rel_pages = nblocks;
+       vacrelstats->nonempty_pages = 0;
+
+       lazy_space_alloc(vacrelstats, nblocks);
+
+       for (blkno = 0; blkno < nblocks; blkno++)
+       {
+               Buffer          buf;
+               Page            page;
+               OffsetNumber offnum,
+                                       maxoff;
+               bool            pgchanged,
+                                       tupgone,
+                                       hastup;
+               int                     prev_dead_count;
+
+               /*
+                * If we are close to overrunning the available space for dead-tuple
+                * TIDs, pause and do a cycle of vacuuming before we tackle this page.
+                */
+               if ((vacrelstats->max_dead_tuples - vacrelstats->num_dead_tuples) < MAX_TUPLES_PER_PAGE &&
+                       vacrelstats->num_dead_tuples > 0)
+               {
+                       /* Remove index entries */
+                       for (i = 0; i < nindexes; i++)
+                               lazy_vacuum_index(Irel[i], vacrelstats);
+                       /* Remove tuples from heap */
+                       lazy_vacuum_heap(onerel, vacrelstats);
+                       /* Forget the now-vacuumed tuples, and press on */
+                       vacrelstats->num_dead_tuples = 0;
+               }
+
+               buf = ReadBuffer(onerel, blkno);
+
+               /* In this phase we only need shared access to the buffer */
+               LockBuffer(buf, BUFFER_LOCK_SHARE);
+
+               page = BufferGetPage(buf);
+
+               if (PageIsNew(page))
+               {
+                       /* Not sure we still need to handle this case, but... */
+                       LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+                       LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
+                       if (PageIsNew(page))
+                       {
+                               elog(NOTICE, "Rel %s: Uninitialized page %u - fixing",
+                                        relname, blkno);
+                               PageInit(page, BufferGetPageSize(buf), 0);
+                               lazy_record_free_space(vacrelstats, blkno,
+                                                                          PageGetFreeSpace(page));
+                       }
+                       LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+                       WriteBuffer(buf);
+                       continue;
+               }
+
+               if (PageIsEmpty(page))
+               {
+                       empty_pages++;
+                       lazy_record_free_space(vacrelstats, blkno,
+                                                                  PageGetFreeSpace(page));
+                       LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+                       ReleaseBuffer(buf);
+                       continue;
+               }
+
+               pgchanged = false;
+               hastup = false;
+               prev_dead_count = vacrelstats->num_dead_tuples;
+               maxoff = PageGetMaxOffsetNumber(page);
+               for (offnum = FirstOffsetNumber;
+                        offnum <= maxoff;
+                        offnum = OffsetNumberNext(offnum))
+               {
+                       ItemId          itemid;
+                       uint16          sv_infomask;
+
+                       itemid = PageGetItemId(page, offnum);
+
+                       if (!ItemIdIsUsed(itemid))
+                       {
+                               nunused += 1;
+                               continue;
+                       }
+
+                       tuple.t_datamcxt = NULL;
+                       tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
+                       tuple.t_len = ItemIdGetLength(itemid);
+                       ItemPointerSet(&(tuple.t_self), blkno, offnum);
+
+                       tupgone = false;
+                       sv_infomask = tuple.t_data->t_infomask;
+
+                       switch (HeapTupleSatisfiesVacuum(tuple.t_data, XmaxRecent))
+                       {
+                               case HEAPTUPLE_DEAD:
+                                       tupgone = true; /* we can delete the tuple */
+                                       break;
+                               case HEAPTUPLE_LIVE:
+                                       break;
+                               case HEAPTUPLE_RECENTLY_DEAD:
+                                       /*
+                                        * If tuple is recently deleted then we must not remove
+                                        * it from relation.
+                                        */
+                                       nkeep += 1;
+                                       break;
+                               case HEAPTUPLE_INSERT_IN_PROGRESS:
+                                       /* This is an expected case during concurrent vacuum */
+                                       break;
+                               case HEAPTUPLE_DELETE_IN_PROGRESS:
+                                       /* This is an expected case during concurrent vacuum */
+                                       break;
+                               default:
+                                       elog(ERROR, "Unexpected HeapTupleSatisfiesVacuum result");
+                                       break;
+                       }
+
+                       /* check for hint-bit update by HeapTupleSatisfiesVacuum */
+                       if (sv_infomask != tuple.t_data->t_infomask)
+                               pgchanged = true;
+
+                       /*
+                        * Other checks...
+                        */
+                       if (!OidIsValid(tuple.t_data->t_oid))
+                               elog(NOTICE, "Rel %s: TID %u/%u: OID IS INVALID. TUPGONE %d.",
+                                        relname, blkno, offnum, (int) tupgone);
+
+                       if (tupgone)
+                       {
+                               lazy_record_dead_tuple(vacrelstats, &(tuple.t_self));
+                               tups_vacuumed += 1;
+                       }
+                       else
+                       {
+                               num_tuples += 1;
+                               hastup = true;
+                       }
+               } /* scan along page */
+
+               /*
+                * If we remembered any tuples for deletion, then the page will
+                * be visited again by lazy_vacuum_heap, which will compute and
+                * record its post-compaction free space.  If not, then we're done
+                * with this page, so remember its free space as-is.
+                */
+               if (vacrelstats->num_dead_tuples == prev_dead_count)
+               {
+                       lazy_record_free_space(vacrelstats, blkno,
+                                                                  PageGetFreeSpace(page));
+               }
+
+               /* Remember the location of the last page with nonremovable tuples */
+               if (hastup)
+                       vacrelstats->nonempty_pages = blkno + 1;
+
+               LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+
+               if (pgchanged)
+               {
+                       WriteBuffer(buf);
+                       changed_pages++;
+               }
+               else
+                       ReleaseBuffer(buf);
+       }
+
+       /* If any tuples need to be deleted, perform final vacuum cycle */
+       /* XXX put a threshold on min nuber of tuples here? */
+       if (vacrelstats->num_dead_tuples > 0)
+       {
+               /* Remove index entries */
+               for (i = 0; i < nindexes; i++)
+                       lazy_vacuum_index(Irel[i], vacrelstats);
+               /* Remove tuples from heap */
+               lazy_vacuum_heap(onerel, vacrelstats);
+       }
+
+       /* save stats for use later */
+       vacrelstats->rel_tuples = num_tuples;
+
+       elog(MESSAGE_LEVEL, "Pages %u: Changed %u, Empty %u; \
+Tup %.0f: Vac %.0f, Keep %.0f, UnUsed %.0f.\n\tTotal %s",
+                nblocks, changed_pages, empty_pages,
+                num_tuples, tups_vacuumed, nkeep, nunused,
+                vac_show_rusage(&ru0));
+}
+
+
+/*
+ *     lazy_vacuum_heap() -- second pass over the heap
+ *
+ *             This routine marks dead tuples as unused and compacts out free
+ *             space on their pages.  Pages not having dead tuples recorded from
+ *             lazy_scan_heap are not visited at all.
+ *
+ * Note: the reason for doing this as a second pass is we cannot remove
+ * the tuples until we've removed their index entries, and we want to
+ * process index entry removal in batches as large as possible.
+ */
+static void
+lazy_vacuum_heap(Relation onerel, LVRelStats *vacrelstats)
+{
+       int                     tupindex;
+       int                     npages;
+       VacRUsage       ru0;
+
+       vac_init_rusage(&ru0);
+       npages = 0;
+
+       tupindex = 0;
+       while (tupindex < vacrelstats->num_dead_tuples)
+       {
+               BlockNumber             tblk;
+               Buffer          buf;
+               Page            page;
+
+               tblk = ItemPointerGetBlockNumber(&vacrelstats->dead_tuples[tupindex]);
+               buf = ReadBuffer(onerel, tblk);
+               LockBufferForCleanup(buf);
+               tupindex = lazy_vacuum_page(onerel, tblk, buf, tupindex, vacrelstats);
+               /* Now that we've compacted the page, record its available space */
+               page = BufferGetPage(buf);
+               lazy_record_free_space(vacrelstats, tblk,
+                                                          PageGetFreeSpace(page));
+               LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+               WriteBuffer(buf);
+               npages++;
+       }
+
+       elog(MESSAGE_LEVEL, "Removed %d tuples in %d pages.\n\t%s",
+                tupindex, npages,
+                vac_show_rusage(&ru0));
+}
+
+/*
+ *     lazy_vacuum_page() -- free dead tuples on a page
+ *                                      and repair its fragmentation.
+ *
+ * Caller is expected to handle reading, locking, and writing the buffer.
+ *
+ * tupindex is the index in vacrelstats->dead_tuples of the first dead
+ * tuple for this page.  We assume the rest follow sequentially.
+ * The return value is the first tupindex after the tuples of this page.
+ */
+static int
+lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer,
+                                int tupindex, LVRelStats *vacrelstats)
+{
+       OffsetNumber unbuf[BLCKSZ/sizeof(OffsetNumber)];
+       OffsetNumber *unused = unbuf;
+       int                     uncnt;
+       Page            page = BufferGetPage(buffer);
+       ItemId          itemid;
+
+       START_CRIT_SECTION();
+       for (; tupindex < vacrelstats->num_dead_tuples; tupindex++)
+       {
+               BlockNumber             tblk;
+               OffsetNumber    toff;
+
+               tblk = ItemPointerGetBlockNumber(&vacrelstats->dead_tuples[tupindex]);
+               if (tblk != blkno)
+                       break;                          /* past end of tuples for this block */
+               toff = ItemPointerGetOffsetNumber(&vacrelstats->dead_tuples[tupindex]);
+               itemid = PageGetItemId(page, toff);
+               itemid->lp_flags &= ~LP_USED;
+       }
+
+       uncnt = PageRepairFragmentation(page, unused);
+
+       {
+               XLogRecPtr      recptr;
+
+               recptr = log_heap_clean(onerel, buffer, (char *) unused,
+                                                 (char *) (&(unused[uncnt])) - (char *) unused);
+               PageSetLSN(page, recptr);
+               PageSetSUI(page, ThisStartUpID);
+       }
+       END_CRIT_SECTION();
+
+       return tupindex;
+}
+
+/*
+ *     lazy_vacuum_index() -- vacuum one index relation.
+ *
+ *             Delete all the index entries pointing to tuples listed in
+ *             vacrelstats->dead_tuples.
+ *
+ *             Finally, we arrange to update the index relation's statistics in
+ *             pg_class.
+ */
+static void
+lazy_vacuum_index(Relation indrel, LVRelStats *vacrelstats)
+{
+       RetrieveIndexResult res;
+       IndexScanDesc iscan;
+       int                     tups_vacuumed;
+       BlockNumber     num_pages;
+       double          num_index_tuples;
+       VacRUsage       ru0;
+
+       vac_init_rusage(&ru0);
+
+       /*
+        * Only btree and hash indexes are currently safe for concurrent access;
+        * see notes in ExecOpenIndices().  XXX should rely on index AM for this
+        */
+       if (indrel->rd_rel->relam != BTREE_AM_OID &&
+               indrel->rd_rel->relam != HASH_AM_OID)
+               LockRelation(indrel, AccessExclusiveLock);
+
+       /* XXX should use a bulk-delete call here */
+
+       /* walk through the entire index */
+       iscan = index_beginscan(indrel, false, 0, (ScanKey) NULL);
+       tups_vacuumed = 0;
+       num_index_tuples = 0;
+
+       while ((res = index_getnext(iscan, ForwardScanDirection))
+                  != (RetrieveIndexResult) NULL)
+       {
+               ItemPointer heapptr = &res->heap_iptr;
+
+               if (lazy_tid_reaped(heapptr, vacrelstats))
+               {
+                       index_delete(indrel, &res->index_iptr);
+                       ++tups_vacuumed;
+               }
+               else
+                       num_index_tuples += 1;
+
+               pfree(res);
+       }
+
+       index_endscan(iscan);
+
+       /* now update statistics in pg_class */
+       num_pages = RelationGetNumberOfBlocks(indrel);
+       vac_update_relstats(RelationGetRelid(indrel),
+                                               num_pages, num_index_tuples, false);
+
+       /*
+        * Release lock acquired above.
+        */
+       if (indrel->rd_rel->relam != BTREE_AM_OID &&
+               indrel->rd_rel->relam != HASH_AM_OID)
+               UnlockRelation(indrel, AccessExclusiveLock);
+
+       elog(MESSAGE_LEVEL, "Index %s: Pages %u; Tuples %.0f: Deleted %u.\n\t%s",
+                RelationGetRelationName(indrel), num_pages,
+                num_index_tuples, tups_vacuumed,
+                vac_show_rusage(&ru0));
+}
+
+/*
+ * lazy_truncate_heap - try to truncate off any empty pages at the end
+ */
+static void
+lazy_truncate_heap(Relation onerel, LVRelStats *vacrelstats)
+{
+       BlockNumber     old_rel_pages = vacrelstats->rel_pages;
+       BlockNumber     new_rel_pages;
+       BlockNumber *pages;
+       Size       *spaceavail;
+       int                     n;
+       int                     i,
+                               j;
+       VacRUsage       ru0;
+
+       vac_init_rusage(&ru0);
+
+       /*
+        * We need full exclusive lock on the relation in order to do truncation.
+        * If we can't get it, give up rather than waiting --- we don't want
+        * to block other backends, and we don't want to deadlock (which is
+        * quite possible considering we already hold a lower-grade lock).
+        */
+       if (! ConditionalLockRelation(onerel, AccessExclusiveLock))
+               return;
+
+       /*
+        * Now that we have exclusive lock, look to see if the rel has grown
+        * whilst we were vacuuming with non-exclusive lock.  If so, give up;
+        * the newly added pages presumably contain non-deletable tuples.
+        */
+       new_rel_pages = RelationGetNumberOfBlocks(onerel);
+       if (new_rel_pages != old_rel_pages)
+       {
+               /* might as well use the latest news when we update pg_class stats */
+               vacrelstats->rel_pages = new_rel_pages;
+               UnlockRelation(onerel, AccessExclusiveLock);
+               return;
+       }
+
+       /*
+        * Scan backwards from the end to verify that the end pages actually
+        * contain nothing we need to keep.  This is *necessary*, not optional,
+        * because other backends could have added tuples to these pages whilst
+        * we were vacuuming.
+        */
+       new_rel_pages = count_nondeletable_pages(onerel, vacrelstats);
+
+       if (new_rel_pages >= old_rel_pages)
+       {
+               /* can't do anything after all */
+               UnlockRelation(onerel, AccessExclusiveLock);
+               return;
+       }
+
+       /*
+        * Okay to truncate.
+        *
+        * First, flush any shared buffers for the blocks we intend to delete.
+        * FlushRelationBuffers is a bit more than we need for this, since it
+        * will also write out dirty buffers for blocks we aren't deleting,
+        * but it's the closest thing in bufmgr's API.
+        */
+       i = FlushRelationBuffers(onerel, new_rel_pages);
+       if (i < 0)
+               elog(ERROR, "VACUUM (lazy_truncate_heap): FlushRelationBuffers returned %d",
+                        i);
+
+       /*
+        * Do the physical truncation.
+        */
+       new_rel_pages = smgrtruncate(DEFAULT_SMGR, onerel, new_rel_pages);
+       onerel->rd_nblocks = new_rel_pages;     /* update relcache immediately */
+       onerel->rd_targblock = InvalidBlockNumber;
+       vacrelstats->rel_pages = new_rel_pages; /* save new number of blocks */
+
+       /*
+        * Drop free-space info for removed blocks; these must not get entered
+        * into the FSM!
+        */
+       pages = vacrelstats->free_pages;
+       spaceavail = vacrelstats->free_spaceavail;
+       n = vacrelstats->num_free_pages;
+       j = 0;
+       for (i = 0; i < n; i++)
+       {
+               if (pages[i] < new_rel_pages)
+               {
+                       pages[j] = pages[i];
+                       spaceavail[j] = spaceavail[i];
+                       j++;
+               }
+       }
+       vacrelstats->num_free_pages = j;
+
+       /*
+        * We keep the exclusive lock until commit (perhaps not necessary)?
+        */
+
+       elog(MESSAGE_LEVEL, "Truncated %u --> %u pages.\n\t%s",
+                old_rel_pages, new_rel_pages,
+                vac_show_rusage(&ru0));
+}
+
+/*
+ * Rescan end pages to verify that they are (still) empty of needed tuples.
+ *
+ * Returns number of nondeletable pages (last nonempty page + 1).
+ */
+static BlockNumber
+count_nondeletable_pages(Relation onerel, LVRelStats *vacrelstats)
+{
+       BlockNumber blkno;
+       HeapTupleData tuple;
+
+       /* Strange coding of loop control is needed because blkno is unsigned */
+       blkno = vacrelstats->rel_pages;
+       while (blkno > vacrelstats->nonempty_pages)
+       {
+               Buffer          buf;
+               Page            page;
+               OffsetNumber offnum,
+                                       maxoff;
+               bool            pgchanged,
+                                       tupgone,
+                                       hastup;
+
+               blkno--;
+
+               buf = ReadBuffer(onerel, blkno);
+
+               /* In this phase we only need shared access to the buffer */
+               LockBuffer(buf, BUFFER_LOCK_SHARE);
+
+               page = BufferGetPage(buf);
+
+               if (PageIsNew(page) || PageIsEmpty(page))
+               {
+                       /* PageIsNew robably shouldn't happen... */
+                       LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+                       ReleaseBuffer(buf);
+                       continue;
+               }
+
+               pgchanged = false;
+               hastup = false;
+               maxoff = PageGetMaxOffsetNumber(page);
+               for (offnum = FirstOffsetNumber;
+                        offnum <= maxoff;
+                        offnum = OffsetNumberNext(offnum))
+               {
+                       ItemId          itemid;
+                       uint16          sv_infomask;
+
+                       itemid = PageGetItemId(page, offnum);
+
+                       if (!ItemIdIsUsed(itemid))
+                               continue;
+
+                       tuple.t_datamcxt = NULL;
+                       tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid);
+                       tuple.t_len = ItemIdGetLength(itemid);
+                       ItemPointerSet(&(tuple.t_self), blkno, offnum);
+
+                       tupgone = false;
+                       sv_infomask = tuple.t_data->t_infomask;
+
+                       switch (HeapTupleSatisfiesVacuum(tuple.t_data, XmaxRecent))
+                       {
+                               case HEAPTUPLE_DEAD:
+                                       tupgone = true; /* we can delete the tuple */
+                                       break;
+                               case HEAPTUPLE_LIVE:
+                                       break;
+                               case HEAPTUPLE_RECENTLY_DEAD:
+                                       /*
+                                        * If tuple is recently deleted then we must not remove
+                                        * it from relation.
+                                        */
+                                       break;
+                               case HEAPTUPLE_INSERT_IN_PROGRESS:
+                                       /* This is an expected case during concurrent vacuum */
+                                       break;
+                               case HEAPTUPLE_DELETE_IN_PROGRESS:
+                                       /* This is an expected case during concurrent vacuum */
+                                       break;
+                               default:
+                                       elog(ERROR, "Unexpected HeapTupleSatisfiesVacuum result");
+                                       break;
+                       }
+
+                       /* check for hint-bit update by HeapTupleSatisfiesVacuum */
+                       if (sv_infomask != tuple.t_data->t_infomask)
+                               pgchanged = true;
+
+                       if (!tupgone)
+                       {
+                               hastup = true;
+                               break;                  /* can stop scanning */
+                       }
+               } /* scan along page */
+
+               LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+
+               if (pgchanged)
+                       WriteBuffer(buf);
+               else
+                       ReleaseBuffer(buf);
+
+               /* Done scanning if we found a tuple here */
+               if (hastup)
+                       return blkno + 1;
+       }
+
+       /*
+        * If we fall out of the loop, all the previously-thought-to-be-empty
+        * pages really are; we need not bother to look at the last known-nonempty
+        * page.
+        */
+       return vacrelstats->nonempty_pages;
+}
+
+/*
+ * lazy_space_alloc - space allocation decisions for lazy vacuum
+ *
+ * See the comments at the head of this file for rationale.
+ *
+ * XXX Should we have our own GUC parameter, instead of using SortMem?
+ */
+static void
+lazy_space_alloc(LVRelStats *vacrelstats, BlockNumber relblocks)
+{
+       int                     maxtuples;
+       int                     maxpages;
+
+       maxtuples = (int) ((SortMem * 1024L) / sizeof(ItemPointerData));
+       /* stay sane if small SortMem */
+       if (maxtuples < MAX_TUPLES_PER_PAGE)
+               maxtuples = MAX_TUPLES_PER_PAGE;
+
+       vacrelstats->num_dead_tuples = 0;
+       vacrelstats->max_dead_tuples = maxtuples;
+       vacrelstats->dead_tuples = (ItemPointer)
+               palloc(maxtuples * sizeof(ItemPointerData));
+
+       maxpages = MaxFSMPages;
+       /* No need to allocate more pages than the relation has blocks */
+       if (relblocks < (BlockNumber) maxpages)
+               maxpages = (int) relblocks;
+       /* avoid palloc(0) */
+       if (maxpages < 1)
+               maxpages = 1;
+
+       vacrelstats->fs_is_heap = false;
+       vacrelstats->num_free_pages = 0;
+       vacrelstats->max_free_pages = maxpages;
+       vacrelstats->free_pages = (BlockNumber *)
+               palloc(maxpages * sizeof(BlockNumber));
+       vacrelstats->free_spaceavail = (Size *)
+               palloc(maxpages * sizeof(Size));
+}
+
+/*
+ * lazy_record_dead_tuple - remember one deletable tuple
+ */
+static void
+lazy_record_dead_tuple(LVRelStats *vacrelstats,
+                                          ItemPointer itemptr)
+{
+       /*
+        * The array shouldn't overflow under normal behavior,
+        * but perhaps it could if we are given a really small SortMem.
+        * In that case, just forget the last few tuples.
+        */
+       if (vacrelstats->num_dead_tuples < vacrelstats->max_dead_tuples)
+       {
+               vacrelstats->dead_tuples[vacrelstats->num_dead_tuples] = *itemptr;
+               vacrelstats->num_dead_tuples++;
+       }
+}
+
+/*
+ * lazy_record_free_space - remember free space on one page
+ */
+static void
+lazy_record_free_space(LVRelStats *vacrelstats,
+                                          BlockNumber page,
+                                          Size avail)
+{
+       BlockNumber *pages;
+       Size       *spaceavail;
+       int                     n;
+
+       /* Ignore pages with little free space */
+       if (avail < PAGE_SPACE_THRESHOLD)
+               return;
+
+       /* Copy pointers to local variables for notational simplicity */
+       pages = vacrelstats->free_pages;
+       spaceavail = vacrelstats->free_spaceavail;
+       n = vacrelstats->max_free_pages;
+
+       /* If we haven't filled the array yet, just keep adding entries */
+       if (vacrelstats->num_free_pages < n)
+       {
+               pages[vacrelstats->num_free_pages] = page;
+               spaceavail[vacrelstats->num_free_pages] = avail;
+               vacrelstats->num_free_pages++;
+               return;
+       }
+
+       /*----------
+        * The rest of this routine works with "heap" organization of the
+        * free space arrays, wherein we maintain the heap property
+        *                      spaceavail[(j-1) div 2] <= spaceavail[j]  for 0 < j < n.
+        * In particular, the zero'th element always has the smallest available
+        * space and can be discarded to make room for a new page with more space.
+        * See Knuth's discussion of heap-based priority queues, sec 5.2.3;
+        * but note he uses 1-origin array subscripts, not 0-origin.
+        *----------
+        */
+
+       /* If we haven't yet converted the array to heap organization, do it */
+       if (! vacrelstats->fs_is_heap)
+       {
+               /*
+                * Scan backwards through the array, "sift-up" each value into its
+                * correct position.  We can start the scan at n/2-1 since each entry
+                * above that position has no children to worry about.
+                */
+               int             l = n / 2;
+
+               while (--l >= 0)
+               {
+                       BlockNumber     R = pages[l];
+                       Size            K = spaceavail[l];
+                       int                     i;              /* i is where the "hole" is */
+
+                       i = l;
+                       for (;;)
+                       {
+                               int             j = 2*i + 1;
+
+                               if (j >= n)
+                                       break;
+                               if (j+1 < n && spaceavail[j] > spaceavail[j+1])
+                                       j++;
+                               if (K <= spaceavail[j])
+                                       break;
+                               pages[i] = pages[j];
+                               spaceavail[i] = spaceavail[j];
+                               i = j;
+                       }
+                       pages[i] = R;
+                       spaceavail[i] = K;
+               }
+
+               vacrelstats->fs_is_heap = true;
+       }
+
+       /* If new page has more than zero'th entry, insert it into heap */
+       if (avail > spaceavail[0])
+       {
+               /*
+                * Notionally, we replace the zero'th entry with the new data,
+                * and then sift-up to maintain the heap property.  Physically,
+                * the new data doesn't get stored into the arrays until we find
+                * the right location for it.
+                */
+               int             i = 0;                  /* i is where the "hole" is */
+
+               for (;;)
+               {
+                       int             j = 2*i + 1;
+
+                       if (j >= n)
+                               break;
+                       if (j+1 < n && spaceavail[j] > spaceavail[j+1])
+                               j++;
+                       if (avail <= spaceavail[j])
+                               break;
+                       pages[i] = pages[j];
+                       spaceavail[i] = spaceavail[j];
+                       i = j;
+               }
+               pages[i] = page;
+               spaceavail[i] = avail;
+       }
+}
+
+/*
+ *     lazy_tid_reaped() -- is a particular tid deletable?
+ *
+ *             Assumes dead_tuples array is in sorted order.
+ */
+static bool
+lazy_tid_reaped(ItemPointer itemptr, LVRelStats *vacrelstats)
+{
+       ItemPointer     res;
+
+       res = (ItemPointer) bsearch((void *) itemptr,
+                                                               (void *) vacrelstats->dead_tuples,
+                                                               vacrelstats->num_dead_tuples,
+                                                               sizeof(ItemPointerData),
+                                                               vac_cmp_itemptr);
+
+       return (res != NULL);
+}
+
+/*
+ * Update the shared Free Space Map with the info we now have about
+ * free space in the relation, discarding any old info the map may have.
+ */
+static void
+lazy_update_fsm(Relation onerel, LVRelStats *vacrelstats)
+{
+       /*
+        * Since MultiRecordFreeSpace doesn't currently impose any restrictions
+        * on the ordering of the input, we can just pass it the arrays as-is,
+        * whether they are in heap or linear order.
+        */
+       MultiRecordFreeSpace(&onerel->rd_node,
+                                                0, MaxBlockNumber,
+                                                vacrelstats->num_free_pages,
+                                                vacrelstats->free_pages,
+                                                vacrelstats->free_spaceavail);
+}
+
+/*
+ * Comparator routines for use with qsort() and bsearch().
+ */
+static int
+vac_cmp_itemptr(const void *left, const void *right)
+{
+       BlockNumber lblk,
+                               rblk;
+       OffsetNumber loff,
+                               roff;
+
+       lblk = ItemPointerGetBlockNumber((ItemPointer) left);
+       rblk = ItemPointerGetBlockNumber((ItemPointer) right);
+
+       if (lblk < rblk)
+               return -1;
+       if (lblk > rblk)
+               return 1;
+
+       loff = ItemPointerGetOffsetNumber((ItemPointer) left);
+       roff = ItemPointerGetOffsetNumber((ItemPointer) right);
+
+       if (loff < roff)
+               return -1;
+       if (loff > roff)
+               return 1;
+
+       return 0;
+}
index ff02055..9fd6513 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: vacuum.h,v 1.37 2001/07/12 04:11:13 tgl Exp $
+ * $Id: vacuum.h,v 1.38 2001/07/13 22:55:59 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -24,7 +24,7 @@
 #endif
 
 #include "nodes/parsenodes.h"
-#include "storage/block.h"
+#include "utils/rel.h"
 
 
 /* State structure for vac_init_rusage/vac_show_rusage */
@@ -37,6 +37,9 @@ typedef struct VacRUsage
 
 /* in commands/vacuum.c */
 extern void vacuum(VacuumStmt *vacstmt);
+extern void vac_open_indexes(Relation relation, int *nindexes,
+                                                        Relation **Irel);
+extern void vac_close_indexes(int nindexes, Relation *Irel);
 extern void vac_update_relstats(Oid relid,
                                                                BlockNumber num_pages,
                                                                double num_tuples,
@@ -44,6 +47,9 @@ extern void vac_update_relstats(Oid relid,
 extern void vac_init_rusage(VacRUsage *ru0);
 extern const char *vac_show_rusage(VacRUsage *ru0);
 
+/* in commands/vacuumlazy.c */
+extern void lazy_vacuum_rel(Relation onerel, VacuumStmt *vacstmt);
+
 /* in commands/analyze.c */
 extern void analyze_rel(Oid relid, VacuumStmt *vacstmt);