1 /*-------------------------------------------------------------------------
5 * Routines for opclass manipulation commands
7 * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
8 * Portions Copyright (c) 1994, Regents of the University of California
12 * $Header: /cvsroot/pgsql/src/backend/commands/opclasscmds.c,v 1.18 2003/08/17 19:58:04 tgl Exp $
14 *-------------------------------------------------------------------------
18 #include "access/genam.h"
19 #include "access/heapam.h"
20 #include "catalog/catname.h"
21 #include "catalog/dependency.h"
22 #include "catalog/indexing.h"
23 #include "catalog/namespace.h"
24 #include "catalog/pg_am.h"
25 #include "catalog/pg_amop.h"
26 #include "catalog/pg_amproc.h"
27 #include "catalog/pg_opclass.h"
28 #include "commands/defrem.h"
29 #include "miscadmin.h"
30 #include "parser/parse_func.h"
31 #include "parser/parse_oper.h"
32 #include "parser/parse_type.h"
33 #include "utils/acl.h"
34 #include "utils/builtins.h"
35 #include "utils/fmgroids.h"
36 #include "utils/lsyscache.h"
37 #include "utils/syscache.h"
/*
 * Forward declarations of the two file-local helpers defined further down.
 * DefineOpClass calls both of them after it has inserted the opclass row,
 * passing the new opclass OID together with the work arrays it filled in.
 */
40 static void storeOperators(Oid opclassoid, int numOperators,
41 Oid *operators, bool *recheck);
42 static void storeProcedures(Oid opclassoid, int numProcs, Oid *procedures);
47 * Define a new index operator class.
/*
 * DefineOpClass
 *
 * Create a new index operator class from a CreateOpClassStmt.
 *
 * Steps visible in this listing, in order:
 *	- resolve stmt->opclassname into a bare name plus creation namespace and
 *	  check ACL_CREATE on that namespace;
 *	- look up stmt->amname in the AMNAME syscache and read amstrategies and
 *	  amsupport from the access method row;
 *	- resolve stmt->datatype to the indexed type OID;
 *	- walk stmt->items, recording operators, recheck flags, support
 *	  procedures and an optional storage type.  Item numbers are 1-based
 *	  and are stored at array index number - 1; out-of-range and repeated
 *	  numbers are reported as errors, and execute permission is checked on
 *	  each function (for operators, on the function returned by get_opcode);
 *	- a storage type equal to the indexed type is discarded; a different
 *	  one is accepted only when the access method is GIST_AM_OID;
 *	- check the CLAAMNAMENSP syscache for an opclass with the same access
 *	  method, name and namespace, and scan for an existing default opclass
 *	  on the same input type;
 *	- insert the opclass row, store the operator and procedure rows, and
 *	  record DEPENDENCY_NORMAL dependencies on the namespace, the indexed
 *	  type, the storage type (if valid), and every recorded operator and
 *	  procedure.
 *
 * NOTE(review): this listing omits a number of source lines (braces, some
 * local declarations, and the statements that introduce the error reports
 * and guard conditions), so the exact control flow between the visible
 * lines should be confirmed against the complete file.
 */
