OSDN Git Service

Change version to 1.5.0
[pgdbmsstats/pg_dbms_stats.git] / pg_dbms_stats.c
index ec54306..8a31969 100644 (file)
@@ -1,33 +1,43 @@
 /*
  * pg_dbms_stats.c
  *
- * Copyright (c) 2009-2014, NIPPON TELEGRAPH AND TELEPHONE CORPORATION
+ * Copyright (c) 2009-2018, NIPPON TELEGRAPH AND TELEPHONE CORPORATION
  * Portions Copyright (c) 1996-2013, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  */
 #include "postgres.h"
 
+#include "access/sysattr.h"
 #include "access/transam.h"
+#include "access/relation.h"
+#include "catalog/pg_index.h"
 #include "catalog/pg_statistic.h"
 #include "catalog/pg_type.h"
+#include "catalog/namespace.h"
+#include "catalog/pg_authid.h"
 #include "commands/trigger.h"
+#include "common/hashfn.h"
 #include "executor/spi.h"
 #include "funcapi.h"
 #include "optimizer/plancat.h"
+#include "optimizer/planner.h"
+#include "parser/parse_oper.h"
+#include "parser/parsetree.h"
 #include "storage/bufmgr.h"
+#include "utils/acl.h"
 #include "utils/builtins.h"
+#include "utils/elog.h"
+#include "utils/fmgroids.h"
 #include "utils/guc.h"
 #include "utils/inval.h"
 #include "utils/lsyscache.h"
 #include "utils/selfuncs.h"
 #include "utils/syscache.h"
-#if PG_VERSION_NUM >= 90200
+#include "miscadmin.h"
 #include "utils/rel.h"
-#endif
-#if PG_VERSION_NUM >= 90300
 #include "access/htup_details.h"
 #include "utils/catcache.h"
-#endif
+#include <math.h>
 
 #include "pg_dbms_stats.h"
 
@@ -39,19 +49,29 @@ PG_MODULE_MAGIC;
 
 #define MAX_REL_CACHE          50              /* expected max # of rel stats entries */
 
