OSDN Git Service

Fix typo
[pg-rex/syncrep.git] / src / backend / catalog / heap.c
1 /*-------------------------------------------------------------------------
2  *
3  * heap.c
4  *        code to create and destroy POSTGRES heap relations
5  *
6  * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        src/backend/catalog/heap.c
12  *
13  *
14  * INTERFACE ROUTINES
15  *              heap_create()                   - Create an uncataloged heap relation
16  *              heap_create_with_catalog() - Create a cataloged relation
17  *              heap_drop_with_catalog() - Removes named relation from catalogs
18  *
19  * NOTES
20  *        this code taken from access/heap/create.c, which contains
21  *        the old heap_create_with_catalog, amcreate, and amdestroy.
22  *        those routines will soon call these routines using the function
23  *        manager,
24  *        just like the poorly named "NewXXX" routines do.      The
25  *        "New" routines are all going to die soon, once and for all!
26  *              -cim 1/13/91
27  *
28  *-------------------------------------------------------------------------
29  */
30 #include "postgres.h"
31
32 #include "access/genam.h"
33 #include "access/heapam.h"
34 #include "access/sysattr.h"
35 #include "access/transam.h"
36 #include "access/xact.h"
37 #include "catalog/catalog.h"
38 #include "catalog/dependency.h"
39 #include "catalog/heap.h"
40 #include "catalog/index.h"
41 #include "catalog/indexing.h"
42 #include "catalog/namespace.h"
43 #include "catalog/objectaccess.h"
44 #include "catalog/pg_attrdef.h"
45 #include "catalog/pg_collation.h"
46 #include "catalog/pg_constraint.h"
47 #include "catalog/pg_foreign_table.h"
48 #include "catalog/pg_inherits.h"
49 #include "catalog/pg_namespace.h"
50 #include "catalog/pg_statistic.h"
51 #include "catalog/pg_tablespace.h"
52 #include "catalog/pg_type.h"
53 #include "catalog/pg_type_fn.h"
54 #include "catalog/storage.h"
55 #include "commands/tablecmds.h"
56 #include "commands/typecmds.h"
57 #include "miscadmin.h"
58 #include "nodes/nodeFuncs.h"
59 #include "optimizer/var.h"
60 #include "parser/parse_coerce.h"
61 #include "parser/parse_collate.h"
62 #include "parser/parse_expr.h"
63 #include "parser/parse_relation.h"
64 #include "storage/bufmgr.h"
65 #include "storage/freespace.h"
66 #include "storage/predicate.h"
67 #include "storage/smgr.h"
68 #include "utils/acl.h"
69 #include "utils/builtins.h"
70 #include "utils/fmgroids.h"
71 #include "utils/inval.h"
72 #include "utils/lsyscache.h"
73 #include "utils/relcache.h"
74 #include "utils/snapmgr.h"
75 #include "utils/syscache.h"
76 #include "utils/tqual.h"
77
78
79 /* Potentially set by contrib/pg_upgrade_support functions */
80 Oid                     binary_upgrade_next_heap_pg_class_oid = InvalidOid;
81 Oid                     binary_upgrade_next_toast_pg_class_oid = InvalidOid;
82
83 static void AddNewRelationTuple(Relation pg_class_desc,
84                                         Relation new_rel_desc,
85                                         Oid new_rel_oid,
86                                         Oid new_type_oid,
87                                         Oid reloftype,
88                                         Oid relowner,
89                                         char relkind,
90                                         Datum relacl,
91                                         Datum reloptions);
92 static Oid AddNewRelationType(const char *typeName,
93                                    Oid typeNamespace,
94                                    Oid new_rel_oid,
95                                    char new_rel_kind,
96                                    Oid ownerid,
97                                    Oid new_row_type,
98                                    Oid new_array_type);
99 static void RelationRemoveInheritance(Oid relid);
100 static void StoreRelCheck(Relation rel, char *ccname, Node *expr,
101                           bool is_local, int inhcount);
102 static void StoreConstraints(Relation rel, List *cooked_constraints);
103 static bool MergeWithExistingConstraint(Relation rel, char *ccname, Node *expr,
104                                                         bool allow_merge, bool is_local);
105 static void SetRelationNumChecks(Relation rel, int numchecks);
106 static Node *cookConstraint(ParseState *pstate,
107                            Node *raw_constraint,
108                            char *relname);
109 static List *insert_ordered_unique_oid(List *list, Oid datum);
110
111
112 /* ----------------------------------------------------------------
113  *                              XXX UGLY HARD CODED BADNESS FOLLOWS XXX
114  *
115  *              these should all be moved to someplace in the lib/catalog
116  *              module, if not obliterated first.
117  * ----------------------------------------------------------------
118  */
119
120
121 /*
122  * Note:
123  *              Should the system special case these attributes in the future?
124  *              Advantage:      consume much less space in the ATTRIBUTE relation.
125  *              Disadvantage:  special cases will be all over the place.
126  */
127
128 /*
129  * The initializers below do not include the attoptions or attacl fields,
130  * but that's OK - we're never going to reference anything beyond the
131  * fixed-size portion of the structure anyway.
132  */
133
134 static FormData_pg_attribute a1 = {
135         0, {"ctid"}, TIDOID, 0, sizeof(ItemPointerData),
136         SelfItemPointerAttributeNumber, 0, -1, -1,
137         false, 'p', 's', true, false, false, true, 0
138 };
139
140 static FormData_pg_attribute a2 = {
141         0, {"oid"}, OIDOID, 0, sizeof(Oid),
142         ObjectIdAttributeNumber, 0, -1, -1,
143         true, 'p', 'i', true, false, false, true, 0
144 };
145
146 static FormData_pg_attribute a3 = {
147         0, {"xmin"}, XIDOID, 0, sizeof(TransactionId),
148         MinTransactionIdAttributeNumber, 0, -1, -1,
149         true, 'p', 'i', true, false, false, true, 0
150 };
151
152 static FormData_pg_attribute a4 = {
153         0, {"cmin"}, CIDOID, 0, sizeof(CommandId),
154         MinCommandIdAttributeNumber, 0, -1, -1,
155         true, 'p', 'i', true, false, false, true, 0
156 };
157
158 static FormData_pg_attribute a5 = {
159         0, {"xmax"}, XIDOID, 0, sizeof(TransactionId),
160         MaxTransactionIdAttributeNumber, 0, -1, -1,
161         true, 'p', 'i', true, false, false, true, 0
162 };
163
164 static FormData_pg_attribute a6 = {
165         0, {"cmax"}, CIDOID, 0, sizeof(CommandId),
166         MaxCommandIdAttributeNumber, 0, -1, -1,
167         true, 'p', 'i', true, false, false, true, 0
168 };
169
170 /*
171  * We decided to call this attribute "tableoid" rather than say
172  * "classoid" on the basis that in the future there may be more than one
173  * table of a particular class/type. In any case table is still the word
174  * used in SQL.
175  */
176 static FormData_pg_attribute a7 = {
177         0, {"tableoid"}, OIDOID, 0, sizeof(Oid),
178         TableOidAttributeNumber, 0, -1, -1,
179         true, 'p', 'i', true, false, false, true, 0
180 };
181
182 static const Form_pg_attribute SysAtt[] = {&a1, &a2, &a3, &a4, &a5, &a6, &a7};
183
184 /*
185  * This function returns a Form_pg_attribute pointer for a system attribute.
186  * Note that we elog if the presented attno is invalid, which would only
187  * happen if there's a problem upstream.
188  */
189 Form_pg_attribute
190 SystemAttributeDefinition(AttrNumber attno, bool relhasoids)
191 {
192         if (attno >= 0 || attno < -(int) lengthof(SysAtt))
193                 elog(ERROR, "invalid system attribute number %d", attno);
194         if (attno == ObjectIdAttributeNumber && !relhasoids)
195                 elog(ERROR, "invalid system attribute number %d", attno);
196         return SysAtt[-attno - 1];
197 }
198
199 /*
200  * If the given name is a system attribute name, return a Form_pg_attribute
201  * pointer for a prototype definition.  If not, return NULL.
202  */
203 Form_pg_attribute
204 SystemAttributeByName(const char *attname, bool relhasoids)
205 {
206         int                     j;
207
208         for (j = 0; j < (int) lengthof(SysAtt); j++)
209         {
210                 Form_pg_attribute att = SysAtt[j];
211
212                 if (relhasoids || att->attnum != ObjectIdAttributeNumber)
213                 {
214                         if (strcmp(NameStr(att->attname), attname) == 0)
215                                 return att;
216                 }
217         }
218
219         return NULL;
220 }
221
222
223 /* ----------------------------------------------------------------
224  *                              XXX END OF UGLY HARD CODED BADNESS XXX
225  * ---------------------------------------------------------------- */
226
227
228 /* ----------------------------------------------------------------
229  *              heap_create             - Create an uncataloged heap relation
230  *
231  *              Note API change: the caller must now always provide the OID
232  *              to use for the relation.
233  *
234  *              rel->rd_rel is initialized by RelationBuildLocalRelation,
235  *              and is mostly zeroes at return.
236  * ----------------------------------------------------------------
237  */
238 Relation
239 heap_create(const char *relname,
240                         Oid relnamespace,
241                         Oid reltablespace,
242                         Oid relid,
243                         TupleDesc tupDesc,
244                         char relkind,
245                         char relpersistence,
246                         bool shared_relation,
247                         bool mapped_relation,
248                         bool allow_system_table_mods)
249 {
250         bool            create_storage;
251         Relation        rel;
252
253         /* The caller must have provided an OID for the relation. */
254         Assert(OidIsValid(relid));
255
256         /*
257          * sanity checks
258          */
259         if (!allow_system_table_mods &&
260                 (IsSystemNamespace(relnamespace) || IsToastNamespace(relnamespace)) &&
261                 IsNormalProcessingMode())
262                 ereport(ERROR,
263                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
264                                  errmsg("permission denied to create \"%s.%s\"",
265                                                 get_namespace_name(relnamespace), relname),
266                 errdetail("System catalog modifications are currently disallowed.")));
267
268         /*
269          * Decide if we need storage or not, and handle a couple other special
270          * cases for particular relkinds.
271          */
272         switch (relkind)
273         {
274                 case RELKIND_VIEW:
275                 case RELKIND_COMPOSITE_TYPE:
276                 case RELKIND_FOREIGN_TABLE:
277                         create_storage = false;
278
279                         /*
280                          * Force reltablespace to zero if the relation has no physical
281                          * storage.  This is mainly just for cleanliness' sake.
282                          */
283                         reltablespace = InvalidOid;
284                         break;
285                 case RELKIND_SEQUENCE:
286                         create_storage = true;
287
288                         /*
289                          * Force reltablespace to zero for sequences, since we don't
290                          * support moving them around into different tablespaces.
291                          */
292                         reltablespace = InvalidOid;
293                         break;
294                 default:
295                         create_storage = true;
296                         break;
297         }
298
299         /*
300          * Never allow a pg_class entry to explicitly specify the database's
301          * default tablespace in reltablespace; force it to zero instead. This
302          * ensures that if the database is cloned with a different default
303          * tablespace, the pg_class entry will still match where CREATE DATABASE
304          * will put the physically copied relation.
305          *
306          * Yes, this is a bit of a hack.
307          */
308         if (reltablespace == MyDatabaseTableSpace)
309                 reltablespace = InvalidOid;
310
311         /*
312          * build the relcache entry.
313          */
314         rel = RelationBuildLocalRelation(relname,
315                                                                          relnamespace,
316                                                                          tupDesc,
317                                                                          relid,
318                                                                          reltablespace,
319                                                                          shared_relation,
320                                                                          mapped_relation,
321                                                                          relpersistence);
322
323         /*
324          * Have the storage manager create the relation's disk file, if needed.
325          *
326          * We only create the main fork here, other forks will be created on
327          * demand.
328          */
329         if (create_storage)
330         {
331                 RelationOpenSmgr(rel);
332                 RelationCreateStorage(rel->rd_node, relpersistence);
333         }
334
335         return rel;
336 }
337
338 /* ----------------------------------------------------------------
339  *              heap_create_with_catalog                - Create a cataloged relation
340  *
341  *              this is done in multiple steps:
342  *
343  *              1) CheckAttributeNamesTypes() is used to make certain the tuple
344  *                 descriptor contains a valid set of attribute names and types
345  *
346  *              2) pg_class is opened and get_relname_relid()
347  *                 performs a scan to ensure that no relation with the
348  *                 same name already exists.
349  *
350  *              3) heap_create() is called to create the new relation on disk.
351  *
352  *              4) TypeCreate() is called to define a new type corresponding
353  *                 to the new relation.
354  *
355  *              5) AddNewRelationTuple() is called to register the
356  *                 relation in pg_class.
357  *
358  *              6) AddNewAttributeTuples() is called to register the
359  *                 new relation's schema in pg_attribute.
360  *
361  *              7) StoreConstraints() is called.                - vadim 08/22/97
362  *
363  *              8) the relations are closed and the new relation's oid
364  *                 is returned.
365  *
366  * ----------------------------------------------------------------
367  */
368
369 /* --------------------------------
370  *              CheckAttributeNamesTypes
371  *
372  *              this is used to make certain the tuple descriptor contains a
373  *              valid set of attribute names and datatypes.  a problem simply
374  *              generates ereport(ERROR) which aborts the current transaction.
375  * --------------------------------
376  */
377 void
378 CheckAttributeNamesTypes(TupleDesc tupdesc, char relkind,
379                                                  bool allow_system_table_mods)
380 {
381         int                     i;
382         int                     j;
383         int                     natts = tupdesc->natts;
384
385         /* Sanity check on column count */
386         if (natts < 0 || natts > MaxHeapAttributeNumber)
387                 ereport(ERROR,
388                                 (errcode(ERRCODE_TOO_MANY_COLUMNS),
389                                  errmsg("tables can have at most %d columns",
390                                                 MaxHeapAttributeNumber)));
391
392         /*
393          * first check for collision with system attribute names
394          *
395          * Skip this for a view or type relation, since those don't have system
396          * attributes.
397          */
398         if (relkind != RELKIND_VIEW && relkind != RELKIND_COMPOSITE_TYPE)
399         {
400                 for (i = 0; i < natts; i++)
401                 {
402                         if (SystemAttributeByName(NameStr(tupdesc->attrs[i]->attname),
403                                                                           tupdesc->tdhasoid) != NULL)
404                                 ereport(ERROR,
405                                                 (errcode(ERRCODE_DUPLICATE_COLUMN),
406                                                  errmsg("column name \"%s\" conflicts with a system column name",
407                                                                 NameStr(tupdesc->attrs[i]->attname))));
408                 }
409         }
410
411         /*
412          * next check for repeated attribute names
413          */
414         for (i = 1; i < natts; i++)
415         {
416                 for (j = 0; j < i; j++)
417                 {
418                         if (strcmp(NameStr(tupdesc->attrs[j]->attname),
419                                            NameStr(tupdesc->attrs[i]->attname)) == 0)
420                                 ereport(ERROR,
421                                                 (errcode(ERRCODE_DUPLICATE_COLUMN),
422                                                  errmsg("column name \"%s\" specified more than once",
423                                                                 NameStr(tupdesc->attrs[j]->attname))));
424                 }
425         }
426
427         /*
428          * next check the attribute types
429          */
430         for (i = 0; i < natts; i++)
431         {
432                 CheckAttributeType(NameStr(tupdesc->attrs[i]->attname),
433                                                    tupdesc->attrs[i]->atttypid,
434                                                    tupdesc->attrs[i]->attcollation,
435                                                    NIL, /* assume we're creating a new rowtype */
436                                                    allow_system_table_mods);
437         }
438 }
439
440 /* --------------------------------
441  *              CheckAttributeType
442  *
443  *              Verify that the proposed datatype of an attribute is legal.
444  *              This is needed mainly because there are types (and pseudo-types)
445  *              in the catalogs that we do not support as elements of real tuples.
446  *              We also check some other properties required of a table column.
447  *
448  * If the attribute is being proposed for addition to an existing table or
449  * composite type, pass a one-element list of the rowtype OID as
450  * containing_rowtypes.  When checking a to-be-created rowtype, it's
451  * sufficient to pass NIL, because there could not be any recursive reference
452  * to a not-yet-existing rowtype.
453  * --------------------------------
454  */
455 void
456 CheckAttributeType(const char *attname,
457                                    Oid atttypid, Oid attcollation,
458                                    List *containing_rowtypes,
459                                    bool allow_system_table_mods)
460 {
461         char            att_typtype = get_typtype(atttypid);
462         Oid                     att_typelem;
463
464         if (atttypid == UNKNOWNOID)
465         {
466                 /*
467                  * Warn user, but don't fail, if column to be created has UNKNOWN type
468                  * (usually as a result of a 'retrieve into' - jolly)
469                  */
470                 ereport(WARNING,
471                                 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
472                                  errmsg("column \"%s\" has type \"unknown\"", attname),
473                                  errdetail("Proceeding with relation creation anyway.")));
474         }
475         else if (att_typtype == TYPTYPE_PSEUDO)
476         {
477                 /*
478                  * Refuse any attempt to create a pseudo-type column, except for a
479                  * special hack for pg_statistic: allow ANYARRAY when modifying system
480                  * catalogs (this allows creating pg_statistic and cloning it during
481                  * VACUUM FULL)
482                  */
483                 if (atttypid != ANYARRAYOID || !allow_system_table_mods)
484                         ereport(ERROR,
485                                         (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
486                                          errmsg("column \"%s\" has pseudo-type %s",
487                                                         attname, format_type_be(atttypid))));
488         }
489         else if (att_typtype == TYPTYPE_DOMAIN)
490         {
491                 /*
492                  * If it's a domain, recurse to check its base type.
493                  */
494                 CheckAttributeType(attname, getBaseType(atttypid), attcollation,
495                                                    containing_rowtypes,
496                                                    allow_system_table_mods);
497         }
498         else if (att_typtype == TYPTYPE_COMPOSITE)
499         {
500                 /*
501                  * For a composite type, recurse into its attributes.
502                  */
503                 Relation        relation;
504                 TupleDesc       tupdesc;
505                 int                     i;
506
507                 /*
508                  * Check for self-containment.  Eventually we might be able to allow
509                  * this (just return without complaint, if so) but it's not clear how
510                  * many other places would require anti-recursion defenses before it
511                  * would be safe to allow tables to contain their own rowtype.
512                  */
513                 if (list_member_oid(containing_rowtypes, atttypid))
514                         ereport(ERROR,
515                                         (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
516                                 errmsg("composite type %s cannot be made a member of itself",
517                                            format_type_be(atttypid))));
518
519                 containing_rowtypes = lcons_oid(atttypid, containing_rowtypes);
520
521                 relation = relation_open(get_typ_typrelid(atttypid), AccessShareLock);
522
523                 tupdesc = RelationGetDescr(relation);
524
525                 for (i = 0; i < tupdesc->natts; i++)
526                 {
527                         Form_pg_attribute attr = tupdesc->attrs[i];
528
529                         if (attr->attisdropped)
530                                 continue;
531                         CheckAttributeType(NameStr(attr->attname),
532                                                            attr->atttypid, attr->attcollation,
533                                                            containing_rowtypes,
534                                                            allow_system_table_mods);
535                 }
536
537                 relation_close(relation, AccessShareLock);
538
539                 containing_rowtypes = list_delete_first(containing_rowtypes);
540         }
541         else if (OidIsValid((att_typelem = get_element_type(atttypid))))
542         {
543                 /*
544                  * Must recurse into array types, too, in case they are composite.
545                  */
546                 CheckAttributeType(attname, att_typelem, attcollation,
547                                                    containing_rowtypes,
548                                                    allow_system_table_mods);
549         }
550
551         /*
552          * This might not be strictly invalid per SQL standard, but it is pretty
553          * useless, and it cannot be dumped, so we must disallow it.
554          */
555         if (!OidIsValid(attcollation) && type_is_collatable(atttypid))
556                 ereport(ERROR,
557                                 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
558                                  errmsg("no collation was derived for column \"%s\" with collatable type %s",
559                                                 attname, format_type_be(atttypid)),
560                 errhint("Use the COLLATE clause to set the collation explicitly.")));
561 }
562
563 /*
564  * InsertPgAttributeTuple
565  *              Construct and insert a new tuple in pg_attribute.
566  *
567  * Caller has already opened and locked pg_attribute.  new_attribute is the
568  * attribute to insert (but we ignore attacl and attoptions, which are always
569  * initialized to NULL).
570  *
571  * indstate is the index state for CatalogIndexInsert.  It can be passed as
572  * NULL, in which case we'll fetch the necessary info.  (Don't do this when
573  * inserting multiple attributes, because it's a tad more expensive.)
574  */
575 void
576 InsertPgAttributeTuple(Relation pg_attribute_rel,
577                                            Form_pg_attribute new_attribute,
578                                            CatalogIndexState indstate)
579 {
580         Datum           values[Natts_pg_attribute];
581         bool            nulls[Natts_pg_attribute];
582         HeapTuple       tup;
583
584         /* This is a tad tedious, but way cleaner than what we used to do... */
585         memset(values, 0, sizeof(values));
586         memset(nulls, false, sizeof(nulls));
587
588         values[Anum_pg_attribute_attrelid - 1] = ObjectIdGetDatum(new_attribute->attrelid);
589         values[Anum_pg_attribute_attname - 1] = NameGetDatum(&new_attribute->attname);
590         values[Anum_pg_attribute_atttypid - 1] = ObjectIdGetDatum(new_attribute->atttypid);
591         values[Anum_pg_attribute_attstattarget - 1] = Int32GetDatum(new_attribute->attstattarget);
592         values[Anum_pg_attribute_attlen - 1] = Int16GetDatum(new_attribute->attlen);
593         values[Anum_pg_attribute_attnum - 1] = Int16GetDatum(new_attribute->attnum);
594         values[Anum_pg_attribute_attndims - 1] = Int32GetDatum(new_attribute->attndims);
595         values[Anum_pg_attribute_attcacheoff - 1] = Int32GetDatum(new_attribute->attcacheoff);
596         values[Anum_pg_attribute_atttypmod - 1] = Int32GetDatum(new_attribute->atttypmod);
597         values[Anum_pg_attribute_attbyval - 1] = BoolGetDatum(new_attribute->attbyval);
598         values[Anum_pg_attribute_attstorage - 1] = CharGetDatum(new_attribute->attstorage);
599         values[Anum_pg_attribute_attalign - 1] = CharGetDatum(new_attribute->attalign);
600         values[Anum_pg_attribute_attnotnull - 1] = BoolGetDatum(new_attribute->attnotnull);
601         values[Anum_pg_attribute_atthasdef - 1] = BoolGetDatum(new_attribute->atthasdef);
602         values[Anum_pg_attribute_attisdropped - 1] = BoolGetDatum(new_attribute->attisdropped);
603         values[Anum_pg_attribute_attislocal - 1] = BoolGetDatum(new_attribute->attislocal);
604         values[Anum_pg_attribute_attinhcount - 1] = Int32GetDatum(new_attribute->attinhcount);
605         values[Anum_pg_attribute_attcollation - 1] = ObjectIdGetDatum(new_attribute->attcollation);
606
607         /* start out with empty permissions and empty options */
608         nulls[Anum_pg_attribute_attacl - 1] = true;
609         nulls[Anum_pg_attribute_attoptions - 1] = true;
610
611         tup = heap_form_tuple(RelationGetDescr(pg_attribute_rel), values, nulls);
612
613         /* finally insert the new tuple, update the indexes, and clean up */
614         simple_heap_insert(pg_attribute_rel, tup);
615
616         if (indstate != NULL)
617                 CatalogIndexInsert(indstate, tup);
618         else
619                 CatalogUpdateIndexes(pg_attribute_rel, tup);
620
621         heap_freetuple(tup);
622 }
623
624 /* --------------------------------
625  *              AddNewAttributeTuples
626  *
627  *              this registers the new relation's schema by adding
628  *              tuples to pg_attribute.
629  * --------------------------------
630  */
631 static void
632 AddNewAttributeTuples(Oid new_rel_oid,
633                                           TupleDesc tupdesc,
634                                           char relkind,
635                                           bool oidislocal,
636                                           int oidinhcount)
637 {
638         Form_pg_attribute attr;
639         int                     i;
640         Relation        rel;
641         CatalogIndexState indstate;
642         int                     natts = tupdesc->natts;
643         ObjectAddress myself,
644                                 referenced;
645
646         /*
647          * open pg_attribute and its indexes.
648          */
649         rel = heap_open(AttributeRelationId, RowExclusiveLock);
650
651         indstate = CatalogOpenIndexes(rel);
652
653         /*
654          * First we add the user attributes.  This is also a convenient place to
655          * add dependencies on their datatypes and collations.
656          */
657         for (i = 0; i < natts; i++)
658         {
659                 attr = tupdesc->attrs[i];
660                 /* Fill in the correct relation OID */
661                 attr->attrelid = new_rel_oid;
662                 /* Make sure these are OK, too */
663                 attr->attstattarget = -1;
664                 attr->attcacheoff = -1;
665
666                 InsertPgAttributeTuple(rel, attr, indstate);
667
668                 /* Add dependency info */
669                 myself.classId = RelationRelationId;
670                 myself.objectId = new_rel_oid;
671                 myself.objectSubId = i + 1;
672                 referenced.classId = TypeRelationId;
673                 referenced.objectId = attr->atttypid;
674                 referenced.objectSubId = 0;
675                 recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
676
677                 /* The default collation is pinned, so don't bother recording it */
678                 if (OidIsValid(attr->attcollation) &&
679                         attr->attcollation != DEFAULT_COLLATION_OID)
680                 {
681                         referenced.classId = CollationRelationId;
682                         referenced.objectId = attr->attcollation;
683                         referenced.objectSubId = 0;
684                         recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
685                 }
686         }
687
688         /*
689          * Next we add the system attributes.  Skip OID if rel has no OIDs. Skip
690          * all for a view or type relation.  We don't bother with making datatype
691          * dependencies here, since presumably all these types are pinned.
692          */
693         if (relkind != RELKIND_VIEW && relkind != RELKIND_COMPOSITE_TYPE)
694         {
695                 for (i = 0; i < (int) lengthof(SysAtt); i++)
696                 {
697                         FormData_pg_attribute attStruct;
698
699                         /* skip OID where appropriate */
700                         if (!tupdesc->tdhasoid &&
701                                 SysAtt[i]->attnum == ObjectIdAttributeNumber)
702                                 continue;
703
704                         memcpy(&attStruct, (char *) SysAtt[i], sizeof(FormData_pg_attribute));
705
706                         /* Fill in the correct relation OID in the copied tuple */
707                         attStruct.attrelid = new_rel_oid;
708
709                         /* Fill in correct inheritance info for the OID column */
710                         if (attStruct.attnum == ObjectIdAttributeNumber)
711                         {
712                                 attStruct.attislocal = oidislocal;
713                                 attStruct.attinhcount = oidinhcount;
714                         }
715
716                         InsertPgAttributeTuple(rel, &attStruct, indstate);
717                 }
718         }
719
720         /*
721          * clean up
722          */
723         CatalogCloseIndexes(indstate);
724
725         heap_close(rel, RowExclusiveLock);
726 }
727
728 /* --------------------------------
729  *              InsertPgClassTuple
730  *
731  *              Construct and insert a new tuple in pg_class.
732  *
733  * Caller has already opened and locked pg_class.
734  * Tuple data is taken from new_rel_desc->rd_rel, except for the
735  * variable-width fields which are not present in a cached reldesc.
736  * relacl and reloptions are passed in Datum form (to avoid having
737  * to reference the data types in heap.h).      Pass (Datum) 0 to set them
738  * to NULL.
739  * --------------------------------
740  */
741 void
742 InsertPgClassTuple(Relation pg_class_desc,
743                                    Relation new_rel_desc,
744                                    Oid new_rel_oid,
745                                    Datum relacl,
746                                    Datum reloptions)
747 {
748         Form_pg_class rd_rel = new_rel_desc->rd_rel;
749         Datum           values[Natts_pg_class];
750         bool            nulls[Natts_pg_class];
751         HeapTuple       tup;
752
753         /* This is a tad tedious, but way cleaner than what we used to do... */
754         memset(values, 0, sizeof(values));
755         memset(nulls, false, sizeof(nulls));
756
757         values[Anum_pg_class_relname - 1] = NameGetDatum(&rd_rel->relname);
758         values[Anum_pg_class_relnamespace - 1] = ObjectIdGetDatum(rd_rel->relnamespace);
759         values[Anum_pg_class_reltype - 1] = ObjectIdGetDatum(rd_rel->reltype);
760         values[Anum_pg_class_reloftype - 1] = ObjectIdGetDatum(rd_rel->reloftype);
761         values[Anum_pg_class_relowner - 1] = ObjectIdGetDatum(rd_rel->relowner);
762         values[Anum_pg_class_relam - 1] = ObjectIdGetDatum(rd_rel->relam);
763         values[Anum_pg_class_relfilenode - 1] = ObjectIdGetDatum(rd_rel->relfilenode);
764         values[Anum_pg_class_reltablespace - 1] = ObjectIdGetDatum(rd_rel->reltablespace);
765         values[Anum_pg_class_relpages - 1] = Int32GetDatum(rd_rel->relpages);
766         values[Anum_pg_class_reltuples - 1] = Float4GetDatum(rd_rel->reltuples);
767         values[Anum_pg_class_reltoastrelid - 1] = ObjectIdGetDatum(rd_rel->reltoastrelid);
768         values[Anum_pg_class_reltoastidxid - 1] = ObjectIdGetDatum(rd_rel->reltoastidxid);
769         values[Anum_pg_class_relhasindex - 1] = BoolGetDatum(rd_rel->relhasindex);
770         values[Anum_pg_class_relisshared - 1] = BoolGetDatum(rd_rel->relisshared);
771         values[Anum_pg_class_relpersistence - 1] = CharGetDatum(rd_rel->relpersistence);
772         values[Anum_pg_class_relkind - 1] = CharGetDatum(rd_rel->relkind);
773         values[Anum_pg_class_relnatts - 1] = Int16GetDatum(rd_rel->relnatts);
774         values[Anum_pg_class_relchecks - 1] = Int16GetDatum(rd_rel->relchecks);
775         values[Anum_pg_class_relhasoids - 1] = BoolGetDatum(rd_rel->relhasoids);
776         values[Anum_pg_class_relhaspkey - 1] = BoolGetDatum(rd_rel->relhaspkey);
777         values[Anum_pg_class_relhasrules - 1] = BoolGetDatum(rd_rel->relhasrules);
778         values[Anum_pg_class_relhastriggers - 1] = BoolGetDatum(rd_rel->relhastriggers);
779         values[Anum_pg_class_relhassubclass - 1] = BoolGetDatum(rd_rel->relhassubclass);
780         values[Anum_pg_class_relfrozenxid - 1] = TransactionIdGetDatum(rd_rel->relfrozenxid);
781         if (relacl != (Datum) 0)
782                 values[Anum_pg_class_relacl - 1] = relacl;
783         else
784                 nulls[Anum_pg_class_relacl - 1] = true;
785         if (reloptions != (Datum) 0)
786                 values[Anum_pg_class_reloptions - 1] = reloptions;
787         else
788                 nulls[Anum_pg_class_reloptions - 1] = true;
789
790         tup = heap_form_tuple(RelationGetDescr(pg_class_desc), values, nulls);
791
792         /*
793          * The new tuple must have the oid already chosen for the rel.  Sure would
794          * be embarrassing to do this sort of thing in polite company.
795          */
796         HeapTupleSetOid(tup, new_rel_oid);
797
798         /* finally insert the new tuple, update the indexes, and clean up */
799         simple_heap_insert(pg_class_desc, tup);
800
801         CatalogUpdateIndexes(pg_class_desc, tup);
802
803         heap_freetuple(tup);
804 }
805
806 /* --------------------------------
807  *              AddNewRelationTuple
808  *
809  *              this registers the new relation in the catalogs by
810  *              adding a tuple to pg_class.
811  * --------------------------------
812  */
813 static void
814 AddNewRelationTuple(Relation pg_class_desc,
815                                         Relation new_rel_desc,
816                                         Oid new_rel_oid,
817                                         Oid new_type_oid,
818                                         Oid reloftype,
819                                         Oid relowner,
820                                         char relkind,
821                                         Datum relacl,
822                                         Datum reloptions)
823 {
824         Form_pg_class new_rel_reltup;
825
826         /*
827          * first we update some of the information in our uncataloged relation's
828          * relation descriptor.
829          */
830         new_rel_reltup = new_rel_desc->rd_rel;
831
832         switch (relkind)
833         {
834                 case RELKIND_RELATION:
835                 case RELKIND_INDEX:
836                 case RELKIND_TOASTVALUE:
837                         /* The relation is real, but as yet empty */
838                         new_rel_reltup->relpages = 0;
839                         new_rel_reltup->reltuples = 0;
840                         break;
841                 case RELKIND_SEQUENCE:
842                         /* Sequences always have a known size */
843                         new_rel_reltup->relpages = 1;
844                         new_rel_reltup->reltuples = 1;
845                         break;
846                 default:
847                         /* Views, etc, have no disk storage */
848                         new_rel_reltup->relpages = 0;
849                         new_rel_reltup->reltuples = 0;
850                         break;
851         }
852
853         /* Initialize relfrozenxid */
854         if (relkind == RELKIND_RELATION ||
855                 relkind == RELKIND_TOASTVALUE)
856         {
857                 /*
858                  * Initialize to the minimum XID that could put tuples in the table.
859                  * We know that no xacts older than RecentXmin are still running, so
860                  * that will do.
861                  */
862                 new_rel_reltup->relfrozenxid = RecentXmin;
863         }
864         else
865         {
866                 /*
867                  * Other relation types will not contain XIDs, so set relfrozenxid to
868                  * InvalidTransactionId.  (Note: a sequence does contain a tuple, but
869                  * we force its xmin to be FrozenTransactionId always; see
870                  * commands/sequence.c.)
871                  */
872                 new_rel_reltup->relfrozenxid = InvalidTransactionId;
873         }
874
875         new_rel_reltup->relowner = relowner;
876         new_rel_reltup->reltype = new_type_oid;
877         new_rel_reltup->reloftype = reloftype;
878         new_rel_reltup->relkind = relkind;
879
880         new_rel_desc->rd_att->tdtypeid = new_type_oid;
881
882         /* Now build and insert the tuple */
883         InsertPgClassTuple(pg_class_desc, new_rel_desc, new_rel_oid,
884                                            relacl, reloptions);
885 }
886
887
888 /* --------------------------------
889  *              AddNewRelationType -
890  *
891  *              define a composite type corresponding to the new relation
892  * --------------------------------
893  */
894 static Oid
895 AddNewRelationType(const char *typeName,
896                                    Oid typeNamespace,
897                                    Oid new_rel_oid,
898                                    char new_rel_kind,
899                                    Oid ownerid,
900                                    Oid new_row_type,
901                                    Oid new_array_type)
902 {
903         return
904                 TypeCreate(new_row_type,        /* optional predetermined OID */
905                                    typeName,    /* type name */
906                                    typeNamespace,               /* type namespace */
907                                    new_rel_oid, /* relation oid */
908                                    new_rel_kind,        /* relation kind */
909                                    ownerid,             /* owner's ID */
910                                    -1,                  /* internal size (varlena) */
911                                    TYPTYPE_COMPOSITE,   /* type-type (composite) */
912                                    TYPCATEGORY_COMPOSITE,               /* type-category (ditto) */
913                                    false,               /* composite types are never preferred */
914                                    DEFAULT_TYPDELIM,    /* default array delimiter */
915                                    F_RECORD_IN, /* input procedure */
916                                    F_RECORD_OUT,        /* output procedure */
917                                    F_RECORD_RECV,               /* receive procedure */
918                                    F_RECORD_SEND,               /* send procedure */
919                                    InvalidOid,  /* typmodin procedure - none */
920                                    InvalidOid,  /* typmodout procedure - none */
921                                    InvalidOid,  /* analyze procedure - default */
922                                    InvalidOid,  /* array element type - irrelevant */
923                                    false,               /* this is not an array type */
924                                    new_array_type,              /* array type if any */
925                                    InvalidOid,  /* domain base type - irrelevant */
926                                    NULL,                /* default value - none */
927                                    NULL,                /* default binary representation */
928                                    false,               /* passed by reference */
929                                    'd',                 /* alignment - must be the largest! */
930                                    'x',                 /* fully TOASTable */
931                                    -1,                  /* typmod */
932                                    0,                   /* array dimensions for typBaseType */
933                                    false,               /* Type NOT NULL */
934                                    InvalidOid); /* rowtypes never have a collation */
935 }
936
937 /* --------------------------------
938  *              heap_create_with_catalog
939  *
940  *              creates a new cataloged relation.  see comments above.
941  *
942  * Arguments:
943  *      relname: name to give to new rel
944  *      relnamespace: OID of namespace it goes in
945  *      reltablespace: OID of tablespace it goes in
946  *      relid: OID to assign to new rel, or InvalidOid to select a new OID
947  *      reltypeid: OID to assign to rel's rowtype, or InvalidOid to select one
948  *      ownerid: OID of new rel's owner
949  *      tupdesc: tuple descriptor (source of column definitions)
950  *      cooked_constraints: list of precooked check constraints and defaults
951  *      relkind: relkind for new rel
952  *      shared_relation: TRUE if it's to be a shared relation
953  *      mapped_relation: TRUE if the relation will use the relfilenode map
954  *      oidislocal: TRUE if oid column (if any) should be marked attislocal
955  *      oidinhcount: attinhcount to assign to oid column (if any)
956  *      oncommit: ON COMMIT marking (only relevant if it's a temp table)
957  *      reloptions: reloptions in Datum form, or (Datum) 0 if none
958  *      use_user_acl: TRUE if should look for user-defined default permissions;
959  *              if FALSE, relacl is always set NULL
960  *      allow_system_table_mods: TRUE to allow creation in system namespaces
961  *
962  * Returns the OID of the new relation
963  * --------------------------------
964  */
965 Oid
966 heap_create_with_catalog(const char *relname,
967                                                  Oid relnamespace,
968                                                  Oid reltablespace,
969                                                  Oid relid,
970                                                  Oid reltypeid,
971                                                  Oid reloftypeid,
972                                                  Oid ownerid,
973                                                  TupleDesc tupdesc,
974                                                  List *cooked_constraints,
975                                                  char relkind,
976                                                  char relpersistence,
977                                                  bool shared_relation,
978                                                  bool mapped_relation,
979                                                  bool oidislocal,
980                                                  int oidinhcount,
981                                                  OnCommitAction oncommit,
982                                                  Datum reloptions,
983                                                  bool use_user_acl,
984                                                  bool allow_system_table_mods)
985 {
986         Relation        pg_class_desc;
987         Relation        new_rel_desc;
988         Acl                *relacl;
989         Oid                     existing_relid;
990         Oid                     old_type_oid;
991         Oid                     new_type_oid;
992         Oid                     new_array_oid = InvalidOid;
993
994         pg_class_desc = heap_open(RelationRelationId, RowExclusiveLock);
995
996         /*
997          * sanity checks
998          */
999         Assert(IsNormalProcessingMode() || IsBootstrapProcessingMode());
1000
1001         CheckAttributeNamesTypes(tupdesc, relkind, allow_system_table_mods);
1002
1003         /*
1004          * This would fail later on anyway, if the relation already exists.  But
1005          * by catching it here we can emit a nicer error message.
1006          */
1007         existing_relid = get_relname_relid(relname, relnamespace);
1008         if (existing_relid != InvalidOid)
1009                 ereport(ERROR,
1010                                 (errcode(ERRCODE_DUPLICATE_TABLE),
1011                                  errmsg("relation \"%s\" already exists", relname)));
1012
1013         /*
1014          * Since we are going to create a rowtype as well, also check for
1015          * collision with an existing type name.  If there is one and it's an
1016          * autogenerated array, we can rename it out of the way; otherwise we can
1017          * at least give a good error message.
1018          */
1019         old_type_oid = GetSysCacheOid2(TYPENAMENSP,
1020                                                                    CStringGetDatum(relname),
1021                                                                    ObjectIdGetDatum(relnamespace));
1022         if (OidIsValid(old_type_oid))
1023         {
1024                 if (!moveArrayTypeName(old_type_oid, relname, relnamespace))
1025                         ereport(ERROR,
1026                                         (errcode(ERRCODE_DUPLICATE_OBJECT),
1027                                          errmsg("type \"%s\" already exists", relname),
1028                            errhint("A relation has an associated type of the same name, "
1029                                            "so you must use a name that doesn't conflict "
1030                                            "with any existing type.")));
1031         }
1032
1033         /*
1034          * Shared relations must be in pg_global (last-ditch check)
1035          */
1036         if (shared_relation && reltablespace != GLOBALTABLESPACE_OID)
1037                 elog(ERROR, "shared relations must be placed in pg_global tablespace");
1038
1039         /*
1040          * Allocate an OID for the relation, unless we were told what to use.
1041          *
1042          * The OID will be the relfilenode as well, so make sure it doesn't
1043          * collide with either pg_class OIDs or existing physical files.
1044          */
1045         if (!OidIsValid(relid))
1046         {
1047                 /*
1048                  * Use binary-upgrade override for pg_class.oid/relfilenode, if
1049                  * supplied.
1050                  */
1051                 if (IsBinaryUpgrade &&
1052                         OidIsValid(binary_upgrade_next_heap_pg_class_oid) &&
1053                         (relkind == RELKIND_RELATION || relkind == RELKIND_SEQUENCE ||
1054                          relkind == RELKIND_VIEW || relkind == RELKIND_COMPOSITE_TYPE ||
1055                          relkind == RELKIND_FOREIGN_TABLE))
1056                 {
1057                         relid = binary_upgrade_next_heap_pg_class_oid;
1058                         binary_upgrade_next_heap_pg_class_oid = InvalidOid;
1059                 }
1060                 else if (IsBinaryUpgrade &&
1061                                  OidIsValid(binary_upgrade_next_toast_pg_class_oid) &&
1062                                  relkind == RELKIND_TOASTVALUE)
1063                 {
1064                         relid = binary_upgrade_next_toast_pg_class_oid;
1065                         binary_upgrade_next_toast_pg_class_oid = InvalidOid;
1066                 }
1067                 else
1068                         relid = GetNewRelFileNode(reltablespace, pg_class_desc,
1069                                                                           relpersistence);
1070         }
1071
1072         /*
1073          * Determine the relation's initial permissions.
1074          */
1075         if (use_user_acl)
1076         {
1077                 switch (relkind)
1078                 {
1079                         case RELKIND_RELATION:
1080                         case RELKIND_VIEW:
1081                         case RELKIND_FOREIGN_TABLE:
1082                                 relacl = get_user_default_acl(ACL_OBJECT_RELATION, ownerid,
1083                                                                                           relnamespace);
1084                                 break;
1085                         case RELKIND_SEQUENCE:
1086                                 relacl = get_user_default_acl(ACL_OBJECT_SEQUENCE, ownerid,
1087                                                                                           relnamespace);
1088                                 break;
1089                         default:
1090                                 relacl = NULL;
1091                                 break;
1092                 }
1093         }
1094         else
1095                 relacl = NULL;
1096
1097         /*
1098          * Create the relcache entry (mostly dummy at this point) and the physical
1099          * disk file.  (If we fail further down, it's the smgr's responsibility to
1100          * remove the disk file again.)
1101          */
1102         new_rel_desc = heap_create(relname,
1103                                                            relnamespace,
1104                                                            reltablespace,
1105                                                            relid,
1106                                                            tupdesc,
1107                                                            relkind,
1108                                                            relpersistence,
1109                                                            shared_relation,
1110                                                            mapped_relation,
1111                                                            allow_system_table_mods);
1112
1113         Assert(relid == RelationGetRelid(new_rel_desc));
1114
1115         /*
1116          * Decide whether to create an array type over the relation's rowtype. We
1117          * do not create any array types for system catalogs (ie, those made
1118          * during initdb).      We create array types for regular relations, views,
1119          * composite types and foreign tables ... but not, eg, for toast tables or
1120          * sequences.
1121          */
1122         if (IsUnderPostmaster && (relkind == RELKIND_RELATION ||
1123                                                           relkind == RELKIND_VIEW ||
1124                                                           relkind == RELKIND_FOREIGN_TABLE ||
1125                                                           relkind == RELKIND_COMPOSITE_TYPE))
1126                 new_array_oid = AssignTypeArrayOid();
1127
1128         /*
1129          * Since defining a relation also defines a complex type, we add a new
1130          * system type corresponding to the new relation.  The OID of the type can
1131          * be preselected by the caller, but if reltypeid is InvalidOid, we'll
1132          * generate a new OID for it.
1133          *
1134          * NOTE: we could get a unique-index failure here, in case someone else is
1135          * creating the same type name in parallel but hadn't committed yet when
1136          * we checked for a duplicate name above.
1137          */
1138         new_type_oid = AddNewRelationType(relname,
1139                                                                           relnamespace,
1140                                                                           relid,
1141                                                                           relkind,
1142                                                                           ownerid,
1143                                                                           reltypeid,
1144                                                                           new_array_oid);
1145
1146         /*
1147          * Now make the array type if wanted.
1148          */
1149         if (OidIsValid(new_array_oid))
1150         {
1151                 char       *relarrayname;
1152
1153                 relarrayname = makeArrayTypeName(relname, relnamespace);
1154
1155                 TypeCreate(new_array_oid,               /* force the type's OID to this */
1156                                    relarrayname,        /* Array type name */
1157                                    relnamespace,        /* Same namespace as parent */
1158                                    InvalidOid,  /* Not composite, no relationOid */
1159                                    0,                   /* relkind, also N/A here */
1160                                    ownerid,             /* owner's ID */
1161                                    -1,                  /* Internal size (varlena) */
1162                                    TYPTYPE_BASE,        /* Not composite - typelem is */
1163                                    TYPCATEGORY_ARRAY,   /* type-category (array) */
1164                                    false,               /* array types are never preferred */
1165                                    DEFAULT_TYPDELIM,    /* default array delimiter */
1166                                    F_ARRAY_IN,  /* array input proc */
1167                                    F_ARRAY_OUT, /* array output proc */
1168                                    F_ARRAY_RECV,        /* array recv (bin) proc */
1169                                    F_ARRAY_SEND,        /* array send (bin) proc */
1170                                    InvalidOid,  /* typmodin procedure - none */
1171                                    InvalidOid,  /* typmodout procedure - none */
1172                                    InvalidOid,  /* analyze procedure - default */
1173                                    new_type_oid,        /* array element type - the rowtype */
1174                                    true,                /* yes, this is an array type */
1175                                    InvalidOid,  /* this has no array type */
1176                                    InvalidOid,  /* domain base type - irrelevant */
1177                                    NULL,                /* default value - none */
1178                                    NULL,                /* default binary representation */
1179                                    false,               /* passed by reference */
1180                                    'd',                 /* alignment - must be the largest! */
1181                                    'x',                 /* fully TOASTable */
1182                                    -1,                  /* typmod */
1183                                    0,                   /* array dimensions for typBaseType */
1184                                    false,               /* Type NOT NULL */
1185                                    InvalidOid); /* rowtypes never have a collation */
1186
1187                 pfree(relarrayname);
1188         }
1189
1190         /*
1191          * now create an entry in pg_class for the relation.
1192          *
1193          * NOTE: we could get a unique-index failure here, in case someone else is
1194          * creating the same relation name in parallel but hadn't committed yet
1195          * when we checked for a duplicate name above.
1196          */
1197         AddNewRelationTuple(pg_class_desc,
1198                                                 new_rel_desc,
1199                                                 relid,
1200                                                 new_type_oid,
1201                                                 reloftypeid,
1202                                                 ownerid,
1203                                                 relkind,
1204                                                 PointerGetDatum(relacl),
1205                                                 reloptions);
1206
1207         /*
1208          * now add tuples to pg_attribute for the attributes in our new relation.
1209          */
1210         AddNewAttributeTuples(relid, new_rel_desc->rd_att, relkind,
1211                                                   oidislocal, oidinhcount);
1212
1213         /*
1214          * Make a dependency link to force the relation to be deleted if its
1215          * namespace is.  Also make a dependency link to its owner, as well as
1216          * dependencies for any roles mentioned in the default ACL.
1217          *
1218          * For composite types, these dependencies are tracked for the pg_type
1219          * entry, so we needn't record them here.  Likewise, TOAST tables don't
1220          * need a namespace dependency (they live in a pinned namespace) nor an
1221          * owner dependency (they depend indirectly through the parent table), nor
1222          * should they have any ACL entries.  The same applies for extension
1223          * dependencies.
1224          *
1225          * Also, skip this in bootstrap mode, since we don't make dependencies
1226          * while bootstrapping.
1227          */
1228         if (relkind != RELKIND_COMPOSITE_TYPE &&
1229                 relkind != RELKIND_TOASTVALUE &&
1230                 !IsBootstrapProcessingMode())
1231         {
1232                 ObjectAddress myself,
1233                                         referenced;
1234
1235                 myself.classId = RelationRelationId;
1236                 myself.objectId = relid;
1237                 myself.objectSubId = 0;
1238                 referenced.classId = NamespaceRelationId;
1239                 referenced.objectId = relnamespace;
1240                 referenced.objectSubId = 0;
1241                 recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
1242
1243                 recordDependencyOnOwner(RelationRelationId, relid, ownerid);
1244
1245                 recordDependencyOnCurrentExtension(&myself);
1246
1247                 if (reloftypeid)
1248                 {
1249                         referenced.classId = TypeRelationId;
1250                         referenced.objectId = reloftypeid;
1251                         referenced.objectSubId = 0;
1252                         recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
1253                 }
1254
1255                 if (relacl != NULL)
1256                 {
1257                         int                     nnewmembers;
1258                         Oid                *newmembers;
1259
1260                         nnewmembers = aclmembers(relacl, &newmembers);
1261                         updateAclDependencies(RelationRelationId, relid, 0,
1262                                                                   ownerid,
1263                                                                   0, NULL,
1264                                                                   nnewmembers, newmembers);
1265                 }
1266         }
1267
1268         /* Post creation hook for new relation */
1269         InvokeObjectAccessHook(OAT_POST_CREATE, RelationRelationId, relid, 0);
1270
1271         /*
1272          * Store any supplied constraints and defaults.
1273          *
1274          * NB: this may do a CommandCounterIncrement and rebuild the relcache
1275          * entry, so the relation must be valid and self-consistent at this point.
1276          * In particular, there are not yet constraints and defaults anywhere.
1277          */
1278         StoreConstraints(new_rel_desc, cooked_constraints);
1279
1280         /*
1281          * If there's a special on-commit action, remember it
1282          */
1283         if (oncommit != ONCOMMIT_NOOP)
1284                 register_on_commit_action(relid, oncommit);
1285
1286         /*
1287          * If this is an unlogged relation, it needs an init fork so that it can
1288          * be correctly reinitialized on restart.  Since we're going to do an
1289          * immediate sync, we only need to xlog this if archiving or streaming is
1290          * enabled.  And the immediate sync is required, because otherwise there's
1291          * no guarantee that this will hit the disk before the next checkpoint
1292          * moves the redo pointer.
1293          */
1294         if (relpersistence == RELPERSISTENCE_UNLOGGED)
1295         {
1296                 Assert(relkind == RELKIND_RELATION || relkind == RELKIND_TOASTVALUE);
1297
1298                 smgrcreate(new_rel_desc->rd_smgr, INIT_FORKNUM, false);
1299                 if (XLogIsNeeded())
1300                         log_smgrcreate(&new_rel_desc->rd_smgr->smgr_rnode.node,
1301                                                    INIT_FORKNUM);
1302                 smgrimmedsync(new_rel_desc->rd_smgr, INIT_FORKNUM);
1303         }
1304
1305         /*
1306          * ok, the relation has been cataloged, so close our relations and return
1307          * the OID of the newly created relation.
1308          */
1309         heap_close(new_rel_desc, NoLock);       /* do not unlock till end of xact */
1310         heap_close(pg_class_desc, RowExclusiveLock);
1311
1312         return relid;
1313 }
1314
1315
1316 /*
1317  *              RelationRemoveInheritance
1318  *
1319  * Formerly, this routine checked for child relations and aborted the
1320  * deletion if any were found.  Now we rely on the dependency mechanism
1321  * to check for or delete child relations.      By the time we get here,
1322  * there are no children and we need only remove any pg_inherits rows
1323  * linking this relation to its parent(s).
1324  */
1325 static void
1326 RelationRemoveInheritance(Oid relid)
1327 {
1328         Relation        catalogRelation;
1329         SysScanDesc scan;
1330         ScanKeyData key;
1331         HeapTuple       tuple;
1332
1333         catalogRelation = heap_open(InheritsRelationId, RowExclusiveLock);
1334
1335         ScanKeyInit(&key,
1336                                 Anum_pg_inherits_inhrelid,
1337                                 BTEqualStrategyNumber, F_OIDEQ,
1338                                 ObjectIdGetDatum(relid));
1339
1340         scan = systable_beginscan(catalogRelation, InheritsRelidSeqnoIndexId, true,
1341                                                           SnapshotNow, 1, &key);
1342
1343         while (HeapTupleIsValid(tuple = systable_getnext(scan)))
1344                 simple_heap_delete(catalogRelation, &tuple->t_self);
1345
1346         systable_endscan(scan);
1347         heap_close(catalogRelation, RowExclusiveLock);
1348 }
1349
1350 /*
1351  *              DeleteRelationTuple
1352  *
1353  * Remove pg_class row for the given relid.
1354  *
1355  * Note: this is shared by relation deletion and index deletion.  It's
1356  * not intended for use anyplace else.
1357  */
1358 void
1359 DeleteRelationTuple(Oid relid)
1360 {
1361         Relation        pg_class_desc;
1362         HeapTuple       tup;
1363
1364         /* Grab an appropriate lock on the pg_class relation */
1365         pg_class_desc = heap_open(RelationRelationId, RowExclusiveLock);
1366
1367         tup = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
1368         if (!HeapTupleIsValid(tup))
1369                 elog(ERROR, "cache lookup failed for relation %u", relid);
1370
1371         /* delete the relation tuple from pg_class, and finish up */
1372         simple_heap_delete(pg_class_desc, &tup->t_self);
1373
1374         ReleaseSysCache(tup);
1375
1376         heap_close(pg_class_desc, RowExclusiveLock);
1377 }
1378
1379 /*
1380  *              DeleteAttributeTuples
1381  *
1382  * Remove pg_attribute rows for the given relid.
1383  *
1384  * Note: this is shared by relation deletion and index deletion.  It's
1385  * not intended for use anyplace else.
1386  */
1387 void
1388 DeleteAttributeTuples(Oid relid)
1389 {
1390         Relation        attrel;
1391         SysScanDesc scan;
1392         ScanKeyData key[1];
1393         HeapTuple       atttup;
1394
1395         /* Grab an appropriate lock on the pg_attribute relation */
1396         attrel = heap_open(AttributeRelationId, RowExclusiveLock);
1397
1398         /* Use the index to scan only attributes of the target relation */
1399         ScanKeyInit(&key[0],
1400                                 Anum_pg_attribute_attrelid,
1401                                 BTEqualStrategyNumber, F_OIDEQ,
1402                                 ObjectIdGetDatum(relid));
1403
1404         scan = systable_beginscan(attrel, AttributeRelidNumIndexId, true,
1405                                                           SnapshotNow, 1, key);
1406
1407         /* Delete all the matching tuples */
1408         while ((atttup = systable_getnext(scan)) != NULL)
1409                 simple_heap_delete(attrel, &atttup->t_self);
1410
1411         /* Clean up after the scan */
1412         systable_endscan(scan);
1413         heap_close(attrel, RowExclusiveLock);
1414 }
1415
1416 /*
1417  *              RemoveAttributeById
1418  *
1419  * This is the guts of ALTER TABLE DROP COLUMN: actually mark the attribute
1420  * deleted in pg_attribute.  We also remove pg_statistic entries for it.
1421  * (Everything else needed, such as getting rid of any pg_attrdef entry,
1422  * is handled by dependency.c.)
1423  */
1424 void
1425 RemoveAttributeById(Oid relid, AttrNumber attnum)
1426 {
1427         Relation        rel;
1428         Relation        attr_rel;
1429         HeapTuple       tuple;
1430         Form_pg_attribute attStruct;
1431         char            newattname[NAMEDATALEN];
1432
1433         /*
1434          * Grab an exclusive lock on the target table, which we will NOT release
1435          * until end of transaction.  (In the simple case where we are directly
1436          * dropping this column, AlterTableDropColumn already did this ... but
1437          * when cascading from a drop of some other object, we may not have any
1438          * lock.)
1439          */
1440         rel = relation_open(relid, AccessExclusiveLock);
1441
1442         attr_rel = heap_open(AttributeRelationId, RowExclusiveLock);
1443
1444         tuple = SearchSysCacheCopy2(ATTNUM,
1445                                                                 ObjectIdGetDatum(relid),
1446                                                                 Int16GetDatum(attnum));
1447         if (!HeapTupleIsValid(tuple))           /* shouldn't happen */
1448                 elog(ERROR, "cache lookup failed for attribute %d of relation %u",
1449                          attnum, relid);
1450         attStruct = (Form_pg_attribute) GETSTRUCT(tuple);
1451
1452         if (attnum < 0)
1453         {
1454                 /* System attribute (probably OID) ... just delete the row */
1455
1456                 simple_heap_delete(attr_rel, &tuple->t_self);
1457         }
1458         else
1459         {
1460                 /* Dropping user attributes is lots harder */
1461
1462                 /* Mark the attribute as dropped */
1463                 attStruct->attisdropped = true;
1464
1465                 /*
1466                  * Set the type OID to invalid.  A dropped attribute's type link
1467                  * cannot be relied on (once the attribute is dropped, the type might
1468                  * be too). Fortunately we do not need the type row --- the only
1469                  * really essential information is the type's typlen and typalign,
1470                  * which are preserved in the attribute's attlen and attalign.  We set
1471                  * atttypid to zero here as a means of catching code that incorrectly
1472                  * expects it to be valid.
1473                  */
1474                 attStruct->atttypid = InvalidOid;
1475
1476                 /* Remove any NOT NULL constraint the column may have */
1477                 attStruct->attnotnull = false;
1478
1479                 /* We don't want to keep stats for it anymore */
1480                 attStruct->attstattarget = 0;
1481
1482                 /*
1483                  * Change the column name to something that isn't likely to conflict
1484                  */
1485                 snprintf(newattname, sizeof(newattname),
1486                                  "........pg.dropped.%d........", attnum);
1487                 namestrcpy(&(attStruct->attname), newattname);
1488
1489                 simple_heap_update(attr_rel, &tuple->t_self, tuple);
1490
1491                 /* keep the system catalog indexes current */
1492                 CatalogUpdateIndexes(attr_rel, tuple);
1493         }
1494
1495         /*
1496          * Because updating the pg_attribute row will trigger a relcache flush for
1497          * the target relation, we need not do anything else to notify other
1498          * backends of the change.
1499          */
1500
1501         heap_close(attr_rel, RowExclusiveLock);
1502
1503         if (attnum > 0)
1504                 RemoveStatistics(relid, attnum);
1505
1506         relation_close(rel, NoLock);
1507 }
1508
1509 /*
1510  *              RemoveAttrDefault
1511  *
1512  * If the specified relation/attribute has a default, remove it.
1513  * (If no default, raise error if complain is true, else return quietly.)
1514  */
1515 void
1516 RemoveAttrDefault(Oid relid, AttrNumber attnum,
1517                                   DropBehavior behavior, bool complain)
1518 {
1519         Relation        attrdef_rel;
1520         ScanKeyData scankeys[2];
1521         SysScanDesc scan;
1522         HeapTuple       tuple;
1523         bool            found = false;
1524
1525         attrdef_rel = heap_open(AttrDefaultRelationId, RowExclusiveLock);
1526
1527         ScanKeyInit(&scankeys[0],
1528                                 Anum_pg_attrdef_adrelid,
1529                                 BTEqualStrategyNumber, F_OIDEQ,
1530                                 ObjectIdGetDatum(relid));
1531         ScanKeyInit(&scankeys[1],
1532                                 Anum_pg_attrdef_adnum,
1533                                 BTEqualStrategyNumber, F_INT2EQ,
1534                                 Int16GetDatum(attnum));
1535
1536         scan = systable_beginscan(attrdef_rel, AttrDefaultIndexId, true,
1537                                                           SnapshotNow, 2, scankeys);
1538
1539         /* There should be at most one matching tuple, but we loop anyway */
1540         while (HeapTupleIsValid(tuple = systable_getnext(scan)))
1541         {
1542                 ObjectAddress object;
1543
1544                 object.classId = AttrDefaultRelationId;
1545                 object.objectId = HeapTupleGetOid(tuple);
1546                 object.objectSubId = 0;
1547
1548                 performDeletion(&object, behavior);
1549
1550                 found = true;
1551         }
1552
1553         systable_endscan(scan);
1554         heap_close(attrdef_rel, RowExclusiveLock);
1555
1556         if (complain && !found)
1557                 elog(ERROR, "could not find attrdef tuple for relation %u attnum %d",
1558                          relid, attnum);
1559 }
1560
1561 /*
1562  *              RemoveAttrDefaultById
1563  *
1564  * Remove a pg_attrdef entry specified by OID.  This is the guts of
1565  * attribute-default removal.  Note it should be called via performDeletion,
1566  * not directly.
1567  */
1568 void
1569 RemoveAttrDefaultById(Oid attrdefId)
1570 {
1571         Relation        attrdef_rel;
1572         Relation        attr_rel;
1573         Relation        myrel;
1574         ScanKeyData scankeys[1];
1575         SysScanDesc scan;
1576         HeapTuple       tuple;
1577         Oid                     myrelid;
1578         AttrNumber      myattnum;
1579
1580         /* Grab an appropriate lock on the pg_attrdef relation */
1581         attrdef_rel = heap_open(AttrDefaultRelationId, RowExclusiveLock);
1582
1583         /* Find the pg_attrdef tuple */
1584         ScanKeyInit(&scankeys[0],
1585                                 ObjectIdAttributeNumber,
1586                                 BTEqualStrategyNumber, F_OIDEQ,
1587                                 ObjectIdGetDatum(attrdefId));
1588
1589         scan = systable_beginscan(attrdef_rel, AttrDefaultOidIndexId, true,
1590                                                           SnapshotNow, 1, scankeys);
1591
1592         tuple = systable_getnext(scan);
1593         if (!HeapTupleIsValid(tuple))
1594                 elog(ERROR, "could not find tuple for attrdef %u", attrdefId);
1595
1596         myrelid = ((Form_pg_attrdef) GETSTRUCT(tuple))->adrelid;
1597         myattnum = ((Form_pg_attrdef) GETSTRUCT(tuple))->adnum;
1598
1599         /* Get an exclusive lock on the relation owning the attribute */
1600         myrel = relation_open(myrelid, AccessExclusiveLock);
1601
1602         /* Now we can delete the pg_attrdef row */
1603         simple_heap_delete(attrdef_rel, &tuple->t_self);
1604
1605         systable_endscan(scan);
1606         heap_close(attrdef_rel, RowExclusiveLock);
1607
1608         /* Fix the pg_attribute row */
1609         attr_rel = heap_open(AttributeRelationId, RowExclusiveLock);
1610
1611         tuple = SearchSysCacheCopy2(ATTNUM,
1612                                                                 ObjectIdGetDatum(myrelid),
1613                                                                 Int16GetDatum(myattnum));
1614         if (!HeapTupleIsValid(tuple))           /* shouldn't happen */
1615                 elog(ERROR, "cache lookup failed for attribute %d of relation %u",
1616                          myattnum, myrelid);
1617
1618         ((Form_pg_attribute) GETSTRUCT(tuple))->atthasdef = false;
1619
1620         simple_heap_update(attr_rel, &tuple->t_self, tuple);
1621
1622         /* keep the system catalog indexes current */
1623         CatalogUpdateIndexes(attr_rel, tuple);
1624
1625         /*
1626          * Our update of the pg_attribute row will force a relcache rebuild, so
1627          * there's nothing else to do here.
1628          */
1629         heap_close(attr_rel, RowExclusiveLock);
1630
1631         /* Keep lock on attribute's rel until end of xact */
1632         relation_close(myrel, NoLock);
1633 }
1634
1635 /*
1636  * heap_drop_with_catalog       - removes specified relation from catalogs
1637  *
1638  * Note that this routine is not responsible for dropping objects that are
1639  * linked to the pg_class entry via dependencies (for example, indexes and
1640  * constraints).  Those are deleted by the dependency-tracing logic in
1641  * dependency.c before control gets here.  In general, therefore, this routine
1642  * should never be called directly; go through performDeletion() instead.
1643  */
1644 void
1645 heap_drop_with_catalog(Oid relid)
1646 {
1647         Relation        rel;
1648
1649         /*
1650          * Open and lock the relation.
1651          */
1652         rel = relation_open(relid, AccessExclusiveLock);
1653
1654         /*
1655          * There can no longer be anyone *else* touching the relation, but we
1656          * might still have open queries or cursors, or pending trigger events, in
1657          * our own session.
1658          */
1659         CheckTableNotInUse(rel, "DROP TABLE");
1660
1661         /*
1662          * This effectively deletes all rows in the table, and may be done in a
1663          * serializable transaction.  In that case we must record a rw-conflict in
1664          * to this transaction from each transaction holding a predicate lock on
1665          * the table.
1666          */
1667         CheckTableForSerializableConflictIn(rel);
1668
1669         /*
1670          * Delete pg_foreign_table tuple first.
1671          */
1672         if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
1673         {
1674                 Relation        rel;
1675                 HeapTuple       tuple;
1676
1677                 rel = heap_open(ForeignTableRelationId, RowExclusiveLock);
1678
1679                 tuple = SearchSysCache1(FOREIGNTABLEREL, ObjectIdGetDatum(relid));
1680                 if (!HeapTupleIsValid(tuple))
1681                         elog(ERROR, "cache lookup failed for foreign table %u", relid);
1682
1683                 simple_heap_delete(rel, &tuple->t_self);
1684
1685                 ReleaseSysCache(tuple);
1686                 heap_close(rel, RowExclusiveLock);
1687         }
1688
1689         /*
1690          * Schedule unlinking of the relation's physical files at commit.
1691          */
1692         if (rel->rd_rel->relkind != RELKIND_VIEW &&
1693                 rel->rd_rel->relkind != RELKIND_COMPOSITE_TYPE &&
1694                 rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE)
1695         {
1696                 RelationDropStorage(rel);
1697         }
1698
1699         /*
1700          * Close relcache entry, but *keep* AccessExclusiveLock on the relation
1701          * until transaction commit.  This ensures no one else will try to do
1702          * something with the doomed relation.
1703          */
1704         relation_close(rel, NoLock);
1705
1706         /*
1707          * Forget any ON COMMIT action for the rel
1708          */
1709         remove_on_commit_action(relid);
1710
1711         /*
1712          * Flush the relation from the relcache.  We want to do this before
1713          * starting to remove catalog entries, just to be certain that no relcache
1714          * entry rebuild will happen partway through.  (That should not really
1715          * matter, since we don't do CommandCounterIncrement here, but let's be
1716          * safe.)
1717          */
1718         RelationForgetRelation(relid);
1719
1720         /*
1721          * remove inheritance information
1722          */
1723         RelationRemoveInheritance(relid);
1724
1725         /*
1726          * delete statistics
1727          */
1728         RemoveStatistics(relid, 0);
1729
1730         /*
1731          * delete attribute tuples
1732          */
1733         DeleteAttributeTuples(relid);
1734
1735         /*
1736          * delete relation tuple
1737          */
1738         DeleteRelationTuple(relid);
1739 }
1740
1741
1742 /*
1743  * Store a default expression for column attnum of relation rel.
1744  */
1745 void
1746 StoreAttrDefault(Relation rel, AttrNumber attnum, Node *expr)
1747 {
1748         char       *adbin;
1749         char       *adsrc;
1750         Relation        adrel;
1751         HeapTuple       tuple;
1752         Datum           values[4];
1753         static bool nulls[4] = {false, false, false, false};
1754         Relation        attrrel;
1755         HeapTuple       atttup;
1756         Form_pg_attribute attStruct;
1757         Oid                     attrdefOid;
1758         ObjectAddress colobject,
1759                                 defobject;
1760
1761         /*
1762          * Flatten expression to string form for storage.
1763          */
1764         adbin = nodeToString(expr);
1765
1766         /*
1767          * Also deparse it to form the mostly-obsolete adsrc field.
1768          */
1769         adsrc = deparse_expression(expr,
1770                                                         deparse_context_for(RelationGetRelationName(rel),
1771                                                                                                 RelationGetRelid(rel)),
1772                                                            false, false);
1773
1774         /*
1775          * Make the pg_attrdef entry.
1776          */
1777         values[Anum_pg_attrdef_adrelid - 1] = RelationGetRelid(rel);
1778         values[Anum_pg_attrdef_adnum - 1] = attnum;
1779         values[Anum_pg_attrdef_adbin - 1] = CStringGetTextDatum(adbin);
1780         values[Anum_pg_attrdef_adsrc - 1] = CStringGetTextDatum(adsrc);
1781
1782         adrel = heap_open(AttrDefaultRelationId, RowExclusiveLock);
1783
1784         tuple = heap_form_tuple(adrel->rd_att, values, nulls);
1785         attrdefOid = simple_heap_insert(adrel, tuple);
1786
1787         CatalogUpdateIndexes(adrel, tuple);
1788
1789         defobject.classId = AttrDefaultRelationId;
1790         defobject.objectId = attrdefOid;
1791         defobject.objectSubId = 0;
1792
1793         heap_close(adrel, RowExclusiveLock);
1794
1795         /* now can free some of the stuff allocated above */
1796         pfree(DatumGetPointer(values[Anum_pg_attrdef_adbin - 1]));
1797         pfree(DatumGetPointer(values[Anum_pg_attrdef_adsrc - 1]));
1798         heap_freetuple(tuple);
1799         pfree(adbin);
1800         pfree(adsrc);
1801
1802         /*
1803          * Update the pg_attribute entry for the column to show that a default
1804          * exists.
1805          */
1806         attrrel = heap_open(AttributeRelationId, RowExclusiveLock);
1807         atttup = SearchSysCacheCopy2(ATTNUM,
1808                                                                  ObjectIdGetDatum(RelationGetRelid(rel)),
1809                                                                  Int16GetDatum(attnum));
1810         if (!HeapTupleIsValid(atttup))
1811                 elog(ERROR, "cache lookup failed for attribute %d of relation %u",
1812                          attnum, RelationGetRelid(rel));
1813         attStruct = (Form_pg_attribute) GETSTRUCT(atttup);
1814         if (!attStruct->atthasdef)
1815         {
1816                 attStruct->atthasdef = true;
1817                 simple_heap_update(attrrel, &atttup->t_self, atttup);
1818                 /* keep catalog indexes current */
1819                 CatalogUpdateIndexes(attrrel, atttup);
1820         }
1821         heap_close(attrrel, RowExclusiveLock);
1822         heap_freetuple(atttup);
1823
1824         /*
1825          * Make a dependency so that the pg_attrdef entry goes away if the column
1826          * (or whole table) is deleted.
1827          */
1828         colobject.classId = RelationRelationId;
1829         colobject.objectId = RelationGetRelid(rel);
1830         colobject.objectSubId = attnum;
1831
1832         recordDependencyOn(&defobject, &colobject, DEPENDENCY_AUTO);
1833
1834         /*
1835          * Record dependencies on objects used in the expression, too.
1836          */
1837         recordDependencyOnExpr(&defobject, expr, NIL, DEPENDENCY_NORMAL);
1838 }
1839
1840 /*
1841  * Store a check-constraint expression for the given relation.
1842  *
1843  * Caller is responsible for updating the count of constraints
1844  * in the pg_class entry for the relation.
1845  */
1846 static void
1847 StoreRelCheck(Relation rel, char *ccname, Node *expr,
1848                           bool is_local, int inhcount)
1849 {
1850         char       *ccbin;
1851         char       *ccsrc;
1852         List       *varList;
1853         int                     keycount;
1854         int16      *attNos;
1855
1856         /*
1857          * Flatten expression to string form for storage.
1858          */
1859         ccbin = nodeToString(expr);
1860
1861         /*
1862          * Also deparse it to form the mostly-obsolete consrc field.
1863          */
1864         ccsrc = deparse_expression(expr,
1865                                                         deparse_context_for(RelationGetRelationName(rel),
1866                                                                                                 RelationGetRelid(rel)),
1867                                                            false, false);
1868
1869         /*
1870          * Find columns of rel that are used in expr
1871          *
1872          * NB: pull_var_clause is okay here only because we don't allow subselects
1873          * in check constraints; it would fail to examine the contents of
1874          * subselects.
1875          */
1876         varList = pull_var_clause(expr, PVC_REJECT_PLACEHOLDERS);
1877         keycount = list_length(varList);
1878
1879         if (keycount > 0)
1880         {
1881                 ListCell   *vl;
1882                 int                     i = 0;
1883
1884                 attNos = (int16 *) palloc(keycount * sizeof(int16));
1885                 foreach(vl, varList)
1886                 {
1887                         Var                *var = (Var *) lfirst(vl);
1888                         int                     j;
1889
1890                         for (j = 0; j < i; j++)
1891                                 if (attNos[j] == var->varattno)
1892                                         break;
1893                         if (j == i)
1894                                 attNos[i++] = var->varattno;
1895                 }
1896                 keycount = i;
1897         }
1898         else
1899                 attNos = NULL;
1900
1901         /*
1902          * Create the Check Constraint
1903          */
1904         CreateConstraintEntry(ccname,           /* Constraint Name */
1905                                                   RelationGetNamespace(rel),    /* namespace */
1906                                                   CONSTRAINT_CHECK,             /* Constraint Type */
1907                                                   false,        /* Is Deferrable */
1908                                                   false,        /* Is Deferred */
1909                                                   true, /* Is Validated */
1910                                                   RelationGetRelid(rel),                /* relation */
1911                                                   attNos,               /* attrs in the constraint */
1912                                                   keycount,             /* # attrs in the constraint */
1913                                                   InvalidOid,   /* not a domain constraint */
1914                                                   InvalidOid,   /* no associated index */
1915                                                   InvalidOid,   /* Foreign key fields */
1916                                                   NULL,
1917                                                   NULL,
1918                                                   NULL,
1919                                                   NULL,
1920                                                   0,
1921                                                   ' ',
1922                                                   ' ',
1923                                                   ' ',
1924                                                   NULL, /* not an exclusion constraint */
1925                                                   expr, /* Tree form of check constraint */
1926                                                   ccbin,        /* Binary form of check constraint */
1927                                                   ccsrc,        /* Source form of check constraint */
1928                                                   is_local,             /* conislocal */
1929                                                   inhcount);    /* coninhcount */
1930
1931         pfree(ccbin);
1932         pfree(ccsrc);
1933 }
1934
1935 /*
1936  * Store defaults and constraints (passed as a list of CookedConstraint).
1937  *
1938  * NOTE: only pre-cooked expressions will be passed this way, which is to
1939  * say expressions inherited from an existing relation.  Newly parsed
1940  * expressions can be added later, by direct calls to StoreAttrDefault
1941  * and StoreRelCheck (see AddRelationNewConstraints()).
1942  */
1943 static void
1944 StoreConstraints(Relation rel, List *cooked_constraints)
1945 {
1946         int                     numchecks = 0;
1947         ListCell   *lc;
1948
1949         if (!cooked_constraints)
1950                 return;                                 /* nothing to do */
1951
1952         /*
1953          * Deparsing of constraint expressions will fail unless the just-created
1954          * pg_attribute tuples for this relation are made visible.      So, bump the
1955          * command counter.  CAUTION: this will cause a relcache entry rebuild.
1956          */
1957         CommandCounterIncrement();
1958
1959         foreach(lc, cooked_constraints)
1960         {
1961                 CookedConstraint *con = (CookedConstraint *) lfirst(lc);
1962
1963                 switch (con->contype)
1964                 {
1965                         case CONSTR_DEFAULT:
1966                                 StoreAttrDefault(rel, con->attnum, con->expr);
1967                                 break;
1968                         case CONSTR_CHECK:
1969                                 StoreRelCheck(rel, con->name, con->expr,
1970                                                           con->is_local, con->inhcount);
1971                                 numchecks++;
1972                                 break;
1973                         default:
1974                                 elog(ERROR, "unrecognized constraint type: %d",
1975                                          (int) con->contype);
1976                 }
1977         }
1978
1979         if (numchecks > 0)
1980                 SetRelationNumChecks(rel, numchecks);
1981 }
1982
1983 /*
1984  * AddRelationNewConstraints
1985  *
1986  * Add new column default expressions and/or constraint check expressions
1987  * to an existing relation.  This is defined to do both for efficiency in
1988  * DefineRelation, but of course you can do just one or the other by passing
1989  * empty lists.
1990  *
1991  * rel: relation to be modified
1992  * newColDefaults: list of RawColumnDefault structures
1993  * newConstraints: list of Constraint nodes
1994  * allow_merge: TRUE if check constraints may be merged with existing ones
1995  * is_local: TRUE if definition is local, FALSE if it's inherited
1996  *
1997  * All entries in newColDefaults will be processed.  Entries in newConstraints
1998  * will be processed only if they are CONSTR_CHECK type.
1999  *
2000  * Returns a list of CookedConstraint nodes that shows the cooked form of
2001  * the default and constraint expressions added to the relation.
2002  *
2003  * NB: caller should have opened rel with AccessExclusiveLock, and should
2004  * hold that lock till end of transaction.      Also, we assume the caller has
2005  * done a CommandCounterIncrement if necessary to make the relation's catalog
2006  * tuples visible.
2007  */
2008 List *
2009 AddRelationNewConstraints(Relation rel,
2010                                                   List *newColDefaults,
2011                                                   List *newConstraints,
2012                                                   bool allow_merge,
2013                                                   bool is_local)
2014 {
2015         List       *cookedConstraints = NIL;
2016         TupleDesc       tupleDesc;
2017         TupleConstr *oldconstr;
2018         int                     numoldchecks;
2019         ParseState *pstate;
2020         RangeTblEntry *rte;
2021         int                     numchecks;
2022         List       *checknames;
2023         ListCell   *cell;
2024         Node       *expr;
2025         CookedConstraint *cooked;
2026
2027         /*
2028          * Get info about existing constraints.
2029          */
2030         tupleDesc = RelationGetDescr(rel);
2031         oldconstr = tupleDesc->constr;
2032         if (oldconstr)
2033                 numoldchecks = oldconstr->num_check;
2034         else
2035                 numoldchecks = 0;
2036
2037         /*
2038          * Create a dummy ParseState and insert the target relation as its sole
2039          * rangetable entry.  We need a ParseState for transformExpr.
2040          */
2041         pstate = make_parsestate(NULL);
2042         rte = addRangeTableEntryForRelation(pstate,
2043                                                                                 rel,
2044                                                                                 NULL,
2045                                                                                 false,
2046                                                                                 true);
2047         addRTEtoQuery(pstate, rte, true, true, true);
2048
2049         /*
2050          * Process column default expressions.
2051          */
2052         foreach(cell, newColDefaults)
2053         {
2054                 RawColumnDefault *colDef = (RawColumnDefault *) lfirst(cell);
2055                 Form_pg_attribute atp = rel->rd_att->attrs[colDef->attnum - 1];
2056
2057                 expr = cookDefault(pstate, colDef->raw_default,
2058                                                    atp->atttypid, atp->atttypmod,
2059                                                    NameStr(atp->attname));
2060
2061                 /*
2062                  * If the expression is just a NULL constant, we do not bother to make
2063                  * an explicit pg_attrdef entry, since the default behavior is
2064                  * equivalent.
2065                  *
2066                  * Note a nonobvious property of this test: if the column is of a
2067                  * domain type, what we'll get is not a bare null Const but a
2068                  * CoerceToDomain expr, so we will not discard the default.  This is
2069                  * critical because the column default needs to be retained to
2070                  * override any default that the domain might have.
2071                  */
2072                 if (expr == NULL ||
2073                         (IsA(expr, Const) &&((Const *) expr)->constisnull))
2074                         continue;
2075
2076                 StoreAttrDefault(rel, colDef->attnum, expr);
2077
2078                 cooked = (CookedConstraint *) palloc(sizeof(CookedConstraint));
2079                 cooked->contype = CONSTR_DEFAULT;
2080                 cooked->name = NULL;
2081                 cooked->attnum = colDef->attnum;
2082                 cooked->expr = expr;
2083                 cooked->is_local = is_local;
2084                 cooked->inhcount = is_local ? 0 : 1;
2085                 cookedConstraints = lappend(cookedConstraints, cooked);
2086         }
2087
2088         /*
2089          * Process constraint expressions.
2090          */
2091         numchecks = numoldchecks;
2092         checknames = NIL;
2093         foreach(cell, newConstraints)
2094         {
2095                 Constraint *cdef = (Constraint *) lfirst(cell);
2096                 char       *ccname;
2097
2098                 if (cdef->contype != CONSTR_CHECK)
2099                         continue;
2100
2101                 if (cdef->raw_expr != NULL)
2102                 {
2103                         Assert(cdef->cooked_expr == NULL);
2104
2105                         /*
2106                          * Transform raw parsetree to executable expression, and verify
2107                          * it's valid as a CHECK constraint.
2108                          */
2109                         expr = cookConstraint(pstate, cdef->raw_expr,
2110                                                                   RelationGetRelationName(rel));
2111                 }
2112                 else
2113                 {
2114                         Assert(cdef->cooked_expr != NULL);
2115
2116                         /*
2117                          * Here, we assume the parser will only pass us valid CHECK
2118                          * expressions, so we do no particular checking.
2119                          */
2120                         expr = stringToNode(cdef->cooked_expr);
2121                 }
2122
2123                 /*
2124                  * Check name uniqueness, or generate a name if none was given.
2125                  */
2126                 if (cdef->conname != NULL)
2127                 {
2128                         ListCell   *cell2;
2129
2130                         ccname = cdef->conname;
2131                         /* Check against other new constraints */
2132                         /* Needed because we don't do CommandCounterIncrement in loop */
2133                         foreach(cell2, checknames)
2134                         {
2135                                 if (strcmp((char *) lfirst(cell2), ccname) == 0)
2136                                         ereport(ERROR,
2137                                                         (errcode(ERRCODE_DUPLICATE_OBJECT),
2138                                                          errmsg("check constraint \"%s\" already exists",
2139                                                                         ccname)));
2140                         }
2141
2142                         /* save name for future checks */
2143                         checknames = lappend(checknames, ccname);
2144
2145                         /*
2146                          * Check against pre-existing constraints.      If we are allowed to
2147                          * merge with an existing constraint, there's no more to do here.
2148                          * (We omit the duplicate constraint from the result, which is
2149                          * what ATAddCheckConstraint wants.)
2150                          */
2151                         if (MergeWithExistingConstraint(rel, ccname, expr,
2152                                                                                         allow_merge, is_local))
2153                                 continue;
2154                 }
2155                 else
2156                 {
2157                         /*
2158                          * When generating a name, we want to create "tab_col_check" for a
2159                          * column constraint and "tab_check" for a table constraint.  We
2160                          * no longer have any info about the syntactic positioning of the
2161                          * constraint phrase, so we approximate this by seeing whether the
2162                          * expression references more than one column.  (If the user
2163                          * played by the rules, the result is the same...)
2164                          *
2165                          * Note: pull_var_clause() doesn't descend into sublinks, but we
2166                          * eliminated those above; and anyway this only needs to be an
2167                          * approximate answer.
2168                          */
2169                         List       *vars;
2170                         char       *colname;
2171
2172                         vars = pull_var_clause(expr, PVC_REJECT_PLACEHOLDERS);
2173
2174                         /* eliminate duplicates */
2175                         vars = list_union(NIL, vars);
2176
2177                         if (list_length(vars) == 1)
2178                                 colname = get_attname(RelationGetRelid(rel),
2179                                                                           ((Var *) linitial(vars))->varattno);
2180                         else
2181                                 colname = NULL;
2182
2183                         ccname = ChooseConstraintName(RelationGetRelationName(rel),
2184                                                                                   colname,
2185                                                                                   "check",
2186                                                                                   RelationGetNamespace(rel),
2187                                                                                   checknames);
2188
2189                         /* save name for future checks */
2190                         checknames = lappend(checknames, ccname);
2191                 }
2192
2193                 /*
2194                  * OK, store it.
2195                  */
2196                 StoreRelCheck(rel, ccname, expr, is_local, is_local ? 0 : 1);
2197
2198                 numchecks++;
2199
2200                 cooked = (CookedConstraint *) palloc(sizeof(CookedConstraint));
2201                 cooked->contype = CONSTR_CHECK;
2202                 cooked->name = ccname;
2203                 cooked->attnum = 0;
2204                 cooked->expr = expr;
2205                 cooked->is_local = is_local;
2206                 cooked->inhcount = is_local ? 0 : 1;
2207                 cookedConstraints = lappend(cookedConstraints, cooked);
2208         }
2209
2210         /*
2211          * Update the count of constraints in the relation's pg_class tuple. We do
2212          * this even if there was no change, in order to ensure that an SI update
2213          * message is sent out for the pg_class tuple, which will force other
2214          * backends to rebuild their relcache entries for the rel. (This is
2215          * critical if we added defaults but not constraints.)
2216          */
2217         SetRelationNumChecks(rel, numchecks);
2218
2219         return cookedConstraints;
2220 }
2221
2222 /*
2223  * Check for a pre-existing check constraint that conflicts with a proposed
2224  * new one, and either adjust its conislocal/coninhcount settings or throw
2225  * error as needed.
2226  *
2227  * Returns TRUE if merged (constraint is a duplicate), or FALSE if it's
2228  * got a so-far-unique name, or throws error if conflict.
2229  */
2230 static bool
2231 MergeWithExistingConstraint(Relation rel, char *ccname, Node *expr,
2232                                                         bool allow_merge, bool is_local)
2233 {
2234         bool            found;
2235         Relation        conDesc;
2236         SysScanDesc conscan;
2237         ScanKeyData skey[2];
2238         HeapTuple       tup;
2239
2240         /* Search for a pg_constraint entry with same name and relation */
2241         conDesc = heap_open(ConstraintRelationId, RowExclusiveLock);
2242
2243         found = false;
2244
2245         ScanKeyInit(&skey[0],
2246                                 Anum_pg_constraint_conname,
2247                                 BTEqualStrategyNumber, F_NAMEEQ,
2248                                 CStringGetDatum(ccname));
2249
2250         ScanKeyInit(&skey[1],
2251                                 Anum_pg_constraint_connamespace,
2252                                 BTEqualStrategyNumber, F_OIDEQ,
2253                                 ObjectIdGetDatum(RelationGetNamespace(rel)));
2254
2255         conscan = systable_beginscan(conDesc, ConstraintNameNspIndexId, true,
2256                                                                  SnapshotNow, 2, skey);
2257
2258         while (HeapTupleIsValid(tup = systable_getnext(conscan)))
2259         {
2260                 Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(tup);
2261
2262                 if (con->conrelid == RelationGetRelid(rel))
2263                 {
2264                         /* Found it.  Conflicts if not identical check constraint */
2265                         if (con->contype == CONSTRAINT_CHECK)
2266                         {
2267                                 Datum           val;
2268                                 bool            isnull;
2269
2270                                 val = fastgetattr(tup,
2271                                                                   Anum_pg_constraint_conbin,
2272                                                                   conDesc->rd_att, &isnull);
2273                                 if (isnull)
2274                                         elog(ERROR, "null conbin for rel %s",
2275                                                  RelationGetRelationName(rel));
2276                                 if (equal(expr, stringToNode(TextDatumGetCString(val))))
2277                                         found = true;
2278                         }
2279                         if (!found || !allow_merge)
2280                                 ereport(ERROR,
2281                                                 (errcode(ERRCODE_DUPLICATE_OBJECT),
2282                                 errmsg("constraint \"%s\" for relation \"%s\" already exists",
2283                                            ccname, RelationGetRelationName(rel))));
2284                         /* OK to update the tuple */
2285                         ereport(NOTICE,
2286                            (errmsg("merging constraint \"%s\" with inherited definition",
2287                                            ccname)));
2288                         tup = heap_copytuple(tup);
2289                         con = (Form_pg_constraint) GETSTRUCT(tup);
2290                         if (is_local)
2291                                 con->conislocal = true;
2292                         else
2293                                 con->coninhcount++;
2294                         simple_heap_update(conDesc, &tup->t_self, tup);
2295                         CatalogUpdateIndexes(conDesc, tup);
2296                         break;
2297                 }
2298         }
2299
2300         systable_endscan(conscan);
2301         heap_close(conDesc, RowExclusiveLock);
2302
2303         return found;
2304 }
2305
2306 /*
2307  * Update the count of constraints in the relation's pg_class tuple.
2308  *
2309  * Caller had better hold exclusive lock on the relation.
2310  *
2311  * An important side effect is that a SI update message will be sent out for
2312  * the pg_class tuple, which will force other backends to rebuild their
2313  * relcache entries for the rel.  Also, this backend will rebuild its
2314  * own relcache entry at the next CommandCounterIncrement.
2315  */
2316 static void
2317 SetRelationNumChecks(Relation rel, int numchecks)
2318 {
2319         Relation        relrel;
2320         HeapTuple       reltup;
2321         Form_pg_class relStruct;
2322
2323         relrel = heap_open(RelationRelationId, RowExclusiveLock);
2324         reltup = SearchSysCacheCopy1(RELOID,
2325                                                                  ObjectIdGetDatum(RelationGetRelid(rel)));
2326         if (!HeapTupleIsValid(reltup))
2327                 elog(ERROR, "cache lookup failed for relation %u",
2328                          RelationGetRelid(rel));
2329         relStruct = (Form_pg_class) GETSTRUCT(reltup);
2330
2331         if (relStruct->relchecks != numchecks)
2332         {
2333                 relStruct->relchecks = numchecks;
2334
2335                 simple_heap_update(relrel, &reltup->t_self, reltup);
2336
2337                 /* keep catalog indexes current */
2338                 CatalogUpdateIndexes(relrel, reltup);
2339         }
2340         else
2341         {
2342                 /* Skip the disk update, but force relcache inval anyway */
2343                 CacheInvalidateRelcache(rel);
2344         }
2345
2346         heap_freetuple(reltup);
2347         heap_close(relrel, RowExclusiveLock);
2348 }
2349
2350 /*
2351  * Take a raw default and convert it to a cooked format ready for
2352  * storage.
2353  *
2354  * Parse state should be set up to recognize any vars that might appear
2355  * in the expression.  (Even though we plan to reject vars, it's more
2356  * user-friendly to give the correct error message than "unknown var".)
2357  *
2358  * If atttypid is not InvalidOid, coerce the expression to the specified
2359  * type (and typmod atttypmod).   attname is only needed in this case:
2360  * it is used in the error message, if any.
2361  */
2362 Node *
2363 cookDefault(ParseState *pstate,
2364                         Node *raw_default,
2365                         Oid atttypid,
2366                         int32 atttypmod,
2367                         char *attname)
2368 {
2369         Node       *expr;
2370
2371         Assert(raw_default != NULL);
2372
2373         /*
2374          * Transform raw parsetree to executable expression.
2375          */
2376         expr = transformExpr(pstate, raw_default);
2377
2378         /*
2379          * Make sure default expr does not refer to any vars.
2380          */
2381         if (contain_var_clause(expr))
2382                 ereport(ERROR,
2383                                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
2384                           errmsg("cannot use column references in default expression")));
2385
2386         /*
2387          * It can't return a set either.
2388          */
2389         if (expression_returns_set(expr))
2390                 ereport(ERROR,
2391                                 (errcode(ERRCODE_DATATYPE_MISMATCH),
2392                                  errmsg("default expression must not return a set")));
2393
2394         /*
2395          * No subplans or aggregates, either...
2396          */
2397         if (pstate->p_hasSubLinks)
2398                 ereport(ERROR,
2399                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2400                                  errmsg("cannot use subquery in default expression")));
2401         if (pstate->p_hasAggs)
2402                 ereport(ERROR,
2403                                 (errcode(ERRCODE_GROUPING_ERROR),
2404                          errmsg("cannot use aggregate function in default expression")));
2405         if (pstate->p_hasWindowFuncs)
2406                 ereport(ERROR,
2407                                 (errcode(ERRCODE_WINDOWING_ERROR),
2408                                  errmsg("cannot use window function in default expression")));
2409
2410         /*
2411          * Coerce the expression to the correct type and typmod, if given. This
2412          * should match the parser's processing of non-defaulted expressions ---
2413          * see transformAssignedExpr().
2414          */
2415         if (OidIsValid(atttypid))
2416         {
2417                 Oid                     type_id = exprType(expr);
2418
2419                 expr = coerce_to_target_type(pstate, expr, type_id,
2420                                                                          atttypid, atttypmod,
2421                                                                          COERCION_ASSIGNMENT,
2422                                                                          COERCE_IMPLICIT_CAST,
2423                                                                          -1);
2424                 if (expr == NULL)
2425                         ereport(ERROR,
2426                                         (errcode(ERRCODE_DATATYPE_MISMATCH),
2427                                          errmsg("column \"%s\" is of type %s"
2428                                                         " but default expression is of type %s",
2429                                                         attname,
2430                                                         format_type_be(atttypid),
2431                                                         format_type_be(type_id)),
2432                            errhint("You will need to rewrite or cast the expression.")));
2433         }
2434
2435         /*
2436          * Finally, take care of collations in the finished expression.
2437          */
2438         assign_expr_collations(pstate, expr);
2439
2440         return expr;
2441 }
2442
2443 /*
2444  * Take a raw CHECK constraint expression and convert it to a cooked format
2445  * ready for storage.
2446  *
2447  * Parse state must be set up to recognize any vars that might appear
2448  * in the expression.
2449  */
2450 static Node *
2451 cookConstraint(ParseState *pstate,
2452                            Node *raw_constraint,
2453                            char *relname)
2454 {
2455         Node       *expr;
2456
2457         /*
2458          * Transform raw parsetree to executable expression.
2459          */
2460         expr = transformExpr(pstate, raw_constraint);
2461
2462         /*
2463          * Make sure it yields a boolean result.
2464          */
2465         expr = coerce_to_boolean(pstate, expr, "CHECK");
2466
2467         /*
2468          * Take care of collations.
2469          */
2470         assign_expr_collations(pstate, expr);
2471
2472         /*
2473          * Make sure no outside relations are referred to.
2474          */
2475         if (list_length(pstate->p_rtable) != 1)
2476                 ereport(ERROR,
2477                                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
2478                         errmsg("only table \"%s\" can be referenced in check constraint",
2479                                    relname)));
2480
2481         /*
2482          * No subplans or aggregates, either...
2483          */
2484         if (pstate->p_hasSubLinks)
2485                 ereport(ERROR,
2486                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2487                                  errmsg("cannot use subquery in check constraint")));
2488         if (pstate->p_hasAggs)
2489                 ereport(ERROR,
2490                                 (errcode(ERRCODE_GROUPING_ERROR),
2491                            errmsg("cannot use aggregate function in check constraint")));
2492         if (pstate->p_hasWindowFuncs)
2493                 ereport(ERROR,
2494                                 (errcode(ERRCODE_WINDOWING_ERROR),
2495                                  errmsg("cannot use window function in check constraint")));
2496
2497         return expr;
2498 }
2499
2500
2501 /*
2502  * RemoveStatistics --- remove entries in pg_statistic for a rel or column
2503  *
2504  * If attnum is zero, remove all entries for rel; else remove only the one(s)
2505  * for that column.
2506  */
2507 void
2508 RemoveStatistics(Oid relid, AttrNumber attnum)
2509 {
2510         Relation        pgstatistic;
2511         SysScanDesc scan;
2512         ScanKeyData key[2];
2513         int                     nkeys;
2514         HeapTuple       tuple;
2515
2516         pgstatistic = heap_open(StatisticRelationId, RowExclusiveLock);
2517
2518         ScanKeyInit(&key[0],
2519                                 Anum_pg_statistic_starelid,
2520                                 BTEqualStrategyNumber, F_OIDEQ,
2521                                 ObjectIdGetDatum(relid));
2522
2523         if (attnum == 0)
2524                 nkeys = 1;
2525         else
2526         {
2527                 ScanKeyInit(&key[1],
2528                                         Anum_pg_statistic_staattnum,
2529                                         BTEqualStrategyNumber, F_INT2EQ,
2530                                         Int16GetDatum(attnum));
2531                 nkeys = 2;
2532         }
2533
2534         scan = systable_beginscan(pgstatistic, StatisticRelidAttnumInhIndexId, true,
2535                                                           SnapshotNow, nkeys, key);
2536
2537         /* we must loop even when attnum != 0, in case of inherited stats */
2538         while (HeapTupleIsValid(tuple = systable_getnext(scan)))
2539                 simple_heap_delete(pgstatistic, &tuple->t_self);
2540
2541         systable_endscan(scan);
2542
2543         heap_close(pgstatistic, RowExclusiveLock);
2544 }
2545
2546
2547 /*
2548  * RelationTruncateIndexes - truncate all indexes associated
2549  * with the heap relation to zero tuples.
2550  *
2551  * The routine will truncate and then reconstruct the indexes on
2552  * the specified relation.      Caller must hold exclusive lock on rel.
2553  */
2554 static void
2555 RelationTruncateIndexes(Relation heapRelation)
2556 {
2557         ListCell   *indlist;
2558
2559         /* Ask the relcache to produce a list of the indexes of the rel */
2560         foreach(indlist, RelationGetIndexList(heapRelation))
2561         {
2562                 Oid                     indexId = lfirst_oid(indlist);
2563                 Relation        currentIndex;
2564                 IndexInfo  *indexInfo;
2565
2566                 /* Open the index relation; use exclusive lock, just to be sure */
2567                 currentIndex = index_open(indexId, AccessExclusiveLock);
2568
2569                 /* Fetch info needed for index_build */
2570                 indexInfo = BuildIndexInfo(currentIndex);
2571
2572                 /*
2573                  * Now truncate the actual file (and discard buffers).
2574                  */
2575                 RelationTruncate(currentIndex, 0);
2576
2577                 /* Initialize the index and rebuild */
2578                 /* Note: we do not need to re-establish pkey setting */
2579                 index_build(heapRelation, currentIndex, indexInfo, false, true);
2580
2581                 /* We're done with this index */
2582                 index_close(currentIndex, NoLock);
2583         }
2584 }
2585
2586 /*
2587  *       heap_truncate
2588  *
2589  *       This routine deletes all data within all the specified relations.
2590  *
2591  * This is not transaction-safe!  There is another, transaction-safe
2592  * implementation in commands/tablecmds.c.      We now use this only for
2593  * ON COMMIT truncation of temporary tables, where it doesn't matter.
2594  */
2595 void
2596 heap_truncate(List *relids)
2597 {
2598         List       *relations = NIL;
2599         ListCell   *cell;
2600
2601         /* Open relations for processing, and grab exclusive access on each */
2602         foreach(cell, relids)
2603         {
2604                 Oid                     rid = lfirst_oid(cell);
2605                 Relation        rel;
2606
2607                 rel = heap_open(rid, AccessExclusiveLock);
2608                 relations = lappend(relations, rel);
2609         }
2610
2611         /* Don't allow truncate on tables that are referenced by foreign keys */
2612         heap_truncate_check_FKs(relations, true);
2613
2614         /* OK to do it */
2615         foreach(cell, relations)
2616         {
2617                 Relation        rel = lfirst(cell);
2618
2619                 /* Truncate the relation */
2620                 heap_truncate_one_rel(rel);
2621
2622                 /* Close the relation, but keep exclusive lock on it until commit */
2623                 heap_close(rel, NoLock);
2624         }
2625 }
2626
2627 /*
2628  *       heap_truncate_one_rel
2629  *
2630  *       This routine deletes all data within the specified relation.
2631  *
2632  * This is not transaction-safe, because the truncation is done immediately
2633  * and cannot be rolled back later.  Caller is responsible for having
2634  * checked permissions etc, and must have obtained AccessExclusiveLock.
2635  */
2636 void
2637 heap_truncate_one_rel(Relation rel)
2638 {
2639         Oid                     toastrelid;
2640
2641         /* Truncate the actual file (and discard buffers) */
2642         RelationTruncate(rel, 0);
2643
2644         /* If the relation has indexes, truncate the indexes too */
2645         RelationTruncateIndexes(rel);
2646
2647         /* If there is a toast table, truncate that too */
2648         toastrelid = rel->rd_rel->reltoastrelid;
2649         if (OidIsValid(toastrelid))
2650         {
2651                 Relation        toastrel = heap_open(toastrelid, AccessExclusiveLock);
2652
2653                 RelationTruncate(toastrel, 0);
2654                 RelationTruncateIndexes(toastrel);
2655                 /* keep the lock... */
2656                 heap_close(toastrel, NoLock);
2657         }
2658 }
2659
2660 /*
2661  * heap_truncate_check_FKs
2662  *              Check for foreign keys referencing a list of relations that
2663  *              are to be truncated, and raise error if there are any
2664  *
2665  * We disallow such FKs (except self-referential ones) since the whole point
2666  * of TRUNCATE is to not scan the individual rows to be thrown away.
2667  *
2668  * This is split out so it can be shared by both implementations of truncate.
2669  * Caller should already hold a suitable lock on the relations.
2670  *
2671  * tempTables is only used to select an appropriate error message.
2672  */
2673 void
2674 heap_truncate_check_FKs(List *relations, bool tempTables)
2675 {
2676         List       *oids = NIL;
2677         List       *dependents;
2678         ListCell   *cell;
2679
2680         /*
2681          * Build a list of OIDs of the interesting relations.
2682          *
2683          * If a relation has no triggers, then it can neither have FKs nor be
2684          * referenced by a FK from another table, so we can ignore it.
2685          */
2686         foreach(cell, relations)
2687         {
2688                 Relation        rel = lfirst(cell);
2689
2690                 if (rel->rd_rel->relhastriggers)
2691                         oids = lappend_oid(oids, RelationGetRelid(rel));
2692         }
2693
2694         /*
2695          * Fast path: if no relation has triggers, none has FKs either.
2696          */
2697         if (oids == NIL)
2698                 return;
2699
2700         /*
2701          * Otherwise, must scan pg_constraint.  We make one pass with all the
2702          * relations considered; if this finds nothing, then all is well.
2703          */
2704         dependents = heap_truncate_find_FKs(oids);
2705         if (dependents == NIL)
2706                 return;
2707
2708         /*
2709          * Otherwise we repeat the scan once per relation to identify a particular
2710          * pair of relations to complain about.  This is pretty slow, but
2711          * performance shouldn't matter much in a failure path.  The reason for
2712          * doing things this way is to ensure that the message produced is not
2713          * dependent on chance row locations within pg_constraint.
2714          */
2715         foreach(cell, oids)
2716         {
2717                 Oid                     relid = lfirst_oid(cell);
2718                 ListCell   *cell2;
2719
2720                 dependents = heap_truncate_find_FKs(list_make1_oid(relid));
2721
2722                 foreach(cell2, dependents)
2723                 {
2724                         Oid                     relid2 = lfirst_oid(cell2);
2725
2726                         if (!list_member_oid(oids, relid2))
2727                         {
2728                                 char       *relname = get_rel_name(relid);
2729                                 char       *relname2 = get_rel_name(relid2);
2730
2731                                 if (tempTables)
2732                                         ereport(ERROR,
2733                                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2734                                                          errmsg("unsupported ON COMMIT and foreign key combination"),
2735                                                          errdetail("Table \"%s\" references \"%s\", but they do not have the same ON COMMIT setting.",
2736                                                                            relname2, relname)));
2737                                 else
2738                                         ereport(ERROR,
2739                                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2740                                                          errmsg("cannot truncate a table referenced in a foreign key constraint"),
2741                                                          errdetail("Table \"%s\" references \"%s\".",
2742                                                                            relname2, relname),
2743                                                    errhint("Truncate table \"%s\" at the same time, "
2744                                                                    "or use TRUNCATE ... CASCADE.",
2745                                                                    relname2)));
2746                         }
2747                 }
2748         }
2749 }
2750
2751 /*
2752  * heap_truncate_find_FKs
2753  *              Find relations having foreign keys referencing any of the given rels
2754  *
2755  * Input and result are both lists of relation OIDs.  The result contains
2756  * no duplicates, does *not* include any rels that were already in the input
2757  * list, and is sorted in OID order.  (The last property is enforced mainly
2758  * to guarantee consistent behavior in the regression tests; we don't want
2759  * behavior to change depending on chance locations of rows in pg_constraint.)
2760  *
2761  * Note: caller should already have appropriate lock on all rels mentioned
2762  * in relationIds.      Since adding or dropping an FK requires exclusive lock
2763  * on both rels, this ensures that the answer will be stable.
2764  */
2765 List *
2766 heap_truncate_find_FKs(List *relationIds)
2767 {
2768         List       *result = NIL;
2769         Relation        fkeyRel;
2770         SysScanDesc fkeyScan;
2771         HeapTuple       tuple;
2772
2773         /*
2774          * Must scan pg_constraint.  Right now, it is a seqscan because there is
2775          * no available index on confrelid.
2776          */
2777         fkeyRel = heap_open(ConstraintRelationId, AccessShareLock);
2778
2779         fkeyScan = systable_beginscan(fkeyRel, InvalidOid, false,
2780                                                                   SnapshotNow, 0, NULL);
2781
2782         while (HeapTupleIsValid(tuple = systable_getnext(fkeyScan)))
2783         {
2784                 Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(tuple);
2785
2786                 /* Not a foreign key */
2787                 if (con->contype != CONSTRAINT_FOREIGN)
2788                         continue;
2789
2790                 /* Not referencing one of our list of tables */
2791                 if (!list_member_oid(relationIds, con->confrelid))
2792                         continue;
2793
2794                 /* Add referencer unless already in input or result list */
2795                 if (!list_member_oid(relationIds, con->conrelid))
2796                         result = insert_ordered_unique_oid(result, con->conrelid);
2797         }
2798
2799         systable_endscan(fkeyScan);
2800         heap_close(fkeyRel, AccessShareLock);
2801
2802         return result;
2803 }
2804
2805 /*
2806  * insert_ordered_unique_oid
2807  *              Insert a new Oid into a sorted list of Oids, preserving ordering,
2808  *              and eliminating duplicates
2809  *
2810  * Building the ordered list this way is O(N^2), but with a pretty small
2811  * constant, so for the number of entries we expect it will probably be
2812  * faster than trying to apply qsort().  It seems unlikely someone would be
2813  * trying to truncate a table with thousands of dependent tables ...
2814  */
2815 static List *
2816 insert_ordered_unique_oid(List *list, Oid datum)
2817 {
2818         ListCell   *prev;
2819
2820         /* Does the datum belong at the front? */
2821         if (list == NIL || datum < linitial_oid(list))
2822                 return lcons_oid(datum, list);
2823         /* Does it match the first entry? */
2824         if (datum == linitial_oid(list))
2825                 return list;                    /* duplicate, so don't insert */
2826         /* No, so find the entry it belongs after */
2827         prev = list_head(list);
2828         for (;;)
2829         {
2830                 ListCell   *curr = lnext(prev);
2831
2832                 if (curr == NULL || datum < lfirst_oid(curr))
2833                         break;                          /* it belongs after 'prev', before 'curr' */
2834
2835                 if (datum == lfirst_oid(curr))
2836                         return list;            /* duplicate, so don't insert */
2837
2838                 prev = curr;
2839         }
2840         /* Insert datum into list after 'prev' */
2841         lappend_cell_oid(list, prev, datum);
2842         return list;
2843 }