]> granicus.if.org Git - postgresql/blob - src/backend/commands/indexcmds.c
Reimplement the linked list data structure used throughout the backend.
[postgresql] / src / backend / commands / indexcmds.c
1 /*-------------------------------------------------------------------------
2  *
3  * indexcmds.c
4  *        POSTGRES define and remove index code.
5  *
6  * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        $PostgreSQL: pgsql/src/backend/commands/indexcmds.c,v 1.120 2004/05/26 04:41:11 neilc Exp $
12  *
13  *-------------------------------------------------------------------------
14  */
15
16 #include "postgres.h"
17
18 #include "access/heapam.h"
19 #include "catalog/catalog.h"
20 #include "catalog/catname.h"
21 #include "catalog/dependency.h"
22 #include "catalog/heap.h"
23 #include "catalog/index.h"
24 #include "catalog/namespace.h"
25 #include "catalog/pg_opclass.h"
26 #include "catalog/pg_proc.h"
27 #include "commands/dbcommands.h"
28 #include "commands/defrem.h"
29 #include "commands/tablecmds.h"
30 #include "executor/executor.h"
31 #include "miscadmin.h"
32 #include "optimizer/clauses.h"
33 #include "optimizer/prep.h"
34 #include "parser/analyze.h"
35 #include "parser/parsetree.h"
36 #include "parser/parse_coerce.h"
37 #include "parser/parse_expr.h"
38 #include "parser/parse_func.h"
39 #include "utils/acl.h"
40 #include "utils/builtins.h"
41 #include "utils/lsyscache.h"
42 #include "utils/relcache.h"
43 #include "utils/syscache.h"
44
45
46 /* non-export function prototypes */
47 static void CheckPredicate(Expr *predicate);
48 static void ComputeIndexAttrs(IndexInfo *indexInfo, Oid *classOidP,
49                                                           List *attList,
50                                                           Oid relId,
51                                                           char *accessMethodName, Oid accessMethodId,
52                                                           bool isconstraint);
53 static Oid GetIndexOpClass(List *opclass, Oid attrType,
54                                 char *accessMethodName, Oid accessMethodId);
55 static Oid      GetDefaultOpClass(Oid attrType, Oid accessMethodId);
56 static char *CreateIndexName(const char *table_name, const char *column_name,
57                                                          const char *label, Oid inamespace);
58 static bool relationHasPrimaryKey(Relation rel);
59
60
61 /*
62  * DefineIndex
63  *              Creates a new index.
64  *
65  * 'heapRelation': the relation the index will apply to.
66  * 'indexRelationName': the name for the new index, or NULL to indicate
67  *              that a nonconflicting default name should be picked.
68  * 'accessMethodName': name of the AM to use.
69  * 'attributeList': a list of IndexElem specifying columns and expressions
70  *              to index on.
71  * 'predicate': the partial-index condition, or NULL if none.
72  * 'rangetable': needed to interpret the predicate.
73  * 'unique': make the index enforce uniqueness.
74  * 'primary': mark the index as a primary key in the catalogs.
75  * 'isconstraint': index is for a PRIMARY KEY or UNIQUE constraint,
76  *              so build a pg_constraint entry for it.
77  * 'is_alter_table': this is due to an ALTER rather than a CREATE operation.
78  * 'check_rights': check for CREATE rights in the namespace.  (This should
79  *              be true except when ALTER is deleting/recreating an index.)
80  * 'skip_build': make the catalog entries but leave the index file empty;
81  *              it will be filled later.
82  * 'quiet': suppress the NOTICE chatter ordinarily provided for constraints.
83  */
84 void
85 DefineIndex(RangeVar *heapRelation,
86                         char *indexRelationName,
87                         char *accessMethodName,
88                         List *attributeList,
89                         Expr *predicate,
90                         List *rangetable,
91                         bool unique,
92                         bool primary,
93                         bool isconstraint,
94                         bool is_alter_table,
95                         bool check_rights,
96                         bool skip_build,
97                         bool quiet)
98 {
99         Oid                *classObjectId;
100         Oid                     accessMethodId;
101         Oid                     relationId;
102         Oid                     namespaceId;
103         Relation        rel;
104         HeapTuple       tuple;
105         Form_pg_am      accessMethodForm;
106         IndexInfo  *indexInfo;
107         int                     numberOfAttributes;
108
109         /*
110          * count attributes in index
111          */
112         numberOfAttributes = list_length(attributeList);
113         if (numberOfAttributes <= 0)
114                 ereport(ERROR,
115                                 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
116                                  errmsg("must specify at least one column")));
117         if (numberOfAttributes > INDEX_MAX_KEYS)
118                 ereport(ERROR,
119                                 (errcode(ERRCODE_TOO_MANY_COLUMNS),
120                                  errmsg("cannot use more than %d columns in an index",
121                                                 INDEX_MAX_KEYS)));
122
123         /*
124          * Open heap relation, acquire a suitable lock on it, remember its OID
125          */
126         rel = heap_openrv(heapRelation, ShareLock);
127
128         /* Note: during bootstrap may see uncataloged relation */
129         if (rel->rd_rel->relkind != RELKIND_RELATION &&
130                 rel->rd_rel->relkind != RELKIND_UNCATALOGED)
131                 ereport(ERROR,
132                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
133                                  errmsg("\"%s\" is not a table",
134                                                 heapRelation->relname)));
135
136         relationId = RelationGetRelid(rel);
137         namespaceId = RelationGetNamespace(rel);
138
139         /*
140          * Verify we (still) have CREATE rights in the rel's namespace.
141          * (Presumably we did when the rel was created, but maybe not
142          * anymore.)  Skip check if caller doesn't want it.  Also skip check
143          * if bootstrapping, since permissions machinery may not be working yet.
144          */
145         if (check_rights && !IsBootstrapProcessingMode())
146         {
147                 AclResult       aclresult;
148
149                 aclresult = pg_namespace_aclcheck(namespaceId, GetUserId(),
150                                                                                   ACL_CREATE);
151                 if (aclresult != ACLCHECK_OK)
152                         aclcheck_error(aclresult, ACL_KIND_NAMESPACE,
153                                                    get_namespace_name(namespaceId));
154         }
155
156         /*
157          * Select name for index if caller didn't specify
158          */
159         if (indexRelationName == NULL)
160         {
161                 if (primary)
162                         indexRelationName = CreateIndexName(RelationGetRelationName(rel),
163                                                                                                 NULL,
164                                                                                                 "pkey",
165                                                                                                 namespaceId);
166                 else
167                 {
168                         IndexElem  *iparam = (IndexElem *) linitial(attributeList);
169
170                         indexRelationName = CreateIndexName(RelationGetRelationName(rel),
171                                                                                                 iparam->name,
172                                                                                                 "key",
173                                                                                                 namespaceId);
174                 }
175         }
176
177         /*
178          * look up the access method, verify it can handle the requested
179          * features
180          */
181         tuple = SearchSysCache(AMNAME,
182                                                    PointerGetDatum(accessMethodName),
183                                                    0, 0, 0);
184         if (!HeapTupleIsValid(tuple))
185                 ereport(ERROR,
186                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
187                                  errmsg("access method \"%s\" does not exist",
188                                                 accessMethodName)));
189         accessMethodId = HeapTupleGetOid(tuple);
190         accessMethodForm = (Form_pg_am) GETSTRUCT(tuple);
191
192         if (unique && !accessMethodForm->amcanunique)
193                 ereport(ERROR,
194                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
195                    errmsg("access method \"%s\" does not support unique indexes",
196                                   accessMethodName)));
197         if (numberOfAttributes > 1 && !accessMethodForm->amcanmulticol)
198                 ereport(ERROR,
199                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
200                                  errmsg("access method \"%s\" does not support multicolumn indexes",
201                                                 accessMethodName)));
202
203         ReleaseSysCache(tuple);
204
205         /*
206          * If a range table was created then check that only the base rel is
207          * mentioned.
208          */
209         if (rangetable != NIL)
210         {
211                 if (list_length(rangetable) != 1 || getrelid(1, rangetable) != relationId)
212                         ereport(ERROR,
213                                         (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
214                                          errmsg("index expressions and predicates may refer only to the table being indexed")));
215         }
216
217         /*
218          * Validate predicate, if given
219          */
220         if (predicate)
221                 CheckPredicate(predicate);
222
223         /*
224          * Extra checks when creating a PRIMARY KEY index.
225          */
226         if (primary)
227         {
228                 List       *cmds;
229                 ListCell   *keys;
230
231                 /*
232                  * If ALTER TABLE, check that there isn't already a PRIMARY KEY.
233                  * In CREATE TABLE, we have faith that the parser rejected multiple
234                  * pkey clauses; and CREATE INDEX doesn't have a way to say
235                  * PRIMARY KEY, so it's no problem either.
236                  */
237                 if (is_alter_table &&
238                         relationHasPrimaryKey(rel))
239                 {
240                         ereport(ERROR,
241                                         (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
242                                          errmsg("multiple primary keys for table \"%s\" are not allowed",
243                                                         RelationGetRelationName(rel))));
244                 }
245
246                 /*
247                  * Check that all of the attributes in a primary key are marked as not
248                  * null, otherwise attempt to ALTER TABLE .. SET NOT NULL
249                  */
250                 cmds = NIL;
251                 foreach(keys, attributeList)
252                 {
253                         IndexElem  *key = (IndexElem *) lfirst(keys);
254                         HeapTuple       atttuple;
255
256                         if (!key->name)
257                                 ereport(ERROR,
258                                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
259                                                  errmsg("primary keys cannot be expressions")));
260
261                         /* System attributes are never null, so no problem */
262                         if (SystemAttributeByName(key->name, rel->rd_rel->relhasoids))
263                                 continue;
264
265                         atttuple = SearchSysCacheAttName(relationId, key->name);
266                         if (HeapTupleIsValid(atttuple))
267                         {
268                                 if (!((Form_pg_attribute) GETSTRUCT(atttuple))->attnotnull)
269                                 {
270                                         /* Add a subcommand to make this one NOT NULL */
271                                         AlterTableCmd  *cmd = makeNode(AlterTableCmd);
272
273                                         cmd->subtype = AT_SetNotNull;
274                                         cmd->name = key->name;
275
276                                         cmds = lappend(cmds, cmd);
277                                 }
278                                 ReleaseSysCache(atttuple);
279                         }
280                         else
281                         {
282                                 /*
283                                  * This shouldn't happen during CREATE TABLE, but can
284                                  * happen during ALTER TABLE.  Keep message in sync with
285                                  * transformIndexConstraints() in parser/analyze.c.
286                                  */
287                                 ereport(ERROR,
288                                                 (errcode(ERRCODE_UNDEFINED_COLUMN),
289                                           errmsg("column \"%s\" named in key does not exist",
290                                                          key->name)));
291                         }
292                 }
293
294                 /*
295                  * XXX: Shouldn't the ALTER TABLE .. SET NOT NULL cascade
296                  * to child tables?  Currently, since the PRIMARY KEY
297                  * itself doesn't cascade, we don't cascade the
298                  * notnull constraint(s) either; but this is pretty debatable.
299                  *
300                  * XXX: possible future improvement: when being called from
301                  * ALTER TABLE, it would be more efficient to merge this with
302                  * the outer ALTER TABLE, so as to avoid two scans.  But that
303                  * seems to complicate DefineIndex's API unduly.
304                  */
305                 if (cmds)
306                         AlterTableInternal(relationId, cmds, false);
307         }
308
309         /*
310          * Prepare arguments for index_create, primarily an IndexInfo
311          * structure.  Note that ii_Predicate must be in implicit-AND format.
312          */
313         indexInfo = makeNode(IndexInfo);
314         indexInfo->ii_NumIndexAttrs = numberOfAttributes;
315         indexInfo->ii_Expressions = NIL;        /* for now */
316         indexInfo->ii_ExpressionsState = NIL;
317         indexInfo->ii_Predicate = make_ands_implicit(predicate);
318         indexInfo->ii_PredicateState = NIL;
319         indexInfo->ii_Unique = unique;
320
321         classObjectId = (Oid *) palloc(numberOfAttributes * sizeof(Oid));
322         ComputeIndexAttrs(indexInfo, classObjectId, attributeList,
323                                           relationId, accessMethodName, accessMethodId,
324                                           isconstraint);
325
326         heap_close(rel, NoLock);
327
328         /*
329          * Report index creation if appropriate (delay this till after most
330          * of the error checks)
331          */
332         if (isconstraint && !quiet)
333                 ereport(NOTICE,
334                                 (errmsg("%s %s will create implicit index \"%s\" for table \"%s\"",
335                                                 is_alter_table ? "ALTER TABLE / ADD" : "CREATE TABLE /",
336                                                 primary ? "PRIMARY KEY" : "UNIQUE",
337                                                 indexRelationName, RelationGetRelationName(rel))));
338
339         index_create(relationId, indexRelationName,
340                                  indexInfo, accessMethodId, classObjectId,
341                                  primary, isconstraint,
342                                  allowSystemTableMods, skip_build);
343
344         /*
345          * We update the relation's pg_class tuple even if it already has
346          * relhasindex = true.  This is needed to cause a shared-cache-inval
347          * message to be sent for the pg_class tuple, which will cause other
348          * backends to flush their relcache entries and in particular their
349          * cached lists of the indexes for this relation.
350          */
351         setRelhasindex(relationId, true, primary, InvalidOid);
352 }
353
354
355 /*
356  * CheckPredicate
357  *              Checks that the given partial-index predicate is valid.
358  *
359  * This used to also constrain the form of the predicate to forms that
360  * indxpath.c could do something with.  However, that seems overly
361  * restrictive.  One useful application of partial indexes is to apply
362  * a UNIQUE constraint across a subset of a table, and in that scenario
363  * any evaluatable predicate will work.  So accept any predicate here
364  * (except ones requiring a plan), and let indxpath.c fend for itself.
365  */
366 static void
367 CheckPredicate(Expr *predicate)
368 {
369         /*
370          * We don't currently support generation of an actual query plan for a
371          * predicate, only simple scalar expressions; hence these
372          * restrictions.
373          */
374         if (contain_subplans((Node *) predicate))
375                 ereport(ERROR,
376                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
377                                  errmsg("cannot use subquery in index predicate")));
378         if (contain_agg_clause((Node *) predicate))
379                 ereport(ERROR,
380                                 (errcode(ERRCODE_GROUPING_ERROR),
381                                  errmsg("cannot use aggregate in index predicate")));
382
383         /*
384          * A predicate using mutable functions is probably wrong, for the same
385          * reasons that we don't allow an index expression to use one.
386          */
387         if (contain_mutable_functions((Node *) predicate))
388                 ereport(ERROR,
389                                 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
390                 errmsg("functions in index predicate must be marked IMMUTABLE")));
391 }
392
393 static void
394 ComputeIndexAttrs(IndexInfo *indexInfo,
395                                   Oid *classOidP,
396                                   List *attList,        /* list of IndexElem's */
397                                   Oid relId,
398                                   char *accessMethodName,
399                                   Oid accessMethodId,
400                                   bool isconstraint)
401 {
402         ListCell   *rest;
403         int                     attn = 0;
404
405         /*
406          * process attributeList
407          */
408         foreach(rest, attList)
409         {
410                 IndexElem  *attribute = (IndexElem *) lfirst(rest);
411                 Oid                     atttype;
412
413                 if (attribute->name != NULL)
414                 {
415                         /* Simple index attribute */
416                         HeapTuple       atttuple;
417                         Form_pg_attribute attform;
418
419                         Assert(attribute->expr == NULL);
420                         atttuple = SearchSysCacheAttName(relId, attribute->name);
421                         if (!HeapTupleIsValid(atttuple))
422                         {
423                                 /* difference in error message spellings is historical */
424                                 if (isconstraint)
425                                         ereport(ERROR,
426                                                         (errcode(ERRCODE_UNDEFINED_COLUMN),
427                                                          errmsg("column \"%s\" named in key does not exist",
428                                                                         attribute->name)));
429                                 else
430                                         ereport(ERROR,
431                                                         (errcode(ERRCODE_UNDEFINED_COLUMN),
432                                                          errmsg("column \"%s\" does not exist",
433                                                                         attribute->name)));
434                         }
435                         attform = (Form_pg_attribute) GETSTRUCT(atttuple);
436                         indexInfo->ii_KeyAttrNumbers[attn] = attform->attnum;
437                         atttype = attform->atttypid;
438                         ReleaseSysCache(atttuple);
439                 }
440                 else if (attribute->expr && IsA(attribute->expr, Var))
441                 {
442                         /* Tricky tricky, he wrote (column) ... treat as simple attr */
443                         Var                *var = (Var *) attribute->expr;
444
445                         indexInfo->ii_KeyAttrNumbers[attn] = var->varattno;
446                         atttype = get_atttype(relId, var->varattno);
447                 }
448                 else
449                 {
450                         /* Index expression */
451                         Assert(attribute->expr != NULL);
452                         indexInfo->ii_KeyAttrNumbers[attn] = 0;         /* marks expression */
453                         indexInfo->ii_Expressions = lappend(indexInfo->ii_Expressions,
454                                                                                                 attribute->expr);
455                         atttype = exprType(attribute->expr);
456
457                         /*
458                          * We don't currently support generation of an actual query
459                          * plan for an index expression, only simple scalar
460                          * expressions; hence these restrictions.
461                          */
462                         if (contain_subplans(attribute->expr))
463                                 ereport(ERROR,
464                                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
465                                    errmsg("cannot use subquery in index expression")));
466                         if (contain_agg_clause(attribute->expr))
467                                 ereport(ERROR,
468                                                 (errcode(ERRCODE_GROUPING_ERROR),
469                                         errmsg("cannot use aggregate function in index expression")));
470
471                         /*
472                          * A expression using mutable functions is probably wrong,
473                          * since if you aren't going to get the same result for the
474                          * same data every time, it's not clear what the index entries
475                          * mean at all.
476                          */
477                         if (contain_mutable_functions(attribute->expr))
478                                 ereport(ERROR,
479                                                 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
480                                                  errmsg("functions in index expression must be marked IMMUTABLE")));
481                 }
482
483                 classOidP[attn] = GetIndexOpClass(attribute->opclass,
484                                                                                   atttype,
485                                                                                   accessMethodName,
486                                                                                   accessMethodId);
487                 attn++;
488         }
489 }
490
491 /*
492  * Resolve possibly-defaulted operator class specification
493  */
494 static Oid
495 GetIndexOpClass(List *opclass, Oid attrType,
496                                 char *accessMethodName, Oid accessMethodId)
497 {
498         char       *schemaname;
499         char       *opcname;
500         HeapTuple       tuple;
501         Oid                     opClassId,
502                                 opInputType;
503
504         /*
505          * Release 7.0 removed network_ops, timespan_ops, and datetime_ops, so
506          * we ignore those opclass names so the default *_ops is used.  This
507          * can be removed in some later release.  bjm 2000/02/07
508          *
509          * Release 7.1 removes lztext_ops, so suppress that too for a while.  tgl
510          * 2000/07/30
511          *
512          * Release 7.2 renames timestamp_ops to timestamptz_ops, so suppress that
513          * too for awhile.      I'm starting to think we need a better approach.
514          * tgl 2000/10/01
515          *
516          * Release 7.5 removes bigbox_ops (which was dead code for a long while
517          * anyway).  tgl 2003/11/11
518          */
519         if (list_length(opclass) == 1)
520         {
521                 char       *claname = strVal(linitial(opclass));
522
523                 if (strcmp(claname, "network_ops") == 0 ||
524                         strcmp(claname, "timespan_ops") == 0 ||
525                         strcmp(claname, "datetime_ops") == 0 ||
526                         strcmp(claname, "lztext_ops") == 0 ||
527                         strcmp(claname, "timestamp_ops") == 0 ||
528                         strcmp(claname, "bigbox_ops") == 0)
529                         opclass = NIL;
530         }
531
532         if (opclass == NIL)
533         {
534                 /* no operator class specified, so find the default */
535                 opClassId = GetDefaultOpClass(attrType, accessMethodId);
536                 if (!OidIsValid(opClassId))
537                         ereport(ERROR,
538                                         (errcode(ERRCODE_UNDEFINED_OBJECT),
539                                          errmsg("data type %s has no default operator class for access method \"%s\"",
540                                                         format_type_be(attrType), accessMethodName),
541                                          errhint("You must specify an operator class for the index or define a default operator class for the data type.")));
542                 return opClassId;
543         }
544
545         /*
546          * Specific opclass name given, so look up the opclass.
547          */
548
549         /* deconstruct the name list */
550         DeconstructQualifiedName(opclass, &schemaname, &opcname);
551
552         if (schemaname)
553         {
554                 /* Look in specific schema only */
555                 Oid                     namespaceId;
556
557                 namespaceId = LookupExplicitNamespace(schemaname);
558                 tuple = SearchSysCache(CLAAMNAMENSP,
559                                                            ObjectIdGetDatum(accessMethodId),
560                                                            PointerGetDatum(opcname),
561                                                            ObjectIdGetDatum(namespaceId),
562                                                            0);
563         }
564         else
565         {
566                 /* Unqualified opclass name, so search the search path */
567                 opClassId = OpclassnameGetOpcid(accessMethodId, opcname);
568                 if (!OidIsValid(opClassId))
569                         ereport(ERROR,
570                                         (errcode(ERRCODE_UNDEFINED_OBJECT),
571                                          errmsg("operator class \"%s\" does not exist for access method \"%s\"",
572                                                         opcname, accessMethodName)));
573                 tuple = SearchSysCache(CLAOID,
574                                                            ObjectIdGetDatum(opClassId),
575                                                            0, 0, 0);
576         }
577
578         if (!HeapTupleIsValid(tuple))
579                 ereport(ERROR,
580                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
581                                  errmsg("operator class \"%s\" does not exist for access method \"%s\"",
582                                                 NameListToString(opclass), accessMethodName)));
583
584         /*
585          * Verify that the index operator class accepts this datatype.  Note
586          * we will accept binary compatibility.
587          */
588         opClassId = HeapTupleGetOid(tuple);
589         opInputType = ((Form_pg_opclass) GETSTRUCT(tuple))->opcintype;
590
591         if (!IsBinaryCoercible(attrType, opInputType))
592                 ereport(ERROR,
593                                 (errcode(ERRCODE_DATATYPE_MISMATCH),
594                          errmsg("operator class \"%s\" does not accept data type %s",
595                                   NameListToString(opclass), format_type_be(attrType))));
596
597         ReleaseSysCache(tuple);
598
599         return opClassId;
600 }
601
602 static Oid
603 GetDefaultOpClass(Oid attrType, Oid accessMethodId)
604 {
605         OpclassCandidateList opclass;
606         int                     nexact = 0;
607         int                     ncompatible = 0;
608         Oid                     exactOid = InvalidOid;
609         Oid                     compatibleOid = InvalidOid;
610
611         /* If it's a domain, look at the base type instead */
612         attrType = getBaseType(attrType);
613
614         /*
615          * We scan through all the opclasses available for the access method,
616          * looking for one that is marked default and matches the target type
617          * (either exactly or binary-compatibly, but prefer an exact match).
618          *
619          * We could find more than one binary-compatible match, in which case we
620          * require the user to specify which one he wants.      If we find more
621          * than one exact match, then someone put bogus entries in pg_opclass.
622          *
623          * The initial search is done by namespace.c so that we only consider
624          * opclasses visible in the current namespace search path.  (See also
625          * typcache.c, which applies the same logic, but over all opclasses.)
626          */
627         for (opclass = OpclassGetCandidates(accessMethodId);
628                  opclass != NULL;
629                  opclass = opclass->next)
630         {
631                 if (opclass->opcdefault)
632                 {
633                         if (opclass->opcintype == attrType)
634                         {
635                                 nexact++;
636                                 exactOid = opclass->oid;
637                         }
638                         else if (IsBinaryCoercible(attrType, opclass->opcintype))
639                         {
640                                 ncompatible++;
641                                 compatibleOid = opclass->oid;
642                         }
643                 }
644         }
645
646         if (nexact == 1)
647                 return exactOid;
648         if (nexact != 0)
649                 ereport(ERROR,
650                                 (errcode(ERRCODE_DUPLICATE_OBJECT),
651                                  errmsg("there are multiple default operator classes for data type %s",
652                                                 format_type_be(attrType))));
653         if (ncompatible == 1)
654                 return compatibleOid;
655
656         return InvalidOid;
657 }
658
659 /*
660  * Select a nonconflicting name for an index.
661  */
662 static char *
663 CreateIndexName(const char *table_name, const char *column_name,
664                                 const char *label, Oid inamespace)
665 {
666         int                     pass = 0;
667         char       *iname = NULL;
668         char            typename[NAMEDATALEN];
669
670         /*
671          * The type name for makeObjectName is label, or labelN if that's
672          * necessary to prevent collision with existing indexes.
673          */
674         strncpy(typename, label, sizeof(typename));
675
676         for (;;)
677         {
678                 iname = makeObjectName(table_name, column_name, typename);
679
680                 if (!OidIsValid(get_relname_relid(iname, inamespace)))
681                         break;
682
683                 /* found a conflict, so try a new name component */
684                 pfree(iname);
685                 snprintf(typename, sizeof(typename), "%s%d", label, ++pass);
686         }
687
688         return iname;
689 }
690
691 /*
692  * relationHasPrimaryKey -
693  *
694  *      See whether an existing relation has a primary key.
695  */
696 static bool
697 relationHasPrimaryKey(Relation rel)
698 {
699         bool            result = false;
700         List       *indexoidlist;
701         ListCell   *indexoidscan;
702
703         /*
704          * Get the list of index OIDs for the table from the relcache, and
705          * look up each one in the pg_index syscache until we find one marked
706          * primary key (hopefully there isn't more than one such).
707          */
708         indexoidlist = RelationGetIndexList(rel);
709
710         foreach(indexoidscan, indexoidlist)
711         {
712                 Oid                     indexoid = lfirst_oid(indexoidscan);
713                 HeapTuple       indexTuple;
714
715                 indexTuple = SearchSysCache(INDEXRELID,
716                                                                         ObjectIdGetDatum(indexoid),
717                                                                         0, 0, 0);
718                 if (!HeapTupleIsValid(indexTuple))              /* should not happen */
719                         elog(ERROR, "cache lookup failed for index %u", indexoid);
720                 result = ((Form_pg_index) GETSTRUCT(indexTuple))->indisprimary;
721                 ReleaseSysCache(indexTuple);
722                 if (result)
723                         break;
724         }
725
726         list_free(indexoidlist);
727
728         return result;
729 }
730
731
732 /*
733  * RemoveIndex
734  *              Deletes an index.
735  */
736 void
737 RemoveIndex(RangeVar *relation, DropBehavior behavior)
738 {
739         Oid                     indOid;
740         char            relkind;
741         ObjectAddress object;
742
743         indOid = RangeVarGetRelid(relation, false);
744         relkind = get_rel_relkind(indOid);
745         if (relkind != RELKIND_INDEX)
746                 ereport(ERROR,
747                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
748                                  errmsg("\"%s\" is not an index",
749                                                 relation->relname)));
750
751         object.classId = RelOid_pg_class;
752         object.objectId = indOid;
753         object.objectSubId = 0;
754
755         performDeletion(&object, behavior);
756 }
757
758 /*
759  * ReindexIndex
760  *              Recreate an index.
761  */
762 void
763 ReindexIndex(RangeVar *indexRelation, bool force /* currently unused */ )
764 {
765         Oid                     indOid;
766         HeapTuple       tuple;
767
768         indOid = RangeVarGetRelid(indexRelation, false);
769         tuple = SearchSysCache(RELOID,
770                                                    ObjectIdGetDatum(indOid),
771                                                    0, 0, 0);
772         if (!HeapTupleIsValid(tuple))           /* shouldn't happen */
773                 elog(ERROR, "cache lookup failed for relation %u", indOid);
774
775         if (((Form_pg_class) GETSTRUCT(tuple))->relkind != RELKIND_INDEX)
776                 ereport(ERROR,
777                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
778                                  errmsg("\"%s\" is not an index",
779                                                 indexRelation->relname)));
780
781         /* Check permissions */
782         if (!pg_class_ownercheck(indOid, GetUserId()))
783                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
784                                            indexRelation->relname);
785
786         ReleaseSysCache(tuple);
787
788         reindex_index(indOid);
789 }
790
791 /*
792  * ReindexTable
793  *              Recreate indexes of a table.
794  */
795 void
796 ReindexTable(RangeVar *relation, bool force /* currently unused */ )
797 {
798         Oid                     heapOid;
799         HeapTuple       tuple;
800
801         heapOid = RangeVarGetRelid(relation, false);
802         tuple = SearchSysCache(RELOID,
803                                                    ObjectIdGetDatum(heapOid),
804                                                    0, 0, 0);
805         if (!HeapTupleIsValid(tuple))           /* shouldn't happen */
806                 elog(ERROR, "cache lookup failed for relation %u", heapOid);
807
808         if (((Form_pg_class) GETSTRUCT(tuple))->relkind != RELKIND_RELATION &&
809                 ((Form_pg_class) GETSTRUCT(tuple))->relkind != RELKIND_TOASTVALUE)
810                 ereport(ERROR,
811                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
812                                  errmsg("\"%s\" is not a table",
813                                                 relation->relname)));
814
815         /* Check permissions */
816         if (!pg_class_ownercheck(heapOid, GetUserId()))
817                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
818                                            relation->relname);
819
820         /* Can't reindex shared tables except in standalone mode */
821         if (((Form_pg_class) GETSTRUCT(tuple))->relisshared && IsUnderPostmaster)
822                 ereport(ERROR,
823                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
824                                  errmsg("shared table \"%s\" can only be reindexed in stand-alone mode",
825                                                 relation->relname)));
826
827         ReleaseSysCache(tuple);
828
829         if (!reindex_relation(heapOid, true))
830                 ereport(NOTICE,
831                                 (errmsg("table \"%s\" has no indexes",
832                                                 relation->relname)));
833 }
834
835 /*
836  * ReindexDatabase
837  *              Recreate indexes of a database.
838  *
839  * To reduce the probability of deadlocks, each table is reindexed in a
840  * separate transaction, so we can release the lock on it right away.
841  */
842 void
843 ReindexDatabase(const char *dbname, bool force /* currently unused */,
844                                 bool all)
845 {
846         Relation         relationRelation;
847         HeapScanDesc scan;
848         HeapTuple        tuple;
849         MemoryContext private_context;
850         MemoryContext old;
851         List            *relids = NIL;
852         ListCell        *l;
853
854         AssertArg(dbname);
855
856         if (strcmp(dbname, get_database_name(MyDatabaseId)) != 0)
857                 ereport(ERROR,
858                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
859                                  errmsg("can only reindex the currently open database")));
860
861         if (!pg_database_ownercheck(MyDatabaseId, GetUserId()))
862                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
863                                            dbname);
864
865         /*
866          * We cannot run inside a user transaction block; if we were inside a
867          * transaction, then our commit- and start-transaction-command calls
868          * would not have the intended effect!
869          */
870         PreventTransactionChain((void *) dbname, "REINDEX DATABASE");
871
872         /*
873          * Create a memory context that will survive forced transaction
874          * commits we do below.  Since it is a child of PortalContext, it will
875          * go away eventually even if we suffer an error; there's no need for
876          * special abort cleanup logic.
877          */
878         private_context = AllocSetContextCreate(PortalContext,
879                                                                                         "ReindexDatabase",
880                                                                                         ALLOCSET_DEFAULT_MINSIZE,
881                                                                                         ALLOCSET_DEFAULT_INITSIZE,
882                                                                                         ALLOCSET_DEFAULT_MAXSIZE);
883
884         /*
885          * We always want to reindex pg_class first.  This ensures that if
886          * there is any corruption in pg_class' indexes, they will be fixed
887          * before we process any other tables.  This is critical because
888          * reindexing itself will try to update pg_class.
889          */
890         old = MemoryContextSwitchTo(private_context);
891         relids = lappend_oid(relids, RelOid_pg_class);
892         MemoryContextSwitchTo(old);
893
894         /*
895          * Scan pg_class to build a list of the relations we need to reindex.
896          *
897          * We only consider plain relations here (toast rels will be processed
898          * indirectly by reindex_relation).
899          */
900         relationRelation = heap_openr(RelationRelationName, AccessShareLock);
901         scan = heap_beginscan(relationRelation, SnapshotNow, 0, NULL);
902         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
903         {
904                 Form_pg_class classtuple = (Form_pg_class) GETSTRUCT(tuple);
905
906                 if (classtuple->relkind != RELKIND_RELATION)
907                         continue;
908
909                 if (!all)                               /* only system tables? */
910                 {
911                         if (!IsSystemClass(classtuple))
912                                 continue;
913                 }
914
915                 if (IsUnderPostmaster)  /* silently ignore shared tables */
916                 {
917                         if (classtuple->relisshared)
918                                 continue;
919                 }
920
921                 if (HeapTupleGetOid(tuple) == RelOid_pg_class)
922                         continue;                       /* got it already */
923
924                 old = MemoryContextSwitchTo(private_context);
925                 relids = lappend_oid(relids, HeapTupleGetOid(tuple));
926                 MemoryContextSwitchTo(old);
927         }
928         heap_endscan(scan);
929         heap_close(relationRelation, AccessShareLock);
930
931         /* Now reindex each rel in a separate transaction */
932         CommitTransactionCommand();
933         foreach(l, relids)
934         {
935                 Oid             relid = lfirst_oid(l);
936
937                 StartTransactionCommand();
938                 SetQuerySnapshot();             /* might be needed for functions in
939                                                                  * indexes */
940                 if (reindex_relation(relid, true))
941                         ereport(NOTICE,
942                                         (errmsg("table \"%s\" was reindexed",
943                                                         get_rel_name(relid))));
944                 CommitTransactionCommand();
945         }
946         StartTransactionCommand();
947
948         MemoryContextDelete(private_context);
949 }