OSDN Git Service

Generalize concept of temporary relations to "relation persistence".
[pg-rex/syncrep.git] / src / backend / utils / cache / relcache.c
index 45cb103..1509686 100644 (file)
@@ -3,19 +3,20 @@
  * relcache.c
  *       POSTGRES relation descriptor cache code
  *
- * Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/utils/cache/relcache.c,v 1.261 2007/05/27 03:50:39 tgl Exp $
+ *       src/backend/utils/cache/relcache.c
  *
  *-------------------------------------------------------------------------
  */
 /*
  * INTERFACE ROUTINES
  *             RelationCacheInitialize                 - initialize relcache (to empty)
- *             RelationCacheInitializePhase2   - finish initializing relcache
+ *             RelationCacheInitializePhase2   - initialize shared-catalog entries
+ *             RelationCacheInitializePhase3   - finish initializing relcache
  *             RelationIdGetRelation                   - get a reldesc by relation id
  *             RelationClose                                   - close an open relation
  *
 #include <unistd.h>
 
 #include "access/genam.h"
-#include "access/heapam.h"
 #include "access/reloptions.h"
+#include "access/sysattr.h"
+#include "access/transam.h"
 #include "access/xact.h"
 #include "catalog/catalog.h"
+#include "catalog/index.h"
 #include "catalog/indexing.h"
 #include "catalog/namespace.h"
-#include "catalog/pg_amop.h"
 #include "catalog/pg_amproc.h"
 #include "catalog/pg_attrdef.h"
 #include "catalog/pg_authid.h"
+#include "catalog/pg_auth_members.h"
 #include "catalog/pg_constraint.h"
+#include "catalog/pg_database.h"
 #include "catalog/pg_namespace.h"
 #include "catalog/pg_opclass.h"
 #include "catalog/pg_proc.h"
 #include "catalog/pg_rewrite.h"
+#include "catalog/pg_tablespace.h"
+#include "catalog/pg_trigger.h"
 #include "catalog/pg_type.h"
+#include "catalog/schemapg.h"
+#include "catalog/storage.h"
 #include "commands/trigger.h"
 #include "miscadmin.h"
 #include "optimizer/clauses.h"
 #include "optimizer/planmain.h"
 #include "optimizer/prep.h"
+#include "optimizer/var.h"
 #include "rewrite/rewriteDefine.h"
 #include "storage/fd.h"
+#include "storage/lmgr.h"
 #include "storage/smgr.h"
+#include "utils/array.h"
 #include "utils/builtins.h"
 #include "utils/fmgroids.h"
 #include "utils/inval.h"
+#include "utils/lsyscache.h"
 #include "utils/memutils.h"
 #include "utils/relcache.h"
+#include "utils/relmapper.h"
 #include "utils/resowner.h"
 #include "utils/syscache.h"
-#include "utils/typcache.h"
+#include "utils/tqual.h"
 
 
 /*
- * name of relcache init file, used to speed up backend startup
+ *             name of relcache init file(s), used to speed up backend startup
  */
 #define RELCACHE_INIT_FILENAME "pg_internal.init"
 
-#define RELCACHE_INIT_FILEMAGIC                0x573264        /* version ID value */
+#define RELCACHE_INIT_FILEMAGIC                0x573266        /* version ID value */
 
 /*
- *             hardcoded tuple descriptors.  see include/catalog/pg_attribute.h
+ *             hardcoded tuple descriptors, contents generated by genbki.pl
  */
-static FormData_pg_attribute Desc_pg_class[Natts_pg_class] = {Schema_pg_class};
-static FormData_pg_attribute Desc_pg_attribute[Natts_pg_attribute] = {Schema_pg_attribute};
-static FormData_pg_attribute Desc_pg_proc[Natts_pg_proc] = {Schema_pg_proc};
-static FormData_pg_attribute Desc_pg_type[Natts_pg_type] = {Schema_pg_type};
-static FormData_pg_attribute Desc_pg_index[Natts_pg_index] = {Schema_pg_index};
+static const FormData_pg_attribute Desc_pg_class[Natts_pg_class] = {Schema_pg_class};
+static const FormData_pg_attribute Desc_pg_attribute[Natts_pg_attribute] = {Schema_pg_attribute};
+static const FormData_pg_attribute Desc_pg_proc[Natts_pg_proc] = {Schema_pg_proc};
+static const FormData_pg_attribute Desc_pg_type[Natts_pg_type] = {Schema_pg_type};
+static const FormData_pg_attribute Desc_pg_database[Natts_pg_database] = {Schema_pg_database};
+static const FormData_pg_attribute Desc_pg_authid[Natts_pg_authid] = {Schema_pg_authid};
+static const FormData_pg_attribute Desc_pg_auth_members[Natts_pg_auth_members] = {Schema_pg_auth_members};
+static const FormData_pg_attribute Desc_pg_index[Natts_pg_index] = {Schema_pg_index};
 
 /*
  *             Hash tables that index the relation cache
@@ -101,6 +117,12 @@ static HTAB *RelationIdCache;
 bool           criticalRelcachesBuilt = false;
 
 /*
+ * This flag is false until we have prepared the critical relcache entries
+ * for shared catalogs (which are the tables needed for login).
+ */
+bool           criticalSharedRelcachesBuilt = false;
+
+/*
  * This counter counts relcache inval events received since backend startup
  * (but only for rels that are actually in cache).     Presently, we use it only
  * to detect whether data about to be written by write_relcache_init_file()
@@ -109,8 +131,10 @@ bool               criticalRelcachesBuilt = false;
 static long relcacheInvalsReceived = 0L;
 
 /*
- * This list remembers the OIDs of the relations cached in the relcache
- * init file.
+ * This list remembers the OIDs of the non-shared relations cached in the
+ * database's local relcache init file.  Note that there is no corresponding
+ * list for the shared relcache init file, for reasons explained in the
+ * comments for RelationCacheInitFileRemove.
  */
 static List *initFileRelationIds = NIL;
 
@@ -128,8 +152,7 @@ do { \
        RelIdCacheEnt *idhentry; bool found; \
        idhentry = (RelIdCacheEnt*)hash_search(RelationIdCache, \
                                                                                   (void *) &(RELATION->rd_id), \
-                                                                                  HASH_ENTER, \
-                                                                                  &found); \
+                                                                                  HASH_ENTER, &found); \
        /* used to give notice if found -- now just keep quiet */ \
        idhentry->reldesc = RELATION; \
 } while(0)
@@ -138,7 +161,8 @@ do { \
 do { \
        RelIdCacheEnt *hentry; \
        hentry = (RelIdCacheEnt*)hash_search(RelationIdCache, \
-                                                                                (void *) &(ID), HASH_FIND,NULL); \
+                                                                                (void *) &(ID), \
+                                                                                HASH_FIND, NULL); \
        if (hentry) \
                RELATION = hentry->reldesc; \
        else \
@@ -159,19 +183,17 @@ do { \
 /*
  * Special cache for opclass-related information
  *
- * Note: only default operators and support procs get cached, ie, those with
+ * Note: only default support procs get cached, i.e., those with
  * lefttype = righttype = opcintype.
  */
 typedef struct opclasscacheent
 {
        Oid                     opclassoid;             /* lookup key: OID of opclass */
        bool            valid;                  /* set TRUE after successful fill-in */
-       StrategyNumber numStrats;       /* max # of strategies (from pg_am) */
        StrategyNumber numSupport;      /* max # of support procs (from pg_am) */
        Oid                     opcfamily;              /* OID of opclass's family */
        Oid                     opcintype;              /* OID of opclass's declared input type */
-       Oid                *operatorOids;       /* strategy operators' OIDs */
-       RegProcedure *supportProcs; /* support procs */
+       RegProcedure *supportProcs; /* OIDs of support procedures */
 } OpClassCacheEnt;
 
 static HTAB *OpClassCache = NULL;
@@ -179,46 +201,51 @@ static HTAB *OpClassCache = NULL;
 
 /* non-export function prototypes */
 
+static void RelationDestroyRelation(Relation relation);
 static void RelationClearRelation(Relation relation, bool rebuild);
 
 static void RelationReloadIndexInfo(Relation relation);
 static void RelationFlushRelation(Relation relation);
-static bool load_relcache_init_file(void);
-static void write_relcache_init_file(void);
+static bool load_relcache_init_file(bool shared);
+static void write_relcache_init_file(bool shared);
 static void write_item(const void *data, Size len, FILE *fp);
 
 static void formrdesc(const char *relationName, Oid relationReltype,
-                 bool hasoids, int natts, FormData_pg_attribute *att);
+                 bool isshared, bool hasoids,
+                 int natts, const FormData_pg_attribute *attrs);
 
 static HeapTuple ScanPgRelation(Oid targetRelId, bool indexOK);
-static Relation AllocateRelationDesc(Relation relation, Form_pg_class relp);
+static Relation AllocateRelationDesc(Form_pg_class relp);
 static void RelationParseRelOptions(Relation relation, HeapTuple tuple);
 static void RelationBuildTupleDesc(Relation relation);
-static Relation RelationBuildDesc(Oid targetRelId, Relation oldrelation);
+static Relation RelationBuildDesc(Oid targetRelId, bool insertIt);
 static void RelationInitPhysicalAddr(Relation relation);
+static void load_critical_index(Oid indexoid, Oid heapoid);
 static TupleDesc GetPgClassDescriptor(void);
 static TupleDesc GetPgIndexDescriptor(void);
 static void AttrDefaultFetch(Relation relation);
 static void CheckConstraintFetch(Relation relation);
 static List *insert_ordered_oid(List *list, Oid datum);
 static void IndexSupportInitialize(oidvector *indclass,
-                                          Oid *indexOperator,
                                           RegProcedure *indexSupport,
                                           Oid *opFamily,
                                           Oid *opcInType,
-                                          StrategyNumber maxStrategyNumber,
                                           StrategyNumber maxSupportNumber,
                                           AttrNumber maxAttributeNumber);
 static OpClassCacheEnt *LookupOpclassInfo(Oid operatorClassOid,
-                                 StrategyNumber numStrats,
                                  StrategyNumber numSupport);
+static void RelationCacheInitFileRemoveInDir(const char *tblspcpath);
+static void unlink_initfile(const char *initfilename);
 
 
 /*
  *             ScanPgRelation
  *
- *             this is used by RelationBuildDesc to find a pg_class
- *             tuple matching targetRelId.
+ *             This is used by RelationBuildDesc to find a pg_class
+ *             tuple matching targetRelId.  The caller must hold at least
+ *             AccessShareLock on the target relid to prevent concurrent-update
+ *             scenarios --- else our SnapshotNow scan might fail to find any
+ *             version that it thinks is live.
  *
  *             NB: the returned tuple has been copied into palloc'd storage
  *             and must eventually be freed with heap_freetuple.
@@ -232,6 +259,15 @@ ScanPgRelation(Oid targetRelId, bool indexOK)
        ScanKeyData key[1];
 
        /*
+        * If something goes wrong during backend startup, we might find ourselves
+        * trying to read pg_class before we've selected a database.  That cannot
+        * work, so bail out with a useful error message.  If this happens, it
+        * probably means a relcache entry that needs to be nailed isn't nailed.
+        */
+       if (!OidIsValid(MyDatabaseId))
+               elog(FATAL, "cannot read pg_class without having selected a database");
+
+       /*
         * form a scan key
         */
        ScanKeyInit(&key[0],
@@ -270,15 +306,12 @@ ScanPgRelation(Oid targetRelId, bool indexOK)
  *             AllocateRelationDesc
  *
  *             This is used to allocate memory for a new relation descriptor
- *             and initialize the rd_rel field.
- *
- *             If 'relation' is NULL, allocate a new RelationData object.
- *             If not, reuse the given object (that path is taken only when
- *             we have to rebuild a relcache entry during RelationClearRelation).
+ *             and initialize the rd_rel field from the given pg_class tuple.
  */
 static Relation
-AllocateRelationDesc(Relation relation, Form_pg_class relp)
+AllocateRelationDesc(Form_pg_class relp)
 {
+       Relation        relation;
        MemoryContext oldcxt;
        Form_pg_class relationForm;
 
@@ -286,16 +319,9 @@ AllocateRelationDesc(Relation relation, Form_pg_class relp)
        oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
 
        /*
-        * allocate space for new relation descriptor, if needed
-        */
-       if (relation == NULL)
-               relation = (Relation) palloc(sizeof(RelationData));
-
-       /*
-        * clear all fields of reldesc
+        * allocate and zero space for new relation descriptor
         */
-       MemSet(relation, 0, sizeof(RelationData));
-       relation->rd_targblock = InvalidBlockNumber;
+       relation = (Relation) palloc0(sizeof(RelationData));
 
        /* make sure relation is marked as having no open file yet */
        relation->rd_smgr = NULL;
@@ -341,8 +367,6 @@ AllocateRelationDesc(Relation relation, Form_pg_class relp)
 static void
 RelationParseRelOptions(Relation relation, HeapTuple tuple)
 {
-       Datum           datum;
-       bool            isnull;
        bytea      *options;
 
        relation->rd_options = NULL;
@@ -352,7 +376,6 @@ RelationParseRelOptions(Relation relation, HeapTuple tuple)
        {
                case RELKIND_RELATION:
                case RELKIND_TOASTVALUE:
-               case RELKIND_UNCATALOGED:
                case RELKIND_INDEX:
                        break;
                default:
@@ -364,38 +387,23 @@ RelationParseRelOptions(Relation relation, HeapTuple tuple)
         * we might not have any other for pg_class yet (consider executing this
         * code for pg_class itself)
         */
-       datum = fastgetattr(tuple,
-                                               Anum_pg_class_reloptions,
-                                               GetPgClassDescriptor(),
-                                               &isnull);
-       if (isnull)
-               return;
-
-       /* Parse into appropriate format; don't error out here */
-       switch (relation->rd_rel->relkind)
-       {
-               case RELKIND_RELATION:
-               case RELKIND_TOASTVALUE:
-               case RELKIND_UNCATALOGED:
-                       options = heap_reloptions(relation->rd_rel->relkind, datum,
-                                                                         false);
-                       break;
-               case RELKIND_INDEX:
-                       options = index_reloptions(relation->rd_am->amoptions, datum,
-                                                                          false);
-                       break;
-               default:
-                       Assert(false);          /* can't get here */
-                       options = NULL;         /* keep compiler quiet */
-                       break;
-       }
+       options = extractRelOptions(tuple,
+                                                               GetPgClassDescriptor(),
+                                                               relation->rd_rel->relkind == RELKIND_INDEX ?
+                                                               relation->rd_am->amoptions : InvalidOid);
 
-       /* Copy parsed data into CacheMemoryContext */
+       /*
+        * Copy parsed data into CacheMemoryContext.  To guard against the
+        * possibility of leaks in the reloptions code, we want to do the actual
+        * parsing in the caller's memory context and copy the results into
+        * CacheMemoryContext after the fact.
+        */
        if (options)
        {
                relation->rd_options = MemoryContextAlloc(CacheMemoryContext,
                                                                                                  VARSIZE(options));
                memcpy(relation->rd_options, options, VARSIZE(options));
+               pfree(options);
        }
 }
 
@@ -470,7 +478,7 @@ RelationBuildTupleDesc(Relation relation)
 
                memcpy(relation->rd_att->attrs[attp->attnum - 1],
                           attp,
-                          ATTRIBUTE_TUPLE_SIZE);
+                          ATTRIBUTE_FIXED_PART_SIZE);
 
                /* Update constraint/default info */
                if (attp->attnotnull)
@@ -640,7 +648,6 @@ RelationBuildRuleLock(Relation relation)
                Form_pg_rewrite rewrite_form = (Form_pg_rewrite) GETSTRUCT(rewrite_tuple);
                bool            isnull;
                Datum           rule_datum;
-               text       *rule_text;
                char       *rule_str;
                RewriteRule *rule;
 
@@ -665,30 +672,22 @@ RelationBuildRuleLock(Relation relation)
                                                                  rewrite_tupdesc,
                                                                  &isnull);
                Assert(!isnull);
-               rule_text = DatumGetTextP(rule_datum);
-               rule_str = DatumGetCString(DirectFunctionCall1(textout,
-                                                                                               PointerGetDatum(rule_text)));
+               rule_str = TextDatumGetCString(rule_datum);
                oldcxt = MemoryContextSwitchTo(rulescxt);
                rule->actions = (List *) stringToNode(rule_str);
                MemoryContextSwitchTo(oldcxt);
                pfree(rule_str);
-               if ((Pointer) rule_text != DatumGetPointer(rule_datum))
-                       pfree(rule_text);
 
                rule_datum = heap_getattr(rewrite_tuple,
                                                                  Anum_pg_rewrite_ev_qual,
                                                                  rewrite_tupdesc,
                                                                  &isnull);
                Assert(!isnull);
-               rule_text = DatumGetTextP(rule_datum);
-               rule_str = DatumGetCString(DirectFunctionCall1(textout,
-                                                                                               PointerGetDatum(rule_text)));
+               rule_str = TextDatumGetCString(rule_datum);
                oldcxt = MemoryContextSwitchTo(rulescxt);
                rule->qual = (Node *) stringToNode(rule_str);
                MemoryContextSwitchTo(oldcxt);
                pfree(rule_str);
-               if ((Pointer) rule_text != DatumGetPointer(rule_datum))
-                       pfree(rule_text);
 
                /*
                 * We want the rule's table references to be checked as though by the
@@ -722,6 +721,17 @@ RelationBuildRuleLock(Relation relation)
        heap_close(rewrite_desc, AccessShareLock);
 
        /*
+        * there might not be any rules (if relhasrules is out-of-date)
+        */
+       if (numlocks == 0)
+       {
+               relation->rd_rules = NULL;
+               relation->rd_rulescxt = NULL;
+               MemoryContextDelete(rulescxt);
+               return;
+       }
+
+       /*
         * form a RuleLock and insert into relation
         */
        rulelock = (RuleLock *) MemoryContextAlloc(rulescxt, sizeof(RuleLock));
@@ -765,6 +775,8 @@ equalRuleLocks(RuleLock *rlock1, RuleLock *rlock2)
                                return false;
                        if (rule1->attrno != rule2->attrno)
                                return false;
+                       if (rule1->enabled != rule2->enabled)
+                               return false;
                        if (rule1->isInstead != rule2->isInstead)
                                return false;
                        if (!equal(rule1->qual, rule2->qual))
@@ -779,27 +791,25 @@ equalRuleLocks(RuleLock *rlock1, RuleLock *rlock2)
 }
 
 
-/* ----------------------------------
+/*
  *             RelationBuildDesc
  *
- *             Build a relation descriptor --- either a new one, or by
- *             recycling the given old relation object.  The latter case
- *             supports rebuilding a relcache entry without invalidating
- *             pointers to it.
+ *             Build a relation descriptor.  The caller must hold at least
+ *             AccessShareLock on the target relid.
+ *
+ *             The new descriptor is inserted into the hash table if insertIt is true.
  *
  *             Returns NULL if no pg_class row could be found for the given relid
  *             (suggesting we are trying to access a just-deleted relation).
  *             Any other error is reported via elog.
- * --------------------------------
  */
 static Relation
-RelationBuildDesc(Oid targetRelId, Relation oldrelation)
+RelationBuildDesc(Oid targetRelId, bool insertIt)
 {
        Relation        relation;
        Oid                     relid;
        HeapTuple       pg_class_tuple;
        Form_pg_class relp;
-       MemoryContext oldcxt;
 
        /*
         * find the tuple in pg_class corresponding to the given relation id
@@ -817,12 +827,13 @@ RelationBuildDesc(Oid targetRelId, Relation oldrelation)
         */
        relid = HeapTupleGetOid(pg_class_tuple);
        relp = (Form_pg_class) GETSTRUCT(pg_class_tuple);
+       Assert(relid == targetRelId);
 
        /*
         * allocate storage for the relation descriptor, and copy pg_class_tuple
         * to relation->rd_rel.
         */
-       relation = AllocateRelationDesc(oldrelation, relp);
+       relation = AllocateRelationDesc(relp);
 
        /*
         * initialize the relation's relation id (relation->rd_id)
@@ -838,7 +849,31 @@ RelationBuildDesc(Oid targetRelId, Relation oldrelation)
        relation->rd_isnailed = false;
        relation->rd_createSubid = InvalidSubTransactionId;
        relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
-       relation->rd_istemp = isTempNamespace(relation->rd_rel->relnamespace);
+       switch (relation->rd_rel->relpersistence)
+       {
+               case RELPERSISTENCE_PERMANENT:
+                       relation->rd_backend = InvalidBackendId;
+                       break;
+               case RELPERSISTENCE_TEMP:
+                       if (isTempOrToastNamespace(relation->rd_rel->relnamespace))
+                               relation->rd_backend = MyBackendId;
+                       else
+                       {
+                               /*
+                                * If it's a temp table belonging to some other backend, we
+                                * have to derive the owning backend from the relation's
+                                * temp namespace, which is the slower path.
+                                */
+                               relation->rd_backend =
+                                       GetTempNamespaceBackendId(relation->rd_rel->relnamespace);
+                               Assert(relation->rd_backend != InvalidBackendId);
+                       }
+                       break;
+               default:
+                       elog(ERROR, "invalid relpersistence: %c",
+                                relation->rd_rel->relpersistence);
+                       break;
+       }
 
        /*
         * initialize the tuple descriptor (relation->rd_att).
@@ -856,7 +891,7 @@ RelationBuildDesc(Oid targetRelId, Relation oldrelation)
                relation->rd_rulescxt = NULL;
        }
 
-       if (relation->rd_rel->reltriggers > 0)
+       if (relation->rd_rel->relhastriggers)
                RelationBuildTriggers(relation);
        else
                relation->trigdesc = NULL;
@@ -889,11 +924,10 @@ RelationBuildDesc(Oid targetRelId, Relation oldrelation)
        heap_freetuple(pg_class_tuple);
 
        /*
-        * Insert newly created relation into relcache hash tables.
+        * Insert newly created relation into relcache hash table, if requested.
         */
