OSDN Git Service

Improve our #include situation by moving pointer types away from the
[pg-rex/syncrep.git] / src / backend / commands / cluster.c
1 /*-------------------------------------------------------------------------
2  *
3  * cluster.c
4  *        CLUSTER a table on an index.
5  *
6  * There is hardly anything left of Paul Brown's original implementation...
7  *
8  *
9  * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
10  * Portions Copyright (c) 1994-5, Regents of the University of California
11  *
12  *
13  * IDENTIFICATION
14  *        $PostgreSQL: pgsql/src/backend/commands/cluster.c,v 1.177 2008/06/19 00:46:04 alvherre Exp $
15  *
16  *-------------------------------------------------------------------------
17  */
18 #include "postgres.h"
19
20 #include "access/genam.h"
21 #include "access/heapam.h"
22 #include "access/relscan.h"
23 #include "access/rewriteheap.h"
24 #include "access/transam.h"
25 #include "access/xact.h"
26 #include "catalog/catalog.h"
27 #include "catalog/dependency.h"
28 #include "catalog/heap.h"
29 #include "catalog/index.h"
30 #include "catalog/indexing.h"
31 #include "catalog/namespace.h"
32 #include "catalog/toasting.h"
33 #include "commands/cluster.h"
34 #include "commands/tablecmds.h"
35 #include "commands/trigger.h"
36 #include "commands/vacuum.h"
37 #include "miscadmin.h"
38 #include "storage/bufmgr.h"
39 #include "storage/procarray.h"
40 #include "utils/acl.h"
41 #include "utils/fmgroids.h"
42 #include "utils/inval.h"
43 #include "utils/lsyscache.h"
44 #include "utils/memutils.h"
45 #include "utils/relcache.h"
46 #include "utils/snapmgr.h"
47 #include "utils/syscache.h"
48 #include "utils/tqual.h"
49
50
51 /*
52  * This struct is used to pass around the information on tables to be
53  * clustered. We need this so we can make a list of them when invoked without
54  * a specific table/index pair.
55  */
56 typedef struct
57 {
58         Oid                     tableOid;
59         Oid                     indexOid;
60 } RelToCluster;
61
62
63 static void cluster_rel(RelToCluster *rv, bool recheck);
64 static void rebuild_relation(Relation OldHeap, Oid indexOid);
65 static TransactionId copy_heap_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex);
66 static List *get_tables_to_cluster(MemoryContext cluster_context);
67
68
69
70 /*---------------------------------------------------------------------------
71  * This cluster code allows for clustering multiple tables at once. Because
72  * of this, we cannot just run everything on a single transaction, or we
73  * would be forced to acquire exclusive locks on all the tables being
74  * clustered, simultaneously --- very likely leading to deadlock.
75  *
76  * To solve this we follow a similar strategy to VACUUM code,
77  * clustering each relation in a separate transaction. For this to work,
78  * we need to:
79  *      - provide a separate memory context so that we can pass information in
80  *        a way that survives across transactions
81  *      - start a new transaction every time a new relation is clustered
82  *      - check for validity of the information on to-be-clustered relations,
83  *        as someone might have deleted a relation behind our back, or
84  *        clustered one on a different index
85  *      - end the transaction
86  *
87  * The single-relation case does not have any such overhead.
88  *
89  * We also allow a relation to be specified without index.      In that case,
90  * the indisclustered bit will be looked up, and an ERROR will be thrown
91  * if there is no index with the bit set.
92  *---------------------------------------------------------------------------
93  */
94 void
95 cluster(ClusterStmt *stmt, bool isTopLevel)
96 {
97         if (stmt->relation != NULL)
98         {
99                 /* This is the single-relation case. */
100                 Oid                     tableOid,
101                                         indexOid = InvalidOid;
102                 Relation        rel;
103                 RelToCluster rvtc;
104
105                 /* Find and lock the table */
106                 rel = heap_openrv(stmt->relation, AccessExclusiveLock);
107
108                 tableOid = RelationGetRelid(rel);
109
110                 /* Check permissions */
111                 if (!pg_class_ownercheck(tableOid, GetUserId()))
112                         aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
113                                                    RelationGetRelationName(rel));
114
115                 /*
116                  * Reject clustering a remote temp table ... their local buffer
117                  * manager is not going to cope.
118                  */
119                 if (isOtherTempNamespace(RelationGetNamespace(rel)))
120                         ereport(ERROR,
121                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
122                            errmsg("cannot cluster temporary tables of other sessions")));
123
124                 if (stmt->indexname == NULL)
125                 {
126                         ListCell   *index;
127
128                         /* We need to find the index that has indisclustered set. */
129                         foreach(index, RelationGetIndexList(rel))
130                         {
131                                 HeapTuple       idxtuple;
132                                 Form_pg_index indexForm;
133
134                                 indexOid = lfirst_oid(index);
135                                 idxtuple = SearchSysCache(INDEXRELID,
136                                                                                   ObjectIdGetDatum(indexOid),
137                                                                                   0, 0, 0);
138                                 if (!HeapTupleIsValid(idxtuple))
139                                         elog(ERROR, "cache lookup failed for index %u", indexOid);
140                                 indexForm = (Form_pg_index) GETSTRUCT(idxtuple);
141                                 if (indexForm->indisclustered)
142                                 {
143                                         ReleaseSysCache(idxtuple);
144                                         break;
145                                 }
146                                 ReleaseSysCache(idxtuple);
147                                 indexOid = InvalidOid;
148                         }
149
150                         if (!OidIsValid(indexOid))
151                                 ereport(ERROR,
152                                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
153                                                  errmsg("there is no previously clustered index for table \"%s\"",
154                                                                 stmt->relation->relname)));
155                 }
156                 else
157                 {
158                         /*
159                          * The index is expected to be in the same namespace as the
160                          * relation.
161                          */
162                         indexOid = get_relname_relid(stmt->indexname,
163                                                                                  rel->rd_rel->relnamespace);
164                         if (!OidIsValid(indexOid))
165                                 ereport(ERROR,
166                                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
167                                            errmsg("index \"%s\" for table \"%s\" does not exist",
168                                                           stmt->indexname, stmt->relation->relname)));
169                 }
170
171                 /* All other checks are done in cluster_rel() */
172                 rvtc.tableOid = tableOid;
173                 rvtc.indexOid = indexOid;
174
175                 /* close relation, keep lock till commit */
176                 heap_close(rel, NoLock);
177
178                 /* Do the job */
179                 cluster_rel(&rvtc, false);
180         }
181         else
182         {
183                 /*
184                  * This is the "multi relation" case. We need to cluster all tables
185                  * that have some index with indisclustered set.
186                  */
187                 MemoryContext cluster_context;
188                 List       *rvs;
189                 ListCell   *rv;
190
191                 /*
192                  * We cannot run this form of CLUSTER inside a user transaction block;
193                  * we'd be holding locks way too long.
194                  */
195                 PreventTransactionChain(isTopLevel, "CLUSTER");
196
197                 /*
198                  * Create special memory context for cross-transaction storage.
199                  *
200                  * Since it is a child of PortalContext, it will go away even in case
201                  * of error.
202                  */
203                 cluster_context = AllocSetContextCreate(PortalContext,
204                                                                                                 "Cluster",
205                                                                                                 ALLOCSET_DEFAULT_MINSIZE,
206                                                                                                 ALLOCSET_DEFAULT_INITSIZE,
207                                                                                                 ALLOCSET_DEFAULT_MAXSIZE);
208
209                 /*
210                  * Build the list of relations to cluster.      Note that this lives in
211                  * cluster_context.
212                  */
213                 rvs = get_tables_to_cluster(cluster_context);
214
215                 /* Commit to get out of starting transaction */
216                 PopActiveSnapshot();
217                 CommitTransactionCommand();
218
219                 /* Ok, now that we've got them all, cluster them one by one */
220                 foreach(rv, rvs)
221                 {
222                         RelToCluster *rvtc = (RelToCluster *) lfirst(rv);
223
224                         /* Start a new transaction for each relation. */
225                         StartTransactionCommand();
226                         /* functions in indexes may want a snapshot set */
227                         PushActiveSnapshot(GetTransactionSnapshot());
228                         cluster_rel(rvtc, true);
229                         PopActiveSnapshot();
230                         CommitTransactionCommand();
231                 }
232
233                 /* Start a new transaction for the cleanup work. */
234                 StartTransactionCommand();
235
236                 /* Clean up working storage */
237                 MemoryContextDelete(cluster_context);
238         }
239 }
240
241 /*
242  * cluster_rel
243  *
244  * This clusters the table by creating a new, clustered table and
245  * swapping the relfilenodes of the new table and the old table, so
246  * the OID of the original table is preserved.  Thus we do not lose
247  * GRANT, inheritance nor references to this table (this was a bug
248  * in releases thru 7.3).
249  *
250  * Also create new indexes and swap the filenodes with the old indexes the
251  * same way we do for the relation.  Since we are effectively bulk-loading
252  * the new table, it's better to create the indexes afterwards than to fill
253  * them incrementally while we load the table.
254  */
255 static void
256 cluster_rel(RelToCluster *rvtc, bool recheck)
257 {
258         Relation        OldHeap;
259
260         /* Check for user-requested abort. */
261         CHECK_FOR_INTERRUPTS();
262
263         /*
264          * We grab exclusive access to the target rel and index for the duration
265          * of the transaction.  (This is redundant for the single-transaction
266          * case, since cluster() already did it.)  The index lock is taken inside
267          * check_index_is_clusterable.
268          */
269         OldHeap = try_relation_open(rvtc->tableOid, AccessExclusiveLock);
270
271         /* If the table has gone away, we can skip processing it */
272         if (!OldHeap)
273                 return;
274
275         /*
276          * Since we may open a new transaction for each relation, we have to check
277          * that the relation still is what we think it is.
278          *
279          * If this is a single-transaction CLUSTER, we can skip these tests. We
280          * *must* skip the one on indisclustered since it would reject an attempt
281          * to cluster a not-previously-clustered index.
282          */
283         if (recheck)
284         {
285                 HeapTuple       tuple;
286                 Form_pg_index indexForm;
287
288                 /* Check that the user still owns the relation */
289                 if (!pg_class_ownercheck(rvtc->tableOid, GetUserId()))
290                 {
291                         relation_close(OldHeap, AccessExclusiveLock);
292                         return;
293                 }
294
295                 /*
296                  * Silently skip a temp table for a remote session.  Only doing this
297                  * check in the "recheck" case is appropriate (which currently means
298                  * somebody is executing a database-wide CLUSTER), because there is
299                  * another check in cluster() which will stop any attempt to cluster
300                  * remote temp tables by name.  There is another check in
301                  * check_index_is_clusterable which is redundant, but we leave it for
302                  * extra safety.
303                  */
304                 if (isOtherTempNamespace(RelationGetNamespace(OldHeap)))
305                 {
306                         relation_close(OldHeap, AccessExclusiveLock);
307                         return;
308                 }
309
310                 /*
311                  * Check that the index still exists
312                  */
313                 if (!SearchSysCacheExists(RELOID,
314                                                                   ObjectIdGetDatum(rvtc->indexOid),
315                                                                   0, 0, 0))
316                 {
317                         relation_close(OldHeap, AccessExclusiveLock);
318                         return;
319                 }
320
321                 /*
322                  * Check that the index is still the one with indisclustered set.
323                  */
324                 tuple = SearchSysCache(INDEXRELID,
325                                                            ObjectIdGetDatum(rvtc->indexOid),
326                                                            0, 0, 0);
327                 if (!HeapTupleIsValid(tuple))   /* probably can't happen */
328                 {
329                         relation_close(OldHeap, AccessExclusiveLock);
330                         return;
331                 }
332                 indexForm = (Form_pg_index) GETSTRUCT(tuple);
333                 if (!indexForm->indisclustered)
334                 {
335                         ReleaseSysCache(tuple);
336                         relation_close(OldHeap, AccessExclusiveLock);
337                         return;
338                 }
339                 ReleaseSysCache(tuple);
340         }
341
342         /* Check index is valid to cluster on */
343         check_index_is_clusterable(OldHeap, rvtc->indexOid, recheck);
344
345         /* rebuild_relation does all the dirty work */
346         rebuild_relation(OldHeap, rvtc->indexOid);
347
348         /* NB: rebuild_relation does heap_close() on OldHeap */
349 }
350
351 /*
352  * Verify that the specified index is a legitimate index to cluster on
353  *
354  * Side effect: obtains exclusive lock on the index.  The caller should
355  * already have exclusive lock on the table, so the index lock is likely
356  * redundant, but it seems best to grab it anyway to ensure the index
357  * definition can't change under us.
358  */
359 void
360 check_index_is_clusterable(Relation OldHeap, Oid indexOid, bool recheck)
361 {
362         Relation        OldIndex;
363
364         OldIndex = index_open(indexOid, AccessExclusiveLock);
365
366         /*
367          * Check that index is in fact an index on the given relation
368          */
369         if (OldIndex->rd_index == NULL ||
370                 OldIndex->rd_index->indrelid != RelationGetRelid(OldHeap))
371                 ereport(ERROR,
372                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
373                                  errmsg("\"%s\" is not an index for table \"%s\"",
374                                                 RelationGetRelationName(OldIndex),
375                                                 RelationGetRelationName(OldHeap))));
376
377         /*
378          * Disallow clustering on incomplete indexes (those that might not index
379          * every row of the relation).  We could relax this by making a separate
380          * seqscan pass over the table to copy the missing rows, but that seems
381          * expensive and tedious.
382          */
383         if (!heap_attisnull(OldIndex->rd_indextuple, Anum_pg_index_indpred))
384                 ereport(ERROR,
385                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
386                                  errmsg("cannot cluster on partial index \"%s\"",
387                                                 RelationGetRelationName(OldIndex))));
388
389         if (!OldIndex->rd_am->amclusterable)
390                 ereport(ERROR,
391                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
392                                  errmsg("cannot cluster on index \"%s\" because access method does not support clustering",
393                                                 RelationGetRelationName(OldIndex))));
394
395         if (!OldIndex->rd_am->amindexnulls)
396         {
397                 AttrNumber      colno;
398
399                 /*
400                  * If the AM doesn't index nulls, then it's a partial index unless we
401                  * can prove all the rows are non-null.  Note we only need look at the
402                  * first column; multicolumn-capable AMs are *required* to index nulls
403                  * in columns after the first.
404                  */
405                 colno = OldIndex->rd_index->indkey.values[0];
406                 if (colno > 0)
407                 {
408                         /* ordinary user attribute */
409                         if (!OldHeap->rd_att->attrs[colno - 1]->attnotnull)
410                                 ereport(ERROR,
411                                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
412                                                  errmsg("cannot cluster on index \"%s\" because access method does not handle null values",
413                                                                 RelationGetRelationName(OldIndex)),
414                                                  recheck
415                                                  ? errhint("You might be able to work around this by marking column \"%s\" NOT NULL, or use ALTER TABLE ... SET WITHOUT CLUSTER to remove the cluster specification from the table.",
416                                                  NameStr(OldHeap->rd_att->attrs[colno - 1]->attname))
417                                                  : errhint("You might be able to work around this by marking column \"%s\" NOT NULL.",
418                                           NameStr(OldHeap->rd_att->attrs[colno - 1]->attname))));
419                 }
420                 else if (colno < 0)
421                 {
422                         /* system column --- okay, always non-null */
423                 }
424                 else
425                         /* index expression, lose... */
426                         ereport(ERROR,
427                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
428                                          errmsg("cannot cluster on expressional index \"%s\" because its index access method does not handle null values",
429                                                         RelationGetRelationName(OldIndex))));
430         }
431
432         /*
433          * Disallow if index is left over from a failed CREATE INDEX CONCURRENTLY;
434          * it might well not contain entries for every heap row, or might not even
435          * be internally consistent.  (But note that we don't check indcheckxmin;
436          * the worst consequence of following broken HOT chains would be that we
437          * might put recently-dead tuples out-of-order in the new table, and there
438          * is little harm in that.)
439          */
440         if (!OldIndex->rd_index->indisvalid)
441                 ereport(ERROR,
442                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
443                                  errmsg("cannot cluster on invalid index \"%s\"",
444                                                 RelationGetRelationName(OldIndex))));
445
446         /*
447          * Disallow clustering system relations.  This will definitely NOT work
448          * for shared relations (we have no way to update pg_class rows in other
449          * databases), nor for nailed-in-cache relations (the relfilenode values
450          * for those are hardwired, see relcache.c).  It might work for other
451          * system relations, but I ain't gonna risk it.
452          */
453         if (IsSystemRelation(OldHeap))
454                 ereport(ERROR,
455                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
456                                  errmsg("\"%s\" is a system catalog",
457                                                 RelationGetRelationName(OldHeap))));
458
459         /*
460          * Don't allow cluster on temp tables of other backends ... their local
461          * buffer manager is not going to cope.
462          */
463         if (isOtherTempNamespace(RelationGetNamespace(OldHeap)))
464                 ereport(ERROR,
465                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
466                            errmsg("cannot cluster temporary tables of other sessions")));
467
468         /*
469          * Also check for active uses of the relation in the current transaction,
470          * including open scans and pending AFTER trigger events.
471          */
472         CheckTableNotInUse(OldHeap, "CLUSTER");
473
474         /* Drop relcache refcnt on OldIndex, but keep lock */
475         index_close(OldIndex, NoLock);
476 }
477
478 /*
479  * mark_index_clustered: mark the specified index as the one clustered on
480  *
481  * With indexOid == InvalidOid, will mark all indexes of rel not-clustered.
482  */
483 void
484 mark_index_clustered(Relation rel, Oid indexOid)
485 {
486         HeapTuple       indexTuple;
487         Form_pg_index indexForm;
488         Relation        pg_index;
489         ListCell   *index;
490
491         /*
492          * If the index is already marked clustered, no need to do anything.
493          */
494         if (OidIsValid(indexOid))
495         {
496                 indexTuple = SearchSysCache(INDEXRELID,
497                                                                         ObjectIdGetDatum(indexOid),
498                                                                         0, 0, 0);
499                 if (!HeapTupleIsValid(indexTuple))
500                         elog(ERROR, "cache lookup failed for index %u", indexOid);
501                 indexForm = (Form_pg_index) GETSTRUCT(indexTuple);
502
503                 if (indexForm->indisclustered)
504                 {
505                         ReleaseSysCache(indexTuple);
506                         return;
507                 }
508
509                 ReleaseSysCache(indexTuple);
510         }
511
512         /*
513          * Check each index of the relation and set/clear the bit as needed.
514          */
515         pg_index = heap_open(IndexRelationId, RowExclusiveLock);
516
517         foreach(index, RelationGetIndexList(rel))
518         {
519                 Oid                     thisIndexOid = lfirst_oid(index);
520
521                 indexTuple = SearchSysCacheCopy(INDEXRELID,
522                                                                                 ObjectIdGetDatum(thisIndexOid),
523                                                                                 0, 0, 0);
524                 if (!HeapTupleIsValid(indexTuple))
525                         elog(ERROR, "cache lookup failed for index %u", thisIndexOid);
526                 indexForm = (Form_pg_index) GETSTRUCT(indexTuple);
527
528                 /*
529                  * Unset the bit if set.  We know it's wrong because we checked this
530                  * earlier.
531                  */
532                 if (indexForm->indisclustered)
533                 {
534                         indexForm->indisclustered = false;
535                         simple_heap_update(pg_index, &indexTuple->t_self, indexTuple);
536                         CatalogUpdateIndexes(pg_index, indexTuple);
537                         /* Ensure we see the update in the index's relcache entry */
538                         CacheInvalidateRelcacheByRelid(thisIndexOid);
539                 }
540                 else if (thisIndexOid == indexOid)
541                 {
542                         indexForm->indisclustered = true;
543                         simple_heap_update(pg_index, &indexTuple->t_self, indexTuple);
544                         CatalogUpdateIndexes(pg_index, indexTuple);
545                         /* Ensure we see the update in the index's relcache entry */
546                         CacheInvalidateRelcacheByRelid(thisIndexOid);
547                 }
548                 heap_freetuple(indexTuple);
549         }
550
551         heap_close(pg_index, RowExclusiveLock);
552 }
553
554 /*
555  * rebuild_relation: rebuild an existing relation in index order
556  *
557  * OldHeap: table to rebuild --- must be opened and exclusive-locked!
558  * indexOid: index to cluster by
559  *
560  * NB: this routine closes OldHeap at the right time; caller should not.
561  */
562 static void
563 rebuild_relation(Relation OldHeap, Oid indexOid)
564 {
565         Oid                     tableOid = RelationGetRelid(OldHeap);
566         Oid                     tableSpace = OldHeap->rd_rel->reltablespace;
567         Oid                     OIDNewHeap;
568         char            NewHeapName[NAMEDATALEN];
569         TransactionId frozenXid;
570         ObjectAddress object;
571
572         /* Mark the correct index as clustered */
573         mark_index_clustered(OldHeap, indexOid);
574
575         /* Close relcache entry, but keep lock until transaction commit */
576         heap_close(OldHeap, NoLock);
577
578         /*
579          * Create the new heap, using a temporary name in the same namespace as
580          * the existing table.  NOTE: there is some risk of collision with user
581          * relnames.  Working around this seems more trouble than it's worth; in
582          * particular, we can't create the new heap in a different namespace from
583          * the old, or we will have problems with the TEMP status of temp tables.
584          */
585         snprintf(NewHeapName, sizeof(NewHeapName), "pg_temp_%u", tableOid);
586
587         OIDNewHeap = make_new_heap(tableOid, NewHeapName, tableSpace);
588
589         /*
590          * We don't need CommandCounterIncrement() because make_new_heap did it.
591          */
592
593         /*
594          * Copy the heap data into the new table in the desired order.
595          */
596         frozenXid = copy_heap_data(OIDNewHeap, tableOid, indexOid);
597
598         /* To make the new heap's data visible (probably not needed?). */
599         CommandCounterIncrement();
600
601         /* Swap the physical files of the old and new heaps. */
602         swap_relation_files(tableOid, OIDNewHeap, frozenXid);
603
604         CommandCounterIncrement();
605
606         /* Destroy new heap with old filenode */
607         object.classId = RelationRelationId;
608         object.objectId = OIDNewHeap;
609         object.objectSubId = 0;
610
611         /*
612          * The new relation is local to our transaction and we know nothing
613          * depends on it, so DROP_RESTRICT should be OK.
614          */
615         performDeletion(&object, DROP_RESTRICT);
616
617         /* performDeletion does CommandCounterIncrement at end */
618
619         /*
620          * Rebuild each index on the relation (but not the toast table, which is
621          * all-new at this point).      We do not need CommandCounterIncrement()
622          * because reindex_relation does it.
623          */
624         reindex_relation(tableOid, false);
625 }
626
627 /*
628  * Create the new table that we will fill with correctly-ordered data.
629  */
630 Oid
631 make_new_heap(Oid OIDOldHeap, const char *NewName, Oid NewTableSpace)
632 {
633         TupleDesc       OldHeapDesc,
634                                 tupdesc;
635         Oid                     OIDNewHeap;
636         Relation        OldHeap;
637         HeapTuple       tuple;
638         Datum           reloptions;
639         bool            isNull;
640
641         OldHeap = heap_open(OIDOldHeap, AccessExclusiveLock);
642         OldHeapDesc = RelationGetDescr(OldHeap);
643
644         /*
645          * Need to make a copy of the tuple descriptor, since
646          * heap_create_with_catalog modifies it.  Note that the NewHeap will
647          * not receive any of the defaults or constraints associated with the
648          * OldHeap; we don't need 'em, and there's no reason to spend cycles
649          * inserting them into the catalogs only to delete them.
650          */
651         tupdesc = CreateTupleDescCopy(OldHeapDesc);
652
653         /*
654          * Use options of the old heap for new heap.
655          */
656         tuple = SearchSysCache(RELOID,
657                                                    ObjectIdGetDatum(OIDOldHeap),
658                                                    0, 0, 0);
659         if (!HeapTupleIsValid(tuple))
660                 elog(ERROR, "cache lookup failed for relation %u", OIDOldHeap);
661         reloptions = SysCacheGetAttr(RELOID, tuple, Anum_pg_class_reloptions,
662                                                                  &isNull);
663         if (isNull)
664                 reloptions = (Datum) 0;
665
666         OIDNewHeap = heap_create_with_catalog(NewName,
667                                                                                   RelationGetNamespace(OldHeap),
668                                                                                   NewTableSpace,
669                                                                                   InvalidOid,
670                                                                                   OldHeap->rd_rel->relowner,
671                                                                                   tupdesc,
672                                                                                   NIL,
673                                                                                   OldHeap->rd_rel->relkind,
674                                                                                   OldHeap->rd_rel->relisshared,
675                                                                                   true,
676                                                                                   0,
677                                                                                   ONCOMMIT_NOOP,
678                                                                                   reloptions,
679                                                                                   allowSystemTableMods);
680
681         ReleaseSysCache(tuple);
682
683         /*
684          * Advance command counter so that the newly-created relation's catalog
685          * tuples will be visible to heap_open.
686          */
687         CommandCounterIncrement();
688
689         /*
690          * If necessary, create a TOAST table for the new relation. Note that
691          * AlterTableCreateToastTable ends with CommandCounterIncrement(), so that
692          * the TOAST table will be visible for insertion.
693          */
694         AlterTableCreateToastTable(OIDNewHeap);
695
696         heap_close(OldHeap, NoLock);
697
698         return OIDNewHeap;
699 }
700
701 /*
702  * Do the physical copying of heap data.  Returns the TransactionId used as
703  * freeze cutoff point for the tuples.
704  */
705 static TransactionId
706 copy_heap_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex)
707 {
708         Relation        NewHeap,
709                                 OldHeap,
710                                 OldIndex;
711         TupleDesc       oldTupDesc;
712         TupleDesc       newTupDesc;
713         int                     natts;
714         Datum      *values;
715         bool       *isnull;
716         IndexScanDesc scan;
717         HeapTuple       tuple;
718         bool            use_wal;
719         TransactionId OldestXmin;
720         TransactionId FreezeXid;
721         RewriteState rwstate;
722
723         /*
724          * Open the relations we need.
725          */
726         NewHeap = heap_open(OIDNewHeap, AccessExclusiveLock);
727         OldHeap = heap_open(OIDOldHeap, AccessExclusiveLock);
728         OldIndex = index_open(OIDOldIndex, AccessExclusiveLock);
729
730         /*
731          * Their tuple descriptors should be exactly alike, but here we only need
732          * assume that they have the same number of columns.
733          */
734         oldTupDesc = RelationGetDescr(OldHeap);
735         newTupDesc = RelationGetDescr(NewHeap);
736         Assert(newTupDesc->natts == oldTupDesc->natts);
737
738         /* Preallocate values/isnull arrays */
739         natts = newTupDesc->natts;
740         values = (Datum *) palloc(natts * sizeof(Datum));
741         isnull = (bool *) palloc(natts * sizeof(bool));
742
743         /*
744          * We need to log the copied data in WAL iff WAL archiving is enabled AND
745          * it's not a temp rel.
746          */
747         use_wal = XLogArchivingActive() && !NewHeap->rd_istemp;
748
749         /* use_wal off requires rd_targblock be initially invalid */
750         Assert(NewHeap->rd_targblock == InvalidBlockNumber);
751
752         /*
753          * compute xids used to freeze and weed out dead tuples.  We use -1
754          * freeze_min_age to avoid having CLUSTER freeze tuples earlier than a
755          * plain VACUUM would.
756          */
757         vacuum_set_xid_limits(-1, OldHeap->rd_rel->relisshared,
758                                                   &OldestXmin, &FreezeXid);
759
760         /*
761          * FreezeXid will become the table's new relfrozenxid, and that mustn't
762          * go backwards, so take the max.
763          */
764         if (TransactionIdPrecedes(FreezeXid, OldHeap->rd_rel->relfrozenxid))
765                 FreezeXid = OldHeap->rd_rel->relfrozenxid;
766
767         /* Initialize the rewrite operation */
768         rwstate = begin_heap_rewrite(NewHeap, OldestXmin, FreezeXid, use_wal);
769
770         /*
771          * Scan through the OldHeap in OldIndex order and copy each tuple into the
772          * NewHeap.  To ensure we see recently-dead tuples that still need to be
773          * copied, we scan with SnapshotAny and use HeapTupleSatisfiesVacuum for
774          * the visibility test.
775          */
776         scan = index_beginscan(OldHeap, OldIndex,
777                                                    SnapshotAny, 0, (ScanKey) NULL);
778
779         while ((tuple = index_getnext(scan, ForwardScanDirection)) != NULL)
780         {
781                 HeapTuple       copiedTuple;
782                 bool            isdead;
783                 int                     i;
784
785                 CHECK_FOR_INTERRUPTS();
786
787                 /* Since we used no scan keys, should never need to recheck */
788                 if (scan->xs_recheck)
789                         elog(ERROR, "CLUSTER does not support lossy index conditions");
790
791                 LockBuffer(scan->xs_cbuf, BUFFER_LOCK_SHARE);
792
793                 switch (HeapTupleSatisfiesVacuum(tuple->t_data, OldestXmin,
794                                                                                  scan->xs_cbuf))
795                 {
796                         case HEAPTUPLE_DEAD:
797                                 /* Definitely dead */
798                                 isdead = true;
799                                 break;
800                         case HEAPTUPLE_LIVE:
801                         case HEAPTUPLE_RECENTLY_DEAD:
802                                 /* Live or recently dead, must copy it */
803                                 isdead = false;
804                                 break;
805                         case HEAPTUPLE_INSERT_IN_PROGRESS:
806
807                                 /*
808                                  * We should not see this unless it's been inserted earlier in
809                                  * our own transaction.
810                                  */
811                                 if (!TransactionIdIsCurrentTransactionId(
812                                                                           HeapTupleHeaderGetXmin(tuple->t_data)))
813                                         elog(ERROR, "concurrent insert in progress");
814                                 /* treat as live */
815                                 isdead = false;
816                                 break;
817                         case HEAPTUPLE_DELETE_IN_PROGRESS:
818
819                                 /*
820                                  * We should not see this unless it's been deleted earlier in
821                                  * our own transaction.
822                                  */
823                                 Assert(!(tuple->t_data->t_infomask & HEAP_XMAX_IS_MULTI));
824                                 if (!TransactionIdIsCurrentTransactionId(
825                                                                           HeapTupleHeaderGetXmax(tuple->t_data)))
826                                         elog(ERROR, "concurrent delete in progress");
827                                 /* treat as recently dead */
828                                 isdead = false;
829                                 break;
830                         default:
831                                 elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result");
832                                 isdead = false; /* keep compiler quiet */
833                                 break;
834                 }
835
836                 LockBuffer(scan->xs_cbuf, BUFFER_LOCK_UNLOCK);
837
838                 if (isdead)
839                 {
840                         /* heap rewrite module still needs to see it... */
841                         rewrite_heap_dead_tuple(rwstate, tuple);
842                         continue;
843                 }
844
845                 /*
846                  * We cannot simply copy the tuple as-is, for several reasons:
847                  *
848                  * 1. We'd like to squeeze out the values of any dropped columns, both
849                  * to save space and to ensure we have no corner-case failures. (It's
850                  * possible for example that the new table hasn't got a TOAST table
851                  * and so is unable to store any large values of dropped cols.)
852                  *
853                  * 2. The tuple might not even be legal for the new table; this is
854                  * currently only known to happen as an after-effect of ALTER TABLE
855                  * SET WITHOUT OIDS.
856                  *
857                  * So, we must reconstruct the tuple from component Datums.
858                  */
859                 heap_deform_tuple(tuple, oldTupDesc, values, isnull);
860
861                 /* Be sure to null out any dropped columns */
862                 for (i = 0; i < natts; i++)
863                 {
864                         if (newTupDesc->attrs[i]->attisdropped)
865                                 isnull[i] = true;
866                 }
867
868                 copiedTuple = heap_form_tuple(newTupDesc, values, isnull);
869
870                 /* Preserve OID, if any */
871                 if (NewHeap->rd_rel->relhasoids)
872                         HeapTupleSetOid(copiedTuple, HeapTupleGetOid(tuple));
873
874                 /* The heap rewrite module does the rest */
875                 rewrite_heap_tuple(rwstate, tuple, copiedTuple);
876
877                 heap_freetuple(copiedTuple);
878         }
879
880         index_endscan(scan);
881
882         /* Write out any remaining tuples, and fsync if needed */
883         end_heap_rewrite(rwstate);
884
885         pfree(values);
886         pfree(isnull);
887
888         index_close(OldIndex, NoLock);
889         heap_close(OldHeap, NoLock);
890         heap_close(NewHeap, NoLock);
891
892         return FreezeXid;
893 }
894
895 /*
896  * Swap the physical files of two given relations.
897  *
898  * We swap the physical identity (reltablespace and relfilenode) while
899  * keeping the same logical identities of the two relations.
900  *
901  * Also swap any TOAST links, so that the toast data moves along with
902  * the main-table data.
903  *
904  * Additionally, the first relation is marked with relfrozenxid set to
905  * frozenXid.  It seems a bit ugly to have this here, but all callers would
906  * have to do it anyway, so having it here saves a heap_update.  Note: the
907  * TOAST table needs no special handling, because since we swapped the links,
908  * the entry for the TOAST table will now contain RecentXmin in relfrozenxid,
909  * which is the correct value.
910  */
911 void
912 swap_relation_files(Oid r1, Oid r2, TransactionId frozenXid)
913 {
914         Relation        relRelation;
915         HeapTuple       reltup1,
916                                 reltup2;
917         Form_pg_class relform1,
918                                 relform2;
919         Oid                     swaptemp;
920         CatalogIndexState indstate;
921
922         /* We need writable copies of both pg_class tuples. */
923         relRelation = heap_open(RelationRelationId, RowExclusiveLock);
924
925         reltup1 = SearchSysCacheCopy(RELOID,
926                                                                  ObjectIdGetDatum(r1),
927                                                                  0, 0, 0);
928         if (!HeapTupleIsValid(reltup1))
929                 elog(ERROR, "cache lookup failed for relation %u", r1);
930         relform1 = (Form_pg_class) GETSTRUCT(reltup1);
931
932         reltup2 = SearchSysCacheCopy(RELOID,
933                                                                  ObjectIdGetDatum(r2),
934                                                                  0, 0, 0);
935         if (!HeapTupleIsValid(reltup2))
936                 elog(ERROR, "cache lookup failed for relation %u", r2);
937         relform2 = (Form_pg_class) GETSTRUCT(reltup2);
938
939         /*
940          * Actually swap the fields in the two tuples
941          */
942         swaptemp = relform1->relfilenode;
943         relform1->relfilenode = relform2->relfilenode;
944         relform2->relfilenode = swaptemp;
945
946         swaptemp = relform1->reltablespace;
947         relform1->reltablespace = relform2->reltablespace;
948         relform2->reltablespace = swaptemp;
949
950         swaptemp = relform1->reltoastrelid;
951         relform1->reltoastrelid = relform2->reltoastrelid;
952         relform2->reltoastrelid = swaptemp;
953
954         /* we should not swap reltoastidxid */
955
956         /* set rel1's frozen Xid */
957         Assert(TransactionIdIsNormal(frozenXid));
958         relform1->relfrozenxid = frozenXid;
959
960         /* swap size statistics too, since new rel has freshly-updated stats */
961         {
962                 int4            swap_pages;
963                 float4          swap_tuples;
964
965                 swap_pages = relform1->relpages;
966                 relform1->relpages = relform2->relpages;
967                 relform2->relpages = swap_pages;
968
969                 swap_tuples = relform1->reltuples;
970                 relform1->reltuples = relform2->reltuples;
971                 relform2->reltuples = swap_tuples;
972         }
973
974         /* Update the tuples in pg_class */
975         simple_heap_update(relRelation, &reltup1->t_self, reltup1);
976         simple_heap_update(relRelation, &reltup2->t_self, reltup2);
977
978         /* Keep system catalogs current */
979         indstate = CatalogOpenIndexes(relRelation);
980         CatalogIndexInsert(indstate, reltup1);
981         CatalogIndexInsert(indstate, reltup2);
982         CatalogCloseIndexes(indstate);
983
984         /*
985          * If we have toast tables associated with the relations being swapped,
986          * change their dependency links to re-associate them with their new
987          * owning relations.  Otherwise the wrong one will get dropped ...
988          *
989          * NOTE: it is possible that only one table has a toast table; this can
990          * happen in CLUSTER if there were dropped columns in the old table, and
991          * in ALTER TABLE when adding or changing type of columns.
992          *
993          * NOTE: at present, a TOAST table's only dependency is the one on its
994          * owning table.  If more are ever created, we'd need to use something
995          * more selective than deleteDependencyRecordsFor() to get rid of only the
996          * link we want.
997          */
998         if (relform1->reltoastrelid || relform2->reltoastrelid)
999         {
1000                 ObjectAddress baseobject,
1001                                         toastobject;
1002                 long            count;
1003
1004                 /* Delete old dependencies */
1005                 if (relform1->reltoastrelid)
1006                 {
1007                         count = deleteDependencyRecordsFor(RelationRelationId,
1008                                                                                            relform1->reltoastrelid);
1009                         if (count != 1)
1010                                 elog(ERROR, "expected one dependency record for TOAST table, found %ld",
1011                                          count);
1012                 }
1013                 if (relform2->reltoastrelid)
1014                 {
1015                         count = deleteDependencyRecordsFor(RelationRelationId,
1016                                                                                            relform2->reltoastrelid);
1017                         if (count != 1)
1018                                 elog(ERROR, "expected one dependency record for TOAST table, found %ld",
1019                                          count);
1020                 }
1021
1022                 /* Register new dependencies */
1023                 baseobject.classId = RelationRelationId;
1024                 baseobject.objectSubId = 0;
1025                 toastobject.classId = RelationRelationId;
1026                 toastobject.objectSubId = 0;
1027
1028                 if (relform1->reltoastrelid)
1029                 {
1030                         baseobject.objectId = r1;
1031                         toastobject.objectId = relform1->reltoastrelid;
1032                         recordDependencyOn(&toastobject, &baseobject, DEPENDENCY_INTERNAL);
1033                 }
1034
1035                 if (relform2->reltoastrelid)
1036                 {
1037                         baseobject.objectId = r2;
1038                         toastobject.objectId = relform2->reltoastrelid;
1039                         recordDependencyOn(&toastobject, &baseobject, DEPENDENCY_INTERNAL);
1040                 }
1041         }
1042
1043         /*
1044          * Blow away the old relcache entries now.      We need this kluge because
1045          * relcache.c keeps a link to the smgr relation for the physical file, and
1046          * that will be out of date as soon as we do CommandCounterIncrement.
1047          * Whichever of the rels is the second to be cleared during cache
1048          * invalidation will have a dangling reference to an already-deleted smgr
1049          * relation.  Rather than trying to avoid this by ordering operations just
1050          * so, it's easiest to not have the relcache entries there at all.
1051          * (Fortunately, since one of the entries is local in our transaction,
1052          * it's sufficient to clear out our own relcache this way; the problem
1053          * cannot arise for other backends when they see our update on the
1054          * non-local relation.)
1055          */
1056         RelationForgetRelation(r1);
1057         RelationForgetRelation(r2);
1058
1059         /* Clean up. */
1060         heap_freetuple(reltup1);
1061         heap_freetuple(reltup2);
1062
1063         heap_close(relRelation, RowExclusiveLock);
1064 }
1065
1066 /*
1067  * Get a list of tables that the current user owns and
1068  * have indisclustered set.  Return the list in a List * of rvsToCluster
1069  * with the tableOid and the indexOid on which the table is already
1070  * clustered.
1071  */
1072 static List *
1073 get_tables_to_cluster(MemoryContext cluster_context)
1074 {
1075         Relation        indRelation;
1076         HeapScanDesc scan;
1077         ScanKeyData entry;
1078         HeapTuple       indexTuple;
1079         Form_pg_index index;
1080         MemoryContext old_context;
1081         RelToCluster *rvtc;
1082         List       *rvs = NIL;
1083
1084         /*
1085          * Get all indexes that have indisclustered set and are owned by
1086          * appropriate user. System relations or nailed-in relations cannot ever
1087          * have indisclustered set, because CLUSTER will refuse to set it when
1088          * called with one of them as argument.
1089          */
1090         indRelation = heap_open(IndexRelationId, AccessShareLock);
1091         ScanKeyInit(&entry,
1092                                 Anum_pg_index_indisclustered,
1093                                 BTEqualStrategyNumber, F_BOOLEQ,
1094                                 BoolGetDatum(true));
1095         scan = heap_beginscan(indRelation, SnapshotNow, 1, &entry);
1096         while ((indexTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
1097         {
1098                 index = (Form_pg_index) GETSTRUCT(indexTuple);
1099
1100                 if (!pg_class_ownercheck(index->indrelid, GetUserId()))
1101                         continue;
1102
1103                 /*
1104                  * We have to build the list in a different memory context so it will
1105                  * survive the cross-transaction processing
1106                  */
1107                 old_context = MemoryContextSwitchTo(cluster_context);
1108
1109                 rvtc = (RelToCluster *) palloc(sizeof(RelToCluster));
1110                 rvtc->tableOid = index->indrelid;
1111                 rvtc->indexOid = index->indexrelid;
1112                 rvs = lcons(rvtc, rvs);
1113
1114                 MemoryContextSwitchTo(old_context);
1115         }
1116         heap_endscan(scan);
1117
1118         relation_close(indRelation, AccessShareLock);
1119
1120         return rvs;
1121 }