OSDN Git Service

Generalize concept of temporary relations to "relation persistence".
[pg-rex/syncrep.git] / src / backend / utils / cache / relcache.c
index 1cac276..1509686 100644 (file)
@@ -3,19 +3,20 @@
  * relcache.c
  *       POSTGRES relation descriptor cache code
  *
- * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  *
  * IDENTIFICATION
- *       $PostgreSQL: pgsql/src/backend/utils/cache/relcache.c,v 1.283 2009/01/26 19:41:06 alvherre Exp $
+ *       src/backend/utils/cache/relcache.c
  *
  *-------------------------------------------------------------------------
  */
 /*
  * INTERFACE ROUTINES
  *             RelationCacheInitialize                 - initialize relcache (to empty)
- *             RelationCacheInitializePhase2   - finish initializing relcache
+ *             RelationCacheInitializePhase2   - initialize shared-catalog entries
+ *             RelationCacheInitializePhase3   - finish initializing relcache
  *             RelationIdGetRelation                   - get a reldesc by relation id
  *             RelationClose                                   - close an open relation
  *
 #include <unistd.h>
 
 #include "access/genam.h"
-#include "access/heapam.h"
 #include "access/reloptions.h"
 #include "access/sysattr.h"
+#include "access/transam.h"
 #include "access/xact.h"
 #include "catalog/catalog.h"
 #include "catalog/index.h"
 #include "catalog/indexing.h"
 #include "catalog/namespace.h"
-#include "catalog/pg_amop.h"
 #include "catalog/pg_amproc.h"
 #include "catalog/pg_attrdef.h"
 #include "catalog/pg_authid.h"
+#include "catalog/pg_auth_members.h"
 #include "catalog/pg_constraint.h"
+#include "catalog/pg_database.h"
 #include "catalog/pg_namespace.h"
 #include "catalog/pg_opclass.h"
 #include "catalog/pg_proc.h"
 #include "catalog/pg_rewrite.h"
+#include "catalog/pg_tablespace.h"
+#include "catalog/pg_trigger.h"
 #include "catalog/pg_type.h"
+#include "catalog/schemapg.h"
+#include "catalog/storage.h"
 #include "commands/trigger.h"
 #include "miscadmin.h"
 #include "optimizer/clauses.h"
 #include "storage/fd.h"
 #include "storage/lmgr.h"
 #include "storage/smgr.h"
+#include "utils/array.h"
 #include "utils/builtins.h"
 #include "utils/fmgroids.h"
 #include "utils/inval.h"
+#include "utils/lsyscache.h"
 #include "utils/memutils.h"
 #include "utils/relcache.h"
+#include "utils/relmapper.h"
 #include "utils/resowner.h"
 #include "utils/syscache.h"
 #include "utils/tqual.h"
-#include "utils/typcache.h"
 
 
 /*
- * name of relcache init file, used to speed up backend startup
+ *             name of relcache init file(s), used to speed up backend startup
  */
 #define RELCACHE_INIT_FILENAME "pg_internal.init"
 
-#define RELCACHE_INIT_FILEMAGIC                0x573264        /* version ID value */
+#define RELCACHE_INIT_FILEMAGIC                0x573266        /* version ID value */
 
 /*
- *             hardcoded tuple descriptors.  see include/catalog/pg_attribute.h
+ *             hardcoded tuple descriptors, contents generated by genbki.pl
  */
-static FormData_pg_attribute Desc_pg_class[Natts_pg_class] = {Schema_pg_class};
-static FormData_pg_attribute Desc_pg_attribute[Natts_pg_attribute] = {Schema_pg_attribute};
-static FormData_pg_attribute Desc_pg_proc[Natts_pg_proc] = {Schema_pg_proc};
-static FormData_pg_attribute Desc_pg_type[Natts_pg_type] = {Schema_pg_type};
-static FormData_pg_attribute Desc_pg_index[Natts_pg_index] = {Schema_pg_index};
+static const FormData_pg_attribute Desc_pg_class[Natts_pg_class] = {Schema_pg_class};
+static const FormData_pg_attribute Desc_pg_attribute[Natts_pg_attribute] = {Schema_pg_attribute};
+static const FormData_pg_attribute Desc_pg_proc[Natts_pg_proc] = {Schema_pg_proc};
+static const FormData_pg_attribute Desc_pg_type[Natts_pg_type] = {Schema_pg_type};
+static const FormData_pg_attribute Desc_pg_database[Natts_pg_database] = {Schema_pg_database};
+static const FormData_pg_attribute Desc_pg_authid[Natts_pg_authid] = {Schema_pg_authid};
+static const FormData_pg_attribute Desc_pg_auth_members[Natts_pg_auth_members] = {Schema_pg_auth_members};
+static const FormData_pg_attribute Desc_pg_index[Natts_pg_index] = {Schema_pg_index};
 
 /*
  *             Hash tables that index the relation cache
@@ -106,6 +117,12 @@ static HTAB *RelationIdCache;
 bool           criticalRelcachesBuilt = false;
 
 /*
+ * This flag is false until we have prepared the critical relcache entries
+ * for shared catalogs (which are the tables needed for login).
+ */
+bool           criticalSharedRelcachesBuilt = false;
+
+/*
  * This counter counts relcache inval events received since backend startup
  * (but only for rels that are actually in cache).     Presently, we use it only
  * to detect whether data about to be written by write_relcache_init_file()
@@ -114,8 +131,10 @@ bool               criticalRelcachesBuilt = false;
 static long relcacheInvalsReceived = 0L;
 
 /*
- * This list remembers the OIDs of the relations cached in the relcache
- * init file.
+ * This list remembers the OIDs of the non-shared relations cached in the
+ * database's local relcache init file.  Note that there is no corresponding
+ * list for the shared relcache init file, for reasons explained in the
+ * comments for RelationCacheInitFileRemove.
  */
 static List *initFileRelationIds = NIL;
 
@@ -133,8 +152,7 @@ do { \
        RelIdCacheEnt *idhentry; bool found; \
        idhentry = (RelIdCacheEnt*)hash_search(RelationIdCache, \
                                                                                   (void *) &(RELATION->rd_id), \
-                                                                                  HASH_ENTER, \
-                                                                                  &found); \
+                                                                                  HASH_ENTER, &found); \
        /* used to give notice if found -- now just keep quiet */ \
        idhentry->reldesc = RELATION; \
 } while(0)
@@ -143,7 +161,8 @@ do { \
 do { \
        RelIdCacheEnt *hentry; \
        hentry = (RelIdCacheEnt*)hash_search(RelationIdCache, \
-                                                                                (void *) &(ID), HASH_FIND,NULL); \
+                                                                                (void *) &(ID), \
+                                                                                HASH_FIND, NULL); \
        if (hentry) \
                RELATION = hentry->reldesc; \
        else \
@@ -164,19 +183,17 @@ do { \
 /*
  * Special cache for opclass-related information
  *
- * Note: only default operators and support procs get cached, ie, those with
+ * Note: only default support procs get cached, ie, those with
  * lefttype = righttype = opcintype.
  */
 typedef struct opclasscacheent
 {
        Oid                     opclassoid;             /* lookup key: OID of opclass */
        bool            valid;                  /* set TRUE after successful fill-in */
-       StrategyNumber numStrats;       /* max # of strategies (from pg_am) */
        StrategyNumber numSupport;      /* max # of support procs (from pg_am) */
        Oid                     opcfamily;              /* OID of opclass's family */
        Oid                     opcintype;              /* OID of opclass's declared input type */
-       Oid                *operatorOids;       /* strategy operators' OIDs */
-       RegProcedure *supportProcs; /* support procs */
+       RegProcedure *supportProcs; /* OIDs of support procedures */
 } OpClassCacheEnt;
 
 static HTAB *OpClassCache = NULL;
@@ -184,39 +201,41 @@ static HTAB *OpClassCache = NULL;
 
 /* non-export function prototypes */
 
+static void RelationDestroyRelation(Relation relation);
 static void RelationClearRelation(Relation relation, bool rebuild);
 
 static void RelationReloadIndexInfo(Relation relation);
 static void RelationFlushRelation(Relation relation);
-static bool load_relcache_init_file(void);
-static void write_relcache_init_file(void);
+static bool load_relcache_init_file(bool shared);
+static void write_relcache_init_file(bool shared);
 static void write_item(const void *data, Size len, FILE *fp);
 
 static void formrdesc(const char *relationName, Oid relationReltype,
-                 bool hasoids, int natts, FormData_pg_attribute *att);
+                 bool isshared, bool hasoids,
+                 int natts, const FormData_pg_attribute *attrs);
 
 static HeapTuple ScanPgRelation(Oid targetRelId, bool indexOK);
-static Relation AllocateRelationDesc(Relation relation, Form_pg_class relp);
+static Relation AllocateRelationDesc(Form_pg_class relp);
 static void RelationParseRelOptions(Relation relation, HeapTuple tuple);
 static void RelationBuildTupleDesc(Relation relation);
-static Relation RelationBuildDesc(Oid targetRelId, Relation oldrelation);
+static Relation RelationBuildDesc(Oid targetRelId, bool insertIt);
 static void RelationInitPhysicalAddr(Relation relation);
+static void load_critical_index(Oid indexoid, Oid heapoid);
 static TupleDesc GetPgClassDescriptor(void);
 static TupleDesc GetPgIndexDescriptor(void);
 static void AttrDefaultFetch(Relation relation);
 static void CheckConstraintFetch(Relation relation);
 static List *insert_ordered_oid(List *list, Oid datum);
 static void IndexSupportInitialize(oidvector *indclass,
-                                          Oid *indexOperator,
                                           RegProcedure *indexSupport,
                                           Oid *opFamily,
                                           Oid *opcInType,
-                                          StrategyNumber maxStrategyNumber,
                                           StrategyNumber maxSupportNumber,
                                           AttrNumber maxAttributeNumber);
 static OpClassCacheEnt *LookupOpclassInfo(Oid operatorClassOid,
-                                 StrategyNumber numStrats,
                                  StrategyNumber numSupport);
+static void RelationCacheInitFileRemoveInDir(const char *tblspcpath);
+static void unlink_initfile(const char *initfilename);
 
 
 /*
@@ -240,6 +259,15 @@ ScanPgRelation(Oid targetRelId, bool indexOK)
        ScanKeyData key[1];
 
        /*
+        * If something goes wrong during backend startup, we might find ourselves
+        * trying to read pg_class before we've selected a database.  That ain't
+        * gonna work, so bail out with a useful error message.  If this happens,
+        * it probably means a relcache entry that needs to be nailed isn't.
+        */
+       if (!OidIsValid(MyDatabaseId))
+               elog(FATAL, "cannot read pg_class without having selected a database");
+
+       /*
         * form a scan key
         */
        ScanKeyInit(&key[0],
@@ -278,15 +306,12 @@ ScanPgRelation(Oid targetRelId, bool indexOK)
  *             AllocateRelationDesc
  *
  *             This is used to allocate memory for a new relation descriptor
- *             and initialize the rd_rel field.
- *
- *             If 'relation' is NULL, allocate a new RelationData object.
- *             If not, reuse the given object (that path is taken only when
- *             we have to rebuild a relcache entry during RelationClearRelation).
+ *             and initialize the rd_rel field from the given pg_class tuple.
  */
 static Relation
-AllocateRelationDesc(Relation relation, Form_pg_class relp)
+AllocateRelationDesc(Form_pg_class relp)
 {
+       Relation        relation;
        MemoryContext oldcxt;
        Form_pg_class relationForm;
 
@@ -294,18 +319,9 @@ AllocateRelationDesc(Relation relation, Form_pg_class relp)
        oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
 
        /*
-        * allocate space for new relation descriptor, if needed
-        */
-       if (relation == NULL)
-               relation = (Relation) palloc(sizeof(RelationData));
-
-       /*
-        * clear all fields of reldesc
+        * allocate and zero space for new relation descriptor
         */
-       MemSet(relation, 0, sizeof(RelationData));
-       relation->rd_targblock = InvalidBlockNumber;
-       relation->rd_fsm_nblocks = InvalidBlockNumber;
-       relation->rd_vm_nblocks = InvalidBlockNumber;
+       relation = (Relation) palloc0(sizeof(RelationData));
 
        /* make sure relation is marked as having no open file yet */
        relation->rd_smgr = NULL;
@@ -360,7 +376,6 @@ RelationParseRelOptions(Relation relation, HeapTuple tuple)
        {
                case RELKIND_RELATION:
                case RELKIND_TOASTVALUE:
-               case RELKIND_UNCATALOGED:
                case RELKIND_INDEX:
                        break;
                default:
@@ -377,12 +392,18 @@ RelationParseRelOptions(Relation relation, HeapTuple tuple)
                                                                relation->rd_rel->relkind == RELKIND_INDEX ?
                                                                relation->rd_am->amoptions : InvalidOid);
 
-       /* Copy parsed data into CacheMemoryContext */
+       /*
+        * Copy parsed data into CacheMemoryContext.  To guard against the
+        * possibility of leaks in the reloptions code, we want to do the actual
+        * parsing in the caller's memory context and copy the results into
+        * CacheMemoryContext after the fact.
+        */
        if (options)
        {
                relation->rd_options = MemoryContextAlloc(CacheMemoryContext,
                                                                                                  VARSIZE(options));
                memcpy(relation->rd_options, options, VARSIZE(options));
+               pfree(options);
        }
 }
 
@@ -639,7 +660,6 @@ RelationBuildRuleLock(Relation relation)
                rule->attrno = rewrite_form->ev_attr;
                rule->enabled = rewrite_form->ev_enabled;
                rule->isInstead = rewrite_form->is_instead;
-               rule->is_auto = rewrite_form->is_auto;
 
                /*
                 * Must use heap_getattr to fetch ev_action and ev_qual.  Also, the
@@ -763,8 +783,6 @@ equalRuleLocks(RuleLock *rlock1, RuleLock *rlock2)
                                return false;
                        if (!equal(rule1->actions, rule2->actions))
                                return false;
-                       if(rule1->is_auto != rule2->is_auto)
-                               return false;
                }
        }
        else if (rlock2 != NULL)
@@ -776,24 +794,22 @@ equalRuleLocks(RuleLock *rlock1, RuleLock *rlock2)
 /*
  *             RelationBuildDesc
  *
- *             Build a relation descriptor --- either a new one, or by
- *             recycling the given old relation object.  The latter case
- *             supports rebuilding a relcache entry without invalidating
- *             pointers to it.  The caller must hold at least
+ *             Build a relation descriptor.  The caller must hold at least
  *             AccessShareLock on the target relid.
  *
+ *             The new descriptor is inserted into the hash table if insertIt is true.
+ *
  *             Returns NULL if no pg_class row could be found for the given relid
  *             (suggesting we are trying to access a just-deleted relation).
  *             Any other error is reported via elog.
  */
 static Relation
-RelationBuildDesc(Oid targetRelId, Relation oldrelation)
+RelationBuildDesc(Oid targetRelId, bool insertIt)
 {
        Relation        relation;
        Oid                     relid;
        HeapTuple       pg_class_tuple;
        Form_pg_class relp;
-       MemoryContext oldcxt;
 
        /*
         * find the tuple in pg_class corresponding to the given relation id
@@ -811,12 +827,13 @@ RelationBuildDesc(Oid targetRelId, Relation oldrelation)
         */
        relid = HeapTupleGetOid(pg_class_tuple);
        relp = (Form_pg_class) GETSTRUCT(pg_class_tuple);
+       Assert(relid == targetRelId);
 
        /*
         * allocate storage for the relation descriptor, and copy pg_class_tuple
         * to relation->rd_rel.
         */
-       relation = AllocateRelationDesc(oldrelation, relp);
+       relation = AllocateRelationDesc(relp);
 
        /*
         * initialize the relation's relation id (relation->rd_id)
@@ -832,7 +849,31 @@ RelationBuildDesc(Oid targetRelId, Relation oldrelation)
        relation->rd_isnailed = false;
        relation->rd_createSubid = InvalidSubTransactionId;
        relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
-       relation->rd_istemp = isTempOrToastNamespace(relation->rd_rel->relnamespace);
+       switch (relation->rd_rel->relpersistence)
+       {
+               case RELPERSISTENCE_PERMANENT:
+                       relation->rd_backend = InvalidBackendId;
+                       break;
+               case RELPERSISTENCE_TEMP:
+                       if (isTempOrToastNamespace(relation->rd_rel->relnamespace))
+                               relation->rd_backend = MyBackendId;
+                       else
+                       {
+                               /*
+                                * If it's a local temp table, but not one of ours, we have to
+                                * use the slow, grotty method to figure out the owning
+                                * backend.
+                                */
+                               relation->rd_backend =
+                                       GetTempNamespaceBackendId(relation->rd_rel->relnamespace);
+                               Assert(relation->rd_backend != InvalidBackendId);
+                       }
+                       break;
+               default:
+                       elog(ERROR, "invalid relpersistence: %c",
+                                relation->rd_rel->relpersistence);
+                       break;
+       }
 
        /*
         * initialize the tuple descriptor (relation->rd_att).
@@ -883,11 +924,10 @@ RelationBuildDesc(Oid targetRelId, Relation oldrelation)
        heap_freetuple(pg_class_tuple);
 
        /*
-        * Insert newly created relation into relcache hash tables.
+        * Insert newly created relation into relcache hash table, if requested.
         */
-       oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
-       RelationCacheInsert(relation);
-       MemoryContextSwitchTo(oldcxt);
+       if (insertIt)
+               RelationCacheInsert(relation);
 
        /* It's fully valid */
        relation->rd_isvalid = true;
@@ -897,6 +937,10 @@ RelationBuildDesc(Oid targetRelId, Relation oldrelation)
 
 /*
  * Initialize the physical addressing info (RelFileNode) for a relcache entry
+ *
+ * Note: at the physical level, relations in the pg_global tablespace must
+ * be treated as shared, even if relisshared isn't set.  Hence we do not
+ * look at relisshared here.
  */
 static void
 RelationInitPhysicalAddr(Relation relation)
@@ -905,11 +949,22 @@ RelationInitPhysicalAddr(Relation relation)
                relation->rd_node.spcNode = relation->rd_rel->reltablespace;
        else
                relation->rd_node.spcNode = MyDatabaseTableSpace;
-       if (relation->rd_rel->relisshared)
+       if (relation->rd_node.spcNode == GLOBALTABLESPACE_OID)
                relation->rd_node.dbNode = InvalidOid;
        else
                relation->rd_node.dbNode = MyDatabaseId;
-       relation->rd_node.relNode = relation->rd_rel->relfilenode;
+       if (relation->rd_rel->relfilenode)
+               relation->rd_node.relNode = relation->rd_rel->relfilenode;
+       else
+       {
+               /* Consult the relation mapper */
+               relation->rd_node.relNode =
+                       RelationMapOidToFilenode(relation->rd_id,
+                                                                        relation->rd_rel->relisshared);
+               if (!OidIsValid(relation->rd_node.relNode))
+                       elog(ERROR, "could not find relation mapping for relation \"%s\", OID %u",
+                                RelationGetRelationName(relation), relation->rd_id);
+       }
 }
 
 /*
@@ -928,7 +983,6 @@ RelationInitIndexAccessInfo(Relation relation)
        MemoryContext indexcxt;
        MemoryContext oldcontext;
        int                     natts;
-       uint16          amstrategies;
        uint16          amsupport;
 
        /*
@@ -936,9 +990,8 @@ RelationInitIndexAccessInfo(Relation relation)
         * contains variable-length and possibly-null fields, we have to do this
         * honestly rather than just treating it as a Form_pg_index struct.
         */
