OSDN Git Service

Refactor broken CREATE TABLE IF NOT EXISTS support.
[pg-rex/syncrep.git] / src / backend / commands / cluster.c
1 /*-------------------------------------------------------------------------
2  *
3  * cluster.c
4  *        CLUSTER a table on an index.  This is now also used for VACUUM FULL.
5  *
6  * There is hardly anything left of Paul Brown's original implementation...
7  *
8  *
9  * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
10  * Portions Copyright (c) 1994-5, Regents of the University of California
11  *
12  *
13  * IDENTIFICATION
14  *        src/backend/commands/cluster.c
15  *
16  *-------------------------------------------------------------------------
17  */
18 #include "postgres.h"
19
20 #include "access/genam.h"
21 #include "access/heapam.h"
22 #include "access/relscan.h"
23 #include "access/rewriteheap.h"
24 #include "access/transam.h"
25 #include "access/xact.h"
26 #include "catalog/catalog.h"
27 #include "catalog/dependency.h"
28 #include "catalog/heap.h"
29 #include "catalog/index.h"
30 #include "catalog/indexing.h"
31 #include "catalog/namespace.h"
32 #include "catalog/pg_namespace.h"
33 #include "catalog/toasting.h"
34 #include "commands/cluster.h"
35 #include "commands/tablecmds.h"
36 #include "commands/trigger.h"
37 #include "commands/vacuum.h"
38 #include "miscadmin.h"
39 #include "optimizer/planner.h"
40 #include "storage/bufmgr.h"
41 #include "storage/procarray.h"
42 #include "storage/smgr.h"
43 #include "utils/acl.h"
44 #include "utils/fmgroids.h"
45 #include "utils/inval.h"
46 #include "utils/lsyscache.h"
47 #include "utils/memutils.h"
48 #include "utils/pg_rusage.h"
49 #include "utils/relcache.h"
50 #include "utils/relmapper.h"
51 #include "utils/snapmgr.h"
52 #include "utils/syscache.h"
53 #include "utils/tqual.h"
54 #include "utils/tuplesort.h"
55
56
57 /*
58  * This struct is used to pass around the information on tables to be
59  * clustered. We need this so we can make a list of them when invoked without
60  * a specific table/index pair.
61  */
62 typedef struct
63 {
64         Oid                     tableOid;
65         Oid                     indexOid;
66 } RelToCluster;
67
68
69 static void rebuild_relation(Relation OldHeap, Oid indexOid,
70                                  int freeze_min_age, int freeze_table_age, bool verbose);
71 static void copy_heap_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex,
72                            int freeze_min_age, int freeze_table_age, bool verbose,
73                            bool *pSwapToastByContent, TransactionId *pFreezeXid);
74 static List *get_tables_to_cluster(MemoryContext cluster_context);
75 static void reform_and_rewrite_tuple(HeapTuple tuple,
76                                                  TupleDesc oldTupDesc, TupleDesc newTupDesc,
77                                                  Datum *values, bool *isnull,
78                                                  bool newRelHasOids, RewriteState rwstate);
79
80
81 /*---------------------------------------------------------------------------
82  * This cluster code allows for clustering multiple tables at once. Because
83  * of this, we cannot just run everything on a single transaction, or we
84  * would be forced to acquire exclusive locks on all the tables being
85  * clustered, simultaneously --- very likely leading to deadlock.
86  *
87  * To solve this we follow a similar strategy to VACUUM code,
88  * clustering each relation in a separate transaction. For this to work,
89  * we need to:
90  *      - provide a separate memory context so that we can pass information in
91  *        a way that survives across transactions
92  *      - start a new transaction every time a new relation is clustered
93  *      - check for validity of the information on to-be-clustered relations,
94  *        as someone might have deleted a relation behind our back, or
95  *        clustered one on a different index
96  *      - end the transaction
97  *
98  * The single-relation case does not have any such overhead.
99  *
100  * We also allow a relation to be specified without index.      In that case,
101  * the indisclustered bit will be looked up, and an ERROR will be thrown
102  * if there is no index with the bit set.
103  *---------------------------------------------------------------------------
104  */
105 void
106 cluster(ClusterStmt *stmt, bool isTopLevel)
107 {
108         if (stmt->relation != NULL)
109         {
110                 /* This is the single-relation case. */
111                 Oid                     tableOid,
112                                         indexOid = InvalidOid;
113                 Relation        rel;
114
115                 /* Find and lock the table */
116                 rel = heap_openrv(stmt->relation, AccessExclusiveLock);
117
118                 tableOid = RelationGetRelid(rel);
119
120                 /* Check permissions */
121                 if (!pg_class_ownercheck(tableOid, GetUserId()))
122                         aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
123                                                    RelationGetRelationName(rel));
124
125                 /*
126                  * Reject clustering a remote temp table ... their local buffer
127                  * manager is not going to cope.
128                  */
129                 if (RELATION_IS_OTHER_TEMP(rel))
130                         ereport(ERROR,
131                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
132                            errmsg("cannot cluster temporary tables of other sessions")));
133
134                 if (stmt->indexname == NULL)
135                 {
136                         ListCell   *index;
137
138                         /* We need to find the index that has indisclustered set. */
139                         foreach(index, RelationGetIndexList(rel))
140                         {
141                                 HeapTuple       idxtuple;
142                                 Form_pg_index indexForm;
143
144                                 indexOid = lfirst_oid(index);
145                                 idxtuple = SearchSysCache1(INDEXRELID,
146                                                                                    ObjectIdGetDatum(indexOid));
147                                 if (!HeapTupleIsValid(idxtuple))
148                                         elog(ERROR, "cache lookup failed for index %u", indexOid);
149                                 indexForm = (Form_pg_index) GETSTRUCT(idxtuple);
150                                 if (indexForm->indisclustered)
151                                 {
152                                         ReleaseSysCache(idxtuple);
153                                         break;
154                                 }
155                                 ReleaseSysCache(idxtuple);
156                                 indexOid = InvalidOid;
157                         }
158
159                         if (!OidIsValid(indexOid))
160                                 ereport(ERROR,
161                                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
162                                                  errmsg("there is no previously clustered index for table \"%s\"",
163                                                                 stmt->relation->relname)));
164                 }
165                 else
166                 {
167                         /*
168                          * The index is expected to be in the same namespace as the
169                          * relation.
170                          */
171                         indexOid = get_relname_relid(stmt->indexname,
172                                                                                  rel->rd_rel->relnamespace);
173                         if (!OidIsValid(indexOid))
174                                 ereport(ERROR,
175                                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
176                                            errmsg("index \"%s\" for table \"%s\" does not exist",
177                                                           stmt->indexname, stmt->relation->relname)));
178                 }
179
180                 /* close relation, keep lock till commit */
181                 heap_close(rel, NoLock);
182
183                 /* Do the job */
184                 cluster_rel(tableOid, indexOid, false, stmt->verbose, -1, -1);
185         }
186         else
187         {
188                 /*
189                  * This is the "multi relation" case. We need to cluster all tables
190                  * that have some index with indisclustered set.
191                  */
192                 MemoryContext cluster_context;
193                 List       *rvs;
194                 ListCell   *rv;
195
196                 /*
197                  * We cannot run this form of CLUSTER inside a user transaction block;
198                  * we'd be holding locks way too long.
199                  */
200                 PreventTransactionChain(isTopLevel, "CLUSTER");
201
202                 /*
203                  * Create special memory context for cross-transaction storage.
204                  *
205                  * Since it is a child of PortalContext, it will go away even in case
206                  * of error.
207                  */
208                 cluster_context = AllocSetContextCreate(PortalContext,
209                                                                                                 "Cluster",
210                                                                                                 ALLOCSET_DEFAULT_MINSIZE,
211                                                                                                 ALLOCSET_DEFAULT_INITSIZE,
212                                                                                                 ALLOCSET_DEFAULT_MAXSIZE);
213
214                 /*
215                  * Build the list of relations to cluster.      Note that this lives in
216                  * cluster_context.
217                  */
218                 rvs = get_tables_to_cluster(cluster_context);
219
220                 /* Commit to get out of starting transaction */
221                 PopActiveSnapshot();
222                 CommitTransactionCommand();
223
224                 /* Ok, now that we've got them all, cluster them one by one */
225                 foreach(rv, rvs)
226                 {
227                         RelToCluster *rvtc = (RelToCluster *) lfirst(rv);
228
229                         /* Start a new transaction for each relation. */
230                         StartTransactionCommand();
231                         /* functions in indexes may want a snapshot set */
232                         PushActiveSnapshot(GetTransactionSnapshot());
233                         cluster_rel(rvtc->tableOid, rvtc->indexOid, true, stmt->verbose,
234                                                 -1, -1);
235                         PopActiveSnapshot();
236                         CommitTransactionCommand();
237                 }
238
239                 /* Start a new transaction for the cleanup work. */
240                 StartTransactionCommand();
241
242                 /* Clean up working storage */
243                 MemoryContextDelete(cluster_context);
244         }
245 }
246
247 /*
248  * cluster_rel
249  *
250  * This clusters the table by creating a new, clustered table and
251  * swapping the relfilenodes of the new table and the old table, so
252  * the OID of the original table is preserved.  Thus we do not lose
253  * GRANT, inheritance nor references to this table (this was a bug
254  * in releases thru 7.3).
255  *
256  * Indexes are rebuilt too, via REINDEX. Since we are effectively bulk-loading
257  * the new table, it's better to create the indexes afterwards than to fill
258  * them incrementally while we load the table.
259  *
260  * If indexOid is InvalidOid, the table will be rewritten in physical order
261  * instead of index order.      This is the new implementation of VACUUM FULL,
262  * and error messages should refer to the operation as VACUUM not CLUSTER.
263  */
264 void
265 cluster_rel(Oid tableOid, Oid indexOid, bool recheck, bool verbose,
266                         int freeze_min_age, int freeze_table_age)
267 {
268         Relation        OldHeap;
269
270         /* Check for user-requested abort. */
271         CHECK_FOR_INTERRUPTS();
272
273         /*
274          * We grab exclusive access to the target rel and index for the duration
275          * of the transaction.  (This is redundant for the single-transaction
276          * case, since cluster() already did it.)  The index lock is taken inside
277          * check_index_is_clusterable.
278          */
279         OldHeap = try_relation_open(tableOid, AccessExclusiveLock);
280
281         /* If the table has gone away, we can skip processing it */
282         if (!OldHeap)
283                 return;
284
285         /*
286          * Since we may open a new transaction for each relation, we have to check
287          * that the relation still is what we think it is.
288          *
289          * If this is a single-transaction CLUSTER, we can skip these tests. We
290          * *must* skip the one on indisclustered since it would reject an attempt
291          * to cluster a not-previously-clustered index.
292          */
293         if (recheck)
294         {
295                 HeapTuple       tuple;
296                 Form_pg_index indexForm;
297
298                 /* Check that the user still owns the relation */
299                 if (!pg_class_ownercheck(tableOid, GetUserId()))
300                 {
301                         relation_close(OldHeap, AccessExclusiveLock);
302                         return;
303                 }
304
305                 /*
306                  * Silently skip a temp table for a remote session.  Only doing this
307                  * check in the "recheck" case is appropriate (which currently means
308                  * somebody is executing a database-wide CLUSTER), because there is
309                  * another check in cluster() which will stop any attempt to cluster
310                  * remote temp tables by name.  There is another check in cluster_rel
311                  * which is redundant, but we leave it for extra safety.
312                  */
313                 if (RELATION_IS_OTHER_TEMP(OldHeap))
314                 {
315                         relation_close(OldHeap, AccessExclusiveLock);
316                         return;
317                 }
318
319                 if (OidIsValid(indexOid))
320                 {
321                         /*
322                          * Check that the index still exists
323                          */
324                         if (!SearchSysCacheExists1(RELOID, ObjectIdGetDatum(indexOid)))
325                         {
326                                 relation_close(OldHeap, AccessExclusiveLock);
327                                 return;
328                         }
329
330                         /*
331                          * Check that the index is still the one with indisclustered set.
332                          */
333                         tuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(indexOid));
334                         if (!HeapTupleIsValid(tuple))           /* probably can't happen */
335                         {
336                                 relation_close(OldHeap, AccessExclusiveLock);
337                                 return;
338                         }
339                         indexForm = (Form_pg_index) GETSTRUCT(tuple);
340                         if (!indexForm->indisclustered)
341                         {
342                                 ReleaseSysCache(tuple);
343                                 relation_close(OldHeap, AccessExclusiveLock);
344                                 return;
345                         }
346                         ReleaseSysCache(tuple);
347                 }
348         }
349
350         /*
351          * We allow VACUUM FULL, but not CLUSTER, on shared catalogs.  CLUSTER
352          * would work in most respects, but the index would only get marked as
353          * indisclustered in the current database, leading to unexpected behavior
354          * if CLUSTER were later invoked in another database.
355          */
356         if (OidIsValid(indexOid) && OldHeap->rd_rel->relisshared)
357                 ereport(ERROR,
358                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
359                                  errmsg("cannot cluster a shared catalog")));
360
361         /*
362          * Don't process temp tables of other backends ... their local buffer
363          * manager is not going to cope.
364          */
365         if (RELATION_IS_OTHER_TEMP(OldHeap))
366         {
367                 if (OidIsValid(indexOid))
368                         ereport(ERROR,
369                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
370                            errmsg("cannot cluster temporary tables of other sessions")));
371                 else
372                         ereport(ERROR,
373                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
374                                 errmsg("cannot vacuum temporary tables of other sessions")));
375         }
376
377         /*
378          * Also check for active uses of the relation in the current transaction,
379          * including open scans and pending AFTER trigger events.
380          */
381         CheckTableNotInUse(OldHeap, OidIsValid(indexOid) ? "CLUSTER" : "VACUUM");
382
383         /* Check heap and index are valid to cluster on */
384         if (OidIsValid(indexOid))
385                 check_index_is_clusterable(OldHeap, indexOid, recheck, AccessExclusiveLock);
386
387         /* rebuild_relation does all the dirty work */
388         rebuild_relation(OldHeap, indexOid, freeze_min_age, freeze_table_age,
389                                          verbose);
390
391         /* NB: rebuild_relation does heap_close() on OldHeap */
392 }
393
394 /*
395  * Verify that the specified heap and index are valid to cluster on
396  *
397  * Side effect: obtains exclusive lock on the index.  The caller should
398  * already have exclusive lock on the table, so the index lock is likely
399  * redundant, but it seems best to grab it anyway to ensure the index
400  * definition can't change under us.
401  */
402 void
403 check_index_is_clusterable(Relation OldHeap, Oid indexOid, bool recheck, LOCKMODE lockmode)
404 {
405         Relation        OldIndex;
406
407         OldIndex = index_open(indexOid, lockmode);
408
409         /*
410          * Check that index is in fact an index on the given relation
411          */
412         if (OldIndex->rd_index == NULL ||
413                 OldIndex->rd_index->indrelid != RelationGetRelid(OldHeap))
414                 ereport(ERROR,
415                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
416                                  errmsg("\"%s\" is not an index for table \"%s\"",
417                                                 RelationGetRelationName(OldIndex),
418                                                 RelationGetRelationName(OldHeap))));
419
420         /* Index AM must allow clustering */
421         if (!OldIndex->rd_am->amclusterable)
422                 ereport(ERROR,
423                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
424                                  errmsg("cannot cluster on index \"%s\" because access method does not support clustering",
425                                                 RelationGetRelationName(OldIndex))));
426
427         /*
428          * Disallow clustering on incomplete indexes (those that might not index
429          * every row of the relation).  We could relax this by making a separate
430          * seqscan pass over the table to copy the missing rows, but that seems
431          * expensive and tedious.
432          */
433         if (!heap_attisnull(OldIndex->rd_indextuple, Anum_pg_index_indpred))
434                 ereport(ERROR,
435                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
436                                  errmsg("cannot cluster on partial index \"%s\"",
437                                                 RelationGetRelationName(OldIndex))));
438
439         /*
440          * Disallow if index is left over from a failed CREATE INDEX CONCURRENTLY;
441          * it might well not contain entries for every heap row, or might not even
442          * be internally consistent.  (But note that we don't check indcheckxmin;
443          * the worst consequence of following broken HOT chains would be that we
444          * might put recently-dead tuples out-of-order in the new table, and there
445          * is little harm in that.)
446          */
447         if (!OldIndex->rd_index->indisvalid)
448                 ereport(ERROR,
449                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
450                                  errmsg("cannot cluster on invalid index \"%s\"",
451                                                 RelationGetRelationName(OldIndex))));
452
453         /* Drop relcache refcnt on OldIndex, but keep lock */
454         index_close(OldIndex, NoLock);
455 }
456
457 /*
458  * mark_index_clustered: mark the specified index as the one clustered on
459  *
460  * With indexOid == InvalidOid, will mark all indexes of rel not-clustered.
461  */
462 void
463 mark_index_clustered(Relation rel, Oid indexOid)
464 {
465         HeapTuple       indexTuple;
466         Form_pg_index indexForm;
467         Relation        pg_index;
468         ListCell   *index;
469
470         /*
471          * If the index is already marked clustered, no need to do anything.
472          */
473         if (OidIsValid(indexOid))
474         {
475                 indexTuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(indexOid));
476                 if (!HeapTupleIsValid(indexTuple))
477                         elog(ERROR, "cache lookup failed for index %u", indexOid);
478                 indexForm = (Form_pg_index) GETSTRUCT(indexTuple);
479
480                 if (indexForm->indisclustered)
481                 {
482                         ReleaseSysCache(indexTuple);
483                         return;
484                 }
485
486                 ReleaseSysCache(indexTuple);
487         }
488
489         /*
490          * Check each index of the relation and set/clear the bit as needed.
491          */
492         pg_index = heap_open(IndexRelationId, RowExclusiveLock);
493
494         foreach(index, RelationGetIndexList(rel))
495         {
496                 Oid                     thisIndexOid = lfirst_oid(index);
497
498                 indexTuple = SearchSysCacheCopy1(INDEXRELID,
499                                                                                  ObjectIdGetDatum(thisIndexOid));
500                 if (!HeapTupleIsValid(indexTuple))
501                         elog(ERROR, "cache lookup failed for index %u", thisIndexOid);
502                 indexForm = (Form_pg_index) GETSTRUCT(indexTuple);
503
504                 /*
505                  * Unset the bit if set.  We know it's wrong because we checked this
506                  * earlier.
507                  */
508                 if (indexForm->indisclustered)
509                 {
510                         indexForm->indisclustered = false;
511                         simple_heap_update(pg_index, &indexTuple->t_self, indexTuple);
512                         CatalogUpdateIndexes(pg_index, indexTuple);
513                 }
514                 else if (thisIndexOid == indexOid)
515                 {
516                         indexForm->indisclustered = true;
517                         simple_heap_update(pg_index, &indexTuple->t_self, indexTuple);
518                         CatalogUpdateIndexes(pg_index, indexTuple);
519                 }
520                 heap_freetuple(indexTuple);
521         }
522
523         heap_close(pg_index, RowExclusiveLock);
524 }
525
526 /*
527  * rebuild_relation: rebuild an existing relation in index or physical order
528  *
529  * OldHeap: table to rebuild --- must be opened and exclusive-locked!
530  * indexOid: index to cluster by, or InvalidOid to rewrite in physical order.
531  *
532  * NB: this routine closes OldHeap at the right time; caller should not.
533  */
534 static void
535 rebuild_relation(Relation OldHeap, Oid indexOid,
536                                  int freeze_min_age, int freeze_table_age, bool verbose)
537 {
538         Oid                     tableOid = RelationGetRelid(OldHeap);
539         Oid                     tableSpace = OldHeap->rd_rel->reltablespace;
540         Oid                     OIDNewHeap;
541         bool            is_system_catalog;
542         bool            swap_toast_by_content;
543         TransactionId frozenXid;
544
545         /* Mark the correct index as clustered */
546         if (OidIsValid(indexOid))
547                 mark_index_clustered(OldHeap, indexOid);
548
549         /* Remember if it's a system catalog */
550         is_system_catalog = IsSystemRelation(OldHeap);
551
552         /* Close relcache entry, but keep lock until transaction commit */
553         heap_close(OldHeap, NoLock);
554
555         /* Create the transient table that will receive the re-ordered data */
556         OIDNewHeap = make_new_heap(tableOid, tableSpace);
557
558         /* Copy the heap data into the new table in the desired order */
559         copy_heap_data(OIDNewHeap, tableOid, indexOid,
560                                    freeze_min_age, freeze_table_age, verbose,
561                                    &swap_toast_by_content, &frozenXid);
562
563         /*
564          * Swap the physical files of the target and transient tables, then
565          * rebuild the target's indexes and throw away the transient table.
566          */
567         finish_heap_swap(tableOid, OIDNewHeap, is_system_catalog,
568                                          swap_toast_by_content, false, frozenXid);
569 }
570
571
572 /*
573  * Create the transient table that will be filled with new data during
574  * CLUSTER, ALTER TABLE, and similar operations.  The transient table
575  * duplicates the logical structure of the OldHeap, but is placed in
576  * NewTableSpace which might be different from OldHeap's.
577  *
578  * After this, the caller should load the new heap with transferred/modified
579  * data, then call finish_heap_swap to complete the operation.
580  */
581 Oid
582 make_new_heap(Oid OIDOldHeap, Oid NewTableSpace)
583 {
584         TupleDesc       OldHeapDesc,
585                                 tupdesc;
586         char            NewHeapName[NAMEDATALEN];
587         Oid                     OIDNewHeap;
588         Oid                     toastid;
589         Relation        OldHeap;
590         HeapTuple       tuple;
591         Datum           reloptions;
592         bool            isNull;
593
594         OldHeap = heap_open(OIDOldHeap, AccessExclusiveLock);
595         OldHeapDesc = RelationGetDescr(OldHeap);
596
597         /*
598          * Need to make a copy of the tuple descriptor, since
599          * heap_create_with_catalog modifies it.  Note that the NewHeap will not
600          * receive any of the defaults or constraints associated with the OldHeap;
601          * we don't need 'em, and there's no reason to spend cycles inserting them
602          * into the catalogs only to delete them.
603          */
604         tupdesc = CreateTupleDescCopy(OldHeapDesc);
605
606         /*
607          * But we do want to use reloptions of the old heap for new heap.
608          */
609         tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(OIDOldHeap));
610         if (!HeapTupleIsValid(tuple))
611                 elog(ERROR, "cache lookup failed for relation %u", OIDOldHeap);
612         reloptions = SysCacheGetAttr(RELOID, tuple, Anum_pg_class_reloptions,
613                                                                  &isNull);
614         if (isNull)
615                 reloptions = (Datum) 0;
616
617         /*
618          * Create the new heap, using a temporary name in the same namespace as
619          * the existing table.  NOTE: there is some risk of collision with user
620          * relnames.  Working around this seems more trouble than it's worth; in
621          * particular, we can't create the new heap in a different namespace from
622          * the old, or we will have problems with the TEMP status of temp tables.
623          *
624          * Note: the new heap is not a shared relation, even if we are rebuilding
625          * a shared rel.  However, we do make the new heap mapped if the source is
626          * mapped.      This simplifies swap_relation_files, and is absolutely
627          * necessary for rebuilding pg_class, for reasons explained there.
628          */
629         snprintf(NewHeapName, sizeof(NewHeapName), "pg_temp_%u", OIDOldHeap);
630
631         OIDNewHeap = heap_create_with_catalog(NewHeapName,
632                                                                                   RelationGetNamespace(OldHeap),
633                                                                                   NewTableSpace,
634                                                                                   InvalidOid,
635                                                                                   InvalidOid,
636                                                                                   InvalidOid,
637                                                                                   OldHeap->rd_rel->relowner,
638                                                                                   tupdesc,
639                                                                                   NIL,
640                                                                                   OldHeap->rd_rel->relkind,
641                                                                                   OldHeap->rd_rel->relpersistence,
642                                                                                   false,
643                                                                                   RelationIsMapped(OldHeap),
644                                                                                   true,
645                                                                                   0,
646                                                                                   ONCOMMIT_NOOP,
647                                                                                   reloptions,
648                                                                                   false,
649                                                                                   true);
650         Assert(OIDNewHeap != InvalidOid);
651
652         ReleaseSysCache(tuple);
653
654         /*
655          * Advance command counter so that the newly-created relation's catalog
656          * tuples will be visible to heap_open.
657          */
658         CommandCounterIncrement();
659
660         /*
661          * If necessary, create a TOAST table for the new relation.
662          *
663          * If the relation doesn't have a TOAST table already, we can't need one
664          * for the new relation.  The other way around is possible though: if some
665          * wide columns have been dropped, AlterTableCreateToastTable can decide
666          * that no TOAST table is needed for the new table.
667          *
668          * Note that AlterTableCreateToastTable ends with CommandCounterIncrement,
669          * so that the TOAST table will be visible for insertion.
670          */
671         toastid = OldHeap->rd_rel->reltoastrelid;
672         if (OidIsValid(toastid))
673         {
674                 /* keep the existing toast table's reloptions, if any */
675                 tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(toastid));
676                 if (!HeapTupleIsValid(tuple))
677                         elog(ERROR, "cache lookup failed for relation %u", toastid);
678                 reloptions = SysCacheGetAttr(RELOID, tuple, Anum_pg_class_reloptions,
679                                                                          &isNull);
680                 if (isNull)
681                         reloptions = (Datum) 0;
682
683                 AlterTableCreateToastTable(OIDNewHeap, reloptions);
684
685                 ReleaseSysCache(tuple);
686         }
687
688         heap_close(OldHeap, NoLock);
689
690         return OIDNewHeap;
691 }
692
693 /*
694  * Do the physical copying of heap data.
     *
     * Every tuple of the old heap (OIDOldHeap) that is not definitely dead is
     * passed on to reform_and_rewrite_tuple together with the rewrite state
     * that was set up for the new heap (OIDNewHeap).  If OIDOldIndex is valid,
     * the old heap is read in that index's order, either by scanning the index
     * or by a sequential scan followed by a sort; otherwise the old heap is
     * simply scanned sequentially.
     *
     * freeze_min_age and freeze_table_age are handed to vacuum_set_xid_limits
     * to compute the xid cutoffs.  verbose selects INFO rather than DEBUG2 as
     * the level of the progress messages emitted here.
695  *
696  * There are two output parameters:
697  * *pSwapToastByContent is set true if toast tables must be swapped by content.
698  * *pFreezeXid receives the TransactionId used as freeze cutoff point.
699  */
700 static void
701 copy_heap_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex,
702                            int freeze_min_age, int freeze_table_age, bool verbose,
703                            bool *pSwapToastByContent, TransactionId *pFreezeXid)
704 {
705         Relation        NewHeap,
706                                 OldHeap,
707                                 OldIndex;
708         TupleDesc       oldTupDesc;
709         TupleDesc       newTupDesc;
710         int                     natts;
711         Datum      *values;
712         bool       *isnull;
713         IndexScanDesc indexScan;
714         HeapScanDesc heapScan;
715         bool            use_wal;
716         bool            is_system_catalog;
717         TransactionId OldestXmin;
718         TransactionId FreezeXid;
719         RewriteState rwstate;
720         bool            use_sort;
721         Tuplesortstate *tuplesort;
722         double          num_tuples = 0,         /* tuples that get copied */
723                                 tups_vacuumed = 0,      /* definitely-dead tuples skipped */
724                                 tups_recently_dead = 0; /* copied, but dead or being deleted */
725         int                     elevel = verbose ? INFO : DEBUG2;
726         PGRUsage        ru0;
727
            /* Start resource usage accounting; it is reported in the final message */
728         pg_rusage_init(&ru0);
729
730         /*
731          * Open the relations we need.  The index is opened only if the caller
             * supplied a valid OIDOldIndex; otherwise OldIndex stays NULL.
732          */
733         NewHeap = heap_open(OIDNewHeap, AccessExclusiveLock);
734         OldHeap = heap_open(OIDOldHeap, AccessExclusiveLock);
735         if (OidIsValid(OIDOldIndex))
736                 OldIndex = index_open(OIDOldIndex, AccessExclusiveLock);
737         else
738                 OldIndex = NULL;
739
740         /*
741          * Their tuple descriptors should be exactly alike, but here we only need
742          * assume that they have the same number of columns.
743          */
744         oldTupDesc = RelationGetDescr(OldHeap);
745         newTupDesc = RelationGetDescr(NewHeap);
746         Assert(newTupDesc->natts == oldTupDesc->natts);
747
748         /* Preallocate values/isnull arrays */
749         natts = newTupDesc->natts;
750         values = (Datum *) palloc(natts * sizeof(Datum));
751         isnull = (bool *) palloc(natts * sizeof(bool));
752
753         /*
754          * We need to log the copied data in WAL iff WAL archiving/streaming is
755          * enabled AND it's a WAL-logged rel.
756          */
757         use_wal = XLogIsNeeded() && RelationNeedsWAL(NewHeap);
758
759         /* use_wal off requires smgr_targblock be initially invalid */
760         Assert(RelationGetTargetBlock(NewHeap) == InvalidBlockNumber);
761
762         /*
763          * If both tables have TOAST tables, perform toast swap by content.  It is
764          * possible that the old table has a toast table but the new one doesn't,
765          * if toastable columns have been dropped.  In that case we have to do
766          * swap by links.  This is okay because swap by content is only essential
767          * for system catalogs, and we don't support schema changes for them.
768          */
769         if (OldHeap->rd_rel->reltoastrelid && NewHeap->rd_rel->reltoastrelid)
770         {
771                 *pSwapToastByContent = true;
772
773                 /*
774                  * When doing swap by content, any toast pointers written into NewHeap
775                  * must use the old toast table's OID, because that's where the toast
776                  * data will eventually be found.  Set this up by setting rd_toastoid.
777                  * Note that we must hold NewHeap open until we are done writing data,
778                  * since the relcache will not guarantee to remember this setting once
779                  * the relation is closed.  Also, this technique depends on the fact
780                  * that no one will try to read from the NewHeap until after we've
781                  * finished writing it and swapping the rels --- otherwise they could
782                  * follow the toast pointers to the wrong place.
783                  */
784                 NewHeap->rd_toastoid = OldHeap->rd_rel->reltoastrelid;
785         }
786         else
787                 *pSwapToastByContent = false;
788
789         /*
790          * compute xids used to freeze and weed out dead tuples.  The
791          * freeze_min_age and freeze_table_age settings come from our caller and
792          * are passed through to vacuum_set_xid_limits unchanged.
             *
             * NOTE(review): callers presumably pass -1 for freeze_min_age so that
             * CLUSTER does not freeze tuples earlier than a plain VACUUM would ---
             * confirm against the callers.
793          */
794         vacuum_set_xid_limits(freeze_min_age, freeze_table_age,
795                                                   OldHeap->rd_rel->relisshared,
796                                                   &OldestXmin, &FreezeXid, NULL);
797
798         /*
799          * FreezeXid will become the table's new relfrozenxid, and that mustn't go
800          * backwards, so take the max.
801          */
802         if (TransactionIdPrecedes(FreezeXid, OldHeap->rd_rel->relfrozenxid))
803                 FreezeXid = OldHeap->rd_rel->relfrozenxid;
804
805         /* return selected value to caller */
806         *pFreezeXid = FreezeXid;
807
808         /* Remember if it's a system catalog */
809         is_system_catalog = IsSystemRelation(OldHeap);
810
811         /* Initialize the rewrite operation */
812         rwstate = begin_heap_rewrite(NewHeap, OldestXmin, FreezeXid, use_wal);
813
814         /*
815          * Decide whether to use an indexscan or seqscan-and-optional-sort to scan
816          * the OldHeap.  We know how to use a sort to duplicate the ordering of a
817          * btree index, and will use seqscan-and-sort for that case if the planner
818          * tells us it's cheaper.  Otherwise, always indexscan if an index is
819          * provided, else plain seqscan.
820          */
821         if (OldIndex != NULL && OldIndex->rd_rel->relam == BTREE_AM_OID)
822                 use_sort = plan_cluster_use_sort(OIDOldHeap, OIDOldIndex);
823         else
824                 use_sort = false;
825
826         /* Set up sorting if wanted */
827         if (use_sort)
828                 tuplesort = tuplesort_begin_cluster(oldTupDesc, OldIndex,
829                                                                                         maintenance_work_mem, false);
830         else
831                 tuplesort = NULL;
832
833         /*
834          * Prepare to scan the OldHeap.  To ensure we see recently-dead tuples
835          * that still need to be copied, we scan with SnapshotAny and use
836          * HeapTupleSatisfiesVacuum for the visibility test.
             *
             * Exactly one of indexScan and heapScan ends up non-NULL: the index scan
             * is used only when an index was given and we are not sorting.  The
             * index scan is started with zero scan keys.
837          */
838         if (OldIndex != NULL && !use_sort)
839         {
840                 heapScan = NULL;
841                 indexScan = index_beginscan(OldHeap, OldIndex, SnapshotAny, 0, 0);
842                 index_rescan(indexScan, NULL, 0, NULL, 0);
843         }
844         else
845         {
846                 heapScan = heap_beginscan(OldHeap, SnapshotAny, 0, (ScanKey) NULL);
847                 indexScan = NULL;
848         }
849
850         /* Log what we're doing (at INFO if verbose, else DEBUG2) */
851         if (indexScan != NULL)
852                 ereport(elevel,
853                                 (errmsg("clustering \"%s.%s\" using index scan on \"%s\"",
854                                                 get_namespace_name(RelationGetNamespace(OldHeap)),
855                                                 RelationGetRelationName(OldHeap),
856                                                 RelationGetRelationName(OldIndex))));
857         else if (tuplesort != NULL)
858                 ereport(elevel,
859                                 (errmsg("clustering \"%s.%s\" using sequential scan and sort",
860                                                 get_namespace_name(RelationGetNamespace(OldHeap)),
861                                                 RelationGetRelationName(OldHeap))));
862         else
863                 ereport(elevel,
864                                 (errmsg("vacuuming \"%s.%s\"",
865                                                 get_namespace_name(RelationGetNamespace(OldHeap)),
866                                                 RelationGetRelationName(OldHeap))));
867
868         /*
869          * Scan through the OldHeap, either in OldIndex order or sequentially;
870          * copy each tuple into the NewHeap, or transiently to the tuplesort
871          * module.  Note that we don't bother sorting dead tuples (they won't get
872          * to the new table anyway).
873          */
874         for (;;)
875         {
876                 HeapTuple       tuple;
877                 Buffer          buf;
878                 bool            isdead;
879
880                 CHECK_FOR_INTERRUPTS();
881
                    /* Fetch the next tuple and remember the buffer the scan has it in */
882                 if (indexScan != NULL)
883                 {
884                         tuple = index_getnext(indexScan, ForwardScanDirection);
885                         if (tuple == NULL)
886                                 break;
887
888                         /* Since we used no scan keys, should never need to recheck */
889                         if (indexScan->xs_recheck)
890                                 elog(ERROR, "CLUSTER does not support lossy index conditions");
891
892                         buf = indexScan->xs_cbuf;
893                 }
894                 else
895                 {
896                         tuple = heap_getnext(heapScan, ForwardScanDirection);
897                         if (tuple == NULL)
898                                 break;
899
900                         buf = heapScan->rs_cbuf;
901                 }
902
                    /*
                     * The buffer is share-locked only for the duration of the visibility
                     * test; it is unlocked again right after the switch.
                     */
903                 LockBuffer(buf, BUFFER_LOCK_SHARE);
904
905                 switch (HeapTupleSatisfiesVacuum(tuple->t_data, OldestXmin, buf))
906                 {
907                         case HEAPTUPLE_DEAD:
908                                 /* Definitely dead */
909                                 isdead = true;
910                                 break;
911                         case HEAPTUPLE_RECENTLY_DEAD:
912                                 tups_recently_dead += 1;
913                                 /* fall through */
914                         case HEAPTUPLE_LIVE:
915                                 /* Live or recently dead, must copy it */
916                                 isdead = false;
917                                 break;
918                         case HEAPTUPLE_INSERT_IN_PROGRESS:
919
920                                 /*
921                                  * Since we hold exclusive lock on the relation, normally the
922                                  * only way to see this is if it was inserted earlier in our
923                                  * own transaction.  However, it can happen in system
924                                  * catalogs, since we tend to release write lock before commit
925                                  * there.  Give a warning if neither case applies; but in any
926                                  * case we had better copy it.
927                                  */
928                                 if (!is_system_catalog &&
929                                         !TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetXmin(tuple->t_data)))
930                                         elog(WARNING, "concurrent insert in progress within table \"%s\"",
931                                                  RelationGetRelationName(OldHeap));
932                                 /* treat as live */
933                                 isdead = false;
934                                 break;
935                         case HEAPTUPLE_DELETE_IN_PROGRESS:
936
937                                 /*
938                                  * Similar situation to INSERT_IN_PROGRESS case.
939                                  */
940                                 Assert(!(tuple->t_data->t_infomask & HEAP_XMAX_IS_MULTI));
941                                 if (!is_system_catalog &&
942                                         !TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetXmax(tuple->t_data)))
943                                         elog(WARNING, "concurrent delete in progress within table \"%s\"",
944                                                  RelationGetRelationName(OldHeap));
945                                 /* treat as recently dead */
946                                 tups_recently_dead += 1;
947                                 isdead = false;
948                                 break;
949                         default:
950                                 elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result");
951                                 isdead = false; /* keep compiler quiet */
952                                 break;
953                 }
954
955                 LockBuffer(buf, BUFFER_LOCK_UNLOCK);
956
                    /* Definitely-dead tuples are counted and skipped, never copied */
