*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/commands/cluster.c,v 1.121 2004/05/05 04:48:45 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/commands/cluster.c,v 1.122 2004/05/06 16:10:57 tgl Exp $
*
*-------------------------------------------------------------------------
*/
#include "miscadmin.h"
#include "utils/acl.h"
#include "utils/fmgroids.h"
+#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"
#include "utils/relcache.h"
IndexInfo *indexInfo;
Oid accessMethodOID;
Oid *classOID;
- bool isclustered;
} IndexAttrs;
/*
static void
cluster_rel(RelToCluster *rvtc, bool recheck)
{
- Relation OldHeap,
- OldIndex;
+ Relation OldHeap;
/* Check for user-requested abort. */
CHECK_FOR_INTERRUPTS();
/*
* We grab exclusive access to the target rel and index for the
* duration of the transaction. (This is redundant for the single-
- * transaction case, since cluster() already did it.)
+ * transaction case, since cluster() already did it.) The index
+ * lock is taken inside check_index_is_clusterable.
*/
OldHeap = heap_open(rvtc->tableOid, AccessExclusiveLock);
- OldIndex = index_open(rvtc->indexOid);
+ /* Check index is valid to cluster on */
+ check_index_is_clusterable(OldHeap, rvtc->indexOid);
+
+ /* rebuild_relation does all the dirty work */
+ rebuild_relation(OldHeap, rvtc->indexOid);
+
+ /* NB: rebuild_relation does heap_close() on OldHeap */
+}
+
+/*
+ * Verify that the specified index is a legitimate index to cluster on
+ *
+ * Side effect: obtains exclusive lock on the index. The caller should
+ * already have exclusive lock on the table, so the index lock is likely
+ * redundant, but it seems best to grab it anyway to ensure the index
+ * definition can't change under us.
+ */
+void
+check_index_is_clusterable(Relation OldHeap, Oid indexOid)
+{
+ Relation OldIndex;
+
+ OldIndex = index_open(indexOid);
LockRelation(OldIndex, AccessExclusiveLock);
/*
* Check that index is in fact an index on the given relation
*/
if (OldIndex->rd_index == NULL ||
- OldIndex->rd_index->indrelid != rvtc->tableOid)
+ OldIndex->rd_index->indrelid != RelationGetRelid(OldHeap))
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is not an index for table \"%s\"",
/* Drop relcache refcnt on OldIndex, but keep lock */
index_close(OldIndex);
-
- /* rebuild_relation does all the dirty work */
- rebuild_relation(OldHeap, rvtc->indexOid);
-
- /* NB: rebuild_relation does heap_close() on OldHeap */
}
/*
rebuild_relation(Relation OldHeap, Oid indexOid)
{
Oid tableOid = RelationGetRelid(OldHeap);
+ Oid oldClusterIndex;
List *indexes;
Oid OIDNewHeap;
char NewHeapName[NAMEDATALEN];
ObjectAddress object;
/* Save the information about all indexes on the relation. */
- indexes = get_indexattr_list(OldHeap, indexOid);
+ indexes = get_indexattr_list(OldHeap, &oldClusterIndex);
/* Close relcache entry, but keep lock until transaction commit */
heap_close(OldHeap, NoLock);
* Recreate each index on the relation. We do not need
* CommandCounterIncrement() because rebuild_indexes does it.
*/
- rebuild_indexes(tableOid, indexes);
+ rebuild_indexes(tableOid, indexes,
+ (OidIsValid(indexOid) ? indexOid : oldClusterIndex));
}
/*
/*
* Get the necessary info about the indexes of the relation and
- * return a list of IndexAttrs structures.
+ * return a list of IndexAttrs structures. Also, *OldClusterIndex
+ * is set to the OID of the existing clustered index, or InvalidOid
+ * if there is none.
*/
List *
-get_indexattr_list(Relation OldHeap, Oid OldIndex)
+get_indexattr_list(Relation OldHeap, Oid *OldClusterIndex)
{
List *indexes = NIL;
List *indlist;
+ *OldClusterIndex = InvalidOid;
+
/* Ask the relcache to produce a list of the indexes of the old rel */
foreach(indlist, RelationGetIndexList(OldHeap))
{
palloc(sizeof(Oid) * attrs->indexInfo->ii_NumIndexAttrs);
memcpy(attrs->classOID, oldIndex->rd_index->indclass,
sizeof(Oid) * attrs->indexInfo->ii_NumIndexAttrs);
- /* We adjust the isclustered attribute to correct new state */
- attrs->isclustered = (indexOID == OldIndex);
+ if (oldIndex->rd_index->indisclustered)
+ *OldClusterIndex = indexOID;
index_close(oldIndex);
- /*
- * Cons the gathered data into the list. We do not care about
- * ordering, and this is more efficient than append.
- */
- indexes = lcons(attrs, indexes);
+ indexes = lappend(indexes, attrs);
}
return indexes;
/*
* Create new indexes and swap the filenodes with old indexes. Then drop
* the new index (carrying the old index filenode along).
+ *
+ * OIDClusterIndex is the OID of the index to be marked as clustered, or
+ * InvalidOid if none should be marked clustered.
*/
void
-rebuild_indexes(Oid OIDOldHeap, List *indexes)
+rebuild_indexes(Oid OIDOldHeap, List *indexes, Oid OIDClusterIndex)
{
List *elem;
foreach(elem, indexes)
{
IndexAttrs *attrs = (IndexAttrs *) lfirst(elem);
+ Oid oldIndexOID = attrs->indexOID;
Oid newIndexOID;
char newIndexName[NAMEDATALEN];
+ bool isclustered;
ObjectAddress object;
Form_pg_index index;
HeapTuple tuple;
/* Create the new index under a temporary name */
snprintf(newIndexName, sizeof(newIndexName),
- "pg_temp_%u", attrs->indexOID);
+ "pg_temp_%u", oldIndexOID);
/*
* The new index will have primary and constraint status set to
CommandCounterIncrement();
/* Swap the filenodes. */
- swap_relfilenodes(attrs->indexOID, newIndexOID);
+ swap_relfilenodes(oldIndexOID, newIndexOID);
CommandCounterIncrement();
/*
* Make sure that indisclustered is correct: it should be set only
- * for the index we just clustered on.
+ * for the index specified by the caller.
*/
+ isclustered = (oldIndexOID == OIDClusterIndex);
+
pg_index = heap_openr(IndexRelationName, RowExclusiveLock);
tuple = SearchSysCacheCopy(INDEXRELID,
- ObjectIdGetDatum(attrs->indexOID),
+ ObjectIdGetDatum(oldIndexOID),
0, 0, 0);
if (!HeapTupleIsValid(tuple))
- elog(ERROR, "cache lookup failed for index %u", attrs->indexOID);
+ elog(ERROR, "cache lookup failed for index %u", oldIndexOID);
index = (Form_pg_index) GETSTRUCT(tuple);
- if (index->indisclustered != attrs->isclustered)
+ if (index->indisclustered != isclustered)
{
- index->indisclustered = attrs->isclustered;
+ index->indisclustered = isclustered;
simple_heap_update(pg_index, &tuple->t_self, tuple);
CatalogUpdateIndexes(pg_index, tuple);
+ /* Ensure we see the update in the index's relcache entry */
+ CacheInvalidateRelcacheByRelid(oldIndexOID);
}
heap_freetuple(tuple);
heap_close(pg_index, RowExclusiveLock);
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/commands/tablecmds.c,v 1.103 2004/05/05 04:48:45 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/commands/tablecmds.c,v 1.104 2004/05/06 16:10:57 tgl Exp $
*
*-------------------------------------------------------------------------
*/
Oid OIDNewHeap;
char NewHeapName[NAMEDATALEN];
List *indexes;
+ Oid oldClusterIndex;
Relation OldHeap;
ObjectAddress object;
/* Save the information about all indexes on the relation. */
OldHeap = heap_open(tab->relid, NoLock);
- indexes = get_indexattr_list(OldHeap, InvalidOid);
+ indexes = get_indexattr_list(OldHeap, &oldClusterIndex);
heap_close(OldHeap, NoLock);
/*
* Rebuild each index on the relation. We do not need
* CommandCounterIncrement() because rebuild_indexes does it.
*/
- rebuild_indexes(tab->relid, indexes);
+ rebuild_indexes(tab->relid, indexes, oldClusterIndex);
}
else
{
errmsg("index \"%s\" for table \"%s\" does not exist",
indexName, RelationGetRelationName(rel))));
+ /* Check index is valid to cluster on */
+ check_index_is_clusterable(rel, indexOid);
+
indexTuple = SearchSysCache(INDEXRELID,
ObjectIdGetDatum(indexOid),
0, 0, 0);
idxForm->indisclustered = false;
simple_heap_update(pg_index, &idxtuple->t_self, idxtuple);
CatalogUpdateIndexes(pg_index, idxtuple);
+ /* Ensure we see the update in the index's relcache entry */
+ CacheInvalidateRelcacheByRelid(indexOid);
}
else if (idxForm->indexrelid == indexForm->indexrelid)
{
idxForm->indisclustered = true;
simple_heap_update(pg_index, &idxtuple->t_self, idxtuple);
CatalogUpdateIndexes(pg_index, idxtuple);
+ /* Ensure we see the update in the index's relcache entry */
+ CacheInvalidateRelcacheByRelid(indexOid);
}
heap_freetuple(idxtuple);
}