*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/commands/cluster.c,v 1.93 2002/11/11 22:19:21 tgl Exp $
+ * $Header: /cvsroot/pgsql/src/backend/commands/cluster.c,v 1.94 2002/11/15 03:09:35 momjian Exp $
*
*-------------------------------------------------------------------------
*/
#include "catalog/index.h"
#include "catalog/indexing.h"
#include "catalog/catname.h"
+#include "catalog/namespace.h"
#include "commands/cluster.h"
#include "commands/tablecmds.h"
#include "miscadmin.h"
+#include "utils/acl.h"
#include "utils/fmgroids.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"
bool isclustered;
} IndexAttrs;
+/* This struct is used to pass around the information on tables to be
+ * clustered. We need this so we can make a list of them when invoked without
+ * a specific table/index pair.
+ */
+typedef struct
+{
+ Oid tableOid;
+ Oid indexOid;
+ bool isPrevious;
+} relToCluster;
+
static Oid make_new_heap(Oid OIDOldHeap, const char *NewName);
static void copy_heap_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex);
static List *get_indexattr_list(Relation OldHeap, Oid OldIndex);
static void recreate_indexattr(Oid OIDOldHeap, List *indexes);
static void swap_relfilenodes(Oid r1, Oid r2);
+static void cluster_rel(relToCluster *rv);
+static bool check_cluster_ownership(Oid relOid);
+static List *get_tables_to_cluster(Oid owner);
+static MemoryContext cluster_context = NULL;
/*
* cluster
* the new table, it's better to create the indexes afterwards than to fill
* them incrementally while we load the table.
*
- * Permissions checks were done already.
+ * Since we may open a new transaction for each relation, we have to
+ * check that the relation still is what we think it is.
*/
void
-cluster(RangeVar *oldrelation, char *oldindexname)
+cluster_rel(relToCluster *rvtc)
{
- Oid OIDOldHeap,
- OIDOldIndex,
- OIDNewHeap;
+ Oid OIDNewHeap;
Relation OldHeap,
OldIndex;
char NewHeapName[NAMEDATALEN];
ObjectAddress object;
List *indexes;
+ /* Check for user-requested abort. */
+ CHECK_FOR_INTERRUPTS();
+
+ /* Check if the relation and index still exist before opening them
+ */
+ if (!SearchSysCacheExists(RELOID,
+ ObjectIdGetDatum(rvtc->tableOid),
+ 0, 0, 0) ||
+ !SearchSysCacheExists(RELOID,
+ ObjectIdGetDatum(rvtc->indexOid),
+ 0, 0, 0))
+ return;
+
+ /* Check that the user still owns the relation */
+ if (!check_cluster_ownership(rvtc->tableOid))
+ return;
+
+ /* Check that the index is still the one with indisclustered set.
+ * If this is a standalone cluster, skip this test.
+ */
+ if (rvtc->isPrevious)
+ {
+ HeapTuple tuple;
+ Form_pg_index indexForm;
+
+ tuple = SearchSysCache(INDEXRELID,
+ ObjectIdGetDatum(rvtc->indexOid),
+ 0, 0, 0);
+ indexForm = (Form_pg_index) GETSTRUCT(tuple);
+ if (!indexForm->indisclustered)
+ {
+ ReleaseSysCache(tuple);
+ return;
+ }
+ ReleaseSysCache(tuple);
+ }
+
/*
* We grab exclusive access to the target rel and index for the
* duration of the transaction.
*/
- OldHeap = heap_openrv(oldrelation, AccessExclusiveLock);
- OIDOldHeap = RelationGetRelid(OldHeap);
+ OldHeap = heap_open(rvtc->tableOid, AccessExclusiveLock);
- /*
- * The index is expected to be in the same namespace as the relation.
- */
- OIDOldIndex = get_relname_relid(oldindexname,
- RelationGetNamespace(OldHeap));
- if (!OidIsValid(OIDOldIndex))
- elog(ERROR, "CLUSTER: cannot find index \"%s\" for table \"%s\"",
- oldindexname, RelationGetRelationName(OldHeap));
- OldIndex = index_open(OIDOldIndex);
+ OldIndex = index_open(rvtc->indexOid);
LockRelation(OldIndex, AccessExclusiveLock);
/*
* Check that index is in fact an index on the given relation
*/
if (OldIndex->rd_index == NULL ||
- OldIndex->rd_index->indrelid != OIDOldHeap)
+ OldIndex->rd_index->indrelid != rvtc->tableOid)
elog(ERROR, "CLUSTER: \"%s\" is not an index for table \"%s\"",
RelationGetRelationName(OldIndex),
RelationGetRelationName(OldHeap));
RelationGetRelationName(OldHeap));
/* Save the information of all indexes on the relation. */
- indexes = get_indexattr_list(OldHeap, OIDOldIndex);
+ indexes = get_indexattr_list(OldHeap, rvtc->indexOid);
/* Drop relcache refcnts, but do NOT give up the locks */
index_close(OldIndex);
* namespace from the old, or we will have problems with the TEMP
* status of temp tables.
*/
- snprintf(NewHeapName, NAMEDATALEN, "pg_temp_%u", OIDOldHeap);
+ snprintf(NewHeapName, NAMEDATALEN, "pg_temp_%u", rvtc->tableOid);
- OIDNewHeap = make_new_heap(OIDOldHeap, NewHeapName);
+ OIDNewHeap = make_new_heap(rvtc->tableOid, NewHeapName);
/*
* We don't need CommandCounterIncrement() because make_new_heap did
/*
* Copy the heap data into the new table in the desired order.
*/
- copy_heap_data(OIDNewHeap, OIDOldHeap, OIDOldIndex);
+ copy_heap_data(OIDNewHeap, rvtc->tableOid, rvtc->indexOid);
/* To make the new heap's data visible (probably not needed?). */
CommandCounterIncrement();
/* Swap the relfilenodes of the old and new heaps. */
- swap_relfilenodes(OIDOldHeap, OIDNewHeap);
+ swap_relfilenodes(rvtc->tableOid, OIDNewHeap);
CommandCounterIncrement();
* Recreate each index on the relation. We do not need
* CommandCounterIncrement() because recreate_indexattr does it.
*/
- recreate_indexattr(OIDOldHeap, indexes);
+ recreate_indexattr(rvtc->tableOid, indexes);
}
/*
heap_close(relRelation, RowExclusiveLock);
}
+
+/*---------------------------------------------------------------------------
+ * This cluster code allows for clustering multiple tables at once. Because
+ * of this, we cannot just run everything on a single transaction, or we
+ * would be forced to acquire exclusive locks on all the tables being
+ * clustered. To solve this we follow a similar strategy to VACUUM code,
+ * clustering each relation in a separate transaction. For this to work,
+ * we need to:
+ * - provide a separate memory context so that we can pass information in
+ * a way that trascends transactions
+ * - start a new transaction every time a new relation is clustered
+ * - check for validity of the information on to-be-clustered relations,
+ * as someone might have deleted a relation behind our back, or
+ * clustered one on a different index
+ * - end the transaction
+ *
+ * The single relation code does not have any overhead.
+ *
+ * We also allow a relation being specified without index. In that case,
+ * the indisclustered bit will be looked up, and an ERROR will be thrown
+ * if there is no index with the bit set.
+ *---------------------------------------------------------------------------
+ */
+void
+cluster(ClusterStmt *stmt)
+{
+
+ /* This is the single relation case. */
+ if (stmt->relation != NULL)
+ {
+ Oid indexOid = InvalidOid,
+ tableOid;
+ relToCluster rvtc;
+ HeapTuple tuple;
+ Form_pg_class classForm;
+
+ tableOid = RangeVarGetRelid(stmt->relation, false);
+ if (!check_cluster_ownership(tableOid))
+ elog(ERROR, "CLUSTER: You do not own relation %s",
+ stmt->relation->relname);
+
+ tuple = SearchSysCache(RELOID,
+ ObjectIdGetDatum(tableOid),
+ 0, 0, 0);
+ if (!HeapTupleIsValid(tuple))
+ elog(ERROR, "Cache lookup failed for relation %u", tableOid);
+ classForm = (Form_pg_class) GETSTRUCT(tuple);
+
+ if (stmt->indexname == NULL)
+ {
+ List *index;
+ Relation rel = RelationIdGetRelation(tableOid);
+ HeapTuple ituple = NULL,
+ idxtuple = NULL;
+
+ /* We need to fetch the index that has indisclustered set. */
+ foreach (index, RelationGetIndexList(rel))
+ {
+ Form_pg_index indexForm;
+
+ indexOid = lfirsti(index);
+ ituple = SearchSysCache(RELOID,
+ ObjectIdGetDatum(indexOid),
+ 0, 0, 0);
+ if (!HeapTupleIsValid(ituple))
+ elog(ERROR, "Cache lookup failed for relation %u", indexOid);
+ idxtuple = SearchSysCache(INDEXRELID,
+ ObjectIdGetDatum(HeapTupleGetOid(ituple)),
+ 0, 0, 0);
+ if (!HeapTupleIsValid(idxtuple))
+ elog(ERROR, "Cache lookup failed for index %u", HeapTupleGetOid(ituple));
+ indexForm = (Form_pg_index) GETSTRUCT(idxtuple);
+ if (indexForm->indisclustered)
+ break;
+ indexOid = InvalidOid;
+ }
+ if (indexOid == InvalidOid)
+ elog(ERROR, "CLUSTER: No previously clustered index found on table %s",
+ stmt->relation->relname);
+ RelationClose(rel);
+ ReleaseSysCache(ituple);
+ ReleaseSysCache(idxtuple);
+ }
+ else
+ {
+ /* The index is expected to be in the same namespace as the relation. */
+ indexOid = get_relname_relid(stmt->indexname, classForm->relnamespace);
+ }
+ ReleaseSysCache(tuple);
+
+ /* XXX Maybe the namespace should be reported as well */
+ if (!OidIsValid(indexOid))
+ elog(ERROR, "CLUSTER: cannot find index \"%s\" for table \"%s\"",
+ stmt->indexname, stmt->relation->relname);
+ rvtc.tableOid = tableOid;
+ rvtc.indexOid = indexOid;
+ rvtc.isPrevious = false;
+
+ /* Do the job */
+ cluster_rel(&rvtc);
+ }
+ else
+ {
+ /*
+ * This is the "no relation" case. We need to cluster all tables
+ * that have some index with indisclustered set.
+ */
+
+ relToCluster *rvtc;
+ List *rv,
+ *rvs;
+
+ /*
+ * We cannot run CLUSTER ALL inside a user transaction block; if we were inside
+ * a transaction, then our commit- and start-transaction-command calls
+ * would not have the intended effect!
+ */
+ if (IsTransactionBlock())
+ elog(ERROR, "CLUSTER cannot run inside a BEGIN/END block");
+
+ /* Running CLUSTER from a function would free the function context */
+ if (!MemoryContextContains(QueryContext, stmt))
+ elog(ERROR, "CLUSTER cannot be called from a function");
+ /*
+ * Create special memory context for cross-transaction storage.
+ *
+ * Since it is a child of QueryContext, it will go away even in case
+ * of error.
+ */
+ cluster_context = AllocSetContextCreate(QueryContext,
+ "Cluster",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+
+ /*
+ * Build the list of relations to cluster. Note that this lives in
+ * cluster_context.
+ */
+ rvs = get_tables_to_cluster(GetUserId());
+
+ /* Ok, now that we've got them all, cluster them one by one */
+ foreach (rv, rvs)
+ {
+ rvtc = (relToCluster *)lfirst(rv);
+
+ /* Start a new transaction for this relation. */
+ StartTransactionCommand(true);
+ cluster_rel(rvtc);
+ CommitTransactionCommand(true);
+ }
+ }
+
+ /* Start a new transaction for the cleanup work. */
+ StartTransactionCommand(true);
+
+ /* Clean up working storage */
+ if (stmt->relation == NULL)
+ {
+ MemoryContextDelete(cluster_context);
+ cluster_context = NULL;
+ }
+}
+
+/* Checks if the user owns the relation. Superusers
+ * are allowed to cluster any table.
+ */
+bool
+check_cluster_ownership(Oid relOid)
+{
+ /* Superusers bypass this check */
+ return pg_class_ownercheck(relOid, GetUserId());
+}
+
+/* Get a list of tables that the current user owns and
+ * have indisclustered set. Return the list in a List * of rvsToCluster
+ * with the tableOid and the indexOid on which the table is already
+ * clustered.
+ */
+List *
+get_tables_to_cluster(Oid owner)
+{
+ Relation indRelation;
+ HeapScanDesc scan;
+ ScanKeyData entry;
+ HeapTuple indexTuple;
+ Form_pg_index index;
+ relToCluster *rvtc;
+ List *rvs = NIL;
+
+ /*
+ * Get all indexes that have indisclustered set. System
+ * relations or nailed-in relations cannot ever have
+ * indisclustered set, because CLUSTER will refuse to
+ * set it when called with one of them as argument.
+ */
+ indRelation = relation_openr(IndexRelationName, RowExclusiveLock);
+ ScanKeyEntryInitialize(&entry, 0, Anum_pg_index_indisclustered,
+ F_BOOLEQ, true);
+ scan = heap_beginscan(indRelation, SnapshotNow, 1, &entry);
+ while ((indexTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
+ {
+ MemoryContext old_context = NULL;
+
+ index = (Form_pg_index) GETSTRUCT(indexTuple);
+ if (!check_cluster_ownership(index->indrelid))
+ continue;
+
+ /*
+ * We have to build the struct in a different memory context so
+ * it will survive the cross-transaction processing
+ */
+
+ old_context = MemoryContextSwitchTo(cluster_context);
+
+ rvtc = (relToCluster *)palloc(sizeof(relToCluster));
+ rvtc->indexOid = index->indexrelid;
+ rvtc->tableOid = index->indrelid;
+ rvtc->isPrevious = true;
+ rvs = lcons((void *)rvtc, rvs);
+
+ MemoryContextSwitchTo(old_context);
+ }
+ heap_endscan(scan);
+
+ /*
+ * Release the lock on pg_index. We will check the indexes
+ * later again.
+ *
+ */
+ relation_close(indRelation, RowExclusiveLock);
+ return rvs;
+}