]> granicus.if.org Git - postgresql/commitdiff
Code review for CLUSTER ALL patch. Fix bogus locking, incorrect transaction
authorTom Lane <tgl@sss.pgh.pa.us>
Mon, 30 Dec 2002 18:42:17 +0000 (18:42 +0000)
committerTom Lane <tgl@sss.pgh.pa.us>
Mon, 30 Dec 2002 18:42:17 +0000 (18:42 +0000)
stop/start nesting, other infelicities.

doc/src/sgml/ref/cluster.sgml
src/backend/commands/cluster.c
src/backend/commands/tablecmds.c
src/backend/tcop/utility.c
src/include/commands/cluster.h
src/test/regress/expected/cluster.out

index e72a8f61d1f1129c403666012ce4a9df9301694a..95daa8bf176797fd11b616aaceaf3e9847c3adee 100644 (file)
@@ -1,5 +1,5 @@
 <!--
-$Header: /cvsroot/pgsql/doc/src/sgml/ref/cluster.sgml,v 1.22 2002/11/18 17:12:06 momjian Exp $
+$Header: /cvsroot/pgsql/doc/src/sgml/ref/cluster.sgml,v 1.23 2002/12/30 18:42:12 tgl Exp $
 PostgreSQL documentation
 -->
 
@@ -108,14 +108,16 @@ CLUSTER
 
   <para>
    When a table is clustered, <productname>PostgreSQL</productname>
-   remembers on which index it was clustered.  In calls to
+   remembers on which index it was clustered.  The form
    <command>CLUSTER <replaceable class="parameter">tablename</replaceable></command>,
-   the table is clustered on the same index that it was clustered before.
+   re-clusters the table on the same index that it was clustered before.
   </para>
 
   <para>
-   A simple <command>CLUSTER</command> clusters all the tables in the database
-   that the calling user owns and uses the saved cluster information.  This
+   <command>CLUSTER</command> without any parameter re-clusters all the tables
+   in the
+   current database that the calling user owns, or all tables if called
+   by a superuser.  (Never-clustered tables are not touched.)  This
    form of <command>CLUSTER</command> cannot be called from inside a
    transaction or function.
   </para>
@@ -157,15 +159,15 @@ CLUSTER
    </para>
 
    <para>
-       <command>CLUSTER</command> preserves GRANT, inheritance, index, foreign
-       key, and other ancillary information about the table.
+    <command>CLUSTER</command> preserves GRANT, inheritance, index, foreign
+    key, and other ancillary information about the table.
    </para>
 
    <para>
-       Because <command>CLUSTER</command> remembers the clustering information,
-       one can cluster the tables one wants clustered manually the first time, and
-       setup a timed event similar to <command>VACUUM</command> so that the tables
-       are periodically and automatically clustered.
+    Because <command>CLUSTER</command> remembers the clustering information,
+    one can cluster the tables one wants clustered manually the first time, and
+    setup a timed event similar to <command>VACUUM</command> so that the tables
+    are periodically re-clustered.
    </para>
 
    <para>
index 05321d353beceec993bb0107939518a4ad2e33ab..0361ede00929e20d2576ca61e03bbec6cc53ded0 100644 (file)
@@ -11,7 +11,7 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/commands/cluster.c,v 1.102 2002/12/06 05:00:10 momjian Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/commands/cluster.c,v 1.103 2002/12/30 18:42:13 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -36,6 +36,7 @@
 #include "utils/syscache.h"
 #include "utils/relcache.h"
 
+
 /*
  * We need one of these structs for each index in the relation to be
  * clustered.  It's basically the data needed by index_create() so
@@ -51,7 +52,8 @@ typedef struct
        bool            isclustered;
 } IndexAttrs;
 
-/* This struct is used to pass around the information on tables to be
+/*
+ * This struct is used to pass around the information on tables to be
  * clustered. We need this so we can make a list of them when invoked without
  * a specific table/index pair.
  */
@@ -59,21 +61,174 @@ typedef struct
 {
        Oid             tableOid;
        Oid             indexOid;
-       bool    isPrevious;
-} relToCluster;
+} RelToCluster;
 
