OSDN Git Service

Modify hash_search() API to prevent future occurrences of the error
[pg-rex/syncrep.git] / src / backend / utils / cache / relcache.c
1 /*-------------------------------------------------------------------------
2  *
3  * relcache.c
4  *        POSTGRES relation descriptor cache code
5  *
6  * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        $PostgreSQL: pgsql/src/backend/utils/cache/relcache.c,v 1.225 2005/05/29 04:23:05 tgl Exp $
12  *
13  *-------------------------------------------------------------------------
14  */
15 /*
16  * INTERFACE ROUTINES
17  *              RelationCacheInitialize                 - initialize relcache
18  *              RelationCacheInitializePhase2   - finish initializing relcache
19  *              RelationIdGetRelation                   - get a reldesc by relation id
20  *              RelationIdCacheGetRelation              - get a cached reldesc by relid
21  *              RelationClose                                   - close an open relation
22  *
23  * NOTES
24  *              The following code contains many undocumented hacks.  Please be
25  *              careful....
26  */
27 #include "postgres.h"
28
29 #include <sys/file.h>
30 #include <fcntl.h>
31 #include <unistd.h>
32
33 #include "access/genam.h"
34 #include "access/heapam.h"
35 #include "catalog/catalog.h"
36 #include "catalog/indexing.h"
37 #include "catalog/namespace.h"
38 #include "catalog/pg_amop.h"
39 #include "catalog/pg_amproc.h"
40 #include "catalog/pg_attrdef.h"
41 #include "catalog/pg_attribute.h"
42 #include "catalog/pg_constraint.h"
43 #include "catalog/pg_index.h"
44 #include "catalog/pg_namespace.h"
45 #include "catalog/pg_opclass.h"
46 #include "catalog/pg_proc.h"
47 #include "catalog/pg_rewrite.h"
48 #include "catalog/pg_type.h"
49 #include "commands/trigger.h"
50 #include "miscadmin.h"
51 #include "optimizer/clauses.h"
52 #include "optimizer/planmain.h"
53 #include "optimizer/prep.h"
54 #include "storage/fd.h"
55 #include "storage/smgr.h"
56 #include "utils/builtins.h"
57 #include "utils/catcache.h"
58 #include "utils/fmgroids.h"
59 #include "utils/inval.h"
60 #include "utils/lsyscache.h"
61 #include "utils/memutils.h"
62 #include "utils/relcache.h"
63 #include "utils/resowner.h"
64 #include "utils/syscache.h"
65 #include "utils/typcache.h"
66
67
68 /*
69  * name of relcache init file, used to speed up backend startup
70  */
71 #define RELCACHE_INIT_FILENAME  "pg_internal.init"
72
73 #define RELCACHE_INIT_FILEMAGIC         0x573262        /* version ID value */
74
75 /*
76  *              hardcoded tuple descriptors.  see include/catalog/pg_attribute.h
77  */
78 static FormData_pg_attribute Desc_pg_class[Natts_pg_class] = {Schema_pg_class};
79 static FormData_pg_attribute Desc_pg_attribute[Natts_pg_attribute] = {Schema_pg_attribute};
80 static FormData_pg_attribute Desc_pg_proc[Natts_pg_proc] = {Schema_pg_proc};
81 static FormData_pg_attribute Desc_pg_type[Natts_pg_type] = {Schema_pg_type};
82 static FormData_pg_attribute Desc_pg_index[Natts_pg_index] = {Schema_pg_index};
83
84 /*
85  *              Hash tables that index the relation cache
86  *
87  *              We used to index the cache by both name and OID, but now there
88  *              is only an index by OID.
89  */
90 typedef struct relidcacheent
91 {
92         Oid                     reloid;
93         Relation        reldesc;
94 } RelIdCacheEnt;
95
96 static HTAB *RelationIdCache;
97
98 /*
99  * This flag is false until we have prepared the critical relcache entries
100  * that are needed to do indexscans on the tables read by relcache building.
101  */
102 bool            criticalRelcachesBuilt = false;
103
104 /*
105  * This flag is set if we discover that we need to write a new relcache
106  * cache file at the end of startup.
107  */
108 static bool needNewCacheFile = false;
109
110 /*
111  * This counter counts relcache inval events received since backend startup
112  * (but only for rels that are actually in cache).      Presently, we use it only
113  * to detect whether data about to be written by write_relcache_init_file()
114  * might already be obsolete.
115  */
116 static long relcacheInvalsReceived = 0L;
117
118 /*
119  * This list remembers the OIDs of the relations cached in the relcache
120  * init file.
121  */
122 static List *initFileRelationIds = NIL;
123
124 /*
125  * This flag lets us optimize away work in AtEOSubXact_RelationCache().
126  */
127 static bool need_eosubxact_work = false;
128
129
/*
 *		macros to manipulate the lookup hashtables
 */

/*
 * RelationCacheInsert
 *		Enter RELATION into RelationIdCache, keyed by its rd_id.
 *
 * If an entry for that OID is already present, its reldesc pointer is simply
 * overwritten; no notice is given.
 *
 * RELATION is expanded more than once, so pass an expression without side
 * effects.  Every use of the parameter is parenthesized so that arguments
 * such as "*relp" or a cast expression expand correctly.
 */
#define RelationCacheInsert(RELATION)	\
do { \
	RelIdCacheEnt *idhentry; bool found; \
	idhentry = (RelIdCacheEnt*)hash_search(RelationIdCache, \
										   (void *) &((RELATION)->rd_id), \
										   HASH_ENTER, \
										   &found); \
	/* used to give notice if found -- now just keep quiet */ \
	idhentry->reldesc = (RELATION); \
} while(0)
143
/*
 * RelationIdCacheLookup
 *		Probe RelationIdCache for the OID held in ID and store the matching
 *		relation descriptor into RELATION, or NULL when there is no entry.
 *
 * ID must be an lvalue (its address is taken) and RELATION must be
 * assignable.
 */
#define RelationIdCacheLookup(ID, RELATION) \
do { \
	RelIdCacheEnt *hentry; \
	hentry = (RelIdCacheEnt*)hash_search(RelationIdCache, \
										 (void *) &(ID), \
										 HASH_FIND, \
										 NULL); \
	RELATION = (hentry != NULL) ? hentry->reldesc : NULL; \
} while(0)
154
/*
 * RelationCacheDelete
 *		Remove the RelationIdCache entry filed under RELATION's rd_id.
 *
 * A WARNING is logged when no such entry existed.  The parameter is
 * parenthesized so that arguments such as "*relp" or a cast expression
 * expand correctly.
 */