957                 if (isdead)
958                 {
959                         tups_vacuumed += 1;
960                         /* heap rewrite module still needs to see it... */
961                         if (rewrite_heap_dead_tuple(rwstate, tuple))
962                         {
963                                 /* A previous recently-dead tuple is now known dead */
964                                 tups_vacuumed += 1;
965                                 tups_recently_dead -= 1;
966                         }
967                         continue;
968                 }
969
                    /*
                     * Everything else is copied: straight into the new heap, or into the
                     * tuplesort first when we are in scan-and-sort mode.
                     */
970                 num_tuples += 1;
971                 if (tuplesort != NULL)
972                         tuplesort_putheaptuple(tuplesort, tuple);
973                 else
974                         reform_and_rewrite_tuple(tuple,
975                                                                          oldTupDesc, newTupDesc,
976                                                                          values, isnull,
977                                                                          NewHeap->rd_rel->relhasoids, rwstate);
978         }
979
            /* End whichever scan was started above */
980         if (indexScan != NULL)
981                 index_endscan(indexScan);
982         if (heapScan != NULL)
983                 heap_endscan(heapScan);
984
985         /*
986          * In scan-and-sort mode, complete the sort, then read out all live tuples
987          * from the tuplesort and write them to the new relation.
988          */
989         if (tuplesort != NULL)
990         {
991                 tuplesort_performsort(tuplesort);
992
993                 for (;;)
994                 {
995                         HeapTuple       tuple;
996                         bool            shouldfree;
997
998                         CHECK_FOR_INTERRUPTS();
999
1000                         tuple = tuplesort_getheaptuple(tuplesort, true, &shouldfree);
1001                         if (tuple == NULL)
1002                                 break;
1003
1004                         reform_and_rewrite_tuple(tuple,
1005                                                                          oldTupDesc, newTupDesc,
1006                                                                          values, isnull,
1007                                                                          NewHeap->rd_rel->relhasoids, rwstate);
1008
                             /* tuplesort tells us whether the returned tuple is ours to free */
1009                         if (shouldfree)
1010                                 heap_freetuple(tuple);
1011                 }
1012
1013                 tuplesort_end(tuplesort);
1014         }
1015
1016         /* Write out any remaining tuples, and fsync if needed */
1017         end_heap_rewrite(rwstate);
1018
1019         /* Reset rd_toastoid just to be tidy --- it shouldn't be looked at again */
1020         NewHeap->rd_toastoid = InvalidOid;
1021
1022         /*
              * Log what we did: skipped (removable) and copied (nonremovable) tuple
              * counts, the old heap's size in pages, and resource usage since entry.
              */