+
+static void cluster_rel(RelToCluster *rv, bool recheck);
 static Oid     make_new_heap(Oid OIDOldHeap, const char *NewName);
 static void copy_heap_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex);
-static void recreate_indexattr(Oid OIDOldHeap, List *indexes);
+static List *get_indexattr_list(Relation OldHeap, Oid OldIndex);
+static void rebuild_indexes(Oid OIDOldHeap, List *indexes);
 static void swap_relfilenodes(Oid r1, Oid r2);
-static void cluster_rel(relToCluster *rv);
-static bool check_cluster_ownership(Oid relOid);
-static List *get_tables_to_cluster(AclId owner);
+static bool check_cluster_permitted(Oid relOid);
+static List *get_tables_to_cluster(MemoryContext cluster_context);
+
+
+
+/*---------------------------------------------------------------------------
+ * This cluster code allows for clustering multiple tables at once.    Because
+ * of this, we cannot just run everything on a single transaction, or we
+ * would be forced to acquire exclusive locks on all the tables being
+ * clustered, simultaneously --- very likely leading to deadlock.
+ *
+ * To solve this we follow a similar strategy to VACUUM code,
+ * clustering each relation in a separate transaction. For this to work,
+ * we need to:
+ *  - provide a separate memory context so that we can pass information in
+ *    a way that survives across transactions
+ *  - start a new transaction every time a new relation is clustered
+ *  - check for validity of the information on to-be-clustered relations,
+ *    as someone might have deleted a relation behind our back, or
+ *    clustered one on a different index
+ *  - end the transaction
+ *
+ * The single-relation case does not have any such overhead.
+ *
+ * We also allow a relation being specified without index.  In that case,
+ * the indisclustered bit will be looked up, and an ERROR will be thrown
+ * if there is no index with the bit set.
+ *---------------------------------------------------------------------------
+ */
+void
+cluster(ClusterStmt *stmt)
+{
+       if (stmt->relation != NULL)
+       {
+               /* This is the single-relation case. */
+               Oid                             tableOid,
+                                               indexOid = InvalidOid;
+               Relation                rel;
+               RelToCluster    rvtc;
+
+               /* Find and lock the table */
+               tableOid = RangeVarGetRelid(stmt->relation, false);
+
+               rel = heap_open(tableOid, AccessExclusiveLock);
+
+               /* Check permissions */
+               if (!check_cluster_permitted(tableOid))
+                       elog(ERROR, "CLUSTER: You do not own relation %s",
+                                stmt->relation->relname);
+
+               if (stmt->indexname == NULL)
+               {
+                       List       *index;
+
+                       /* We need to find the index that has indisclustered set. */
+                       foreach (index, RelationGetIndexList(rel))
+                       {
+                               HeapTuple               idxtuple;
+                               Form_pg_index   indexForm;
+
+                               indexOid = lfirsti(index);
+                               idxtuple = SearchSysCache(INDEXRELID,
+                                                                                 ObjectIdGetDatum(indexOid),
+                                                                                 0, 0, 0);
+                               if (!HeapTupleIsValid(idxtuple))
+                                       elog(ERROR, "Cache lookup failed for index %u",
+                                                indexOid);
+                               indexForm = (Form_pg_index) GETSTRUCT(idxtuple);
+                               if (indexForm->indisclustered)
+                               {
+                                       ReleaseSysCache(idxtuple);
+                                       break;
+                               }
+                               ReleaseSysCache(idxtuple);
+                               indexOid = InvalidOid;
+                       }
+
+                       if (!OidIsValid(indexOid))
+                               elog(ERROR, "CLUSTER: No previously clustered index found on table \"%s\"",
+                                        stmt->relation->relname);
+               }
+               else
+               {
+                       /* The index is expected to be in the same namespace as the relation. */
+                       indexOid = get_relname_relid(stmt->indexname,
+                                                                                rel->rd_rel->relnamespace);
+                       if (!OidIsValid(indexOid))
+                               elog(ERROR, "CLUSTER: cannot find index \"%s\" for table \"%s\"",
+                                        stmt->indexname, stmt->relation->relname);
+               }
+
+               rvtc.tableOid = tableOid;
+               rvtc.indexOid = indexOid;
+
+               /* close relation, keep lock till commit */
+               heap_close(rel, NoLock);
+
+               /* Do the job */
+               cluster_rel(&rvtc, false);
+       }
+       else
+       {
+               /*
+                * This is the "multi relation" case. We need to cluster all tables
+                * that have some index with indisclustered set.
+                */
+               MemoryContext   cluster_context;
+               List                    *rv,
+                                               *rvs;
+
+               /*
+                * We cannot run this form of CLUSTER inside a user transaction block;
+                * we'd be holding locks way too long.
+                */
+               PreventTransactionChain((void *) stmt, "CLUSTER");
+
+               /*
+                * Create special memory context for cross-transaction storage.
+                *
+                * Since it is a child of QueryContext, it will go away even in case
+                * of error.
+                */
+               cluster_context = AllocSetContextCreate(QueryContext,
+                                                                                               "Cluster",
+                                                                                               ALLOCSET_DEFAULT_MINSIZE,
+                                                                                               ALLOCSET_DEFAULT_INITSIZE,
+                                                                                               ALLOCSET_DEFAULT_MAXSIZE);
+
+               /*
+                * Build the list of relations to cluster.  Note that this lives in
+                * cluster_context.
+                */
+               rvs = get_tables_to_cluster(cluster_context);
+
+               /* Commit to get out of starting transaction */
+               CommitTransactionCommand(true);
+
+               /* Ok, now that we've got them all, cluster them one by one */
+               foreach (rv, rvs)
+               {
+                       RelToCluster    *rvtc = (RelToCluster *) lfirst(rv);
+
+                       /* Start a new transaction for each relation. */
+                       StartTransactionCommand(true);
+                       SetQuerySnapshot();     /* might be needed for functional index */
+                       cluster_rel(rvtc, true);
+                       CommitTransactionCommand(true);
+               }
+
+               /* Start a new transaction for the cleanup work. */
+               StartTransactionCommand(true);
 
-static MemoryContext cluster_context = NULL;
+               /* Clean up working storage */
+               MemoryContextDelete(cluster_context);
+       }
+}
 
 /*
- * cluster
+ * cluster_rel
  *
  * This clusters the table by creating a new, clustered table and
  * swapping the relfilenodes of the new table and the old table, so
@@ -85,45 +240,52 @@ static MemoryContext cluster_context = NULL;
  * same way we do for the relation.  Since we are effectively bulk-loading
  * the new table, it's better to create the indexes afterwards than to fill
  * them incrementally while we load the table.
- *
- * Since we may open a new transaction for each relation, we have to
- * check that the relation still is what we think it is.
  */
