]> granicus.if.org Git - postgresql/blob - src/backend/commands/indexcmds.c
Clean up code associated with updating pg_class statistics columns
[postgresql] / src / backend / commands / indexcmds.c
1 /*-------------------------------------------------------------------------
2  *
3  * indexcmds.c
4  *        POSTGRES define and remove index code.
5  *
6  * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        $PostgreSQL: pgsql/src/backend/commands/indexcmds.c,v 1.139 2006/05/10 23:18:39 tgl Exp $
12  *
13  *-------------------------------------------------------------------------
14  */
15
16 #include "postgres.h"
17
18 #include "access/genam.h"
19 #include "access/heapam.h"
20 #include "catalog/catalog.h"
21 #include "catalog/dependency.h"
22 #include "catalog/heap.h"
23 #include "catalog/index.h"
24 #include "catalog/indexing.h"
25 #include "catalog/pg_opclass.h"
26 #include "catalog/pg_tablespace.h"
27 #include "commands/dbcommands.h"
28 #include "commands/defrem.h"
29 #include "commands/tablecmds.h"
30 #include "commands/tablespace.h"
31 #include "mb/pg_wchar.h"
32 #include "miscadmin.h"
33 #include "optimizer/clauses.h"
34 #include "parser/parsetree.h"
35 #include "parser/parse_coerce.h"
36 #include "parser/parse_expr.h"
37 #include "parser/parse_func.h"
38 #include "utils/acl.h"
39 #include "utils/builtins.h"
40 #include "utils/fmgroids.h"
41 #include "utils/lsyscache.h"
42 #include "utils/memutils.h"
43 #include "utils/relcache.h"
44 #include "utils/syscache.h"
45
46
47 /* non-export function prototypes */
48 static void CheckPredicate(Expr *predicate);
49 static void ComputeIndexAttrs(IndexInfo *indexInfo, Oid *classOidP,
50                                   List *attList,
51                                   Oid relId,
52                                   char *accessMethodName, Oid accessMethodId,
53                                   bool isconstraint);
54 static Oid GetIndexOpClass(List *opclass, Oid attrType,
55                                 char *accessMethodName, Oid accessMethodId);
56 static bool relationHasPrimaryKey(Relation rel);
57
58
59 /*
60  * DefineIndex
61  *              Creates a new index.
62  *
63  * 'heapRelation': the relation the index will apply to.
64  * 'indexRelationName': the name for the new index, or NULL to indicate
65  *              that a nonconflicting default name should be picked.
66  * 'indexRelationId': normally InvalidOid, but during bootstrap can be
67  *              nonzero to specify a preselected OID for the index.
68  * 'accessMethodName': name of the AM to use.
69  * 'tableSpaceName': name of the tablespace to create the index in.
70  *              NULL specifies using the appropriate default.
71  * 'attributeList': a list of IndexElem specifying columns and expressions
72  *              to index on.
73  * 'predicate': the partial-index condition, or NULL if none.
74  * 'rangetable': needed to interpret the predicate.
75  * 'unique': make the index enforce uniqueness.
76  * 'primary': mark the index as a primary key in the catalogs.
77  * 'isconstraint': index is for a PRIMARY KEY or UNIQUE constraint,
78  *              so build a pg_constraint entry for it.
79  * 'is_alter_table': this is due to an ALTER rather than a CREATE operation.
80  * 'check_rights': check for CREATE rights in the namespace.  (This should
81  *              be true except when ALTER is deleting/recreating an index.)
82  * 'skip_build': make the catalog entries but leave the index file empty;
83  *              it will be filled later.
84  * 'quiet': suppress the NOTICE chatter ordinarily provided for constraints.
85  */
86 void
87 DefineIndex(RangeVar *heapRelation,
88                         char *indexRelationName,
89                         Oid indexRelationId,
90                         char *accessMethodName,
91                         char *tableSpaceName,
92                         List *attributeList,
93                         Expr *predicate,
94                         List *rangetable,
95                         bool unique,
96                         bool primary,
97                         bool isconstraint,
98                         bool is_alter_table,
99                         bool check_rights,
100                         bool skip_build,
101                         bool quiet)
102 {
103         Oid                *classObjectId;
104         Oid                     accessMethodId;
105         Oid                     relationId;
106         Oid                     namespaceId;
107         Oid                     tablespaceId;
108         Relation        rel;
109         HeapTuple       tuple;
110         Form_pg_am      accessMethodForm;
111         IndexInfo  *indexInfo;
112         int                     numberOfAttributes;
113
114         /*
115          * count attributes in index
116          */
117         numberOfAttributes = list_length(attributeList);
118         if (numberOfAttributes <= 0)
119                 ereport(ERROR,
120                                 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
121                                  errmsg("must specify at least one column")));
122         if (numberOfAttributes > INDEX_MAX_KEYS)
123                 ereport(ERROR,
124                                 (errcode(ERRCODE_TOO_MANY_COLUMNS),
125                                  errmsg("cannot use more than %d columns in an index",
126                                                 INDEX_MAX_KEYS)));
127
128         /*
129          * Open heap relation, acquire a suitable lock on it, remember its OID
130          */
131         rel = heap_openrv(heapRelation, ShareLock);
132
133         /* Note: during bootstrap may see uncataloged relation */
134         if (rel->rd_rel->relkind != RELKIND_RELATION &&
135                 rel->rd_rel->relkind != RELKIND_UNCATALOGED)
136                 ereport(ERROR,
137                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
138                                  errmsg("\"%s\" is not a table",
139                                                 heapRelation->relname)));
140
141         relationId = RelationGetRelid(rel);
142         namespaceId = RelationGetNamespace(rel);
143
144         /*
145          * Verify we (still) have CREATE rights in the rel's namespace.
146          * (Presumably we did when the rel was created, but maybe not anymore.)
147          * Skip check if caller doesn't want it.  Also skip check if
148          * bootstrapping, since permissions machinery may not be working yet.
149          */
150         if (check_rights && !IsBootstrapProcessingMode())
151         {
152                 AclResult       aclresult;
153
154                 aclresult = pg_namespace_aclcheck(namespaceId, GetUserId(),
155                                                                                   ACL_CREATE);
156                 if (aclresult != ACLCHECK_OK)
157                         aclcheck_error(aclresult, ACL_KIND_NAMESPACE,
158                                                    get_namespace_name(namespaceId));
159         }
160
161         /*
162          * Select tablespace to use.  If not specified, use default_tablespace
163          * (which may in turn default to database's default).
164          */
165         if (tableSpaceName)
166         {
167                 tablespaceId = get_tablespace_oid(tableSpaceName);
168                 if (!OidIsValid(tablespaceId))
169                         ereport(ERROR,
170                                         (errcode(ERRCODE_UNDEFINED_OBJECT),
171                                          errmsg("tablespace \"%s\" does not exist",
172                                                         tableSpaceName)));
173         }
174         else
175         {
176                 tablespaceId = GetDefaultTablespace();
177                 /* note InvalidOid is OK in this case */
178         }
179
180         /* Check permissions except when using database's default */
181         if (OidIsValid(tablespaceId))
182         {
183                 AclResult       aclresult;
184
185                 aclresult = pg_tablespace_aclcheck(tablespaceId, GetUserId(),
186                                                                                    ACL_CREATE);
187                 if (aclresult != ACLCHECK_OK)
188                         aclcheck_error(aclresult, ACL_KIND_TABLESPACE,
189                                                    get_tablespace_name(tablespaceId));
190         }
191
192         /*
193          * Force shared indexes into the pg_global tablespace.  This is a bit of a
194          * hack but seems simpler than marking them in the BKI commands.
195          */
196         if (rel->rd_rel->relisshared)
197                 tablespaceId = GLOBALTABLESPACE_OID;
198
199         /*
200          * Select name for index if caller didn't specify
201          */
202         if (indexRelationName == NULL)
203         {
204                 if (primary)
205                         indexRelationName = ChooseRelationName(RelationGetRelationName(rel),
206                                                                                                    NULL,
207                                                                                                    "pkey",
208                                                                                                    namespaceId);
209                 else
210                 {
211                         IndexElem  *iparam = (IndexElem *) linitial(attributeList);
212
213                         indexRelationName = ChooseRelationName(RelationGetRelationName(rel),
214                                                                                                    iparam->name,
215                                                                                                    "key",
216                                                                                                    namespaceId);
217                 }
218         }
219
220         /*
221          * look up the access method, verify it can handle the requested features
222          */
223         tuple = SearchSysCache(AMNAME,
224                                                    PointerGetDatum(accessMethodName),
225                                                    0, 0, 0);
226         if (!HeapTupleIsValid(tuple))
227         {
228                 /*
229                  * Hack to provide more-or-less-transparent updating of old RTREE
230                  * indexes to GIST: if RTREE is requested and not found, use GIST.
231                  */
232                 if (strcmp(accessMethodName, "rtree") == 0)
233                 {
234                         ereport(NOTICE,
235                                         (errmsg("substituting access method \"gist\" for obsolete method \"rtree\"")));
236                         accessMethodName = "gist";
237                         tuple = SearchSysCache(AMNAME,
238                                                                    PointerGetDatum(accessMethodName),
239                                                                    0, 0, 0);
240                 }
241
242                 if (!HeapTupleIsValid(tuple))
243                         ereport(ERROR,
244                                         (errcode(ERRCODE_UNDEFINED_OBJECT),
245                                          errmsg("access method \"%s\" does not exist",
246                                                         accessMethodName)));
247         }
248         accessMethodId = HeapTupleGetOid(tuple);
249         accessMethodForm = (Form_pg_am) GETSTRUCT(tuple);
250
251         if (unique && !accessMethodForm->amcanunique)
252                 ereport(ERROR,
253                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
254                            errmsg("access method \"%s\" does not support unique indexes",
255                                           accessMethodName)));
256         if (numberOfAttributes > 1 && !accessMethodForm->amcanmulticol)
257                 ereport(ERROR,
258                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
259                   errmsg("access method \"%s\" does not support multicolumn indexes",
260                                  accessMethodName)));
261
262         ReleaseSysCache(tuple);
263
264         /*
265          * If a range table was created then check that only the base rel is
266          * mentioned.
267          */
268         if (rangetable != NIL)
269         {
270                 if (list_length(rangetable) != 1 || getrelid(1, rangetable) != relationId)
271                         ereport(ERROR,
272                                         (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
273                                          errmsg("index expressions and predicates may refer only to the table being indexed")));
274         }
275
276         /*
277          * Validate predicate, if given
278          */
279         if (predicate)
280                 CheckPredicate(predicate);
281
282         /*
283          * Extra checks when creating a PRIMARY KEY index.
284          */
285         if (primary)
286         {
287                 List       *cmds;
288                 ListCell   *keys;
289
290                 /*
291                  * If ALTER TABLE, check that there isn't already a PRIMARY KEY. In
292                  * CREATE TABLE, we have faith that the parser rejected multiple pkey
293                  * clauses; and CREATE INDEX doesn't have a way to say PRIMARY KEY, so
294                  * it's no problem either.
295                  */
296                 if (is_alter_table &&
297                         relationHasPrimaryKey(rel))
298                 {
299                         ereport(ERROR,
300                                         (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
301                          errmsg("multiple primary keys for table \"%s\" are not allowed",
302                                         RelationGetRelationName(rel))));
303                 }
304
305                 /*
306                  * Check that all of the attributes in a primary key are marked as not
307                  * null, otherwise attempt to ALTER TABLE .. SET NOT NULL
308                  */
309                 cmds = NIL;
310                 foreach(keys, attributeList)
311                 {
312                         IndexElem  *key = (IndexElem *) lfirst(keys);
313                         HeapTuple       atttuple;
314
315                         if (!key->name)
316                                 ereport(ERROR,
317                                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
318                                                  errmsg("primary keys cannot be expressions")));
319
320                         /* System attributes are never null, so no problem */
321                         if (SystemAttributeByName(key->name, rel->rd_rel->relhasoids))
322                                 continue;
323
324                         atttuple = SearchSysCacheAttName(relationId, key->name);
325                         if (HeapTupleIsValid(atttuple))
326                         {
327                                 if (!((Form_pg_attribute) GETSTRUCT(atttuple))->attnotnull)
328                                 {
329                                         /* Add a subcommand to make this one NOT NULL */
330                                         AlterTableCmd *cmd = makeNode(AlterTableCmd);
331
332                                         cmd->subtype = AT_SetNotNull;
333                                         cmd->name = key->name;
334
335                                         cmds = lappend(cmds, cmd);
336                                 }
337                                 ReleaseSysCache(atttuple);
338                         }
339                         else
340                         {
341                                 /*
342                                  * This shouldn't happen during CREATE TABLE, but can happen
343                                  * during ALTER TABLE.  Keep message in sync with
344                                  * transformIndexConstraints() in parser/analyze.c.
345                                  */
346                                 ereport(ERROR,
347                                                 (errcode(ERRCODE_UNDEFINED_COLUMN),
348                                                  errmsg("column \"%s\" named in key does not exist",
349                                                                 key->name)));
350                         }
351                 }
352
353                 /*
354                  * XXX: Shouldn't the ALTER TABLE .. SET NOT NULL cascade to child
355                  * tables?      Currently, since the PRIMARY KEY itself doesn't cascade,
356                  * we don't cascade the notnull constraint(s) either; but this is
357                  * pretty debatable.
358                  *
359                  * XXX: possible future improvement: when being called from ALTER
360                  * TABLE, it would be more efficient to merge this with the outer
361                  * ALTER TABLE, so as to avoid two scans.  But that seems to
362                  * complicate DefineIndex's API unduly.
363                  */
364                 if (cmds)
365                         AlterTableInternal(relationId, cmds, false);
366         }
367
368         /*
369          * Prepare arguments for index_create, primarily an IndexInfo structure.
370          * Note that ii_Predicate must be in implicit-AND format.
371          */
372         indexInfo = makeNode(IndexInfo);
373         indexInfo->ii_NumIndexAttrs = numberOfAttributes;
374         indexInfo->ii_Expressions = NIL;        /* for now */
375         indexInfo->ii_ExpressionsState = NIL;
376         indexInfo->ii_Predicate = make_ands_implicit(predicate);
377         indexInfo->ii_PredicateState = NIL;
378         indexInfo->ii_Unique = unique;
379
380         classObjectId = (Oid *) palloc(numberOfAttributes * sizeof(Oid));
381         ComputeIndexAttrs(indexInfo, classObjectId, attributeList,
382                                           relationId, accessMethodName, accessMethodId,
383                                           isconstraint);
384
385         heap_close(rel, NoLock);
386
387         /*
388          * Report index creation if appropriate (delay this till after most of the
389          * error checks)
390          */
391         if (isconstraint && !quiet)
392                 ereport(NOTICE,
393                   (errmsg("%s %s will create implicit index \"%s\" for table \"%s\"",
394                                   is_alter_table ? "ALTER TABLE / ADD" : "CREATE TABLE /",
395                                   primary ? "PRIMARY KEY" : "UNIQUE",
396                                   indexRelationName, RelationGetRelationName(rel))));
397
398         index_create(relationId, indexRelationName, indexRelationId,
399                                  indexInfo, accessMethodId, tablespaceId, classObjectId,
400                                  primary, false, isconstraint,
401                                  allowSystemTableMods, skip_build);
402 }
403
404
405 /*
406  * CheckPredicate
407  *              Checks that the given partial-index predicate is valid.
408  *
409  * This used to also constrain the form of the predicate to forms that
410  * indxpath.c could do something with.  However, that seems overly
411  * restrictive.  One useful application of partial indexes is to apply
412  * a UNIQUE constraint across a subset of a table, and in that scenario
413  * any evaluatable predicate will work.  So accept any predicate here
414  * (except ones requiring a plan), and let indxpath.c fend for itself.
415  */
416 static void
417 CheckPredicate(Expr *predicate)
418 {
419         /*
420          * We don't currently support generation of an actual query plan for a
421          * predicate, only simple scalar expressions; hence these restrictions.
422          */
423         if (contain_subplans((Node *) predicate))
424                 ereport(ERROR,
425                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
426                                  errmsg("cannot use subquery in index predicate")));
427         if (contain_agg_clause((Node *) predicate))
428                 ereport(ERROR,
429                                 (errcode(ERRCODE_GROUPING_ERROR),
430                                  errmsg("cannot use aggregate in index predicate")));
431
432         /*
433          * A predicate using mutable functions is probably wrong, for the same
434          * reasons that we don't allow an index expression to use one.
435          */
436         if (contain_mutable_functions((Node *) predicate))
437                 ereport(ERROR,
438                                 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
439                    errmsg("functions in index predicate must be marked IMMUTABLE")));
440 }
441
442 static void
443 ComputeIndexAttrs(IndexInfo *indexInfo,
444                                   Oid *classOidP,
445                                   List *attList,        /* list of IndexElem's */
446                                   Oid relId,
447                                   char *accessMethodName,
448                                   Oid accessMethodId,
449                                   bool isconstraint)
450 {
451         ListCell   *rest;
452         int                     attn = 0;
453
454         /*
455          * process attributeList
456          */
457         foreach(rest, attList)
458         {
459                 IndexElem  *attribute = (IndexElem *) lfirst(rest);
460                 Oid                     atttype;
461
462                 if (attribute->name != NULL)
463                 {
464                         /* Simple index attribute */
465                         HeapTuple       atttuple;
466                         Form_pg_attribute attform;
467
468                         Assert(attribute->expr == NULL);
469                         atttuple = SearchSysCacheAttName(relId, attribute->name);
470                         if (!HeapTupleIsValid(atttuple))
471                         {
472                                 /* difference in error message spellings is historical */
473                                 if (isconstraint)
474                                         ereport(ERROR,
475                                                         (errcode(ERRCODE_UNDEFINED_COLUMN),
476                                                   errmsg("column \"%s\" named in key does not exist",
477                                                                  attribute->name)));
478                                 else
479                                         ereport(ERROR,
480                                                         (errcode(ERRCODE_UNDEFINED_COLUMN),
481                                                          errmsg("column \"%s\" does not exist",
482                                                                         attribute->name)));
483                         }
484                         attform = (Form_pg_attribute) GETSTRUCT(atttuple);
485                         indexInfo->ii_KeyAttrNumbers[attn] = attform->attnum;
486                         atttype = attform->atttypid;
487                         ReleaseSysCache(atttuple);
488                 }
489                 else if (attribute->expr && IsA(attribute->expr, Var))
490                 {
491                         /* Tricky tricky, he wrote (column) ... treat as simple attr */
492                         Var                *var = (Var *) attribute->expr;
493
494                         indexInfo->ii_KeyAttrNumbers[attn] = var->varattno;
495                         atttype = get_atttype(relId, var->varattno);
496                 }
497                 else
498                 {
499                         /* Index expression */
500                         Assert(attribute->expr != NULL);
501                         indexInfo->ii_KeyAttrNumbers[attn] = 0;         /* marks expression */
502                         indexInfo->ii_Expressions = lappend(indexInfo->ii_Expressions,
503                                                                                                 attribute->expr);
504                         atttype = exprType(attribute->expr);
505
506                         /*
507                          * We don't currently support generation of an actual query plan
508                          * for an index expression, only simple scalar expressions; hence
509                          * these restrictions.
510                          */
511                         if (contain_subplans(attribute->expr))
512                                 ereport(ERROR,
513                                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
514                                                  errmsg("cannot use subquery in index expression")));
515                         if (contain_agg_clause(attribute->expr))
516                                 ereport(ERROR,
517                                                 (errcode(ERRCODE_GROUPING_ERROR),
518                                 errmsg("cannot use aggregate function in index expression")));
519
520                         /*
521                          * A expression using mutable functions is probably wrong, since
522                          * if you aren't going to get the same result for the same data
523                          * every time, it's not clear what the index entries mean at all.
524                          */
525                         if (contain_mutable_functions(attribute->expr))
526                                 ereport(ERROR,
527                                                 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
528                                                  errmsg("functions in index expression must be marked IMMUTABLE")));
529                 }
530
531                 classOidP[attn] = GetIndexOpClass(attribute->opclass,
532                                                                                   atttype,
533                                                                                   accessMethodName,
534                                                                                   accessMethodId);
535                 attn++;
536         }
537 }
538
539 /*
540  * Resolve possibly-defaulted operator class specification
541  */
542 static Oid
543 GetIndexOpClass(List *opclass, Oid attrType,
544                                 char *accessMethodName, Oid accessMethodId)
545 {
546         char       *schemaname;
547         char       *opcname;
548         HeapTuple       tuple;
549         Oid                     opClassId,
550                                 opInputType;
551
552         /*
553          * Release 7.0 removed network_ops, timespan_ops, and datetime_ops, so we
554          * ignore those opclass names so the default *_ops is used.  This can be
555          * removed in some later release.  bjm 2000/02/07
556          *
557          * Release 7.1 removes lztext_ops, so suppress that too for a while.  tgl
558          * 2000/07/30
559          *
560          * Release 7.2 renames timestamp_ops to timestamptz_ops, so suppress that
561          * too for awhile.      I'm starting to think we need a better approach. tgl
562          * 2000/10/01
563          *
564          * Release 8.0 removes bigbox_ops (which was dead code for a long while
565          * anyway).  tgl 2003/11/11
566          */
567         if (list_length(opclass) == 1)
568         {
569                 char       *claname = strVal(linitial(opclass));
570
571                 if (strcmp(claname, "network_ops") == 0 ||
572                         strcmp(claname, "timespan_ops") == 0 ||
573                         strcmp(claname, "datetime_ops") == 0 ||
574                         strcmp(claname, "lztext_ops") == 0 ||
575                         strcmp(claname, "timestamp_ops") == 0 ||
576                         strcmp(claname, "bigbox_ops") == 0)
577                         opclass = NIL;
578         }
579
580         if (opclass == NIL)
581         {
582                 /* no operator class specified, so find the default */
583                 opClassId = GetDefaultOpClass(attrType, accessMethodId);
584                 if (!OidIsValid(opClassId))
585                         ereport(ERROR,
586                                         (errcode(ERRCODE_UNDEFINED_OBJECT),
587                                          errmsg("data type %s has no default operator class for access method \"%s\"",
588                                                         format_type_be(attrType), accessMethodName),
589                                          errhint("You must specify an operator class for the index or define a default operator class for the data type.")));
590                 return opClassId;
591         }
592
593         /*
594          * Specific opclass name given, so look up the opclass.
595          */
596
597         /* deconstruct the name list */
598         DeconstructQualifiedName(opclass, &schemaname, &opcname);
599
600         if (schemaname)
601         {
602                 /* Look in specific schema only */
603                 Oid                     namespaceId;
604
605                 namespaceId = LookupExplicitNamespace(schemaname);
606                 tuple = SearchSysCache(CLAAMNAMENSP,
607                                                            ObjectIdGetDatum(accessMethodId),
608                                                            PointerGetDatum(opcname),
609                                                            ObjectIdGetDatum(namespaceId),
610                                                            0);
611         }
612         else
613         {
614                 /* Unqualified opclass name, so search the search path */
615                 opClassId = OpclassnameGetOpcid(accessMethodId, opcname);
616                 if (!OidIsValid(opClassId))
617                         ereport(ERROR,
618                                         (errcode(ERRCODE_UNDEFINED_OBJECT),
619                                          errmsg("operator class \"%s\" does not exist for access method \"%s\"",
620                                                         opcname, accessMethodName)));
621                 tuple = SearchSysCache(CLAOID,
622                                                            ObjectIdGetDatum(opClassId),
623                                                            0, 0, 0);
624         }
625
626         if (!HeapTupleIsValid(tuple))
627                 ereport(ERROR,
628                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
629                                  errmsg("operator class \"%s\" does not exist for access method \"%s\"",
630                                                 NameListToString(opclass), accessMethodName)));
631
632         /*
633          * Verify that the index operator class accepts this datatype.  Note we
634          * will accept binary compatibility.
635          */
636         opClassId = HeapTupleGetOid(tuple);
637         opInputType = ((Form_pg_opclass) GETSTRUCT(tuple))->opcintype;
638
639         if (!IsBinaryCoercible(attrType, opInputType))
640                 ereport(ERROR,
641                                 (errcode(ERRCODE_DATATYPE_MISMATCH),
642                                  errmsg("operator class \"%s\" does not accept data type %s",
643                                           NameListToString(opclass), format_type_be(attrType))));
644
645         ReleaseSysCache(tuple);
646
647         return opClassId;
648 }
649
650 /*
651  * GetDefaultOpClass
652  *
653  * Given the OIDs of a datatype and an access method, find the default
654  * operator class, if any.      Returns InvalidOid if there is none.
655  */
656 Oid
657 GetDefaultOpClass(Oid type_id, Oid am_id)
658 {
659         int                     nexact = 0;
660         int                     ncompatible = 0;
661         Oid                     exactOid = InvalidOid;
662         Oid                     compatibleOid = InvalidOid;
663         Relation        rel;
664         ScanKeyData skey[1];
665         SysScanDesc scan;
666         HeapTuple       tup;
667
668         /* If it's a domain, look at the base type instead */
669         type_id = getBaseType(type_id);
670
671         /*
672          * We scan through all the opclasses available for the access method,
673          * looking for one that is marked default and matches the target type
674          * (either exactly or binary-compatibly, but prefer an exact match).
675          *
676          * We could find more than one binary-compatible match, in which case we
677          * require the user to specify which one he wants.      If we find more than
678          * one exact match, then someone put bogus entries in pg_opclass.
679          */
680         rel = heap_open(OperatorClassRelationId, AccessShareLock);
681
682         ScanKeyInit(&skey[0],
683                                 Anum_pg_opclass_opcamid,
684                                 BTEqualStrategyNumber, F_OIDEQ,
685                                 ObjectIdGetDatum(am_id));
686
687         scan = systable_beginscan(rel, OpclassAmNameNspIndexId, true,
688                                                           SnapshotNow, 1, skey);
689
690         while (HeapTupleIsValid(tup = systable_getnext(scan)))
691         {
692                 Form_pg_opclass opclass = (Form_pg_opclass) GETSTRUCT(tup);
693
694                 if (opclass->opcdefault)
695                 {
696                         if (opclass->opcintype == type_id)
697                         {
698                                 nexact++;
699                                 exactOid = HeapTupleGetOid(tup);
700                         }
701                         else if (IsBinaryCoercible(type_id, opclass->opcintype))
702                         {
703                                 ncompatible++;
704                                 compatibleOid = HeapTupleGetOid(tup);
705                         }
706                 }
707         }
708
709         systable_endscan(scan);
710
711         heap_close(rel, AccessShareLock);
712
713         if (nexact == 1)
714                 return exactOid;
715         if (nexact != 0)
716                 ereport(ERROR,
717                                 (errcode(ERRCODE_DUPLICATE_OBJECT),
718                 errmsg("there are multiple default operator classes for data type %s",
719                            format_type_be(type_id))));
720         if (ncompatible == 1)
721                 return compatibleOid;
722
723         return InvalidOid;
724 }
725
726 /*
727  *      makeObjectName()
728  *
729  *      Create a name for an implicitly created index, sequence, constraint, etc.
730  *
731  *      The parameters are typically: the original table name, the original field
732  *      name, and a "type" string (such as "seq" or "pkey").    The field name
733  *      and/or type can be NULL if not relevant.
734  *
735  *      The result is a palloc'd string.
736  *
737  *      The basic result we want is "name1_name2_label", omitting "_name2" or
738  *      "_label" when those parameters are NULL.  However, we must generate
739  *      a name with less than NAMEDATALEN characters!  So, we truncate one or
740  *      both names if necessary to make a short-enough string.  The label part
741  *      is never truncated (so it had better be reasonably short).
742  *
743  *      The caller is responsible for checking uniqueness of the generated
744  *      name and retrying as needed; retrying will be done by altering the
745  *      "label" string (which is why we never truncate that part).
746  */
747 char *
748 makeObjectName(const char *name1, const char *name2, const char *label)
749 {
750         char       *name;
751         int                     overhead = 0;   /* chars needed for label and underscores */
752         int                     availchars;             /* chars available for name(s) */
753         int                     name1chars;             /* chars allocated to name1 */
754         int                     name2chars;             /* chars allocated to name2 */
755         int                     ndx;
756
757         name1chars = strlen(name1);
758         if (name2)
759         {
760                 name2chars = strlen(name2);
761                 overhead++;                             /* allow for separating underscore */
762         }
763         else
764                 name2chars = 0;
765         if (label)
766                 overhead += strlen(label) + 1;
767
768         availchars = NAMEDATALEN - 1 - overhead;
769         Assert(availchars > 0);         /* else caller chose a bad label */
770
771         /*
772          * If we must truncate,  preferentially truncate the longer name. This
773          * logic could be expressed without a loop, but it's simple and obvious as
774          * a loop.
775          */
776         while (name1chars + name2chars > availchars)
777         {
778                 if (name1chars > name2chars)
779                         name1chars--;
780                 else
781                         name2chars--;
782         }
783
784         name1chars = pg_mbcliplen(name1, name1chars, name1chars);
785         if (name2)
786                 name2chars = pg_mbcliplen(name2, name2chars, name2chars);
787
788         /* Now construct the string using the chosen lengths */
789         name = palloc(name1chars + name2chars + overhead + 1);
790         memcpy(name, name1, name1chars);
791         ndx = name1chars;
792         if (name2)
793         {
794                 name[ndx++] = '_';
795                 memcpy(name + ndx, name2, name2chars);
796                 ndx += name2chars;
797         }
798         if (label)
799         {
800                 name[ndx++] = '_';
801                 strcpy(name + ndx, label);
802         }
803         else
804                 name[ndx] = '\0';
805
806         return name;
807 }
808
809 /*
810  * Select a nonconflicting name for a new relation.  This is ordinarily
811  * used to choose index names (which is why it's here) but it can also
812  * be used for sequences, or any autogenerated relation kind.
813  *
814  * name1, name2, and label are used the same way as for makeObjectName(),
815  * except that the label can't be NULL; digits will be appended to the label
816  * if needed to create a name that is unique within the specified namespace.
817  *
818  * Note: it is theoretically possible to get a collision anyway, if someone
819  * else chooses the same name concurrently.  This is fairly unlikely to be
820  * a problem in practice, especially if one is holding an exclusive lock on
821  * the relation identified by name1.  However, if choosing multiple names
822  * within a single command, you'd better create the new object and do
823  * CommandCounterIncrement before choosing the next one!
824  *
825  * Returns a palloc'd string.
826  */
827 char *
828 ChooseRelationName(const char *name1, const char *name2,
829                                    const char *label, Oid namespace)
830 {
831         int                     pass = 0;
832         char       *relname = NULL;
833         char            modlabel[NAMEDATALEN];
834
835         /* try the unmodified label first */
836         StrNCpy(modlabel, label, sizeof(modlabel));
837
838         for (;;)
839         {
840                 relname = makeObjectName(name1, name2, modlabel);
841
842                 if (!OidIsValid(get_relname_relid(relname, namespace)))
843                         break;
844
845                 /* found a conflict, so try a new name component */
846                 pfree(relname);
847                 snprintf(modlabel, sizeof(modlabel), "%s%d", label, ++pass);
848         }
849
850         return relname;
851 }
852
853 /*
854  * relationHasPrimaryKey -
855  *
856  *      See whether an existing relation has a primary key.
857  */
858 static bool
859 relationHasPrimaryKey(Relation rel)
860 {
861         bool            result = false;
862         List       *indexoidlist;
863         ListCell   *indexoidscan;
864
865         /*
866          * Get the list of index OIDs for the table from the relcache, and look up
867          * each one in the pg_index syscache until we find one marked primary key
868          * (hopefully there isn't more than one such).
869          */
870         indexoidlist = RelationGetIndexList(rel);
871
872         foreach(indexoidscan, indexoidlist)
873         {
874                 Oid                     indexoid = lfirst_oid(indexoidscan);
875                 HeapTuple       indexTuple;
876
877                 indexTuple = SearchSysCache(INDEXRELID,
878                                                                         ObjectIdGetDatum(indexoid),
879                                                                         0, 0, 0);
880                 if (!HeapTupleIsValid(indexTuple))              /* should not happen */
881                         elog(ERROR, "cache lookup failed for index %u", indexoid);
882                 result = ((Form_pg_index) GETSTRUCT(indexTuple))->indisprimary;
883                 ReleaseSysCache(indexTuple);
884                 if (result)
885                         break;
886         }
887
888         list_free(indexoidlist);
889
890         return result;
891 }
892
893
894 /*
895  * RemoveIndex
896  *              Deletes an index.
897  */
898 void
899 RemoveIndex(RangeVar *relation, DropBehavior behavior)
900 {
901         Oid                     indOid;
902         char            relkind;
903         ObjectAddress object;
904
905         indOid = RangeVarGetRelid(relation, false);
906         relkind = get_rel_relkind(indOid);
907         if (relkind != RELKIND_INDEX)
908                 ereport(ERROR,
909                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
910                                  errmsg("\"%s\" is not an index",
911                                                 relation->relname)));
912
913         object.classId = RelationRelationId;
914         object.objectId = indOid;
915         object.objectSubId = 0;
916
917         performDeletion(&object, behavior);
918 }
919
920 /*
921  * ReindexIndex
922  *              Recreate a specific index.
923  */
924 void
925 ReindexIndex(RangeVar *indexRelation)
926 {
927         Oid                     indOid;
928         HeapTuple       tuple;
929
930         indOid = RangeVarGetRelid(indexRelation, false);
931         tuple = SearchSysCache(RELOID,
932                                                    ObjectIdGetDatum(indOid),
933                                                    0, 0, 0);
934         if (!HeapTupleIsValid(tuple))           /* shouldn't happen */
935                 elog(ERROR, "cache lookup failed for relation %u", indOid);
936
937         if (((Form_pg_class) GETSTRUCT(tuple))->relkind != RELKIND_INDEX)
938                 ereport(ERROR,
939                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
940                                  errmsg("\"%s\" is not an index",
941                                                 indexRelation->relname)));
942
943         /* Check permissions */
944         if (!pg_class_ownercheck(indOid, GetUserId()))
945                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
946                                            indexRelation->relname);
947
948         ReleaseSysCache(tuple);
949
950         reindex_index(indOid);
951 }
952
953 /*
954  * ReindexTable
955  *              Recreate all indexes of a table (and of its toast table, if any)
956  */
957 void
958 ReindexTable(RangeVar *relation)
959 {
960         Oid                     heapOid;
961         HeapTuple       tuple;
962
963         heapOid = RangeVarGetRelid(relation, false);
964         tuple = SearchSysCache(RELOID,
965                                                    ObjectIdGetDatum(heapOid),
966                                                    0, 0, 0);
967         if (!HeapTupleIsValid(tuple))           /* shouldn't happen */
968                 elog(ERROR, "cache lookup failed for relation %u", heapOid);
969
970         if (((Form_pg_class) GETSTRUCT(tuple))->relkind != RELKIND_RELATION &&
971                 ((Form_pg_class) GETSTRUCT(tuple))->relkind != RELKIND_TOASTVALUE)
972                 ereport(ERROR,
973                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
974                                  errmsg("\"%s\" is not a table",
975                                                 relation->relname)));
976
977         /* Check permissions */
978         if (!pg_class_ownercheck(heapOid, GetUserId()))
979                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
980                                            relation->relname);
981
982         /* Can't reindex shared tables except in standalone mode */
983         if (((Form_pg_class) GETSTRUCT(tuple))->relisshared && IsUnderPostmaster)
984                 ereport(ERROR,
985                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
986                                  errmsg("shared table \"%s\" can only be reindexed in stand-alone mode",
987                                                 relation->relname)));
988
989         ReleaseSysCache(tuple);
990
991         if (!reindex_relation(heapOid, true))
992                 ereport(NOTICE,
993                                 (errmsg("table \"%s\" has no indexes",
994                                                 relation->relname)));
995 }
996
997 /*
998  * ReindexDatabase
999  *              Recreate indexes of a database.
1000  *
1001  * To reduce the probability of deadlocks, each table is reindexed in a
1002  * separate transaction, so we can release the lock on it right away.
1003  */
1004 void
1005 ReindexDatabase(const char *databaseName, bool do_system, bool do_user)
1006 {
1007         Relation        relationRelation;
1008         HeapScanDesc scan;
1009         HeapTuple       tuple;
1010         MemoryContext private_context;
1011         MemoryContext old;
1012         List       *relids = NIL;
1013         ListCell   *l;
1014
1015         AssertArg(databaseName);
1016
1017         if (strcmp(databaseName, get_database_name(MyDatabaseId)) != 0)
1018                 ereport(ERROR,
1019                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1020                                  errmsg("can only reindex the currently open database")));
1021
1022         if (!pg_database_ownercheck(MyDatabaseId, GetUserId()))
1023                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
1024                                            databaseName);
1025
1026         /*
1027          * We cannot run inside a user transaction block; if we were inside a
1028          * transaction, then our commit- and start-transaction-command calls would
1029          * not have the intended effect!
1030          */
1031         PreventTransactionChain((void *) databaseName, "REINDEX DATABASE");
1032
1033         /*
1034          * Create a memory context that will survive forced transaction commits we
1035          * do below.  Since it is a child of PortalContext, it will go away
1036          * eventually even if we suffer an error; there's no need for special
1037          * abort cleanup logic.
1038          */
1039         private_context = AllocSetContextCreate(PortalContext,
1040                                                                                         "ReindexDatabase",
1041                                                                                         ALLOCSET_DEFAULT_MINSIZE,
1042                                                                                         ALLOCSET_DEFAULT_INITSIZE,
1043                                                                                         ALLOCSET_DEFAULT_MAXSIZE);
1044
1045         /*
1046          * We always want to reindex pg_class first.  This ensures that if there
1047          * is any corruption in pg_class' indexes, they will be fixed before we
1048          * process any other tables.  This is critical because reindexing itself
1049          * will try to update pg_class.
1050          */
1051         if (do_system)
1052         {
1053                 old = MemoryContextSwitchTo(private_context);
1054                 relids = lappend_oid(relids, RelationRelationId);
1055                 MemoryContextSwitchTo(old);
1056         }
1057
1058         /*
1059          * Scan pg_class to build a list of the relations we need to reindex.
1060          *
1061          * We only consider plain relations here (toast rels will be processed
1062          * indirectly by reindex_relation).
1063          */
1064         relationRelation = heap_open(RelationRelationId, AccessShareLock);
1065         scan = heap_beginscan(relationRelation, SnapshotNow, 0, NULL);
1066         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
1067         {
1068                 Form_pg_class classtuple = (Form_pg_class) GETSTRUCT(tuple);
1069
1070                 if (classtuple->relkind != RELKIND_RELATION)
1071                         continue;
1072
1073                 /* Check user/system classification, and optionally skip */
1074                 if (IsSystemClass(classtuple))
1075                 {
1076                         if (!do_system)
1077                                 continue;
1078                 }
1079                 else
1080                 {
1081                         if (!do_user)
1082                                 continue;
1083                 }
1084
1085                 if (IsUnderPostmaster)  /* silently ignore shared tables */
1086                 {
1087                         if (classtuple->relisshared)
1088                                 continue;
1089                 }
1090
1091                 if (HeapTupleGetOid(tuple) == RelationRelationId)
1092                         continue;                       /* got it already */
1093
1094                 old = MemoryContextSwitchTo(private_context);
1095                 relids = lappend_oid(relids, HeapTupleGetOid(tuple));
1096                 MemoryContextSwitchTo(old);
1097         }
1098         heap_endscan(scan);
1099         heap_close(relationRelation, AccessShareLock);
1100
1101         /* Now reindex each rel in a separate transaction */
1102         CommitTransactionCommand();
1103         foreach(l, relids)
1104         {
1105                 Oid                     relid = lfirst_oid(l);
1106
1107                 StartTransactionCommand();
1108                 /* functions in indexes may want a snapshot set */
1109                 ActiveSnapshot = CopySnapshot(GetTransactionSnapshot());
1110                 if (reindex_relation(relid, true))
1111                         ereport(NOTICE,
1112                                         (errmsg("table \"%s\" was reindexed",
1113                                                         get_rel_name(relid))));
1114                 CommitTransactionCommand();
1115         }
1116         StartTransactionCommand();
1117
1118         MemoryContextDelete(private_context);
1119 }