OSDN Git Service

Introduce visibility map. The visibility map is a bitmap with one bit per
[pg-rex/syncrep.git] / src / backend / utils / cache / relcache.c
1 /*-------------------------------------------------------------------------
2  *
3  * relcache.c
4  *        POSTGRES relation descriptor cache code
5  *
6  * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        $PostgreSQL: pgsql/src/backend/utils/cache/relcache.c,v 1.278 2008/12/03 13:05:22 heikki Exp $
12  *
13  *-------------------------------------------------------------------------
14  */
15 /*
16  * INTERFACE ROUTINES
17  *              RelationCacheInitialize                 - initialize relcache (to empty)
18  *              RelationCacheInitializePhase2   - finish initializing relcache
19  *              RelationIdGetRelation                   - get a reldesc by relation id
20  *              RelationClose                                   - close an open relation
21  *
22  * NOTES
23  *              The following code contains many undocumented hacks.  Please be
24  *              careful....
25  */
26 #include "postgres.h"
27
28 #include <sys/file.h>
29 #include <fcntl.h>
30 #include <unistd.h>
31
32 #include "access/genam.h"
33 #include "access/heapam.h"
34 #include "access/reloptions.h"
35 #include "access/sysattr.h"
36 #include "access/xact.h"
37 #include "catalog/catalog.h"
38 #include "catalog/index.h"
39 #include "catalog/indexing.h"
40 #include "catalog/namespace.h"
41 #include "catalog/pg_amop.h"
42 #include "catalog/pg_amproc.h"
43 #include "catalog/pg_attrdef.h"
44 #include "catalog/pg_authid.h"
45 #include "catalog/pg_constraint.h"
46 #include "catalog/pg_namespace.h"
47 #include "catalog/pg_opclass.h"
48 #include "catalog/pg_proc.h"
49 #include "catalog/pg_rewrite.h"
50 #include "catalog/pg_type.h"
51 #include "commands/trigger.h"
52 #include "miscadmin.h"
53 #include "optimizer/clauses.h"
54 #include "optimizer/planmain.h"
55 #include "optimizer/prep.h"
56 #include "optimizer/var.h"
57 #include "rewrite/rewriteDefine.h"
58 #include "storage/fd.h"
59 #include "storage/lmgr.h"
60 #include "storage/smgr.h"
61 #include "utils/builtins.h"
62 #include "utils/fmgroids.h"
63 #include "utils/inval.h"
64 #include "utils/memutils.h"
65 #include "utils/relcache.h"
66 #include "utils/resowner.h"
67 #include "utils/syscache.h"
68 #include "utils/tqual.h"
69 #include "utils/typcache.h"
70
71
72 /*
73  * name of relcache init file, used to speed up backend startup
74  */
75 #define RELCACHE_INIT_FILENAME  "pg_internal.init"
76
77 #define RELCACHE_INIT_FILEMAGIC         0x573264        /* version ID value */
78
79 /*
80  *              hardcoded tuple descriptors.  see include/catalog/pg_attribute.h
81  */
82 static FormData_pg_attribute Desc_pg_class[Natts_pg_class] = {Schema_pg_class};
83 static FormData_pg_attribute Desc_pg_attribute[Natts_pg_attribute] = {Schema_pg_attribute};
84 static FormData_pg_attribute Desc_pg_proc[Natts_pg_proc] = {Schema_pg_proc};
85 static FormData_pg_attribute Desc_pg_type[Natts_pg_type] = {Schema_pg_type};
86 static FormData_pg_attribute Desc_pg_index[Natts_pg_index] = {Schema_pg_index};
87
88 /*
89  *              Hash tables that index the relation cache
90  *
91  *              We used to index the cache by both name and OID, but now there
92  *              is only an index by OID.
93  */
94 typedef struct relidcacheent
95 {
96         Oid                     reloid;
97         Relation        reldesc;
98 } RelIdCacheEnt;
99
100 static HTAB *RelationIdCache;
101
102 /*
103  * This flag is false until we have prepared the critical relcache entries
104  * that are needed to do indexscans on the tables read by relcache building.
105  */
106 bool            criticalRelcachesBuilt = false;
107
108 /*
109  * This counter counts relcache inval events received since backend startup
110  * (but only for rels that are actually in cache).      Presently, we use it only
111  * to detect whether data about to be written by write_relcache_init_file()
112  * might already be obsolete.
113  */
114 static long relcacheInvalsReceived = 0L;
115
116 /*
117  * This list remembers the OIDs of the relations cached in the relcache
118  * init file.
119  */
120 static List *initFileRelationIds = NIL;
121
122 /*
123  * This flag lets us optimize away work in AtEO(Sub)Xact_RelationCache().
124  */
125 static bool need_eoxact_work = false;
126
127
128 /*
129  *              macros to manipulate the lookup hashtables
130  */
131 #define RelationCacheInsert(RELATION)   \
132 do { \
133         RelIdCacheEnt *idhentry; bool found; \
134         idhentry = (RelIdCacheEnt*)hash_search(RelationIdCache, \
135                                                                                    (void *) &(RELATION->rd_id), \
136                                                                                    HASH_ENTER, \
137                                                                                    &found); \
138         /* used to give notice if found -- now just keep quiet */ \
139         idhentry->reldesc = RELATION; \
140 } while(0)
141
142 #define RelationIdCacheLookup(ID, RELATION) \
143 do { \
144         RelIdCacheEnt *hentry; \
145         hentry = (RelIdCacheEnt*)hash_search(RelationIdCache, \
146                                                                                  (void *) &(ID), HASH_FIND,NULL); \
147         if (hentry) \
148                 RELATION = hentry->reldesc; \
149         else \
150                 RELATION = NULL; \
151 } while(0)
152
153 #define RelationCacheDelete(RELATION) \
154 do { \
155         RelIdCacheEnt *idhentry; \
156         idhentry = (RelIdCacheEnt*)hash_search(RelationIdCache, \
157                                                                                    (void *) &(RELATION->rd_id), \
158                                                                                    HASH_REMOVE, NULL); \
159         if (idhentry == NULL) \
160                 elog(WARNING, "trying to delete a rd_id reldesc that does not exist"); \
161 } while(0)
162
163
164 /*
165  * Special cache for opclass-related information
166  *
167  * Note: only default operators and support procs get cached, ie, those with
168  * lefttype = righttype = opcintype.
169  */
170 typedef struct opclasscacheent
171 {
172         Oid                     opclassoid;             /* lookup key: OID of opclass */
173         bool            valid;                  /* set TRUE after successful fill-in */
174         StrategyNumber numStrats;       /* max # of strategies (from pg_am) */
175         StrategyNumber numSupport;      /* max # of support procs (from pg_am) */
176         Oid                     opcfamily;              /* OID of opclass's family */
177         Oid                     opcintype;              /* OID of opclass's declared input type */
178         Oid                *operatorOids;       /* strategy operators' OIDs */
179         RegProcedure *supportProcs; /* support procs */
180 } OpClassCacheEnt;
181
182 static HTAB *OpClassCache = NULL;
183
184
185 /* non-export function prototypes */
186
187 static void RelationClearRelation(Relation relation, bool rebuild);
188
189 static void RelationReloadIndexInfo(Relation relation);
190 static void RelationFlushRelation(Relation relation);
191 static bool load_relcache_init_file(void);
192 static void write_relcache_init_file(void);
193 static void write_item(const void *data, Size len, FILE *fp);
194
195 static void formrdesc(const char *relationName, Oid relationReltype,
196                   bool hasoids, int natts, FormData_pg_attribute *att);
197
198 static HeapTuple ScanPgRelation(Oid targetRelId, bool indexOK);
199 static Relation AllocateRelationDesc(Relation relation, Form_pg_class relp);
200 static void RelationParseRelOptions(Relation relation, HeapTuple tuple);
201 static void RelationBuildTupleDesc(Relation relation);
202 static Relation RelationBuildDesc(Oid targetRelId, Relation oldrelation);
203 static void RelationInitPhysicalAddr(Relation relation);
204 static TupleDesc GetPgClassDescriptor(void);
205 static TupleDesc GetPgIndexDescriptor(void);
206 static void AttrDefaultFetch(Relation relation);
207 static void CheckConstraintFetch(Relation relation);
208 static List *insert_ordered_oid(List *list, Oid datum);
209 static void IndexSupportInitialize(oidvector *indclass,
210                                            Oid *indexOperator,
211                                            RegProcedure *indexSupport,
212                                            Oid *opFamily,
213                                            Oid *opcInType,
214                                            StrategyNumber maxStrategyNumber,
215                                            StrategyNumber maxSupportNumber,
216                                            AttrNumber maxAttributeNumber);
217 static OpClassCacheEnt *LookupOpclassInfo(Oid operatorClassOid,
218                                   StrategyNumber numStrats,
219                                   StrategyNumber numSupport);
220
221
222 /*
223  *              ScanPgRelation
224  *
225  *              This is used by RelationBuildDesc to find a pg_class
226  *              tuple matching targetRelId.  The caller must hold at least
227  *              AccessShareLock on the target relid to prevent concurrent-update
228  *              scenarios --- else our SnapshotNow scan might fail to find any
229  *              version that it thinks is live.
230  *
231  *              NB: the returned tuple has been copied into palloc'd storage
232  *              and must eventually be freed with heap_freetuple.
233  */
234 static HeapTuple
235 ScanPgRelation(Oid targetRelId, bool indexOK)
236 {
237         HeapTuple       pg_class_tuple;
238         Relation        pg_class_desc;
239         SysScanDesc pg_class_scan;
240         ScanKeyData key[1];
241
242         /*
243          * form a scan key
244          */
245         ScanKeyInit(&key[0],
246                                 ObjectIdAttributeNumber,
247                                 BTEqualStrategyNumber, F_OIDEQ,
248                                 ObjectIdGetDatum(targetRelId));
249
250         /*
251          * Open pg_class and fetch a tuple.  Force heap scan if we haven't yet
252          * built the critical relcache entries (this includes initdb and startup
253          * without a pg_internal.init file).  The caller can also force a heap
254          * scan by setting indexOK == false.
255          */
256         pg_class_desc = heap_open(RelationRelationId, AccessShareLock);
257         pg_class_scan = systable_beginscan(pg_class_desc, ClassOidIndexId,
258                                                                            indexOK && criticalRelcachesBuilt,
259                                                                            SnapshotNow,
260                                                                            1, key);
261
262         pg_class_tuple = systable_getnext(pg_class_scan);
263
264         /*
265          * Must copy tuple before releasing buffer.
266          */
267         if (HeapTupleIsValid(pg_class_tuple))
268                 pg_class_tuple = heap_copytuple(pg_class_tuple);
269
270         /* all done */
271         systable_endscan(pg_class_scan);
272         heap_close(pg_class_desc, AccessShareLock);
273
274         return pg_class_tuple;
275 }
276
277 /*
278  *              AllocateRelationDesc
279  *
280  *              This is used to allocate memory for a new relation descriptor
281  *              and initialize the rd_rel field.
282  *
283  *              If 'relation' is NULL, allocate a new RelationData object.
284  *              If not, reuse the given object (that path is taken only when
285  *              we have to rebuild a relcache entry during RelationClearRelation).
286  */
287 static Relation
288 AllocateRelationDesc(Relation relation, Form_pg_class relp)
289 {
290         MemoryContext oldcxt;
291         Form_pg_class relationForm;
292
293         /* Relcache entries must live in CacheMemoryContext */
294         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
295
296         /*
297          * allocate space for new relation descriptor, if needed
298          */
299         if (relation == NULL)
300                 relation = (Relation) palloc(sizeof(RelationData));
301
302         /*
303          * clear all fields of reldesc
304          */
305         MemSet(relation, 0, sizeof(RelationData));
306         relation->rd_targblock = InvalidBlockNumber;
307         relation->rd_fsm_nblocks = InvalidBlockNumber;
308         relation->rd_vm_nblocks = InvalidBlockNumber;
309
310         /* make sure relation is marked as having no open file yet */
311         relation->rd_smgr = NULL;
312
313         /*
314          * Copy the relation tuple form
315          *
316          * We only allocate space for the fixed fields, ie, CLASS_TUPLE_SIZE. The
317          * variable-length fields (relacl, reloptions) are NOT stored in the
318          * relcache --- there'd be little point in it, since we don't copy the
319          * tuple's nulls bitmap and hence wouldn't know if the values are valid.
320          * Bottom line is that relacl *cannot* be retrieved from the relcache. Get
321          * it from the syscache if you need it.  The same goes for the original
322          * form of reloptions (however, we do store the parsed form of reloptions
323          * in rd_options).
324          */
325         relationForm = (Form_pg_class) palloc(CLASS_TUPLE_SIZE);
326
327         memcpy(relationForm, relp, CLASS_TUPLE_SIZE);
328
329         /* initialize relation tuple form */
330         relation->rd_rel = relationForm;
331
332         /* and allocate attribute tuple form storage */
333         relation->rd_att = CreateTemplateTupleDesc(relationForm->relnatts,
334                                                                                            relationForm->relhasoids);
335         /* which we mark as a reference-counted tupdesc */
336         relation->rd_att->tdrefcount = 1;
337
338         MemoryContextSwitchTo(oldcxt);
339
340         return relation;
341 }
342
343 /*
344  * RelationParseRelOptions
345  *              Convert pg_class.reloptions into pre-parsed rd_options
346  *
347  * tuple is the real pg_class tuple (not rd_rel!) for relation
348  *
349  * Note: rd_rel and (if an index) rd_am must be valid already
350  */
351 static void
352 RelationParseRelOptions(Relation relation, HeapTuple tuple)
353 {
354         Datum           datum;
355         bool            isnull;
356         bytea      *options;
357
358         relation->rd_options = NULL;
359
360         /* Fall out if relkind should not have options */
361         switch (relation->rd_rel->relkind)
362         {
363                 case RELKIND_RELATION:
364                 case RELKIND_TOASTVALUE:
365                 case RELKIND_UNCATALOGED:
366                 case RELKIND_INDEX:
367                         break;
368                 default:
369                         return;
370         }
371
372         /*
373          * Fetch reloptions from tuple; have to use a hardwired descriptor because
374          * we might not have any other for pg_class yet (consider executing this
375          * code for pg_class itself)
376          */
377         datum = fastgetattr(tuple,
378                                                 Anum_pg_class_reloptions,
379                                                 GetPgClassDescriptor(),
380                                                 &isnull);
381         if (isnull)
382                 return;
383
384         /* Parse into appropriate format; don't error out here */
385         switch (relation->rd_rel->relkind)
386         {
387                 case RELKIND_RELATION:
388                 case RELKIND_TOASTVALUE:
389                 case RELKIND_UNCATALOGED:
390                         options = heap_reloptions(relation->rd_rel->relkind, datum,
391                                                                           false);
392                         break;
393                 case RELKIND_INDEX:
394                         options = index_reloptions(relation->rd_am->amoptions, datum,
395                                                                            false);
396                         break;
397                 default:
398                         Assert(false);          /* can't get here */
399                         options = NULL;         /* keep compiler quiet */
400                         break;
401         }
402
403         /* Copy parsed data into CacheMemoryContext */
404         if (options)
405         {
406                 relation->rd_options = MemoryContextAlloc(CacheMemoryContext,
407                                                                                                   VARSIZE(options));
408                 memcpy(relation->rd_options, options, VARSIZE(options));
409         }
410 }
411
412 /*
413  *              RelationBuildTupleDesc
414  *
415  *              Form the relation's tuple descriptor from information in
416  *              the pg_attribute, pg_attrdef & pg_constraint system catalogs.
417  */
418 static void
419 RelationBuildTupleDesc(Relation relation)
420 {
421         HeapTuple       pg_attribute_tuple;
422         Relation        pg_attribute_desc;
423         SysScanDesc pg_attribute_scan;
424         ScanKeyData skey[2];
425         int                     need;
426         TupleConstr *constr;
427         AttrDefault *attrdef = NULL;
428         int                     ndef = 0;
429
430         /* copy some fields from pg_class row to rd_att */
431         relation->rd_att->tdtypeid = relation->rd_rel->reltype;
432         relation->rd_att->tdtypmod = -1;        /* unnecessary, but... */
433         relation->rd_att->tdhasoid = relation->rd_rel->relhasoids;
434
435         constr = (TupleConstr *) MemoryContextAlloc(CacheMemoryContext,
436                                                                                                 sizeof(TupleConstr));
437         constr->has_not_null = false;
438
439         /*
440          * Form a scan key that selects only user attributes (attnum > 0).
441          * (Eliminating system attribute rows at the index level is lots faster
442          * than fetching them.)
443          */
444         ScanKeyInit(&skey[0],
445                                 Anum_pg_attribute_attrelid,
446                                 BTEqualStrategyNumber, F_OIDEQ,
447                                 ObjectIdGetDatum(RelationGetRelid(relation)));
448         ScanKeyInit(&skey[1],
449                                 Anum_pg_attribute_attnum,
450                                 BTGreaterStrategyNumber, F_INT2GT,
451                                 Int16GetDatum(0));
452
453         /*
454          * Open pg_attribute and begin a scan.  Force heap scan if we haven't yet
455          * built the critical relcache entries (this includes initdb and startup
456          * without a pg_internal.init file).
457          */
458         pg_attribute_desc = heap_open(AttributeRelationId, AccessShareLock);
459         pg_attribute_scan = systable_beginscan(pg_attribute_desc,
460                                                                                    AttributeRelidNumIndexId,
461                                                                                    criticalRelcachesBuilt,
462                                                                                    SnapshotNow,
463                                                                                    2, skey);
464
465         /*
466          * add attribute data to relation->rd_att
467          */
468         need = relation->rd_rel->relnatts;
469
470         while (HeapTupleIsValid(pg_attribute_tuple = systable_getnext(pg_attribute_scan)))
471         {
472                 Form_pg_attribute attp;
473
474                 attp = (Form_pg_attribute) GETSTRUCT(pg_attribute_tuple);
475
476                 if (attp->attnum <= 0 ||
477                         attp->attnum > relation->rd_rel->relnatts)
478                         elog(ERROR, "invalid attribute number %d for %s",
479                                  attp->attnum, RelationGetRelationName(relation));
480
481                 memcpy(relation->rd_att->attrs[attp->attnum - 1],
482                            attp,
483                            ATTRIBUTE_TUPLE_SIZE);
484
485                 /* Update constraint/default info */
486                 if (attp->attnotnull)
487                         constr->has_not_null = true;
488
489                 if (attp->atthasdef)
490                 {
491                         if (attrdef == NULL)
492                                 attrdef = (AttrDefault *)
493                                         MemoryContextAllocZero(CacheMemoryContext,
494                                                                                    relation->rd_rel->relnatts *
495                                                                                    sizeof(AttrDefault));
496                         attrdef[ndef].adnum = attp->attnum;
497                         attrdef[ndef].adbin = NULL;
498                         ndef++;
499                 }
500                 need--;
501                 if (need == 0)
502                         break;
503         }
504
505         /*
506          * end the scan and close the attribute relation
507          */
508         systable_endscan(pg_attribute_scan);
509         heap_close(pg_attribute_desc, AccessShareLock);
510
511         if (need != 0)
512                 elog(ERROR, "catalog is missing %d attribute(s) for relid %u",
513                          need, RelationGetRelid(relation));
514
515         /*
516          * The attcacheoff values we read from pg_attribute should all be -1
517          * ("unknown").  Verify this if assert checking is on.  They will be
518          * computed when and if needed during tuple access.
519          */
520 #ifdef USE_ASSERT_CHECKING
521         {
522                 int                     i;
523
524                 for (i = 0; i < relation->rd_rel->relnatts; i++)
525                         Assert(relation->rd_att->attrs[i]->attcacheoff == -1);
526         }
527 #endif
528
529         /*
530          * However, we can easily set the attcacheoff value for the first
531          * attribute: it must be zero.  This eliminates the need for special cases
532          * for attnum=1 that used to exist in fastgetattr() and index_getattr().
533          */
534         if (relation->rd_rel->relnatts > 0)
535                 relation->rd_att->attrs[0]->attcacheoff = 0;
536
537         /*
538          * Set up constraint/default info
539          */
540         if (constr->has_not_null || ndef > 0 || relation->rd_rel->relchecks)
541         {
542                 relation->rd_att->constr = constr;
543
544                 if (ndef > 0)                   /* DEFAULTs */
545                 {
546                         if (ndef < relation->rd_rel->relnatts)
547                                 constr->defval = (AttrDefault *)
548                                         repalloc(attrdef, ndef * sizeof(AttrDefault));
549                         else
550                                 constr->defval = attrdef;
551                         constr->num_defval = ndef;
552                         AttrDefaultFetch(relation);
553                 }
554                 else
555                         constr->num_defval = 0;
556
557                 if (relation->rd_rel->relchecks > 0)    /* CHECKs */
558                 {
559                         constr->num_check = relation->rd_rel->relchecks;
560                         constr->check = (ConstrCheck *)
561                                 MemoryContextAllocZero(CacheMemoryContext,
562                                                                         constr->num_check * sizeof(ConstrCheck));
563                         CheckConstraintFetch(relation);
564                 }
565                 else
566                         constr->num_check = 0;
567         }
568         else
569         {
570                 pfree(constr);
571                 relation->rd_att->constr = NULL;
572         }
573 }
574
575 /*
576  *              RelationBuildRuleLock
577  *
578  *              Form the relation's rewrite rules from information in
579  *              the pg_rewrite system catalog.
580  *
581  * Note: The rule parsetrees are potentially very complex node structures.
582  * To allow these trees to be freed when the relcache entry is flushed,
583  * we make a private memory context to hold the RuleLock information for
584  * each relcache entry that has associated rules.  The context is used
585  * just for rule info, not for any other subsidiary data of the relcache
586  * entry, because that keeps the update logic in RelationClearRelation()
587  * manageable.  The other subsidiary data structures are simple enough
588  * to be easy to free explicitly, anyway.
589  */
590 static void
591 RelationBuildRuleLock(Relation relation)
592 {
593         MemoryContext rulescxt;
594         MemoryContext oldcxt;
595         HeapTuple       rewrite_tuple;
596         Relation        rewrite_desc;
597         TupleDesc       rewrite_tupdesc;
598         SysScanDesc rewrite_scan;
599         ScanKeyData key;
600         RuleLock   *rulelock;
601         int                     numlocks;
602         RewriteRule **rules;
603         int                     maxlocks;
604
605         /*
606          * Make the private context.  Parameters are set on the assumption that
607          * it'll probably not contain much data.
608          */
609         rulescxt = AllocSetContextCreate(CacheMemoryContext,
610                                                                          RelationGetRelationName(relation),
611                                                                          ALLOCSET_SMALL_MINSIZE,
612                                                                          ALLOCSET_SMALL_INITSIZE,
613                                                                          ALLOCSET_SMALL_MAXSIZE);
614         relation->rd_rulescxt = rulescxt;
615
616         /*
617          * allocate an array to hold the rewrite rules (the array is extended if
618          * necessary)
619          */
620         maxlocks = 4;
621         rules = (RewriteRule **)
622                 MemoryContextAlloc(rulescxt, sizeof(RewriteRule *) * maxlocks);
623         numlocks = 0;
624
625         /*
626          * form a scan key
627          */
628         ScanKeyInit(&key,
629                                 Anum_pg_rewrite_ev_class,
630                                 BTEqualStrategyNumber, F_OIDEQ,
631                                 ObjectIdGetDatum(RelationGetRelid(relation)));
632
633         /*
634          * open pg_rewrite and begin a scan
635          *
636          * Note: since we scan the rules using RewriteRelRulenameIndexId, we will
637          * be reading the rules in name order, except possibly during
638          * emergency-recovery operations (ie, IgnoreSystemIndexes). This in turn
639          * ensures that rules will be fired in name order.
640          */
641         rewrite_desc = heap_open(RewriteRelationId, AccessShareLock);
642         rewrite_tupdesc = RelationGetDescr(rewrite_desc);
643         rewrite_scan = systable_beginscan(rewrite_desc,
644                                                                           RewriteRelRulenameIndexId,
645                                                                           true, SnapshotNow,
646                                                                           1, &key);
647
648         while (HeapTupleIsValid(rewrite_tuple = systable_getnext(rewrite_scan)))
649         {
650                 Form_pg_rewrite rewrite_form = (Form_pg_rewrite) GETSTRUCT(rewrite_tuple);
651                 bool            isnull;
652                 Datum           rule_datum;
653                 char       *rule_str;
654                 RewriteRule *rule;
655
656                 rule = (RewriteRule *) MemoryContextAlloc(rulescxt,
657                                                                                                   sizeof(RewriteRule));
658
659                 rule->ruleId = HeapTupleGetOid(rewrite_tuple);
660
661                 rule->event = rewrite_form->ev_type - '0';
662                 rule->attrno = rewrite_form->ev_attr;
663                 rule->enabled = rewrite_form->ev_enabled;
664                 rule->isInstead = rewrite_form->is_instead;
665
666                 /*
667                  * Must use heap_getattr to fetch ev_action and ev_qual.  Also, the
668                  * rule strings are often large enough to be toasted.  To avoid
669                  * leaking memory in the caller's context, do the detoasting here so
670                  * we can free the detoasted version.
671                  */
672                 rule_datum = heap_getattr(rewrite_tuple,
673                                                                   Anum_pg_rewrite_ev_action,
674                                                                   rewrite_tupdesc,
675                                                                   &isnull);
676                 Assert(!isnull);
677                 rule_str = TextDatumGetCString(rule_datum);
678                 oldcxt = MemoryContextSwitchTo(rulescxt);
679                 rule->actions = (List *) stringToNode(rule_str);
680                 MemoryContextSwitchTo(oldcxt);
681                 pfree(rule_str);
682
683                 rule_datum = heap_getattr(rewrite_tuple,
684                                                                   Anum_pg_rewrite_ev_qual,
685                                                                   rewrite_tupdesc,
686                                                                   &isnull);
687                 Assert(!isnull);
688                 rule_str = TextDatumGetCString(rule_datum);
689                 oldcxt = MemoryContextSwitchTo(rulescxt);
690                 rule->qual = (Node *) stringToNode(rule_str);
691                 MemoryContextSwitchTo(oldcxt);
692                 pfree(rule_str);
693
694                 /*
695                  * We want the rule's table references to be checked as though by the
696                  * table owner, not the user referencing the rule.      Therefore, scan
697                  * through the rule's actions and set the checkAsUser field on all
698                  * rtable entries.      We have to look at the qual as well, in case it
699                  * contains sublinks.
700                  *
701                  * The reason for doing this when the rule is loaded, rather than when
702                  * it is stored, is that otherwise ALTER TABLE OWNER would have to
703                  * grovel through stored rules to update checkAsUser fields. Scanning
704                  * the rule tree during load is relatively cheap (compared to
705                  * constructing it in the first place), so we do it here.
706                  */
707                 setRuleCheckAsUser((Node *) rule->actions, relation->rd_rel->relowner);
708                 setRuleCheckAsUser(rule->qual, relation->rd_rel->relowner);
709
710                 if (numlocks >= maxlocks)
711                 {
712                         maxlocks *= 2;
713                         rules = (RewriteRule **)
714                                 repalloc(rules, sizeof(RewriteRule *) * maxlocks);
715                 }
716                 rules[numlocks++] = rule;
717         }
718
719         /*
720          * end the scan and close the attribute relation
721          */
722         systable_endscan(rewrite_scan);
723         heap_close(rewrite_desc, AccessShareLock);
724
725         /*
726          * there might not be any rules (if relhasrules is out-of-date)
727          */
728         if (numlocks == 0)
729         {
730                 relation->rd_rules = NULL;
731                 relation->rd_rulescxt = NULL;
732                 MemoryContextDelete(rulescxt);
733                 return;
734         }
735
736         /*
737          * form a RuleLock and insert into relation
738          */
739         rulelock = (RuleLock *) MemoryContextAlloc(rulescxt, sizeof(RuleLock));
740         rulelock->numLocks = numlocks;
741         rulelock->rules = rules;
742
743         relation->rd_rules = rulelock;
744 }
745
746 /*
747  *              equalRuleLocks
748  *
749  *              Determine whether two RuleLocks are equivalent
750  *
751  *              Probably this should be in the rules code someplace...
752  */
753 static bool
754 equalRuleLocks(RuleLock *rlock1, RuleLock *rlock2)
755 {
756         int                     i;
757
758         /*
759          * As of 7.3 we assume the rule ordering is repeatable, because
760          * RelationBuildRuleLock should read 'em in a consistent order.  So just
761          * compare corresponding slots.
762          */
763         if (rlock1 != NULL)
764         {
765                 if (rlock2 == NULL)
766                         return false;
767                 if (rlock1->numLocks != rlock2->numLocks)
768                         return false;
769                 for (i = 0; i < rlock1->numLocks; i++)
770                 {
771                         RewriteRule *rule1 = rlock1->rules[i];
772                         RewriteRule *rule2 = rlock2->rules[i];
773
774                         if (rule1->ruleId != rule2->ruleId)
775                                 return false;
776                         if (rule1->event != rule2->event)
777                                 return false;
778                         if (rule1->attrno != rule2->attrno)
779                                 return false;
780                         if (rule1->isInstead != rule2->isInstead)
781                                 return false;
782                         if (!equal(rule1->qual, rule2->qual))
783                                 return false;
784                         if (!equal(rule1->actions, rule2->actions))
785                                 return false;
786                 }
787         }
788         else if (rlock2 != NULL)
789                 return false;
790         return true;
791 }
792
793
794 /*
795  *              RelationBuildDesc
796  *
797  *              Build a relation descriptor --- either a new one, or by
798  *              recycling the given old relation object.  The latter case
799  *              supports rebuilding a relcache entry without invalidating
800  *              pointers to it.  The caller must hold at least
801  *              AccessShareLock on the target relid.
802  *
803  *              Returns NULL if no pg_class row could be found for the given relid
804  *              (suggesting we are trying to access a just-deleted relation).
805  *              Any other error is reported via elog.
806  */
807 static Relation
808 RelationBuildDesc(Oid targetRelId, Relation oldrelation)
809 {
810         Relation        relation;
811         Oid                     relid;
812         HeapTuple       pg_class_tuple;
813         Form_pg_class relp;
814         MemoryContext oldcxt;
815
816         /*
817          * find the tuple in pg_class corresponding to the given relation id
818          */
819         pg_class_tuple = ScanPgRelation(targetRelId, true);
820
821         /*
822          * if no such tuple exists, return NULL
823          */
824         if (!HeapTupleIsValid(pg_class_tuple))
825                 return NULL;
826
827         /*
828          * get information from the pg_class_tuple
829          */
830         relid = HeapTupleGetOid(pg_class_tuple);
831         relp = (Form_pg_class) GETSTRUCT(pg_class_tuple);
832
833         /*
834          * allocate storage for the relation descriptor, and copy pg_class_tuple
835          * to relation->rd_rel.
836          */
837         relation = AllocateRelationDesc(oldrelation, relp);
838
839         /*
840          * initialize the relation's relation id (relation->rd_id)
841          */
842         RelationGetRelid(relation) = relid;
843
844         /*
845          * normal relations are not nailed into the cache; nor can a pre-existing
846          * relation be new.  It could be temp though.  (Actually, it could be new
847          * too, but it's okay to forget that fact if forced to flush the entry.)
848          */
849         relation->rd_refcnt = 0;
850         relation->rd_isnailed = false;
851         relation->rd_createSubid = InvalidSubTransactionId;
852         relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
853         relation->rd_istemp = isTempOrToastNamespace(relation->rd_rel->relnamespace);
854
855         /*
856          * initialize the tuple descriptor (relation->rd_att).
857          */
858         RelationBuildTupleDesc(relation);
859
860         /*
861          * Fetch rules and triggers that affect this relation
862          */
863         if (relation->rd_rel->relhasrules)
864                 RelationBuildRuleLock(relation);
865         else
866         {
867                 relation->rd_rules = NULL;
868                 relation->rd_rulescxt = NULL;
869         }
870
871         if (relation->rd_rel->relhastriggers)
872                 RelationBuildTriggers(relation);
873         else
874                 relation->trigdesc = NULL;
875
876         /*
877          * if it's an index, initialize index-related information
878          */
879         if (OidIsValid(relation->rd_rel->relam))
880                 RelationInitIndexAccessInfo(relation);
881
882         /* extract reloptions if any */
883         RelationParseRelOptions(relation, pg_class_tuple);
884
885         /*
886          * initialize the relation lock manager information
887          */
888         RelationInitLockInfo(relation);         /* see lmgr.c */
889
890         /*
891          * initialize physical addressing information for the relation
892          */
893         RelationInitPhysicalAddr(relation);
894
895         /* make sure relation is marked as having no open file yet */
896         relation->rd_smgr = NULL;
897
898         /*
899          * now we can free the memory allocated for pg_class_tuple
900          */
901         heap_freetuple(pg_class_tuple);
902
903         /*
904          * Insert newly created relation into relcache hash tables.
905          */
906         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
907         RelationCacheInsert(relation);
908         MemoryContextSwitchTo(oldcxt);
909
910         /* It's fully valid */
911         relation->rd_isvalid = true;
912
913         return relation;
914 }
915
916 /*
917  * Initialize the physical addressing info (RelFileNode) for a relcache entry
918  */
919 static void
920 RelationInitPhysicalAddr(Relation relation)
921 {
922         if (relation->rd_rel->reltablespace)
923                 relation->rd_node.spcNode = relation->rd_rel->reltablespace;
924         else
925                 relation->rd_node.spcNode = MyDatabaseTableSpace;
926         if (relation->rd_rel->relisshared)
927                 relation->rd_node.dbNode = InvalidOid;
928         else
929                 relation->rd_node.dbNode = MyDatabaseId;
930         relation->rd_node.relNode = relation->rd_rel->relfilenode;
931 }
932
933 /*
934  * Initialize index-access-method support data for an index relation
935  */
936 void
937 RelationInitIndexAccessInfo(Relation relation)
938 {
939         HeapTuple       tuple;
940         Form_pg_am      aform;
941         Datum           indclassDatum;
942         Datum           indoptionDatum;
943         bool            isnull;
944         oidvector  *indclass;
945         int2vector *indoption;
946         MemoryContext indexcxt;
947         MemoryContext oldcontext;
948         int                     natts;
949         uint16          amstrategies;
950         uint16          amsupport;
951
952         /*
953          * Make a copy of the pg_index entry for the index.  Since pg_index
954          * contains variable-length and possibly-null fields, we have to do this
955          * honestly rather than just treating it as a Form_pg_index struct.
956          */
957         tuple = SearchSysCache(INDEXRELID,
958                                                    ObjectIdGetDatum(RelationGetRelid(relation)),
959                                                    0, 0, 0);
960         if (!HeapTupleIsValid(tuple))
961                 elog(ERROR, "cache lookup failed for index %u",
962                          RelationGetRelid(relation));
963         oldcontext = MemoryContextSwitchTo(CacheMemoryContext);
964         relation->rd_indextuple = heap_copytuple(tuple);
965         relation->rd_index = (Form_pg_index) GETSTRUCT(relation->rd_indextuple);
966         MemoryContextSwitchTo(oldcontext);
967         ReleaseSysCache(tuple);
968
969         /*
970          * Make a copy of the pg_am entry for the index's access method
971          */
972         tuple = SearchSysCache(AMOID,
973                                                    ObjectIdGetDatum(relation->rd_rel->relam),
974                                                    0, 0, 0);
975         if (!HeapTupleIsValid(tuple))
976                 elog(ERROR, "cache lookup failed for access method %u",
977                          relation->rd_rel->relam);
978         aform = (Form_pg_am) MemoryContextAlloc(CacheMemoryContext, sizeof *aform);
979         memcpy(aform, GETSTRUCT(tuple), sizeof *aform);
980         ReleaseSysCache(tuple);
981         relation->rd_am = aform;
982
983         natts = relation->rd_rel->relnatts;
984         if (natts != relation->rd_index->indnatts)
985                 elog(ERROR, "relnatts disagrees with indnatts for index %u",
986                          RelationGetRelid(relation));
987         amstrategies = aform->amstrategies;
988         amsupport = aform->amsupport;
989
990         /*
991          * Make the private context to hold index access info.  The reason we need
992          * a context, and not just a couple of pallocs, is so that we won't leak
993          * any subsidiary info attached to fmgr lookup records.
994          *
995          * Context parameters are set on the assumption that it'll probably not
996          * contain much data.
997          */
998         indexcxt = AllocSetContextCreate(CacheMemoryContext,
999                                                                          RelationGetRelationName(relation),
1000                                                                          ALLOCSET_SMALL_MINSIZE,
1001                                                                          ALLOCSET_SMALL_INITSIZE,
1002                                                                          ALLOCSET_SMALL_MAXSIZE);
1003         relation->rd_indexcxt = indexcxt;
1004
1005         /*
1006          * Allocate arrays to hold data
1007          */
1008         relation->rd_aminfo = (RelationAmInfo *)
1009                 MemoryContextAllocZero(indexcxt, sizeof(RelationAmInfo));
1010
1011         relation->rd_opfamily = (Oid *)
1012                 MemoryContextAllocZero(indexcxt, natts * sizeof(Oid));
1013         relation->rd_opcintype = (Oid *)
1014                 MemoryContextAllocZero(indexcxt, natts * sizeof(Oid));
1015
1016         if (amstrategies > 0)
1017                 relation->rd_operator = (Oid *)
1018                         MemoryContextAllocZero(indexcxt,
1019                                                                    natts * amstrategies * sizeof(Oid));
1020         else
1021                 relation->rd_operator = NULL;
1022
1023         if (amsupport > 0)
1024         {
1025                 int                     nsupport = natts * amsupport;
1026
1027                 relation->rd_support = (RegProcedure *)
1028                         MemoryContextAllocZero(indexcxt, nsupport * sizeof(RegProcedure));
1029                 relation->rd_supportinfo = (FmgrInfo *)
1030                         MemoryContextAllocZero(indexcxt, nsupport * sizeof(FmgrInfo));
1031         }
1032         else
1033         {
1034                 relation->rd_support = NULL;
1035                 relation->rd_supportinfo = NULL;
1036         }
1037
1038         relation->rd_indoption = (int16 *)
1039                 MemoryContextAllocZero(indexcxt, natts * sizeof(int16));
1040
1041         /*
1042          * indclass cannot be referenced directly through the C struct, because it
1043          * comes after the variable-width indkey field.  Must extract the datum
1044          * the hard way...
1045          */
1046         indclassDatum = fastgetattr(relation->rd_indextuple,
1047                                                                 Anum_pg_index_indclass,
1048                                                                 GetPgIndexDescriptor(),
1049                                                                 &isnull);
1050         Assert(!isnull);
1051         indclass = (oidvector *) DatumGetPointer(indclassDatum);
1052
1053         /*
1054          * Fill the operator and support procedure OID arrays, as well as the info
1055          * about opfamilies and opclass input types.  (aminfo and supportinfo are
1056          * left as zeroes, and are filled on-the-fly when used)
1057          */
1058         IndexSupportInitialize(indclass,
1059                                                    relation->rd_operator, relation->rd_support,
1060                                                    relation->rd_opfamily, relation->rd_opcintype,
1061                                                    amstrategies, amsupport, natts);
1062
1063         /*
1064          * Similarly extract indoption and copy it to the cache entry
1065          */
1066         indoptionDatum = fastgetattr(relation->rd_indextuple,
1067                                                                  Anum_pg_index_indoption,
1068                                                                  GetPgIndexDescriptor(),
1069                                                                  &isnull);
1070         Assert(!isnull);
1071         indoption = (int2vector *) DatumGetPointer(indoptionDatum);
1072         memcpy(relation->rd_indoption, indoption->values, natts * sizeof(int16));
1073
1074         /*
1075          * expressions and predicate cache will be filled later
1076          */
1077         relation->rd_indexprs = NIL;
1078         relation->rd_indpred = NIL;
1079         relation->rd_amcache = NULL;
1080 }
1081
1082 /*
1083  * IndexSupportInitialize
1084  *              Initializes an index's cached opclass information,
1085  *              given the index's pg_index.indclass entry.
1086  *
1087  * Data is returned into *indexOperator, *indexSupport, *opFamily, and
1088  * *opcInType, which are arrays allocated by the caller.
1089  *
1090  * The caller also passes maxStrategyNumber, maxSupportNumber, and
1091  * maxAttributeNumber, since these indicate the size of the arrays
1092  * it has allocated --- but in practice these numbers must always match
1093  * those obtainable from the system catalog entries for the index and
1094  * access method.
1095  */
1096 static void
1097 IndexSupportInitialize(oidvector *indclass,
1098                                            Oid *indexOperator,
1099                                            RegProcedure *indexSupport,
1100                                            Oid *opFamily,
1101                                            Oid *opcInType,
1102                                            StrategyNumber maxStrategyNumber,
1103                                            StrategyNumber maxSupportNumber,
1104                                            AttrNumber maxAttributeNumber)
1105 {
1106         int                     attIndex;
1107
1108         for (attIndex = 0; attIndex < maxAttributeNumber; attIndex++)
1109         {
1110                 OpClassCacheEnt *opcentry;
1111
1112                 if (!OidIsValid(indclass->values[attIndex]))
1113                         elog(ERROR, "bogus pg_index tuple");
1114
1115                 /* look up the info for this opclass, using a cache */
1116                 opcentry = LookupOpclassInfo(indclass->values[attIndex],
1117                                                                          maxStrategyNumber,
1118                                                                          maxSupportNumber);
1119
1120                 /* copy cached data into relcache entry */
1121                 opFamily[attIndex] = opcentry->opcfamily;
1122                 opcInType[attIndex] = opcentry->opcintype;
1123                 if (maxStrategyNumber > 0)
1124                         memcpy(&indexOperator[attIndex * maxStrategyNumber],
1125                                    opcentry->operatorOids,
1126                                    maxStrategyNumber * sizeof(Oid));
1127                 if (maxSupportNumber > 0)
1128                         memcpy(&indexSupport[attIndex * maxSupportNumber],
1129                                    opcentry->supportProcs,
1130                                    maxSupportNumber * sizeof(RegProcedure));
1131         }
1132 }
1133
1134 /*
1135  * LookupOpclassInfo
1136  *
1137  * This routine maintains a per-opclass cache of the information needed
1138  * by IndexSupportInitialize().  This is more efficient than relying on
1139  * the catalog cache, because we can load all the info about a particular
1140  * opclass in a single indexscan of pg_amproc or pg_amop.
1141  *
1142  * The information from pg_am about expected range of strategy and support
1143  * numbers is passed in, rather than being looked up, mainly because the
1144  * caller will have it already.
1145  *
1146  * Note there is no provision for flushing the cache.  This is OK at the
1147  * moment because there is no way to ALTER any interesting properties of an
1148  * existing opclass --- all you can do is drop it, which will result in
1149  * a useless but harmless dead entry in the cache.  To support altering
1150  * opclass membership (not the same as opfamily membership!), we'd need to
1151  * be able to flush this cache as well as the contents of relcache entries
1152  * for indexes.
1153  */
1154 static OpClassCacheEnt *
1155 LookupOpclassInfo(Oid operatorClassOid,
1156                                   StrategyNumber numStrats,
1157                                   StrategyNumber numSupport)
1158 {
1159         OpClassCacheEnt *opcentry;
1160         bool            found;
1161         Relation        rel;
1162         SysScanDesc scan;
1163         ScanKeyData skey[3];
1164         HeapTuple       htup;
1165         bool            indexOK;
1166
1167         if (OpClassCache == NULL)
1168         {
1169                 /* First time through: initialize the opclass cache */
1170                 HASHCTL         ctl;
1171
1172                 if (!CacheMemoryContext)
1173                         CreateCacheMemoryContext();
1174
1175                 MemSet(&ctl, 0, sizeof(ctl));
1176                 ctl.keysize = sizeof(Oid);
1177                 ctl.entrysize = sizeof(OpClassCacheEnt);
1178                 ctl.hash = oid_hash;
1179                 OpClassCache = hash_create("Operator class cache", 64,
1180                                                                    &ctl, HASH_ELEM | HASH_FUNCTION);
1181         }
1182
1183         opcentry = (OpClassCacheEnt *) hash_search(OpClassCache,
1184                                                                                            (void *) &operatorClassOid,
1185                                                                                            HASH_ENTER, &found);
1186
1187         if (!found)
1188         {
1189                 /* Need to allocate memory for new entry */
1190                 opcentry->valid = false;        /* until known OK */
1191                 opcentry->numStrats = numStrats;
1192                 opcentry->numSupport = numSupport;
1193
1194                 if (numStrats > 0)
1195                         opcentry->operatorOids = (Oid *)
1196                                 MemoryContextAllocZero(CacheMemoryContext,
1197                                                                            numStrats * sizeof(Oid));
1198                 else
1199                         opcentry->operatorOids = NULL;
1200
1201                 if (numSupport > 0)
1202                         opcentry->supportProcs = (RegProcedure *)
1203                                 MemoryContextAllocZero(CacheMemoryContext,
1204                                                                            numSupport * sizeof(RegProcedure));
1205                 else
1206                         opcentry->supportProcs = NULL;
1207         }
1208         else
1209         {
1210                 Assert(numStrats == opcentry->numStrats);
1211                 Assert(numSupport == opcentry->numSupport);
1212         }
1213
1214         /*
1215          * When testing for cache-flush hazards, we intentionally disable the
1216          * operator class cache and force reloading of the info on each call.
1217          * This is helpful because we want to test the case where a cache flush
1218          * occurs while we are loading the info, and it's very hard to provoke
1219          * that if this happens only once per opclass per backend.
1220          */
1221 #if defined(CLOBBER_CACHE_ALWAYS)
1222         opcentry->valid = false;
1223 #endif
1224
1225         if (opcentry->valid)
1226                 return opcentry;
1227
1228         /*
1229          * Need to fill in new entry.
1230          *
1231          * To avoid infinite recursion during startup, force heap scans if we're
1232          * looking up info for the opclasses used by the indexes we would like to
1233          * reference here.
1234          */
1235         indexOK = criticalRelcachesBuilt ||
1236                 (operatorClassOid != OID_BTREE_OPS_OID &&
1237                  operatorClassOid != INT2_BTREE_OPS_OID);
1238
1239         /*
1240          * We have to fetch the pg_opclass row to determine its opfamily and
1241          * opcintype, which are needed to look up the operators and functions.
1242          * It'd be convenient to use the syscache here, but that probably doesn't
1243          * work while bootstrapping.
1244          */
1245         ScanKeyInit(&skey[0],
1246                                 ObjectIdAttributeNumber,
1247                                 BTEqualStrategyNumber, F_OIDEQ,
1248                                 ObjectIdGetDatum(operatorClassOid));
1249         rel = heap_open(OperatorClassRelationId, AccessShareLock);
1250         scan = systable_beginscan(rel, OpclassOidIndexId, indexOK,
1251                                                           SnapshotNow, 1, skey);
1252
1253         if (HeapTupleIsValid(htup = systable_getnext(scan)))
1254         {
1255                 Form_pg_opclass opclassform = (Form_pg_opclass) GETSTRUCT(htup);
1256
1257                 opcentry->opcfamily = opclassform->opcfamily;
1258                 opcentry->opcintype = opclassform->opcintype;
1259         }
1260         else
1261                 elog(ERROR, "could not find tuple for opclass %u", operatorClassOid);
1262
1263         systable_endscan(scan);
1264         heap_close(rel, AccessShareLock);
1265
1266
1267         /*
1268          * Scan pg_amop to obtain operators for the opclass.  We only fetch the
1269          * default ones (those with lefttype = righttype = opcintype).
1270          */
1271         if (numStrats > 0)
1272         {
1273                 ScanKeyInit(&skey[0],
1274                                         Anum_pg_amop_amopfamily,
1275                                         BTEqualStrategyNumber, F_OIDEQ,
1276                                         ObjectIdGetDatum(opcentry->opcfamily));
1277                 ScanKeyInit(&skey[1],
1278                                         Anum_pg_amop_amoplefttype,
1279                                         BTEqualStrategyNumber, F_OIDEQ,
1280                                         ObjectIdGetDatum(opcentry->opcintype));
1281                 ScanKeyInit(&skey[2],
1282                                         Anum_pg_amop_amoprighttype,
1283                                         BTEqualStrategyNumber, F_OIDEQ,
1284                                         ObjectIdGetDatum(opcentry->opcintype));
1285                 rel = heap_open(AccessMethodOperatorRelationId, AccessShareLock);
1286                 scan = systable_beginscan(rel, AccessMethodStrategyIndexId, indexOK,
1287                                                                   SnapshotNow, 3, skey);
1288
1289                 while (HeapTupleIsValid(htup = systable_getnext(scan)))
1290                 {
1291                         Form_pg_amop amopform = (Form_pg_amop) GETSTRUCT(htup);
1292
1293                         if (amopform->amopstrategy <= 0 ||
1294                                 (StrategyNumber) amopform->amopstrategy > numStrats)
1295                                 elog(ERROR, "invalid amopstrategy number %d for opclass %u",
1296                                          amopform->amopstrategy, operatorClassOid);
1297                         opcentry->operatorOids[amopform->amopstrategy - 1] =
1298                                 amopform->amopopr;
1299                 }
1300
1301                 systable_endscan(scan);
1302                 heap_close(rel, AccessShareLock);
1303         }
1304
1305         /*
1306          * Scan pg_amproc to obtain support procs for the opclass.      We only fetch
1307          * the default ones (those with lefttype = righttype = opcintype).
1308          */
1309         if (numSupport > 0)
1310         {
1311                 ScanKeyInit(&skey[0],
1312                                         Anum_pg_amproc_amprocfamily,
1313                                         BTEqualStrategyNumber, F_OIDEQ,
1314                                         ObjectIdGetDatum(opcentry->opcfamily));
1315                 ScanKeyInit(&skey[1],
1316                                         Anum_pg_amproc_amproclefttype,
1317                                         BTEqualStrategyNumber, F_OIDEQ,
1318                                         ObjectIdGetDatum(opcentry->opcintype));
1319                 ScanKeyInit(&skey[2],
1320                                         Anum_pg_amproc_amprocrighttype,
1321                                         BTEqualStrategyNumber, F_OIDEQ,
1322                                         ObjectIdGetDatum(opcentry->opcintype));
1323                 rel = heap_open(AccessMethodProcedureRelationId, AccessShareLock);
1324                 scan = systable_beginscan(rel, AccessMethodProcedureIndexId, indexOK,
1325                                                                   SnapshotNow, 3, skey);
1326
1327                 while (HeapTupleIsValid(htup = systable_getnext(scan)))
1328                 {
1329                         Form_pg_amproc amprocform = (Form_pg_amproc) GETSTRUCT(htup);
1330
1331                         if (amprocform->amprocnum <= 0 ||
1332                                 (StrategyNumber) amprocform->amprocnum > numSupport)
1333                                 elog(ERROR, "invalid amproc number %d for opclass %u",
1334                                          amprocform->amprocnum, operatorClassOid);
1335
1336                         opcentry->supportProcs[amprocform->amprocnum - 1] =
1337                                 amprocform->amproc;
1338                 }
1339
1340                 systable_endscan(scan);
1341                 heap_close(rel, AccessShareLock);
1342         }
1343
1344         opcentry->valid = true;
1345         return opcentry;
1346 }
1347
1348
1349 /*
1350  *              formrdesc
1351  *
1352  *              This is a special cut-down version of RelationBuildDesc()
1353  *              used by RelationCacheInitializePhase2() in initializing the relcache.
1354  *              The relation descriptor is built just from the supplied parameters,
1355  *              without actually looking at any system table entries.  We cheat
1356  *              quite a lot since we only need to work for a few basic system
1357  *              catalogs.
1358  *
1359  * formrdesc is currently used for: pg_class, pg_attribute, pg_proc,
1360  * and pg_type (see RelationCacheInitializePhase2).
1361  *
1362  * Note that these catalogs can't have constraints (except attnotnull),
1363  * default values, rules, or triggers, since we don't cope with any of that.
1364  *
1365  * NOTE: we assume we are already switched into CacheMemoryContext.
1366  */
1367 static void
1368 formrdesc(const char *relationName, Oid relationReltype,
1369                   bool hasoids, int natts, FormData_pg_attribute *att)
1370 {
1371         Relation        relation;
1372         int                     i;
1373         bool            has_not_null;
1374
1375         /*
1376          * allocate new relation desc, clear all fields of reldesc
1377          */
1378         relation = (Relation) palloc0(sizeof(RelationData));
1379         relation->rd_targblock = InvalidBlockNumber;
1380         relation->rd_fsm_nblocks = InvalidBlockNumber;
1381         relation->rd_vm_nblocks = InvalidBlockNumber;
1382
1383         /* make sure relation is marked as having no open file yet */
1384         relation->rd_smgr = NULL;
1385
1386         /*
1387          * initialize reference count: 1 because it is nailed in cache
1388          */
1389         relation->rd_refcnt = 1;
1390
1391         /*
1392          * all entries built with this routine are nailed-in-cache; none are for
1393          * new or temp relations.
1394          */
1395         relation->rd_isnailed = true;
1396         relation->rd_createSubid = InvalidSubTransactionId;
1397         relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
1398         relation->rd_istemp = false;
1399
1400         /*
1401          * initialize relation tuple form
1402          *
1403          * The data we insert here is pretty incomplete/bogus, but it'll serve to
1404          * get us launched.  RelationCacheInitializePhase2() will read the real
1405          * data from pg_class and replace what we've done here.
1406          */
1407         relation->rd_rel = (Form_pg_class) palloc0(CLASS_TUPLE_SIZE);
1408
1409         namestrcpy(&relation->rd_rel->relname, relationName);
1410         relation->rd_rel->relnamespace = PG_CATALOG_NAMESPACE;
1411         relation->rd_rel->reltype = relationReltype;
1412
1413         /*
1414          * It's important to distinguish between shared and non-shared relations,
1415          * even at bootstrap time, to make sure we know where they are stored.  At
1416          * present, all relations that formrdesc is used for are not shared.
1417          */
1418         relation->rd_rel->relisshared = false;
1419
1420         relation->rd_rel->relpages = 1;
1421         relation->rd_rel->reltuples = 1;
1422         relation->rd_rel->relkind = RELKIND_RELATION;
1423         relation->rd_rel->relhasoids = hasoids;
1424         relation->rd_rel->relnatts = (int16) natts;
1425
1426         /*
1427          * initialize attribute tuple form
1428          *
1429          * Unlike the case with the relation tuple, this data had better be right
1430          * because it will never be replaced.  The input values must be correctly
1431          * defined by macros in src/include/catalog/ headers.
1432          */
1433         relation->rd_att = CreateTemplateTupleDesc(natts, hasoids);
1434         relation->rd_att->tdrefcount = 1;       /* mark as refcounted */
1435
1436         relation->rd_att->tdtypeid = relationReltype;
1437         relation->rd_att->tdtypmod = -1;        /* unnecessary, but... */
1438
1439         /*
1440          * initialize tuple desc info
1441          */
1442         has_not_null = false;
1443         for (i = 0; i < natts; i++)
1444         {
1445                 memcpy(relation->rd_att->attrs[i],
1446                            &att[i],
1447                            ATTRIBUTE_TUPLE_SIZE);
1448                 has_not_null |= att[i].attnotnull;
1449                 /* make sure attcacheoff is valid */
1450                 relation->rd_att->attrs[i]->attcacheoff = -1;
1451         }
1452
1453         /* initialize first attribute's attcacheoff, cf RelationBuildTupleDesc */
1454         relation->rd_att->attrs[0]->attcacheoff = 0;
1455
1456         /* mark not-null status */
1457         if (has_not_null)
1458         {
1459                 TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));
1460
1461                 constr->has_not_null = true;
1462                 relation->rd_att->constr = constr;
1463         }
1464
1465         /*
1466          * initialize relation id from info in att array (my, this is ugly)
1467          */
1468         RelationGetRelid(relation) = relation->rd_att->attrs[0]->attrelid;
1469         relation->rd_rel->relfilenode = RelationGetRelid(relation);
1470
1471         /*
1472          * initialize the relation lock manager information
1473          */
1474         RelationInitLockInfo(relation);         /* see lmgr.c */
1475
1476         /*
1477          * initialize physical addressing information for the relation
1478          */
1479         RelationInitPhysicalAddr(relation);
1480
1481         /*
1482          * initialize the rel-has-index flag, using hardwired knowledge
1483          */
1484         if (IsBootstrapProcessingMode())
1485         {
1486                 /* In bootstrap mode, we have no indexes */
1487                 relation->rd_rel->relhasindex = false;
1488         }
1489         else
1490         {
1491                 /* Otherwise, all the rels formrdesc is used for have indexes */
1492                 relation->rd_rel->relhasindex = true;
1493         }
1494
1495         /*
1496          * add new reldesc to relcache
1497          */
1498         RelationCacheInsert(relation);
1499
1500         /* It's fully valid */
1501         relation->rd_isvalid = true;
1502 }
1503
1504
1505 /* ----------------------------------------------------------------
1506  *                               Relation Descriptor Lookup Interface
1507  * ----------------------------------------------------------------
1508  */
1509
1510 /*
1511  *              RelationIdGetRelation
1512  *
1513  *              Lookup a reldesc by OID; make one if not already in cache.
1514  *
1515  *              Returns NULL if no pg_class row could be found for the given relid
1516  *              (suggesting we are trying to access a just-deleted relation).
1517  *              Any other error is reported via elog.
1518  *
1519  *              NB: caller should already have at least AccessShareLock on the
1520  *              relation ID, else there are nasty race conditions.
1521  *
1522  *              NB: relation ref count is incremented, or set to 1 if new entry.
1523  *              Caller should eventually decrement count.  (Usually,
1524  *              that happens by calling RelationClose().)
1525  */
1526 Relation
1527 RelationIdGetRelation(Oid relationId)
1528 {
1529         Relation        rd;
1530
1531         /*
1532          * first try to find reldesc in the cache
1533          */
1534         RelationIdCacheLookup(relationId, rd);
1535
1536         if (RelationIsValid(rd))
1537         {
1538                 RelationIncrementReferenceCount(rd);
1539                 /* revalidate nailed index if necessary */
1540                 if (!rd->rd_isvalid)
1541                         RelationReloadIndexInfo(rd);
1542                 return rd;
1543         }
1544
1545         /*
1546          * no reldesc in the cache, so have RelationBuildDesc() build one and add
1547          * it.
1548          */
1549         rd = RelationBuildDesc(relationId, NULL);
1550         if (RelationIsValid(rd))
1551                 RelationIncrementReferenceCount(rd);
1552         return rd;
1553 }
1554
1555 /* ----------------------------------------------------------------
1556  *                              cache invalidation support routines
1557  * ----------------------------------------------------------------
1558  */
1559
1560 /*
1561  * RelationIncrementReferenceCount
1562  *              Increments relation reference count.
1563  *
1564  * Note: bootstrap mode has its own weird ideas about relation refcount
1565  * behavior; we ought to fix it someday, but for now, just disable
1566  * reference count ownership tracking in bootstrap mode.
1567  */
1568 void
1569 RelationIncrementReferenceCount(Relation rel)
1570 {
1571         ResourceOwnerEnlargeRelationRefs(CurrentResourceOwner);
1572         rel->rd_refcnt += 1;
1573         if (!IsBootstrapProcessingMode())
1574                 ResourceOwnerRememberRelationRef(CurrentResourceOwner, rel);
1575 }
1576
1577 /*
1578  * RelationDecrementReferenceCount
1579  *              Decrements relation reference count.
1580  */
1581 void
1582 RelationDecrementReferenceCount(Relation rel)
1583 {
1584         Assert(rel->rd_refcnt > 0);
1585         rel->rd_refcnt -= 1;
1586         if (!IsBootstrapProcessingMode())
1587                 ResourceOwnerForgetRelationRef(CurrentResourceOwner, rel);
1588 }
1589
1590 /*
1591  * RelationClose - close an open relation
1592  *
1593  *      Actually, we just decrement the refcount.
1594  *
1595  *      NOTE: if compiled with -DRELCACHE_FORCE_RELEASE then relcache entries
1596  *      will be freed as soon as their refcount goes to zero.  In combination
1597  *      with aset.c's CLOBBER_FREED_MEMORY option, this provides a good test
1598  *      to catch references to already-released relcache entries.  It slows
1599  *      things down quite a bit, however.
1600  */
1601 void
1602 RelationClose(Relation relation)
1603 {
1604         /* Note: no locking manipulations needed */
1605         RelationDecrementReferenceCount(relation);
1606
1607 #ifdef RELCACHE_FORCE_RELEASE
1608         if (RelationHasReferenceCountZero(relation) &&
1609                 relation->rd_createSubid == InvalidSubTransactionId &&
1610                 relation->rd_newRelfilenodeSubid == InvalidSubTransactionId)
1611                 RelationClearRelation(relation, false);
1612 #endif
1613 }
1614
1615 /*
1616  * RelationReloadIndexInfo - reload minimal information for an open index
1617  *
1618  *      This function is used only for indexes.  A relcache inval on an index
1619  *      can mean that its pg_class or pg_index row changed.  There are only
1620  *      very limited changes that are allowed to an existing index's schema,
1621  *      so we can update the relcache entry without a complete rebuild; which
1622  *      is fortunate because we can't rebuild an index entry that is "nailed"
1623  *      and/or in active use.  We support full replacement of the pg_class row,
1624  *      as well as updates of a few simple fields of the pg_index row.
1625  *
1626  *      We can't necessarily reread the catalog rows right away; we might be
1627  *      in a failed transaction when we receive the SI notification.  If so,
1628  *      RelationClearRelation just marks the entry as invalid by setting
1629  *      rd_isvalid to false.  This routine is called to fix the entry when it
1630  *      is next needed.
1631  *
1632  *      We assume that at the time we are called, we have at least AccessShareLock
1633  *      on the target index.  (Note: in the calls from RelationClearRelation,
1634  *      this is legitimate because we know the rel has positive refcount.)
1635  */
1636 static void
1637 RelationReloadIndexInfo(Relation relation)
1638 {
1639         bool            indexOK;
1640         HeapTuple       pg_class_tuple;
1641         Form_pg_class relp;
1642
1643         /* Should be called only for invalidated indexes */
1644         Assert(relation->rd_rel->relkind == RELKIND_INDEX &&
1645                    !relation->rd_isvalid);
1646         /* Should be closed at smgr level */
1647         Assert(relation->rd_smgr == NULL);
1648
1649         /*
1650          * Read the pg_class row
1651          *
1652          * Don't try to use an indexscan of pg_class_oid_index to reload the info
1653          * for pg_class_oid_index ...
1654          */
1655         indexOK = (RelationGetRelid(relation) != ClassOidIndexId);
1656         pg_class_tuple = ScanPgRelation(RelationGetRelid(relation), indexOK);
1657         if (!HeapTupleIsValid(pg_class_tuple))
1658                 elog(ERROR, "could not find pg_class tuple for index %u",
1659                          RelationGetRelid(relation));
1660         relp = (Form_pg_class) GETSTRUCT(pg_class_tuple);
1661         memcpy(relation->rd_rel, relp, CLASS_TUPLE_SIZE);
1662         /* Reload reloptions in case they changed */
1663         if (relation->rd_options)
1664                 pfree(relation->rd_options);
1665         RelationParseRelOptions(relation, pg_class_tuple);
1666         /* done with pg_class tuple */
1667         heap_freetuple(pg_class_tuple);
1668         /* We must recalculate physical address in case it changed */
1669         RelationInitPhysicalAddr(relation);
1670         /*
1671          * Must reset targblock, fsm_nblocks and vm_nblocks in case rel was
1672          * truncated
1673          */
1674         relation->rd_targblock = InvalidBlockNumber;
1675         relation->rd_fsm_nblocks = InvalidBlockNumber;
1676         relation->rd_vm_nblocks = InvalidBlockNumber;
1677         /* Must free any AM cached data, too */
1678         if (relation->rd_amcache)
1679                 pfree(relation->rd_amcache);
1680         relation->rd_amcache = NULL;
1681
1682         /*
1683          * For a non-system index, there are fields of the pg_index row that are
1684          * allowed to change, so re-read that row and update the relcache entry.
1685          * Most of the info derived from pg_index (such as support function lookup
1686          * info) cannot change, and indeed the whole point of this routine is to
1687          * update the relcache entry without clobbering that data; so wholesale
1688          * replacement is not appropriate.
1689          */
1690         if (!IsSystemRelation(relation))
1691         {
1692                 HeapTuple       tuple;
1693                 Form_pg_index index;
1694
1695                 tuple = SearchSysCache(INDEXRELID,
1696                                                            ObjectIdGetDatum(RelationGetRelid(relation)),
1697                                                            0, 0, 0);
1698                 if (!HeapTupleIsValid(tuple))
1699                         elog(ERROR, "cache lookup failed for index %u",
1700                                  RelationGetRelid(relation));
1701                 index = (Form_pg_index) GETSTRUCT(tuple);
1702
1703                 relation->rd_index->indisvalid = index->indisvalid;
1704                 relation->rd_index->indcheckxmin = index->indcheckxmin;
1705                 relation->rd_index->indisready = index->indisready;
1706                 HeapTupleHeaderSetXmin(relation->rd_indextuple->t_data,
1707                                                            HeapTupleHeaderGetXmin(tuple->t_data));
1708
1709                 ReleaseSysCache(tuple);
1710         }
1711
1712         /* Okay, now it's valid again */
1713         relation->rd_isvalid = true;
1714 }
1715
1716 /*
1717  * RelationClearRelation
1718  *
1719  *       Physically blow away a relation cache entry, or reset it and rebuild
1720  *       it from scratch (that is, from catalog entries).  The latter path is
1721  *       usually used when we are notified of a change to an open relation
1722  *       (one with refcount > 0).  However, this routine just does whichever
1723  *       it's told to do; callers must determine which they want.
1724  *
1725  *       NB: when rebuilding, we'd better hold some lock on the relation.
1726  *       In current usages this is presumed true because it has refcnt > 0.
1727  */
1728 static void
1729 RelationClearRelation(Relation relation, bool rebuild)
1730 {
1731         Oid                     old_reltype = relation->rd_rel->reltype;
1732         MemoryContext oldcxt;
1733
1734         /*
1735          * Make sure smgr and lower levels close the relation's files, if they
1736          * weren't closed already.  If the relation is not getting deleted, the
1737          * next smgr access should reopen the files automatically.      This ensures
1738          * that the low-level file access state is updated after, say, a vacuum
1739          * truncation.
1740          */
1741         RelationCloseSmgr(relation);
1742
1743         /*
1744          * Never, never ever blow away a nailed-in system relation, because we'd
1745          * be unable to recover.  However, we must reset rd_targblock, in case we
1746          * got called because of a relation cache flush that was triggered by
1747          * VACUUM.
1748          *
1749          * If it's a nailed index, then we need to re-read the pg_class row to see
1750          * if its relfilenode changed.  We can't necessarily do that here, because
1751          * we might be in a failed transaction.  We assume it's okay to do it if
1752          * there are open references to the relcache entry (cf notes for
1753          * AtEOXact_RelationCache).  Otherwise just mark the entry as possibly
1754          * invalid, and it'll be fixed when next opened.
1755          */
1756         if (relation->rd_isnailed)
1757         {
1758                 relation->rd_targblock = InvalidBlockNumber;
1759                 relation->rd_fsm_nblocks = InvalidBlockNumber;
1760                 relation->rd_vm_nblocks = InvalidBlockNumber;
1761                 if (relation->rd_rel->relkind == RELKIND_INDEX)
1762                 {
1763                         relation->rd_isvalid = false;           /* needs to be revalidated */
1764                         if (relation->rd_refcnt > 1)
1765                                 RelationReloadIndexInfo(relation);
1766                 }
1767                 return;
1768         }
1769
1770         /*
1771          * Even non-system indexes should not be blown away if they are open and
1772          * have valid index support information.  This avoids problems with active
1773          * use of the index support information.  As with nailed indexes, we
1774          * re-read the pg_class row to handle possible physical relocation of the
1775          * index, and we check for pg_index updates too.
1776          */
1777         if (relation->rd_rel->relkind == RELKIND_INDEX &&
1778                 relation->rd_refcnt > 0 &&
1779                 relation->rd_indexcxt != NULL)
1780         {
1781                 relation->rd_isvalid = false;   /* needs to be revalidated */
1782                 RelationReloadIndexInfo(relation);
1783                 return;
1784         }
1785
1786         /*
1787          * Remove relation from hash tables
1788          *
1789          * Note: we might be reinserting it momentarily, but we must not have it
1790          * visible in the hash tables until it's valid again, so don't try to
1791          * optimize this away...
1792          */
1793         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
1794         RelationCacheDelete(relation);
1795         MemoryContextSwitchTo(oldcxt);
1796
1797         /* Clear out catcache's entries for this relation */
1798         CatalogCacheFlushRelation(RelationGetRelid(relation));
1799
1800         /*
1801          * Free all the subsidiary data structures of the relcache entry. We
1802          * cannot free rd_att if we are trying to rebuild the entry, however,
1803          * because pointers to it may be cached in various places. The rule
1804          * manager might also have pointers into the rewrite rules. So to begin
1805          * with, we can only get rid of these fields:
1806          */
1807         FreeTriggerDesc(relation->trigdesc);
1808         if (relation->rd_indextuple)
1809                 pfree(relation->rd_indextuple);
1810         if (relation->rd_am)
1811                 pfree(relation->rd_am);
1812         if (relation->rd_rel)
1813                 pfree(relation->rd_rel);
1814         if (relation->rd_options)
1815                 pfree(relation->rd_options);
1816         list_free(relation->rd_indexlist);
1817         bms_free(relation->rd_indexattr);
1818         if (relation->rd_indexcxt)
1819                 MemoryContextDelete(relation->rd_indexcxt);
1820
1821         /*
1822          * If we're really done with the relcache entry, blow it away. But if
1823          * someone is still using it, reconstruct the whole deal without moving
1824          * the physical RelationData record (so that the someone's pointer is
1825          * still valid).
1826          */
1827         if (!rebuild)
1828         {
1829                 /* ok to zap remaining substructure */
1830                 flush_rowtype_cache(old_reltype);
1831                 /* can't use DecrTupleDescRefCount here */
1832                 Assert(relation->rd_att->tdrefcount > 0);
1833                 if (--relation->rd_att->tdrefcount == 0)
1834                         FreeTupleDesc(relation->rd_att);
1835                 if (relation->rd_rulescxt)
1836                         MemoryContextDelete(relation->rd_rulescxt);
1837                 pfree(relation);
1838         }
1839         else
1840         {
1841                 /*
1842                  * When rebuilding an open relcache entry, must preserve ref count and
1843                  * rd_createSubid/rd_newRelfilenodeSubid state.  Also attempt to
1844                  * preserve the tupledesc and rewrite-rule substructures in place.
1845                  * (Note: the refcount mechanism for tupledescs may eventually ensure
1846                  * that we don't really need to preserve the tupledesc in-place, but
1847                  * for now there are still a lot of places that assume an open rel's
1848                  * tupledesc won't move.)
1849                  *
1850                  * Note that this process does not touch CurrentResourceOwner; which
1851                  * is good because whatever ref counts the entry may have do not
1852                  * necessarily belong to that resource owner.
1853                  */
1854                 Oid                     save_relid = RelationGetRelid(relation);
1855                 int                     old_refcnt = relation->rd_refcnt;
1856                 SubTransactionId old_createSubid = relation->rd_createSubid;
1857                 SubTransactionId old_newRelfilenodeSubid = relation->rd_newRelfilenodeSubid;
1858                 struct PgStat_TableStatus *old_pgstat_info = relation->pgstat_info;
1859                 TupleDesc       old_att = relation->rd_att;
1860                 RuleLock   *old_rules = relation->rd_rules;
1861                 MemoryContext old_rulescxt = relation->rd_rulescxt;
1862
1863                 if (RelationBuildDesc(save_relid, relation) != relation)
1864                 {
1865                         /* Should only get here if relation was deleted */
1866                         flush_rowtype_cache(old_reltype);
1867                         Assert(old_att->tdrefcount > 0);
1868                         if (--old_att->tdrefcount == 0)
1869                                 FreeTupleDesc(old_att);
1870                         if (old_rulescxt)
1871                                 MemoryContextDelete(old_rulescxt);
1872                         pfree(relation);
1873                         elog(ERROR, "relation %u deleted while still in use", save_relid);
1874                 }
1875                 relation->rd_refcnt = old_refcnt;
1876                 relation->rd_createSubid = old_createSubid;
1877                 relation->rd_newRelfilenodeSubid = old_newRelfilenodeSubid;
1878                 relation->pgstat_info = old_pgstat_info;
1879
1880                 if (equalTupleDescs(old_att, relation->rd_att))
1881                 {
1882                         /* needn't flush typcache here */
1883                         Assert(relation->rd_att->tdrefcount == 1);
1884                         if (--relation->rd_att->tdrefcount == 0)
1885                                 FreeTupleDesc(relation->rd_att);
1886                         relation->rd_att = old_att;
1887                 }
1888                 else
1889                 {
1890                         flush_rowtype_cache(old_reltype);
1891                         Assert(old_att->tdrefcount > 0);
1892                         if (--old_att->tdrefcount == 0)
1893                                 FreeTupleDesc(old_att);
1894                 }
1895                 if (equalRuleLocks(old_rules, relation->rd_rules))
1896                 {
1897                         if (relation->rd_rulescxt)
1898                                 MemoryContextDelete(relation->rd_rulescxt);
1899                         relation->rd_rules = old_rules;
1900                         relation->rd_rulescxt = old_rulescxt;
1901                 }
1902                 else
1903                 {
1904                         if (old_rulescxt)
1905                                 MemoryContextDelete(old_rulescxt);
1906                 }
1907         }
1908 }
1909
1910 /*
1911  * RelationFlushRelation
1912  *
1913  *       Rebuild the relation if it is open (refcount > 0), else blow it away.
1914  */
1915 static void
1916 RelationFlushRelation(Relation relation)
1917 {
1918         bool            rebuild;
1919
1920         if (relation->rd_createSubid != InvalidSubTransactionId ||
1921                 relation->rd_newRelfilenodeSubid != InvalidSubTransactionId)
1922         {
1923                 /*
1924                  * New relcache entries are always rebuilt, not flushed; else we'd
1925                  * forget the "new" status of the relation, which is a useful
1926                  * optimization to have.  Ditto for the new-relfilenode status.
1927                  */
1928                 rebuild = true;
1929         }
1930         else
1931         {
1932                 /*
1933                  * Pre-existing rels can be dropped from the relcache if not open.
1934                  */
1935                 rebuild = !RelationHasReferenceCountZero(relation);
1936         }
1937
1938         RelationClearRelation(relation, rebuild);
1939 }
1940
1941 /*
1942  * RelationForgetRelation - unconditionally remove a relcache entry
1943  *
1944  *                 External interface for destroying a relcache entry when we
1945  *                 drop the relation.
1946  */
1947 void
1948 RelationForgetRelation(Oid rid)
1949 {
1950         Relation        relation;
1951
1952         RelationIdCacheLookup(rid, relation);
1953
1954         if (!PointerIsValid(relation))
1955                 return;                                 /* not in cache, nothing to do */
1956
1957         if (!RelationHasReferenceCountZero(relation))
1958                 elog(ERROR, "relation %u is still open", rid);
1959
1960         /* Unconditionally destroy the relcache entry */
1961         RelationClearRelation(relation, false);
1962 }
1963
1964 /*
1965  *              RelationCacheInvalidateEntry
1966  *
1967  *              This routine is invoked for SI cache flush messages.
1968  *
1969  * Any relcache entry matching the relid must be flushed.  (Note: caller has
1970  * already determined that the relid belongs to our database or is a shared
1971  * relation.)
1972  *
1973  * We used to skip local relations, on the grounds that they could
1974  * not be targets of cross-backend SI update messages; but it seems
1975  * safer to process them, so that our *own* SI update messages will
1976  * have the same effects during CommandCounterIncrement for both
1977  * local and nonlocal relations.
1978  */
1979 void
1980 RelationCacheInvalidateEntry(Oid relationId)
1981 {
1982         Relation        relation;
1983
1984         RelationIdCacheLookup(relationId, relation);
1985
1986         if (PointerIsValid(relation))
1987         {
1988                 relcacheInvalsReceived++;
1989                 RelationFlushRelation(relation);
1990         }
1991 }
1992
1993 /*
1994  * RelationCacheInvalidate
1995  *       Blow away cached relation descriptors that have zero reference counts,
1996  *       and rebuild those with positive reference counts.      Also reset the smgr
1997  *       relation cache.
1998  *
1999  *       This is currently used only to recover from SI message buffer overflow,
2000  *       so we do not touch new-in-transaction relations; they cannot be targets
2001  *       of cross-backend SI updates (and our own updates now go through a
2002  *       separate linked list that isn't limited by the SI message buffer size).
2003  *       Likewise, we need not discard new-relfilenode-in-transaction hints,
2004  *       since any invalidation of those would be a local event.
2005  *
2006  *       We do this in two phases: the first pass deletes deletable items, and
2007  *       the second one rebuilds the rebuildable items.  This is essential for
2008  *       safety, because hash_seq_search only copes with concurrent deletion of
2009  *       the element it is currently visiting.  If a second SI overflow were to
2010  *       occur while we are walking the table, resulting in recursive entry to
2011  *       this routine, we could crash because the inner invocation blows away
2012  *       the entry next to be visited by the outer scan.  But this way is OK,
2013  *       because (a) during the first pass we won't process any more SI messages,
2014  *       so hash_seq_search will complete safely; (b) during the second pass we
2015  *       only hold onto pointers to nondeletable entries.
2016  *
2017  *       The two-phase approach also makes it easy to ensure that we process
2018  *       nailed-in-cache indexes before other nondeletable items, and that we
2019  *       process pg_class_oid_index first of all.  In scenarios where a nailed
2020  *       index has been given a new relfilenode, we have to detect that update
2021  *       before the nailed index is used in reloading any other relcache entry.
2022  */
2023 void
2024 RelationCacheInvalidate(void)
2025 {
2026         HASH_SEQ_STATUS status;
2027         RelIdCacheEnt *idhentry;
2028         Relation        relation;
2029         List       *rebuildFirstList = NIL;
2030         List       *rebuildList = NIL;
2031         ListCell   *l;
2032
2033         /* Phase 1 */
2034         hash_seq_init(&status, RelationIdCache);
2035
2036         while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
2037         {
2038                 relation = idhentry->reldesc;
2039
2040                 /* Must close all smgr references to avoid leaving dangling ptrs */
2041                 RelationCloseSmgr(relation);
2042
2043                 /* Ignore new relations, since they are never SI targets */
2044                 if (relation->rd_createSubid != InvalidSubTransactionId)
2045                         continue;
2046
2047                 relcacheInvalsReceived++;
2048
2049                 if (RelationHasReferenceCountZero(relation))
2050                 {
2051                         /* Delete this entry immediately */
2052                         Assert(!relation->rd_isnailed);
2053                         RelationClearRelation(relation, false);
2054                 }
2055                 else
2056                 {
2057                         /*
2058                          * Add this entry to list of stuff to rebuild in second pass.
2059                          * pg_class_oid_index goes on the front of rebuildFirstList, other
2060                          * nailed indexes on the back, and everything else into
2061                          * rebuildList (in no particular order).
2062                          */
2063                         if (relation->rd_isnailed &&
2064                                 relation->rd_rel->relkind == RELKIND_INDEX)
2065                         {
2066                                 if (RelationGetRelid(relation) == ClassOidIndexId)
2067                                         rebuildFirstList = lcons(relation, rebuildFirstList);
2068                                 else
2069                                         rebuildFirstList = lappend(rebuildFirstList, relation);
2070                         }
2071                         else
2072                                 rebuildList = lcons(relation, rebuildList);
2073                 }
2074         }
2075
2076         /*
2077          * Now zap any remaining smgr cache entries.  This must happen before we
2078          * start to rebuild entries, since that may involve catalog fetches which
2079          * will re-open catalog files.
2080          */
2081         smgrcloseall();
2082
2083         /* Phase 2: rebuild the items found to need rebuild in phase 1 */
2084         foreach(l, rebuildFirstList)
2085         {
2086                 relation = (Relation) lfirst(l);
2087                 RelationClearRelation(relation, true);
2088         }
2089         list_free(rebuildFirstList);
2090         foreach(l, rebuildList)
2091         {
2092                 relation = (Relation) lfirst(l);
2093                 RelationClearRelation(relation, true);
2094         }
2095         list_free(rebuildList);
2096 }
2097
2098 /*
2099  * AtEOXact_RelationCache
2100  *
2101  *      Clean up the relcache at main-transaction commit or abort.
2102  *
2103  * Note: this must be called *before* processing invalidation messages.
2104  * In the case of abort, we don't want to try to rebuild any invalidated
2105  * cache entries (since we can't safely do database accesses).  Therefore
2106  * we must reset refcnts before handling pending invalidations.
2107  *
2108  * As of PostgreSQL 8.1, relcache refcnts should get released by the
2109  * ResourceOwner mechanism.  This routine just does a debugging
2110  * cross-check that no pins remain.  However, we also need to do special
2111  * cleanup when the current transaction created any relations or made use
2112  * of forced index lists.
2113  */
2114 void
2115 AtEOXact_RelationCache(bool isCommit)
2116 {
2117         HASH_SEQ_STATUS status;
2118         RelIdCacheEnt *idhentry;
2119
2120         /*
2121          * To speed up transaction exit, we want to avoid scanning the relcache
2122          * unless there is actually something for this routine to do.  Other than
2123          * the debug-only Assert checks, most transactions don't create any work
2124          * for us to do here, so we keep a static flag that gets set if there is
2125          * anything to do.      (Currently, this means either a relation is created in
2126          * the current xact, or one is given a new relfilenode, or an index list
2127          * is forced.)  For simplicity, the flag remains set till end of top-level
2128          * transaction, even though we could clear it at subtransaction end in
2129          * some cases.
2130          */
2131         if (!need_eoxact_work
2132 #ifdef USE_ASSERT_CHECKING
2133                 && !assert_enabled
2134 #endif
2135                 )
2136                 return;
2137
2138         hash_seq_init(&status, RelationIdCache);
2139
2140         while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
2141         {
2142                 Relation        relation = idhentry->reldesc;
2143
2144                 /*
2145                  * The relcache entry's ref count should be back to its normal
2146                  * not-in-a-transaction state: 0 unless it's nailed in cache.
2147                  *
2148                  * In bootstrap mode, this is NOT true, so don't check it --- the
2149                  * bootstrap code expects relations to stay open across start/commit
2150                  * transaction calls.  (That seems bogus, but it's not worth fixing.)
2151                  */
2152 #ifdef USE_ASSERT_CHECKING
2153                 if (!IsBootstrapProcessingMode())
2154                 {
2155                         int                     expected_refcnt;
2156
2157                         expected_refcnt = relation->rd_isnailed ? 1 : 0;
2158                         Assert(relation->rd_refcnt == expected_refcnt);
2159                 }
2160 #endif
2161
2162                 /*
2163                  * Is it a relation created in the current transaction?
2164                  *
2165                  * During commit, reset the flag to zero, since we are now out of the
2166                  * creating transaction.  During abort, simply delete the relcache
2167                  * entry --- it isn't interesting any longer.  (NOTE: if we have
2168                  * forgotten the new-ness of a new relation due to a forced cache
2169                  * flush, the entry will get deleted anyway by shared-cache-inval
2170                  * processing of the aborted pg_class insertion.)
2171                  */
2172                 if (relation->rd_createSubid != InvalidSubTransactionId)
2173                 {
2174                         if (isCommit)
2175                                 relation->rd_createSubid = InvalidSubTransactionId;
2176                         else
2177                         {
2178                                 RelationClearRelation(relation, false);
2179                                 continue;
2180                         }
2181                 }
2182
2183                 /*
2184                  * Likewise, reset the hint about the relfilenode being new.
2185                  */
2186                 relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
2187
2188                 /*
2189                  * Flush any temporary index list.
2190                  */
2191                 if (relation->rd_indexvalid == 2)
2192                 {
2193                         list_free(relation->rd_indexlist);
2194                         relation->rd_indexlist = NIL;
2195                         relation->rd_oidindex = InvalidOid;
2196                         relation->rd_indexvalid = 0;
2197                 }
2198         }
2199
2200         /* Once done with the transaction, we can reset need_eoxact_work */
2201         need_eoxact_work = false;
2202 }
2203
2204 /*
2205  * AtEOSubXact_RelationCache
2206  *
2207  *      Clean up the relcache at sub-transaction commit or abort.
2208  *
2209  * Note: this must be called *before* processing invalidation messages.
2210  */
2211 void
2212 AtEOSubXact_RelationCache(bool isCommit, SubTransactionId mySubid,
2213                                                   SubTransactionId parentSubid)
2214 {
2215         HASH_SEQ_STATUS status;
2216         RelIdCacheEnt *idhentry;
2217
2218         /*
2219          * Skip the relcache scan if nothing to do --- see notes for
2220          * AtEOXact_RelationCache.
2221          */
2222         if (!need_eoxact_work)
2223                 return;
2224
2225         hash_seq_init(&status, RelationIdCache);
2226
2227         while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
2228         {
2229                 Relation        relation = idhentry->reldesc;
2230
2231                 /*
2232                  * Is it a relation created in the current subtransaction?
2233                  *
2234                  * During subcommit, mark it as belonging to the parent, instead.
2235                  * During subabort, simply delete the relcache entry.
2236                  */
2237                 if (relation->rd_createSubid == mySubid)
2238                 {
2239                         if (isCommit)
2240                                 relation->rd_createSubid = parentSubid;
2241                         else
2242                         {
2243                                 Assert(RelationHasReferenceCountZero(relation));
2244                                 RelationClearRelation(relation, false);
2245                                 continue;
2246                         }
2247                 }
2248
2249                 /*
2250                  * Likewise, update or drop any new-relfilenode-in-subtransaction
2251                  * hint.
2252                  */
2253                 if (relation->rd_newRelfilenodeSubid == mySubid)
2254                 {
2255                         if (isCommit)
2256                                 relation->rd_newRelfilenodeSubid = parentSubid;
2257                         else
2258                                 relation->rd_newRelfilenodeSubid = InvalidSubTransactionId;
2259                 }
2260
2261                 /*
2262                  * Flush any temporary index list.
2263                  */
2264                 if (relation->rd_indexvalid == 2)
2265                 {
2266                         list_free(relation->rd_indexlist);
2267                         relation->rd_indexlist = NIL;
2268                         relation->rd_oidindex = InvalidOid;
2269                         relation->rd_indexvalid = 0;
2270                 }
2271         }
2272 }
2273
2274 /*
2275  * RelationCacheMarkNewRelfilenode
2276  *
2277  *      Mark the rel as having been given a new relfilenode in the current
2278  *      (sub) transaction.      This is a hint that can be used to optimize
2279  *      later operations on the rel in the same transaction.
2280  */
2281 void
2282 RelationCacheMarkNewRelfilenode(Relation rel)
2283 {
2284         /* Mark it... */
2285         rel->rd_newRelfilenodeSubid = GetCurrentSubTransactionId();
2286         /* ... and now we have eoxact cleanup work to do */
2287         need_eoxact_work = true;
2288 }
2289
2290
2291 /*
2292  *              RelationBuildLocalRelation
2293  *                      Build a relcache entry for an about-to-be-created relation,
2294  *                      and enter it into the relcache.
2295  */
2296 Relation
2297 RelationBuildLocalRelation(const char *relname,
2298                                                    Oid relnamespace,
2299                                                    TupleDesc tupDesc,
2300                                                    Oid relid,
2301                                                    Oid reltablespace,
2302                                                    bool shared_relation)
2303 {
2304         Relation        rel;
2305         MemoryContext oldcxt;
2306         int                     natts = tupDesc->natts;
2307         int                     i;
2308         bool            has_not_null;
2309         bool            nailit;
2310
2311         AssertArg(natts >= 0);
2312
2313         /*
2314          * check for creation of a rel that must be nailed in cache.
2315          *
2316          * XXX this list had better match RelationCacheInitializePhase2's list.
2317          */
2318         switch (relid)
2319         {
2320                 case RelationRelationId:
2321                 case AttributeRelationId:
2322                 case ProcedureRelationId:
2323                 case TypeRelationId:
2324                         nailit = true;
2325                         break;
2326                 default:
2327                         nailit = false;
2328                         break;
2329         }
2330
2331         /*
2332          * check that hardwired list of shared rels matches what's in the
2333          * bootstrap .bki file.  If you get a failure here during initdb, you
2334          * probably need to fix IsSharedRelation() to match whatever you've done
2335          * to the set of shared relations.
2336          */
2337         if (shared_relation != IsSharedRelation(relid))
2338                 elog(ERROR, "shared_relation flag for \"%s\" does not match IsSharedRelation(%u)",
2339                          relname, relid);
2340
2341         /*
2342          * switch to the cache context to create the relcache entry.
2343          */
2344         if (!CacheMemoryContext)
2345                 CreateCacheMemoryContext();
2346
2347         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
2348
2349         /*
2350          * allocate a new relation descriptor and fill in basic state fields.
2351          */
2352         rel = (Relation) palloc0(sizeof(RelationData));
2353
2354         rel->rd_targblock = InvalidBlockNumber;
2355         rel->rd_fsm_nblocks = InvalidBlockNumber;
2356         rel->rd_vm_nblocks = InvalidBlockNumber;
2357
2358         /* make sure relation is marked as having no open file yet */
2359         rel->rd_smgr = NULL;
2360
2361         /* mark it nailed if appropriate */
2362         rel->rd_isnailed = nailit;
2363
2364         rel->rd_refcnt = nailit ? 1 : 0;
2365
2366         /* it's being created in this transaction */
2367         rel->rd_createSubid = GetCurrentSubTransactionId();
2368         rel->rd_newRelfilenodeSubid = InvalidSubTransactionId;
2369
2370         /* must flag that we have rels created in this transaction */
2371         need_eoxact_work = true;
2372
2373         /* is it a temporary relation? */
2374         rel->rd_istemp = isTempOrToastNamespace(relnamespace);
2375
2376         /*
2377          * create a new tuple descriptor from the one passed in.  We do this
2378          * partly to copy it into the cache context, and partly because the new
2379          * relation can't have any defaults or constraints yet; they have to be
2380          * added in later steps, because they require additions to multiple system
2381          * catalogs.  We can copy attnotnull constraints here, however.
2382          */
2383         rel->rd_att = CreateTupleDescCopy(tupDesc);
2384         rel->rd_att->tdrefcount = 1;    /* mark as refcounted */
2385         has_not_null = false;
2386         for (i = 0; i < natts; i++)
2387         {
2388                 rel->rd_att->attrs[i]->attnotnull = tupDesc->attrs[i]->attnotnull;
2389                 has_not_null |= tupDesc->attrs[i]->attnotnull;
2390         }
2391
2392         if (has_not_null)
2393         {
2394                 TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));
2395
2396                 constr->has_not_null = true;
2397                 rel->rd_att->constr = constr;
2398         }
2399
2400         /*
2401          * initialize relation tuple form (caller may add/override data later)
2402          */
2403         rel->rd_rel = (Form_pg_class) palloc0(CLASS_TUPLE_SIZE);
2404
2405         namestrcpy(&rel->rd_rel->relname, relname);
2406         rel->rd_rel->relnamespace = relnamespace;
2407
2408         rel->rd_rel->relkind = RELKIND_UNCATALOGED;
2409         rel->rd_rel->relhasoids = rel->rd_att->tdhasoid;
2410         rel->rd_rel->relnatts = natts;
2411         rel->rd_rel->reltype = InvalidOid;
2412         /* needed when bootstrapping: */
2413         rel->rd_rel->relowner = BOOTSTRAP_SUPERUSERID;
2414
2415         /*
2416          * Insert relation physical and logical identifiers (OIDs) into the right
2417          * places.      Note that the physical ID (relfilenode) is initially the same
2418          * as the logical ID (OID).
2419          */
2420         rel->rd_rel->relisshared = shared_relation;
2421
2422         RelationGetRelid(rel) = relid;
2423
2424         for (i = 0; i < natts; i++)
2425                 rel->rd_att->attrs[i]->attrelid = relid;
2426
2427         rel->rd_rel->relfilenode = relid;
2428         rel->rd_rel->reltablespace = reltablespace;
2429
2430         RelationInitLockInfo(rel);      /* see lmgr.c */
2431
2432         RelationInitPhysicalAddr(rel);
2433
2434         /*
2435          * Okay to insert into the relcache hash tables.
2436          */
2437         RelationCacheInsert(rel);
2438
2439         /*
2440          * done building relcache entry.
2441          */
2442         MemoryContextSwitchTo(oldcxt);
2443
2444         /* It's fully valid */
2445         rel->rd_isvalid = true;
2446
2447         /*
2448          * Caller expects us to pin the returned entry.
2449          */
2450         RelationIncrementReferenceCount(rel);
2451
2452         return rel;
2453 }
2454
2455 /*
2456  *              RelationCacheInitialize
2457  *
2458  *              This initializes the relation descriptor cache.  At the time
2459  *              that this is invoked, we can't do database access yet (mainly
2460  *              because the transaction subsystem is not up); all we are doing
2461  *              is making an empty cache hashtable.  This must be done before
2462  *              starting the initialization transaction, because otherwise
2463  *              AtEOXact_RelationCache would crash if that transaction aborts
2464  *              before we can get the relcache set up.
2465  */
2466
2467 #define INITRELCACHESIZE                400
2468
2469 void
2470 RelationCacheInitialize(void)
2471 {
2472         MemoryContext oldcxt;
2473         HASHCTL         ctl;
2474
2475         /*
2476          * switch to cache memory context
2477          */
2478         if (!CacheMemoryContext)
2479                 CreateCacheMemoryContext();
2480
2481         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
2482
2483         /*
2484          * create hashtable that indexes the relcache
2485          */
2486         MemSet(&ctl, 0, sizeof(ctl));
2487         ctl.keysize = sizeof(Oid);
2488         ctl.entrysize = sizeof(RelIdCacheEnt);
2489         ctl.hash = oid_hash;
2490         RelationIdCache = hash_create("Relcache by OID", INITRELCACHESIZE,
2491                                                                   &ctl, HASH_ELEM | HASH_FUNCTION);
2492
2493         MemoryContextSwitchTo(oldcxt);
2494 }
2495
2496 /*
2497  *              RelationCacheInitializePhase2
2498  *
2499  *              This is called as soon as the catcache and transaction system
2500  *              are functional.  At this point we can actually read data from
2501  *              the system catalogs.  We first try to read pre-computed relcache
2502  *              entries from the pg_internal.init file.  If that's missing or
2503  *              broken, make phony entries for the minimum set of nailed-in-cache
2504  *              relations.      Then (unless bootstrapping) make sure we have entries
2505  *              for the critical system indexes.  Once we've done all this, we
2506  *              have enough infrastructure to open any system catalog or use any
2507  *              catcache.  The last step is to rewrite pg_internal.init if needed.
2508  */
2509 void
2510 RelationCacheInitializePhase2(void)
2511 {
2512         HASH_SEQ_STATUS status;
2513         RelIdCacheEnt *idhentry;
2514         MemoryContext oldcxt;
2515         bool            needNewCacheFile = false;
2516
2517         /*
2518          * switch to cache memory context
2519          */
2520         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
2521
2522         /*
2523          * Try to load the relcache cache file.  If unsuccessful, bootstrap the
2524          * cache with pre-made descriptors for the critical "nailed-in" system
2525          * catalogs.
2526          */
2527         if (IsBootstrapProcessingMode() ||
2528                 !load_relcache_init_file())
2529         {
2530                 needNewCacheFile = true;
2531
2532                 formrdesc("pg_class", PG_CLASS_RELTYPE_OID,
2533                                   true, Natts_pg_class, Desc_pg_class);
2534                 formrdesc("pg_attribute", PG_ATTRIBUTE_RELTYPE_OID,
2535                                   false, Natts_pg_attribute, Desc_pg_attribute);
2536                 formrdesc("pg_proc", PG_PROC_RELTYPE_OID,
2537                                   true, Natts_pg_proc, Desc_pg_proc);
2538                 formrdesc("pg_type", PG_TYPE_RELTYPE_OID,
2539                                   true, Natts_pg_type, Desc_pg_type);
2540
2541 #define NUM_CRITICAL_RELS       4       /* fix if you change list above */
2542         }
2543
2544         MemoryContextSwitchTo(oldcxt);
2545
2546         /* In bootstrap mode, the faked-up formrdesc info is all we'll have */
2547         if (IsBootstrapProcessingMode())
2548                 return;
2549
2550         /*
2551          * If we didn't get the critical system indexes loaded into relcache, do
2552          * so now.      These are critical because the catcache and/or opclass cache
2553          * depend on them for fetches done during relcache load.  Thus, we have an
2554          * infinite-recursion problem.  We can break the recursion by doing
2555          * heapscans instead of indexscans at certain key spots. To avoid hobbling
2556          * performance, we only want to do that until we have the critical indexes
2557          * loaded into relcache.  Thus, the flag criticalRelcachesBuilt is used to
2558          * decide whether to do heapscan or indexscan at the key spots, and we set
2559          * it true after we've loaded the critical indexes.
2560          *
2561          * The critical indexes are marked as "nailed in cache", partly to make it
2562          * easy for load_relcache_init_file to count them, but mainly because we
2563          * cannot flush and rebuild them once we've set criticalRelcachesBuilt to
2564          * true.  (NOTE: perhaps it would be possible to reload them by
2565          * temporarily setting criticalRelcachesBuilt to false again.  For now,
2566          * though, we just nail 'em in.)
2567          *
2568          * RewriteRelRulenameIndexId and TriggerRelidNameIndexId are not critical
2569          * in the same way as the others, because the critical catalogs don't
2570          * (currently) have any rules or triggers, and so these indexes can be
2571          * rebuilt without inducing recursion.  However they are used during
2572          * relcache load when a rel does have rules or triggers, so we choose to
2573          * nail them for performance reasons.
2574          */
2575         if (!criticalRelcachesBuilt)
2576         {
2577                 Relation        ird;
2578
2579 #define LOAD_CRIT_INDEX(indexoid) \
2580                 do { \
2581                         LockRelationOid(indexoid, AccessShareLock); \
2582                         ird = RelationBuildDesc(indexoid, NULL); \
2583                         if (ird == NULL) \
2584                                 elog(PANIC, "could not open critical system index %u", \
2585                                          indexoid); \
2586                         ird->rd_isnailed = true; \
2587                         ird->rd_refcnt = 1; \
2588                         UnlockRelationOid(indexoid, AccessShareLock); \
2589                 } while (0)
2590
2591                 LOAD_CRIT_INDEX(ClassOidIndexId);
2592                 LOAD_CRIT_INDEX(AttributeRelidNumIndexId);
2593                 LOAD_CRIT_INDEX(IndexRelidIndexId);
2594                 LOAD_CRIT_INDEX(OpclassOidIndexId);
2595                 LOAD_CRIT_INDEX(AccessMethodStrategyIndexId);
2596                 LOAD_CRIT_INDEX(AccessMethodProcedureIndexId);
2597                 LOAD_CRIT_INDEX(OperatorOidIndexId);
2598                 LOAD_CRIT_INDEX(RewriteRelRulenameIndexId);
2599                 LOAD_CRIT_INDEX(TriggerRelidNameIndexId);
2600
2601 #define NUM_CRITICAL_INDEXES    9               /* fix if you change list above */
2602
2603                 criticalRelcachesBuilt = true;
2604         }
2605
2606         /*
2607          * Now, scan all the relcache entries and update anything that might be
2608          * wrong in the results from formrdesc or the relcache cache file. If we
2609          * faked up relcache entries using formrdesc, then read the real pg_class
2610          * rows and replace the fake entries with them. Also, if any of the
2611          * relcache entries have rules or triggers, load that info the hard way
2612          * since it isn't recorded in the cache file.
2613          */
2614         hash_seq_init(&status, RelationIdCache);
2615
2616         while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
2617         {
2618                 Relation        relation = idhentry->reldesc;
2619
2620                 /*
2621                  * If it's a faked-up entry, read the real pg_class tuple.
2622                  */
2623                 if (needNewCacheFile && relation->rd_isnailed)
2624                 {
2625                         HeapTuple       htup;
2626                         Form_pg_class relp;
2627
2628                         htup = SearchSysCache(RELOID,
2629                                                                 ObjectIdGetDatum(RelationGetRelid(relation)),
2630                                                                   0, 0, 0);
2631                         if (!HeapTupleIsValid(htup))
2632                                 elog(FATAL, "cache lookup failed for relation %u",
2633                                          RelationGetRelid(relation));
2634                         relp = (Form_pg_class) GETSTRUCT(htup);
2635
2636                         /*
2637                          * Copy tuple to relation->rd_rel. (See notes in
2638                          * AllocateRelationDesc())
2639                          */
2640                         Assert(relation->rd_rel != NULL);
2641                         memcpy((char *) relation->rd_rel, (char *) relp, CLASS_TUPLE_SIZE);
2642
2643                         /* Update rd_options while we have the tuple */
2644                         if (relation->rd_options)
2645                                 pfree(relation->rd_options);
2646                         RelationParseRelOptions(relation, htup);
2647
2648                         /*
2649                          * Also update the derived fields in rd_att.
2650                          */
2651                         relation->rd_att->tdtypeid = relp->reltype;
2652                         relation->rd_att->tdtypmod = -1;        /* unnecessary, but... */
2653                         relation->rd_att->tdhasoid = relp->relhasoids;
2654
2655                         ReleaseSysCache(htup);
2656                 }
2657
2658                 /*
2659                  * Fix data that isn't saved in relcache cache file.
2660                  */
2661                 if (relation->rd_rel->relhasrules && relation->rd_rules == NULL)
2662                         RelationBuildRuleLock(relation);
2663                 if (relation->rd_rel->relhastriggers && relation->trigdesc == NULL)
2664                         RelationBuildTriggers(relation);
2665         }
2666
2667         /*
2668          * Lastly, write out a new relcache cache file if one is needed.
2669          */
2670         if (needNewCacheFile)
2671         {
2672                 /*
2673                  * Force all the catcaches to finish initializing and thereby open the
2674                  * catalogs and indexes they use.  This will preload the relcache with
2675                  * entries for all the most important system catalogs and indexes, so
2676                  * that the init file will be most useful for future backends.
2677                  */
2678                 InitCatalogCachePhase2();
2679
2680                 /* now write the file */
2681                 write_relcache_init_file();
2682         }
2683 }
2684
2685 /*
2686  * GetPgClassDescriptor -- get a predefined tuple descriptor for pg_class
2687  * GetPgIndexDescriptor -- get a predefined tuple descriptor for pg_index
2688  *
2689  * We need this kluge because we have to be able to access non-fixed-width
2690  * fields of pg_class and pg_index before we have the standard catalog caches
2691  * available.  We use predefined data that's set up in just the same way as
2692  * the bootstrapped reldescs used by formrdesc().  The resulting tupdesc is
2693  * not 100% kosher: it does not have the correct rowtype OID in tdtypeid, nor
2694  * does it have a TupleConstr field.  But it's good enough for the purpose of
2695  * extracting fields.
2696  */
2697 static TupleDesc
2698 BuildHardcodedDescriptor(int natts, Form_pg_attribute attrs, bool hasoids)
2699 {
2700         TupleDesc       result;
2701         MemoryContext oldcxt;
2702         int                     i;
2703
2704         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
2705
2706         result = CreateTemplateTupleDesc(natts, hasoids);
2707         result->tdtypeid = RECORDOID;           /* not right, but we don't care */
2708         result->tdtypmod = -1;
2709
2710         for (i = 0; i < natts; i++)
2711         {
2712                 memcpy(result->attrs[i], &attrs[i], ATTRIBUTE_TUPLE_SIZE);
2713                 /* make sure attcacheoff is valid */
2714                 result->attrs[i]->attcacheoff = -1;
2715         }
2716
2717         /* initialize first attribute's attcacheoff, cf RelationBuildTupleDesc */
2718         result->attrs[0]->attcacheoff = 0;
2719
2720         /* Note: we don't bother to set up a TupleConstr entry */
2721
2722         MemoryContextSwitchTo(oldcxt);
2723
2724         return result;
2725 }
2726
2727 static TupleDesc
2728 GetPgClassDescriptor(void)
2729 {
2730         static TupleDesc pgclassdesc = NULL;
2731
2732         /* Already done? */
2733         if (pgclassdesc == NULL)
2734                 pgclassdesc = BuildHardcodedDescriptor(Natts_pg_class,
2735                                                                                            Desc_pg_class,
2736                                                                                            true);
2737
2738         return pgclassdesc;
2739 }
2740
2741 static TupleDesc
2742 GetPgIndexDescriptor(void)
2743 {
2744         static TupleDesc pgindexdesc = NULL;
2745
2746         /* Already done? */
2747         if (pgindexdesc == NULL)
2748                 pgindexdesc = BuildHardcodedDescriptor(Natts_pg_index,
2749                                                                                            Desc_pg_index,
2750                                                                                            false);
2751
2752         return pgindexdesc;
2753 }
2754
2755 static void
2756 AttrDefaultFetch(Relation relation)
2757 {
2758         AttrDefault *attrdef = relation->rd_att->constr->defval;
2759         int                     ndef = relation->rd_att->constr->num_defval;
2760         Relation        adrel;
2761         SysScanDesc adscan;
2762         ScanKeyData skey;
2763         HeapTuple       htup;
2764         Datum           val;
2765         bool            isnull;
2766         int                     found;
2767         int                     i;
2768
2769         ScanKeyInit(&skey,
2770                                 Anum_pg_attrdef_adrelid,
2771                                 BTEqualStrategyNumber, F_OIDEQ,
2772                                 ObjectIdGetDatum(RelationGetRelid(relation)));
2773
2774         adrel = heap_open(AttrDefaultRelationId, AccessShareLock);
2775         adscan = systable_beginscan(adrel, AttrDefaultIndexId, true,
2776                                                                 SnapshotNow, 1, &skey);
2777         found = 0;
2778
2779         while (HeapTupleIsValid(htup = systable_getnext(adscan)))
2780         {
2781                 Form_pg_attrdef adform = (Form_pg_attrdef) GETSTRUCT(htup);
2782
2783                 for (i = 0; i < ndef; i++)
2784                 {
2785                         if (adform->adnum != attrdef[i].adnum)
2786                                 continue;
2787                         if (attrdef[i].adbin != NULL)
2788                                 elog(WARNING, "multiple attrdef records found for attr %s of rel %s",
2789                                 NameStr(relation->rd_att->attrs[adform->adnum - 1]->attname),
2790                                          RelationGetRelationName(relation));
2791                         else
2792                                 found++;
2793
2794                         val = fastgetattr(htup,
2795                                                           Anum_pg_attrdef_adbin,
2796                                                           adrel->rd_att, &isnull);
2797                         if (isnull)
2798                                 elog(WARNING, "null adbin for attr %s of rel %s",
2799                                 NameStr(relation->rd_att->attrs[adform->adnum - 1]->attname),
2800                                          RelationGetRelationName(relation));
2801                         else
2802                                 attrdef[i].adbin = MemoryContextStrdup(CacheMemoryContext,
2803                                                                                                         TextDatumGetCString(val));
2804                         break;
2805                 }
2806
2807                 if (i >= ndef)
2808                         elog(WARNING, "unexpected attrdef record found for attr %d of rel %s",
2809                                  adform->adnum, RelationGetRelationName(relation));
2810         }
2811
2812         systable_endscan(adscan);
2813         heap_close(adrel, AccessShareLock);
2814
2815         if (found != ndef)
2816                 elog(WARNING, "%d attrdef record(s) missing for rel %s",
2817                          ndef - found, RelationGetRelationName(relation));
2818 }
2819
2820 static void
2821 CheckConstraintFetch(Relation relation)
2822 {
2823         ConstrCheck *check = relation->rd_att->constr->check;
2824         int                     ncheck = relation->rd_att->constr->num_check;
2825         Relation        conrel;
2826         SysScanDesc conscan;
2827         ScanKeyData skey[1];
2828         HeapTuple       htup;
2829         Datum           val;
2830         bool            isnull;
2831         int                     found = 0;
2832
2833         ScanKeyInit(&skey[0],
2834                                 Anum_pg_constraint_conrelid,
2835                                 BTEqualStrategyNumber, F_OIDEQ,
2836                                 ObjectIdGetDatum(RelationGetRelid(relation)));
2837
2838         conrel = heap_open(ConstraintRelationId, AccessShareLock);
2839         conscan = systable_beginscan(conrel, ConstraintRelidIndexId, true,
2840                                                                  SnapshotNow, 1, skey);
2841
2842         while (HeapTupleIsValid(htup = systable_getnext(conscan)))
2843         {
2844                 Form_pg_constraint conform = (Form_pg_constraint) GETSTRUCT(htup);
2845
2846                 /* We want check constraints only */
2847                 if (conform->contype != CONSTRAINT_CHECK)
2848                         continue;
2849
2850                 if (found >= ncheck)
2851                         elog(ERROR, "unexpected constraint record found for rel %s",
2852                                  RelationGetRelationName(relation));
2853
2854                 check[found].ccname = MemoryContextStrdup(CacheMemoryContext,
2855                                                                                                   NameStr(conform->conname));
2856
2857                 /* Grab and test conbin is actually set */
2858                 val = fastgetattr(htup,
2859                                                   Anum_pg_constraint_conbin,
2860                                                   conrel->rd_att, &isnull);
2861                 if (isnull)
2862                         elog(ERROR, "null conbin for rel %s",
2863                                  RelationGetRelationName(relation));
2864
2865                 check[found].ccbin = MemoryContextStrdup(CacheMemoryContext,
2866                                                                                                  TextDatumGetCString(val));
2867                 found++;
2868         }
2869
2870         systable_endscan(conscan);
2871         heap_close(conrel, AccessShareLock);
2872
2873         if (found != ncheck)
2874                 elog(ERROR, "%d constraint record(s) missing for rel %s",
2875                          ncheck - found, RelationGetRelationName(relation));
2876 }
2877
2878 /*
2879  * RelationGetIndexList -- get a list of OIDs of indexes on this relation
2880  *
2881  * The index list is created only if someone requests it.  We scan pg_index
2882  * to find relevant indexes, and add the list to the relcache entry so that
2883  * we won't have to compute it again.  Note that shared cache inval of a
2884  * relcache entry will delete the old list and set rd_indexvalid to 0,
2885  * so that we must recompute the index list on next request.  This handles
2886  * creation or deletion of an index.
2887  *
2888  * The returned list is guaranteed to be sorted in order by OID.  This is
2889  * needed by the executor, since for index types that we obtain exclusive
2890  * locks on when updating the index, all backends must lock the indexes in
2891  * the same order or we will get deadlocks (see ExecOpenIndices()).  Any
2892  * consistent ordering would do, but ordering by OID is easy.
2893  *
2894  * Since shared cache inval causes the relcache's copy of the list to go away,
2895  * we return a copy of the list palloc'd in the caller's context.  The caller
2896  * may list_free() the returned list after scanning it. This is necessary
2897  * since the caller will typically be doing syscache lookups on the relevant
2898  * indexes, and syscache lookup could cause SI messages to be processed!
2899  *
2900  * We also update rd_oidindex, which this module treats as effectively part
2901  * of the index list.  rd_oidindex is valid when rd_indexvalid isn't zero;
2902  * it is the pg_class OID of a unique index on OID when the relation has one,
2903  * and InvalidOid if there is no such index.
2904  */
2905 List *
2906 RelationGetIndexList(Relation relation)
2907 {
2908         Relation        indrel;
2909         SysScanDesc indscan;
2910         ScanKeyData skey;
2911         HeapTuple       htup;
2912         List       *result;
2913         Oid                     oidIndex;
2914         MemoryContext oldcxt;
2915
2916         /* Quick exit if we already computed the list. */
2917         if (relation->rd_indexvalid != 0)
2918                 return list_copy(relation->rd_indexlist);
2919
2920         /*
2921          * We build the list we intend to return (in the caller's context) while
2922          * doing the scan.      After successfully completing the scan, we copy that
2923          * list into the relcache entry.  This avoids cache-context memory leakage
2924          * if we get some sort of error partway through.
2925          */
2926         result = NIL;
2927         oidIndex = InvalidOid;
2928
2929         /* Prepare to scan pg_index for entries having indrelid = this rel. */
2930         ScanKeyInit(&skey,
2931                                 Anum_pg_index_indrelid,
2932                                 BTEqualStrategyNumber, F_OIDEQ,
2933                                 ObjectIdGetDatum(RelationGetRelid(relation)));
2934
2935         indrel = heap_open(IndexRelationId, AccessShareLock);
2936         indscan = systable_beginscan(indrel, IndexIndrelidIndexId, true,
2937                                                                  SnapshotNow, 1, &skey);
2938
2939         while (HeapTupleIsValid(htup = systable_getnext(indscan)))
2940         {
2941                 Form_pg_index index = (Form_pg_index) GETSTRUCT(htup);
2942
2943                 /* Add index's OID to result list in the proper order */
2944                 result = insert_ordered_oid(result, index->indexrelid);
2945
2946                 /* Check to see if it is a unique, non-partial btree index on OID */
2947                 if (index->indnatts == 1 &&
2948                         index->indisunique &&
2949                         index->indkey.values[0] == ObjectIdAttributeNumber &&
2950                         index->indclass.values[0] == OID_BTREE_OPS_OID &&
2951                         heap_attisnull(htup, Anum_pg_index_indpred))
2952                         oidIndex = index->indexrelid;
2953         }
2954
2955         systable_endscan(indscan);
2956         heap_close(indrel, AccessShareLock);
2957
2958         /* Now save a copy of the completed list in the relcache entry. */
2959         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
2960         relation->rd_indexlist = list_copy(result);
2961         relation->rd_oidindex = oidIndex;
2962         relation->rd_indexvalid = 1;
2963         MemoryContextSwitchTo(oldcxt);
2964
2965         return result;
2966 }
2967
2968 /*
2969  * insert_ordered_oid
2970  *              Insert a new Oid into a sorted list of Oids, preserving ordering
2971  *
2972  * Building the ordered list this way is O(N^2), but with a pretty small
2973  * constant, so for the number of entries we expect it will probably be
2974  * faster than trying to apply qsort().  Most tables don't have very many
2975  * indexes...
2976  */
2977 static List *
2978 insert_ordered_oid(List *list, Oid datum)
2979 {
2980         ListCell   *prev;
2981
2982         /* Does the datum belong at the front? */
2983         if (list == NIL || datum < linitial_oid(list))
2984                 return lcons_oid(datum, list);
2985         /* No, so find the entry it belongs after */
2986         prev = list_head(list);
2987         for (;;)
2988         {
2989                 ListCell   *curr = lnext(prev);
2990
2991                 if (curr == NULL || datum < lfirst_oid(curr))
2992                         break;                          /* it belongs after 'prev', before 'curr' */
2993
2994                 prev = curr;
2995         }
2996         /* Insert datum into list after 'prev' */
2997         lappend_cell_oid(list, prev, datum);
2998         return list;
2999 }
3000
3001 /*
3002  * RelationSetIndexList -- externally force the index list contents
3003  *
3004  * This is used to temporarily override what we think the set of valid
3005  * indexes is (including the presence or absence of an OID index).
3006  * The forcing will be valid only until transaction commit or abort.
3007  *
3008  * This should only be applied to nailed relations, because in a non-nailed
3009  * relation the hacked index list could be lost at any time due to SI
3010  * messages.  In practice it is only used on pg_class (see REINDEX).
3011  *
3012  * It is up to the caller to make sure the given list is correctly ordered.
3013  *
3014  * We deliberately do not change rd_indexattr here: even when operating
3015  * with a temporary partial index list, HOT-update decisions must be made
3016  * correctly with respect to the full index set.  It is up to the caller
3017  * to ensure that a correct rd_indexattr set has been cached before first
3018  * calling RelationSetIndexList; else a subsequent inquiry might cause a
3019  * wrong rd_indexattr set to get computed and cached.
3020  */
3021 void
3022 RelationSetIndexList(Relation relation, List *indexIds, Oid oidIndex)
3023 {
3024         MemoryContext oldcxt;
3025
3026         Assert(relation->rd_isnailed);
3027         /* Copy the list into the cache context (could fail for lack of mem) */
3028         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
3029         indexIds = list_copy(indexIds);
3030         MemoryContextSwitchTo(oldcxt);
3031         /* Okay to replace old list */
3032         list_free(relation->rd_indexlist);
3033         relation->rd_indexlist = indexIds;
3034         relation->rd_oidindex = oidIndex;
3035         relation->rd_indexvalid = 2;    /* mark list as forced */
3036         /* must flag that we have a forced index list */
3037         need_eoxact_work = true;
3038 }
3039
3040 /*
3041  * RelationGetOidIndex -- get the pg_class OID of the relation's OID index
3042  *
3043  * Returns InvalidOid if there is no such index.
3044  */
3045 Oid
3046 RelationGetOidIndex(Relation relation)
3047 {
3048         List       *ilist;
3049
3050         /*
3051          * If relation doesn't have OIDs at all, caller is probably confused. (We
3052          * could just silently return InvalidOid, but it seems better to throw an
3053          * assertion.)
3054          */
3055         Assert(relation->rd_rel->relhasoids);
3056
3057         if (relation->rd_indexvalid == 0)
3058         {
3059                 /* RelationGetIndexList does the heavy lifting. */
3060                 ilist = RelationGetIndexList(relation);
3061                 list_free(ilist);
3062                 Assert(relation->rd_indexvalid != 0);
3063         }
3064
3065         return relation->rd_oidindex;
3066 }
3067
3068 /*
3069  * RelationGetIndexExpressions -- get the index expressions for an index
3070  *
3071  * We cache the result of transforming pg_index.indexprs into a node tree.
3072  * If the rel is not an index or has no expressional columns, we return NIL.
3073  * Otherwise, the returned tree is copied into the caller's memory context.
3074  * (We don't want to return a pointer to the relcache copy, since it could
3075  * disappear due to relcache invalidation.)
3076  */
3077 List *
3078 RelationGetIndexExpressions(Relation relation)
3079 {
3080         List       *result;
3081         Datum           exprsDatum;
3082         bool            isnull;
3083         char       *exprsString;
3084         MemoryContext oldcxt;
3085
3086         /* Quick exit if we already computed the result. */
3087         if (relation->rd_indexprs)
3088                 return (List *) copyObject(relation->rd_indexprs);
3089
3090         /* Quick exit if there is nothing to do. */
3091         if (relation->rd_indextuple == NULL ||
3092                 heap_attisnull(relation->rd_indextuple, Anum_pg_index_indexprs))
3093                 return NIL;
3094
3095         /*
3096          * We build the tree we intend to return in the caller's context. After
3097          * successfully completing the work, we copy it into the relcache entry.
3098          * This avoids problems if we get some sort of error partway through.
3099          */
3100         exprsDatum = heap_getattr(relation->rd_indextuple,
3101                                                           Anum_pg_index_indexprs,
3102                                                           GetPgIndexDescriptor(),
3103                                                           &isnull);
3104         Assert(!isnull);
3105         exprsString = TextDatumGetCString(exprsDatum);
3106         result = (List *) stringToNode(exprsString);
3107         pfree(exprsString);
3108
3109         /*
3110          * Run the expressions through eval_const_expressions. This is not just an
3111          * optimization, but is necessary, because the planner will be comparing
3112          * them to similarly-processed qual clauses, and may fail to detect valid
3113          * matches without this.  We don't bother with canonicalize_qual, however.
3114          */
3115         result = (List *) eval_const_expressions(NULL, (Node *) result);
3116
3117         /*
3118          * Also mark any coercion format fields as "don't care", so that the
3119          * planner can match to both explicit and implicit coercions.
3120          */
3121         set_coercionform_dontcare((Node *) result);
3122
3123         /* May as well fix opfuncids too */
3124         fix_opfuncids((Node *) result);
3125
3126         /* Now save a copy of the completed tree in the relcache entry. */
3127         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
3128         relation->rd_indexprs = (List *) copyObject(result);
3129         MemoryContextSwitchTo(oldcxt);
3130
3131         return result;
3132 }
3133
3134 /*
3135  * RelationGetIndexPredicate -- get the index predicate for an index
3136  *
3137  * We cache the result of transforming pg_index.indpred into an implicit-AND
3138  * node tree (suitable for ExecQual).
3139  * If the rel is not an index or has no predicate, we return NIL.
3140  * Otherwise, the returned tree is copied into the caller's memory context.
3141  * (We don't want to return a pointer to the relcache copy, since it could
3142  * disappear due to relcache invalidation.)
3143  */
3144 List *
3145 RelationGetIndexPredicate(Relation relation)
3146 {
3147         List       *result;
3148         Datum           predDatum;
3149         bool            isnull;
3150         char       *predString;
3151         MemoryContext oldcxt;
3152
3153         /* Quick exit if we already computed the result. */
3154         if (relation->rd_indpred)
3155                 return (List *) copyObject(relation->rd_indpred);
3156
3157         /* Quick exit if there is nothing to do. */
3158         if (relation->rd_indextuple == NULL ||
3159                 heap_attisnull(relation->rd_indextuple, Anum_pg_index_indpred))
3160                 return NIL;
3161
3162         /*
3163          * We build the tree we intend to return in the caller's context. After
3164          * successfully completing the work, we copy it into the relcache entry.
3165          * This avoids problems if we get some sort of error partway through.
3166          */
3167         predDatum = heap_getattr(relation->rd_indextuple,
3168                                                          Anum_pg_index_indpred,
3169                                                          GetPgIndexDescriptor(),
3170                                                          &isnull);
3171         Assert(!isnull);
3172         predString = TextDatumGetCString(predDatum);
3173         result = (List *) stringToNode(predString);
3174         pfree(predString);
3175
3176         /*
3177          * Run the expression through const-simplification and canonicalization.
3178          * This is not just an optimization, but is necessary, because the planner
3179          * will be comparing it to similarly-processed qual clauses, and may fail
3180          * to detect valid matches without this.  This must match the processing
3181          * done to qual clauses in preprocess_expression()!  (We can skip the
3182          * stuff involving subqueries, however, since we don't allow any in index
3183          * predicates.)
3184          */
3185         result = (List *) eval_const_expressions(NULL, (Node *) result);
3186
3187         result = (List *) canonicalize_qual((Expr *) result);
3188
3189         /*
3190          * Also mark any coercion format fields as "don't care", so that the
3191          * planner can match to both explicit and implicit coercions.
3192          */
3193         set_coercionform_dontcare((Node *) result);
3194
3195         /* Also convert to implicit-AND format */
3196         result = make_ands_implicit((Expr *) result);
3197
3198         /* May as well fix opfuncids too */
3199         fix_opfuncids((Node *) result);
3200
3201         /* Now save a copy of the completed tree in the relcache entry. */
3202         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
3203         relation->rd_indpred = (List *) copyObject(result);
3204         MemoryContextSwitchTo(oldcxt);
3205
3206         return result;
3207 }
3208
3209 /*
3210  * RelationGetIndexAttrBitmap -- get a bitmap of index attribute numbers
3211  *
3212  * The result has a bit set for each attribute used anywhere in the index
3213  * definitions of all the indexes on this relation.  (This includes not only
3214  * simple index keys, but attributes used in expressions and partial-index
3215  * predicates.)
3216  *
3217  * Attribute numbers are offset by FirstLowInvalidHeapAttributeNumber so that
3218  * we can include system attributes (e.g., OID) in the bitmap representation.
3219  *
3220  * The returned result is palloc'd in the caller's memory context and should
3221  * be bms_free'd when not needed anymore.
3222  */
3223 Bitmapset *
3224 RelationGetIndexAttrBitmap(Relation relation)
3225 {
3226         Bitmapset  *indexattrs;
3227         List       *indexoidlist;
3228         ListCell   *l;
3229         MemoryContext oldcxt;
3230
3231         /* Quick exit if we already computed the result. */
3232         if (relation->rd_indexattr != NULL)
3233                 return bms_copy(relation->rd_indexattr);
3234
3235         /* Fast path if definitely no indexes */
3236         if (!RelationGetForm(relation)->relhasindex)
3237                 return NULL;
3238
3239         /*
3240          * Get cached list of index OIDs
3241          */
3242         indexoidlist = RelationGetIndexList(relation);
3243
3244         /* Fall out if no indexes (but relhasindex was set) */
3245         if (indexoidlist == NIL)
3246                 return NULL;
3247
3248         /*
3249          * For each index, add referenced attributes to indexattrs.
3250          */
3251         indexattrs = NULL;
3252         foreach(l, indexoidlist)
3253         {
3254                 Oid                     indexOid = lfirst_oid(l);
3255                 Relation        indexDesc;
3256                 IndexInfo  *indexInfo;
3257                 int                     i;
3258
3259                 indexDesc = index_open(indexOid, AccessShareLock);
3260
3261                 /* Extract index key information from the index's pg_index row */
3262                 indexInfo = BuildIndexInfo(indexDesc);
3263
3264                 /* Collect simple attribute references */
3265                 for (i = 0; i < indexInfo->ii_NumIndexAttrs; i++)
3266                 {
3267                         int                     attrnum = indexInfo->ii_KeyAttrNumbers[i];
3268
3269                         if (attrnum != 0)
3270                                 indexattrs = bms_add_member(indexattrs,
3271                                                            attrnum - FirstLowInvalidHeapAttributeNumber);
3272                 }
3273
3274                 /* Collect all attributes used in expressions, too */
3275                 pull_varattnos((Node *) indexInfo->ii_Expressions, &indexattrs);
3276
3277                 /* Collect all attributes in the index predicate, too */
3278                 pull_varattnos((Node *) indexInfo->ii_Predicate, &indexattrs);
3279
3280                 index_close(indexDesc, AccessShareLock);
3281         }
3282
3283         list_free(indexoidlist);
3284
3285         /* Now save a copy of the bitmap in the relcache entry. */
3286         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
3287         relation->rd_indexattr = bms_copy(indexattrs);
3288         MemoryContextSwitchTo(oldcxt);
3289
3290         /* We return our original working copy for caller to play with */
3291         return indexattrs;
3292 }
3293
3294
3295 /*
3296  *      load_relcache_init_file, write_relcache_init_file
3297  *
3298  *              In late 1992, we started regularly having databases with more than
3299  *              a thousand classes in them.  With this number of classes, it became
3300  *              critical to do indexed lookups on the system catalogs.
3301  *
3302  *              Bootstrapping these lookups is very hard.  We want to be able to
3303  *              use an index on pg_attribute, for example, but in order to do so,
3304  *              we must have read pg_attribute for the attributes in the index,
3305  *              which implies that we need to use the index.
3306  *
3307  *              In order to get around the problem, we do the following:
3308  *
3309  *                 +  When the database system is initialized (at initdb time), we
3310  *                        don't use indexes.  We do sequential scans.
3311  *
3312  *                 +  When the backend is started up in normal mode, we load an image
3313  *                        of the appropriate relation descriptors, in internal format,
3314  *                        from an initialization file in the data/base/... directory.
3315  *
3316  *                 +  If the initialization file isn't there, then we create the
3317  *                        relation descriptors using sequential scans and write 'em to
3318  *                        the initialization file for use by subsequent backends.
3319  *
3320  *              We could dispense with the initialization file and just build the
3321  *              critical reldescs the hard way on every backend startup, but that
3322  *              slows down backend startup noticeably.
3323  *
3324  *              We can in fact go further, and save more relcache entries than
3325  *              just the ones that are absolutely critical; this allows us to speed
3326  *              up backend startup by not having to build such entries the hard way.
3327  *              Presently, all the catalog and index entries that are referred to
3328  *              by catcaches are stored in the initialization file.
3329  *
3330  *              The same mechanism that detects when catcache and relcache entries
3331  *              need to be invalidated (due to catalog updates) also arranges to
3332  *              unlink the initialization file when its contents may be out of date.
3333  *              The file will then be rebuilt during the next backend startup.
3334  */
3335
3336 /*
3337  * load_relcache_init_file -- attempt to load cache from the init file
3338  *
3339  * If successful, return TRUE and set criticalRelcachesBuilt to true.
3340  * If not successful, return FALSE.
3341  *
3342  * NOTE: we assume we are already switched into CacheMemoryContext.
3343  */
3344 static bool
3345 load_relcache_init_file(void)
3346 {
3347         FILE       *fp;
3348         char            initfilename[MAXPGPATH];
3349         Relation   *rels;
3350         int                     relno,
3351                                 num_rels,
3352                                 max_rels,
3353                                 nailed_rels,
3354                                 nailed_indexes,
3355                                 magic;
3356         int                     i;
3357
3358         snprintf(initfilename, sizeof(initfilename), "%s/%s",
3359                          DatabasePath, RELCACHE_INIT_FILENAME);
3360
3361         fp = AllocateFile(initfilename, PG_BINARY_R);
3362         if (fp == NULL)
3363                 return false;
3364
3365         /*
3366          * Read the index relcache entries from the file.  Note we will not enter
3367          * any of them into the cache if the read fails partway through; this
3368          * helps to guard against broken init files.
3369          */
3370         max_rels = 100;
3371         rels = (Relation *) palloc(max_rels * sizeof(Relation));
3372         num_rels = 0;
3373         nailed_rels = nailed_indexes = 0;
3374         initFileRelationIds = NIL;
3375
3376         /* check for correct magic number (compatible version) */
3377         if (fread(&magic, 1, sizeof(magic), fp) != sizeof(magic))
3378                 goto read_failed;
3379         if (magic != RELCACHE_INIT_FILEMAGIC)
3380                 goto read_failed;
3381
3382         for (relno = 0;; relno++)
3383         {
3384                 Size            len;
3385                 size_t          nread;
3386                 Relation        rel;
3387                 Form_pg_class relform;
3388                 bool            has_not_null;
3389
3390                 /* first read the relation descriptor length */
3391                 if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
3392                 {
3393                         if (nread == 0)
3394                                 break;                  /* end of file */
3395                         goto read_failed;
3396                 }
3397
3398                 /* safety check for incompatible relcache layout */
3399                 if (len != sizeof(RelationData))
3400                         goto read_failed;
3401
3402                 /* allocate another relcache header */
3403                 if (num_rels >= max_rels)
3404                 {
3405                         max_rels *= 2;
3406                         rels = (Relation *) repalloc(rels, max_rels * sizeof(Relation));
3407                 }
3408
3409                 rel = rels[num_rels++] = (Relation) palloc(len);
3410
3411                 /* then, read the Relation structure */
3412                 if ((nread = fread(rel, 1, len, fp)) != len)
3413                         goto read_failed;
3414
3415                 /* next read the relation tuple form */
3416                 if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
3417                         goto read_failed;
3418
3419                 relform = (Form_pg_class) palloc(len);
3420                 if ((nread = fread(relform, 1, len, fp)) != len)
3421                         goto read_failed;
3422
3423                 rel->rd_rel = relform;
3424
3425                 /* initialize attribute tuple forms */
3426                 rel->rd_att = CreateTemplateTupleDesc(relform->relnatts,
3427                                                                                           relform->relhasoids);
3428                 rel->rd_att->tdrefcount = 1;    /* mark as refcounted */
3429
3430                 rel->rd_att->tdtypeid = relform->reltype;
3431                 rel->rd_att->tdtypmod = -1;             /* unnecessary, but... */
3432
3433                 /* next read all the attribute tuple form data entries */
3434                 has_not_null = false;
3435                 for (i = 0; i < relform->relnatts; i++)
3436                 {
3437                         if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
3438                                 goto read_failed;
3439                         if (len != ATTRIBUTE_TUPLE_SIZE)
3440                                 goto read_failed;
3441                         if ((nread = fread(rel->rd_att->attrs[i], 1, len, fp)) != len)
3442                                 goto read_failed;
3443
3444                         has_not_null |= rel->rd_att->attrs[i]->attnotnull;
3445                 }
3446
3447                 /* next read the access method specific field */
3448                 if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
3449                         goto read_failed;
3450                 if (len > 0)
3451                 {
3452                         rel->rd_options = palloc(len);
3453                         if ((nread = fread(rel->rd_options, 1, len, fp)) != len)
3454                                 goto read_failed;
3455                         if (len != VARSIZE(rel->rd_options))
3456                                 goto read_failed;               /* sanity check */
3457                 }
3458                 else
3459                 {
3460                         rel->rd_options = NULL;
3461                 }
3462
3463                 /* mark not-null status */
3464                 if (has_not_null)
3465                 {
3466                         TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));
3467
3468                         constr->has_not_null = true;
3469                         rel->rd_att->constr = constr;
3470                 }
3471
3472                 /* If it's an index, there's more to do */
3473                 if (rel->rd_rel->relkind == RELKIND_INDEX)
3474                 {
3475                         Form_pg_am      am;
3476                         MemoryContext indexcxt;
3477                         Oid                *opfamily;
3478                         Oid                *opcintype;
3479                         Oid                *operator;
3480                         RegProcedure *support;
3481                         int                     nsupport;
3482                         int16      *indoption;
3483
3484                         /* Count nailed indexes to ensure we have 'em all */
3485                         if (rel->rd_isnailed)
3486                                 nailed_indexes++;
3487
3488                         /* next, read the pg_index tuple */
3489                         if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
3490                                 goto read_failed;
3491
3492                         rel->rd_indextuple = (HeapTuple) palloc(len);
3493                         if ((nread = fread(rel->rd_indextuple, 1, len, fp)) != len)
3494                                 goto read_failed;
3495
3496                         /* Fix up internal pointers in the tuple -- see heap_copytuple */
3497                         rel->rd_indextuple->t_data = (HeapTupleHeader) ((char *) rel->rd_indextuple + HEAPTUPLESIZE);
3498                         rel->rd_index = (Form_pg_index) GETSTRUCT(rel->rd_indextuple);
3499
3500                         /* next, read the access method tuple form */
3501                         if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
3502                                 goto read_failed;
3503
3504                         am = (Form_pg_am) palloc(len);
3505                         if ((nread = fread(am, 1, len, fp)) != len)
3506                                 goto read_failed;
3507                         rel->rd_am = am;
3508
3509                         /*
3510                          * prepare index info context --- parameters should match
3511                          * RelationInitIndexAccessInfo
3512                          */
3513                         indexcxt = AllocSetContextCreate(CacheMemoryContext,
3514                                                                                          RelationGetRelationName(rel),
3515                                                                                          ALLOCSET_SMALL_MINSIZE,
3516                                                                                          ALLOCSET_SMALL_INITSIZE,
3517                                                                                          ALLOCSET_SMALL_MAXSIZE);
3518                         rel->rd_indexcxt = indexcxt;
3519
3520                         /* next, read the vector of opfamily OIDs */
3521                         if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
3522                                 goto read_failed;
3523
3524                         opfamily = (Oid *) MemoryContextAlloc(indexcxt, len);
3525                         if ((nread = fread(opfamily, 1, len, fp)) != len)
3526                                 goto read_failed;
3527
3528                         rel->rd_opfamily = opfamily;
3529
3530                         /* next, read the vector of opcintype OIDs */
3531                         if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
3532                                 goto read_failed;
3533
3534                         opcintype = (Oid *) MemoryContextAlloc(indexcxt, len);
3535                         if ((nread = fread(opcintype, 1, len, fp)) != len)
3536                                 goto read_failed;
3537
3538                         rel->rd_opcintype = opcintype;
3539
3540                         /* next, read the vector of operator OIDs */
3541                         if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
3542                                 goto read_failed;
3543
3544                         operator = (Oid *) MemoryContextAlloc(indexcxt, len);
3545                         if ((nread = fread(operator, 1, len, fp)) != len)
3546                                 goto read_failed;
3547
3548                         rel->rd_operator = operator;
3549
3550                         /* next, read the vector of support procedures */
3551                         if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
3552                                 goto read_failed;
3553                         support = (RegProcedure *) MemoryContextAlloc(indexcxt, len);
3554                         if ((nread = fread(support, 1, len, fp)) != len)
3555                                 goto read_failed;
3556
3557                         rel->rd_support = support;
3558
3559                         /* finally, read the vector of indoption values */
3560                         if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
3561                                 goto read_failed;
3562
3563                         indoption = (int16 *) MemoryContextAlloc(indexcxt, len);
3564                         if ((nread = fread(indoption, 1, len, fp)) != len)
3565                                 goto read_failed;
3566
3567                         rel->rd_indoption = indoption;
3568
3569                         /* set up zeroed fmgr-info vectors */
3570                         rel->rd_aminfo = (RelationAmInfo *)
3571                                 MemoryContextAllocZero(indexcxt, sizeof(RelationAmInfo));
3572                         nsupport = relform->relnatts * am->amsupport;
3573                         rel->rd_supportinfo = (FmgrInfo *)
3574                                 MemoryContextAllocZero(indexcxt, nsupport * sizeof(FmgrInfo));
3575                 }
3576                 else
3577                 {
3578                         /* Count nailed rels to ensure we have 'em all */
3579                         if (rel->rd_isnailed)
3580                                 nailed_rels++;
3581
3582                         Assert(rel->rd_index == NULL);
3583                         Assert(rel->rd_indextuple == NULL);
3584                         Assert(rel->rd_am == NULL);
3585                         Assert(rel->rd_indexcxt == NULL);
3586                         Assert(rel->rd_aminfo == NULL);
3587                         Assert(rel->rd_opfamily == NULL);
3588                         Assert(rel->rd_opcintype == NULL);
3589                         Assert(rel->rd_operator == NULL);
3590                         Assert(rel->rd_support == NULL);
3591                         Assert(rel->rd_supportinfo == NULL);
3592                         Assert(rel->rd_indoption == NULL);
3593                 }
3594
3595                 /*
3596                  * Rules and triggers are not saved (mainly because the internal
3597                  * format is complex and subject to change).  They must be rebuilt if
3598                  * needed by RelationCacheInitializePhase2.  This is not expected to
3599                  * be a big performance hit since few system catalogs have such. Ditto
3600                  * for index expressions and predicates.
3601                  */
3602                 rel->rd_rules = NULL;
3603                 rel->rd_rulescxt = NULL;
3604                 rel->trigdesc = NULL;
3605                 rel->rd_indexprs = NIL;
3606                 rel->rd_indpred = NIL;
3607
3608                 /*
3609                  * Reset transient-state fields in the relcache entry
3610                  */
3611                 rel->rd_smgr = NULL;
3612                 rel->rd_targblock = InvalidBlockNumber;
3613                 rel->rd_fsm_nblocks = InvalidBlockNumber;
3614                 rel->rd_vm_nblocks = InvalidBlockNumber;
3615                 if (rel->rd_isnailed)
3616                         rel->rd_refcnt = 1;
3617                 else
3618                         rel->rd_refcnt = 0;
3619                 rel->rd_indexvalid = 0;
3620                 rel->rd_indexlist = NIL;
3621                 rel->rd_indexattr = NULL;
3622                 rel->rd_oidindex = InvalidOid;
3623                 rel->rd_createSubid = InvalidSubTransactionId;
3624                 rel->rd_newRelfilenodeSubid = InvalidSubTransactionId;
3625                 rel->rd_amcache = NULL;
3626                 MemSet(&rel->pgstat_info, 0, sizeof(rel->pgstat_info));
3627
3628                 /*
3629                  * Recompute lock and physical addressing info.  This is needed in
3630                  * case the pg_internal.init file was copied from some other database
3631                  * by CREATE DATABASE.
3632                  */
3633                 RelationInitLockInfo(rel);
3634                 RelationInitPhysicalAddr(rel);
3635         }
3636
3637         /*
3638          * We reached the end of the init file without apparent problem. Did we
3639          * get the right number of nailed items?  (This is a useful crosscheck in
3640          * case the set of critical rels or indexes changes.)
3641          */
3642         if (nailed_rels != NUM_CRITICAL_RELS ||
3643                 nailed_indexes != NUM_CRITICAL_INDEXES)
3644                 goto read_failed;
3645
3646         /*
3647          * OK, all appears well.
3648          *
3649          * Now insert all the new relcache entries into the cache.
3650          */
3651         for (relno = 0; relno < num_rels; relno++)
3652         {
3653                 RelationCacheInsert(rels[relno]);
3654                 /* also make a list of their OIDs, for RelationIdIsInInitFile */
3655                 initFileRelationIds = lcons_oid(RelationGetRelid(rels[relno]),
3656                                                                                 initFileRelationIds);
3657         }
3658
3659         pfree(rels);
3660         FreeFile(fp);
3661
3662         criticalRelcachesBuilt = true;
3663         return true;
3664
3665         /*
3666          * init file is broken, so do it the hard way.  We don't bother trying to
3667          * free the clutter we just allocated; it's not in the relcache so it
3668          * won't hurt.
3669          */
3670 read_failed:
3671         pfree(rels);
3672         FreeFile(fp);
3673
3674         return false;
3675 }
3676
3677 /*
3678  * Write out a new initialization file with the current contents
3679  * of the relcache.
3680  */
3681 static void
3682 write_relcache_init_file(void)
3683 {
3684         FILE       *fp;
3685         char            tempfilename[MAXPGPATH];
3686         char            finalfilename[MAXPGPATH];
3687         int                     magic;
3688         HASH_SEQ_STATUS status;
3689         RelIdCacheEnt *idhentry;
3690         MemoryContext oldcxt;
3691         int                     i;
3692
3693         /*
3694          * We must write a temporary file and rename it into place. Otherwise,
3695          * another backend starting at about the same time might crash trying to
3696          * read the partially-complete file.
3697          */
3698         snprintf(tempfilename, sizeof(tempfilename), "%s/%s.%d",
3699                          DatabasePath, RELCACHE_INIT_FILENAME, MyProcPid);
3700         snprintf(finalfilename, sizeof(finalfilename), "%s/%s",
3701                          DatabasePath, RELCACHE_INIT_FILENAME);
3702
3703         unlink(tempfilename);           /* in case it exists w/wrong permissions */
3704
3705         fp = AllocateFile(tempfilename, PG_BINARY_W);
3706         if (fp == NULL)
3707         {
3708                 /*
3709                  * We used to consider this a fatal error, but we might as well
3710                  * continue with backend startup ...
3711                  */
3712                 ereport(WARNING,
3713                                 (errcode_for_file_access(),
3714                                  errmsg("could not create relation-cache initialization file \"%s\": %m",
3715                                                 tempfilename),
3716                           errdetail("Continuing anyway, but there's something wrong.")));
3717                 return;
3718         }
3719
3720         /*
3721          * Write a magic number to serve as a file version identifier.  We can
3722          * change the magic number whenever the relcache layout changes.
3723          */
3724         magic = RELCACHE_INIT_FILEMAGIC;
3725         if (fwrite(&magic, 1, sizeof(magic), fp) != sizeof(magic))
3726                 elog(FATAL, "could not write init file");
3727
3728         /*
3729          * Write all the reldescs (in no particular order).
3730          */
3731         hash_seq_init(&status, RelationIdCache);
3732
3733         initFileRelationIds = NIL;
3734
3735         while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
3736         {
3737                 Relation        rel = idhentry->reldesc;
3738                 Form_pg_class relform = rel->rd_rel;
3739
3740                 /* first write the relcache entry proper */
3741                 write_item(rel, sizeof(RelationData), fp);
3742
3743                 /* next write the relation tuple form */
3744                 write_item(relform, CLASS_TUPLE_SIZE, fp);
3745
3746                 /* next, do all the attribute tuple form data entries */
3747                 for (i = 0; i < relform->relnatts; i++)
3748                 {
3749                         write_item(rel->rd_att->attrs[i], ATTRIBUTE_TUPLE_SIZE, fp);
3750                 }
3751
3752                 /* next, do the access method specific field */
3753                 write_item(rel->rd_options,
3754                                    (rel->rd_options ? VARSIZE(rel->rd_options) : 0),
3755                                    fp);
3756
3757                 /* If it's an index, there's more to do */
3758                 if (rel->rd_rel->relkind == RELKIND_INDEX)
3759                 {
3760                         Form_pg_am      am = rel->rd_am;
3761
3762                         /* write the pg_index tuple */
3763                         /* we assume this was created by heap_copytuple! */
3764                         write_item(rel->rd_indextuple,
3765                                            HEAPTUPLESIZE + rel->rd_indextuple->t_len,
3766                                            fp);
3767
3768                         /* next, write the access method tuple form */
3769                         write_item(am, sizeof(FormData_pg_am), fp);
3770
3771                         /* next, write the vector of opfamily OIDs */
3772                         write_item(rel->rd_opfamily,
3773                                            relform->relnatts * sizeof(Oid),
3774                                            fp);
3775
3776                         /* next, write the vector of opcintype OIDs */
3777                         write_item(rel->rd_opcintype,
3778                                            relform->relnatts * sizeof(Oid),
3779                                            fp);
3780
3781                         /* next, write the vector of operator OIDs */
3782                         write_item(rel->rd_operator,
3783                                            relform->relnatts * (am->amstrategies * sizeof(Oid)),
3784                                            fp);
3785
3786                         /* next, write the vector of support procedures */
3787                         write_item(rel->rd_support,
3788                                   relform->relnatts * (am->amsupport * sizeof(RegProcedure)),
3789                                            fp);
3790
3791                         /* finally, write the vector of indoption values */
3792                         write_item(rel->rd_indoption,
3793                                            relform->relnatts * sizeof(int16),
3794                                            fp);
3795                 }
3796
3797                 /* also make a list of their OIDs, for RelationIdIsInInitFile */
3798                 oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
3799                 initFileRelationIds = lcons_oid(RelationGetRelid(rel),
3800                                                                                 initFileRelationIds);
3801                 MemoryContextSwitchTo(oldcxt);
3802         }
3803
3804         if (FreeFile(fp))
3805                 elog(FATAL, "could not write init file");
3806
3807         /*
3808          * Now we have to check whether the data we've so painstakingly
3809          * accumulated is already obsolete due to someone else's just-committed
3810          * catalog changes.  If so, we just delete the temp file and leave it to
3811          * the next backend to try again.  (Our own relcache entries will be
3812          * updated by SI message processing, but we can't be sure whether what we
3813          * wrote out was up-to-date.)
3814          *
3815          * This mustn't run concurrently with RelationCacheInitFileInvalidate, so
3816          * grab a serialization lock for the duration.
3817          */
3818         LWLockAcquire(RelCacheInitLock, LW_EXCLUSIVE);
3819
3820         /* Make sure we have seen all incoming SI messages */
3821         AcceptInvalidationMessages();
3822
3823         /*
3824          * If we have received any SI relcache invals since backend start, assume
3825          * we may have written out-of-date data.
3826          */
3827         if (relcacheInvalsReceived == 0L)
3828         {
3829                 /*
3830                  * OK, rename the temp file to its final name, deleting any
3831                  * previously-existing init file.
3832                  *
3833                  * Note: a failure here is possible under Cygwin, if some other
3834                  * backend is holding open an unlinked-but-not-yet-gone init file. So
3835                  * treat this as a noncritical failure; just remove the useless temp
3836                  * file on failure.
3837                  */
3838                 if (rename(tempfilename, finalfilename) < 0)
3839                         unlink(tempfilename);
3840         }
3841         else
3842         {
3843                 /* Delete the already-obsolete temp file */
3844                 unlink(tempfilename);
3845         }
3846
3847         LWLockRelease(RelCacheInitLock);
3848 }
3849
3850 /* write a chunk of data preceded by its length */
3851 static void
3852 write_item(const void *data, Size len, FILE *fp)
3853 {
3854         if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
3855                 elog(FATAL, "could not write init file");
3856         if (fwrite(data, 1, len, fp) != len)
3857                 elog(FATAL, "could not write init file");
3858 }
3859
3860 /*
3861  * Detect whether a given relation (identified by OID) is one of the ones
3862  * we store in the init file.
3863  *
3864  * Note that we effectively assume that all backends running in a database
3865  * would choose to store the same set of relations in the init file;
3866  * otherwise there are cases where we'd fail to detect the need for an init
3867  * file invalidation.  This does not seem likely to be a problem in practice.
3868  */
3869 bool
3870 RelationIdIsInInitFile(Oid relationId)
3871 {
3872         return list_member_oid(initFileRelationIds, relationId);
3873 }
3874
3875 /*
3876  * Invalidate (remove) the init file during commit of a transaction that
3877  * changed one or more of the relation cache entries that are kept in the
3878  * init file.
3879  *
3880  * We actually need to remove the init file twice: once just before sending
3881  * the SI messages that include relcache inval for such relations, and once
3882  * just after sending them.  The unlink before ensures that a backend that's
3883  * currently starting cannot read the now-obsolete init file and then miss
3884  * the SI messages that will force it to update its relcache entries.  (This
3885  * works because the backend startup sequence gets into the PGPROC array before
3886  * trying to load the init file.)  The unlink after is to synchronize with a
3887  * backend that may currently be trying to write an init file based on data
3888  * that we've just rendered invalid.  Such a backend will see the SI messages,
3889  * but we can't leave the init file sitting around to fool later backends.
3890  *
3891  * Ignore any failure to unlink the file, since it might not be there if
3892  * no backend has been started since the last removal.
3893  */
3894 void
3895 RelationCacheInitFileInvalidate(bool beforeSend)
3896 {
3897         char            initfilename[MAXPGPATH];
3898
3899         snprintf(initfilename, sizeof(initfilename), "%s/%s",
3900                          DatabasePath, RELCACHE_INIT_FILENAME);
3901
3902         if (beforeSend)
3903         {
3904                 /* no interlock needed here */
3905                 unlink(initfilename);
3906         }
3907         else
3908         {
3909                 /*
3910                  * We need to interlock this against write_relcache_init_file, to
3911                  * guard against possibility that someone renames a new-but-
3912                  * already-obsolete init file into place just after we unlink. With
3913                  * the interlock, it's certain that write_relcache_init_file will
3914                  * notice our SI inval message before renaming into place, or else
3915                  * that we will execute second and successfully unlink the file.
3916                  */
3917                 LWLockAcquire(RelCacheInitLock, LW_EXCLUSIVE);
3918                 unlink(initfilename);
3919                 LWLockRelease(RelCacheInitLock);
3920         }
3921 }
3922
3923 /*
3924  * Remove the init file for a given database during postmaster startup.
3925  *
3926  * We used to keep the init file across restarts, but that is unsafe in PITR
3927  * scenarios, and even in simple crash-recovery cases there are windows for
3928  * the init file to become out-of-sync with the database.  So now we just
3929  * remove it during startup and expect the first backend launch to rebuild it.
3930  * Of course, this has to happen in each database of the cluster.  For
3931  * simplicity this is driven by flatfiles.c, which has to scan pg_database
3932  * anyway.
3933  */
3934 void
3935 RelationCacheInitFileRemove(const char *dbPath)
3936 {
3937         char            initfilename[MAXPGPATH];
3938
3939         snprintf(initfilename, sizeof(initfilename), "%s/%s",
3940                          dbPath, RELCACHE_INIT_FILENAME);
3941         unlink(initfilename);
3942         /* ignore any error, since it might not be there at all */
3943 }