+/*
+ * acl_ok of the returning VariableStatData must be set if set_acl_okk is
+ * true. The code is compiled only if the compile target PG version is match
+ * the above conditions. Conversely, acl_ok is added to the end of
+ * VariableStatData so we can safely omit setting it when the PG version
+ * pg_dbms_stats is loaded onto is out of the conditions.
+ */
+static bool set_acl_ok = false;
+
+#define get_attrs(pgatt_attrs) (&(pgatt_attrs))
+
 /* Relation statistics cache entry */
 typedef struct StatsRelationEntry
 {
        Oid                                     relid;          /* hash key must be at the head */
-
        bool                            valid;          /* T if the entry has valid stats */
-
+       bool                            invalidated; /* T if this relation has been
+                                                                         * invalidated */
        BlockNumber                     relpages;       /* # of pages as of last ANALYZE */
        double                          reltuples;      /* # of tuples as of last ANALYZE */
        BlockNumber                     relallvisible;  /* # of all-visible pages as of last
                                                                                 * ANALYZE */
        BlockNumber                     curpages;       /* # of pages as of lock/restore */
-
        List                       *col_stats;  /* list of StatsColumnEntry, each element
                                                                           of which is pg_statistic record of this
                                                                           relation. */
@@ -65,6 +85,7 @@ typedef struct StatsColumnEntry
 {
   bool         negative;
   int32                attnum;
+  bool         inh;
   HeapTuple tuple;
 } StatsColumnEntry;
 
@@ -73,21 +94,24 @@ get_relation_info_hook_type         prev_get_relation_info = NULL;
 get_attavgwidth_hook_type              prev_get_attavgwidth = NULL;
 get_relation_stats_hook_type   prev_get_relation_stats = NULL;
 get_index_stats_hook_type              prev_get_index_stats = NULL;
+planner_hook_type                              prev_planner_hook = NULL;
+
+/* namings */
+#define NSPNAME "dbms_stats"
+#define RELSTAT_TBLNAME "relation_stats_locked"
+#define COLSTAT_TBLNAME "column_stats_locked"
 
 /* rows_query(oid) RETURNS int4, float4, int4 */
 static const char  *rows_query =
-       "SELECT relpages, reltuples, curpages"
-#if PG_VERSION_NUM >= 90200
-       ", relallvisible"
-#endif
-       "  FROM dbms_stats.relation_stats_locked"
+       "SELECT relpages, reltuples, curpages, relallvisible"
+       "  FROM " NSPNAME "." RELSTAT_TBLNAME
        " WHERE relid = $1";
 static SPIPlanPtr      rows_plan = NULL;
 
 /* tuple_query(oid, int2, bool) RETURNS pg_statistic */
 static const char  *tuple_query =
        "SELECT * "
-       "  FROM dbms_stats.column_stats_locked "
+       "  FROM " NSPNAME "." COLSTAT_TBLNAME
        " WHERE starelid = $1 "
        "   AND staattnum = $2 "
        "   AND stainherit = $3";
@@ -101,8 +125,16 @@ static int                 nested_level = 0;
 
 /*
  * The relation_stats_effective statistic cache is stored in hash table.
+ * rel_invalidated is set true if the hash has invalidated entries.
  */
 static HTAB       *rel_stats;
+static bool            rel_invalidated = false;
+
+/*
+ * The owner of pg_dbms_stats statistic tables.
+ */
+static Oid     stats_table_owner = InvalidOid;
+static char *stats_table_owner_name = "";
 
 #define get_pg_statistic(tuple)        ((Form_pg_statistic) GETSTRUCT(tuple))
 
@@ -111,12 +143,18 @@ PG_FUNCTION_INFO_V1(dbms_stats_invalidate_relation_cache);
 PG_FUNCTION_INFO_V1(dbms_stats_invalidate_column_cache);
 PG_FUNCTION_INFO_V1(dbms_stats_is_system_schema);
 PG_FUNCTION_INFO_V1(dbms_stats_is_system_catalog);
+PG_FUNCTION_INFO_V1(dbms_stats_anyary_anyary);
+PG_FUNCTION_INFO_V1(dbms_stats_type_is_analyzable);
+PG_FUNCTION_INFO_V1(dbms_stats_anyarray_basetype);
 
 extern Datum dbms_stats_merge(PG_FUNCTION_ARGS);
 extern Datum dbms_stats_invalidate_relation_cache(PG_FUNCTION_ARGS);
 extern Datum dbms_stats_invalidate_column_cache(PG_FUNCTION_ARGS);
 extern Datum dbms_stats_is_system_schema(PG_FUNCTION_ARGS);
 extern Datum dbms_stats_is_system_catalog(PG_FUNCTION_ARGS);
+extern Datum dbms_stats_anyary_anyary(PG_FUNCTION_ARGS);
+extern Datum dbms_stats_type_is_analyzable(PG_FUNCTION_ARGS);
+extern Datum dbms_stats_anyarray_basetype(PG_FUNCTION_ARGS);
 
 static HeapTuple dbms_stats_merge_internal(HeapTuple lhs, HeapTuple rhs,
        TupleDesc tupledesc);
@@ -128,6 +166,7 @@ static void dbms_stats_invalidate_cache_internal(Oid relid, bool sta_col);
 void   _PG_init(void);
 void   _PG_fini(void);
 
+/* hook functions */
 static void dbms_stats_get_relation_info(PlannerInfo *root, Oid relid,
        bool inhparent, RelOptInfo *rel);
 static int32 dbms_stats_get_attavgwidth(Oid relid, AttrNumber attnum);
@@ -135,7 +174,11 @@ static bool dbms_stats_get_relation_stats(PlannerInfo *root, RangeTblEntry *rte,
        AttrNumber attnum, VariableStatData *vardata);
 static bool dbms_stats_get_index_stats(PlannerInfo *root, Oid indexOid,
        AttrNumber indexattnum, VariableStatData *vardata);
+static PlannedStmt *dbms_stats_planner(Query *parse, const char *query_string,
+                                                                          int cursorOptions,
+                                                                          ParamListInfo boundParams);
 
+/* internal functions */
 static void get_merged_relation_stats(Oid relid, BlockNumber *pages,
        double *tuples, double *allvisfrac, bool estimate);
 static int32 get_merged_avgwidth(Oid relid, AttrNumber attnum);
@@ -143,12 +186,15 @@ static HeapTuple get_merged_column_stats(Oid relid, AttrNumber attnum,
        bool inh);
 static HeapTuple column_cache_search(Oid relid, AttrNumber attnum,
                                                                         bool inh, bool*negative);
-static HeapTuple column_cache_enter(Oid relid, int32 attnum, HeapTuple tuple);
+static HeapTuple column_cache_enter(Oid relid, int32 attnum, bool inh,
+                                                                       HeapTuple tuple);
 static bool execute_plan(SPIPlanPtr *plan, const char *query, Oid relid,
        const AttrNumber *attnum, bool inh);
-static void StatsCacheRelCallback(Datum arg, Oid relid);
+static void statscache_rel_callback(Datum arg, Oid relid);
+static void cleanup_invalidated_cache(void);
 static void init_rel_stats(void);
 static void init_rel_stats_entry(StatsRelationEntry *entry, Oid relid);
+
 /* copied from PG core source tree */
 static void dbms_stats_estimate_rel_size(Relation rel, int32 *attr_widths,
                                  BlockNumber *pages, double *tuples, double *allvisfrac,
@@ -182,6 +228,23 @@ _PG_init(void)
        }
 #endif
 
+       {
+               /*
+                * Check the PG version this module loaded onto. This aid is required
+                * for binary backward compatibility within a major PG version.
+                */
+               int major_version = PG_VERSION_NUM / 100;
+               int minor_version = PG_VERSION_NUM % 100;
+
+               if (major_version >= 1000 ||
+                       (major_version == 906 && minor_version >= 3) ||
+                       (major_version == 905 && minor_version >= 7) ||
+                       (major_version == 904 && minor_version >= 12) ||
+                       (major_version == 903 && minor_version >= 17) ||
+                       (major_version == 902 && minor_version >= 21))
+                       set_acl_ok = true;
+       }
+
        /* Define custom GUC variables. */
        DefineCustomBoolVariable("pg_dbms_stats.use_locked_stats",
                                                         "Enable user defined statistics.",
@@ -205,12 +268,14 @@ _PG_init(void)
        get_relation_stats_hook = dbms_stats_get_relation_stats;
        prev_get_index_stats = get_index_stats_hook;
        get_index_stats_hook = dbms_stats_get_index_stats;
+       prev_planner_hook = planner_hook;
+       planner_hook = dbms_stats_planner;
 
        /* Initialize hash table for statistics caching. */
        init_rel_stats();
 
        /* Also set up a callback for relcache SI invalidations */
-       CacheRegisterRelcacheCallback(StatsCacheRelCallback, (Datum) 0);
+       CacheRegisterRelcacheCallback(statscache_rel_callback, (Datum) 0);
 }
 
 /*
@@ -224,11 +289,109 @@ _PG_fini(void)
        get_attavgwidth_hook = prev_get_attavgwidth;
        get_relation_stats_hook = prev_get_relation_stats;
        get_index_stats_hook = prev_get_index_stats;
+       planner_hook = prev_planner_hook;
 
        /* A function to unregister callback for relcache is NOT provided. */
 }
 
 /*
+ * Function to convert from any array from dbms_stats.anyarray.
+ */
+Datum
+dbms_stats_anyary_anyary(PG_FUNCTION_ARGS)
+{
+  ArrayType *arr = PG_GETARG_ARRAYTYPE_P(0);
+  if (ARR_NDIM(arr) != 1)
+         elog(ERROR, "array must be one-dimentional.");
+
+  PG_RETURN_ARRAYTYPE_P(arr);
+}
+
+/*
+ * Function to check if the type can have statistics.
+ */
+Datum
+dbms_stats_type_is_analyzable(PG_FUNCTION_ARGS)
+{
+       Oid typid = PG_GETARG_OID(0);
+       Oid     eqopr;
+
+       if (!OidIsValid(typid))
+               PG_RETURN_BOOL(false);
+
+       get_sort_group_operators(typid, false, false, false,
+                                                        NULL, &eqopr, NULL,
+                                                        NULL);
+       PG_RETURN_BOOL(OidIsValid(eqopr));
+}
+
+/*
+ * Function to get base type of the value of the type dbms_stats.anyarray.
+ */
+Datum
+dbms_stats_anyarray_basetype(PG_FUNCTION_ARGS)
+{
+       ArrayType  *arr = PG_GETARG_ARRAYTYPE_P(0);
+       Oid                     elemtype = arr->elemtype;
+       HeapTuple       tp;
+       Form_pg_type typtup;
+       Name            result;
+
+       if (!OidIsValid(elemtype))
+               elog(ERROR, "invalid base type oid: %u", elemtype);
+
+       tp = SearchSysCache1(TYPEOID, ObjectIdGetDatum(elemtype));
+       if (!HeapTupleIsValid(tp))  /* I trust you. */
+               elog(ERROR, "invalid base type oid: %u", elemtype);
+
+       typtup = (Form_pg_type) GETSTRUCT(tp);
+       result = (Name) palloc0(NAMEDATALEN);
+       StrNCpy(NameStr(*result), NameStr(typtup->typname), NAMEDATALEN);
+
+       ReleaseSysCache(tp);
+       PG_RETURN_NAME(result);
+}
+
+/*
+ * Find and store the owner of the dummy statistics table.
+ *
+ * We will access statistics tables using this owner
+ */
+static Oid
+get_stats_table_owner(void)
+{
+       HeapTuple tp;
+
+       if (!OidIsValid(stats_table_owner))
+       {
+               tp = SearchSysCache2(RELNAMENSP,
+                                        PointerGetDatum(RELSTAT_TBLNAME),
+                                        ObjectIdGetDatum(get_namespace_oid(NSPNAME, false)));
+               if (!HeapTupleIsValid(tp))
+                       elog(ERROR, "table \"%s.%s\" not found in pg_class",
+                                NSPNAME, RELSTAT_TBLNAME);
+               stats_table_owner =     ((Form_pg_class) GETSTRUCT(tp))->relowner;
+               if (!OidIsValid(stats_table_owner))
+                       elog(ERROR, "owner uid of table \"%s.%s\" is invalid",
+                                NSPNAME, RELSTAT_TBLNAME);
+               ReleaseSysCache(tp);
+
+               tp = SearchSysCache1(AUTHOID, ObjectIdGetDatum(stats_table_owner));
+               if (!HeapTupleIsValid(tp))
+               {
+                       elog(ERROR,
+                                "role id %u for the owner of the relation \"%s.%s\"is invalid",
+                                stats_table_owner, NSPNAME, RELSTAT_TBLNAME);
+               }
+               /* This will be done once for the session, so not pstrdup. */
+               stats_table_owner_name =
+                       strdup(NameStr(((Form_pg_authid) GETSTRUCT(tp))->rolname));
+               ReleaseSysCache(tp);
+       }
+       return stats_table_owner;
+}
+
+/*
  * Store heap tuple header into given heap tuple.
  */
 static void