1023         ereport(elevel,
1024                         (errmsg("\"%s\": found %.0f removable, %.0f nonremovable row versions in %u pages",
1025                                         RelationGetRelationName(OldHeap),
1026                                         tups_vacuumed, num_tuples,
1027                                         RelationGetNumberOfBlocks(OldHeap)),
1028                          errdetail("%.0f dead row versions cannot be removed yet.\n"
1029                                            "%s.",
1030                                            tups_recently_dead,
1031                                            pg_rusage_show(&ru0))));
1032
1033         /* Clean up */
1034         pfree(values);
1035         pfree(isnull);
1036
             /* Close with NoLock, so the locks taken at open time are not released here */
1037         if (OldIndex != NULL)
1038                 index_close(OldIndex, NoLock);
1039         heap_close(OldHeap, NoLock);
1040         heap_close(NewHeap, NoLock);
1041 }
1042
1043 /*
1044  * Swap the physical files of two given relations.
1045  *
1046  * We swap the physical identity (reltablespace and relfilenode) while
1047  * keeping the same logical identities of the two relations.
1048  *
1049  * We can swap associated TOAST data in either of two ways: recursively swap
1050  * the physical content of the toast tables (and their indexes), or swap the
1051  * TOAST links in the given relations' pg_class entries.  The former is needed
1052  * to manage rewrites of shared catalogs (where we cannot change the pg_class
1053  * links) while the latter is the only way to handle cases in which a toast
1054  * table is added or removed altogether.
1055  *
1056  * Additionally, the first relation is marked with relfrozenxid set to
1057  * frozenXid.  It seems a bit ugly to have this here, but the caller would
1058  * have to do it anyway, so having it here saves a heap_update.  Note: in
1059  * the swap-toast-links case, we assume we don't need to change the toast
1060  * table's relfrozenxid: the new version of the toast table should already
1061  * have relfrozenxid set to RecentXmin, which is good enough.
1062  *
1063  * Lastly, if r2 and its toast table and toast index (if any) are mapped,
1064  * their OIDs are emitted into mapped_tables[].  This is hacky but beats
1065  * having to look the information up again later in finish_heap_swap.
1066  */
1067 static void
1068 swap_relation_files(Oid r1, Oid r2, bool target_is_pg_class,
1069                                         bool swap_toast_by_content,
1070                                         TransactionId frozenXid,
1071                                         Oid *mapped_tables)
1072 {
1073         Relation        relRelation;
1074         HeapTuple       reltup1,
1075                                 reltup2;
1076         Form_pg_class relform1,
1077                                 relform2;
1078         Oid                     relfilenode1,
1079                                 relfilenode2;
1080         Oid                     swaptemp;
1081         CatalogIndexState indstate;
1082
1083         /* We need writable copies of both pg_class tuples. */
1084         relRelation = heap_open(RelationRelationId, RowExclusiveLock);
1085
1086         reltup1 = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(r1));
1087         if (!HeapTupleIsValid(reltup1))
1088                 elog(ERROR, "cache lookup failed for relation %u", r1);
1089         relform1 = (Form_pg_class) GETSTRUCT(reltup1);
1090
1091         reltup2 = SearchSysCacheCopy1(RELOID, ObjectIdGetDatum(r2));
1092         if (!HeapTupleIsValid(reltup2))
1093                 elog(ERROR, "cache lookup failed for relation %u", r2);
1094         relform2 = (Form_pg_class) GETSTRUCT(reltup2);
1095
1096         relfilenode1 = relform1->relfilenode;
1097         relfilenode2 = relform2->relfilenode;
1098
1099         if (OidIsValid(relfilenode1) && OidIsValid(relfilenode2))
1100         {
1101                 /* Normal non-mapped relations: swap relfilenodes and reltablespaces */
1102                 Assert(!target_is_pg_class);
1103
1104                 swaptemp = relform1->relfilenode;
1105                 relform1->relfilenode = relform2->relfilenode;
1106                 relform2->relfilenode = swaptemp;
1107
1108                 swaptemp = relform1->reltablespace;
1109                 relform1->reltablespace = relform2->reltablespace;
1110                 relform2->reltablespace = swaptemp;
1111
1112                 /* Also swap toast links, if we're swapping by links */
1113                 if (!swap_toast_by_content)
1114                 {
1115                         swaptemp = relform1->reltoastrelid;
1116                         relform1->reltoastrelid = relform2->reltoastrelid;
1117                         relform2->reltoastrelid = swaptemp;
1118
1119                         /* we should NOT swap reltoastidxid */
1120                 }
1121         }
1122         else
1123         {
1124                 /*
1125                  * Mapped-relation case.  Here we have to swap the relation mappings
1126                  * instead of modifying the pg_class columns.  Both must be mapped.
1127                  */
1128                 if (OidIsValid(relfilenode1) || OidIsValid(relfilenode2))
1129                         elog(ERROR, "cannot swap mapped relation \"%s\" with non-mapped relation",
1130                                  NameStr(relform1->relname));
1131
1132                 /*
1133                  * We can't change the tablespace of a mapped rel, and we can't handle
1134                  * toast link swapping for one either, because we must not apply any
1135                  * critical changes to its pg_class row.  These cases should be
1136                  * prevented by upstream permissions tests, so this check is a
1137                  * non-user-facing emergency backstop.
1138                  */
1139                 if (relform1->reltablespace != relform2->reltablespace)
1140                         elog(ERROR, "cannot change tablespace of mapped relation \"%s\"",
1141                                  NameStr(relform1->relname));
1142                 if (!swap_toast_by_content &&
1143                         (relform1->reltoastrelid || relform2->reltoastrelid))
1144                         elog(ERROR, "cannot swap toast by links for mapped relation \"%s\"",
1145                                  NameStr(relform1->relname));
1146
1147                 /*
1148                  * Fetch the mappings --- shouldn't fail, but be paranoid
1149                  */
1150                 relfilenode1 = RelationMapOidToFilenode(r1, relform1->relisshared);
1151                 if (!OidIsValid(relfilenode1))
1152                         elog(ERROR, "could not find relation mapping for relation \"%s\", OID %u",
1153                                  NameStr(relform1->relname), r1);
1154                 relfilenode2 = RelationMapOidToFilenode(r2, relform2->relisshared);
1155                 if (!OidIsValid(relfilenode2))
1156                         elog(ERROR, "could not find relation mapping for relation \"%s\", OID %u",
1157                                  NameStr(relform2->relname), r2);
1158
1159                 /*
1160                  * Send replacement mappings to relmapper.      Note these won't actually
1161                  * take effect until CommandCounterIncrement.
1162                  */
1163                 RelationMapUpdateMap(r1, relfilenode2, relform1->relisshared, false);
1164                 RelationMapUpdateMap(r2, relfilenode1, relform2->relisshared, false);
1165
1166                 /* Pass OIDs of mapped r2 tables back to caller */
1167                 *mapped_tables++ = r2;
1168         }
1169
1170         /*
1171          * In the case of a shared catalog, these next few steps will only affect
1172          * our own database's pg_class row; but that's okay, because they are all
1173          * noncritical updates.  That's also an important fact for the case of a
1174          * mapped catalog, because it's possible that we'll commit the map change
1175          * and then fail to commit the pg_class update.
1176          */
1177
1178         /* set rel1's frozen Xid */
1179         if (relform1->relkind != RELKIND_INDEX)
1180         {
1181                 Assert(TransactionIdIsNormal(frozenXid));
1182                 relform1->relfrozenxid = frozenXid;
1183         }
1184
1185         /* swap size statistics too, since new rel has freshly-updated stats */
1186         {
1187                 int4            swap_pages;
1188                 float4          swap_tuples;
1189
1190                 swap_pages = relform1->relpages;
1191                 relform1->relpages = relform2->relpages;
1192                 relform2->relpages = swap_pages;
1193
1194                 swap_tuples = relform1->reltuples;
1195                 relform1->reltuples = relform2->reltuples;
1196                 relform2->reltuples = swap_tuples;
1197         }
1198
1199         /*
1200          * Update the tuples in pg_class --- unless the target relation of the
1201          * swap is pg_class itself.  In that case, there is zero point in making
1202          * changes because we'd be updating the old data that we're about to throw
1203          * away.  Because the real work being done here for a mapped relation is
1204          * just to change the relation map settings, it's all right to not update
1205          * the pg_class rows in this case.
1206          */
1207         if (!target_is_pg_class)
1208         {
1209                 simple_heap_update(relRelation, &reltup1->t_self, reltup1);
1210                 simple_heap_update(relRelation, &reltup2->t_self, reltup2);
1211
1212                 /* Keep system catalogs current */
1213                 indstate = CatalogOpenIndexes(relRelation);
1214                 CatalogIndexInsert(indstate, reltup1);
1215                 CatalogIndexInsert(indstate, reltup2);
1216                 CatalogCloseIndexes(indstate);
1217         }
1218         else
1219         {
1220                 /* no update ... but we do still need relcache inval */
1221                 CacheInvalidateRelcacheByTuple(reltup1);
1222                 CacheInvalidateRelcacheByTuple(reltup2);
1223         }
1224
1225         /*
1226          * If we have toast tables associated with the relations being swapped,
1227          * deal with them too.
1228          */
1229         if (relform1->reltoastrelid || relform2->reltoastrelid)
1230         {
1231                 if (swap_toast_by_content)
1232                 {
1233                         if (relform1->reltoastrelid && relform2->reltoastrelid)
1234                         {
1235                                 /* Recursively swap the contents of the toast tables */
1236                                 swap_relation_files(relform1->reltoastrelid,
1237                                                                         relform2->reltoastrelid,
1238                                                                         target_is_pg_class,
1239                                                                         swap_toast_by_content,
1240                                                                         frozenXid,
1241                                                                         mapped_tables);
1242                         }
1243                         else
1244                         {
1245                                 /* caller messed up */
1246                                 elog(ERROR, "cannot swap toast files by content when there's only one");
1247                         }
1248                 }
1249                 else
1250                 {
1251                         /*
1252                          * We swapped the ownership links, so we need to change dependency
1253                          * data to match.
1254                          *
1255                          * NOTE: it is possible that only one table has a toast table.
1256                          *
1257                          * NOTE: at present, a TOAST table's only dependency is the one on
1258                          * its owning table.  If more are ever created, we'd need to use
1259                          * something more selective than deleteDependencyRecordsFor() to
1260                          * get rid of just the link we want.
1261                          */
1262                         ObjectAddress baseobject,
1263                                                 toastobject;
1264                         long            count;
1265
1266                         /*
1267                          * We disallow this case for system catalogs, to avoid the
1268                          * possibility that the catalog we're rebuilding is one of the
1269                          * ones the dependency changes would change.  It's too late to be
1270                          * making any data changes to the target catalog.
1271                          */
1272                         if (IsSystemClass(relform1))
1273                                 elog(ERROR, "cannot swap toast files by links for system catalogs");
1274
1275                         /* Delete old dependencies */
1276                         if (relform1->reltoastrelid)
1277                         {
1278                                 count = deleteDependencyRecordsFor(RelationRelationId,
1279                                                                                                    relform1->reltoastrelid,
1280                                                                                                    false);
1281                                 if (count != 1)
1282                                         elog(ERROR, "expected one dependency record for TOAST table, found %ld",
1283                                                  count);
1284                         }
1285                         if (relform2->reltoastrelid)
1286                         {
1287                                 count = deleteDependencyRecordsFor(RelationRelationId,
1288                                                                                                    relform2->reltoastrelid,
1289                                                                                                    false);
1290                                 if (count != 1)
1291                                         elog(ERROR, "expected one dependency record for TOAST table, found %ld",
1292                                                  count);
1293                         }
1294
1295                         /* Register new dependencies */
1296                         baseobject.classId = RelationRelationId;
1297                         baseobject.objectSubId = 0;
1298                         toastobject.classId = RelationRelationId;
1299                         toastobject.objectSubId = 0;
1300
1301                         if (relform1->reltoastrelid)
1302                         {
1303                                 baseobject.objectId = r1;
1304                                 toastobject.objectId = relform1->reltoastrelid;
1305                                 recordDependencyOn(&toastobject, &baseobject,
1306                                                                    DEPENDENCY_INTERNAL);
1307                         }
1308
1309                         if (relform2->reltoastrelid)
1310                         {
1311                                 baseobject.objectId = r2;
1312                                 toastobject.objectId = relform2->reltoastrelid;
1313                                 recordDependencyOn(&toastobject, &baseobject,
1314                                                                    DEPENDENCY_INTERNAL);
1315                         }
1316                 }
1317         }
1318
1319         /*
1320          * If we're swapping two toast tables by content, do the same for their
1321          * indexes.
1322          */
1323         if (swap_toast_by_content &&
1324                 relform1->reltoastidxid && relform2->reltoastidxid)
1325                 swap_relation_files(relform1->reltoastidxid,
1326                                                         relform2->reltoastidxid,
1327                                                         target_is_pg_class,
1328                                                         swap_toast_by_content,
1329                                                         InvalidTransactionId,
1330                                                         mapped_tables);
1331
1332         /* Clean up. */
1333         heap_freetuple(reltup1);
1334         heap_freetuple(reltup2);
1335
1336         heap_close(relRelation, RowExclusiveLock);
1337
1338         /*
1339          * Close both relcache entries' smgr links.  We need this kluge because
1340          * both links will be invalidated during upcoming CommandCounterIncrement.
1341          * Whichever of the rels is the second to be cleared will have a dangling
1342          * reference to the other's smgr entry.  Rather than trying to avoid this
1343          * by ordering operations just so, it's easiest to close the links first.
1344          * (Fortunately, since one of the entries is local in our transaction,
1345          * it's sufficient to clear out our own relcache this way; the problem
1346          * cannot arise for other backends when they see our update on the
1347          * non-transient relation.)
1348          *
1349          * Caution: the placement of this step interacts with the decision to
1350          * handle toast rels by recursion.      When we are trying to rebuild pg_class
1351          * itself, the smgr close on pg_class must happen after all accesses in
1352          * this function.
1353          */
1354         RelationCloseSmgrByOid(r1);
1355         RelationCloseSmgrByOid(r2);
1356 }
1357
1358 /*
1359  * Remove the transient table that was built by make_new_heap, and finish
1360  * cleaning up (including rebuilding all indexes on the old heap).
1361  */
1362 void
1363 finish_heap_swap(Oid OIDOldHeap, Oid OIDNewHeap,
1364                                  bool is_system_catalog,
1365                                  bool swap_toast_by_content,
1366                                  bool check_constraints,
1367                                  TransactionId frozenXid)
1368 {
1369         ObjectAddress object;
1370         Oid                     mapped_tables[4];
1371         int                     reindex_flags;
1372         int                     i;
1373
1374         /* Zero out possible results from swapped_relation_files */
1375         memset(mapped_tables, 0, sizeof(mapped_tables));
1376
1377         /*
1378          * Swap the contents of the heap relations (including any toast tables).
1379          * Also set old heap's relfrozenxid to frozenXid.
1380          */
1381         swap_relation_files(OIDOldHeap, OIDNewHeap,
1382                                                 (OIDOldHeap == RelationRelationId),
1383                                                 swap_toast_by_content, frozenXid, mapped_tables);
1384
1385         /*
1386          * If it's a system catalog, queue an sinval message to flush all
1387          * catcaches on the catalog when we reach CommandCounterIncrement.
1388          */
1389         if (is_system_catalog)
1390                 CacheInvalidateCatalog(OIDOldHeap);
1391
1392         /*
1393          * Rebuild each index on the relation (but not the toast table, which is
1394          * all-new at this point).      It is important to do this before the DROP
1395          * step because if we are processing a system catalog that will be used
1396          * during DROP, we want to have its indexes available.  There is no
1397          * advantage to the other order anyway because this is all transactional,
1398          * so no chance to reclaim disk space before commit.  We do not need a
1399          * final CommandCounterIncrement() because reindex_relation does it.
1400          *
1401          * Note: because index_build is called via reindex_relation, it will never
1402          * set indcheckxmin true for the indexes.  This is OK even though in some
1403          * sense we are building new indexes rather than rebuilding existing ones,
1404          * because the new heap won't contain any HOT chains at all, let alone
1405          * broken ones, so it can't be necessary to set indcheckxmin.
1406          */
1407         reindex_flags = REINDEX_REL_SUPPRESS_INDEX_USE;
1408         if (check_constraints)
1409                 reindex_flags |= REINDEX_REL_CHECK_CONSTRAINTS;
1410         reindex_relation(OIDOldHeap, reindex_flags);
1411
1412         /* Destroy new heap with old filenode */
1413         object.classId = RelationRelationId;
1414         object.objectId = OIDNewHeap;
1415         object.objectSubId = 0;
1416
1417         /*
1418          * The new relation is local to our transaction and we know nothing
1419          * depends on it, so DROP_RESTRICT should be OK.
1420          */
1421         performDeletion(&object, DROP_RESTRICT);
1422
1423         /* performDeletion does CommandCounterIncrement at end */
1424
1425         /*
1426          * Now we must remove any relation mapping entries that we set up for the
1427          * transient table, as well as its toast table and toast index if any. If
1428          * we fail to do this before commit, the relmapper will complain about new
1429          * permanent map entries being added post-bootstrap.
1430          */
1431         for (i = 0; OidIsValid(mapped_tables[i]); i++)
1432                 RelationMapRemoveMapping(mapped_tables[i]);
1433
1434         /*
1435          * At this point, everything is kosher except that, if we did toast swap
1436          * by links, the toast table's name corresponds to the transient table.
1437          * The name is irrelevant to the backend because it's referenced by OID,
1438          * but users looking at the catalogs could be confused.  Rename it to
1439          * prevent this problem.
1440          *
1441          * Note no lock required on the relation, because we already hold an
1442          * exclusive lock on it.
1443          */
1444         if (!swap_toast_by_content)
1445         {
1446                 Relation        newrel;
1447
1448                 newrel = heap_open(OIDOldHeap, NoLock);
1449                 if (OidIsValid(newrel->rd_rel->reltoastrelid))
1450                 {
1451                         Relation        toastrel;
1452                         Oid                     toastidx;
1453                         Oid                     toastnamespace;
1454                         char            NewToastName[NAMEDATALEN];
1455
1456                         toastrel = relation_open(newrel->rd_rel->reltoastrelid,
1457                                                                          AccessShareLock);
1458                         toastidx = toastrel->rd_rel->reltoastidxid;
1459                         toastnamespace = toastrel->rd_rel->relnamespace;
1460                         relation_close(toastrel, AccessShareLock);
1461
1462                         /* rename the toast table ... */
1463                         snprintf(NewToastName, NAMEDATALEN, "pg_toast_%u",
1464                                          OIDOldHeap);
1465                         RenameRelationInternal(newrel->rd_rel->reltoastrelid,
1466                                                                    NewToastName,
1467                                                                    toastnamespace);
1468
1469                         /* ... and its index too */
1470                         snprintf(NewToastName, NAMEDATALEN, "pg_toast_%u_index",
1471                                          OIDOldHeap);
1472                         RenameRelationInternal(toastidx,
1473                                                                    NewToastName,
1474                                                                    toastnamespace);
1475                 }
1476                 relation_close(newrel, NoLock);
1477         }
1478 }
1479
1480
1481 /*
1482  * Get a list of tables that the current user owns and
1483  * have indisclustered set.  Return the list in a List * of rvsToCluster
1484  * with the tableOid and the indexOid on which the table is already
1485  * clustered.
1486  */
1487 static List *
1488 get_tables_to_cluster(MemoryContext cluster_context)
1489 {
1490         Relation        indRelation;
1491         HeapScanDesc scan;
1492         ScanKeyData entry;
1493         HeapTuple       indexTuple;
1494         Form_pg_index index;
1495         MemoryContext old_context;
1496         RelToCluster *rvtc;
1497         List       *rvs = NIL;
1498
1499         /*
1500          * Get all indexes that have indisclustered set and are owned by
1501          * appropriate user. System relations or nailed-in relations cannot ever
1502          * have indisclustered set, because CLUSTER will refuse to set it when
1503          * called with one of them as argument.
1504          */
1505         indRelation = heap_open(IndexRelationId, AccessShareLock);
1506         ScanKeyInit(&entry,
1507                                 Anum_pg_index_indisclustered,
1508                                 BTEqualStrategyNumber, F_BOOLEQ,
1509                                 BoolGetDatum(true));
1510         scan = heap_beginscan(indRelation, SnapshotNow, 1, &entry);
1511         while ((indexTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
1512         {
1513                 index = (Form_pg_index) GETSTRUCT(indexTuple);
1514
1515                 if (!pg_class_ownercheck(index->indrelid, GetUserId()))
1516                         continue;
1517
1518                 /*
1519                  * We have to build the list in a different memory context so it will
1520                  * survive the cross-transaction processing
1521                  */
1522                 old_context = MemoryContextSwitchTo(cluster_context);
1523
1524                 rvtc = (RelToCluster *) palloc(sizeof(RelToCluster));
1525                 rvtc->tableOid = index->indrelid;
1526                 rvtc->indexOid = index->indexrelid;
1527                 rvs = lcons(rvtc, rvs);
1528
1529                 MemoryContextSwitchTo(old_context);
1530         }
1531         heap_endscan(scan);
1532
1533         relation_close(indRelation, AccessShareLock);
1534
1535         return rvs;
1536 }
1537
1538
1539 /*
1540  * Reconstruct and rewrite the given tuple
1541  *
1542  * We cannot simply copy the tuple as-is, for several reasons:
1543  *
1544  * 1. We'd like to squeeze out the values of any dropped columns, both
1545  * to save space and to ensure we have no corner-case failures. (It's
1546  * possible for example that the new table hasn't got a TOAST table
1547  * and so is unable to store any large values of dropped cols.)
1548  *
1549  * 2. The tuple might not even be legal for the new table; this is
1550  * currently only known to happen as an after-effect of ALTER TABLE
1551  * SET WITHOUT OIDS.
1552  *
1553  * So, we must reconstruct the tuple from component Datums.
1554  */
1555 static void
1556 reform_and_rewrite_tuple(HeapTuple tuple,
1557                                                  TupleDesc oldTupDesc, TupleDesc newTupDesc,
1558                                                  Datum *values, bool *isnull,
1559                                                  bool newRelHasOids, RewriteState rwstate)
1560 {
1561         HeapTuple       copiedTuple;
1562         int                     i;
1563
1564         heap_deform_tuple(tuple, oldTupDesc, values, isnull);
1565
1566         /* Be sure to null out any dropped columns */
1567         for (i = 0; i < newTupDesc->natts; i++)
1568         {
1569                 if (newTupDesc->attrs[i]->attisdropped)
1570                         isnull[i] = true;
1571         }
1572
1573         copiedTuple = heap_form_tuple(newTupDesc, values, isnull);
1574
1575         /* Preserve OID, if any */
1576         if (newRelHasOids)
1577                 HeapTupleSetOid(copiedTuple, HeapTupleGetOid(tuple));
1578
1579         /* The heap rewrite module does the rest */
1580         rewrite_heap_tuple(rwstate, tuple, copiedTuple);
1581
1582         heap_freetuple(copiedTuple);
1583 }