-       oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
-       RelationCacheInsert(relation);
-       MemoryContextSwitchTo(oldcxt);
+       if (insertIt)
+               RelationCacheInsert(relation);
 
        /* It's fully valid */
        relation->rd_isvalid = true;
@@ -903,6 +937,10 @@ RelationBuildDesc(Oid targetRelId, Relation oldrelation)
 
 /*
  * Initialize the physical addressing info (RelFileNode) for a relcache entry
+ *
+ * Note: at the physical level, relations in the pg_global tablespace must
+ * be treated as shared, even if relisshared isn't set.  Hence we do not
+ * look at relisshared here.
  */
 static void
 RelationInitPhysicalAddr(Relation relation)
@@ -911,11 +949,22 @@ RelationInitPhysicalAddr(Relation relation)
                relation->rd_node.spcNode = relation->rd_rel->reltablespace;
        else
                relation->rd_node.spcNode = MyDatabaseTableSpace;
-       if (relation->rd_rel->relisshared)
+       if (relation->rd_node.spcNode == GLOBALTABLESPACE_OID)
                relation->rd_node.dbNode = InvalidOid;
        else
                relation->rd_node.dbNode = MyDatabaseId;
-       relation->rd_node.relNode = relation->rd_rel->relfilenode;
+       if (relation->rd_rel->relfilenode)
+               relation->rd_node.relNode = relation->rd_rel->relfilenode;
+       else
+       {
+               /* Consult the relation mapper */
+               relation->rd_node.relNode =
+                       RelationMapOidToFilenode(relation->rd_id,
+                                                                        relation->rd_rel->relisshared);
+               if (!OidIsValid(relation->rd_node.relNode))
+                       elog(ERROR, "could not find relation mapping for relation \"%s\", OID %u",
+                                RelationGetRelationName(relation), relation->rd_id);
+       }
 }
 
 /*
@@ -930,11 +979,10 @@ RelationInitIndexAccessInfo(Relation relation)
        Datum           indoptionDatum;
        bool            isnull;
        oidvector  *indclass;
-       int2vector  *indoption;
+       int2vector *indoption;
        MemoryContext indexcxt;
        MemoryContext oldcontext;
        int                     natts;
-       uint16          amstrategies;
        uint16          amsupport;
 
        /*
@@ -942,9 +990,8 @@ RelationInitIndexAccessInfo(Relation relation)
         * contains variable-length and possibly-null fields, we have to do this
         * honestly rather than just treating it as a Form_pg_index struct.
         */
-       tuple = SearchSysCache(INDEXRELID,
-                                                  ObjectIdGetDatum(RelationGetRelid(relation)),
-                                                  0, 0, 0);
+       tuple = SearchSysCache1(INDEXRELID,
+                                                       ObjectIdGetDatum(RelationGetRelid(relation)));
        if (!HeapTupleIsValid(tuple))
                elog(ERROR, "cache lookup failed for index %u",
                         RelationGetRelid(relation));
@@ -957,9 +1004,7 @@ RelationInitIndexAccessInfo(Relation relation)
        /*
         * Make a copy of the pg_am entry for the index's access method
         */
-       tuple = SearchSysCache(AMOID,
-                                                  ObjectIdGetDatum(relation->rd_rel->relam),
-                                                  0, 0, 0);
+       tuple = SearchSysCache1(AMOID, ObjectIdGetDatum(relation->rd_rel->relam));
        if (!HeapTupleIsValid(tuple))
                elog(ERROR, "cache lookup failed for access method %u",
                         relation->rd_rel->relam);
@@ -972,7 +1017,6 @@ RelationInitIndexAccessInfo(Relation relation)
        if (natts != relation->rd_index->indnatts)
                elog(ERROR, "relnatts disagrees with indnatts for index %u",
                         RelationGetRelid(relation));
-       amstrategies = aform->amstrategies;
        amsupport = aform->amsupport;
 
        /*
@@ -1001,13 +1045,6 @@ RelationInitIndexAccessInfo(Relation relation)
        relation->rd_opcintype = (Oid *)
                MemoryContextAllocZero(indexcxt, natts * sizeof(Oid));
 
-       if (amstrategies > 0)
-               relation->rd_operator = (Oid *)
-                       MemoryContextAllocZero(indexcxt,
-                                                                  natts * amstrategies * sizeof(Oid));
-       else
-               relation->rd_operator = NULL;
-
        if (amsupport > 0)
        {
                int                     nsupport = natts * amsupport;
@@ -1028,8 +1065,8 @@ RelationInitIndexAccessInfo(Relation relation)
 
        /*
         * indclass cannot be referenced directly through the C struct, because it
-        * comes after the variable-width indkey field.  Must extract the
-        * datum the hard way...
+        * comes after the variable-width indkey field.  Must extract the datum
+        * the hard way...
         */
        indclassDatum = fastgetattr(relation->rd_indextuple,
                                                                Anum_pg_index_indclass,
@@ -1039,14 +1076,13 @@ RelationInitIndexAccessInfo(Relation relation)
        indclass = (oidvector *) DatumGetPointer(indclassDatum);
 
        /*
-        * Fill the operator and support procedure OID arrays, as well as the
-        * info about opfamilies and opclass input types.  (aminfo and
-        * supportinfo are left as zeroes, and are filled on-the-fly when used)
+        * Fill the support procedure OID array, as well as the info about
+        * opfamilies and opclass input types.  (aminfo and supportinfo are left
+        * as zeroes, and are filled on-the-fly when used)
         */
-       IndexSupportInitialize(indclass,
-                                                  relation->rd_operator, relation->rd_support,
+       IndexSupportInitialize(indclass, relation->rd_support,
                                                   relation->rd_opfamily, relation->rd_opcintype,
-                                                  amstrategies, amsupport, natts);
+                                                  amsupport, natts);
 
        /*
         * Similarly extract indoption and copy it to the cache entry
@@ -1060,10 +1096,13 @@ RelationInitIndexAccessInfo(Relation relation)
        memcpy(relation->rd_indoption, indoption->values, natts * sizeof(int16));
 
        /*
-        * expressions and predicate cache will be filled later
+        * expressions, predicate, exclusion caches will be filled later
         */
        relation->rd_indexprs = NIL;
        relation->rd_indpred = NIL;
+       relation->rd_exclops = NULL;
+       relation->rd_exclprocs = NULL;
+       relation->rd_exclstrats = NULL;
        relation->rd_amcache = NULL;
 }
 
@@ -1072,22 +1111,19 @@ RelationInitIndexAccessInfo(Relation relation)
  *             Initializes an index's cached opclass information,
  *             given the index's pg_index.indclass entry.
  *
- * Data is returned into *indexOperator, *indexSupport, *opFamily, and
- * *opcInType, which are arrays allocated by the caller.
+ * Data is returned into *indexSupport, *opFamily, and *opcInType,
+ * which are arrays allocated by the caller.
  *