#define RelationCacheDelete(RELATION) \
do { \
	RelIdCacheEnt *idhentry; \
	idhentry = (RelIdCacheEnt*)hash_search(RelationIdCache, \
										   (void *) &((RELATION)->rd_id), \
										   HASH_REMOVE, NULL); \
	if (idhentry == NULL) \
		elog(WARNING, "trying to delete a rd_id reldesc that does not exist"); \
} while(0)
164
165
166 /*
167  * Special cache for opclass-related information
168  *
169  * Note: only default-subtype operators and support procs get cached
170  */
171 typedef struct opclasscacheent
172 {
173         Oid                     opclassoid;             /* lookup key: OID of opclass */
174         bool            valid;                  /* set TRUE after successful fill-in */
175         StrategyNumber numStrats;       /* max # of strategies (from pg_am) */
176         StrategyNumber numSupport;      /* max # of support procs (from pg_am) */
177         Oid                *operatorOids;       /* strategy operators' OIDs */
178         RegProcedure *supportProcs; /* support procs */
179 } OpClassCacheEnt;
180
181 static HTAB *OpClassCache = NULL;
182
183
184 /* non-export function prototypes */
185
186 static void RelationClearRelation(Relation relation, bool rebuild);
187
188 static void RelationReloadClassinfo(Relation relation);
189 static void RelationFlushRelation(Relation relation);
190 static bool load_relcache_init_file(void);
191 static void write_relcache_init_file(void);
192
193 static void formrdesc(const char *relationName, Oid relationReltype,
194                                           bool hasoids, int natts, FormData_pg_attribute *att);
195
196 static HeapTuple ScanPgRelation(Oid targetRelId, bool indexOK);
197 static Relation AllocateRelationDesc(Relation relation, Form_pg_class relp);
198 static void RelationBuildTupleDesc(Relation relation);
199 static Relation RelationBuildDesc(Oid targetRelId, Relation oldrelation);
200 static void RelationInitPhysicalAddr(Relation relation);
201 static TupleDesc GetPgIndexDescriptor(void);
202 static void AttrDefaultFetch(Relation relation);
203 static void CheckConstraintFetch(Relation relation);
204 static List *insert_ordered_oid(List *list, Oid datum);
205 static void IndexSupportInitialize(oidvector *indclass,
206                                            Oid *indexOperator,
207                                            RegProcedure *indexSupport,
208                                            StrategyNumber maxStrategyNumber,
209                                            StrategyNumber maxSupportNumber,
210                                            AttrNumber maxAttributeNumber);
211 static OpClassCacheEnt *LookupOpclassInfo(Oid operatorClassOid,
212                                   StrategyNumber numStrats,
213                                   StrategyNumber numSupport);
214
215
216 /*
217  *              ScanPgRelation
218  *
219  *              this is used by RelationBuildDesc to find a pg_class
220  *              tuple matching targetRelId.
221  *
222  *              NB: the returned tuple has been copied into palloc'd storage
223  *              and must eventually be freed with heap_freetuple.
224  */
225 static HeapTuple
226 ScanPgRelation(Oid targetRelId, bool indexOK)
227 {
228         HeapTuple       pg_class_tuple;
229         Relation        pg_class_desc;
230         SysScanDesc pg_class_scan;
231         ScanKeyData key[1];
232
233         /*
234          * form a scan key
235          */
236         ScanKeyInit(&key[0],
237                                 ObjectIdAttributeNumber,
238                                 BTEqualStrategyNumber, F_OIDEQ,
239                                 ObjectIdGetDatum(targetRelId));
240
241         /*
242          * Open pg_class and fetch a tuple.  Force heap scan if we haven't yet
243          * built the critical relcache entries (this includes initdb and
244          * startup without a pg_internal.init file).  The caller can also
245          * force a heap scan by setting indexOK == false.
246          */
247         pg_class_desc = heap_open(RelationRelationId, AccessShareLock);
248         pg_class_scan = systable_beginscan(pg_class_desc, ClassOidIndexId,
249                                                                            indexOK && criticalRelcachesBuilt,
250                                                                            SnapshotNow,
251                                                                            1, key);
252
253         pg_class_tuple = systable_getnext(pg_class_scan);
254
255         /*
256          * Must copy tuple before releasing buffer.
257          */
258         if (HeapTupleIsValid(pg_class_tuple))
259                 pg_class_tuple = heap_copytuple(pg_class_tuple);
260
261         /* all done */
262         systable_endscan(pg_class_scan);
263         heap_close(pg_class_desc, AccessShareLock);
264
265         return pg_class_tuple;
266 }
267
268 /*
269  *              AllocateRelationDesc
270  *
271  *              This is used to allocate memory for a new relation descriptor
272  *              and initialize the rd_rel field.
273  *
274  *              If 'relation' is NULL, allocate a new RelationData object.
275  *              If not, reuse the given object (that path is taken only when
276  *              we have to rebuild a relcache entry during RelationClearRelation).
277  */
278 static Relation
279 AllocateRelationDesc(Relation relation, Form_pg_class relp)
280 {
281         MemoryContext oldcxt;
282         Form_pg_class relationForm;
283
284         /* Relcache entries must live in CacheMemoryContext */
285         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
286
287         /*
288          * allocate space for new relation descriptor, if needed
289          */
290         if (relation == NULL)
291                 relation = (Relation) palloc(sizeof(RelationData));
292
293         /*
294          * clear all fields of reldesc
295          */
296         MemSet(relation, 0, sizeof(RelationData));
297         relation->rd_targblock = InvalidBlockNumber;
298
299         /* make sure relation is marked as having no open file yet */
300         relation->rd_smgr = NULL;
301
302         /*
303          * Copy the relation tuple form
304          *
305          * We only allocate space for the fixed fields, ie, CLASS_TUPLE_SIZE.
306          * relacl is NOT stored in the relcache --- there'd be little point in
307          * it, since we don't copy the tuple's nullvalues bitmap and hence
308          * wouldn't know if the value is valid ... bottom line is that relacl
309          * *cannot* be retrieved from the relcache.  Get it from the syscache
310          * if you need it.
311          */
312         relationForm = (Form_pg_class) palloc(CLASS_TUPLE_SIZE);
313
314         memcpy(relationForm, relp, CLASS_TUPLE_SIZE);
315
316         /* initialize relation tuple form */
317         relation->rd_rel = relationForm;
318
319         /* and allocate attribute tuple form storage */
320         relation->rd_att = CreateTemplateTupleDesc(relationForm->relnatts,
321                                                                                            relationForm->relhasoids);
322
323         MemoryContextSwitchTo(oldcxt);
324
325         return relation;
326 }
327
328 /*
329  *              RelationBuildTupleDesc
330  *
331  *              Form the relation's tuple descriptor from information in
332  *              the pg_attribute, pg_attrdef & pg_constraint system catalogs.
333  */
334 static void
335 RelationBuildTupleDesc(Relation relation)
336 {
337         HeapTuple       pg_attribute_tuple;
338         Relation        pg_attribute_desc;
339         SysScanDesc pg_attribute_scan;
340         ScanKeyData skey[2];
341         int                     need;
342         TupleConstr *constr;
343         AttrDefault *attrdef = NULL;
344         int                     ndef = 0;
345
346         /* copy some fields from pg_class row to rd_att */
347         relation->rd_att->tdtypeid = relation->rd_rel->reltype;
348         relation->rd_att->tdtypmod = -1;        /* unnecessary, but... */
349         relation->rd_att->tdhasoid = relation->rd_rel->relhasoids;
350
351         constr = (TupleConstr *) MemoryContextAlloc(CacheMemoryContext,
352                                                                                                 sizeof(TupleConstr));
353         constr->has_not_null = false;
354
355         /*
356          * Form a scan key that selects only user attributes (attnum > 0).
357          * (Eliminating system attribute rows at the index level is lots
358          * faster than fetching them.)
359          */
360         ScanKeyInit(&skey[0],
361                                 Anum_pg_attribute_attrelid,
362                                 BTEqualStrategyNumber, F_OIDEQ,
363                                 ObjectIdGetDatum(RelationGetRelid(relation)));
364         ScanKeyInit(&skey[1],
365                                 Anum_pg_attribute_attnum,
366                                 BTGreaterStrategyNumber, F_INT2GT,
367                                 Int16GetDatum(0));
368
369         /*
370          * Open pg_attribute and begin a scan.  Force heap scan if we haven't
371          * yet built the critical relcache entries (this includes initdb and
372          * startup without a pg_internal.init file).
373          */
374         pg_attribute_desc = heap_open(AttributeRelationId, AccessShareLock);
375         pg_attribute_scan = systable_beginscan(pg_attribute_desc,
376                                                                                    AttributeRelidNumIndexId,
377                                                                                    criticalRelcachesBuilt,
378                                                                                    SnapshotNow,
379                                                                                    2, skey);
380
381         /*
382          * add attribute data to relation->rd_att
383          */
384         need = relation->rd_rel->relnatts;
385
386         while (HeapTupleIsValid(pg_attribute_tuple = systable_getnext(pg_attribute_scan)))
387         {
388                 Form_pg_attribute attp;
389
390                 attp = (Form_pg_attribute) GETSTRUCT(pg_attribute_tuple);
391
392                 if (attp->attnum <= 0 ||
393                         attp->attnum > relation->rd_rel->relnatts)
394                         elog(ERROR, "invalid attribute number %d for %s",
395                                  attp->attnum, RelationGetRelationName(relation));
396
397                 memcpy(relation->rd_att->attrs[attp->attnum - 1],
398                            attp,
399                            ATTRIBUTE_TUPLE_SIZE);
400
401                 /* Update constraint/default info */
402                 if (attp->attnotnull)
403                         constr->has_not_null = true;
404
405                 if (attp->atthasdef)
406                 {
407                         if (attrdef == NULL)
408                                 attrdef = (AttrDefault *)
409                                         MemoryContextAllocZero(CacheMemoryContext,
410                                                                                    relation->rd_rel->relnatts *
411                                                                                    sizeof(AttrDefault));
412                         attrdef[ndef].adnum = attp->attnum;
413                         attrdef[ndef].adbin = NULL;
414                         ndef++;
415                 }
416                 need--;
417                 if (need == 0)
418                         break;
419         }
420
421         /*
422          * end the scan and close the attribute relation
423          */
424         systable_endscan(pg_attribute_scan);
425         heap_close(pg_attribute_desc, AccessShareLock);
426
427         if (need != 0)
428                 elog(ERROR, "catalog is missing %d attribute(s) for relid %u",
429                          need, RelationGetRelid(relation));
430
431         /*
432          * The attcacheoff values we read from pg_attribute should all be -1
433          * ("unknown").  Verify this if assert checking is on.  They will be
434          * computed when and if needed during tuple access.
435          */
436 #ifdef USE_ASSERT_CHECKING
437         {
438                 int                     i;
439
440                 for (i = 0; i < relation->rd_rel->relnatts; i++)
441                         Assert(relation->rd_att->attrs[i]->attcacheoff == -1);
442         }
443 #endif
444
445         /*
446          * However, we can easily set the attcacheoff value for the first
447          * attribute: it must be zero.  This eliminates the need for special
448          * cases for attnum=1 that used to exist in fastgetattr() and
449          * index_getattr().
450          */
451         if (relation->rd_rel->relnatts > 0)
452                 relation->rd_att->attrs[0]->attcacheoff = 0;
453
454         /*
455          * Set up constraint/default info
456          */
457         if (constr->has_not_null || ndef > 0 || relation->rd_rel->relchecks)
458         {
459                 relation->rd_att->constr = constr;
460
461                 if (ndef > 0)                   /* DEFAULTs */
462                 {
463                         if (ndef < relation->rd_rel->relnatts)
464                                 constr->defval = (AttrDefault *)
465                                         repalloc(attrdef, ndef * sizeof(AttrDefault));
466                         else
467                                 constr->defval = attrdef;
468                         constr->num_defval = ndef;
469                         AttrDefaultFetch(relation);
470                 }
471                 else
472                         constr->num_defval = 0;
473
474                 if (relation->rd_rel->relchecks > 0)    /* CHECKs */
475                 {
476                         constr->num_check = relation->rd_rel->relchecks;
477                         constr->check = (ConstrCheck *)
478                                 MemoryContextAllocZero(CacheMemoryContext,
479                                                                 constr->num_check * sizeof(ConstrCheck));
480                         CheckConstraintFetch(relation);
481                 }
482                 else
483                         constr->num_check = 0;
484         }
485         else
486         {
487                 pfree(constr);
488                 relation->rd_att->constr = NULL;
489         }
490 }
491
492 /*
493  *              RelationBuildRuleLock
494  *
495  *              Form the relation's rewrite rules from information in
496  *              the pg_rewrite system catalog.
497  *
498  * Note: The rule parsetrees are potentially very complex node structures.
499  * To allow these trees to be freed when the relcache entry is flushed,
500  * we make a private memory context to hold the RuleLock information for
501  * each relcache entry that has associated rules.  The context is used
502  * just for rule info, not for any other subsidiary data of the relcache
503  * entry, because that keeps the update logic in RelationClearRelation()
504  * manageable.  The other subsidiary data structures are simple enough
505  * to be easy to free explicitly, anyway.
506  */
507 static void
508 RelationBuildRuleLock(Relation relation)
509 {
510         MemoryContext rulescxt;
511         MemoryContext oldcxt;
512         HeapTuple       rewrite_tuple;
513         Relation        rewrite_desc;
514         TupleDesc       rewrite_tupdesc;
515         SysScanDesc rewrite_scan;
516         ScanKeyData key;
517         RuleLock   *rulelock;
518         int                     numlocks;
519         RewriteRule **rules;
520         int                     maxlocks;
521
522         /*
523          * Make the private context.  Parameters are set on the assumption
524          * that it'll probably not contain much data.
525          */
526         rulescxt = AllocSetContextCreate(CacheMemoryContext,
527                                                                          RelationGetRelationName(relation),
528                                                                          ALLOCSET_SMALL_MINSIZE,
529                                                                          ALLOCSET_SMALL_INITSIZE,
530                                                                          ALLOCSET_SMALL_MAXSIZE);
531         relation->rd_rulescxt = rulescxt;
532
533         /*
534          * allocate an array to hold the rewrite rules (the array is extended
535          * if necessary)
536          */
537         maxlocks = 4;
538         rules = (RewriteRule **)
539                 MemoryContextAlloc(rulescxt, sizeof(RewriteRule *) * maxlocks);
540         numlocks = 0;
541
542         /*
543          * form a scan key
544          */
545         ScanKeyInit(&key,
546                                 Anum_pg_rewrite_ev_class,
547                                 BTEqualStrategyNumber, F_OIDEQ,
548                                 ObjectIdGetDatum(RelationGetRelid(relation)));
549
550         /*
551          * open pg_rewrite and begin a scan
552          *
553          * Note: since we scan the rules using RewriteRelRulenameIndexId,
554          * we will be reading the rules in name order, except possibly during
555          * emergency-recovery operations (ie, IsIgnoringSystemIndexes). This
556          * in turn ensures that rules will be fired in name order.
557          */
558         rewrite_desc = heap_open(RewriteRelationId, AccessShareLock);
559         rewrite_tupdesc = RelationGetDescr(rewrite_desc);
560         rewrite_scan = systable_beginscan(rewrite_desc,
561                                                                           RewriteRelRulenameIndexId,
562                                                                           true, SnapshotNow,
563                                                                           1, &key);
564
565         while (HeapTupleIsValid(rewrite_tuple = systable_getnext(rewrite_scan)))
566         {
567                 Form_pg_rewrite rewrite_form = (Form_pg_rewrite) GETSTRUCT(rewrite_tuple);
568                 bool            isnull;
569                 Datum           ruleaction;
570                 Datum           rule_evqual;
571                 char       *ruleaction_str;
572                 char       *rule_evqual_str;
573                 RewriteRule *rule;
574
575                 rule = (RewriteRule *) MemoryContextAlloc(rulescxt,
576                                                                                                   sizeof(RewriteRule));
577
578                 rule->ruleId = HeapTupleGetOid(rewrite_tuple);
579
580                 rule->event = rewrite_form->ev_type - '0';
581                 rule->attrno = rewrite_form->ev_attr;
582                 rule->isInstead = rewrite_form->is_instead;
583
584                 /* Must use heap_getattr to fetch ev_qual and ev_action */
585
586                 ruleaction = heap_getattr(rewrite_tuple,
587                                                                   Anum_pg_rewrite_ev_action,
588                                                                   rewrite_tupdesc,
589                                                                   &isnull);
590                 Assert(!isnull);
591                 ruleaction_str = DatumGetCString(DirectFunctionCall1(textout,
592                                                                                                                          ruleaction));
593                 oldcxt = MemoryContextSwitchTo(rulescxt);
594                 rule->actions = (List *) stringToNode(ruleaction_str);
595                 MemoryContextSwitchTo(oldcxt);
596                 pfree(ruleaction_str);
597
598                 rule_evqual = heap_getattr(rewrite_tuple,
599                                                                    Anum_pg_rewrite_ev_qual,
600                                                                    rewrite_tupdesc,
601                                                                    &isnull);
602                 Assert(!isnull);
603                 rule_evqual_str = DatumGetCString(DirectFunctionCall1(textout,
604                                                                                                                    rule_evqual));
605                 oldcxt = MemoryContextSwitchTo(rulescxt);
606                 rule->qual = (Node *) stringToNode(rule_evqual_str);
607                 MemoryContextSwitchTo(oldcxt);
608                 pfree(rule_evqual_str);
609
610                 if (numlocks >= maxlocks)
611                 {
612                         maxlocks *= 2;
613                         rules = (RewriteRule **)
614                                 repalloc(rules, sizeof(RewriteRule *) * maxlocks);
615                 }
616                 rules[numlocks++] = rule;
617         }
618
619         /*
620          * end the scan and close the attribute relation
621          */
622         systable_endscan(rewrite_scan);
623         heap_close(rewrite_desc, AccessShareLock);
624
625         /*
626          * form a RuleLock and insert into relation
627          */
628         rulelock = (RuleLock *) MemoryContextAlloc(rulescxt, sizeof(RuleLock));
629         rulelock->numLocks = numlocks;
630         rulelock->rules = rules;
631
632         relation->rd_rules = rulelock;
633 }
634
635 /*
636  *              equalRuleLocks
637  *
638  *              Determine whether two RuleLocks are equivalent
639  *
640  *              Probably this should be in the rules code someplace...
641  */
642 static bool
643 equalRuleLocks(RuleLock *rlock1, RuleLock *rlock2)
644 {
645         int                     i;
646
647         /*
648          * As of 7.3 we assume the rule ordering is repeatable, because
649          * RelationBuildRuleLock should read 'em in a consistent order.  So
650          * just compare corresponding slots.
651          */
652         if (rlock1 != NULL)
653         {
654                 if (rlock2 == NULL)
655                         return false;
656                 if (rlock1->numLocks != rlock2->numLocks)
657                         return false;
658                 for (i = 0; i < rlock1->numLocks; i++)
659                 {
660                         RewriteRule *rule1 = rlock1->rules[i];
661                         RewriteRule *rule2 = rlock2->rules[i];
662
663                         if (rule1->ruleId != rule2->ruleId)
664                                 return false;
665                         if (rule1->event != rule2->event)
666                                 return false;
667                         if (rule1->attrno != rule2->attrno)
668                                 return false;
669                         if (rule1->isInstead != rule2->isInstead)
670                                 return false;
671                         if (!equal(rule1->qual, rule2->qual))
672                                 return false;
673                         if (!equal(rule1->actions, rule2->actions))
674                                 return false;
675                 }
676         }
677         else if (rlock2 != NULL)
678                 return false;
679         return true;
680 }
681
682
683 /* ----------------------------------
684  *              RelationBuildDesc
685  *
686  *              Build a relation descriptor --- either a new one, or by
687  *              recycling the given old relation object.  The latter case
688  *              supports rebuilding a relcache entry without invalidating
689  *              pointers to it.
690  * --------------------------------
691  */
692 static Relation
693 RelationBuildDesc(Oid targetRelId, Relation oldrelation)
694 {
695         Relation        relation;
696         Oid                     relid;
697         HeapTuple       pg_class_tuple;
698         Form_pg_class relp;
699         MemoryContext oldcxt;
700
701         /*
702          * find the tuple in pg_class corresponding to the given relation id
703          */
704         pg_class_tuple = ScanPgRelation(targetRelId, true);
705
706         /*
707          * if no such tuple exists, return NULL
708          */
709         if (!HeapTupleIsValid(pg_class_tuple))
710                 return NULL;
711
712         /*
713          * get information from the pg_class_tuple
714          */
715         relid = HeapTupleGetOid(pg_class_tuple);
716         relp = (Form_pg_class) GETSTRUCT(pg_class_tuple);
717
718         /*
719          * allocate storage for the relation descriptor, and copy
720          * pg_class_tuple to relation->rd_rel.
721          */
722         relation = AllocateRelationDesc(oldrelation, relp);
723
724         /*
725          * now we can free the memory allocated for pg_class_tuple
726          */
727         heap_freetuple(pg_class_tuple);
728
729         /*
730          * initialize the relation's relation id (relation->rd_id)
731          */
732         RelationGetRelid(relation) = relid;
733
734         /*
735          * normal relations are not nailed into the cache; nor can a
736          * pre-existing relation be new.  It could be temp though.      (Actually,
737          * it could be new too, but it's okay to forget that fact if forced to
738          * flush the entry.)
739          */
740         relation->rd_refcnt = 0;
741         relation->rd_isnailed = false;
742         relation->rd_createSubid = InvalidSubTransactionId;
743         relation->rd_istemp = isTempNamespace(relation->rd_rel->relnamespace);
744
745         /*
746          * initialize the tuple descriptor (relation->rd_att).
747          */
748         RelationBuildTupleDesc(relation);
749
750         /*
751          * Fetch rules and triggers that affect this relation
752          */
753         if (relation->rd_rel->relhasrules)
754                 RelationBuildRuleLock(relation);
755         else
756         {
757                 relation->rd_rules = NULL;
758                 relation->rd_rulescxt = NULL;
759         }
760
761         if (relation->rd_rel->reltriggers > 0)
762                 RelationBuildTriggers(relation);
763         else
764                 relation->trigdesc = NULL;
765
766         /*
767          * if it's an index, initialize index-related information
768          */
769         if (OidIsValid(relation->rd_rel->relam))
770                 RelationInitIndexAccessInfo(relation);
771
772         /*
773          * initialize the relation lock manager information
774          */
775         RelationInitLockInfo(relation);         /* see lmgr.c */
776
777         /*
778          * initialize physical addressing information for the relation
779          */
780         RelationInitPhysicalAddr(relation);
781
782         /* make sure relation is marked as having no open file yet */
783         relation->rd_smgr = NULL;
784
785         /*
786          * Insert newly created relation into relcache hash tables.
787          */
788         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
789         RelationCacheInsert(relation);
790         MemoryContextSwitchTo(oldcxt);
791
792         /* It's fully valid */
793         relation->rd_isvalid = true;
794
795         return relation;
796 }
797
798 /*
799  * Initialize the physical addressing info (RelFileNode) for a relcache entry
800  */
801 static void
802 RelationInitPhysicalAddr(Relation relation)
803 {
804         if (relation->rd_rel->reltablespace)
805                 relation->rd_node.spcNode = relation->rd_rel->reltablespace;
806         else
807                 relation->rd_node.spcNode = MyDatabaseTableSpace;
808         if (relation->rd_rel->relisshared)
809                 relation->rd_node.dbNode = InvalidOid;
810         else
811                 relation->rd_node.dbNode = MyDatabaseId;
812         relation->rd_node.relNode = relation->rd_rel->relfilenode;
813 }
814
815 /*
816  * Initialize index-access-method support data for an index relation
817  */
818 void
819 RelationInitIndexAccessInfo(Relation relation)
820 {
821         HeapTuple       tuple;
822         Form_pg_am      aform;
823         Datum           indclassDatum;
824         bool            isnull;
825         MemoryContext indexcxt;
826         MemoryContext oldcontext;
827         Oid                *operator;
828         RegProcedure *support;
829         FmgrInfo   *supportinfo;
830         int                     natts;
831         uint16          amstrategies;
832         uint16          amsupport;
833
834         /*
835          * Make a copy of the pg_index entry for the index.  Since pg_index
836          * contains variable-length and possibly-null fields, we have to do
837          * this honestly rather than just treating it as a Form_pg_index
838          * struct.
839          */
840         tuple = SearchSysCache(INDEXRELID,
841                                                    ObjectIdGetDatum(RelationGetRelid(relation)),
842                                                    0, 0, 0);
843         if (!HeapTupleIsValid(tuple))
844                 elog(ERROR, "cache lookup failed for index %u",
845                          RelationGetRelid(relation));
846         oldcontext = MemoryContextSwitchTo(CacheMemoryContext);
847         relation->rd_indextuple = heap_copytuple(tuple);
848         relation->rd_index = (Form_pg_index) GETSTRUCT(relation->rd_indextuple);
849         MemoryContextSwitchTo(oldcontext);
850         ReleaseSysCache(tuple);
851
852         /*
853          * indclass cannot be referenced directly through the C struct, because
854          * it is after the variable-width indkey field.  Therefore we extract
855          * the datum the hard way and provide a direct link in the relcache.
856          */
857         indclassDatum = fastgetattr(relation->rd_indextuple,
858                                                                 Anum_pg_index_indclass,
859                                                                 GetPgIndexDescriptor(),
860                                                                 &isnull);
861         Assert(!isnull);
862         relation->rd_indclass = (oidvector *) DatumGetPointer(indclassDatum);
863
864         /*
865          * Make a copy of the pg_am entry for the index's access method
866          */
867         tuple = SearchSysCache(AMOID,
868                                                    ObjectIdGetDatum(relation->rd_rel->relam),
869                                                    0, 0, 0);
870         if (!HeapTupleIsValid(tuple))
871                 elog(ERROR, "cache lookup failed for access method %u",
872                          relation->rd_rel->relam);
873         aform = (Form_pg_am) MemoryContextAlloc(CacheMemoryContext, sizeof *aform);
874         memcpy(aform, GETSTRUCT(tuple), sizeof *aform);
875         ReleaseSysCache(tuple);
876         relation->rd_am = aform;
877
878         natts = relation->rd_rel->relnatts;
879         if (natts != relation->rd_index->indnatts)
880                 elog(ERROR, "relnatts disagrees with indnatts for index %u",
881                          RelationGetRelid(relation));
882         amstrategies = aform->amstrategies;
883         amsupport = aform->amsupport;
884
885         /*
886          * Make the private context to hold index access info.  The reason we
887          * need a context, and not just a couple of pallocs, is so that we
888          * won't leak any subsidiary info attached to fmgr lookup records.
889          *
890          * Context parameters are set on the assumption that it'll probably not
891          * contain much data.
892          */
893         indexcxt = AllocSetContextCreate(CacheMemoryContext,
894                                                                          RelationGetRelationName(relation),
895                                                                          ALLOCSET_SMALL_MINSIZE,
896                                                                          ALLOCSET_SMALL_INITSIZE,
897                                                                          ALLOCSET_SMALL_MAXSIZE);
898         relation->rd_indexcxt = indexcxt;
899
900         /*
901          * Allocate arrays to hold data
902          */
903         relation->rd_aminfo = (RelationAmInfo *)
904                 MemoryContextAllocZero(indexcxt, sizeof(RelationAmInfo));
905
906         if (amstrategies > 0)
907                 operator = (Oid *)
908                         MemoryContextAllocZero(indexcxt,
909                                                                    natts * amstrategies * sizeof(Oid));
910         else
911                 operator = NULL;
912
913         if (amsupport > 0)
914         {
915                 int                     nsupport = natts * amsupport;
916
917                 support = (RegProcedure *)
918                         MemoryContextAllocZero(indexcxt, nsupport * sizeof(RegProcedure));
919                 supportinfo = (FmgrInfo *)
920                         MemoryContextAllocZero(indexcxt, nsupport * sizeof(FmgrInfo));
921         }
922         else
923         {
924                 support = NULL;
925                 supportinfo = NULL;
926         }
927
928         relation->rd_operator = operator;
929         relation->rd_support = support;
930         relation->rd_supportinfo = supportinfo;
931
932         /*
933          * Fill the operator and support procedure OID arrays.  (aminfo and
934          * supportinfo are left as zeroes, and are filled on-the-fly when used)
935          */
936         IndexSupportInitialize(relation->rd_indclass,
937                                                    operator, support,
938                                                    amstrategies, amsupport, natts);
939
940         /*
941          * expressions and predicate cache will be filled later
942          */
943         relation->rd_indexprs = NIL;
944         relation->rd_indpred = NIL;
945 }
946
947 /*
948  * IndexSupportInitialize
949  *              Initializes an index's cached opclass information,
950  *              given the index's pg_index.indclass entry.
951  *
952  * Data is returned into *indexOperator and *indexSupport, which are arrays
953  * allocated by the caller.
954  *
955  * The caller also passes maxStrategyNumber, maxSupportNumber, and
956  * maxAttributeNumber, since these indicate the size of the arrays
957  * it has allocated --- but in practice these numbers must always match
958  * those obtainable from the system catalog entries for the index and
959  * access method.
960  */
961 static void
962 IndexSupportInitialize(oidvector *indclass,
963                                            Oid *indexOperator,
964                                            RegProcedure *indexSupport,
965                                            StrategyNumber maxStrategyNumber,
966                                            StrategyNumber maxSupportNumber,
967                                            AttrNumber maxAttributeNumber)
968 {
969         int                     attIndex;
970
971         for (attIndex = 0; attIndex < maxAttributeNumber; attIndex++)
972         {
973                 OpClassCacheEnt *opcentry;
974
975                 if (!OidIsValid(indclass->values[attIndex]))
976                         elog(ERROR, "bogus pg_index tuple");
977
978                 /* look up the info for this opclass, using a cache */
979                 opcentry = LookupOpclassInfo(indclass->values[attIndex],
980                                                                          maxStrategyNumber,
981                                                                          maxSupportNumber);
982
983                 /* copy cached data into relcache entry */
984                 if (maxStrategyNumber > 0)
985                         memcpy(&indexOperator[attIndex * maxStrategyNumber],
986                                    opcentry->operatorOids,
987                                    maxStrategyNumber * sizeof(Oid));
988                 if (maxSupportNumber > 0)
989                         memcpy(&indexSupport[attIndex * maxSupportNumber],
990                                    opcentry->supportProcs,
991                                    maxSupportNumber * sizeof(RegProcedure));
992         }
993 }
994
995 /*
996  * LookupOpclassInfo
997  *
998  * This routine maintains a per-opclass cache of the information needed
999  * by IndexSupportInitialize().  This is more efficient than relying on
1000  * the catalog cache, because we can load all the info about a particular
1001  * opclass in a single indexscan of pg_amproc or pg_amop.
1002  *
1003  * The information from pg_am about expected range of strategy and support
1004  * numbers is passed in, rather than being looked up, mainly because the
1005  * caller will have it already.
1006  *
1007  * XXX There isn't any provision for flushing the cache.  However, there
1008  * isn't any provision for flushing relcache entries when opclass info
1009  * changes, either :-(
1010  */
1011 static OpClassCacheEnt *
1012 LookupOpclassInfo(Oid operatorClassOid,
1013                                   StrategyNumber numStrats,
1014                                   StrategyNumber numSupport)
1015 {
1016         OpClassCacheEnt *opcentry;
1017         bool            found;
1018         Relation        rel;
1019         SysScanDesc scan;
1020         ScanKeyData skey[2];
1021         HeapTuple       htup;
1022         bool            indexOK;
1023
1024         if (OpClassCache == NULL)
1025         {
1026                 /* First time through: initialize the opclass cache */
1027                 HASHCTL         ctl;
1028
1029                 if (!CacheMemoryContext)
1030                         CreateCacheMemoryContext();
1031
1032                 MemSet(&ctl, 0, sizeof(ctl));
1033                 ctl.keysize = sizeof(Oid);
1034                 ctl.entrysize = sizeof(OpClassCacheEnt);
1035                 ctl.hash = oid_hash;
1036                 OpClassCache = hash_create("Operator class cache", 64,
1037                                                                    &ctl, HASH_ELEM | HASH_FUNCTION);
1038         }
1039
1040         opcentry = (OpClassCacheEnt *) hash_search(OpClassCache,
1041                                                                                            (void *) &operatorClassOid,
1042                                                                                            HASH_ENTER, &found);
1043
1044         if (found && opcentry->valid)
1045         {
1046                 /* Already made an entry for it */
1047                 Assert(numStrats == opcentry->numStrats);
1048                 Assert(numSupport == opcentry->numSupport);
1049                 return opcentry;
1050         }
1051
1052         /* Need to fill in new entry */
1053         opcentry->valid = false;        /* until known OK */
1054         opcentry->numStrats = numStrats;
1055         opcentry->numSupport = numSupport;
1056
1057         if (numStrats > 0)
1058                 opcentry->operatorOids = (Oid *)
1059                         MemoryContextAllocZero(CacheMemoryContext,
1060                                                                    numStrats * sizeof(Oid));
1061         else
1062                 opcentry->operatorOids = NULL;
1063
1064         if (numSupport > 0)
1065                 opcentry->supportProcs = (RegProcedure *)
1066                         MemoryContextAllocZero(CacheMemoryContext,
1067                                                                    numSupport * sizeof(RegProcedure));
1068         else
1069                 opcentry->supportProcs = NULL;
1070
1071         /*
1072          * To avoid infinite recursion during startup, force heap scans if
1073          * we're looking up info for the opclasses used by the indexes we
1074          * would like to reference here.
1075          */
1076         indexOK = criticalRelcachesBuilt ||
1077                 (operatorClassOid != OID_BTREE_OPS_OID &&
1078                  operatorClassOid != INT2_BTREE_OPS_OID);
1079
1080         /*
1081          * Scan pg_amop to obtain operators for the opclass.  We only fetch
1082          * the default ones (those with subtype zero).
1083          */
1084         if (numStrats > 0)
1085         {
1086                 ScanKeyInit(&skey[0],
1087                                         Anum_pg_amop_amopclaid,
1088                                         BTEqualStrategyNumber, F_OIDEQ,
1089                                         ObjectIdGetDatum(operatorClassOid));
1090                 ScanKeyInit(&skey[1],
1091                                         Anum_pg_amop_amopsubtype,
1092                                         BTEqualStrategyNumber, F_OIDEQ,
1093                                         ObjectIdGetDatum(InvalidOid));
1094                 rel = heap_open(AccessMethodOperatorRelationId, AccessShareLock);
1095                 scan = systable_beginscan(rel, AccessMethodStrategyIndexId, indexOK,
1096                                                                   SnapshotNow, 2, skey);
1097
1098                 while (HeapTupleIsValid(htup = systable_getnext(scan)))
1099                 {
1100                         Form_pg_amop amopform = (Form_pg_amop) GETSTRUCT(htup);
1101
1102                         if (amopform->amopstrategy <= 0 ||
1103                                 (StrategyNumber) amopform->amopstrategy > numStrats)
1104                                 elog(ERROR, "invalid amopstrategy number %d for opclass %u",
1105                                          amopform->amopstrategy, operatorClassOid);
1106                         opcentry->operatorOids[amopform->amopstrategy - 1] =
1107                                 amopform->amopopr;
1108                 }
1109
1110                 systable_endscan(scan);
1111                 heap_close(rel, AccessShareLock);
1112         }
1113
1114         /*
1115          * Scan pg_amproc to obtain support procs for the opclass.      We only
1116          * fetch the default ones (those with subtype zero).
1117          */
1118         if (numSupport > 0)
1119         {
1120                 ScanKeyInit(&skey[0],
1121                                         Anum_pg_amproc_amopclaid,
1122                                         BTEqualStrategyNumber, F_OIDEQ,
1123                                         ObjectIdGetDatum(operatorClassOid));
1124                 ScanKeyInit(&skey[1],
1125                                         Anum_pg_amproc_amprocsubtype,
1126                                         BTEqualStrategyNumber, F_OIDEQ,
1127                                         ObjectIdGetDatum(InvalidOid));
1128                 rel = heap_open(AccessMethodProcedureRelationId, AccessShareLock);
1129                 scan = systable_beginscan(rel, AccessMethodProcedureIndexId, indexOK,
1130                                                                   SnapshotNow, 2, skey);
1131
1132                 while (HeapTupleIsValid(htup = systable_getnext(scan)))
1133                 {
1134                         Form_pg_amproc amprocform = (Form_pg_amproc) GETSTRUCT(htup);
1135
1136                         if (amprocform->amprocnum <= 0 ||
1137                                 (StrategyNumber) amprocform->amprocnum > numSupport)
1138                                 elog(ERROR, "invalid amproc number %d for opclass %u",
1139                                          amprocform->amprocnum, operatorClassOid);
1140
1141                         opcentry->supportProcs[amprocform->amprocnum - 1] =
1142                                 amprocform->amproc;
1143                 }
1144
1145                 systable_endscan(scan);
1146                 heap_close(rel, AccessShareLock);
1147         }
1148
1149         opcentry->valid = true;
1150         return opcentry;
1151 }
1152
1153
1154 /*
1155  *              formrdesc
1156  *
1157  *              This is a special cut-down version of RelationBuildDesc()
1158  *              used by RelationCacheInitialize() in initializing the relcache.
1159  *              The relation descriptor is built just from the supplied parameters,
1160  *              without actually looking at any system table entries.  We cheat
1161  *              quite a lot since we only need to work for a few basic system
1162  *              catalogs.
1163  *
1164  * formrdesc is currently used for: pg_class, pg_attribute, pg_proc,
1165  * and pg_type (see RelationCacheInitialize).
1166  *
1167  * Note that these catalogs can't have constraints (except attnotnull),
1168  * default values, rules, or triggers, since we don't cope with any of that.
1169  *
1170  * NOTE: we assume we are already switched into CacheMemoryContext.
1171  */
1172 static void
1173 formrdesc(const char *relationName, Oid relationReltype,
1174                   bool hasoids, int natts, FormData_pg_attribute *att)
1175 {
1176         Relation        relation;
1177         int                     i;
1178         bool            has_not_null;
1179
1180         /*
1181          * allocate new relation desc, clear all fields of reldesc
1182          */
1183         relation = (Relation) palloc0(sizeof(RelationData));
1184         relation->rd_targblock = InvalidBlockNumber;
1185
1186         /* make sure relation is marked as having no open file yet */
1187         relation->rd_smgr = NULL;
1188
1189         /*
1190          * initialize reference count: 1 because it is nailed in cache
1191          */
1192         relation->rd_refcnt = 1;
1193
1194         /*
1195          * all entries built with this routine are nailed-in-cache; none are
1196          * for new or temp relations.
1197          */
1198         relation->rd_isnailed = true;
1199         relation->rd_createSubid = InvalidSubTransactionId;
1200         relation->rd_istemp = false;
1201
1202         /*
1203          * initialize relation tuple form
1204          *
1205          * The data we insert here is pretty incomplete/bogus, but it'll serve to
1206          * get us launched.  RelationCacheInitializePhase2() will read the
1207          * real data from pg_class and replace what we've done here.
1208          */
1209         relation->rd_rel = (Form_pg_class) palloc0(CLASS_TUPLE_SIZE);
1210
1211         namestrcpy(&relation->rd_rel->relname, relationName);
1212         relation->rd_rel->relnamespace = PG_CATALOG_NAMESPACE;
1213         relation->rd_rel->reltype = relationReltype;
1214
1215         /*
1216          * It's important to distinguish between shared and non-shared
1217          * relations, even at bootstrap time, to make sure we know where they
1218          * are stored.  At present, all relations that formrdesc is used for
1219          * are not shared.
1220          */
1221         relation->rd_rel->relisshared = false;
1222
1223         relation->rd_rel->relpages = 1;
1224         relation->rd_rel->reltuples = 1;
1225         relation->rd_rel->relkind = RELKIND_RELATION;
1226         relation->rd_rel->relhasoids = hasoids;
1227         relation->rd_rel->relnatts = (int16) natts;
1228
1229         /*
1230          * initialize attribute tuple form
1231          *
1232          * Unlike the case with the relation tuple, this data had better be right
1233          * because it will never be replaced.  The input values must be
1234          * correctly defined by macros in src/include/catalog/ headers.
1235          */
1236         relation->rd_att = CreateTemplateTupleDesc(natts, hasoids);
1237         relation->rd_att->tdtypeid = relationReltype;
1238         relation->rd_att->tdtypmod = -1;        /* unnecessary, but... */
1239
1240         /*
1241          * initialize tuple desc info
1242          */
1243         has_not_null = false;
1244         for (i = 0; i < natts; i++)
1245         {
1246                 memcpy(relation->rd_att->attrs[i],
1247                            &att[i],
1248                            ATTRIBUTE_TUPLE_SIZE);
1249                 has_not_null |= att[i].attnotnull;
1250                 /* make sure attcacheoff is valid */
1251                 relation->rd_att->attrs[i]->attcacheoff = -1;
1252         }
1253
1254         /* initialize first attribute's attcacheoff, cf RelationBuildTupleDesc */
1255         relation->rd_att->attrs[0]->attcacheoff = 0;
1256
1257         /* mark not-null status */
1258         if (has_not_null)
1259         {
1260                 TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));
1261
1262                 constr->has_not_null = true;
1263                 relation->rd_att->constr = constr;
1264         }
1265
1266         /*
1267          * initialize relation id from info in att array (my, this is ugly)
1268          */
1269         RelationGetRelid(relation) = relation->rd_att->attrs[0]->attrelid;
1270         relation->rd_rel->relfilenode = RelationGetRelid(relation);
1271
1272         /*
1273          * initialize the relation lock manager information
1274          */
1275         RelationInitLockInfo(relation);         /* see lmgr.c */
1276
1277         /*
1278          * initialize physical addressing information for the relation
1279          */
1280         RelationInitPhysicalAddr(relation);
1281
1282         /*
1283          * initialize the rel-has-index flag, using hardwired knowledge
1284          */
1285         if (IsBootstrapProcessingMode())
1286         {
1287                 /* In bootstrap mode, we have no indexes */
1288                 relation->rd_rel->relhasindex = false;
1289         }
1290         else
1291         {
1292                 /* Otherwise, all the rels formrdesc is used for have indexes */
1293                 relation->rd_rel->relhasindex = true;
1294         }
1295
1296         /*
1297          * add new reldesc to relcache
1298          */
1299         RelationCacheInsert(relation);
1300
1301         /* It's fully valid */
1302         relation->rd_isvalid = true;
1303 }
1304
1305
1306 /* ----------------------------------------------------------------
1307  *                               Relation Descriptor Lookup Interface
1308  * ----------------------------------------------------------------
1309  */
1310
1311 /*
1312  *              RelationIdCacheGetRelation
1313  *
1314  *              Lookup an existing reldesc by OID.
1315  *
1316  *              Only try to get the reldesc by looking in the cache,
1317  *              do not go to the disk if it's not present.
1318  *
1319  *              NB: relation ref count is incremented if successful.
1320  *              Caller should eventually decrement count.  (Usually,
1321  *              that happens by calling RelationClose().)
1322  */
1323 Relation
1324 RelationIdCacheGetRelation(Oid relationId)
1325 {
1326         Relation        rd;
1327
1328         RelationIdCacheLookup(relationId, rd);
1329
1330         if (RelationIsValid(rd))
1331         {
1332                 RelationIncrementReferenceCount(rd);
1333                 /* revalidate nailed index if necessary */
1334                 if (!rd->rd_isvalid)
1335                         RelationReloadClassinfo(rd);
1336         }
1337
1338         return rd;
1339 }
1340
1341 /*
1342  *              RelationIdGetRelation
1343  *
1344  *              Lookup a reldesc by OID; make one if not already in cache.
1345  *
1346  *              NB: relation ref count is incremented, or set to 1 if new entry.
1347  *              Caller should eventually decrement count.  (Usually,
1348  *              that happens by calling RelationClose().)
1349  */
1350 Relation
1351 RelationIdGetRelation(Oid relationId)
1352 {
1353         Relation        rd;
1354
1355         /*
1356          * first try and get a reldesc from the cache
1357          */
1358         rd = RelationIdCacheGetRelation(relationId);
1359         if (RelationIsValid(rd))
1360                 return rd;
1361
1362         /*
1363          * no reldesc in the cache, so have RelationBuildDesc() build one and
1364          * add it.
1365          */
1366         rd = RelationBuildDesc(relationId, NULL);
1367         if (RelationIsValid(rd))
1368                 RelationIncrementReferenceCount(rd);
1369         return rd;
1370 }
1371
1372 /* ----------------------------------------------------------------
1373  *                              cache invalidation support routines
1374  * ----------------------------------------------------------------
1375  */
1376
1377 /*
1378  * RelationIncrementReferenceCount
1379  *              Increments relation reference count.
1380  *
1381  * Note: bootstrap mode has its own weird ideas about relation refcount
1382  * behavior; we ought to fix it someday, but for now, just disable
1383  * reference count ownership tracking in bootstrap mode.
1384  */
1385 void
1386 RelationIncrementReferenceCount(Relation rel)
1387 {
1388         ResourceOwnerEnlargeRelationRefs(CurrentResourceOwner);
1389         rel->rd_refcnt += 1;
1390         if (!IsBootstrapProcessingMode())
1391                 ResourceOwnerRememberRelationRef(CurrentResourceOwner, rel);
1392 }
1393
1394 /*
1395  * RelationDecrementReferenceCount
1396  *              Decrements relation reference count.
1397  */
1398 void
1399 RelationDecrementReferenceCount(Relation rel)
1400 {
1401         Assert(rel->rd_refcnt > 0);
1402         rel->rd_refcnt -= 1;
1403         if (!IsBootstrapProcessingMode())
1404                 ResourceOwnerForgetRelationRef(CurrentResourceOwner, rel);
1405 }
1406
1407 /*
1408  * RelationClose - close an open relation
1409  *
1410  *      Actually, we just decrement the refcount.
1411  *
1412  *      NOTE: if compiled with -DRELCACHE_FORCE_RELEASE then relcache entries
1413  *      will be freed as soon as their refcount goes to zero.  In combination
1414  *      with aset.c's CLOBBER_FREED_MEMORY option, this provides a good test
1415  *      to catch references to already-released relcache entries.  It slows
1416  *      things down quite a bit, however.
1417  */
1418 void
1419 RelationClose(Relation relation)
1420 {
1421         /* Note: no locking manipulations needed */
1422         RelationDecrementReferenceCount(relation);
1423
1424 #ifdef RELCACHE_FORCE_RELEASE
1425         if (RelationHasReferenceCountZero(relation) &&
1426                 relation->rd_createSubid == InvalidSubTransactionId)
1427                 RelationClearRelation(relation, false);
1428 #endif
1429 }
1430
1431 /*
1432  * RelationReloadClassinfo - reload the pg_class row (only)
1433  *
1434  *      This function is used only for nailed indexes.  Since a REINDEX can
1435  *      change the relfilenode value for a nailed index, we have to reread
1436  *      the pg_class row anytime we get an SI invalidation on a nailed index
1437  *      (without throwing away the whole relcache entry, since we'd be unable
1438  *      to rebuild it).
1439  *
1440  *      We can't necessarily reread the pg_class row right away; we might be
1441  *      in a failed transaction when we receive the SI notification.  If so,
1442  *      RelationClearRelation just marks the entry as invalid by setting
1443  *      rd_isvalid to false.  This routine is called to fix the entry when it
1444  *      is next needed.
1445  */
1446 static void
1447 RelationReloadClassinfo(Relation relation)
1448 {
1449         bool            indexOK;
1450         HeapTuple       pg_class_tuple;
1451         Form_pg_class relp;
1452
1453         /* Should be called only for invalidated nailed indexes */
1454         Assert(relation->rd_isnailed && !relation->rd_isvalid &&
1455                    relation->rd_rel->relkind == RELKIND_INDEX);
1456         /*
1457          * Read the pg_class row
1458          *
1459          * Don't try to use an indexscan of pg_class_oid_index to reload the
1460          * info for pg_class_oid_index ...
1461          */
1462         indexOK = (RelationGetRelid(relation) != ClassOidIndexId);
1463         pg_class_tuple = ScanPgRelation(RelationGetRelid(relation), indexOK);
1464         if (!HeapTupleIsValid(pg_class_tuple))
1465                 elog(ERROR, "could not find tuple for system relation %u",
1466                          RelationGetRelid(relation));
1467         relp = (Form_pg_class) GETSTRUCT(pg_class_tuple);
1468         memcpy((char *) relation->rd_rel, (char *) relp, CLASS_TUPLE_SIZE);
1469         /* Now we can recalculate physical address */
1470         RelationInitPhysicalAddr(relation);
1471         heap_freetuple(pg_class_tuple);
1472         relation->rd_targblock = InvalidBlockNumber;
1473         /* Okay, now it's valid again */
1474         relation->rd_isvalid = true;
1475 }
1476
1477 /*
1478  * RelationClearRelation
1479  *
1480  *       Physically blow away a relation cache entry, or reset it and rebuild
1481  *       it from scratch (that is, from catalog entries).  The latter path is
1482  *       usually used when we are notified of a change to an open relation
1483  *       (one with refcount > 0).  However, this routine just does whichever
1484  *       it's told to do; callers must determine which they want.
1485  */
1486 static void
1487 RelationClearRelation(Relation relation, bool rebuild)
1488 {
1489         Oid                     old_reltype = relation->rd_rel->reltype;
1490         MemoryContext oldcxt;
1491
1492         /*
1493          * Make sure smgr and lower levels close the relation's files, if they
1494          * weren't closed already.  If the relation is not getting deleted,
1495          * the next smgr access should reopen the files automatically.  This
1496          * ensures that the low-level file access state is updated after, say,
1497          * a vacuum truncation.
1498          */
1499         RelationCloseSmgr(relation);
1500
1501         /*
1502          * Never, never ever blow away a nailed-in system relation, because
1503          * we'd be unable to recover.  However, we must reset rd_targblock, in
1504          * case we got called because of a relation cache flush that was
1505          * triggered by VACUUM.
1506          *
1507          * If it's a nailed index, then we need to re-read the pg_class row to
1508          * see if its relfilenode changed.      We can't necessarily do that here,
1509          * because we might be in a failed transaction.  We assume it's okay
1510          * to do it if there are open references to the relcache entry (cf
1511          * notes for AtEOXact_RelationCache).  Otherwise just mark the entry
1512          * as possibly invalid, and it'll be fixed when next opened.
1513          */
1514         if (relation->rd_isnailed)
1515         {
1516                 relation->rd_targblock = InvalidBlockNumber;
1517                 if (relation->rd_rel->relkind == RELKIND_INDEX)
1518                 {
1519                         relation->rd_isvalid = false;           /* needs to be revalidated */
1520                         if (relation->rd_refcnt > 1)
1521                                 RelationReloadClassinfo(relation);
1522                 }
1523                 return;
1524         }
1525
1526         /*
1527          * Remove relation from hash tables
1528          *
1529          * Note: we might be reinserting it momentarily, but we must not have it
1530          * visible in the hash tables until it's valid again, so don't try to
1531          * optimize this away...
1532          */
1533         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
1534         RelationCacheDelete(relation);
1535         MemoryContextSwitchTo(oldcxt);
1536
1537         /* Clear out catcache's entries for this relation */
1538         CatalogCacheFlushRelation(RelationGetRelid(relation));
1539
1540         /*
1541          * Free all the subsidiary data structures of the relcache entry. We
1542          * cannot free rd_att if we are trying to rebuild the entry, however,
1543          * because pointers to it may be cached in various places. The rule
1544          * manager might also have pointers into the rewrite rules. So to
1545          * begin with, we can only get rid of these fields:
1546          */
1547         FreeTriggerDesc(relation->trigdesc);
1548         if (relation->rd_indextuple)
1549                 pfree(relation->rd_indextuple);
1550         if (relation->rd_am)
1551                 pfree(relation->rd_am);
1552         if (relation->rd_rel)
1553                 pfree(relation->rd_rel);
1554         list_free(relation->rd_indexlist);
1555         if (relation->rd_indexcxt)
1556                 MemoryContextDelete(relation->rd_indexcxt);
1557
1558         /*
1559          * If we're really done with the relcache entry, blow it away. But if
1560          * someone is still using it, reconstruct the whole deal without
1561          * moving the physical RelationData record (so that the someone's
1562          * pointer is still valid).
1563          */
1564         if (!rebuild)
1565         {
1566                 /* ok to zap remaining substructure */
1567                 flush_rowtype_cache(old_reltype);
1568                 FreeTupleDesc(relation->rd_att);
1569                 if (relation->rd_rulescxt)
1570                         MemoryContextDelete(relation->rd_rulescxt);
1571                 pfree(relation);
1572         }
1573         else
1574         {
1575                 /*
1576                  * When rebuilding an open relcache entry, must preserve ref count
1577                  * and rd_createSubid state.  Also attempt to preserve the
1578                  * tupledesc and rewrite-rule substructures in place.
1579                  *
1580                  * Note that this process does not touch CurrentResourceOwner; which
1581                  * is good because whatever ref counts the entry may have do not
1582                  * necessarily belong to that resource owner.
1583                  */
1584                 Oid                     save_relid = RelationGetRelid(relation);
1585                 int                     old_refcnt = relation->rd_refcnt;
1586                 SubTransactionId old_createSubid = relation->rd_createSubid;
1587                 TupleDesc       old_att = relation->rd_att;
1588                 RuleLock   *old_rules = relation->rd_rules;
1589                 MemoryContext old_rulescxt = relation->rd_rulescxt;
1590
1591                 if (RelationBuildDesc(save_relid, relation) != relation)
1592                 {
1593                         /* Should only get here if relation was deleted */
1594                         flush_rowtype_cache(old_reltype);
1595                         FreeTupleDesc(old_att);
1596                         if (old_rulescxt)
1597                                 MemoryContextDelete(old_rulescxt);
1598                         pfree(relation);
1599                         elog(ERROR, "relation %u deleted while still in use", save_relid);
1600                 }
1601                 relation->rd_refcnt = old_refcnt;
1602                 relation->rd_createSubid = old_createSubid;
1603                 if (equalTupleDescs(old_att, relation->rd_att))
1604                 {
1605                         /* needn't flush typcache here */
1606                         FreeTupleDesc(relation->rd_att);
1607                         relation->rd_att = old_att;
1608                 }
1609                 else
1610                 {
1611                         flush_rowtype_cache(old_reltype);
1612                         FreeTupleDesc(old_att);
1613                 }
1614                 if (equalRuleLocks(old_rules, relation->rd_rules))
1615                 {
1616                         if (relation->rd_rulescxt)
1617                                 MemoryContextDelete(relation->rd_rulescxt);
1618                         relation->rd_rules = old_rules;
1619                         relation->rd_rulescxt = old_rulescxt;
1620                 }
1621                 else
1622                 {
1623                         if (old_rulescxt)
1624                                 MemoryContextDelete(old_rulescxt);
1625                 }
1626         }
1627 }
1628
1629 /*
1630  * RelationFlushRelation
1631  *
1632  *       Rebuild the relation if it is open (refcount > 0), else blow it away.
1633  */
1634 static void
1635 RelationFlushRelation(Relation relation)
1636 {
1637         bool            rebuild;
1638
1639         if (relation->rd_createSubid != InvalidSubTransactionId)
1640         {
1641                 /*
1642                  * New relcache entries are always rebuilt, not flushed; else we'd
1643                  * forget the "new" status of the relation, which is a useful
1644                  * optimization to have.
1645                  */
1646                 rebuild = true;
1647         }
1648         else
1649         {
1650                 /*
1651                  * Pre-existing rels can be dropped from the relcache if not open.
1652                  */
1653                 rebuild = !RelationHasReferenceCountZero(relation);
1654         }
1655
1656         RelationClearRelation(relation, rebuild);
1657 }
1658
1659 /*
1660  * RelationForgetRelation - unconditionally remove a relcache entry
1661  *
1662  *                 External interface for destroying a relcache entry when we
1663  *                 drop the relation.
1664  */
1665 void
1666 RelationForgetRelation(Oid rid)
1667 {
1668         Relation        relation;
1669
1670         RelationIdCacheLookup(rid, relation);
1671
1672         if (!PointerIsValid(relation))
1673                 return;                                 /* not in cache, nothing to do */
1674
1675         if (!RelationHasReferenceCountZero(relation))
1676                 elog(ERROR, "relation %u is still open", rid);
1677
1678         /* Unconditionally destroy the relcache entry */
1679         RelationClearRelation(relation, false);
1680 }
1681
1682 /*
1683  *              RelationCacheInvalidateEntry
1684  *
1685  *              This routine is invoked for SI cache flush messages.
1686  *
1687  * Any relcache entry matching the relid must be flushed.  (Note: caller has
1688  * already determined that the relid belongs to our database or is a shared
1689  * relation.)
1690  *
1691  * We used to skip local relations, on the grounds that they could
1692  * not be targets of cross-backend SI update messages; but it seems
1693  * safer to process them, so that our *own* SI update messages will
1694  * have the same effects during CommandCounterIncrement for both
1695  * local and nonlocal relations.
1696  */
1697 void
1698 RelationCacheInvalidateEntry(Oid relationId)
1699 {
1700         Relation        relation;
1701
1702         RelationIdCacheLookup(relationId, relation);
1703
1704         if (PointerIsValid(relation))
1705         {
1706                 relcacheInvalsReceived++;
1707                 RelationFlushRelation(relation);
1708         }
1709 }
1710
1711 /*
1712  * RelationCacheInvalidate
1713  *       Blow away cached relation descriptors that have zero reference counts,
1714  *       and rebuild those with positive reference counts.      Also reset the smgr
1715  *       relation cache.
1716  *
1717  *       This is currently used only to recover from SI message buffer overflow,
1718  *       so we do not touch new-in-transaction relations; they cannot be targets
1719  *       of cross-backend SI updates (and our own updates now go through a
1720  *       separate linked list that isn't limited by the SI message buffer size).
1721  *
1722  *       We do this in two phases: the first pass deletes deletable items, and
1723  *       the second one rebuilds the rebuildable items.  This is essential for
1724  *       safety, because hash_seq_search only copes with concurrent deletion of
1725  *       the element it is currently visiting.  If a second SI overflow were to
1726  *       occur while we are walking the table, resulting in recursive entry to
1727  *       this routine, we could crash because the inner invocation blows away
1728  *       the entry next to be visited by the outer scan.  But this way is OK,
1729  *       because (a) during the first pass we won't process any more SI messages,
1730  *       so hash_seq_search will complete safely; (b) during the second pass we
1731  *       only hold onto pointers to nondeletable entries.
1732  *
1733  *       The two-phase approach also makes it easy to ensure that we process
1734  *       nailed-in-cache indexes before other nondeletable items, and that we
1735  *       process pg_class_oid_index first of all.  In scenarios where a nailed
1736  *       index has been given a new relfilenode, we have to detect that update
1737  *       before the nailed index is used in reloading any other relcache entry.
1738  */
1739 void
1740 RelationCacheInvalidate(void)
1741 {
1742         HASH_SEQ_STATUS status;
1743         RelIdCacheEnt *idhentry;
1744         Relation        relation;
1745         List       *rebuildFirstList = NIL;
1746         List       *rebuildList = NIL;
1747         ListCell   *l;
1748
1749         /* Phase 1 */
1750         hash_seq_init(&status, RelationIdCache);
1751
1752         while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
1753         {
1754                 relation = idhentry->reldesc;
1755
1756                 /* Must close all smgr references to avoid leaving dangling ptrs */
1757                 RelationCloseSmgr(relation);
1758
1759                 /* Ignore new relations, since they are never SI targets */
1760                 if (relation->rd_createSubid != InvalidSubTransactionId)
1761                         continue;
1762
1763                 relcacheInvalsReceived++;
1764
1765                 if (RelationHasReferenceCountZero(relation))
1766                 {
1767                         /* Delete this entry immediately */
1768                         Assert(!relation->rd_isnailed);
1769                         RelationClearRelation(relation, false);
1770                 }
1771                 else
1772                 {
1773                         /*
1774                          * Add this entry to list of stuff to rebuild in second pass.
1775                          * pg_class_oid_index goes on the front of rebuildFirstList,
1776                          * other nailed indexes on the back, and everything else into
1777                          * rebuildList (in no particular order).
1778                          */
1779                         if (relation->rd_isnailed &&
1780                                 relation->rd_rel->relkind == RELKIND_INDEX)
1781                         {
1782                                 if (RelationGetRelid(relation) == ClassOidIndexId)
1783                                         rebuildFirstList = lcons(relation, rebuildFirstList);
1784                                 else
1785                                         rebuildFirstList = lappend(rebuildFirstList, relation);
1786                         }
1787                         else
1788                                 rebuildList = lcons(relation, rebuildList);
1789                 }
1790         }
1791
1792         rebuildList = list_concat(rebuildFirstList, rebuildList);
1793
1794         /*
1795          * Now zap any remaining smgr cache entries.  This must happen before
1796          * we start to rebuild entries, since that may involve catalog fetches
1797          * which will re-open catalog files.
1798          */
1799         smgrcloseall();
1800
1801         /* Phase 2: rebuild the items found to need rebuild in phase 1 */
1802         foreach(l, rebuildList)
1803         {
1804                 relation = (Relation) lfirst(l);
1805                 RelationClearRelation(relation, true);
1806         }
1807         list_free(rebuildList);
1808 }
1809
1810 /*
1811  * AtEOXact_RelationCache
1812  *
1813  *      Clean up the relcache at main-transaction commit or abort.
1814  *
1815  * Note: this must be called *before* processing invalidation messages.
1816  * In the case of abort, we don't want to try to rebuild any invalidated
1817  * cache entries (since we can't safely do database accesses).  Therefore
1818  * we must reset refcnts before handling pending invalidations.
1819  */
1820 void
1821 AtEOXact_RelationCache(bool isCommit)
1822 {
1823         HASH_SEQ_STATUS status;
1824         RelIdCacheEnt *idhentry;
1825
1826         hash_seq_init(&status, RelationIdCache);
1827
1828         while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
1829         {
1830                 Relation        relation = idhentry->reldesc;
1831                 int                     expected_refcnt;
1832
1833                 /*
1834                  * Is it a relation created in the current transaction?
1835                  *
1836                  * During commit, reset the flag to zero, since we are now out of the
1837                  * creating transaction.  During abort, simply delete the relcache
1838                  * entry --- it isn't interesting any longer.  (NOTE: if we have
1839                  * forgotten the new-ness of a new relation due to a forced cache
1840                  * flush, the entry will get deleted anyway by shared-cache-inval
1841                  * processing of the aborted pg_class insertion.)
1842                  */
1843                 if (relation->rd_createSubid != InvalidSubTransactionId)
1844                 {
1845                         if (isCommit)
1846                                 relation->rd_createSubid = InvalidSubTransactionId;
1847                         else
1848                         {
1849                                 RelationClearRelation(relation, false);
1850                                 continue;
1851                         }
1852                 }
1853
1854                 /*
1855                  * During transaction abort, we must also reset relcache entry ref
1856                  * counts to their normal not-in-a-transaction state.  A ref count
1857                  * may be too high because some routine was exited by ereport()
1858                  * between incrementing and decrementing the count.
1859                  *
1860                  * During commit, we should not have to do this, but it's still
1861                  * useful to check that the counts are correct to catch missed
1862                  * relcache closes.
1863                  *
1864                  * In bootstrap mode, do NOT reset the refcnt nor complain that it's
1865                  * nonzero --- the bootstrap code expects relations to stay open
1866                  * across start/commit transaction calls.  (That seems bogus, but
1867                  * it's not worth fixing.)
1868                  */
1869                 expected_refcnt = relation->rd_isnailed ? 1 : 0;
1870
1871                 if (isCommit)
1872                 {
1873                         if (relation->rd_refcnt != expected_refcnt &&
1874                                 !IsBootstrapProcessingMode())
1875                         {
1876                                 elog(WARNING, "relcache reference leak: relation \"%s\" has refcnt %d instead of %d",
1877                                          RelationGetRelationName(relation),
1878                                          relation->rd_refcnt, expected_refcnt);
1879                                 relation->rd_refcnt = expected_refcnt;
1880                         }
1881                 }
1882                 else
1883                 {
1884                         /* abort case, just reset it quietly */
1885                         relation->rd_refcnt = expected_refcnt;
1886                 }
1887
1888                 /*
1889                  * Flush any temporary index list.
1890                  */
1891                 if (relation->rd_indexvalid == 2)
1892                 {
1893                         list_free(relation->rd_indexlist);
1894                         relation->rd_indexlist = NIL;
1895                         relation->rd_indexvalid = 0;
1896                 }
1897         }
1898
1899         /* Once done with the transaction, we can reset need_eosubxact_work */
1900         need_eosubxact_work = false;
1901 }
1902
1903 /*
1904  * AtEOSubXact_RelationCache
1905  *
1906  *      Clean up the relcache at sub-transaction commit or abort.
1907  *
1908  * Note: this must be called *before* processing invalidation messages.
1909  */
1910 void
1911 AtEOSubXact_RelationCache(bool isCommit, SubTransactionId mySubid,
1912                                                   SubTransactionId parentSubid)
1913 {
1914         HASH_SEQ_STATUS status;
1915         RelIdCacheEnt *idhentry;
1916
1917         /*
1918          * In the majority of subtransactions there is not anything for this
1919          * routine to do, and since there are usually many entries in the
1920          * relcache, uselessly scanning the cache represents a surprisingly
1921          * large fraction of the subtransaction entry/exit overhead.  To avoid
1922          * this, we keep a static flag that must be set whenever a condition
1923          * is created that requires subtransaction-end work.  (Currently, this
1924          * means either a relation is created in the current xact, or an index
1925          * list is forced.)  For simplicity, the flag remains set till end of
1926          * top-level transaction, even though we could clear it earlier in some
1927          * cases.
1928          */
1929         if (!need_eosubxact_work)
1930                 return;
1931
1932         hash_seq_init(&status, RelationIdCache);
1933
1934         while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
1935         {
1936                 Relation        relation = idhentry->reldesc;
1937
1938                 /*
1939                  * Is it a relation created in the current subtransaction?
1940                  *
1941                  * During subcommit, mark it as belonging to the parent, instead.
1942                  * During subabort, simply delete the relcache entry.
1943                  */
1944                 if (relation->rd_createSubid == mySubid)
1945                 {
1946                         if (isCommit)
1947                                 relation->rd_createSubid = parentSubid;
1948                         else
1949                         {
1950                                 Assert(RelationHasReferenceCountZero(relation));
1951                                 RelationClearRelation(relation, false);
1952                                 continue;
1953                         }
1954                 }
1955
1956                 /*
1957                  * Flush any temporary index list.
1958                  */
1959                 if (relation->rd_indexvalid == 2)
1960                 {
1961                         list_free(relation->rd_indexlist);
1962                         relation->rd_indexlist = NIL;
1963                         relation->rd_indexvalid = 0;
1964                 }
1965         }
1966 }
1967
1968 /*
1969  *              RelationBuildLocalRelation
1970  *                      Build a relcache entry for an about-to-be-created relation,
1971  *                      and enter it into the relcache.
1972  */
1973 Relation
1974 RelationBuildLocalRelation(const char *relname,
1975                                                    Oid relnamespace,
1976                                                    TupleDesc tupDesc,
1977                                                    Oid relid,
1978                                                    Oid reltablespace,
1979                                                    bool shared_relation)
1980 {
1981         Relation        rel;
1982         MemoryContext oldcxt;
1983         int                     natts = tupDesc->natts;
1984         int                     i;
1985         bool            has_not_null;
1986         bool            nailit;
1987
1988         AssertArg(natts >= 0);
1989
1990         /*
1991          * check for creation of a rel that must be nailed in cache.
1992          *
1993          * XXX this list had better match RelationCacheInitialize's list.
1994          */
1995         switch (relid)
1996         {
1997                 case RelationRelationId:
1998                 case AttributeRelationId:
1999                 case ProcedureRelationId:
2000                 case TypeRelationId:
2001                         nailit = true;
2002                         break;
2003                 default:
2004                         nailit = false;
2005                         break;
2006         }
2007
2008         /*
2009          * switch to the cache context to create the relcache entry.
2010          */
2011         if (!CacheMemoryContext)
2012                 CreateCacheMemoryContext();
2013
2014         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
2015
2016         /*
2017          * allocate a new relation descriptor and fill in basic state fields.
2018          */
2019         rel = (Relation) palloc0(sizeof(RelationData));
2020
2021         rel->rd_targblock = InvalidBlockNumber;
2022
2023         /* make sure relation is marked as having no open file yet */
2024         rel->rd_smgr = NULL;
2025
2026         /* mark it nailed if appropriate */
2027         rel->rd_isnailed = nailit;
2028
2029         rel->rd_refcnt = nailit ? 1 : 0;
2030
2031         /* it's being created in this transaction */
2032         rel->rd_createSubid = GetCurrentSubTransactionId();
2033
2034         /* must flag that we have rels created in this transaction */
2035         need_eosubxact_work = true;
2036
2037         /* is it a temporary relation? */
2038         rel->rd_istemp = isTempNamespace(relnamespace);
2039
2040         /*
2041          * create a new tuple descriptor from the one passed in.  We do this
2042          * partly to copy it into the cache context, and partly because the
2043          * new relation can't have any defaults or constraints yet; they have
2044          * to be added in later steps, because they require additions to
2045          * multiple system catalogs.  We can copy attnotnull constraints here,
2046          * however.
2047          */
2048         rel->rd_att = CreateTupleDescCopy(tupDesc);
2049         has_not_null = false;
2050         for (i = 0; i < natts; i++)
2051         {
2052                 rel->rd_att->attrs[i]->attnotnull = tupDesc->attrs[i]->attnotnull;
2053                 has_not_null |= tupDesc->attrs[i]->attnotnull;
2054         }
2055
2056         if (has_not_null)
2057         {
2058                 TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));
2059
2060                 constr->has_not_null = true;
2061                 rel->rd_att->constr = constr;
2062         }
2063
2064         /*
2065          * initialize relation tuple form (caller may add/override data later)
2066          */
2067         rel->rd_rel = (Form_pg_class) palloc0(CLASS_TUPLE_SIZE);
2068
2069         namestrcpy(&rel->rd_rel->relname, relname);
2070         rel->rd_rel->relnamespace = relnamespace;
2071
2072         rel->rd_rel->relkind = RELKIND_UNCATALOGED;
2073         rel->rd_rel->relhasoids = rel->rd_att->tdhasoid;
2074         rel->rd_rel->relnatts = natts;
2075         rel->rd_rel->reltype = InvalidOid;
2076
2077         /*
2078          * Insert relation physical and logical identifiers (OIDs) into the
2079          * right places.  Note that the physical ID (relfilenode) is initially
2080          * the same as the logical ID (OID).
2081          */
2082         rel->rd_rel->relisshared = shared_relation;
2083
2084         RelationGetRelid(rel) = relid;
2085
2086         for (i = 0; i < natts; i++)
2087                 rel->rd_att->attrs[i]->attrelid = relid;
2088
2089         rel->rd_rel->relfilenode = relid;
2090         rel->rd_rel->reltablespace = reltablespace;
2091
2092         RelationInitLockInfo(rel);      /* see lmgr.c */
2093
2094         RelationInitPhysicalAddr(rel);
2095
2096         /*
2097          * Okay to insert into the relcache hash tables.
2098          */
2099         RelationCacheInsert(rel);
2100
2101         /*
2102          * done building relcache entry.
2103          */
2104         MemoryContextSwitchTo(oldcxt);
2105
2106         /* It's fully valid */
2107         rel->rd_isvalid = true;
2108
2109         /*
2110          * Caller expects us to pin the returned entry.
2111          */
2112         RelationIncrementReferenceCount(rel);
2113
2114         return rel;
2115 }
2116
2117 /*
2118  *              RelationCacheInitialize
2119  *
2120  *              This initializes the relation descriptor cache.  At the time
2121  *              that this is invoked, we can't do database access yet (mainly
2122  *              because the transaction subsystem is not up), so we can't get
2123  *              "real" info.  However it's okay to read the pg_internal.init
2124  *              cache file, if one is available.  Otherwise we make phony
2125  *              entries for the minimum set of nailed-in-cache relations.
2126  */
2127
2128 #define INITRELCACHESIZE                400
2129
2130 void
2131 RelationCacheInitialize(void)
2132 {
2133         MemoryContext oldcxt;
2134         HASHCTL         ctl;
2135
2136         /*
2137          * switch to cache memory context
2138          */
2139         if (!CacheMemoryContext)
2140                 CreateCacheMemoryContext();
2141
2142         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
2143
2144         /*
2145          * create hashtables that index the relcache
2146          */
2147         MemSet(&ctl, 0, sizeof(ctl));
2148         ctl.keysize = sizeof(Oid);
2149         ctl.entrysize = sizeof(RelIdCacheEnt);
2150         ctl.hash = oid_hash;
2151         RelationIdCache = hash_create("Relcache by OID", INITRELCACHESIZE,
2152                                                                   &ctl, HASH_ELEM | HASH_FUNCTION);
2153
2154         /*
2155          * Try to load the relcache cache file.  If successful, we're done for
2156          * now.  Otherwise, initialize the cache with pre-made descriptors for
2157          * the critical "nailed-in" system catalogs.
2158          */
2159         if (IsBootstrapProcessingMode() ||
2160                 !load_relcache_init_file())
2161         {
2162                 formrdesc("pg_class", PG_CLASS_RELTYPE_OID,
2163                                   true, Natts_pg_class, Desc_pg_class);
2164                 formrdesc("pg_attribute", PG_ATTRIBUTE_RELTYPE_OID,
2165                                   false, Natts_pg_attribute, Desc_pg_attribute);
2166                 formrdesc("pg_proc", PG_PROC_RELTYPE_OID,
2167                                   true, Natts_pg_proc, Desc_pg_proc);
2168                 formrdesc("pg_type", PG_TYPE_RELTYPE_OID,
2169                                   true, Natts_pg_type, Desc_pg_type);
2170
2171 #define NUM_CRITICAL_RELS       4       /* fix if you change list above */
2172         }
2173
2174         MemoryContextSwitchTo(oldcxt);
2175 }
2176
2177 /*
2178  *              RelationCacheInitializePhase2
2179  *
2180  *              This is called as soon as the catcache and transaction system
2181  *              are functional.  At this point we can actually read data from
2182  *              the system catalogs.  Update the relcache entries made during
2183  *              RelationCacheInitialize, and make sure we have entries for the
2184  *              critical system indexes.
2185  */
2186 void
2187 RelationCacheInitializePhase2(void)
2188 {
2189         HASH_SEQ_STATUS status;
2190         RelIdCacheEnt *idhentry;
2191
2192         if (IsBootstrapProcessingMode())
2193                 return;
2194
2195         /*
2196          * If we didn't get the critical system indexes loaded into relcache,
2197          * do so now.  These are critical because the catcache depends on them
2198          * for catcache fetches that are done during relcache load.  Thus, we
2199          * have an infinite-recursion problem.  We can break the recursion by
2200          * doing heapscans instead of indexscans at certain key spots. To
2201          * avoid hobbling performance, we only want to do that until we have
2202          * the critical indexes loaded into relcache.  Thus, the flag
2203          * criticalRelcachesBuilt is used to decide whether to do heapscan or
2204          * indexscan at the key spots, and we set it true after we've loaded
2205          * the critical indexes.
2206          *
2207          * The critical indexes are marked as "nailed in cache", partly to make
2208          * it easy for load_relcache_init_file to count them, but mainly
2209          * because we cannot flush and rebuild them once we've set
2210          * criticalRelcachesBuilt to true.      (NOTE: perhaps it would be
2211          * possible to reload them by temporarily setting
2212          * criticalRelcachesBuilt to false again.  For now, though, we just
2213          * nail 'em in.)
2214          */
2215         if (!criticalRelcachesBuilt)
2216         {
2217                 Relation        ird;
2218
2219 #define LOAD_CRIT_INDEX(indexoid) \
2220                 do { \
2221                         ird = RelationBuildDesc((indexoid), NULL); \
2222                         ird->rd_isnailed = true; \
2223                         ird->rd_refcnt = 1; \
2224                 } while (0)
2225
2226                 LOAD_CRIT_INDEX(ClassOidIndexId);
2227                 LOAD_CRIT_INDEX(AttributeRelidNumIndexId);
2228                 LOAD_CRIT_INDEX(IndexRelidIndexId);
2229                 LOAD_CRIT_INDEX(AccessMethodStrategyIndexId);
2230                 LOAD_CRIT_INDEX(AccessMethodProcedureIndexId);
2231                 LOAD_CRIT_INDEX(OperatorOidIndexId);
2232
2233 #define NUM_CRITICAL_INDEXES    6               /* fix if you change list above */
2234
2235                 criticalRelcachesBuilt = true;
2236         }
2237
2238         /*
2239          * Now, scan all the relcache entries and update anything that might
2240          * be wrong in the results from formrdesc or the relcache cache file.
2241          * If we faked up relcache entries using formrdesc, then read the real
2242          * pg_class rows and replace the fake entries with them. Also, if any
2243          * of the relcache entries have rules or triggers, load that info the
2244          * hard way since it isn't recorded in the cache file.
2245          */
2246         hash_seq_init(&status, RelationIdCache);
2247
2248         while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
2249         {
2250                 Relation        relation = idhentry->reldesc;
2251
2252                 /*
2253                  * If it's a faked-up entry, read the real pg_class tuple.
2254                  */
2255                 if (needNewCacheFile && relation->rd_isnailed)
2256                 {
2257                         HeapTuple       htup;
2258                         Form_pg_class relp;
2259
2260                         htup = SearchSysCache(RELOID,
2261                                                         ObjectIdGetDatum(RelationGetRelid(relation)),
2262                                                                   0, 0, 0);
2263                         if (!HeapTupleIsValid(htup))
2264                                 elog(FATAL, "cache lookup failed for relation %u",
2265                                          RelationGetRelid(relation));
2266                         relp = (Form_pg_class) GETSTRUCT(htup);
2267
2268                         /*
2269                          * Copy tuple to relation->rd_rel. (See notes in
2270                          * AllocateRelationDesc())
2271                          */
2272                         Assert(relation->rd_rel != NULL);
2273                         memcpy((char *) relation->rd_rel, (char *) relp, CLASS_TUPLE_SIZE);
2274
2275                         /*
2276                          * Also update the derived fields in rd_att.
2277                          */
2278                         relation->rd_att->tdtypeid = relp->reltype;
2279                         relation->rd_att->tdtypmod = -1;        /* unnecessary, but... */
2280                         relation->rd_att->tdhasoid = relp->relhasoids;
2281
2282                         ReleaseSysCache(htup);
2283                 }
2284
2285                 /*
2286                  * Fix data that isn't saved in relcache cache file.
2287                  */
2288                 if (relation->rd_rel->relhasrules && relation->rd_rules == NULL)
2289                         RelationBuildRuleLock(relation);
2290                 if (relation->rd_rel->reltriggers > 0 && relation->trigdesc == NULL)
2291                         RelationBuildTriggers(relation);
2292         }
2293 }
2294
2295 /*
2296  *              RelationCacheInitializePhase3
2297  *
2298  *              Final step of relcache initialization: write out a new relcache
2299  *              cache file if one is needed.
2300  */
2301 void
2302 RelationCacheInitializePhase3(void)
2303 {
2304         if (IsBootstrapProcessingMode())
2305                 return;
2306
2307         if (needNewCacheFile)
2308         {
2309                 /*
2310                  * Force all the catcaches to finish initializing and thereby open
2311                  * the catalogs and indexes they use.  This will preload the
2312                  * relcache with entries for all the most important system
2313                  * catalogs and indexes, so that the init file will be most useful
2314                  * for future backends.
2315                  */
2316                 InitCatalogCachePhase2();
2317
2318                 /* now write the file */
2319                 write_relcache_init_file();
2320         }
2321 }
2322
2323 /*
2324  * GetPgIndexDescriptor -- get a predefined tuple descriptor for pg_index
2325  *
2326  * We need this kluge because we have to be able to access non-fixed-width
2327  * fields of pg_index before we have the standard catalog caches available.
2328  * We use predefined data that's set up in just the same way as the
2329  * bootstrapped reldescs used by formrdesc().  The resulting tupdesc is
2330  * not 100% kosher: it does not have the correct rowtype OID in tdtypeid,
2331  * nor does it have a TupleConstr field.  But it's good enough for the
2332  * purpose of extracting fields.
2333  */
2334 static TupleDesc
2335 GetPgIndexDescriptor(void)
2336 {
2337         static TupleDesc pgindexdesc = NULL;
2338         MemoryContext oldcxt;
2339         int                     i;
2340
2341         /* Already done? */
2342         if (pgindexdesc)
2343                 return pgindexdesc;
2344
2345         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
2346
2347         pgindexdesc = CreateTemplateTupleDesc(Natts_pg_index, false);
2348         pgindexdesc->tdtypeid = RECORDOID; /* not right, but we don't care */
2349         pgindexdesc->tdtypmod = -1;
2350
2351         for (i = 0; i < Natts_pg_index; i++)
2352         {
2353                 memcpy(pgindexdesc->attrs[i],
2354                            &Desc_pg_index[i],
2355                            ATTRIBUTE_TUPLE_SIZE);
2356                 /* make sure attcacheoff is valid */
2357                 pgindexdesc->attrs[i]->attcacheoff = -1;
2358         }
2359
2360         /* initialize first attribute's attcacheoff, cf RelationBuildTupleDesc */
2361         pgindexdesc->attrs[0]->attcacheoff = 0;
2362
2363         /* Note: we don't bother to set up a TupleConstr entry */
2364
2365         MemoryContextSwitchTo(oldcxt);
2366
2367         return pgindexdesc;
2368 }
2369
2370 static void
2371 AttrDefaultFetch(Relation relation)
2372 {
2373         AttrDefault *attrdef = relation->rd_att->constr->defval;
2374         int                     ndef = relation->rd_att->constr->num_defval;
2375         Relation        adrel;
2376         SysScanDesc adscan;
2377         ScanKeyData skey;
2378         HeapTuple       htup;
2379         Datum           val;
2380         bool            isnull;
2381         int                     found;
2382         int                     i;
2383
2384         ScanKeyInit(&skey,
2385                                 Anum_pg_attrdef_adrelid,
2386                                 BTEqualStrategyNumber, F_OIDEQ,
2387                                 ObjectIdGetDatum(RelationGetRelid(relation)));
2388
2389         adrel = heap_open(AttrDefaultRelationId, AccessShareLock);
2390         adscan = systable_beginscan(adrel, AttrDefaultIndexId, true,
2391                                                                 SnapshotNow, 1, &skey);
2392         found = 0;
2393
2394         while (HeapTupleIsValid(htup = systable_getnext(adscan)))
2395         {
2396                 Form_pg_attrdef adform = (Form_pg_attrdef) GETSTRUCT(htup);
2397
2398                 for (i = 0; i < ndef; i++)
2399                 {
2400                         if (adform->adnum != attrdef[i].adnum)
2401                                 continue;
2402                         if (attrdef[i].adbin != NULL)
2403                                 elog(WARNING, "multiple attrdef records found for attr %s of rel %s",
2404                                          NameStr(relation->rd_att->attrs[adform->adnum - 1]->attname),
2405                                          RelationGetRelationName(relation));
2406                         else
2407                                 found++;
2408
2409                         val = fastgetattr(htup,
2410                                                           Anum_pg_attrdef_adbin,
2411                                                           adrel->rd_att, &isnull);
2412                         if (isnull)
2413                                 elog(WARNING, "null adbin for attr %s of rel %s",
2414                                          NameStr(relation->rd_att->attrs[adform->adnum - 1]->attname),
2415                                          RelationGetRelationName(relation));
2416                         else
2417                                 attrdef[i].adbin = MemoryContextStrdup(CacheMemoryContext,
2418                                                          DatumGetCString(DirectFunctionCall1(textout,
2419                                                                                                                                  val)));
2420                         break;
2421                 }
2422
2423                 if (i >= ndef)
2424                         elog(WARNING, "unexpected attrdef record found for attr %d of rel %s",
2425                                  adform->adnum, RelationGetRelationName(relation));
2426         }
2427
2428         systable_endscan(adscan);
2429         heap_close(adrel, AccessShareLock);
2430
2431         if (found != ndef)
2432                 elog(WARNING, "%d attrdef record(s) missing for rel %s",
2433                          ndef - found, RelationGetRelationName(relation));
2434 }
2435
2436 static void
2437 CheckConstraintFetch(Relation relation)
2438 {
2439         ConstrCheck *check = relation->rd_att->constr->check;
2440         int                     ncheck = relation->rd_att->constr->num_check;
2441         Relation        conrel;
2442         SysScanDesc conscan;
2443         ScanKeyData skey[1];
2444         HeapTuple       htup;
2445         Datum           val;
2446         bool            isnull;
2447         int                     found = 0;
2448
2449         ScanKeyInit(&skey[0],
2450                                 Anum_pg_constraint_conrelid,
2451                                 BTEqualStrategyNumber, F_OIDEQ,
2452                                 ObjectIdGetDatum(RelationGetRelid(relation)));
2453
2454         conrel = heap_open(ConstraintRelationId, AccessShareLock);
2455         conscan = systable_beginscan(conrel, ConstraintRelidIndexId, true,
2456                                                                  SnapshotNow, 1, skey);
2457
2458         while (HeapTupleIsValid(htup = systable_getnext(conscan)))
2459         {
2460                 Form_pg_constraint conform = (Form_pg_constraint) GETSTRUCT(htup);
2461
2462                 /* We want check constraints only */
2463                 if (conform->contype != CONSTRAINT_CHECK)
2464                         continue;
2465
2466                 if (found >= ncheck)
2467                         elog(ERROR, "unexpected constraint record found for rel %s",
2468                                  RelationGetRelationName(relation));
2469
2470                 check[found].ccname = MemoryContextStrdup(CacheMemoryContext,
2471                                                                                           NameStr(conform->conname));
2472
2473                 /* Grab and test conbin is actually set */
2474                 val = fastgetattr(htup,
2475                                                   Anum_pg_constraint_conbin,
2476                                                   conrel->rd_att, &isnull);
2477                 if (isnull)
2478                         elog(ERROR, "null conbin for rel %s",
2479                                  RelationGetRelationName(relation));
2480
2481                 check[found].ccbin = MemoryContextStrdup(CacheMemoryContext,
2482                                                          DatumGetCString(DirectFunctionCall1(textout,
2483                                                                                                                                  val)));
2484                 found++;
2485         }
2486
2487         systable_endscan(conscan);
2488         heap_close(conrel, AccessShareLock);
2489
2490         if (found != ncheck)
2491                 elog(ERROR, "%d constraint record(s) missing for rel %s",
2492                          ncheck - found, RelationGetRelationName(relation));
2493 }
2494
2495 /*
2496  * RelationGetIndexList -- get a list of OIDs of indexes on this relation
2497  *
2498  * The index list is created only if someone requests it.  We scan pg_index
2499  * to find relevant indexes, and add the list to the relcache entry so that
2500  * we won't have to compute it again.  Note that shared cache inval of a
2501  * relcache entry will delete the old list and set rd_indexvalid to 0,
2502  * so that we must recompute the index list on next request.  This handles
2503  * creation or deletion of an index.
2504  *
2505  * The returned list is guaranteed to be sorted in order by OID.  This is
2506  * needed by the executor, since for index types that we obtain exclusive
2507  * locks on when updating the index, all backends must lock the indexes in
2508  * the same order or we will get deadlocks (see ExecOpenIndices()).  Any
2509  * consistent ordering would do, but ordering by OID is easy.
2510  *
2511  * Since shared cache inval causes the relcache's copy of the list to go away,
2512  * we return a copy of the list palloc'd in the caller's context.  The caller
2513  * may freeList() the returned list after scanning it.  This is necessary
2514  * since the caller will typically be doing syscache lookups on the relevant
2515  * indexes, and syscache lookup could cause SI messages to be processed!
2516  */
2517 List *
2518 RelationGetIndexList(Relation relation)
2519 {
2520         Relation        indrel;
2521         SysScanDesc indscan;
2522         ScanKeyData skey;
2523         HeapTuple       htup;
2524         List       *result;
2525         MemoryContext oldcxt;
2526
2527         /* Quick exit if we already computed the list. */
2528         if (relation->rd_indexvalid != 0)
2529                 return list_copy(relation->rd_indexlist);
2530
2531         /*
2532          * We build the list we intend to return (in the caller's context)
2533          * while doing the scan.  After successfully completing the scan, we
2534          * copy that list into the relcache entry.      This avoids cache-context
2535          * memory leakage if we get some sort of error partway through.
2536          */
2537         result = NIL;
2538
2539         /* Prepare to scan pg_index for entries having indrelid = this rel. */
2540         ScanKeyInit(&skey,
2541                                 Anum_pg_index_indrelid,
2542                                 BTEqualStrategyNumber, F_OIDEQ,
2543                                 ObjectIdGetDatum(RelationGetRelid(relation)));
2544
2545         indrel = heap_open(IndexRelationId, AccessShareLock);
2546         indscan = systable_beginscan(indrel, IndexIndrelidIndexId, true,
2547                                                                  SnapshotNow, 1, &skey);
2548
2549         while (HeapTupleIsValid(htup = systable_getnext(indscan)))
2550         {
2551                 Form_pg_index index = (Form_pg_index) GETSTRUCT(htup);
2552
2553                 result = insert_ordered_oid(result, index->indexrelid);
2554         }
2555
2556         systable_endscan(indscan);
2557         heap_close(indrel, AccessShareLock);
2558
2559         /* Now save a copy of the completed list in the relcache entry. */
2560         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
2561         relation->rd_indexlist = list_copy(result);
2562         relation->rd_indexvalid = 1;
2563         MemoryContextSwitchTo(oldcxt);
2564
2565         return result;
2566 }
2567
2568 /*
2569  * insert_ordered_oid
2570  *              Insert a new Oid into a sorted list of Oids, preserving ordering
2571  *
2572  * Building the ordered list this way is O(N^2), but with a pretty small
2573  * constant, so for the number of entries we expect it will probably be
2574  * faster than trying to apply qsort().  Most tables don't have very many
2575  * indexes...
2576  */
2577 static List *
2578 insert_ordered_oid(List *list, Oid datum)
2579 {
2580         ListCell   *prev;
2581
2582         /* Does the datum belong at the front? */
2583         if (list == NIL || datum < linitial_oid(list))
2584                 return lcons_oid(datum, list);
2585         /* No, so find the entry it belongs after */
2586         prev = list_head(list);
2587         for (;;)
2588         {
2589                 ListCell   *curr = lnext(prev);
2590
2591                 if (curr == NULL || datum < lfirst_oid(curr))
2592                         break;                          /* it belongs after 'prev', before 'curr' */
2593
2594                 prev = curr;
2595         }
2596         /* Insert datum into list after 'prev' */
2597         lappend_cell_oid(list, prev, datum);
2598         return list;
2599 }
2600
2601 /*
2602  * RelationSetIndexList -- externally force the index list contents
2603  *
2604  * This is used to temporarily override what we think the set of valid
2605  * indexes is.  The forcing will be valid only until transaction commit
2606  * or abort.
2607  *
2608  * This should only be applied to nailed relations, because in a non-nailed
2609  * relation the hacked index list could be lost at any time due to SI
2610  * messages.  In practice it is only used on pg_class (see REINDEX).
2611  *
2612  * It is up to the caller to make sure the given list is correctly ordered.
2613  */
2614 void
2615 RelationSetIndexList(Relation relation, List *indexIds)
2616 {
2617         MemoryContext oldcxt;
2618
2619         Assert(relation->rd_isnailed);
2620         /* Copy the list into the cache context (could fail for lack of mem) */
2621         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
2622         indexIds = list_copy(indexIds);
2623         MemoryContextSwitchTo(oldcxt);
2624         /* Okay to replace old list */
2625         list_free(relation->rd_indexlist);
2626         relation->rd_indexlist = indexIds;
2627         relation->rd_indexvalid = 2;    /* mark list as forced */
2628         /* must flag that we have a forced index list */
2629         need_eosubxact_work = true;
2630 }
2631
2632 /*
2633  * RelationGetIndexExpressions -- get the index expressions for an index
2634  *
2635  * We cache the result of transforming pg_index.indexprs into a node tree.
2636  * If the rel is not an index or has no expressional columns, we return NIL.
2637  * Otherwise, the returned tree is copied into the caller's memory context.
2638  * (We don't want to return a pointer to the relcache copy, since it could
2639  * disappear due to relcache invalidation.)
2640  */
2641 List *
2642 RelationGetIndexExpressions(Relation relation)
2643 {
2644         List       *result;
2645         Datum           exprsDatum;
2646         bool            isnull;
2647         char       *exprsString;
2648         MemoryContext oldcxt;
2649
2650         /* Quick exit if we already computed the result. */
2651         if (relation->rd_indexprs)
2652                 return (List *) copyObject(relation->rd_indexprs);
2653
2654         /* Quick exit if there is nothing to do. */
2655         if (relation->rd_indextuple == NULL ||
2656                 heap_attisnull(relation->rd_indextuple, Anum_pg_index_indexprs))
2657                 return NIL;
2658
2659         /*
2660          * We build the tree we intend to return in the caller's context.
2661          * After successfully completing the work, we copy it into the
2662          * relcache entry.      This avoids problems if we get some sort of error
2663          * partway through.
2664          */
2665         exprsDatum = heap_getattr(relation->rd_indextuple,
2666                                                           Anum_pg_index_indexprs,
2667                                                           GetPgIndexDescriptor(),
2668                                                           &isnull);
2669         Assert(!isnull);
2670         exprsString = DatumGetCString(DirectFunctionCall1(textout, exprsDatum));
2671         result = (List *) stringToNode(exprsString);
2672         pfree(exprsString);
2673
2674         /*
2675          * Run the expressions through eval_const_expressions. This is not just an
2676          * optimization, but is necessary, because the planner will be comparing
2677          * them to similarly-processed qual clauses, and may fail to detect valid
2678          * matches without this.  We don't bother with canonicalize_qual, however.
2679          */
2680         result = (List *) eval_const_expressions((Node *) result);
2681
2682         /*
2683          * Also mark any coercion format fields as "don't care", so that the
2684          * planner can match to both explicit and implicit coercions.
2685          */
2686         set_coercionform_dontcare((Node *) result);
2687
2688         /* May as well fix opfuncids too */
2689         fix_opfuncids((Node *) result);
2690
2691         /* Now save a copy of the completed tree in the relcache entry. */
2692         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
2693         relation->rd_indexprs = (List *) copyObject(result);
2694         MemoryContextSwitchTo(oldcxt);
2695
2696         return result;
2697 }
2698
2699 /*
2700  * RelationGetIndexPredicate -- get the index predicate for an index
2701  *
2702  * We cache the result of transforming pg_index.indpred into an implicit-AND
2703  * node tree (suitable for ExecQual).
2704  * If the rel is not an index or has no predicate, we return NIL.
2705  * Otherwise, the returned tree is copied into the caller's memory context.
2706  * (We don't want to return a pointer to the relcache copy, since it could
2707  * disappear due to relcache invalidation.)
2708  */
2709 List *
2710 RelationGetIndexPredicate(Relation relation)
2711 {
2712         List       *result;
2713         Datum           predDatum;
2714         bool            isnull;
2715         char       *predString;
2716         MemoryContext oldcxt;
2717
2718         /* Quick exit if we already computed the result. */
2719         if (relation->rd_indpred)
2720                 return (List *) copyObject(relation->rd_indpred);
2721
2722         /* Quick exit if there is nothing to do. */
2723         if (relation->rd_indextuple == NULL ||
2724                 heap_attisnull(relation->rd_indextuple, Anum_pg_index_indpred))
2725                 return NIL;
2726
2727         /*
2728          * We build the tree we intend to return in the caller's context.
2729          * After successfully completing the work, we copy it into the
2730          * relcache entry.      This avoids problems if we get some sort of error
2731          * partway through.
2732          */
2733         predDatum = heap_getattr(relation->rd_indextuple,
2734                                                          Anum_pg_index_indpred,
2735                                                          GetPgIndexDescriptor(),
2736                                                          &isnull);
2737         Assert(!isnull);
2738         predString = DatumGetCString(DirectFunctionCall1(textout, predDatum));
2739         result = (List *) stringToNode(predString);
2740         pfree(predString);
2741
2742         /*
2743          * Run the expression through const-simplification and canonicalization.
2744          * This is not just an optimization, but is necessary, because the planner
2745          * will be comparing it to similarly-processed qual clauses, and may fail
2746          * to detect valid matches without this.  This must match the processing
2747          * done to qual clauses in preprocess_expression()!  (We can skip the
2748          * stuff involving subqueries, however, since we don't allow any in
2749          * index predicates.)
2750          */
2751         result = (List *) eval_const_expressions((Node *) result);
2752
2753         result = (List *) canonicalize_qual((Expr *) result);
2754
2755         /*
2756          * Also mark any coercion format fields as "don't care", so that the
2757          * planner can match to both explicit and implicit coercions.
2758          */
2759         set_coercionform_dontcare((Node *) result);
2760
2761         /* Also convert to implicit-AND format */
2762         result = make_ands_implicit((Expr *) result);
2763
2764         /* May as well fix opfuncids too */
2765         fix_opfuncids((Node *) result);
2766
2767         /* Now save a copy of the completed tree in the relcache entry. */
2768         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
2769         relation->rd_indpred = (List *) copyObject(result);
2770         MemoryContextSwitchTo(oldcxt);
2771
2772         return result;
2773 }
2774
2775
2776 /*
2777  *      load_relcache_init_file, write_relcache_init_file
2778  *
2779  *              In late 1992, we started regularly having databases with more than
2780  *              a thousand classes in them.  With this number of classes, it became
2781  *              critical to do indexed lookups on the system catalogs.
2782  *
2783  *              Bootstrapping these lookups is very hard.  We want to be able to
2784  *              use an index on pg_attribute, for example, but in order to do so,
2785  *              we must have read pg_attribute for the attributes in the index,
2786  *              which implies that we need to use the index.
2787  *
2788  *              In order to get around the problem, we do the following:
2789  *
2790  *                 +  When the database system is initialized (at initdb time), we
2791  *                        don't use indexes.  We do sequential scans.
2792  *
2793  *                 +  When the backend is started up in normal mode, we load an image
2794  *                        of the appropriate relation descriptors, in internal format,
2795  *                        from an initialization file in the data/base/... directory.
2796  *
2797  *                 +  If the initialization file isn't there, then we create the
2798  *                        relation descriptors using sequential scans and write 'em to
2799  *                        the initialization file for use by subsequent backends.
2800  *
2801  *              We could dispense with the initialization file and just build the
2802  *              critical reldescs the hard way on every backend startup, but that
2803  *              slows down backend startup noticeably.
2804  *
2805  *              We can in fact go further, and save more relcache entries than
2806  *              just the ones that are absolutely critical; this allows us to speed
2807  *              up backend startup by not having to build such entries the hard way.
2808  *              Presently, all the catalog and index entries that are referred to
2809  *              by catcaches are stored in the initialization file.
2810  *
2811  *              The same mechanism that detects when catcache and relcache entries
2812  *              need to be invalidated (due to catalog updates) also arranges to
2813  *              unlink the initialization file when its contents may be out of date.
2814  *              The file will then be rebuilt during the next backend startup.
2815  */
2816
2817 /*
2818  * load_relcache_init_file -- attempt to load cache from the init file
2819  *
2820  * If successful, return TRUE and set criticalRelcachesBuilt to true.
2821  * If not successful, return FALSE and set needNewCacheFile to true.
2822  *
2823  * NOTE: we assume we are already switched into CacheMemoryContext.
2824  */
2825 static bool
2826 load_relcache_init_file(void)
2827 {
2828         FILE       *fp;
2829         char            initfilename[MAXPGPATH];
2830         Relation   *rels;
2831         int                     relno,
2832                                 num_rels,
2833                                 max_rels,
2834                                 nailed_rels,
2835                                 nailed_indexes,
2836                                 magic;
2837         int                     i;
2838
2839         snprintf(initfilename, sizeof(initfilename), "%s/%s",
2840                          DatabasePath, RELCACHE_INIT_FILENAME);
2841
2842         fp = AllocateFile(initfilename, PG_BINARY_R);
2843         if (fp == NULL)
2844         {
2845                 needNewCacheFile = true;
2846                 return false;
2847         }
2848
2849         /*
2850          * Read the index relcache entries from the file.  Note we will not
2851          * enter any of them into the cache if the read fails partway through;
2852          * this helps to guard against broken init files.
2853          */
2854         max_rels = 100;
2855         rels = (Relation *) palloc(max_rels * sizeof(Relation));
2856         num_rels = 0;
2857         nailed_rels = nailed_indexes = 0;
2858         initFileRelationIds = NIL;
2859
2860         /* check for correct magic number (compatible version) */
2861         if (fread(&magic, 1, sizeof(magic), fp) != sizeof(magic))
2862                 goto read_failed;
2863         if (magic != RELCACHE_INIT_FILEMAGIC)
2864                 goto read_failed;
2865
2866         for (relno = 0;; relno++)
2867         {
2868                 Size            len;
2869                 size_t          nread;
2870                 Relation        rel;
2871                 Form_pg_class relform;
2872                 bool            has_not_null;
2873                 Datum           indclassDatum;
2874                 bool            isnull;
2875
2876                 /* first read the relation descriptor length */
2877                 if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
2878                 {
2879                         if (nread == 0)
2880                                 break;                  /* end of file */
2881                         goto read_failed;
2882                 }
2883
2884                 /* safety check for incompatible relcache layout */
2885                 if (len != sizeof(RelationData))
2886                         goto read_failed;
2887
2888                 /* allocate another relcache header */
2889                 if (num_rels >= max_rels)
2890                 {
2891                         max_rels *= 2;
2892                         rels = (Relation *) repalloc(rels, max_rels * sizeof(Relation));
2893                 }
2894
2895                 rel = rels[num_rels++] = (Relation) palloc(len);
2896
2897                 /* then, read the Relation structure */
2898                 if ((nread = fread(rel, 1, len, fp)) != len)
2899                         goto read_failed;
2900
2901                 /* next read the relation tuple form */
2902                 if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
2903                         goto read_failed;
2904
2905                 relform = (Form_pg_class) palloc(len);
2906                 if ((nread = fread(relform, 1, len, fp)) != len)
2907                         goto read_failed;
2908
2909                 rel->rd_rel = relform;
2910
2911                 /* initialize attribute tuple forms */
2912                 rel->rd_att = CreateTemplateTupleDesc(relform->relnatts,
2913                                                                                           relform->relhasoids);
2914                 rel->rd_att->tdtypeid = relform->reltype;
2915                 rel->rd_att->tdtypmod = -1;             /* unnecessary, but... */
2916
2917                 /* next read all the attribute tuple form data entries */
2918                 has_not_null = false;
2919                 for (i = 0; i < relform->relnatts; i++)
2920                 {
2921                         if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
2922                                 goto read_failed;
2923                         if (len != ATTRIBUTE_TUPLE_SIZE)
2924                                 goto read_failed;
2925                         if ((nread = fread(rel->rd_att->attrs[i], 1, len, fp)) != len)
2926                                 goto read_failed;
2927
2928                         has_not_null |= rel->rd_att->attrs[i]->attnotnull;
2929                 }
2930
2931                 /* mark not-null status */
2932                 if (has_not_null)
2933                 {
2934                         TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));
2935
2936                         constr->has_not_null = true;
2937                         rel->rd_att->constr = constr;
2938                 }
2939
2940                 /* If it's an index, there's more to do */
2941                 if (rel->rd_rel->relkind == RELKIND_INDEX)
2942                 {
2943                         Form_pg_am      am;
2944                         MemoryContext indexcxt;
2945                         Oid                *operator;
2946                         RegProcedure *support;
2947                         int                     nsupport;
2948
2949                         /* Count nailed indexes to ensure we have 'em all */
2950                         if (rel->rd_isnailed)
2951                                 nailed_indexes++;
2952
2953                         /* next, read the pg_index tuple */
2954                         if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
2955                                 goto read_failed;
2956
2957                         rel->rd_indextuple = (HeapTuple) palloc(len);
2958                         if ((nread = fread(rel->rd_indextuple, 1, len, fp)) != len)
2959                                 goto read_failed;
2960
2961                         /* Fix up internal pointers in the tuple -- see heap_copytuple */
2962                         rel->rd_indextuple->t_datamcxt = CurrentMemoryContext;
2963                         rel->rd_indextuple->t_data = (HeapTupleHeader) ((char *) rel->rd_indextuple + HEAPTUPLESIZE);
2964                         rel->rd_index = (Form_pg_index) GETSTRUCT(rel->rd_indextuple);
2965
2966                         /* fix up indclass pointer too */
2967                         indclassDatum = fastgetattr(rel->rd_indextuple,
2968                                                                                 Anum_pg_index_indclass,
2969                                                                                 GetPgIndexDescriptor(),
2970                                                                                 &isnull);
2971                         Assert(!isnull);
2972                         rel->rd_indclass = (oidvector *) DatumGetPointer(indclassDatum);
2973
2974                         /* next, read the access method tuple form */
2975                         if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
2976                                 goto read_failed;
2977
2978                         am = (Form_pg_am) palloc(len);
2979                         if ((nread = fread(am, 1, len, fp)) != len)
2980                                 goto read_failed;
2981                         rel->rd_am = am;
2982
2983                         /*
2984                          * prepare index info context --- parameters should match
2985                          * RelationInitIndexAccessInfo
2986                          */
2987                         indexcxt = AllocSetContextCreate(CacheMemoryContext,
2988                                                                                          RelationGetRelationName(rel),
2989                                                                                          ALLOCSET_SMALL_MINSIZE,
2990                                                                                          ALLOCSET_SMALL_INITSIZE,
2991                                                                                          ALLOCSET_SMALL_MAXSIZE);
2992                         rel->rd_indexcxt = indexcxt;
2993
2994                         /* next, read the vector of operator OIDs */
2995                         if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
2996                                 goto read_failed;
2997
2998                         operator = (Oid *) MemoryContextAlloc(indexcxt, len);
2999                         if ((nread = fread(operator, 1, len, fp)) != len)
3000                                 goto read_failed;
3001
3002                         rel->rd_operator = operator;
3003
3004                         /* finally, read the vector of support procedures */
3005                         if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
3006                                 goto read_failed;
3007                         support = (RegProcedure *) MemoryContextAlloc(indexcxt, len);
3008                         if ((nread = fread(support, 1, len, fp)) != len)
3009                                 goto read_failed;
3010
3011                         rel->rd_support = support;
3012
3013                         /* set up zeroed fmgr-info vectors */
3014                         rel->rd_aminfo = (RelationAmInfo *)
3015                                 MemoryContextAllocZero(indexcxt, sizeof(RelationAmInfo));
3016                         nsupport = relform->relnatts * am->amsupport;
3017                         rel->rd_supportinfo = (FmgrInfo *)
3018                                 MemoryContextAllocZero(indexcxt, nsupport * sizeof(FmgrInfo));
3019                 }
3020                 else
3021                 {
3022                         /* Count nailed rels to ensure we have 'em all */
3023                         if (rel->rd_isnailed)
3024                                 nailed_rels++;
3025
3026                         Assert(rel->rd_index == NULL);
3027                         Assert(rel->rd_indextuple == NULL);
3028                         Assert(rel->rd_indclass == NULL);
3029                         Assert(rel->rd_am == NULL);
3030                         Assert(rel->rd_indexcxt == NULL);
3031                         Assert(rel->rd_aminfo == NULL);
3032                         Assert(rel->rd_operator == NULL);
3033                         Assert(rel->rd_support == NULL);
3034                         Assert(rel->rd_supportinfo == NULL);
3035                 }
3036
3037                 /*
3038                  * Rules and triggers are not saved (mainly because the internal
3039                  * format is complex and subject to change).  They must be rebuilt
3040                  * if needed by RelationCacheInitializePhase2.  This is not
3041                  * expected to be a big performance hit since few system catalogs
3042                  * have such.  Ditto for index expressions and predicates.
3043                  */
3044                 rel->rd_rules = NULL;
3045                 rel->rd_rulescxt = NULL;
3046                 rel->trigdesc = NULL;
3047                 rel->rd_indexprs = NIL;
3048                 rel->rd_indpred = NIL;
3049
3050                 /*
3051                  * Reset transient-state fields in the relcache entry
3052                  */
3053                 rel->rd_smgr = NULL;
3054                 rel->rd_targblock = InvalidBlockNumber;
3055                 if (rel->rd_isnailed)
3056                         rel->rd_refcnt = 1;
3057                 else
3058                         rel->rd_refcnt = 0;
3059                 rel->rd_indexvalid = 0;
3060                 rel->rd_indexlist = NIL;
3061                 rel->rd_createSubid = InvalidSubTransactionId;
3062                 MemSet(&rel->pgstat_info, 0, sizeof(rel->pgstat_info));
3063
3064                 /*
3065                  * Recompute lock and physical addressing info.  This is needed in
3066                  * case the pg_internal.init file was copied from some other
3067                  * database by CREATE DATABASE.
3068                  */
3069                 RelationInitLockInfo(rel);
3070                 RelationInitPhysicalAddr(rel);
3071         }
3072
3073         /*
3074          * We reached the end of the init file without apparent problem. Did
3075          * we get the right number of nailed items?  (This is a useful
3076          * crosscheck in case the set of critical rels or indexes changes.)
3077          */
3078         if (nailed_rels != NUM_CRITICAL_RELS ||
3079                 nailed_indexes != NUM_CRITICAL_INDEXES)
3080                 goto read_failed;
3081
3082         /*
3083          * OK, all appears well.
3084          *
3085          * Now insert all the new relcache entries into the cache.
3086          */
3087         for (relno = 0; relno < num_rels; relno++)
3088         {
3089                 RelationCacheInsert(rels[relno]);
3090                 /* also make a list of their OIDs, for RelationIdIsInInitFile */
3091                 initFileRelationIds = lcons_oid(RelationGetRelid(rels[relno]),
3092                                                                                 initFileRelationIds);
3093         }
3094
3095         pfree(rels);
3096         FreeFile(fp);
3097
3098         criticalRelcachesBuilt = true;
3099         return true;
3100
3101         /*
3102          * init file is broken, so do it the hard way.  We don't bother trying
3103          * to free the clutter we just allocated; it's not in the relcache so
3104          * it won't hurt.
3105          */
3106 read_failed:
3107         pfree(rels);
3108         FreeFile(fp);
3109
3110         needNewCacheFile = true;
3111         return false;
3112 }
3113
3114 /*
3115  * Write out a new initialization file with the current contents
3116  * of the relcache.
3117  */
3118 static void
3119 write_relcache_init_file(void)
3120 {
3121         FILE       *fp;
3122         char            tempfilename[MAXPGPATH];
3123         char            finalfilename[MAXPGPATH];
3124         int                     magic;
3125         HASH_SEQ_STATUS status;
3126         RelIdCacheEnt *idhentry;
3127         MemoryContext oldcxt;
3128         int                     i;
3129
3130         /*
3131          * We must write a temporary file and rename it into place. Otherwise,
3132          * another backend starting at about the same time might crash trying
3133          * to read the partially-complete file.
3134          */
3135         snprintf(tempfilename, sizeof(tempfilename), "%s/%s.%d",
3136                          DatabasePath, RELCACHE_INIT_FILENAME, MyProcPid);
3137         snprintf(finalfilename, sizeof(finalfilename), "%s/%s",
3138                          DatabasePath, RELCACHE_INIT_FILENAME);
3139
3140         unlink(tempfilename);           /* in case it exists w/wrong permissions */
3141
3142         fp = AllocateFile(tempfilename, PG_BINARY_W);
3143         if (fp == NULL)
3144         {
3145                 /*
3146                  * We used to consider this a fatal error, but we might as well
3147                  * continue with backend startup ...
3148                  */
3149                 ereport(WARNING,
3150                                 (errcode_for_file_access(),
3151                                  errmsg("could not create relation-cache initialization file \"%s\": %m",
3152                                                 tempfilename),
3153                   errdetail("Continuing anyway, but there's something wrong.")));
3154                 return;
3155         }
3156
3157         /*
3158          * Write a magic number to serve as a file version identifier.  We can
3159          * change the magic number whenever the relcache layout changes.
3160          */
3161         magic = RELCACHE_INIT_FILEMAGIC;
3162         if (fwrite(&magic, 1, sizeof(magic), fp) != sizeof(magic))
3163                 elog(FATAL, "could not write init file");
3164
3165         /*
3166          * Write all the reldescs (in no particular order).
3167          */
3168         hash_seq_init(&status, RelationIdCache);
3169
3170         initFileRelationIds = NIL;
3171
3172         while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
3173         {
3174                 Relation        rel = idhentry->reldesc;
3175                 Form_pg_class relform = rel->rd_rel;
3176                 Size            len;
3177
3178                 /*
3179                  * first write the relcache entry proper
3180                  */
3181                 len = sizeof(RelationData);
3182
3183                 /* first, write the relation descriptor length */
3184                 if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
3185                         elog(FATAL, "could not write init file");
3186
3187                 /* next, write out the Relation structure */
3188                 if (fwrite(rel, 1, len, fp) != len)
3189                         elog(FATAL, "could not write init file");
3190
3191                 /* next write the relation tuple form */
3192                 len = sizeof(FormData_pg_class);
3193                 if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
3194                         elog(FATAL, "could not write init file");
3195
3196                 if (fwrite(relform, 1, len, fp) != len)
3197                         elog(FATAL, "could not write init file");
3198
3199                 /* next, do all the attribute tuple form data entries */
3200                 for (i = 0; i < relform->relnatts; i++)
3201                 {
3202                         len = ATTRIBUTE_TUPLE_SIZE;
3203                         if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
3204                                 elog(FATAL, "could not write init file");
3205                         if (fwrite(rel->rd_att->attrs[i], 1, len, fp) != len)
3206                                 elog(FATAL, "could not write init file");
3207                 }
3208
3209                 /* If it's an index, there's more to do */
3210                 if (rel->rd_rel->relkind == RELKIND_INDEX)
3211                 {
3212                         Form_pg_am      am = rel->rd_am;
3213
3214                         /* write the pg_index tuple */
3215                         /* we assume this was created by heap_copytuple! */
3216                         len = HEAPTUPLESIZE + rel->rd_indextuple->t_len;
3217                         if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
3218                                 elog(FATAL, "could not write init file");
3219
3220                         if (fwrite(rel->rd_indextuple, 1, len, fp) != len)
3221                                 elog(FATAL, "could not write init file");
3222
3223                         /* next, write the access method tuple form */
3224                         len = sizeof(FormData_pg_am);
3225                         if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
3226                                 elog(FATAL, "could not write init file");
3227
3228                         if (fwrite(am, 1, len, fp) != len)
3229                                 elog(FATAL, "could not write init file");
3230
3231                         /* next, write the vector of operator OIDs */
3232                         len = relform->relnatts * (am->amstrategies * sizeof(Oid));
3233                         if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
3234                                 elog(FATAL, "could not write init file");
3235
3236                         if (fwrite(rel->rd_operator, 1, len, fp) != len)
3237                                 elog(FATAL, "could not write init file");
3238
3239                         /* finally, write the vector of support procedures */
3240                         len = relform->relnatts * (am->amsupport * sizeof(RegProcedure));
3241                         if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
3242                                 elog(FATAL, "could not write init file");
3243
3244                         if (fwrite(rel->rd_support, 1, len, fp) != len)
3245                                 elog(FATAL, "could not write init file");
3246                 }
3247
3248                 /* also make a list of their OIDs, for RelationIdIsInInitFile */
3249                 oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
3250                 initFileRelationIds = lcons_oid(RelationGetRelid(rel),
3251                                                                                 initFileRelationIds);
3252                 MemoryContextSwitchTo(oldcxt);
3253         }
3254
3255         if (FreeFile(fp))
3256                 elog(FATAL, "could not write init file");
3257
3258         /*
3259          * Now we have to check whether the data we've so painstakingly
3260          * accumulated is already obsolete due to someone else's
3261          * just-committed catalog changes.      If so, we just delete the temp
3262          * file and leave it to the next backend to try again.  (Our own
3263          * relcache entries will be updated by SI message processing, but we
3264          * can't be sure whether what we wrote out was up-to-date.)
3265          *
3266          * This mustn't run concurrently with RelationCacheInitFileInvalidate, so
3267          * grab a serialization lock for the duration.
3268          */
3269         LWLockAcquire(RelCacheInitLock, LW_EXCLUSIVE);
3270
3271         /* Make sure we have seen all incoming SI messages */
3272         AcceptInvalidationMessages();
3273
3274         /*
3275          * If we have received any SI relcache invals since backend start,
3276          * assume we may have written out-of-date data.
3277          */
3278         if (relcacheInvalsReceived == 0L)
3279         {
3280                 /*
3281                  * OK, rename the temp file to its final name, deleting any
3282                  * previously-existing init file.
3283                  *
3284                  * Note: a failure here is possible under Cygwin, if some other
3285                  * backend is holding open an unlinked-but-not-yet-gone init file.
3286                  * So treat this as a noncritical failure; just remove the useless
3287                  * temp file on failure.
3288                  */
3289                 if (rename(tempfilename, finalfilename) < 0)
3290                         unlink(tempfilename);
3291         }
3292         else
3293         {
3294                 /* Delete the already-obsolete temp file */
3295                 unlink(tempfilename);
3296         }
3297
3298         LWLockRelease(RelCacheInitLock);
3299 }
3300
3301 /*
3302  * Detect whether a given relation (identified by OID) is one of the ones
3303  * we store in the init file.
3304  *
3305  * Note that we effectively assume that all backends running in a database
3306  * would choose to store the same set of relations in the init file;
3307  * otherwise there are cases where we'd fail to detect the need for an init
3308  * file invalidation.  This does not seem likely to be a problem in practice.
3309  */
3310 bool
3311 RelationIdIsInInitFile(Oid relationId)
3312 {
3313         return list_member_oid(initFileRelationIds, relationId);
3314 }
3315
3316 /*
3317  * Invalidate (remove) the init file during commit of a transaction that
3318  * changed one or more of the relation cache entries that are kept in the
3319  * init file.
3320  *
3321  * We actually need to remove the init file twice: once just before sending
3322  * the SI messages that include relcache inval for such relations, and once
3323  * just after sending them.  The unlink before ensures that a backend that's
3324  * currently starting cannot read the now-obsolete init file and then miss
3325  * the SI messages that will force it to update its relcache entries.  (This
3326  * works because the backend startup sequence gets into the PROC array before
3327  * trying to load the init file.)  The unlink after is to synchronize with a
3328  * backend that may currently be trying to write an init file based on data
3329  * that we've just rendered invalid.  Such a backend will see the SI messages,
3330  * but we can't leave the init file sitting around to fool later backends.
3331  *
3332  * Ignore any failure to unlink the file, since it might not be there if
3333  * no backend has been started since the last removal.
3334  */
3335 void
3336 RelationCacheInitFileInvalidate(bool beforeSend)
3337 {
3338         char            initfilename[MAXPGPATH];
3339
3340         snprintf(initfilename, sizeof(initfilename), "%s/%s",
3341                          DatabasePath, RELCACHE_INIT_FILENAME);
3342
3343         if (beforeSend)
3344         {
3345                 /* no interlock needed here */
3346                 unlink(initfilename);
3347         }
3348         else
3349         {
3350                 /*
3351                  * We need to interlock this against write_relcache_init_file, to
3352                  * guard against possibility that someone renames a new-but-
3353                  * already-obsolete init file into place just after we unlink.
3354                  * With the interlock, it's certain that write_relcache_init_file
3355                  * will notice our SI inval message before renaming into place, or
3356                  * else that we will execute second and successfully unlink the
3357                  * file.
3358                  */
3359                 LWLockAcquire(RelCacheInitLock, LW_EXCLUSIVE);
3360                 unlink(initfilename);
3361                 LWLockRelease(RelCacheInitLock);
3362         }
3363 }