OSDN Git Service

Standard pgindent run for 8.1.
[pg-rex/syncrep.git] / src / backend / commands / cluster.c
1 /*-------------------------------------------------------------------------
2  *
3  * cluster.c
4  *        CLUSTER a table on an index.
5  *
6  * There is hardly anything left of Paul Brown's original implementation...
7  *
8  *
9  * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
10  * Portions Copyright (c) 1994-5, Regents of the University of California
11  *
12  *
13  * IDENTIFICATION
14  *        $PostgreSQL: pgsql/src/backend/commands/cluster.c,v 1.140 2005/10/15 02:49:15 momjian Exp $
15  *
16  *-------------------------------------------------------------------------
17  */
18 #include "postgres.h"
19
20 #include "access/genam.h"
21 #include "access/heapam.h"
22 #include "catalog/catalog.h"
23 #include "catalog/dependency.h"
24 #include "catalog/heap.h"
25 #include "catalog/index.h"
26 #include "catalog/indexing.h"
27 #include "catalog/namespace.h"
28 #include "commands/cluster.h"
29 #include "commands/tablecmds.h"
30 #include "miscadmin.h"
31 #include "utils/acl.h"
32 #include "utils/fmgroids.h"
33 #include "utils/inval.h"
34 #include "utils/lsyscache.h"
35 #include "utils/memutils.h"
36 #include "utils/syscache.h"
37 #include "utils/relcache.h"
38
39
40 /*
41  * This struct is used to pass around the information on tables to be
42  * clustered. We need this so we can make a list of them when invoked without
43  * a specific table/index pair.
44  */
45 typedef struct
46 {
47         Oid                     tableOid;
48         Oid                     indexOid;
49 } RelToCluster;
50
51
52 static void cluster_rel(RelToCluster *rv, bool recheck);
53 static void rebuild_relation(Relation OldHeap, Oid indexOid);
54 static void copy_heap_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex);
55 static List *get_tables_to_cluster(MemoryContext cluster_context);
56
57
58
59 /*---------------------------------------------------------------------------
60  * This cluster code allows for clustering multiple tables at once. Because
61  * of this, we cannot just run everything on a single transaction, or we
62  * would be forced to acquire exclusive locks on all the tables being
63  * clustered, simultaneously --- very likely leading to deadlock.
64  *
65  * To solve this we follow a similar strategy to VACUUM code,
66  * clustering each relation in a separate transaction. For this to work,
67  * we need to:
68  *      - provide a separate memory context so that we can pass information in
69  *        a way that survives across transactions
70  *      - start a new transaction every time a new relation is clustered
71  *      - check for validity of the information on to-be-clustered relations,
72  *        as someone might have deleted a relation behind our back, or
73  *        clustered one on a different index
74  *      - end the transaction
75  *
76  * The single-relation case does not have any such overhead.
77  *
78  * We also allow a relation being specified without index.      In that case,
79  * the indisclustered bit will be looked up, and an ERROR will be thrown
80  * if there is no index with the bit set.
81  *---------------------------------------------------------------------------
82  */
83 void
84 cluster(ClusterStmt *stmt)
85 {
86         if (stmt->relation != NULL)
87         {
88                 /* This is the single-relation case. */
89                 Oid                     tableOid,
90                                         indexOid = InvalidOid;
91                 Relation        rel;
92                 RelToCluster rvtc;
93
94                 /* Find and lock the table */
95                 rel = heap_openrv(stmt->relation, AccessExclusiveLock);
96
97                 tableOid = RelationGetRelid(rel);
98
99                 /* Check permissions */
100                 if (!pg_class_ownercheck(tableOid, GetUserId()))
101                         aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
102                                                    RelationGetRelationName(rel));
103
104                 if (stmt->indexname == NULL)
105                 {
106                         ListCell   *index;
107
108                         /* We need to find the index that has indisclustered set. */
109                         foreach(index, RelationGetIndexList(rel))
110                         {
111                                 HeapTuple       idxtuple;
112                                 Form_pg_index indexForm;
113
114                                 indexOid = lfirst_oid(index);
115                                 idxtuple = SearchSysCache(INDEXRELID,
116                                                                                   ObjectIdGetDatum(indexOid),
117                                                                                   0, 0, 0);
118                                 if (!HeapTupleIsValid(idxtuple))
119                                         elog(ERROR, "cache lookup failed for index %u", indexOid);
120                                 indexForm = (Form_pg_index) GETSTRUCT(idxtuple);
121                                 if (indexForm->indisclustered)
122                                 {
123                                         ReleaseSysCache(idxtuple);
124                                         break;
125                                 }
126                                 ReleaseSysCache(idxtuple);
127                                 indexOid = InvalidOid;
128                         }
129
130                         if (!OidIsValid(indexOid))
131                                 ereport(ERROR,
132                                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
133                                                  errmsg("there is no previously clustered index for table \"%s\"",
134                                                                 stmt->relation->relname)));
135                 }
136                 else
137                 {
138                         /*
139                          * The index is expected to be in the same namespace as the
140                          * relation.
141                          */
142                         indexOid = get_relname_relid(stmt->indexname,
143                                                                                  rel->rd_rel->relnamespace);
144                         if (!OidIsValid(indexOid))
145                                 ereport(ERROR,
146                                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
147                                            errmsg("index \"%s\" for table \"%s\" does not exist",
148                                                           stmt->indexname, stmt->relation->relname)));
149                 }
150
151                 /* All other checks are done in cluster_rel() */
152                 rvtc.tableOid = tableOid;
153                 rvtc.indexOid = indexOid;
154
155                 /* close relation, keep lock till commit */
156                 heap_close(rel, NoLock);
157
158                 /* Do the job */
159                 cluster_rel(&rvtc, false);
160         }
161         else
162         {
163                 /*
164                  * This is the "multi relation" case. We need to cluster all tables
165                  * that have some index with indisclustered set.
166                  */
167                 MemoryContext cluster_context;
168                 List       *rvs;
169                 ListCell   *rv;
170
171                 /*
172                  * We cannot run this form of CLUSTER inside a user transaction block;
173                  * we'd be holding locks way too long.
174                  */
175                 PreventTransactionChain((void *) stmt, "CLUSTER");
176
177                 /*
178                  * Create special memory context for cross-transaction storage.
179                  *
180                  * Since it is a child of PortalContext, it will go away even in case of
181                  * error.
182                  */
183                 cluster_context = AllocSetContextCreate(PortalContext,
184                                                                                                 "Cluster",
185                                                                                                 ALLOCSET_DEFAULT_MINSIZE,
186                                                                                                 ALLOCSET_DEFAULT_INITSIZE,
187                                                                                                 ALLOCSET_DEFAULT_MAXSIZE);
188
189                 /*
190                  * Build the list of relations to cluster.      Note that this lives in
191                  * cluster_context.
192                  */
193                 rvs = get_tables_to_cluster(cluster_context);
194
195                 /* Commit to get out of starting transaction */
196                 CommitTransactionCommand();
197
198                 /* Ok, now that we've got them all, cluster them one by one */
199                 foreach(rv, rvs)
200                 {
201                         RelToCluster *rvtc = (RelToCluster *) lfirst(rv);
202
203                         /* Start a new transaction for each relation. */
204                         StartTransactionCommand();
205                         /* functions in indexes may want a snapshot set */
206                         ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());
207                         cluster_rel(rvtc, true);
208                         CommitTransactionCommand();
209                 }
210
211                 /* Start a new transaction for the cleanup work. */
212                 StartTransactionCommand();
213
214                 /* Clean up working storage */
215                 MemoryContextDelete(cluster_context);
216         }
217 }
218
219 /*
220  * cluster_rel
221  *
222  * This clusters the table by creating a new, clustered table and
223  * swapping the relfilenodes of the new table and the old table, so
224  * the OID of the original table is preserved.  Thus we do not lose
225  * GRANT, inheritance nor references to this table (this was a bug
226  * in releases thru 7.3).
227  *
228  * Also create new indexes and swap the filenodes with the old indexes the
229  * same way we do for the relation.  Since we are effectively bulk-loading
230  * the new table, it's better to create the indexes afterwards than to fill
231  * them incrementally while we load the table.
232  */
233 static void
234 cluster_rel(RelToCluster *rvtc, bool recheck)
235 {
236         Relation        OldHeap;
237
238         /* Check for user-requested abort. */
239         CHECK_FOR_INTERRUPTS();
240
241         /*
242          * Since we may open a new transaction for each relation, we have to check
243          * that the relation still is what we think it is.
244          *
245          * If this is a single-transaction CLUSTER, we can skip these tests. We *must*
246          * skip the one on indisclustered since it would reject an attempt to
247          * cluster a not-previously-clustered index.
248          */
249         if (recheck)
250         {
251                 HeapTuple       tuple;
252                 Form_pg_index indexForm;
253
254                 /*
255                  * Check if the relation and index still exist before opening them
256                  */
257                 if (!SearchSysCacheExists(RELOID,
258                                                                   ObjectIdGetDatum(rvtc->tableOid),
259                                                                   0, 0, 0) ||
260                         !SearchSysCacheExists(RELOID,
261                                                                   ObjectIdGetDatum(rvtc->indexOid),
262                                                                   0, 0, 0))
263                         return;
264
265                 /* Check that the user still owns the relation */
266                 if (!pg_class_ownercheck(rvtc->tableOid, GetUserId()))
267                         return;
268
269                 /*
270                  * Check that the index is still the one with indisclustered set.
271                  */
272                 tuple = SearchSysCache(INDEXRELID,
273                                                            ObjectIdGetDatum(rvtc->indexOid),
274                                                            0, 0, 0);
275                 if (!HeapTupleIsValid(tuple))
276                         return;                         /* could have gone away... */
277                 indexForm = (Form_pg_index) GETSTRUCT(tuple);
278                 if (!indexForm->indisclustered)
279                 {
280                         ReleaseSysCache(tuple);
281                         return;
282                 }
283                 ReleaseSysCache(tuple);
284         }
285
286         /*
287          * We grab exclusive access to the target rel and index for the duration
288          * of the transaction.  (This is redundant for the single- transaction
289          * case, since cluster() already did it.)  The index lock is taken inside
290          * check_index_is_clusterable.
291          */
292         OldHeap = heap_open(rvtc->tableOid, AccessExclusiveLock);
293
294         /* Check index is valid to cluster on */
295         check_index_is_clusterable(OldHeap, rvtc->indexOid, recheck);
296
297         /* rebuild_relation does all the dirty work */
298         rebuild_relation(OldHeap, rvtc->indexOid);
299
300         /* NB: rebuild_relation does heap_close() on OldHeap */
301 }
302
303 /*
304  * Verify that the specified index is a legitimate index to cluster on
305  *
306  * Side effect: obtains exclusive lock on the index.  The caller should
307  * already have exclusive lock on the table, so the index lock is likely
308  * redundant, but it seems best to grab it anyway to ensure the index
309  * definition can't change under us.
310  */
311 void
312 check_index_is_clusterable(Relation OldHeap, Oid indexOid, bool recheck)
313 {
314         Relation        OldIndex;
315
316         OldIndex = index_open(indexOid);
317         LockRelation(OldIndex, AccessExclusiveLock);
318
319         /*
320          * Check that index is in fact an index on the given relation
321          */
322         if (OldIndex->rd_index == NULL ||
323                 OldIndex->rd_index->indrelid != RelationGetRelid(OldHeap))
324                 ereport(ERROR,
325                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
326                                  errmsg("\"%s\" is not an index for table \"%s\"",
327                                                 RelationGetRelationName(OldIndex),
328                                                 RelationGetRelationName(OldHeap))));
329
330         /*
331          * Disallow clustering on incomplete indexes (those that might not index
332          * every row of the relation).  We could relax this by making a separate
333          * seqscan pass over the table to copy the missing rows, but that seems
334          * expensive and tedious.
335          */
336         if (!heap_attisnull(OldIndex->rd_indextuple, Anum_pg_index_indpred))
337                 ereport(ERROR,
338                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
339                                  errmsg("cannot cluster on partial index \"%s\"",
340                                                 RelationGetRelationName(OldIndex))));
341
342         if (!OldIndex->rd_am->amindexnulls)
343         {
344                 AttrNumber      colno;
345
346                 /*
347                  * If the AM doesn't index nulls, then it's a partial index unless we
348                  * can prove all the rows are non-null.  Note we only need look at the
349                  * first column; multicolumn-capable AMs are *required* to index nulls
350                  * in columns after the first.
351                  */
352                 colno = OldIndex->rd_index->indkey.values[0];
353                 if (colno > 0)
354                 {
355                         /* ordinary user attribute */
356                         if (!OldHeap->rd_att->attrs[colno - 1]->attnotnull)
357                                 ereport(ERROR,
358                                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
359                                                  errmsg("cannot cluster on index \"%s\" because access method\n"
360                                                                 "does not handle null values",
361                                                                 RelationGetRelationName(OldIndex)),
362                                                  errhint("You may be able to work around this by marking column \"%s\" NOT NULL%s",
363                                                  NameStr(OldHeap->rd_att->attrs[colno - 1]->attname),
364                                                                  recheck ? ",\nor use ALTER TABLE ... SET WITHOUT CLUSTER to remove the cluster\n"
365                                                                  "specification from the table." : ".")));
366                 }
367                 else if (colno < 0)
368                 {
369                         /* system column --- okay, always non-null */
370                 }
371                 else
372                         /* index expression, lose... */
373                         ereport(ERROR,
374                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
375                                          errmsg("cannot cluster on expressional index \"%s\" because its index access\n"
376                                                         "method does not handle null values",
377                                                         RelationGetRelationName(OldIndex))));
378         }
379
380         /*
381          * Disallow clustering system relations.  This will definitely NOT work
382          * for shared relations (we have no way to update pg_class rows in other
383          * databases), nor for nailed-in-cache relations (the relfilenode values
384          * for those are hardwired, see relcache.c).  It might work for other
385          * system relations, but I ain't gonna risk it.
386          */
387         if (IsSystemRelation(OldHeap))
388                 ereport(ERROR,
389                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
390                                  errmsg("\"%s\" is a system catalog",
391                                                 RelationGetRelationName(OldHeap))));
392
393         /*
394          * Don't allow cluster on temp tables of other backends ... their local
395          * buffer manager is not going to cope.
396          */
397         if (isOtherTempNamespace(RelationGetNamespace(OldHeap)))
398                 ereport(ERROR,
399                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
400                            errmsg("cannot cluster temporary tables of other sessions")));
401
402         /* Drop relcache refcnt on OldIndex, but keep lock */
403         index_close(OldIndex);
404 }
405
406 /*
407  * mark_index_clustered: mark the specified index as the one clustered on
408  *
409  * With indexOid == InvalidOid, will mark all indexes of rel not-clustered.
410  */
411 void
412 mark_index_clustered(Relation rel, Oid indexOid)
413 {
414         HeapTuple       indexTuple;
415         Form_pg_index indexForm;
416         Relation        pg_index;
417         ListCell   *index;
418
419         /*
420          * If the index is already marked clustered, no need to do anything.
421          */
422         if (OidIsValid(indexOid))
423         {
424                 indexTuple = SearchSysCache(INDEXRELID,
425                                                                         ObjectIdGetDatum(indexOid),
426                                                                         0, 0, 0);
427                 if (!HeapTupleIsValid(indexTuple))
428                         elog(ERROR, "cache lookup failed for index %u", indexOid);
429                 indexForm = (Form_pg_index) GETSTRUCT(indexTuple);
430
431                 if (indexForm->indisclustered)
432                 {
433                         ReleaseSysCache(indexTuple);
434                         return;
435                 }
436
437                 ReleaseSysCache(indexTuple);
438         }
439
440         /*
441          * Check each index of the relation and set/clear the bit as needed.
442          */
443         pg_index = heap_open(IndexRelationId, RowExclusiveLock);
444
445         foreach(index, RelationGetIndexList(rel))
446         {
447                 Oid                     thisIndexOid = lfirst_oid(index);
448
449                 indexTuple = SearchSysCacheCopy(INDEXRELID,
450                                                                                 ObjectIdGetDatum(thisIndexOid),
451                                                                                 0, 0, 0);
452                 if (!HeapTupleIsValid(indexTuple))
453                         elog(ERROR, "cache lookup failed for index %u", thisIndexOid);
454                 indexForm = (Form_pg_index) GETSTRUCT(indexTuple);
455
456                 /*
457                  * Unset the bit if set.  We know it's wrong because we checked this
458                  * earlier.
459                  */
460                 if (indexForm->indisclustered)
461                 {
462                         indexForm->indisclustered = false;
463                         simple_heap_update(pg_index, &indexTuple->t_self, indexTuple);
464                         CatalogUpdateIndexes(pg_index, indexTuple);
465                         /* Ensure we see the update in the index's relcache entry */
466                         CacheInvalidateRelcacheByRelid(thisIndexOid);
467                 }
468                 else if (thisIndexOid == indexOid)
469                 {
470                         indexForm->indisclustered = true;
471                         simple_heap_update(pg_index, &indexTuple->t_self, indexTuple);
472                         CatalogUpdateIndexes(pg_index, indexTuple);
473                         /* Ensure we see the update in the index's relcache entry */
474                         CacheInvalidateRelcacheByRelid(thisIndexOid);
475                 }
476                 heap_freetuple(indexTuple);
477         }
478
479         heap_close(pg_index, RowExclusiveLock);
480 }
481
482 /*
483  * rebuild_relation: rebuild an existing relation in index order
484  *
485  * OldHeap: table to rebuild --- must be opened and exclusive-locked!
486  * indexOid: index to cluster by
487  *
488  * NB: this routine closes OldHeap at the right time; caller should not.
489  */
490 static void
491 rebuild_relation(Relation OldHeap, Oid indexOid)
492 {
493         Oid                     tableOid = RelationGetRelid(OldHeap);
494         Oid                     tableSpace = OldHeap->rd_rel->reltablespace;
495         Oid                     OIDNewHeap;
496         char            NewHeapName[NAMEDATALEN];
497         ObjectAddress object;
498
499         /* Mark the correct index as clustered */
500         mark_index_clustered(OldHeap, indexOid);
501
502         /* Close relcache entry, but keep lock until transaction commit */
503         heap_close(OldHeap, NoLock);
504
505         /*
506          * Create the new heap, using a temporary name in the same namespace as
507          * the existing table.  NOTE: there is some risk of collision with user
508          * relnames.  Working around this seems more trouble than it's worth; in
509          * particular, we can't create the new heap in a different namespace from
510          * the old, or we will have problems with the TEMP status of temp tables.
511          */
512         snprintf(NewHeapName, sizeof(NewHeapName), "pg_temp_%u", tableOid);
513
514         OIDNewHeap = make_new_heap(tableOid, NewHeapName, tableSpace);
515
516         /*
517          * We don't need CommandCounterIncrement() because make_new_heap did it.
518          */
519
520         /*
521          * Copy the heap data into the new table in the desired order.
522          */
523         copy_heap_data(OIDNewHeap, tableOid, indexOid);
524
525         /* To make the new heap's data visible (probably not needed?). */
526         CommandCounterIncrement();
527
528         /* Swap the physical files of the old and new heaps. */
529         swap_relation_files(tableOid, OIDNewHeap);
530
531         CommandCounterIncrement();
532
533         /* Destroy new heap with old filenode */
534         object.classId = RelationRelationId;
535         object.objectId = OIDNewHeap;
536         object.objectSubId = 0;
537
538         /*
539          * The new relation is local to our transaction and we know nothing
540          * depends on it, so DROP_RESTRICT should be OK.
541          */
542         performDeletion(&object, DROP_RESTRICT);
543
544         /* performDeletion does CommandCounterIncrement at end */
545
546         /*
547          * Rebuild each index on the relation (but not the toast table, which is
548          * all-new at this point).      We do not need CommandCounterIncrement()
549          * because reindex_relation does it.
550          */
551         reindex_relation(tableOid, false);
552 }
553
554 /*
555  * Create the new table that we will fill with correctly-ordered data.
556  */
557 Oid
558 make_new_heap(Oid OIDOldHeap, const char *NewName, Oid NewTableSpace)
559 {
560         TupleDesc       OldHeapDesc,
561                                 tupdesc;
562         Oid                     OIDNewHeap;
563         Relation        OldHeap;
564
565         OldHeap = heap_open(OIDOldHeap, AccessExclusiveLock);
566         OldHeapDesc = RelationGetDescr(OldHeap);
567
568         /*
569          * Need to make a copy of the tuple descriptor, since
570          * heap_create_with_catalog modifies it.
571          */
572         tupdesc = CreateTupleDescCopyConstr(OldHeapDesc);
573
574         OIDNewHeap = heap_create_with_catalog(NewName,
575                                                                                   RelationGetNamespace(OldHeap),
576                                                                                   NewTableSpace,
577                                                                                   InvalidOid,
578                                                                                   OldHeap->rd_rel->relowner,
579                                                                                   tupdesc,
580                                                                                   OldHeap->rd_rel->relkind,
581                                                                                   OldHeap->rd_rel->relisshared,
582                                                                                   true,
583                                                                                   0,
584                                                                                   ONCOMMIT_NOOP,
585                                                                                   allowSystemTableMods);
586
587         /*
588          * Advance command counter so that the newly-created relation's catalog
589          * tuples will be visible to heap_open.
590          */
591         CommandCounterIncrement();
592
593         /*
594          * If necessary, create a TOAST table for the new relation. Note that
595          * AlterTableCreateToastTable ends with CommandCounterIncrement(), so that
596          * the TOAST table will be visible for insertion.
597          */
598         AlterTableCreateToastTable(OIDNewHeap, true);
599
600         heap_close(OldHeap, NoLock);
601
602         return OIDNewHeap;
603 }
604
605 /*
606  * Do the physical copying of heap data.
607  */
608 static void
609 copy_heap_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex)
610 {
611         Relation        NewHeap,
612                                 OldHeap,
613                                 OldIndex;
614         TupleDesc       oldTupDesc;
615         TupleDesc       newTupDesc;
616         int                     natts;
617         Datum      *values;
618         char       *nulls;
619         IndexScanDesc scan;
620         HeapTuple       tuple;
621
622         /*
623          * Open the relations we need.
624          */
625         NewHeap = heap_open(OIDNewHeap, AccessExclusiveLock);
626         OldHeap = heap_open(OIDOldHeap, AccessExclusiveLock);
627         OldIndex = index_open(OIDOldIndex);
628
629         /*
630          * Their tuple descriptors should be exactly alike, but here we only need
631          * assume that they have the same number of columns.
632          */
633         oldTupDesc = RelationGetDescr(OldHeap);
634         newTupDesc = RelationGetDescr(NewHeap);
635         Assert(newTupDesc->natts == oldTupDesc->natts);
636
637         /* Preallocate values/nulls arrays */
638         natts = newTupDesc->natts;
639         values = (Datum *) palloc0(natts * sizeof(Datum));
640         nulls = (char *) palloc(natts * sizeof(char));
641         memset(nulls, 'n', natts * sizeof(char));
642
643         /*
644          * Scan through the OldHeap on the OldIndex and copy each tuple into the
645          * NewHeap.
646          */
647         scan = index_beginscan(OldHeap, OldIndex, SnapshotNow, 0, (ScanKey) NULL);
648
649         while ((tuple = index_getnext(scan, ForwardScanDirection)) != NULL)
650         {
651                 /*
652                  * We cannot simply pass the tuple to heap_insert(), for several
653                  * reasons:
654                  *
655                  * 1. heap_insert() will overwrite the commit-status fields of the tuple
656                  * it's handed.  This would trash the source relation, which is bad
657                  * news if we abort later on.  (This was a bug in releases thru 7.0)
658                  *
659                  * 2. We'd like to squeeze out the values of any dropped columns, both to
660                  * save space and to ensure we have no corner-case failures. (It's
661                  * possible for example that the new table hasn't got a TOAST table
662                  * and so is unable to store any large values of dropped cols.)
663                  *
664                  * 3. The tuple might not even be legal for the new table; this is
665                  * currently only known to happen as an after-effect of ALTER TABLE
666                  * SET WITHOUT OIDS.
667                  *
668                  * So, we must reconstruct the tuple from component Datums.
669                  */
670                 HeapTuple       copiedTuple;
671                 int                     i;
672
673                 heap_deformtuple(tuple, oldTupDesc, values, nulls);
674
675                 /* Be sure to null out any dropped columns */
676                 for (i = 0; i < natts; i++)
677                 {
678                         if (newTupDesc->attrs[i]->attisdropped)
679                                 nulls[i] = 'n';
680                 }
681
682                 copiedTuple = heap_formtuple(newTupDesc, values, nulls);
683
684                 /* Preserve OID, if any */
685                 if (NewHeap->rd_rel->relhasoids)
686                         HeapTupleSetOid(copiedTuple, HeapTupleGetOid(tuple));
687
688                 simple_heap_insert(NewHeap, copiedTuple);
689
690                 heap_freetuple(copiedTuple);
691
692                 CHECK_FOR_INTERRUPTS();
693         }
694
695         index_endscan(scan);
696
697         pfree(values);
698         pfree(nulls);
699
700         index_close(OldIndex);
701         heap_close(OldHeap, NoLock);
702         heap_close(NewHeap, NoLock);
703 }
704
705 /*
706  * Swap the physical files of two given relations.
707  *
708  * We swap the physical identity (reltablespace and relfilenode) while
709  * keeping the same logical identities of the two relations.
710  *
711  * Also swap any TOAST links, so that the toast data moves along with
712  * the main-table data.
713  */
714 void
715 swap_relation_files(Oid r1, Oid r2)
716 {
717         Relation        relRelation;
718         HeapTuple       reltup1,
719                                 reltup2;
720         Form_pg_class relform1,
721                                 relform2;
722         Oid                     swaptemp;
723         CatalogIndexState indstate;
724
725         /* We need writable copies of both pg_class tuples. */
726         relRelation = heap_open(RelationRelationId, RowExclusiveLock);
727
728         reltup1 = SearchSysCacheCopy(RELOID,
729                                                                  ObjectIdGetDatum(r1),
730                                                                  0, 0, 0);
731         if (!HeapTupleIsValid(reltup1))
732                 elog(ERROR, "cache lookup failed for relation %u", r1);
733         relform1 = (Form_pg_class) GETSTRUCT(reltup1);
734
735         reltup2 = SearchSysCacheCopy(RELOID,
736                                                                  ObjectIdGetDatum(r2),
737                                                                  0, 0, 0);
738         if (!HeapTupleIsValid(reltup2))
739                 elog(ERROR, "cache lookup failed for relation %u", r2);
740         relform2 = (Form_pg_class) GETSTRUCT(reltup2);
741
742         /*
743          * Actually swap the fields in the two tuples
744          */
745         swaptemp = relform1->relfilenode;
746         relform1->relfilenode = relform2->relfilenode;
747         relform2->relfilenode = swaptemp;
748
749         swaptemp = relform1->reltablespace;
750         relform1->reltablespace = relform2->reltablespace;
751         relform2->reltablespace = swaptemp;
752
753         swaptemp = relform1->reltoastrelid;
754         relform1->reltoastrelid = relform2->reltoastrelid;
755         relform2->reltoastrelid = swaptemp;
756
757         /* we should not swap reltoastidxid */
758
759         /* swap size statistics too, since new rel has freshly-updated stats */
760         {
761                 int4            swap_pages;
762                 float4          swap_tuples;
763
764                 swap_pages = relform1->relpages;
765                 relform1->relpages = relform2->relpages;
766                 relform2->relpages = swap_pages;
767
768                 swap_tuples = relform1->reltuples;
769                 relform1->reltuples = relform2->reltuples;
770                 relform2->reltuples = swap_tuples;
771         }
772
773         /* Update the tuples in pg_class */
774         simple_heap_update(relRelation, &reltup1->t_self, reltup1);
775         simple_heap_update(relRelation, &reltup2->t_self, reltup2);
776
777         /* Keep system catalogs current */
778         indstate = CatalogOpenIndexes(relRelation);
779         CatalogIndexInsert(indstate, reltup1);
780         CatalogIndexInsert(indstate, reltup2);
781         CatalogCloseIndexes(indstate);
782
783         /*
784          * If we have toast tables associated with the relations being swapped,
785          * change their dependency links to re-associate them with their new
786          * owning relations.  Otherwise the wrong one will get dropped ...
787          *
788          * NOTE: it is possible that only one table has a toast table; this can
789          * happen in CLUSTER if there were dropped columns in the old table, and
790          * in ALTER TABLE when adding or changing type of columns.
791          *
792          * NOTE: at present, a TOAST table's only dependency is the one on its owning
793          * table.  If more are ever created, we'd need to use something more
794          * selective than deleteDependencyRecordsFor() to get rid of only the link
795          * we want.
796          */
797         if (relform1->reltoastrelid || relform2->reltoastrelid)
798         {
799                 ObjectAddress baseobject,
800                                         toastobject;
801                 long            count;
802
803                 /* Delete old dependencies */
804                 if (relform1->reltoastrelid)
805                 {
806                         count = deleteDependencyRecordsFor(RelationRelationId,
807                                                                                            relform1->reltoastrelid);
808                         if (count != 1)
809                                 elog(ERROR, "expected one dependency record for TOAST table, found %ld",
810                                          count);
811                 }
812                 if (relform2->reltoastrelid)
813                 {
814                         count = deleteDependencyRecordsFor(RelationRelationId,
815                                                                                            relform2->reltoastrelid);
816                         if (count != 1)
817                                 elog(ERROR, "expected one dependency record for TOAST table, found %ld",
818                                          count);
819                 }
820
821                 /* Register new dependencies */
822                 baseobject.classId = RelationRelationId;
823                 baseobject.objectSubId = 0;
824                 toastobject.classId = RelationRelationId;
825                 toastobject.objectSubId = 0;
826
827                 if (relform1->reltoastrelid)
828                 {
829                         baseobject.objectId = r1;
830                         toastobject.objectId = relform1->reltoastrelid;
831                         recordDependencyOn(&toastobject, &baseobject, DEPENDENCY_INTERNAL);
832                 }
833
834                 if (relform2->reltoastrelid)
835                 {
836                         baseobject.objectId = r2;
837                         toastobject.objectId = relform2->reltoastrelid;
838                         recordDependencyOn(&toastobject, &baseobject, DEPENDENCY_INTERNAL);
839                 }
840         }
841
842         /*
843          * Blow away the old relcache entries now.      We need this kluge because
844          * relcache.c keeps a link to the smgr relation for the physical file, and
845          * that will be out of date as soon as we do CommandCounterIncrement.
846          * Whichever of the rels is the second to be cleared during cache
847          * invalidation will have a dangling reference to an already-deleted smgr
848          * relation.  Rather than trying to avoid this by ordering operations just
849          * so, it's easiest to not have the relcache entries there at all.
850          * (Fortunately, since one of the entries is local in our transaction,
851          * it's sufficient to clear out our own relcache this way; the problem
852          * cannot arise for other backends when they see our update on the
853          * non-local relation.)
854          */
855         RelationForgetRelation(r1);
856         RelationForgetRelation(r2);
857
858         /* Clean up. */
859         heap_freetuple(reltup1);
860         heap_freetuple(reltup2);
861
862         heap_close(relRelation, RowExclusiveLock);
863 }
864
865 /*
866  * Get a list of tables that the current user owns and
867  * have indisclustered set.  Return the list in a List * of rvsToCluster
868  * with the tableOid and the indexOid on which the table is already
869  * clustered.
870  */
871 static List *
872 get_tables_to_cluster(MemoryContext cluster_context)
873 {
874         Relation        indRelation;
875         HeapScanDesc scan;
876         ScanKeyData entry;
877         HeapTuple       indexTuple;
878         Form_pg_index index;
879         MemoryContext old_context;
880         RelToCluster *rvtc;
881         List       *rvs = NIL;
882
883         /*
884          * Get all indexes that have indisclustered set and are owned by
885          * appropriate user. System relations or nailed-in relations cannot ever
886          * have indisclustered set, because CLUSTER will refuse to set it when
887          * called with one of them as argument.
888          */
889         indRelation = heap_open(IndexRelationId, AccessShareLock);
890         ScanKeyInit(&entry,
891                                 Anum_pg_index_indisclustered,
892                                 BTEqualStrategyNumber, F_BOOLEQ,
893                                 BoolGetDatum(true));
894         scan = heap_beginscan(indRelation, SnapshotNow, 1, &entry);
895         while ((indexTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
896         {
897                 index = (Form_pg_index) GETSTRUCT(indexTuple);
898
899                 if (!pg_class_ownercheck(index->indrelid, GetUserId()))
900                         continue;
901
902                 /*
903                  * We have to build the list in a different memory context so it will
904                  * survive the cross-transaction processing
905                  */
906                 old_context = MemoryContextSwitchTo(cluster_context);
907
908                 rvtc = (RelToCluster *) palloc(sizeof(RelToCluster));
909                 rvtc->tableOid = index->indrelid;
910                 rvtc->indexOid = index->indexrelid;
911                 rvs = lcons(rvtc, rvs);
912
913                 MemoryContextSwitchTo(old_context);
914         }
915         heap_endscan(scan);
916
917         relation_close(indRelation, AccessShareLock);
918
919         return rvs;
920 }