-       tuple = SearchSysCache(INDEXRELID,
-                                                  ObjectIdGetDatum(RelationGetRelid(relation)),
-                                                  0, 0, 0);
+       tuple = SearchSysCache1(INDEXRELID,
+                                                       ObjectIdGetDatum(RelationGetRelid(relation)));
        if (!HeapTupleIsValid(tuple))
                elog(ERROR, "cache lookup failed for index %u",
                         RelationGetRelid(relation));
@@ -951,9 +1004,7 @@ RelationInitIndexAccessInfo(Relation relation)
        /*
         * Make a copy of the pg_am entry for the index's access method
         */
-       tuple = SearchSysCache(AMOID,
-                                                  ObjectIdGetDatum(relation->rd_rel->relam),
-                                                  0, 0, 0);
+       tuple = SearchSysCache1(AMOID, ObjectIdGetDatum(relation->rd_rel->relam));
        if (!HeapTupleIsValid(tuple))
                elog(ERROR, "cache lookup failed for access method %u",
                         relation->rd_rel->relam);
@@ -966,7 +1017,6 @@ RelationInitIndexAccessInfo(Relation relation)
        if (natts != relation->rd_index->indnatts)
                elog(ERROR, "relnatts disagrees with indnatts for index %u",
                         RelationGetRelid(relation));
-       amstrategies = aform->amstrategies;
        amsupport = aform->amsupport;
 
        /*
@@ -995,13 +1045,6 @@ RelationInitIndexAccessInfo(Relation relation)
        relation->rd_opcintype = (Oid *)
                MemoryContextAllocZero(indexcxt, natts * sizeof(Oid));
 
-       if (amstrategies > 0)
-               relation->rd_operator = (Oid *)
-                       MemoryContextAllocZero(indexcxt,
-                                                                  natts * amstrategies * sizeof(Oid));
-       else
-               relation->rd_operator = NULL;
-
        if (amsupport > 0)
        {
                int                     nsupport = natts * amsupport;
@@ -1033,14 +1076,13 @@ RelationInitIndexAccessInfo(Relation relation)
        indclass = (oidvector *) DatumGetPointer(indclassDatum);
 
        /*
-        * Fill the operator and support procedure OID arrays, as well as the info
-        * about opfamilies and opclass input types.  (aminfo and supportinfo are
-        * left as zeroes, and are filled on-the-fly when used)
+        * Fill the support procedure OID array, as well as the info about
+        * opfamilies and opclass input types.  (aminfo and supportinfo are left
+        * as zeroes, and are filled on-the-fly when used)
         */
-       IndexSupportInitialize(indclass,
-                                                  relation->rd_operator, relation->rd_support,
+       IndexSupportInitialize(indclass, relation->rd_support,
                                                   relation->rd_opfamily, relation->rd_opcintype,
-                                                  amstrategies, amsupport, natts);
+                                                  amsupport, natts);
 
        /*
         * Similarly extract indoption and copy it to the cache entry
@@ -1054,10 +1096,13 @@ RelationInitIndexAccessInfo(Relation relation)
        memcpy(relation->rd_indoption, indoption->values, natts * sizeof(int16));
 
        /*
-        * expressions and predicate cache will be filled later
+        * expressions, predicate, exclusion caches will be filled later
         */
        relation->rd_indexprs = NIL;
        relation->rd_indpred = NIL;
+       relation->rd_exclops = NULL;
+       relation->rd_exclprocs = NULL;
+       relation->rd_exclstrats = NULL;
        relation->rd_amcache = NULL;
 }
 
@@ -1066,22 +1111,19 @@ RelationInitIndexAccessInfo(Relation relation)
  *             Initializes an index's cached opclass information,
  *             given the index's pg_index.indclass entry.
  *
- * Data is returned into *indexOperator, *indexSupport, *opFamily, and
- * *opcInType, which are arrays allocated by the caller.
+ * Data is returned into *indexSupport, *opFamily, and *opcInType,
+ * which are arrays allocated by the caller.
  *