50 DefineOpClass(CreateOpClassStmt *stmt)
52 char *opcname; /* name of opclass we're creating */
53 Oid amoid, /* our AM's oid */
54 typeoid, /* indexable datatype oid */
55 storageoid, /* storage datatype oid, if any */
56 namespaceoid, /* namespace to create opclass in */
57 opclassoid; /* oid of opclass we create */
58 int numOperators, /* amstrategies value */
59 numProcs; /* amsupport value */
60 Oid *operators, /* oids of operators, by strategy num */
61 *procedures; /* oids of support procs */
62 bool *recheck; /* do operators need recheck */
66 Datum values[Natts_pg_opclass];
67 char nulls[Natts_pg_opclass];
74 /* Convert list of names to a name and namespace */
75 namespaceoid = QualifiedNameGetCreationNamespace(stmt->opclassname,
78 /* Check we have creation rights in target namespace */
79 aclresult = pg_namespace_aclcheck(namespaceoid, GetUserId(), ACL_CREATE);
80 if (aclresult != ACLCHECK_OK)
81 aclcheck_error(aclresult, ACL_KIND_NAMESPACE,
82 get_namespace_name(namespaceoid));
84 /* Get necessary info about access method */
85 tup = SearchSysCache(AMNAME,
86 CStringGetDatum(stmt->amname),
88 if (!HeapTupleIsValid(tup))
90 (errcode(ERRCODE_UNDEFINED_OBJECT),
91 errmsg("access method \"%s\" does not exist",
/*
 * The two counts read here size the work arrays and serve as the upper
 * bounds for the operator and procedure numbers accepted from stmt->items.
 */
94 amoid = HeapTupleGetOid(tup);
95 numOperators = ((Form_pg_am) GETSTRUCT(tup))->amstrategies;
96 numProcs = ((Form_pg_am) GETSTRUCT(tup))->amsupport;
98 /* XXX Should we make any privilege check against the AM? */
100 ReleaseSysCache(tup);
103 * Currently, we require superuser privileges to create an opclass.
104 * This seems necessary because we have no way to validate that the
105 * offered set of operators and functions are consistent with the AM's
106 * expectations. It would be nice to provide such a check someday, if
107 * it can be done without solving the halting problem :-(
111 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
112 errmsg("must be superuser to create an operator class")));
114 /* Look up the datatype */
115 typeoid = typenameTypeId(stmt->datatype);
118 /* XXX this is unnecessary given the superuser check above */
119 /* Check we have ownership of the datatype */
120 if (!pg_type_ownercheck(typeoid, GetUserId()))
121 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_TYPE,
122 format_type_be(typeoid));
125 /* Storage datatype is optional */
126 storageoid = InvalidOid;
129 * Create work arrays to hold info about operators and procedures. We
130 * do this mainly so that we can detect duplicate strategy numbers and
131 * support-proc numbers.
133 operators = (Oid *) palloc0(sizeof(Oid) * numOperators);
134 procedures = (Oid *) palloc0(sizeof(Oid) * numProcs);
135 recheck = (bool *) palloc0(sizeof(bool) * numOperators);
138 * Scan the "items" list to obtain additional info.
140 foreach(iteml, stmt->items)
142 CreateOpClassItem *item = lfirst(iteml);
147 Assert(IsA(item, CreateOpClassItem));
148 switch (item->itemtype)
150 case OPCLASS_ITEM_OPERATOR:
151 if (item->number <= 0 || item->number > numOperators)
153 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
154 errmsg("invalid operator number %d,"
155 " must be between 1 and %d",
156 item->number, numOperators)));
157 if (operators[item->number - 1] != InvalidOid)
159 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
160 errmsg("operator number %d appears more than once",
162 if (item->args != NIL)
164 TypeName *typeName1 = (TypeName *) lfirst(item->args);
165 TypeName *typeName2 = (TypeName *) lsecond(item->args);
167 operOid = LookupOperNameTypeNames(item->name,
174 /* Default to binary op on input datatype */
175 operOid = LookupOperName(item->name, typeoid, typeoid,
178 /* Caller must have execute permission on operators */
179 funcOid = get_opcode(operOid);
180 aclresult = pg_proc_aclcheck(funcOid, GetUserId(),
182 if (aclresult != ACLCHECK_OK)
183 aclcheck_error(aclresult, ACL_KIND_PROC,
184 get_func_name(funcOid));
185 operators[item->number - 1] = operOid;
186 recheck[item->number - 1] = item->recheck;
188 case OPCLASS_ITEM_FUNCTION:
189 if (item->number <= 0 || item->number > numProcs)
191 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
192 errmsg("invalid procedure number %d,"
193 " must be between 1 and %d",
194 item->number, numProcs)));
195 if (procedures[item->number - 1] != InvalidOid)
197 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
198 errmsg("DefineOpClass: procedure number %d appears more than once",
200 funcOid = LookupFuncNameTypeNames(item->name, item->args,
202 /* Caller must have execute permission on functions */
203 aclresult = pg_proc_aclcheck(funcOid, GetUserId(),
205 if (aclresult != ACLCHECK_OK)
206 aclcheck_error(aclresult, ACL_KIND_PROC,
207 get_func_name(funcOid));
208 procedures[item->number - 1] = funcOid;
210 case OPCLASS_ITEM_STORAGETYPE:
211 if (OidIsValid(storageoid))
213 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
214 errmsg("storage type specified more than once")));
215 storageoid = typenameTypeId(item->storedtype);
218 elog(ERROR, "unrecognized item type: %d", item->itemtype);
224 * If storagetype is specified, make sure it's legal.
226 if (OidIsValid(storageoid))
228 /* Just drop the spec if same as column datatype */
229 if (storageoid == typeoid)
230 storageoid = InvalidOid;
234 * Currently, only GiST allows storagetype different from
235 * datatype. This hardcoded test should be eliminated in
236 * favor of adding another boolean column to pg_am ...
238 if (amoid != GIST_AM_OID)
240 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
241 errmsg("storage type may not be different from datatype for access method \"%s\"",
/*
 * Open the opclass catalog with RowExclusiveLock.  The same relation handle
 * is used for the default-opclass scan, for the row insertion, and as the
 * classId of the new object; it is closed by the last statement shown for
 * this function.
 */
246 rel = heap_openr(OperatorClassRelationName, RowExclusiveLock);
249 * Make sure there is no existing opclass of this name (this is just
250 * to give a more friendly error message than "duplicate key").
252 if (SearchSysCacheExists(CLAAMNAMENSP,
253 ObjectIdGetDatum(amoid),
254 CStringGetDatum(opcname),
255 ObjectIdGetDatum(namespaceoid),
258 (errcode(ERRCODE_DUPLICATE_OBJECT),
259 errmsg("operator class \"%s\" for access method \"%s\" already exists",
260 opcname, stmt->amname)));
263 * If we are creating a default opclass, check there isn't one
264 * already. (Note we do not restrict this test to visible opclasses;
265 * this ensures that typcache.c can find unique solutions to its
273 ScanKeyEntryInitialize(&skey[0], 0x0,
274 Anum_pg_opclass_opcamid, F_OIDEQ,
275 ObjectIdGetDatum(amoid));
277 scan = systable_beginscan(rel, OpclassAmNameNspIndex, true,
278 SnapshotNow, 1, skey);
280 while (HeapTupleIsValid(tup = systable_getnext(scan)))
282 Form_pg_opclass opclass = (Form_pg_opclass) GETSTRUCT(tup);
284 if (opclass->opcintype == typeoid && opclass->opcdefault)
286 (errcode(ERRCODE_DUPLICATE_OBJECT),
287 errmsg("could not make class \"%s\" be default for type %s",
289 TypeNameToString(stmt->datatype)),
290 errdetail("Class \"%s\" already is the default.",
291 NameStr(opclass->opcname))));
294 systable_endscan(scan);
298 * Okay, let's create the pg_opclass entry.
300 for (i = 0; i < Natts_pg_opclass; ++i)
303 values[i] = (Datum) NULL; /* redundant, but safe */
307 values[i++] = ObjectIdGetDatum(amoid); /* opcamid */
308 namestrcpy(&opcName, opcname);
309 values[i++] = NameGetDatum(&opcName); /* opcname */
310 values[i++] = ObjectIdGetDatum(namespaceoid); /* opcnamespace */
311 values[i++] = Int32GetDatum(GetUserId()); /* opcowner */
312 values[i++] = ObjectIdGetDatum(typeoid); /* opcintype */
313 values[i++] = BoolGetDatum(stmt->isDefault); /* opcdefault */
314 values[i++] = ObjectIdGetDatum(storageoid); /* opckeytype */
316 tup = heap_formtuple(rel->rd_att, values, nulls);
318 opclassoid = simple_heap_insert(rel, tup);
320 CatalogUpdateIndexes(rel, tup);
325 * Now add tuples to pg_amop and pg_amproc tying in the operators and
328 storeOperators(opclassoid, numOperators, operators, recheck);
329 storeProcedures(opclassoid, numProcs, procedures);
332 * Create dependencies. Note: we do not create a dependency link to
333 * the AM, because we don't currently support DROP ACCESS METHOD.
/*
 * myself names the new opclass row as the depending object.  The single
 * referenced variable is refilled for each target before every
 * recordDependencyOn call.
 */
335 myself.classId = RelationGetRelid(rel);
336 myself.objectId = opclassoid;
337 myself.objectSubId = 0;
339 /* dependency on namespace */
340 referenced.classId = get_system_catalog_relid(NamespaceRelationName);
341 referenced.objectId = namespaceoid;
342 referenced.objectSubId = 0;
343 recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
345 /* dependency on indexed datatype */
346 referenced.classId = RelOid_pg_type;
347 referenced.objectId = typeoid;
348 referenced.objectSubId = 0;
349 recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
351 /* dependency on storage datatype */
352 if (OidIsValid(storageoid))
354 referenced.classId = RelOid_pg_type;
355 referenced.objectId = storageoid;
356 referenced.objectSubId = 0;
357 recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
/*
 * The operator catalog OID is looked up once, before the loop, and kept in
 * referenced.classId for every operator dependency.
 */
360 /* dependencies on operators */
361 referenced.classId = get_system_catalog_relid(OperatorRelationName);
362 for (i = 0; i < numOperators; i++)
364 if (operators[i] == InvalidOid)
366 referenced.objectId = operators[i];
367 referenced.objectSubId = 0;
368 recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
371 /* dependencies on procedures */
372 for (i = 0; i < numProcs; i++)
374 if (procedures[i] == InvalidOid)
376 referenced.classId = RelOid_pg_proc;
377 referenced.objectId = procedures[i];
378 referenced.objectSubId = 0;
379 recordDependencyOn(&myself, &referenced, DEPENDENCY_NORMAL);
382 heap_close(rel, RowExclusiveLock);
386 * Dump the operators to pg_amop
/*
 * storeOperators
 *
 * Write the operator entries of one operator class.
 *
 *	opclassoid	 - OID stored in the first column (amopclaid) of every row
 *	numOperators - number of slots in operators[] and recheck[]
 *	operators	 - operator OIDs; slot j is stored with strategy number j + 1
 *	recheck		 - per-slot flag stored in the amopreqcheck column
 *
 * The relation named by AccessMethodOperatorRelationName is opened with
 * RowExclusiveLock, one tuple is formed and inserted per slot, the catalog
 * indexes are updated after each insertion, and the relation is closed
 * again with the same lock mode.
 *
 * NOTE(review): each slot is tested against InvalidOid at the top of the
 * loop, but the statement that follows the test is not visible in this
 * listing; presumably empty slots are skipped - confirm.
 */
389 storeOperators(Oid opclassoid, int numOperators,
390 Oid *operators, bool *recheck)
393 Datum values[Natts_pg_amop];
394 char nulls[Natts_pg_amop];
399 rel = heap_openr(AccessMethodOperatorRelationName, RowExclusiveLock);
401 for (j = 0; j < numOperators; j++)
403 if (operators[j] == InvalidOid)
/* Reset every column value before filling in the ones used below. */
406 for (i = 0; i < Natts_pg_amop; ++i)
409 values[i] = (Datum) NULL;
/* Columns are filled positionally; the order below must match the catalog. */
413 values[i++] = ObjectIdGetDatum(opclassoid); /* amopclaid */
414 values[i++] = Int16GetDatum(j + 1); /* amopstrategy */
415 values[i++] = BoolGetDatum(recheck[j]); /* amopreqcheck */
416 values[i++] = ObjectIdGetDatum(operators[j]); /* amopopr */
418 tup = heap_formtuple(rel->rd_att, values, nulls);
420 simple_heap_insert(rel, tup);
422 CatalogUpdateIndexes(rel, tup);
427 heap_close(rel, RowExclusiveLock);
431 * Dump the procedures (support routines) to pg_amproc
/*
 * storeProcedures
 *
 * Write the support-procedure entries of one operator class.
 *
 *	opclassoid - OID stored in the first column (amopclaid) of every row
 *	numProcs   - number of slots in procedures[]
 *	procedures - procedure OIDs; slot j is stored with procedure number j + 1
 *
 * The relation named by AccessMethodProcedureRelationName is opened with
 * RowExclusiveLock, one tuple is formed and inserted per slot, the catalog
 * indexes are updated after each insertion, and the relation is closed
 * again with the same lock mode.
 *
 * NOTE(review): each slot is tested against InvalidOid at the top of the
 * loop, but the statement that follows the test is not visible in this
 * listing; presumably empty slots are skipped - confirm.
 */
434 storeProcedures(Oid opclassoid, int numProcs, Oid *procedures)
437 Datum values[Natts_pg_amproc];
438 char nulls[Natts_pg_amproc];
443 rel = heap_openr(AccessMethodProcedureRelationName, RowExclusiveLock);
445 for (j = 0; j < numProcs; j++)
447 if (procedures[j] == InvalidOid)
/* Reset every column value before filling in the ones used below. */
450 for (i = 0; i < Natts_pg_amproc; ++i)
453 values[i] = (Datum) NULL;
/* Columns are filled positionally; the order below must match the catalog. */
457 values[i++] = ObjectIdGetDatum(opclassoid); /* amopclaid */
458 values[i++] = Int16GetDatum(j + 1); /* amprocnum */
459 values[i++] = ObjectIdGetDatum(procedures[j]); /* amproc */
461 tup = heap_formtuple(rel->rd_att, values, nulls);
463 simple_heap_insert(rel, tup);
465 CatalogUpdateIndexes(rel, tup);
470 heap_close(rel, RowExclusiveLock);
476 * Deletes an opclass.
/*
 * RemoveOpClass
 *
 * Command-level entry point for dropping an operator class.
 *
 * The access method named by stmt->amname is resolved through the AMNAME
 * syscache.  The opclass name list is split into schema and bare name; a
 * lookup in an explicit schema goes through the CLAAMNAMENSP syscache,
 * while an unqualified name is resolved with OpclassnameGetOpcid and then
 * fetched through CLAOID.  A missing access method or opclass is reported
 * with ERRCODE_UNDEFINED_OBJECT.
 *
 * The caller must own either the opclass or its namespace.  The row itself
 * is not deleted here: the function builds an ObjectAddress for the opclass
 * and hands it to performDeletion together with stmt->behavior.
 *
 * NOTE(review): the branch structure choosing between the two lookups is
 * not visible in this listing (the if/else lines are missing); it is
 * described here from the two inline comments - confirm against the
 * complete file.
 */
479 RemoveOpClass(RemoveOpClassStmt *stmt)
486 ObjectAddress object;
489 * Get the access method's OID.
491 amID = GetSysCacheOid(AMNAME,
492 CStringGetDatum(stmt->amname),
494 if (!OidIsValid(amID))
496 (errcode(ERRCODE_UNDEFINED_OBJECT),
497 errmsg("access method \"%s\" does not exist",
501 * Look up the opclass.
504 /* deconstruct the name list */
505 DeconstructQualifiedName(stmt->opclassname, &schemaname, &opcname);
509 /* Look in specific schema only */
512 namespaceId = LookupExplicitNamespace(schemaname);
513 tuple = SearchSysCache(CLAAMNAMENSP,
514 ObjectIdGetDatum(amID),
515 PointerGetDatum(opcname),
516 ObjectIdGetDatum(namespaceId),
521 /* Unqualified opclass name, so search the search path */
522 opcID = OpclassnameGetOpcid(amID, opcname);
523 if (!OidIsValid(opcID))
525 (errcode(ERRCODE_UNDEFINED_OBJECT),
526 errmsg("operator class \"%s\" does not exist for access method \"%s\"",
527 opcname, stmt->amname)));
528 tuple = SearchSysCache(CLAOID,
529 ObjectIdGetDatum(opcID),
533 if (!HeapTupleIsValid(tuple))
535 (errcode(ERRCODE_UNDEFINED_OBJECT),
536 errmsg("operator class \"%s\" does not exist for access method \"%s\"",
537 NameListToString(stmt->opclassname), stmt->amname)));
/* Both lookup paths end with a tuple; take the opclass OID from it. */
539 opcID = HeapTupleGetOid(tuple);
541 /* Permission check: must own opclass or its namespace */
542 if (!pg_opclass_ownercheck(opcID, GetUserId()) &&
543 !pg_namespace_ownercheck(((Form_pg_opclass) GETSTRUCT(tuple))->opcnamespace,
545 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_OPCLASS,
546 NameListToString(stmt->opclassname));
/* The tuple is no longer read after this point; only opcID is used. */
548 ReleaseSysCache(tuple);
553 object.classId = get_system_catalog_relid(OperatorClassRelationName);
554 object.objectId = opcID;
555 object.objectSubId = 0;
557 performDeletion(&object, stmt->behavior);
561 * Guts of opclass deletion.
/*
 * RemoveOpClassById
 *
 * Physically remove the operator class with OID opclassOid.
 *
 * Three catalogs are touched in turn, each opened and closed with
 * RowExclusiveLock:
 *	1. the opclass row itself, located through the CLAOID syscache (a
 *	   failed lookup raises an internal error via elog);
 *	2. every row of the access-method operator catalog whose amopclaid
 *	   equals opclassOid, found with a scan on AccessMethodStrategyIndex;
 *	3. every row of the access-method procedure catalog whose amopclaid
 *	   equals opclassOid, found with a scan on AccessMethodProcedureIndex.
 *
 * No permission checks appear in the visible lines of this function.
 */
564 RemoveOpClassById(Oid opclassOid)
572 * First remove the pg_opclass entry itself.
574 rel = heap_openr(OperatorClassRelationName, RowExclusiveLock);
576 tup = SearchSysCache(CLAOID,
577 ObjectIdGetDatum(opclassOid),
579 if (!HeapTupleIsValid(tup)) /* should not happen */
580 elog(ERROR, "cache lookup failed for opclass %u", opclassOid);
582 simple_heap_delete(rel, &tup->t_self);
584 ReleaseSysCache(tup);
586 heap_close(rel, RowExclusiveLock);
589 * Remove associated entries in pg_amop.
/* Single scan key: amopclaid = opclassOid. */
591 ScanKeyEntryInitialize(&skey[0], 0,
592 Anum_pg_amop_amopclaid, F_OIDEQ,
593 ObjectIdGetDatum(opclassOid));
595 rel = heap_openr(AccessMethodOperatorRelationName, RowExclusiveLock);
597 scan = systable_beginscan(rel, AccessMethodStrategyIndex, true,
598 SnapshotNow, 1, skey);
600 while (HeapTupleIsValid(tup = systable_getnext(scan)))
601 simple_heap_delete(rel, &tup->t_self);
603 systable_endscan(scan);
604 heap_close(rel, RowExclusiveLock);
607 * Remove associated entries in pg_amproc.
/* Same key shape as above, reusing skey[0] for the procedure catalog. */
609 ScanKeyEntryInitialize(&skey[0], 0,
610 Anum_pg_amproc_amopclaid, F_OIDEQ,
611 ObjectIdGetDatum(opclassOid));
613 rel = heap_openr(AccessMethodProcedureRelationName, RowExclusiveLock);
615 scan = systable_beginscan(rel, AccessMethodProcedureIndex, true,
616 SnapshotNow, 1, skey);
618 while (HeapTupleIsValid(tup = systable_getnext(scan)))
619 simple_heap_delete(rel, &tup->t_self);
621 systable_endscan(scan);
622 heap_close(rel, RowExclusiveLock);
/*
 * RenameOpClass
 *
 * Change the name of an existing operator class.
 *
 *	name		  - possibly schema-qualified name list of the opclass
 *	access_method - name of the access method the opclass belongs to
 *	newname		  - new bare name; the namespace is left unchanged
 *
 * The access method is resolved through the AMNAME syscache and the opclass
 * catalog is opened with RowExclusiveLock.  A modifiable copy of the opclass
 * tuple is obtained with SearchSysCacheCopy, either through CLAAMNAMENSP in
 * an explicit schema or, after OpclassnameGetOpcid, through CLAOID.
 *
 * Before the update the function verifies that no opclass named newname
 * exists for the same access method and namespace, that the caller owns
 * the opclass, and that the caller has ACL_CREATE on the namespace.  The
 * new name is then written into the tuple copy, the row is updated, and
 * the catalog indexes are refreshed.
 *
 * NOTE(review): the relation is closed with NoLock, so this call does not
 * release the lock taken at open; presumably it is held until the end of
 * the transaction - confirm.  The listing also stops at that call, so any
 * cleanup following it (and the if/else choosing between the two lookups)
 * is not visible here.
 */
630 RenameOpClass(List *name, const char *access_method, const char *newname)
641 amOid = GetSysCacheOid(AMNAME,
642 CStringGetDatum(access_method),
644 if (!OidIsValid(amOid))
646 (errcode(ERRCODE_UNDEFINED_OBJECT),
647 errmsg("access method \"%s\" does not exist",
650 rel = heap_openr(OperatorClassRelationName, RowExclusiveLock);
653 * Look up the opclass
655 DeconstructQualifiedName(name, &schemaname, &opcname);
/* Lookup restricted to the namespace returned for schemaname. */
659 namespaceOid = LookupExplicitNamespace(schemaname);
661 tup = SearchSysCacheCopy(CLAAMNAMENSP,
662 ObjectIdGetDatum(amOid),
663 PointerGetDatum(opcname),
664 ObjectIdGetDatum(namespaceOid),
666 if (!HeapTupleIsValid(tup))
668 (errcode(ERRCODE_UNDEFINED_OBJECT),
669 errmsg("operator class \"%s\" does not exist for access method \"%s\"",
670 opcname, access_method)));
672 opcOid = HeapTupleGetOid(tup);
/* Lookup by bare name; the namespace is read back from the tuple found. */
676 opcOid = OpclassnameGetOpcid(amOid, opcname);
677 if (!OidIsValid(opcOid))
679 (errcode(ERRCODE_UNDEFINED_OBJECT),
680 errmsg("operator class \"%s\" does not exist for access method \"%s\"",
681 opcname, access_method)));
683 tup = SearchSysCacheCopy(CLAOID,
684 ObjectIdGetDatum(opcOid),
686 if (!HeapTupleIsValid(tup)) /* should not happen */
687 elog(ERROR, "cache lookup failed for opclass %u", opcOid);
689 namespaceOid = ((Form_pg_opclass) GETSTRUCT(tup))->opcnamespace;
692 /* make sure the new name doesn't exist */
693 if (SearchSysCacheExists(CLAAMNAMENSP,
694 ObjectIdGetDatum(amOid),
695 CStringGetDatum(newname),
696 ObjectIdGetDatum(namespaceOid),
700 (errcode(ERRCODE_DUPLICATE_OBJECT),
701 errmsg("operator class \"%s\" for access method \"%s\" already exists in schema \"%s\"",
702 newname, access_method,
703 get_namespace_name(namespaceOid))));
/* Only the owner of the opclass may rename it. */
707 if (!pg_opclass_ownercheck(opcOid, GetUserId()))
708 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_OPCLASS,
709 NameListToString(name));
711 /* must have CREATE privilege on namespace */
712 aclresult = pg_namespace_aclcheck(namespaceOid, GetUserId(), ACL_CREATE);
713 if (aclresult != ACLCHECK_OK)
714 aclcheck_error(aclresult, ACL_KIND_NAMESPACE,
715 get_namespace_name(namespaceOid));
/* Overwrite the name in the tuple copy, then write the copy back. */
718 namestrcpy(&(((Form_pg_opclass) GETSTRUCT(tup))->opcname), newname);
719 simple_heap_update(rel, &tup->t_self, tup);
720 CatalogUpdateIndexes(rel, tup);
722 heap_close(rel, NoLock);