granicus.if.org Git - postgresql/blob - src/backend/commands/cluster.c
pgindent run for 8.3.
[postgresql] / src / backend / commands / cluster.c
1 /*-------------------------------------------------------------------------
2  *
3  * cluster.c
4  *        CLUSTER a table on an index.
5  *
6  * There is hardly anything left of Paul Brown's original implementation...
7  *
8  *
9  * Portions Copyright (c) 1996-2007, PostgreSQL Global Development Group
10  * Portions Copyright (c) 1994-5, Regents of the University of California
11  *
12  *
13  * IDENTIFICATION
14  *        $PostgreSQL: pgsql/src/backend/commands/cluster.c,v 1.165 2007/11/15 21:14:33 momjian Exp $
15  *
16  *-------------------------------------------------------------------------
17  */
18 #include "postgres.h"
19
20 #include "access/genam.h"
21 #include "access/heapam.h"
22 #include "access/rewriteheap.h"
23 #include "access/transam.h"
24 #include "access/xact.h"
25 #include "catalog/catalog.h"
26 #include "catalog/dependency.h"
27 #include "catalog/heap.h"
28 #include "catalog/index.h"
29 #include "catalog/indexing.h"
30 #include "catalog/namespace.h"
31 #include "catalog/toasting.h"
32 #include "commands/cluster.h"
33 #include "commands/vacuum.h"
34 #include "miscadmin.h"
35 #include "storage/procarray.h"
36 #include "utils/acl.h"
37 #include "utils/fmgroids.h"
38 #include "utils/inval.h"
39 #include "utils/lsyscache.h"
40 #include "utils/memutils.h"
41 #include "utils/relcache.h"
42 #include "utils/syscache.h"
43
44
45 /*
46  * This struct is used to pass around the information on tables to be
47  * clustered. We need this so we can make a list of them when invoked without
48  * a specific table/index pair.
49  */
50 typedef struct
51 {
52         Oid                     tableOid;
53         Oid                     indexOid;
54 } RelToCluster;
55
56
57 static void cluster_rel(RelToCluster *rv, bool recheck);
58 static void rebuild_relation(Relation OldHeap, Oid indexOid);
59 static TransactionId copy_heap_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex);
60 static List *get_tables_to_cluster(MemoryContext cluster_context);
61
62
63
64 /*---------------------------------------------------------------------------
65  * This cluster code allows for clustering multiple tables at once. Because
66  * of this, we cannot just run everything on a single transaction, or we
67  * would be forced to acquire exclusive locks on all the tables being
68  * clustered, simultaneously --- very likely leading to deadlock.
69  *
70  * To solve this we follow a similar strategy to VACUUM code,
71  * clustering each relation in a separate transaction. For this to work,
72  * we need to:
73  *      - provide a separate memory context so that we can pass information in
74  *        a way that survives across transactions
75  *      - start a new transaction every time a new relation is clustered
76  *      - check for validity of the information on to-be-clustered relations,
77  *        as someone might have deleted a relation behind our back, or
78  *        clustered one on a different index
79  *      - end the transaction
80  *
81  * The single-relation case does not have any such overhead.
82  *
83  * We also allow a relation to be specified without index.      In that case,
84  * the indisclustered bit will be looked up, and an ERROR will be thrown
85  * if there is no index with the bit set.
86  *---------------------------------------------------------------------------
87  */
88 void
89 cluster(ClusterStmt *stmt, bool isTopLevel)
90 {
91         if (stmt->relation != NULL)
92         {
93                 /* This is the single-relation case. */
94                 Oid                     tableOid,
95                                         indexOid = InvalidOid;
96                 Relation        rel;
97                 RelToCluster rvtc;
98
99                 /* Find and lock the table */
100                 rel = heap_openrv(stmt->relation, AccessExclusiveLock);
101
102                 tableOid = RelationGetRelid(rel);
103
104                 /* Check permissions */
105                 if (!pg_class_ownercheck(tableOid, GetUserId()))
106                         aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
107                                                    RelationGetRelationName(rel));
108
109                 /*
110                  * Reject clustering a remote temp table ... their local buffer
111                  * manager is not going to cope.
112                  */
113                 if (isOtherTempNamespace(RelationGetNamespace(rel)))
114                         ereport(ERROR,
115                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
116                            errmsg("cannot cluster temporary tables of other sessions")));
117
118                 if (stmt->indexname == NULL)
119                 {
120                         ListCell   *index;
121
122                         /* We need to find the index that has indisclustered set. */
123                         foreach(index, RelationGetIndexList(rel))
124                         {
125                                 HeapTuple       idxtuple;
126                                 Form_pg_index indexForm;
127
128                                 indexOid = lfirst_oid(index);
129                                 idxtuple = SearchSysCache(INDEXRELID,
130                                                                                   ObjectIdGetDatum(indexOid),
131                                                                                   0, 0, 0);
132                                 if (!HeapTupleIsValid(idxtuple))
133                                         elog(ERROR, "cache lookup failed for index %u", indexOid);
134                                 indexForm = (Form_pg_index) GETSTRUCT(idxtuple);
135                                 if (indexForm->indisclustered)
136                                 {
137                                         ReleaseSysCache(idxtuple);
138                                         break;
139                                 }
140                                 ReleaseSysCache(idxtuple);
141                                 indexOid = InvalidOid;
142                         }
143
144                         if (!OidIsValid(indexOid))
145                                 ereport(ERROR,
146                                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
147                                                  errmsg("there is no previously clustered index for table \"%s\"",
148                                                                 stmt->relation->relname)));
149                 }
150                 else
151                 {
152                         /*
153                          * The index is expected to be in the same namespace as the
154                          * relation.
155                          */
156                         indexOid = get_relname_relid(stmt->indexname,
157                                                                                  rel->rd_rel->relnamespace);
158                         if (!OidIsValid(indexOid))
159                                 ereport(ERROR,
160                                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
161                                            errmsg("index \"%s\" for table \"%s\" does not exist",
162                                                           stmt->indexname, stmt->relation->relname)));
163                 }
164
165                 /* All other checks are done in cluster_rel() */
166                 rvtc.tableOid = tableOid;
167                 rvtc.indexOid = indexOid;
168
169                 /* close relation, keep lock till commit */
170                 heap_close(rel, NoLock);
171
172                 /* Do the job */
173                 cluster_rel(&rvtc, false);
174         }
175         else
176         {
177                 /*
178                  * This is the "multi relation" case. We need to cluster all tables
179                  * that have some index with indisclustered set.
180                  */
181                 MemoryContext cluster_context;
182                 List       *rvs;
183                 ListCell   *rv;
184
185                 /*
186                  * We cannot run this form of CLUSTER inside a user transaction block;
187                  * we'd be holding locks way too long.
188                  */
189                 PreventTransactionChain(isTopLevel, "CLUSTER");
190
191                 /*
192                  * Create special memory context for cross-transaction storage.
193                  *
194                  * Since it is a child of PortalContext, it will go away even in case
195                  * of error.
196                  */
197                 cluster_context = AllocSetContextCreate(PortalContext,
198                                                                                                 "Cluster",
199                                                                                                 ALLOCSET_DEFAULT_MINSIZE,
200                                                                                                 ALLOCSET_DEFAULT_INITSIZE,
201                                                                                                 ALLOCSET_DEFAULT_MAXSIZE);
202
203                 /*
204                  * Build the list of relations to cluster.      Note that this lives in
205                  * cluster_context.
206                  */
207                 rvs = get_tables_to_cluster(cluster_context);
208
209                 /* Commit to get out of starting transaction */
210                 CommitTransactionCommand();
211
212                 /* Ok, now that we've got them all, cluster them one by one */
213                 foreach(rv, rvs)
214                 {
215                         RelToCluster *rvtc = (RelToCluster *) lfirst(rv);
216
217                         /* Start a new transaction for each relation. */
218                         StartTransactionCommand();
219                         /* functions in indexes may want a snapshot set */
220                         ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());
221                         cluster_rel(rvtc, true);
222                         CommitTransactionCommand();
223                 }
224
225                 /* Start a new transaction for the cleanup work. */
226                 StartTransactionCommand();
227
228                 /* Clean up working storage */
229                 MemoryContextDelete(cluster_context);
230         }
231 }
232
233 /*
234  * cluster_rel
235  *
236  * This clusters the table by creating a new, clustered table and
237  * swapping the relfilenodes of the new table and the old table, so
238  * the OID of the original table is preserved.  Thus we do not lose
239  * GRANT, inheritance nor references to this table (this was a bug
240  * in releases thru 7.3).
241  *
242  * Also create new indexes and swap the filenodes with the old indexes the
243  * same way we do for the relation.  Since we are effectively bulk-loading
244  * the new table, it's better to create the indexes afterwards than to fill
245  * them incrementally while we load the table.
246  */
247 static void
248 cluster_rel(RelToCluster *rvtc, bool recheck)
249 {
250         Relation        OldHeap;
251
252         /* Check for user-requested abort. */
253         CHECK_FOR_INTERRUPTS();
254
255         /*
256          * We grab exclusive access to the target rel and index for the duration
257          * of the transaction.  (This is redundant for the single-transaction
258          * case, since cluster() already did it.)  The index lock is taken inside
259          * check_index_is_clusterable.
260          */
261         OldHeap = try_relation_open(rvtc->tableOid, AccessExclusiveLock);
262
263         /* If the table has gone away, we can skip processing it */
264         if (!OldHeap)
265                 return;
266
267         /*
268          * Since we may open a new transaction for each relation, we have to check
269          * that the relation still is what we think it is.
270          *
271          * If this is a single-transaction CLUSTER, we can skip these tests. We
272          * *must* skip the one on indisclustered since it would reject an attempt
273          * to cluster a not-previously-clustered index.
274          */
275         if (recheck)
276         {
277                 HeapTuple       tuple;
278                 Form_pg_index indexForm;
279
280                 /* Check that the user still owns the relation */
281                 if (!pg_class_ownercheck(rvtc->tableOid, GetUserId()))
282                 {
283                         relation_close(OldHeap, AccessExclusiveLock);
284                         return;
285                 }
286
287                 /*
288                  * Silently skip a temp table for a remote session.  Only doing this
289                  * check in the "recheck" case is appropriate (which currently means
290                  * somebody is executing a database-wide CLUSTER), because there is
291                  * another check in cluster() which will stop any attempt to cluster
292                  * remote temp tables by name.  There is another check in
293                  * check_index_is_clusterable which is redundant, but we leave it for
294                  * extra safety.
295                  */
296                 if (isOtherTempNamespace(RelationGetNamespace(OldHeap)))
297                 {
298                         relation_close(OldHeap, AccessExclusiveLock);
299                         return;
300                 }
301
302                 /*
303                  * Check that the index still exists
304                  */
305                 if (!SearchSysCacheExists(RELOID,
306                                                                   ObjectIdGetDatum(rvtc->indexOid),
307                                                                   0, 0, 0))
308                 {
309                         relation_close(OldHeap, AccessExclusiveLock);
310                         return;
311                 }
312
313                 /*
314                  * Check that the index is still the one with indisclustered set.
315                  */
316                 tuple = SearchSysCache(INDEXRELID,
317                                                            ObjectIdGetDatum(rvtc->indexOid),
318                                                            0, 0, 0);
319                 if (!HeapTupleIsValid(tuple))   /* probably can't happen */
320                 {
321                         relation_close(OldHeap, AccessExclusiveLock);
322                         return;
323                 }
324                 indexForm = (Form_pg_index) GETSTRUCT(tuple);
325                 if (!indexForm->indisclustered)
326                 {
327                         ReleaseSysCache(tuple);
328                         relation_close(OldHeap, AccessExclusiveLock);
329                         return;
330                 }
331                 ReleaseSysCache(tuple);
332         }
333
334         /* Check index is valid to cluster on */
335         check_index_is_clusterable(OldHeap, rvtc->indexOid, recheck);
336
337         /* rebuild_relation does all the dirty work */
338         rebuild_relation(OldHeap, rvtc->indexOid);
339
340         /* NB: rebuild_relation does heap_close() on OldHeap */
341 }
342
343 /*
344  * Verify that the specified index is a legitimate index to cluster on
345  *
346  * Side effect: obtains exclusive lock on the index.  The caller should
347  * already have exclusive lock on the table, so the index lock is likely
348  * redundant, but it seems best to grab it anyway to ensure the index
349  * definition can't change under us.
350  */
351 void
352 check_index_is_clusterable(Relation OldHeap, Oid indexOid, bool recheck)
353 {
354         Relation        OldIndex;
355
356         OldIndex = index_open(indexOid, AccessExclusiveLock);
357
358         /*
359          * Check that index is in fact an index on the given relation
360          */
361         if (OldIndex->rd_index == NULL ||
362                 OldIndex->rd_index->indrelid != RelationGetRelid(OldHeap))
363                 ereport(ERROR,
364                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
365                                  errmsg("\"%s\" is not an index for table \"%s\"",
366                                                 RelationGetRelationName(OldIndex),
367                                                 RelationGetRelationName(OldHeap))));
368
369         /*
370          * Disallow clustering on incomplete indexes (those that might not index
371          * every row of the relation).  We could relax this by making a separate
372          * seqscan pass over the table to copy the missing rows, but that seems
373          * expensive and tedious.
374          */
375         if (!heap_attisnull(OldIndex->rd_indextuple, Anum_pg_index_indpred))
376                 ereport(ERROR,
377                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
378                                  errmsg("cannot cluster on partial index \"%s\"",
379                                                 RelationGetRelationName(OldIndex))));
380
381         if (!OldIndex->rd_am->amclusterable)
382                 ereport(ERROR,
383                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
384                                  errmsg("cannot cluster on index \"%s\" because access method does not support clustering",
385                                                 RelationGetRelationName(OldIndex))));
386
387         if (!OldIndex->rd_am->amindexnulls)
388         {
389                 AttrNumber      colno;
390
391                 /*
392                  * If the AM doesn't index nulls, then it's a partial index unless we
393                  * can prove all the rows are non-null.  Note we only need look at the
394                  * first column; multicolumn-capable AMs are *required* to index nulls
395                  * in columns after the first.
396                  */
397                 colno = OldIndex->rd_index->indkey.values[0];
398                 if (colno > 0)
399                 {
400                         /* ordinary user attribute */
401                         if (!OldHeap->rd_att->attrs[colno - 1]->attnotnull)
402                                 ereport(ERROR,
403                                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
404                                                  errmsg("cannot cluster on index \"%s\" because access method does not handle null values",
405                                                                 RelationGetRelationName(OldIndex)),
406                                                  recheck
407                                                  ? errhint("You might be able to work around this by marking column \"%s\" NOT NULL, or use ALTER TABLE ... SET WITHOUT CLUSTER to remove the cluster specification from the table.",
408                                                  NameStr(OldHeap->rd_att->attrs[colno - 1]->attname))
409                                                  : errhint("You might be able to work around this by marking column \"%s\" NOT NULL.",
410                                           NameStr(OldHeap->rd_att->attrs[colno - 1]->attname))));
411                 }
412                 else if (colno < 0)
413                 {
414                         /* system column --- okay, always non-null */
415                 }
416                 else
417                         /* index expression, lose... */
418                         ereport(ERROR,
419                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
420                                          errmsg("cannot cluster on expressional index \"%s\" because its index access method does not handle null values",
421                                                         RelationGetRelationName(OldIndex))));
422         }
423
424         /*
425          * Disallow if index is left over from a failed CREATE INDEX CONCURRENTLY;
426          * it might well not contain entries for every heap row, or might not even
427          * be internally consistent.  (But note that we don't check indcheckxmin;
428          * the worst consequence of following broken HOT chains would be that we
429          * might put recently-dead tuples out-of-order in the new table, and there
430          * is little harm in that.)
431          */
432         if (!OldIndex->rd_index->indisvalid)
433                 ereport(ERROR,
434                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
435                                  errmsg("cannot cluster on invalid index \"%s\"",
436                                                 RelationGetRelationName(OldIndex))));
437
438         /*
439          * Disallow clustering system relations.  This will definitely NOT work
440          * for shared relations (we have no way to update pg_class rows in other
441          * databases), nor for nailed-in-cache relations (the relfilenode values
442          * for those are hardwired, see relcache.c).  It might work for other
443          * system relations, but I ain't gonna risk it.
444          */
445         if (IsSystemRelation(OldHeap))
446                 ereport(ERROR,
447                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
448                                  errmsg("\"%s\" is a system catalog",
449                                                 RelationGetRelationName(OldHeap))));
450
451         /*
452          * Don't allow cluster on temp tables of other backends ... their local
453          * buffer manager is not going to cope.
454          */
455         if (isOtherTempNamespace(RelationGetNamespace(OldHeap)))
456                 ereport(ERROR,
457                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
458                            errmsg("cannot cluster temporary tables of other sessions")));
459
460         /* Drop relcache refcnt on OldIndex, but keep lock */
461         index_close(OldIndex, NoLock);
462 }
463
464 /*
465  * mark_index_clustered: mark the specified index as the one clustered on
466  *
467  * With indexOid == InvalidOid, will mark all indexes of rel not-clustered.
468  */
469 void
470 mark_index_clustered(Relation rel, Oid indexOid)
471 {
472         HeapTuple       indexTuple;
473         Form_pg_index indexForm;
474         Relation        pg_index;
475         ListCell   *index;
476
477         /*
478          * If the index is already marked clustered, no need to do anything.
479          */
480         if (OidIsValid(indexOid))
481         {
482                 indexTuple = SearchSysCache(INDEXRELID,
483                                                                         ObjectIdGetDatum(indexOid),
484                                                                         0, 0, 0);
485                 if (!HeapTupleIsValid(indexTuple))
486                         elog(ERROR, "cache lookup failed for index %u", indexOid);
487                 indexForm = (Form_pg_index) GETSTRUCT(indexTuple);
488
489                 if (indexForm->indisclustered)
490                 {
491                         ReleaseSysCache(indexTuple);
492                         return;
493                 }
494
495                 ReleaseSysCache(indexTuple);
496         }
497
498         /*
499          * Check each index of the relation and set/clear the bit as needed.
500          */
501         pg_index = heap_open(IndexRelationId, RowExclusiveLock);
502
503         foreach(index, RelationGetIndexList(rel))
504         {
505                 Oid                     thisIndexOid = lfirst_oid(index);
506
507                 indexTuple = SearchSysCacheCopy(INDEXRELID,
508                                                                                 ObjectIdGetDatum(thisIndexOid),
509                                                                                 0, 0, 0);
510                 if (!HeapTupleIsValid(indexTuple))
511                         elog(ERROR, "cache lookup failed for index %u", thisIndexOid);
512                 indexForm = (Form_pg_index) GETSTRUCT(indexTuple);
513
514                 /*
515                  * Unset the bit if set.  We know it's wrong because we checked this
516                  * earlier.
517                  */
518                 if (indexForm->indisclustered)
519                 {
520                         indexForm->indisclustered = false;
521                         simple_heap_update(pg_index, &indexTuple->t_self, indexTuple);
522                         CatalogUpdateIndexes(pg_index, indexTuple);
523                         /* Ensure we see the update in the index's relcache entry */
524                         CacheInvalidateRelcacheByRelid(thisIndexOid);
525                 }
526                 else if (thisIndexOid == indexOid)
527                 {
528                         indexForm->indisclustered = true;
529                         simple_heap_update(pg_index, &indexTuple->t_self, indexTuple);
530                         CatalogUpdateIndexes(pg_index, indexTuple);
531                         /* Ensure we see the update in the index's relcache entry */
532                         CacheInvalidateRelcacheByRelid(thisIndexOid);
533                 }
534                 heap_freetuple(indexTuple);
535         }
536
537         heap_close(pg_index, RowExclusiveLock);
538 }
539
540 /*
541  * rebuild_relation: rebuild an existing relation in index order
542  *
543  * OldHeap: table to rebuild --- must be opened and exclusive-locked!
544  * indexOid: index to cluster by
545  *
546  * NB: this routine closes OldHeap at the right time; caller should not.
547  */
548 static void
549 rebuild_relation(Relation OldHeap, Oid indexOid)
550 {
551         Oid                     tableOid = RelationGetRelid(OldHeap);
552         Oid                     tableSpace = OldHeap->rd_rel->reltablespace;
553         Oid                     OIDNewHeap;
554         char            NewHeapName[NAMEDATALEN];
555         TransactionId frozenXid;
556         ObjectAddress object;
557
558         /* Mark the correct index as clustered */
559         mark_index_clustered(OldHeap, indexOid);
560
561         /* Close relcache entry, but keep lock until transaction commit */
562         heap_close(OldHeap, NoLock);
563
564         /*
565          * Create the new heap, using a temporary name in the same namespace as
566          * the existing table.  NOTE: there is some risk of collision with user
567          * relnames.  Working around this seems more trouble than it's worth; in
568          * particular, we can't create the new heap in a different namespace from
569          * the old, or we will have problems with the TEMP status of temp tables.
570          */
571         snprintf(NewHeapName, sizeof(NewHeapName), "pg_temp_%u", tableOid);
572
573         OIDNewHeap = make_new_heap(tableOid, NewHeapName, tableSpace);
574
575         /*
576          * We don't need CommandCounterIncrement() because make_new_heap did it.
577          */
578
579         /*
580          * Copy the heap data into the new table in the desired order.
581          */
582         frozenXid = copy_heap_data(OIDNewHeap, tableOid, indexOid);
583
584         /* To make the new heap's data visible (probably not needed?). */
585         CommandCounterIncrement();
586
587         /* Swap the physical files of the old and new heaps. */
588         swap_relation_files(tableOid, OIDNewHeap, frozenXid);
589
590         CommandCounterIncrement();
591
592         /* Destroy new heap with old filenode */
593         object.classId = RelationRelationId;
594         object.objectId = OIDNewHeap;
595         object.objectSubId = 0;
596
597         /*
598          * The new relation is local to our transaction and we know nothing
599          * depends on it, so DROP_RESTRICT should be OK.
600          */
601         performDeletion(&object, DROP_RESTRICT);
602
603         /* performDeletion does CommandCounterIncrement at end */
604
605         /*
606          * Rebuild each index on the relation (but not the toast table, which is
607          * all-new at this point).      We do not need CommandCounterIncrement()
608          * because reindex_relation does it.
609          */
610         reindex_relation(tableOid, false);
611 }
612
613 /*
614  * Create the new table that we will fill with correctly-ordered data.
615  */
616 Oid
617 make_new_heap(Oid OIDOldHeap, const char *NewName, Oid NewTableSpace)
618 {
619         TupleDesc       OldHeapDesc,
620                                 tupdesc;
621         Oid                     OIDNewHeap;
622         Relation        OldHeap;
623         HeapTuple       tuple;
624         Datum           reloptions;
625         bool            isNull;
626
627         OldHeap = heap_open(OIDOldHeap, AccessExclusiveLock);
628         OldHeapDesc = RelationGetDescr(OldHeap);
629
630         /*
631          * Need to make a copy of the tuple descriptor, since
632          * heap_create_with_catalog modifies it.
633          */
634         tupdesc = CreateTupleDescCopyConstr(OldHeapDesc);
635
636         /*
637          * Use options of the old heap for new heap.
638          */
639         tuple = SearchSysCache(RELOID,
640                                                    ObjectIdGetDatum(OIDOldHeap),
641                                                    0, 0, 0);
642         if (!HeapTupleIsValid(tuple))
643                 elog(ERROR, "cache lookup failed for relation %u", OIDOldHeap);
644         reloptions = SysCacheGetAttr(RELOID, tuple, Anum_pg_class_reloptions,
645                                                                  &isNull);
646         if (isNull)
647                 reloptions = (Datum) 0;
648
649         OIDNewHeap = heap_create_with_catalog(NewName,
650                                                                                   RelationGetNamespace(OldHeap),
651                                                                                   NewTableSpace,
652                                                                                   InvalidOid,
653                                                                                   OldHeap->rd_rel->relowner,
654                                                                                   tupdesc,
655                                                                                   OldHeap->rd_rel->relkind,
656                                                                                   OldHeap->rd_rel->relisshared,
657                                                                                   true,
658                                                                                   0,
659                                                                                   ONCOMMIT_NOOP,
660                                                                                   reloptions,
661                                                                                   allowSystemTableMods);
662
663         ReleaseSysCache(tuple);
664
665         /*
666          * Advance command counter so that the newly-created relation's catalog
667          * tuples will be visible to heap_open.
668          */
669         CommandCounterIncrement();
670
671         /*
672          * If necessary, create a TOAST table for the new relation. Note that
673          * AlterTableCreateToastTable ends with CommandCounterIncrement(), so that
674          * the TOAST table will be visible for insertion.
675          */
676         AlterTableCreateToastTable(OIDNewHeap);
677
678         heap_close(OldHeap, NoLock);
679
680         return OIDNewHeap;
681 }
682
683 /*
684  * Do the physical copying of heap data.  Returns the TransactionId used as
685  * freeze cutoff point for the tuples.
686  */
687 static TransactionId
688 copy_heap_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex)
689 {
690         Relation        NewHeap,
691                                 OldHeap,
692                                 OldIndex;
693         TupleDesc       oldTupDesc;
694         TupleDesc       newTupDesc;
695         int                     natts;
696         Datum      *values;
697         bool       *isnull;
698         IndexScanDesc scan;
699         HeapTuple       tuple;
700         bool            use_wal;
701         TransactionId OldestXmin;
702         TransactionId FreezeXid;
703         RewriteState rwstate;
704
705         /*
706          * Open the relations we need.
707          */
708         NewHeap = heap_open(OIDNewHeap, AccessExclusiveLock);
709         OldHeap = heap_open(OIDOldHeap, AccessExclusiveLock);
710         OldIndex = index_open(OIDOldIndex, AccessExclusiveLock);
711
712         /*
713          * Their tuple descriptors should be exactly alike, but here we only need
714          * assume that they have the same number of columns.
715          */
716         oldTupDesc = RelationGetDescr(OldHeap);
717         newTupDesc = RelationGetDescr(NewHeap);
718         Assert(newTupDesc->natts == oldTupDesc->natts);
719
720         /* Preallocate values/isnull arrays */
721         natts = newTupDesc->natts;
722         values = (Datum *) palloc(natts * sizeof(Datum));
723         isnull = (bool *) palloc(natts * sizeof(bool));
724
725         /*
726          * We need to log the copied data in WAL iff WAL archiving is enabled AND
727          * it's not a temp rel.
728          */
729         use_wal = XLogArchivingActive() && !NewHeap->rd_istemp;
730
731         /* use_wal off requires rd_targblock be initially invalid */
732         Assert(NewHeap->rd_targblock == InvalidBlockNumber);
733
734         /*
735          * compute xids used to freeze and weed out dead tuples.  We use -1
736          * freeze_min_age to avoid having CLUSTER freeze tuples earlier than a
737          * plain VACUUM would.
738          */
739         vacuum_set_xid_limits(-1, OldHeap->rd_rel->relisshared,
740                                                   &OldestXmin, &FreezeXid);
741
742         /* Initialize the rewrite operation */
743         rwstate = begin_heap_rewrite(NewHeap, OldestXmin, FreezeXid, use_wal);
744
745         /*
746          * Scan through the OldHeap in OldIndex order and copy each tuple into the
747          * NewHeap.  To ensure we see recently-dead tuples that still need to be
748          * copied, we scan with SnapshotAny and use HeapTupleSatisfiesVacuum for
749          * the visibility test.
750          */
751         scan = index_beginscan(OldHeap, OldIndex,
752                                                    SnapshotAny, 0, (ScanKey) NULL);
753
754         while ((tuple = index_getnext(scan, ForwardScanDirection)) != NULL)
755         {
756                 HeapTuple       copiedTuple;
757                 bool            isdead;
758                 int                     i;
759
760                 CHECK_FOR_INTERRUPTS();
761
762                 LockBuffer(scan->xs_cbuf, BUFFER_LOCK_SHARE);
763
764                 switch (HeapTupleSatisfiesVacuum(tuple->t_data, OldestXmin,
765                                                                                  scan->xs_cbuf))
766                 {
767                         case HEAPTUPLE_DEAD:
768                                 /* Definitely dead */
769                                 isdead = true;
770                                 break;
771                         case HEAPTUPLE_LIVE:
772                         case HEAPTUPLE_RECENTLY_DEAD:
773                                 /* Live or recently dead, must copy it */
774                                 isdead = false;
775                                 break;
776                         case HEAPTUPLE_INSERT_IN_PROGRESS:
777
778                                 /*
779                                  * We should not see this unless it's been inserted earlier in
780                                  * our own transaction.
781                                  */
782                                 if (!TransactionIdIsCurrentTransactionId(
783                                                                           HeapTupleHeaderGetXmin(tuple->t_data)))
784                                         elog(ERROR, "concurrent insert in progress");
785                                 /* treat as live */
786                                 isdead = false;
787                                 break;
788                         case HEAPTUPLE_DELETE_IN_PROGRESS:
789
790                                 /*
791                                  * We should not see this unless it's been deleted earlier in
792                                  * our own transaction.
793                                  */
794                                 Assert(!(tuple->t_data->t_infomask & HEAP_XMAX_IS_MULTI));
795                                 if (!TransactionIdIsCurrentTransactionId(
796                                                                           HeapTupleHeaderGetXmax(tuple->t_data)))
797                                         elog(ERROR, "concurrent delete in progress");
798                                 /* treat as recently dead */
799                                 isdead = false;
800                                 break;
801                         default:
802                                 elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result");
803                                 isdead = false; /* keep compiler quiet */
804                                 break;
805                 }
806
807                 LockBuffer(scan->xs_cbuf, BUFFER_LOCK_UNLOCK);
808
809                 if (isdead)
810                 {
811                         /* heap rewrite module still needs to see it... */
812                         rewrite_heap_dead_tuple(rwstate, tuple);
813                         continue;
814                 }
815
816                 /*
817                  * We cannot simply copy the tuple as-is, for several reasons:
818                  *
819                  * 1. We'd like to squeeze out the values of any dropped columns, both
820                  * to save space and to ensure we have no corner-case failures. (It's
821                  * possible for example that the new table hasn't got a TOAST table
822                  * and so is unable to store any large values of dropped cols.)
823                  *
824                  * 2. The tuple might not even be legal for the new table; this is
825                  * currently only known to happen as an after-effect of ALTER TABLE
826                  * SET WITHOUT OIDS.
827                  *
828                  * So, we must reconstruct the tuple from component Datums.
829                  */
830                 heap_deform_tuple(tuple, oldTupDesc, values, isnull);
831
832                 /* Be sure to null out any dropped columns */
833                 for (i = 0; i < natts; i++)
834                 {
835                         if (newTupDesc->attrs[i]->attisdropped)
836                                 isnull[i] = true;
837                 }
838
839                 copiedTuple = heap_form_tuple(newTupDesc, values, isnull);
840
841                 /* Preserve OID, if any */
842                 if (NewHeap->rd_rel->relhasoids)
843                         HeapTupleSetOid(copiedTuple, HeapTupleGetOid(tuple));
844
845                 /* The heap rewrite module does the rest */
846                 rewrite_heap_tuple(rwstate, tuple, copiedTuple);
847
848                 heap_freetuple(copiedTuple);
849         }
850
851         index_endscan(scan);
852
853         /* Write out any remaining tuples, and fsync if needed */
854         end_heap_rewrite(rwstate);
855
856         pfree(values);
857         pfree(isnull);
858
859         index_close(OldIndex, NoLock);
860         heap_close(OldHeap, NoLock);
861         heap_close(NewHeap, NoLock);
862
863         return FreezeXid;
864 }
865
866 /*
867  * Swap the physical files of two given relations.
868  *
869  * We swap the physical identity (reltablespace and relfilenode) while
870  * keeping the same logical identities of the two relations.
871  *
872  * Also swap any TOAST links, so that the toast data moves along with
873  * the main-table data.
874  *
875  * Additionally, the first relation is marked with relfrozenxid set to
876  * frozenXid.  It seems a bit ugly to have this here, but all callers would
877  * have to do it anyway, so having it here saves a heap_update.  Note: the
878  * TOAST table needs no special handling, because since we swapped the links,
879  * the entry for the TOAST table will now contain RecentXmin in relfrozenxid,
880  * which is the correct value.
881  */
882 void
883 swap_relation_files(Oid r1, Oid r2, TransactionId frozenXid)
884 {
885         Relation        relRelation;
886         HeapTuple       reltup1,
887                                 reltup2;
888         Form_pg_class relform1,
889                                 relform2;
890         Oid                     swaptemp;
891         CatalogIndexState indstate;
892
893         /* We need writable copies of both pg_class tuples. */
894         relRelation = heap_open(RelationRelationId, RowExclusiveLock);
895
896         reltup1 = SearchSysCacheCopy(RELOID,
897                                                                  ObjectIdGetDatum(r1),
898                                                                  0, 0, 0);
899         if (!HeapTupleIsValid(reltup1))
900                 elog(ERROR, "cache lookup failed for relation %u", r1);
901         relform1 = (Form_pg_class) GETSTRUCT(reltup1);
902
903         reltup2 = SearchSysCacheCopy(RELOID,
904                                                                  ObjectIdGetDatum(r2),
905                                                                  0, 0, 0);
906         if (!HeapTupleIsValid(reltup2))
907                 elog(ERROR, "cache lookup failed for relation %u", r2);
908         relform2 = (Form_pg_class) GETSTRUCT(reltup2);
909
910         /*
911          * Actually swap the fields in the two tuples
912          */
913         swaptemp = relform1->relfilenode;
914         relform1->relfilenode = relform2->relfilenode;
915         relform2->relfilenode = swaptemp;
916
917         swaptemp = relform1->reltablespace;
918         relform1->reltablespace = relform2->reltablespace;
919         relform2->reltablespace = swaptemp;
920
921         swaptemp = relform1->reltoastrelid;
922         relform1->reltoastrelid = relform2->reltoastrelid;
923         relform2->reltoastrelid = swaptemp;
924
925         /* we should not swap reltoastidxid */
926
927         /* set rel1's frozen Xid */
928         Assert(TransactionIdIsNormal(frozenXid));
929         relform1->relfrozenxid = frozenXid;
930
931         /* swap size statistics too, since new rel has freshly-updated stats */
932         {
933                 int4            swap_pages;
934                 float4          swap_tuples;
935
936                 swap_pages = relform1->relpages;
937                 relform1->relpages = relform2->relpages;
938                 relform2->relpages = swap_pages;
939
940                 swap_tuples = relform1->reltuples;
941                 relform1->reltuples = relform2->reltuples;
942                 relform2->reltuples = swap_tuples;
943         }
944
945         /* Update the tuples in pg_class */
946         simple_heap_update(relRelation, &reltup1->t_self, reltup1);
947         simple_heap_update(relRelation, &reltup2->t_self, reltup2);
948
949         /* Keep system catalogs current */
950         indstate = CatalogOpenIndexes(relRelation);
951         CatalogIndexInsert(indstate, reltup1);
952         CatalogIndexInsert(indstate, reltup2);
953         CatalogCloseIndexes(indstate);
954
955         /*
956          * If we have toast tables associated with the relations being swapped,
957          * change their dependency links to re-associate them with their new
958          * owning relations.  Otherwise the wrong one will get dropped ...
959          *
960          * NOTE: it is possible that only one table has a toast table; this can
961          * happen in CLUSTER if there were dropped columns in the old table, and
962          * in ALTER TABLE when adding or changing type of columns.
963          *
964          * NOTE: at present, a TOAST table's only dependency is the one on its
965          * owning table.  If more are ever created, we'd need to use something
966          * more selective than deleteDependencyRecordsFor() to get rid of only the
967          * link we want.
968          */
969         if (relform1->reltoastrelid || relform2->reltoastrelid)
970         {
971                 ObjectAddress baseobject,
972                                         toastobject;
973                 long            count;
974
975                 /* Delete old dependencies */
976                 if (relform1->reltoastrelid)
977                 {
978                         count = deleteDependencyRecordsFor(RelationRelationId,
979                                                                                            relform1->reltoastrelid);
980                         if (count != 1)
981                                 elog(ERROR, "expected one dependency record for TOAST table, found %ld",
982                                          count);
983                 }
984                 if (relform2->reltoastrelid)
985                 {
986                         count = deleteDependencyRecordsFor(RelationRelationId,
987                                                                                            relform2->reltoastrelid);
988                         if (count != 1)
989                                 elog(ERROR, "expected one dependency record for TOAST table, found %ld",
990                                          count);
991                 }
992
993                 /* Register new dependencies */
994                 baseobject.classId = RelationRelationId;
995                 baseobject.objectSubId = 0;
996                 toastobject.classId = RelationRelationId;
997                 toastobject.objectSubId = 0;
998
999                 if (relform1->reltoastrelid)
1000                 {
1001                         baseobject.objectId = r1;
1002                         toastobject.objectId = relform1->reltoastrelid;
1003                         recordDependencyOn(&toastobject, &baseobject, DEPENDENCY_INTERNAL);
1004                 }
1005
1006                 if (relform2->reltoastrelid)
1007                 {
1008                         baseobject.objectId = r2;
1009                         toastobject.objectId = relform2->reltoastrelid;
1010                         recordDependencyOn(&toastobject, &baseobject, DEPENDENCY_INTERNAL);
1011                 }
1012         }
1013
1014         /*
1015          * Blow away the old relcache entries now.      We need this kluge because
1016          * relcache.c keeps a link to the smgr relation for the physical file, and
1017          * that will be out of date as soon as we do CommandCounterIncrement.
1018          * Whichever of the rels is the second to be cleared during cache
1019          * invalidation will have a dangling reference to an already-deleted smgr
1020          * relation.  Rather than trying to avoid this by ordering operations just
1021          * so, it's easiest to not have the relcache entries there at all.
1022          * (Fortunately, since one of the entries is local in our transaction,
1023          * it's sufficient to clear out our own relcache this way; the problem
1024          * cannot arise for other backends when they see our update on the
1025          * non-local relation.)
1026          */
1027         RelationForgetRelation(r1);
1028         RelationForgetRelation(r2);
1029
1030         /* Clean up. */
1031         heap_freetuple(reltup1);
1032         heap_freetuple(reltup2);
1033
1034         heap_close(relRelation, RowExclusiveLock);
1035 }
1036
1037 /*
1038  * Get a list of tables that the current user owns and
1039  * have indisclustered set.  Return the list in a List * of rvsToCluster
1040  * with the tableOid and the indexOid on which the table is already
1041  * clustered.
1042  */
1043 static List *
1044 get_tables_to_cluster(MemoryContext cluster_context)
1045 {
1046         Relation        indRelation;
1047         HeapScanDesc scan;
1048         ScanKeyData entry;
1049         HeapTuple       indexTuple;
1050         Form_pg_index index;
1051         MemoryContext old_context;
1052         RelToCluster *rvtc;
1053         List       *rvs = NIL;
1054
1055         /*
1056          * Get all indexes that have indisclustered set and are owned by
1057          * appropriate user. System relations or nailed-in relations cannot ever
1058          * have indisclustered set, because CLUSTER will refuse to set it when
1059          * called with one of them as argument.
1060          */
1061         indRelation = heap_open(IndexRelationId, AccessShareLock);
1062         ScanKeyInit(&entry,
1063                                 Anum_pg_index_indisclustered,
1064                                 BTEqualStrategyNumber, F_BOOLEQ,
1065                                 BoolGetDatum(true));
1066         scan = heap_beginscan(indRelation, SnapshotNow, 1, &entry);
1067         while ((indexTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
1068         {
1069                 index = (Form_pg_index) GETSTRUCT(indexTuple);
1070
1071                 if (!pg_class_ownercheck(index->indrelid, GetUserId()))
1072                         continue;
1073
1074                 /*
1075                  * We have to build the list in a different memory context so it will
1076                  * survive the cross-transaction processing
1077                  */
1078                 old_context = MemoryContextSwitchTo(cluster_context);
1079
1080                 rvtc = (RelToCluster *) palloc(sizeof(RelToCluster));
1081                 rvtc->tableOid = index->indrelid;
1082                 rvtc->indexOid = index->indexrelid;
1083                 rvs = lcons(rvtc, rvs);
1084
1085                 MemoryContextSwitchTo(old_context);
1086         }
1087         heap_endscan(scan);
1088
1089         relation_close(indRelation, AccessShareLock);
1090
1091         return rvs;
1092 }