OSDN Git Service

Get rid of the separate RULE privilege for tables: now only a table's owner
[pg-rex/syncrep.git] / src / backend / utils / cache / relcache.c
1 /*-------------------------------------------------------------------------
2  *
3  * relcache.c
4  *        POSTGRES relation descriptor cache code
5  *
6  * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        $PostgreSQL: pgsql/src/backend/utils/cache/relcache.c,v 1.248 2006/09/05 21:08:36 tgl Exp $
12  *
13  *-------------------------------------------------------------------------
14  */
15 /*
16  * INTERFACE ROUTINES
17  *              RelationCacheInitialize                 - initialize relcache (to empty)
18  *              RelationCacheInitializePhase2   - finish initializing relcache
19  *              RelationIdGetRelation                   - get a reldesc by relation id
20  *              RelationClose                                   - close an open relation
21  *
22  * NOTES
23  *              The following code contains many undocumented hacks.  Please be
24  *              careful....
25  */
26 #include "postgres.h"
27
28 #include <sys/file.h>
29 #include <fcntl.h>
30 #include <unistd.h>
31
32 #include "access/genam.h"
33 #include "access/heapam.h"
34 #include "access/reloptions.h"
35 #include "access/xact.h"
36 #include "catalog/catalog.h"
37 #include "catalog/indexing.h"
38 #include "catalog/namespace.h"
39 #include "catalog/pg_amop.h"
40 #include "catalog/pg_amproc.h"
41 #include "catalog/pg_attrdef.h"
42 #include "catalog/pg_authid.h"
43 #include "catalog/pg_constraint.h"
44 #include "catalog/pg_namespace.h"
45 #include "catalog/pg_opclass.h"
46 #include "catalog/pg_proc.h"
47 #include "catalog/pg_rewrite.h"
48 #include "catalog/pg_type.h"
49 #include "commands/trigger.h"
50 #include "miscadmin.h"
51 #include "optimizer/clauses.h"
52 #include "optimizer/planmain.h"
53 #include "optimizer/prep.h"
54 #include "rewrite/rewriteDefine.h"
55 #include "storage/fd.h"
56 #include "storage/smgr.h"
57 #include "utils/builtins.h"
58 #include "utils/fmgroids.h"
59 #include "utils/inval.h"
60 #include "utils/memutils.h"
61 #include "utils/relcache.h"
62 #include "utils/resowner.h"
63 #include "utils/syscache.h"
64 #include "utils/typcache.h"
65
66
67 /*
68  * name of relcache init file, used to speed up backend startup
69  */
70 #define RELCACHE_INIT_FILENAME  "pg_internal.init"
71
72 #define RELCACHE_INIT_FILEMAGIC         0x573263        /* version ID value */
73
74 /*
75  *              hardcoded tuple descriptors.  see include/catalog/pg_attribute.h
76  */
77 static FormData_pg_attribute Desc_pg_class[Natts_pg_class] = {Schema_pg_class};
78 static FormData_pg_attribute Desc_pg_attribute[Natts_pg_attribute] = {Schema_pg_attribute};
79 static FormData_pg_attribute Desc_pg_proc[Natts_pg_proc] = {Schema_pg_proc};
80 static FormData_pg_attribute Desc_pg_type[Natts_pg_type] = {Schema_pg_type};
81 static FormData_pg_attribute Desc_pg_index[Natts_pg_index] = {Schema_pg_index};
82
83 /*
84  *              Hash tables that index the relation cache
85  *
86  *              We used to index the cache by both name and OID, but now there
87  *              is only an index by OID.
88  */
89 typedef struct relidcacheent
90 {
91         Oid                     reloid;
92         Relation        reldesc;
93 } RelIdCacheEnt;
94
95 static HTAB *RelationIdCache;
96
97 /*
98  * This flag is false until we have prepared the critical relcache entries
99  * that are needed to do indexscans on the tables read by relcache building.
100  */
101 bool            criticalRelcachesBuilt = false;
102
103 /*
104  * This counter counts relcache inval events received since backend startup
105  * (but only for rels that are actually in cache).      Presently, we use it only
106  * to detect whether data about to be written by write_relcache_init_file()
107  * might already be obsolete.
108  */
109 static long relcacheInvalsReceived = 0L;
110
111 /*
112  * This list remembers the OIDs of the relations cached in the relcache
113  * init file.
114  */
115 static List *initFileRelationIds = NIL;
116
117 /*
118  * This flag lets us optimize away work in AtEO(Sub)Xact_RelationCache().
119  */
120 static bool need_eoxact_work = false;
121
122
123 /*
124  *              macros to manipulate the lookup hashtables
125  */
/*
 * RelationCacheInsert
 *		Enter RELATION into RelationIdCache, keyed by its rd_id.  If an
 *		entry for that OID is already present, its reldesc is silently
 *		overwritten.
 *
 * RELATION is expanded more than once, so callers must pass an expression
 * without side effects.
 */
#define RelationCacheInsert(RELATION)	\
do { \
	RelIdCacheEnt *idhentry; bool found; \
	idhentry = (RelIdCacheEnt *) hash_search(RelationIdCache, \
											 (void *) &((RELATION)->rd_id), \
											 HASH_ENTER, \
											 &found); \
	/* used to give notice if found -- now just keep quiet */ \
	idhentry->reldesc = (RELATION); \
} while(0)
136
/*
 * RelationIdCacheLookup
 *		Probe RelationIdCache for the OID held in ID.  RELATION receives the
 *		cached reldesc, or NULL when there is no entry for that OID.
 *
 * ID must be an lvalue (its address is taken); RELATION must be assignable.
 */
#define RelationIdCacheLookup(ID, RELATION) \
do { \
	RelIdCacheEnt *hentry; \
	hentry = (RelIdCacheEnt *) hash_search(RelationIdCache, \
										   (void *) &(ID), \
										   HASH_FIND, NULL); \
	(RELATION) = hentry ? hentry->reldesc : NULL; \
} while(0)
147
/*
 * RelationCacheDelete
 *		Remove the RelationIdCache entry keyed by RELATION's rd_id.  A
 *		WARNING is logged if no such entry exists.
 */