-void
-cluster_rel(relToCluster *rvtc)
+static void
+cluster_rel(RelToCluster *rvtc, bool recheck)
 {
        Relation        OldHeap,
                                OldIndex;
-       List       *indexes;
 
        /* Check for user-requested abort. */
        CHECK_FOR_INTERRUPTS();
 
-       /* Check if the relation and index still exist before opening them
+       /*
+        * Since we may open a new transaction for each relation, we have to
+        * check that the relation still is what we think it is.
+        *
+        * If this is a single-transaction CLUSTER, we can skip these tests.
+        * We *must* skip the one on indisclustered since it would reject an
+        * attempt to cluster a not-previously-clustered index.
         */
-       if (!SearchSysCacheExists(RELOID,
-                                                         ObjectIdGetDatum(rvtc->tableOid),
-                                                         0, 0, 0) ||
+       if (recheck)
+       {
+               HeapTuple               tuple;
+               Form_pg_index   indexForm;
+
+               /*
+                * Check if the relation and index still exist before opening them
+                */
+               if (!SearchSysCacheExists(RELOID,
+                                                                 ObjectIdGetDatum(rvtc->tableOid),
+                                                                 0, 0, 0) ||
                        !SearchSysCacheExists(RELOID,
                                                                  ObjectIdGetDatum(rvtc->indexOid),
                                                                  0, 0, 0))
-               return;
-
-       /* Check that the user still owns the relation */
-       if (!check_cluster_ownership(rvtc->tableOid))
-               return;
+                       return;
 
-       /* Check that the index is still the one with indisclustered set.
-        * If this is a standalone cluster, skip this test.
-        */
-       if (rvtc->isPrevious)
-       {
-               HeapTuple               tuple;
-               Form_pg_index   indexForm;
+               /* Check that the user still owns the relation */
+               if (!check_cluster_permitted(rvtc->tableOid))
+                       return;
 
+               /*
+                * Check that the index is still the one with indisclustered set.
+                */
                tuple = SearchSysCache(INDEXRELID,
                                                           ObjectIdGetDatum(rvtc->indexOid),
                                                           0, 0, 0);
+               if (!HeapTupleIsValid(tuple))
+                       return;                         /* could have gone away... */
                indexForm = (Form_pg_index) GETSTRUCT(tuple);
                if (!indexForm->indisclustered)
                {
@@ -135,7 +297,8 @@ cluster_rel(relToCluster *rvtc)
 
        /*
         * We grab exclusive access to the target rel and index for the
-        * duration of the transaction.
+        * duration of the transaction.  (This is redundant for the single-
+        * transaction case, since cluster() already did it.)
         */
        OldHeap = heap_open(rvtc->tableOid, AccessExclusiveLock);
 
@@ -162,29 +325,43 @@ cluster_rel(relToCluster *rvtc)
                elog(ERROR, "CLUSTER: cannot cluster system relation \"%s\"",
                         RelationGetRelationName(OldHeap));
 
-       /* Save the information of all indexes on the relation. */
-       indexes = get_indexattr_list(OldHeap, rvtc->indexOid);
-
-       /* Drop relcache refcnts, but do NOT give up the locks */
+       /* Drop relcache refcnt on OldIndex, but keep lock */
        index_close(OldIndex);
-       heap_close(OldHeap, NoLock);
 
-       /* rebuild_rel does all the dirty work */
-       rebuild_rel(rvtc->tableOid, rvtc->indexOid, indexes, true);
+       /* rebuild_relation does all the dirty work */
+       rebuild_relation(OldHeap, rvtc->indexOid);
+
+       /* NB: rebuild_relation does heap_close() on OldHeap */
 }
 
+/*
+ * rebuild_relation: rebuild an existing relation
+ *
+ * This is shared code between CLUSTER and TRUNCATE.  In the TRUNCATE
+ * case, the new relation is built and left empty.  In the CLUSTER case,
+ * it is filled with data read from the old relation in the order specified
+ * by the index.
+ *
+ * OldHeap: table to rebuild --- must be opened and exclusive-locked!
+ * indexOid: index to cluster by, or InvalidOid in TRUNCATE case
+ *
+ * NB: this routine closes OldHeap at the right time; caller should not.
+ */
 void
-rebuild_rel(Oid tableOid, Oid indexOid, List *indexes, bool dataCopy)
+rebuild_relation(Relation OldHeap, Oid indexOid)
 {
+       Oid                     tableOid = RelationGetRelid(OldHeap);
+       List       *indexes;
        Oid                     OIDNewHeap;
        char            NewHeapName[NAMEDATALEN];
        ObjectAddress object;
 
-       /*
-        * If dataCopy is true, we assume that we will be basing the
-        * copy off an index for cluster operations.
-        */
-       Assert(!dataCopy || OidIsValid(indexOid));
+       /* Save the information about all indexes on the relation. */
+       indexes = get_indexattr_list(OldHeap, indexOid);
+
+       /* Close relcache entry, but keep lock until transaction commit */
+       heap_close(OldHeap, NoLock);
+
        /*
         * Create the new heap, using a temporary name in the same namespace
         * as the existing table.  NOTE: there is some risk of collision with
@@ -204,7 +381,7 @@ rebuild_rel(Oid tableOid, Oid indexOid, List *indexes, bool dataCopy)
        /*
         * Copy the heap data into the new table in the desired order.
         */
-       if (dataCopy)
+       if (OidIsValid(indexOid))
                copy_heap_data(OIDNewHeap, tableOid, indexOid);
 
        /* To make the new heap's data visible (probably not needed?). */
@@ -230,9 +407,9 @@ rebuild_rel(Oid tableOid, Oid indexOid, List *indexes, bool dataCopy)
 
        /*
         * Recreate each index on the relation.  We do not need
-        * CommandCounterIncrement() because recreate_indexattr does it.
+        * CommandCounterIncrement() because rebuild_indexes does it.
         */
-       recreate_indexattr(tableOid, indexes);
+       rebuild_indexes(tableOid, indexes);
 }
 
 /*
@@ -335,7 +512,7 @@ copy_heap_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex)
  * Get the necessary info about the indexes of the relation and
  * return a list of IndexAttrs structures.
  */
-List *
+static List *
 get_indexattr_list(Relation OldHeap, Oid OldIndex)
 {
        List       *indexes = NIL;
@@ -366,7 +543,8 @@ get_indexattr_list(Relation OldHeap, Oid OldIndex)
                        palloc(sizeof(Oid) * attrs->indexInfo->ii_NumIndexAttrs);
                memcpy(attrs->classOID, indexForm->indclass,
                           sizeof(Oid) * attrs->indexInfo->ii_NumIndexAttrs);
-               attrs->isclustered = (OldIndex == indexOID);
+               /* We adjust the isclustered attribute to correct new state */
+               attrs->isclustered = (indexOID == OldIndex);
 
                /* Name and access method of each index come from pg_class */
                classTuple = SearchSysCache(RELOID,
@@ -397,7 +575,7 @@ get_indexattr_list(Relation OldHeap, Oid OldIndex)
  * the new index (carrying the old index filenode along).
  */
 static void
-recreate_indexattr(Oid OIDOldHeap, List *indexes)
+rebuild_indexes(Oid OIDOldHeap, List *indexes)
 {
        List       *elem;
 
@@ -629,235 +807,69 @@ swap_relfilenodes(Oid r1, Oid r2)
        heap_close(relRelation, RowExclusiveLock);
 }
 
-/*---------------------------------------------------------------------------
- * This cluster code allows for clustering multiple tables at once.    Because
- * of this, we cannot just run everything on a single transaction, or we
- * would be forced to acquire exclusive locks on all the tables being
- * clustered.  To solve this we follow a similar strategy to VACUUM code,
- * clustering each relation in a separate transaction. For this to work,
- * we need to:
- *  - provide a separate memory context so that we can pass information in
- *    a way that trascends transactions
- *  - start a new transaction every time a new relation is clustered
- *  - check for validity of the information on to-be-clustered relations,
- *    as someone might have deleted a relation behind our back, or
- *    clustered one on a different index
- *  - end the transaction
- *
- * The single relation code does not have any overhead.
- *
- * We also allow a relation being specified without index.  In that case,
- * the indisclustered bit will be looked up, and an ERROR will be thrown
- * if there is no index with the bit set.
- *---------------------------------------------------------------------------
- */
-void
-cluster(ClusterStmt *stmt)
-{
-
-       /* This is the single relation case. */
-       if (stmt->relation != NULL)
-       {
-               Oid                             indexOid = InvalidOid,
-                                               tableOid;
-               relToCluster    rvtc;
-               HeapTuple               tuple;
-               Form_pg_class   classForm;
-
-               tableOid = RangeVarGetRelid(stmt->relation, false);
-               if (!check_cluster_ownership(tableOid))
-                       elog(ERROR, "CLUSTER: You do not own relation %s",
-                                stmt->relation->relname);
-
-               tuple = SearchSysCache(RELOID,
-                                                          ObjectIdGetDatum(tableOid),
-                                                          0, 0, 0);
-               if (!HeapTupleIsValid(tuple))
-                       elog(ERROR, "Cache lookup failed for relation %u", tableOid);
-               classForm = (Form_pg_class) GETSTRUCT(tuple);
-
-               if (stmt->indexname == NULL)
-               {
-                       List       *index;
-                       Relation        rel = RelationIdGetRelation(tableOid);
-                       HeapTuple       ituple = NULL,
-                                               idxtuple = NULL;
-
-                       /* We need to fetch the index that has indisclustered set. */
-                       foreach (index, RelationGetIndexList(rel))
-                       {
-                               Form_pg_index   indexForm;
-
-                               indexOid = lfirsti(index);
-                               ituple = SearchSysCache(RELOID,
-                                                                          ObjectIdGetDatum(indexOid),
-                                                                          0, 0, 0);
-                               if (!HeapTupleIsValid(ituple))
-                                       elog(ERROR, "Cache lookup failed for relation %u", indexOid);
-                               idxtuple = SearchSysCache(INDEXRELID,
-                                                                                 ObjectIdGetDatum(HeapTupleGetOid(ituple)),
-                                                                                 0, 0, 0);
-                               if (!HeapTupleIsValid(idxtuple))
-                                       elog(ERROR, "Cache lookup failed for index %u", HeapTupleGetOid(ituple));
-                               indexForm = (Form_pg_index) GETSTRUCT(idxtuple);
-                               if (indexForm->indisclustered)
-                                       break;
-                               indexOid = InvalidOid;
-                       }
-                       if (indexOid == InvalidOid)
-                               elog(ERROR, "CLUSTER: No previously clustered index found on table %s",
-                                        stmt->relation->relname);
-                       RelationClose(rel);
-                       ReleaseSysCache(ituple);
-                       ReleaseSysCache(idxtuple);
-               }
-               else
-               {
-                       /* The index is expected to be in the same namespace as the relation. */
-                       indexOid = get_relname_relid(stmt->indexname, classForm->relnamespace);
-               }
-               ReleaseSysCache(tuple);
-
-               /* XXX Maybe the namespace should be reported as well */
-               if (!OidIsValid(indexOid))
-                       elog(ERROR, "CLUSTER: cannot find index \"%s\" for table \"%s\"",
-                                stmt->indexname, stmt->relation->relname);
-               rvtc.tableOid = tableOid;
-               rvtc.indexOid = indexOid;
-               rvtc.isPrevious = false;
-
-               /* Do the job */
-               cluster_rel(&rvtc);
-       }
-       else
-       {
-               /*
-                * This is the "no relation" case. We need to cluster all tables
-                * that have some index with indisclustered set.
-                */
-
-               relToCluster    *rvtc;
-               List                    *rv,
-                                               *rvs;
-
-               /*
-                * We cannot run CLUSTER inside a user transaction block; if we were inside
-                * a transaction, then our commit- and start-transaction-command calls
-                * would not have the intended effect!
-                */
-               if (IsTransactionBlock())
-                       elog(ERROR, "CLUSTER cannot run inside a BEGIN/END block");
-
-               /* Running CLUSTER from a function would free the function context */
-               if (!MemoryContextContains(QueryContext, stmt))
-                       elog(ERROR, "CLUSTER cannot be called from a function");
-               /*
-                * Create special memory context for cross-transaction storage.
-                *
-                * Since it is a child of QueryContext, it will go away even in case
-                * of error.
-                */
-               cluster_context = AllocSetContextCreate(QueryContext,
-                               "Cluster",
-                               ALLOCSET_DEFAULT_MINSIZE,
-                               ALLOCSET_DEFAULT_INITSIZE,
-                               ALLOCSET_DEFAULT_MAXSIZE);
-
-               /*
-                * Build the list of relations to cluster.  Note that this lives in
-                * cluster_context.
-                */
-               rvs = get_tables_to_cluster(GetUserId());
-
-               /* Ok, now that we've got them all, cluster them one by one */
-               foreach (rv, rvs)
-               {
-                       rvtc = (relToCluster *)lfirst(rv);
-
-                       /* Start a new transaction for this relation. */
-                       StartTransactionCommand(true);
-                       cluster_rel(rvtc);
-                       CommitTransactionCommand(true);
-               }
-       }
-
-       /* Start a new transaction for the cleanup work. */
-       StartTransactionCommand(true);
-
-       /* Clean up working storage */
-       if (stmt->relation == NULL)
-       {
-               MemoryContextDelete(cluster_context);
-               cluster_context = NULL;
-       }
-}
-
-/* Checks if the user owns the relation. Superusers
- * are allowed to cluster any table.
+/*
+ * Checks if the user is allowed to cluster (ie, owns) the relation.
+ * Superusers are allowed to cluster any table.
  */
-bool
-check_cluster_ownership(Oid relOid)
+static bool
+check_cluster_permitted(Oid relOid)
 {
        /* Superusers bypass this check */
        return pg_class_ownercheck(relOid, GetUserId());
 }
 
-/* Get a list of tables that the current user owns and
+/*
+ * Get a list of tables that the current user owns and
  * have indisclustered set.  Return the list in a List * of rvsToCluster
  * with the tableOid and the indexOid on which the table is already
  * clustered.
  */
-List *
-get_tables_to_cluster(AclId owner)
+static List *
+get_tables_to_cluster(MemoryContext cluster_context)
 {
        Relation                indRelation;
        HeapScanDesc    scan;
        ScanKeyData             entry;
        HeapTuple               indexTuple;
        Form_pg_index   index;
-       relToCluster   *rvtc;
+       MemoryContext   old_context;
+       RelToCluster   *rvtc;
        List               *rvs = NIL;
 
        /*
-        * Get all indexes that have indisclustered set. System
-        * relations or nailed-in relations cannot ever have
-        * indisclustered set, because CLUSTER will refuse to
-        * set it when called with one of them as argument.
+        * Get all indexes that have indisclustered set and are owned by
+        * appropriate user. System relations or nailed-in relations cannot ever
+        * have indisclustered set, because CLUSTER will refuse to set it when
+        * called with one of them as argument.
         */
-       indRelation = relation_openr(IndexRelationName, RowExclusiveLock);
-       ScanKeyEntryInitialize(&entry, 0, Anum_pg_index_indisclustered,
-                                                  F_BOOLEQ, true);
+       indRelation = relation_openr(IndexRelationName, AccessShareLock);
+       ScanKeyEntryInitialize(&entry, 0,
+                                                  Anum_pg_index_indisclustered,
+                                                  F_BOOLEQ,
+                                                  BoolGetDatum(true));
        scan = heap_beginscan(indRelation, SnapshotNow, 1, &entry);
        while ((indexTuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
        {
-               MemoryContext   old_context = NULL;
-
                index = (Form_pg_index) GETSTRUCT(indexTuple);
-               if (!check_cluster_ownership(index->indrelid))
+               if (!check_cluster_permitted(index->indrelid))
                        continue;
 
                /*
-                * We have to build the struct in a different memory context so
+                * We have to build the list in a different memory context so
                 * it will survive the cross-transaction processing
                 */
-
                old_context = MemoryContextSwitchTo(cluster_context);
 
-               rvtc = (relToCluster *)palloc(sizeof(relToCluster));
-               rvtc->indexOid = index->indexrelid;
+               rvtc = (RelToCluster *) palloc(sizeof(RelToCluster));
                rvtc->tableOid = index->indrelid;
-               rvtc->isPrevious = true;
-               rvs = lcons((void *)rvtc, rvs);
+               rvtc->indexOid = index->indexrelid;
+               rvs = lcons(rvtc, rvs);
 
                MemoryContextSwitchTo(old_context);
        }
        heap_endscan(scan);
 
-       /*
-        * Release the lock on pg_index. We will check the indexes
-        * later again.
-        *
-        */
-       relation_close(indRelation, RowExclusiveLock);
+       relation_close(indRelation, AccessShareLock);
+
        return rvs;
 }
index 2a1a2b4cf4933dbfeb488edbe8e8941d2bddb71b..29edb61638e507ac8a60a04a73ef66610475af5e 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/commands/tablecmds.c,v 1.62 2002/12/16 18:39:22 tgl Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/commands/tablecmds.c,v 1.63 2002/12/30 18:42:14 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -355,7 +355,7 @@ RemoveRelation(const RangeVar *relation, DropBehavior behavior)
  *             Removes all the rows from a relation.
  *
  * Note: This routine only does safety and permissions checks;
- * rebuild_rel in cluster.c does the actual work.
+ * rebuild_relation in cluster.c does the actual work.
  */
 void
 TruncateRelation(const RangeVar *relation)
@@ -366,7 +366,6 @@ TruncateRelation(const RangeVar *relation)
        Relation        fkeyRel;
        SysScanDesc fkeyScan;
        HeapTuple       tuple;
-       List       *indexes;
 
        /* Grab exclusive lock in preparation for truncate */
        rel = heap_openrv(relation, AccessExclusiveLock);
@@ -433,17 +432,13 @@ TruncateRelation(const RangeVar *relation)
        systable_endscan(fkeyScan);
        heap_close(fkeyRel, AccessShareLock);
 
-       /* Save the information of all indexes on the relation. */
-       indexes = get_indexattr_list(rel, InvalidOid);
-
-       /* Keep the lock until transaction commit */
-       heap_close(rel, NoLock);
-
        /*
         * Do the real work using the same technique as cluster, but
-        * without the code copy portion
+        * without the data-copying portion
         */
-       rebuild_rel(relid, InvalidOid, indexes, false);
+       rebuild_relation(rel, InvalidOid);
+
+       /* NB: rebuild_relation does heap_close() */
 }
 
 /*----------
index cc0f139b2dbb9d7c78c2a08cd96a77b7976868bb..962c5311500d791c0aac23e17be3fe26ff68eb5b 100644 (file)
@@ -10,7 +10,7 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/tcop/utility.c,v 1.186 2002/12/30 15:31:48 momjian Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/tcop/utility.c,v 1.187 2002/12/30 18:42:16 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -187,7 +187,6 @@ ProcessUtility(Node *parsetree,
                           CommandDest dest,
                           char *completionTag)
 {
-
        if (completionTag)
                completionTag[0] = '\0';
 
@@ -195,7 +194,6 @@ ProcessUtility(Node *parsetree,
        {
                        /*
                         * ******************************** transactions ********************************
-                        *
                         */
                case T_TransactionStmt:
                        {
@@ -742,11 +740,7 @@ ProcessUtility(Node *parsetree,
                        break;
 
                case T_ClusterStmt:
-                       {
-                               ClusterStmt *stmt = (ClusterStmt *) parsetree;
-
-                               cluster(stmt);
-                       }
+                       cluster((ClusterStmt *) parsetree);
                        break;
 
                case T_VacuumStmt:
@@ -874,7 +868,6 @@ ProcessUtility(Node *parsetree,
 
                                switch (stmt->reindexType)
                                {
-                                       char    *relname;
                                        case INDEX:
                                                CheckOwnership(stmt->relation, false);
                                                ReindexIndex(stmt->relation, stmt->force);
@@ -884,8 +877,7 @@ ProcessUtility(Node *parsetree,
                                                ReindexTable(stmt->relation, stmt->force);
                                                break;
                                        case DATABASE:
-                                               relname = (char *) stmt->name;
-                                               ReindexDatabase(relname, stmt->force, false);
+                                               ReindexDatabase(stmt->name, stmt->force, false);
                                                break;
                                }
                                break;
index c83db667252907a52d774bb0894e9055c73d6c11..7eb11d60b06531f9b5bb5e150309d4d780243f8f 100644 (file)
@@ -6,22 +6,19 @@
  * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994-5, Regents of the University of California
  *
- * $Id: cluster.h,v 1.17 2002/11/23 04:05:52 momjian Exp $
+ * $Id: cluster.h,v 1.18 2002/12/30 18:42:16 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
 #ifndef CLUSTER_H
 #define CLUSTER_H
 
-#include <nodes/parsenodes.h>
-/*
- * functions
- */
-extern void cluster(ClusterStmt *stmt);
+#include "nodes/parsenodes.h"
+#include "utils/rel.h"
 
-extern List *get_indexattr_list(Relation OldHeap, Oid OldIndex);
-extern void rebuild_rel(Oid tableOid, Oid indexOid,
-                                           List *indexes, bool dataCopy);
 
+extern void cluster(ClusterStmt *stmt);
+
+extern void rebuild_relation(Relation OldHeap, Oid indexOid);
 
 #endif   /* CLUSTER_H */
index 105b4c8ec21b3de55914c17e423bea6be6fe4e90..48daf0fc25d3ae55d8585abba347c131d02bded7 100644 (file)
@@ -304,7 +304,7 @@ INSERT INTO clstr_3 VALUES (2);
 INSERT INTO clstr_3 VALUES (1);
 -- "CLUSTER <tablename>" on a table that hasn't been clustered
 CLUSTER clstr_2;
-ERROR:  CLUSTER: No previously clustered index found on table clstr_2
+ERROR:  CLUSTER: No previously clustered index found on table "clstr_2"
 CLUSTER clstr_1_pkey ON clstr_1;
 CLUSTER clstr_2_pkey ON clstr_2;
 SELECT * FROM clstr_1 UNION ALL