- * The caller also passes maxStrategyNumber, maxSupportNumber, and
- * maxAttributeNumber, since these indicate the size of the arrays
- * it has allocated --- but in practice these numbers must always match
- * those obtainable from the system catalog entries for the index and
- * access method.
+ * The caller also passes maxSupportNumber and maxAttributeNumber, since these
+ * indicate the size of the arrays it has allocated --- but in practice these
+ * numbers must always match those obtainable from the system catalog entries
+ * for the index and access method.
  */
 static void
 IndexSupportInitialize(oidvector *indclass,
-                                          Oid *indexOperator,
                                           RegProcedure *indexSupport,
                                           Oid *opFamily,
                                           Oid *opcInType,
-                                          StrategyNumber maxStrategyNumber,
                                           StrategyNumber maxSupportNumber,
                                           AttrNumber maxAttributeNumber)
 {
@@ -1102,16 +1138,11 @@ IndexSupportInitialize(oidvector *indclass,
 
                /* look up the info for this opclass, using a cache */
                opcentry = LookupOpclassInfo(indclass->values[attIndex],
-                                                                        maxStrategyNumber,
                                                                         maxSupportNumber);
 
                /* copy cached data into relcache entry */
                opFamily[attIndex] = opcentry->opcfamily;
                opcInType[attIndex] = opcentry->opcintype;
-               if (maxStrategyNumber > 0)
-                       memcpy(&indexOperator[attIndex * maxStrategyNumber],
-                                  opcentry->operatorOids,
-                                  maxStrategyNumber * sizeof(Oid));
                if (maxSupportNumber > 0)
                        memcpy(&indexSupport[attIndex * maxSupportNumber],
                                   opcentry->supportProcs,
@@ -1125,19 +1156,22 @@ IndexSupportInitialize(oidvector *indclass,
  * This routine maintains a per-opclass cache of the information needed
  * by IndexSupportInitialize().  This is more efficient than relying on
  * the catalog cache, because we can load all the info about a particular
- * opclass in a single indexscan of pg_amproc or pg_amop.
+ * opclass in a single indexscan of pg_amproc.
  *
- * The information from pg_am about expected range of strategy and support
+ * The information from pg_am about expected range of support function
  * numbers is passed in, rather than being looked up, mainly because the
  * caller will have it already.
  *
- * XXX There isn't any provision for flushing the cache.  However, there
- * isn't any provision for flushing relcache entries when opclass info
- * changes, either :-(
+ * Note there is no provision for flushing the cache.  This is OK at the
+ * moment because there is no way to ALTER any interesting properties of an
+ * existing opclass --- all you can do is drop it, which will result in
+ * a useless but harmless dead entry in the cache.     To support altering
+ * opclass membership (not the same as opfamily membership!), we'd need to
+ * be able to flush this cache as well as the contents of relcache entries
+ * for indexes.
  */
 static OpClassCacheEnt *
 LookupOpclassInfo(Oid operatorClassOid,
-                                 StrategyNumber numStrats,
                                  StrategyNumber numSupport)
 {
        OpClassCacheEnt *opcentry;
@@ -1153,49 +1187,57 @@ LookupOpclassInfo(Oid operatorClassOid,
                /* First time through: initialize the opclass cache */
                HASHCTL         ctl;
 
-               if (!CacheMemoryContext)
-                       CreateCacheMemoryContext();
-
                MemSet(&ctl, 0, sizeof(ctl));
                ctl.keysize = sizeof(Oid);
                ctl.entrysize = sizeof(OpClassCacheEnt);
                ctl.hash = oid_hash;
                OpClassCache = hash_create("Operator class cache", 64,
                                                                   &ctl, HASH_ELEM | HASH_FUNCTION);
+
+               /* Also make sure CacheMemoryContext exists */
+               if (!CacheMemoryContext)
+                       CreateCacheMemoryContext();
        }
 
        opcentry = (OpClassCacheEnt *) hash_search(OpClassCache,
                                                                                           (void *) &operatorClassOid,
                                                                                           HASH_ENTER, &found);
 
-       if (found && opcentry->valid)
+       if (!found)
+       {
+               /* Need to allocate memory for new entry */
+               opcentry->valid = false;        /* until known OK */
+               opcentry->numSupport = numSupport;
+
+               if (numSupport > 0)
+                       opcentry->supportProcs = (RegProcedure *)
+                               MemoryContextAllocZero(CacheMemoryContext,
+                                                                          numSupport * sizeof(RegProcedure));
+               else
+                       opcentry->supportProcs = NULL;
+       }
+       else
        {
-               /* Already made an entry for it */
-               Assert(numStrats == opcentry->numStrats);
                Assert(numSupport == opcentry->numSupport);
-               return opcentry;
        }
 
-       /* Need to fill in new entry */
-       opcentry->valid = false;        /* until known OK */
-       opcentry->numStrats = numStrats;
-       opcentry->numSupport = numSupport;
-
-       if (numStrats > 0)
-               opcentry->operatorOids = (Oid *)
-                       MemoryContextAllocZero(CacheMemoryContext,
-                                                                  numStrats * sizeof(Oid));
-       else
-               opcentry->operatorOids = NULL;
+       /*
+        * When testing for cache-flush hazards, we intentionally disable the
+        * operator class cache and force reloading of the info on each call. This
+        * is helpful because we want to test the case where a cache flush occurs
+        * while we are loading the info, and it's very hard to provoke that if
+        * this happens only once per opclass per backend.
+        */
+#if defined(CLOBBER_CACHE_ALWAYS)
+       opcentry->valid = false;
+#endif
 
-       if (numSupport > 0)
-               opcentry->supportProcs = (RegProcedure *)
-                       MemoryContextAllocZero(CacheMemoryContext,
-                                                                  numSupport * sizeof(RegProcedure));
-       else
-               opcentry->supportProcs = NULL;
+       if (opcentry->valid)
+               return opcentry;
 
        /*
+        * Need to fill in new entry.
+        *
         * To avoid infinite recursion during startup, force heap scans if we're
         * looking up info for the opclasses used by the indexes we would like to
         * reference here.
@@ -1206,7 +1248,7 @@ LookupOpclassInfo(Oid operatorClassOid,
 
        /*
         * We have to fetch the pg_opclass row to determine its opfamily and
-        * opcintype, which are needed to look up the operators and functions.
+        * opcintype, which are needed to look up related operators and functions.
         * It'd be convenient to use the syscache here, but that probably doesn't
         * work while bootstrapping.
         */
@@ -1231,45 +1273,6 @@ LookupOpclassInfo(Oid operatorClassOid,
        systable_endscan(scan);
        heap_close(rel, AccessShareLock);
 
-
-       /*
-        * Scan pg_amop to obtain operators for the opclass.  We only fetch the
-        * default ones (those with lefttype = righttype = opcintype).
-        */
-       if (numStrats > 0)
-       {
-               ScanKeyInit(&skey[0],
-                                       Anum_pg_amop_amopfamily,
-                                       BTEqualStrategyNumber, F_OIDEQ,
-                                       ObjectIdGetDatum(opcentry->opcfamily));
-               ScanKeyInit(&skey[1],
-                                       Anum_pg_amop_amoplefttype,
-                                       BTEqualStrategyNumber, F_OIDEQ,
-                                       ObjectIdGetDatum(opcentry->opcintype));
-               ScanKeyInit(&skey[2],
-                                       Anum_pg_amop_amoprighttype,
-                                       BTEqualStrategyNumber, F_OIDEQ,
-                                       ObjectIdGetDatum(opcentry->opcintype));
-               rel = heap_open(AccessMethodOperatorRelationId, AccessShareLock);
-               scan = systable_beginscan(rel, AccessMethodStrategyIndexId, indexOK,
-                                                                 SnapshotNow, 3, skey);
-
-               while (HeapTupleIsValid(htup = systable_getnext(scan)))
-               {
-                       Form_pg_amop amopform = (Form_pg_amop) GETSTRUCT(htup);
-
-                       if (amopform->amopstrategy <= 0 ||
-                               (StrategyNumber) amopform->amopstrategy > numStrats)
-                               elog(ERROR, "invalid amopstrategy number %d for opclass %u",
-                                        amopform->amopstrategy, operatorClassOid);
-                       opcentry->operatorOids[amopform->amopstrategy - 1] =
-                               amopform->amopopr;
-               }
-
-               systable_endscan(scan);
-               heap_close(rel, AccessShareLock);
-       }
-
        /*
         * Scan pg_amproc to obtain support procs for the opclass.      We only fetch
         * the default ones (those with lefttype = righttype = opcintype).
@@ -1317,24 +1320,29 @@ LookupOpclassInfo(Oid operatorClassOid,
 /*
  *             formrdesc
  *
- *             This is a special cut-down version of RelationBuildDesc()
- *             used by RelationCacheInitializePhase2() in initializing the relcache.
+ *             This is a special cut-down version of RelationBuildDesc(),
+ *             used while initializing the relcache.
  *             The relation descriptor is built just from the supplied parameters,
  *             without actually looking at any system table entries.  We cheat
  *             quite a lot since we only need to work for a few basic system
  *             catalogs.
  *
- * formrdesc is currently used for: pg_class, pg_attribute, pg_proc,
- * and pg_type (see RelationCacheInitializePhase2).
+ * formrdesc is currently used for: pg_database, pg_authid, pg_auth_members,
+ * pg_class, pg_attribute, pg_proc, and pg_type
+ * (see RelationCacheInitializePhase2/3).
  *
  * Note that these catalogs can't have constraints (except attnotnull),
  * default values, rules, or triggers, since we don't cope with any of that.
+ * (Well, actually, this only matters for properties that need to be valid
+ * during bootstrap or before RelationCacheInitializePhase3 runs, and none of
+ * these properties matter then...)
  *
  * NOTE: we assume we are already switched into CacheMemoryContext.
  */
 static void
 formrdesc(const char *relationName, Oid relationReltype,
-                 bool hasoids, int natts, FormData_pg_attribute *att)
+                 bool isshared, bool hasoids,
+                 int natts, const FormData_pg_attribute *attrs)
 {
        Relation        relation;
        int                     i;
@@ -1344,7 +1352,6 @@ formrdesc(const char *relationName, Oid relationReltype,
         * allocate new relation desc, clear all fields of reldesc
         */
        relation = (Relation) palloc0(sizeof(RelationData));
-       relation->rd_targblock = InvalidBlockNumber;
 
        /* make sure relation is marked as having no open file yet */
        relation->rd_smgr = NULL;
@@ -1361,14 +1368,16 @@ formrdesc(const char *relationName, Oid relationReltype,
        relation->rd_isnailed = true;
        relation->rd_createSubid = InvalidSubTransactionId;
        relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
-       relation->rd_istemp = false;
+       relation->rd_backend = InvalidBackendId;
 
        /*
         * initialize relation tuple form
         *
         * The data we insert here is pretty incomplete/bogus, but it'll serve to
-        * get us launched.  RelationCacheInitializePhase2() will read the real
-        * data from pg_class and replace what we've done here.
+        * get us launched.  RelationCacheInitializePhase3() will read the real
+        * data from pg_class and replace what we've done here.  Note in
+        * particular that relowner is left as zero; this cues
+        * RelationCacheInitializePhase3 that the real data isn't there yet.
         */
        relation->rd_rel = (Form_pg_class) palloc0(CLASS_TUPLE_SIZE);
 
@@ -1378,10 +1387,14 @@ formrdesc(const char *relationName, Oid relationReltype,
 
        /*
         * It's important to distinguish between shared and non-shared relations,
-        * even at bootstrap time, to make sure we know where they are stored.  At
-        * present, all relations that formrdesc is used for are not shared.
+        * even at bootstrap time, to make sure we know where they are stored.
         */
-       relation->rd_rel->relisshared = false;
+       relation->rd_rel->relisshared = isshared;
+       if (isshared)
+               relation->rd_rel->reltablespace = GLOBALTABLESPACE_OID;
+
+       /* formrdesc is used only for permanent relations */
+       relation->rd_rel->relpersistence = RELPERSISTENCE_PERMANENT;
 
        relation->rd_rel->relpages = 1;
        relation->rd_rel->reltuples = 1;
@@ -1393,8 +1406,8 @@ formrdesc(const char *relationName, Oid relationReltype,
         * initialize attribute tuple form
         *
         * Unlike the case with the relation tuple, this data had better be right
-        * because it will never be replaced.  The input values must be correctly
-        * defined by macros in src/include/catalog/ headers.
+        * because it will never be replaced.  The data comes from
+        * src/include/catalog/ headers via genbki.pl.
         */
        relation->rd_att = CreateTemplateTupleDesc(natts, hasoids);
        relation->rd_att->tdrefcount = 1;       /* mark as refcounted */
@@ -1409,9 +1422,9 @@ formrdesc(const char *relationName, Oid relationReltype,
        for (i = 0; i < natts; i++)
        {
                memcpy(relation->rd_att->attrs[i],
-                          &att[i],
-                          ATTRIBUTE_TUPLE_SIZE);
-               has_not_null |= att[i].attnotnull;
+                          &attrs[i],
+                          ATTRIBUTE_FIXED_PART_SIZE);
+               has_not_null |= attrs[i].attnotnull;
                /* make sure attcacheoff is valid */
                relation->rd_att->attrs[i]->attcacheoff = -1;
        }
@@ -1432,7 +1445,18 @@ formrdesc(const char *relationName, Oid relationReltype,
         * initialize relation id from info in att array (my, this is ugly)
         */
        RelationGetRelid(relation) = relation->rd_att->attrs[0]->attrelid;
-       relation->rd_rel->relfilenode = RelationGetRelid(relation);
+
+       /*
+        * All relations made with formrdesc are mapped.  This is necessarily so
+        * because there is no other way to know what filenode they currently
+        * have.  In bootstrap mode, add them to the initial relation mapper data,
+        * specifying that the initial filenode is the same as the OID.
+        */
+       relation->rd_rel->relfilenode = InvalidOid;
+       if (IsBootstrapProcessingMode())
+               RelationMapUpdateMap(RelationGetRelid(relation),
+                                                        RelationGetRelid(relation),
+                                                        isshared, true);
 
        /*
         * initialize the relation lock manager information
@@ -1502,9 +1526,19 @@ RelationIdGetRelation(Oid relationId)
        if (RelationIsValid(rd))
        {
                RelationIncrementReferenceCount(rd);
-               /* revalidate nailed index if necessary */
+               /* revalidate cache entry if necessary */
                if (!rd->rd_isvalid)
-                       RelationReloadIndexInfo(rd);
+               {
+                       /*
+                        * Indexes only have a limited number of possible schema changes,
+                        * and we don't want to use the full-blown procedure because it's
+                        * a headache for indexes that the reload itself depends on.
+                        */
+                       if (rd->rd_rel->relkind == RELKIND_INDEX)
+                               RelationReloadIndexInfo(rd);
+                       else
+                               RelationClearRelation(rd, true);
+               }
                return rd;
        }
 
@@ -1512,7 +1546,7 @@ RelationIdGetRelation(Oid relationId)
         * no reldesc in the cache, so have RelationBuildDesc() build one and add
         * it.
         */
-       rd = RelationBuildDesc(relationId, NULL);
+       rd = RelationBuildDesc(relationId, true);
        if (RelationIsValid(rd))
                RelationIncrementReferenceCount(rd);
        return rd;
@@ -1594,6 +1628,10 @@ RelationClose(Relation relation)
  *     RelationClearRelation just marks the entry as invalid by setting
  *     rd_isvalid to false.  This routine is called to fix the entry when it
  *     is next needed.
+ *
+ *     We assume that at the time we are called, we have at least AccessShareLock
+ *     on the target index.  (Note: in the calls from RelationClearRelation,
+ *     this is legitimate because we know the rel has positive refcount.)
  */
 static void
 RelationReloadIndexInfo(Relation relation)
@@ -1608,6 +1646,24 @@ RelationReloadIndexInfo(Relation relation)
        /* Should be closed at smgr level */
        Assert(relation->rd_smgr == NULL);
 
+       /* Must free any AM cached data upon relcache flush */
+       if (relation->rd_amcache)
+               pfree(relation->rd_amcache);
+       relation->rd_amcache = NULL;
+
+       /*
+        * If it's a shared index, we might be called before backend startup has
+        * finished selecting a database, in which case we have no way to read
+        * pg_class yet.  However, a shared index can never have any significant
+        * schema updates, so it's okay to ignore the invalidation signal.  Just
+        * mark it valid and return without doing anything more.
+        */
+       if (relation->rd_rel->relisshared && !criticalRelcachesBuilt)
+       {
+               relation->rd_isvalid = true;
+               return;
+       }
+
        /*
         * Read the pg_class row
         *
@@ -1629,12 +1685,6 @@ RelationReloadIndexInfo(Relation relation)
        heap_freetuple(pg_class_tuple);
        /* We must recalculate physical address in case it changed */
        RelationInitPhysicalAddr(relation);
-       /* Make sure targblock is reset in case rel was truncated */
-       relation->rd_targblock = InvalidBlockNumber;
-       /* Must free any AM cached data, too */
-       if (relation->rd_amcache)
-               pfree(relation->rd_amcache);
-       relation->rd_amcache = NULL;
 
        /*
         * For a non-system index, there are fields of the pg_index row that are
@@ -1649,15 +1699,18 @@ RelationReloadIndexInfo(Relation relation)
                HeapTuple       tuple;
                Form_pg_index index;
 
-               tuple = SearchSysCache(INDEXRELID,
-                                                          ObjectIdGetDatum(RelationGetRelid(relation)),
-                                                          0, 0, 0);
+               tuple = SearchSysCache1(INDEXRELID,
+                                                               ObjectIdGetDatum(RelationGetRelid(relation)));
                if (!HeapTupleIsValid(tuple))
-                               elog(ERROR, "cache lookup failed for index %u",
-                                        RelationGetRelid(relation));
+                       elog(ERROR, "cache lookup failed for index %u",
+                                RelationGetRelid(relation));
                index = (Form_pg_index) GETSTRUCT(tuple);
 
                relation->rd_index->indisvalid = index->indisvalid;
+               relation->rd_index->indcheckxmin = index->indcheckxmin;
+               relation->rd_index->indisready = index->indisready;
+               HeapTupleHeaderSetXmin(relation->rd_indextuple->t_data,
+                                                          HeapTupleHeaderGetXmin(tuple->t_data));
 
                ReleaseSysCache(tuple);
        }
@@ -1667,19 +1720,78 @@ RelationReloadIndexInfo(Relation relation)
 }
 
 /*
+ * RelationDestroyRelation
+ *
+ *     Physically delete a relation cache entry and all subsidiary data.
+ *     Caller must already have unhooked the entry from the hash table.
+ *
+ *     The entry must have zero reference count; this is checked by an
+ *     Assert.  After closing the entry at the smgr level, we release each
+ *     piece of substructure hanging off it (rd_rel, rd_att, rd_indexlist,
+ *     rd_indexattr, trigdesc, rd_options, rd_indextuple, rd_am, and the
+ *     rd_indexcxt and rd_rulescxt memory contexts) and finally the
+ *     RelationData struct itself.  The passed-in pointer is therefore
+ *     dangling on return and must not be used again.
+ *
+ *     NOTE(review): RelationClearRelation asserts that a rel being rebuilt
+ *     has nonzero refcnt, yet its "relation was deleted" path (newrel ==
+ *     NULL) passes that same rel to this function --- confirm whether that
+ *     path can trip the zero-refcount Assert below.
+ */
+static void
+RelationDestroyRelation(Relation relation)
+{
+       Assert(RelationHasReferenceCountZero(relation));
+
+       /*
+        * Make sure smgr and lower levels close the relation's files, if they
+        * weren't closed already.  (This was probably done by caller, but let's
+        * just be real sure.)
+        */
+       RelationCloseSmgr(relation);
+
+       /*
+        * Free all the subsidiary data structures of the relcache entry, then the
+        * entry itself.  Fields guarded by an "if" are optional and are released
+        * only when set; the struct itself must be freed last, since every
+        * other pointer is reached through it.
+        */
+       if (relation->rd_rel)
+               pfree(relation->rd_rel);
+       /*
+        * can't use DecrTupleDescRefCount here, so decrement the descriptor's
+        * refcount by hand and free it only when no references remain
+        */
+       Assert(relation->rd_att->tdrefcount > 0);
+       if (--relation->rd_att->tdrefcount == 0)
+               FreeTupleDesc(relation->rd_att);
+       list_free(relation->rd_indexlist);
+       bms_free(relation->rd_indexattr);
+       FreeTriggerDesc(relation->trigdesc);
+       if (relation->rd_options)
+               pfree(relation->rd_options);
+       if (relation->rd_indextuple)
+               pfree(relation->rd_indextuple);
+       if (relation->rd_am)
+               pfree(relation->rd_am);
+       if (relation->rd_indexcxt)
+               MemoryContextDelete(relation->rd_indexcxt);
+       if (relation->rd_rulescxt)
+               MemoryContextDelete(relation->rd_rulescxt);
+       pfree(relation);
+}
+
+/*
  * RelationClearRelation
  *
  *      Physically blow away a relation cache entry, or reset it and rebuild
  *      it from scratch (that is, from catalog entries).  The latter path is
- *      usually used when we are notified of a change to an open relation
- *      (one with refcount > 0).  However, this routine just does whichever
- *      it's told to do; callers must determine which they want.
+ *      used when we are notified of a change to an open relation (one with
+ *      refcount > 0).
+ *
+ *      NB: when rebuilding, we'd better hold some lock on the relation,
+ *      else the catalog data we need to read could be changing under us.
+ *      Also, a rel to be rebuilt had better have refcnt > 0.  This is because
+ *      an sinval reset could happen while we're accessing the catalogs, and
+ *      the rel would get blown away underneath us by RelationCacheInvalidate
+ *      if it has zero refcnt.
+ *
+ *      The "rebuild" parameter is redundant in current usage because it has
+ *      to match the relation's refcnt status, but we keep it as a crosscheck
+ *      that we're doing what the caller expects.
  */
 static void
 RelationClearRelation(Relation relation, bool rebuild)
 {
-       Oid                     old_reltype = relation->rd_rel->reltype;
-       MemoryContext oldcxt;
+       /*
+        * As per notes above, a rel to be rebuilt MUST have refcnt > 0; while of
+        * course it would be a bad idea to blow away one with nonzero refcnt.
+        */
+       Assert(rebuild ?
+                  !RelationHasReferenceCountZero(relation) :
+                  RelationHasReferenceCountZero(relation));
 
        /*
         * Make sure smgr and lower levels close the relation's files, if they
@@ -1692,9 +1804,8 @@ RelationClearRelation(Relation relation, bool rebuild)
 
        /*
         * Never, never ever blow away a nailed-in system relation, because we'd
-        * be unable to recover.  However, we must reset rd_targblock, in case we
-        * got called because of a relation cache flush that was triggered by
-        * VACUUM.
+        * be unable to recover.  However, we must redo RelationInitPhysicalAddr
+        * in case it is a mapped relation whose mapping changed.
         *
         * If it's a nailed index, then we need to re-read the pg_class row to see
         * if its relfilenode changed.  We can't necessarily do that here, because
@@ -1705,7 +1816,8 @@ RelationClearRelation(Relation relation, bool rebuild)
         */
        if (relation->rd_isnailed)
        {
-               relation->rd_targblock = InvalidBlockNumber;
+               RelationInitPhysicalAddr(relation);
+
                if (relation->rd_rel->relkind == RELKIND_INDEX)
                {
                        relation->rd_isvalid = false;           /* needs to be revalidated */
@@ -1731,39 +1843,8 @@ RelationClearRelation(Relation relation, bool rebuild)
                return;
        }
 
-       /*
-        * Remove relation from hash tables
-        *
-        * Note: we might be reinserting it momentarily, but we must not have it
-        * visible in the hash tables until it's valid again, so don't try to
-        * optimize this away...
-        */
-       oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
-       RelationCacheDelete(relation);
-       MemoryContextSwitchTo(oldcxt);
-
-       /* Clear out catcache's entries for this relation */
-       CatalogCacheFlushRelation(RelationGetRelid(relation));
-
-       /*
-        * Free all the subsidiary data structures of the relcache entry. We
-        * cannot free rd_att if we are trying to rebuild the entry, however,
-        * because pointers to it may be cached in various places. The rule
-        * manager might also have pointers into the rewrite rules. So to begin
-        * with, we can only get rid of these fields:
-        */
-       FreeTriggerDesc(relation->trigdesc);
-       if (relation->rd_indextuple)
-               pfree(relation->rd_indextuple);
-       if (relation->rd_am)
-               pfree(relation->rd_am);
-       if (relation->rd_rel)
-               pfree(relation->rd_rel);
-       if (relation->rd_options)
-               pfree(relation->rd_options);
-       list_free(relation->rd_indexlist);
-       if (relation->rd_indexcxt)
-               MemoryContextDelete(relation->rd_indexcxt);
+       /* Mark it invalid until we've finished rebuild */
+       relation->rd_isvalid = false;
 
        /*
         * If we're really done with the relcache entry, blow it away. But if
@@ -1773,84 +1854,117 @@ RelationClearRelation(Relation relation, bool rebuild)
         */
        if (!rebuild)
        {
-               /* ok to zap remaining substructure */
-               flush_rowtype_cache(old_reltype);
-               /* can't use DecrTupleDescRefCount here */
-               Assert(relation->rd_att->tdrefcount > 0);
-               if (--relation->rd_att->tdrefcount == 0)
-                       FreeTupleDesc(relation->rd_att);
-               if (relation->rd_rulescxt)
-                       MemoryContextDelete(relation->rd_rulescxt);
-               pfree(relation);
+               /* Remove it from the hash table */
+               RelationCacheDelete(relation);
+
+               /* And release storage */
+               RelationDestroyRelation(relation);
        }
        else
        {
                /*
-                * When rebuilding an open relcache entry, must preserve ref count and
-                * rd_createSubid/rd_newRelfilenodeSubid state.  Also attempt to
-                * preserve the tupledesc and rewrite-rule substructures in place.
-                * (Note: the refcount mechanism for tupledescs may eventually ensure
-                * that we don't really need to preserve the tupledesc in-place, but
-                * for now there are still a lot of places that assume an open rel's
-                * tupledesc won't move.)
+                * Our strategy for rebuilding an open relcache entry is to build a
+                * new entry from scratch, swap its contents with the old entry, and
+                * finally delete the new entry (along with any infrastructure swapped
+                * over from the old entry).  This is to avoid trouble in case an
+                * error causes us to lose control partway through.  The old entry
+                * will still be marked !rd_isvalid, so we'll try to rebuild it again
+                * on next access.      Meanwhile it's not any less valid than it was
+                * before, so any code that might expect to continue accessing it
+                * isn't hurt by the rebuild failure.  (Consider for example a
+                * subtransaction that ALTERs a table and then gets cancelled partway
+                * through the cache entry rebuild.  The outer transaction should
+                * still see the not-modified cache entry as valid.)  The worst
+                * consequence of an error is leaking the necessarily-unreferenced new
+                * entry, and this shouldn't happen often enough for that to be a big
+                * problem.
+                *
+                * When rebuilding an open relcache entry, we must preserve ref count,
+                * rd_createSubid/rd_newRelfilenodeSubid, and rd_toastoid state.  Also
+                * attempt to preserve the pg_class entry (rd_rel), tupledesc, and
+                * rewrite-rule substructures in place, because various places assume
+                * that these structures won't move while they are working with an
+                * open relcache entry.  (Note: the refcount mechanism for tupledescs
+                * might someday allow us to remove this hack for the tupledesc.)
                 *
                 * Note that this process does not touch CurrentResourceOwner; which
                 * is good because whatever ref counts the entry may have do not
                 * necessarily belong to that resource owner.
                 */
+               Relation        newrel;
                Oid                     save_relid = RelationGetRelid(relation);
-               int                     old_refcnt = relation->rd_refcnt;
-               SubTransactionId old_createSubid = relation->rd_createSubid;
-               SubTransactionId old_newRelfilenodeSubid = relation->rd_newRelfilenodeSubid;
-               struct PgStat_TableStatus *old_pgstat_info = relation->pgstat_info;
-               TupleDesc       old_att = relation->rd_att;
-               RuleLock   *old_rules = relation->rd_rules;
-               MemoryContext old_rulescxt = relation->rd_rulescxt;
-
-               if (RelationBuildDesc(save_relid, relation) != relation)
+               bool            keep_tupdesc;
+               bool            keep_rules;
+
+               /* Build temporary entry, but don't link it into hashtable */
+               newrel = RelationBuildDesc(save_relid, false);
+               if (newrel == NULL)
                {
                        /* Should only get here if relation was deleted */
-                       flush_rowtype_cache(old_reltype);
-                       Assert(old_att->tdrefcount > 0);
-                       if (--old_att->tdrefcount == 0)
-                               FreeTupleDesc(old_att);
-                       if (old_rulescxt)
-                               MemoryContextDelete(old_rulescxt);
-                       pfree(relation);
+                       RelationCacheDelete(relation);
+                       RelationDestroyRelation(relation);
                        elog(ERROR, "relation %u deleted while still in use", save_relid);
                }
-               relation->rd_refcnt = old_refcnt;
-               relation->rd_createSubid = old_createSubid;
-               relation->rd_newRelfilenodeSubid = old_newRelfilenodeSubid;
-               relation->pgstat_info = old_pgstat_info;
 
-               if (equalTupleDescs(old_att, relation->rd_att))
-               {
-                       /* needn't flush typcache here */
-                       Assert(relation->rd_att->tdrefcount == 1);
-                       if (--relation->rd_att->tdrefcount == 0)
-                               FreeTupleDesc(relation->rd_att);
-                       relation->rd_att = old_att;
-               }
-               else
-               {
-                       flush_rowtype_cache(old_reltype);
-                       Assert(old_att->tdrefcount > 0);
-                       if (--old_att->tdrefcount == 0)
-                               FreeTupleDesc(old_att);
-               }
-               if (equalRuleLocks(old_rules, relation->rd_rules))
+               keep_tupdesc = equalTupleDescs(relation->rd_att, newrel->rd_att);
+               keep_rules = equalRuleLocks(relation->rd_rules, newrel->rd_rules);
+
+               /*
+                * Perform swapping of the relcache entry contents.  Within this
+                * process the old entry is momentarily invalid, so there *must* be no
+                * possibility of CHECK_FOR_INTERRUPTS within this sequence. Do it in
+                * all-in-line code for safety.
+                *
+                * Since the vast majority of fields should be swapped, our method is
+                * to swap the whole structures and then re-swap those few fields we
+                * didn't want swapped.
+                */
+#define SWAPFIELD(fldtype, fldname) \
+               do { \
+                       fldtype _tmp = newrel->fldname; \
+                       newrel->fldname = relation->fldname; \
+                       relation->fldname = _tmp; \
+               } while (0)
+
+               /* swap all Relation struct fields */
                {
-                       if (relation->rd_rulescxt)
-                               MemoryContextDelete(relation->rd_rulescxt);
-                       relation->rd_rules = old_rules;
-                       relation->rd_rulescxt = old_rulescxt;
+                       RelationData tmpstruct;
+
+                       memcpy(&tmpstruct, newrel, sizeof(RelationData));
+                       memcpy(newrel, relation, sizeof(RelationData));
+                       memcpy(relation, &tmpstruct, sizeof(RelationData));
                }
-               else
+
+               /* rd_smgr must not be swapped, due to back-links from smgr level */
+               SWAPFIELD(SMgrRelation, rd_smgr);
+               /* rd_refcnt must be preserved */
+               SWAPFIELD(int, rd_refcnt);
+               /* isnailed shouldn't change */
+               Assert(newrel->rd_isnailed == relation->rd_isnailed);
+               /* creation sub-XIDs must be preserved */
+               SWAPFIELD(SubTransactionId, rd_createSubid);
+               SWAPFIELD(SubTransactionId, rd_newRelfilenodeSubid);
+               /* un-swap rd_rel pointers, swap contents instead */
+               SWAPFIELD(Form_pg_class, rd_rel);
+               /* ... but actually, we don't have to update newrel->rd_rel */
+               memcpy(relation->rd_rel, newrel->rd_rel, CLASS_TUPLE_SIZE);
+               /* preserve old tupledesc and rules if no logical change */
+               if (keep_tupdesc)
+                       SWAPFIELD(TupleDesc, rd_att);
+               if (keep_rules)
                {
-                       if (old_rulescxt)
-                               MemoryContextDelete(old_rulescxt);
+                       SWAPFIELD(RuleLock *, rd_rules);
+                       SWAPFIELD(MemoryContext, rd_rulescxt);
                }
+               /* toast OID override must be preserved */
+               SWAPFIELD(Oid, rd_toastoid);
+               /* pgstat_info must be preserved */
+               SWAPFIELD(struct PgStat_TableStatus *, pgstat_info);
+
+#undef SWAPFIELD
+
+               /* And now we can throw away the temporary entry */
+               RelationDestroyRelation(newrel);
        }
 }
 
@@ -1862,8 +1976,6 @@ RelationClearRelation(Relation relation, bool rebuild)
 static void
 RelationFlushRelation(Relation relation)
 {
-       bool            rebuild;
-
        if (relation->rd_createSubid != InvalidSubTransactionId ||
                relation->rd_newRelfilenodeSubid != InvalidSubTransactionId)
        {
@@ -1871,18 +1983,24 @@ RelationFlushRelation(Relation relation)
                 * New relcache entries are always rebuilt, not flushed; else we'd
                 * forget the "new" status of the relation, which is a useful
                 * optimization to have.  Ditto for the new-relfilenode status.
+                *
+                * The rel could have zero refcnt here, so temporarily increment the
+                * refcnt to ensure it's safe to rebuild it.  We can assume that the
+                * current transaction has some lock on the rel already.
                 */
-               rebuild = true;
+               RelationIncrementReferenceCount(relation);
+               RelationClearRelation(relation, true);
+               RelationDecrementReferenceCount(relation);
        }
        else
        {
                /*
                 * Pre-existing rels can be dropped from the relcache if not open.
                 */
-               rebuild = !RelationHasReferenceCountZero(relation);
-       }
+               bool            rebuild = !RelationHasReferenceCountZero(relation);
 
-       RelationClearRelation(relation, rebuild);
+               RelationClearRelation(relation, rebuild);
+       }
 }
 
 /*
@@ -1941,7 +2059,7 @@ RelationCacheInvalidateEntry(Oid relationId)
  * RelationCacheInvalidate
  *      Blow away cached relation descriptors that have zero reference counts,
  *      and rebuild those with positive reference counts.      Also reset the smgr
- *      relation cache.
+ *      relation cache and re-read relation mapping data.
  *
  *      This is currently used only to recover from SI message buffer overflow,
  *      so we do not touch new-in-transaction relations; they cannot be targets
@@ -2027,6 +2145,11 @@ RelationCacheInvalidate(void)
         */
        smgrcloseall();
 
+       /*
+        * Reload relation mapping data before starting to reconstruct cache.
+        */
+       RelationMapInvalidateAll();
+
        /* Phase 2: rebuild the items found to need rebuild in phase 1 */
        foreach(l, rebuildFirstList)
        {
@@ -2043,6 +2166,25 @@ RelationCacheInvalidate(void)
 }
 
 /*
+ * RelationCloseSmgrByOid - close a relcache entry's smgr link
+ *
+ * Needed in some cases where we are changing a relation's physical mapping.
+ * The link will be automatically reopened on next use.
+ */
+void
+RelationCloseSmgrByOid(Oid relationId)
+{
+       Relation        relation;
+
+       RelationIdCacheLookup(relationId, relation);
+
+       if (!PointerIsValid(relation))
+               return;                                 /* not in cache, nothing to do */
+
+       RelationCloseSmgr(relation);
+}
+
+/*
  * AtEOXact_RelationCache
  *
  *     Clean up the relcache at main-transaction commit or abort.
@@ -2071,7 +2213,7 @@ AtEOXact_RelationCache(bool isCommit)
         * for us to do here, so we keep a static flag that gets set if there is
         * anything to do.      (Currently, this means either a relation is created in
         * the current xact, or one is given a new relfilenode, or an index list
-        * is forced.)  For simplicity, the flag remains set till end of top-level
+        * is forced.)  For simplicity, the flag remains set till end of top-level
         * transaction, even though we could clear it at subtransaction end in
         * some cases.
         */
@@ -2187,14 +2329,14 @@ AtEOSubXact_RelationCache(bool isCommit, SubTransactionId mySubid,
                                relation->rd_createSubid = parentSubid;
                        else
                        {
-                               Assert(RelationHasReferenceCountZero(relation));
                                RelationClearRelation(relation, false);
                                continue;
                        }
                }
 
                /*
-                * Likewise, update or drop any new-relfilenode-in-subtransaction hint.
+                * Likewise, update or drop any new-relfilenode-in-subtransaction
+                * hint.
                 */
                if (relation->rd_newRelfilenodeSubid == mySubid)
                {
@@ -2217,22 +2359,6 @@ AtEOSubXact_RelationCache(bool isCommit, SubTransactionId mySubid,
        }
 }
 
-/*
- * RelationCacheMarkNewRelfilenode
- *
- *     Mark the rel as having been given a new relfilenode in the current
- *     (sub) transaction.  This is a hint that can be used to optimize
- *     later operations on the rel in the same transaction.
- */
-void
-RelationCacheMarkNewRelfilenode(Relation rel)
-{
-       /* Mark it... */
-       rel->rd_newRelfilenodeSubid = GetCurrentSubTransactionId();
-       /* ... and now we have eoxact cleanup work to do */
-       need_eoxact_work = true;
-}
-
 
 /*
  *             RelationBuildLocalRelation
@@ -2245,7 +2371,9 @@ RelationBuildLocalRelation(const char *relname,
                                                   TupleDesc tupDesc,
                                                   Oid relid,
                                                   Oid reltablespace,
-                                                  bool shared_relation)
+                                                  bool shared_relation,
+                                                  bool mapped_relation,
+                                                  char relpersistence)
 {
        Relation        rel;
        MemoryContext oldcxt;
@@ -2259,10 +2387,14 @@ RelationBuildLocalRelation(const char *relname,
        /*
         * check for creation of a rel that must be nailed in cache.
         *
-        * XXX this list had better match RelationCacheInitializePhase2's list.
+        * XXX this list had better match the relations specially handled in
+        * RelationCacheInitializePhase2/3.
         */
        switch (relid)
        {
+               case DatabaseRelationId:
+               case AuthIdRelationId:
+               case AuthMemRelationId:
                case RelationRelationId:
                case AttributeRelationId:
                case ProcedureRelationId:
@@ -2284,6 +2416,9 @@ RelationBuildLocalRelation(const char *relname,
                elog(ERROR, "shared_relation flag for \"%s\" does not match IsSharedRelation(%u)",
                         relname, relid);
 
+       /* Shared relations had better be mapped, too */
+       Assert(mapped_relation || !shared_relation);
+
        /*
         * switch to the cache context to create the relcache entry.
         */
@@ -2297,8 +2432,6 @@ RelationBuildLocalRelation(const char *relname,
         */
        rel = (Relation) palloc0(sizeof(RelationData));
 
-       rel->rd_targblock = InvalidBlockNumber;
-
        /* make sure relation is marked as having no open file yet */
        rel->rd_smgr = NULL;
 
@@ -2314,9 +2447,6 @@ RelationBuildLocalRelation(const char *relname,
        /* must flag that we have rels created in this transaction */
        need_eoxact_work = true;
 
-       /* is it a temporary relation? */
-       rel->rd_istemp = isTempNamespace(relnamespace);
-
        /*
         * create a new tuple descriptor from the one passed in.  We do this
         * partly to copy it into the cache context, and partly because the new
@@ -2356,10 +2486,27 @@ RelationBuildLocalRelation(const char *relname,
        /* needed when bootstrapping: */
        rel->rd_rel->relowner = BOOTSTRAP_SUPERUSERID;
 
+       /* set up persistence; rd_backend is a function of persistence type */
+       rel->rd_rel->relpersistence = relpersistence;
+       switch (relpersistence)
+       {
+               case RELPERSISTENCE_PERMANENT:
+                       rel->rd_backend = InvalidBackendId;
+                       break;
+               case RELPERSISTENCE_TEMP:
+                       rel->rd_backend = MyBackendId;
+                       break;
+               default:
+                       elog(ERROR, "invalid relpersistence: %c", relpersistence);
+                       break;
+       }
+
        /*
         * Insert relation physical and logical identifiers (OIDs) into the right
         * places.      Note that the physical ID (relfilenode) is initially the same
-        * as the logical ID (OID).
+        * as the logical ID (OID); except that for a mapped relation, we set
+        * relfilenode to zero and rely on RelationInitPhysicalAddr to consult the
+        * map.
         */
        rel->rd_rel->relisshared = shared_relation;
 
@@ -2368,9 +2515,17 @@ RelationBuildLocalRelation(const char *relname,
        for (i = 0; i < natts; i++)
                rel->rd_att->attrs[i]->attrelid = relid;
 
-       rel->rd_rel->relfilenode = relid;
        rel->rd_rel->reltablespace = reltablespace;
 
+       if (mapped_relation)
+       {
+               rel->rd_rel->relfilenode = InvalidOid;
+               /* Add it to the active mapping information */
+               RelationMapUpdateMap(relid, relid, shared_relation, true);
+       }
+       else
+               rel->rd_rel->relfilenode = relid;
+
        RelationInitLockInfo(rel);      /* see lmgr.c */
 
        RelationInitPhysicalAddr(rel);
@@ -2396,15 +2551,126 @@ RelationBuildLocalRelation(const char *relname,
        return rel;
 }
 
+
 /*
- *             RelationCacheInitialize
+ * RelationSetNewRelfilenode
  *
- *             This initializes the relation descriptor cache.  At the time
- *             that this is invoked, we can't do database access yet (mainly
- *             because the transaction subsystem is not up); all we are doing
- *             is making an empty cache hashtable.  This must be done before
- *             starting the initialization transaction, because otherwise
- *             AtEOXact_RelationCache would crash if that transaction aborts
+ * Assign a new relfilenode (physical file name) to the relation.
+ *
+ * This allows a full rewrite of the relation to be done with transactional
+ * safety (since the filenode assignment can be rolled back).  Note however
+ * that there is no simple way to access the relation's old data for the
+ * remainder of the current transaction.  This limits the usefulness to cases
+ * such as TRUNCATE or rebuilding an index from scratch.
+ *
+ * Caller must already hold exclusive lock on the relation.
+ *
+ * The relation is marked with relfrozenxid = freezeXid (InvalidTransactionId
+ * must be passed for indexes and sequences).  This should be a lower bound on
+ * the XIDs that will be put into the new relation contents.
+ */
+void
+RelationSetNewRelfilenode(Relation relation, TransactionId freezeXid)
+{
+       Oid                     newrelfilenode;
+       RelFileNodeBackend newrnode;
+       Relation        pg_class;
+       HeapTuple       tuple;
+       Form_pg_class classform;
+
+       /* Indexes, sequences must have Invalid frozenxid; other rels must not */
+       Assert((relation->rd_rel->relkind == RELKIND_INDEX ||
+                       relation->rd_rel->relkind == RELKIND_SEQUENCE) ?
+                  freezeXid == InvalidTransactionId :
+                  TransactionIdIsNormal(freezeXid));
+
+       /* Allocate a new relfilenode */
+       newrelfilenode = GetNewRelFileNode(relation->rd_rel->reltablespace, NULL,
+                                                                          relation->rd_rel->relpersistence);
+
+       /*
+        * Get a writable copy of the pg_class tuple for the given relation.
+        */
+       pg_class = heap_open(RelationRelationId, RowExclusiveLock);
+
+       tuple = SearchSysCacheCopy1(RELOID,
+                                                               ObjectIdGetDatum(RelationGetRelid(relation)));
+       if (!HeapTupleIsValid(tuple))
+               elog(ERROR, "could not find tuple for relation %u",
+                        RelationGetRelid(relation));
+       classform = (Form_pg_class) GETSTRUCT(tuple);
+
+       /*
+        * Create storage for the main fork of the new relfilenode.
+        *
+        * NOTE: any conflict in relfilenode value will be caught here, if
+        * GetNewRelFileNode messes up for any reason.
+        */
+       newrnode.node = relation->rd_node;
+       newrnode.node.relNode = newrelfilenode;
+       newrnode.backend = relation->rd_backend;
+       RelationCreateStorage(newrnode.node, relation->rd_rel->relpersistence);
+       smgrclosenode(newrnode);
+
+       /*
+        * Schedule unlinking of the old storage at transaction commit.
+        */
+       RelationDropStorage(relation);
+
+       /*
+        * Now update the pg_class row.  However, if we're dealing with a mapped
+        * index, pg_class.relfilenode doesn't change; instead we have to send the
+        * update to the relation mapper.
+        */
+       if (RelationIsMapped(relation))
+               RelationMapUpdateMap(RelationGetRelid(relation),
+                                                        newrelfilenode,
+                                                        relation->rd_rel->relisshared,
+                                                        false);
+       else
+               classform->relfilenode = newrelfilenode;
+
+       /* These changes are safe even for a mapped relation */
+       if (relation->rd_rel->relkind != RELKIND_SEQUENCE)
+       {
+               classform->relpages = 0;        /* it's empty until further notice */
+               classform->reltuples = 0;
+       }
+       classform->relfrozenxid = freezeXid;
+
+       simple_heap_update(pg_class, &tuple->t_self, tuple);
+       CatalogUpdateIndexes(pg_class, tuple);
+
+       heap_freetuple(tuple);
+
+       heap_close(pg_class, RowExclusiveLock);
+
+       /*
+        * Make the pg_class row change visible, as well as the relation map
+        * change if any.  This will cause the relcache entry to get updated, too.
+        */
+       CommandCounterIncrement();
+
+       /*
+        * Mark the rel as having been given a new relfilenode in the current
+        * (sub) transaction.  This is a hint that can be used to optimize later
+        * operations on the rel in the same transaction.
+        */
+       relation->rd_newRelfilenodeSubid = GetCurrentSubTransactionId();
+       /* ... and now we have eoxact cleanup work to do */
+       need_eoxact_work = true;
+}
+
+
+/*
+ *             RelationCacheInitialize
+ *
+ *             This initializes the relation descriptor cache.  At the time
+ *             that this is invoked, we can't do database access yet (mainly
+ *             because the transaction subsystem is not up); all we are doing
+ *             is making an empty cache hashtable.  This must be done before
+ *             starting the initialization transaction, because otherwise
+ *             AtEOXact_RelationCache would crash if that transaction aborts
  *             before we can get the relcache set up.
  */
 
@@ -2413,17 +2679,14 @@ RelationBuildLocalRelation(const char *relname,
 void
 RelationCacheInitialize(void)
 {
-       MemoryContext oldcxt;
        HASHCTL         ctl;
 
        /*
-        * switch to cache memory context
+        * make sure cache memory context exists
         */
        if (!CacheMemoryContext)
                CreateCacheMemoryContext();
 
-       oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
-
        /*
         * create hashtable that indexes the relcache
         */
@@ -2434,29 +2697,90 @@ RelationCacheInitialize(void)
        RelationIdCache = hash_create("Relcache by OID", INITRELCACHESIZE,
                                                                  &ctl, HASH_ELEM | HASH_FUNCTION);
 
-       MemoryContextSwitchTo(oldcxt);
+       /*
+        * relation mapper needs to be initialized too
+        */
+       RelationMapInitialize();
 }
 
 /*
  *             RelationCacheInitializePhase2
  *
- *             This is called as soon as the catcache and transaction system
- *             are functional.  At this point we can actually read data from
- *             the system catalogs.  We first try to read pre-computed relcache
- *             entries from the pg_internal.init file.  If that's missing or
- *             broken, make phony entries for the minimum set of nailed-in-cache
- *             relations.      Then (unless bootstrapping) make sure we have entries
- *             for the critical system indexes.  Once we've done all this, we
- *             have enough infrastructure to open any system catalog or use any
- *             catcache.  The last step is to rewrite pg_internal.init if needed.
+ *             This is called to prepare for access to shared catalogs during startup.
+ *             We must at least set up nailed reldescs for pg_database, pg_authid,
+ *             and pg_auth_members.  Ideally we'd like to have reldescs for their
+ *             indexes, too.  We attempt to load this information from the shared
+ *             relcache init file.  If that's missing or broken, just make phony
+ *             entries for the catalogs themselves.  RelationCacheInitializePhase3
+ *             will clean up as needed.
  */
 void
 RelationCacheInitializePhase2(void)
 {
+       MemoryContext oldcxt;
+
+       /*
+        * relation mapper needs to be initialized too
+        */
+       RelationMapInitializePhase2();
+
+       /*
+        * In bootstrap mode, the shared catalogs aren't there yet anyway, so do
+        * nothing.
+        */
+       if (IsBootstrapProcessingMode())
+               return;
+
+       /*
+        * switch to cache memory context
+        */
+       oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
+
+       /*
+        * Try to load the shared relcache cache file.  If unsuccessful, bootstrap
+        * the cache with pre-made descriptors for the critical shared catalogs.
+        */
+       if (!load_relcache_init_file(true))
+       {
+               formrdesc("pg_database", DatabaseRelation_Rowtype_Id, true,
+                                 true, Natts_pg_database, Desc_pg_database);
+               formrdesc("pg_authid", AuthIdRelation_Rowtype_Id, true,
+                                 true, Natts_pg_authid, Desc_pg_authid);
+               formrdesc("pg_auth_members", AuthMemRelation_Rowtype_Id, true,
+                                 false, Natts_pg_auth_members, Desc_pg_auth_members);
+
+#define NUM_CRITICAL_SHARED_RELS       3       /* fix if you change list above */
+       }
+
+       MemoryContextSwitchTo(oldcxt);
+}
+
+/*
+ *             RelationCacheInitializePhase3
+ *
+ *             This is called as soon as the catcache and transaction system
+ *             are functional and we have determined MyDatabaseId.  At this point
+ *             we can actually read data from the database's system catalogs.
+ *             We first try to read pre-computed relcache entries from the local
+ *             relcache init file.  If that's missing or broken, make phony entries
+ *             for the minimum set of nailed-in-cache relations.  Then (unless
+ *             bootstrapping) make sure we have entries for the critical system
+ *             indexes.  Once we've done all this, we have enough infrastructure to
+ *             open any system catalog or use any catcache.  The last step is to
+ *             rewrite the cache files if needed.
+ */
+void
+RelationCacheInitializePhase3(void)
+{
        HASH_SEQ_STATUS status;
        RelIdCacheEnt *idhentry;
        MemoryContext oldcxt;
-       bool            needNewCacheFile = false;
+       bool            needNewCacheFile = !criticalSharedRelcachesBuilt;
+
+       /*
+        * relation mapper needs to be initialized too
+        */
+       RelationMapInitializePhase3();
 
        /*
         * switch to cache memory context
@@ -2464,25 +2788,25 @@ RelationCacheInitializePhase2(void)
        oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
 
        /*
-        * Try to load the relcache cache file.  If unsuccessful, bootstrap the
-        * cache with pre-made descriptors for the critical "nailed-in" system
+        * Try to load the local relcache cache file.  If unsuccessful, bootstrap
+        * the cache with pre-made descriptors for the critical "nailed-in" system
         * catalogs.
         */
        if (IsBootstrapProcessingMode() ||
-               !load_relcache_init_file())
+               !load_relcache_init_file(false))
        {
                needNewCacheFile = true;
 
-               formrdesc("pg_class", PG_CLASS_RELTYPE_OID,
+               formrdesc("pg_class", RelationRelation_Rowtype_Id, false,
                                  true, Natts_pg_class, Desc_pg_class);
-               formrdesc("pg_attribute", PG_ATTRIBUTE_RELTYPE_OID,
+               formrdesc("pg_attribute", AttributeRelation_Rowtype_Id, false,
                                  false, Natts_pg_attribute, Desc_pg_attribute);
-               formrdesc("pg_proc", PG_PROC_RELTYPE_OID,
+               formrdesc("pg_proc", ProcedureRelation_Rowtype_Id, false,
                                  true, Natts_pg_proc, Desc_pg_proc);
-               formrdesc("pg_type", PG_TYPE_RELTYPE_OID,
+               formrdesc("pg_type", TypeRelation_Rowtype_Id, false,
                                  true, Natts_pg_type, Desc_pg_type);
 
-#define NUM_CRITICAL_RELS      4       /* fix if you change list above */
+#define NUM_CRITICAL_LOCAL_RELS 4              /* fix if you change list above */
        }
 
        MemoryContextSwitchTo(oldcxt);
@@ -2518,55 +2842,92 @@ RelationCacheInitializePhase2(void)
         */
        if (!criticalRelcachesBuilt)
        {
-               Relation        ird;
-
-#define LOAD_CRIT_INDEX(indexoid) \
-               do { \
-                       ird = RelationBuildDesc((indexoid), NULL); \
-                       ird->rd_isnailed = true; \
-                       ird->rd_refcnt = 1; \
-               } while (0)
-
-               LOAD_CRIT_INDEX(ClassOidIndexId);
-               LOAD_CRIT_INDEX(AttributeRelidNumIndexId);
-               LOAD_CRIT_INDEX(IndexRelidIndexId);
-               LOAD_CRIT_INDEX(OpclassOidIndexId);
-               LOAD_CRIT_INDEX(AccessMethodStrategyIndexId);
-               LOAD_CRIT_INDEX(AccessMethodProcedureIndexId);
-               LOAD_CRIT_INDEX(OperatorOidIndexId);
-               LOAD_CRIT_INDEX(RewriteRelRulenameIndexId);
-               LOAD_CRIT_INDEX(TriggerRelidNameIndexId);
-
-#define NUM_CRITICAL_INDEXES   9               /* fix if you change list above */
+               load_critical_index(ClassOidIndexId,
+                                                       RelationRelationId);
+               load_critical_index(AttributeRelidNumIndexId,
+                                                       AttributeRelationId);
+               load_critical_index(IndexRelidIndexId,
+                                                       IndexRelationId);
+               load_critical_index(OpclassOidIndexId,
+                                                       OperatorClassRelationId);
+               load_critical_index(AccessMethodProcedureIndexId,
+                                                       AccessMethodProcedureRelationId);
+               load_critical_index(RewriteRelRulenameIndexId,
+                                                       RewriteRelationId);
+               load_critical_index(TriggerRelidNameIndexId,
+                                                       TriggerRelationId);
+
+#define NUM_CRITICAL_LOCAL_INDEXES     7       /* fix if you change list above */
 
                criticalRelcachesBuilt = true;
        }
 
        /*
+        * Process critical shared indexes too.
+        *
+        * DatabaseNameIndexId isn't critical for relcache loading, but rather for
+        * initial lookup of MyDatabaseId, without which we'll never find any
+        * non-shared catalogs at all.  Autovacuum calls InitPostgres with a
+        * database OID, so it instead depends on DatabaseOidIndexId.  We also
+        * need to nail up some indexes on pg_authid and pg_auth_members for use
+        * during client authentication.
+        */
+       if (!criticalSharedRelcachesBuilt)
+       {
+               load_critical_index(DatabaseNameIndexId,
+                                                       DatabaseRelationId);
+               load_critical_index(DatabaseOidIndexId,
+                                                       DatabaseRelationId);
+               load_critical_index(AuthIdRolnameIndexId,
+                                                       AuthIdRelationId);
+               load_critical_index(AuthIdOidIndexId,
+                                                       AuthIdRelationId);
+               load_critical_index(AuthMemMemRoleIndexId,
+                                                       AuthMemRelationId);
+
+#define NUM_CRITICAL_SHARED_INDEXES 5  /* fix if you change list above */
+
+               criticalSharedRelcachesBuilt = true;
+       }
+
+       /*
         * Now, scan all the relcache entries and update anything that might be
         * wrong in the results from formrdesc or the relcache cache file. If we
         * faked up relcache entries using formrdesc, then read the real pg_class
         * rows and replace the fake entries with them. Also, if any of the
         * relcache entries have rules or triggers, load that info the hard way
         * since it isn't recorded in the cache file.
+        *
+        * Whenever we access the catalogs to read data, there is a possibility of
+        * a shared-inval cache flush causing relcache entries to be removed.
+        * Since hash_seq_search only guarantees to still work after the *current*
+        * entry is removed, it's unsafe to continue the hashtable scan afterward.
+        * We handle this by restarting the scan from scratch after each access.
+        * This is theoretically O(N^2), but the number of entries that actually
+        * need to be fixed is small enough that it doesn't matter.
         */
        hash_seq_init(&status, RelationIdCache);
 
        while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
        {
                Relation        relation = idhentry->reldesc;
+               bool            restart = false;
+
+               /*
+                * Make sure *this* entry doesn't get flushed while we work with it.
+                */
+               RelationIncrementReferenceCount(relation);
 
                /*
                 * If it's a faked-up entry, read the real pg_class tuple.
                 */
-               if (needNewCacheFile && relation->rd_isnailed)
+               if (relation->rd_rel->relowner == InvalidOid)
                {
                        HeapTuple       htup;
                        Form_pg_class relp;
 
-                       htup = SearchSysCache(RELOID,
-                                                               ObjectIdGetDatum(RelationGetRelid(relation)),
-                                                                 0, 0, 0);
+                       htup = SearchSysCache1(RELOID,
+                                                          ObjectIdGetDatum(RelationGetRelid(relation)));
                        if (!HeapTupleIsValid(htup))
                                elog(FATAL, "cache lookup failed for relation %u",
                                         RelationGetRelid(relation));
@@ -2576,7 +2937,6 @@ RelationCacheInitializePhase2(void)
                         * Copy tuple to relation->rd_rel. (See notes in
                         * AllocateRelationDesc())
                         */
-                       Assert(relation->rd_rel != NULL);
                        memcpy((char *) relation->rd_rel, (char *) relp, CLASS_TUPLE_SIZE);
 
                        /* Update rd_options while we have the tuple */
@@ -2585,26 +2945,62 @@ RelationCacheInitializePhase2(void)
                        RelationParseRelOptions(relation, htup);
 
                        /*
-                        * Also update the derived fields in rd_att.
+                        * Check that the values in rd_att were set up correctly.  (We cannot
+                        * just copy them over now: formrdesc must have set up the rd_att
+                        * data correctly to start with, because it may already have been
+                        * copied into one or more catcache entries.)
                         */
-                       relation->rd_att->tdtypeid = relp->reltype;
-                       relation->rd_att->tdtypmod = -1;        /* unnecessary, but... */
-                       relation->rd_att->tdhasoid = relp->relhasoids;
+                       Assert(relation->rd_att->tdtypeid == relp->reltype);
+                       Assert(relation->rd_att->tdtypmod == -1);
+                       Assert(relation->rd_att->tdhasoid == relp->relhasoids);
 
                        ReleaseSysCache(htup);
+
+                       /* relowner had better be OK now, else we'll loop forever */
+                       if (relation->rd_rel->relowner == InvalidOid)
+                               elog(ERROR, "invalid relowner in pg_class entry for \"%s\"",
+                                        RelationGetRelationName(relation));
+
+                       restart = true;
                }
 
                /*
                 * Fix data that isn't saved in relcache cache file.
+                *
+                * relhasrules or relhastriggers could possibly be wrong or out of
+                * date.  If we don't actually find any rules or triggers, clear the
+                * local copy of the flag so that we don't get into an infinite loop
+                * here.  We don't make any attempt to fix the pg_class entry, though.
                 */
                if (relation->rd_rel->relhasrules && relation->rd_rules == NULL)
+               {
                        RelationBuildRuleLock(relation);
-               if (relation->rd_rel->reltriggers > 0 && relation->trigdesc == NULL)
+                       if (relation->rd_rules == NULL)
+                               relation->rd_rel->relhasrules = false;
+                       restart = true;
+               }
+               if (relation->rd_rel->relhastriggers && relation->trigdesc == NULL)
+               {
                        RelationBuildTriggers(relation);
+                       if (relation->trigdesc == NULL)
+                               relation->rd_rel->relhastriggers = false;
+                       restart = true;
+               }
+
+               /* Release hold on the relation */
+               RelationDecrementReferenceCount(relation);
+
+               /* Now, restart the hashtable scan if needed */
+               if (restart)
+               {
+                       hash_seq_term(&status);
+                       hash_seq_init(&status, RelationIdCache);
+               }
        }
 
        /*
-        * Lastly, write out a new relcache cache file if one is needed.
+        * Lastly, write out new relcache cache files if needed.  We don't bother
+        * to distinguish cases where only one of the two needs an update.
         */
        if (needNewCacheFile)
        {
@@ -2612,16 +3008,48 @@ RelationCacheInitializePhase2(void)
                 * Force all the catcaches to finish initializing and thereby open the
                 * catalogs and indexes they use.  This will preload the relcache with
                 * entries for all the most important system catalogs and indexes, so
-                * that the init file will be most useful for future backends.
+                * that the init files will be most useful for future backends.
                 */
                InitCatalogCachePhase2();
 
-               /* now write the file */
-               write_relcache_init_file();
+               /* reset initFileRelationIds list; we'll fill it during write */
+               initFileRelationIds = NIL;
+
+               /* now write the files */
+               write_relcache_init_file(true);
+               write_relcache_init_file(false);
        }
 }
 
 /*
+ * load_critical_index -- load one critical system index into the relcache
+ *
+ * indexoid is the OID of the target index, heapoid is the OID of the catalog
+ * it belongs to.  PANICs if no descriptor can be built; else nails the entry.
+ */
+static void
+load_critical_index(Oid indexoid, Oid heapoid)
+{
+       Relation        ird;
+
+       /*
+        * We must lock the underlying catalog before locking the index to avoid
+        * deadlock, since RelationBuildDesc might well need to read the catalog,
+        * and if anyone else is exclusive-locking this catalog and index they'll
+        * be doing it in that order.
+        */
+       LockRelationOid(heapoid, AccessShareLock);
+       LockRelationOid(indexoid, AccessShareLock);
+       ird = RelationBuildDesc(indexoid, true);
+       if (ird == NULL)
+               elog(PANIC, "could not open critical system index %u", indexoid);
+       ird->rd_isnailed = true;        /* mark the new entry as nailed */
+       ird->rd_refcnt = 1;             /* nailed entries carry a refcount of 1 */
+       UnlockRelationOid(indexoid, AccessShareLock);   /* unlock in reverse order */
+       UnlockRelationOid(heapoid, AccessShareLock);
+}
+
+/*
  * GetPgClassDescriptor -- get a predefined tuple descriptor for pg_class
  * GetPgIndexDescriptor -- get a predefined tuple descriptor for pg_index
  *
@@ -2634,7 +3062,8 @@ RelationCacheInitializePhase2(void)
  * extracting fields.
  */
 static TupleDesc
-BuildHardcodedDescriptor(int natts, Form_pg_attribute attrs, bool hasoids)
+BuildHardcodedDescriptor(int natts, const FormData_pg_attribute *attrs,
+                                                bool hasoids)
 {
        TupleDesc       result;
        MemoryContext oldcxt;
@@ -2648,7 +3077,7 @@ BuildHardcodedDescriptor(int natts, Form_pg_attribute attrs, bool hasoids)
 
        for (i = 0; i < natts; i++)
        {
-               memcpy(result->attrs[i], &attrs[i], ATTRIBUTE_TUPLE_SIZE);
+               memcpy(result->attrs[i], &attrs[i], ATTRIBUTE_FIXED_PART_SIZE);
                /* make sure attcacheoff is valid */
                result->attrs[i]->attcacheoff = -1;
        }
@@ -2691,6 +3120,9 @@ GetPgIndexDescriptor(void)
        return pgindexdesc;
 }
 
+/*
+ * Load any default attribute value definitions for the relation.
+ */
 static void
 AttrDefaultFetch(Relation relation)
 {
@@ -2739,8 +3171,7 @@ AttrDefaultFetch(Relation relation)
                                         RelationGetRelationName(relation));
                        else
                                attrdef[i].adbin = MemoryContextStrdup(CacheMemoryContext,
-                                                                DatumGetCString(DirectFunctionCall1(textout,
-                                                                                                                                        val)));
+                                                                                                  TextDatumGetCString(val));
                        break;
                }
 
@@ -2757,6 +3188,9 @@ AttrDefaultFetch(Relation relation)
                         ndef - found, RelationGetRelationName(relation));
 }
 
+/*
+ * Load any check constraints for the relation.
+ */
 static void
 CheckConstraintFetch(Relation relation)
 {
@@ -2803,8 +3237,7 @@ CheckConstraintFetch(Relation relation)
                                 RelationGetRelationName(relation));
 
                check[found].ccbin = MemoryContextStrdup(CacheMemoryContext,
-                                                                DatumGetCString(DirectFunctionCall1(textout,
-                                                                                                                                        val)));
+                                                                                                TextDatumGetCString(val));
                found++;
        }
 
@@ -2886,7 +3319,7 @@ RelationGetIndexList(Relation relation)
 
                /* Check to see if it is a unique, non-partial btree index on OID */
                if (index->indnatts == 1 &&
-                       index->indisunique &&
+                       index->indisunique && index->indimmediate &&
                        index->indkey.values[0] == ObjectIdAttributeNumber &&
                        index->indclass.values[0] == OID_BTREE_OPS_OID &&
                        heap_attisnull(htup, Anum_pg_index_indpred))
@@ -2951,6 +3384,13 @@ insert_ordered_oid(List *list, Oid datum)
  * messages.  In practice it is only used on pg_class (see REINDEX).
  *
  * It is up to the caller to make sure the given list is correctly ordered.
+ *
+ * We deliberately do not change rd_indexattr here: even when operating
+ * with a temporary partial index list, HOT-update decisions must be made
+ * correctly with respect to the full index set.  It is up to the caller
+ * to ensure that a correct rd_indexattr set has been cached before first
+ * calling RelationSetIndexList; else a subsequent inquiry might cause a
+ * wrong rd_indexattr set to get computed and cached.
  */
 void
 RelationSetIndexList(Relation relation, List *indexIds, Oid oidIndex)
@@ -3036,7 +3476,7 @@ RelationGetIndexExpressions(Relation relation)
                                                          GetPgIndexDescriptor(),
                                                          &isnull);
        Assert(!isnull);
-       exprsString = DatumGetCString(DirectFunctionCall1(textout, exprsDatum));
+       exprsString = TextDatumGetCString(exprsDatum);
        result = (List *) stringToNode(exprsString);
        pfree(exprsString);
 
@@ -3046,7 +3486,7 @@ RelationGetIndexExpressions(Relation relation)
         * them to similarly-processed qual clauses, and may fail to detect valid
         * matches without this.  We don't bother with canonicalize_qual, however.
         */
-       result = (List *) eval_const_expressions((Node *) result);
+       result = (List *) eval_const_expressions(NULL, (Node *) result);
 
        /*
         * Also mark any coercion format fields as "don't care", so that the
@@ -3058,7 +3498,7 @@ RelationGetIndexExpressions(Relation relation)
        fix_opfuncids((Node *) result);
 
        /* Now save a copy of the completed tree in the relcache entry. */
-       oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
+       oldcxt = MemoryContextSwitchTo(relation->rd_indexcxt);
        relation->rd_indexprs = (List *) copyObject(result);
        MemoryContextSwitchTo(oldcxt);
 
@@ -3103,7 +3543,7 @@ RelationGetIndexPredicate(Relation relation)
                                                         GetPgIndexDescriptor(),
                                                         &isnull);
        Assert(!isnull);
-       predString = DatumGetCString(DirectFunctionCall1(textout, predDatum));
+       predString = TextDatumGetCString(predDatum);
        result = (List *) stringToNode(predString);
        pfree(predString);
 
@@ -3116,7 +3556,7 @@ RelationGetIndexPredicate(Relation relation)
         * stuff involving subqueries, however, since we don't allow any in index
         * predicates.)
         */
-       result = (List *) eval_const_expressions((Node *) result);
+       result = (List *) eval_const_expressions(NULL, (Node *) result);
 
        result = (List *) canonicalize_qual((Expr *) result);
 
@@ -3133,13 +3573,222 @@ RelationGetIndexPredicate(Relation relation)
        fix_opfuncids((Node *) result);
 
        /* Now save a copy of the completed tree in the relcache entry. */
-       oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
+       oldcxt = MemoryContextSwitchTo(relation->rd_indexcxt);
        relation->rd_indpred = (List *) copyObject(result);
        MemoryContextSwitchTo(oldcxt);
 
        return result;
 }
 
+/*
+ * RelationGetIndexAttrBitmap -- get a bitmap of index attribute numbers
+ *
+ * The result has a bit set for each attribute used anywhere in the index
+ * definitions of all the indexes on this relation.  (This includes not only
+ * simple index keys, but attributes used in expressions and partial-index
+ * predicates.)
+ *
+ * Attribute numbers are offset by FirstLowInvalidHeapAttributeNumber so that
+ * we can include system attributes (e.g., OID) in the bitmap representation.
+ *
+ * The result is palloc'd in the caller's memory context (or is NULL if there
+ * are no indexes) and should be bms_free'd when not needed anymore.
+ */
+Bitmapset *
+RelationGetIndexAttrBitmap(Relation relation)
+{
+       Bitmapset  *indexattrs;
+       List       *indexoidlist;
+       ListCell   *l;
+       MemoryContext oldcxt;
+
+       /* Quick exit if already computed; caller gets a copy of the cached set. */
+       if (relation->rd_indexattr != NULL)
+               return bms_copy(relation->rd_indexattr);
+
+       /* Fast path if definitely no indexes */
+       if (!RelationGetForm(relation)->relhasindex)
+               return NULL;
+
+       /*
+        * Get the list of index OIDs (released below with list_free)
+        */
+       indexoidlist = RelationGetIndexList(relation);
+
+       /* Fall out if no indexes (but relhasindex was set) */
+       if (indexoidlist == NIL)
+               return NULL;
+
+       /*
+        * For each index, add referenced attributes to indexattrs.
+        */
+       indexattrs = NULL;
+       foreach(l, indexoidlist)
+       {
+               Oid                     indexOid = lfirst_oid(l);
+               Relation        indexDesc;
+               IndexInfo  *indexInfo;
+               int                     i;
+
+               indexDesc = index_open(indexOid, AccessShareLock);
+
+               /* Extract index key information from the index's pg_index row */
+               indexInfo = BuildIndexInfo(indexDesc);
+
+               /* Collect simple attribute references */
+               for (i = 0; i < indexInfo->ii_NumIndexAttrs; i++)
+               {
+                       int                     attrnum = indexInfo->ii_KeyAttrNumbers[i];
+
+                       if (attrnum != 0)       /* 0 presumably marks an expression column */
+                               indexattrs = bms_add_member(indexattrs,
+                                                          attrnum - FirstLowInvalidHeapAttributeNumber);
+               }
+
+               /* Collect all attributes used in expressions, too */
+               pull_varattnos((Node *) indexInfo->ii_Expressions, &indexattrs);
+
+               /* Collect all attributes in the index predicate, too */
+               pull_varattnos((Node *) indexInfo->ii_Predicate, &indexattrs);
+
+               index_close(indexDesc, AccessShareLock);
+       }
+
+       list_free(indexoidlist);
+
+       /* Now save a copy, allocated in CacheMemoryContext, in the relcache entry. */
+       oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
+       relation->rd_indexattr = bms_copy(indexattrs);
+       MemoryContextSwitchTo(oldcxt);
+
+       /* We return our original working copy for caller to play with */
+       return indexattrs;
+}
+
+/*
+ * RelationGetExclusionInfo -- get info about index's exclusion constraint
+ *
+ * This should be called only for an index that is known to have an
+ * associated exclusion constraint.  It returns arrays (palloc'd in caller's
+ * context) of the exclusion operator OIDs, their underlying functions'
+ * OIDs, and their strategy numbers in the index's opfamilies.  We cache all
+ * this in the index's relcache entry since it takes a fair amount of work.
+ */
+void
+RelationGetExclusionInfo(Relation indexRelation,
+                                                Oid **operators,
+                                                Oid **procs,
+                                                uint16 **strategies)
+{
+       int                     ncols = indexRelation->rd_rel->relnatts;
+       Oid                *ops;
+       Oid                *funcs;
+       uint16     *strats;
+       Relation        conrel;
+       SysScanDesc conscan;
+       ScanKeyData skey[1];
+       HeapTuple       htup;
+       bool            found;
+       MemoryContext oldcxt;
+       int                     i;
+
+       /* Allocate result space in caller context (needed on both paths below) */
+       *operators = ops = (Oid *) palloc(sizeof(Oid) * ncols);
+       *procs = funcs = (Oid *) palloc(sizeof(Oid) * ncols);
+       *strategies = strats = (uint16 *) palloc(sizeof(uint16) * ncols);
+
+       /* Quick exit if cached already; rd_exclstrats serves as the cached flag */
+       if (indexRelation->rd_exclstrats != NULL)
+       {
+               memcpy(ops, indexRelation->rd_exclops, sizeof(Oid) * ncols);
+               memcpy(funcs, indexRelation->rd_exclprocs, sizeof(Oid) * ncols);
+               memcpy(strats, indexRelation->rd_exclstrats, sizeof(uint16) * ncols);
+               return;
+       }
+
+       /*
+        * Search pg_constraint for the constraint associated with the index. To
+        * make this not too painfully slow, we use the index on conrelid; that
+        * will hold the parent relation's OID not the index's own OID.
+        */
+       ScanKeyInit(&skey[0],
+                               Anum_pg_constraint_conrelid,
+                               BTEqualStrategyNumber, F_OIDEQ,
+                               ObjectIdGetDatum(indexRelation->rd_index->indrelid));
+
+       conrel = heap_open(ConstraintRelationId, AccessShareLock);
+       conscan = systable_beginscan(conrel, ConstraintRelidIndexId, true,
+                                                                SnapshotNow, 1, skey);
+       found = false;
+
+       while (HeapTupleIsValid(htup = systable_getnext(conscan)))
+       {
+               Form_pg_constraint conform = (Form_pg_constraint) GETSTRUCT(htup);
+               Datum           val;
+               bool            isnull;
+               ArrayType  *arr;
+               int                     nelem;
+
+               /* We want the exclusion constraint owning the index */
+               if (conform->contype != CONSTRAINT_EXCLUSION ||
+                       conform->conindid != RelationGetRelid(indexRelation))
+                       continue;
+
+               /* There should be only one */
+               if (found)
+                       elog(ERROR, "unexpected exclusion constraint record found for rel %s",
+                                RelationGetRelationName(indexRelation));
+               found = true;
+
+               /* Extract the operator OIDs from conexclop */
+               val = fastgetattr(htup,
+                                                 Anum_pg_constraint_conexclop,
+                                                 conrel->rd_att, &isnull);
+               if (isnull)
+                       elog(ERROR, "null conexclop for rel %s",
+                                RelationGetRelationName(indexRelation));
+
+               arr = DatumGetArrayTypeP(val);  /* ensure not toasted */
+               nelem = ARR_DIMS(arr)[0];       /* NOTE(review): read before ndim check */
+               if (ARR_NDIM(arr) != 1 ||
+                       nelem != ncols ||
+                       ARR_HASNULL(arr) ||
+                       ARR_ELEMTYPE(arr) != OIDOID)
+                       elog(ERROR, "conexclop is not a 1-D Oid array");
+
+               memcpy(ops, ARR_DATA_PTR(arr), sizeof(Oid) * ncols);    /* nelem == ncols */
+       }
+
+       systable_endscan(conscan);
+       heap_close(conrel, AccessShareLock);
+
+       if (!found)
+               elog(ERROR, "exclusion constraint record missing for rel %s",
+                        RelationGetRelationName(indexRelation));
+
+       /* We need the func OIDs and strategy numbers too */
+       for (i = 0; i < ncols; i++)
+       {
+               funcs[i] = get_opcode(ops[i]);
+               strats[i] = get_op_opfamily_strategy(ops[i],
+                                                                                        indexRelation->rd_opfamily[i]);
+               /* shouldn't fail, since it was checked at index creation */
+               if (strats[i] == InvalidStrategy)
+                       elog(ERROR, "could not find strategy for operator %u in family %u",
+                                ops[i], indexRelation->rd_opfamily[i]);
+       }
+
+       /* Save a copy of the results, allocated in rd_indexcxt, for later calls. */
+       oldcxt = MemoryContextSwitchTo(indexRelation->rd_indexcxt);
+       indexRelation->rd_exclops = (Oid *) palloc(sizeof(Oid) * ncols);
+       indexRelation->rd_exclprocs = (Oid *) palloc(sizeof(Oid) * ncols);
+       indexRelation->rd_exclstrats = (uint16 *) palloc(sizeof(uint16) * ncols);
+       memcpy(indexRelation->rd_exclops, ops, sizeof(Oid) * ncols);
+       memcpy(indexRelation->rd_exclprocs, funcs, sizeof(Oid) * ncols);
+       memcpy(indexRelation->rd_exclstrats, strats, sizeof(uint16) * ncols);
+       MemoryContextSwitchTo(oldcxt);
+}
+
 
 /*
  *     load_relcache_init_file, write_relcache_init_file
@@ -3166,7 +3815,10 @@ RelationGetIndexPredicate(Relation relation)
  *                       relation descriptors using sequential scans and write 'em to
  *                       the initialization file for use by subsequent backends.
  *
- *             We could dispense with the initialization file and just build the
+ *             As of Postgres 9.0, there is one local initialization file in each
+ *             database, plus one shared initialization file for shared catalogs.
+ *
+ *             We could dispense with the initialization files and just build the
  *             critical reldescs the hard way on every backend startup, but that
  *             slows down backend startup noticeably.
  *
@@ -3174,24 +3826,26 @@ RelationGetIndexPredicate(Relation relation)
  *             just the ones that are absolutely critical; this allows us to speed
  *             up backend startup by not having to build such entries the hard way.
  *             Presently, all the catalog and index entries that are referred to
- *             by catcaches are stored in the initialization file.
+ *             by catcaches are stored in the initialization files.
  *
  *             The same mechanism that detects when catcache and relcache entries
  *             need to be invalidated (due to catalog updates) also arranges to
- *             unlink the initialization file when its contents may be out of date.
- *             The file will then be rebuilt during the next backend startup.
+ *             unlink the initialization files when the contents may be out of date.
+ *             The files will then be rebuilt during the next backend startup.
  */
 
 /*
- * load_relcache_init_file -- attempt to load cache from the init file
+ * load_relcache_init_file -- attempt to load cache from the shared
+ * or local cache init file
  *
- * If successful, return TRUE and set criticalRelcachesBuilt to true.
+ * If successful, return TRUE and set criticalRelcachesBuilt or
+ * criticalSharedRelcachesBuilt to true.
  * If not successful, return FALSE.
  *
  * NOTE: we assume we are already switched into CacheMemoryContext.
  */
 static bool
-load_relcache_init_file(void)
+load_relcache_init_file(bool shared)
 {
        FILE       *fp;
        char            initfilename[MAXPGPATH];
@@ -3204,8 +3858,12 @@ load_relcache_init_file(void)
                                magic;
        int                     i;
 
-       snprintf(initfilename, sizeof(initfilename), "%s/%s",
-                        DatabasePath, RELCACHE_INIT_FILENAME);
+       if (shared)
+               snprintf(initfilename, sizeof(initfilename), "global/%s",
+                                RELCACHE_INIT_FILENAME);
+       else
+               snprintf(initfilename, sizeof(initfilename), "%s/%s",
+                                DatabasePath, RELCACHE_INIT_FILENAME);
 
        fp = AllocateFile(initfilename, PG_BINARY_R);
        if (fp == NULL)
@@ -3220,7 +3878,6 @@ load_relcache_init_file(void)
        rels = (Relation *) palloc(max_rels * sizeof(Relation));
        num_rels = 0;
        nailed_rels = nailed_indexes = 0;
-       initFileRelationIds = NIL;
 
        /* check for correct magic number (compatible version) */
        if (fread(&magic, 1, sizeof(magic), fp) != sizeof(magic))
@@ -3237,7 +3894,8 @@ load_relcache_init_file(void)
                bool            has_not_null;
 
                /* first read the relation descriptor length */
-               if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
+               nread = fread(&len, 1, sizeof(len), fp);
+               if (nread != sizeof(len))
                {
                        if (nread == 0)
                                break;                  /* end of file */
@@ -3258,15 +3916,15 @@ load_relcache_init_file(void)
                rel = rels[num_rels++] = (Relation) palloc(len);
 
                /* then, read the Relation structure */
-               if ((nread = fread(rel, 1, len, fp)) != len)
+               if (fread(rel, 1, len, fp) != len)
                        goto read_failed;
 
                /* next read the relation tuple form */
-               if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
+               if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
                        goto read_failed;
 
                relform = (Form_pg_class) palloc(len);
-               if ((nread = fread(relform, 1, len, fp)) != len)
+               if (fread(relform, 1, len, fp) != len)
                        goto read_failed;
 
                rel->rd_rel = relform;
@@ -3283,23 +3941,23 @@ load_relcache_init_file(void)
                has_not_null = false;
                for (i = 0; i < relform->relnatts; i++)
                {
-                       if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
+                       if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
                                goto read_failed;
-                       if (len != ATTRIBUTE_TUPLE_SIZE)
+                       if (len != ATTRIBUTE_FIXED_PART_SIZE)
                                goto read_failed;
-                       if ((nread = fread(rel->rd_att->attrs[i], 1, len, fp)) != len)
+                       if (fread(rel->rd_att->attrs[i], 1, len, fp) != len)
                                goto read_failed;
 
                        has_not_null |= rel->rd_att->attrs[i]->attnotnull;
                }
 
                /* next read the access method specific field */
-               if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
+               if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
                        goto read_failed;
                if (len > 0)
                {
                        rel->rd_options = palloc(len);
-                       if ((nread = fread(rel->rd_options, 1, len, fp)) != len)
+                       if (fread(rel->rd_options, 1, len, fp) != len)
                                goto read_failed;
                        if (len != VARSIZE(rel->rd_options))
                                goto read_failed;               /* sanity check */
@@ -3325,7 +3983,6 @@ load_relcache_init_file(void)
                        MemoryContext indexcxt;
                        Oid                *opfamily;
                        Oid                *opcintype;
-                       Oid                *operator;
                        RegProcedure *support;
                        int                     nsupport;
                        int16      *indoption;
@@ -3335,11 +3992,11 @@ load_relcache_init_file(void)
                                nailed_indexes++;
 
                        /* next, read the pg_index tuple */
-                       if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
+                       if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
                                goto read_failed;
 
                        rel->rd_indextuple = (HeapTuple) palloc(len);
-                       if ((nread = fread(rel->rd_indextuple, 1, len, fp)) != len)
+                       if (fread(rel->rd_indextuple, 1, len, fp) != len)
                                goto read_failed;
 
                        /* Fix up internal pointers in the tuple -- see heap_copytuple */
@@ -3347,11 +4004,11 @@ load_relcache_init_file(void)
                        rel->rd_index = (Form_pg_index) GETSTRUCT(rel->rd_indextuple);
 
                        /* next, read the access method tuple form */
-                       if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
+                       if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
                                goto read_failed;
 
                        am = (Form_pg_am) palloc(len);
-                       if ((nread = fread(am, 1, len, fp)) != len)
+                       if (fread(am, 1, len, fp) != len)
                                goto read_failed;
                        rel->rd_am = am;
 
@@ -3367,50 +4024,40 @@ load_relcache_init_file(void)
                        rel->rd_indexcxt = indexcxt;
 
                        /* next, read the vector of opfamily OIDs */
-                       if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
+                       if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
                                goto read_failed;
 
                        opfamily = (Oid *) MemoryContextAlloc(indexcxt, len);
-                       if ((nread = fread(opfamily, 1, len, fp)) != len)
+                       if (fread(opfamily, 1, len, fp) != len)
                                goto read_failed;
 
                        rel->rd_opfamily = opfamily;
 
                        /* next, read the vector of opcintype OIDs */
-                       if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
+                       if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
                                goto read_failed;
 
                        opcintype = (Oid *) MemoryContextAlloc(indexcxt, len);
-                       if ((nread = fread(opcintype, 1, len, fp)) != len)
+                       if (fread(opcintype, 1, len, fp) != len)
                                goto read_failed;
 
                        rel->rd_opcintype = opcintype;
 
-                       /* next, read the vector of operator OIDs */
-                       if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
-                               goto read_failed;
-
-                       operator = (Oid *) MemoryContextAlloc(indexcxt, len);
-                       if ((nread = fread(operator, 1, len, fp)) != len)
-                               goto read_failed;
-
-                       rel->rd_operator = operator;
-
-                       /* next, read the vector of support procedures */
-                       if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
+                       /* next, read the vector of support procedure OIDs */
+                       if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
                                goto read_failed;
                        support = (RegProcedure *) MemoryContextAlloc(indexcxt, len);
-                       if ((nread = fread(support, 1, len, fp)) != len)
+                       if (fread(support, 1, len, fp) != len)
                                goto read_failed;
 
                        rel->rd_support = support;
 
                        /* finally, read the vector of indoption values */
-                       if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
+                       if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
                                goto read_failed;
 
                        indoption = (int16 *) MemoryContextAlloc(indexcxt, len);
-                       if ((nread = fread(indoption, 1, len, fp)) != len)
+                       if (fread(indoption, 1, len, fp) != len)
                                goto read_failed;
 
                        rel->rd_indoption = indoption;
@@ -3435,7 +4082,6 @@ load_relcache_init_file(void)
                        Assert(rel->rd_aminfo == NULL);
                        Assert(rel->rd_opfamily == NULL);
                        Assert(rel->rd_opcintype == NULL);
-                       Assert(rel->rd_operator == NULL);
                        Assert(rel->rd_support == NULL);
                        Assert(rel->rd_supportinfo == NULL);
                        Assert(rel->rd_indoption == NULL);
@@ -3444,27 +4090,30 @@ load_relcache_init_file(void)
                /*
                 * Rules and triggers are not saved (mainly because the internal
                 * format is complex and subject to change).  They must be rebuilt if
-                * needed by RelationCacheInitializePhase2.  This is not expected to
+                * needed by RelationCacheInitializePhase3.  This is not expected to
                 * be a big performance hit since few system catalogs have such. Ditto
-                * for index expressions and predicates.
+                * for index expressions, predicates, and exclusion info.
                 */
                rel->rd_rules = NULL;
                rel->rd_rulescxt = NULL;
                rel->trigdesc = NULL;
                rel->rd_indexprs = NIL;
                rel->rd_indpred = NIL;
+               rel->rd_exclops = NULL;
+               rel->rd_exclprocs = NULL;
+               rel->rd_exclstrats = NULL;
 
                /*
                 * Reset transient-state fields in the relcache entry
                 */
                rel->rd_smgr = NULL;
-               rel->rd_targblock = InvalidBlockNumber;
                if (rel->rd_isnailed)
                        rel->rd_refcnt = 1;
                else
                        rel->rd_refcnt = 0;
                rel->rd_indexvalid = 0;
                rel->rd_indexlist = NIL;
+               rel->rd_indexattr = NULL;
                rel->rd_oidindex = InvalidOid;
                rel->rd_createSubid = InvalidSubTransactionId;
                rel->rd_newRelfilenodeSubid = InvalidSubTransactionId;
@@ -3485,9 +4134,18 @@ load_relcache_init_file(void)
         * get the right number of nailed items?  (This is a useful crosscheck in
         * case the set of critical rels or indexes changes.)
         */
-       if (nailed_rels != NUM_CRITICAL_RELS ||
-               nailed_indexes != NUM_CRITICAL_INDEXES)
-               goto read_failed;
+       if (shared)
+       {
+               if (nailed_rels != NUM_CRITICAL_SHARED_RELS ||
+                       nailed_indexes != NUM_CRITICAL_SHARED_INDEXES)
+                       goto read_failed;
+       }
+       else
+       {
+               if (nailed_rels != NUM_CRITICAL_LOCAL_RELS ||
+                       nailed_indexes != NUM_CRITICAL_LOCAL_INDEXES)
+                       goto read_failed;
+       }
 
        /*
         * OK, all appears well.
@@ -3498,14 +4156,18 @@ load_relcache_init_file(void)
        {
                RelationCacheInsert(rels[relno]);
                /* also make a list of their OIDs, for RelationIdIsInInitFile */
-               initFileRelationIds = lcons_oid(RelationGetRelid(rels[relno]),
-                                                                               initFileRelationIds);
+               if (!shared)
+                       initFileRelationIds = lcons_oid(RelationGetRelid(rels[relno]),
+                                                                                       initFileRelationIds);
        }
 
        pfree(rels);
        FreeFile(fp);
 
-       criticalRelcachesBuilt = true;
+       if (shared)
+               criticalSharedRelcachesBuilt = true;
+       else
+               criticalRelcachesBuilt = true;
        return true;
 
        /*
@@ -3522,10 +4184,10 @@ read_failed:
 
 /*
  * Write out a new initialization file with the current contents
- * of the relcache.
+ * of the relcache (either shared rels or local rels, as indicated).
  */
 static void
-write_relcache_init_file(void)
+write_relcache_init_file(bool shared)
 {
        FILE       *fp;
        char            tempfilename[MAXPGPATH];
@@ -3541,10 +4203,20 @@ write_relcache_init_file(void)
         * another backend starting at about the same time might crash trying to
         * read the partially-complete file.
         */
-       snprintf(tempfilename, sizeof(tempfilename), "%s/%s.%d",
-                        DatabasePath, RELCACHE_INIT_FILENAME, MyProcPid);
-       snprintf(finalfilename, sizeof(finalfilename), "%s/%s",
-                        DatabasePath, RELCACHE_INIT_FILENAME);
+       if (shared)
+       {
+               snprintf(tempfilename, sizeof(tempfilename), "global/%s.%d",
+                                RELCACHE_INIT_FILENAME, MyProcPid);
+               snprintf(finalfilename, sizeof(finalfilename), "global/%s",
+                                RELCACHE_INIT_FILENAME);
+       }
+       else
+       {
+               snprintf(tempfilename, sizeof(tempfilename), "%s/%s.%d",
+                                DatabasePath, RELCACHE_INIT_FILENAME, MyProcPid);
+               snprintf(finalfilename, sizeof(finalfilename), "%s/%s",
+                                DatabasePath, RELCACHE_INIT_FILENAME);
+       }
 
        unlink(tempfilename);           /* in case it exists w/wrong permissions */
 
@@ -3572,17 +4244,19 @@ write_relcache_init_file(void)
                elog(FATAL, "could not write init file");
 
        /*
-        * Write all the reldescs (in no particular order).
+        * Write all the appropriate reldescs (in no particular order).
         */
        hash_seq_init(&status, RelationIdCache);
 
-       initFileRelationIds = NIL;
-
        while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
        {
                Relation        rel = idhentry->reldesc;
                Form_pg_class relform = rel->rd_rel;
 
+               /* ignore if not correct group */
+               if (relform->relisshared != shared)
+                       continue;
+
                /* first write the relcache entry proper */
                write_item(rel, sizeof(RelationData), fp);
 
@@ -3592,7 +4266,7 @@ write_relcache_init_file(void)
                /* next, do all the attribute tuple form data entries */
                for (i = 0; i < relform->relnatts; i++)
                {
-                       write_item(rel->rd_att->attrs[i], ATTRIBUTE_TUPLE_SIZE, fp);
+                       write_item(rel->rd_att->attrs[i], ATTRIBUTE_FIXED_PART_SIZE, fp);
                }
 
                /* next, do the access method specific field */
@@ -3624,12 +4298,7 @@ write_relcache_init_file(void)
                                           relform->relnatts * sizeof(Oid),
                                           fp);
 
-                       /* next, write the vector of operator OIDs */
-                       write_item(rel->rd_operator,
-                                          relform->relnatts * (am->amstrategies * sizeof(Oid)),
-                                          fp);
-
-                       /* next, write the vector of support procedures */
+                       /* next, write the vector of support procedure OIDs */
                        write_item(rel->rd_support,
                                  relform->relnatts * (am->amsupport * sizeof(RegProcedure)),
                                           fp);
@@ -3641,10 +4310,13 @@ write_relcache_init_file(void)
                }
 
                /* also make a list of their OIDs, for RelationIdIsInInitFile */
-               oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
-               initFileRelationIds = lcons_oid(RelationGetRelid(rel),
-                                                                               initFileRelationIds);
-               MemoryContextSwitchTo(oldcxt);
+               if (!shared)
+               {
+                       oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
+                       initFileRelationIds = lcons_oid(RelationGetRelid(rel),
+                                                                                       initFileRelationIds);
+                       MemoryContextSwitchTo(oldcxt);
+               }
        }
 
        if (FreeFile(fp))
@@ -3705,7 +4377,7 @@ write_item(const void *data, Size len, FILE *fp)
 
 /*
  * Detect whether a given relation (identified by OID) is one of the ones
- * we store in the init file.
+ * we store in the local relcache init file.
  *
  * Note that we effectively assume that all backends running in a database
  * would choose to store the same set of relations in the init file;
@@ -3721,7 +4393,7 @@ RelationIdIsInInitFile(Oid relationId)
 /*
  * Invalidate (remove) the init file during commit of a transaction that
  * changed one or more of the relation cache entries that are kept in the
- * init file.
+ * local init file.
  *
  * We actually need to remove the init file twice: once just before sending
  * the SI messages that include relcache inval for such relations, and once
@@ -3736,6 +4408,13 @@ RelationIdIsInInitFile(Oid relationId)
  *
  * Ignore any failure to unlink the file, since it might not be there if
  * no backend has been started since the last removal.
+ *
+ * Notice this deals only with the local init file, not the shared init file.
+ * The reason is that there can never be a "significant" change to the
+ * relcache entry of a shared relation; the most that could happen is
+ * updates of noncritical fields such as relpages/reltuples.  So, while
+ * it's worth updating the shared init file from time to time, it can never
+ * be invalid enough to make it necessary to remove it.
  */
 void
 RelationCacheInitFileInvalidate(bool beforeSend)
@@ -3767,23 +4446,94 @@ RelationCacheInitFileInvalidate(bool beforeSend)
 }
 
 /*
- * Remove the init file for a given database during postmaster startup.
+ * Remove the init files during postmaster startup.
  *
- * We used to keep the init file across restarts, but that is unsafe in PITR
+ * We used to keep the init files across restarts, but that is unsafe in PITR
  * scenarios, and even in simple crash-recovery cases there are windows for
- * the init file to become out-of-sync with the database.  So now we just
- * remove it during startup and expect the first backend launch to rebuild it.
- * Of course, this has to happen in each database of the cluster.  For
- * simplicity this is driven by flatfiles.c, which has to scan pg_database
- * anyway.
+ * the init files to become out-of-sync with the database.  So now we just
+ * remove them during startup and expect the first backend launch to rebuild
+ * them.  Of course, this has to happen in each database of the cluster.
  */
 void
-RelationCacheInitFileRemove(const char *dbPath)
+RelationCacheInitFileRemove(void)
+{
+       const char *tblspcdir = "pg_tblspc";
+       DIR                *dir;
+       struct dirent *de;
+       char            path[MAXPGPATH];
+
+       /*
+        * We zap the shared cache file too.  In theory it can't get out of sync
+        * enough to be a problem, but in data-corruption cases, who knows ...
+        */
+       snprintf(path, sizeof(path), "global/%s",
+                        RELCACHE_INIT_FILENAME);
+       unlink_initfile(path);
+
+       /* Scan everything in the default tablespace */
+       RelationCacheInitFileRemoveInDir("base");
+
+       /* Scan the tablespace link directory to find non-default tablespaces */
+       dir = AllocateDir(tblspcdir);
+       if (dir == NULL)
+       {
+               elog(LOG, "could not open tablespace link directory \"%s\": %m",
+                        tblspcdir);
+               return;
+       }
+
+       while ((de = ReadDir(dir, tblspcdir)) != NULL)
+       {
+               if (strspn(de->d_name, "0123456789") == strlen(de->d_name))
+               {
+                       /* Scan the tablespace dir for per-database dirs */
+                       snprintf(path, sizeof(path), "%s/%s/%s",
+                                        tblspcdir, de->d_name, TABLESPACE_VERSION_DIRECTORY);
+                       RelationCacheInitFileRemoveInDir(path);
+               }
+       }
+
+       FreeDir(dir);
+}
+
+/* Process one per-tablespace directory for RelationCacheInitFileRemove */
+static void
+RelationCacheInitFileRemoveInDir(const char *tblspcpath)
 {
+       DIR                *dir;
+       struct dirent *de;
        char            initfilename[MAXPGPATH];
 
-       snprintf(initfilename, sizeof(initfilename), "%s/%s",
-                        dbPath, RELCACHE_INIT_FILENAME);
-       unlink(initfilename);
-       /* ignore any error, since it might not be there at all */
+       /* Scan the tablespace directory to find per-database directories */
+       dir = AllocateDir(tblspcpath);
+       if (dir == NULL)
+       {
+               elog(LOG, "could not open tablespace directory \"%s\": %m",
+                        tblspcpath);
+               return;
+       }
+
+       while ((de = ReadDir(dir, tblspcpath)) != NULL)
+       {
+               if (strspn(de->d_name, "0123456789") == strlen(de->d_name))
+               {
+                       /* Try to remove the init file in each database */
+                       snprintf(initfilename, sizeof(initfilename), "%s/%s/%s",
+                                        tblspcpath, de->d_name, RELCACHE_INIT_FILENAME);
+                       unlink_initfile(initfilename);
+               }
+       }
+
+       FreeDir(dir);
+}
+
+static void
+unlink_initfile(const char *initfilename)
+{
+       if (unlink(initfilename) < 0)
+       {
+               /* It might not be there, but log any error other than ENOENT */
+               if (errno != ENOENT)
+                       elog(LOG, "could not remove cache file \"%s\": %m", initfilename);
+       }
 }