@@ -341,7 +504,8 @@ dbms_stats_merge_internal(HeapTuple lhs, HeapTuple rhs, TupleDesc tupdesc)
                                                (errmsg("pg_dbms_stats: bad statistics"),
                                                 errdetail("column \"%s\" should not be null",
                                                        get_attname(StatisticRelationId,
-                                                                               tupdesc->attrs[i]->attnum))));
+                                                                               get_attrs(tupdesc->attrs[i])->attnum,
+                                                                               true))));
                                        return NULL;    /* should not be null */
                                }
                        }
@@ -368,7 +532,8 @@ dbms_stats_merge_internal(HeapTuple lhs, HeapTuple rhs, TupleDesc tupdesc)
                                                        (errmsg("pg_dbms_stats: bad statistics"),
                                                         errdetail("column \"%s\" should not be null",
                                                                get_attname(StatisticRelationId,
-                                                                                       tupdesc->attrs[i]->attnum))));
+                                                                       get_attrs(tupdesc->attrs[i])->attnum,
+                                                                       true))));
                                                return NULL;    /* should not be null */
                                        }
                                }
@@ -409,7 +574,7 @@ dbms_stats_merge_internal(HeapTuple lhs, HeapTuple rhs, TupleDesc tupdesc)
                                        values[Anum_pg_statistic_stavalues1 + i - 1]);
                        if (arr == NULL || arr->elemtype != atttype)
                        {
-                               const char         *attname = get_attname(relid, attnum);
+                               const char         *attname = get_attname(relid, attnum, true);
 
                                /*
                                 * relid and attnum must be valid here because valid atttype
@@ -533,13 +698,13 @@ dbms_stats_invalidate_cache_internal(Oid relid, bool sta_col)
        /*
         * invalidate prepared statements and force re-planning with pg_dbms_stats.
         */
-       rel = try_relation_open(relid, NoLock);
+       rel = try_relation_open(relid, AccessShareLock);
        if (rel != NULL)
        {
                if (sta_col &&
                        rel->rd_rel->relkind == RELKIND_INDEX &&
                        (rel->rd_indextuple == NULL ||
-                        heap_attisnull(rel->rd_indextuple, Anum_pg_index_indexprs)))
+                        heap_attisnull(rel->rd_indextuple, Anum_pg_index_indexprs, NULL)))
                        ereport(ERROR,
                                        (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                                         errmsg("\"%s\" is an index except an index expression",
@@ -560,7 +725,7 @@ dbms_stats_invalidate_cache_internal(Oid relid, bool sta_col)
                        CacheInvalidateRelcacheByRelid(rel->rd_index->indrelid);
 
                CacheInvalidateRelcache(rel);
-               relation_close(rel, NoLock);
+               relation_close(rel, AccessShareLock);
        }
 }
 
@@ -599,7 +764,7 @@ dbms_stats_is_system_schema_internal(char *schema_name)
        if (strcmp(schema_name, "pg_catalog") == 0 ||
                strcmp(schema_name, "pg_toast") == 0 ||
                strcmp(schema_name, "information_schema") == 0 ||
-               strcmp(schema_name, "dbms_stats") == 0)
+               strcmp(schema_name, NSPNAME) == 0)
                return true;
 
        return false;