#define RelationCacheDelete(RELATION) \
do { \
	RelIdCacheEnt *idhentry; \
	idhentry = (RelIdCacheEnt *) hash_search(RelationIdCache, \
											 (void *) &((RELATION)->rd_id), \
											 HASH_REMOVE, NULL); \
	if (idhentry == NULL) \
		elog(WARNING, "trying to delete a rd_id reldesc that does not exist"); \
} while(0)
157
158
159 /*
160  * Special cache for opclass-related information
161  *
162  * Note: only default-subtype operators and support procs get cached
163  */
164 typedef struct opclasscacheent
165 {
166         Oid                     opclassoid;             /* lookup key: OID of opclass */
167         bool            valid;                  /* set TRUE after successful fill-in */
168         StrategyNumber numStrats;       /* max # of strategies (from pg_am) */
169         StrategyNumber numSupport;      /* max # of support procs (from pg_am) */
170         Oid                *operatorOids;       /* strategy operators' OIDs */
171         RegProcedure *supportProcs; /* support procs */
172 } OpClassCacheEnt;
173
174 static HTAB *OpClassCache = NULL;
175
176
177 /* non-export function prototypes */
178
179 static void RelationClearRelation(Relation relation, bool rebuild);
180
181 static void RelationReloadClassinfo(Relation relation);
182 static void RelationFlushRelation(Relation relation);
183 static bool load_relcache_init_file(void);
184 static void write_relcache_init_file(void);
185 static void     write_item(const void *data, Size len, FILE *fp);
186
187 static void formrdesc(const char *relationName, Oid relationReltype,
188                   bool hasoids, int natts, FormData_pg_attribute *att);
189
190 static HeapTuple ScanPgRelation(Oid targetRelId, bool indexOK);
191 static Relation AllocateRelationDesc(Relation relation, Form_pg_class relp);
192 static void RelationParseRelOptions(Relation relation, HeapTuple tuple);
193 static void RelationBuildTupleDesc(Relation relation);
194 static Relation RelationBuildDesc(Oid targetRelId, Relation oldrelation);
195 static void RelationInitPhysicalAddr(Relation relation);
196 static TupleDesc GetPgClassDescriptor(void);
197 static TupleDesc GetPgIndexDescriptor(void);
198 static void AttrDefaultFetch(Relation relation);
199 static void CheckConstraintFetch(Relation relation);
200 static List *insert_ordered_oid(List *list, Oid datum);
201 static void IndexSupportInitialize(oidvector *indclass,
202                                            Oid *indexOperator,
203                                            RegProcedure *indexSupport,
204                                            StrategyNumber maxStrategyNumber,
205                                            StrategyNumber maxSupportNumber,
206                                            AttrNumber maxAttributeNumber);
207 static OpClassCacheEnt *LookupOpclassInfo(Oid operatorClassOid,
208                                   StrategyNumber numStrats,
209                                   StrategyNumber numSupport);
210
211
212 /*
213  *              ScanPgRelation
214  *
215  *              this is used by RelationBuildDesc to find a pg_class
216  *              tuple matching targetRelId.
217  *
218  *              NB: the returned tuple has been copied into palloc'd storage
219  *              and must eventually be freed with heap_freetuple.
220  */
221 static HeapTuple
222 ScanPgRelation(Oid targetRelId, bool indexOK)
223 {
224         HeapTuple       pg_class_tuple;
225         Relation        pg_class_desc;
226         SysScanDesc pg_class_scan;
227         ScanKeyData key[1];
228
229         /*
230          * form a scan key
231          */
232         ScanKeyInit(&key[0],
233                                 ObjectIdAttributeNumber,
234                                 BTEqualStrategyNumber, F_OIDEQ,
235                                 ObjectIdGetDatum(targetRelId));
236
237         /*
238          * Open pg_class and fetch a tuple.  Force heap scan if we haven't yet
239          * built the critical relcache entries (this includes initdb and startup
240          * without a pg_internal.init file).  The caller can also force a heap
241          * scan by setting indexOK == false.
242          */
243         pg_class_desc = heap_open(RelationRelationId, AccessShareLock);
244         pg_class_scan = systable_beginscan(pg_class_desc, ClassOidIndexId,
245                                                                            indexOK && criticalRelcachesBuilt,
246                                                                            SnapshotNow,
247                                                                            1, key);
248
249         pg_class_tuple = systable_getnext(pg_class_scan);
250
251         /*
252          * Must copy tuple before releasing buffer.
253          */
254         if (HeapTupleIsValid(pg_class_tuple))
255                 pg_class_tuple = heap_copytuple(pg_class_tuple);
256
257         /* all done */
258         systable_endscan(pg_class_scan);
259         heap_close(pg_class_desc, AccessShareLock);
260
261         return pg_class_tuple;
262 }
263
264 /*
265  *              AllocateRelationDesc
266  *
267  *              This is used to allocate memory for a new relation descriptor
268  *              and initialize the rd_rel field.
269  *
270  *              If 'relation' is NULL, allocate a new RelationData object.
271  *              If not, reuse the given object (that path is taken only when
272  *              we have to rebuild a relcache entry during RelationClearRelation).
273  */
274 static Relation
275 AllocateRelationDesc(Relation relation, Form_pg_class relp)
276 {
277         MemoryContext oldcxt;
278         Form_pg_class relationForm;
279
280         /* Relcache entries must live in CacheMemoryContext */
281         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
282
283         /*
284          * allocate space for new relation descriptor, if needed
285          */
286         if (relation == NULL)
287                 relation = (Relation) palloc(sizeof(RelationData));
288
289         /*
290          * clear all fields of reldesc
291          */
292         MemSet(relation, 0, sizeof(RelationData));
293         relation->rd_targblock = InvalidBlockNumber;
294
295         /* make sure relation is marked as having no open file yet */
296         relation->rd_smgr = NULL;
297
298         /*
299          * Copy the relation tuple form
300          *
301          * We only allocate space for the fixed fields, ie, CLASS_TUPLE_SIZE.
302          * The variable-length fields (relacl, reloptions) are NOT stored in the
303          * relcache --- there'd be little point in it, since we don't copy the
304          * tuple's nulls bitmap and hence wouldn't know if the values are valid.
305          * Bottom line is that relacl *cannot* be retrieved from the relcache.
306          * Get it from the syscache if you need it.  The same goes for the
307          * original form of reloptions (however, we do store the parsed form
308          * of reloptions in rd_options).
309          */
310         relationForm = (Form_pg_class) palloc(CLASS_TUPLE_SIZE);
311
312         memcpy(relationForm, relp, CLASS_TUPLE_SIZE);
313
314         /* initialize relation tuple form */
315         relation->rd_rel = relationForm;
316
317         /* and allocate attribute tuple form storage */
318         relation->rd_att = CreateTemplateTupleDesc(relationForm->relnatts,
319                                                                                            relationForm->relhasoids);
320         /* which we mark as a reference-counted tupdesc */
321         relation->rd_att->tdrefcount = 1;
322
323         MemoryContextSwitchTo(oldcxt);
324
325         return relation;
326 }
327
328 /*
329  * RelationParseRelOptions
330  *              Convert pg_class.reloptions into pre-parsed rd_options
331  *
332  * tuple is the real pg_class tuple (not rd_rel!) for relation
333  *
334  * Note: rd_rel and (if an index) rd_am must be valid already
335  */
336 static void
337 RelationParseRelOptions(Relation relation, HeapTuple tuple)
338 {
339         Datum           datum;
340         bool            isnull;
341         bytea      *options;
342
343         relation->rd_options = NULL;
344
345         /* Fall out if relkind should not have options */
346         switch (relation->rd_rel->relkind)
347         {
348                 case RELKIND_RELATION:
349                 case RELKIND_TOASTVALUE:
350                 case RELKIND_UNCATALOGED:
351                 case RELKIND_INDEX:
352                         break;
353                 default:
354                         return;
355         }
356
357         /*
358          * Fetch reloptions from tuple; have to use a hardwired descriptor
359          * because we might not have any other for pg_class yet (consider
360          * executing this code for pg_class itself)
361          */
362         datum = fastgetattr(tuple,
363                                                 Anum_pg_class_reloptions,
364                                                 GetPgClassDescriptor(),
365                                                 &isnull);
366         if (isnull)
367                 return;
368
369         /* Parse into appropriate format; don't error out here */
370         switch (relation->rd_rel->relkind)
371         {
372                 case RELKIND_RELATION:
373                 case RELKIND_TOASTVALUE:
374                 case RELKIND_UNCATALOGED:
375                         options = heap_reloptions(relation->rd_rel->relkind, datum,
376                                                                           false);
377                         break;
378                 case RELKIND_INDEX:
379                         options = index_reloptions(relation->rd_am->amoptions, datum,
380                                                                            false);
381                         break;
382                 default:
383                         Assert(false);          /* can't get here */
384                         options = NULL;         /* keep compiler quiet */
385                         break;
386         }
387
388         /* Copy parsed data into CacheMemoryContext */
389         if (options)
390         {
391                 relation->rd_options = MemoryContextAlloc(CacheMemoryContext,
392                                                                                                   VARSIZE(options));
393                 memcpy(relation->rd_options, options, VARSIZE(options));
394         }
395 }
396
397 /*
398  *              RelationBuildTupleDesc
399  *
400  *              Form the relation's tuple descriptor from information in
401  *              the pg_attribute, pg_attrdef & pg_constraint system catalogs.
402  */
403 static void
404 RelationBuildTupleDesc(Relation relation)
405 {
406         HeapTuple       pg_attribute_tuple;
407         Relation        pg_attribute_desc;
408         SysScanDesc pg_attribute_scan;
409         ScanKeyData skey[2];
410         int                     need;
411         TupleConstr *constr;
412         AttrDefault *attrdef = NULL;
413         int                     ndef = 0;
414
415         /* copy some fields from pg_class row to rd_att */
416         relation->rd_att->tdtypeid = relation->rd_rel->reltype;
417         relation->rd_att->tdtypmod = -1;        /* unnecessary, but... */
418         relation->rd_att->tdhasoid = relation->rd_rel->relhasoids;
419
420         constr = (TupleConstr *) MemoryContextAlloc(CacheMemoryContext,
421                                                                                                 sizeof(TupleConstr));
422         constr->has_not_null = false;
423
424         /*
425          * Form a scan key that selects only user attributes (attnum > 0).
426          * (Eliminating system attribute rows at the index level is lots faster
427          * than fetching them.)
428          */
429         ScanKeyInit(&skey[0],
430                                 Anum_pg_attribute_attrelid,
431                                 BTEqualStrategyNumber, F_OIDEQ,
432                                 ObjectIdGetDatum(RelationGetRelid(relation)));
433         ScanKeyInit(&skey[1],
434                                 Anum_pg_attribute_attnum,
435                                 BTGreaterStrategyNumber, F_INT2GT,
436                                 Int16GetDatum(0));
437
438         /*
439          * Open pg_attribute and begin a scan.  Force heap scan if we haven't yet
440          * built the critical relcache entries (this includes initdb and startup
441          * without a pg_internal.init file).
442          */
443         pg_attribute_desc = heap_open(AttributeRelationId, AccessShareLock);
444         pg_attribute_scan = systable_beginscan(pg_attribute_desc,
445                                                                                    AttributeRelidNumIndexId,
446                                                                                    criticalRelcachesBuilt,
447                                                                                    SnapshotNow,
448                                                                                    2, skey);
449
450         /*
451          * add attribute data to relation->rd_att
452          */
453         need = relation->rd_rel->relnatts;
454
455         while (HeapTupleIsValid(pg_attribute_tuple = systable_getnext(pg_attribute_scan)))
456         {
457                 Form_pg_attribute attp;
458
459                 attp = (Form_pg_attribute) GETSTRUCT(pg_attribute_tuple);
460
461                 if (attp->attnum <= 0 ||
462                         attp->attnum > relation->rd_rel->relnatts)
463                         elog(ERROR, "invalid attribute number %d for %s",
464                                  attp->attnum, RelationGetRelationName(relation));
465
466                 memcpy(relation->rd_att->attrs[attp->attnum - 1],
467                            attp,
468                            ATTRIBUTE_TUPLE_SIZE);
469
470                 /* Update constraint/default info */
471                 if (attp->attnotnull)
472                         constr->has_not_null = true;
473
474                 if (attp->atthasdef)
475                 {
476                         if (attrdef == NULL)
477                                 attrdef = (AttrDefault *)
478                                         MemoryContextAllocZero(CacheMemoryContext,
479                                                                                    relation->rd_rel->relnatts *
480                                                                                    sizeof(AttrDefault));
481                         attrdef[ndef].adnum = attp->attnum;
482                         attrdef[ndef].adbin = NULL;
483                         ndef++;
484                 }
485                 need--;
486                 if (need == 0)
487                         break;
488         }
489
490         /*
491          * end the scan and close the attribute relation
492          */
493         systable_endscan(pg_attribute_scan);
494         heap_close(pg_attribute_desc, AccessShareLock);
495
496         if (need != 0)
497                 elog(ERROR, "catalog is missing %d attribute(s) for relid %u",
498                          need, RelationGetRelid(relation));
499
500         /*
501          * The attcacheoff values we read from pg_attribute should all be -1
502          * ("unknown").  Verify this if assert checking is on.  They will be
503          * computed when and if needed during tuple access.
504          */
505 #ifdef USE_ASSERT_CHECKING
506         {
507                 int                     i;
508
509                 for (i = 0; i < relation->rd_rel->relnatts; i++)
510                         Assert(relation->rd_att->attrs[i]->attcacheoff == -1);
511         }
512 #endif
513
514         /*
515          * However, we can easily set the attcacheoff value for the first
516          * attribute: it must be zero.  This eliminates the need for special cases
517          * for attnum=1 that used to exist in fastgetattr() and index_getattr().
518          */
519         if (relation->rd_rel->relnatts > 0)
520                 relation->rd_att->attrs[0]->attcacheoff = 0;
521
522         /*
523          * Set up constraint/default info
524          */
525         if (constr->has_not_null || ndef > 0 || relation->rd_rel->relchecks)
526         {
527                 relation->rd_att->constr = constr;
528
529                 if (ndef > 0)                   /* DEFAULTs */
530                 {
531                         if (ndef < relation->rd_rel->relnatts)
532                                 constr->defval = (AttrDefault *)
533                                         repalloc(attrdef, ndef * sizeof(AttrDefault));
534                         else
535                                 constr->defval = attrdef;
536                         constr->num_defval = ndef;
537                         AttrDefaultFetch(relation);
538                 }
539                 else
540                         constr->num_defval = 0;
541
542                 if (relation->rd_rel->relchecks > 0)    /* CHECKs */
543                 {
544                         constr->num_check = relation->rd_rel->relchecks;
545                         constr->check = (ConstrCheck *)
546                                 MemoryContextAllocZero(CacheMemoryContext,
547                                                                         constr->num_check * sizeof(ConstrCheck));
548                         CheckConstraintFetch(relation);
549                 }
550                 else
551                         constr->num_check = 0;
552         }
553         else
554         {
555                 pfree(constr);
556                 relation->rd_att->constr = NULL;
557         }
558 }
559
560 /*
561  *              RelationBuildRuleLock
562  *
563  *              Form the relation's rewrite rules from information in
564  *              the pg_rewrite system catalog.
565  *
566  * Note: The rule parsetrees are potentially very complex node structures.
567  * To allow these trees to be freed when the relcache entry is flushed,
568  * we make a private memory context to hold the RuleLock information for
569  * each relcache entry that has associated rules.  The context is used
570  * just for rule info, not for any other subsidiary data of the relcache
571  * entry, because that keeps the update logic in RelationClearRelation()
572  * manageable.  The other subsidiary data structures are simple enough
573  * to be easy to free explicitly, anyway.
574  */
575 static void
576 RelationBuildRuleLock(Relation relation)
577 {
578         MemoryContext rulescxt;
579         MemoryContext oldcxt;
580         HeapTuple       rewrite_tuple;
581         Relation        rewrite_desc;
582         TupleDesc       rewrite_tupdesc;
583         SysScanDesc rewrite_scan;
584         ScanKeyData key;
585         RuleLock   *rulelock;
586         int                     numlocks;
587         RewriteRule **rules;
588         int                     maxlocks;
589
590         /*
591          * Make the private context.  Parameters are set on the assumption that
592          * it'll probably not contain much data.
593          */
594         rulescxt = AllocSetContextCreate(CacheMemoryContext,
595                                                                          RelationGetRelationName(relation),
596                                                                          ALLOCSET_SMALL_MINSIZE,
597                                                                          ALLOCSET_SMALL_INITSIZE,
598                                                                          ALLOCSET_SMALL_MAXSIZE);
599         relation->rd_rulescxt = rulescxt;
600
601         /*
602          * allocate an array to hold the rewrite rules (the array is extended if
603          * necessary)
604          */
605         maxlocks = 4;
606         rules = (RewriteRule **)
607                 MemoryContextAlloc(rulescxt, sizeof(RewriteRule *) * maxlocks);
608         numlocks = 0;
609
610         /*
611          * form a scan key
612          */
613         ScanKeyInit(&key,
614                                 Anum_pg_rewrite_ev_class,
615                                 BTEqualStrategyNumber, F_OIDEQ,
616                                 ObjectIdGetDatum(RelationGetRelid(relation)));
617
618         /*
619          * open pg_rewrite and begin a scan
620          *
621          * Note: since we scan the rules using RewriteRelRulenameIndexId, we will
622          * be reading the rules in name order, except possibly during
623          * emergency-recovery operations (ie, IgnoreSystemIndexes). This in
624          * turn ensures that rules will be fired in name order.
625          */
626         rewrite_desc = heap_open(RewriteRelationId, AccessShareLock);
627         rewrite_tupdesc = RelationGetDescr(rewrite_desc);
628         rewrite_scan = systable_beginscan(rewrite_desc,
629                                                                           RewriteRelRulenameIndexId,
630                                                                           true, SnapshotNow,
631                                                                           1, &key);
632
633         while (HeapTupleIsValid(rewrite_tuple = systable_getnext(rewrite_scan)))
634         {
635                 Form_pg_rewrite rewrite_form = (Form_pg_rewrite) GETSTRUCT(rewrite_tuple);
636                 bool            isnull;
637                 Datum           rule_datum;
638                 text       *rule_text;
639                 char       *rule_str;
640                 RewriteRule *rule;
641
642                 rule = (RewriteRule *) MemoryContextAlloc(rulescxt,
643                                                                                                   sizeof(RewriteRule));
644
645                 rule->ruleId = HeapTupleGetOid(rewrite_tuple);
646
647                 rule->event = rewrite_form->ev_type - '0';
648                 rule->attrno = rewrite_form->ev_attr;
649                 rule->isInstead = rewrite_form->is_instead;
650
651                 /*
652                  * Must use heap_getattr to fetch ev_action and ev_qual.  Also,
653                  * the rule strings are often large enough to be toasted.  To avoid
654                  * leaking memory in the caller's context, do the detoasting here
655                  * so we can free the detoasted version.
656                  */
657                 rule_datum = heap_getattr(rewrite_tuple,
658                                                                   Anum_pg_rewrite_ev_action,
659                                                                   rewrite_tupdesc,
660                                                                   &isnull);
661                 Assert(!isnull);
662                 rule_text = DatumGetTextP(rule_datum);
663                 rule_str = DatumGetCString(DirectFunctionCall1(textout,
664                                                                                                 PointerGetDatum(rule_text)));
665                 oldcxt = MemoryContextSwitchTo(rulescxt);
666                 rule->actions = (List *) stringToNode(rule_str);
667                 MemoryContextSwitchTo(oldcxt);
668                 pfree(rule_str);
669                 if ((Pointer) rule_text != DatumGetPointer(rule_datum))
670                         pfree(rule_text);
671
672                 rule_datum = heap_getattr(rewrite_tuple,
673                                                                   Anum_pg_rewrite_ev_qual,
674                                                                   rewrite_tupdesc,
675                                                                   &isnull);
676                 Assert(!isnull);
677                 rule_text = DatumGetTextP(rule_datum);
678                 rule_str = DatumGetCString(DirectFunctionCall1(textout,
679                                                                                                 PointerGetDatum(rule_text)));
680                 oldcxt = MemoryContextSwitchTo(rulescxt);
681                 rule->qual = (Node *) stringToNode(rule_str);
682                 MemoryContextSwitchTo(oldcxt);
683                 pfree(rule_str);
684                 if ((Pointer) rule_text != DatumGetPointer(rule_datum))
685                         pfree(rule_text);
686
687                 /*
688                  * We want the rule's table references to be checked as though by the
689                  * table owner, not the user referencing the rule.  Therefore, scan
690                  * through the rule's actions and set the checkAsUser field on all
691                  * rtable entries.  We have to look at the qual as well, in case it
692                  * contains sublinks.
693                  *
694                  * The reason for doing this when the rule is loaded, rather than
695                  * when it is stored, is that otherwise ALTER TABLE OWNER would have
696                  * to grovel through stored rules to update checkAsUser fields.
697                  * Scanning the rule tree during load is relatively cheap (compared
698                  * to constructing it in the first place), so we do it here.
699                  */
700                 setRuleCheckAsUser((Node *) rule->actions, relation->rd_rel->relowner);
701                 setRuleCheckAsUser(rule->qual, relation->rd_rel->relowner);
702
703                 if (numlocks >= maxlocks)
704                 {
705                         maxlocks *= 2;
706                         rules = (RewriteRule **)
707                                 repalloc(rules, sizeof(RewriteRule *) * maxlocks);
708                 }
709                 rules[numlocks++] = rule;
710         }
711
712         /*
713          * end the scan and close the attribute relation
714          */
715         systable_endscan(rewrite_scan);
716         heap_close(rewrite_desc, AccessShareLock);
717
718         /*
719          * form a RuleLock and insert into relation
720          */
721         rulelock = (RuleLock *) MemoryContextAlloc(rulescxt, sizeof(RuleLock));
722         rulelock->numLocks = numlocks;
723         rulelock->rules = rules;
724
725         relation->rd_rules = rulelock;
726 }
727
728 /*
729  *              equalRuleLocks
730  *
731  *              Determine whether two RuleLocks are equivalent
732  *
733  *              Probably this should be in the rules code someplace...
734  */
735 static bool
736 equalRuleLocks(RuleLock *rlock1, RuleLock *rlock2)
737 {
738         int                     i;
739
740         /*
741          * As of 7.3 we assume the rule ordering is repeatable, because
742          * RelationBuildRuleLock should read 'em in a consistent order.  So just
743          * compare corresponding slots.
744          */
745         if (rlock1 != NULL)
746         {
747                 if (rlock2 == NULL)
748                         return false;
749                 if (rlock1->numLocks != rlock2->numLocks)
750                         return false;
751                 for (i = 0; i < rlock1->numLocks; i++)
752                 {
753                         RewriteRule *rule1 = rlock1->rules[i];
754                         RewriteRule *rule2 = rlock2->rules[i];
755
756                         if (rule1->ruleId != rule2->ruleId)
757                                 return false;
758                         if (rule1->event != rule2->event)
759                                 return false;
760                         if (rule1->attrno != rule2->attrno)
761                                 return false;
762                         if (rule1->isInstead != rule2->isInstead)
763                                 return false;
764                         if (!equal(rule1->qual, rule2->qual))
765                                 return false;
766                         if (!equal(rule1->actions, rule2->actions))
767                                 return false;
768                 }
769         }
770         else if (rlock2 != NULL)
771                 return false;
772         return true;
773 }
774
775
776 /* ----------------------------------
777  *              RelationBuildDesc
778  *
779  *              Build a relation descriptor --- either a new one, or by
780  *              recycling the given old relation object.  The latter case
781  *              supports rebuilding a relcache entry without invalidating
782  *              pointers to it.
783  *
784  *              Returns NULL if no pg_class row could be found for the given relid
785  *              (suggesting we are trying to access a just-deleted relation).
786  *              Any other error is reported via elog.
787  * --------------------------------
788  */
789 static Relation
790 RelationBuildDesc(Oid targetRelId, Relation oldrelation)
791 {
792         Relation        relation;
793         Oid                     relid;
794         HeapTuple       pg_class_tuple;
795         Form_pg_class relp;
796         MemoryContext oldcxt;
797
798         /*
799          * find the tuple in pg_class corresponding to the given relation id
800          */
801         pg_class_tuple = ScanPgRelation(targetRelId, true);
802
803         /*
804          * if no such tuple exists, return NULL
805          */
806         if (!HeapTupleIsValid(pg_class_tuple))
807                 return NULL;
808
809         /*
810          * get information from the pg_class_tuple
811          */
812         relid = HeapTupleGetOid(pg_class_tuple);
813         relp = (Form_pg_class) GETSTRUCT(pg_class_tuple);
814
815         /*
816          * allocate storage for the relation descriptor, and copy pg_class_tuple
817          * to relation->rd_rel.
818          */
819         relation = AllocateRelationDesc(oldrelation, relp);
820
821         /*
822          * initialize the relation's relation id (relation->rd_id)
823          */
824         RelationGetRelid(relation) = relid;
825
826         /*
827          * normal relations are not nailed into the cache; nor can a pre-existing
828          * relation be new.  It could be temp though.  (Actually, it could be new
829          * too, but it's okay to forget that fact if forced to flush the entry.)
830          */
831         relation->rd_refcnt = 0;
832         relation->rd_isnailed = false;
833         relation->rd_createSubid = InvalidSubTransactionId;
834         relation->rd_istemp = isTempNamespace(relation->rd_rel->relnamespace);
835
836         /*
837          * initialize the tuple descriptor (relation->rd_att).
838          */
839         RelationBuildTupleDesc(relation);
840
841         /*
842          * Fetch rules and triggers that affect this relation
843          */
844         if (relation->rd_rel->relhasrules)
845                 RelationBuildRuleLock(relation);
846         else
847         {
848                 relation->rd_rules = NULL;
849                 relation->rd_rulescxt = NULL;
850         }
851
852         if (relation->rd_rel->reltriggers > 0)
853                 RelationBuildTriggers(relation);
854         else
855                 relation->trigdesc = NULL;
856
857         /*
858          * if it's an index, initialize index-related information
859          */
860         if (OidIsValid(relation->rd_rel->relam))
861                 RelationInitIndexAccessInfo(relation);
862
863         /* extract reloptions if any */
864         RelationParseRelOptions(relation, pg_class_tuple);
865
866         /*
867          * initialize the relation lock manager information
868          */
869         RelationInitLockInfo(relation);         /* see lmgr.c */
870
871         /*
872          * initialize physical addressing information for the relation
873          */
874         RelationInitPhysicalAddr(relation);
875
876         /* make sure relation is marked as having no open file yet */
877         relation->rd_smgr = NULL;
878
879         /*
880          * now we can free the memory allocated for pg_class_tuple
881          */
882         heap_freetuple(pg_class_tuple);
883
884         /*
885          * Insert newly created relation into relcache hash tables.
886          */
887         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
888         RelationCacheInsert(relation);
889         MemoryContextSwitchTo(oldcxt);
890
891         /* It's fully valid */
892         relation->rd_isvalid = true;
893
894         return relation;
895 }
896
897 /*
898  * Initialize the physical addressing info (RelFileNode) for a relcache entry
899  */
900 static void
901 RelationInitPhysicalAddr(Relation relation)
902 {
903         if (relation->rd_rel->reltablespace)
904                 relation->rd_node.spcNode = relation->rd_rel->reltablespace;
905         else
906                 relation->rd_node.spcNode = MyDatabaseTableSpace;
907         if (relation->rd_rel->relisshared)
908                 relation->rd_node.dbNode = InvalidOid;
909         else
910                 relation->rd_node.dbNode = MyDatabaseId;
911         relation->rd_node.relNode = relation->rd_rel->relfilenode;
912 }
913
914 /*
915  * Initialize index-access-method support data for an index relation
916  */
917 void
918 RelationInitIndexAccessInfo(Relation relation)
919 {
920         HeapTuple       tuple;
921         Form_pg_am      aform;
922         Datum           indclassDatum;
923         bool            isnull;
924         MemoryContext indexcxt;
925         MemoryContext oldcontext;
926         Oid                *operator;
927         RegProcedure *support;
928         FmgrInfo   *supportinfo;
929         int                     natts;
930         uint16          amstrategies;
931         uint16          amsupport;
932
933         /*
934          * Make a copy of the pg_index entry for the index.  Since pg_index
935          * contains variable-length and possibly-null fields, we have to do this
936          * honestly rather than just treating it as a Form_pg_index struct.
937          */
938         tuple = SearchSysCache(INDEXRELID,
939                                                    ObjectIdGetDatum(RelationGetRelid(relation)),
940                                                    0, 0, 0);
941         if (!HeapTupleIsValid(tuple))
942                 elog(ERROR, "cache lookup failed for index %u",
943                          RelationGetRelid(relation));
944         oldcontext = MemoryContextSwitchTo(CacheMemoryContext);
945         relation->rd_indextuple = heap_copytuple(tuple);
946         relation->rd_index = (Form_pg_index) GETSTRUCT(relation->rd_indextuple);
947         MemoryContextSwitchTo(oldcontext);
948         ReleaseSysCache(tuple);
949
950         /*
951          * indclass cannot be referenced directly through the C struct, because it
952          * is after the variable-width indkey field.  Therefore we extract the
953          * datum the hard way and provide a direct link in the relcache.
954          */
955         indclassDatum = fastgetattr(relation->rd_indextuple,
956                                                                 Anum_pg_index_indclass,
957                                                                 GetPgIndexDescriptor(),
958                                                                 &isnull);
959         Assert(!isnull);
960         relation->rd_indclass = (oidvector *) DatumGetPointer(indclassDatum);
961
962         /*
963          * Make a copy of the pg_am entry for the index's access method
964          */
965         tuple = SearchSysCache(AMOID,
966                                                    ObjectIdGetDatum(relation->rd_rel->relam),
967                                                    0, 0, 0);
968         if (!HeapTupleIsValid(tuple))
969                 elog(ERROR, "cache lookup failed for access method %u",
970                          relation->rd_rel->relam);
971         aform = (Form_pg_am) MemoryContextAlloc(CacheMemoryContext, sizeof *aform);
972         memcpy(aform, GETSTRUCT(tuple), sizeof *aform);
973         ReleaseSysCache(tuple);
974         relation->rd_am = aform;
975
976         natts = relation->rd_rel->relnatts;
977         if (natts != relation->rd_index->indnatts)
978                 elog(ERROR, "relnatts disagrees with indnatts for index %u",
979                          RelationGetRelid(relation));
980         amstrategies = aform->amstrategies;
981         amsupport = aform->amsupport;
982
983         /*
984          * Make the private context to hold index access info.  The reason we need
985          * a context, and not just a couple of pallocs, is so that we won't leak
986          * any subsidiary info attached to fmgr lookup records.
987          *
988          * Context parameters are set on the assumption that it'll probably not
989          * contain much data.
990          */
991         indexcxt = AllocSetContextCreate(CacheMemoryContext,
992                                                                          RelationGetRelationName(relation),
993                                                                          ALLOCSET_SMALL_MINSIZE,
994                                                                          ALLOCSET_SMALL_INITSIZE,
995                                                                          ALLOCSET_SMALL_MAXSIZE);
996         relation->rd_indexcxt = indexcxt;
997
998         /*
999          * Allocate arrays to hold data
1000          */
1001         relation->rd_aminfo = (RelationAmInfo *)
1002                 MemoryContextAllocZero(indexcxt, sizeof(RelationAmInfo));
1003
1004         if (amstrategies > 0)
1005                 operator = (Oid *)
1006                         MemoryContextAllocZero(indexcxt,
1007                                                                    natts * amstrategies * sizeof(Oid));
1008         else
1009                 operator = NULL;
1010
1011         if (amsupport > 0)
1012         {
1013                 int                     nsupport = natts * amsupport;
1014
1015                 support = (RegProcedure *)
1016                         MemoryContextAllocZero(indexcxt, nsupport * sizeof(RegProcedure));
1017                 supportinfo = (FmgrInfo *)
1018                         MemoryContextAllocZero(indexcxt, nsupport * sizeof(FmgrInfo));
1019         }
1020         else
1021         {
1022                 support = NULL;
1023                 supportinfo = NULL;
1024         }
1025
1026         relation->rd_operator = operator;
1027         relation->rd_support = support;
1028         relation->rd_supportinfo = supportinfo;
1029
1030         /*
1031          * Fill the operator and support procedure OID arrays.  (aminfo and
1032          * supportinfo are left as zeroes, and are filled on-the-fly when used)
1033          */
1034         IndexSupportInitialize(relation->rd_indclass,
1035                                                    operator, support,
1036                                                    amstrategies, amsupport, natts);
1037
1038         /*
1039          * expressions and predicate cache will be filled later
1040          */
1041         relation->rd_indexprs = NIL;
1042         relation->rd_indpred = NIL;
1043         relation->rd_amcache = NULL;
1044 }
1045
1046 /*
1047  * IndexSupportInitialize
1048  *              Initializes an index's cached opclass information,
1049  *              given the index's pg_index.indclass entry.
1050  *
1051  * Data is returned into *indexOperator and *indexSupport, which are arrays
1052  * allocated by the caller.
1053  *
1054  * The caller also passes maxStrategyNumber, maxSupportNumber, and
1055  * maxAttributeNumber, since these indicate the size of the arrays
1056  * it has allocated --- but in practice these numbers must always match
1057  * those obtainable from the system catalog entries for the index and
1058  * access method.
1059  */
1060 static void
1061 IndexSupportInitialize(oidvector *indclass,
1062                                            Oid *indexOperator,
1063                                            RegProcedure *indexSupport,
1064                                            StrategyNumber maxStrategyNumber,
1065                                            StrategyNumber maxSupportNumber,
1066                                            AttrNumber maxAttributeNumber)
1067 {
1068         int                     attIndex;
1069
1070         for (attIndex = 0; attIndex < maxAttributeNumber; attIndex++)
1071         {
1072                 OpClassCacheEnt *opcentry;
1073
1074                 if (!OidIsValid(indclass->values[attIndex]))
1075                         elog(ERROR, "bogus pg_index tuple");
1076
1077                 /* look up the info for this opclass, using a cache */
1078                 opcentry = LookupOpclassInfo(indclass->values[attIndex],
1079                                                                          maxStrategyNumber,
1080                                                                          maxSupportNumber);
1081
1082                 /* copy cached data into relcache entry */
1083                 if (maxStrategyNumber > 0)
1084                         memcpy(&indexOperator[attIndex * maxStrategyNumber],
1085                                    opcentry->operatorOids,
1086                                    maxStrategyNumber * sizeof(Oid));
1087                 if (maxSupportNumber > 0)
1088                         memcpy(&indexSupport[attIndex * maxSupportNumber],
1089                                    opcentry->supportProcs,
1090                                    maxSupportNumber * sizeof(RegProcedure));
1091         }
1092 }
1093
1094 /*
1095  * LookupOpclassInfo
1096  *
1097  * This routine maintains a per-opclass cache of the information needed
1098  * by IndexSupportInitialize().  This is more efficient than relying on
1099  * the catalog cache, because we can load all the info about a particular
1100  * opclass in a single indexscan of pg_amproc or pg_amop.
1101  *
1102  * The information from pg_am about expected range of strategy and support
1103  * numbers is passed in, rather than being looked up, mainly because the
1104  * caller will have it already.
1105  *
1106  * XXX There isn't any provision for flushing the cache.  However, there
1107  * isn't any provision for flushing relcache entries when opclass info
1108  * changes, either :-(
1109  */
1110 static OpClassCacheEnt *
1111 LookupOpclassInfo(Oid operatorClassOid,
1112                                   StrategyNumber numStrats,
1113                                   StrategyNumber numSupport)
1114 {
1115         OpClassCacheEnt *opcentry;
1116         bool            found;
1117         Relation        rel;
1118         SysScanDesc scan;
1119         ScanKeyData skey[2];
1120         HeapTuple       htup;
1121         bool            indexOK;
1122
1123         if (OpClassCache == NULL)
1124         {
1125                 /* First time through: initialize the opclass cache */
1126                 HASHCTL         ctl;
1127
1128                 if (!CacheMemoryContext)
1129                         CreateCacheMemoryContext();
1130
1131                 MemSet(&ctl, 0, sizeof(ctl));
1132                 ctl.keysize = sizeof(Oid);
1133                 ctl.entrysize = sizeof(OpClassCacheEnt);
1134                 ctl.hash = oid_hash;
1135                 OpClassCache = hash_create("Operator class cache", 64,
1136                                                                    &ctl, HASH_ELEM | HASH_FUNCTION);
1137         }
1138
1139         opcentry = (OpClassCacheEnt *) hash_search(OpClassCache,
1140                                                                                            (void *) &operatorClassOid,
1141                                                                                            HASH_ENTER, &found);
1142
1143         if (found && opcentry->valid)
1144         {
1145                 /* Already made an entry for it */
1146                 Assert(numStrats == opcentry->numStrats);
1147                 Assert(numSupport == opcentry->numSupport);
1148                 return opcentry;
1149         }
1150
1151         /* Need to fill in new entry */
1152         opcentry->valid = false;        /* until known OK */
1153         opcentry->numStrats = numStrats;
1154         opcentry->numSupport = numSupport;
1155
1156         if (numStrats > 0)
1157                 opcentry->operatorOids = (Oid *)
1158                         MemoryContextAllocZero(CacheMemoryContext,
1159                                                                    numStrats * sizeof(Oid));
1160         else
1161                 opcentry->operatorOids = NULL;
1162
1163         if (numSupport > 0)
1164                 opcentry->supportProcs = (RegProcedure *)
1165                         MemoryContextAllocZero(CacheMemoryContext,
1166                                                                    numSupport * sizeof(RegProcedure));
1167         else
1168                 opcentry->supportProcs = NULL;
1169
1170         /*
1171          * To avoid infinite recursion during startup, force heap scans if we're
1172          * looking up info for the opclasses used by the indexes we would like to
1173          * reference here.
1174          */
1175         indexOK = criticalRelcachesBuilt ||
1176                 (operatorClassOid != OID_BTREE_OPS_OID &&
1177                  operatorClassOid != INT2_BTREE_OPS_OID);
1178
1179         /*
1180          * Scan pg_amop to obtain operators for the opclass.  We only fetch the
1181          * default ones (those with subtype zero).
1182          */
1183         if (numStrats > 0)
1184         {
1185                 ScanKeyInit(&skey[0],
1186                                         Anum_pg_amop_amopclaid,
1187                                         BTEqualStrategyNumber, F_OIDEQ,
1188                                         ObjectIdGetDatum(operatorClassOid));
1189                 ScanKeyInit(&skey[1],
1190                                         Anum_pg_amop_amopsubtype,
1191                                         BTEqualStrategyNumber, F_OIDEQ,
1192                                         ObjectIdGetDatum(InvalidOid));
1193                 rel = heap_open(AccessMethodOperatorRelationId, AccessShareLock);
1194                 scan = systable_beginscan(rel, AccessMethodStrategyIndexId, indexOK,
1195                                                                   SnapshotNow, 2, skey);
1196
1197                 while (HeapTupleIsValid(htup = systable_getnext(scan)))
1198                 {
1199                         Form_pg_amop amopform = (Form_pg_amop) GETSTRUCT(htup);
1200
1201                         if (amopform->amopstrategy <= 0 ||
1202                                 (StrategyNumber) amopform->amopstrategy > numStrats)
1203                                 elog(ERROR, "invalid amopstrategy number %d for opclass %u",
1204                                          amopform->amopstrategy, operatorClassOid);
1205                         opcentry->operatorOids[amopform->amopstrategy - 1] =
1206                                 amopform->amopopr;
1207                 }
1208
1209                 systable_endscan(scan);
1210                 heap_close(rel, AccessShareLock);
1211         }
1212
1213         /*
1214          * Scan pg_amproc to obtain support procs for the opclass.      We only fetch
1215          * the default ones (those with subtype zero).
1216          */
1217         if (numSupport > 0)
1218         {
1219                 ScanKeyInit(&skey[0],
1220                                         Anum_pg_amproc_amopclaid,
1221                                         BTEqualStrategyNumber, F_OIDEQ,
1222                                         ObjectIdGetDatum(operatorClassOid));
1223                 ScanKeyInit(&skey[1],
1224                                         Anum_pg_amproc_amprocsubtype,
1225                                         BTEqualStrategyNumber, F_OIDEQ,
1226                                         ObjectIdGetDatum(InvalidOid));
1227                 rel = heap_open(AccessMethodProcedureRelationId, AccessShareLock);
1228                 scan = systable_beginscan(rel, AccessMethodProcedureIndexId, indexOK,
1229                                                                   SnapshotNow, 2, skey);
1230
1231                 while (HeapTupleIsValid(htup = systable_getnext(scan)))
1232                 {
1233                         Form_pg_amproc amprocform = (Form_pg_amproc) GETSTRUCT(htup);
1234
1235                         if (amprocform->amprocnum <= 0 ||
1236                                 (StrategyNumber) amprocform->amprocnum > numSupport)
1237                                 elog(ERROR, "invalid amproc number %d for opclass %u",
1238                                          amprocform->amprocnum, operatorClassOid);
1239
1240                         opcentry->supportProcs[amprocform->amprocnum - 1] =
1241                                 amprocform->amproc;
1242                 }
1243
1244                 systable_endscan(scan);
1245                 heap_close(rel, AccessShareLock);
1246         }
1247
1248         opcentry->valid = true;
1249         return opcentry;
1250 }
1251
1252
1253 /*
1254  *              formrdesc
1255  *
1256  *              This is a special cut-down version of RelationBuildDesc()
1257  *              used by RelationCacheInitializePhase2() in initializing the relcache.
1258  *              The relation descriptor is built just from the supplied parameters,
1259  *              without actually looking at any system table entries.  We cheat
1260  *              quite a lot since we only need to work for a few basic system
1261  *              catalogs.
1262  *
1263  * formrdesc is currently used for: pg_class, pg_attribute, pg_proc,
1264  * and pg_type (see RelationCacheInitializePhase2).
1265  *
1266  * Note that these catalogs can't have constraints (except attnotnull),
1267  * default values, rules, or triggers, since we don't cope with any of that.
1268  *
1269  * NOTE: we assume we are already switched into CacheMemoryContext.
1270  */
1271 static void
1272 formrdesc(const char *relationName, Oid relationReltype,
1273                   bool hasoids, int natts, FormData_pg_attribute *att)
1274 {
1275         Relation        relation;
1276         int                     i;
1277         bool            has_not_null;
1278
1279         /*
1280          * allocate new relation desc, clear all fields of reldesc
1281          */
1282         relation = (Relation) palloc0(sizeof(RelationData));
1283         relation->rd_targblock = InvalidBlockNumber;
1284
1285         /* make sure relation is marked as having no open file yet */
1286         relation->rd_smgr = NULL;
1287
1288         /*
1289          * initialize reference count: 1 because it is nailed in cache
1290          */
1291         relation->rd_refcnt = 1;
1292
1293         /*
1294          * all entries built with this routine are nailed-in-cache; none are for
1295          * new or temp relations.
1296          */
1297         relation->rd_isnailed = true;
1298         relation->rd_createSubid = InvalidSubTransactionId;
1299         relation->rd_istemp = false;
1300
1301         /*
1302          * initialize relation tuple form
1303          *
1304          * The data we insert here is pretty incomplete/bogus, but it'll serve to
1305          * get us launched.  RelationCacheInitializePhase2() will read the real
1306          * data from pg_class and replace what we've done here.
1307          */
1308         relation->rd_rel = (Form_pg_class) palloc0(CLASS_TUPLE_SIZE);
1309
1310         namestrcpy(&relation->rd_rel->relname, relationName);
1311         relation->rd_rel->relnamespace = PG_CATALOG_NAMESPACE;
1312         relation->rd_rel->reltype = relationReltype;
1313
1314         /*
1315          * It's important to distinguish between shared and non-shared relations,
1316          * even at bootstrap time, to make sure we know where they are stored.  At
1317          * present, all relations that formrdesc is used for are not shared.
1318          */
1319         relation->rd_rel->relisshared = false;
1320
1321         relation->rd_rel->relpages = 1;
1322         relation->rd_rel->reltuples = 1;
1323         relation->rd_rel->relkind = RELKIND_RELATION;
1324         relation->rd_rel->relhasoids = hasoids;
1325         relation->rd_rel->relnatts = (int16) natts;
1326
1327         /*
1328          * initialize attribute tuple form
1329          *
1330          * Unlike the case with the relation tuple, this data had better be right
1331          * because it will never be replaced.  The input values must be correctly
1332          * defined by macros in src/include/catalog/ headers.
1333          */
1334         relation->rd_att = CreateTemplateTupleDesc(natts, hasoids);
1335         relation->rd_att->tdrefcount = 1;       /* mark as refcounted */
1336
1337         relation->rd_att->tdtypeid = relationReltype;
1338         relation->rd_att->tdtypmod = -1;        /* unnecessary, but... */
1339
1340         /*
1341          * initialize tuple desc info
1342          */
1343         has_not_null = false;
1344         for (i = 0; i < natts; i++)
1345         {
1346                 memcpy(relation->rd_att->attrs[i],
1347                            &att[i],
1348                            ATTRIBUTE_TUPLE_SIZE);
1349                 has_not_null |= att[i].attnotnull;
1350                 /* make sure attcacheoff is valid */
1351                 relation->rd_att->attrs[i]->attcacheoff = -1;
1352         }
1353
1354         /* initialize first attribute's attcacheoff, cf RelationBuildTupleDesc */
1355         relation->rd_att->attrs[0]->attcacheoff = 0;
1356
1357         /* mark not-null status */
1358         if (has_not_null)
1359         {
1360                 TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));
1361
1362                 constr->has_not_null = true;
1363                 relation->rd_att->constr = constr;
1364         }
1365
1366         /*
1367          * initialize relation id from info in att array (my, this is ugly)
1368          */
1369         RelationGetRelid(relation) = relation->rd_att->attrs[0]->attrelid;
1370         relation->rd_rel->relfilenode = RelationGetRelid(relation);
1371
1372         /*
1373          * initialize the relation lock manager information
1374          */
1375         RelationInitLockInfo(relation);         /* see lmgr.c */
1376
1377         /*
1378          * initialize physical addressing information for the relation
1379          */
1380         RelationInitPhysicalAddr(relation);
1381
1382         /*
1383          * initialize the rel-has-index flag, using hardwired knowledge
1384          */
1385         if (IsBootstrapProcessingMode())
1386         {
1387                 /* In bootstrap mode, we have no indexes */
1388                 relation->rd_rel->relhasindex = false;
1389         }
1390         else
1391         {
1392                 /* Otherwise, all the rels formrdesc is used for have indexes */
1393                 relation->rd_rel->relhasindex = true;
1394         }
1395
1396         /*
1397          * add new reldesc to relcache
1398          */
1399         RelationCacheInsert(relation);
1400
1401         /* It's fully valid */
1402         relation->rd_isvalid = true;
1403 }
1404
1405
1406 /* ----------------------------------------------------------------
1407  *                               Relation Descriptor Lookup Interface
1408  * ----------------------------------------------------------------
1409  */
1410
1411 /*
1412  *              RelationIdGetRelation
1413  *
1414  *              Lookup a reldesc by OID; make one if not already in cache.
1415  *
1416  *              Returns NULL if no pg_class row could be found for the given relid
1417  *              (suggesting we are trying to access a just-deleted relation).
1418  *              Any other error is reported via elog.
1419  *
1420  *              NB: caller should already have at least AccessShareLock on the
1421  *              relation ID, else there are nasty race conditions.
1422  *
1423  *              NB: relation ref count is incremented, or set to 1 if new entry.
1424  *              Caller should eventually decrement count.  (Usually,
1425  *              that happens by calling RelationClose().)
1426  */
1427 Relation
1428 RelationIdGetRelation(Oid relationId)
1429 {
1430         Relation        rd;
1431
1432         /*
1433          * first try to find reldesc in the cache
1434          */
1435         RelationIdCacheLookup(relationId, rd);
1436
1437         if (RelationIsValid(rd))
1438         {
1439                 RelationIncrementReferenceCount(rd);
1440                 /* revalidate nailed index if necessary */
1441                 if (!rd->rd_isvalid)
1442                         RelationReloadClassinfo(rd);
1443                 return rd;
1444         }
1445
1446         /*
1447          * no reldesc in the cache, so have RelationBuildDesc() build one and add
1448          * it.
1449          */
1450         rd = RelationBuildDesc(relationId, NULL);
1451         if (RelationIsValid(rd))
1452                 RelationIncrementReferenceCount(rd);
1453         return rd;
1454 }
1455
1456 /* ----------------------------------------------------------------
1457  *                              cache invalidation support routines
1458  * ----------------------------------------------------------------
1459  */
1460
1461 /*
1462  * RelationIncrementReferenceCount
1463  *              Increments relation reference count.
1464  *
1465  * Note: bootstrap mode has its own weird ideas about relation refcount
1466  * behavior; we ought to fix it someday, but for now, just disable
1467  * reference count ownership tracking in bootstrap mode.
1468  */
1469 void
1470 RelationIncrementReferenceCount(Relation rel)
1471 {
1472         ResourceOwnerEnlargeRelationRefs(CurrentResourceOwner);
1473         rel->rd_refcnt += 1;
1474         if (!IsBootstrapProcessingMode())
1475                 ResourceOwnerRememberRelationRef(CurrentResourceOwner, rel);
1476 }
1477
1478 /*
1479  * RelationDecrementReferenceCount
1480  *              Decrements relation reference count.
1481  */
1482 void
1483 RelationDecrementReferenceCount(Relation rel)
1484 {
1485         Assert(rel->rd_refcnt > 0);
1486         rel->rd_refcnt -= 1;
1487         if (!IsBootstrapProcessingMode())
1488                 ResourceOwnerForgetRelationRef(CurrentResourceOwner, rel);
1489 }
1490
1491 /*
1492  * RelationClose - close an open relation
1493  *
1494  *      Actually, we just decrement the refcount.
1495  *
1496  *      NOTE: if compiled with -DRELCACHE_FORCE_RELEASE then relcache entries
1497  *      will be freed as soon as their refcount goes to zero.  In combination
1498  *      with aset.c's CLOBBER_FREED_MEMORY option, this provides a good test
1499  *      to catch references to already-released relcache entries.  It slows
1500  *      things down quite a bit, however.
1501  */
1502 void
1503 RelationClose(Relation relation)
1504 {
1505         /* Note: no locking manipulations needed */
1506         RelationDecrementReferenceCount(relation);
1507
1508 #ifdef RELCACHE_FORCE_RELEASE
1509         if (RelationHasReferenceCountZero(relation) &&
1510                 relation->rd_createSubid == InvalidSubTransactionId)
1511                 RelationClearRelation(relation, false);
1512 #endif
1513 }
1514
1515 /*
1516  * RelationReloadClassinfo - reload the pg_class row (only)
1517  *
1518  *      This function is used only for indexes.  We currently allow only the
1519  *      pg_class row of an existing index to change (to support changes of
1520  *      owner, tablespace, or relfilenode), not its pg_index row or other
1521  *      subsidiary index schema information.  Therefore it's sufficient to do
1522  *      this when we get an SI invalidation.  Furthermore, there are cases
1523  *      where it's necessary not to throw away the index information, especially
1524  *      for "nailed" indexes which we are unable to rebuild on-the-fly.
1525  *
1526  *      We can't necessarily reread the pg_class row right away; we might be
1527  *      in a failed transaction when we receive the SI notification.  If so,
1528  *      RelationClearRelation just marks the entry as invalid by setting
1529  *      rd_isvalid to false.  This routine is called to fix the entry when it
1530  *      is next needed.
1531  */
1532 static void
1533 RelationReloadClassinfo(Relation relation)
1534 {
1535         bool            indexOK;
1536         HeapTuple       pg_class_tuple;
1537         Form_pg_class relp;
1538
1539         /* Should be called only for invalidated indexes */
1540         Assert(relation->rd_rel->relkind == RELKIND_INDEX &&
1541                    !relation->rd_isvalid);
1542         /* Should be closed at smgr level */
1543         Assert(relation->rd_smgr == NULL);
1544
1545         /*
1546          * Read the pg_class row
1547          *
1548          * Don't try to use an indexscan of pg_class_oid_index to reload the info
1549          * for pg_class_oid_index ...
1550          */
1551         indexOK = (RelationGetRelid(relation) != ClassOidIndexId);
1552         pg_class_tuple = ScanPgRelation(RelationGetRelid(relation), indexOK);
1553         if (!HeapTupleIsValid(pg_class_tuple))
1554                 elog(ERROR, "could not find pg_class tuple for index %u",
1555                          RelationGetRelid(relation));
1556         relp = (Form_pg_class) GETSTRUCT(pg_class_tuple);
1557         memcpy(relation->rd_rel, relp, CLASS_TUPLE_SIZE);
1558         /* Reload reloptions in case they changed */
1559         if (relation->rd_options)
1560                 pfree(relation->rd_options);
1561         RelationParseRelOptions(relation, pg_class_tuple);
1562         /* done with pg_class tuple */
1563         heap_freetuple(pg_class_tuple);
1564         /* We must recalculate physical address in case it changed */
1565         RelationInitPhysicalAddr(relation);
1566         /* Make sure targblock is reset in case rel was truncated */
1567         relation->rd_targblock = InvalidBlockNumber;
1568         /* Must free any AM cached data, too */
1569         if (relation->rd_amcache)
1570                 pfree(relation->rd_amcache);
1571         relation->rd_amcache = NULL;
1572         /* Okay, now it's valid again */
1573         relation->rd_isvalid = true;
1574 }
1575
1576 /*
1577  * RelationClearRelation
1578  *
1579  *       Physically blow away a relation cache entry, or reset it and rebuild
1580  *       it from scratch (that is, from catalog entries).  The latter path is
1581  *       usually used when we are notified of a change to an open relation
1582  *       (one with refcount > 0).  However, this routine just does whichever
1583  *       it's told to do; callers must determine which they want.
1584  */
1585 static void
1586 RelationClearRelation(Relation relation, bool rebuild)
1587 {
1588         Oid                     old_reltype = relation->rd_rel->reltype;
1589         MemoryContext oldcxt;
1590
1591         /*
1592          * Make sure smgr and lower levels close the relation's files, if they
1593          * weren't closed already.  If the relation is not getting deleted, the
1594          * next smgr access should reopen the files automatically.      This ensures
1595          * that the low-level file access state is updated after, say, a vacuum
1596          * truncation.
1597          */
1598         RelationCloseSmgr(relation);
1599
1600         /*
1601          * Never, never ever blow away a nailed-in system relation, because we'd
1602          * be unable to recover.  However, we must reset rd_targblock, in case we
1603          * got called because of a relation cache flush that was triggered by
1604          * VACUUM.
1605          *
1606          * If it's a nailed index, then we need to re-read the pg_class row to see
1607          * if its relfilenode changed.  We can't necessarily do that here, because
1608          * we might be in a failed transaction.  We assume it's okay to do it if
1609          * there are open references to the relcache entry (cf notes for
1610          * AtEOXact_RelationCache).  Otherwise just mark the entry as possibly
1611          * invalid, and it'll be fixed when next opened.
1612          */
1613         if (relation->rd_isnailed)
1614         {
1615                 relation->rd_targblock = InvalidBlockNumber;
1616                 if (relation->rd_rel->relkind == RELKIND_INDEX)
1617                 {
1618                         relation->rd_isvalid = false;           /* needs to be revalidated */
1619                         if (relation->rd_refcnt > 1)
1620                                 RelationReloadClassinfo(relation);
1621                 }
1622                 return;
1623         }
1624
1625         /*
1626          * Even non-system indexes should not be blown away if they are open and
1627          * have valid index support information.  This avoids problems with active
1628          * use of the index support information.  As with nailed indexes, we
1629          * re-read the pg_class row to handle possible physical relocation of
1630          * the index.
1631          */
1632         if (relation->rd_rel->relkind == RELKIND_INDEX &&
1633                 relation->rd_refcnt > 0 &&
1634                 relation->rd_indexcxt != NULL)
1635         {
1636                 relation->rd_isvalid = false;                   /* needs to be revalidated */
1637                 RelationReloadClassinfo(relation);
1638                 return;
1639         }
1640
1641         /*
1642          * Remove relation from hash tables
1643          *
1644          * Note: we might be reinserting it momentarily, but we must not have it
1645          * visible in the hash tables until it's valid again, so don't try to
1646          * optimize this away...
1647          */
1648         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
1649         RelationCacheDelete(relation);
1650         MemoryContextSwitchTo(oldcxt);
1651
1652         /* Clear out catcache's entries for this relation */
1653         CatalogCacheFlushRelation(RelationGetRelid(relation));
1654
1655         /*
1656          * Free all the subsidiary data structures of the relcache entry. We
1657          * cannot free rd_att if we are trying to rebuild the entry, however,
1658          * because pointers to it may be cached in various places. The rule
1659          * manager might also have pointers into the rewrite rules. So to begin
1660          * with, we can only get rid of these fields:
1661          */
1662         FreeTriggerDesc(relation->trigdesc);
1663         if (relation->rd_indextuple)
1664                 pfree(relation->rd_indextuple);
1665         if (relation->rd_am)
1666                 pfree(relation->rd_am);
1667         if (relation->rd_rel)
1668                 pfree(relation->rd_rel);
1669         if (relation->rd_options)
1670                 pfree(relation->rd_options);
1671         list_free(relation->rd_indexlist);
1672         if (relation->rd_indexcxt)
1673                 MemoryContextDelete(relation->rd_indexcxt);
1674
1675         /*
1676          * If we're really done with the relcache entry, blow it away. But if
1677          * someone is still using it, reconstruct the whole deal without moving
1678          * the physical RelationData record (so that the someone's pointer is
1679          * still valid).
1680          */
1681         if (!rebuild)
1682         {
1683                 /* ok to zap remaining substructure */
1684                 flush_rowtype_cache(old_reltype);
1685                 /* can't use DecrTupleDescRefCount here */
1686                 Assert(relation->rd_att->tdrefcount > 0);
1687                 if (--relation->rd_att->tdrefcount == 0)
1688                         FreeTupleDesc(relation->rd_att);
1689                 if (relation->rd_rulescxt)
1690                         MemoryContextDelete(relation->rd_rulescxt);
1691                 pfree(relation);
1692         }
1693         else
1694         {
1695                 /*
1696                  * When rebuilding an open relcache entry, must preserve ref count and
1697                  * rd_createSubid state.  Also attempt to preserve the tupledesc and
1698                  * rewrite-rule substructures in place.  (Note: the refcount mechanism
1699                  * for tupledescs may eventually ensure that we don't really need to
1700                  * preserve the tupledesc in-place, but for now there are still a lot
1701                  * of places that assume an open rel's tupledesc won't move.)
1702                  *
1703                  * Note that this process does not touch CurrentResourceOwner; which
1704                  * is good because whatever ref counts the entry may have do not
1705                  * necessarily belong to that resource owner.
1706                  */
1707                 Oid                     save_relid = RelationGetRelid(relation);
1708                 int                     old_refcnt = relation->rd_refcnt;
1709                 SubTransactionId old_createSubid = relation->rd_createSubid;
1710                 TupleDesc       old_att = relation->rd_att;
1711                 RuleLock   *old_rules = relation->rd_rules;
1712                 MemoryContext old_rulescxt = relation->rd_rulescxt;
1713
1714                 if (RelationBuildDesc(save_relid, relation) != relation)
1715                 {
1716                         /* Should only get here if relation was deleted */
1717                         flush_rowtype_cache(old_reltype);
1718                         Assert(old_att->tdrefcount > 0);
1719                         if (--old_att->tdrefcount == 0)
1720                                 FreeTupleDesc(old_att);
1721                         if (old_rulescxt)
1722                                 MemoryContextDelete(old_rulescxt);
1723                         pfree(relation);
1724                         elog(ERROR, "relation %u deleted while still in use", save_relid);
1725                 }
1726                 relation->rd_refcnt = old_refcnt;
1727                 relation->rd_createSubid = old_createSubid;
1728                 if (equalTupleDescs(old_att, relation->rd_att))
1729                 {
1730                         /* needn't flush typcache here */
1731                         Assert(relation->rd_att->tdrefcount == 1);
1732                         if (--relation->rd_att->tdrefcount == 0)
1733                                 FreeTupleDesc(relation->rd_att);
1734                         relation->rd_att = old_att;
1735                 }
1736                 else
1737                 {
1738                         flush_rowtype_cache(old_reltype);
1739                         Assert(old_att->tdrefcount > 0);
1740                         if (--old_att->tdrefcount == 0)
1741                                 FreeTupleDesc(old_att);
1742                 }
1743                 if (equalRuleLocks(old_rules, relation->rd_rules))
1744                 {
1745                         if (relation->rd_rulescxt)
1746                                 MemoryContextDelete(relation->rd_rulescxt);
1747                         relation->rd_rules = old_rules;
1748                         relation->rd_rulescxt = old_rulescxt;
1749                 }
1750                 else
1751                 {
1752                         if (old_rulescxt)
1753                                 MemoryContextDelete(old_rulescxt);
1754                 }
1755         }
1756 }
1757
1758 /*
1759  * RelationFlushRelation
1760  *
1761  *       Rebuild the relation if it is open (refcount > 0), else blow it away.
1762  */
1763 static void
1764 RelationFlushRelation(Relation relation)
1765 {
1766         bool            rebuild;
1767
1768         if (relation->rd_createSubid != InvalidSubTransactionId)
1769         {
1770                 /*
1771                  * New relcache entries are always rebuilt, not flushed; else we'd
1772                  * forget the "new" status of the relation, which is a useful
1773                  * optimization to have.
1774                  */
1775                 rebuild = true;
1776         }
1777         else
1778         {
1779                 /*
1780                  * Pre-existing rels can be dropped from the relcache if not open.
1781                  */
1782                 rebuild = !RelationHasReferenceCountZero(relation);
1783         }
1784
1785         RelationClearRelation(relation, rebuild);
1786 }
1787
1788 /*
1789  * RelationForgetRelation - unconditionally remove a relcache entry
1790  *
1791  *                 External interface for destroying a relcache entry when we
1792  *                 drop the relation.
1793  */
1794 void
1795 RelationForgetRelation(Oid rid)
1796 {
1797         Relation        relation;
1798
1799         RelationIdCacheLookup(rid, relation);
1800
1801         if (!PointerIsValid(relation))
1802                 return;                                 /* not in cache, nothing to do */
1803
1804         if (!RelationHasReferenceCountZero(relation))
1805                 elog(ERROR, "relation %u is still open", rid);
1806
1807         /* Unconditionally destroy the relcache entry */
1808         RelationClearRelation(relation, false);
1809 }
1810
1811 /*
1812  *              RelationCacheInvalidateEntry
1813  *
1814  *              This routine is invoked for SI cache flush messages.
1815  *
1816  * Any relcache entry matching the relid must be flushed.  (Note: caller has
1817  * already determined that the relid belongs to our database or is a shared
1818  * relation.)
1819  *
1820  * We used to skip local relations, on the grounds that they could
1821  * not be targets of cross-backend SI update messages; but it seems
1822  * safer to process them, so that our *own* SI update messages will
1823  * have the same effects during CommandCounterIncrement for both
1824  * local and nonlocal relations.
1825  */
1826 void
1827 RelationCacheInvalidateEntry(Oid relationId)
1828 {
1829         Relation        relation;
1830
1831         RelationIdCacheLookup(relationId, relation);
1832
1833         if (PointerIsValid(relation))
1834         {
1835                 relcacheInvalsReceived++;
1836                 RelationFlushRelation(relation);
1837         }
1838 }
1839
1840 /*
1841  * RelationCacheInvalidate
1842  *       Blow away cached relation descriptors that have zero reference counts,
1843  *       and rebuild those with positive reference counts.      Also reset the smgr
1844  *       relation cache.
1845  *
1846  *       This is currently used only to recover from SI message buffer overflow,
1847  *       so we do not touch new-in-transaction relations; they cannot be targets
1848  *       of cross-backend SI updates (and our own updates now go through a
1849  *       separate linked list that isn't limited by the SI message buffer size).
1850  *
1851  *       We do this in two phases: the first pass deletes deletable items, and
1852  *       the second one rebuilds the rebuildable items.  This is essential for
1853  *       safety, because hash_seq_search only copes with concurrent deletion of
1854  *       the element it is currently visiting.  If a second SI overflow were to
1855  *       occur while we are walking the table, resulting in recursive entry to
1856  *       this routine, we could crash because the inner invocation blows away
1857  *       the entry next to be visited by the outer scan.  But this way is OK,
1858  *       because (a) during the first pass we won't process any more SI messages,
1859  *       so hash_seq_search will complete safely; (b) during the second pass we
1860  *       only hold onto pointers to nondeletable entries.
1861  *
1862  *       The two-phase approach also makes it easy to ensure that we process
1863  *       nailed-in-cache indexes before other nondeletable items, and that we
1864  *       process pg_class_oid_index first of all.  In scenarios where a nailed
1865  *       index has been given a new relfilenode, we have to detect that update
1866  *       before the nailed index is used in reloading any other relcache entry.
1867  */
1868 void
1869 RelationCacheInvalidate(void)
1870 {
1871         HASH_SEQ_STATUS status;
1872         RelIdCacheEnt *idhentry;
1873         Relation        relation;
1874         List       *rebuildFirstList = NIL;
1875         List       *rebuildList = NIL;
1876         ListCell   *l;
1877
1878         /* Phase 1 */
1879         hash_seq_init(&status, RelationIdCache);
1880
1881         while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
1882         {
1883                 relation = idhentry->reldesc;
1884
1885                 /* Must close all smgr references to avoid leaving dangling ptrs */
1886                 RelationCloseSmgr(relation);
1887
1888                 /* Ignore new relations, since they are never SI targets */
1889                 if (relation->rd_createSubid != InvalidSubTransactionId)
1890                         continue;
1891
1892                 relcacheInvalsReceived++;
1893
1894                 if (RelationHasReferenceCountZero(relation))
1895                 {
1896                         /* Delete this entry immediately */
1897                         Assert(!relation->rd_isnailed);
1898                         RelationClearRelation(relation, false);
1899                 }
1900                 else
1901                 {
1902                         /*
1903                          * Add this entry to list of stuff to rebuild in second pass.
1904                          * pg_class_oid_index goes on the front of rebuildFirstList, other
1905                          * nailed indexes on the back, and everything else into
1906                          * rebuildList (in no particular order).
1907                          */
1908                         if (relation->rd_isnailed &&
1909                                 relation->rd_rel->relkind == RELKIND_INDEX)
1910                         {
1911                                 if (RelationGetRelid(relation) == ClassOidIndexId)
1912                                         rebuildFirstList = lcons(relation, rebuildFirstList);
1913                                 else
1914                                         rebuildFirstList = lappend(rebuildFirstList, relation);
1915                         }
1916                         else
1917                                 rebuildList = lcons(relation, rebuildList);
1918                 }
1919         }
1920
1921         /*
1922          * Now zap any remaining smgr cache entries.  This must happen before we
1923          * start to rebuild entries, since that may involve catalog fetches which
1924          * will re-open catalog files.
1925          */
1926         smgrcloseall();
1927
1928         /* Phase 2: rebuild the items found to need rebuild in phase 1 */
1929         foreach(l, rebuildFirstList)
1930         {
1931                 relation = (Relation) lfirst(l);
1932                 RelationClearRelation(relation, true);
1933         }
1934         list_free(rebuildFirstList);
1935         foreach(l, rebuildList)
1936         {
1937                 relation = (Relation) lfirst(l);
1938                 RelationClearRelation(relation, true);
1939         }
1940         list_free(rebuildList);
1941 }
1942
1943 /*
1944  * AtEOXact_RelationCache
1945  *
1946  *      Clean up the relcache at main-transaction commit or abort.
1947  *
1948  * Note: this must be called *before* processing invalidation messages.
1949  * In the case of abort, we don't want to try to rebuild any invalidated
1950  * cache entries (since we can't safely do database accesses).  Therefore
1951  * we must reset refcnts before handling pending invalidations.
1952  *
1953  * As of PostgreSQL 8.1, relcache refcnts should get released by the
1954  * ResourceOwner mechanism.  This routine just does a debugging
1955  * cross-check that no pins remain.  However, we also need to do special
1956  * cleanup when the current transaction created any relations or made use
1957  * of forced index lists.
1958  */
1959 void
1960 AtEOXact_RelationCache(bool isCommit)
1961 {
1962         HASH_SEQ_STATUS status;
1963         RelIdCacheEnt *idhentry;
1964
1965         /*
1966          * To speed up transaction exit, we want to avoid scanning the relcache
1967          * unless there is actually something for this routine to do.  Other than
1968          * the debug-only Assert checks, most transactions don't create any work
1969          * for us to do here, so we keep a static flag that gets set if there is
1970          * anything to do.      (Currently, this means either a relation is created in
1971          * the current xact, or an index list is forced.)  For simplicity, the
1972          * flag remains set till end of top-level transaction, even though we
1973          * could clear it at subtransaction end in some cases.
1974          */
1975         if (!need_eoxact_work
1976 #ifdef USE_ASSERT_CHECKING
1977                 && !assert_enabled
1978 #endif
1979                 )
1980                 return;
1981
1982         hash_seq_init(&status, RelationIdCache);
1983
1984         while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
1985         {
1986                 Relation        relation = idhentry->reldesc;
1987
1988                 /*
1989                  * The relcache entry's ref count should be back to its normal
1990                  * not-in-a-transaction state: 0 unless it's nailed in cache.
1991                  *
1992                  * In bootstrap mode, this is NOT true, so don't check it --- the
1993                  * bootstrap code expects relations to stay open across start/commit
1994                  * transaction calls.  (That seems bogus, but it's not worth fixing.)
1995                  */
1996 #ifdef USE_ASSERT_CHECKING
1997                 if (!IsBootstrapProcessingMode())
1998                 {
1999                         int                     expected_refcnt;
2000
2001                         expected_refcnt = relation->rd_isnailed ? 1 : 0;
2002                         Assert(relation->rd_refcnt == expected_refcnt);
2003                 }
2004 #endif
2005
2006                 /*
2007                  * Is it a relation created in the current transaction?
2008                  *
2009                  * During commit, reset the flag to zero, since we are now out of the
2010                  * creating transaction.  During abort, simply delete the relcache
2011                  * entry --- it isn't interesting any longer.  (NOTE: if we have
2012                  * forgotten the new-ness of a new relation due to a forced cache
2013                  * flush, the entry will get deleted anyway by shared-cache-inval
2014                  * processing of the aborted pg_class insertion.)
2015                  */
2016                 if (relation->rd_createSubid != InvalidSubTransactionId)
2017                 {
2018                         if (isCommit)
2019                                 relation->rd_createSubid = InvalidSubTransactionId;
2020                         else
2021                         {
2022                                 RelationClearRelation(relation, false);
2023                                 continue;
2024                         }
2025                 }
2026
2027                 /*
2028                  * Flush any temporary index list.
2029                  */
2030                 if (relation->rd_indexvalid == 2)
2031                 {
2032                         list_free(relation->rd_indexlist);
2033                         relation->rd_indexlist = NIL;
2034                         relation->rd_oidindex = InvalidOid;
2035                         relation->rd_indexvalid = 0;
2036                 }
2037         }
2038
2039         /* Once done with the transaction, we can reset need_eoxact_work */
2040         need_eoxact_work = false;
2041 }
2042
2043 /*
2044  * AtEOSubXact_RelationCache
2045  *
2046  *      Clean up the relcache at sub-transaction commit or abort.
2047  *
2048  * Note: this must be called *before* processing invalidation messages.
2049  */
2050 void
2051 AtEOSubXact_RelationCache(bool isCommit, SubTransactionId mySubid,
2052                                                   SubTransactionId parentSubid)
2053 {
2054         HASH_SEQ_STATUS status;
2055         RelIdCacheEnt *idhentry;
2056
2057         /*
2058          * Skip the relcache scan if nothing to do --- see notes for
2059          * AtEOXact_RelationCache.
2060          */
2061         if (!need_eoxact_work)
2062                 return;
2063
2064         hash_seq_init(&status, RelationIdCache);
2065
2066         while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
2067         {
2068                 Relation        relation = idhentry->reldesc;
2069
2070                 /*
2071                  * Is it a relation created in the current subtransaction?
2072                  *
2073                  * During subcommit, mark it as belonging to the parent, instead.
2074                  * During subabort, simply delete the relcache entry.
2075                  */
2076                 if (relation->rd_createSubid == mySubid)
2077                 {
2078                         if (isCommit)
2079                                 relation->rd_createSubid = parentSubid;
2080                         else
2081                         {
2082                                 Assert(RelationHasReferenceCountZero(relation));
2083                                 RelationClearRelation(relation, false);
2084                                 continue;
2085                         }
2086                 }
2087
2088                 /*
2089                  * Flush any temporary index list.
2090                  */
2091                 if (relation->rd_indexvalid == 2)
2092                 {
2093                         list_free(relation->rd_indexlist);
2094                         relation->rd_indexlist = NIL;
2095                         relation->rd_oidindex = InvalidOid;
2096                         relation->rd_indexvalid = 0;
2097                 }
2098         }
2099 }
2100
2101 /*
2102  *              RelationBuildLocalRelation
2103  *                      Build a relcache entry for an about-to-be-created relation,
2104  *                      and enter it into the relcache.
2105  */
2106 Relation
2107 RelationBuildLocalRelation(const char *relname,
2108                                                    Oid relnamespace,
2109                                                    TupleDesc tupDesc,
2110                                                    Oid relid,
2111                                                    Oid reltablespace,
2112                                                    bool shared_relation)
2113 {
2114         Relation        rel;
2115         MemoryContext oldcxt;
2116         int                     natts = tupDesc->natts;
2117         int                     i;
2118         bool            has_not_null;
2119         bool            nailit;
2120
2121         AssertArg(natts >= 0);
2122
2123         /*
2124          * check for creation of a rel that must be nailed in cache.
2125          *
2126          * XXX this list had better match RelationCacheInitializePhase2's list.
2127          */
2128         switch (relid)
2129         {
2130                 case RelationRelationId:
2131                 case AttributeRelationId:
2132                 case ProcedureRelationId:
2133                 case TypeRelationId:
2134                         nailit = true;
2135                         break;
2136                 default:
2137                         nailit = false;
2138                         break;
2139         }
2140
2141         /*
2142          * check that hardwired list of shared rels matches what's in the
2143          * bootstrap .bki file.  If you get a failure here during initdb,
2144          * you probably need to fix IsSharedRelation() to match whatever
2145          * you've done to the set of shared relations.
2146          */
2147         if (shared_relation != IsSharedRelation(relid))
2148                 elog(ERROR, "shared_relation flag for \"%s\" does not match IsSharedRelation(%u)",
2149                          relname, relid);
2150
2151         /*
2152          * switch to the cache context to create the relcache entry.
2153          */
2154         if (!CacheMemoryContext)
2155                 CreateCacheMemoryContext();
2156
2157         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
2158
2159         /*
2160          * allocate a new relation descriptor and fill in basic state fields.
2161          */
2162         rel = (Relation) palloc0(sizeof(RelationData));
2163
2164         rel->rd_targblock = InvalidBlockNumber;
2165
2166         /* make sure relation is marked as having no open file yet */
2167         rel->rd_smgr = NULL;
2168
2169         /* mark it nailed if appropriate */
2170         rel->rd_isnailed = nailit;
2171
2172         rel->rd_refcnt = nailit ? 1 : 0;
2173
2174         /* it's being created in this transaction */
2175         rel->rd_createSubid = GetCurrentSubTransactionId();
2176
2177         /* must flag that we have rels created in this transaction */
2178         need_eoxact_work = true;
2179
2180         /* is it a temporary relation? */
2181         rel->rd_istemp = isTempNamespace(relnamespace);
2182
2183         /*
2184          * create a new tuple descriptor from the one passed in.  We do this
2185          * partly to copy it into the cache context, and partly because the new
2186          * relation can't have any defaults or constraints yet; they have to be
2187          * added in later steps, because they require additions to multiple system
2188          * catalogs.  We can copy attnotnull constraints here, however.
2189          */
2190         rel->rd_att = CreateTupleDescCopy(tupDesc);
2191         rel->rd_att->tdrefcount = 1;    /* mark as refcounted */
2192         has_not_null = false;
2193         for (i = 0; i < natts; i++)
2194         {
2195                 rel->rd_att->attrs[i]->attnotnull = tupDesc->attrs[i]->attnotnull;
2196                 has_not_null |= tupDesc->attrs[i]->attnotnull;
2197         }
2198
2199         if (has_not_null)
2200         {
2201                 TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));
2202
2203                 constr->has_not_null = true;
2204                 rel->rd_att->constr = constr;
2205         }
2206
2207         /*
2208          * initialize relation tuple form (caller may add/override data later)
2209          */
2210         rel->rd_rel = (Form_pg_class) palloc0(CLASS_TUPLE_SIZE);
2211
2212         namestrcpy(&rel->rd_rel->relname, relname);
2213         rel->rd_rel->relnamespace = relnamespace;
2214
2215         rel->rd_rel->relkind = RELKIND_UNCATALOGED;
2216         rel->rd_rel->relhasoids = rel->rd_att->tdhasoid;
2217         rel->rd_rel->relnatts = natts;
2218         rel->rd_rel->reltype = InvalidOid;
2219         /* needed when bootstrapping: */
2220         rel->rd_rel->relowner = BOOTSTRAP_SUPERUSERID;
2221
2222         /*
2223          * Insert relation physical and logical identifiers (OIDs) into the right
2224          * places.      Note that the physical ID (relfilenode) is initially the same
2225          * as the logical ID (OID).
2226          */
2227         rel->rd_rel->relisshared = shared_relation;
2228
2229         RelationGetRelid(rel) = relid;
2230
2231         for (i = 0; i < natts; i++)
2232                 rel->rd_att->attrs[i]->attrelid = relid;
2233
2234         rel->rd_rel->relfilenode = relid;
2235         rel->rd_rel->reltablespace = reltablespace;
2236
2237         RelationInitLockInfo(rel);      /* see lmgr.c */
2238
2239         RelationInitPhysicalAddr(rel);
2240
2241         /*
2242          * Okay to insert into the relcache hash tables.
2243          */
2244         RelationCacheInsert(rel);
2245
2246         /*
2247          * done building relcache entry.
2248          */
2249         MemoryContextSwitchTo(oldcxt);
2250
2251         /* It's fully valid */
2252         rel->rd_isvalid = true;
2253
2254         /*
2255          * Caller expects us to pin the returned entry.
2256          */
2257         RelationIncrementReferenceCount(rel);
2258
2259         return rel;
2260 }
2261
2262 /*
2263  *              RelationCacheInitialize
2264  *
2265  *              This initializes the relation descriptor cache.  At the time
2266  *              that this is invoked, we can't do database access yet (mainly
2267  *              because the transaction subsystem is not up); all we are doing
2268  *              is making an empty cache hashtable.  This must be done before
2269  *              starting the initialization transaction, because otherwise
2270  *              AtEOXact_RelationCache would crash if that transaction aborts
2271  *              before we can get the relcache set up.
2272  */
2273
2274 #define INITRELCACHESIZE                400
2275
2276 void
2277 RelationCacheInitialize(void)
2278 {
2279         MemoryContext oldcxt;
2280         HASHCTL         ctl;
2281
2282         /*
2283          * switch to cache memory context
2284          */
2285         if (!CacheMemoryContext)
2286                 CreateCacheMemoryContext();
2287
2288         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
2289
2290         /*
2291          * create hashtable that indexes the relcache
2292          */
2293         MemSet(&ctl, 0, sizeof(ctl));
2294         ctl.keysize = sizeof(Oid);
2295         ctl.entrysize = sizeof(RelIdCacheEnt);
2296         ctl.hash = oid_hash;
2297         RelationIdCache = hash_create("Relcache by OID", INITRELCACHESIZE,
2298                                                                   &ctl, HASH_ELEM | HASH_FUNCTION);
2299
2300         MemoryContextSwitchTo(oldcxt);
2301 }
2302
2303 /*
2304  *              RelationCacheInitializePhase2
2305  *
2306  *              This is called as soon as the catcache and transaction system
2307  *              are functional.  At this point we can actually read data from
2308  *              the system catalogs.  We first try to read pre-computed relcache
2309  *              entries from the pg_internal.init file.  If that's missing or
2310  *              broken, make phony entries for the minimum set of nailed-in-cache
2311  *              relations.  Then (unless bootstrapping) make sure we have entries
2312  *              for the critical system indexes.  Once we've done all this, we
2313  *              have enough infrastructure to open any system catalog or use any
2314  *              catcache.  The last step is to rewrite pg_internal.init if needed.
2315  */
2316 void
2317 RelationCacheInitializePhase2(void)
2318 {
2319         HASH_SEQ_STATUS status;
2320         RelIdCacheEnt *idhentry;
2321         MemoryContext oldcxt;
2322         bool needNewCacheFile = false;
2323
2324         /*
2325          * switch to cache memory context
2326          */
2327         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
2328
2329         /*
2330          * Try to load the relcache cache file.  If unsuccessful, bootstrap the
2331          * cache with pre-made descriptors for the critical "nailed-in" system
2332          * catalogs.
2333          */
2334         if (IsBootstrapProcessingMode() ||
2335                 !load_relcache_init_file())
2336         {
2337                 needNewCacheFile = true;
2338
2339                 formrdesc("pg_class", PG_CLASS_RELTYPE_OID,
2340                                   true, Natts_pg_class, Desc_pg_class);
2341                 formrdesc("pg_attribute", PG_ATTRIBUTE_RELTYPE_OID,
2342                                   false, Natts_pg_attribute, Desc_pg_attribute);
2343                 formrdesc("pg_proc", PG_PROC_RELTYPE_OID,
2344                                   true, Natts_pg_proc, Desc_pg_proc);
2345                 formrdesc("pg_type", PG_TYPE_RELTYPE_OID,
2346                                   true, Natts_pg_type, Desc_pg_type);
2347
2348 #define NUM_CRITICAL_RELS       4       /* fix if you change list above */
2349         }
2350
2351         MemoryContextSwitchTo(oldcxt);
2352
2353         /* In bootstrap mode, the faked-up formrdesc info is all we'll have */
2354         if (IsBootstrapProcessingMode())
2355                 return;
2356
2357         /*
2358          * If we didn't get the critical system indexes loaded into relcache, do
2359          * so now.      These are critical because the catcache depends on them for
2360          * catcache fetches that are done during relcache load.  Thus, we have an
2361          * infinite-recursion problem.  We can break the recursion by doing
2362          * heapscans instead of indexscans at certain key spots. To avoid hobbling
2363          * performance, we only want to do that until we have the critical indexes
2364          * loaded into relcache.  Thus, the flag criticalRelcachesBuilt is used to
2365          * decide whether to do heapscan or indexscan at the key spots, and we set
2366          * it true after we've loaded the critical indexes.
2367          *
2368          * The critical indexes are marked as "nailed in cache", partly to make it
2369          * easy for load_relcache_init_file to count them, but mainly because we
2370          * cannot flush and rebuild them once we've set criticalRelcachesBuilt to
2371          * true.  (NOTE: perhaps it would be possible to reload them by
2372          * temporarily setting criticalRelcachesBuilt to false again.  For now,
2373          * though, we just nail 'em in.)
2374          *
2375          * RewriteRelRulenameIndexId and TriggerRelidNameIndexId are not critical
2376          * in the same way as the others, because the critical catalogs don't
2377          * (currently) have any rules or triggers, and so these indexes can be
2378          * rebuilt without inducing recursion.  However they are used during
2379          * relcache load when a rel does have rules or triggers, so we choose to
2380          * nail them for performance reasons.
2381          */
2382         if (!criticalRelcachesBuilt)
2383         {
2384                 Relation        ird;
2385
2386 #define LOAD_CRIT_INDEX(indexoid) \
2387                 do { \
2388                         ird = RelationBuildDesc((indexoid), NULL); \
2389                         ird->rd_isnailed = true; \
2390                         ird->rd_refcnt = 1; \
2391                 } while (0)
2392
2393                 LOAD_CRIT_INDEX(ClassOidIndexId);
2394                 LOAD_CRIT_INDEX(AttributeRelidNumIndexId);
2395                 LOAD_CRIT_INDEX(IndexRelidIndexId);
2396                 LOAD_CRIT_INDEX(AccessMethodStrategyIndexId);
2397                 LOAD_CRIT_INDEX(AccessMethodProcedureIndexId);
2398                 LOAD_CRIT_INDEX(OperatorOidIndexId);
2399                 LOAD_CRIT_INDEX(RewriteRelRulenameIndexId);
2400                 LOAD_CRIT_INDEX(TriggerRelidNameIndexId);
2401
2402 #define NUM_CRITICAL_INDEXES    8               /* fix if you change list above */
2403
2404                 criticalRelcachesBuilt = true;
2405         }
2406
2407         /*
2408          * Now, scan all the relcache entries and update anything that might be
2409          * wrong in the results from formrdesc or the relcache cache file. If we
2410          * faked up relcache entries using formrdesc, then read the real pg_class
2411          * rows and replace the fake entries with them. Also, if any of the
2412          * relcache entries have rules or triggers, load that info the hard way
2413          * since it isn't recorded in the cache file.
2414          */
2415         hash_seq_init(&status, RelationIdCache);
2416
2417         while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
2418         {
2419                 Relation        relation = idhentry->reldesc;
2420
2421                 /*
2422                  * If it's a faked-up entry, read the real pg_class tuple.
2423                  */
2424                 if (needNewCacheFile && relation->rd_isnailed)
2425                 {
2426                         HeapTuple       htup;
2427                         Form_pg_class relp;
2428
2429                         htup = SearchSysCache(RELOID,
2430                                                                 ObjectIdGetDatum(RelationGetRelid(relation)),
2431                                                                   0, 0, 0);
2432                         if (!HeapTupleIsValid(htup))
2433                                 elog(FATAL, "cache lookup failed for relation %u",
2434                                          RelationGetRelid(relation));
2435                         relp = (Form_pg_class) GETSTRUCT(htup);
2436
2437                         /*
2438                          * Copy tuple to relation->rd_rel. (See notes in
2439                          * AllocateRelationDesc())
2440                          */
2441                         Assert(relation->rd_rel != NULL);
2442                         memcpy((char *) relation->rd_rel, (char *) relp, CLASS_TUPLE_SIZE);
2443
2444                         /* Update rd_options while we have the tuple */
2445                         if (relation->rd_options)
2446                                 pfree(relation->rd_options);
2447                         RelationParseRelOptions(relation, htup);
2448
2449                         /*
2450                          * Also update the derived fields in rd_att.
2451                          */
2452                         relation->rd_att->tdtypeid = relp->reltype;
2453                         relation->rd_att->tdtypmod = -1;        /* unnecessary, but... */
2454                         relation->rd_att->tdhasoid = relp->relhasoids;
2455
2456                         ReleaseSysCache(htup);
2457                 }
2458
2459                 /*
2460                  * Fix data that isn't saved in relcache cache file.
2461                  */
2462                 if (relation->rd_rel->relhasrules && relation->rd_rules == NULL)
2463                         RelationBuildRuleLock(relation);
2464                 if (relation->rd_rel->reltriggers > 0 && relation->trigdesc == NULL)
2465                         RelationBuildTriggers(relation);
2466         }
2467
2468         /*
2469          * Lastly, write out a new relcache cache file if one is needed.
2470          */
2471         if (needNewCacheFile)
2472         {
2473                 /*
2474                  * Force all the catcaches to finish initializing and thereby open the
2475                  * catalogs and indexes they use.  This will preload the relcache with
2476                  * entries for all the most important system catalogs and indexes, so
2477                  * that the init file will be most useful for future backends.
2478                  */
2479                 InitCatalogCachePhase2();
2480
2481                 /* now write the file */
2482                 write_relcache_init_file();
2483         }
2484 }
2485
2486 /*
2487  * GetPgClassDescriptor -- get a predefined tuple descriptor for pg_class
2488  * GetPgIndexDescriptor -- get a predefined tuple descriptor for pg_index
2489  *
2490  * We need this kluge because we have to be able to access non-fixed-width
2491  * fields of pg_class and pg_index before we have the standard catalog caches
2492  * available.  We use predefined data that's set up in just the same way as
2493  * the bootstrapped reldescs used by formrdesc().  The resulting tupdesc is
2494  * not 100% kosher: it does not have the correct rowtype OID in tdtypeid, nor
2495  * does it have a TupleConstr field.  But it's good enough for the purpose of
2496  * extracting fields.
2497  */
2498 static TupleDesc
2499 BuildHardcodedDescriptor(int natts, Form_pg_attribute attrs, bool hasoids)
2500 {
2501         TupleDesc       result;
2502         MemoryContext oldcxt;
2503         int                     i;
2504
2505         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
2506
2507         result = CreateTemplateTupleDesc(natts, hasoids);
2508         result->tdtypeid = RECORDOID;   /* not right, but we don't care */
2509         result->tdtypmod = -1;
2510
2511         for (i = 0; i < natts; i++)
2512         {
2513                 memcpy(result->attrs[i], &attrs[i], ATTRIBUTE_TUPLE_SIZE);
2514                 /* make sure attcacheoff is valid */
2515                 result->attrs[i]->attcacheoff = -1;
2516         }
2517
2518         /* initialize first attribute's attcacheoff, cf RelationBuildTupleDesc */
2519         result->attrs[0]->attcacheoff = 0;
2520
2521         /* Note: we don't bother to set up a TupleConstr entry */
2522
2523         MemoryContextSwitchTo(oldcxt);
2524
2525         return result;
2526 }
2527
2528 static TupleDesc
2529 GetPgClassDescriptor(void)
2530 {
2531         static TupleDesc pgclassdesc = NULL;
2532
2533         /* Already done? */
2534         if (pgclassdesc == NULL)
2535                 pgclassdesc = BuildHardcodedDescriptor(Natts_pg_class,
2536                                                                                            Desc_pg_class,
2537                                                                                            true);
2538
2539         return pgclassdesc;
2540 }
2541
2542 static TupleDesc
2543 GetPgIndexDescriptor(void)
2544 {
2545         static TupleDesc pgindexdesc = NULL;
2546
2547         /* Already done? */
2548         if (pgindexdesc == NULL)
2549                 pgindexdesc = BuildHardcodedDescriptor(Natts_pg_index,
2550                                                                                            Desc_pg_index,
2551                                                                                            false);
2552
2553         return pgindexdesc;
2554 }
2555
2556 static void
2557 AttrDefaultFetch(Relation relation)
2558 {
2559         AttrDefault *attrdef = relation->rd_att->constr->defval;
2560         int                     ndef = relation->rd_att->constr->num_defval;
2561         Relation        adrel;
2562         SysScanDesc adscan;
2563         ScanKeyData skey;
2564         HeapTuple       htup;
2565         Datum           val;
2566         bool            isnull;
2567         int                     found;
2568         int                     i;
2569
2570         ScanKeyInit(&skey,
2571                                 Anum_pg_attrdef_adrelid,
2572                                 BTEqualStrategyNumber, F_OIDEQ,
2573                                 ObjectIdGetDatum(RelationGetRelid(relation)));
2574
2575         adrel = heap_open(AttrDefaultRelationId, AccessShareLock);
2576         adscan = systable_beginscan(adrel, AttrDefaultIndexId, true,
2577                                                                 SnapshotNow, 1, &skey);
2578         found = 0;
2579
2580         while (HeapTupleIsValid(htup = systable_getnext(adscan)))
2581         {
2582                 Form_pg_attrdef adform = (Form_pg_attrdef) GETSTRUCT(htup);
2583
2584                 for (i = 0; i < ndef; i++)
2585                 {
2586                         if (adform->adnum != attrdef[i].adnum)
2587                                 continue;
2588                         if (attrdef[i].adbin != NULL)
2589                                 elog(WARNING, "multiple attrdef records found for attr %s of rel %s",
2590                                 NameStr(relation->rd_att->attrs[adform->adnum - 1]->attname),
2591                                          RelationGetRelationName(relation));
2592                         else
2593                                 found++;
2594
2595                         val = fastgetattr(htup,
2596                                                           Anum_pg_attrdef_adbin,
2597                                                           adrel->rd_att, &isnull);
2598                         if (isnull)
2599                                 elog(WARNING, "null adbin for attr %s of rel %s",
2600                                 NameStr(relation->rd_att->attrs[adform->adnum - 1]->attname),
2601                                          RelationGetRelationName(relation));
2602                         else
2603                                 attrdef[i].adbin = MemoryContextStrdup(CacheMemoryContext,
2604                                                                  DatumGetCString(DirectFunctionCall1(textout,
2605                                                                                                                                          val)));
2606                         break;
2607                 }
2608
2609                 if (i >= ndef)
2610                         elog(WARNING, "unexpected attrdef record found for attr %d of rel %s",
2611                                  adform->adnum, RelationGetRelationName(relation));
2612         }
2613
2614         systable_endscan(adscan);
2615         heap_close(adrel, AccessShareLock);
2616
2617         if (found != ndef)
2618                 elog(WARNING, "%d attrdef record(s) missing for rel %s",
2619                          ndef - found, RelationGetRelationName(relation));
2620 }
2621
2622 static void
2623 CheckConstraintFetch(Relation relation)
2624 {
2625         ConstrCheck *check = relation->rd_att->constr->check;
2626         int                     ncheck = relation->rd_att->constr->num_check;
2627         Relation        conrel;
2628         SysScanDesc conscan;
2629         ScanKeyData skey[1];
2630         HeapTuple       htup;
2631         Datum           val;
2632         bool            isnull;
2633         int                     found = 0;
2634
2635         ScanKeyInit(&skey[0],
2636                                 Anum_pg_constraint_conrelid,
2637                                 BTEqualStrategyNumber, F_OIDEQ,
2638                                 ObjectIdGetDatum(RelationGetRelid(relation)));
2639
2640         conrel = heap_open(ConstraintRelationId, AccessShareLock);
2641         conscan = systable_beginscan(conrel, ConstraintRelidIndexId, true,
2642                                                                  SnapshotNow, 1, skey);
2643
2644         while (HeapTupleIsValid(htup = systable_getnext(conscan)))
2645         {
2646                 Form_pg_constraint conform = (Form_pg_constraint) GETSTRUCT(htup);
2647
2648                 /* We want check constraints only */
2649                 if (conform->contype != CONSTRAINT_CHECK)
2650                         continue;
2651
2652                 if (found >= ncheck)
2653                         elog(ERROR, "unexpected constraint record found for rel %s",
2654                                  RelationGetRelationName(relation));
2655
2656                 check[found].ccname = MemoryContextStrdup(CacheMemoryContext,
2657                                                                                                   NameStr(conform->conname));
2658
2659                 /* Grab and test conbin is actually set */
2660                 val = fastgetattr(htup,
2661                                                   Anum_pg_constraint_conbin,
2662                                                   conrel->rd_att, &isnull);
2663                 if (isnull)
2664                         elog(ERROR, "null conbin for rel %s",
2665                                  RelationGetRelationName(relation));
2666
2667                 check[found].ccbin = MemoryContextStrdup(CacheMemoryContext,
2668                                                                  DatumGetCString(DirectFunctionCall1(textout,
2669                                                                                                                                          val)));
2670                 found++;
2671         }
2672
2673         systable_endscan(conscan);
2674         heap_close(conrel, AccessShareLock);
2675
2676         if (found != ncheck)
2677                 elog(ERROR, "%d constraint record(s) missing for rel %s",
2678                          ncheck - found, RelationGetRelationName(relation));
2679 }
2680
2681 /*
2682  * RelationGetIndexList -- get a list of OIDs of indexes on this relation
2683  *
2684  * The index list is created only if someone requests it.  We scan pg_index
2685  * to find relevant indexes, and add the list to the relcache entry so that
2686  * we won't have to compute it again.  Note that shared cache inval of a
2687  * relcache entry will delete the old list and set rd_indexvalid to 0,
2688  * so that we must recompute the index list on next request.  This handles
2689  * creation or deletion of an index.
2690  *
2691  * The returned list is guaranteed to be sorted in order by OID.  This is
2692  * needed by the executor, since for index types that we obtain exclusive
2693  * locks on when updating the index, all backends must lock the indexes in
2694  * the same order or we will get deadlocks (see ExecOpenIndices()).  Any
2695  * consistent ordering would do, but ordering by OID is easy.
2696  *
2697  * Since shared cache inval causes the relcache's copy of the list to go away,
2698  * we return a copy of the list palloc'd in the caller's context.  The caller
2699  * may list_free() the returned list after scanning it. This is necessary
2700  * since the caller will typically be doing syscache lookups on the relevant
2701  * indexes, and syscache lookup could cause SI messages to be processed!
2702  *
2703  * We also update rd_oidindex, which this module treats as effectively part
2704  * of the index list.  rd_oidindex is valid when rd_indexvalid isn't zero;
2705  * it is the pg_class OID of a unique index on OID when the relation has one,
2706  * and InvalidOid if there is no such index.
2707  */
2708 List *
2709 RelationGetIndexList(Relation relation)
2710 {
2711         Relation        indrel;
2712         SysScanDesc indscan;
2713         ScanKeyData skey;
2714         HeapTuple       htup;
2715         List       *result;
2716         Oid                     oidIndex;
2717         MemoryContext oldcxt;
2718
2719         /* Quick exit if we already computed the list. */
2720         if (relation->rd_indexvalid != 0)
2721                 return list_copy(relation->rd_indexlist);
2722
2723         /*
2724          * We build the list we intend to return (in the caller's context) while
2725          * doing the scan.      After successfully completing the scan, we copy that
2726          * list into the relcache entry.  This avoids cache-context memory leakage
2727          * if we get some sort of error partway through.
2728          */
2729         result = NIL;
2730         oidIndex = InvalidOid;
2731
2732         /* Prepare to scan pg_index for entries having indrelid = this rel. */
2733         ScanKeyInit(&skey,
2734                                 Anum_pg_index_indrelid,
2735                                 BTEqualStrategyNumber, F_OIDEQ,
2736                                 ObjectIdGetDatum(RelationGetRelid(relation)));
2737
2738         indrel = heap_open(IndexRelationId, AccessShareLock);
2739         indscan = systable_beginscan(indrel, IndexIndrelidIndexId, true,
2740                                                                  SnapshotNow, 1, &skey);
2741
2742         while (HeapTupleIsValid(htup = systable_getnext(indscan)))
2743         {
2744                 Form_pg_index index = (Form_pg_index) GETSTRUCT(htup);
2745
2746                 /* Add index's OID to result list in the proper order */
2747                 result = insert_ordered_oid(result, index->indexrelid);
2748
2749                 /* Check to see if it is a unique, non-partial btree index on OID */
2750                 if (index->indnatts == 1 &&
2751                         index->indisunique &&
2752                         index->indkey.values[0] == ObjectIdAttributeNumber &&
2753                         index->indclass.values[0] == OID_BTREE_OPS_OID &&
2754                         heap_attisnull(htup, Anum_pg_index_indpred))
2755                         oidIndex = index->indexrelid;
2756         }
2757
2758         systable_endscan(indscan);
2759         heap_close(indrel, AccessShareLock);
2760
2761         /* Now save a copy of the completed list in the relcache entry. */
2762         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
2763         relation->rd_indexlist = list_copy(result);
2764         relation->rd_oidindex = oidIndex;
2765         relation->rd_indexvalid = 1;
2766         MemoryContextSwitchTo(oldcxt);
2767
2768         return result;
2769 }
2770
2771 /*
2772  * insert_ordered_oid
2773  *              Insert a new Oid into a sorted list of Oids, preserving ordering
2774  *
2775  * Building the ordered list this way is O(N^2), but with a pretty small
2776  * constant, so for the number of entries we expect it will probably be
2777  * faster than trying to apply qsort().  Most tables don't have very many
2778  * indexes...
2779  */
2780 static List *
2781 insert_ordered_oid(List *list, Oid datum)
2782 {
2783         ListCell   *prev;
2784
2785         /* Does the datum belong at the front? */
2786         if (list == NIL || datum < linitial_oid(list))
2787                 return lcons_oid(datum, list);
2788         /* No, so find the entry it belongs after */
2789         prev = list_head(list);
2790         for (;;)
2791         {
2792                 ListCell   *curr = lnext(prev);
2793
2794                 if (curr == NULL || datum < lfirst_oid(curr))
2795                         break;                          /* it belongs after 'prev', before 'curr' */
2796
2797                 prev = curr;
2798         }
2799         /* Insert datum into list after 'prev' */
2800         lappend_cell_oid(list, prev, datum);
2801         return list;
2802 }
2803
2804 /*
2805  * RelationSetIndexList -- externally force the index list contents
2806  *
2807  * This is used to temporarily override what we think the set of valid
2808  * indexes is (including the presence or absence of an OID index).
2809  * The forcing will be valid only until transaction commit or abort.
2810  *
2811  * This should only be applied to nailed relations, because in a non-nailed
2812  * relation the hacked index list could be lost at any time due to SI
2813  * messages.  In practice it is only used on pg_class (see REINDEX).
2814  *
2815  * It is up to the caller to make sure the given list is correctly ordered.
2816  */
2817 void
2818 RelationSetIndexList(Relation relation, List *indexIds, Oid oidIndex)
2819 {
2820         MemoryContext oldcxt;
2821
2822         Assert(relation->rd_isnailed);
2823         /* Copy the list into the cache context (could fail for lack of mem) */
2824         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
2825         indexIds = list_copy(indexIds);
2826         MemoryContextSwitchTo(oldcxt);
2827         /* Okay to replace old list */
2828         list_free(relation->rd_indexlist);
2829         relation->rd_indexlist = indexIds;
2830         relation->rd_oidindex = oidIndex;
2831         relation->rd_indexvalid = 2;    /* mark list as forced */
2832         /* must flag that we have a forced index list */
2833         need_eoxact_work = true;
2834 }
2835
2836 /*
2837  * RelationGetOidIndex -- get the pg_class OID of the relation's OID index
2838  *
2839  * Returns InvalidOid if there is no such index.
2840  */
2841 Oid
2842 RelationGetOidIndex(Relation relation)
2843 {
2844         List       *ilist;
2845
2846         /*
2847          * If relation doesn't have OIDs at all, caller is probably confused. (We
2848          * could just silently return InvalidOid, but it seems better to throw an
2849          * assertion.)
2850          */
2851         Assert(relation->rd_rel->relhasoids);
2852
2853         if (relation->rd_indexvalid == 0)
2854         {
2855                 /* RelationGetIndexList does the heavy lifting. */
2856                 ilist = RelationGetIndexList(relation);
2857                 list_free(ilist);
2858                 Assert(relation->rd_indexvalid != 0);
2859         }
2860
2861         return relation->rd_oidindex;
2862 }
2863
2864 /*
2865  * RelationGetIndexExpressions -- get the index expressions for an index
2866  *
2867  * We cache the result of transforming pg_index.indexprs into a node tree.
2868  * If the rel is not an index or has no expressional columns, we return NIL.
2869  * Otherwise, the returned tree is copied into the caller's memory context.
2870  * (We don't want to return a pointer to the relcache copy, since it could
2871  * disappear due to relcache invalidation.)
2872  */
2873 List *
2874 RelationGetIndexExpressions(Relation relation)
2875 {
2876         List       *result;
2877         Datum           exprsDatum;
2878         bool            isnull;
2879         char       *exprsString;
2880         MemoryContext oldcxt;
2881
2882         /* Quick exit if we already computed the result. */
2883         if (relation->rd_indexprs)
2884                 return (List *) copyObject(relation->rd_indexprs);
2885
2886         /* Quick exit if there is nothing to do. */
2887         if (relation->rd_indextuple == NULL ||
2888                 heap_attisnull(relation->rd_indextuple, Anum_pg_index_indexprs))
2889                 return NIL;
2890
2891         /*
2892          * We build the tree we intend to return in the caller's context. After
2893          * successfully completing the work, we copy it into the relcache entry.
2894          * This avoids problems if we get some sort of error partway through.
2895          */
2896         exprsDatum = heap_getattr(relation->rd_indextuple,
2897                                                           Anum_pg_index_indexprs,
2898                                                           GetPgIndexDescriptor(),
2899                                                           &isnull);
2900         Assert(!isnull);
2901         exprsString = DatumGetCString(DirectFunctionCall1(textout, exprsDatum));
2902         result = (List *) stringToNode(exprsString);
2903         pfree(exprsString);
2904
2905         /*
2906          * Run the expressions through eval_const_expressions. This is not just an
2907          * optimization, but is necessary, because the planner will be comparing
2908          * them to similarly-processed qual clauses, and may fail to detect valid
2909          * matches without this.  We don't bother with canonicalize_qual, however.
2910          */
2911         result = (List *) eval_const_expressions((Node *) result);
2912
2913         /*
2914          * Also mark any coercion format fields as "don't care", so that the
2915          * planner can match to both explicit and implicit coercions.
2916          */
2917         set_coercionform_dontcare((Node *) result);
2918
2919         /* May as well fix opfuncids too */
2920         fix_opfuncids((Node *) result);
2921
2922         /* Now save a copy of the completed tree in the relcache entry. */
2923         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
2924         relation->rd_indexprs = (List *) copyObject(result);
2925         MemoryContextSwitchTo(oldcxt);
2926
2927         return result;
2928 }
2929
2930 /*
2931  * RelationGetIndexPredicate -- get the index predicate for an index
2932  *
2933  * We cache the result of transforming pg_index.indpred into an implicit-AND
2934  * node tree (suitable for ExecQual).
2935  * If the rel is not an index or has no predicate, we return NIL.
2936  * Otherwise, the returned tree is copied into the caller's memory context.
2937  * (We don't want to return a pointer to the relcache copy, since it could
2938  * disappear due to relcache invalidation.)
2939  */
2940 List *
2941 RelationGetIndexPredicate(Relation relation)
2942 {
2943         List       *result;
2944         Datum           predDatum;
2945         bool            isnull;
2946         char       *predString;
2947         MemoryContext oldcxt;
2948
2949         /* Quick exit if we already computed the result. */
2950         if (relation->rd_indpred)
2951                 return (List *) copyObject(relation->rd_indpred);
2952
2953         /* Quick exit if there is nothing to do. */
2954         if (relation->rd_indextuple == NULL ||
2955                 heap_attisnull(relation->rd_indextuple, Anum_pg_index_indpred))
2956                 return NIL;
2957
2958         /*
2959          * We build the tree we intend to return in the caller's context. After
2960          * successfully completing the work, we copy it into the relcache entry.
2961          * This avoids problems if we get some sort of error partway through.
2962          */
2963         predDatum = heap_getattr(relation->rd_indextuple,
2964                                                          Anum_pg_index_indpred,
2965                                                          GetPgIndexDescriptor(),
2966                                                          &isnull);
2967         Assert(!isnull);
2968         predString = DatumGetCString(DirectFunctionCall1(textout, predDatum));
2969         result = (List *) stringToNode(predString);
2970         pfree(predString);
2971
2972         /*
2973          * Run the expression through const-simplification and canonicalization.
2974          * This is not just an optimization, but is necessary, because the planner
2975          * will be comparing it to similarly-processed qual clauses, and may fail
2976          * to detect valid matches without this.  This must match the processing
2977          * done to qual clauses in preprocess_expression()!  (We can skip the
2978          * stuff involving subqueries, however, since we don't allow any in index
2979          * predicates.)
2980          */
2981         result = (List *) eval_const_expressions((Node *) result);
2982
2983         result = (List *) canonicalize_qual((Expr *) result);
2984
2985         /*
2986          * Also mark any coercion format fields as "don't care", so that the
2987          * planner can match to both explicit and implicit coercions.
2988          */
2989         set_coercionform_dontcare((Node *) result);
2990
2991         /* Also convert to implicit-AND format */
2992         result = make_ands_implicit((Expr *) result);
2993
2994         /* May as well fix opfuncids too */
2995         fix_opfuncids((Node *) result);
2996
2997         /* Now save a copy of the completed tree in the relcache entry. */
2998         oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
2999         relation->rd_indpred = (List *) copyObject(result);
3000         MemoryContextSwitchTo(oldcxt);
3001
3002         return result;
3003 }
3004
3005
3006 /*
3007  *      load_relcache_init_file, write_relcache_init_file
3008  *
3009  *              In late 1992, we started regularly having databases with more than
3010  *              a thousand classes in them.  With this number of classes, it became
3011  *              critical to do indexed lookups on the system catalogs.
3012  *
3013  *              Bootstrapping these lookups is very hard.  We want to be able to
3014  *              use an index on pg_attribute, for example, but in order to do so,
3015  *              we must have read pg_attribute for the attributes in the index,
3016  *              which implies that we need to use the index.
3017  *
3018  *              In order to get around the problem, we do the following:
3019  *
3020  *                 +  When the database system is initialized (at initdb time), we
3021  *                        don't use indexes.  We do sequential scans.
3022  *
3023  *                 +  When the backend is started up in normal mode, we load an image
3024  *                        of the appropriate relation descriptors, in internal format,
3025  *                        from an initialization file in the data/base/... directory.
3026  *
3027  *                 +  If the initialization file isn't there, then we create the
3028  *                        relation descriptors using sequential scans and write 'em to
3029  *                        the initialization file for use by subsequent backends.
3030  *
3031  *              We could dispense with the initialization file and just build the
3032  *              critical reldescs the hard way on every backend startup, but that
3033  *              slows down backend startup noticeably.
3034  *
3035  *              We can in fact go further, and save more relcache entries than
3036  *              just the ones that are absolutely critical; this allows us to speed
3037  *              up backend startup by not having to build such entries the hard way.
3038  *              Presently, all the catalog and index entries that are referred to
3039  *              by catcaches are stored in the initialization file.
3040  *
3041  *              The same mechanism that detects when catcache and relcache entries
3042  *              need to be invalidated (due to catalog updates) also arranges to
3043  *              unlink the initialization file when its contents may be out of date.
3044  *              The file will then be rebuilt during the next backend startup.
3045  */
3046
3047 /*
3048  * load_relcache_init_file -- attempt to load cache from the init file
3049  *
3050  * If successful, return TRUE and set criticalRelcachesBuilt to true.
3051  * If not successful, return FALSE.
3052  *
3053  * NOTE: we assume we are already switched into CacheMemoryContext.
      *
      * File layout, as consumed below: an int magic number, then for each
      * relation a series of items, each stored as a Size byte count followed
      * by that many bytes:
      *              the RelationData struct
      *              the pg_class tuple form
      *              one pg_attribute tuple form per attribute (relnatts of them)
      *              the rd_options datum (zero length if there is none)
      * and, for indexes only:
      *              the pg_index heap tuple
      *              the pg_am tuple form
      *              the vector of operator OIDs
      *              the vector of support procedures
      *
      * As a side effect, initFileRelationIds is reset to NIL on entry and, on
      * success, rebuilt to hold the OIDs of all relations that were loaded.
3054  */
3055 static bool
3056 load_relcache_init_file(void)
3057 {
3058         FILE       *fp;
3059         char            initfilename[MAXPGPATH];
3060         Relation   *rels;
3061         int                     relno,
3062                                 num_rels,
3063                                 max_rels,
3064                                 nailed_rels,
3065                                 nailed_indexes,
3066                                 magic;
3067         int                     i;
3068
3069         snprintf(initfilename, sizeof(initfilename), "%s/%s",
3070                          DatabasePath, RELCACHE_INIT_FILENAME);
3071
             /* A missing or unopenable init file simply means "not successful" */
3072         fp = AllocateFile(initfilename, PG_BINARY_R);
3073         if (fp == NULL)
3074                 return false;
3075
3076         /*
3077          * Read the index relcache entries from the file.  Note we will not enter
3078          * any of them into the cache if the read fails partway through; this
3079          * helps to guard against broken init files.
3080          */
3081         max_rels = 100;
3082         rels = (Relation *) palloc(max_rels * sizeof(Relation));
3083         num_rels = 0;
3084         nailed_rels = nailed_indexes = 0;
3085         initFileRelationIds = NIL;
3086
3087         /* check for correct magic number (compatible version) */
3088         if (fread(&magic, 1, sizeof(magic), fp) != sizeof(magic))
3089                 goto read_failed;
3090         if (magic != RELCACHE_INIT_FILEMAGIC)
3091                 goto read_failed;
3092
             /* One iteration per stored relation; the loop ends at end of file */
3093         for (relno = 0;; relno++)
3094         {
3095                 Size            len;
3096                 size_t          nread;
3097                 Relation        rel;
3098                 Form_pg_class relform;
3099                 bool            has_not_null;
3100                 Datum           indclassDatum;
3101                 bool            isnull;
3102
3103                 /* first read the relation descriptor length */
3104                 if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
3105                 {
                             /*
                              * Reading nothing at all here is the normal way out of the
                              * loop; a partial length word means the file is damaged.
                              */
3106                         if (nread == 0)
3107                                 break;                  /* end of file */
3108                         goto read_failed;
3109                 }
3110
3111                 /* safety check for incompatible relcache layout */
3112                 if (len != sizeof(RelationData))
3113                         goto read_failed;
3114
3115                 /* allocate another relcache header */
3116                 if (num_rels >= max_rels)
3117                 {
3118                         max_rels *= 2;
3119                         rels = (Relation *) repalloc(rels, max_rels * sizeof(Relation));
3120                 }
3121
                     /*
                      * The new entry is remembered in rels[] right away, but it is
                      * entered into the cache only after the whole file has been read.
                      */
3122                 rel = rels[num_rels++] = (Relation) palloc(len);
3123
3124                 /* then, read the Relation structure */
3125                 if ((nread = fread(rel, 1, len, fp)) != len)
3126                         goto read_failed;
3127
3128                 /* next read the relation tuple form */
3129                 if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
3130                         goto read_failed;
3131
3132                 relform = (Form_pg_class) palloc(len);
3133                 if ((nread = fread(relform, 1, len, fp)) != len)
3134                         goto read_failed;
3135
3136                 rel->rd_rel = relform;
3137
3138                 /* initialize attribute tuple forms */
3139                 rel->rd_att = CreateTemplateTupleDesc(relform->relnatts,
3140                                                                                           relform->relhasoids);
3141                 rel->rd_att->tdrefcount = 1;    /* mark as refcounted */
3142
3143                 rel->rd_att->tdtypeid = relform->reltype;
3144                 rel->rd_att->tdtypmod = -1;             /* unnecessary, but... */
3145
3146                 /* next read all the attribute tuple form data entries */
3147                 has_not_null = false;
3148                 for (i = 0; i < relform->relnatts; i++)
3149                 {
3150                         if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
3151                                 goto read_failed;
                             /* each attribute item must have exactly the expected size */
3152                         if (len != ATTRIBUTE_TUPLE_SIZE)
3153                                 goto read_failed;
3154                         if ((nread = fread(rel->rd_att->attrs[i], 1, len, fp)) != len)
3155                                 goto read_failed;
3156
3157                         has_not_null |= rel->rd_att->attrs[i]->attnotnull;
3158                 }
3159
3160                 /* next read the access method specific field */
3161                 if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
3162                         goto read_failed;
                     /* a zero-length item stands for "no rd_options" */
3163                 if (len > 0)
3164                 {
3165                         rel->rd_options = palloc(len);
3166                         if ((nread = fread(rel->rd_options, 1, len, fp)) != len)
3167                                 goto read_failed;
3168                         if (len != VARATT_SIZE(rel->rd_options))
3169                                 goto read_failed;                               /* sanity check */
3170                 }
3171                 else
3172                 {
3173                         rel->rd_options = NULL;
3174                 }
3175
3176                 /* mark not-null status */
3177                 if (has_not_null)
3178                 {
3179                         TupleConstr *constr = (TupleConstr *) palloc0(sizeof(TupleConstr));
3180
3181                         constr->has_not_null = true;
3182                         rel->rd_att->constr = constr;
3183                 }
3184
3185                 /* If it's an index, there's more to do */
3186                 if (rel->rd_rel->relkind == RELKIND_INDEX)
3187                 {
3188                         Form_pg_am      am;
3189                         MemoryContext indexcxt;
3190                         Oid                *operator;
3191                         RegProcedure *support;
3192                         int                     nsupport;
3193
3194                         /* Count nailed indexes to ensure we have 'em all */
3195                         if (rel->rd_isnailed)
3196                                 nailed_indexes++;
3197
3198                         /* next, read the pg_index tuple */
3199                         if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
3200                                 goto read_failed;
3201
3202                         rel->rd_indextuple = (HeapTuple) palloc(len);
3203                         if ((nread = fread(rel->rd_indextuple, 1, len, fp)) != len)
3204                                 goto read_failed;
3205
3206                         /* Fix up internal pointers in the tuple -- see heap_copytuple */
3207                         rel->rd_indextuple->t_data = (HeapTupleHeader) ((char *) rel->rd_indextuple + HEAPTUPLESIZE);
3208                         rel->rd_index = (Form_pg_index) GETSTRUCT(rel->rd_indextuple);
3209
3210                         /* fix up indclass pointer too */
3211                         indclassDatum = fastgetattr(rel->rd_indextuple,
3212                                                                                 Anum_pg_index_indclass,
3213                                                                                 GetPgIndexDescriptor(),
3214                                                                                 &isnull);
3215                         Assert(!isnull);
3216                         rel->rd_indclass = (oidvector *) DatumGetPointer(indclassDatum);
3217
3218                         /* next, read the access method tuple form */
3219                         if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
3220                                 goto read_failed;
3221
3222                         am = (Form_pg_am) palloc(len);
3223                         if ((nread = fread(am, 1, len, fp)) != len)
3224                                 goto read_failed;
3225                         rel->rd_am = am;
3226
3227                         /*
3228                          * prepare index info context --- parameters should match
3229                          * RelationInitIndexAccessInfo
3230                          */
3231                         indexcxt = AllocSetContextCreate(CacheMemoryContext,
3232                                                                                          RelationGetRelationName(rel),
3233                                                                                          ALLOCSET_SMALL_MINSIZE,
3234                                                                                          ALLOCSET_SMALL_INITSIZE,
3235                                                                                          ALLOCSET_SMALL_MAXSIZE);
3236                         rel->rd_indexcxt = indexcxt;
3237
                             /*
                              * Unlike the items read so far, the operator and support
                              * vectors and the fmgr-info arrays are allocated in the
                              * index's own context rather than the current one.
                              */
3238                         /* next, read the vector of operator OIDs */
3239                         if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
3240                                 goto read_failed;
3241
3242                         operator = (Oid *) MemoryContextAlloc(indexcxt, len);
3243                         if ((nread = fread(operator, 1, len, fp)) != len)
3244                                 goto read_failed;
3245
3246                         rel->rd_operator = operator;
3247
3248                         /* finally, read the vector of support procedures */
3249                         if ((nread = fread(&len, 1, sizeof(len), fp)) != sizeof(len))
3250                                 goto read_failed;
3251                         support = (RegProcedure *) MemoryContextAlloc(indexcxt, len);
3252                         if ((nread = fread(support, 1, len, fp)) != len)
3253                                 goto read_failed;
3254
3255                         rel->rd_support = support;
3256
3257                         /* set up zeroed fmgr-info vectors */
3258                         rel->rd_aminfo = (RelationAmInfo *)
3259                                 MemoryContextAllocZero(indexcxt, sizeof(RelationAmInfo));
3260                         nsupport = relform->relnatts * am->amsupport;
3261                         rel->rd_supportinfo = (FmgrInfo *)
3262                                 MemoryContextAllocZero(indexcxt, nsupport * sizeof(FmgrInfo));
3263                 }
3264                 else
3265                 {
3266                         /* Count nailed rels to ensure we have 'em all */
3267                         if (rel->rd_isnailed)
3268                                 nailed_rels++;
3269
                             /*
                              * For a non-index, the index-related fields of the struct
                              * image read from the file are expected to be NULL already.
                              */
3270                         Assert(rel->rd_index == NULL);
3271                         Assert(rel->rd_indextuple == NULL);
3272                         Assert(rel->rd_indclass == NULL);
3273                         Assert(rel->rd_am == NULL);
3274                         Assert(rel->rd_indexcxt == NULL);
3275                         Assert(rel->rd_aminfo == NULL);
3276                         Assert(rel->rd_operator == NULL);
3277                         Assert(rel->rd_support == NULL);
3278                         Assert(rel->rd_supportinfo == NULL);
3279                 }
3280
3281                 /*
3282                  * Rules and triggers are not saved (mainly because the internal
3283                  * format is complex and subject to change).  They must be rebuilt if
3284                  * needed by RelationCacheInitializePhase2.  This is not expected to
3285                  * be a big performance hit since few system catalogs have such. Ditto
3286                  * for index expressions and predicates.
3287                  */
3288                 rel->rd_rules = NULL;
3289                 rel->rd_rulescxt = NULL;
3290                 rel->trigdesc = NULL;
3291                 rel->rd_indexprs = NIL;
3292                 rel->rd_indpred = NIL;
3293
3294                 /*
3295                  * Reset transient-state fields in the relcache entry
3296                  */
3297                 rel->rd_smgr = NULL;
3298                 rel->rd_targblock = InvalidBlockNumber;
                     /* nailed entries start out with a reference count of one */
3299                 if (rel->rd_isnailed)
3300                         rel->rd_refcnt = 1;
3301                 else
3302                         rel->rd_refcnt = 0;
3303                 rel->rd_indexvalid = 0;
3304                 rel->rd_indexlist = NIL;
3305                 rel->rd_oidindex = InvalidOid;
3306                 rel->rd_createSubid = InvalidSubTransactionId;
3307                 rel->rd_amcache = NULL;
3308                 MemSet(&rel->pgstat_info, 0, sizeof(rel->pgstat_info));
3309
3310                 /*
3311                  * Recompute lock and physical addressing info.  This is needed in
3312                  * case the pg_internal.init file was copied from some other database
3313                  * by CREATE DATABASE.
3314                  */
3315                 RelationInitLockInfo(rel);
3316                 RelationInitPhysicalAddr(rel);
3317         }
3318
3319         /*
3320          * We reached the end of the init file without apparent problem. Did we
3321          * get the right number of nailed items?  (This is a useful crosscheck in
3322          * case the set of critical rels or indexes changes.)
3323          */
3324         if (nailed_rels != NUM_CRITICAL_RELS ||
3325                 nailed_indexes != NUM_CRITICAL_INDEXES)
3326                 goto read_failed;
3327
3328         /*
3329          * OK, all appears well.
3330          *
3331          * Now insert all the new relcache entries into the cache.
3332          */
3333         for (relno = 0; relno < num_rels; relno++)
3334         {
3335                 RelationCacheInsert(rels[relno]);
3336                 /* also make a list of their OIDs, for RelationIdIsInInitFile */
3337                 initFileRelationIds = lcons_oid(RelationGetRelid(rels[relno]),
3338                                                                                 initFileRelationIds);
3339         }
3340
             /* only the pointer array is released; the entries now belong to the cache */
3341         pfree(rels);
3342         FreeFile(fp);
3343
3344         criticalRelcachesBuilt = true;
3345         return true;
3346
3347         /*
3348          * init file is broken, so do it the hard way.  We don't bother trying to
3349          * free the clutter we just allocated; it's not in the relcache so it
3350          * won't hurt.
3351          */
3352 read_failed:
3353         pfree(rels);
3354         FreeFile(fp);
3355
3356         return false;
3357 }
3358
3359 /*
3360  * Write out a new initialization file with the current contents
3361  * of the relcache.
3362  */
3363 static void
3364 write_relcache_init_file(void)
3365 {
3366         FILE       *fp;
3367         char            tempfilename[MAXPGPATH];
3368         char            finalfilename[MAXPGPATH];
3369         int                     magic;
3370         HASH_SEQ_STATUS status;
3371         RelIdCacheEnt *idhentry;
3372         MemoryContext oldcxt;
3373         int                     i;
3374
3375         /*
3376          * We must write a temporary file and rename it into place. Otherwise,
3377          * another backend starting at about the same time might crash trying to
3378          * read the partially-complete file.
3379          */
3380         snprintf(tempfilename, sizeof(tempfilename), "%s/%s.%d",
3381                          DatabasePath, RELCACHE_INIT_FILENAME, MyProcPid);
3382         snprintf(finalfilename, sizeof(finalfilename), "%s/%s",
3383                          DatabasePath, RELCACHE_INIT_FILENAME);
3384
3385         unlink(tempfilename);           /* in case it exists w/wrong permissions */
3386
3387         fp = AllocateFile(tempfilename, PG_BINARY_W);
3388         if (fp == NULL)
3389         {
3390                 /*
3391                  * We used to consider this a fatal error, but we might as well
3392                  * continue with backend startup ...
3393                  */
3394                 ereport(WARNING,
3395                                 (errcode_for_file_access(),
3396                                  errmsg("could not create relation-cache initialization file \"%s\": %m",
3397                                                 tempfilename),
3398                           errdetail("Continuing anyway, but there's something wrong.")));
3399                 return;
3400         }
3401
3402         /*
3403          * Write a magic number to serve as a file version identifier.  We can
3404          * change the magic number whenever the relcache layout changes.
3405          */
3406         magic = RELCACHE_INIT_FILEMAGIC;
3407         if (fwrite(&magic, 1, sizeof(magic), fp) != sizeof(magic))
3408                 elog(FATAL, "could not write init file");
3409
3410         /*
3411          * Write all the reldescs (in no particular order).
3412          */
3413         hash_seq_init(&status, RelationIdCache);
3414
3415         initFileRelationIds = NIL;
3416
3417         while ((idhentry = (RelIdCacheEnt *) hash_seq_search(&status)) != NULL)
3418         {
3419                 Relation        rel = idhentry->reldesc;
3420                 Form_pg_class relform = rel->rd_rel;
3421
3422                 /* first write the relcache entry proper */
3423                 write_item(rel, sizeof(RelationData), fp);
3424
3425                 /* next write the relation tuple form */
3426                 write_item(relform, CLASS_TUPLE_SIZE, fp);
3427
3428                 /* next, do all the attribute tuple form data entries */
3429                 for (i = 0; i < relform->relnatts; i++)
3430                 {
3431                         write_item(rel->rd_att->attrs[i], ATTRIBUTE_TUPLE_SIZE, fp);
3432                 }
3433
3434                 /* next, do the access method specific field */
3435                 write_item(rel->rd_options,
3436                                    (rel->rd_options ? VARATT_SIZE(rel->rd_options) : 0),
3437                                    fp);
3438
3439                 /* If it's an index, there's more to do */
3440                 if (rel->rd_rel->relkind == RELKIND_INDEX)
3441                 {
3442                         Form_pg_am      am = rel->rd_am;
3443
3444                         /* write the pg_index tuple */
3445                         /* we assume this was created by heap_copytuple! */
3446                         write_item(rel->rd_indextuple,
3447                                            HEAPTUPLESIZE + rel->rd_indextuple->t_len,
3448                                            fp);
3449
3450                         /* next, write the access method tuple form */
3451                         write_item(am, sizeof(FormData_pg_am), fp);
3452
3453                         /* next, write the vector of operator OIDs */
3454                         write_item(rel->rd_operator,
3455                                            relform->relnatts * (am->amstrategies * sizeof(Oid)),
3456                                            fp);
3457
3458                         /* finally, write the vector of support procedures */
3459                         write_item(rel->rd_support,
3460                                            relform->relnatts * (am->amsupport * sizeof(RegProcedure)),
3461                                            fp);
3462                 }
3463
3464                 /* also make a list of their OIDs, for RelationIdIsInInitFile */
3465                 oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
3466                 initFileRelationIds = lcons_oid(RelationGetRelid(rel),
3467                                                                                 initFileRelationIds);
3468                 MemoryContextSwitchTo(oldcxt);
3469         }
3470
3471         if (FreeFile(fp))
3472                 elog(FATAL, "could not write init file");
3473
3474         /*
3475          * Now we have to check whether the data we've so painstakingly
3476          * accumulated is already obsolete due to someone else's just-committed
3477          * catalog changes.  If so, we just delete the temp file and leave it to
3478          * the next backend to try again.  (Our own relcache entries will be
3479          * updated by SI message processing, but we can't be sure whether what we
3480          * wrote out was up-to-date.)
3481          *
3482          * This mustn't run concurrently with RelationCacheInitFileInvalidate, so
3483          * grab a serialization lock for the duration.
3484          */
3485         LWLockAcquire(RelCacheInitLock, LW_EXCLUSIVE);
3486
3487         /* Make sure we have seen all incoming SI messages */
3488         AcceptInvalidationMessages();
3489
3490         /*
3491          * If we have received any SI relcache invals since backend start, assume
3492          * we may have written out-of-date data.
3493          */
3494         if (relcacheInvalsReceived == 0L)
3495         {
3496                 /*
3497                  * OK, rename the temp file to its final name, deleting any
3498                  * previously-existing init file.
3499                  *
3500                  * Note: a failure here is possible under Cygwin, if some other
3501                  * backend is holding open an unlinked-but-not-yet-gone init file. So
3502                  * treat this as a noncritical failure; just remove the useless temp
3503                  * file on failure.
3504                  */
3505                 if (rename(tempfilename, finalfilename) < 0)
3506                         unlink(tempfilename);
3507         }
3508         else
3509         {
3510                 /* Delete the already-obsolete temp file */
3511                 unlink(tempfilename);
3512         }
3513
3514         LWLockRelease(RelCacheInitLock);
3515 }
3516
3517 /* write a chunk of data preceded by its length */
3518 static void
3519 write_item(const void *data, Size len, FILE *fp)
3520 {
3521         if (fwrite(&len, 1, sizeof(len), fp) != sizeof(len))
3522                 elog(FATAL, "could not write init file");
3523         if (fwrite(data, 1, len, fp) != len)
3524                 elog(FATAL, "could not write init file");
3525 }
3526
3527 /*
3528  * Detect whether a given relation (identified by OID) is one of the ones
3529  * we store in the init file.
3530  *
3531  * Note that we effectively assume that all backends running in a database
3532  * would choose to store the same set of relations in the init file;
3533  * otherwise there are cases where we'd fail to detect the need for an init
3534  * file invalidation.  This does not seem likely to be a problem in practice.
3535  */
3536 bool
3537 RelationIdIsInInitFile(Oid relationId)
3538 {
3539         return list_member_oid(initFileRelationIds, relationId);
3540 }
3541
3542 /*
3543  * Invalidate (remove) the init file during commit of a transaction that
3544  * changed one or more of the relation cache entries that are kept in the
3545  * init file.
3546  *
3547  * We actually need to remove the init file twice: once just before sending
3548  * the SI messages that include relcache inval for such relations, and once
3549  * just after sending them.  The unlink before ensures that a backend that's
3550  * currently starting cannot read the now-obsolete init file and then miss
3551  * the SI messages that will force it to update its relcache entries.  (This
3552  * works because the backend startup sequence gets into the PGPROC array before
3553  * trying to load the init file.)  The unlink after is to synchronize with a
3554  * backend that may currently be trying to write an init file based on data
3555  * that we've just rendered invalid.  Such a backend will see the SI messages,
3556  * but we can't leave the init file sitting around to fool later backends.
3557  *
3558  * Ignore any failure to unlink the file, since it might not be there if
3559  * no backend has been started since the last removal.
3560  */
3561 void
3562 RelationCacheInitFileInvalidate(bool beforeSend)
3563 {
3564         char            initfilename[MAXPGPATH];
3565
3566         snprintf(initfilename, sizeof(initfilename), "%s/%s",
3567                          DatabasePath, RELCACHE_INIT_FILENAME);
3568
3569         if (beforeSend)
3570         {
3571                 /* no interlock needed here */
3572                 unlink(initfilename);
3573         }
3574         else
3575         {
3576                 /*
3577                  * We need to interlock this against write_relcache_init_file, to
3578                  * guard against possibility that someone renames a new-but-
3579                  * already-obsolete init file into place just after we unlink. With
3580                  * the interlock, it's certain that write_relcache_init_file will
3581                  * notice our SI inval message before renaming into place, or else
3582                  * that we will execute second and successfully unlink the file.
3583                  */
3584                 LWLockAcquire(RelCacheInitLock, LW_EXCLUSIVE);
3585                 unlink(initfilename);
3586                 LWLockRelease(RelCacheInitLock);
3587         }
3588 }