]> granicus.if.org Git - postgresql/blob - src/backend/commands/indexcmds.c
R-tree is dead ... long live GiST.
[postgresql] / src / backend / commands / indexcmds.c
1 /*-------------------------------------------------------------------------
2  *
3  * indexcmds.c
4  *        POSTGRES define and remove index code.
5  *
6  * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        $PostgreSQL: pgsql/src/backend/commands/indexcmds.c,v 1.135 2005/11/07 17:36:45 tgl Exp $
12  *
13  *-------------------------------------------------------------------------
14  */
15
16 #include "postgres.h"
17
18 #include "access/heapam.h"
19 #include "catalog/catalog.h"
20 #include "catalog/dependency.h"
21 #include "catalog/heap.h"
22 #include "catalog/index.h"
23 #include "catalog/namespace.h"
24 #include "catalog/pg_opclass.h"
25 #include "catalog/pg_proc.h"
26 #include "catalog/pg_tablespace.h"
27 #include "commands/dbcommands.h"
28 #include "commands/defrem.h"
29 #include "commands/tablecmds.h"
30 #include "commands/tablespace.h"
31 #include "executor/executor.h"
32 #include "mb/pg_wchar.h"
33 #include "miscadmin.h"
34 #include "optimizer/clauses.h"
35 #include "optimizer/prep.h"
36 #include "parser/parsetree.h"
37 #include "parser/parse_coerce.h"
38 #include "parser/parse_expr.h"
39 #include "parser/parse_func.h"
40 #include "utils/acl.h"
41 #include "utils/builtins.h"
42 #include "utils/lsyscache.h"
43 #include "utils/memutils.h"
44 #include "utils/relcache.h"
45 #include "utils/syscache.h"
46
47
48 /* non-export function prototypes */
49 static void CheckPredicate(Expr *predicate);
50 static void ComputeIndexAttrs(IndexInfo *indexInfo, Oid *classOidP,
51                                   List *attList,
52                                   Oid relId,
53                                   char *accessMethodName, Oid accessMethodId,
54                                   bool isconstraint);
55 static Oid GetIndexOpClass(List *opclass, Oid attrType,
56                                 char *accessMethodName, Oid accessMethodId);
57 static Oid      GetDefaultOpClass(Oid attrType, Oid accessMethodId);
58 static bool relationHasPrimaryKey(Relation rel);
59
60
61 /*
62  * DefineIndex
63  *              Creates a new index.
64  *
65  * 'heapRelation': the relation the index will apply to.
66  * 'indexRelationName': the name for the new index, or NULL to indicate
67  *              that a nonconflicting default name should be picked.
68  * 'indexRelationId': normally InvalidOid, but during bootstrap can be
69  *              nonzero to specify a preselected OID for the index.
70  * 'accessMethodName': name of the AM to use.
71  * 'tableSpaceName': name of the tablespace to create the index in.
72  *              NULL specifies using the appropriate default.
73  * 'attributeList': a list of IndexElem specifying columns and expressions
74  *              to index on.
75  * 'predicate': the partial-index condition, or NULL if none.
76  * 'rangetable': needed to interpret the predicate.
77  * 'unique': make the index enforce uniqueness.
78  * 'primary': mark the index as a primary key in the catalogs.
79  * 'isconstraint': index is for a PRIMARY KEY or UNIQUE constraint,
80  *              so build a pg_constraint entry for it.
81  * 'is_alter_table': this is due to an ALTER rather than a CREATE operation.
82  * 'check_rights': check for CREATE rights in the namespace.  (This should
83  *              be true except when ALTER is deleting/recreating an index.)
84  * 'skip_build': make the catalog entries but leave the index file empty;
85  *              it will be filled later.
86  * 'quiet': suppress the NOTICE chatter ordinarily provided for constraints.
87  */
88 void
89 DefineIndex(RangeVar *heapRelation,
90                         char *indexRelationName,
91                         Oid indexRelationId,
92                         char *accessMethodName,
93                         char *tableSpaceName,
94                         List *attributeList,
95                         Expr *predicate,
96                         List *rangetable,
97                         bool unique,
98                         bool primary,
99                         bool isconstraint,
100                         bool is_alter_table,
101                         bool check_rights,
102                         bool skip_build,
103                         bool quiet)
104 {
105         Oid                *classObjectId;
106         Oid                     accessMethodId;
107         Oid                     relationId;
108         Oid                     namespaceId;
109         Oid                     tablespaceId;
110         Relation        rel;
111         HeapTuple       tuple;
112         Form_pg_am      accessMethodForm;
113         IndexInfo  *indexInfo;
114         int                     numberOfAttributes;
115
116         /*
117          * count attributes in index
118          */
119         numberOfAttributes = list_length(attributeList);
120         if (numberOfAttributes <= 0)
121                 ereport(ERROR,
122                                 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
123                                  errmsg("must specify at least one column")));
124         if (numberOfAttributes > INDEX_MAX_KEYS)
125                 ereport(ERROR,
126                                 (errcode(ERRCODE_TOO_MANY_COLUMNS),
127                                  errmsg("cannot use more than %d columns in an index",
128                                                 INDEX_MAX_KEYS)));
129
130         /*
131          * Open heap relation, acquire a suitable lock on it, remember its OID
132          */
133         rel = heap_openrv(heapRelation, ShareLock);
134
135         /* Note: during bootstrap may see uncataloged relation */
136         if (rel->rd_rel->relkind != RELKIND_RELATION &&
137                 rel->rd_rel->relkind != RELKIND_UNCATALOGED)
138                 ereport(ERROR,
139                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
140                                  errmsg("\"%s\" is not a table",
141                                                 heapRelation->relname)));
142
143         relationId = RelationGetRelid(rel);
144         namespaceId = RelationGetNamespace(rel);
145
146         /*
147          * Verify we (still) have CREATE rights in the rel's namespace.
148          * (Presumably we did when the rel was created, but maybe not anymore.)
149          * Skip check if caller doesn't want it.  Also skip check if
150          * bootstrapping, since permissions machinery may not be working yet.
151          */
152         if (check_rights && !IsBootstrapProcessingMode())
153         {
154                 AclResult       aclresult;
155
156                 aclresult = pg_namespace_aclcheck(namespaceId, GetUserId(),
157                                                                                   ACL_CREATE);
158                 if (aclresult != ACLCHECK_OK)
159                         aclcheck_error(aclresult, ACL_KIND_NAMESPACE,
160                                                    get_namespace_name(namespaceId));
161         }
162
163         /*
164          * Select tablespace to use.  If not specified, use default_tablespace
165          * (which may in turn default to database's default).
166          */
167         if (tableSpaceName)
168         {
169                 tablespaceId = get_tablespace_oid(tableSpaceName);
170                 if (!OidIsValid(tablespaceId))
171                         ereport(ERROR,
172                                         (errcode(ERRCODE_UNDEFINED_OBJECT),
173                                          errmsg("tablespace \"%s\" does not exist",
174                                                         tableSpaceName)));
175         }
176         else
177         {
178                 tablespaceId = GetDefaultTablespace();
179                 /* note InvalidOid is OK in this case */
180         }
181
182         /* Check permissions except when using database's default */
183         if (OidIsValid(tablespaceId))
184         {
185                 AclResult       aclresult;
186
187                 aclresult = pg_tablespace_aclcheck(tablespaceId, GetUserId(),
188                                                                                    ACL_CREATE);
189                 if (aclresult != ACLCHECK_OK)
190                         aclcheck_error(aclresult, ACL_KIND_TABLESPACE,
191                                                    get_tablespace_name(tablespaceId));
192         }
193
194         /*
195          * Force shared indexes into the pg_global tablespace.  This is a bit of a
196          * hack but seems simpler than marking them in the BKI commands.
197          */
198         if (rel->rd_rel->relisshared)
199                 tablespaceId = GLOBALTABLESPACE_OID;
200
201         /*
202          * Select name for index if caller didn't specify
203          */
204         if (indexRelationName == NULL)
205         {
206                 if (primary)
207                         indexRelationName = ChooseRelationName(RelationGetRelationName(rel),
208                                                                                                    NULL,
209                                                                                                    "pkey",
210                                                                                                    namespaceId);
211                 else
212                 {
213                         IndexElem  *iparam = (IndexElem *) linitial(attributeList);
214
215                         indexRelationName = ChooseRelationName(RelationGetRelationName(rel),
216                                                                                                    iparam->name,
217                                                                                                    "key",
218                                                                                                    namespaceId);
219                 }
220         }
221
222         /*
223          * look up the access method, verify it can handle the requested features
224          */
225         tuple = SearchSysCache(AMNAME,
226                                                    PointerGetDatum(accessMethodName),
227                                                    0, 0, 0);
228         if (!HeapTupleIsValid(tuple))
229         {
230                 /*
231                  * Hack to provide more-or-less-transparent updating of old RTREE
232                  * indexes to GIST: if RTREE is requested and not found, use GIST.
233                  */
234                 if (strcmp(accessMethodName, "rtree") == 0)
235                 {
236                         ereport(NOTICE,
237                                         (errmsg("substituting access method \"gist\" for obsolete method \"rtree\"")));
238                         accessMethodName = "gist";
239                         tuple = SearchSysCache(AMNAME,
240                                                                    PointerGetDatum(accessMethodName),
241                                                                    0, 0, 0);
242                 }
243
244                 if (!HeapTupleIsValid(tuple))
245                         ereport(ERROR,
246                                         (errcode(ERRCODE_UNDEFINED_OBJECT),
247                                          errmsg("access method \"%s\" does not exist",
248                                                         accessMethodName)));
249         }
250         accessMethodId = HeapTupleGetOid(tuple);
251         accessMethodForm = (Form_pg_am) GETSTRUCT(tuple);
252
253         if (unique && !accessMethodForm->amcanunique)
254                 ereport(ERROR,
255                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
256                            errmsg("access method \"%s\" does not support unique indexes",
257                                           accessMethodName)));
258         if (numberOfAttributes > 1 && !accessMethodForm->amcanmulticol)
259                 ereport(ERROR,
260                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
261                   errmsg("access method \"%s\" does not support multicolumn indexes",
262                                  accessMethodName)));
263
264         ReleaseSysCache(tuple);
265
266         /*
267          * If a range table was created then check that only the base rel is
268          * mentioned.
269          */
270         if (rangetable != NIL)
271         {
272                 if (list_length(rangetable) != 1 || getrelid(1, rangetable) != relationId)
273                         ereport(ERROR,
274                                         (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
275                                          errmsg("index expressions and predicates may refer only to the table being indexed")));
276         }
277
278         /*
279          * Validate predicate, if given
280          */
281         if (predicate)
282                 CheckPredicate(predicate);
283
284         /*
285          * Extra checks when creating a PRIMARY KEY index.
286          */
287         if (primary)
288         {
289                 List       *cmds;
290                 ListCell   *keys;
291
292                 /*
293                  * If ALTER TABLE, check that there isn't already a PRIMARY KEY. In
294                  * CREATE TABLE, we have faith that the parser rejected multiple pkey
295                  * clauses; and CREATE INDEX doesn't have a way to say PRIMARY KEY, so
296                  * it's no problem either.
297                  */
298                 if (is_alter_table &&
299                         relationHasPrimaryKey(rel))
300                 {
301                         ereport(ERROR,
302                                         (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
303                          errmsg("multiple primary keys for table \"%s\" are not allowed",
304                                         RelationGetRelationName(rel))));
305                 }
306
307                 /*
308                  * Check that all of the attributes in a primary key are marked as not
309                  * null, otherwise attempt to ALTER TABLE .. SET NOT NULL
310                  */
311                 cmds = NIL;
312                 foreach(keys, attributeList)
313                 {
314                         IndexElem  *key = (IndexElem *) lfirst(keys);
315                         HeapTuple       atttuple;
316
317                         if (!key->name)
318                                 ereport(ERROR,
319                                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
320                                                  errmsg("primary keys cannot be expressions")));
321
322                         /* System attributes are never null, so no problem */
323                         if (SystemAttributeByName(key->name, rel->rd_rel->relhasoids))
324                                 continue;
325
326                         atttuple = SearchSysCacheAttName(relationId, key->name);
327                         if (HeapTupleIsValid(atttuple))
328                         {
329                                 if (!((Form_pg_attribute) GETSTRUCT(atttuple))->attnotnull)
330                                 {
331                                         /* Add a subcommand to make this one NOT NULL */
332                                         AlterTableCmd *cmd = makeNode(AlterTableCmd);
333
334                                         cmd->subtype = AT_SetNotNull;
335                                         cmd->name = key->name;
336
337                                         cmds = lappend(cmds, cmd);
338                                 }
339                                 ReleaseSysCache(atttuple);
340                         }
341                         else
342                         {
343                                 /*
344                                  * This shouldn't happen during CREATE TABLE, but can happen
345                                  * during ALTER TABLE.  Keep message in sync with
346                                  * transformIndexConstraints() in parser/analyze.c.
347                                  */
348                                 ereport(ERROR,
349                                                 (errcode(ERRCODE_UNDEFINED_COLUMN),
350                                                  errmsg("column \"%s\" named in key does not exist",
351                                                                 key->name)));
352                         }
353                 }
354
355                 /*
356                  * XXX: Shouldn't the ALTER TABLE .. SET NOT NULL cascade to child
357                  * tables?      Currently, since the PRIMARY KEY itself doesn't cascade,
358                  * we don't cascade the notnull constraint(s) either; but this is
359                  * pretty debatable.
360                  *
361                  * XXX: possible future improvement: when being called from ALTER TABLE,
362                  * it would be more efficient to merge this with the outer ALTER
363                  * TABLE, so as to avoid two scans.  But that seems to complicate
364                  * DefineIndex's API unduly.
365                  */
366                 if (cmds)
367                         AlterTableInternal(relationId, cmds, false);
368         }
369
370         /*
371          * Prepare arguments for index_create, primarily an IndexInfo structure.
372          * Note that ii_Predicate must be in implicit-AND format.
373          */
374         indexInfo = makeNode(IndexInfo);
375         indexInfo->ii_NumIndexAttrs = numberOfAttributes;
376         indexInfo->ii_Expressions = NIL;        /* for now */
377         indexInfo->ii_ExpressionsState = NIL;
378         indexInfo->ii_Predicate = make_ands_implicit(predicate);
379         indexInfo->ii_PredicateState = NIL;
380         indexInfo->ii_Unique = unique;
381
382         classObjectId = (Oid *) palloc(numberOfAttributes * sizeof(Oid));
383         ComputeIndexAttrs(indexInfo, classObjectId, attributeList,
384                                           relationId, accessMethodName, accessMethodId,
385                                           isconstraint);
386
387         heap_close(rel, NoLock);
388
389         /*
390          * Report index creation if appropriate (delay this till after most of the
391          * error checks)
392          */
393         if (isconstraint && !quiet)
394                 ereport(NOTICE,
395                   (errmsg("%s %s will create implicit index \"%s\" for table \"%s\"",
396                                   is_alter_table ? "ALTER TABLE / ADD" : "CREATE TABLE /",
397                                   primary ? "PRIMARY KEY" : "UNIQUE",
398                                   indexRelationName, RelationGetRelationName(rel))));
399
400         index_create(relationId, indexRelationName, indexRelationId,
401                                  indexInfo, accessMethodId, tablespaceId, classObjectId,
402                                  primary, isconstraint,
403                                  allowSystemTableMods, skip_build);
404
405         /*
406          * We update the relation's pg_class tuple even if it already has
407          * relhasindex = true.  This is needed to cause a shared-cache-inval
408          * message to be sent for the pg_class tuple, which will cause other
409          * backends to flush their relcache entries and in particular their cached
410          * lists of the indexes for this relation.
411          */
412         setRelhasindex(relationId, true, primary, InvalidOid);
413 }
414
415
416 /*
417  * CheckPredicate
418  *              Checks that the given partial-index predicate is valid.
419  *
420  * This used to also constrain the form of the predicate to forms that
421  * indxpath.c could do something with.  However, that seems overly
422  * restrictive.  One useful application of partial indexes is to apply
423  * a UNIQUE constraint across a subset of a table, and in that scenario
424  * any evaluatable predicate will work.  So accept any predicate here
425  * (except ones requiring a plan), and let indxpath.c fend for itself.
426  */
427 static void
428 CheckPredicate(Expr *predicate)
429 {
430         /*
431          * We don't currently support generation of an actual query plan for a
432          * predicate, only simple scalar expressions; hence these restrictions.
433          */
434         if (contain_subplans((Node *) predicate))
435                 ereport(ERROR,
436                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
437                                  errmsg("cannot use subquery in index predicate")));
438         if (contain_agg_clause((Node *) predicate))
439                 ereport(ERROR,
440                                 (errcode(ERRCODE_GROUPING_ERROR),
441                                  errmsg("cannot use aggregate in index predicate")));
442
443         /*
444          * A predicate using mutable functions is probably wrong, for the same
445          * reasons that we don't allow an index expression to use one.
446          */
447         if (contain_mutable_functions((Node *) predicate))
448                 ereport(ERROR,
449                                 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
450                    errmsg("functions in index predicate must be marked IMMUTABLE")));
451 }
452
453 static void
454 ComputeIndexAttrs(IndexInfo *indexInfo,
455                                   Oid *classOidP,
456                                   List *attList,        /* list of IndexElem's */
457                                   Oid relId,
458                                   char *accessMethodName,
459                                   Oid accessMethodId,
460                                   bool isconstraint)
461 {
462         ListCell   *rest;
463         int                     attn = 0;
464
465         /*
466          * process attributeList
467          */
468         foreach(rest, attList)
469         {
470                 IndexElem  *attribute = (IndexElem *) lfirst(rest);
471                 Oid                     atttype;
472
473                 if (attribute->name != NULL)
474                 {
475                         /* Simple index attribute */
476                         HeapTuple       atttuple;
477                         Form_pg_attribute attform;
478
479                         Assert(attribute->expr == NULL);
480                         atttuple = SearchSysCacheAttName(relId, attribute->name);
481                         if (!HeapTupleIsValid(atttuple))
482                         {
483                                 /* difference in error message spellings is historical */
484                                 if (isconstraint)
485                                         ereport(ERROR,
486                                                         (errcode(ERRCODE_UNDEFINED_COLUMN),
487                                                   errmsg("column \"%s\" named in key does not exist",
488                                                                  attribute->name)));
489                                 else
490                                         ereport(ERROR,
491                                                         (errcode(ERRCODE_UNDEFINED_COLUMN),
492                                                          errmsg("column \"%s\" does not exist",
493                                                                         attribute->name)));
494                         }
495                         attform = (Form_pg_attribute) GETSTRUCT(atttuple);
496                         indexInfo->ii_KeyAttrNumbers[attn] = attform->attnum;
497                         atttype = attform->atttypid;
498                         ReleaseSysCache(atttuple);
499                 }
500                 else if (attribute->expr && IsA(attribute->expr, Var))
501                 {
502                         /* Tricky tricky, he wrote (column) ... treat as simple attr */
503                         Var                *var = (Var *) attribute->expr;
504
505                         indexInfo->ii_KeyAttrNumbers[attn] = var->varattno;
506                         atttype = get_atttype(relId, var->varattno);
507                 }
508                 else
509                 {
510                         /* Index expression */
511                         Assert(attribute->expr != NULL);
512                         indexInfo->ii_KeyAttrNumbers[attn] = 0;         /* marks expression */
513                         indexInfo->ii_Expressions = lappend(indexInfo->ii_Expressions,
514                                                                                                 attribute->expr);
515                         atttype = exprType(attribute->expr);
516
517                         /*
518                          * We don't currently support generation of an actual query plan
519                          * for an index expression, only simple scalar expressions; hence
520                          * these restrictions.
521                          */
522                         if (contain_subplans(attribute->expr))
523                                 ereport(ERROR,
524                                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
525                                                  errmsg("cannot use subquery in index expression")));
526                         if (contain_agg_clause(attribute->expr))
527                                 ereport(ERROR,
528                                                 (errcode(ERRCODE_GROUPING_ERROR),
529                                 errmsg("cannot use aggregate function in index expression")));
530
531                         /*
532                          * A expression using mutable functions is probably wrong, since
533                          * if you aren't going to get the same result for the same data
534                          * every time, it's not clear what the index entries mean at all.
535                          */
536                         if (contain_mutable_functions(attribute->expr))
537                                 ereport(ERROR,
538                                                 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
539                                                  errmsg("functions in index expression must be marked IMMUTABLE")));
540                 }
541
542                 classOidP[attn] = GetIndexOpClass(attribute->opclass,
543                                                                                   atttype,
544                                                                                   accessMethodName,
545                                                                                   accessMethodId);
546                 attn++;
547         }
548 }
549
550 /*
551  * Resolve possibly-defaulted operator class specification
552  */
553 static Oid
554 GetIndexOpClass(List *opclass, Oid attrType,
555                                 char *accessMethodName, Oid accessMethodId)
556 {
557         char       *schemaname;
558         char       *opcname;
559         HeapTuple       tuple;
560         Oid                     opClassId,
561                                 opInputType;
562
563         /*
564          * Release 7.0 removed network_ops, timespan_ops, and datetime_ops, so we
565          * ignore those opclass names so the default *_ops is used.  This can be
566          * removed in some later release.  bjm 2000/02/07
567          *
568          * Release 7.1 removes lztext_ops, so suppress that too for a while.  tgl
569          * 2000/07/30
570          *
571          * Release 7.2 renames timestamp_ops to timestamptz_ops, so suppress that too
572          * for awhile.  I'm starting to think we need a better approach. tgl
573          * 2000/10/01
574          *
575          * Release 8.0 removes bigbox_ops (which was dead code for a long while
576          * anyway).  tgl 2003/11/11
577          */
578         if (list_length(opclass) == 1)
579         {
580                 char       *claname = strVal(linitial(opclass));
581
582                 if (strcmp(claname, "network_ops") == 0 ||
583                         strcmp(claname, "timespan_ops") == 0 ||
584                         strcmp(claname, "datetime_ops") == 0 ||
585                         strcmp(claname, "lztext_ops") == 0 ||
586                         strcmp(claname, "timestamp_ops") == 0 ||
587                         strcmp(claname, "bigbox_ops") == 0)
588                         opclass = NIL;
589         }
590
591         if (opclass == NIL)
592         {
593                 /* no operator class specified, so find the default */
594                 opClassId = GetDefaultOpClass(attrType, accessMethodId);
595                 if (!OidIsValid(opClassId))
596                         ereport(ERROR,
597                                         (errcode(ERRCODE_UNDEFINED_OBJECT),
598                                          errmsg("data type %s has no default operator class for access method \"%s\"",
599                                                         format_type_be(attrType), accessMethodName),
600                                          errhint("You must specify an operator class for the index or define a default operator class for the data type.")));
601                 return opClassId;
602         }
603
604         /*
605          * Specific opclass name given, so look up the opclass.
606          */
607
608         /* deconstruct the name list */
609         DeconstructQualifiedName(opclass, &schemaname, &opcname);
610
611         if (schemaname)
612         {
613                 /* Look in specific schema only */
614                 Oid                     namespaceId;
615
616                 namespaceId = LookupExplicitNamespace(schemaname);
617                 tuple = SearchSysCache(CLAAMNAMENSP,
618                                                            ObjectIdGetDatum(accessMethodId),
619                                                            PointerGetDatum(opcname),
620                                                            ObjectIdGetDatum(namespaceId),
621                                                            0);
622         }
623         else
624         {
625                 /* Unqualified opclass name, so search the search path */
626                 opClassId = OpclassnameGetOpcid(accessMethodId, opcname);
627                 if (!OidIsValid(opClassId))
628                         ereport(ERROR,
629                                         (errcode(ERRCODE_UNDEFINED_OBJECT),
630                                          errmsg("operator class \"%s\" does not exist for access method \"%s\"",
631                                                         opcname, accessMethodName)));
632                 tuple = SearchSysCache(CLAOID,
633                                                            ObjectIdGetDatum(opClassId),
634                                                            0, 0, 0);
635         }
636
637         if (!HeapTupleIsValid(tuple))
638                 ereport(ERROR,
639                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
640                                  errmsg("operator class \"%s\" does not exist for access method \"%s\"",
641                                                 NameListToString(opclass), accessMethodName)));
642
643         /*
644          * Verify that the index operator class accepts this datatype.  Note we
645          * will accept binary compatibility.
646          */
647         opClassId = HeapTupleGetOid(tuple);
648         opInputType = ((Form_pg_opclass) GETSTRUCT(tuple))->opcintype;
649
650         if (!IsBinaryCoercible(attrType, opInputType))
651                 ereport(ERROR,
652                                 (errcode(ERRCODE_DATATYPE_MISMATCH),
653                                  errmsg("operator class \"%s\" does not accept data type %s",
654                                           NameListToString(opclass), format_type_be(attrType))));
655
656         ReleaseSysCache(tuple);
657
658         return opClassId;
659 }
660
661 static Oid
662 GetDefaultOpClass(Oid attrType, Oid accessMethodId)
663 {
664         OpclassCandidateList opclass;
665         int                     nexact = 0;
666         int                     ncompatible = 0;
667         Oid                     exactOid = InvalidOid;
668         Oid                     compatibleOid = InvalidOid;
669
670         /* If it's a domain, look at the base type instead */
671         attrType = getBaseType(attrType);
672
673         /*
674          * We scan through all the opclasses available for the access method,
675          * looking for one that is marked default and matches the target type
676          * (either exactly or binary-compatibly, but prefer an exact match).
677          *
678          * We could find more than one binary-compatible match, in which case we
679          * require the user to specify which one he wants.      If we find more than
680          * one exact match, then someone put bogus entries in pg_opclass.
681          *
682          * The initial search is done by namespace.c so that we only consider
683          * opclasses visible in the current namespace search path.      (See also
684          * typcache.c, which applies the same logic, but over all opclasses.)
685          */
686         for (opclass = OpclassGetCandidates(accessMethodId);
687                  opclass != NULL;
688                  opclass = opclass->next)
689         {
690                 if (opclass->opcdefault)
691                 {
692                         if (opclass->opcintype == attrType)
693                         {
694                                 nexact++;
695                                 exactOid = opclass->oid;
696                         }
697                         else if (IsBinaryCoercible(attrType, opclass->opcintype))
698                         {
699                                 ncompatible++;
700                                 compatibleOid = opclass->oid;
701                         }
702                 }
703         }
704
705         if (nexact == 1)
706                 return exactOid;
707         if (nexact != 0)
708                 ereport(ERROR,
709                                 (errcode(ERRCODE_DUPLICATE_OBJECT),
710                 errmsg("there are multiple default operator classes for data type %s",
711                            format_type_be(attrType))));
712         if (ncompatible == 1)
713                 return compatibleOid;
714
715         return InvalidOid;
716 }
717
718 /*
719  *      makeObjectName()
720  *
721  *      Create a name for an implicitly created index, sequence, constraint, etc.
722  *
723  *      The parameters are typically: the original table name, the original field
724  *      name, and a "type" string (such as "seq" or "pkey").    The field name
725  *      and/or type can be NULL if not relevant.
726  *
727  *      The result is a palloc'd string.
728  *
729  *      The basic result we want is "name1_name2_label", omitting "_name2" or
730  *      "_label" when those parameters are NULL.  However, we must generate
731  *      a name with less than NAMEDATALEN characters!  So, we truncate one or
732  *      both names if necessary to make a short-enough string.  The label part
733  *      is never truncated (so it had better be reasonably short).
734  *
735  *      The caller is responsible for checking uniqueness of the generated
736  *      name and retrying as needed; retrying will be done by altering the
737  *      "label" string (which is why we never truncate that part).
738  */
739 char *
740 makeObjectName(const char *name1, const char *name2, const char *label)
741 {
742         char       *name;
743         int                     overhead = 0;   /* chars needed for label and underscores */
744         int                     availchars;             /* chars available for name(s) */
745         int                     name1chars;             /* chars allocated to name1 */
746         int                     name2chars;             /* chars allocated to name2 */
747         int                     ndx;
748
749         name1chars = strlen(name1);
750         if (name2)
751         {
752                 name2chars = strlen(name2);
753                 overhead++;                             /* allow for separating underscore */
754         }
755         else
756                 name2chars = 0;
757         if (label)
758                 overhead += strlen(label) + 1;
759
760         availchars = NAMEDATALEN - 1 - overhead;
761         Assert(availchars > 0);         /* else caller chose a bad label */
762
763         /*
764          * If we must truncate,  preferentially truncate the longer name. This
765          * logic could be expressed without a loop, but it's simple and obvious as
766          * a loop.
767          */
768         while (name1chars + name2chars > availchars)
769         {
770                 if (name1chars > name2chars)
771                         name1chars--;
772                 else
773                         name2chars--;
774         }
775
776         name1chars = pg_mbcliplen(name1, name1chars, name1chars);
777         if (name2)
778                 name2chars = pg_mbcliplen(name2, name2chars, name2chars);
779
780         /* Now construct the string using the chosen lengths */
781         name = palloc(name1chars + name2chars + overhead + 1);
782         memcpy(name, name1, name1chars);
783         ndx = name1chars;
784         if (name2)
785         {
786                 name[ndx++] = '_';
787                 memcpy(name + ndx, name2, name2chars);
788                 ndx += name2chars;
789         }
790         if (label)
791         {
792                 name[ndx++] = '_';
793                 strcpy(name + ndx, label);
794         }
795         else
796                 name[ndx] = '\0';
797
798         return name;
799 }
800
801 /*
802  * Select a nonconflicting name for a new relation.  This is ordinarily
803  * used to choose index names (which is why it's here) but it can also
804  * be used for sequences, or any autogenerated relation kind.
805  *
806  * name1, name2, and label are used the same way as for makeObjectName(),
807  * except that the label can't be NULL; digits will be appended to the label
808  * if needed to create a name that is unique within the specified namespace.
809  *
810  * Note: it is theoretically possible to get a collision anyway, if someone
811  * else chooses the same name concurrently.  This is fairly unlikely to be
812  * a problem in practice, especially if one is holding an exclusive lock on
813  * the relation identified by name1.  However, if choosing multiple names
814  * within a single command, you'd better create the new object and do
815  * CommandCounterIncrement before choosing the next one!
816  *
817  * Returns a palloc'd string.
818  */
819 char *
820 ChooseRelationName(const char *name1, const char *name2,
821                                    const char *label, Oid namespace)
822 {
823         int                     pass = 0;
824         char       *relname = NULL;
825         char            modlabel[NAMEDATALEN];
826
827         /* try the unmodified label first */
828         StrNCpy(modlabel, label, sizeof(modlabel));
829
830         for (;;)
831         {
832                 relname = makeObjectName(name1, name2, modlabel);
833
834                 if (!OidIsValid(get_relname_relid(relname, namespace)))
835                         break;
836
837                 /* found a conflict, so try a new name component */
838                 pfree(relname);
839                 snprintf(modlabel, sizeof(modlabel), "%s%d", label, ++pass);
840         }
841
842         return relname;
843 }
844
845 /*
846  * relationHasPrimaryKey -
847  *
848  *      See whether an existing relation has a primary key.
849  */
850 static bool
851 relationHasPrimaryKey(Relation rel)
852 {
853         bool            result = false;
854         List       *indexoidlist;
855         ListCell   *indexoidscan;
856
857         /*
858          * Get the list of index OIDs for the table from the relcache, and look up
859          * each one in the pg_index syscache until we find one marked primary key
860          * (hopefully there isn't more than one such).
861          */
862         indexoidlist = RelationGetIndexList(rel);
863
864         foreach(indexoidscan, indexoidlist)
865         {
866                 Oid                     indexoid = lfirst_oid(indexoidscan);
867                 HeapTuple       indexTuple;
868
869                 indexTuple = SearchSysCache(INDEXRELID,
870                                                                         ObjectIdGetDatum(indexoid),
871                                                                         0, 0, 0);
872                 if (!HeapTupleIsValid(indexTuple))              /* should not happen */
873                         elog(ERROR, "cache lookup failed for index %u", indexoid);
874                 result = ((Form_pg_index) GETSTRUCT(indexTuple))->indisprimary;
875                 ReleaseSysCache(indexTuple);
876                 if (result)
877                         break;
878         }
879
880         list_free(indexoidlist);
881
882         return result;
883 }
884
885
886 /*
887  * RemoveIndex
888  *              Deletes an index.
889  */
890 void
891 RemoveIndex(RangeVar *relation, DropBehavior behavior)
892 {
893         Oid                     indOid;
894         char            relkind;
895         ObjectAddress object;
896
897         indOid = RangeVarGetRelid(relation, false);
898         relkind = get_rel_relkind(indOid);
899         if (relkind != RELKIND_INDEX)
900                 ereport(ERROR,
901                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
902                                  errmsg("\"%s\" is not an index",
903                                                 relation->relname)));
904
905         object.classId = RelationRelationId;
906         object.objectId = indOid;
907         object.objectSubId = 0;
908
909         performDeletion(&object, behavior);
910 }
911
912 /*
913  * ReindexIndex
914  *              Recreate a specific index.
915  */
916 void
917 ReindexIndex(RangeVar *indexRelation)
918 {
919         Oid                     indOid;
920         HeapTuple       tuple;
921
922         indOid = RangeVarGetRelid(indexRelation, false);
923         tuple = SearchSysCache(RELOID,
924                                                    ObjectIdGetDatum(indOid),
925                                                    0, 0, 0);
926         if (!HeapTupleIsValid(tuple))           /* shouldn't happen */
927                 elog(ERROR, "cache lookup failed for relation %u", indOid);
928
929         if (((Form_pg_class) GETSTRUCT(tuple))->relkind != RELKIND_INDEX)
930                 ereport(ERROR,
931                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
932                                  errmsg("\"%s\" is not an index",
933                                                 indexRelation->relname)));
934
935         /* Check permissions */
936         if (!pg_class_ownercheck(indOid, GetUserId()))
937                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
938                                            indexRelation->relname);
939
940         ReleaseSysCache(tuple);
941
942         reindex_index(indOid);
943 }
944
945 /*
946  * ReindexTable
947  *              Recreate all indexes of a table (and of its toast table, if any)
948  */
949 void
950 ReindexTable(RangeVar *relation)
951 {
952         Oid                     heapOid;
953         HeapTuple       tuple;
954
955         heapOid = RangeVarGetRelid(relation, false);
956         tuple = SearchSysCache(RELOID,
957                                                    ObjectIdGetDatum(heapOid),
958                                                    0, 0, 0);
959         if (!HeapTupleIsValid(tuple))           /* shouldn't happen */
960                 elog(ERROR, "cache lookup failed for relation %u", heapOid);
961
962         if (((Form_pg_class) GETSTRUCT(tuple))->relkind != RELKIND_RELATION &&
963                 ((Form_pg_class) GETSTRUCT(tuple))->relkind != RELKIND_TOASTVALUE)
964                 ereport(ERROR,
965                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
966                                  errmsg("\"%s\" is not a table",
967                                                 relation->relname)));
968
969         /* Check permissions */
970         if (!pg_class_ownercheck(heapOid, GetUserId()))
971                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
972                                            relation->relname);
973
974         /* Can't reindex shared tables except in standalone mode */
975         if (((Form_pg_class) GETSTRUCT(tuple))->relisshared && IsUnderPostmaster)
976                 ereport(ERROR,
977                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
978                                  errmsg("shared table \"%s\" can only be reindexed in stand-alone mode",
979                                                 relation->relname)));
980
981         ReleaseSysCache(tuple);
982
983         if (!reindex_relation(heapOid, true))
984                 ereport(NOTICE,
985                                 (errmsg("table \"%s\" has no indexes",
986                                                 relation->relname)));
987 }
988
989 /*
990  * ReindexDatabase
991  *              Recreate indexes of a database.
992  *
993  * To reduce the probability of deadlocks, each table is reindexed in a
994  * separate transaction, so we can release the lock on it right away.
995  */
996 void
997 ReindexDatabase(const char *databaseName, bool do_system, bool do_user)
998 {
999         Relation        relationRelation;
1000         HeapScanDesc scan;
1001         HeapTuple       tuple;
1002         MemoryContext private_context;
1003         MemoryContext old;
1004         List       *relids = NIL;
1005         ListCell   *l;
1006
1007         AssertArg(databaseName);
1008
1009         if (strcmp(databaseName, get_database_name(MyDatabaseId)) != 0)
1010                 ereport(ERROR,
1011                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1012                                  errmsg("can only reindex the currently open database")));
1013
1014         if (!pg_database_ownercheck(MyDatabaseId, GetUserId()))
1015                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
1016                                            databaseName);
1017
1018         /*
1019          * We cannot run inside a user transaction block; if we were inside a
1020          * transaction, then our commit- and start-transaction-command calls would
1021          * not have the intended effect!
1022          */
1023         PreventTransactionChain((void *) databaseName, "REINDEX DATABASE");
1024
1025         /*
1026          * Create a memory context that will survive forced transaction commits we
1027          * do below.  Since it is a child of PortalContext, it will go away
1028          * eventually even if we suffer an error; there's no need for special
1029          * abort cleanup logic.
1030          */
1031         private_context = AllocSetContextCreate(PortalContext,
1032                                                                                         "ReindexDatabase",
1033                                                                                         ALLOCSET_DEFAULT_MINSIZE,
1034                                                                                         ALLOCSET_DEFAULT_INITSIZE,
1035                                                                                         ALLOCSET_DEFAULT_MAXSIZE);
1036
1037         /*
1038          * We always want to reindex pg_class first.  This ensures that if there
1039          * is any corruption in pg_class' indexes, they will be fixed before we
1040          * process any other tables.  This is critical because reindexing itself
1041          * will try to update pg_class.
1042          */
1043         if (do_system)
1044         {
1045                 old = MemoryContextSwitchTo(private_context);
1046                 relids = lappend_oid(relids, RelationRelationId);
1047                 MemoryContextSwitchTo(old);
1048         }
1049
1050         /*
1051          * Scan pg_class to build a list of the relations we need to reindex.
1052          *
1053          * We only consider plain relations here (toast rels will be processed
1054          * indirectly by reindex_relation).
1055          */
1056         relationRelation = heap_open(RelationRelationId, AccessShareLock);
1057         scan = heap_beginscan(relationRelation, SnapshotNow, 0, NULL);
1058         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
1059         {
1060                 Form_pg_class classtuple = (Form_pg_class) GETSTRUCT(tuple);
1061
1062                 if (classtuple->relkind != RELKIND_RELATION)
1063                         continue;
1064
1065                 /* Check user/system classification, and optionally skip */
1066                 if (IsSystemClass(classtuple))
1067                 {
1068                         if (!do_system)
1069                                 continue;
1070                 }
1071                 else
1072                 {
1073                         if (!do_user)
1074                                 continue;
1075                 }
1076
1077                 if (IsUnderPostmaster)  /* silently ignore shared tables */
1078                 {
1079                         if (classtuple->relisshared)
1080                                 continue;
1081                 }
1082
1083                 if (HeapTupleGetOid(tuple) == RelationRelationId)
1084                         continue;                       /* got it already */
1085
1086                 old = MemoryContextSwitchTo(private_context);
1087                 relids = lappend_oid(relids, HeapTupleGetOid(tuple));
1088                 MemoryContextSwitchTo(old);
1089         }
1090         heap_endscan(scan);
1091         heap_close(relationRelation, AccessShareLock);
1092
1093         /* Now reindex each rel in a separate transaction */
1094         CommitTransactionCommand();
1095         foreach(l, relids)
1096         {
1097                 Oid                     relid = lfirst_oid(l);
1098
1099                 StartTransactionCommand();
1100                 /* functions in indexes may want a snapshot set */
1101                 ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());
1102                 if (reindex_relation(relid, true))
1103                         ereport(NOTICE,
1104                                         (errmsg("table \"%s\" was reindexed",
1105                                                         get_rel_name(relid))));
1106                 CommitTransactionCommand();
1107         }
1108         StartTransactionCommand();
1109
1110         MemoryContextDelete(private_context);
1111 }