@@ -641,14 +806,14 @@ dbms_stats_is_system_catalog_internal(Oid relid)
                return false;
 
        /* no such relation */
-       rel = try_relation_open(relid, NoLock);
+       rel = try_relation_open(relid, AccessShareLock);
        if (rel == NULL)
                return false;
 
        /* check by namespace name. */
        schema_name = get_namespace_name(rel->rd_rel->relnamespace);
        result = dbms_stats_is_system_schema_internal(schema_name);
-       relation_close(rel, NoLock);
+       relation_close(rel, AccessShareLock);
 
        return result;
 }
@@ -693,15 +858,8 @@ dbms_stats_get_relation_info(PlannerInfo *root,
         * calculation.
         */
        if (!inhparent)
-       {
-#if PG_VERSION_NUM >= 90200
                get_merged_relation_stats(relid, &rel->pages, &rel->tuples,
                                                                  &rel->allvisfrac, true);
-#else
-               get_merged_relation_stats(relid, &rel->pages, &rel->tuples,
-                                                                 &allvisfrac, true);
-#endif
-       }
        else
                return;
 
@@ -773,9 +931,23 @@ dbms_stats_get_relation_stats(PlannerInfo *root,
 
                tuple = get_merged_column_stats(rte->relid, attnum, rte->inh);
                vardata->statsTuple = tuple;
-               if (tuple != NULL)
+               if (HeapTupleIsValid(tuple))
                {
                        vardata->freefunc = FreeHeapTuple;
+
+                       /*
+                        * set acl_ok if required. See the definition of set_acl_ok for
+                        * details.
+                        */
+                       if (set_acl_ok)
+                        {
+                                vardata->acl_ok =
+                                        (pg_class_aclcheck(rte->relid, GetUserId(),
+                                                                               ACL_SELECT) == ACLCHECK_OK) ||
+                                        (pg_attribute_aclcheck(rte->relid, attnum, GetUserId(),
+                                                                                       ACL_SELECT) == ACLCHECK_OK);
+                        }
+
                        return true;
                }
        }
@@ -798,25 +970,110 @@ dbms_stats_get_index_stats(PlannerInfo *root,
                                                   AttrNumber indexattnum,
                                                   VariableStatData *vardata)
 {
-       if (pg_dbms_stats_use_locked_stats)
+       HeapTuple       tuple;
+
+       if (!pg_dbms_stats_use_locked_stats)
+               goto next_plugin;
+
+       tuple = get_merged_column_stats(indexOid, indexattnum, false);
+       vardata->statsTuple = tuple;
+       if (tuple == NULL)
+               goto next_plugin;
+
+       vardata->freefunc = FreeHeapTuple;
+                       
+       /*
+        * set acl_ok if required. See the definition of set_acl_ok for details.
+        */
+       if (set_acl_ok)
        {
-               HeapTuple       tuple;
+               /*
+                * XXX: we had to scan the whole the rel array since we got
+                * only the oid of the index.
+                */
+               int i;
 
-               tuple = get_merged_column_stats(indexOid, indexattnum, false);
-               vardata->statsTuple = tuple;
-               if (tuple != NULL)
+               /* don't stop by this misassumption */
+               if (root->simple_rel_array == NULL)
                {
-                       vardata->freefunc = FreeHeapTuple;
-                       return true;
+                       elog(WARNING, "pg_dbms_stats internal error. relation has not been set up. index %d ignored", indexOid);
+                       goto next_plugin;
+               }
+
+               /*
+                * scan over simple_rel_array_size to find the owner relation of the
+                * index with the oid
+                */
+               for (i = 1 ; i < root->simple_rel_array_size ; i++)
+               {
+                       ListCell *lc;
+                       RelOptInfo *rel = root->simple_rel_array[i];
+
+                       /* there may be empty slots corresponding to non-baserel RTEs  */
+                       if (rel == NULL)
+                               continue;
+
+                       foreach (lc, rel->indexlist)
+                       {
+                               IndexOptInfo *index = (IndexOptInfo *) lfirst(lc);
+                               RangeTblEntry *rte;
+
+                               if (index->indexoid != indexOid)
+                                       continue;
+
+                               /* This relation is the owner of the given index, go ahead */
+                               rte = planner_rt_fetch(index->rel->relid, root);
+
+                               /* don't stop by this error */
+                               if (rte->rtekind != RTE_RELATION)
+                               {
+                                       elog(WARNING, "pg_dbms_stats internal error. index %d is owned by a non-relation", indexOid);
+                                       goto next_plugin;
+                               }
+
+                               vardata->acl_ok =
+                                       (pg_class_aclcheck(rte->relid, GetUserId(),
+                                                                          ACL_SELECT) == ACLCHECK_OK);
+                               break;
+                       }
                }
        }
 
+       return true;
+
+next_plugin:
        if (prev_get_index_stats)
                return prev_get_index_stats(root, indexOid, indexattnum, vardata);
        else
                return false;
 }
 
+
+/*
+ * dbms_stats_planner
+ *   Hook function for planner_hook which cleans up invalidated statistics.
+ */
+static PlannedStmt *
+dbms_stats_planner(Query *parse, const char *query_string,
+                                  int cursorOptions, ParamListInfo boundParams)
+{
+       PlannedStmt *ret;
+
+       cleanup_invalidated_cache();
+
+       if (prev_planner_hook)
+               ret = (*prev_planner_hook) (parse, query_string,
+                                                                       cursorOptions, boundParams);
+       else
+               ret = standard_planner(parse, query_string,
+                                                          cursorOptions, boundParams);
+
+       cleanup_invalidated_cache();
+
+       return ret;
+}
+
+
 /*
  * Extract binary value from given column.
  */
@@ -865,7 +1122,7 @@ get_merged_relation_stats(Oid relid, BlockNumber *pages, double *tuples,
                 * eliminates excessive SPI calls below. Negative caches will be
                 * invalidated again on invalidation of system relation cache, which
                 * occur on modification of the dummy stats tables
-                * dbms_stats._relation_stats_locked and _column_stats_locked.
+                * dbms_stats.relation_stats_locked and column_stats_locked.
                 */
                if (entry->relpages == InvalidBlockNumber)
                        return;
@@ -882,7 +1139,6 @@ get_merged_relation_stats(Oid relid, BlockNumber *pages, double *tuples,
                PG_TRY();
                {
                        ++nested_level;
-
                        SPI_connect();
 
                        /*
@@ -921,11 +1177,9 @@ get_merged_relation_stats(Oid relid, BlockNumber *pages, double *tuples,
                                val = get_binary_datum(3, &isnull);
                                entry->curpages = isnull ? InvalidBlockNumber :
                                        (BlockNumber) DatumGetInt32(val);
-#if PG_VERSION_NUM >= 90200
                                val = get_binary_datum(4, &isnull);
                                entry->relallvisible = isnull ? form->relallvisible :
                                        (BlockNumber) DatumGetInt32(val);
-#endif
 
                                ReleaseSysCache(tuple);
                        }
@@ -963,9 +1217,7 @@ get_merged_relation_stats(Oid relid, BlockNumber *pages, double *tuples,
        rel = relation_open(relid, NoLock);
        rel->rd_rel->relpages = entry->relpages;
        rel->rd_rel->reltuples = entry->reltuples;
-#if PG_VERSION_NUM >= 90200
        rel->rd_rel->relallvisible = entry->relallvisible;
-#endif
        dbms_stats_estimate_rel_size(rel, NULL, pages, tuples, allvisfrac,
                                                                 entry->curpages);
        relation_close(rel, NoLock);
@@ -1066,7 +1318,7 @@ get_merged_column_stats(Oid relid, AttrNumber attnum, bool inh)
                                tuple = NULL;
 
                        /* Cache merged result for subsequent calls. */
-                       tuple = column_cache_enter(relid, attnum, tuple);
+                       tuple = column_cache_enter(relid, attnum, inh, tuple);
 
                        /* Return system stats if the merging results in failure. */
                        if (!HeapTupleIsValid(tuple))
@@ -1118,7 +1370,7 @@ column_cache_search(Oid relid, AttrNumber attnum, bool inh, bool *negative)
        {
                StatsColumnEntry *ent = (StatsColumnEntry*) lfirst (lc);
 
-               if (ent->attnum != attnum) continue;
+               if (ent->attnum != attnum || ent->inh != inh) continue;
 
                if (ent->negative)
                {
@@ -1126,15 +1378,8 @@ column_cache_search(Oid relid, AttrNumber attnum, bool inh, bool *negative)
                        *negative = true;
                        return NULL;
                }
-               else
-               {
-                       HeapTuple       tuple = (HeapTuple) ent->tuple;
-                       Form_pg_statistic form = get_pg_statistic(tuple);
 
-                       /* Find statistic of the given column from the cache. */
-                       if (form->stainherit == inh)
-                               return tuple;
-               }
+               return ent->tuple;
        }
 
        return NULL;    /* Not yet registered. */
@@ -1146,14 +1391,14 @@ column_cache_search(Oid relid, AttrNumber attnum, bool inh, bool *negative)
  * table definition have been changed.
  */
 static HeapTuple
-column_cache_enter(Oid relid, int32 attnum, HeapTuple tuple)
+column_cache_enter(Oid relid, int32 attnum, bool inh, HeapTuple tuple)
 {
        MemoryContext   oldcontext;
        StatsColumnEntry *newcolent;
        StatsRelationEntry *entry;
        bool                    found;
 
-       Assert(tuple == NULL || !heap_attisnull(tuple, 1));
+       Assert(tuple == NULL || !heap_attisnull(tuple, 1, NULL));
 
        entry = hash_search(rel_stats, &relid, HASH_ENTER, &found);
        if (!found)
@@ -1166,6 +1411,8 @@ column_cache_enter(Oid relid, int32 attnum, HeapTuple tuple)
        oldcontext = MemoryContextSwitchTo(CacheMemoryContext);
        newcolent = (StatsColumnEntry*)palloc(sizeof(StatsColumnEntry));
        newcolent->attnum = attnum;
+       newcolent->inh = inh;
+
        if (HeapTupleIsValid(tuple))
        {
                newcolent->negative = false;
@@ -1201,73 +1448,144 @@ execute_plan(SPIPlanPtr *plan,
        Oid             argtypes[3] = { OIDOID, INT2OID, BOOLOID };
        int             nargs;
        Datum   values[3];
-       bool    nulls[3] = { false, false, false };
+       Oid                     save_userid;
+       int                     save_sec_context;
 
+       /* XXXX: this works for now but should be fixed later.. */
        nargs = (attnum ? 3 : 1);
 
-       /* When plan is not given, create plan from query string at first. */
-       if (*plan == NULL)
+       /*
+        * The dummy statistics table allows access from no one other than its
+        * owner or superuser.
+        */
+       GetUserIdAndSecContext(&save_userid, &save_sec_context);
+       SetUserIdAndSecContext(get_stats_table_owner(),
+                                                  save_sec_context | SECURITY_LOCAL_USERID_CHANGE);
+
+       PG_TRY();
        {
-               SPIPlanPtr p;
-               p = SPI_prepare(query, nargs, argtypes);
-               if (p == NULL)
-                       elog(ERROR, "pg_dbms_stats: SPI_prepare => %d", SPI_result);
-               *plan = SPI_saveplan(p);
-               SPI_freeplan(p);
-       }
+               /* Create plan from the query if not yet. */
+               if (*plan == NULL)
+               {
+                       *plan = SPI_prepare(query, nargs, argtypes);
+                       if (*plan == NULL)
+                               elog(ERROR,
+                                        "pg_dbms_stats: SPI_prepare() failed. result = %d",
+                                        SPI_result);
+
+                       SPI_keepplan(*plan);
+               }
 
-       values[0] = ObjectIdGetDatum(relid);
-       values[1] = Int16GetDatum(attnum ? *attnum : 0);
-       values[2] = BoolGetDatum(inh);
+               values[0] = ObjectIdGetDatum(relid);
+               values[1] = Int16GetDatum(attnum ? *attnum : 0);
+               values[2] = BoolGetDatum(inh);
 
-       ret = SPI_execute_plan(*plan, values, nulls, true, 1);
+               ret = SPI_execute_plan(*plan, values, NULL, true, 1);
+       }
+       PG_CATCH();
+       {
+               SetUserIdAndSecContext(save_userid, save_sec_context);
+               if (geterrcode() == ERRCODE_INSUFFICIENT_PRIVILEGE)
+                       errdetail("dbms_stats could not access the object as the role \"%s\".",
+                               stats_table_owner_name);
+               errhint("Check your settings of pg_dbms_stats.");
+               PG_RE_THROW();
+       }
+       PG_END_TRY();
+
+       SetUserIdAndSecContext(save_userid, save_sec_context);
        if (ret != SPI_OK_SELECT)
-               elog(ERROR, "pg_dbms_stats: SPI_execute_plan => %d", ret);
+               elog(ERROR, "pg_dbms_stats: SPI_execute_plan() returned %d", ret);
 
        return SPI_processed > 0;
 }
 
 /*
- * StatsCacheRelCallback
+ * statscache_rel_callback
  *             Relcache inval callback function
  *
- * Invalidate cached statistic info of the given relid, or all cached statistic
- * info if relid == InvalidOid.  We don't complain even when we don't have such
- * statistics.
+ * Invalidates cached statistics of the given relid, or all cached statistics
+ * if relid == InvalidOid. The statsTuple in the hash entries are directly
+ * passed to planner so we cannot remove them until planner ends. Just mark
+ * here then cleanup after planner finishes work.
+ */
+static void
+statscache_rel_callback(Datum arg, Oid relid)
+{
+       StatsRelationEntry *entry;
+
+       if (relid != InvalidOid)
+       {
+               bool found;
+
+               /*
+                * invalidate the entry for the specfied relation. Don't mind if found.
+                */
+               entry = hash_search(rel_stats, &relid, HASH_FIND, &found);
+               if (found)
+               {
+                       entry->invalidated = true;
+                       rel_invalidated = true;
+               }
+       }
+       else
+       {
+               /* invalidate all the entries of the hash */
+               HASH_SEQ_STATUS         status;
+
+               hash_seq_init(&status, rel_stats);
+               while ((entry = hash_seq_search(&status)) != NULL)
+               {
+                       entry->invalidated = true;
+                       rel_invalidated = true;
+               }
+       }
+}
+
+/*
+ * cleanup_invalidated_cache()
+ *             Cleanup invalidated stats cache
  *
- * Note: arg is not used.
+ * removes invalidated cache entries.
  */
 static void
-StatsCacheRelCallback(Datum arg, Oid relid)
+cleanup_invalidated_cache(void)
 {
        HASH_SEQ_STATUS         status;
        StatsRelationEntry *entry;
 
+       /* Return immediately if nothing to do */
+       if (!rel_invalidated)
+               return;
+
+       /*
+        * Reset rel_invalidated first so that we don't lose invalidations that
+        * happens during this round of cleanup.
+        */
+       rel_invalidated = false;
+
        hash_seq_init(&status, rel_stats);
        while ((entry = hash_seq_search(&status)) != NULL)
        {
-               if (relid == InvalidOid || relid == entry->relid)
-               {
-                       ListCell *lc;
+               ListCell        *lc;
 
-                       /* Mark the relation entry as INVALID */
-                       entry->valid = false;
+               if (!entry->invalidated)
+                       continue;
 
-                       /* Discard every column statistics */
-                       foreach (lc, entry->col_stats)
-                       {
-                               StatsColumnEntry *ent = (StatsColumnEntry*) lfirst(lc);
+               /* Discard every column statistics */
+               foreach (lc, entry->col_stats)
+               {
+                       StatsColumnEntry *ent = (StatsColumnEntry*) lfirst(lc);
 
-                               if (!ent->negative)
-                                       pfree(ent->tuple);
-                               pfree(ent);
-                       }
-                       list_free(entry->col_stats);
-                       entry->col_stats = NIL;
+                       if (!ent->negative)
+                               pfree(ent->tuple);
+                       pfree(ent);
                }
-       }
+               list_free(entry->col_stats);
 
-       /* We always check throughout the list, so hash_seq_term is not necessary */
+               /* Finally remove the hash entry. */
+               hash_search(rel_stats, &entry->relid, HASH_REMOVE, NULL);
+       }
 }
 
 /*
@@ -1304,6 +1622,7 @@ init_rel_stats_entry(StatsRelationEntry *entry, Oid relid)
 {
        entry->relid = relid;
        entry->valid = false;
+       entry->invalidated = false;
        entry->relpages = InvalidBlockNumber;
        entry->reltuples = 0.0;
        entry->relallvisible = InvalidBlockNumber;
@@ -1323,12 +1642,12 @@ init_rel_stats_entry(StatsRelationEntry *entry, Oid relid)
  * the attribute widths for estimation purposes.
  *
  * Note: This function is copied from plancat.c in core source tree of version
- * 9.2, and customized for pg_dbms_stats.  Changes from orignal one are:
+ * 9.2, and customized for pg_dbms_stats.  Changes from original one are:
  *   - rename by prefixing dbms_stats_
  *   - add 3 parameters (relpages, reltuples, curpage) to pass dummy curpage
  *     values.
  *   - Get current # of pages only when supplied curpages is InvalidBlockNumber
- *   - get franction of all-visible-pages
+ *   - get fraction of all-visible-pages
  */
 static void
 dbms_stats_estimate_rel_size(Relation rel, int32 *attr_widths,
@@ -1344,9 +1663,7 @@ dbms_stats_estimate_rel_size(Relation rel, int32 *attr_widths,
        {
                case RELKIND_RELATION:
                case RELKIND_INDEX:
-#if PG_VERSION_NUM >= 90300
                case RELKIND_MATVIEW:
-#endif
                case RELKIND_TOASTVALUE:
                        /* it has storage, ok to call the smgr */
                        if (curpages == InvalidBlockNumber)
@@ -1400,11 +1717,8 @@ dbms_stats_estimate_rel_size(Relation rel, int32 *attr_widths,
                        /* coerce values in pg_class to more desirable types */
                        relpages = (BlockNumber) rel->rd_rel->relpages;
                        reltuples = (double) rel->rd_rel->reltuples;
-#if PG_VERSION_NUM >= 90200
                        relallvisible = (BlockNumber) rel->rd_rel->relallvisible;
-#else
-                       relallvisible = 0;
-#endif
+
                        /*
                         * If it's an index, discount the metapage while estimating the
                         * number of tuples.  This is a kluge because it assumes more than
@@ -1506,7 +1820,7 @@ dbms_stats_get_rel_data_width(Relation rel, int32 *attr_widths)
 
        for (i = 1; i <= RelationGetNumberOfAttributes(rel); i++)
        {
-               Form_pg_attribute att = rel->rd_att->attrs[i - 1];
+               Form_pg_attribute att = get_attrs(rel->rd_att->attrs[i - 1]);
                int32           item_width;
 
                if (att->attisdropped)