]> granicus.if.org Git - postgresql/blob - src/backend/commands/indexcmds.c
Change some notices to warnings and vice versa according to criteria
[postgresql] / src / backend / commands / indexcmds.c
1 /*-------------------------------------------------------------------------
2  *
3  * indexcmds.c
4  *        POSTGRES define and remove index code.
5  *
6  * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        $Header: /cvsroot/pgsql/src/backend/commands/indexcmds.c,v 1.114 2003/10/02 06:34:03 petere Exp $
12  *
13  *-------------------------------------------------------------------------
14  */
15
16 #include "postgres.h"
17
18 #include "access/heapam.h"
19 #include "catalog/catalog.h"
20 #include "catalog/catname.h"
21 #include "catalog/dependency.h"
22 #include "catalog/heap.h"
23 #include "catalog/index.h"
24 #include "catalog/namespace.h"
25 #include "catalog/pg_opclass.h"
26 #include "catalog/pg_proc.h"
27 #include "commands/dbcommands.h"
28 #include "commands/defrem.h"
29 #include "commands/tablecmds.h"
30 #include "executor/executor.h"
31 #include "miscadmin.h"
32 #include "optimizer/clauses.h"
33 #include "optimizer/prep.h"
34 #include "parser/parsetree.h"
35 #include "parser/parse_coerce.h"
36 #include "parser/parse_expr.h"
37 #include "parser/parse_func.h"
38 #include "utils/acl.h"
39 #include "utils/builtins.h"
40 #include "utils/lsyscache.h"
41 #include "utils/syscache.h"
42
43
44 /* non-export function prototypes */
45 static void CheckPredicate(List *predList);
46 static void ComputeIndexAttrs(IndexInfo *indexInfo, Oid *classOidP,
47                                   List *attList,
48                                   Oid relId,
49                                   char *accessMethodName, Oid accessMethodId);
50 static Oid GetIndexOpClass(List *opclass, Oid attrType,
51                                 char *accessMethodName, Oid accessMethodId);
52 static Oid      GetDefaultOpClass(Oid attrType, Oid accessMethodId);
53
54 /*
55  * DefineIndex
56  *              Creates a new index.
57  *
58  * 'attributeList' is a list of IndexElem specifying columns and expressions
59  *              to index on.
60  * 'predicate' is the qual specified in the where clause.
61  * 'rangetable' is needed to interpret the predicate.
62  */
63 void
64 DefineIndex(RangeVar *heapRelation,
65                         char *indexRelationName,
66                         char *accessMethodName,
67                         List *attributeList,
68                         bool unique,
69                         bool primary,
70                         bool isconstraint,
71                         Expr *predicate,
72                         List *rangetable)
73 {
74         Oid                *classObjectId;
75         Oid                     accessMethodId;
76         Oid                     relationId;
77         Oid                     namespaceId;
78         Relation        rel;
79         HeapTuple       tuple;
80         Form_pg_am      accessMethodForm;
81         IndexInfo  *indexInfo;
82         int                     numberOfAttributes;
83         List       *cnfPred = NIL;
84
85         /*
86          * count attributes in index
87          */
88         numberOfAttributes = length(attributeList);
89         if (numberOfAttributes <= 0)
90                 ereport(ERROR,
91                                 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
92                                  errmsg("must specify at least one column")));
93         if (numberOfAttributes > INDEX_MAX_KEYS)
94                 ereport(ERROR,
95                                 (errcode(ERRCODE_TOO_MANY_COLUMNS),
96                                  errmsg("cannot use more than %d columns in an index",
97                                                 INDEX_MAX_KEYS)));
98
99         /*
100          * Open heap relation, acquire a suitable lock on it, remember its OID
101          */
102         rel = heap_openrv(heapRelation, ShareLock);
103
104         /* Note: during bootstrap may see uncataloged relation */
105         if (rel->rd_rel->relkind != RELKIND_RELATION &&
106                 rel->rd_rel->relkind != RELKIND_UNCATALOGED)
107                 ereport(ERROR,
108                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
109                                  errmsg("\"%s\" is not a table",
110                                                 heapRelation->relname)));
111
112         relationId = RelationGetRelid(rel);
113         namespaceId = RelationGetNamespace(rel);
114
115         heap_close(rel, NoLock);
116
117         /*
118          * Verify we (still) have CREATE rights in the rel's namespace.
119          * (Presumably we did when the rel was created, but maybe not
120          * anymore.) Skip check if bootstrapping, since permissions machinery
121          * may not be working yet.
122          */
123         if (!IsBootstrapProcessingMode())
124         {
125                 AclResult       aclresult;
126
127                 aclresult = pg_namespace_aclcheck(namespaceId, GetUserId(),
128                                                                                   ACL_CREATE);
129                 if (aclresult != ACLCHECK_OK)
130                         aclcheck_error(aclresult, ACL_KIND_NAMESPACE,
131                                                    get_namespace_name(namespaceId));
132         }
133
134         /*
135          * look up the access method, verify it can handle the requested
136          * features
137          */
138         tuple = SearchSysCache(AMNAME,
139                                                    PointerGetDatum(accessMethodName),
140                                                    0, 0, 0);
141         if (!HeapTupleIsValid(tuple))
142                 ereport(ERROR,
143                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
144                                  errmsg("access method \"%s\" does not exist",
145                                                 accessMethodName)));
146         accessMethodId = HeapTupleGetOid(tuple);
147         accessMethodForm = (Form_pg_am) GETSTRUCT(tuple);
148
149         if (unique && !accessMethodForm->amcanunique)
150                 ereport(ERROR,
151                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
152                    errmsg("access method \"%s\" does not support unique indexes",
153                                   accessMethodName)));
154         if (numberOfAttributes > 1 && !accessMethodForm->amcanmulticol)
155                 ereport(ERROR,
156                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
157                                  errmsg("access method \"%s\" does not support multicolumn indexes",
158                                                 accessMethodName)));
159
160         ReleaseSysCache(tuple);
161
162         /*
163          * If a range table was created then check that only the base rel is
164          * mentioned.
165          */
166         if (rangetable != NIL)
167         {
168                 if (length(rangetable) != 1 || getrelid(1, rangetable) != relationId)
169                         ereport(ERROR,
170                                         (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
171                                          errmsg("index expressions and predicates may refer only to the table being indexed")));
172         }
173
174         /*
175          * Convert the partial-index predicate from parsetree form to an
176          * implicit-AND qual expression, for easier evaluation at runtime.
177          * While we are at it, we reduce it to a canonical (CNF or DNF) form
178          * to simplify the task of proving implications.
179          */
180         if (predicate)
181         {
182                 cnfPred = canonicalize_qual((Expr *) copyObject(predicate), true);
183                 CheckPredicate(cnfPred);
184         }
185
186         /*
187          * Check that all of the attributes in a primary key are marked as not
188          * null, otherwise attempt to ALTER TABLE .. SET NOT NULL
189          */
190         if (primary)
191         {
192                 List       *keys;
193
194                 foreach(keys, attributeList)
195                 {
196                         IndexElem  *key = (IndexElem *) lfirst(keys);
197                         HeapTuple       atttuple;
198
199                         if (!key->name)
200                                 ereport(ERROR,
201                                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
202                                                  errmsg("primary keys cannot be expressions")));
203
204                         /* System attributes are never null, so no problem */
205                         if (SystemAttributeByName(key->name, rel->rd_rel->relhasoids))
206                                 continue;
207
208                         atttuple = SearchSysCacheAttName(relationId, key->name);
209                         if (HeapTupleIsValid(atttuple))
210                         {
211                                 if (!((Form_pg_attribute) GETSTRUCT(atttuple))->attnotnull)
212                                 {
213                                         /*
214                                          * Try to make it NOT NULL.
215                                          *
216                                          * XXX: Shouldn't the ALTER TABLE .. SET NOT NULL cascade
217                                          * to child tables?  Currently, since the PRIMARY KEY
218                                          * itself doesn't cascade, we don't cascade the
219                                          * notnull constraint either; but this is pretty
220                                          * debatable.
221                                          */
222                                         AlterTableAlterColumnSetNotNull(relationId, false,
223                                                                                                         key->name);
224                                 }
225                                 ReleaseSysCache(atttuple);
226                         }
227                         else
228                         {
229                                 /* This shouldn't happen if parser did its job ... */
230                                 ereport(ERROR,
231                                                 (errcode(ERRCODE_UNDEFINED_COLUMN),
232                                           errmsg("column \"%s\" named in key does not exist",
233                                                          key->name)));
234                         }
235                 }
236         }
237
238         /*
239          * Prepare arguments for index_create, primarily an IndexInfo
240          * structure
241          */
242         indexInfo = makeNode(IndexInfo);
243         indexInfo->ii_NumIndexAttrs = numberOfAttributes;
244         indexInfo->ii_Expressions = NIL;        /* for now */
245         indexInfo->ii_ExpressionsState = NIL;
246         indexInfo->ii_Predicate = cnfPred;
247         indexInfo->ii_PredicateState = NIL;
248         indexInfo->ii_Unique = unique;
249
250         classObjectId = (Oid *) palloc(numberOfAttributes * sizeof(Oid));
251         ComputeIndexAttrs(indexInfo, classObjectId, attributeList,
252                                           relationId, accessMethodName, accessMethodId);
253
254         index_create(relationId, indexRelationName,
255                                  indexInfo, accessMethodId, classObjectId,
256                                  primary, isconstraint, allowSystemTableMods);
257
258         /*
259          * We update the relation's pg_class tuple even if it already has
260          * relhasindex = true.  This is needed to cause a shared-cache-inval
261          * message to be sent for the pg_class tuple, which will cause other
262          * backends to flush their relcache entries and in particular their
263          * cached lists of the indexes for this relation.
264          */
265         setRelhasindex(relationId, true, primary, InvalidOid);
266 }
267
268
269 /*
270  * CheckPredicate
271  *              Checks that the given list of partial-index predicates is valid.
272  *
273  * This used to also constrain the form of the predicate to forms that
274  * indxpath.c could do something with.  However, that seems overly
275  * restrictive.  One useful application of partial indexes is to apply
276  * a UNIQUE constraint across a subset of a table, and in that scenario
277  * any evaluatable predicate will work.  So accept any predicate here
278  * (except ones requiring a plan), and let indxpath.c fend for itself.
279  */
280 static void
281 CheckPredicate(List *predList)
282 {
283         /*
284          * We don't currently support generation of an actual query plan for a
285          * predicate, only simple scalar expressions; hence these
286          * restrictions.
287          */
288         if (contain_subplans((Node *) predList))
289                 ereport(ERROR,
290                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
291                                  errmsg("cannot use subquery in index predicate")));
292         if (contain_agg_clause((Node *) predList))
293                 ereport(ERROR,
294                                 (errcode(ERRCODE_GROUPING_ERROR),
295                                  errmsg("cannot use aggregate in index predicate")));
296
297         /*
298          * A predicate using mutable functions is probably wrong, for the same
299          * reasons that we don't allow an index expression to use one.
300          */
301         if (contain_mutable_functions((Node *) predList))
302                 ereport(ERROR,
303                                 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
304                 errmsg("functions in index predicate must be marked IMMUTABLE")));
305 }
306
307 static void
308 ComputeIndexAttrs(IndexInfo *indexInfo,
309                                   Oid *classOidP,
310                                   List *attList,        /* list of IndexElem's */
311                                   Oid relId,
312                                   char *accessMethodName,
313                                   Oid accessMethodId)
314 {
315         List       *rest;
316         int                     attn = 0;
317
318         /*
319          * process attributeList
320          */
321         foreach(rest, attList)
322         {
323                 IndexElem  *attribute = (IndexElem *) lfirst(rest);
324                 Oid                     atttype;
325
326                 if (attribute->name != NULL)
327                 {
328                         /* Simple index attribute */
329                         HeapTuple       atttuple;
330                         Form_pg_attribute attform;
331
332                         Assert(attribute->expr == NULL);
333                         atttuple = SearchSysCacheAttName(relId, attribute->name);
334                         if (!HeapTupleIsValid(atttuple))
335                                 ereport(ERROR,
336                                                 (errcode(ERRCODE_UNDEFINED_COLUMN),
337                                                  errmsg("column \"%s\" does not exist",
338                                                                 attribute->name)));
339                         attform = (Form_pg_attribute) GETSTRUCT(atttuple);
340                         indexInfo->ii_KeyAttrNumbers[attn] = attform->attnum;
341                         atttype = attform->atttypid;
342                         ReleaseSysCache(atttuple);
343                 }
344                 else if (attribute->expr && IsA(attribute->expr, Var))
345                 {
346                         /* Tricky tricky, he wrote (column) ... treat as simple attr */
347                         Var                *var = (Var *) attribute->expr;
348
349                         indexInfo->ii_KeyAttrNumbers[attn] = var->varattno;
350                         atttype = get_atttype(relId, var->varattno);
351                 }
352                 else
353                 {
354                         /* Index expression */
355                         Assert(attribute->expr != NULL);
356                         indexInfo->ii_KeyAttrNumbers[attn] = 0;         /* marks expression */
357                         indexInfo->ii_Expressions = lappend(indexInfo->ii_Expressions,
358                                                                                                 attribute->expr);
359                         atttype = exprType(attribute->expr);
360
361                         /*
362                          * We don't currently support generation of an actual query
363                          * plan for an index expression, only simple scalar
364                          * expressions; hence these restrictions.
365                          */
366                         if (contain_subplans(attribute->expr))
367                                 ereport(ERROR,
368                                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
369                                    errmsg("cannot use subquery in index expression")));
370                         if (contain_agg_clause(attribute->expr))
371                                 ereport(ERROR,
372                                                 (errcode(ERRCODE_GROUPING_ERROR),
373                                         errmsg("cannot use aggregate function in index expression")));
374
375                         /*
376                          * A expression using mutable functions is probably wrong,
377                          * since if you aren't going to get the same result for the
378                          * same data every time, it's not clear what the index entries
379                          * mean at all.
380                          */
381                         if (contain_mutable_functions(attribute->expr))
382                                 ereport(ERROR,
383                                                 (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
384                                                  errmsg("functions in index expression must be marked IMMUTABLE")));
385                 }
386
387                 classOidP[attn] = GetIndexOpClass(attribute->opclass,
388                                                                                   atttype,
389                                                                                   accessMethodName,
390                                                                                   accessMethodId);
391                 attn++;
392         }
393 }
394
395 /*
396  * Resolve possibly-defaulted operator class specification
397  */
398 static Oid
399 GetIndexOpClass(List *opclass, Oid attrType,
400                                 char *accessMethodName, Oid accessMethodId)
401 {
402         char       *schemaname;
403         char       *opcname;
404         HeapTuple       tuple;
405         Oid                     opClassId,
406                                 opInputType;
407
408         /*
409          * Release 7.0 removed network_ops, timespan_ops, and datetime_ops, so
410          * we ignore those opclass names so the default *_ops is used.  This
411          * can be removed in some later release.  bjm 2000/02/07
412          *
413          * Release 7.1 removes lztext_ops, so suppress that too for a while.  tgl
414          * 2000/07/30
415          *
416          * Release 7.2 renames timestamp_ops to timestamptz_ops, so suppress that
417          * too for awhile.      I'm starting to think we need a better approach.
418          * tgl 2000/10/01
419          */
420         if (length(opclass) == 1)
421         {
422                 char       *claname = strVal(lfirst(opclass));
423
424                 if (strcmp(claname, "network_ops") == 0 ||
425                         strcmp(claname, "timespan_ops") == 0 ||
426                         strcmp(claname, "datetime_ops") == 0 ||
427                         strcmp(claname, "lztext_ops") == 0 ||
428                         strcmp(claname, "timestamp_ops") == 0)
429                         opclass = NIL;
430         }
431
432         if (opclass == NIL)
433         {
434                 /* no operator class specified, so find the default */
435                 opClassId = GetDefaultOpClass(attrType, accessMethodId);
436                 if (!OidIsValid(opClassId))
437                         ereport(ERROR,
438                                         (errcode(ERRCODE_UNDEFINED_OBJECT),
439                                          errmsg("data type %s has no default operator class for access method \"%s\"",
440                                                         format_type_be(attrType), accessMethodName),
441                                          errhint("You must specify an operator class for the index or define a default operator class for the data type.")));
442                 return opClassId;
443         }
444
445         /*
446          * Specific opclass name given, so look up the opclass.
447          */
448
449         /* deconstruct the name list */
450         DeconstructQualifiedName(opclass, &schemaname, &opcname);
451
452         if (schemaname)
453         {
454                 /* Look in specific schema only */
455                 Oid                     namespaceId;
456
457                 namespaceId = LookupExplicitNamespace(schemaname);
458                 tuple = SearchSysCache(CLAAMNAMENSP,
459                                                            ObjectIdGetDatum(accessMethodId),
460                                                            PointerGetDatum(opcname),
461                                                            ObjectIdGetDatum(namespaceId),
462                                                            0);
463         }
464         else
465         {
466                 /* Unqualified opclass name, so search the search path */
467                 opClassId = OpclassnameGetOpcid(accessMethodId, opcname);
468                 if (!OidIsValid(opClassId))
469                         ereport(ERROR,
470                                         (errcode(ERRCODE_UNDEFINED_OBJECT),
471                                          errmsg("operator class \"%s\" does not exist for access method \"%s\"",
472                                                         opcname, accessMethodName)));
473                 tuple = SearchSysCache(CLAOID,
474                                                            ObjectIdGetDatum(opClassId),
475                                                            0, 0, 0);
476         }
477
478         if (!HeapTupleIsValid(tuple))
479                 ereport(ERROR,
480                                 (errcode(ERRCODE_UNDEFINED_OBJECT),
481                                  errmsg("operator class \"%s\" does not exist for access method \"%s\"",
482                                                 NameListToString(opclass), accessMethodName)));
483
484         /*
485          * Verify that the index operator class accepts this datatype.  Note
486          * we will accept binary compatibility.
487          */
488         opClassId = HeapTupleGetOid(tuple);
489         opInputType = ((Form_pg_opclass) GETSTRUCT(tuple))->opcintype;
490
491         if (!IsBinaryCoercible(attrType, opInputType))
492                 ereport(ERROR,
493                                 (errcode(ERRCODE_DATATYPE_MISMATCH),
494                          errmsg("operator class \"%s\" does not accept data type %s",
495                                   NameListToString(opclass), format_type_be(attrType))));
496
497         ReleaseSysCache(tuple);
498
499         return opClassId;
500 }
501
502 static Oid
503 GetDefaultOpClass(Oid attrType, Oid accessMethodId)
504 {
505         OpclassCandidateList opclass;
506         int                     nexact = 0;
507         int                     ncompatible = 0;
508         Oid                     exactOid = InvalidOid;
509         Oid                     compatibleOid = InvalidOid;
510
511         /* If it's a domain, look at the base type instead */
512         attrType = getBaseType(attrType);
513
514         /*
515          * We scan through all the opclasses available for the access method,
516          * looking for one that is marked default and matches the target type
517          * (either exactly or binary-compatibly, but prefer an exact match).
518          *
519          * We could find more than one binary-compatible match, in which case we
520          * require the user to specify which one he wants.      If we find more
521          * than one exact match, then someone put bogus entries in pg_opclass.
522          *
523          * The initial search is done by namespace.c so that we only consider
524          * opclasses visible in the current namespace search path.  (See also
525          * typcache.c, which applies the same logic, but over all opclasses.)
526          */
527         for (opclass = OpclassGetCandidates(accessMethodId);
528                  opclass != NULL;
529                  opclass = opclass->next)
530         {
531                 if (opclass->opcdefault)
532                 {
533                         if (opclass->opcintype == attrType)
534                         {
535                                 nexact++;
536                                 exactOid = opclass->oid;
537                         }
538                         else if (IsBinaryCoercible(attrType, opclass->opcintype))
539                         {
540                                 ncompatible++;
541                                 compatibleOid = opclass->oid;
542                         }
543                 }
544         }
545
546         if (nexact == 1)
547                 return exactOid;
548         if (nexact != 0)
549                 ereport(ERROR,
550                                 (errcode(ERRCODE_DUPLICATE_OBJECT),
551                                  errmsg("there are multiple default operator classes for data type %s",
552                                                 format_type_be(attrType))));
553         if (ncompatible == 1)
554                 return compatibleOid;
555
556         return InvalidOid;
557 }
558
559 /*
560  * RemoveIndex
561  *              Deletes an index.
562  */
563 void
564 RemoveIndex(RangeVar *relation, DropBehavior behavior)
565 {
566         Oid                     indOid;
567         char            relkind;
568         ObjectAddress object;
569
570         indOid = RangeVarGetRelid(relation, false);
571         relkind = get_rel_relkind(indOid);
572         if (relkind != RELKIND_INDEX)
573                 ereport(ERROR,
574                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
575                                  errmsg("\"%s\" is not an index",
576                                                 relation->relname)));
577
578         object.classId = RelOid_pg_class;
579         object.objectId = indOid;
580         object.objectSubId = 0;
581
582         performDeletion(&object, behavior);
583 }
584
585 /*
586  * ReindexIndex
587  *              Recreate an index.
588  */
589 void
590 ReindexIndex(RangeVar *indexRelation, bool force /* currently unused */ )
591 {
592         Oid                     indOid;
593         HeapTuple       tuple;
594
595         indOid = RangeVarGetRelid(indexRelation, false);
596         tuple = SearchSysCache(RELOID,
597                                                    ObjectIdGetDatum(indOid),
598                                                    0, 0, 0);
599         if (!HeapTupleIsValid(tuple))           /* shouldn't happen */
600                 elog(ERROR, "cache lookup failed for relation %u", indOid);
601
602         if (((Form_pg_class) GETSTRUCT(tuple))->relkind != RELKIND_INDEX)
603                 ereport(ERROR,
604                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
605                                  errmsg("\"%s\" is not an index",
606                                                 indexRelation->relname)));
607
608         /* Check permissions */
609         if (!pg_class_ownercheck(indOid, GetUserId()))
610                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
611                                            indexRelation->relname);
612
613         ReleaseSysCache(tuple);
614
615         reindex_index(indOid);
616 }
617
618 /*
619  * ReindexTable
620  *              Recreate indexes of a table.
621  */
622 void
623 ReindexTable(RangeVar *relation, bool force /* currently unused */ )
624 {
625         Oid                     heapOid;
626         HeapTuple       tuple;
627
628         heapOid = RangeVarGetRelid(relation, false);
629         tuple = SearchSysCache(RELOID,
630                                                    ObjectIdGetDatum(heapOid),
631                                                    0, 0, 0);
632         if (!HeapTupleIsValid(tuple))           /* shouldn't happen */
633                 elog(ERROR, "cache lookup failed for relation %u", heapOid);
634
635         if (((Form_pg_class) GETSTRUCT(tuple))->relkind != RELKIND_RELATION &&
636                 ((Form_pg_class) GETSTRUCT(tuple))->relkind != RELKIND_TOASTVALUE)
637                 ereport(ERROR,
638                                 (errcode(ERRCODE_WRONG_OBJECT_TYPE),
639                                  errmsg("\"%s\" is not a table",
640                                                 relation->relname)));
641
642         /* Check permissions */
643         if (!pg_class_ownercheck(heapOid, GetUserId()))
644                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_CLASS,
645                                            relation->relname);
646
647         /* Can't reindex shared tables except in standalone mode */
648         if (((Form_pg_class) GETSTRUCT(tuple))->relisshared && IsUnderPostmaster)
649                 ereport(ERROR,
650                                 (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
651                                  errmsg("shared table \"%s\" can only be reindexed in stand-alone mode",
652                                                 relation->relname)));
653
654         ReleaseSysCache(tuple);
655
656         if (!reindex_relation(heapOid))
657                 ereport(NOTICE,
658                                 (errmsg("table \"%s\" has no indexes",
659                                                 relation->relname)));
660 }
661
662 /*
663  * ReindexDatabase
664  *              Recreate indexes of a database.
665  *
666  * To reduce the probability of deadlocks, each table is reindexed in a
667  * separate transaction, so we can release the lock on it right away.
668  */
669 void
670 ReindexDatabase(const char *dbname, bool force /* currently unused */,
671                                 bool all)
672 {
673         Relation        relationRelation;
674         HeapScanDesc scan;
675         HeapTuple       tuple;
676         MemoryContext private_context;
677         MemoryContext old;
678         List       *relids = NIL;
679
680         AssertArg(dbname);
681
682         if (strcmp(dbname, get_database_name(MyDatabaseId)) != 0)
683                 ereport(ERROR,
684                                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
685                                  errmsg("can only reindex the currently open database")));
686
687         if (!pg_database_ownercheck(MyDatabaseId, GetUserId()))
688                 aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_DATABASE,
689                                            dbname);
690
691         /*
692          * We cannot run inside a user transaction block; if we were inside a
693          * transaction, then our commit- and start-transaction-command calls
694          * would not have the intended effect!
695          */
696         PreventTransactionChain((void *) dbname, "REINDEX DATABASE");
697
698         /*
699          * Create a memory context that will survive forced transaction
700          * commits we do below.  Since it is a child of PortalContext, it will
701          * go away eventually even if we suffer an error; there's no need for
702          * special abort cleanup logic.
703          */
704         private_context = AllocSetContextCreate(PortalContext,
705                                                                                         "ReindexDatabase",
706                                                                                         ALLOCSET_DEFAULT_MINSIZE,
707                                                                                         ALLOCSET_DEFAULT_INITSIZE,
708                                                                                         ALLOCSET_DEFAULT_MAXSIZE);
709
710         /*
711          * We always want to reindex pg_class first.  This ensures that if
712          * there is any corruption in pg_class' indexes, they will be fixed
713          * before we process any other tables.  This is critical because
714          * reindexing itself will try to update pg_class.
715          */
716         old = MemoryContextSwitchTo(private_context);
717         relids = lappendo(relids, RelOid_pg_class);
718         MemoryContextSwitchTo(old);
719
720         /*
721          * Scan pg_class to build a list of the relations we need to reindex.
722          *
723          * We only consider plain relations here (toast rels will be processed
724          * indirectly by reindex_relation).
725          */
726         relationRelation = heap_openr(RelationRelationName, AccessShareLock);
727         scan = heap_beginscan(relationRelation, SnapshotNow, 0, NULL);
728         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
729         {
730                 Form_pg_class classtuple = (Form_pg_class) GETSTRUCT(tuple);
731
732                 if (classtuple->relkind != RELKIND_RELATION)
733                         continue;
734
735                 if (!all)                               /* only system tables? */
736                 {
737                         if (!IsSystemClass(classtuple))
738                                 continue;
739                 }
740
741                 if (IsUnderPostmaster)  /* silently ignore shared tables */
742                 {
743                         if (classtuple->relisshared)
744                                 continue;
745                 }
746
747                 if (HeapTupleGetOid(tuple) == RelOid_pg_class)
748                         continue;                       /* got it already */
749
750                 old = MemoryContextSwitchTo(private_context);
751                 relids = lappendo(relids, HeapTupleGetOid(tuple));
752                 MemoryContextSwitchTo(old);
753         }
754         heap_endscan(scan);
755         heap_close(relationRelation, AccessShareLock);
756
757         /* Now reindex each rel in a separate transaction */
758         CommitTransactionCommand();
759         while (relids)
760         {
761                 Oid             relid = lfirsto(relids);
762
763                 StartTransactionCommand();
764                 SetQuerySnapshot();             /* might be needed for functions in
765                                                                  * indexes */
766                 if (reindex_relation(relid))
767                         ereport(NOTICE,
768                                         (errmsg("table \"%s\" was reindexed",
769                                                         get_rel_name(relid))));
770                 CommitTransactionCommand();
771                 relids = lnext(relids);
772         }
773         StartTransactionCommand();
774
775         MemoryContextDelete(private_context);
776 }