- * The caller also passes maxStrategyNumber, maxSupportNumber, and
- * maxAttributeNumber, since these indicate the size of the arrays
- * it has allocated --- but in practice these numbers must always match
- * those obtainable from the system catalog entries for the index and
- * access method.
+ * The caller also passes maxSupportNumber and maxAttributeNumber, since these
+ * indicate the size of the arrays it has allocated --- but in practice these
+ * numbers must always match those obtainable from the system catalog entries
+ * for the index and access method.
  */
 static void
 IndexSupportInitialize(oidvector *indclass,
-                                          Oid *indexOperator,
                                           RegProcedure *indexSupport,
                                           Oid *opFamily,
                                           Oid *opcInType,
-                                          StrategyNumber maxStrategyNumber,
                                           StrategyNumber maxSupportNumber,
                                           AttrNumber maxAttributeNumber)
 {
@@ -1096,16 +1138,11 @@ IndexSupportInitialize(oidvector *indclass,
 
                /* look up the info for this opclass, using a cache */
                opcentry = LookupOpclassInfo(indclass->values[attIndex],
-                                                                        maxStrategyNumber,
                                                                         maxSupportNumber);
 
                /* copy cached data into relcache entry */
                opFamily[attIndex] = opcentry->opcfamily;
                opcInType[attIndex] = opcentry->opcintype;
-               if (maxStrategyNumber > 0)
-                       memcpy(&indexOperator[attIndex * maxStrategyNumber],
-                                  opcentry->operatorOids,
-                                  maxStrategyNumber * sizeof(Oid));
                if (maxSupportNumber > 0)
                        memcpy(&indexSupport[attIndex * maxSupportNumber],
                                   opcentry->supportProcs,
@@ -1119,23 +1156,22 @@ IndexSupportInitialize(oidvector *indclass,
  * This routine maintains a per-opclass cache of the information needed
  * by IndexSupportInitialize().  This is more efficient than relying on
  * the catalog cache, because we can load all the info about a particular
- * opclass in a single indexscan of pg_amproc or pg_amop.
+ * opclass in a single indexscan of pg_amproc.
  *
- * The information from pg_am about expected range of strategy and support
+ * The information from pg_am about expected range of support function
  * numbers is passed in, rather than being looked up, mainly because the
  * caller will have it already.
  *
  * Note there is no provision for flushing the cache.  This is OK at the
  * moment because there is no way to ALTER any interesting properties of an
  * existing opclass --- all you can do is drop it, which will result in
- * a useless but harmless dead entry in the cache.  To support altering
+ * a useless but harmless dead entry in the cache.     To support altering
  * opclass membership (not the same as opfamily membership!), we'd need to
  * be able to flush this cache as well as the contents of relcache entries
  * for indexes.
  */
 static OpClassCacheEnt *
 LookupOpclassInfo(Oid operatorClassOid,
-                                 StrategyNumber numStrats,
                                  StrategyNumber numSupport)
 {
        OpClassCacheEnt *opcentry;
@@ -1151,15 +1187,16 @@ LookupOpclassInfo(Oid operatorClassOid,
                /* First time through: initialize the opclass cache */
                HASHCTL         ctl;
 
-               if (!CacheMemoryContext)
-                       CreateCacheMemoryContext();
-
                MemSet(&ctl, 0, sizeof(ctl));
                ctl.keysize = sizeof(Oid);
                ctl.entrysize = sizeof(OpClassCacheEnt);
                ctl.hash = oid_hash;
                OpClassCache = hash_create("Operator class cache", 64,
                                                                   &ctl, HASH_ELEM | HASH_FUNCTION);
+
+               /* Also make sure CacheMemoryContext exists */
+               if (!CacheMemoryContext)
+                       CreateCacheMemoryContext();
        }
 
        opcentry = (OpClassCacheEnt *) hash_search(OpClassCache,
@@ -1170,16 +1207,8 @@ LookupOpclassInfo(Oid operatorClassOid,
        {
                /* Need to allocate memory for new entry */
                opcentry->valid = false;        /* until known OK */
-               opcentry->numStrats = numStrats;
                opcentry->numSupport = numSupport;
 
-               if (numStrats > 0)
-                       opcentry->operatorOids = (Oid *)
-                               MemoryContextAllocZero(CacheMemoryContext,
-                                                                          numStrats * sizeof(Oid));
-               else
-                       opcentry->operatorOids = NULL;
-
                if (numSupport > 0)
                        opcentry->supportProcs = (RegProcedure *)
                                MemoryContextAllocZero(CacheMemoryContext,
@@ -1189,16 +1218,15 @@ LookupOpclassInfo(Oid operatorClassOid,
        }
        else
        {
-               Assert(numStrats == opcentry->numStrats);
                Assert(numSupport == opcentry->numSupport);
        }
 
        /*
         * When testing for cache-flush hazards, we intentionally disable the
-        * operator class cache and force reloading of the info on each call.
-        * This is helpful because we want to test the case where a cache flush
-        * occurs while we are loading the info, and it's very hard to provoke
-        * that if this happens only once per opclass per backend.
+        * operator class cache and force reloading of the info on each call. This
+        * is helpful because we want to test the case where a cache flush occurs
+        * while we are loading the info, and it's very hard to provoke that if
+        * this happens only once per opclass per backend.
         */
 #if defined(CLOBBER_CACHE_ALWAYS)
        opcentry->valid = false;
@@ -1220,7 +1248,7 @@ LookupOpclassInfo(Oid operatorClassOid,
 
        /*
         * We have to fetch the pg_opclass row to determine its opfamily and
-        * opcintype, which are needed to look up the operators and functions.
+        * opcintype, which are needed to look up related operators and functions.
         * It'd be convenient to use the syscache here, but that probably doesn't
         * work while bootstrapping.
         */
@@ -1245,45 +1273,6 @@ LookupOpclassInfo(Oid operatorClassOid,
        systable_endscan(scan);
        heap_close(rel, AccessShareLock);
 
-
-       /*
-        * Scan pg_amop to obtain operators for the opclass.  We only fetch the
-        * default ones (those with lefttype = righttype = opcintype).
-        */
-       if (numStrats > 0)
-       {
-               ScanKeyInit(&skey[0],
-                                       Anum_pg_amop_amopfamily,
-                                       BTEqualStrategyNumber, F_OIDEQ,
-                                       ObjectIdGetDatum(opcentry->opcfamily));
-               ScanKeyInit(&skey[1],
-                                       Anum_pg_amop_amoplefttype,
-                                       BTEqualStrategyNumber, F_OIDEQ,
-                                       ObjectIdGetDatum(opcentry->opcintype));
-               ScanKeyInit(&skey[2],
-                                       Anum_pg_amop_amoprighttype,
-                                       BTEqualStrategyNumber, F_OIDEQ,
-                                       ObjectIdGetDatum(opcentry->opcintype));
-               rel = heap_open(AccessMethodOperatorRelationId, AccessShareLock);
-               scan = systable_beginscan(rel, AccessMethodStrategyIndexId, indexOK,
-                                                                 SnapshotNow, 3, skey);
-
-               while (HeapTupleIsValid(htup = systable_getnext(scan)))
-               {
-                       Form_pg_amop amopform = (Form_pg_amop) GETSTRUCT(htup);
-
-                       if (amopform->amopstrategy <= 0 ||
-                               (StrategyNumber) amopform->amopstrategy > numStrats)
-                               elog(ERROR, "invalid amopstrategy number %d for opclass %u",
-                                        amopform->amopstrategy, operatorClassOid);
-                       opcentry->operatorOids[amopform->amopstrategy - 1] =
-                               amopform->amopopr;
-               }
-
-               systable_endscan(scan);
-               heap_close(rel, AccessShareLock);
-       }
-
        /*
         * Scan pg_amproc to obtain support procs for the opclass.      We only fetch
         * the default ones (those with lefttype = righttype = opcintype).
@@ -1331,24 +1320,29 @@ LookupOpclassInfo(Oid operatorClassOid,
 /*
  *             formrdesc
  *
- *             This is a special cut-down version of RelationBuildDesc()
- *             used by RelationCacheInitializePhase2() in initializing the relcache.
+ *             This is a special cut-down version of RelationBuildDesc(),
+ *             used while initializing the relcache.
  *             The relation descriptor is built just from the supplied parameters,
  *             without actually looking at any system table entries.  We cheat
  *             quite a lot since we only need to work for a few basic system
  *             catalogs.
  *
- * formrdesc is currently used for: pg_class, pg_attribute, pg_proc,
- * and pg_type (see RelationCacheInitializePhase2).
+ * formrdesc is currently used for: pg_database, pg_authid, pg_auth_members,
+ * pg_class, pg_attribute, pg_proc, and pg_type
+ * (see RelationCacheInitializePhase2/3).
  *
  * Note that these catalogs can't have constraints (except attnotnull),
  * default values, rules, or triggers, since we don't cope with any of that.
+ * (Well, actually, this only matters for properties that need to be valid
+ * during bootstrap or before RelationCacheInitializePhase3 runs, and none of
+ * these properties matter then...)
  *
  * NOTE: we assume we are already switched into CacheMemoryContext.
  */
 static void
 formrdesc(const char *relationName, Oid relationReltype,
-                 bool hasoids, int natts, FormData_pg_attribute *att)
+                 bool isshared, bool hasoids,
+                 int natts, const FormData_pg_attribute *attrs)
 {
        Relation        relation;
        int                     i;
@@ -1358,9 +1352,6 @@ formrdesc(const char *relationName, Oid relationReltype,
         * allocate new relation desc, clear all fields of reldesc
         */
        relation = (Relation) palloc0(sizeof(RelationData));
-       relation->rd_targblock = InvalidBlockNumber;
-       relation->rd_fsm_nblocks = InvalidBlockNumber;
-       relation->rd_vm_nblocks = InvalidBlockNumber;
 
        /* make sure relation is marked as having no open file yet */
        relation->rd_smgr = NULL;
@@ -1377,14 +1368,16 @@ formrdesc(const char *relationName, Oid relationReltype,
        relation->rd_isnailed = true;
        relation->rd_createSubid = InvalidSubTransactionId;
        relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
-       relation->rd_istemp = false;
+       relation->rd_backend = InvalidBackendId;
 
        /*
         * initialize relation tuple form
         *
         * The data we insert here is pretty incomplete/bogus, but it'll serve to
-        * get us launched.  RelationCacheInitializePhase2() will read the real
-        * data from pg_class and replace what we've done here.
+        * get us launched.  RelationCacheInitializePhase3() will read the real
+        * data from pg_class and replace what we've done here.  Note in
+        * particular that relowner is left as zero; this cues
+        * RelationCacheInitializePhase3 that the real data isn't there yet.
         */
        relation->rd_rel = (Form_pg_class) palloc0(CLASS_TUPLE_SIZE);
 
@@ -1394,10 +1387,14 @@ formrdesc(const char *relationName, Oid relationReltype,
 
        /*
         * It's important to distinguish between shared and non-shared relations,
-        * even at bootstrap time, to make sure we know where they are stored.  At
-        * present, all relations that formrdesc is used for are not shared.
+        * even at bootstrap time, to make sure we know where they are stored.
         */
-       relation->rd_rel->relisshared = false;
+       relation->rd_rel->relisshared = isshared;
+       if (isshared)
+               relation->rd_rel->reltablespace = GLOBALTABLESPACE_OID;
+
+       /* formrdesc is used only for permanent relations */
+       relation->rd_rel->relpersistence = RELPERSISTENCE_PERMANENT;
 
        relation->rd_rel->relpages = 1;
        relation->rd_rel->reltuples = 1;
@@ -1409,8 +1406,8 @@ formrdesc(const char *relationName, Oid relationReltype,
         * initialize attribute tuple form
         *
         * Unlike the case with the relation tuple, this data had better be right
-        * because it will never be replaced.  The input values must be correctly
-        * defined by macros in src/include/catalog/ headers.
+        * because it will never be replaced.  The data comes from
+        * src/include/catalog/ headers via genbki.pl.
         */
        relation->rd_att = CreateTemplateTupleDesc(natts, hasoids);
        relation->rd_att->tdrefcount = 1;       /* mark as refcounted */
@@ -1425,9 +1422,9 @@ formrdesc(const char *relationName, Oid relationReltype,
        for (i = 0; i < natts; i++)
        {
                memcpy(relation->rd_att->attrs[i],
-                          &att[i],
+                          &attrs[i],
                           ATTRIBUTE_FIXED_PART_SIZE);
-               has_not_null |= att[i].attnotnull;
+               has_not_null |= attrs[i].attnotnull;
                /* make sure attcacheoff is valid */
                relation->rd_att->attrs[i]->attcacheoff = -1;
        }
@@ -1448,7 +1445,18 @@ formrdesc(const char *relationName, Oid relationReltype,
         * initialize relation id from info in att array (my, this is ugly)
         */
        RelationGetRelid(relation) = relation->rd_att->attrs[0]->attrelid;
-       relation->rd_rel->relfilenode = RelationGetRelid(relation);
+
+       /*
+        * All relations made with formrdesc are mapped.  This is necessarily so
+        * because there is no other way to know what filenode they currently
+        * have.  In bootstrap mode, add them to the initial relation mapper data,
+        * specifying that the initial filenode is the same as the OID.
+        */
+       relation->rd_rel->relfilenode = InvalidOid;
+       if (IsBootstrapProcessingMode())
+               RelationMapUpdateMap(RelationGetRelid(relation),
+                                                        RelationGetRelid(relation),
+                                                        isshared, true);
 
        /*
         * initialize the relation lock manager information
@@ -1518,9 +1526,19 @@ RelationIdGetRelation(Oid relationId)
        if (RelationIsValid(rd))
        {
                RelationIncrementReferenceCount(rd);
-               /* revalidate nailed index if necessary */
+               /* revalidate cache entry if necessary */
                if (!rd->rd_isvalid)
-                       RelationReloadIndexInfo(rd);
+               {
+                       /*
+                        * Indexes only have a limited number of possible schema changes,
+                        * and we don't want to use the full-blown procedure because it's
+                        * a headache for indexes that reload itself depends on.
+                        */
+                       if (rd->rd_rel->relkind == RELKIND_INDEX)
+                               RelationReloadIndexInfo(rd);
+                       else
+                               RelationClearRelation(rd, true);
+               }
                return rd;
        }
 
@@ -1528,7 +1546,7 @@ RelationIdGetRelation(Oid relationId)
         * no reldesc in the cache, so have RelationBuildDesc() build one and add
         * it.
         */
-       rd = RelationBuildDesc(relationId, NULL);
+       rd = RelationBuildDesc(relationId, true);
        if (RelationIsValid(rd))
                RelationIncrementReferenceCount(rd);
        return rd;
@@ -1628,6 +1646,24 @@ RelationReloadIndexInfo(Relation relation)
        /* Should be closed at smgr level */
        Assert(relation->rd_smgr == NULL);
 
+       /* Must free any AM cached data upon relcache flush */
+       if (relation->rd_amcache)
+               pfree(relation->rd_amcache);
+       relation->rd_amcache = NULL;
+
+       /*
+        * If it's a shared index, we might be called before backend startup has
+        * finished selecting a database, in which case we have no way to read
+        * pg_class yet.  However, a shared index can never have any significant
+        * schema updates, so it's okay to ignore the invalidation signal.  Just
+        * mark it valid and return without doing anything more.
+        */
+       if (relation->rd_rel->relisshared && !criticalRelcachesBuilt)
+       {
+               relation->rd_isvalid = true;
+               return;
+       }
+
        /*
         * Read the pg_class row
         *
@@ -1649,17 +1685,6 @@ RelationReloadIndexInfo(Relation relation)
        heap_freetuple(pg_class_tuple);
        /* We must recalculate physical address in case it changed */
        RelationInitPhysicalAddr(relation);
-       /*
-        * Must reset targblock, fsm_nblocks and vm_nblocks in case rel was
-        * truncated
-        */
-       relation->rd_targblock = InvalidBlockNumber;
-       relation->rd_fsm_nblocks = InvalidBlockNumber;
-       relation->rd_vm_nblocks = InvalidBlockNumber;
-       /* Must free any AM cached data, too */
-       if (relation->rd_amcache)
-               pfree(relation->rd_amcache);
-       relation->rd_amcache = NULL;
 
        /*
         * For a non-system index, there are fields of the pg_index row that are
@@ -1674,9 +1699,8 @@ RelationReloadIndexInfo(Relation relation)
                HeapTuple       tuple;
                Form_pg_index index;
 
-               tuple = SearchSysCache(INDEXRELID,
-                                                          ObjectIdGetDatum(RelationGetRelid(relation)),
-                                                          0, 0, 0);
+               tuple = SearchSysCache1(INDEXRELID,
+                                                               ObjectIdGetDatum(RelationGetRelid(relation)));
                if (!HeapTupleIsValid(tuple))
                        elog(ERROR, "cache lookup failed for index %u",
                                 RelationGetRelid(relation));
@@ -1696,22 +1720,78 @@ RelationReloadIndexInfo(Relation relation)
 }
 
 /*
+ * RelationDestroyRelation
+ *
+ *     Physically delete a relation cache entry and all subsidiary data.
+ *     Caller must already have unhooked the entry from the hash table.
+ */
+static void
+RelationDestroyRelation(Relation relation)
+{
+       Assert(RelationHasReferenceCountZero(relation));
+
+       /*
+        * Make sure smgr and lower levels close the relation's files, if they
+        * weren't closed already.  (This was probably done by caller, but let's
+        * just be real sure.)
+        */
+       RelationCloseSmgr(relation);
+
+       /*
+        * Free all the subsidiary data structures of the relcache entry, then the
+        * entry itself.
+        */
+       if (relation->rd_rel)
+               pfree(relation->rd_rel);
+       /* can't use DecrTupleDescRefCount here */
+       Assert(relation->rd_att->tdrefcount > 0);
+       if (--relation->rd_att->tdrefcount == 0)
+               FreeTupleDesc(relation->rd_att);
+       list_free(relation->rd_indexlist);
+       bms_free(relation->rd_indexattr);
+       FreeTriggerDesc(relation->trigdesc);
+       if (relation->rd_options)
+               pfree(relation->rd_options);
+       if (relation->rd_indextuple)
+               pfree(relation->rd_indextuple);
+       if (relation->rd_am)
+               pfree(relation->rd_am);
+       if (relation->rd_indexcxt)
+               MemoryContextDelete(relation->rd_indexcxt);
+       if (relation->rd_rulescxt)
+               MemoryContextDelete(relation->rd_rulescxt);
+       pfree(relation);
+}
+
+/*
  * RelationClearRelation
  *
  *      Physically blow away a relation cache entry, or reset it and rebuild
  *      it from scratch (that is, from catalog entries).  The latter path is
- *      usually used when we are notified of a change to an open relation
- *      (one with refcount > 0).  However, this routine just does whichever
- *      it's told to do; callers must determine which they want.
+ *      used when we are notified of a change to an open relation (one with
+ *      refcount > 0).
+ *
+ *      NB: when rebuilding, we'd better hold some lock on the relation,
+ *      else the catalog data we need to read could be changing under us.
+ *      Also, a rel to be rebuilt had better have refcnt > 0.  This is because
+ *      an sinval reset could happen while we're accessing the catalogs, and
+ *      the rel would get blown away underneath us by RelationCacheInvalidate
+ *      if it has zero refcnt.
  *
- *      NB: when rebuilding, we'd better hold some lock on the relation.
- *      In current usages this is presumed true because it has refcnt > 0.
+ *      The "rebuild" parameter is redundant in current usage because it has
+ *      to match the relation's refcnt status, but we keep it as a crosscheck
+ *      that we're doing what the caller expects.
  */
 static void
 RelationClearRelation(Relation relation, bool rebuild)
 {
-       Oid                     old_reltype = relation->rd_rel->reltype;
-       MemoryContext oldcxt;
+       /*
+        * As per notes above, a rel to be rebuilt MUST have refcnt > 0; while of
+        * course it would be a bad idea to blow away one with nonzero refcnt.
+        */
+       Assert(rebuild ?
+                  !RelationHasReferenceCountZero(relation) :
+                  RelationHasReferenceCountZero(relation));
 
        /*
         * Make sure smgr and lower levels close the relation's files, if they
@@ -1724,9 +1804,8 @@ RelationClearRelation(Relation relation, bool rebuild)
 
        /*
         * Never, never ever blow away a nailed-in system relation, because we'd
-        * be unable to recover.  However, we must reset rd_targblock, in case we
-        * got called because of a relation cache flush that was triggered by
-        * VACUUM.
+        * be unable to recover.  However, we must redo RelationInitPhysicalAddr
+        * in case it is a mapped relation whose mapping changed.
         *
         * If it's a nailed index, then we need to re-read the pg_class row to see
         * if its relfilenode changed.  We can't necessarily do that here, because
@@ -1737,9 +1816,8 @@ RelationClearRelation(Relation relation, bool rebuild)
         */
        if (relation->rd_isnailed)
        {
-               relation->rd_targblock = InvalidBlockNumber;
-               relation->rd_fsm_nblocks = InvalidBlockNumber;
-               relation->rd_vm_nblocks = InvalidBlockNumber;
+               RelationInitPhysicalAddr(relation);
+
                if (relation->rd_rel->relkind == RELKIND_INDEX)
                {
                        relation->rd_isvalid = false;           /* needs to be revalidated */
@@ -1765,40 +1843,8 @@ RelationClearRelation(Relation relation, bool rebuild)
                return;
        }
 
-       /*
-        * Remove relation from hash tables
-        *
-        * Note: we might be reinserting it momentarily, but we must not have it
-        * visible in the hash tables until it's valid again, so don't try to
-        * optimize this away...
-        */
-       oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
-       RelationCacheDelete(relation);
-       MemoryContextSwitchTo(oldcxt);
-
-       /* Clear out catcache's entries for this relation */
-       CatalogCacheFlushRelation(RelationGetRelid(relation));
-
-       /*
-        * Free all the subsidiary data structures of the relcache entry. We
-        * cannot free rd_att if we are trying to rebuild the entry, however,
-        * because pointers to it may be cached in various places. The rule
-        * manager might also have pointers into the rewrite rules. So to begin
-        * with, we can only get rid of these fields:
-        */
-       FreeTriggerDesc(relation->trigdesc);
-       if (relation->rd_indextuple)
-               pfree(relation->rd_indextuple);
-       if (relation->rd_am)
-               pfree(relation->rd_am);
-       if (relation->rd_rel)
-               pfree(relation->rd_rel);
-       if (relation->rd_options)
-               pfree(relation->rd_options);
-       list_free(relation->rd_indexlist);
-       bms_free(relation->rd_indexattr);
-       if (relation->rd_indexcxt)
-               MemoryContextDelete(relation->rd_indexcxt);
+       /* Mark it invalid until we've finished rebuild */
+       relation->rd_isvalid = false;
 
        /*
         * If we're really done with the relcache entry, blow it away. But if
@@ -1808,84 +1854,117 @@ RelationClearRelation(Relation relation, bool rebuild)
         */
        if (!rebuild)
        {
-               /* ok to zap remaining substructure */
-               flush_rowtype_cache(old_reltype);
-               /* can't use DecrTupleDescRefCount here */
-               Assert(relation->rd_att->tdrefcount > 0);
-               if (--relation->rd_att->tdrefcount == 0)
-                       FreeTupleDesc(relation->rd_att);
-               if (relation->rd_rulescxt)
-                       MemoryContextDelete(relation->rd_rulescxt);
-               pfree(relation);
+               /* Remove it from the hash table */
+               RelationCacheDelete(relation);
+
+               /* And release storage */
+               RelationDestroyRelation(relation);
        }
        else
        {
                /*
-                * When rebuilding an open relcache entry, must preserve ref count and
-                * rd_createSubid/rd_newRelfilenodeSubid state.  Also attempt to
-                * preserve the tupledesc and rewrite-rule substructures in place.
-                * (Note: the refcount mechanism for tupledescs may eventually ensure
-                * that we don't really need to preserve the tupledesc in-place, but
-                * for now there are still a lot of places that assume an open rel's
-                * tupledesc won't move.)
+                * Our strategy for rebuilding an open relcache entry is to build a
+                * new entry from scratch, swap its contents with the old entry, and
+                * finally delete the new entry (along with any infrastructure swapped
+                * over from the old entry).  This is to avoid trouble in case an
+                * error causes us to lose control partway through.  The old entry
+                * will still be marked !rd_isvalid, so we'll try to rebuild it again
+                * on next access.      Meanwhile it's not any less valid than it was
+                * before, so any code that might expect to continue accessing it
+                * isn't hurt by the rebuild failure.  (Consider for example a
+                * subtransaction that ALTERs a table and then gets cancelled partway
+                * through the cache entry rebuild.  The outer transaction should
+                * still see the not-modified cache entry as valid.)  The worst
+                * consequence of an error is leaking the necessarily-unreferenced new
+                * entry, and this shouldn't happen often enough for that to be a big
+                * problem.
+                *
+                * When rebuilding an open relcache entry, we must preserve ref count,
+                * rd_createSubid/rd_newRelfilenodeSubid, and rd_toastoid state.  Also
+                * attempt to preserve the pg_class entry (rd_rel), tupledesc, and
+                * rewrite-rule substructures in place, because various places assume
+                * that these structures won't move while they are working with an
+                * open relcache entry.  (Note: the refcount mechanism for tupledescs
+                * might someday allow us to remove this hack for the tupledesc.)
                 *
                 * Note that this process does not touch CurrentResourceOwner; which
                 * is good because whatever ref counts the entry may have do not
                 * necessarily belong to that resource owner.
                 */
+               Relation        newrel;
                Oid                     save_relid = RelationGetRelid(relation);
-               int                     old_refcnt = relation->rd_refcnt;
-               SubTransactionId old_createSubid = relation->rd_createSubid;
-               SubTransactionId old_newRelfilenodeSubid = relation->rd_newRelfilenodeSubid;
-               struct PgStat_TableStatus *old_pgstat_info = relation->pgstat_info;
-               TupleDesc       old_att = relation->rd_att;
-               RuleLock   *old_rules = relation->rd_rules;
-               MemoryContext old_rulescxt = relation->rd_rulescxt;
-
-               if (RelationBuildDesc(save_relid, relation) != relation)
+               bool            keep_tupdesc;
+               bool            keep_rules;
+
+               /* Build temporary entry, but don't link it into hashtable */
+               newrel = RelationBuildDesc(save_relid, false);
+               if (newrel == NULL)
                {
                        /* Should only get here if relation was deleted */
-                       flush_rowtype_cache(old_reltype);
-                       Assert(old_att->tdrefcount > 0);
-                       if (--old_att->tdrefcount == 0)
-                               FreeTupleDesc(old_att);
-                       if (old_rulescxt)
-                               MemoryContextDelete(old_rulescxt);
-                       pfree(relation);
+                       RelationCacheDelete(relation);
+                       RelationDestroyRelation(relation);
                        elog(ERROR, "relation %u deleted while still in use", save_relid);
                }
-               relation->rd_refcnt = old_refcnt;
-               relation->rd_createSubid = old_createSubid;
-               relation->rd_newRelfilenodeSubid = old_newRelfilenodeSubid;
-               relation->pgstat_info = old_pgstat_info;
 
-               if (equalTupleDescs(old_att, relation->rd_att))
-               {
-                       /* needn't flush typcache here */
-                       Assert(relation->rd_att->tdrefcount == 1);
-                       if (--relation->rd_att->tdrefcount == 0)
-                               FreeTupleDesc(relation->rd_att);
-                       relation->rd_att = old_att;
-               }
-               else
-               {
-                       flush_rowtype_cache(old_reltype);
-                       Assert(old_att->tdrefcount > 0);
-                       if (--old_att->tdrefcount == 0)
-                               FreeTupleDesc(old_att);
-               }
-               if (equalRuleLocks(old_rules, relation->rd_rules))
+               keep_tupdesc = equalTupleDescs(relation->rd_att, newrel->rd_att);
+               keep_rules = equalRuleLocks(relation->rd_rules, newrel->rd_rules);
+
+               /*
+                * Perform swapping of the relcache entry contents.  Within this
+                * process the old entry is momentarily invalid, so there *must* be no
+                * possibility of CHECK_FOR_INTERRUPTS within this sequence. Do it in
+                * all-in-line code for safety.
+                *
+                * Since the vast majority of fields should be swapped, our method is
+                * to swap the whole structures and then re-swap those few fields we
+                * didn't want swapped.
+                */
+#define SWAPFIELD(fldtype, fldname) \
+               do { \
+                       fldtype _tmp = newrel->fldname; \
+                       newrel->fldname = relation->fldname; \
+                       relation->fldname = _tmp; \
+               } while (0)
+
+               /* swap all Relation struct fields */
                {
-                       if (relation->rd_rulescxt)
-                               MemoryContextDelete(relation->rd_rulescxt);
-                       relation->rd_rules = old_rules;
-                       relation->rd_rulescxt = old_rulescxt;
+                       RelationData tmpstruct;
+
+                       memcpy(&tmpstruct, newrel, sizeof(RelationData));
+                       memcpy(newrel, relation, sizeof(RelationData));
+                       memcpy(relation, &tmpstruct, sizeof(RelationData));
                }
-               else
+
+               /* rd_smgr must not be swapped, due to back-links from smgr level */
+               SWAPFIELD(SMgrRelation, rd_smgr);
+               /* rd_refcnt must be preserved */
+               SWAPFIELD(int, rd_refcnt);
+               /* isnailed shouldn't change */
+               Assert(newrel->rd_isnailed == relation->rd_isnailed);
+               /* creation sub-XIDs must be preserved */
+               SWAPFIELD(SubTransactionId, rd_createSubid);
+               SWAPFIELD(SubTransactionId, rd_newRelfilenodeSubid);
+               /* un-swap rd_rel pointers, swap contents instead */
+               SWAPFIELD(Form_pg_class, rd_rel);
+               /* ... but actually, we don't have to update newrel->rd_rel */
+               memcpy(relation->rd_rel, newrel->rd_rel, CLASS_TUPLE_SIZE);
+               /* preserve old tupledesc and rules if no logical change */
+               if (keep_tupdesc)
+                       SWAPFIELD(TupleDesc, rd_att);
+               if (keep_rules)
                {
-                       if (old_rulescxt)
-                               MemoryContextDelete(old_rulescxt);
+                       SWAPFIELD(RuleLock *, rd_rules);
+                       SWAPFIELD(MemoryContext, rd_rulescxt);
                }
+               /* toast OID override must be preserved */
+               SWAPFIELD(Oid, rd_toastoid);
+               /* pgstat_info must be preserved */
+               SWAPFIELD(struct PgStat_TableStatus *, pgstat_info);
+
+#undef SWAPFIELD
+
+               /* And now we can throw away the temporary entry */
+               RelationDestroyRelation(newrel);
        }
 }
 
@@ -1897,8 +1976,6 @@ RelationClearRelation(Relation relation, bool rebuild)
 static void
 RelationFlushRelation(Relation relation)
 {
-       bool            rebuild;
-
        if (relation->rd_createSubid != InvalidSubTransactionId ||
                relation->rd_newRelfilenodeSubid != InvalidSubTransactionId)
        {
@@ -1906,18 +1983,24 @@ RelationFlushRelation(Relation relation)
                 * New relcache entries are always rebuilt, not flushed; else we'd
                 * forget the "new" status of the relation, which is a useful
                 * optimization to have.  Ditto for the new-relfilenode status.
+                *
+                * The rel could have zero refcnt here, so temporarily increment the
+                * refcnt to ensure it's safe to rebuild it.  We can assume that the
+                * current transaction has some lock on the rel already.
                 */
-               rebuild = true;
+               RelationIncrementReferenceCount(relation);
+               RelationClearRelation(relation, true);
+               RelationDecrementReferenceCount(relation);
        }
        else
        {
                /*
                 * Pre-existing rels can be dropped from the relcache if not open.
                 */
-               rebuild = !RelationHasReferenceCountZero(relation);
-       }
+               bool            rebuild = !RelationHasReferenceCountZero(relation);
 
-       RelationClearRelation(relation, rebuild);
+               RelationClearRelation(relation, rebuild);
+       }
 }
 
 /*
@@ -1976,7 +2059,7 @@ RelationCacheInvalidateEntry(Oid relationId)
  * RelationCacheInvalidate
  *      Blow away cached relation descriptors that have zero reference counts,
  *      and rebuild those with positive reference counts.      Also reset the smgr
- *      relation cache.
+ *      relation cache and re-read relation mapping data.
  *
  *      This is currently used only to recover from SI message buffer overflow,
  *      so we do not touch new-in-transaction relations; they cannot be targets
@@ -2062,6 +2145,11 @@ RelationCacheInvalidate(void)
         */
        smgrcloseall();
 
+       /*
+        * Reload relation mapping data before starting to reconstruct cache.
+        */
+       RelationMapInvalidateAll();
+
        /* Phase 2: rebuild the items found to need rebuild in phase 1 */
        foreach(l, rebuildFirstList)
        {
@@ -2078,6 +2166,25 @@ RelationCacheInvalidate(void)
 }
 
 /*
+ * RelationCloseSmgrByOid - close a relcache entry's smgr link
+ *
+ * Needed in some cases where we are changing a relation's physical mapping.
+ * The link will be automatically reopened on next use.
+ */
+void
+RelationCloseSmgrByOid(Oid relationId)
+{
+       Relation        relation;
+
+       RelationIdCacheLookup(relationId, relation);
+
+       if (!PointerIsValid(relation))
+               return;                                 /* not in cache, nothing to do */
+
+       RelationCloseSmgr(relation);
+}
+
+/*
  * AtEOXact_RelationCache
  *
  *     Clean up the relcache at main-transaction commit or abort.
@@ -2222,7 +2329,6 @@ AtEOSubXact_RelationCache(bool isCommit, SubTransactionId mySubid,
                                relation->rd_createSubid = parentSubid;
                        else
                        {
-                               Assert(RelationHasReferenceCountZero(relation));
                                RelationClearRelation(relation, false);
                                continue;
                        }
@@ -2253,22 +2359,6 @@ AtEOSubXact_RelationCache(bool isCommit, SubTransactionId mySubid,
        }
 }
 
-/*
- * RelationCacheMarkNewRelfilenode
- *
- *     Mark the rel as having been given a new relfilenode in the current
- *     (sub) transaction.      This is a hint that can be used to optimize
- *     later operations on the rel in the same transaction.
- */
-void
-RelationCacheMarkNewRelfilenode(Relation rel)
-{
-       /* Mark it... */
-       rel->rd_newRelfilenodeSubid = GetCurrentSubTransactionId();
-       /* ... and now we have eoxact cleanup work to do */
-       need_eoxact_work = true;
-}
-
 
 /*
  *             RelationBuildLocalRelation
@@ -2281,7 +2371,9 @@ RelationBuildLocalRelation(const char *relname,
                                                   TupleDesc tupDesc,
                                                   Oid relid,
                                                   Oid reltablespace,
-                                                  bool shared_relation)
+                                                  bool shared_relation,
+                                                  bool mapped_relation,
+                                                  char relpersistence)
 {
        Relation        rel;
        MemoryContext oldcxt;
@@ -2295,10 +2387,14 @@ RelationBuildLocalRelation(const char *relname,
        /*
         * check for creation of a rel that must be nailed in cache.
         *
-        * XXX this list had better match RelationCacheInitializePhase2's list.
+        * XXX this list had better match the relations specially handled in
+        * RelationCacheInitializePhase2/3.
         */
        switch (relid)
        {
+               case DatabaseRelationId:
+               case AuthIdRelationId:
+               case AuthMemRelationId:
                case RelationRelationId:
                case AttributeRelationId:
                case ProcedureRelationId:
@@ -2320,6 +2416,9 @@ RelationBuildLocalRelation(const char *relname,
                elog(ERROR, "shared_relation flag for \"%s\" does not match IsSharedRelation(%u)",
                         relname, relid);
 
+       /* Shared relations had better be mapped, too */
+       Assert(mapped_relation || !shared_relation);
+
        /*
         * switch to the cache context to create the relcache entry.
         */
@@ -2333,10 +2432,6 @@ RelationBuildLocalRelation(const char *relname,
         */
        rel = (Relation) palloc0(sizeof(RelationData));
 
-       rel->rd_targblock = InvalidBlockNumber;
-       rel->rd_fsm_nblocks = InvalidBlockNumber;
-       rel->rd_vm_nblocks = InvalidBlockNumber;
-
        /* make sure relation is marked as having no open file yet */
        rel->rd_smgr = NULL;
 
@@ -2352,9 +2447,6 @@ RelationBuildLocalRelation(const char *relname,
        /* must flag that we have rels created in this transaction */
        need_eoxact_work = true;
 
-       /* is it a temporary relation? */
-       rel->rd_istemp = isTempOrToastNamespace(relnamespace);
-
        /*
         * create a new tuple descriptor from the one passed in.  We do this
         * partly to copy it into the cache context, and partly because the new
@@ -2394,10 +2486,27 @@ RelationBuildLocalRelation(const char *relname,
        /* needed when bootstrapping: */
        rel->rd_rel->relowner = BOOTSTRAP_SUPERUSERID;
 
+       /* set up persistence; rd_backend is a function of persistence type */
+       rel->rd_rel->relpersistence = relpersistence;
+       switch (relpersistence)
+       {
+               case RELPERSISTENCE_PERMANENT:
+                       rel->rd_backend = InvalidBackendId;
+                       break;
+               case RELPERSISTENCE_TEMP:
+                       rel->rd_backend = MyBackendId;
+                       break;
+               default:
+                       elog(ERROR, "invalid relpersistence: %c", relpersistence);
+                       break;
+       }
+
        /*
         * Insert relation physical and logical identifiers (OIDs) into the right
         * places.      Note that the physical ID (relfilenode) is initially the same
-        * as the logical ID (OID).
+        * as the logical ID (OID); except that for a mapped relation, we set
+        * relfilenode to zero and rely on RelationInitPhysicalAddr to consult the
+        * map.
         */
        rel->rd_rel->relisshared = shared_relation;
 
@@ -2406,9 +2515,17 @@ RelationBuildLocalRelation(const char *relname,
        for (i = 0; i < natts; i++)
                rel->rd_att->attrs[i]->attrelid = relid;
 
-       rel->rd_rel->relfilenode = relid;
        rel->rd_rel->reltablespace = reltablespace;
 
+       if (mapped_relation)
+       {
+               rel->rd_rel->relfilenode = InvalidOid;
+               /* Add it to the active mapping information */
+               RelationMapUpdateMap(relid, relid, shared_relation, true);
+       }
+       else
+               rel->rd_rel->relfilenode = relid;
+
        RelationInitLockInfo(rel);      /* see lmgr.c */
 
        RelationInitPhysicalAddr(rel);
@@ -2434,6 +2551,117 @@ RelationBuildLocalRelation(const char *relname,
        return rel;
 }
 
+
+/*
+ * RelationSetNewRelfilenode
+ *
+ * Assign a new relfilenode (physical file name) to the relation.
+ *
+ * This allows a full rewrite of the relation to be done with transactional
+ * safety (since the filenode assignment can be rolled back).  Note however
+ * that there is no simple way to access the relation's old data for the
+ * remainder of the current transaction.  This limits the usefulness to cases
+ * such as TRUNCATE or rebuilding an index from scratch.
+ *
+ * Caller must already hold exclusive lock on the relation.
+ *
+ * The relation is marked with relfrozenxid = freezeXid (InvalidTransactionId
+ * must be passed for indexes and sequences).  This should be a lower bound on
+ * the XIDs that will be put into the new relation contents.
+ */
+void
+RelationSetNewRelfilenode(Relation relation, TransactionId freezeXid)
+{
+       Oid                     newrelfilenode;
+       RelFileNodeBackend newrnode;
+       Relation        pg_class;
+       HeapTuple       tuple;
+       Form_pg_class classform;
+
+       /* Indexes, sequences must have Invalid frozenxid; other rels must not */
+       Assert((relation->rd_rel->relkind == RELKIND_INDEX ||
+                       relation->rd_rel->relkind == RELKIND_SEQUENCE) ?
+                  freezeXid == InvalidTransactionId :
+                  TransactionIdIsNormal(freezeXid));
+
+       /* Allocate a new relfilenode */
+       newrelfilenode = GetNewRelFileNode(relation->rd_rel->reltablespace, NULL,
+                                                                          relation->rd_rel->relpersistence);
+
+       /*
+        * Get a writable copy of the pg_class tuple for the given relation.
+        */
+       pg_class = heap_open(RelationRelationId, RowExclusiveLock);
+
+       tuple = SearchSysCacheCopy1(RELOID,
+                                                               ObjectIdGetDatum(RelationGetRelid(relation)));
+       if (!HeapTupleIsValid(tuple))
+               elog(ERROR, "could not find tuple for relation %u",
+                        RelationGetRelid(relation));
+       classform = (Form_pg_class) GETSTRUCT(tuple);
+
+       /*
+        * Create storage for the main fork of the new relfilenode.
+        *
+        * NOTE: any conflict in relfilenode value will be caught here, if
+        * GetNewRelFileNode messes up for any reason.
+        */
+       newrnode.node = relation->rd_node;
+       newrnode.node.relNode = newrelfilenode;
+       newrnode.backend = relation->rd_backend;
+       RelationCreateStorage(newrnode.node, relation->rd_rel->relpersistence);
+       smgrclosenode(newrnode);
+
+       /*
+        * Schedule unlinking of the old storage at transaction commit.
+        */
+       RelationDropStorage(relation);
+
+       /*
+        * Now update the pg_class row.  However, if we're dealing with a mapped
+        * index, pg_class.relfilenode doesn't change; instead we have to send the
+        * update to the relation mapper.
+        */
+       if (RelationIsMapped(relation))
+               RelationMapUpdateMap(RelationGetRelid(relation),
+                                                        newrelfilenode,
+                                                        relation->rd_rel->relisshared,
+                                                        false);
+       else
+               classform->relfilenode = newrelfilenode;
+
+       /* These changes are safe even for a mapped relation */
+       if (relation->rd_rel->relkind != RELKIND_SEQUENCE)
+       {
+               classform->relpages = 0;        /* it's empty until further notice */
+               classform->reltuples = 0;
+       }
+       classform->relfrozenxid = freezeXid;
+
+       simple_heap_update(pg_class, &tuple->t_self, tuple);
+       CatalogUpdateIndexes(pg_class, tuple);
+
+       heap_freetuple(tuple);
+
+       heap_close(pg_class, RowExclusiveLock);
+
+       /*
+        * Make the pg_class row change visible, as well as the relation map
+        * change if any.  This will cause the relcache entry to get updated, too.
+        */
+       CommandCounterIncrement();
+
+       /*
+        * Mark the rel as having been given a new relfilenode in the current
+        * (sub) transaction.  This is a hint that can be used to optimize later
+        * operations on the rel in the same transaction.
+        */
+       relation->rd_newRelfilenodeSubid = GetCurrentSubTransactionId();
+       /* ... and now we have eoxact cleanup work to do */
+       need_eoxact_work = true;
+}
+
+
 /*
  *             RelationCacheInitialize
  *
@@ -2451,17 +2679,14 @@ RelationBuildLocalRelation(const char *relname,
 void
 RelationCacheInitialize(void)
 {
-       MemoryContext oldcxt;
        HASHCTL         ctl;
 
        /*
-        * switch to cache memory context
+        * make sure cache memory context exists
         */
        if (!CacheMemoryContext)
                CreateCacheMemoryContext();
 
-       oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
-
        /*
         * create hashtable that indexes the relcache
         */
@@ -2472,29 +2697,90 @@ RelationCacheInitialize(void)
        RelationIdCache = hash_create("Relcache by OID", INITRELCACHESIZE,
                                                                  &ctl, HASH_ELEM | HASH_FUNCTION);
 
-       MemoryContextSwitchTo(oldcxt);
+       /*
+        * relation mapper needs to be initialized too
+        */
+       RelationMapInitialize();
 }
 
 /*
  *             RelationCacheInitializePhase2
  *
- *             This is called as soon as the catcache and transaction system
- *             are functional.  At this point we can actually read data from
- *             the system catalogs.  We first try to read pre-computed relcache
- *             entries from the pg_internal.init file.  If that's missing or
- *             broken, make phony entries for the minimum set of nailed-in-cache
- *             relations.      Then (unless bootstrapping) make sure we have entries
- *             for the critical system indexes.  Once we've done all this, we
- *             have enough infrastructure to open any system catalog or use any
- *             catcache.  The last step is to rewrite pg_internal.init if needed.
+ *             This is called to prepare for access to shared catalogs during startup.
+ *             We must at least set up nailed reldescs for pg_database, pg_authid,
+ *             and pg_auth_members.  Ideally we'd like to have reldescs for their
+ *             indexes, too.  We attempt to load this information from the shared
+ *             relcache init file.  If that's missing or broken, just make phony
+ *             entries for the catalogs themselves.  RelationCacheInitializePhase3
+ *             will clean up as needed.
  */
 void
 RelationCacheInitializePhase2(void)
 {
+       MemoryContext oldcxt;
+
+       /*
+        * relation mapper needs initialized too
+        */
+       RelationMapInitializePhase2();
+
+       /*
+        * In bootstrap mode, the shared catalogs aren't there yet anyway, so do
+        * nothing.
+        */
+       if (IsBootstrapProcessingMode())
+               return;
+
+       /*
+        * switch to cache memory context
+        */
+       oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
+
+       /*
+        * Try to load the shared relcache cache file.  If unsuccessful, bootstrap
+        * the cache with pre-made descriptors for the critical shared catalogs.
+        */
+       if (!load_relcache_init_file(true))
+       {
+               formrdesc("pg_database", DatabaseRelation_Rowtype_Id, true,
+                                 true, Natts_pg_database, Desc_pg_database);
+               formrdesc("pg_authid", AuthIdRelation_Rowtype_Id, true,
+                                 true, Natts_pg_authid, Desc_pg_authid);
+               formrdesc("pg_auth_members", AuthMemRelation_Rowtype_Id, true,
+                                 false, Natts_pg_auth_members, Desc_pg_auth_members);
+
+#define NUM_CRITICAL_SHARED_RELS       3       /* fix if you change list above */
+       }
+
+       MemoryContextSwitchTo(oldcxt);
+}
+
+/*
+ *             RelationCacheInitializePhase3
+ *
+ *             This is called as soon as the catcache and transaction system
+ *             are functional and we have determined MyDatabaseId.  At this point
+ *             we can actually read data from the database's system catalogs.
+ *             We first try to read pre-computed relcache entries from the local
+ *             relcache init file.  If that's missing or broken, make phony entries
+ *             for the minimum set of nailed-in-cache relations.  Then (unless
+ *             bootstrapping) make sure we have entries for the critical system
+ *             indexes.  Once we've done all this, we have enough infrastructure to
+ *             open any system catalog or use any catcache.  The last step is to
+ *             rewrite the cache files if needed.
+ */
+void
+RelationCacheInitializePhase3(void)
+{
        HASH_SEQ_STATUS status;
        RelIdCacheEnt *idhentry;
        MemoryContext oldcxt;
-       bool            needNewCacheFile = false;
+       bool            needNewCacheFile = !criticalSharedRelcachesBuilt;
+
+       /*
+        * relation mapper needs initialized too
+        */
+       RelationMapInitializePhase3();
 
        /*
         * switch to cache memory context
@@ -2502,25 +2788,25 @@ RelationCacheInitializePhase2(void)
        oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
 
        /*
-        * Try to load the relcache cache file.  If unsuccessful, bootstrap the
-        * cache with pre-made descriptors for the critical "nailed-in" system
+        * Try to load the local relcache cache file.  If unsuccessful, bootstrap
+        * the cache with pre-made descriptors for the critical "nailed-in" system
         * catalogs.
         */
        if (IsBootstrapProcessingMode() ||
-               !load_relcache_init_file())
+               !load_relcache_init_file(false))
        {
                needNewCacheFile = true;
 
-               formrdesc("pg_class", PG_CLASS_RELTYPE_OID,
+               formrdesc("pg_class", RelationRelation_Rowtype_Id, false,
                                  true, Natts_pg_class, Desc_pg_class);
-               formrdesc("pg_attribute", PG_ATTRIBUTE_RELTYPE_OID,
+               formrdesc("pg_attribute", AttributeRelation_Rowtype_Id, false,
                                  false, Natts_pg_attribute, Desc_pg_attribute);
-               formrdesc("pg_proc", PG_PROC_RELTYPE_OID,
+               formrdesc("pg_proc", ProcedureRelation_Rowtype_Id, false,
                                  true, Natts_pg_proc, Desc_pg_proc);
-               formrdesc("pg_type", PG_TYPE_RELTYPE_OID,
+               formrdesc("pg_type", TypeRelation_Rowtype_Id, false,
                                  true, Natts_pg_type, Desc_pg_type);
 
-#define NUM_CRITICAL_RELS      4       /* fix if you change list above */
+#define NUM_CRITICAL_LOCAL_RELS 4              /* fix if you change list above */
        }
 
        MemoryContextSwitchTo(oldcxt);
@@ -2556,60 +2842,92 @@ RelationCacheInitializePhase2(void)
         */
        if (!criticalRelcachesBuilt)
        {
-               Relation        ird;
-
-#define LOAD_CRIT_INDEX(indexoid) \
-               do { \
-                       LockRelationOid(indexoid, AccessShareLock); \
-                       ird = RelationBuildDesc(indexoid, NULL); \
-                       if (ird == NULL) \
-                               elog(PANIC, "could not open critical system index %u", \
-                                        indexoid); \
-                       ird->rd_isnailed = true; \
-                       ird->rd_refcnt = 1; \
-                       UnlockRelationOid(indexoid, AccessShareLock); \
-               } while (0)
-
-               LOAD_CRIT_INDEX(ClassOidIndexId);
-               LOAD_CRIT_INDEX(AttributeRelidNumIndexId);
-               LOAD_CRIT_INDEX(IndexRelidIndexId);
-               LOAD_CRIT_INDEX(OpclassOidIndexId);
-               LOAD_CRIT_INDEX(AccessMethodStrategyIndexId);
-               LOAD_CRIT_INDEX(AccessMethodProcedureIndexId);
-               LOAD_CRIT_INDEX(OperatorOidIndexId);
-               LOAD_CRIT_INDEX(RewriteRelRulenameIndexId);
-               LOAD_CRIT_INDEX(TriggerRelidNameIndexId);
-
-#define NUM_CRITICAL_INDEXES   9               /* fix if you change list above */
+               load_critical_index(ClassOidIndexId,
+                                                       RelationRelationId);
+               load_critical_index(AttributeRelidNumIndexId,
+                                                       AttributeRelationId);
+               load_critical_index(IndexRelidIndexId,
+                                                       IndexRelationId);
+               load_critical_index(OpclassOidIndexId,
+                                                       OperatorClassRelationId);
+               load_critical_index(AccessMethodProcedureIndexId,
+                                                       AccessMethodProcedureRelationId);
+               load_critical_index(RewriteRelRulenameIndexId,
+                                                       RewriteRelationId);
+               load_critical_index(TriggerRelidNameIndexId,
+                                                       TriggerRelationId);
+
+#define NUM_CRITICAL_LOCAL_INDEXES     7       /* fix if you change list above */
 
                criticalRelcachesBuilt = true;
        }
 
        /*
+        * Process critical shared indexes too.
+        *
+        * DatabaseNameIndexId isn't critical for relcache loading, but rather for
+        * initial lookup of MyDatabaseId, without which we'll never find any
+        * non-shared catalogs at all.  Autovacuum calls InitPostgres with a
+        * database OID, so it instead depends on DatabaseOidIndexId.  We also
+        * need to nail up some indexes on pg_authid and pg_auth_members for use
+        * during client authentication.
+        */
+       if (!criticalSharedRelcachesBuilt)
+       {
+               load_critical_index(DatabaseNameIndexId,
+                                                       DatabaseRelationId);
+               load_critical_index(DatabaseOidIndexId,
+                                                       DatabaseRelationId);
+               load_critical_index(AuthIdRolnameIndexId,
+                                                       AuthIdRelationId);
+               load_critical_index(AuthIdOidIndexId,
+                                                       AuthIdRelationId);
+               load_critical_index(AuthMemMemRoleIndexId,
+                                                       AuthMemRelationId);
+
+#define NUM_CRITICAL_SHARED_INDEXES 5  /* fix if you change list above */
+
+               criticalSharedRelcachesBuilt = true;
+       }
+
+       /*
         * Now, scan all the relcache entries and update anything that might be
         * wrong in the results from formrdesc or the relcache cache file. If we
         * faked up relcache entries using formrdesc, then read the real pg_class
         * rows and replace the fake entries with them. Also, if any of the
         * relcache entries have rules or triggers, load that info the hard way
         * since it isn't recorded in the cache file.
+        *
+        * Whenever we access the catalogs to read data, there is a possibility of
+        * a shared-inval cache flush causing relcache entries to be removed.
+        * Since hash_seq_search only guarantees to still work after the *current*
+        * entry is removed, it's unsafe to continue the hashtable scan afterward.
+        * We handle this by restarting the scan from scratch after each access.
+        * This is theoretically O(N^2), but the number of entries that actually
+        * need to be fixed is small enough that it doesn't matter.
         */
        hash_seq_init(&status, RelationIdCache);
 
        while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
        {
                Relation        relation = idhentry->reldesc;
+               bool            restart = false;
+
+               /*
+                * Make sure *this* entry doesn't get flushed while we work with it.
+                */
+               RelationIncrementReferenceCount(relation);
 
                /*
                 * If it's a faked-up entry, read the real pg_class tuple.
                 */
-               if (needNewCacheFile && relation->rd_isnailed)
+               if (relation->rd_rel->relowner == InvalidOid)
                {
                        HeapTuple       htup;
                        Form_pg_class relp;
 
-                       htup = SearchSysCache(RELOID,
-                                                               ObjectIdGetDatum(RelationGetRelid(relation)),
-                                                                 0, 0, 0);
+                       htup = SearchSysCache1(RELOID,
+                                                          ObjectIdGetDatum(RelationGetRelid(relation)));
                        if (!HeapTupleIsValid(htup))
                                elog(FATAL, "cache lookup failed for relation %u",
                                         RelationGetRelid(relation));
@@ -2619,7 +2937,6 @@ RelationCacheInitializePhase2(void)
                         * Copy tuple to relation->rd_rel. (See notes in
                         * AllocateRelationDesc())
                         */
-                       Assert(relation->rd_rel != NULL);
                        memcpy((char *) relation->rd_rel, (char *) relp, CLASS_TUPLE_SIZE);
 
                        /* Update rd_options while we have the tuple */
@@ -2628,26 +2945,62 @@ RelationCacheInitializePhase2(void)
                        RelationParseRelOptions(relation, htup);
 
                        /*
-                        * Also update the derived fields in rd_att.
+                        * Check the values in rd_att were set up correctly.  (We cannot
+                        * just copy them over now: formrdesc must have set up the rd_att
+                        * data correctly to start with, because it may already have been
+                        * copied into one or more catcache entries.)
                         */
-                       relation->rd_att->tdtypeid = relp->reltype;
-                       relation->rd_att->tdtypmod = -1;        /* unnecessary, but... */
-                       relation->rd_att->tdhasoid = relp->relhasoids;
+                       Assert(relation->rd_att->tdtypeid == relp->reltype);
+                       Assert(relation->rd_att->tdtypmod == -1);
+                       Assert(relation->rd_att->tdhasoid == relp->relhasoids);
 
                        ReleaseSysCache(htup);
+
+                       /* relowner had better be OK now, else we'll loop forever */
+                       if (relation->rd_rel->relowner == InvalidOid)
+                               elog(ERROR, "invalid relowner in pg_class entry for \"%s\"",
+                                        RelationGetRelationName(relation));
+
+                       restart = true;
                }
 
                /*
                 * Fix data that isn't saved in relcache cache file.
+                *
+                * relhasrules or relhastriggers could possibly be wrong or out of
+                * date.  If we don't actually find any rules or triggers, clear the
+                * local copy of the flag so that we don't get into an infinite loop
+                * here.  We don't make any attempt to fix the pg_class entry, though.
                 */
                if (relation->rd_rel->relhasrules && relation->rd_rules == NULL)
+               {
                        RelationBuildRuleLock(relation);
+                       if (relation->rd_rules == NULL)
+                               relation->rd_rel->relhasrules = false;
+                       restart = true;
+               }
                if (relation->rd_rel->relhastriggers && relation->trigdesc == NULL)
+               {
                        RelationBuildTriggers(relation);
+                       if (relation->trigdesc == NULL)
+                               relation->rd_rel->relhastriggers = false;
+                       restart = true;
+               }
+
+               /* Release hold on the relation */
+               RelationDecrementReferenceCount(relation);
+
+               /* Now, restart the hashtable scan if needed */
+               if (restart)
+               {
+                       hash_seq_term(&status);
+                       hash_seq_init(&status, RelationIdCache);
+               }
        }
 
        /*
-        * Lastly, write out a new relcache cache file if one is needed.
+        * Lastly, write out new relcache cache files if needed.  We don't bother
+        * to distinguish cases where only one of the two needs an update.
         */
        if (needNewCacheFile)
        {
@@ -2655,16 +3008,48 @@ RelationCacheInitializePhase2(void)
                 * Force all the catcaches to finish initializing and thereby open the
                 * catalogs and indexes they use.  This will preload the relcache with
                 * entries for all the most important system catalogs and indexes, so
-                * that the init file will be most useful for future backends.
+                * that the init files will be most useful for future backends.
                 */
                InitCatalogCachePhase2();
 
-               /* now write the file */
-               write_relcache_init_file();
+               /* reset initFileRelationIds list; we'll fill it during write */
+               initFileRelationIds = NIL;
+
+               /* now write the files */
+               write_relcache_init_file(true);
+               write_relcache_init_file(false);
        }
 }
 
 /*
+ * Load one critical system index into the relcache
+ *
+ * indexoid is the OID of the target index, heapoid is the OID of the catalog
+ * it belongs to.
+ */
+static void
+load_critical_index(Oid indexoid, Oid heapoid)
+{
+       Relation        ird;
+
+       /*
+        * We must lock the underlying catalog before locking the index to avoid
+        * deadlock, since RelationBuildDesc might well need to read the catalog,
+        * and if anyone else is exclusive-locking this catalog and index they'll
+        * be doing it in that order.
+        */
+       LockRelationOid(heapoid, AccessShareLock);
+       LockRelationOid(indexoid, AccessShareLock);
+       ird = RelationBuildDesc(indexoid, true);
+       if (ird == NULL)
+               elog(PANIC, "could not open critical system index %u", indexoid);
+       ird->rd_isnailed = true;
+       ird->rd_refcnt = 1;
+       UnlockRelationOid(indexoid, AccessShareLock);
+       UnlockRelationOid(heapoid, AccessShareLock);
+}
+
+/*
  * GetPgClassDescriptor -- get a predefined tuple descriptor for pg_class
  * GetPgIndexDescriptor -- get a predefined tuple descriptor for pg_index
  *
@@ -2677,7 +3062,8 @@ RelationCacheInitializePhase2(void)
  * extracting fields.
  */
 static TupleDesc
-BuildHardcodedDescriptor(int natts, Form_pg_attribute attrs, bool hasoids)
+BuildHardcodedDescriptor(int natts, const FormData_pg_attribute *attrs,
+                                                bool hasoids)
 {
        TupleDesc       result;
        MemoryContext oldcxt;
@@ -2734,6 +3120,9 @@ GetPgIndexDescriptor(void)
        return pgindexdesc;
 }
 
+/*
+ * Load any default attribute value definitions for the relation.
+ */
 static void
 AttrDefaultFetch(Relation relation)
 {
@@ -2782,7 +3171,7 @@ AttrDefaultFetch(Relation relation)
                                         RelationGetRelationName(relation));
                        else
                                attrdef[i].adbin = MemoryContextStrdup(CacheMemoryContext,
-                                                                                                       TextDatumGetCString(val));
+                                                                                                  TextDatumGetCString(val));
                        break;
                }
 
@@ -2799,6 +3188,9 @@ AttrDefaultFetch(Relation relation)
                         ndef - found, RelationGetRelationName(relation));
 }
 
+/*
+ * Load any check constraints for the relation.
+ */
 static void
 CheckConstraintFetch(Relation relation)
 {
@@ -2927,7 +3319,7 @@ RelationGetIndexList(Relation relation)
 
                /* Check to see if it is a unique, non-partial btree index on OID */
                if (index->indnatts == 1 &&
-                       index->indisunique &&
+                       index->indisunique && index->indimmediate &&
                        index->indkey.values[0] == ObjectIdAttributeNumber &&
                        index->indclass.values[0] == OID_BTREE_OPS_OID &&
                        heap_attisnull(htup, Anum_pg_index_indpred))
@@ -3106,7 +3498,7 @@ RelationGetIndexExpressions(Relation relation)
        fix_opfuncids((Node *) result);
 
        /* Now save a copy of the completed tree in the relcache entry. */
-       oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
+       oldcxt = MemoryContextSwitchTo(relation->rd_indexcxt);
        relation->rd_indexprs = (List *) copyObject(result);
        MemoryContextSwitchTo(oldcxt);
 
@@ -3181,7 +3573,7 @@ RelationGetIndexPredicate(Relation relation)
        fix_opfuncids((Node *) result);
 
        /* Now save a copy of the completed tree in the relcache entry. */
-       oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
+       oldcxt = MemoryContextSwitchTo(relation->rd_indexcxt);
        relation->rd_indpred = (List *) copyObject(result);
        MemoryContextSwitchTo(oldcxt);
 
@@ -3273,6 +3665,130 @@ RelationGetIndexAttrBitmap(Relation relation)
        return indexattrs;
 }
 
+/*
+ * RelationGetExclusionInfo -- get info about index's exclusion constraint
+ *
+ * This should be called only for an index that is known to have an
+ * associated exclusion constraint.  It returns arrays (palloc'd in caller's
+ * context) of the exclusion operator OIDs, their underlying functions'
+ * OIDs, and their strategy numbers in the index's opclasses.  We cache
+ * all this information since it requires a fair amount of work to get.
+ */
+void
+RelationGetExclusionInfo(Relation indexRelation,
+                                                Oid **operators,
+                                                Oid **procs,
+                                                uint16 **strategies)
+{
+       int                     ncols = indexRelation->rd_rel->relnatts;
+       Oid                *ops;
+       Oid                *funcs;
+       uint16     *strats;
+       Relation        conrel;
+       SysScanDesc conscan;
+       ScanKeyData skey[1];
+       HeapTuple       htup;
+       bool            found;
+       MemoryContext oldcxt;
+       int                     i;
+
+       /* Allocate result space in caller context */
+       *operators = ops = (Oid *) palloc(sizeof(Oid) * ncols);
+       *procs = funcs = (Oid *) palloc(sizeof(Oid) * ncols);
+       *strategies = strats = (uint16 *) palloc(sizeof(uint16) * ncols);
+
+       /* Quick exit if we have the data cached already */
+       if (indexRelation->rd_exclstrats != NULL)
+       {
+               memcpy(ops, indexRelation->rd_exclops, sizeof(Oid) * ncols);
+               memcpy(funcs, indexRelation->rd_exclprocs, sizeof(Oid) * ncols);
+               memcpy(strats, indexRelation->rd_exclstrats, sizeof(uint16) * ncols);
+               return;
+       }
+
+       /*
+        * Search pg_constraint for the constraint associated with the index. To
+        * make this not too painfully slow, we use the index on conrelid; that
+        * will hold the parent relation's OID not the index's own OID.
+        */
+       ScanKeyInit(&skey[0],
+                               Anum_pg_constraint_conrelid,
+                               BTEqualStrategyNumber, F_OIDEQ,
+                               ObjectIdGetDatum(indexRelation->rd_index->indrelid));
+
+       conrel = heap_open(ConstraintRelationId, AccessShareLock);
+       conscan = systable_beginscan(conrel, ConstraintRelidIndexId, true,
+                                                                SnapshotNow, 1, skey);
+       found = false;
+
+       while (HeapTupleIsValid(htup = systable_getnext(conscan)))
+       {
+               Form_pg_constraint conform = (Form_pg_constraint) GETSTRUCT(htup);
+               Datum           val;
+               bool            isnull;
+               ArrayType  *arr;
+               int                     nelem;
+
+               /* We want the exclusion constraint owning the index */
+               if (conform->contype != CONSTRAINT_EXCLUSION ||
+                       conform->conindid != RelationGetRelid(indexRelation))
+                       continue;
+
+               /* There should be only one */
+               if (found)
+                       elog(ERROR, "unexpected exclusion constraint record found for rel %s",
+                                RelationGetRelationName(indexRelation));
+               found = true;
+
+               /* Extract the operator OIDS from conexclop */
+               val = fastgetattr(htup,
+                                                 Anum_pg_constraint_conexclop,
+                                                 conrel->rd_att, &isnull);
+               if (isnull)
+                       elog(ERROR, "null conexclop for rel %s",
+                                RelationGetRelationName(indexRelation));
+
+               arr = DatumGetArrayTypeP(val);  /* ensure not toasted */
+               nelem = ARR_DIMS(arr)[0];
+               if (ARR_NDIM(arr) != 1 ||
+                       nelem != ncols ||
+                       ARR_HASNULL(arr) ||
+                       ARR_ELEMTYPE(arr) != OIDOID)
+                       elog(ERROR, "conexclop is not a 1-D Oid array");
+
+               memcpy(ops, ARR_DATA_PTR(arr), sizeof(Oid) * ncols);
+       }
+
+       systable_endscan(conscan);
+       heap_close(conrel, AccessShareLock);
+
+       if (!found)
+               elog(ERROR, "exclusion constraint record missing for rel %s",
+                        RelationGetRelationName(indexRelation));
+
+       /* We need the func OIDs and strategy numbers too */
+       for (i = 0; i < ncols; i++)
+       {
+               funcs[i] = get_opcode(ops[i]);
+               strats[i] = get_op_opfamily_strategy(ops[i],
+                                                                                        indexRelation->rd_opfamily[i]);
+               /* shouldn't fail, since it was checked at index creation */
+               if (strats[i] == InvalidStrategy)
+                       elog(ERROR, "could not find strategy for operator %u in family %u",
+                                ops[i], indexRelation->rd_opfamily[i]);
+       }
+
+       /* Save a copy of the results in the relcache entry. */
+       oldcxt = MemoryContextSwitchTo(indexRelation->rd_indexcxt);
+       indexRelation->rd_exclops = (Oid *) palloc(sizeof(Oid) * ncols);
+       indexRelation->rd_exclprocs = (Oid *) palloc(sizeof(Oid) * ncols);
+       indexRelation->rd_exclstrats = (uint16 *) palloc(sizeof(uint16) * ncols);
+       memcpy(indexRelation->rd_exclops, ops, sizeof(Oid) * ncols);
+       memcpy(indexRelation->rd_exclprocs, funcs, sizeof(Oid) * ncols);
+       memcpy(indexRelation->rd_exclstrats, strats, sizeof(uint16) * ncols);
+       MemoryContextSwitchTo(oldcxt);
+}
+
 
 /*
  *     load_relcache_init_file, write_relcache_init_file
@@ -3299,7 +3815,10 @@ RelationGetIndexAttrBitmap(Relation relation)
  *                       relation descriptors using sequential scans and write 'em to
  *                       the initialization file for use by subsequent backends.
  *
- *             We could dispense with the initialization file and just build the
+ *             As of Postgres 9.0, there is one local initialization file in each
+ *             database, plus one shared initialization file for shared catalogs.
+ *
+ *             We could dispense with the initialization files and just build the
  *             critical reldescs the hard way on every backend startup, but that
  *             slows down backend startup noticeably.
  *
@@ -3307,24 +3826,26 @@ RelationGetIndexAttrBitmap(Relation relation)
  *             just the ones that are absolutely critical; this allows us to speed
  *             up backend startup by not having to build such entries the hard way.
  *             Presently, all the catalog and index entries that are referred to
- *             by catcaches are stored in the initialization file.
+ *             by catcaches are stored in the initialization files.
  *
  *             The same mechanism that detects when catcache and relcache entries
  *             need to be invalidated (due to catalog updates) also arranges to
- *             unlink the initialization file when its contents may be out of date.
- *             The file will then be rebuilt during the next backend startup.
+ *             unlink the initialization files when the contents may be out of date.
+ *             The files will then be rebuilt during the next backend startup.
  */
 
 /*
- * load_relcache_init_file -- attempt to load cache from the init file
+ * load_relcache_init_file -- attempt to load cache from the shared
+ * or local cache init file
  *
- * If successful, return TRUE and set criticalRelcachesBuilt to true.
+ * If successful, return TRUE and set criticalRelcachesBuilt or
+ * criticalSharedRelcachesBuilt to true.
  * If not successful, return FALSE.
  *
  * NOTE: we assume we are already switched into CacheMemoryContext.
  */
 static bool
-load_relcache_init_file(void)
+load_relcache_init_file(bool shared)
 {
        FILE       *fp;
        char            initfilename[MAXPGPATH];
@@ -3337,8 +3858,12 @@ load_relcache_init_file(void)
                                magic;
        int                     i;
 
-       snprintf(initfilename, sizeof(initfilename), "%s/%s",
-                        DatabasePath, RELCACHE_INIT_FILENAME);
+       if (shared)
+               snprintf(initfilename, sizeof(initfilename), "global/%s",
+                                RELCACHE_INIT_FILENAME);
+       else
+               snprintf(initfilename, sizeof(initfilename), "%s/%s",
+                                DatabasePath, RELCACHE_INIT_FILENAME);
 
        fp = AllocateFile(initfilename, PG_BINARY_R);
        if (fp == NULL)
@@ -3353,7 +3878,6 @@ load_relcache_init_file(void)
        rels = (Relation *) palloc(max_rels * sizeof(Relation));
        num_rels = 0;
        nailed_rels = nailed_indexes = 0;
-       initFileRelationIds = NIL;
 
        /* check for correct magic number (compatible version) */
        if (fread(&magic, 1, sizeof(magic), fp) != sizeof(magic))
@@ -3370,7 +3894,8 @@ load_relcache_init_file(void)
                bool            has_not_null;
 
                /* first read the relation descriptor length */
-               if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
+               nread = fread(&len, 1, sizeof(len), fp);
+               if (nread != sizeof(len))
                {
                        if (nread == 0)
                                break;                  /* end of file */
@@ -3391,15 +3916,15 @@ load_relcache_init_file(void)
                rel = rels[num_rels++] = (Relation) palloc(len);
 
                /* then, read the Relation structure */
-               if ((nread = fread(rel, 1, len, fp)) != len)
+               if (fread(rel, 1, len, fp) != len)
                        goto read_failed;
 
                /* next read the relation tuple form */
-               if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
+               if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
                        goto read_failed;
 
                relform = (Form_pg_class) palloc(len);
-               if ((nread = fread(relform, 1, len, fp)) != len)
+               if (fread(relform, 1, len, fp) != len)
                        goto read_failed;
 
                rel->rd_rel = relform;
@@ -3416,23 +3941,23 @@ load_relcache_init_file(void)
                has_not_null = false;
                for (i = 0; i < relform->relnatts; i++)
                {
-                       if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
+                       if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
                                goto read_failed;
                        if (len != ATTRIBUTE_FIXED_PART_SIZE)
                                goto read_failed;
-                       if ((nread = fread(rel->rd_att->attrs[i], 1, len, fp)) != len)
+                       if (fread(rel->rd_att->attrs[i], 1, len, fp) != len)
                                goto read_failed;
 
                        has_not_null |= rel->rd_att->attrs[i]->attnotnull;
                }
 
                /* next read the access method specific field */
-               if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
+               if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
                        goto read_failed;
                if (len > 0)
                {
                        rel->rd_options = palloc(len);
-                       if ((nread = fread(rel->rd_options, 1, len, fp)) != len)
+                       if (fread(rel->rd_options, 1, len, fp) != len)
                                goto read_failed;
                        if (len != VARSIZE(rel->rd_options))
                                goto read_failed;               /* sanity check */
@@ -3458,7 +3983,6 @@ load_relcache_init_file(void)
                        MemoryContext indexcxt;
                        Oid                *opfamily;
                        Oid                *opcintype;
-                       Oid                *operator;
                        RegProcedure *support;
                        int                     nsupport;
                        int16      *indoption;
@@ -3468,11 +3992,11 @@ load_relcache_init_file(void)
                                nailed_indexes++;
 
                        /* next, read the pg_index tuple */
-                       if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
+                       if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
                                goto read_failed;
 
                        rel->rd_indextuple = (HeapTuple) palloc(len);
-                       if ((nread = fread(rel->rd_indextuple, 1, len, fp)) != len)
+                       if (fread(rel->rd_indextuple, 1, len, fp) != len)
                                goto read_failed;
 
                        /* Fix up internal pointers in the tuple -- see heap_copytuple */
@@ -3480,11 +4004,11 @@ load_relcache_init_file(void)
                        rel->rd_index = (Form_pg_index) GETSTRUCT(rel->rd_indextuple);
 
                        /* next, read the access method tuple form */
-                       if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
+                       if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
                                goto read_failed;
 
                        am = (Form_pg_am) palloc(len);
-                       if ((nread = fread(am, 1, len, fp)) != len)
+                       if (fread(am, 1, len, fp) != len)
                                goto read_failed;
                        rel->rd_am = am;
 
@@ -3500,50 +4024,40 @@ load_relcache_init_file(void)
                        rel->rd_indexcxt = indexcxt;
 
                        /* next, read the vector of opfamily OIDs */
-                       if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
+                       if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
                                goto read_failed;
 
                        opfamily = (Oid *) MemoryContextAlloc(indexcxt, len);
-                       if ((nread = fread(opfamily, 1, len, fp)) != len)
+                       if (fread(opfamily, 1, len, fp) != len)
                                goto read_failed;
 
                        rel->rd_opfamily = opfamily;
 
                        /* next, read the vector of opcintype OIDs */
-                       if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
+                       if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
                                goto read_failed;
 
                        opcintype = (Oid *) MemoryContextAlloc(indexcxt, len);
-                       if ((nread = fread(opcintype, 1, len, fp)) != len)
+                       if (fread(opcintype, 1, len, fp) != len)
                                goto read_failed;
 
                        rel->rd_opcintype = opcintype;
 
-                       /* next, read the vector of operator OIDs */
-                       if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
-                               goto read_failed;
-
-                       operator = (Oid *) MemoryContextAlloc(indexcxt, len);
-                       if ((nread = fread(operator, 1, len, fp)) != len)
-                               goto read_failed;
-
-                       rel->rd_operator = operator;
-
-                       /* next, read the vector of support procedures */
-                       if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
+                       /* next, read the vector of support procedure OIDs */
+                       if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
                                goto read_failed;
                        support = (RegProcedure *) MemoryContextAlloc(indexcxt, len);
-                       if ((nread = fread(support, 1, len, fp)) != len)
+                       if (fread(support, 1, len, fp) != len)
                                goto read_failed;
 
                        rel->rd_support = support;
 
                        /* finally, read the vector of indoption values */
-                       if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
+                       if (fread(&len, 1, sizeof(len), fp) != sizeof(len))
                                goto read_failed;
 
                        indoption = (int16 *) MemoryContextAlloc(indexcxt, len);
-                       if ((nread = fread(indoption, 1, len, fp)) != len)
+                       if (fread(indoption, 1, len, fp) != len)
                                goto read_failed;
 
                        rel->rd_indoption = indoption;
@@ -3568,7 +4082,6 @@ load_relcache_init_file(void)
                        Assert(rel->rd_aminfo == NULL);
                        Assert(rel->rd_opfamily == NULL);
                        Assert(rel->rd_opcintype == NULL);
-                       Assert(rel->rd_operator == NULL);
                        Assert(rel->rd_support == NULL);
                        Assert(rel->rd_supportinfo == NULL);
                        Assert(rel->rd_indoption == NULL);
@@ -3577,23 +4090,23 @@ load_relcache_init_file(void)
                /*
                 * Rules and triggers are not saved (mainly because the internal
                 * format is complex and subject to change).  They must be rebuilt if
-                * needed by RelationCacheInitializePhase2.  This is not expected to
+                * needed by RelationCacheInitializePhase3.  This is not expected to
                 * be a big performance hit since few system catalogs have such. Ditto
-                * for index expressions and predicates.
+                * for index expressions, predicates, and exclusion info.
                 */
                rel->rd_rules = NULL;
                rel->rd_rulescxt = NULL;
                rel->trigdesc = NULL;
                rel->rd_indexprs = NIL;
                rel->rd_indpred = NIL;
+               rel->rd_exclops = NULL;
+               rel->rd_exclprocs = NULL;
+               rel->rd_exclstrats = NULL;
 
                /*
                 * Reset transient-state fields in the relcache entry
                 */
                rel->rd_smgr = NULL;
-               rel->rd_targblock = InvalidBlockNumber;
-               rel->rd_fsm_nblocks = InvalidBlockNumber;
-               rel->rd_vm_nblocks = InvalidBlockNumber;
                if (rel->rd_isnailed)
                        rel->rd_refcnt = 1;
                else
@@ -3621,9 +4134,18 @@ load_relcache_init_file(void)
         * get the right number of nailed items?  (This is a useful crosscheck in
         * case the set of critical rels or indexes changes.)
         */
-       if (nailed_rels != NUM_CRITICAL_RELS ||
-               nailed_indexes != NUM_CRITICAL_INDEXES)
-               goto read_failed;
+       if (shared)
+       {
+               if (nailed_rels != NUM_CRITICAL_SHARED_RELS ||
+                       nailed_indexes != NUM_CRITICAL_SHARED_INDEXES)
+                       goto read_failed;
+       }
+       else
+       {
+               if (nailed_rels != NUM_CRITICAL_LOCAL_RELS ||
+                       nailed_indexes != NUM_CRITICAL_LOCAL_INDEXES)
+                       goto read_failed;
+       }
 
        /*
         * OK, all appears well.
@@ -3634,14 +4156,18 @@ load_relcache_init_file(void)
        {
                RelationCacheInsert(rels[relno]);
                /* also make a list of their OIDs, for RelationIdIsInInitFile */
-               initFileRelationIds = lcons_oid(RelationGetRelid(rels[relno]),
-                                                                               initFileRelationIds);
+               if (!shared)
+                       initFileRelationIds = lcons_oid(RelationGetRelid(rels[relno]),
+                                                                                       initFileRelationIds);
        }
 
        pfree(rels);
        FreeFile(fp);
 
-       criticalRelcachesBuilt = true;
+       if (shared)
+               criticalSharedRelcachesBuilt = true;
+       else
+               criticalRelcachesBuilt = true;
        return true;
 
        /*
@@ -3658,10 +4184,10 @@ read_failed:
 
 /*
  * Write out a new initialization file with the current contents
- * of the relcache.
+ * of the relcache (either shared rels or local rels, as indicated).
  */
 static void
-write_relcache_init_file(void)
+write_relcache_init_file(bool shared)
 {
        FILE       *fp;
        char            tempfilename[MAXPGPATH];
@@ -3677,10 +4203,20 @@ write_relcache_init_file(void)
         * another backend starting at about the same time might crash trying to
         * read the partially-complete file.
         */
-       snprintf(tempfilename, sizeof(tempfilename), "%s/%s.%d",
-                        DatabasePath, RELCACHE_INIT_FILENAME, MyProcPid);
-       snprintf(finalfilename, sizeof(finalfilename), "%s/%s",
-                        DatabasePath, RELCACHE_INIT_FILENAME);
+       if (shared)
+       {
+               snprintf(tempfilename, sizeof(tempfilename), "global/%s.%d",
+                                RELCACHE_INIT_FILENAME, MyProcPid);
+               snprintf(finalfilename, sizeof(finalfilename), "global/%s",
+                                RELCACHE_INIT_FILENAME);
+       }
+       else
+       {
+               snprintf(tempfilename, sizeof(tempfilename), "%s/%s.%d",
+                                DatabasePath, RELCACHE_INIT_FILENAME, MyProcPid);
+               snprintf(finalfilename, sizeof(finalfilename), "%s/%s",
+                                DatabasePath, RELCACHE_INIT_FILENAME);
+       }
 
        unlink(tempfilename);           /* in case it exists w/wrong permissions */
 
@@ -3708,17 +4244,19 @@ write_relcache_init_file(void)
                elog(FATAL, "could not write init file");
 
        /*
-        * Write all the reldescs (in no particular order).
+        * Write all the appropriate reldescs (in no particular order).
         */
        hash_seq_init(&status, RelationIdCache);
 
-       initFileRelationIds = NIL;
-
        while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
        {
                Relation        rel = idhentry->reldesc;
                Form_pg_class relform = rel->rd_rel;
 
+               /* ignore if not correct group */
+               if (relform->relisshared != shared)
+                       continue;
+
                /* first write the relcache entry proper */
                write_item(rel, sizeof(RelationData), fp);
 
@@ -3760,12 +4298,7 @@ write_relcache_init_file(void)
                                           relform->relnatts * sizeof(Oid),
                                           fp);
 
-                       /* next, write the vector of operator OIDs */
-                       write_item(rel->rd_operator,
-                                          relform->relnatts * (am->amstrategies * sizeof(Oid)),
-                                          fp);
-
-                       /* next, write the vector of support procedures */
+                       /* next, write the vector of support procedure OIDs */
                        write_item(rel->rd_support,
                                  relform->relnatts * (am->amsupport * sizeof(RegProcedure)),
                                           fp);
@@ -3777,10 +4310,13 @@ write_relcache_init_file(void)
                }
 
                /* also make a list of their OIDs, for RelationIdIsInInitFile */
-               oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
-               initFileRelationIds = lcons_oid(RelationGetRelid(rel),
-                                                                               initFileRelationIds);
-               MemoryContextSwitchTo(oldcxt);
+               if (!shared)
+               {
+                       oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
+                       initFileRelationIds = lcons_oid(RelationGetRelid(rel),
+                                                                                       initFileRelationIds);
+                       MemoryContextSwitchTo(oldcxt);
+               }
        }
 
        if (FreeFile(fp))
@@ -3841,7 +4377,7 @@ write_item(const void *data, Size len, FILE *fp)
 
 /*
  * Detect whether a given relation (identified by OID) is one of the ones
- * we store in the init file.
+ * we store in the local relcache init file.
  *
  * Note that we effectively assume that all backends running in a database
  * would choose to store the same set of relations in the init file;
@@ -3857,7 +4393,7 @@ RelationIdIsInInitFile(Oid relationId)
 /*
  * Invalidate (remove) the init file during commit of a transaction that
  * changed one or more of the relation cache entries that are kept in the
- * init file.
+ * local init file.
  *
  * We actually need to remove the init file twice: once just before sending
  * the SI messages that include relcache inval for such relations, and once
@@ -3872,6 +4408,13 @@ RelationIdIsInInitFile(Oid relationId)
  *
  * Ignore any failure to unlink the file, since it might not be there if
  * no backend has been started since the last removal.
+ *
+ * Notice this deals only with the local init file, not the shared init file.
+ * The reason is that there can never be a "significant" change to the
+ * relcache entry of a shared relation; the most that could happen is
+ * updates of noncritical fields such as relpages/reltuples.  So, while
+ * it's worth updating the shared init file from time to time, it can never
+ * be invalid enough to make it necessary to remove it.
  */
 void
 RelationCacheInitFileInvalidate(bool beforeSend)
@@ -3903,23 +4446,94 @@ RelationCacheInitFileInvalidate(bool beforeSend)
 }
 
 /*
- * Remove the init file for a given database during postmaster startup.
+ * Remove the init files during postmaster startup.
  *
- * We used to keep the init file across restarts, but that is unsafe in PITR
+ * We used to keep the init files across restarts, but that is unsafe in PITR
  * scenarios, and even in simple crash-recovery cases there are windows for
- * the init file to become out-of-sync with the database.  So now we just
- * remove it during startup and expect the first backend launch to rebuild it.
- * Of course, this has to happen in each database of the cluster.  For
- * simplicity this is driven by flatfiles.c, which has to scan pg_database
- * anyway.
+ * the init files to become out-of-sync with the database.     So now we just
+ * remove them during startup and expect the first backend launch to rebuild
+ * them.  Of course, this has to happen in each database of the cluster.
  */
 void
-RelationCacheInitFileRemove(const char *dbPath)
+RelationCacheInitFileRemove(void)
+{
+       const char *tblspcdir = "pg_tblspc";
+       DIR                *dir;
+       struct dirent *de;
+       char            path[MAXPGPATH];
+
+       /*
+        * We zap the shared cache file too.  In theory it can't get out of sync
+        * enough to be a problem, but in data-corruption cases, who knows ...
+        */
+       snprintf(path, sizeof(path), "global/%s",
+                        RELCACHE_INIT_FILENAME);
+       unlink_initfile(path);
+
+       /* Scan everything in the default tablespace */
+       RelationCacheInitFileRemoveInDir("base");
+
+       /* Scan the tablespace link directory to find non-default tablespaces */
+       dir = AllocateDir(tblspcdir);
+       if (dir == NULL)
+       {
+               elog(LOG, "could not open tablespace link directory \"%s\": %m",
+                        tblspcdir);
+               return;
+       }
+
+       while ((de = ReadDir(dir, tblspcdir)) != NULL)
+       {
+               if (strspn(de->d_name, "0123456789") == strlen(de->d_name))
+               {
+                       /* Scan the tablespace dir for per-database dirs */
+                       snprintf(path, sizeof(path), "%s/%s/%s",
+                                        tblspcdir, de->d_name, TABLESPACE_VERSION_DIRECTORY);
+                       RelationCacheInitFileRemoveInDir(path);
+               }
+       }
+
+       FreeDir(dir);
+}
+
+/* Process one per-tablespace directory for RelationCacheInitFileRemove */
+static void
+RelationCacheInitFileRemoveInDir(const char *tblspcpath)
 {
+       DIR                *dir;
+       struct dirent *de;
        char            initfilename[MAXPGPATH];
 
-       snprintf(initfilename, sizeof(initfilename), "%s/%s",
-                        dbPath, RELCACHE_INIT_FILENAME);
-       unlink(initfilename);
-       /* ignore any error, since it might not be there at all */
+       /* Scan the tablespace directory to find per-database directories */
+       dir = AllocateDir(tblspcpath);
+       if (dir == NULL)
+       {
+               elog(LOG, "could not open tablespace directory \"%s\": %m",
+                        tblspcpath);
+               return;
+       }
+
+       while ((de = ReadDir(dir, tblspcpath)) != NULL)
+       {
+               if (strspn(de->d_name, "0123456789") == strlen(de->d_name))
+               {
+                       /* Try to remove the init file in each database */
+                       snprintf(initfilename, sizeof(initfilename), "%s/%s/%s",
+                                        tblspcpath, de->d_name, RELCACHE_INIT_FILENAME);
+                       unlink_initfile(initfilename);
+               }
+       }
+
+       FreeDir(dir);
+}
+
+static void
+unlink_initfile(const char *initfilename)
+{
+       if (unlink(initfilename) < 0)
+       {
+               /* It might not be there, but log any error other than ENOENT */
+               if (errno != ENOENT)
+                       elog(LOG, "could not remove cache file \"%s\": %m", initfilename);
+       }
 }