OSDN Git Service

6b8eb53dd1b6e5ba4d3a8978411db8b9fe807a09
[pg-rex/syncrep.git] / src / backend / catalog / heap.c
1 /*-------------------------------------------------------------------------
2  *
3  * heap.c
4  *        code to create and destroy POSTGRES heap relations
5  *
6  * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        src/backend/catalog/heap.c
12  *
13  *
14  * INTERFACE ROUTINES
15  *              heap_create()                   - Create an uncataloged heap relation
16  *              heap_create_with_catalog() - Create a cataloged relation
17  *              heap_drop_with_catalog() - Removes named relation from catalogs
18  *
19  * NOTES
20  *        this code taken from access/heap/create.c, which contains
21  *        the old heap_create_with_catalog, amcreate, and amdestroy.
22  *        those routines will soon call these routines using the function
23  *        manager,
24  *        just like the poorly named "NewXXX" routines do.      The
25  *        "New" routines are all going to die soon, once and for all!
26  *              -cim 1/13/91
27  *
28  *-------------------------------------------------------------------------
29  */
30 #include "postgres.h"
31
32 #include "access/genam.h"
33 #include "access/heapam.h"
34 #include "access/sysattr.h"
35 #include "access/transam.h"
36 #include "access/xact.h"
37 #include "catalog/catalog.h"
38 #include "catalog/dependency.h"
39 #include "catalog/heap.h"
40 #include "catalog/index.h"
41 #include "catalog/indexing.h"
42 #include "catalog/namespace.h"
43 #include "catalog/objectaccess.h"
44 #include "catalog/pg_attrdef.h"
45 #include "catalog/pg_collation.h"
46 #include "catalog/pg_constraint.h"
47 #include "catalog/pg_foreign_table.h"
48 #include "catalog/pg_inherits.h"
49 #include "catalog/pg_namespace.h"
50 #include "catalog/pg_statistic.h"
51 #include "catalog/pg_tablespace.h"
52 #include "catalog/pg_type.h"
53 #include "catalog/pg_type_fn.h"
54 #include "catalog/storage.h"
55 #include "commands/tablecmds.h"
56 #include "commands/typecmds.h"
57 #include "miscadmin.h"
58 #include "nodes/nodeFuncs.h"
59 #include "optimizer/var.h"
60 #include "parser/parse_coerce.h"
61 #include "parser/parse_collate.h"
62 #include "parser/parse_expr.h"
63 #include "parser/parse_relation.h"
64 #include "storage/bufmgr.h"
65 #include "storage/freespace.h"
66 #include "storage/predicate.h"
67 #include "storage/smgr.h"
68 #include "utils/acl.h"
69 #include "utils/builtins.h"
70 #include "utils/fmgroids.h"
71 #include "utils/inval.h"
72 #include "utils/lsyscache.h"
73 #include "utils/relcache.h"
74 #include "utils/snapmgr.h"
75 #include "utils/syscache.h"
76 #include "utils/tqual.h"
77
78
79 /* Potentially set by contrib/pg_upgrade_support functions */
80 Oid                     binary_upgrade_next_heap_pg_class_oid = InvalidOid;
81 Oid                     binary_upgrade_next_toast_pg_class_oid = InvalidOid;
82
83 static void AddNewRelationTuple(Relation pg_class_desc,
84                                         Relation new_rel_desc,
85                                         Oid new_rel_oid,
86                                         Oid new_type_oid,
87                                         Oid reloftype,
88                                         Oid relowner,
89                                         char relkind,
90                                         Datum relacl,
91                                         Datum reloptions);
92 static Oid AddNewRelationType(const char *typeName,
93                                    Oid typeNamespace,
94                                    Oid new_rel_oid,
95                                    char new_rel_kind,
96                                    Oid ownerid,
97                                    Oid new_row_type,
98                                    Oid new_array_type);
99 static void RelationRemoveInheritance(Oid relid);
100 static void StoreRelCheck(Relation rel, char *ccname, Node *expr,
101                           bool is_validated, bool is_local, int inhcount);
102 static void StoreConstraints(Relation rel, List *cooked_constraints);
103 static bool MergeWithExistingConstraint(Relation rel, char *ccname, Node *expr,
104                                                         bool allow_merge, bool is_local);
105 static void SetRelationNumChecks(Relation rel, int numchecks);
106 static Node *cookConstraint(ParseState *pstate,
107                            Node *raw_constraint,
108                            char *relname);
109 static List *insert_ordered_unique_oid(List *list, Oid datum);
110
111
112 /* ----------------------------------------------------------------
113  *                              XXX UGLY HARD CODED BADNESS FOLLOWS XXX
114  *
115  *              these should all be moved to someplace in the lib/catalog
116  *              module, if not obliterated first.
117  * ----------------------------------------------------------------
118  */
119
120
121 /*
122  * Note:
123  *              Should the system special case these attributes in the future?
124  *              Advantage:      consume much less space in the ATTRIBUTE relation.
125  *              Disadvantage:  special cases will be all over the place.
126  */
127
128 /*
129  * The initializers below do not include the attoptions or attacl fields,
130  * but that's OK - we're never going to reference anything beyond the
131  * fixed-size portion of the structure anyway.
132  */
133
134 static FormData_pg_attribute a1 = {
135         0, {"ctid"}, TIDOID, 0, sizeof(ItemPointerData),
136         SelfItemPointerAttributeNumber, 0, -1, -1,
137         false, 'p', 's', true, false, false, true, 0
138 };
139
140 static FormData_pg_attribute a2 = {
141         0, {"oid"}, OIDOID, 0, sizeof(Oid),
142         ObjectIdAttributeNumber, 0, -1, -1,
143         true, 'p', 'i', true, false, false, true, 0
144 };
145
146 static FormData_pg_attribute a3 = {
147         0, {"xmin"}, XIDOID, 0, sizeof(TransactionId),
148         MinTransactionIdAttributeNumber, 0, -1, -1,
149         true, 'p', 'i', true, false, false, true, 0
150 };
151
152 static FormData_pg_attribute a4 = {
153         0, {"cmin"}, CIDOID, 0, sizeof(CommandId),
154         MinCommandIdAttributeNumber, 0, -1, -1,
155         true, 'p', 'i', true, false, false, true, 0
156 };
157
158 static FormData_pg_attribute a5 = {
159         0, {"xmax"}, XIDOID, 0, sizeof(TransactionId),
160         MaxTransactionIdAttributeNumber, 0, -1, -1,
161         true, 'p', 'i', true, false, false, true, 0
162 };
163
164 static FormData_pg_attribute a6 = {
165         0, {"cmax"}, CIDOID, 0, sizeof(CommandId),
166         MaxCommandIdAttributeNumber, 0, -1, -1,
167         true, 'p', 'i', true, false, false, true, 0
168 };
169
170 /*
171  * We decided to call this attribute "tableoid" rather than say
172  * "classoid" on the basis that in the future there may be more than one
173  * table of a particular class/type. In any case table is still the word
174  * used in SQL.
175  */
176 static FormData_pg_attribute a7 = {
177         0, {"tableoid"}, OIDOID, 0, sizeof(Oid),
178         TableOidAttributeNumber, 0, -1, -1,
179         true, 'p', 'i', true, false, false, true, 0
180 };
181
182 static const Form_pg_attribute SysAtt[] = {&a1, &a2, &a3, &a4, &a5, &a6, &a7};
183
184 /*
185  * This function returns a Form_pg_attribute pointer for a system attribute.
186  * Note that we elog if the presented attno is invalid, which would only
187  * happen if there's a problem upstream.
188  */
189 Form_pg_attribute
190 SystemAttributeDefinition(AttrNumber attno, bool relhasoids)
191 {
192         if (attno >= 0 || attno < -(int) lengthof(SysAtt))
193                 elog(ERROR, "invalid system attribute number %d", attno);
194         if (attno == ObjectIdAttributeNumber && !relhasoids)
195                 elog(ERROR, "invalid system attribute number %d", attno);
196         return SysAtt[-attno - 1];
197 }
198
199 /*
200  * If the given name is a system attribute name, return a Form_pg_attribute
201  * pointer for a prototype definition.  If not, return NULL.
202  */
203 Form_pg_attribute
204 SystemAttributeByName(const char *attname, bool relhasoids)
205 {
206         int                     j;
207
208         for (j = 0; j < (int) lengthof(SysAtt); j++)
209         {
210                 Form_pg_attribute att = SysAtt[j];
211
212                 if (relhasoids || att->attnum != ObjectIdAttributeNumber)
213                 {
214                         if (strcmp(NameStr(att->attname), attname) == 0)
215                                 return att;
216                 }
217         }
218
219         return NULL;
220 }
221
222
223 /* ----------------------------------------------------------------
224  *                              XXX END OF UGLY HARD CODED BADNESS XXX
225  * ---------------------------------------------------------------- */
226
227
228 /* ----------------------------------------------------------------
229  *              heap_create             - Create an uncataloged heap relation
230  *
231  *              Note API change: the caller must now always provide the OID
232  *              to use for the relation.
233  *
234  *              rel->rd_rel is initialized by RelationBuildLocalRelation,
235  *              and is mostly zeroes at return.
236  * ----------------------------------------------------------------
237  */
238 Relation
239 heap_create(const char *relname,
240                         Oid relnamespace,
241                         Oid reltablespace,
242                         Oid relid,
243                         TupleDesc tupDesc,
244                         char relkind,
245                         char relpersistence,
246                         bool shared_relation,
247                         bool mapped_relation,
248                         bool allow_system_table_mods)
249 {
250         bool            create_storage;
251         Relation        rel;
252
253         /* The caller must have provided an OID for the relation. */
254         Assert(OidIsValid(relid));
255
256         /*
257          * sanity checks
258          */
259         if (!allow_system_table_mods &&
260                 (IsSystemNamespace(relnamespace) || IsToastNamespace(relnamespace)) &&
261                 IsNormalProcessingMode())
262                 ereport(ERROR,
263                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
264                                  errmsg("permission denied to create \"%s.%s\"",
265                                                 get_namespace_name(relnamespace), relname),
266                 errdetail("System catalog modifications are currently disallowed.")));
267
268         /*
269          * Decide if we need storage or not, and handle a couple other special
270          * cases for particular relkinds.
271          */
272         switch (relkind)
273         {
274                 case RELKIND_VIEW:
275                 case RELKIND_COMPOSITE_TYPE:
276                 case RELKIND_FOREIGN_TABLE:
277                         create_storage = false;
278
279                         /*
280                          * Force reltablespace to zero if the relation has no physical
281                          * storage.  This is mainly just for cleanliness' sake.
282                          */
283                         reltablespace = InvalidOid;
284                         break;
285                 case RELKIND_SEQUENCE:
286                         create_storage = true;
287
288                         /*
289                          * Force reltablespace to zero for sequences, since we don't
290                          * support moving them around into different tablespaces.
291                          */
292                         reltablespace = InvalidOid;
293                         break;
294                 default:
295                         create_storage = true;
296                         break;
297         }
298
299         /*
300          * Never allow a pg_class entry to explicitly specify the database's
301          * default tablespace in reltablespace; force it to zero instead. This
302          * ensures that if the database is cloned with a different default
303          * tablespace, the pg_class entry will still match where CREATE DATABASE
304          * will put the physically copied relation.
305          *
306          * Yes, this is a bit of a hack.
307          */
308         if (reltablespace == MyDatabaseTableSpace)
309                 reltablespace = InvalidOid;
310
311         /*
312          * build the relcache entry.
313          */
314         rel = RelationBuildLocalRelation(relname,
315                                                                          relnamespace,
316                                                                          tupDesc,
317                                                                          relid,
318                                                                          reltablespace,
319                                                                          shared_relation,
320                                                                          mapped_relation,
321                                                                          relpersistence);
322
323         /*
324          * Have the storage manager create the relation's disk file, if needed.
325          *
326          * We only create the main fork here, other forks will be created on
327          * demand.
328          */
329         if (create_storage)
330         {
331                 RelationOpenSmgr(rel);
332                 RelationCreateStorage(rel->rd_node, relpersistence);
333         }
334
335         return rel;
336 }
337
338 /* ----------------------------------------------------------------
339  *              heap_create_with_catalog                - Create a cataloged relation
340  *
341  *              this is done in multiple steps:
342  *
343  *              1) CheckAttributeNamesTypes() is used to make certain the tuple
344  *                 descriptor contains a valid set of attribute names and types
345  *
346  *              2) pg_class is opened and get_relname_relid()
347  *                 performs a scan to ensure that no relation with the
348  *                 same name already exists.
349  *
350  *              3) heap_create() is called to create the new relation on disk.
351  *
352  *              4) TypeCreate() is called to define a new type corresponding
353  *                 to the new relation.
354  *
355  *              5) AddNewRelationTuple() is called to register the
356  *                 relation in pg_class.
357  *
358  *              6) AddNewAttributeTuples() is called to register the
359  *                 new relation's schema in pg_attribute.
360  *
361  *              7) StoreConstraints is called ()                - vadim 08/22/97
362  *
363  *              8) the relations are closed and the new relation's oid
364  *                 is returned.
365  *
366  * ----------------------------------------------------------------
367  */
368
369 /* --------------------------------
370  *              CheckAttributeNamesTypes
371  *
372  *              this is used to make certain the tuple descriptor contains a
373  *              valid set of attribute names and datatypes.  a problem simply
374  *              generates ereport(ERROR) which aborts the current transaction.
375  * --------------------------------
376  */
377 void
378 CheckAttributeNamesTypes(TupleDesc tupdesc, char relkind,
379                                                  bool allow_system_table_mods)
380 {
381         int                     i;
382         int                     j;
383         int                     natts = tupdesc->natts;
384
385         /* Sanity check on column count */
386         if (natts < 0 || natts > MaxHeapAttributeNumber)
387                 ereport(ERROR,
388                                 (errcode(ERRCODE_TOO_MANY_COLUMNS),
389                                  errmsg("tables can have at most %d columns",
390                                                 MaxHeapAttributeNumber)));
391
392         /*
393          * first check for collision with system attribute names
394          *
395          * Skip this for a view or type relation, since those don't have system
396          * attributes.
397          */
398         if (relkind != RELKIND_VIEW && relkind != RELKIND_COMPOSITE_TYPE)
399         {
400                 for (i = 0; i < natts; i++)
401                 {
402                         if (SystemAttributeByName(NameStr(tupdesc->attrs[i]->attname),
403                                                                           tupdesc->tdhasoid) != NULL)
404                                 ereport(ERROR,
405                                                 (errcode(ERRCODE_DUPLICATE_COLUMN),
406                                                  errmsg("column name \"%s\" conflicts with a system column name",
407                                                                 NameStr(tupdesc->attrs[i]->attname))));
408                 }
409         }
410
411         /*
412          * next check for repeated attribute names
413          */
414         for (i = 1; i < natts; i++)
415         {
416                 for (j = 0; j < i; j++)
417                 {
418                         if (strcmp(NameStr(tupdesc->attrs[j]->attname),
419                                            NameStr(tupdesc->attrs[i]->attname)) == 0)
420                                 ereport(ERROR,
421                                                 (errcode(ERRCODE_DUPLICATE_COLUMN),
422                                                  errmsg("column name \"%s\" specified more than once",
423                                                                 NameStr(tupdesc->attrs[j]->attname))));
424                 }
425         }
426
427         /*
428          * next check the attribute types
429          */
430         for (i = 0; i < natts; i++)
431         {
432                 CheckAttributeType(NameStr(tupdesc->attrs[i]->attname),
433                                                    tupdesc->attrs[i]->atttypid,
434                                                    tupdesc->attrs[i]->attcollation,
435                                                    NIL, /* assume we're creating a new rowtype */
436                                                    allow_system_table_mods);
437         }
438 }
439
440 /* --------------------------------
441  *              CheckAttributeType
442  *
443  *              Verify that the proposed datatype of an attribute is legal.
444  *              This is needed mainly because there are types (and pseudo-types)
445  *              in the catalogs that we do not support as elements of real tuples.
446  *              We also check some other properties required of a table column.
447  *
448  * If the attribute is being proposed for addition to an existing table or
449  * composite type, pass a one-element list of the rowtype OID as
450  * containing_rowtypes.  When checking a to-be-created rowtype, it's
451  * sufficient to pass NIL, because there could not be any recursive reference
452  * to a not-yet-existing rowtype.
453  * --------------------------------
454  */
455 void
456 CheckAttributeType(const char *attname,
457                                    Oid atttypid, Oid attcollation,
458                                    List *containing_rowtypes,
459                                    bool allow_system_table_mods)
460 {
461         char            att_typtype = get_typtype(atttypid);
462         Oid                     att_typelem;
463
464         if (atttypid == UNKNOWNOID)
465         {
466                 /*
467                  * Warn user, but don't fail, if column to be created has UNKNOWN type
468                  * (usually as a result of a 'retrieve into' - jolly)
469                  */
470                 ereport(WARNING,
471                                 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
472                                  errmsg("column \"%s\" has type \"unknown\"", attname),
473                                  errdetail("Proceeding with relation creation anyway.")));
474         }
475         else if (att_typtype == TYPTYPE_PSEUDO)
476         {
477                 /*
478                  * Refuse any attempt to create a pseudo-type column, except for a
479                  * special hack for pg_statistic: allow ANYARRAY when modifying system
480                  * catalogs (this allows creating pg_statistic and cloning it during
481                  * VACUUM FULL)
482                  */
483                 if (atttypid != ANYARRAYOID || !allow_system_table_mods)
484                         ereport(ERROR,
485                                         (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
486                                          errmsg("column \"%s\" has pseudo-type %s",
487                                                         attname, format_type_be(atttypid))));
488         }
489         else if (att_typtype == TYPTYPE_DOMAIN)
490         {
491                 /*
492                  * If it's a domain, recurse to check its base type.
493                  */
494                 CheckAttributeType(attname, getBaseType(atttypid), attcollation,
495                                                    containing_rowtypes,
496                                                    allow_system_table_mods);
497         }
498         else if (att_typtype == TYPTYPE_COMPOSITE)
499         {
500                 /*
501                  * For a composite type, recurse into its attributes.
502                  */
503                 Relation        relation;
504                 TupleDesc       tupdesc;
505                 int                     i;
506
507                 /*
508                  * Check for self-containment.  Eventually we might be able to allow
509                  * this (just return without complaint, if so) but it's not clear how
510                  * many other places would require anti-recursion defenses before it
511                  * would be safe to allow tables to contain their own rowtype.
512                  */
513                 if (list_member_oid(containing_rowtypes, atttypid))
514                         ereport(ERROR,
515                                         (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
516                                 errmsg("composite type %s cannot be made a member of itself",
517                                            format_type_be(atttypid))));
518
519                 containing_rowtypes = lcons_oid(atttypid, containing_rowtypes);
520
521                 relation = relation_open(get_typ_typrelid(atttypid), AccessShareLock);
522
523                 tupdesc = RelationGetDescr(relation);
524
525                 for (i = 0; i < tupdesc->natts; i++)
526                 {
527                         Form_pg_attribute attr = tupdesc->attrs[i];
528
529                         if (attr->attisdropped)
530                                 continue;
531                         CheckAttributeType(NameStr(attr->attname),
532                                                            attr->atttypid, attr->attcollation,
533                                                            containing_rowtypes,
534                                                            allow_system_table_mods);
535                 }
536
537                 relation_close(relation, AccessShareLock);
538
539                 containing_rowtypes = list_delete_first(containing_rowtypes);
540         }
541         else if (OidIsValid((att_typelem = get_element_type(atttypid))))
542         {
543                 /*
544                  * Must recurse into array types, too, in case they are composite.
545                  */
546                 CheckAttributeType(attname, att_typelem, attcollation,
547                                                    containing_rowtypes,
548                                                    allow_system_table_mods);
549         }
550
551         /*
552          * This might not be strictly invalid per SQL standard, but it is pretty
553          * useless, and it cannot be dumped, so we must disallow it.
554          */
555         if (!OidIsValid(attcollation) && type_is_collatable(atttypid))
556                 ereport(ERROR,
557                                 (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
558                                  errmsg("no collation was derived for column \"%s\" with collatable type %s",
559                                                 attname, format_type_be(atttypid)),
560                 errhint("Use the COLLATE clause to set the collation explicitly.")));
561 }
562
563 /*
564  * InsertPgAttributeTuple
565  *              Construct and insert a new tuple in pg_attribute.
566  *
567  * Caller has already opened and locked pg_attribute.  new_attribute is the
568  * attribute to insert (but we ignore attacl and attoptions, which are always
569  * initialized to NULL).
570  *
571  * indstate is the index state for CatalogIndexInsert.  It can be passed as
572  * NULL, in which case we'll fetch the necessary info.  (Don't do this when
573  * inserting multiple attributes, because it's a tad more expensive.)
574  */
575 void
576 InsertPgAttributeTuple(Relation pg_attribute_rel,
577                                            Form_pg_attribute new_attribute,
578                                            CatalogIndexState indstate)
579 {
580         Datum           values[Natts_pg_attribute];
581         bool            nulls[Natts_pg_attribute];
582         HeapTuple       tup;
583
584         /* This is a tad tedious, but way cleaner than what we used to do... */
585         memset(values, 0, sizeof(values));
586         memset(nulls, false, sizeof(nulls));
587
588         values[Anum_pg_attribute_attrelid - 1] = ObjectIdGetDatum(new_attribute->attrelid);
589         values[Anum_pg_attribute_attname - 1] = NameGetDatum(&new_attribute->attname);
590         values[Anum_pg_attribute_atttypid - 1] = ObjectIdGetDatum(new_attribute->atttypid);
591         values[Anum_pg_attribute_attstattarget - 1] = Int32GetDatum(new_attribute->attstattarget);
592         values[Anum_pg_attribute_attlen - 1] = Int16GetDatum(new_attribute->attlen);
593         values[Anum_pg_attribute_attnum - 1] = Int16GetDatum(new_attribute->attnum);
594         values[Anum_pg_attribute_attndims - 1] = Int32GetDatum(new_attribute->attndims);
595         values[Anum_pg_attribute_attcacheoff - 1] = Int32GetDatum(new_attribute->attcacheoff);
596         values[Anum_pg_attribute_atttypmod - 1] = Int32GetDatum(new_attribute->atttypmod);
597         values[Anum_pg_attribute_attbyval - 1] = BoolGetDatum(new_attribute->attbyval);
598         values[Anum_pg_attribute_attstorage - 1] = CharGetDatum(new_attribute->attstorage);
599         values[Anum_pg_attribute_attalign - 1] = CharGetDatum(new_attribute->attalign);
600         values[Anum_pg_attribute_attnotnull - 1] = BoolGetDatum(new_attribute->attnotnull);
601         values[Anum_pg_attribute_atthasdef - 1] = BoolGetDatum(new_attribute->atthasdef);
602         values[Anum_pg_attribute_attisdropped - 1] = BoolGetDatum(new_attribute->attisdropped);
603         values[Anum_pg_attribute_attislocal - 1] = BoolGetDatum(new_attribute->attislocal);
604         values[Anum_pg_attribute_attinhcount - 1] = Int32GetDatum(new_attribute->attinhcount);
605         values[Anum_pg_attribute_attcollation - 1] = ObjectIdGetDatum(new_attribute->attcollation);
606
607         /* start out with empty permissions and empty options */
608         nulls[Anum_pg_attribute_attacl - 1] = true;
609         nulls[Anum_pg_attribute_attoptions - 1] = true;
610
611         tup = heap_form_tuple(RelationGetDescr(pg_attribute_rel), values, nulls);
612
613         /* finally insert the new tuple, update the indexes, and clean up */
614         simple_heap_insert(pg_attribute_rel, tup);
615
616         if (indstate != NULL)
617                 CatalogIndexInsert(indstate, tup);
618         else
619                 CatalogUpdateIndexes(pg_attribute_rel, tup);
620
621         heap_freetuple(tup);
622 }
623
624 /* --------------------------------
625  *              AddNewAttributeTuples
626  *
627  *              this registers the new relation's schema by adding
628  *              tuples to pg_attribute.
629  * --------------------------------
630  */
631 static void
632 AddNewAttributeTuples(Oid new_rel_oid,
633                                           TupleDesc tupdesc,
634                                           char relkind,
635                                           bool oidislocal,
636                                           int oidinhcount)
637 {
638         Form_pg_attribute attr;
639         int                     i;
640         Relation        rel;
641         CatalogIndexState indstate;
642         int                     natts = tupdesc->natts;
643         ObjectAddress myself,
644                                 referenced;
645
646         /*
647          * open pg_attribute and its indexes.
648          */
649         rel = heap_open(AttributeRelationId, RowExclusiveLock);
650
651         indstate = CatalogOpenIndexes(rel);
652
653         /*
654          * First we add the user attributes.  This is also a convenient place to
655          * add dependencies on their datatypes and collations.
656          */
657         for (i = 0; i < natts; i++)
658         {
659                 attr = tupdesc->attrs[i];
660                 /* Fill in the correct relation OID */
661                 attr->attrelid = new_rel_oid;
662                 /* Make sure these are OK, too */
663                 attr->attstattarget = -1;
664                 attr->attcacheoff = -1;
665
666                 InsertPgAttributeTuple(rel, attr, indstate);
667
668                 /* Add dependency info */
669                 myself.classId = RelationRelationId;
670                 myself.objectId = new_rel_oid;
671                 myself.objectSubId = i + 1;
672                 referenced.classId = TypeRelationId;
673                 referenced.objectId = attr->atttypid;
674                 referenced.objectSubId = 0;
675                 recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
676
677                 /* The default collation is pinned, so don't bother recording it */
678                 if (OidIsValid(attr->attcollation) &&
679                         attr->attcollation != DEFAULT_COLLATION_OID)
680                 {
681                         referenced.classId = CollationRelationId;
682                         referenced.objectId = attr->attcollation;
683                         referenced.objectSubId = 0;
684                         recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
685                 }
686         }
687
688         /*
689          * Next we add the system attributes.  Skip OID if rel has no OIDs. Skip
690          * all for a view or type relation.  We don't bother with making datatype
691          * dependencies here, since presumably all these types are pinned.
692          */
693         if (relkind != RELKIND_VIEW && relkind != RELKIND_COMPOSITE_TYPE)
694         {
695                 for (i = 0; i < (int) lengthof(SysAtt); i++)
696                 {
697                         FormData_pg_attribute attStruct;
698
699                         /* skip OID where appropriate */
700                         if (!tupdesc->tdhasoid &&
701                                 SysAtt[i]->attnum == ObjectIdAttributeNumber)
702                                 continue;
703
704                         memcpy(&attStruct, (char *) SysAtt[i], sizeof(FormData_pg_attribute));
705
706                         /* Fill in the correct relation OID in the copied tuple */
707                         attStruct.attrelid = new_rel_oid;
708
709                         /* Fill in correct inheritance info for the OID column */
710                         if (attStruct.attnum == ObjectIdAttributeNumber)
711                         {
712                                 attStruct.attislocal = oidislocal;
713                                 attStruct.attinhcount = oidinhcount;
714                         }
715
716                         InsertPgAttributeTuple(rel, &attStruct, indstate);
717                 }
718         }
719
720         /*
721          * clean up
722          */
723         CatalogCloseIndexes(indstate);
724
725         heap_close(rel, RowExclusiveLock);
726 }
727
728 /* --------------------------------
729  *              InsertPgClassTuple
730  *
731  *              Construct and insert a new tuple in pg_class.
732  *
733  * Caller has already opened and locked pg_class.
734  * Tuple data is taken from new_rel_desc->rd_rel, except for the
735  * variable-width fields which are not present in a cached reldesc.
736  * relacl and reloptions are passed in Datum form (to avoid having
737  * to reference the data types in heap.h).      Pass (Datum) 0 to set them
738  * to NULL.
739  * --------------------------------
740  */
741 void
742 InsertPgClassTuple(Relation pg_class_desc,
743                                    Relation new_rel_desc,
744                                    Oid new_rel_oid,
745                                    Datum relacl,
746                                    Datum reloptions)
747 {
748         Form_pg_class rd_rel = new_rel_desc->rd_rel;
749         Datum           values[Natts_pg_class];
750         bool            nulls[Natts_pg_class];
751         HeapTuple       tup;
752
753         /* This is a tad tedious, but way cleaner than what we used to do... */
754         memset(values, 0, sizeof(values));
755         memset(nulls, false, sizeof(nulls));
756
757         values[Anum_pg_class_relname - 1] = NameGetDatum(&rd_rel->relname);
758         values[Anum_pg_class_relnamespace - 1] = ObjectIdGetDatum(rd_rel->relnamespace);
759         values[Anum_pg_class_reltype - 1] = ObjectIdGetDatum(rd_rel->reltype);
760         values[Anum_pg_class_reloftype - 1] = ObjectIdGetDatum(rd_rel->reloftype);
761         values[Anum_pg_class_relowner - 1] = ObjectIdGetDatum(rd_rel->relowner);
762         values[Anum_pg_class_relam - 1] = ObjectIdGetDatum(rd_rel->relam);
763         values[Anum_pg_class_relfilenode - 1] = ObjectIdGetDatum(rd_rel->relfilenode);
764         values[Anum_pg_class_reltablespace - 1] = ObjectIdGetDatum(rd_rel->reltablespace);
765         values[Anum_pg_class_relpages - 1] = Int32GetDatum(rd_rel->relpages);
766         values[Anum_pg_class_reltuples - 1] = Float4GetDatum(rd_rel->reltuples);
767         values[Anum_pg_class_reltoastrelid - 1] = ObjectIdGetDatum(rd_rel->reltoastrelid);
768         values[Anum_pg_class_reltoastidxid - 1] = ObjectIdGetDatum(rd_rel->reltoastidxid);
769         values[Anum_pg_class_relhasindex - 1] = BoolGetDatum(rd_rel->relhasindex);
770         values[Anum_pg_class_relisshared - 1] = BoolGetDatum(rd_rel->relisshared);
771         values[Anum_pg_class_relpersistence - 1] = CharGetDatum(rd_rel->relpersistence);
772         values[Anum_pg_class_relkind - 1] = CharGetDatum(rd_rel->relkind);
773         values[Anum_pg_class_relnatts - 1] = Int16GetDatum(rd_rel->relnatts);
774         values[Anum_pg_class_relchecks - 1] = Int16GetDatum(rd_rel->relchecks);
775         values[Anum_pg_class_relhasoids - 1] = BoolGetDatum(rd_rel->relhasoids);
776         values[Anum_pg_class_relhaspkey - 1] = BoolGetDatum(rd_rel->relhaspkey);
777         values[Anum_pg_class_relhasrules - 1] = BoolGetDatum(rd_rel->relhasrules);
778         values[Anum_pg_class_relhastriggers - 1] = BoolGetDatum(rd_rel->relhastriggers);
779         values[Anum_pg_class_relhassubclass - 1] = BoolGetDatum(rd_rel->relhassubclass);
780         values[Anum_pg_class_relfrozenxid - 1] = TransactionIdGetDatum(rd_rel->relfrozenxid);
781         if (relacl != (Datum) 0)
782                 values[Anum_pg_class_relacl - 1] = relacl;
783         else
784                 nulls[Anum_pg_class_relacl - 1] = true;
785         if (reloptions != (Datum) 0)
786                 values[Anum_pg_class_reloptions - 1] = reloptions;
787         else
788                 nulls[Anum_pg_class_reloptions - 1] = true;
789
790         tup = heap_form_tuple(RelationGetDescr(pg_class_desc), values, nulls);
791
792         /*
793          * The new tuple must have the oid already chosen for the rel.  Sure would
794          * be embarrassing to do this sort of thing in polite company.
795          */
796         HeapTupleSetOid(tup, new_rel_oid);
797
798         /* finally insert the new tuple, update the indexes, and clean up */
799         simple_heap_insert(pg_class_desc, tup);
800
801         CatalogUpdateIndexes(pg_class_desc, tup);
802
803         heap_freetuple(tup);
804 }
805
806 /* --------------------------------
807  *              AddNewRelationTuple
808  *
809  *              this registers the new relation in the catalogs by
810  *              adding a tuple to pg_class.
811  * --------------------------------
812  */
813 static void
814 AddNewRelationTuple(Relation pg_class_desc,
815                                         Relation new_rel_desc,
816                                         Oid new_rel_oid,
817                                         Oid new_type_oid,
818                                         Oid reloftype,
819                                         Oid relowner,
820                                         char relkind,
821                                         Datum relacl,
822                                         Datum reloptions)
823 {
824         Form_pg_class new_rel_reltup;
825
826         /*
827          * first we update some of the information in our uncataloged relation's
828          * relation descriptor.
829          */
830         new_rel_reltup = new_rel_desc->rd_rel;
831
832         switch (relkind)
833         {
834                 case RELKIND_RELATION:
835                 case RELKIND_INDEX:
836                 case RELKIND_TOASTVALUE:
837                         /* The relation is real, but as yet empty */
838                         new_rel_reltup->relpages = 0;
839                         new_rel_reltup->reltuples = 0;
840                         break;
841                 case RELKIND_SEQUENCE:
842                         /* Sequences always have a known size */
843                         new_rel_reltup->relpages = 1;
844                         new_rel_reltup->reltuples = 1;
845                         break;
846                 default:
847                         /* Views, etc, have no disk storage */
848                         new_rel_reltup->relpages = 0;
849                         new_rel_reltup->reltuples = 0;
850                         break;
851         }
852
853         /* Initialize relfrozenxid */
854         if (relkind == RELKIND_RELATION ||
855                 relkind == RELKIND_TOASTVALUE)
856         {
857                 /*
858                  * Initialize to the minimum XID that could put tuples in the table.
859                  * We know that no xacts older than RecentXmin are still running, so
860                  * that will do.
861                  */
862                 new_rel_reltup->relfrozenxid = RecentXmin;
863         }
864         else
865         {
866                 /*
867                  * Other relation types will not contain XIDs, so set relfrozenxid to
868                  * InvalidTransactionId.  (Note: a sequence does contain a tuple, but
869                  * we force its xmin to be FrozenTransactionId always; see
870                  * commands/sequence.c.)
871                  */
872                 new_rel_reltup->relfrozenxid = InvalidTransactionId;
873         }
874
875         new_rel_reltup->relowner = relowner;
876         new_rel_reltup->reltype = new_type_oid;
877         new_rel_reltup->reloftype = reloftype;
878         new_rel_reltup->relkind = relkind;
879
880         new_rel_desc->rd_att->tdtypeid = new_type_oid;
881
882         /* Now build and insert the tuple */
883         InsertPgClassTuple(pg_class_desc, new_rel_desc, new_rel_oid,
884                                            relacl, reloptions);
885 }
886
887
888 /* --------------------------------
889  *              AddNewRelationType -
890  *
891  *              define a composite type corresponding to the new relation
892  * --------------------------------
893  */
894 static Oid
895 AddNewRelationType(const char *typeName,
896                                    Oid typeNamespace,
897                                    Oid new_rel_oid,
898                                    char new_rel_kind,
899                                    Oid ownerid,
900                                    Oid new_row_type,
901                                    Oid new_array_type)
902 {
903         return
904                 TypeCreate(new_row_type,        /* optional predetermined OID */
905                                    typeName,    /* type name */
906                                    typeNamespace,               /* type namespace */
907                                    new_rel_oid, /* relation oid */
908                                    new_rel_kind,        /* relation kind */
909                                    ownerid,             /* owner's ID */
910                                    -1,                  /* internal size (varlena) */
911                                    TYPTYPE_COMPOSITE,   /* type-type (composite) */
912                                    TYPCATEGORY_COMPOSITE,               /* type-category (ditto) */
913                                    false,               /* composite types are never preferred */
914                                    DEFAULT_TYPDELIM,    /* default array delimiter */
915                                    F_RECORD_IN, /* input procedure */
916                                    F_RECORD_OUT,        /* output procedure */
917                                    F_RECORD_RECV,               /* receive procedure */
918                                    F_RECORD_SEND,               /* send procedure */
919                                    InvalidOid,  /* typmodin procedure - none */
920                                    InvalidOid,  /* typmodout procedure - none */
921                                    InvalidOid,  /* analyze procedure - default */
922                                    InvalidOid,  /* array element type - irrelevant */
923                                    false,               /* this is not an array type */
924                                    new_array_type,              /* array type if any */
925                                    InvalidOid,  /* domain base type - irrelevant */
926                                    NULL,                /* default value - none */
927                                    NULL,                /* default binary representation */
928                                    false,               /* passed by reference */
929                                    'd',                 /* alignment - must be the largest! */
930                                    'x',                 /* fully TOASTable */
931                                    -1,                  /* typmod */
932                                    0,                   /* array dimensions for typBaseType */
933                                    false,               /* Type NOT NULL */
934                                    InvalidOid); /* rowtypes never have a collation */
935 }
936
937 /* --------------------------------
938  *              heap_create_with_catalog
939  *
940  *              creates a new cataloged relation.  see comments above.
941  *
942  * Arguments:
943  *      relname: name to give to new rel
944  *      relnamespace: OID of namespace it goes in
945  *      reltablespace: OID of tablespace it goes in
946  *      relid: OID to assign to new rel, or InvalidOid to select a new OID
947  *      reltypeid: OID to assign to rel's rowtype, or InvalidOid to select one
948  *      ownerid: OID of new rel's owner
949  *      tupdesc: tuple descriptor (source of column definitions)
950  *      cooked_constraints: list of precooked check constraints and defaults
951  *      relkind: relkind for new rel
952  *      shared_relation: TRUE if it's to be a shared relation
953  *      mapped_relation: TRUE if the relation will use the relfilenode map
954  *      oidislocal: TRUE if oid column (if any) should be marked attislocal
955  *      oidinhcount: attinhcount to assign to oid column (if any)
956  *      oncommit: ON COMMIT marking (only relevant if it's a temp table)
957  *      reloptions: reloptions in Datum form, or (Datum) 0 if none
958  *      use_user_acl: TRUE if should look for user-defined default permissions;
959  *              if FALSE, relacl is always set NULL
960  *      allow_system_table_mods: TRUE to allow creation in system namespaces
961  *
962  * Returns the OID of the new relation
963  * --------------------------------
964  */
965 Oid
966 heap_create_with_catalog(const char *relname,
967                                                  Oid relnamespace,
968                                                  Oid reltablespace,
969                                                  Oid relid,
970                                                  Oid reltypeid,
971                                                  Oid reloftypeid,
972                                                  Oid ownerid,
973                                                  TupleDesc tupdesc,
974                                                  List *cooked_constraints,
975                                                  char relkind,
976                                                  char relpersistence,
977                                                  bool shared_relation,
978                                                  bool mapped_relation,
979                                                  bool oidislocal,
980                                                  int oidinhcount,
981                                                  OnCommitAction oncommit,
982                                                  Datum reloptions,
983                                                  bool use_user_acl,
984                                                  bool allow_system_table_mods)
985 {
986         Relation        pg_class_desc;
987         Relation        new_rel_desc;
988         Acl                *relacl;
989         Oid                     existing_relid;
990         Oid                     old_type_oid;
991         Oid                     new_type_oid;
992         Oid                     new_array_oid = InvalidOid;
993
994         pg_class_desc = heap_open(RelationRelationId, RowExclusiveLock);
995
996         /*
997          * sanity checks
998          */
999         Assert(IsNormalProcessingMode() || IsBootstrapProcessingMode());
1000
1001         CheckAttributeNamesTypes(tupdesc, relkind, allow_system_table_mods);
1002
1003         /*
1004          * This would fail later on anyway, if the relation already exists.  But
1005          * by catching it here we can emit a nicer error message.
1006          */
1007         existing_relid = get_relname_relid(relname, relnamespace);
1008         if (existing_relid != InvalidOid)
1009                 ereport(ERROR,
1010                                 (errcode(ERRCODE_DUPLICATE_TABLE),
1011                                  errmsg("relation \"%s\" already exists", relname)));
1012
1013         /*
1014          * Since we are going to create a rowtype as well, also check for
1015          * collision with an existing type name.  If there is one and it's an
1016          * autogenerated array, we can rename it out of the way; otherwise we can
1017          * at least give a good error message.
1018          */
1019         old_type_oid = GetSysCacheOid2(TYPENAMENSP,
1020                                                                    CStringGetDatum(relname),
1021                                                                    ObjectIdGetDatum(relnamespace));
1022         if (OidIsValid(old_type_oid))
1023         {
1024                 if (!moveArrayTypeName(old_type_oid, relname, relnamespace))
1025                         ereport(ERROR,
1026                                         (errcode(ERRCODE_DUPLICATE_OBJECT),
1027                                          errmsg("type \"%s\" already exists", relname),
1028                            errhint("A relation has an associated type of the same name, "
1029                                            "so you must use a name that doesn't conflict "
1030                                            "with any existing type.")));
1031         }
1032
1033         /*
1034          * Shared relations must be in pg_global (last-ditch check)
1035          */
1036         if (shared_relation && reltablespace != GLOBALTABLESPACE_OID)
1037                 elog(ERROR, "shared relations must be placed in pg_global tablespace");
1038
1039         /*
1040          * Allocate an OID for the relation, unless we were told what to use.
1041          *
1042          * The OID will be the relfilenode as well, so make sure it doesn't
1043          * collide with either pg_class OIDs or existing physical files.
1044          */
1045         if (!OidIsValid(relid))
1046         {
1047                 /*
1048                  * Use binary-upgrade override for pg_class.oid/relfilenode, if
1049                  * supplied.
1050                  */
1051                 if (IsBinaryUpgrade &&
1052                         OidIsValid(binary_upgrade_next_heap_pg_class_oid) &&
1053                         (relkind == RELKIND_RELATION || relkind == RELKIND_SEQUENCE ||
1054                          relkind == RELKIND_VIEW || relkind == RELKIND_COMPOSITE_TYPE ||
1055                          relkind == RELKIND_FOREIGN_TABLE))
1056                 {
1057                         relid = binary_upgrade_next_heap_pg_class_oid;
1058                         binary_upgrade_next_heap_pg_class_oid = InvalidOid;
1059                 }
1060                 else if (IsBinaryUpgrade &&
1061                                  OidIsValid(binary_upgrade_next_toast_pg_class_oid) &&
1062                                  relkind == RELKIND_TOASTVALUE)
1063                 {
1064                         relid = binary_upgrade_next_toast_pg_class_oid;
1065                         binary_upgrade_next_toast_pg_class_oid = InvalidOid;
1066                 }
1067                 else
1068                         relid = GetNewRelFileNode(reltablespace, pg_class_desc,
1069                                                                           relpersistence);
1070         }
1071
1072         /*
1073          * Determine the relation's initial permissions.
1074          */
1075         if (use_user_acl)
1076         {
1077                 switch (relkind)
1078                 {
1079                         case RELKIND_RELATION:
1080                         case RELKIND_VIEW:
1081                         case RELKIND_FOREIGN_TABLE:
1082                                 relacl = get_user_default_acl(ACL_OBJECT_RELATION, ownerid,
1083                                                                                           relnamespace);
1084                                 break;
1085                         case RELKIND_SEQUENCE:
1086                                 relacl = get_user_default_acl(ACL_OBJECT_SEQUENCE, ownerid,
1087                                                                                           relnamespace);
1088                                 break;
1089                         default:
1090                                 relacl = NULL;
1091                                 break;
1092                 }
1093         }
1094         else
1095                 relacl = NULL;
1096
1097         /*
1098          * Create the relcache entry (mostly dummy at this point) and the physical
1099          * disk file.  (If we fail further down, it's the smgr's responsibility to
1100          * remove the disk file again.)
1101          */
1102         new_rel_desc = heap_create(relname,
1103                                                            relnamespace,
1104                                                            reltablespace,
1105                                                            relid,
1106                                                            tupdesc,
1107                                                            relkind,
1108                                                            relpersistence,
1109                                                            shared_relation,
1110                                                            mapped_relation,
1111                                                            allow_system_table_mods);
1112
1113         Assert(relid == RelationGetRelid(new_rel_desc));
1114
1115         /*
1116          * Decide whether to create an array type over the relation's rowtype. We
1117          * do not create any array types for system catalogs (ie, those made
1118          * during initdb).      We create array types for regular relations, views,
1119          * composite types and foreign tables ... but not, eg, for toast tables or
1120          * sequences.
1121          */
1122         if (IsUnderPostmaster && (relkind == RELKIND_RELATION ||
1123                                                           relkind == RELKIND_VIEW ||
1124                                                           relkind == RELKIND_FOREIGN_TABLE ||
1125                                                           relkind == RELKIND_COMPOSITE_TYPE))
1126                 new_array_oid = AssignTypeArrayOid();
1127
1128         /*
1129          * Since defining a relation also defines a complex type, we add a new
1130          * system type corresponding to the new relation.  The OID of the type can
1131          * be preselected by the caller, but if reltypeid is InvalidOid, we'll
1132          * generate a new OID for it.
1133          *
1134          * NOTE: we could get a unique-index failure here, in case someone else is
1135          * creating the same type name in parallel but hadn't committed yet when
1136          * we checked for a duplicate name above.
1137          */
1138         new_type_oid = AddNewRelationType(relname,
1139                                                                           relnamespace,
1140                                                                           relid,
1141                                                                           relkind,
1142                                                                           ownerid,
1143                                                                           reltypeid,
1144                                                                           new_array_oid);
1145
1146         /*
1147          * Now make the array type if wanted.
1148          */
1149         if (OidIsValid(new_array_oid))
1150         {
1151                 char       *relarrayname;
1152
1153                 relarrayname = makeArrayTypeName(relname, relnamespace);
1154
1155                 TypeCreate(new_array_oid,               /* force the type's OID to this */
1156                                    relarrayname,        /* Array type name */
1157                                    relnamespace,        /* Same namespace as parent */
1158                                    InvalidOid,  /* Not composite, no relationOid */
1159                                    0,                   /* relkind, also N/A here */
1160                                    ownerid,             /* owner's ID */
1161                                    -1,                  /* Internal size (varlena) */
1162                                    TYPTYPE_BASE,        /* Not composite - typelem is */
1163                                    TYPCATEGORY_ARRAY,   /* type-category (array) */
1164                                    false,               /* array types are never preferred */
1165                                    DEFAULT_TYPDELIM,    /* default array delimiter */
1166                                    F_ARRAY_IN,  /* array input proc */
1167                                    F_ARRAY_OUT, /* array output proc */
1168                                    F_ARRAY_RECV,        /* array recv (bin) proc */
1169                                    F_ARRAY_SEND,        /* array send (bin) proc */
1170                                    InvalidOid,  /* typmodin procedure - none */
1171                                    InvalidOid,  /* typmodout procedure - none */
1172                                    InvalidOid,  /* analyze procedure - default */
1173                                    new_type_oid,        /* array element type - the rowtype */
1174                                    true,                /* yes, this is an array type */
1175                                    InvalidOid,  /* this has no array type */
1176                                    InvalidOid,  /* domain base type - irrelevant */
1177                                    NULL,                /* default value - none */
1178                                    NULL,                /* default binary representation */
1179                                    false,               /* passed by reference */
1180                                    'd',                 /* alignment - must be the largest! */
1181                                    'x',                 /* fully TOASTable */
1182                                    -1,                  /* typmod */
1183                                    0,                   /* array dimensions for typBaseType */
1184                                    false,               /* Type NOT NULL */
1185                                    InvalidOid); /* rowtypes never have a collation */
1186
1187                 pfree(relarrayname);
1188         }
1189
1190         /*
1191          * now create an entry in pg_class for the relation.
1192          *
1193          * NOTE: we could get a unique-index failure here, in case someone else is
1194          * creating the same relation name in parallel but hadn't committed yet
1195          * when we checked for a duplicate name above.
1196          */
1197         AddNewRelationTuple(pg_class_desc,
1198                                                 new_rel_desc,
1199                                                 relid,
1200                                                 new_type_oid,
1201                                                 reloftypeid,
1202                                                 ownerid,
1203                                                 relkind,
1204                                                 PointerGetDatum(relacl),
1205                                                 reloptions);
1206
1207         /*
1208          * now add tuples to pg_attribute for the attributes in our new relation.
1209          */
1210         AddNewAttributeTuples(relid, new_rel_desc->rd_att, relkind,
1211                                                   oidislocal, oidinhcount);
1212
1213         /*
1214          * Make a dependency link to force the relation to be deleted if its
1215          * namespace is.  Also make a dependency link to its owner, as well as
1216          * dependencies for any roles mentioned in the default ACL.
1217          *
1218          * For composite types, these dependencies are tracked for the pg_type
1219          * entry, so we needn't record them here.  Likewise, TOAST tables don't
1220          * need a namespace dependency (they live in a pinned namespace) nor an
1221          * owner dependency (they depend indirectly through the parent table), nor
1222          * should they have any ACL entries.  The same applies for extension
1223          * dependencies.
1224          *
1225          * Also, skip this in bootstrap mode, since we don't make dependencies
1226          * while bootstrapping.
1227          */
1228         if (relkind != RELKIND_COMPOSITE_TYPE &&
1229                 relkind != RELKIND_TOASTVALUE &&
1230                 !IsBootstrapProcessingMode())
1231         {
1232                 ObjectAddress myself,
1233                                         referenced;
1234
1235                 myself.classId = RelationRelationId;
1236                 myself.objectId = relid;
1237                 myself.objectSubId = 0;
1238                 referenced.classId = NamespaceRelationId;
1239                 referenced.objectId = relnamespace;
1240                 referenced.objectSubId = 0;
1241                 recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
1242
1243                 recordDependencyOnOwner(RelationRelationId, relid, ownerid);
1244
1245                 recordDependencyOnCurrentExtension(&myself);
1246
1247                 if (reloftypeid)
1248                 {
1249                         referenced.classId = TypeRelationId;
1250                         referenced.objectId = reloftypeid;
1251                         referenced.objectSubId = 0;
1252                         recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
1253                 }
1254
1255                 if (relacl != NULL)
1256                 {
1257                         int                     nnewmembers;
1258                         Oid                *newmembers;
1259
1260                         nnewmembers = aclmembers(relacl, &newmembers);
1261                         updateAclDependencies(RelationRelationId, relid, 0,
1262                                                                   ownerid,
1263                                                                   0, NULL,
1264                                                                   nnewmembers, newmembers);
1265                 }
1266         }
1267
1268         /* Post creation hook for new relation */
1269         InvokeObjectAccessHook(OAT_POST_CREATE, RelationRelationId, relid, 0);
1270
1271         /*
1272          * Store any supplied constraints and defaults.
1273          *
1274          * NB: this may do a CommandCounterIncrement and rebuild the relcache
1275          * entry, so the relation must be valid and self-consistent at this point.
1276          * In particular, there are not yet constraints and defaults anywhere.
1277          */
1278         StoreConstraints(new_rel_desc, cooked_constraints);
1279
1280         /*
1281          * If there's a special on-commit action, remember it
1282          */
1283         if (oncommit != ONCOMMIT_NOOP)
1284                 register_on_commit_action(relid, oncommit);
1285
1286         /*
1287          * If this is an unlogged relation, it needs an init fork so that it can
1288          * be correctly reinitialized on restart.  Since we're going to do an
1289          * immediate sync, we only need to xlog this if archiving or streaming is
1290          * enabled.  And the immediate sync is required, because otherwise there's
1291          * no guarantee that this will hit the disk before the next checkpoint
1292          * moves the redo pointer.
1293          */
1294         if (relpersistence == RELPERSISTENCE_UNLOGGED)
1295         {
1296                 Assert(relkind == RELKIND_RELATION || relkind == RELKIND_TOASTVALUE);
1297
1298                 RelationOpenSmgr(new_rel_desc);
1299                 smgrcreate(new_rel_desc->rd_smgr, INIT_FORKNUM, false);
1300                 if (XLogIsNeeded())
1301                         log_smgrcreate(&new_rel_desc->rd_smgr->smgr_rnode.node,
1302                                                    INIT_FORKNUM);
1303                 smgrimmedsync(new_rel_desc->rd_smgr, INIT_FORKNUM);
1304         }
1305
1306         /*
1307          * ok, the relation has been cataloged, so close our relations and return
1308          * the OID of the newly created relation.
1309          */
1310         heap_close(new_rel_desc, NoLock);       /* do not unlock till end of xact */
1311         heap_close(pg_class_desc, RowExclusiveLock);
1312
1313         return relid;
1314 }
1315
1316
1317 /*
1318  *              RelationRemoveInheritance
1319  *
1320  * Formerly, this routine checked for child relations and aborted the
1321  * deletion if any were found.  Now we rely on the dependency mechanism
1322  * to check for or delete child relations.      By the time we get here,
1323  * there are no children and we need only remove any pg_inherits rows
1324  * linking this relation to its parent(s).
1325  */
1326 static void
1327 RelationRemoveInheritance(Oid relid)
1328 {
1329         Relation        catalogRelation;
1330         SysScanDesc scan;
1331         ScanKeyData key;
1332         HeapTuple       tuple;
1333
1334         catalogRelation = heap_open(InheritsRelationId, RowExclusiveLock);
1335
1336         ScanKeyInit(&key,
1337                                 Anum_pg_inherits_inhrelid,
1338                                 BTEqualStrategyNumber, F_OIDEQ,
1339                                 ObjectIdGetDatum(relid));
1340
1341         scan = systable_beginscan(catalogRelation, InheritsRelidSeqnoIndexId, true,
1342                                                           SnapshotNow, 1, &key);
1343
1344         while (HeapTupleIsValid(tuple = systable_getnext(scan)))
1345                 simple_heap_delete(catalogRelation, &tuple->t_self);
1346
1347         systable_endscan(scan);
1348         heap_close(catalogRelation, RowExclusiveLock);
1349 }
1350
1351 /*
1352  *              DeleteRelationTuple
1353  *
1354  * Remove pg_class row for the given relid.
1355  *
1356  * Note: this is shared by relation deletion and index deletion.  It's
1357  * not intended for use anyplace else.
1358  */
1359 void
1360 DeleteRelationTuple(Oid relid)
1361 {
1362         Relation        pg_class_desc;
1363         HeapTuple       tup;
1364
1365         /* Grab an appropriate lock on the pg_class relation */
1366         pg_class_desc = heap_open(RelationRelationId, RowExclusiveLock);
1367
1368         tup = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
1369         if (!HeapTupleIsValid(tup))
1370                 elog(ERROR, "cache lookup failed for relation %u", relid);
1371
1372         /* delete the relation tuple from pg_class, and finish up */
1373         simple_heap_delete(pg_class_desc, &tup->t_self);
1374
1375         ReleaseSysCache(tup);
1376
1377         heap_close(pg_class_desc, RowExclusiveLock);
1378 }
1379
1380 /*
1381  *              DeleteAttributeTuples
1382  *
1383  * Remove pg_attribute rows for the given relid.
1384  *
1385  * Note: this is shared by relation deletion and index deletion.  It's
1386  * not intended for use anyplace else.
1387  */
1388 void
1389 DeleteAttributeTuples(Oid relid)
1390 {
1391         Relation        attrel;
1392         SysScanDesc scan;
1393         ScanKeyData key[1];
1394         HeapTuple       atttup;
1395
1396         /* Grab an appropriate lock on the pg_attribute relation */
1397         attrel = heap_open(AttributeRelationId, RowExclusiveLock);
1398
1399         /* Use the index to scan only attributes of the target relation */
1400         ScanKeyInit(&key[0],
1401                                 Anum_pg_attribute_attrelid,
1402                                 BTEqualStrategyNumber, F_OIDEQ,
1403                                 ObjectIdGetDatum(relid));
1404
1405         scan = systable_beginscan(attrel, AttributeRelidNumIndexId, true,
1406                                                           SnapshotNow, 1, key);
1407
1408         /* Delete all the matching tuples */
1409         while ((atttup = systable_getnext(scan)) != NULL)
1410                 simple_heap_delete(attrel, &atttup->t_self);
1411
1412         /* Clean up after the scan */
1413         systable_endscan(scan);
1414         heap_close(attrel, RowExclusiveLock);
1415 }
1416
1417 /*
1418  *              RemoveAttributeById
1419  *
1420  * This is the guts of ALTER TABLE DROP COLUMN: actually mark the attribute
1421  * deleted in pg_attribute.  We also remove pg_statistic entries for it.
1422  * (Everything else needed, such as getting rid of any pg_attrdef entry,
1423  * is handled by dependency.c.)
1424  */
1425 void
1426 RemoveAttributeById(Oid relid, AttrNumber attnum)
1427 {
1428         Relation        rel;
1429         Relation        attr_rel;
1430         HeapTuple       tuple;
1431         Form_pg_attribute attStruct;
1432         char            newattname[NAMEDATALEN];
1433
1434         /*
1435          * Grab an exclusive lock on the target table, which we will NOT release
1436          * until end of transaction.  (In the simple case where we are directly
1437          * dropping this column, AlterTableDropColumn already did this ... but
1438          * when cascading from a drop of some other object, we may not have any
1439          * lock.)
1440          */
1441         rel = relation_open(relid, AccessExclusiveLock);
1442
1443         attr_rel = heap_open(AttributeRelationId, RowExclusiveLock);
1444
1445         tuple = SearchSysCacheCopy2(ATTNUM,
1446                                                                 ObjectIdGetDatum(relid),
1447                                                                 Int16GetDatum(attnum));
1448         if (!HeapTupleIsValid(tuple))           /* shouldn't happen */
1449                 elog(ERROR, "cache lookup failed for attribute %d of relation %u",
1450                          attnum, relid);
1451         attStruct = (Form_pg_attribute) GETSTRUCT(tuple);
1452
1453         if (attnum < 0)
1454         {
1455                 /* System attribute (probably OID) ... just delete the row */
1456
1457                 simple_heap_delete(attr_rel, &tuple->t_self);
1458         }
1459         else
1460         {
1461                 /* Dropping user attributes is lots harder */
1462
1463                 /* Mark the attribute as dropped */
1464                 attStruct->attisdropped = true;
1465
1466                 /*
1467                  * Set the type OID to invalid.  A dropped attribute's type link
1468                  * cannot be relied on (once the attribute is dropped, the type might
1469                  * be too). Fortunately we do not need the type row --- the only
1470                  * really essential information is the type's typlen and typalign,
1471                  * which are preserved in the attribute's attlen and attalign.  We set
1472                  * atttypid to zero here as a means of catching code that incorrectly
1473                  * expects it to be valid.
1474                  */
1475                 attStruct->atttypid = InvalidOid;
1476
1477                 /* Remove any NOT NULL constraint the column may have */
1478                 attStruct->attnotnull = false;
1479
1480                 /* We don't want to keep stats for it anymore */
1481                 attStruct->attstattarget = 0;
1482
1483                 /*
1484                  * Change the column name to something that isn't likely to conflict
1485                  */
1486                 snprintf(newattname, sizeof(newattname),
1487                                  "........pg.dropped.%d........", attnum);
1488                 namestrcpy(&(attStruct->attname), newattname);
1489
1490                 simple_heap_update(attr_rel, &tuple->t_self, tuple);
1491
1492                 /* keep the system catalog indexes current */
1493                 CatalogUpdateIndexes(attr_rel, tuple);
1494         }
1495
1496         /*
1497          * Because updating the pg_attribute row will trigger a relcache flush for
1498          * the target relation, we need not do anything else to notify other
1499          * backends of the change.
1500          */
1501
1502         heap_close(attr_rel, RowExclusiveLock);
1503
1504         if (attnum > 0)
1505                 RemoveStatistics(relid, attnum);
1506
1507         relation_close(rel, NoLock);
1508 }
1509
1510 /*
1511  *              RemoveAttrDefault
1512  *
1513  * If the specified relation/attribute has a default, remove it.
1514  * (If no default, raise error if complain is true, else return quietly.)
1515  */
1516 void
1517 RemoveAttrDefault(Oid relid, AttrNumber attnum,
1518                                   DropBehavior behavior, bool complain)
1519 {
1520         Relation        attrdef_rel;
1521         ScanKeyData scankeys[2];
1522         SysScanDesc scan;
1523         HeapTuple       tuple;
1524         bool            found = false;
1525
1526         attrdef_rel = heap_open(AttrDefaultRelationId, RowExclusiveLock);
1527
1528         ScanKeyInit(&scankeys[0],
1529                                 Anum_pg_attrdef_adrelid,
1530                                 BTEqualStrategyNumber, F_OIDEQ,
1531                                 ObjectIdGetDatum(relid));
1532         ScanKeyInit(&scankeys[1],
1533                                 Anum_pg_attrdef_adnum,
1534                                 BTEqualStrategyNumber, F_INT2EQ,
1535                                 Int16GetDatum(attnum));
1536
1537         scan = systable_beginscan(attrdef_rel, AttrDefaultIndexId, true,
1538                                                           SnapshotNow, 2, scankeys);
1539
1540         /* There should be at most one matching tuple, but we loop anyway */
1541         while (HeapTupleIsValid(tuple = systable_getnext(scan)))
1542         {
1543                 ObjectAddress object;
1544
1545                 object.classId = AttrDefaultRelationId;
1546                 object.objectId = HeapTupleGetOid(tuple);
1547                 object.objectSubId = 0;
1548
1549                 performDeletion(&object, behavior);
1550
1551                 found = true;
1552         }
1553
1554         systable_endscan(scan);
1555         heap_close(attrdef_rel, RowExclusiveLock);
1556
1557         if (complain && !found)
1558                 elog(ERROR, "could not find attrdef tuple for relation %u attnum %d",
1559                          relid, attnum);
1560 }
1561
1562 /*
1563  *              RemoveAttrDefaultById
1564  *
1565  * Remove a pg_attrdef entry specified by OID.  This is the guts of
1566  * attribute-default removal.  Note it should be called via performDeletion,
1567  * not directly.
1568  */
1569 void
1570 RemoveAttrDefaultById(Oid attrdefId)
1571 {
1572         Relation        attrdef_rel;
1573         Relation        attr_rel;
1574         Relation        myrel;
1575         ScanKeyData scankeys[1];
1576         SysScanDesc scan;
1577         HeapTuple       tuple;
1578         Oid                     myrelid;
1579         AttrNumber      myattnum;
1580
1581         /* Grab an appropriate lock on the pg_attrdef relation */
1582         attrdef_rel = heap_open(AttrDefaultRelationId, RowExclusiveLock);
1583
1584         /* Find the pg_attrdef tuple */
1585         ScanKeyInit(&scankeys[0],
1586                                 ObjectIdAttributeNumber,
1587                                 BTEqualStrategyNumber, F_OIDEQ,
1588                                 ObjectIdGetDatum(attrdefId));
1589
1590         scan = systable_beginscan(attrdef_rel, AttrDefaultOidIndexId, true,
1591                                                           SnapshotNow, 1, scankeys);
1592
1593         tuple = systable_getnext(scan);
1594         if (!HeapTupleIsValid(tuple))
1595                 elog(ERROR, "could not find tuple for attrdef %u", attrdefId);
1596
1597         myrelid = ((Form_pg_attrdef) GETSTRUCT(tuple))->adrelid;
1598         myattnum = ((Form_pg_attrdef) GETSTRUCT(tuple))->adnum;
1599
1600         /* Get an exclusive lock on the relation owning the attribute */
1601         myrel = relation_open(myrelid, AccessExclusiveLock);
1602
1603         /* Now we can delete the pg_attrdef row */
1604         simple_heap_delete(attrdef_rel, &tuple->t_self);
1605
1606         systable_endscan(scan);
1607         heap_close(attrdef_rel, RowExclusiveLock);
1608
1609         /* Fix the pg_attribute row */
1610         attr_rel = heap_open(AttributeRelationId, RowExclusiveLock);
1611
1612         tuple = SearchSysCacheCopy2(ATTNUM,
1613                                                                 ObjectIdGetDatum(myrelid),
1614                                                                 Int16GetDatum(myattnum));
1615         if (!HeapTupleIsValid(tuple))           /* shouldn't happen */
1616                 elog(ERROR, "cache lookup failed for attribute %d of relation %u",
1617                          myattnum, myrelid);
1618
1619         ((Form_pg_attribute) GETSTRUCT(tuple))->atthasdef = false;
1620
1621         simple_heap_update(attr_rel, &tuple->t_self, tuple);
1622
1623         /* keep the system catalog indexes current */
1624         CatalogUpdateIndexes(attr_rel, tuple);
1625
1626         /*
1627          * Our update of the pg_attribute row will force a relcache rebuild, so
1628          * there's nothing else to do here.
1629          */
1630         heap_close(attr_rel, RowExclusiveLock);
1631
1632         /* Keep lock on attribute's rel until end of xact */
1633         relation_close(myrel, NoLock);
1634 }
1635
1636 /*
1637  * heap_drop_with_catalog       - removes specified relation from catalogs
1638  *
1639  * Note that this routine is not responsible for dropping objects that are
1640  * linked to the pg_class entry via dependencies (for example, indexes and
1641  * constraints).  Those are deleted by the dependency-tracing logic in
1642  * dependency.c before control gets here.  In general, therefore, this routine
1643  * should never be called directly; go through performDeletion() instead.
1644  */
1645 void
1646 heap_drop_with_catalog(Oid relid)
1647 {
1648         Relation        rel;
1649
1650         /*
1651          * Open and lock the relation.
1652          */
1653         rel = relation_open(relid, AccessExclusiveLock);
1654
1655         /*
1656          * There can no longer be anyone *else* touching the relation, but we
1657          * might still have open queries or cursors, or pending trigger events, in
1658          * our own session.
1659          */
1660         CheckTableNotInUse(rel, "DROP TABLE");
1661
1662         /*
1663          * This effectively deletes all rows in the table, and may be done in a
1664          * serializable transaction.  In that case we must record a rw-conflict in
1665          * to this transaction from each transaction holding a predicate lock on
1666          * the table.
1667          */
1668         CheckTableForSerializableConflictIn(rel);
1669
1670         /*
1671          * Delete pg_foreign_table tuple first.
1672          */
1673         if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
1674         {
1675                 Relation        rel;
1676                 HeapTuple       tuple;
1677
1678                 rel = heap_open(ForeignTableRelationId, RowExclusiveLock);
1679
1680                 tuple = SearchSysCache1(FOREIGNTABLEREL, ObjectIdGetDatum(relid));
1681                 if (!HeapTupleIsValid(tuple))
1682                         elog(ERROR, "cache lookup failed for foreign table %u", relid);
1683
1684                 simple_heap_delete(rel, &tuple->t_self);
1685
1686                 ReleaseSysCache(tuple);
1687                 heap_close(rel, RowExclusiveLock);
1688         }
1689
1690         /*
1691          * Schedule unlinking of the relation's physical files at commit.
1692          */
1693         if (rel->rd_rel->relkind != RELKIND_VIEW &&
1694                 rel->rd_rel->relkind != RELKIND_COMPOSITE_TYPE &&
1695                 rel->rd_rel->relkind != RELKIND_FOREIGN_TABLE)
1696         {
1697                 RelationDropStorage(rel);
1698         }
1699
1700         /*
1701          * Close relcache entry, but *keep* AccessExclusiveLock on the relation
1702          * until transaction commit.  This ensures no one else will try to do
1703          * something with the doomed relation.
1704          */
1705         relation_close(rel, NoLock);
1706
1707         /*
1708          * Forget any ON COMMIT action for the rel
1709          */
1710         remove_on_commit_action(relid);
1711
1712         /*
1713          * Flush the relation from the relcache.  We want to do this before
1714          * starting to remove catalog entries, just to be certain that no relcache
1715          * entry rebuild will happen partway through.  (That should not really
1716          * matter, since we don't do CommandCounterIncrement here, but let's be
1717          * safe.)
1718          */
1719         RelationForgetRelation(relid);
1720
1721         /*
1722          * remove inheritance information
1723          */
1724         RelationRemoveInheritance(relid);
1725
1726         /*
1727          * delete statistics
1728          */
1729         RemoveStatistics(relid, 0);
1730
1731         /*
1732          * delete attribute tuples
1733          */
1734         DeleteAttributeTuples(relid);
1735
1736         /*
1737          * delete relation tuple
1738          */
1739         DeleteRelationTuple(relid);
1740 }
1741
1742
1743 /*
1744  * Store a default expression for column attnum of relation rel.
1745  */
1746 void
1747 StoreAttrDefault(Relation rel, AttrNumber attnum, Node *expr)
1748 {
1749         char       *adbin;
1750         char       *adsrc;
1751         Relation        adrel;
1752         HeapTuple       tuple;
1753         Datum           values[4];
1754         static bool nulls[4] = {false, false, false, false};
1755         Relation        attrrel;
1756         HeapTuple       atttup;
1757         Form_pg_attribute attStruct;
1758         Oid                     attrdefOid;
1759         ObjectAddress colobject,
1760                                 defobject;
1761
1762         /*
1763          * Flatten expression to string form for storage.
1764          */
1765         adbin = nodeToString(expr);
1766
1767         /*
1768          * Also deparse it to form the mostly-obsolete adsrc field.
1769          */
1770         adsrc = deparse_expression(expr,
1771                                                         deparse_context_for(RelationGetRelationName(rel),
1772                                                                                                 RelationGetRelid(rel)),
1773                                                            false, false);
1774
1775         /*
1776          * Make the pg_attrdef entry.
1777          */
1778         values[Anum_pg_attrdef_adrelid - 1] = RelationGetRelid(rel);
1779         values[Anum_pg_attrdef_adnum - 1] = attnum;
1780         values[Anum_pg_attrdef_adbin - 1] = CStringGetTextDatum(adbin);
1781         values[Anum_pg_attrdef_adsrc - 1] = CStringGetTextDatum(adsrc);
1782
1783         adrel = heap_open(AttrDefaultRelationId, RowExclusiveLock);
1784
1785         tuple = heap_form_tuple(adrel->rd_att, values, nulls);
1786         attrdefOid = simple_heap_insert(adrel, tuple);
1787
1788         CatalogUpdateIndexes(adrel, tuple);
1789
1790         defobject.classId = AttrDefaultRelationId;
1791         defobject.objectId = attrdefOid;
1792         defobject.objectSubId = 0;
1793
1794         heap_close(adrel, RowExclusiveLock);
1795
1796         /* now can free some of the stuff allocated above */
1797         pfree(DatumGetPointer(values[Anum_pg_attrdef_adbin - 1]));
1798         pfree(DatumGetPointer(values[Anum_pg_attrdef_adsrc - 1]));
1799         heap_freetuple(tuple);
1800         pfree(adbin);
1801         pfree(adsrc);
1802
1803         /*
1804          * Update the pg_attribute entry for the column to show that a default
1805          * exists.
1806          */
1807         attrrel = heap_open(AttributeRelationId, RowExclusiveLock);
1808         atttup = SearchSysCacheCopy2(ATTNUM,
1809                                                                  ObjectIdGetDatum(RelationGetRelid(rel)),
1810                                                                  Int16GetDatum(attnum));
1811         if (!HeapTupleIsValid(atttup))
1812                 elog(ERROR, "cache lookup failed for attribute %d of relation %u",
1813                          attnum, RelationGetRelid(rel));
1814         attStruct = (Form_pg_attribute) GETSTRUCT(atttup);
1815         if (!attStruct->atthasdef)
1816         {
1817                 attStruct->atthasdef = true;
1818                 simple_heap_update(attrrel, &atttup->t_self, atttup);
1819                 /* keep catalog indexes current */
1820                 CatalogUpdateIndexes(attrrel, atttup);
1821         }
1822         heap_close(attrrel, RowExclusiveLock);
1823         heap_freetuple(atttup);
1824
1825         /*
1826          * Make a dependency so that the pg_attrdef entry goes away if the column
1827          * (or whole table) is deleted.
1828          */
1829         colobject.classId = RelationRelationId;
1830         colobject.objectId = RelationGetRelid(rel);
1831         colobject.objectSubId = attnum;
1832
1833         recordDependencyOn(&defobject, &colobject, DEPENDENCY_AUTO);
1834
1835         /*
1836          * Record dependencies on objects used in the expression, too.
1837          */
1838         recordDependencyOnExpr(&defobject, expr, NIL, DEPENDENCY_NORMAL);
1839 }
1840
1841 /*
1842  * Store a check-constraint expression for the given relation.
1843  *
1844  * Caller is responsible for updating the count of constraints
1845  * in the pg_class entry for the relation.
1846  */
1847 static void
1848 StoreRelCheck(Relation rel, char *ccname, Node *expr,
1849                           bool is_validated, bool is_local, int inhcount)
1850 {
1851         char       *ccbin;
1852         char       *ccsrc;
1853         List       *varList;
1854         int                     keycount;
1855         int16      *attNos;
1856
1857         /*
1858          * Flatten expression to string form for storage.
1859          */
1860         ccbin = nodeToString(expr);
1861
1862         /*
1863          * Also deparse it to form the mostly-obsolete consrc field.
1864          */
1865         ccsrc = deparse_expression(expr,
1866                                                         deparse_context_for(RelationGetRelationName(rel),
1867                                                                                                 RelationGetRelid(rel)),
1868                                                            false, false);
1869
1870         /*
1871          * Find columns of rel that are used in expr
1872          *
1873          * NB: pull_var_clause is okay here only because we don't allow subselects
1874          * in check constraints; it would fail to examine the contents of
1875          * subselects.
1876          */
1877         varList = pull_var_clause(expr, PVC_REJECT_PLACEHOLDERS);
1878         keycount = list_length(varList);
1879
1880         if (keycount > 0)
1881         {
1882                 ListCell   *vl;
1883                 int                     i = 0;
1884
1885                 attNos = (int16 *) palloc(keycount * sizeof(int16));
1886                 foreach(vl, varList)
1887                 {
1888                         Var                *var = (Var *) lfirst(vl);
1889                         int                     j;
1890
1891                         for (j = 0; j < i; j++)
1892                                 if (attNos[j] == var->varattno)
1893                                         break;
1894                         if (j == i)
1895                                 attNos[i++] = var->varattno;
1896                 }
1897                 keycount = i;
1898         }
1899         else
1900                 attNos = NULL;
1901
1902         /*
1903          * Create the Check Constraint
1904          */
1905         CreateConstraintEntry(ccname,           /* Constraint Name */
1906                                                   RelationGetNamespace(rel),    /* namespace */
1907                                                   CONSTRAINT_CHECK,             /* Constraint Type */
1908                                                   false,        /* Is Deferrable */
1909                                                   false,        /* Is Deferred */
1910                                                   is_validated,
1911                                                   RelationGetRelid(rel),                /* relation */
1912                                                   attNos,               /* attrs in the constraint */
1913                                                   keycount,             /* # attrs in the constraint */
1914                                                   InvalidOid,   /* not a domain constraint */
1915                                                   InvalidOid,   /* no associated index */
1916                                                   InvalidOid,   /* Foreign key fields */
1917                                                   NULL,
1918                                                   NULL,
1919                                                   NULL,
1920                                                   NULL,
1921                                                   0,
1922                                                   ' ',
1923                                                   ' ',
1924                                                   ' ',
1925                                                   NULL, /* not an exclusion constraint */
1926                                                   expr, /* Tree form of check constraint */
1927                                                   ccbin,        /* Binary form of check constraint */
1928                                                   ccsrc,        /* Source form of check constraint */
1929                                                   is_local,             /* conislocal */
1930                                                   inhcount);    /* coninhcount */
1931
1932         pfree(ccbin);
1933         pfree(ccsrc);
1934 }
1935
1936 /*
1937  * Store defaults and constraints (passed as a list of CookedConstraint).
1938  *
1939  * NOTE: only pre-cooked expressions will be passed this way, which is to
1940  * say expressions inherited from an existing relation.  Newly parsed
1941  * expressions can be added later, by direct calls to StoreAttrDefault
1942  * and StoreRelCheck (see AddRelationNewConstraints()).
1943  */
1944 static void
1945 StoreConstraints(Relation rel, List *cooked_constraints)
1946 {
1947         int                     numchecks = 0;
1948         ListCell   *lc;
1949
1950         if (!cooked_constraints)
1951                 return;                                 /* nothing to do */
1952
1953         /*
1954          * Deparsing of constraint expressions will fail unless the just-created
1955          * pg_attribute tuples for this relation are made visible.      So, bump the
1956          * command counter.  CAUTION: this will cause a relcache entry rebuild.
1957          */
1958         CommandCounterIncrement();
1959
1960         foreach(lc, cooked_constraints)
1961         {
1962                 CookedConstraint *con = (CookedConstraint *) lfirst(lc);
1963
1964                 switch (con->contype)
1965                 {
1966                         case CONSTR_DEFAULT:
1967                                 StoreAttrDefault(rel, con->attnum, con->expr);
1968                                 break;
1969                         case CONSTR_CHECK:
1970                                 StoreRelCheck(rel, con->name, con->expr, !con->skip_validation,
1971                                                           con->is_local, con->inhcount);
1972                                 numchecks++;
1973                                 break;
1974                         default:
1975                                 elog(ERROR, "unrecognized constraint type: %d",
1976                                          (int) con->contype);
1977                 }
1978         }
1979
1980         if (numchecks > 0)
1981                 SetRelationNumChecks(rel, numchecks);
1982 }
1983
1984 /*
1985  * AddRelationNewConstraints
1986  *
1987  * Add new column default expressions and/or constraint check expressions
1988  * to an existing relation.  This is defined to do both for efficiency in
1989  * DefineRelation, but of course you can do just one or the other by passing
1990  * empty lists.
1991  *
1992  * rel: relation to be modified
1993  * newColDefaults: list of RawColumnDefault structures
1994  * newConstraints: list of Constraint nodes
1995  * allow_merge: TRUE if check constraints may be merged with existing ones
1996  * is_local: TRUE if definition is local, FALSE if it's inherited
1997  *
1998  * All entries in newColDefaults will be processed.  Entries in newConstraints
1999  * will be processed only if they are CONSTR_CHECK type.
2000  *
2001  * Returns a list of CookedConstraint nodes that shows the cooked form of
2002  * the default and constraint expressions added to the relation.
2003  *
2004  * NB: caller should have opened rel with AccessExclusiveLock, and should
2005  * hold that lock till end of transaction.      Also, we assume the caller has
2006  * done a CommandCounterIncrement if necessary to make the relation's catalog
2007  * tuples visible.
2008  */
2009 List *
2010 AddRelationNewConstraints(Relation rel,
2011                                                   List *newColDefaults,
2012                                                   List *newConstraints,
2013                                                   bool allow_merge,
2014                                                   bool is_local)
2015 {
2016         List       *cookedConstraints = NIL;
2017         TupleDesc       tupleDesc;
2018         TupleConstr *oldconstr;
2019         int                     numoldchecks;
2020         ParseState *pstate;
2021         RangeTblEntry *rte;
2022         int                     numchecks;
2023         List       *checknames;
2024         ListCell   *cell;
2025         Node       *expr;
2026         CookedConstraint *cooked;
2027
2028         /*
2029          * Get info about existing constraints.
2030          */
2031         tupleDesc = RelationGetDescr(rel);
2032         oldconstr = tupleDesc->constr;
2033         if (oldconstr)
2034                 numoldchecks = oldconstr->num_check;
2035         else
2036                 numoldchecks = 0;
2037
2038         /*
2039          * Create a dummy ParseState and insert the target relation as its sole
2040          * rangetable entry.  We need a ParseState for transformExpr.
2041          */
2042         pstate = make_parsestate(NULL);
2043         rte = addRangeTableEntryForRelation(pstate,
2044                                                                                 rel,
2045                                                                                 NULL,
2046                                                                                 false,
2047                                                                                 true);
2048         addRTEtoQuery(pstate, rte, true, true, true);
2049
2050         /*
2051          * Process column default expressions.
2052          */
2053         foreach(cell, newColDefaults)
2054         {
2055                 RawColumnDefault *colDef = (RawColumnDefault *) lfirst(cell);
2056                 Form_pg_attribute atp = rel->rd_att->attrs[colDef->attnum - 1];
2057
2058                 expr = cookDefault(pstate, colDef->raw_default,
2059                                                    atp->atttypid, atp->atttypmod,
2060                                                    NameStr(atp->attname));
2061
2062                 /*
2063                  * If the expression is just a NULL constant, we do not bother to make
2064                  * an explicit pg_attrdef entry, since the default behavior is
2065                  * equivalent.
2066                  *
2067                  * Note a nonobvious property of this test: if the column is of a
2068                  * domain type, what we'll get is not a bare null Const but a
2069                  * CoerceToDomain expr, so we will not discard the default.  This is
2070                  * critical because the column default needs to be retained to
2071                  * override any default that the domain might have.
2072                  */
2073                 if (expr == NULL ||
2074                         (IsA(expr, Const) &&((Const *) expr)->constisnull))
2075                         continue;
2076
2077                 StoreAttrDefault(rel, colDef->attnum, expr);
2078
2079                 cooked = (CookedConstraint *) palloc(sizeof(CookedConstraint));
2080                 cooked->contype = CONSTR_DEFAULT;
2081                 cooked->name = NULL;
2082                 cooked->attnum = colDef->attnum;
2083                 cooked->expr = expr;
2084                 cooked->skip_validation = false;
2085                 cooked->is_local = is_local;
2086                 cooked->inhcount = is_local ? 0 : 1;
2087                 cookedConstraints = lappend(cookedConstraints, cooked);
2088         }
2089
2090         /*
2091          * Process constraint expressions.
2092          */
2093         numchecks = numoldchecks;
2094         checknames = NIL;
2095         foreach(cell, newConstraints)
2096         {
2097                 Constraint *cdef = (Constraint *) lfirst(cell);
2098                 char       *ccname;
2099
2100                 if (cdef->contype != CONSTR_CHECK)
2101                         continue;
2102
2103                 if (cdef->raw_expr != NULL)
2104                 {
2105                         Assert(cdef->cooked_expr == NULL);
2106
2107                         /*
2108                          * Transform raw parsetree to executable expression, and verify
2109                          * it's valid as a CHECK constraint.
2110                          */
2111                         expr = cookConstraint(pstate, cdef->raw_expr,
2112                                                                   RelationGetRelationName(rel));
2113                 }
2114                 else
2115                 {
2116                         Assert(cdef->cooked_expr != NULL);
2117
2118                         /*
2119                          * Here, we assume the parser will only pass us valid CHECK
2120                          * expressions, so we do no particular checking.
2121                          */
2122                         expr = stringToNode(cdef->cooked_expr);
2123                 }
2124
2125                 /*
2126                  * Check name uniqueness, or generate a name if none was given.
2127                  */
2128                 if (cdef->conname != NULL)
2129                 {
2130                         ListCell   *cell2;
2131
2132                         ccname = cdef->conname;
2133                         /* Check against other new constraints */
2134                         /* Needed because we don't do CommandCounterIncrement in loop */
2135                         foreach(cell2, checknames)
2136                         {
2137                                 if (strcmp((char *) lfirst(cell2), ccname) == 0)
2138                                         ereport(ERROR,
2139                                                         (errcode(ERRCODE_DUPLICATE_OBJECT),
2140                                                          errmsg("check constraint \"%s\" already exists",
2141                                                                         ccname)));
2142                         }
2143
2144                         /* save name for future checks */
2145                         checknames = lappend(checknames, ccname);
2146
2147                         /*
2148                          * Check against pre-existing constraints.      If we are allowed to
2149                          * merge with an existing constraint, there's no more to do here.
2150                          * (We omit the duplicate constraint from the result, which is
2151                          * what ATAddCheckConstraint wants.)
2152                          */
2153                         if (MergeWithExistingConstraint(rel, ccname, expr,
2154                                                                                         allow_merge, is_local))
2155                                 continue;
2156                 }
2157                 else
2158                 {
2159                         /*
2160                          * When generating a name, we want to create "tab_col_check" for a
2161                          * column constraint and "tab_check" for a table constraint.  We
2162                          * no longer have any info about the syntactic positioning of the
2163                          * constraint phrase, so we approximate this by seeing whether the
2164                          * expression references more than one column.  (If the user
2165                          * played by the rules, the result is the same...)
2166                          *
2167                          * Note: pull_var_clause() doesn't descend into sublinks, but we
2168                          * eliminated those above; and anyway this only needs to be an
2169                          * approximate answer.
2170                          */
2171                         List       *vars;
2172                         char       *colname;
2173
2174                         vars = pull_var_clause(expr, PVC_REJECT_PLACEHOLDERS);
2175
2176                         /* eliminate duplicates */
2177                         vars = list_union(NIL, vars);
2178
2179                         if (list_length(vars) == 1)
2180                                 colname = get_attname(RelationGetRelid(rel),
2181                                                                           ((Var *) linitial(vars))->varattno);
2182                         else
2183                                 colname = NULL;
2184
2185                         ccname = ChooseConstraintName(RelationGetRelationName(rel),
2186                                                                                   colname,
2187                                                                                   "check",
2188                                                                                   RelationGetNamespace(rel),
2189                                                                                   checknames);
2190
2191                         /* save name for future checks */
2192                         checknames = lappend(checknames, ccname);
2193                 }
2194
2195                 /*
2196                  * OK, store it.
2197                  */
2198                 StoreRelCheck(rel, ccname, expr, !cdef->skip_validation, is_local,
2199                                           is_local ? 0 : 1);
2200
2201                 numchecks++;
2202
2203                 cooked = (CookedConstraint *) palloc(sizeof(CookedConstraint));
2204                 cooked->contype = CONSTR_CHECK;
2205                 cooked->name = ccname;
2206                 cooked->attnum = 0;
2207                 cooked->expr = expr;
2208                 cooked->skip_validation = cdef->skip_validation;
2209                 cooked->is_local = is_local;
2210                 cooked->inhcount = is_local ? 0 : 1;
2211                 cookedConstraints = lappend(cookedConstraints, cooked);
2212         }
2213
2214         /*
2215          * Update the count of constraints in the relation's pg_class tuple. We do
2216          * this even if there was no change, in order to ensure that an SI update
2217          * message is sent out for the pg_class tuple, which will force other
2218          * backends to rebuild their relcache entries for the rel. (This is
2219          * critical if we added defaults but not constraints.)
2220          */
2221         SetRelationNumChecks(rel, numchecks);
2222
2223         return cookedConstraints;
2224 }
2225
2226 /*
2227  * Check for a pre-existing check constraint that conflicts with a proposed
2228  * new one, and either adjust its conislocal/coninhcount settings or throw
2229  * error as needed.
2230  *
2231  * Returns TRUE if merged (constraint is a duplicate), or FALSE if it's
2232  * got a so-far-unique name, or throws error if conflict.
2233  */
2234 static bool
2235 MergeWithExistingConstraint(Relation rel, char *ccname, Node *expr,
2236                                                         bool allow_merge, bool is_local)
2237 {
2238         bool            found;
2239         Relation        conDesc;
2240         SysScanDesc conscan;
2241         ScanKeyData skey[2];
2242         HeapTuple       tup;
2243
2244         /* Search for a pg_constraint entry with same name and relation */
2245         conDesc = heap_open(ConstraintRelationId, RowExclusiveLock);
2246
2247         found = false;
2248
2249         ScanKeyInit(&skey[0],
2250                                 Anum_pg_constraint_conname,
2251                                 BTEqualStrategyNumber, F_NAMEEQ,
2252                                 CStringGetDatum(ccname));
2253
2254         ScanKeyInit(&skey[1],
2255                                 Anum_pg_constraint_connamespace,
2256                                 BTEqualStrategyNumber, F_OIDEQ,
2257                                 ObjectIdGetDatum(RelationGetNamespace(rel)));
2258
2259         conscan = systable_beginscan(conDesc, ConstraintNameNspIndexId, true,
2260                                                                  SnapshotNow, 2, skey);
2261
2262         while (HeapTupleIsValid(tup = systable_getnext(conscan)))
2263         {
2264                 Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(tup);
2265
2266                 if (con->conrelid == RelationGetRelid(rel))
2267                 {
2268                         /* Found it.  Conflicts if not identical check constraint */
2269                         if (con->contype == CONSTRAINT_CHECK)
2270                         {
2271                                 Datum           val;
2272                                 bool            isnull;
2273
2274                                 val = fastgetattr(tup,
2275                                                                   Anum_pg_constraint_conbin,
2276                                                                   conDesc->rd_att, &isnull);
2277                                 if (isnull)
2278                                         elog(ERROR, "null conbin for rel %s",
2279                                                  RelationGetRelationName(rel));
2280                                 if (equal(expr, stringToNode(TextDatumGetCString(val))))
2281                                         found = true;
2282                         }
2283                         if (!found || !allow_merge)
2284                                 ereport(ERROR,
2285                                                 (errcode(ERRCODE_DUPLICATE_OBJECT),
2286                                 errmsg("constraint \"%s\" for relation \"%s\" already exists",
2287                                            ccname, RelationGetRelationName(rel))));
2288                         /* OK to update the tuple */
2289                         ereport(NOTICE,
2290                            (errmsg("merging constraint \"%s\" with inherited definition",
2291                                            ccname)));
2292                         tup = heap_copytuple(tup);
2293                         con = (Form_pg_constraint) GETSTRUCT(tup);
2294                         if (is_local)
2295                                 con->conislocal = true;
2296                         else
2297                                 con->coninhcount++;
2298                         simple_heap_update(conDesc, &tup->t_self, tup);
2299                         CatalogUpdateIndexes(conDesc, tup);
2300                         break;
2301                 }
2302         }
2303
2304         systable_endscan(conscan);
2305         heap_close(conDesc, RowExclusiveLock);
2306
2307         return found;
2308 }
2309
2310 /*
2311  * Update the count of constraints in the relation's pg_class tuple.
2312  *
2313  * Caller had better hold exclusive lock on the relation.
2314  *
2315  * An important side effect is that a SI update message will be sent out for
2316  * the pg_class tuple, which will force other backends to rebuild their
2317  * relcache entries for the rel.  Also, this backend will rebuild its
2318  * own relcache entry at the next CommandCounterIncrement.
2319  */
2320 static void
2321 SetRelationNumChecks(Relation rel, int numchecks)
2322 {
2323         Relation        relrel;
2324         HeapTuple       reltup;
2325         Form_pg_class relStruct;
2326
2327         relrel = heap_open(RelationRelationId, RowExclusiveLock);
2328         reltup = SearchSysCacheCopy1(RELOID,
2329                                                                  ObjectIdGetDatum(RelationGetRelid(rel)));
2330         if (!HeapTupleIsValid(reltup))
2331                 elog(ERROR, "cache lookup failed for relation %u",
2332                          RelationGetRelid(rel));
2333         relStruct = (Form_pg_class) GETSTRUCT(reltup);
2334
2335         if (relStruct->relchecks != numchecks)
2336         {
2337                 relStruct->relchecks = numchecks;
2338
2339                 simple_heap_update(relrel, &reltup->t_self, reltup);
2340
2341                 /* keep catalog indexes current */
2342                 CatalogUpdateIndexes(relrel, reltup);
2343         }
2344         else
2345         {
2346                 /* Skip the disk update, but force relcache inval anyway */
2347                 CacheInvalidateRelcache(rel);
2348         }
2349
2350         heap_freetuple(reltup);
2351         heap_close(relrel, RowExclusiveLock);
2352 }
2353
2354 /*
2355  * Take a raw default and convert it to a cooked format ready for
2356  * storage.
2357  *
2358  * Parse state should be set up to recognize any vars that might appear
2359  * in the expression.  (Even though we plan to reject vars, it's more
2360  * user-friendly to give the correct error message than "unknown var".)
2361  *
2362  * If atttypid is not InvalidOid, coerce the expression to the specified
2363  * type (and typmod atttypmod).   attname is only needed in this case:
2364  * it is used in the error message, if any.
2365  */
2366 Node *
2367 cookDefault(ParseState *pstate,
2368                         Node *raw_default,
2369                         Oid atttypid,
2370                         int32 atttypmod,
2371                         char *attname)
2372 {
2373         Node       *expr;
2374
2375         Assert(raw_default != NULL);
2376
2377         /*
2378          * Transform raw parsetree to executable expression.
2379          */
2380         expr = transformExpr(pstate, raw_default);
2381
2382         /*
2383          * Make sure default expr does not refer to any vars.
2384          */
2385         if (contain_var_clause(expr))
2386                 ereport(ERROR,
2387                                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
2388                           errmsg("cannot use column references in default expression")));
2389
2390         /*
2391          * It can't return a set either.
2392          */
2393         if (expression_returns_set(expr))
2394                 ereport(ERROR,
2395                                 (errcode(ERRCODE_DATATYPE_MISMATCH),
2396                                  errmsg("default expression must not return a set")));
2397
2398         /*
2399          * No subplans or aggregates, either...
2400          */
2401         if (pstate->p_hasSubLinks)
2402                 ereport(ERROR,
2403                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2404                                  errmsg("cannot use subquery in default expression")));
2405         if (pstate->p_hasAggs)
2406                 ereport(ERROR,
2407                                 (errcode(ERRCODE_GROUPING_ERROR),
2408                          errmsg("cannot use aggregate function in default expression")));
2409         if (pstate->p_hasWindowFuncs)
2410                 ereport(ERROR,
2411                                 (errcode(ERRCODE_WINDOWING_ERROR),
2412                                  errmsg("cannot use window function in default expression")));
2413
2414         /*
2415          * Coerce the expression to the correct type and typmod, if given. This
2416          * should match the parser's processing of non-defaulted expressions ---
2417          * see transformAssignedExpr().
2418          */
2419         if (OidIsValid(atttypid))
2420         {
2421                 Oid                     type_id = exprType(expr);
2422
2423                 expr = coerce_to_target_type(pstate, expr, type_id,
2424                                                                          atttypid, atttypmod,
2425                                                                          COERCION_ASSIGNMENT,
2426                                                                          COERCE_IMPLICIT_CAST,
2427                                                                          -1);
2428                 if (expr == NULL)
2429                         ereport(ERROR,
2430                                         (errcode(ERRCODE_DATATYPE_MISMATCH),
2431                                          errmsg("column \"%s\" is of type %s"
2432                                                         " but default expression is of type %s",
2433                                                         attname,
2434                                                         format_type_be(atttypid),
2435                                                         format_type_be(type_id)),
2436                            errhint("You will need to rewrite or cast the expression.")));
2437         }
2438
2439         /*
2440          * Finally, take care of collations in the finished expression.
2441          */
2442         assign_expr_collations(pstate, expr);
2443
2444         return expr;
2445 }
2446
2447 /*
2448  * Take a raw CHECK constraint expression and convert it to a cooked format
2449  * ready for storage.
2450  *
2451  * Parse state must be set up to recognize any vars that might appear
2452  * in the expression.
2453  */
2454 static Node *
2455 cookConstraint(ParseState *pstate,
2456                            Node *raw_constraint,
2457                            char *relname)
2458 {
2459         Node       *expr;
2460
2461         /*
2462          * Transform raw parsetree to executable expression.
2463          */
2464         expr = transformExpr(pstate, raw_constraint);
2465
2466         /*
2467          * Make sure it yields a boolean result.
2468          */
2469         expr = coerce_to_boolean(pstate, expr, "CHECK");
2470
2471         /*
2472          * Take care of collations.
2473          */
2474         assign_expr_collations(pstate, expr);
2475
2476         /*
2477          * Make sure no outside relations are referred to.
2478          */
2479         if (list_length(pstate->p_rtable) != 1)
2480                 ereport(ERROR,
2481                                 (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
2482                         errmsg("only table \"%s\" can be referenced in check constraint",
2483                                    relname)));
2484
2485         /*
2486          * No subplans or aggregates, either...
2487          */
2488         if (pstate->p_hasSubLinks)
2489                 ereport(ERROR,
2490                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2491                                  errmsg("cannot use subquery in check constraint")));
2492         if (pstate->p_hasAggs)
2493                 ereport(ERROR,
2494                                 (errcode(ERRCODE_GROUPING_ERROR),
2495                            errmsg("cannot use aggregate function in check constraint")));
2496         if (pstate->p_hasWindowFuncs)
2497                 ereport(ERROR,
2498                                 (errcode(ERRCODE_WINDOWING_ERROR),
2499                                  errmsg("cannot use window function in check constraint")));
2500
2501         return expr;
2502 }
2503
2504
2505 /*
2506  * RemoveStatistics --- remove entries in pg_statistic for a rel or column
2507  *
2508  * If attnum is zero, remove all entries for rel; else remove only the one(s)
2509  * for that column.
2510  */
2511 void
2512 RemoveStatistics(Oid relid, AttrNumber attnum)
2513 {
2514         Relation        pgstatistic;
2515         SysScanDesc scan;
2516         ScanKeyData key[2];
2517         int                     nkeys;
2518         HeapTuple       tuple;
2519
2520         pgstatistic = heap_open(StatisticRelationId, RowExclusiveLock);
2521
2522         ScanKeyInit(&key[0],
2523                                 Anum_pg_statistic_starelid,
2524                                 BTEqualStrategyNumber, F_OIDEQ,
2525                                 ObjectIdGetDatum(relid));
2526
2527         if (attnum == 0)
2528                 nkeys = 1;
2529         else
2530         {
2531                 ScanKeyInit(&key[1],
2532                                         Anum_pg_statistic_staattnum,
2533                                         BTEqualStrategyNumber, F_INT2EQ,
2534                                         Int16GetDatum(attnum));
2535                 nkeys = 2;
2536         }
2537
2538         scan = systable_beginscan(pgstatistic, StatisticRelidAttnumInhIndexId, true,
2539                                                           SnapshotNow, nkeys, key);
2540
2541         /* we must loop even when attnum != 0, in case of inherited stats */
2542         while (HeapTupleIsValid(tuple = systable_getnext(scan)))
2543                 simple_heap_delete(pgstatistic, &tuple->t_self);
2544
2545         systable_endscan(scan);
2546
2547         heap_close(pgstatistic, RowExclusiveLock);
2548 }
2549
2550
2551 /*
2552  * RelationTruncateIndexes - truncate all indexes associated
2553  * with the heap relation to zero tuples.
2554  *
2555  * The routine will truncate and then reconstruct the indexes on
2556  * the specified relation.      Caller must hold exclusive lock on rel.
2557  */
2558 static void
2559 RelationTruncateIndexes(Relation heapRelation)
2560 {
2561         ListCell   *indlist;
2562
2563         /* Ask the relcache to produce a list of the indexes of the rel */
2564         foreach(indlist, RelationGetIndexList(heapRelation))
2565         {
2566                 Oid                     indexId = lfirst_oid(indlist);
2567                 Relation        currentIndex;
2568                 IndexInfo  *indexInfo;
2569
2570                 /* Open the index relation; use exclusive lock, just to be sure */
2571                 currentIndex = index_open(indexId, AccessExclusiveLock);
2572
2573                 /* Fetch info needed for index_build */
2574                 indexInfo = BuildIndexInfo(currentIndex);
2575
2576                 /*
2577                  * Now truncate the actual file (and discard buffers).
2578                  */
2579                 RelationTruncate(currentIndex, 0);
2580
2581                 /* Initialize the index and rebuild */
2582                 /* Note: we do not need to re-establish pkey setting */
2583                 index_build(heapRelation, currentIndex, indexInfo, false, true);
2584
2585                 /* We're done with this index */
2586                 index_close(currentIndex, NoLock);
2587         }
2588 }
2589
2590 /*
2591  *       heap_truncate
2592  *
2593  *       This routine deletes all data within all the specified relations.
2594  *
2595  * This is not transaction-safe!  There is another, transaction-safe
2596  * implementation in commands/tablecmds.c.      We now use this only for
2597  * ON COMMIT truncation of temporary tables, where it doesn't matter.
2598  */
2599 void
2600 heap_truncate(List *relids)
2601 {
2602         List       *relations = NIL;
2603         ListCell   *cell;
2604
2605         /* Open relations for processing, and grab exclusive access on each */
2606         foreach(cell, relids)
2607         {
2608                 Oid                     rid = lfirst_oid(cell);
2609                 Relation        rel;
2610
2611                 rel = heap_open(rid, AccessExclusiveLock);
2612                 relations = lappend(relations, rel);
2613         }
2614
2615         /* Don't allow truncate on tables that are referenced by foreign keys */
2616         heap_truncate_check_FKs(relations, true);
2617
2618         /* OK to do it */
2619         foreach(cell, relations)
2620         {
2621                 Relation        rel = lfirst(cell);
2622
2623                 /* Truncate the relation */
2624                 heap_truncate_one_rel(rel);
2625
2626                 /* Close the relation, but keep exclusive lock on it until commit */
2627                 heap_close(rel, NoLock);
2628         }
2629 }
2630
2631 /*
2632  *       heap_truncate_one_rel
2633  *
2634  *       This routine deletes all data within the specified relation.
2635  *
2636  * This is not transaction-safe, because the truncation is done immediately
2637  * and cannot be rolled back later.  Caller is responsible for having
2638  * checked permissions etc, and must have obtained AccessExclusiveLock.
2639  */
2640 void
2641 heap_truncate_one_rel(Relation rel)
2642 {
2643         Oid                     toastrelid;
2644
2645         /* Truncate the actual file (and discard buffers) */
2646         RelationTruncate(rel, 0);
2647
2648         /* If the relation has indexes, truncate the indexes too */
2649         RelationTruncateIndexes(rel);
2650
2651         /* If there is a toast table, truncate that too */
2652         toastrelid = rel->rd_rel->reltoastrelid;
2653         if (OidIsValid(toastrelid))
2654         {
2655                 Relation        toastrel = heap_open(toastrelid, AccessExclusiveLock);
2656
2657                 RelationTruncate(toastrel, 0);
2658                 RelationTruncateIndexes(toastrel);
2659                 /* keep the lock... */
2660                 heap_close(toastrel, NoLock);
2661         }
2662 }
2663
2664 /*
2665  * heap_truncate_check_FKs
2666  *              Check for foreign keys referencing a list of relations that
2667  *              are to be truncated, and raise error if there are any
2668  *
2669  * We disallow such FKs (except self-referential ones) since the whole point
2670  * of TRUNCATE is to not scan the individual rows to be thrown away.
2671  *
2672  * This is split out so it can be shared by both implementations of truncate.
2673  * Caller should already hold a suitable lock on the relations.
2674  *
2675  * tempTables is only used to select an appropriate error message.
2676  */
2677 void
2678 heap_truncate_check_FKs(List *relations, bool tempTables)
2679 {
2680         List       *oids = NIL;
2681         List       *dependents;
2682         ListCell   *cell;
2683
2684         /*
2685          * Build a list of OIDs of the interesting relations.
2686          *
2687          * If a relation has no triggers, then it can neither have FKs nor be
2688          * referenced by a FK from another table, so we can ignore it.
2689          */
2690         foreach(cell, relations)
2691         {
2692                 Relation        rel = lfirst(cell);
2693
2694                 if (rel->rd_rel->relhastriggers)
2695                         oids = lappend_oid(oids, RelationGetRelid(rel));
2696         }
2697
2698         /*
2699          * Fast path: if no relation has triggers, none has FKs either.
2700          */
2701         if (oids == NIL)
2702                 return;
2703
2704         /*
2705          * Otherwise, must scan pg_constraint.  We make one pass with all the
2706          * relations considered; if this finds nothing, then all is well.
2707          */
2708         dependents = heap_truncate_find_FKs(oids);
2709         if (dependents == NIL)
2710                 return;
2711
2712         /*
2713          * Otherwise we repeat the scan once per relation to identify a particular
2714          * pair of relations to complain about.  This is pretty slow, but
2715          * performance shouldn't matter much in a failure path.  The reason for
2716          * doing things this way is to ensure that the message produced is not
2717          * dependent on chance row locations within pg_constraint.
2718          */
2719         foreach(cell, oids)
2720         {
2721                 Oid                     relid = lfirst_oid(cell);
2722                 ListCell   *cell2;
2723
2724                 dependents = heap_truncate_find_FKs(list_make1_oid(relid));
2725
2726                 foreach(cell2, dependents)
2727                 {
2728                         Oid                     relid2 = lfirst_oid(cell2);
2729
2730                         if (!list_member_oid(oids, relid2))
2731                         {
2732                                 char       *relname = get_rel_name(relid);
2733                                 char       *relname2 = get_rel_name(relid2);
2734
2735                                 if (tempTables)
2736                                         ereport(ERROR,
2737                                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2738                                                          errmsg("unsupported ON COMMIT and foreign key combination"),
2739                                                          errdetail("Table \"%s\" references \"%s\", but they do not have the same ON COMMIT setting.",
2740                                                                            relname2, relname)));
2741                                 else
2742                                         ereport(ERROR,
2743                                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
2744                                                          errmsg("cannot truncate a table referenced in a foreign key constraint"),
2745                                                          errdetail("Table \"%s\" references \"%s\".",
2746                                                                            relname2, relname),
2747                                                    errhint("Truncate table \"%s\" at the same time, "
2748                                                                    "or use TRUNCATE ... CASCADE.",
2749                                                                    relname2)));
2750                         }
2751                 }
2752         }
2753 }
2754
2755 /*
2756  * heap_truncate_find_FKs
2757  *              Find relations having foreign keys referencing any of the given rels
2758  *
2759  * Input and result are both lists of relation OIDs.  The result contains
2760  * no duplicates, does *not* include any rels that were already in the input
2761  * list, and is sorted in OID order.  (The last property is enforced mainly
2762  * to guarantee consistent behavior in the regression tests; we don't want
2763  * behavior to change depending on chance locations of rows in pg_constraint.)
2764  *
2765  * Note: caller should already have appropriate lock on all rels mentioned
2766  * in relationIds.      Since adding or dropping an FK requires exclusive lock
2767  * on both rels, this ensures that the answer will be stable.
2768  */
2769 List *
2770 heap_truncate_find_FKs(List *relationIds)
2771 {
2772         List       *result = NIL;
2773         Relation        fkeyRel;
2774         SysScanDesc fkeyScan;
2775         HeapTuple       tuple;
2776
2777         /*
2778          * Must scan pg_constraint.  Right now, it is a seqscan because there is
2779          * no available index on confrelid.
2780          */
2781         fkeyRel = heap_open(ConstraintRelationId, AccessShareLock);
2782
2783         fkeyScan = systable_beginscan(fkeyRel, InvalidOid, false,
2784                                                                   SnapshotNow, 0, NULL);
2785
2786         while (HeapTupleIsValid(tuple = systable_getnext(fkeyScan)))
2787         {
2788                 Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(tuple);
2789
2790                 /* Not a foreign key */
2791                 if (con->contype != CONSTRAINT_FOREIGN)
2792                         continue;
2793
2794                 /* Not referencing one of our list of tables */
2795                 if (!list_member_oid(relationIds, con->confrelid))
2796                         continue;
2797
2798                 /* Add referencer unless already in input or result list */
2799                 if (!list_member_oid(relationIds, con->conrelid))
2800                         result = insert_ordered_unique_oid(result, con->conrelid);
2801         }
2802
2803         systable_endscan(fkeyScan);
2804         heap_close(fkeyRel, AccessShareLock);
2805
2806         return result;
2807 }
2808
2809 /*
2810  * insert_ordered_unique_oid
2811  *              Insert a new Oid into a sorted list of Oids, preserving ordering,
2812  *              and eliminating duplicates
2813  *
2814  * Building the ordered list this way is O(N^2), but with a pretty small
2815  * constant, so for the number of entries we expect it will probably be
2816  * faster than trying to apply qsort().  It seems unlikely someone would be
2817  * trying to truncate a table with thousands of dependent tables ...
2818  */
2819 static List *
2820 insert_ordered_unique_oid(List *list, Oid datum)
2821 {
2822         ListCell   *prev;
2823
2824         /* Does the datum belong at the front? */
2825         if (list == NIL || datum < linitial_oid(list))
2826                 return lcons_oid(datum, list);
2827         /* Does it match the first entry? */
2828         if (datum == linitial_oid(list))
2829                 return list;                    /* duplicate, so don't insert */
2830         /* No, so find the entry it belongs after */
2831         prev = list_head(list);
2832         for (;;)
2833         {
2834                 ListCell   *curr = lnext(prev);
2835
2836                 if (curr == NULL || datum < lfirst_oid(curr))
2837                         break;                          /* it belongs after 'prev', before 'curr' */
2838
2839                 if (datum == lfirst_oid(curr))
2840                         return list;            /* duplicate, so don't insert */
2841
2842                 prev = curr;
2843         }
2844         /* Insert datum into list after 'prev' */
2845         lappend_cell_oid(list, prev, datum);
2846         return list;
2847 }