]> granicus.if.org Git - postgresql/blob - src/backend/commands/indexcmds.c
df7e358b8976ec1dbd0960f8879d3862253adeb0
[postgresql] / src / backend / commands / indexcmds.c
1 /*-------------------------------------------------------------------------
2  *
3  * indexcmds.c
4  *        POSTGRES define and remove index code.
5  *
6  * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        $Header: /cvsroot/pgsql/src/backend/commands/indexcmds.c,v 1.96 2003/01/02 19:29:22 tgl Exp $
12  *
13  *-------------------------------------------------------------------------
14  */
15
16 #include "postgres.h"
17
18 #include "access/heapam.h"
19 #include "catalog/catalog.h"
20 #include "catalog/catname.h"
21 #include "catalog/dependency.h"
22 #include "catalog/heap.h"
23 #include "catalog/index.h"
24 #include "catalog/namespace.h"
25 #include "catalog/pg_opclass.h"
26 #include "catalog/pg_proc.h"
27 #include "commands/defrem.h"
28 #include "commands/tablecmds.h"
29 #include "executor/executor.h"
30 #include "miscadmin.h"
31 #include "optimizer/clauses.h"
32 #include "optimizer/prep.h"
33 #include "parser/parsetree.h"
34 #include "parser/parse_coerce.h"
35 #include "parser/parse_func.h"
36 #include "utils/acl.h"
37 #include "utils/builtins.h"
38 #include "utils/lsyscache.h"
39 #include "utils/syscache.h"
40
41
42 #define IsFuncIndex(ATTR_LIST) (((IndexElem*)lfirst(ATTR_LIST))->funcname != NIL)
43
44 /* non-export function prototypes */
45 static void CheckPredicate(List *predList, List *rangeTable, Oid baseRelOid);
46 static void FuncIndexArgs(IndexInfo *indexInfo, Oid *classOidP,
47                           IndexElem *funcIndex,
48                           Oid relId,
49                           char *accessMethodName, Oid accessMethodId);
50 static void NormIndexAttrs(IndexInfo *indexInfo, Oid *classOidP,
51                            List *attList,
52                            Oid relId,
53                            char *accessMethodName, Oid accessMethodId);
54 static Oid GetAttrOpClass(IndexElem *attribute, Oid attrType,
55                            char *accessMethodName, Oid accessMethodId);
56 static Oid      GetDefaultOpClass(Oid attrType, Oid accessMethodId);
57
58 /*
59  * DefineIndex
60  *              Creates a new index.
61  *
62  * 'attributeList' is a list of IndexElem specifying either a functional
63  *              index or a list of attributes to index on.
64  * 'predicate' is the qual specified in the where clause.
65  * 'rangetable' is needed to interpret the predicate.
66  */
67 void
68 DefineIndex(RangeVar *heapRelation,
69                         char *indexRelationName,
70                         char *accessMethodName,
71                         List *attributeList,
72                         bool unique,
73                         bool primary,
74                         bool isconstraint,
75                         Expr *predicate,
76                         List *rangetable)
77 {
78         Oid                *classObjectId;
79         Oid                     accessMethodId;
80         Oid                     relationId;
81         Oid                     namespaceId;
82         Relation        rel;
83         HeapTuple       tuple;
84         Form_pg_am      accessMethodForm;
85         IndexInfo  *indexInfo;
86         int                     numberOfAttributes;
87         List       *cnfPred = NIL;
88
89         /*
90          * count attributes in index
91          */
92         numberOfAttributes = length(attributeList);
93         if (numberOfAttributes <= 0)
94                 elog(ERROR, "DefineIndex: must specify at least one attribute");
95         if (numberOfAttributes > INDEX_MAX_KEYS)
96                 elog(ERROR, "Cannot use more than %d attributes in an index",
97                          INDEX_MAX_KEYS);
98
99         /*
100          * Open heap relation, acquire a suitable lock on it, remember its OID
101          */
102         rel = heap_openrv(heapRelation, ShareLock);
103
104         /* Note: during bootstrap may see uncataloged relation */
105         if (rel->rd_rel->relkind != RELKIND_RELATION &&
106                 rel->rd_rel->relkind != RELKIND_UNCATALOGED)
107                 elog(ERROR, "DefineIndex: relation \"%s\" is not a table",
108                          heapRelation->relname);
109
110         relationId = RelationGetRelid(rel);
111         namespaceId = RelationGetNamespace(rel);
112
113         if (!IsBootstrapProcessingMode() &&
114                 IsSystemRelation(rel) &&
115                 !IndexesAreActive(rel))
116                 elog(ERROR, "Existing indexes are inactive. REINDEX first");
117
118         heap_close(rel, NoLock);
119
120         /*
121          * Verify we (still) have CREATE rights in the rel's namespace.
122          * (Presumably we did when the rel was created, but maybe not
123          * anymore.) Skip check if bootstrapping, since permissions machinery
124          * may not be working yet.
125          */
126         if (!IsBootstrapProcessingMode())
127         {
128                 AclResult       aclresult;
129
130                 aclresult = pg_namespace_aclcheck(namespaceId, GetUserId(),
131                                                                                   ACL_CREATE);
132                 if (aclresult != ACLCHECK_OK)
133                         aclcheck_error(aclresult, get_namespace_name(namespaceId));
134         }
135
136         /*
137          * look up the access method, verify it can handle the requested
138          * features
139          */
140         tuple = SearchSysCache(AMNAME,
141                                                    PointerGetDatum(accessMethodName),
142                                                    0, 0, 0);
143         if (!HeapTupleIsValid(tuple))
144                 elog(ERROR, "DefineIndex: access method \"%s\" not found",
145                          accessMethodName);
146         accessMethodId = HeapTupleGetOid(tuple);
147         accessMethodForm = (Form_pg_am) GETSTRUCT(tuple);
148
149         if (unique && !accessMethodForm->amcanunique)
150                 elog(ERROR, "DefineIndex: access method \"%s\" does not support UNIQUE indexes",
151                          accessMethodName);
152         if (numberOfAttributes > 1 && !accessMethodForm->amcanmulticol)
153                 elog(ERROR, "DefineIndex: access method \"%s\" does not support multi-column indexes",
154                          accessMethodName);
155
156         ReleaseSysCache(tuple);
157
158         /*
159          * Convert the partial-index predicate from parsetree form to an
160          * implicit-AND qual expression, for easier evaluation at runtime.
161          * While we are at it, we reduce it to a canonical (CNF or DNF) form
162          * to simplify the task of proving implications.
163          */
164         if (predicate)
165         {
166                 cnfPred = canonicalize_qual((Expr *) copyObject(predicate), true);
167                 CheckPredicate(cnfPred, rangetable, relationId);
168         }
169
170         /*
171          * Check that all of the attributes in a primary key are marked
172          * as not null, otherwise attempt to ALTER TABLE .. SET NOT NULL
173          */
174         if (primary && !IsFuncIndex(attributeList))
175         {
176                 List   *keys;
177
178                 foreach(keys, attributeList)
179                 {
180                         IndexElem   *key = (IndexElem *) lfirst(keys);
181                         HeapTuple       atttuple;
182
183                         /* System attributes are never null, so no problem */
184                         if (SystemAttributeByName(key->name, rel->rd_rel->relhasoids))
185                                 continue;
186
187                         atttuple = SearchSysCacheAttName(relationId, key->name);
188                         if (HeapTupleIsValid(atttuple))
189                         {
190                                 if (! ((Form_pg_attribute) GETSTRUCT(atttuple))->attnotnull)
191                                 {
192                                         /*
193                                          * Try to make it NOT NULL.
194                                          *
195                                          * XXX: Shouldn't the ALTER TABLE .. SET NOT NULL cascade
196                                          * to child tables?  Currently, since the PRIMARY KEY
197                                          * itself doesn't cascade, we don't cascade the notnull
198                                          * constraint either; but this is pretty debatable.
199                                          */
200                                         AlterTableAlterColumnSetNotNull(relationId, false,
201                                                                                                         key->name);
202                                 }
203                                 ReleaseSysCache(atttuple);
204                         }
205                         else
206                         {
207                                 /* This shouldn't happen if parser did its job ... */
208                                 elog(ERROR, "DefineIndex: column \"%s\" named in key does not exist",
209                                          key->name);
210                         }
211                 }
212         }
213
214         /*
215          * Prepare arguments for index_create, primarily an IndexInfo
216          * structure
217          */
218         indexInfo = makeNode(IndexInfo);
219         indexInfo->ii_Predicate = cnfPred;
220         indexInfo->ii_PredicateState = NIL;
221         indexInfo->ii_FuncOid = InvalidOid;
222         indexInfo->ii_Unique = unique;
223
224         if (IsFuncIndex(attributeList))
225         {
226                 IndexElem  *funcIndex = (IndexElem *) lfirst(attributeList);
227                 int                     nargs;
228
229                 /* Parser should have given us only one list item, but check */
230                 if (numberOfAttributes != 1)
231                         elog(ERROR, "Functional index can only have one attribute");
232
233                 nargs = length(funcIndex->args);
234                 if (nargs > INDEX_MAX_KEYS)
235                         elog(ERROR, "Index function can take at most %d arguments",
236                                  INDEX_MAX_KEYS);
237
238                 indexInfo->ii_NumIndexAttrs = 1;
239                 indexInfo->ii_NumKeyAttrs = nargs;
240
241                 classObjectId = (Oid *) palloc(sizeof(Oid));
242
243                 FuncIndexArgs(indexInfo, classObjectId, funcIndex,
244                                           relationId, accessMethodName, accessMethodId);
245         }
246         else
247         {
248                 indexInfo->ii_NumIndexAttrs = numberOfAttributes;
249                 indexInfo->ii_NumKeyAttrs = numberOfAttributes;
250
251                 classObjectId = (Oid *) palloc(numberOfAttributes * sizeof(Oid));
252
253                 NormIndexAttrs(indexInfo, classObjectId, attributeList,
254                                            relationId, accessMethodName, accessMethodId);
255         }
256
257         index_create(relationId, indexRelationName,
258                                  indexInfo, accessMethodId, classObjectId,
259                                  primary, isconstraint, allowSystemTableMods);
260
261         /*
262          * We update the relation's pg_class tuple even if it already has
263          * relhasindex = true.  This is needed to cause a shared-cache-inval
264          * message to be sent for the pg_class tuple, which will cause other
265          * backends to flush their relcache entries and in particular their
266          * cached lists of the indexes for this relation.
267          */
268         setRelhasindex(relationId, true, primary, InvalidOid);
269 }
270
271
272 /*
273  * CheckPredicate
274  *              Checks that the given list of partial-index predicates refer
275  *              (via the given range table) only to the given base relation oid.
276  *
277  * This used to also constrain the form of the predicate to forms that
278  * indxpath.c could do something with.  However, that seems overly
279  * restrictive.  One useful application of partial indexes is to apply
280  * a UNIQUE constraint across a subset of a table, and in that scenario
281  * any evaluatable predicate will work.  So accept any predicate here
282  * (except ones requiring a plan), and let indxpath.c fend for itself.
283  */
284
285 static void
286 CheckPredicate(List *predList, List *rangeTable, Oid baseRelOid)
287 {
288         if (length(rangeTable) != 1 || getrelid(1, rangeTable) != baseRelOid)
289                 elog(ERROR,
290                  "Partial-index predicates may refer only to the base relation");
291
292         /*
293          * We don't currently support generation of an actual query plan for a
294          * predicate, only simple scalar expressions; hence these
295          * restrictions.
296          */
297         if (contain_subplans((Node *) predList))
298                 elog(ERROR, "Cannot use subselect in index predicate");
299         if (contain_agg_clause((Node *) predList))
300                 elog(ERROR, "Cannot use aggregate in index predicate");
301
302         /*
303          * A predicate using mutable functions is probably wrong, for the same
304          * reasons that we don't allow a functional index to use one.
305          */
306         if (contain_mutable_functions((Node *) predList))
307                 elog(ERROR, "Functions in index predicate must be marked isImmutable");
308 }
309
310
311 static void
312 FuncIndexArgs(IndexInfo *indexInfo,
313                           Oid *classOidP,
314                           IndexElem *funcIndex,
315                           Oid relId,
316                           char *accessMethodName,
317                           Oid accessMethodId)
318 {
319         Oid                     argTypes[FUNC_MAX_ARGS];
320         List       *arglist;
321         int                     nargs = 0;
322         int                     i;
323         FuncDetailCode fdresult;
324         Oid                     funcid;
325         Oid                     rettype;
326         bool            retset;
327         Oid                *true_typeids;
328
329         /*
330          * process the function arguments, which are a list of T_String
331          * (someday ought to allow more general expressions?)
332          *
333          * Note caller already checked that list is not too long.
334          */
335         MemSet(argTypes, 0, sizeof(argTypes));
336
337         foreach(arglist, funcIndex->args)
338         {
339                 char       *arg = strVal(lfirst(arglist));
340                 HeapTuple       tuple;
341                 Form_pg_attribute att;
342
343                 tuple = SearchSysCacheAttName(relId, arg);
344                 if (!HeapTupleIsValid(tuple))
345                         elog(ERROR, "DefineIndex: column \"%s\" named in key does not exist", arg);
346                 att = (Form_pg_attribute) GETSTRUCT(tuple);
347                 indexInfo->ii_KeyAttrNumbers[nargs] = att->attnum;
348                 argTypes[nargs] = att->atttypid;
349                 ReleaseSysCache(tuple);
350                 nargs++;
351         }
352
353         /*
354          * Lookup the function procedure to get its OID and result type.
355          *
356          * We rely on parse_func.c to find the correct function in the possible
357          * presence of binary-compatible types.  However, parse_func may do
358          * too much: it will accept a function that requires run-time coercion
359          * of input types, and the executor is not currently set up to support
360          * that.  So, check to make sure that the selected function has
361          * exact-match or binary-compatible input types.
362          */
363         fdresult = func_get_detail(funcIndex->funcname, funcIndex->args,
364                                                            nargs, argTypes,
365                                                            &funcid, &rettype, &retset,
366                                                            &true_typeids);
367         if (fdresult != FUNCDETAIL_NORMAL)
368         {
369                 if (fdresult == FUNCDETAIL_AGGREGATE)
370                         elog(ERROR, "DefineIndex: functional index may not use an aggregate function");
371                 else if (fdresult == FUNCDETAIL_COERCION)
372                         elog(ERROR, "DefineIndex: functional index must use a real function, not a type coercion"
373                                  "\n\tTry specifying the index opclass you want to use, instead");
374                 else
375                         func_error("DefineIndex", funcIndex->funcname, nargs, argTypes,
376                                            NULL);
377         }
378
379         if (retset)
380                 elog(ERROR, "DefineIndex: cannot index on a function returning a set");
381
382         for (i = 0; i < nargs; i++)
383         {
384                 if (!IsBinaryCoercible(argTypes[i], true_typeids[i]))
385                         func_error("DefineIndex", funcIndex->funcname, nargs, true_typeids,
386                                            "Index function must be binary-compatible with table datatype");
387         }
388
389         /*
390          * Require that the function be marked immutable.  Using a mutable
391          * function for a functional index is highly questionable, since if
392          * you aren't going to get the same result for the same data every
393          * time, it's not clear what the index entries mean at all.
394          */
395         if (func_volatile(funcid) != PROVOLATILE_IMMUTABLE)
396                 elog(ERROR, "DefineIndex: index function must be marked isImmutable");
397
398         /* Process opclass, using func return type as default type */
399
400         classOidP[0] = GetAttrOpClass(funcIndex, rettype,
401                                                                   accessMethodName, accessMethodId);
402
403         /* OK, return results */
404
405         indexInfo->ii_FuncOid = funcid;
406         /* Need to do the fmgr function lookup now, too */
407         fmgr_info(funcid, &indexInfo->ii_FuncInfo);
408 }
409
410 static void
411 NormIndexAttrs(IndexInfo *indexInfo,
412                            Oid *classOidP,
413                            List *attList,       /* list of IndexElem's */
414                            Oid relId,
415                            char *accessMethodName,
416                            Oid accessMethodId)
417 {
418         List       *rest;
419         int                     attn = 0;
420
421         /*
422          * process attributeList
423          */
424         foreach(rest, attList)
425         {
426                 IndexElem  *attribute = (IndexElem *) lfirst(rest);
427                 HeapTuple       atttuple;
428                 Form_pg_attribute attform;
429
430                 if (attribute->name == NULL)
431                         elog(ERROR, "missing attribute for define index");
432
433                 atttuple = SearchSysCacheAttName(relId, attribute->name);
434                 if (!HeapTupleIsValid(atttuple))
435                         elog(ERROR, "DefineIndex: attribute \"%s\" not found",
436                                  attribute->name);
437                 attform = (Form_pg_attribute) GETSTRUCT(atttuple);
438
439                 indexInfo->ii_KeyAttrNumbers[attn] = attform->attnum;
440
441                 classOidP[attn] = GetAttrOpClass(attribute, attform->atttypid,
442                                                                            accessMethodName, accessMethodId);
443
444                 ReleaseSysCache(atttuple);
445                 attn++;
446         }
447 }
448
449 static Oid
450 GetAttrOpClass(IndexElem *attribute, Oid attrType,
451                            char *accessMethodName, Oid accessMethodId)
452 {
453         char       *schemaname;
454         char       *opcname;
455         HeapTuple       tuple;
456         Oid                     opClassId,
457                                 opInputType;
458
459         if (attribute->opclass == NIL)
460         {
461                 /* no operator class specified, so find the default */
462                 opClassId = GetDefaultOpClass(attrType, accessMethodId);
463                 if (!OidIsValid(opClassId))
464                         elog(ERROR, "data type %s has no default operator class for access method \"%s\""
465                                  "\n\tYou must specify an operator class for the index or define a"
466                                  "\n\tdefault operator class for the data type",
467                                  format_type_be(attrType), accessMethodName);
468                 return opClassId;
469         }
470
471         /*
472          * Specific opclass name given, so look up the opclass.
473          */
474
475         /* deconstruct the name list */
476         DeconstructQualifiedName(attribute->opclass, &schemaname, &opcname);
477
478         if (schemaname)
479         {
480                 /* Look in specific schema only */
481                 Oid                     namespaceId;
482
483                 namespaceId = LookupExplicitNamespace(schemaname);
484                 tuple = SearchSysCache(CLAAMNAMENSP,
485                                                            ObjectIdGetDatum(accessMethodId),
486                                                            PointerGetDatum(opcname),
487                                                            ObjectIdGetDatum(namespaceId),
488                                                            0);
489         }
490         else
491         {
492                 /* Unqualified opclass name, so search the search path */
493                 opClassId = OpclassnameGetOpcid(accessMethodId, opcname);
494                 if (!OidIsValid(opClassId))
495                         elog(ERROR, "DefineIndex: operator class \"%s\" not supported by access method \"%s\"",
496                                  opcname, accessMethodName);
497                 tuple = SearchSysCache(CLAOID,
498                                                            ObjectIdGetDatum(opClassId),
499                                                            0, 0, 0);
500         }
501
502         if (!HeapTupleIsValid(tuple))
503                 elog(ERROR, "DefineIndex: operator class \"%s\" not supported by access method \"%s\"",
504                          NameListToString(attribute->opclass), accessMethodName);
505
506         /*
507          * Verify that the index operator class accepts this datatype.  Note
508          * we will accept binary compatibility.
509          */
510         opClassId = HeapTupleGetOid(tuple);
511         opInputType = ((Form_pg_opclass) GETSTRUCT(tuple))->opcintype;
512
513         if (!IsBinaryCoercible(attrType, opInputType))
514                 elog(ERROR, "operator class \"%s\" does not accept data type %s",
515                  NameListToString(attribute->opclass), format_type_be(attrType));
516
517         ReleaseSysCache(tuple);
518
519         return opClassId;
520 }
521
522 static Oid
523 GetDefaultOpClass(Oid attrType, Oid accessMethodId)
524 {
525         OpclassCandidateList opclass;
526         int                     nexact = 0;
527         int                     ncompatible = 0;
528         Oid                     exactOid = InvalidOid;
529         Oid                     compatibleOid = InvalidOid;
530
531         /* If it's a domain, look at the base type instead */
532         attrType = getBaseType(attrType);
533
534         /*
535          * We scan through all the opclasses available for the access method,
536          * looking for one that is marked default and matches the target type
537          * (either exactly or binary-compatibly, but prefer an exact match).
538          *
539          * We could find more than one binary-compatible match, in which case we
540          * require the user to specify which one he wants.      If we find more
541          * than one exact match, then someone put bogus entries in pg_opclass.
542          *
543          * The initial search is done by namespace.c so that we only consider
544          * opclasses visible in the current namespace search path.
545          */
546         for (opclass = OpclassGetCandidates(accessMethodId);
547                  opclass != NULL;
548                  opclass = opclass->next)
549         {
550                 if (opclass->opcdefault)
551                 {
552                         if (opclass->opcintype == attrType)
553                         {
554                                 nexact++;
555                                 exactOid = opclass->oid;
556                         }
557                         else if (IsBinaryCoercible(attrType, opclass->opcintype))
558                         {
559                                 ncompatible++;
560                                 compatibleOid = opclass->oid;
561                         }
562                 }
563         }
564
565         if (nexact == 1)
566                 return exactOid;
567         if (nexact != 0)
568                 elog(ERROR, "pg_opclass contains multiple default opclasses for data type %s",
569                          format_type_be(attrType));
570         if (ncompatible == 1)
571                 return compatibleOid;
572
573         return InvalidOid;
574 }
575
576 /*
577  * RemoveIndex
578  *              Deletes an index.
579  */
580 void
581 RemoveIndex(RangeVar *relation, DropBehavior behavior)
582 {
583         Oid                     indOid;
584         char            relkind;
585         ObjectAddress object;
586
587         indOid = RangeVarGetRelid(relation, false);
588         relkind = get_rel_relkind(indOid);
589         if (relkind != RELKIND_INDEX)
590                 elog(ERROR, "relation \"%s\" is of type \"%c\"",
591                          relation->relname, relkind);
592
593         object.classId = RelOid_pg_class;
594         object.objectId = indOid;
595         object.objectSubId = 0;
596
597         performDeletion(&object, behavior);
598 }
599
600 /*
601  * ReindexIndex
602  *              Recreate an index.
603  */
604 void
605 ReindexIndex(RangeVar *indexRelation, bool force /* currently unused */ )
606 {
607         Oid                     indOid;
608         HeapTuple       tuple;
609         bool            overwrite;
610
611         /* Choose in-place-or-not mode */
612         overwrite = IsIgnoringSystemIndexes();
613
614         indOid = RangeVarGetRelid(indexRelation, false);
615         tuple = SearchSysCache(RELOID,
616                                                    ObjectIdGetDatum(indOid),
617                                                    0, 0, 0);
618         if (!HeapTupleIsValid(tuple))
619                 elog(ERROR, "index \"%s\" does not exist", indexRelation->relname);
620
621         if (((Form_pg_class) GETSTRUCT(tuple))->relkind != RELKIND_INDEX)
622                 elog(ERROR, "relation \"%s\" is of type \"%c\"",
623                          indexRelation->relname,
624                          ((Form_pg_class) GETSTRUCT(tuple))->relkind);
625
626         if (IsSystemClass((Form_pg_class) GETSTRUCT(tuple)) &&
627                 !IsToastClass((Form_pg_class) GETSTRUCT(tuple)))
628         {
629                 if (!allowSystemTableMods)
630                         elog(ERROR, "\"%s\" is a system index. call REINDEX under standalone postgres with -O -P options",
631                                  indexRelation->relname);
632                 if (!IsIgnoringSystemIndexes())
633                         elog(ERROR, "\"%s\" is a system index. call REINDEX under standalone postgres with -P -O options",
634                                  indexRelation->relname);
635         }
636
637         ReleaseSysCache(tuple);
638
639         /*
640          * In-place REINDEX within a transaction block is dangerous, because
641          * if the transaction is later rolled back we have no way to undo
642          * truncation of the index's physical file.  Disallow it.
643          */
644         if (overwrite)
645                 PreventTransactionChain((void *) indexRelation, "REINDEX");
646
647         if (!reindex_index(indOid, force, overwrite))
648                 elog(WARNING, "index \"%s\" wasn't reindexed", indexRelation->relname);
649 }
650
651 /*
652  * ReindexTable
653  *              Recreate indexes of a table.
654  */
655 void
656 ReindexTable(RangeVar *relation, bool force)
657 {
658         Oid                     heapOid;
659         char            relkind;
660
661         heapOid = RangeVarGetRelid(relation, false);
662         relkind = get_rel_relkind(heapOid);
663
664         if (relkind != RELKIND_RELATION && relkind != RELKIND_TOASTVALUE)
665                 elog(ERROR, "relation \"%s\" is of type \"%c\"",
666                          relation->relname, relkind);
667
668         /*
669          * In-place REINDEX within a transaction block is dangerous, because
670          * if the transaction is later rolled back we have no way to undo
671          * truncation of the index's physical file.  Disallow it.
672          *
673          * XXX we assume that in-place reindex will only be done if
674          * IsIgnoringSystemIndexes() is true.
675          */
676         if (IsIgnoringSystemIndexes())
677                 PreventTransactionChain((void *) relation, "REINDEX");
678
679         if (!reindex_relation(heapOid, force))
680                 elog(WARNING, "table \"%s\" wasn't reindexed", relation->relname);
681 }
682
683 /*
684  * ReindexDatabase
685  *              Recreate indexes of a database.
686  */
687 void
688 ReindexDatabase(const char *dbname, bool force, bool all)
689 {
690         Relation        relationRelation;
691         HeapScanDesc scan;
692         HeapTuple       tuple;
693         MemoryContext private_context;
694         MemoryContext old;
695         int                     relcnt,
696                                 relalc,
697                                 i,
698                                 oncealc = 200;
699         Oid                *relids = (Oid *) NULL;
700
701         AssertArg(dbname);
702
703         if (strcmp(dbname, DatabaseName) != 0)
704                 elog(ERROR, "REINDEX DATABASE: Can be executed only on the currently open database.");
705
706         if (!(superuser() || is_dbadmin(MyDatabaseId)))
707                 elog(ERROR, "REINDEX DATABASE: Permission denied.");
708
709         if (!allowSystemTableMods)
710                 elog(ERROR, "must be called under standalone postgres with -O -P options");
711         if (!IsIgnoringSystemIndexes())
712                 elog(ERROR, "must be called under standalone postgres with -P -O options");
713
714         /*
715          * We cannot run inside a user transaction block; if we were inside a
716          * transaction, then our commit- and start-transaction-command calls
717          * would not have the intended effect!
718          */
719         PreventTransactionChain((void *) dbname, "REINDEX");
720
721         /*
722          * Create a memory context that will survive forced transaction
723          * commits we do below.  Since it is a child of QueryContext, it will
724          * go away eventually even if we suffer an error; there's no need for
725          * special abort cleanup logic.
726          */
727         private_context = AllocSetContextCreate(QueryContext,
728                                                                                         "ReindexDatabase",
729                                                                                         ALLOCSET_DEFAULT_MINSIZE,
730                                                                                         ALLOCSET_DEFAULT_INITSIZE,
731                                                                                         ALLOCSET_DEFAULT_MAXSIZE);
732
733         /*
734          * Scan pg_class to build a list of the relations we need to reindex.
735          */
736         relationRelation = heap_openr(RelationRelationName, AccessShareLock);
737         scan = heap_beginscan(relationRelation, SnapshotNow, 0, NULL);
738         relcnt = relalc = 0;
739         while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
740         {
741                 char            relkind;
742
743                 if (!all)
744                 {
745                         if (!(IsSystemClass((Form_pg_class) GETSTRUCT(tuple)) &&
746                                   !IsToastClass((Form_pg_class) GETSTRUCT(tuple))))
747                                 continue;
748                 }
749                 relkind = ((Form_pg_class) GETSTRUCT(tuple))->relkind;
750                 if (relkind == RELKIND_RELATION || relkind == RELKIND_TOASTVALUE)
751                 {
752                         old = MemoryContextSwitchTo(private_context);
753                         if (relcnt == 0)
754                         {
755                                 relalc = oncealc;
756                                 relids = palloc(sizeof(Oid) * relalc);
757                         }
758                         else if (relcnt >= relalc)
759                         {
760                                 relalc *= 2;
761                                 relids = repalloc(relids, sizeof(Oid) * relalc);
762                         }
763                         MemoryContextSwitchTo(old);
764                         relids[relcnt] = HeapTupleGetOid(tuple);
765                         relcnt++;
766                 }
767         }
768         heap_endscan(scan);
769         heap_close(relationRelation, AccessShareLock);
770
771         /* Now reindex each rel in a separate transaction */
772         CommitTransactionCommand(true);
773         for (i = 0; i < relcnt; i++)
774         {
775                 StartTransactionCommand(true);
776                 SetQuerySnapshot();             /* might be needed for functional index */
777                 if (reindex_relation(relids[i], force))
778                         elog(NOTICE, "relation %u was reindexed", relids[i]);
779                 CommitTransactionCommand(true);
780         }
781         /* Tell xact.c not to chain the upcoming commit */
782         StartTransactionCommand(true);
783
784         MemoryContextDelete(private_context);
785 }